Microservices
Core content of the nlds-processors.
The Consumer class
- class nlds.rabbit.consumer.FilelistType(value)
Bases: enum.Enum
An enumeration.
- class nlds.rabbit.consumer.RabbitMQConsumer(queue: Optional[str] = None, setup_logging_fl=False)
Bases: abc.ABC, nlds.rabbit.publisher.RabbitMQPublisher
- acknowledge_message(channel: pika.spec.Channel, delivery_tag: str, connection: pika.connection.Connection) None
Method for acknowledging a message so that the next one can be fetched. This should be called at the end of a consumer callback and, in order to do so thread-safely, from within the connection object. All of the required params come from the standard callback params.
- Parameters
channel – Callback channel param
delivery_tag – From the callback method param, e.g. method.delivery_tag
connection – Connection object from the callback param
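As a hedged sketch, the thread-safe pattern described above typically looks like this with pika: the ack is handed to the connection's I/O loop via add_callback_threadsafe rather than called directly from a worker thread. The names mirror the signature; this is illustrative, not the real implementation.

```python
import functools

def acknowledge_message(channel, delivery_tag, connection):
    """Schedule a basic_ack on the connection's own I/O thread (sketch)."""
    # Calling channel.basic_ack directly from a worker thread is not
    # thread-safe in pika; add_callback_threadsafe defers it to the
    # connection's I/O loop instead.
    cb = functools.partial(channel.basic_ack, delivery_tag=delivery_tag)
    connection.add_callback_threadsafe(cb)
```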
- abstract callback(ch: pika.spec.Channel, method: pika.amqp_object.Method, properties: pika.frame.Header, body: bytes, connection: pika.connection.Connection) None
Standard consumer callback function as defined by rabbitMQ, with the standard callback parameters of Channel, Method, Header, Body (in bytes) and Connection.
This is the working method of a consumer, i.e. it does the job the consumer has been designed to do, and so must be implemented and overridden by any child consumer classes.
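A minimal, hypothetical subclass showing the shape of an overridden callback. The base class and the final ack step are elided, and the JSON message body is an assumption about the format:

```python
import json

class EchoConsumer:  # in the real code: class EchoConsumer(RabbitMQConsumer)
    """Hypothetical consumer that just records the decoded message body."""

    def __init__(self):
        self.last_message = None

    def callback(self, ch, method, properties, body, connection):
        # body arrives as bytes; NLDS messages are JSON, so decode it,
        # then do whatever work this consumer exists to do.
        self.last_message = json.loads(body)
        # A real consumer would finish with self.acknowledge_message(...)
```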
- declare_bindings() None
Overridden method from Publisher, additionally declares the queues and queue-exchange bindings outlined in the config file. If no queues were set then the default - generated within __init__ - is used instead.
- dedup_filelist(filelist: List[nlds.details.PathDetails]) List[nlds.details.PathDetails]
De-duplicate filelist
- get_retries(body: Dict) nlds.details.Retries
Retrieve the retries from a message. If not present, or inaccessible, then create new ones.
- load_config_value(config_option: str, path_listify_fl: bool = False)
Function for verifying and loading options from the consumer-specific section of the .server_config file. Attempts to load from the config section and reverts to a hardcoded default value if an error is encountered. Will not attempt to load an option if no default value is available.
- Parameters
config_option – (str) The option in the indexer section of the .server_config file to be verified and loaded.
path_listify_fl – (bool) Optional argument to control whether the value should be treated as a list, with each item converted to a pathlib.Path object.
- Returns
The value at config_option, otherwise the default value as defined in Consumer.DEFAULT_CONSUMER_CONFIG, which subclasses may override.
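The lookup-with-fallback behaviour can be sketched as follows; the DEFAULT_CONSUMER_CONFIG values and the consumer_config dict here are illustrative stand-ins for the real .server_config machinery:

```python
from pathlib import Path

# Hypothetical defaults; the real ones live in Consumer.DEFAULT_CONSUMER_CONFIG.
DEFAULT_CONSUMER_CONFIG = {"max_filesize": 1000, "mount_points": ["/data"]}

def load_config_value(consumer_config: dict, config_option: str,
                      path_listify_fl: bool = False):
    # Options without a hardcoded default are not loadable at all.
    if config_option not in DEFAULT_CONSUMER_CONFIG:
        raise ValueError(f"no default for option {config_option!r}")
    # Prefer the consumer-specific section, fall back to the default.
    value = consumer_config.get(config_option,
                                DEFAULT_CONSUMER_CONFIG[config_option])
    if path_listify_fl:
        # Treat the value as a list and convert each item to a Path.
        value = [Path(item) for item in value]
    return value
```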
- nack_message(channel: pika.spec.Channel, delivery_tag: str, connection: pika.connection.Connection) None
Method for negatively acknowledging (nacking) a message so that it can be requeued and the next one fetched. This is called after a consumer callback if, and only if, it returns False. As with acking, in order to do this thread-safely it is done from within the connection object. All of the required params come from the standard callback params.
- Parameters
channel – Callback channel param
delivery_tag – From the callback method param, e.g. method.delivery_tag
connection – Connection object from the callback param
- parse_filelist(body_json: dict) List[nlds.details.PathDetails]
Convert the flat list from the message JSON into a list of PathDetails objects, and check that it is, in fact, a list.
- run()
Method to run when thread is started. Creates an AMQP connection and sets some exception handling.
A common exception which occurs is StreamLostError. The connection should get reset if that happens.
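The reconnect-on-StreamLostError behaviour can be sketched as a simple retry loop. StreamLostError here is a local stand-in for pika.exceptions.StreamLostError, and connect_and_consume is a hypothetical callable wrapping the consumer's AMQP setup and blocking consume:

```python
import time

class StreamLostError(Exception):
    """Local stand-in for pika.exceptions.StreamLostError."""

def run(connect_and_consume, max_attempts: int = 3, delay: float = 0.0):
    # If the AMQP stream drops mid-consume, rebuild the connection and
    # carry on rather than letting the thread die.
    for _ in range(max_attempts):
        try:
            connect_and_consume()
            return
        except StreamLostError:
            time.sleep(delay)  # brief pause before reconnecting
    raise RuntimeError("AMQP connection could not be re-established")
```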
- send_pathlist(pathlist: List[nlds.details.PathDetails], routing_key: str, body_json: Dict[str, Any], state: Optional[nlds.rabbit.consumer.State] = None, mode: nlds.rabbit.consumer.FilelistType = FilelistType.processed, warning: Optional[List[str]] = None, delay: Optional[int] = None, save_reasons_fl: bool = False) None
Convenience function which sends the given list of PathDetails objects to the exchange with the given routing key and message body. Mode specifies what to put into the log message, as well as determining whether the list should be retry-reset and whether the message should be delayed. Optionally, when resetting the retries the retry-reasons can be saved for later perusal by setting the save_reasons_fl.
Additionally forwards transaction state info on to the monitor. As part of this it keeps track of the number of messages sent and reassigns message sub_ids appropriately so that monitoring can keep track of the transaction’s state more easily.
- setup_logging(enable=False, log_level: Optional[str] = None, log_format: Optional[str] = None, add_stdout_fl: bool = False, stdout_log_level: Optional[str] = None, log_files: Optional[List[str]] = None, log_max_bytes: Optional[int] = None, log_backup_count: Optional[int] = None) None
Override of the publisher method which allows consumer-specific logging to take precedence over the general logging configuration.
- static split_routing_key(routing_key: str) None
Method to verify the routing key and split it into its parts.
- Returns
3-tuple of routing key parts
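Illustratively, verifying and splitting a three-part routing key might look like this (the key format shown is an assumption based on typical dot-separated NLDS keys):

```python
def split_routing_key(routing_key: str):
    # NLDS routing keys are dot-separated triples, e.g. "origin.queue.command".
    parts = routing_key.split(".")
    if len(parts) != 3:
        raise ValueError(f"routing key {routing_key!r} is not in 3 parts")
    return tuple(parts)
```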
- exception nlds.rabbit.consumer.SigTermError
Bases: Exception
- class nlds.rabbit.consumer.State(value)
Bases: enum.Enum
An enumeration.
The processors
Also referred to as ‘microservices’, ‘consumers’, or ‘workers’:
- class nlds_processors.nlds_worker.NLDSWorkerConsumer(queue='nlds_q')
- callback(ch: pika.channel.Channel, method: pika.frame.Method, properties: pika.frame.Header, body: bytes, connection: pika.connection.Connection) None
Standard consumer callback function as defined by rabbitMQ, with the standard callback parameters of Channel, Method, Header, Body (in bytes) and Connection.
This is the working method of a consumer, i.e. it does the job the consumer has been designed to do, and so must be implemented and overridden by any child consumer classes.
- publish_and_log_message(routing_key: str, msg: dict, log_fl=True) None
Wrapper around the original publish message method which additionally sends the message to the logging queue. Useful for debugging, to be able to see the content being managed by the worker.
- class nlds_processors.index.IndexerConsumer(queue='index_q')
- callback(ch, method, properties, body, connection)
Standard consumer callback function as defined by rabbitMQ, with the standard callback parameters of Channel, Method, Header, Body (in bytes) and Connection.
This is the working method of a consumer, i.e. it does the job the consumer has been designed to do, and so must be implemented and overridden by any child consumer classes.
- check_path_access(path: pathlib.Path, stat_result: Optional[NamedTuple] = None, access: int = 4) bool
Checks that the given path is accessible, either by checking for its existence or, if the check_permissions_fl is set, by doing a permissions check on the file’s bitmask. This requires a stat of the file, so one must either be provided via stat_result or one will be performed. The uid and gid of the user must be set in the object as well, usually by having performed RabbitMQConsumer.set_ids() beforehand.
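A simplified sketch of the two-mode check follows. Note that it substitutes os.access for the real uid/gid bitmask comparison, so it reflects the calling process's own permissions rather than a configured user's:

```python
import os
from pathlib import Path

def check_path_access(path: Path, stat_result=None,
                      access: int = os.R_OK,
                      check_permissions_fl: bool = False) -> bool:
    # Without the permissions flag, existence is all that is checked.
    if not check_permissions_fl:
        return path.exists()
    # With it, check access; the real method inspects the stat bitmask
    # against the consumer's configured uid/gid instead of using os.access.
    return os.access(path, access)
```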
- index(raw_filelist: List[NamedTuple], rk_origin: str, body_json: Dict[str, Any])
Indexes a list of PathDetails.
Each IndexItem is a named tuple consisting of an item (a file or directory) and an associated number of attempted accesses. This function checks if each item exists, fully walking any directories and subdirectories in the process, and then checks permissions on each available file. All accessible files are added to an ‘indexed’ list and sent back to the exchange for transfer once that list has reached a pre-configured size (default 1000MB) or the end of IndexItem list has been reached, whichever comes first.
If any item cannot be found, indexed or accessed then it is added to a ‘problem’ list for another attempt at indexing. If a maximum number of retries is reached and the item has still not been indexed then it is added to a final ‘failed’ list which is sent back to the exchange so the user can be informed via monitoring.
- Parameters
raw_filelist (List[NamedTuple]) – List of IndexItems containing paths to files or indexable directories and the number of times each has been attempted to be indexed.
rk_origin (str) – The first section of the received message’s routing key which designates its origin.
body_json (dict) – The message body in dict form.
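The walk-and-batch behaviour described above can be sketched like this. For brevity the batch threshold here is a file count rather than the pre-configured cumulative size, and the retry/problem-list handling is omitted:

```python
import os

def walk_and_batch(paths, batch_limit: int = 1000):
    """Yield lists of file paths, fully walking any directories (sketch)."""
    batch = []
    for p in paths:
        if os.path.isdir(p):
            # Fully walk directories and subdirectories.
            for root, _dirs, files in os.walk(p):
                for name in files:
                    batch.append(os.path.join(root, name))
                    if len(batch) >= batch_limit:
                        yield batch
                        batch = []
        elif os.path.isfile(p):
            batch.append(p)
            if len(batch) >= batch_limit:
                yield batch
                batch = []
        # Missing or inaccessible items would go onto a 'problem' list here.
    if batch:  # flush whatever is left at the end of the list
        yield batch
```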
- split(filelist: List[nlds.details.PathDetails], rk_origin: str, body_json: Dict[str, Any]) None
Split the given filelist into batches of some configurable max length and resubmit each to exchange for indexing proper.
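The batch split itself is plain list chunking; the parameter name below is illustrative rather than the real config option:

```python
def split_filelist(filelist, max_per_message: int = 1000):
    # Chunk the flat list into consecutive slices of at most max_per_message,
    # each of which would be resubmitted to the exchange for indexing proper.
    return [filelist[i:i + max_per_message]
            for i in range(0, len(filelist), max_per_message)]
```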
- class nlds_processors.transferers.base_transfer.BaseTransferConsumer(queue='transfer_q')
- callback(ch, method, properties, body, connection)
Standard consumer callback function as defined by rabbitMQ, with the standard callback parameters of Channel, Method, Header, Body (in bytes) and Connection.
This is the working method of a consumer, i.e. it does the job the consumer has been designed to do, and so must be implemented and overridden by any child consumer classes.
- check_path_access(path: pathlib.Path, stat_result: Optional[NamedTuple] = None, access: int = 4) bool
Checks that the given path is accessible, either by checking for its existence or, if the check_permissions_fl is set, by doing a permissions check on the file’s bitmask. This requires a stat of the file, so one must either be provided via stat_result or one will be performed. The uid and gid of the user must be set in the object as well, usually by having performed RabbitMQConsumer.set_ids() beforehand.
- exception nlds_processors.transferers.base_transfer.TransferError
- class nlds_processors.transferers.put_transfer.PutTransferConsumer(queue='transfer_put_q')
- exception nlds_processors.db_mixin.DBError(message, *args)
- class nlds_processors.db_mixin.DBMixin
Mixin refactored from monitor and catalog classes
- end_session()
Close the SQLAlchemy session
- save()
Commit all pending transactions
- start_session()
Create an SQLAlchemy session
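The session lifecycle the mixin provides can be sketched as follows, using the stdlib sqlite3 connection as a stand-in for the SQLAlchemy session the real classes hold:

```python
import sqlite3

class DBMixin:
    """Sketch of the shared session-lifecycle mixin."""
    db_path = ":memory:"  # illustrative; the real engine/options come from config

    def start_session(self):
        # Create a session (here: an sqlite3 connection) for this unit of work.
        self.session = sqlite3.connect(self.db_path)

    def save(self):
        # Commit all pending transactions.
        self.session.commit()

    def end_session(self):
        # Close the session and drop the reference.
        self.session.close()
        self.session = None

class Catalog(DBMixin):
    """Either database-facing class just mixes the lifecycle in."""
```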
Declare the SQLAlchemy ORM models for the NLDS Catalog database
- class nlds_processors.catalog.catalog_models.Aggregation(**kwargs)
Class containing the details of file aggregations made for writing files to tape (specifically CTA) as tars
- class nlds_processors.catalog.catalog_models.Checksum(**kwargs)
Class containing checksum and algorithm used to calculate checksum
- class nlds_processors.catalog.catalog_models.File(**kwargs)
Class containing the details of a single File
- class nlds_processors.catalog.catalog_models.Holding(**kwargs)
Class containing the details of a Holding - i.e. a batch
- class nlds_processors.catalog.catalog_models.Location(**kwargs)
Class containing the location on NLDS of a single File
- class nlds_processors.catalog.catalog_models.Storage(value)
An enumeration.
- class nlds_processors.catalog.catalog_models.Tag(**kwargs)
Class containing the details of a Tag that can be assigned to a Holding
- class nlds_processors.catalog.catalog_models.Transaction(**kwargs)
Class containing details of a transaction. Note that a holding can consist of many transactions.
- class nlds_processors.catalog.catalog.Catalog(db_engine: str, db_options: str)
Catalog object containing methods to manipulate the Catalog Database
- create_aggregation(tarname: str, checksum: Optional[str] = None, algorithm: Optional[str] = None, failed_fl: bool = False) nlds_processors.catalog.catalog_models.Aggregation
Create an aggregation of files to write to tape as a tar file
- create_file(transaction: nlds_processors.catalog.catalog_models.Transaction, user: Optional[str] = None, group: Optional[str] = None, original_path: Optional[str] = None, path_type: Optional[str] = None, link_path: Optional[str] = None, size: Optional[str] = None, file_permissions: Optional[str] = None) nlds_processors.catalog.catalog_models.File
Create a file that belongs to a transaction and will contain locations
- create_holding(user: str, group: str, label: str) nlds_processors.catalog.catalog_models.Holding
Create the new Holding with the label, user, group
- create_location(file_: nlds_processors.catalog.catalog_models.File, storage_type: sqlalchemy.sql.sqltypes.Enum, url_scheme: str, url_netloc: str, root: str, path: str, access_time: float, aggregation: Optional[nlds_processors.catalog.catalog_models.Aggregation] = None) nlds_processors.catalog.catalog_models.Location
Add the storage location for either object storage or tape
- create_tag(holding: nlds_processors.catalog.catalog_models.Holding, key: str, value: str)
Create a tag and add it to a holding
- create_transaction(holding: nlds_processors.catalog.catalog_models.Holding, transaction_id: str) nlds_processors.catalog.catalog_models.Transaction
Create a transaction that belongs to a holding and will contain files
- del_tag(holding: nlds_processors.catalog.catalog_models.Holding, key: str)
Delete a tag that has the key
- delete_aggregation(aggregation: nlds_processors.catalog.catalog_models.Aggregation) None
Delete a given aggregation
- delete_files(user: str, group: str, holding_label: Optional[str] = None, holding_id: Optional[int] = None, path: Optional[str] = None, tag: Optional[dict] = None) list
Delete a given path from the catalog. If a holding is specified, only the matching file from that holding will be deleted; otherwise all matching files will be. Utilises get_files().
- delete_location(file: nlds_processors.catalog.catalog_models.File, storage_type: sqlalchemy.sql.sqltypes.Enum) None
Delete the location for a given file and storage_type
- fail_aggregation(aggregation: nlds_processors.catalog.catalog_models.Aggregation) nlds_processors.catalog.catalog_models.Aggregation
Mark an aggregation as failed, as the final step of a failed archive-put.
- get_aggregation(aggregation_id: int) nlds_processors.catalog.catalog_models.Aggregation
Simple function for getting of Aggregation from aggregation_id.
- get_aggregation_by_file(file_: nlds_processors.catalog.catalog_models.File, storage_type: nlds_processors.catalog.catalog_models.Storage = Storage.TAPE) nlds_processors.catalog.catalog_models.Aggregation
Get the aggregation associated with a particular file’s location on tape. Storage type has been left as a kwarg in case future storage types are added which will utilise aggregations.
- get_files(user: str, group: str, groupall: bool = False, holding_label: Optional[str] = None, holding_id: Optional[int] = None, transaction_id: Optional[str] = None, original_path: Optional[str] = None, tag: Optional[dict] = None) list
Get file details from the database, filtered by the user, group, label, holding_id, path (can be a regex) or tag(s)
- get_holding(user: str, group: str, groupall: bool = False, label: Optional[str] = None, holding_id: Optional[int] = None, transaction_id: Optional[str] = None, tag: Optional[dict] = None) List[nlds_processors.catalog.catalog_models.Holding]
Get a holding from the database
- get_location(file: nlds_processors.catalog.catalog_models.File, storage_type: sqlalchemy.sql.sqltypes.Enum) nlds_processors.catalog.catalog_models.Location
Get a storage location for a file, given the file and the storage type
- get_location_file(location: nlds_processors.catalog.catalog_models.Location) nlds_processors.catalog.catalog_models.File
Get a File but from the other end of the database tree, starting from a location.
- get_location_transaction(location: nlds_processors.catalog.catalog_models.Location) nlds_processors.catalog.catalog_models.Transaction
Get a transaction but from the other end of the database tree, from a location’s file_id.
- get_next_holding() nlds_processors.catalog.catalog_models.Holding
The principal function for getting the next unarchived holding for archive aggregation.
- get_tag(holding: nlds_processors.catalog.catalog_models.Holding, key: str)
Get the tag with a specific key
- get_transaction(id: Optional[int] = None, transaction_id: Optional[str] = None) nlds_processors.catalog.catalog_models.Transaction
Get a transaction from the database
- get_unarchived_files(holding: nlds_processors.catalog.catalog_models.Holding) List[nlds_processors.catalog.catalog_models.File]
The principal function for getting unarchived files to aggregate and send to archive put.
- modify_holding(holding: nlds_processors.catalog.catalog_models.Holding, new_label: Optional[str] = None, new_tags: Optional[dict] = None, del_tags: Optional[dict] = None) nlds_processors.catalog.catalog_models.Holding
Find a holding and modify the information in it
- modify_tag(holding: nlds_processors.catalog.catalog_models.Holding, key: str, value: str)
Modify a tag that has the key, with a new value. Tag has to exist, current value will be overwritten.
- update_aggregation(aggregation: nlds_processors.catalog.catalog_models.Aggregation, checksum: str, algorithm: str, tarname: Optional[str] = None) nlds_processors.catalog.catalog_models.Aggregation
Add a missing checksum & algorithm to an aggregation after a successful write to tape. Can also optionally rename the tarname.
- exception nlds_processors.catalog.catalog.CatalogError(message, *args)
- class nlds_processors.catalog.catalog_worker.CatalogConsumer(queue='catalog_q')
- attach_database(create_db_fl: bool = True)
Attach the Catalog to the consumer
- callback(ch: pika.channel.Channel, method: pika.frame.Method, properties: pika.frame.Header, body: bytes, connection: pika.connection.Connection) None
Standard consumer callback function as defined by rabbitMQ, with the standard callback parameters of Channel, Method, Header, Body (in bytes) and Connection.
This is the working method of a consumer, i.e. it does the job the consumer has been designed to do, and so must be implemented and overridden by any child consumer classes.
- get_url()
Method for making the sqlalchemy url available to alembic
- class nlds_processors.catalog.catalog_worker.Metadata(body: Dict)
Container class for the meta section of the message body.
Declare the SQLAlchemy ORM models for the NLDS Monitoring database
- class nlds_processors.monitor.monitor_models.FailedFile(**kwargs)
- class nlds_processors.monitor.monitor_models.SubRecord(**kwargs)
- has_finished()
Convenience method for checking whether a given SubRecord is in a ‘final’ state, i.e. is no longer going to change and the transaction can therefore be marked as COMPLETE.
Checks whether all states have reached the final stage of a workflow (CATALOG_PUT or TRANSFER_GET) and are not retrying, OR have failed. This should cover all bases.
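The rule above can be expressed as a predicate; the State members below are illustrative stand-ins for a subset of nlds.rabbit.consumer.State:

```python
from enum import Enum

class State(Enum):  # stand-in subset of nlds.rabbit.consumer.State
    INDEXING = 1
    CATALOG_PUT = 2
    TRANSFER_GET = 3
    FAILED = 4

# Terminal states: the end of either workflow, or failure.
FINAL_STATES = {State.CATALOG_PUT, State.TRANSFER_GET, State.FAILED}

def has_finished(states, retrying_fl: bool = False) -> bool:
    # Final iff every sub-state is terminal and nothing is mid-retry.
    return not retrying_fl and all(s in FINAL_STATES for s in states)
```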
- class nlds_processors.monitor.monitor_models.TransactionRecord(**kwargs)
Class containing the details of the state of a transaction
- class nlds_processors.monitor.monitor_models.Warning(**kwargs)
- class nlds_processors.monitor.monitor.Monitor(db_engine: str, db_options: str)
Monitor object containing methods to manipulate the Monitor Database
- check_completion(transaction_record: nlds_processors.monitor.monitor_models.TransactionRecord) None
Get the complete list of sub records from a transaction record and check whether they are all in a final state, and update them to COMPLETE if so.
- create_failed_file(sub_record: nlds_processors.monitor.monitor_models.SubRecord, path_details: nlds.details.PathDetails, reason: Optional[str] = None) nlds_processors.monitor.monitor_models.FailedFile
Creates a FailedFile object for the monitoring database. Requires the parent SubRecord and the PathDetails object of the failed file in question. Optionally takes a reason str; otherwise the reason is taken from the PathDetails object, and if none can be found a MonitorError will be raised.
- create_sub_record(transaction_record: nlds_processors.monitor.monitor_models.TransactionRecord, sub_id: str, state: Optional[nlds.rabbit.consumer.State] = None) nlds_processors.monitor.monitor_models.SubRecord
Creates a SubRecord with the minimum required input. Optionally adds to a session and flushes to get the id field populated.
- create_transaction_record(user: str, group: str, transaction_id: str, job_label: str, api_action: str) nlds_processors.monitor.monitor_models.TransactionRecord
Creates a transaction_record with the minimum required input
- create_warning(transaction_record: nlds_processors.monitor.monitor_models.TransactionRecord, warning: str) nlds_processors.monitor.monitor_models.Warning
Create a warning and add it to the TransactionRecord
- get_sub_record(sub_id: str) nlds_processors.monitor.monitor_models.SubRecord
Return a single sub record identified by the sub_id
- get_sub_records(transaction_record: nlds_processors.monitor.monitor_models.TransactionRecord, sub_id: Optional[str] = None, user: Optional[str] = None, group: Optional[str] = None, state: Optional[nlds.rabbit.consumer.State] = None, retry_count: Optional[int] = None, api_action: Optional[str] = None) list
Return many sub records, identified by one of the (many) function parameters
- get_transaction_record(user: str, group: str, groupall: bool = False, idd: Optional[int] = None, transaction_id: Optional[str] = None, job_label: Optional[str] = None) list
Gets a TransactionRecord from the DB from the given transaction_id, or the primary key (id)
- update_sub_record(sub_record: nlds_processors.monitor.monitor_models.SubRecord, new_state: nlds.rabbit.consumer.State, retry_fl: bool) nlds_processors.monitor.monitor_models.SubRecord
Update a retrieved SubRecord to reflect the new monitoring info. Furthest state is updated, if required, and the retry count is incremented by one if appropriate. TODO: Should retrying be a flag instead of a separate state? Probably, yes
- exception nlds_processors.monitor.monitor.MonitorError(message, *args)
- class nlds_processors.monitor.monitor_worker.MonitorConsumer(queue='monitor_q')
- attach_database(create_db_fl: bool = True)
Attach the Monitor to the consumer
- callback(ch: pika.channel.Channel, method: pika.frame.Method, properties: pika.frame.Header, body: bytes, connection: pika.connection.Connection) None
Standard consumer callback function as defined by rabbitMQ, with the standard callback parameters of Channel, Method, Header, Body (in bytes) and Connection.
This is the working method of a consumer, i.e. it does the job the consumer has been designed to do, and so must be implemented and overridden by any child consumer classes.
- get_url()
Method for making the sqlalchemy url available to alembic
- class nlds_processors.logger.LoggingConsumer(queue='logging_q', setup_logging_fl=True)
- callback(ch, method, properties, body, connection)
Standard consumer callback function as defined by rabbitMQ, with the standard callback parameters of Channel, Method, Header, Body (in bytes) and Connection.
This is the working method of a consumer, i.e. it does the job the consumer has been designed to do, and so must be implemented and overridden by any child consumer classes.
- static get_logging_func(log_level: str, logger_like: logging.Logger = <Logger nlds.root (WARNING)>)
Selects the appropriate logging function, given a logging level, from a Logger object or Logger-like object. The global logger is used by default (logging.getLogger(__name__)), but anything which has the standard logging operations (debug, info, warning, etc.) can be used, e.g. the logging package itself.
- Parameters
log_level (str) – Logging level requested
logger_like (Logger) – Logger object to get function from
- Returns
The logging function to use
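Selecting a bound logging method by level name is essentially a getattr lookup; a minimal sketch, assuming the level string matches a standard method name:

```python
import logging

def get_logging_func(log_level: str, logger_like=None):
    # Default to a module-level logger; anything with debug/info/warning/...
    # methods works, including the logging package itself.
    if logger_like is None:
        logger_like = logging.getLogger(__name__)
    # Map a level name like "WARNING" onto the matching bound method.
    return getattr(logger_like, log_level.lower())
```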