你正在阅读 Celery 3.1 的文档。开发版本文档见: 此处.

celery.worker.consumer

celery.worker.consumer

This module contains the components responsible for consuming messages from the broker, processing the messages and keeping the broker connections up and running.

class celery.worker.consumer.Consumer(on_task_request, init_callback=<function noop at 0xb08df7c>, hostname=None, pool=None, app=None, timer=None, controller=None, hub=None, amqheartbeat=None, worker_options=None, disable_rate_limits=False, initial_prefetch_count=2, prefetch_multiplier=1, **kwargs)[源代码]
class Blueprint(steps=None, name=None, app=None, on_start=None, on_close=None, on_stopped=None)
default_steps = ['celery.worker.consumer:Connection', 'celery.worker.consumer:Mingle', 'celery.worker.consumer:Events', 'celery.worker.consumer:Gossip', 'celery.worker.consumer:Heart', 'celery.worker.consumer:Control', 'celery.worker.consumer:Tasks', 'celery.worker.consumer:Evloop', 'celery.worker.consumer:Agent']
name = 'Consumer'
shutdown(parent)
Consumer.Strategies

dict 的别名

Consumer.add_task_queue(queue, exchange=None, exchange_type=None, routing_key=None, **options)[源代码]
Consumer.apply_eta_task(task)[源代码]

Method called by the timer to apply a task with an ETA/countdown.

Consumer.bucket_for_task(type)
Consumer.cancel_task_queue(queue)[源代码]
Consumer.connect()

Establish the broker connection.

Will retry establishing the connection if the BROKER_CONNECTION_RETRY setting is enabled

Consumer.create_task_handler()
Consumer.in_shutdown = False

set when consumer is shutting down.

Consumer.init_callback = None

Optional callback called the first time the worker is ready to receive tasks.

Consumer.loop_args()
Consumer.on_close()
Consumer.on_decode_error(message, exc)[源代码]

Callback called if an error occurs while decoding a message received.

Simply logs the error and acknowledges the message so it doesn’t enter a loop.

参数:
  • message – The message with errors.
  • exc – The original exception instance.
Consumer.on_invalid_task(body, message, exc)
Consumer.on_ready()
Consumer.on_unknown_message(body, message)
Consumer.on_unknown_task(body, message, exc)
Consumer.pool = None

The current worker pool instance.

Consumer.register_with_event_loop(hub)
Consumer.reset_rate_limits()
Consumer.restart_count = -1
Consumer.shutdown()
Consumer.start()[源代码]
Consumer.stop()[源代码]
Consumer.timer = None

A timer used for high-priority internal tasks, such as sending heartbeats.

Consumer.update_strategies()[源代码]
class celery.worker.consumer.Connection(c, **kwargs)
info(c, params='N/A')
name = u'celery.worker.consumer.Connection'
requires = ()
shutdown(c)
start(c)
class celery.worker.consumer.Events(c, send_events=None, **kwargs)
name = u'celery.worker.consumer.Events'
requires = (step:celery.worker.consumer.Connection{()},)
shutdown(c)
start(c)
stop(c)
class celery.worker.consumer.Heart(c, without_heartbeat=False, **kwargs)
name = u'celery.worker.consumer.Heart'
requires = (step:celery.worker.consumer.Events{(step:celery.worker.consumer.Connection{()},)},)
shutdown(c)
start(c)
stop(c)
class celery.worker.consumer.Control(c, **kwargs)
include_if(c)
name = u'celery.worker.consumer.Control'
requires = (step:celery.worker.consumer.Mingle{(step:celery.worker.consumer.Events{(step:celery.worker.consumer.Connection{()},)},)},)
class celery.worker.consumer.Tasks(c, **kwargs)
info(c)
name = u'celery.worker.consumer.Tasks'
requires = (step:celery.worker.consumer.Events{(step:celery.worker.consumer.Connection{()},)},)
shutdown(c)
start(c)
stop(c)
class celery.worker.consumer.Evloop(parent, **kwargs)
label = 'event loop'
last = True
name = u'celery.worker.consumer.Evloop'
patch_all(c)
requires = ()
start(c)
class celery.worker.consumer.Agent(c, **kwargs)
conditional = True
create(c)
name = u'celery.worker.consumer.Agent'
requires = (step:celery.worker.consumer.Connection{()},)
class celery.worker.consumer.Mingle(c, without_mingle=False, **kwargs)
compatible_transport(app)
compatible_transports = set(['redis', 'amqp'])
label = 'Mingle'
name = u'celery.worker.consumer.Mingle'
requires = (step:celery.worker.consumer.Events{(step:celery.worker.consumer.Connection{()},)},)
start(c)
class celery.worker.consumer.Gossip(c, without_gossip=False, interval=5.0, **kwargs)
call_task(task)
compatible_transport(app)
compatible_transports = set(['redis', 'amqp'])
election(id, topic, action=None)
get_consumers(channel)
label = 'Gossip'
name = u'celery.worker.consumer.Gossip'
on_elect(event)
on_elect_ack(event)
on_message(prepare, message)
on_node_join(worker)
on_node_leave(worker)
on_node_lost(worker)
periodic()
register_timer()
requires = (step:celery.worker.consumer.Mingle{(step:celery.worker.consumer.Events{(step:celery.worker.consumer.Connection{()},)},)},)
start(c)
celery.worker.consumer.dump_body(m, body)[源代码]

上一个主题

celery.worker

下一个主题

celery.worker.job

本页