你正在阅读 Celery 3.1 的文档。开发版本文档见:
Client for worker remote control commands.
Server implementation is in celery.worker.control.
class celery.app.control.Inspect(destination=None, timeout=1, callback=None, connection=None, app=None, limit=None)[源代码]
app = None
hello(from_node, revoked=None)
objgraph(type='Request', n=200, max_depth=10)
class celery.app.control.Control(app=None)[源代码]
class Mailbox(namespace, type='direct', connection=None, clock=None, accept=None)
Node(hostname=None, state=None, channel=None, handlers=None)
abcast(command, kwargs={})
accept = ['json']
call(destination, command, kwargs={}, timeout=None, callback=None, channel=None)
cast(destination, command, kwargs={})
connection = None
exchange = None
exchange_fmt = '%s.pidbox'
multi_call(command, kwargs={}, timeout=1, limit=None, callback=None, channel=None)
namespace = None
Node 的别名
oid None
reply_exchange = None
reply_exchange_fmt = 'reply.%s.pidbox'
reply_queue None
type = 'direct'
Control.add_consumer(queue, exchange=None, exchange_type='direct', routing_key=None, options=None, **kwargs)[源代码]
Tell all (or specific) workers to start consuming from a new queue.
Only the queue name is required as if only the queue is specified
then the exchange/routing key will be set to the same name (
like automatic queues do).
This command does not respect the default queue/exchange
options in the configuration.
参数: |
- queue – Name of queue to start consuming from.
- exchange – Optional name of exchange.
- exchange_type – Type of exchange (defaults to ‘direct’)
command to, when empty broadcast to all workers.
- routing_key – Optional routing key.
- options – Additional options as supported
by kombu.entitiy.Queue.from_dict().
See broadcast() for supported keyword arguments.
Control.broadcast(command, arguments=None, destination=None, connection=None, reply=False, timeout=1, limit=None, callback=None, channel=None, **extra_kwargs)[源代码]
Broadcast a control command to the celery workers.
参数: |
- command – Name of command to send.
- arguments – Keyword arguments for the command.
- destination – If set, a list of the hosts to send the
command to, when empty broadcast to all workers.
- connection – Custom broker connection to use, if not set,
a connection will be established automatically.
- reply – Wait for and return the reply.
- timeout – Timeout in seconds to wait for the reply.
- limit – Limit number of replies.
- callback – Callback called immediately for each reply
Control.cancel_consumer(queue, **kwargs)[源代码]
Tell all (or specific) workers to stop consuming from queue.
Supports the same keyword arguments as broadcast().
Control.disable_events(destination=None, **kwargs)[源代码]
Tell all (or specific) workers to enable events.
Discard all waiting tasks.
This will ignore all tasks waiting for execution, and they will
be deleted from the messaging server.
返回: | the number of tasks discarded. |
Control.election(id, topic, action=None, connection=None)
Control.enable_events(destination=None, **kwargs)[源代码]
Tell all (or specific) workers to enable events.
Control.inspect None[源代码]
Control.ping(destination=None, timeout=1, **kwargs)[源代码]
Ping all (or specific) workers.
Will return the list of answers.
See broadcast() for supported keyword arguments.
Control.pool_grow(n=1, destination=None, **kwargs)[源代码]
Tell all (or specific) workers to grow the pool by n.
Supports the same arguments as broadcast().
Control.pool_shrink(n=1, destination=None, **kwargs)[源代码]
Tell all (or specific) workers to shrink the pool by n.
Supports the same arguments as broadcast().
Discard all waiting tasks.
This will ignore all tasks waiting for execution, and they will
be deleted from the messaging server.
返回: | the number of tasks discarded. |
Control.rate_limit(task_name, rate_limit, destination=None, **kwargs)[源代码]
Tell all (or specific) workers to set a new rate limit
for task by type.
参数: |
- task_name – Name of task to change rate limit for.
- rate_limit – The rate limit as tasks per second, or a rate limit
string (‘100/m’, etc.
see celery.task.base.Task.rate_limit for
more information).
See broadcast() for supported keyword arguments.
Control.revoke(task_id, destination=None, terminate=False, signal='SIGTERM', **kwargs)[源代码]
Tell all (or specific) workers to revoke a task by id.
If a task is revoked, the workers will ignore the task and
not execute it after all.
参数: |
- task_id – Id of the task to revoke.
- terminate – Also terminate the process currently working
on the task (if any).
- signal – Name of signal to send to process if terminate.
Default is TERM.
See broadcast() for supported keyword arguments.
Control.time_limit(task_name, soft=None, hard=None, **kwargs)[源代码]
Tell all (or specific) workers to set time limits for
a task by type.
参数: |
- task_name – Name of task to change time limits for.
- soft – New soft time limit (in seconds).
- hard – New hard time limit (in seconds).
Any additional keyword arguments are passed on to broadcast().