Gear: Asynchronous Event-Driven Gearman Interface

This module implements an asynchronous event-driven interface to Gearman. It provides interfaces to build a client or worker, and access to the administrative protocol. The design approach is to keep it simple, with a relatively thin abstraction of the Gearman protocol itself. It should be easy to use to build a client or worker that operates either synchronously or asynchronously.

The module also provides a simple Gearman server for use as a convenience in unit tests. The server is not designed for production use under load.

Client Example

To use the client interface, instantiate a Client, and submit a Job. For example:

import gear
client = gear.Client()
client.addServer('gearman.example.com')
client.waitForServer()  # Wait for at least one server to be connected

job = gear.Job("reverse", "test string")
client.submitJob(job)

The waitForServer() call is only necessary when running in a synchronous context. When running asynchronously, it is probably more desirable to omit that call and instead handle the NoConnectedServersError exception that submitJob may raise if no servers are connected at the time.
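
The asynchronous approach described above can be sketched as a small wrapper that defers a job instead of blocking when no server is connected. This is a minimal sketch: submit_or_defer and the backlog list are hypothetical names, and the exception class is passed in explicitly (it is assumed to be available as gear.NoConnectedServersError).

```python
def submit_or_defer(client, job, backlog, no_servers_error):
    """Try to submit `job`; park it in `backlog` if no server is connected.

    `no_servers_error` is the exception class that signals "no servers",
    e.g. gear.NoConnectedServersError (assumed location of the class).
    Returns True if the job was handed to a server, False if deferred.
    """
    try:
        client.submitJob(job)
    except no_servers_error:
        backlog.append(job)  # retry later, e.g. from a timer or event loop
        return False
    return True
```

A caller would invoke it as submit_or_defer(client, job, backlog, gear.NoConnectedServersError) and resubmit the contents of backlog once a server has connected.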

When Gearman returns data to the client, the Job object is updated immediately. Event handlers are called on the Client object so that subclasses have ample facilities for reacting to events synchronously.

Worker Example

To use the worker interface, create a Worker, register at least one function that the worker supports, and then wait for a Job to be dispatched to the worker.

An example of a Gearman worker:

import gear
worker = gear.Worker('reverser')
worker.addServer('gearman.example.com')
worker.registerFunction("reverse")

while True:
    job = worker.getJob()
    job.sendWorkComplete(job.arguments[::-1])
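
The loop above assumes the work never fails. A hedged sketch of a wrapper that reports failures back to Gearman instead of letting the loop die; run_job is a hypothetical helper, and the job argument only needs the sendWorkComplete() and sendWorkException() methods documented for WorkerJob:

```python
def run_job(job, func):
    """Run `func` on the job's arguments and report the outcome to Gearman.

    `job` is expected to behave like a gear.WorkerJob; `func` takes and
    returns bytes. Both names are illustrative.
    """
    try:
        result = func(job.arguments)
    except Exception as exc:
        # Report the failure instead of letting the worker loop die.
        job.sendWorkException(str(exc).encode("utf-8"))
        return False
    job.sendWorkComplete(result)
    return True
```

Inside the worker loop this would be called as run_job(worker.getJob(), lambda data: data[::-1]).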

Server Example

You can run the Gearman server by executing the geard command. For help, execute geard --help:

usage: geard [-h] [-d] [-p PORT] [--listen-address LISTEN_ADDRESS]
             [--log-config LOG_CONFIG] [--pidfile PIDFILE] [--ssl-ca PATH]
             [--ssl-cert PATH] [--ssl-key PATH] [--acl PATH] [--keepalive]
             [--keepalive-idle TCP_KEEPIDLE]
             [--keepalive-interval TCP_KEEPINTVL]
             [--keepalive-count TCP_KEEPCNT] [--version]

Gearman server. If the statsd python module is available, set STATSD_HOST,
STATSD_PORT, and STATSD_PREFIX environment variables for statsd support.

options:
  -h, --help            show this help message and exit
  -d                    do not run as a daemon
  -p PORT               port on which to listen
  --listen-address LISTEN_ADDRESS
                        IP address or domain name to listen on
  --log-config LOG_CONFIG
                        logging config file
  --pidfile PIDFILE     PID file
  --ssl-ca PATH         path to CA certificate
  --ssl-cert PATH       path to SSL public certificate
  --ssl-key PATH        path to SSL private key
  --acl PATH            path to ACL file
  --keepalive           enable TCP keepalives in socket
  --keepalive-idle TCP_KEEPIDLE
                        TCP keepalive idle time
  --keepalive-interval TCP_KEEPINTVL
                        TCP keepalive probe interval
  --keepalive-count TCP_KEEPCNT
                        TCP keepalive probes count
  --version             show version

SSL Connections

For versions of Gearman supporting SSL connections, specify the files containing the SSL private key, public certificate, and CA certificate in the addServer() call. For example:

ssl_key = '/path/to/key.pem'
ssl_cert = '/path/to/cert.pem'
ssl_ca = '/path/to/ca.pem'
client.addServer('gearman.example.com', 4730, ssl_key, ssl_cert, ssl_ca)

All three files must be specified for SSL to be used.
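
Because a partially specified SSL configuration is easy to miss, it can help to validate the three paths before calling addServer(). A minimal sketch; ssl_kwargs is a hypothetical helper, and the keyword names are taken from the addServer() signature:

```python
def ssl_kwargs(ssl_key=None, ssl_cert=None, ssl_ca=None):
    """Return keyword arguments for addServer(), or {} for a plain connection.

    Raises ValueError when only some of the three paths are given, since
    SSL requires all of them.
    """
    paths = {"ssl_key": ssl_key, "ssl_cert": ssl_cert, "ssl_ca": ssl_ca}
    given = [name for name, path in paths.items() if path]
    if not given:
        return {}
    if len(given) != len(paths):
        missing = sorted(set(paths) - set(given))
        raise ValueError("missing SSL settings: " + ", ".join(missing))
    return paths
```

The result can then be passed through, for example client.addServer('gearman.example.com', 4730, **ssl_kwargs(ssl_key, ssl_cert, ssl_ca)).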

API Reference

The following sections document the module’s public API. It is divided into sections focusing on implementing a client, a worker, using the administrative protocol, and then the classes that are common to all usages of the module.

Client Usage

The classes in this section should be all that are needed in order to implement a Gearman client.

Client Objects

class gear.Client(client_id='unknown')

A Gearman client.

You may wish to subclass this class in order to override the default event handlers to react to Gearman events. Be sure to call the superclass event handlers so that they may perform job-related housekeeping.
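
One way to follow this advice is to put the override in a mixin that always delegates to the superclass handler first. The sketch below is an illustration under that assumption; CompletedJobsMixin and its completed attribute are hypothetical names, and it relies only on handleWorkComplete() returning the Job as documented below.

```python
class CompletedJobsMixin:
    """Collect finished jobs; mix in ahead of gear.Client.

    Hypothetical helper, used for example as:
        class MyClient(CompletedJobsMixin, gear.Client): pass
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.completed = []

    def handleWorkComplete(self, packet):
        # Let the superclass update the Job and do its housekeeping first.
        job = super().handleWorkComplete(packet)
        self.completed.append(job)
        return job
```

Keeping the override in a mixin means the same behaviour can be tested against a stand-in base class without a running Gearman server.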

Parameters:

client_id (str) – The client ID to provide to Gearman. It will appear in administrative output and be appended to the name of the logger (e.g., gear.Client.client_id). Defaults to ‘unknown’.

addServer(host, port=4730, ssl_key=None, ssl_cert=None, ssl_ca=None, keepalive=False, tcp_keepidle=7200, tcp_keepintvl=75, tcp_keepcnt=9)

Add a server to the client’s connection pool.

Any number of Gearman servers may be added to a client. The client will connect to all of them and send jobs to them in a round-robin fashion. When servers are disconnected, the client will automatically remove them from the pool, continuously try to reconnect to them, and return them to the pool when reconnected. New servers may be added at any time.

This is a non-blocking call that will return regardless of whether the initial connection succeeded. If you need to ensure that a connection is ready before proceeding, see waitForServer().

When using SSL connections, all SSL files must be specified.

Parameters:
  • host (str) – The hostname or IP address of the server.

  • port (int) – The port on which the gearman server is listening.

  • ssl_key (str) – Path to the SSL private key.

  • ssl_cert (str) – Path to the SSL certificate.

  • ssl_ca (str) – Path to the CA certificate.

  • keepalive (bool) – Whether to use TCP keepalives

  • tcp_keepidle (int) – Idle time after which to start keepalives sending

  • tcp_keepintvl (int) – Interval in seconds between TCP keepalives

  • tcp_keepcnt (int) – Count of TCP keepalives to send before disconnect

Raises:

ConfigurationError – If the host/port combination has already been added to the client.
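
The keepalive parameters are named after the standard TCP socket options. The sketch below shows what those options mean at the socket level using only the standard library. It illustrates the option semantics and is not gear's own code; enable_keepalive is a hypothetical helper whose defaults mirror the addServer() signature.

```python
import socket


def enable_keepalive(sock, idle=7200, interval=75, count=9):
    """Turn on TCP keepalives, applying the tuning knobs where the OS has them."""
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    # TCP_KEEPIDLE/TCP_KEEPINTVL/TCP_KEEPCNT are not defined on every platform.
    for opt_name, value in (("TCP_KEEPIDLE", idle),
                            ("TCP_KEEPINTVL", interval),
                            ("TCP_KEEPCNT", count)):
        opt = getattr(socket, opt_name, None)
        if opt is not None:
            sock.setsockopt(socket.IPPROTO_TCP, opt, value)
    return sock
```

With these settings a connection sits idle for `idle` seconds before the first probe, probes are sent every `interval` seconds, and the connection is dropped after `count` unanswered probes.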

broadcast(packet)

Send a packet to all currently connected servers.

Parameters:

packet (Packet) – The Packet to send.

getConnection()

Return a connected server.

Finds the next scheduled connected server in the round-robin rotation and returns it. It is not usually necessary to use this method outside the library, as more consumer-oriented methods such as submitJob already use it internally, but it is available if needed.

Returns:

The next scheduled Connection object.

Return type:

Connection

Raises:

NoConnectedServersError – If there are currently no connected servers.

handleAdminRequest(request)

Handle an administrative command response from Gearman.

This method is called whenever a response to a previously issued administrative command is received from one of this client’s connections. It normally releases the wait lock on the initiating AdminRequest object.

Parameters:

request (AdminRequest) – The AdminRequest that initiated the received response.

handleDisconnect(job)

Handle a Gearman server disconnection.

If the Gearman server is disconnected, this will be called for any jobs currently associated with the server.

Parameters:

job (Job) – The Job that was running when the server disconnected.

handleEchoRes(packet)

Handle an ECHO_RES packet.

Causes the blocking Connection.echo() invocation to return.

Parameters:

packet (Packet) – The Packet that was received.

Returns:

None

handleError(packet)

Handle an ERROR packet.

Logs the error.

Parameters:

packet (Packet) – The Packet that was received.

Returns:

None

handleJobCreated(packet)

Handle a JOB_CREATED packet.

Updates the appropriate Job with the newly returned job handle.

Parameters:

packet (Packet) – The Packet that was received.

Returns:

The Job object associated with the job request.

Return type:

Job

handleOptionRes(packet)

Handle an OPTION_RES packet.

Updates the set of options for the connection.

Parameters:

packet (Packet) – The Packet that was received.

Returns:

None.

handlePacket(packet)

Handle a received packet.

This method is called whenever a packet is received from any connection. It normally calls the handle method appropriate for the specific packet.

Parameters:

packet (Packet) – The Packet that was received.

handleStatusRes(packet)

Handle a STATUS_RES packet.

Updates the referenced Job with the returned data.

Parameters:

packet (Packet) – The Packet that was received.

Returns:

The Job object associated with the job request.

Return type:

Job

handleWorkComplete(packet)

Handle a WORK_COMPLETE packet.

Updates the referenced Job with the returned data and removes it from the list of jobs associated with the connection.

Parameters:

packet (Packet) – The Packet that was received.

Returns:

The Job object associated with the job request.

Return type:

Job

handleWorkData(packet)

Handle a WORK_DATA packet.

Updates the referenced Job with the returned data.

Parameters:

packet (Packet) – The Packet that was received.

Returns:

The Job object associated with the job request.

Return type:

Job

handleWorkException(packet)

Handle a WORK_EXCEPTION packet.

Updates the referenced Job with the returned data and removes it from the list of jobs associated with the connection.

Parameters:

packet (Packet) – The Packet that was received.

Returns:

The Job object associated with the job request.

Return type:

Job

handleWorkFail(packet)

Handle a WORK_FAIL packet.

Updates the referenced Job with the returned data and removes it from the list of jobs associated with the connection.

Parameters:

packet (Packet) – The Packet that was received.

Returns:

The Job object associated with the job request.

Return type:

Job

handleWorkStatus(packet)

Handle a WORK_STATUS packet.

Updates the referenced Job with the returned data.

Parameters:

packet (Packet) – The Packet that was received.

Returns:

The Job object associated with the job request.

Return type:

Job

handleWorkWarning(packet)

Handle a WORK_WARNING packet.

Updates the referenced Job with the returned data.

Parameters:

packet (Packet) – The Packet that was received.

Returns:

The Job object associated with the job request.

Return type:

Job

reportTimingStats(ptype, duration)

Report processing times by packet type

This method is called by handlePacket to report how long processing took for each packet. The default implementation does nothing.

Parameters:
  • ptype (bytes) – The packet type (one of the packet types in constants).

  • duration (float) – The time (in seconds) it took to process the packet.

sendPacket(packet, connection)

Send a packet to a single connection, removing it from the list of active connections if that fails.

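Parameters:
  • packet (Packet) – The Packet to send.

  • connection (Connection) – The Connection on which to send the packet.

setOption(name, timeout=30)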

Set an option for all connections.

Parameters:
  • name (str) – The option name to set.

  • timeout (int) – How long to wait (in seconds) for a response from the server before giving up (default: 30 seconds).

Returns:

True if the option was set on all connections, otherwise False

Return type:

bool

shutdown()

Close all connections and stop all running threads.

The object may no longer be used after shutdown is called.

submitJob(job, background=False, precedence=0, timeout=30)

Submit a job to a Gearman server.

Submits the provided job to the next server in this client’s round-robin connection pool.

If the job is a foreground job, updates will be made to the supplied Job object as they are received.

Parameters:
  • job (Job) – The Job to submit.

  • background (bool) – Whether the job should be backgrounded.

  • precedence (int) – Whether the job should have normal, low, or high precedence. One of PRECEDENCE_NORMAL, PRECEDENCE_LOW, or PRECEDENCE_HIGH.

  • timeout (int) – How long to wait (in seconds) for a response from the server before giving up (default: 30 seconds).

Raises:

ConfigurationError – If an invalid precedence value is supplied.

waitForServer(timeout=None)

Wait for at least one server to be connected.

Block until at least one gearman server is connected.

Parameters:

timeout (numeric) – Number of seconds to wait for a connection. If None, wait forever (default: no timeout).

Raises:

TimeoutError – If the timeout is reached before any server connects.

Job Objects

class gear.Job(name, arguments, unique=None)

A job to run or being run by Gearman.

Parameters:
  • name (str) – The name of the job.

  • arguments (bytes) – The opaque data blob to be passed to the worker as arguments.

  • unique (str) – A byte string to uniquely identify the job to Gearman (optional).

The following instance attributes are available:

name (str)

The name of the job. Assumed to be utf-8.

arguments (bytes)

The opaque data blob passed to the worker as arguments.

unique (str or None)

The unique ID of the job (if supplied).

handle (bytes or None)

The Gearman job handle. None if no job handle has been received yet.

data (list of byte-arrays)

The result data returned from Gearman. Each packet appends an element to the list. Depending on the nature of the data, the elements may need to be concatenated before use. The attribute returns a snapshot copy of the data, so any modifications made to the returned list do not affect the job and are lost.

exception (bytes or None)

Exception information returned from Gearman. None if no exception has been received.

warning (bool)

Whether the worker has reported a warning.

complete (bool)

Whether the job is complete.

failure (bool)

Whether the job has failed. Only set when complete is True.

numerator (bytes or None)

The numerator of the completion ratio reported by the worker. Only set when a status update is sent by the worker.

denominator (bytes or None)

The denominator of the completion ratio reported by the worker. Only set when a status update is sent by the worker.

fraction_complete (float or None)

The fractional complete ratio reported by the worker. Only set when a status update is sent by the worker.

known (bool or None)

Whether the job is known to Gearman. Only set by handleStatusRes() in response to a getStatus() query.

running (bool or None)

Whether the job is running. Only set by handleStatusRes() in response to a getStatus() query.

connection (Connection or None)

The connection associated with the job. Only set after the job has been submitted to a Gearman server.

data_type

alias of list
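
The data, numerator, and denominator attributes described above usually need a little post-processing before use. A minimal sketch, assuming only the attribute types listed in this section; job_result and progress_percent are hypothetical helpers:

```python
def job_result(job):
    """Return the job's payload as a single bytes object."""
    # Each data packet appended one element to job.data.
    return b"".join(job.data)


def progress_percent(job):
    """Return completion as a percentage, or None before any status update."""
    if job.numerator is None or job.denominator is None:
        return None
    denominator = int(job.denominator)  # int() accepts ASCII digits in bytes
    if denominator == 0:
        return None
    return 100.0 * int(job.numerator) / denominator
```

Joining the list is only appropriate when the worker streams one logical result in pieces; if each packet is a separate record, iterate over job.data instead.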

Worker Usage

The classes in this section should be all that are needed in order to implement a Gearman worker.

Worker Objects

class gear.Worker(client_id=None, worker_id=None)

A Gearman worker.

Parameters:
  • client_id (str) – The client ID to provide to Gearman. It will appear in administrative output and be appended to the name of the logger (e.g., gear.Worker.client_id).

  • worker_id (str) – The client ID to provide to Gearman. It will appear in administrative output and be appended to the name of the logger (e.g., gear.Worker.client_id). This parameter name is deprecated, use client_id instead.

addServer(host, port=4730, ssl_key=None, ssl_cert=None, ssl_ca=None, keepalive=False, tcp_keepidle=7200, tcp_keepintvl=75, tcp_keepcnt=9)

Add a server to the client’s connection pool.

Any number of Gearman servers may be added to a client. The client will connect to all of them and send jobs to them in a round-robin fashion. When servers are disconnected, the client will automatically remove them from the pool, continuously try to reconnect to them, and return them to the pool when reconnected. New servers may be added at any time.

This is a non-blocking call that will return regardless of whether the initial connection succeeded. If you need to ensure that a connection is ready before proceeding, see waitForServer().

When using SSL connections, all SSL files must be specified.

Parameters:
  • host (str) – The hostname or IP address of the server.

  • port (int) – The port on which the gearman server is listening.

  • ssl_key (str) – Path to the SSL private key.

  • ssl_cert (str) – Path to the SSL certificate.

  • ssl_ca (str) – Path to the CA certificate.

  • keepalive (bool) – Whether to use TCP keepalives

  • tcp_keepidle (int) – Idle time after which to start keepalives sending

  • tcp_keepintvl (int) – Interval in seconds between TCP keepalives

  • tcp_keepcnt (int) – Count of TCP keepalives to send before disconnect

Raises:

ConfigurationError – If the host/port combination has already been added to the client.

broadcast(packet)

Send a packet to all currently connected servers.

Parameters:

packet (Packet) – The Packet to send.

getConnection()

Return a connected server.

Finds the next scheduled connected server in the round-robin rotation and returns it. It is not usually necessary to use this method outside the library, as more consumer-oriented methods such as submitJob already use it internally, but it is available if needed.

Returns:

The next scheduled Connection object.

Return type:

Connection

Raises:

NoConnectedServersError – If there are currently no connected servers.

getJob()

Get a job from Gearman.

Blocks until a job is received. This method is re-entrant, so it is safe to call this method on a single worker from multiple threads. In that case, one of them at random will receive the job assignment.

Returns:

The WorkerJob assigned.

Return type:

WorkerJob.

Raises:

InterruptedError – If interrupted (by stopWaitingForJobs()) before a job is received.
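
Since getJob() may be called from several threads, a pool of threads can share one worker and exit cleanly when stopWaitingForJobs() is called. The sketch below is illustrative: start_worker_threads is a hypothetical helper, and the exception class is passed in explicitly because it is assumed to be gear's own InterruptedError (available as gear.InterruptedError) rather than the Python built-in of the same name.

```python
import threading


def start_worker_threads(worker, handle, interrupted_error, count=4):
    """Start `count` threads that each pull jobs from the same worker.

    `handle` is called with every job received; `interrupted_error` is the
    exception class raised when waiting is interrupted (assumed to be
    gear.InterruptedError).
    """
    def loop():
        while True:
            try:
                job = worker.getJob()
            except interrupted_error:
                return  # stopWaitingForJobs() was called; exit cleanly
            handle(job)

    threads = [threading.Thread(target=loop, daemon=True) for _ in range(count)]
    for thread in threads:
        thread.start()
    return threads
```

To shut down, call worker.stopWaitingForJobs() and then join the returned threads.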

handleAdminRequest(request)

Handle an administrative command response from Gearman.

This method is called whenever a response to a previously issued administrative command is received from one of this client’s connections. It normally releases the wait lock on the initiating AdminRequest object.

Parameters:

request (AdminRequest) – The AdminRequest that initiated the received response.

handleDisconnect(job)

Handle a Gearman server disconnection.

If the Gearman server is disconnected, this will be called for any jobs currently associated with the server.

Parameters:

job (Job) – The Job that was running when the server disconnected.

handleEchoRes(packet)

Handle an ECHO_RES packet.

Causes the blocking Connection.echo() invocation to return.

Parameters:

packet (Packet) – The Packet that was received.

Returns:

None

handleError(packet)

Handle an ERROR packet.

Logs the error.

Parameters:

packet (Packet) – The Packet that was received.

Returns:

None

handleJobAssign(packet)

Handle a JOB_ASSIGN packet.

Adds a WorkerJob to the internal queue to be picked up by any threads waiting in getJob().

Parameters:

packet (Packet) – The Packet that was received.

handleJobAssignUnique(packet)

Handle a JOB_ASSIGN_UNIQ packet.

Adds a WorkerJob to the internal queue to be picked up by any threads waiting in getJob().

Parameters:

packet (Packet) – The Packet that was received.

handleNoJob(packet)

Handle a NO_JOB packet.

Sends a PRE_SLEEP packet on the same connection.

Parameters:

packet (Packet) – The Packet that was received.

handleNoop(packet)

Handle a NOOP packet.

Sends a GRAB_JOB_UNIQ packet on the same connection. GRAB_JOB_UNIQ will return jobs regardless of whether they were submitted with a unique identifier. If they were not, the WorkerJob.unique attribute will be None.

Parameters:

packet (Packet) – The Packet that was received.

handlePacket(packet)

Handle a received packet.

This method is called whenever a packet is received from any connection. It normally calls the handle method appropriate for the specific packet.

Parameters:

packet (Packet) – The Packet that was received.

job_class

alias of WorkerJob

registerFunction(name, timeout=None)

Register a function with Gearman.

If a timeout value is supplied, the function will be registered with CAN_DO_TIMEOUT.

Parameters:
  • name (str) – The name of the function to register.

  • timeout (numeric) – The timeout value (optional).

reportTimingStats(ptype, duration)

Report processing times by packet type

This method is called by handlePacket to report how long processing took for each packet. The default implementation does nothing.

Parameters:
  • ptype (bytes) – The packet type (one of the packet types in constants).

  • duration (float) – The time (in seconds) it took to process the packet.

sendPacket(packet, connection)

Send a packet to a single connection, removing it from the list of active connections if that fails.

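Parameters:
  • packet (Packet) – The Packet to send.

  • connection (Connection) – The Connection on which to send the packet.

setFunctions(functions)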

Replace the set of functions registered with Gearman.

Accepts a list of FunctionRecord objects which represents the complete set of functions that should be registered with Gearman. Any existing functions will be unregistered and these registered in their place. If the empty list is supplied, then the Gearman registered function set will be cleared.

Parameters:

functions (list) – A list of FunctionRecord objects.

shutdown()

Close all connections and stop all running threads.

The object may no longer be used after shutdown is called.

stopWaitingForJobs()

Interrupts all running getJob() calls, which will raise an exception.

unRegisterFunction(name)

Remove a function from Gearman’s registry.

Parameters:

name (str) – The name of the function to remove.

waitForServer(timeout=None)

Wait for at least one server to be connected.

Block until at least one gearman server is connected.

Parameters:

timeout (numeric) – Number of seconds to wait for a connection. If None, wait forever (default: no timeout).

Raises:

TimeoutError – If the timeout is reached before any server connects.

FunctionRecord Objects

class gear.FunctionRecord(name, timeout=None)

Represents a function that should be registered with Gearman.

This class only needs to be instantiated directly for use with Worker.setFunctions(). If a timeout value is supplied, the function will be registered with CAN_DO_TIMEOUT.

Parameters:
  • name (str) – The name of the function to register.

  • timeout (numeric) – The timeout value (optional).

WorkerJob Objects

class gear.WorkerJob(handle, name, arguments, unique=None)

A job that Gearman has assigned to a Worker. Not intended to be instantiated directly, but rather returned by Worker.getJob().

Parameters:
  • handle (str) – The job handle assigned by gearman.

  • name (str) – The name of the job.

  • arguments (bytes) – The opaque data blob passed to the worker as arguments.

  • unique (str) – A byte string to uniquely identify the job to Gearman (optional).

The following instance attributes are available:

name (str)

The name of the job. Assumed to be utf-8.

arguments (bytes)

The opaque data blob passed to the worker as arguments.

unique (str or None)

The unique ID of the job (if supplied).

handle (bytes)

The Gearman job handle.

connection (Connection or None)

The connection associated with the job. Only set after the job has been submitted to a Gearman server.

sendWorkComplete(data=b'')

Send a WORK_COMPLETE packet to the client.

Parameters:

data (bytes) – The data to be sent to the client (optional).

sendWorkData(data=b'')

Send a WORK_DATA packet to the client.

Parameters:

data (bytes) – The data to be sent to the client (optional).

sendWorkException(data=b'')

Send a WORK_EXCEPTION packet to the client.

Parameters:

data (bytes) – The exception data to be sent to the client (optional).

sendWorkFail()

Send a WORK_FAIL packet to the client.

sendWorkStatus(numerator, denominator)

Send a WORK_STATUS packet to the client.

Sends a numerator and denominator that together represent the fraction complete of the job.

Parameters:
  • numerator (numeric) – The numerator of the fraction complete.

  • denominator (numeric) – The denominator of the fraction complete.
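
A hedged sketch of how a worker might report progress with sendWorkStatus() while working through a multi-step job. process_in_steps is a hypothetical helper; the job argument only needs the arguments attribute and the sendWorkStatus() and sendWorkComplete() methods documented in this section.

```python
def process_in_steps(job, steps):
    """Run each callable in `steps`, reporting progress after every one."""
    total = len(steps)
    results = []
    for done, step in enumerate(steps, start=1):
        results.append(step(job.arguments))
        job.sendWorkStatus(done, total)  # e.g. 2 of 5 steps finished
    # Send the concatenated step outputs as the final result.
    job.sendWorkComplete(b"".join(results))
```

On the client side, these updates surface through the numerator, denominator, and fraction_complete attributes of the Job.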

sendWorkWarning(data=b'')

Send a WORK_WARNING packet to the client.

Parameters:

data (bytes) – The data to be sent to the client (optional).

Administrative Protocol

Gearman provides an administrative protocol that is multiplexed on the same connection as the normal binary protocol for jobs. The classes in this section are useful for working with that protocol. They need to be used with an existing Connection object; either one obtained via a Client or Worker, or via direct instantiation of Connection to a Gearman server.

AdminRequest Objects

class gear.AdminRequest(*arguments)

Encapsulates a request (and response) sent over the administrative protocol. This is a base class that may not be instantiated directly; a subclass implementing a specific command must be used instead.

Parameters:

arguments (list) – A list of byte string arguments for the command.

The following instance attributes are available:

response (bytes)

The response from the server.

arguments (bytes)

The argument supplied with the constructor.

command (bytes)

The administrative command.

class gear.StatusAdminRequest

A “status” administrative request.

The response from gearman may be found in the response attribute.

class gear.ShowJobsAdminRequest

A “show jobs” administrative request.

The response from gearman may be found in the response attribute.

class gear.ShowUniqueJobsAdminRequest

A “show unique jobs” administrative request.

The response from gearman may be found in the response attribute.

class gear.CancelJobAdminRequest(handle)

A “cancel job” administrative request.

Parameters:

handle (str) – The job handle to be canceled.

The response from gearman may be found in the response attribute.

class gear.VersionAdminRequest

A “version” administrative request.

The response from gearman may be found in the response attribute.
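
The response attribute holds the raw server reply, so callers typically parse it themselves. In the Gearman administrative protocol, the "status" reply is a list of tab-separated lines (function name, total jobs, running jobs, available workers) ended by a line containing a single dot. A sketch of a parser, assuming the response is that text as bytes; parse_status is a hypothetical helper:

```python
def parse_status(response):
    """Parse a "status" response into {function: (total, running, workers)}."""
    stats = {}
    for line in response.decode("utf-8").splitlines():
        if line == ".":  # a lone dot terminates the listing
            break
        if not line:
            continue
        name, total, running, workers = line.split("\t")
        stats[name] = (int(total), int(running), int(workers))
    return stats
```

For example, a reply of b"reverse\t3\t1\t2\n.\n" describes a function named reverse with three queued jobs, one of them running, and two available workers.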

Server Usage

Logging

To enable Gearman server logging, set up a log configuration file and pass it to geard (e.g., geard --log-config=logging.config).

Example logging.config:

[loggers]
keys=root,gear

[handlers]
keys=console,debug,info

[formatters]
keys=simple

[logger_root]
level=WARNING
handlers=console

[logger_gear]
level=INFO
handlers=debug,info
qualname=gear

[handler_console]
level=WARNING
class=StreamHandler
formatter=simple
args=(sys.stdout,)

[handler_debug]
level=DEBUG
class=logging.handlers.TimedRotatingFileHandler
formatter=simple
args=('/var/log/gear/debug.log', 'midnight', 1, 30,)

[handler_info]
level=INFO
class=logging.handlers.TimedRotatingFileHandler
formatter=simple
args=('/var/log/gear/info.log', 'midnight', 1, 30,)

[formatter_simple]
format=%(asctime)s %(levelname)s %(name)s: %(message)s
datefmt=
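
A configuration in this format can be sanity-checked before it is handed to geard by loading it with the standard library's logging.config.fileConfig. The sketch below assumes geard reads the file in the same way, which is not confirmed here; it uses a reduced, console-only variant of the example so that it does not need /var/log/gear to exist. check_logging_config is a hypothetical helper.

```python
import logging
import logging.config
import os
import tempfile

CONFIG = """\
[loggers]
keys=root,gear

[handlers]
keys=console

[formatters]
keys=simple

[logger_root]
level=WARNING
handlers=console

[logger_gear]
level=INFO
handlers=console
qualname=gear
propagate=0

[handler_console]
level=DEBUG
class=StreamHandler
formatter=simple
args=(sys.stdout,)

[formatter_simple]
format=%(asctime)s %(levelname)s %(name)s: %(message)s
datefmt=
"""


def check_logging_config(text):
    """Load an INI-style logging config and return the 'gear' logger."""
    with tempfile.NamedTemporaryFile("w", suffix=".config", delete=False) as f:
        f.write(text)
    try:
        # Raises an exception if a section, handler class, or args is invalid.
        logging.config.fileConfig(f.name, disable_existing_loggers=False)
    finally:
        os.unlink(f.name)
    return logging.getLogger("gear")


gear_log = check_logging_config(CONFIG)
gear_log.info("logging configuration loaded")
```

If the file contains a mistake, such as a handler listed under keys but never defined, fileConfig raises an exception instead of failing silently at server start.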

ACL

The syntax of the optional ACL file consists of a number of sections identified by the SSL certificate Common Name Subject, and the arguments to the ACLEntry constructor as key-value pairs:

[<subject>]
register=<regex>
invoke=<regex>
grant=<boolean>

For example:

[my_worker]
register=.*

[my_client]
invoke=.*

[my_node_manager]
grant=True
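
To make the file layout concrete, the example above can be read with the standard library's configparser and applied as a deny-by-default check. This is an illustration of the format only: whether gear itself uses configparser, and whether it matches patterns with re.match, are assumptions. load_acl_entries and may_register are hypothetical helpers.

```python
import configparser
import re

ACL_TEXT = """\
[my_worker]
register=.*

[my_client]
invoke=.*

[my_node_manager]
grant=True
"""


def load_acl_entries(text):
    """Return {subject: {'register': ..., 'invoke': ..., 'grant': bool}}."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    entries = {}
    for subject in parser.sections():
        section = parser[subject]
        entries[subject] = {
            "register": section.get("register"),  # None when not present
            "invoke": section.get("invoke"),
            "grant": section.getboolean("grant", fallback=False),
        }
    return entries


def may_register(entries, subject, function_name):
    """Deny by default: allow only if the subject's register regex matches."""
    pattern = entries.get(subject, {}).get("register")
    return pattern is not None and re.match(pattern, function_name) is not None


entries = load_acl_entries(ACL_TEXT)
print(may_register(entries, "my_worker", "reverse"))  # True
print(may_register(entries, "my_client", "reverse"))  # False
```

Subjects without a section, and keys omitted from a section, grant nothing, which mirrors the deny-by-default behaviour described under Access Control.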

Server Objects

class gear.Server(port=4730, ssl_key=None, ssl_cert=None, ssl_ca=None, statsd_host=None, statsd_port=8125, statsd_prefix=None, server_id=None, acl=None, host=None, keepalive=False, tcp_keepidle=7200, tcp_keepintvl=75, tcp_keepcnt=9)

A simple gearman server implementation for testing (not for production use).

Parameters:
  • port (int) – The TCP port on which to listen.

  • ssl_key (str) – Path to the SSL private key.

  • ssl_cert (str) – Path to the SSL certificate.

  • ssl_ca (str) – Path to the CA certificate.

  • statsd_host (str) – statsd hostname. None means disabled (the default).

  • statsd_port (str) – statsd port (defaults to 8125).

  • statsd_prefix (str) – statsd key prefix.

  • server_id (str) – The ID associated with this server. It will be appended to the name of the logger (e.g., gear.Server.server_id). Defaults to None (unused).

  • acl (ACL) – An ACL object if the server should apply access control rules to its connections.

  • host (str) – Host name or IPv4/IPv6 address to bind to. Defaults to “whatever getaddrinfo() returns”, which might be IPv4-only.

  • keepalive (bool) – Whether to use TCP keepalives

  • tcp_keepidle (int) – Idle time after which to start keepalives sending

  • tcp_keepintvl (int) – Interval in seconds between TCP keepalives

  • tcp_keepcnt (int) – Count of TCP keepalives to send before disconnect

getQueue()

Returns a copy of all internal queues in a flattened form.

Returns:

The Gearman queue.

Return type:

list of WorkerJob.

handleAdminRequest(request)

Handle an administrative command response from Gearman.

This method is called whenever a response to a previously issued administrative command is received from one of this client’s connections. It normally releases the wait lock on the initiating AdminRequest object.

Parameters:

request (AdminRequest) – The AdminRequest that initiated the received response.

handleDisconnect(job)

Handle a Gearman server disconnection.

If the Gearman server is disconnected, this will be called for any jobs currently associated with the server.

Parameters:

job (Job) – The Job that was running when the server disconnected.

handlePacket(packet)

Handle a received packet.

This method is called whenever a packet is received from any connection. It normally calls the handle method appropriate for the specific packet.

Parameters:

packet (Packet) – The Packet that was received.

reportTimingStats(ptype, duration)

Report processing times by packet type

This method is called by handlePacket to report how long processing took for each packet. If statsd is configured, timing and counts are reported with the key “prefix.packet.NAME”.

Parameters:
  • ptype (bytes) – The packet type (one of the packet types in constants).

  • duration (float) – The time (in seconds) it took to process the packet.

shutdown()

Close all connections and stop all running threads.

The object may no longer be used after shutdown is called.

Access Control

The gear server supports authorization via access control lists. When an ACL object is supplied to the server (or a file on the command line), gear changes from the normal Gearman mode of allow-by-default to deny-by-default, and only clients with ACL entries will be able to perform actions such as registering functions or submitting jobs. Authorization is based on the SSL certificate Common Name Subject associated with the connection. An ACL object may be modified programmatically at run-time.

The administrative protocol supports modifying ACLs with the following commands:

acl list

List the current ACLs:

acl list
client      register=None   invoke=.*       grant=True
worker      register=.*     invoke=None     grant=True
.

acl grant <verb> <subject> <pattern>

Grant the <verb> action for functions matching <pattern> to <subject>. Verbs can be one of register, invoke, or grant. This requires the current connection have the grant permission. Example:

acl grant register worker .*
OK

acl revoke <verb> <subject>

Revoke the <verb> action from <subject>. Verbs can be one of register, invoke, grant, or all to indicate all permissions for the subject should be revoked. This requires the grant permission, except that a subject may always revoke its own permissions. Example:

acl revoke register worker
OK

acl self-revoke <verb>

Revoke the <verb> action from connections associated with the current certificate subject. Verbs can be one of register, invoke, grant, or all to indicate all permissions for the subject should be revoked. This is similar to acl revoke but is a convenience method so that a subject does not need to know its own common name. A subject always has permission to revoke its own permissions. Example:

acl self-revoke register
OK

ACL Objects

class gear.ACL

An access control list.

ACLs are deny-by-default. The checked actions are only allowed if there is an explicit rule in the ACL granting permission for a given client (identified by SSL certificate Subject Common Name) to perform that action.
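The deny-by-default rule can be illustrated with a small stand-alone model. This is a sketch of the idea only, not gear's implementation: SketchEntry and can_invoke are hypothetical names, and matching the pattern from the start of the function name with re.match is an assumption, since the text above only says that the patterns are regular expressions.

```python
import re


class SketchEntry:
    """Hypothetical stand-in for an ACL entry; not gear's implementation."""

    def __init__(self, subject, register=None, invoke=None, grant=False):
        self.subject = subject
        self.register = register
        self.invoke = invoke
        self.grant = grant

    def _allows(self, pattern, name):
        # No pattern means no rule, so the action is denied.
        if pattern is None:
            return False
        # Assumption: the pattern is matched from the start of the name.
        return re.match(pattern, name) is not None

    def can_register(self, name):
        return self._allows(self.register, name)

    def can_invoke(self, name):
        return self._allows(self.invoke, name)


def can_invoke(entries, subject, name):
    """Deny unless an entry for the subject explicitly allows the call."""
    entry = entries.get(subject)
    return entry is not None and entry.can_invoke(name)


entries = {"client": SketchEntry("client", invoke="reverse.*")}
```

In this model a subject with no entry, or with an entry whose pattern does not match, is refused; only an explicit matching rule permits the action.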

add(entry)[source]

Add an ACL entry.

Parameters:

entry (ACLEntry) – The ACLEntry to add.

Raises:

ACLError – If there is already an entry for the subject.

canGrant(subject)[source]

Check whether a subject is permitted to grant access to others.

Parameters:

subject (str) – The SSL certificate Subject Common Name to check against.

Returns:

A boolean indicating whether the action should be permitted.

canInvoke(subject, name)[source]

Check whether a subject is permitted to invoke a function.

Parameters:
  • subject (str) – The SSL certificate Subject Common Name to check against.

  • name (str) – The function name to check.

Returns:

A boolean indicating whether the action should be permitted.

canRegister(subject, name)[source]

Check whether a subject is permitted to register a function.

Parameters:
  • subject (str) – The SSL certificate Subject Common Name to check against.

  • name (str) – The function name to check.

Returns:

A boolean indicating whether the action should be permitted.

getEntries()[source]

Return a list of current ACL entries.

Returns:

A list of ACLEntry objects.

grantGrant(subject)[source]

Grant permission to grant permissions to other connections.

Parameters:

subject (str) – The SSL certificate Subject Common Name to which the entry applies.

grantInvoke(subject, invoke)[source]

Grant permission to invoke certain functions.

Parameters:
  • subject (str) – The SSL certificate Subject Common Name to which the entry applies.

  • invoke (str) – A regular expression that matches the jobs that connections with this certificate are permitted to invoke. Also implies the permission to cancel the same set of jobs in the queue.

grantRegister(subject, register)[source]

Grant permission to register certain functions.

Parameters:
  • subject (str) – The SSL certificate Subject Common Name to which the entry applies.

  • register (str) – A regular expression that matches the jobs that connections with this certificate are permitted to register.

remove(subject)[source]

Remove an ACL entry.

Parameters:

subject (str) – The SSL certificate Subject Common Name to remove from the ACL.

Raises:

ACLError – If there is no entry for the subject.

revokeGrant(subject)[source]

Revoke permission to grant permissions to other connections.

Parameters:

subject (str) – The SSL certificate Subject Common Name to which the entry applies.

revokeInvoke(subject)[source]

Revoke permission to invoke all functions.

Parameters:

subject (str) – The SSL certificate Subject Common Name to which the entry applies.

revokeRegister(subject)[source]

Revoke permission to register all functions.

Parameters:

subject (str) – The SSL certificate Subject Common Name to which the entry applies.
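The grant methods above can be combined to set up a typical arrangement of one worker, one client, and one administrator. The sketch below is an illustration under stated assumptions: grant_basic_roles is a hypothetical helper, the subject names are made up, and RecordingACL is a test double that only records calls so the example runs without a Gearman installation. It relies only on the grantRegister, grantInvoke, and grantGrant signatures documented above.

```python
def grant_basic_roles(acl, worker_cn, client_cn, admin_cn, pattern=".*"):
    """Populate an ACL-like object with one worker, one client and one admin.

    Hypothetical helper; 'acl' is any object with the documented grant methods.
    """
    acl.grantRegister(worker_cn, pattern)  # worker may register matching functions
    acl.grantInvoke(client_cn, pattern)    # client may invoke (and cancel) matching jobs
    acl.grantGrant(admin_cn)               # admin may grant and revoke access
    return acl


class RecordingACL:
    """Test double that records calls; stands in for gear.ACL in this sketch."""

    def __init__(self):
        self.calls = []

    def grantRegister(self, subject, register):
        self.calls.append(("register", subject, register))

    def grantInvoke(self, subject, invoke):
        self.calls.append(("invoke", subject, invoke))

    def grantGrant(self, subject):
        self.calls.append(("grant", subject))


recorded = grant_basic_roles(RecordingACL(), "worker", "client", "admin").calls
```

With the real library, a gear.ACL instance would presumably be passed in place of RecordingACL. Whether the grant methods create an entry for a subject that has none yet is not stated in this reference, so that should be verified before relying on it.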

ACLEntry Objects

class gear.ACLEntry(subject, register=None, invoke=None, grant=False)[source]

An access control list entry.

Parameters:
  • subject (str) – The SSL certificate Subject Common Name to which the entry applies.

  • register (str) – A regular expression that matches the jobs that connections with this certificate are permitted to register.

  • invoke (str) – A regular expression that matches the jobs that connections with this certificate are permitted to invoke. Also implies the permission to cancel the same set of jobs in the queue.

  • grant (boolean) – A flag indicating whether connections with this certificate are permitted to grant access to other connections. Also implies the permission to revoke access from other connections. The ability to self-revoke access is always implied.

canInvoke(name)[source]

Check whether this subject is permitted to invoke a function.

Parameters:

name (str) – The function name to check.

Returns:

A boolean indicating whether the action should be permitted.

canRegister(name)[source]

Check whether this subject is permitted to register a function.

Parameters:

name (str) – The function name to check.

Returns:

A boolean indicating whether the action should be permitted.

isEmpty()[source]

Checks whether this entry grants any permissions at all.

Returns:

False if any permission is granted, otherwise True.

setGrant(grant)[source]

Sets whether this subject can grant ACLs to others.

Parameters:

grant (boolean) – A flag indicating whether connections with this certificate are permitted to grant access to other connections. Also implies the permission to revoke access from other connections. The ability to self-revoke access is always implied.

setInvoke(invoke)[source]

Sets the functions that this subject can invoke.

Parameters:

invoke (str) – A regular expression that matches the jobs that connections with this certificate are permitted to invoke.

setRegister(register)[source]

Sets the functions that this subject can register.

Parameters:

register (str) – A regular expression that matches the jobs that connections with this certificate are permitted to register.

Common

These classes do not normally need to be directly instantiated to use the gear API, but they may be returned by, or otherwise be accessible from, other classes in this module. They generally operate at a lower level, but still form part of the public API.

Connection Objects

class gear.Connection(host, port, ssl_key=None, ssl_cert=None, ssl_ca=None, client_id='unknown', keepalive=False, tcp_keepidle=7200, tcp_keepintvl=75, tcp_keepcnt=9)[source]

A Connection to a Gearman Server.

Parameters:
  • client_id (str) – The client ID associated with this connection. It will be appended to the name of the logger (e.g., gear.Connection.client_id). Defaults to ‘unknown’.

  • keepalive (bool) – Whether to use TCP keepalives

  • tcp_keepidle (int) – Idle time in seconds after which to start sending TCP keepalives

  • tcp_keepintvl (int) – Interval in seconds between TCP keepalives

  • tcp_keepcnt (int) – Count of TCP keepalives to send before disconnect
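The keepalive parameters correspond to standard TCP socket options. The following sketch shows how such values are commonly applied to a plain Python socket; it is not taken from gear's source, enable_keepalive is a hypothetical helper, and the tuning options are platform-specific, so the code skips any that the running platform does not expose.

```python
import socket


def enable_keepalive(sock, keepidle=7200, keepintvl=75, keepcnt=9):
    """Turn on TCP keepalives and apply tuning where the platform allows.

    Hypothetical helper; defaults mirror the Connection signature above.
    """
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    applied = ["SO_KEEPALIVE"]
    options = (
        ("TCP_KEEPIDLE", keepidle),    # idle seconds before the first probe
        ("TCP_KEEPINTVL", keepintvl),  # seconds between probes
        ("TCP_KEEPCNT", keepcnt),      # failed probes before disconnect
    )
    for name, value in options:
        # These constants are platform-specific, so skip any that are missing.
        if hasattr(socket, name):
            sock.setsockopt(socket.IPPROTO_TCP, getattr(socket, name), value)
            applied.append(name)
    return applied


sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    applied_options = enable_keepalive(sock)
    keepalive_on = sock.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) != 0
finally:
    sock.close()
```

With the defaults, a connection that has been idle for two hours is probed every 75 seconds and dropped after nine unanswered probes.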

connect()[source]

Open a connection to the server.

Raises:

ConnectionError – If unable to open the socket.

disconnect()[source]

Disconnect from the server and remove all associated state data.

echo(data=None, timeout=30)[source]

Perform an echo test on the server.

This method waits until the echo response has been received or the timeout has been reached.

Parameters:
  • data (bytes) – The data to request be echoed. If None, a random unique byte string will be generated.

  • timeout (numeric) – Number of seconds to wait until the response is received. If None, wait forever (default: 30 seconds).

Raises:

TimeoutError – If the timeout is reached before the response is received.

readPacket()[source]

Read one packet or administrative response from the server.

Returns:

The Packet or AdminRequest read.

Return type:

Packet or AdminRequest

reconnect()[source]

Disconnect from and reconnect to the server, removing all associated state data.

sendAdminRequest(request, timeout=90)[source]

Send an administrative request to the server.

Parameters:
  • request (AdminRequest) – The AdminRequest to send.

  • timeout (numeric) – Number of seconds to wait until the response is received. If None, wait forever (default: 90 seconds).

Raises:

TimeoutError – If the timeout is reached before the response is received.

sendPacket(packet)[source]

Send a packet to the server.

Parameters:

packet (Packet) – The Packet to send.

sendRaw(data)[source]

Send raw data over the socket.

Parameters:

data (bytes) – The raw data to send.

Packet Objects

class gear.Packet(code, ptype, data, connection=None)[source]

A data packet received from or to be sent over a Connection.

Parameters:
  • code (bytes) – The Gearman magic code (constants.REQ or constants.RES)

  • ptype (bytes) – The packet type (one of the packet types in constants).

  • data (bytes) – The data portion of the packet.

  • connection (Connection) – The connection on which the packet was received (optional).

Raises:

InvalidDataError – If the magic code is unknown.

getArgument(index, last=False)[source]

Get the nth argument from the packet data.

Parameters:
  • index (int) – The argument index to look up.

  • last (bool) – Whether this is the last argument (and thus nulls should be ignored)

Returns:

The argument value.

Return type:

bytes
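In the Gearman protocol, packet arguments are separated by NUL bytes, and the final argument may itself contain NUL bytes, which is why the last flag exists. The idea can be sketched as follows; get_argument is a stand-alone illustration rather than the library's code, and the payload is a made-up example.

```python
def get_argument(data, index, last=False):
    """Return the index-th NUL-separated argument from packet data."""
    parts = data.split(b"\x00")
    if not last:
        return parts[index]
    # The final argument may itself contain NUL bytes, so rejoin the rest.
    return b"\x00".join(parts[index:])


# Hypothetical payload: job handle, function name, then opaque binary data.
payload = b"H:example:1\x00reverse\x00bin\x00ary"
handle = get_argument(payload, 0)
function = get_argument(payload, 1)
body = get_argument(payload, 2, last=True)
```

Without last=True, the third argument would be cut at the first embedded NUL byte.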

getJob()[source]

Get the Job associated with the job handle in this packet.

Returns:

The Job for this packet.

Return type:

Job

Raises:

UnknownJobError – If the job is not known.

toBinary()[source]

Return a Gearman wire protocol binary representation of the packet.

Returns:

The packet in binary form.

Return type:

bytes
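For reference, a Gearman binary packet consists of a 12-byte header (a 4-byte magic code, a 4-byte big-endian packet type, and a 4-byte big-endian payload size) followed by the payload. The sketch below builds and decodes such a header with the struct module. The names encode_packet and decode_header are hypothetical and the constants are defined locally for illustration; this is not a copy of gear's toBinary.

```python
import struct

REQ = b"\x00REQ"   # magic code for requests
RES = b"\x00RES"   # magic code for responses
ECHO_REQ = 16      # packet type number from the Gearman protocol


def encode_packet(code, ptype, data):
    """Build the 12-byte header followed by the payload."""
    return struct.pack("!4sII", code, ptype, len(data)) + data


def decode_header(raw):
    """Split the fixed header into magic code, packet type and payload size."""
    return struct.unpack("!4sII", raw[:12])


wire = encode_packet(REQ, ECHO_REQ, b"ping")
```

Decoding the first 12 bytes of wire gives back the magic code, the packet type, and a payload size of 4.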

Exceptions

exception gear.ConnectionError[source]
exception gear.InvalidDataError[source]
exception gear.ConfigurationError[source]
exception gear.NoConnectedServersError[source]
exception gear.UnknownJobError[source]
exception gear.InterruptedError[source]

Constants

These constants are used by public API classes.

gear.PRECEDENCE_NORMAL

Normal job precedence.

gear.PRECEDENCE_LOW

Low job precedence.

gear.PRECEDENCE_HIGH

High job precedence.

Protocol Constants

These are not necessary for normal API usage. See the Gearman protocol reference for an explanation of each of these.

Magic Codes

gear.constants.REQ

The Gearman magic code for a request.

gear.constants.RES

The Gearman magic code for a response.

Packet Types

gear.constants.CAN_DO
gear.constants.CANT_DO
gear.constants.RESET_ABILITIES
gear.constants.PRE_SLEEP
gear.constants.NOOP
gear.constants.SUBMIT_JOB
gear.constants.JOB_CREATED
gear.constants.GRAB_JOB
gear.constants.NO_JOB
gear.constants.JOB_ASSIGN
gear.constants.WORK_STATUS
gear.constants.WORK_COMPLETE
gear.constants.WORK_FAIL
gear.constants.GET_STATUS
gear.constants.ECHO_REQ
gear.constants.ECHO_RES
gear.constants.SUBMIT_JOB_BG
gear.constants.ERROR
gear.constants.STATUS_RES
gear.constants.SUBMIT_JOB_HIGH
gear.constants.SET_CLIENT_ID
gear.constants.CAN_DO_TIMEOUT
gear.constants.ALL_YOURS
gear.constants.WORK_EXCEPTION
gear.constants.OPTION_REQ
gear.constants.OPTION_RES
gear.constants.WORK_DATA
gear.constants.WORK_WARNING
gear.constants.GRAB_JOB_UNIQ
gear.constants.JOB_ASSIGN_UNIQ
gear.constants.SUBMIT_JOB_HIGH_BG
gear.constants.SUBMIT_JOB_LOW
gear.constants.SUBMIT_JOB_LOW_BG
gear.constants.SUBMIT_JOB_SCHED
gear.constants.SUBMIT_JOB_EPOCH

Indices and tables