你正在阅读 Celery 3.1 的文档。开发版本文档见: 此处.
# -*- coding: utf-8 -*-
"""
celery.app.control
~~~~~~~~~~~~~~~~~~~
Client for worker remote control commands.
Server implementation is in :mod:`celery.worker.control`.
"""
from __future__ import absolute_import
from __future__ import with_statement
from kombu.pidbox import Mailbox
from kombu.utils import cached_property
from . import app_or_default
class Inspect(object):
    """Client for remote control commands that return a reply.

    Every request is sent through ``self.app.control.broadcast`` with
    ``reply=True`` and the collected replies are post-processed by
    :meth:`_prepare`.

    :keyword destination: Node name, or list/tuple of node names, the
        requests are addressed to.  Passed on to ``broadcast`` as-is.
    :keyword timeout: Timeout passed on to ``broadcast`` (default 1).
    :keyword callback: Callback passed on to ``broadcast``.
    :keyword connection: Connection passed on to ``broadcast``.
    :keyword app: App instance to use; falls back to the class-level
        :attr:`app` attribute when not given.
    :keyword limit: Reply limit passed on to ``broadcast``.

    """
    #: Default app, used when no ``app`` argument is given to the
    #: constructor.
    app = None

    def __init__(self, destination=None, timeout=1, callback=None,
                 connection=None, app=None, limit=None):
        self.app = app or self.app
        self.destination = destination
        self.timeout = timeout
        self.callback = callback
        self.connection = connection
        self.limit = limit

    def _prepare(self, reply):
        """Post-process the raw ``reply`` returned by a broadcast.

        Returns :const:`None` for an empty/missing reply.  When
        :attr:`destination` is a single (non list/tuple) value only the
        entry stored under that key is returned, otherwise the complete
        flattened reply is returned.

        """
        if not reply:
            return
        # ``flatten_reply`` is not defined in this class; it is expected
        # to be provided at module level and to return an object with a
        # dict-like ``get`` (presumably keyed by node name -- confirm).
        by_node = flatten_reply(reply)
        if self.destination and \
                not isinstance(self.destination, (list, tuple)):
            return by_node.get(self.destination)
        return by_node

    def _request(self, command, **kwargs):
        """Broadcast ``command`` and return the prepared reply.

        The keyword arguments become the command's ``arguments``.

        """
        return self._prepare(self.app.control.broadcast(
            command,
            arguments=kwargs,
            destination=self.destination,
            callback=self.callback,
            connection=self.connection,
            limit=self.limit,
            timeout=self.timeout, reply=True,
        ))

    def registered(self, *taskinfoitems):
        """Send the ``dump_tasks`` command.

        The positional arguments are forwarded as the ``taskinfoitems``
        command argument.

        """
        return self._request('dump_tasks', taskinfoitems=taskinfoitems)
    registered_tasks = registered
class Control(object):
    """Client for sending remote control commands to workers.

    Commands are published through a mailbox created as
    ``Mailbox('celery', type='fanout')``; see :meth:`broadcast`.

    :keyword app: App instance, resolved with ``app_or_default``.

    """
    #: Mailbox class used to create :attr:`mailbox`; override in a
    #: subclass to use a different implementation.
    Mailbox = Mailbox

    def __init__(self, app=None):
        self.app = app_or_default(app)
        self.mailbox = self.Mailbox('celery', type='fanout')

    @cached_property
    def inspect(self):
        """:class:`Inspect` subclass bound to this control's app."""
        # NOTE(review): this property was missing and its decorator had
        # ended up on ``purge`` (making ``purge`` uncallable).  Body
        # restored to match upstream Celery; it relies on the app
        # providing ``subclass_with_self`` -- please confirm.
        return self.app.subclass_with_self(Inspect, reverse='control.inspect')

    def purge(self, connection=None):
        """Discard all waiting tasks.

        This will ignore all tasks waiting for execution, and they will
        be deleted from the messaging server.

        :keyword connection: Connection to use; when not set one is
            acquired through ``app.connection_or_acquire``.

        :returns: the number of tasks discarded.

        """
        with self.app.connection_or_acquire(connection) as conn:
            return self.app.amqp.TaskConsumer(conn).purge()
    discard_all = purge

    def revoke(self, task_id, destination=None, terminate=False,
               signal='SIGTERM', **kwargs):
        """Tell all (or specific) workers to revoke a task by id.

        If a task is revoked, the workers will ignore the task and
        not execute it after all.

        :param task_id: Id of the task to revoke.
        :keyword terminate: Also terminate the process currently working
            on the task (if any).
        :keyword signal: Name of signal to send to process if terminate.
            Default is TERM.

        See :meth:`broadcast` for supported keyword arguments.

        """
        return self.broadcast('revoke', destination=destination,
                              arguments={'task_id': task_id,
                                         'terminate': terminate,
                                         'signal': signal}, **kwargs)

    def ping(self, destination=None, timeout=1, **kwargs):
        """Ping all (or specific) workers.

        Returns answer from alive workers.

        See :meth:`broadcast` for supported keyword arguments.

        """
        return self.broadcast('ping', reply=True, destination=destination,
                              timeout=timeout, **kwargs)

    def rate_limit(self, task_name, rate_limit, destination=None, **kwargs):
        """Tell all (or specific) workers to set a new rate limit
        for task by type.

        :param task_name: Name of task to change rate limit for.
        :param rate_limit: The rate limit as tasks per second, or a rate
            limit string (`'100/m'`, etc.); see
            :attr:`celery.task.base.Task.rate_limit` for more information.

        See :meth:`broadcast` for supported keyword arguments.

        """
        return self.broadcast('rate_limit', destination=destination,
                              arguments={'task_name': task_name,
                                         'rate_limit': rate_limit},
                              **kwargs)

    def add_consumer(self, queue, exchange=None, exchange_type='direct',
                     routing_key=None, options=None, **kwargs):
        """Tell all (or specific) workers to start consuming from a new queue.

        Only the queue name is required as if only the queue is specified
        then the exchange/routing key will be set to the same name (
        like automatic queues do).

        .. note::

            This command does not respect the default queue/exchange
            options in the configuration.

        :param queue: Name of queue to start consuming from.
        :keyword exchange: Optional name of exchange.
        :keyword exchange_type: Type of exchange (defaults to 'direct').
        :keyword routing_key: Optional routing key.
        :keyword options: Additional options as supported
            by :meth:`kombu.entity.Queue.from_dict`.

        See :meth:`broadcast` for supported keyword arguments
        (e.g. ``destination`` to address specific workers).

        """
        # ``options`` are merged on top of the base arguments, so an
        # option with the same key overrides the explicit parameter.
        return self.broadcast(
            'add_consumer',
            arguments=dict({'queue': queue, 'exchange': exchange,
                            'exchange_type': exchange_type,
                            'routing_key': routing_key}, **options or {}),
            **kwargs
        )

    def cancel_consumer(self, queue, **kwargs):
        """Tell all (or specific) workers to stop consuming from ``queue``.

        Supports the same keyword arguments as :meth:`broadcast`.

        """
        return self.broadcast(
            'cancel_consumer', arguments={'queue': queue}, **kwargs
        )

    def time_limit(self, task_name, soft=None, hard=None, **kwargs):
        """Tell all (or specific) workers to set time limits for
        a task by type.

        :param task_name: Name of task to change time limits for.
        :keyword soft: New soft time limit (in seconds).
        :keyword hard: New hard time limit (in seconds).

        Any additional keyword arguments are passed on to :meth:`broadcast`.

        """
        return self.broadcast(
            'time_limit',
            arguments={'task_name': task_name,
                       'hard': hard, 'soft': soft}, **kwargs)

    def enable_events(self, destination=None, **kwargs):
        """Tell all (or specific) workers to enable events."""
        return self.broadcast('enable_events', {}, destination, **kwargs)

    def disable_events(self, destination=None, **kwargs):
        """Tell all (or specific) workers to disable events."""
        return self.broadcast('disable_events', {}, destination, **kwargs)

    def pool_grow(self, n=1, destination=None, **kwargs):
        """Tell all (or specific) workers to grow the pool by ``n``.

        Supports the same arguments as :meth:`broadcast`.

        """
        # ``n`` must be sent as a command argument, otherwise the
        # requested amount never reaches the workers.
        return self.broadcast('pool_grow', {'n': n}, destination, **kwargs)

    def pool_shrink(self, n=1, destination=None, **kwargs):
        """Tell all (or specific) workers to shrink the pool by ``n``.

        Supports the same arguments as :meth:`broadcast`.

        """
        # See pool_grow: forward ``n`` instead of an empty argument dict.
        return self.broadcast('pool_shrink', {'n': n}, destination, **kwargs)

    def broadcast(self, command, arguments=None, destination=None,
                  connection=None, reply=False, timeout=1, limit=None,
                  callback=None, channel=None, **extra_kwargs):
        """Broadcast a control command to the celery workers.

        :param command: Name of command to send.
        :param arguments: Keyword arguments for the command.
        :keyword destination: If set, a list of the hosts to send the
            command to, when empty broadcast to all workers.
        :keyword connection: Custom broker connection to use, if not set,
            a connection will be established automatically.
        :keyword reply: Wait for and return the reply.
        :keyword timeout: Timeout in seconds to wait for the reply.
        :keyword limit: Limit number of replies.
        :keyword callback: Callback called immediately for each reply
            received.
        :keyword channel: Passed on to the mailbox as ``channel``.

        Any other keyword arguments are merged into ``arguments``
        (taking precedence over keys already present there).

        """
        with self.app.connection_or_acquire(connection) as conn:
            # Copy so the caller's ``arguments`` dict is never mutated.
            arguments = dict(arguments or {}, **extra_kwargs)
            return self.mailbox(conn)._broadcast(
                command, arguments, destination, reply, timeout,
                limit, callback, channel=channel,
            )