Reference

Reference information for each class and method accessible to developers using this library.

Distribution and Collection

Collector

class zmqpipeline.Collector

Collects results from the workers and sends ACKs (acknowledgements) back to the distributor

ack_endpoint

Endpoint for sending ACKs (acknowledgements) back to the distributor. The distributor should connect to this same endpoint for receiving ACKs.

This property must be defined by the subclassed implementation.

Returns: An EndpointAddress instance
endpoint

Endpoint to bind the collector to for receiving messages from workers. Workers should connect to this same endpoint for sending data.

This property must be defined by the subclassed implementation.

Returns: An EndpointAddress instance
get_ack_data()

Optionally override this method to attach data to the ACK that will be sent back to the distributor.

This method is typically used when deciding to override Task.is_available_for_handling(), since the data received by the distributor will be forwarded to the Task via is_available_for_handling() for determining whether the task can be executed or not.

The use case for implementing these methods in conjunction is for the task to wait on the receipt of a particular ACK - that is, when the task currently being processed requires data from a previously processed result. These types of tasks are time- and order-dependent.

Returns: A dictionary of data
handle_collection(data, task_type, msgtype)

Invoked by the collector when data is received from a worker.

Parameters:
  • data (dict) – Data supplied by the worker (a dictionary). If the worker doesn’t return anything this will be an empty dict
  • task_type (str) – The task type of the worker and corresponding task
  • msgtype (str) – The message type. Typically zmqpipeline.messages.MESSAGE_TYPE_DATA
Returns:

None

handle_finished(data, task_type)

Invoked by the collector when a termination message is received from a worker.

Parameters:
  • data (dict) – Data received from the worker on a termination signal
  • task_type (str) – The task type of the worker and corresponding task
Return dict:

A dictionary of data to be sent to the distributor along with the ACK, or None to send nothing back to the distributor

run()

Runs the collector. Invoke this method to start the collector

Returns: None

Distributor

class zmqpipeline.Distributor(collector_endpoint, collector_ack_endpoint, receive_metadata=False, metadata_endpoint=None)

Responsible for distributing (pushing) tasks to workers. What gets distributed is determined by Tasks, which are user-implementations that configure how the distributor works.

The pipeline pattern assumes there is only one distributor with one or more registered tasks.

run()

Runs the distributor and blocks. Call this immediately after instantiating the distributor.

Since this method will block the calling program, it should be the last thing the calling code does before exiting. run() will automatically shut down the distributor gracefully when finished.

Returns: None
shutdown()

Shuts down the distributor. This is automatically called when run() completes and the distributor exits gracefully. Client code should only invoke this method directly when exiting prematurely, for example on a KeyboardInterrupt.

Returns: None

Task

class zmqpipeline.Task

Tasks define what information the workers receive at each invocation, as well as how many items are to be processed.

Tasks dynamically configure the Distributor, which looks up and invokes registered tasks.

dependencies

Zero or more tasks, identified by task type, that must be executed in full before this task can begin processing.

Returns: A list of TaskType instances
endpoint

A valid EndpointAddress used to push data to worker clients.

Returns: An EndpointAddress instance
handle(data, address, msgtype, ack_data={})

Handle invocation by the distributor.

Parameters:
  • data (dict) – Metadata, if provided; otherwise an empty dictionary
  • address (EndpointAddress) – The address of the worker data will be sent to.
  • msgtype (str) – The message type received from the worker. Typically zmqpipeline.messages.MESSAGE_TYPE_READY
  • ack_data (dict) – Data received in the most recent ACK from collector
Return dict:

A dictionary of data to be sent to the worker, or None, in which case the worker will receive no information

initialize(metadata={})

Initializes the task. The default implementation stores the metadata on the object instance.

Parameters:
  • metadata (dict) – Metadata received from the distributor
Returns: None

is_available_for_handling(last_ack_data)

Optionally override this if the task requires the ACK of a previously sent task.

If this method is overridden, get_ack_data() should also be overridden in your custom collector. The default is to always assume the task is available to handle the next message. This is the fastest behavior, but it doesn't guarantee that the last message sent by the task (and processed by the corresponding worker) has been ACKed by the collector.

Parameters: last_ack_data (dict) – A dictionary of data received in the last ACK. All instances include _task_type and _id, a globally incrementing counter for the latest ACK. This dictionary will also include whatever you attach in get_ack_data() in the collector.
Returns: True if the task is available for handling the next received message; False otherwise
task_type

The type of task being defined. This type must be registered with TaskType before the task is defined; otherwise an exception will be raised.

Returns: A TaskType instance

Single-threaded Worker

class zmqpipeline.SingleThreadedWorker(*args, **kwargs)

A worker that processes data on a single thread

handle_execution(data, *args, **kwargs)

Invoked in the worker's main loop whenever a task is received from the distributor. This is where client implementations should process data and forward results to the collector.

Parameters:
  • data (dict) – Data provided as a dictionary from the distributor
  • args – A list of additional positional arguments
  • kwargs – A list of additional keyword arguments
Returns:

A dictionary of data to be passed to the collector, or None, in which case no data will be forwarded to the collector

Multi-threaded Worker

class zmqpipeline.MultiThreadedWorker(*args, **kwargs)

A worker that processes data on multiple threads.

Threads are instantiated and pooled at initialization time to minimize the cost of context switching.

Moreover, data is forwarded from the worker to each thread over the inproc protocol by default, which is significantly faster than tcp or ipc.

handle_execution(data, *args, **kwargs)

This method is invoked when the worker's main loop is executed. Unlike with the SingleThreadedWorker, client implementations of this method should not process data, but instead forward the relevant data to a thread by returning a dictionary of information.

Parameters:
  • data (dict) – A dictionary of data received by the worker
  • args – Additional arguments
  • kwargs – Additional keyword arguments
Return dict:

Data to be forwarded to the worker thread

handle_thread_execution(data, index)

This method is invoked in the working thread. This is where data processing should be handled.

Parameters:
  • data (dict) – A dictionary of data provided by the worker
  • index (int) – The index number of the thread that’s been invoked
Return dict:

A dictionary of information to be forwarded to the collector

n_threads

The number of threads used.

Return int: A positive integer

Worker

Metadata Worker

class zmqpipeline.MetaDataWorker

Transmits metadata to the distributor for dynamic configuration at runtime. When using a metadata worker, the Distributor should be instantiated with the receive_metadata flag set to True.

get_metadata()

Retrieves metadata to be sent to tasks and workers.

Returns: A dictionary of metadata

run()

Runs the metadata worker, sending metadata to the distributor.

Returns: None

Services

Service

class zmqpipeline.Service(frontend_endpoint, backend_endpoint)

A service is a load-balanced router, accepting multiple client requests asynchronously and distributing the work to multiple workers.

run()

Runs the service. This is a blocking call.

Since this method will block the calling program, it should be the last thing the calling code does before exiting. run() will automatically shut down the service gracefully when finished.

Returns: None
shutdown()

Shuts down the service. This is automatically called when run() completes and the service exits gracefully. Client code should only invoke this method directly when exiting prematurely, for example on a KeyboardInterrupt.

Returns: None

Service Worker

class zmqpipeline.ServiceWorker(id_prefix='worker')

A worker that plugs into a service. Regular workers plug into a distributor/collector pair.

endpoint

The address of the broker’s backend

Return EndpointAddress: A valid EndpointAddress instance
handle_message(data, task_type, msgtype)

Overridden by subclassed ServiceWorkers.

Parameters:
  • data – Data the client has submitted for processing, typically in a dictionary
  • task_type (TaskType) – A registered TaskType enumerated value
  • msgtype (str) – A message type enumerated value
Returns:

Data to be sent back to the requesting client, typically a dictionary

run()

Runs the service worker. This call blocks until an END message is received from the broker.

Returns: None
task_type

A registered task type, or a blank string.

Return TaskType: A properly registered task type or an empty string

Service Client

class zmqpipeline.ServiceClient(endpoint_address, task_type='', id_prefix='client')

Enumerated types / helpers

EndpointAddress

class zmqpipeline.EndpointAddress(address)

A valid network address for one of the components of the pipeline pattern to either bind to or connect to.

The supported protocols are:
  • tcp for cross-machine communication
  • ipc for cross-process communication
  • inproc for cross-thread communication

Note that on unix systems cross-process sockets are files, so it is recommended to specify the .ipc file extension when naming ipc socket addresses.

TaskType

class zmqpipeline.TaskType(v)

Represents a task type.

A task type is any valid string that has been previously registered with the TaskType class. Unregistered task types will raise an exception.