Reference¶
Reference information for each class and method accessible to developers using this library.
Distribution and Collection¶
Collector¶
- class zmqpipeline.Collector¶
Collects results from the workers and sends ACKs (acknowledgements) back to the distributor
- ack_endpoint¶
Endpoint for sending ACKs (acknowledgements) back to the distributor. The distributor should connect to this same endpoint for receiving ACKs.
This property must be defined by the subclassed implementation.
Returns: An EndpointAddress instance
- endpoint¶
Endpoint to bind the collector to for receiving messages from workers. Workers should connect to this same endpoint for sending data.
This property must be defined by the subclassed implementation.
Returns: An EndpointAddress instance
- get_ack_data()¶
Optionally override this method to attach data to the ACK that will be sent back to the distributor.
This method is typically used when deciding to override Task.is_available_for_handling(), since the data received by the distributor will be forwarded to the Task via is_available_for_handling() for determining whether the task can be executed or not.
The use case for implementing these methods in conjunction is a task that must wait on the receipt of a particular ACK - that is, one where the currently processing task requires data from a previously processed result. Such tasks are time- and order-dependent.
Returns: A dictionary of data
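As a sketch of this pattern (the class below is a stand-in; in real use it would subclass zmqpipeline.Collector, and the "record_id" key is hypothetical), get_ack_data() can attach the id of the last collected result so the distributor forwards it to the task:

```python
# Illustrative stand-in only: in real use this class subclasses
# zmqpipeline.Collector. The record-id bookkeeping is hypothetical.
class OrderAwareCollector:
    def __init__(self):
        self.last_record_id = None

    def handle_collection(self, data, task_type, msgtype):
        # Remember which record this worker result belongs to.
        self.last_record_id = data.get("record_id")

    def get_ack_data(self):
        # This dictionary rides along with the ACK; the distributor
        # forwards it to Task.is_available_for_handling().
        return {"record_id": self.last_record_id}
```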
- handle_collection(data, task_type, msgtype)¶
Invoked by the collector when data is received from a worker.
Parameters: - data (dict) – Data supplied by the worker (a dictionary). If the worker doesn’t return anything this will be an empty dict
- task_type (str) – The task type of the worker and corresponding task
- msgtype (str) – The message type. Typically zmqpipeline.messages.MESSAGE_TYPE_DATA
Returns: None
- handle_finished(data, task_type)¶
Invoked by the collector when a termination message is received from a worker.
Parameters: - data (dict) – Data received from the worker on a termination signal
- task_type (string) – The task type of the worker and corresponding task
Return dict: A dictionary of data to be sent to the distributor along with the ACK, or None to send nothing back to the distributor
- run()¶
Runs the collector. Invoke this method to start the collector
Returns: None
Distributor¶
- class zmqpipeline.Distributor(collector_endpoint, collector_ack_endpoint, receive_metadata=False, metadata_endpoint=None)¶
Responsible for distributing (pushing) tasks to workers. What gets distributed is determined by Tasks, which are user implementations that configure how the distributor works.
The pipeline pattern assumes there is only one distributor with one or more registered tasks.
- run()¶
Runs the distributor and blocks. Call this immediately after instantiating the distributor.
Since this method will block the calling program, it should be the last thing the calling code does before exiting. run() will automatically shut down the distributor gracefully when finished.
Returns: None
- shutdown()¶
Shuts down the distributor. This is automatically called when run() is complete and the distributor exits gracefully. Client code should only invoke this method directly when exiting prematurely, for example on a KeyboardInterrupt exception.
Returns: None
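The run()/shutdown() lifecycle described above can be sketched as follows. The class here is a stand-in, not zmqpipeline.Distributor itself; the endpoint strings are placeholders, and the real run() blocks until all registered tasks complete:

```python
# Stand-in sketching the documented lifecycle of zmqpipeline.Distributor;
# the endpoint strings are illustrative placeholders.
class DistributorStandIn:
    def __init__(self, collector_endpoint, collector_ack_endpoint):
        self.running = False

    def run(self):
        # The real run() blocks here until all tasks are finished,
        # then shuts down gracefully on its own.
        self.running = True

    def shutdown(self):
        self.running = False


distributor = DistributorStandIn("tcp://localhost:5555",
                                 "tcp://localhost:5556")
try:
    distributor.run()        # blocking: last call before exiting
except KeyboardInterrupt:
    distributor.shutdown()   # only needed on premature exit
```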
Task¶
- class zmqpipeline.Task¶
Tasks define what information the workers receive at each invocation, as well as how many items are to be processed.
Tasks dynamically configure the Distributor, which looks up and invokes registered tasks.
- dependencies¶
Zero or more tasks, identified by task type, that must be executed in full before this task can begin processing.
Returns: A list of TaskType instances
- endpoint¶
A valid EndpointAddress used to push data to worker clients.
Returns: An EndpointAddress instance
- handle(data, address, msgtype, ack_data={})¶
Handle invocation by the distributor.
Parameters: - data (dict) – Metadata, if provided, otherwise an empty dictionary
- address (EndpointAddress) – The address of the worker data will be sent to.
- msgtype (str) – The message type received from the worker. Typically zmqpipeline.messages.MESSAGE_TYPE_READY
- ack_data (dict) – Data received in the most recent ACK from collector
Return dict: A dictionary of data to be sent to the worker, or None, in which case the worker will receive no information
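A handle() implementation typically pops the next work item and returns it for delivery to the ready worker. The class below is a stand-in, not a real zmqpipeline.Task subclass, and the item queue and "record_id" key are hypothetical:

```python
# Stand-in sketching an overridden zmqpipeline.Task.handle();
# the work queue and "record_id" key are hypothetical.
class FeedingTaskStandIn:
    def __init__(self, items):
        self.items = list(items)

    def handle(self, data, address, msgtype, ack_data={}):
        if not self.items:
            return None               # worker receives no information
        record = self.items.pop(0)
        return {"record_id": record}  # sent to the worker at `address`
```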
- initialize(metadata={})¶
Initializes the task. The default implementation stores metadata on the object instance.
Parameters: metadata (dict) – Metadata received from the distributor
Returns: None
- is_available_for_handling(last_ack_data)¶
Optionally override this if the task requires the ACK of a previously sent task.
If this method is overridden, get_ack_data() should be overridden in your custom collector. The default is to always report the task as available for handling the next message. This is the fastest behavior, but it doesn't guarantee that the last message sent by the task (and then by the corresponding worker) has been ACKed by the collector.
Parameters: last_ack_data (dict) – A dictionary of data received in the last ACK. All instances include _task_type and _id, a globally incrementing counter for the latest ACK. This dictionary will also include whatever you attach via get_ack_data() in the collector. If you override this method, you should also override get_ack_data() in your collector.
Returns: True if the task is available for handling the next received message; False otherwise
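The order-dependent check can be sketched like this. The class is a stand-in rather than a real zmqpipeline.Task subclass, and the "record_id" key is hypothetical; it must match whatever the collector's get_ack_data() attaches:

```python
# Stand-in sketching an order-dependent is_available_for_handling();
# in real use this overrides the method on a zmqpipeline.Task subclass.
class OrderDependentTaskStandIn:
    def __init__(self):
        self.awaiting_id = None  # id of the last record sent out, if any

    def is_available_for_handling(self, last_ack_data):
        if self.awaiting_id is None:
            return True          # nothing outstanding; proceed
        # Only proceed once the outstanding record has been ACKed.
        return last_ack_data.get("record_id") == self.awaiting_id
```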
- task_type¶
The type of task being defined. This type must be registered with TaskType before the task is defined, otherwise an exception will be thrown.
Returns: A TaskType instance
Single-threaded Worker¶
- class zmqpipeline.SingleThreadedWorker(*args, **kwargs)¶
A worker that processes data on a single thread
- handle_execution(data, *args, **kwargs)¶
Invoked in the worker’s main loop whenever a task is received from the distributor. This is where client implementations should process data and forward results to the collector.
Parameters: - data (dict) – Data provided as a dictionary from the distributor
- args – A list of additional positional arguments
- kwargs – A list of additional keyword arguments
Returns: A dictionary of data to be passed to the collector, or None, in which case no data will be forwarded to the collector
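A minimal sketch of a single-threaded worker, using a stand-in class rather than a real zmqpipeline.SingleThreadedWorker subclass; the squaring workload and "value"/"result" keys are purely illustrative:

```python
# Stand-in sketching a zmqpipeline.SingleThreadedWorker subclass;
# the squaring workload is hypothetical.
class SquaringWorkerStandIn:
    def handle_execution(self, data, *args, **kwargs):
        value = data.get("value", 0)
        # The returned dictionary is forwarded to the collector.
        return {"value": value, "result": value * value}
```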
Multi-threaded Worker¶
- class zmqpipeline.MultiThreadedWorker(*args, **kwargs)¶
A worker that processes data on multiple threads.
Threads are instantiated and pooled at initialization time to minimize the cost of context switching.
Moreover, data is forwarded from the worker to each thread over the inproc protocol by default, which is significantly faster than tcp or ipc.
- handle_execution(data, *args, **kwargs)¶
This method is invoked when the worker’s main loop is executed. Unlike in the SingleThreadedWorker, client implementations of this method should not process data, but instead forward the relevant data to a thread by returning a dictionary of information.
Parameters: - data (dict) – A dictionary of data received by the worker
- args – Additional arguments
- kwargs – Additional keyword arguments
Return dict: Data to be forwarded to the worker thread
- handle_thread_execution(data, index)¶
This method is invoked in the working thread. This is where data processing should be handled.
Parameters: - data (dict) – A dictionary of data provided by the worker
- index (int) – The index number of the thread that’s been invoked
Return dict: A dictionary of information to be forwarded to the collector
- n_threads¶
The number of threads used.
Return int: A positive integer
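The division of labour described above can be sketched with a stand-in class (not a real zmqpipeline.MultiThreadedWorker subclass; the doubling workload and dictionary keys are hypothetical): handle_execution() only forwards data, while handle_thread_execution() does the actual work on a pooled thread:

```python
# Stand-in sketching the documented split of responsibilities in a
# zmqpipeline.MultiThreadedWorker subclass; the workload is hypothetical.
class MultiWorkerStandIn:
    n_threads = 4

    def handle_execution(self, data, *args, **kwargs):
        # Do not process here; just pass the relevant fields through
        # to the thread over inproc.
        return {"value": data.get("value", 0)}

    def handle_thread_execution(self, data, index):
        # Runs on one of the pooled threads; the result is
        # forwarded to the collector.
        return {"result": data["value"] * 2, "thread": index}
```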
Worker¶
Metadata Worker¶
- class zmqpipeline.MetaDataWorker¶
Transmits meta information to the distributor for dynamic configuration at runtime. When using a metadata worker, the Distributor should be instantiated with receive_metadata set to True.
- get_metadata()¶
Retrieves metadata to be sent to tasks and workers.
Returns: A dictionary of metadata
- run()¶
Runs the metadata worker, sending metadata to the distributor.
Returns: None
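A minimal sketch of a metadata worker, using a stand-in class rather than a real zmqpipeline.MetaDataWorker subclass; the date-range keys are hypothetical. In real use, run() would transmit this dictionary to a Distributor created with receive_metadata=True:

```python
# Stand-in sketching a zmqpipeline.MetaDataWorker subclass;
# the date-range metadata is hypothetical.
class DateRangeMetaWorkerStandIn:
    def get_metadata(self):
        # Forwarded by the distributor to tasks and workers at startup.
        return {"start_date": "2015-01-01", "end_date": "2015-12-31"}
```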
Services¶
Service¶
- class zmqpipeline.Service(frontend_endpoint, backend_endpoint)¶
A service is a load-balanced router, accepting multiple client requests asynchronously and distributing the work to multiple workers.
- run()¶
Runs the service. This is a blocking call.
Since this method will block the calling program, it should be the last thing the calling code does before exiting. run() will automatically shutdown the service gracefully when finished.
Returns: None
- shutdown()¶
Shuts down the service. This is automatically called when run() is complete and the service exits gracefully. Client code should only invoke this method directly when exiting prematurely, for example on a KeyboardInterrupt exception.
Returns: None
Service Worker¶
- class zmqpipeline.ServiceWorker(id_prefix='worker')¶
A worker that plugs into a service. Regular workers, by contrast, plug into a distributor / collector pair.
- endpoint¶
The address of the broker’s backend
Return EndpointAddress: A valid EndpointAddress instance
- handle_message(data, task_type, msgtype)¶
Overridden by subclassed ServiceWorkers.
Parameters: - data – Data the client has submitted for processing, typically in a dictionary
- task_type (TaskType) – A registered TaskType enumerated value
- msgtype (str) – A message type enumerated value
Returns: Data to be sent back to the requesting client, typically a dictionary
- run()¶
Run the service worker. This call blocks until an END message is received from the broker
Returns: None
- task_type¶
A registered task type, or a blank string.
Return TaskType: A properly registered task type or an empty string
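A minimal sketch of handle_message(), using a stand-in class rather than a real zmqpipeline.ServiceWorker subclass; the uppercasing workload and "text" key are hypothetical:

```python
# Stand-in sketching a zmqpipeline.ServiceWorker subclass; echoes the
# client's payload back uppercased. Task type and message type are
# ignored in this sketch.
class EchoServiceWorkerStandIn:
    def handle_message(self, data, task_type, msgtype):
        text = data.get("text", "")
        # The returned data is routed back to the requesting client.
        return {"text": text.upper()}
```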
Enumerated types / helpers¶
EndpointAddress¶
- class zmqpipeline.EndpointAddress(address)¶
A valid network address for one of the components of the pipeline pattern to either bind to or connect to.
- The supported protocols are:
- tcp for cross-machine communication
- ipc for cross-process communication
- inproc for cross-thread communication
Note that on unix systems cross-process sockets are files, so it is recommended to specify the .ipc file extension when naming ipc socket addresses.
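The three supported transports look like this as address strings. These are illustrative placeholders; in real use each would be passed to zmqpipeline.EndpointAddress:

```python
# Illustrative endpoint strings for the three supported transports;
# in real use each would be wrapped in zmqpipeline.EndpointAddress.
TCP_ENDPOINT = "tcp://127.0.0.1:5555"        # cross-machine
IPC_ENDPOINT = "ipc://collector.ipc"         # cross-process (a file on unix)
INPROC_ENDPOINT = "inproc://worker-threads"  # cross-thread
```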