你正在阅读 Celery 3.1 的文档。开发版本的文档请见：此处。

celery.worker.state 源代码

# -*- coding: utf-8 -*-
"""
    celery.worker.state
    ~~~~~~~~~~~~~~~~~~~

    Internal worker state (global)

    This includes the currently active and reserved tasks,
    statistics, and revoked tasks.

"""
from __future__ import absolute_import

import os
import sys
import platform
import shelve

from collections import defaultdict

from kombu.utils import cached_property

from celery import __version__
from celery.datastructures import LimitedSet

#: Worker software/platform information: a fixed identifier, the installed
#: celery version, and the operating system name from ``platform.system()``.
SOFTWARE_INFO = {'sw_ident': 'py-celery',
                 'sw_ver': __version__,
                 'sw_sys': platform.system()}

#: maximum number of revokes to keep in memory
#: (passed as ``maxlen`` to the ``revoked`` set below).
REVOKES_MAX = 10000

#: how many seconds a revoke will be active before
#: being expired when the max limit has been exceeded
#: (passed as ``expires`` to the ``revoked`` set below).
REVOKE_EXPIRES = 3600

#: set of all reserved :class:`~celery.worker.job.Request`'s.
reserved_requests = set()

#: set of currently active :class:`~celery.worker.job.Request`'s.
active_requests = set()

#: count of tasks accepted by the worker, keyed by task name
#: (``request.name``); names not seen yet count as 0.
total_count = defaultdict(int)

#: the set of currently revoked tasks.  Persistent if statedb set.
revoked = LimitedSet(maxlen=REVOKES_MAX, expires=REVOKE_EXPIRES)

#: Updates global state when a task has been reserved
#: (this is the bound ``add`` method of ``reserved_requests``).
task_reserved = reserved_requests.add

#: Module-level flags, both initially false.  Nothing visible in this
#: module reads or sets them; presumably other worker components set them
#: to request a shutdown/termination -- TODO confirm against the callers.
should_stop = False
should_terminate = False


def task_accepted(request):
    """Register ``request`` as running and bump the tally for its task.

    The request is first placed in the global ``active_requests`` set;
    afterwards the ``total_count`` entry keyed by ``request.name`` is
    increased by one.
    """
    active_requests.add(request)
    task_name = request.name
    total_count[task_name] = total_count[task_name] + 1
def task_ready(request):
    """Remove ``request`` from the active set and from the reserved set.

    Requests that are in neither set are ignored (``discard`` semantics).
    """
    for tracked in (active_requests, reserved_requests):
        tracked.discard(request)


#: benchmarking is switched on when either environment variable is set
#: to a non-empty value.
C_BENCH = os.environ.get('C_BENCH') or os.environ.get('CELERY_BENCH')

#: number of completed tasks between two benchmark reports (default 1000).
C_BENCH_EVERY = int(os.environ.get('C_BENCH_EVERY') or
                    os.environ.get('CELERY_BENCH_EVERY') or 1000)

if C_BENCH:  # pragma: no cover
    import atexit

    from time import time
    from billiard import current_process
    from celery.utils.debug import memdump, sample_mem

    #: number of tasks that have become ready so far.
    all_count = 0
    #: time the very first task was reserved.
    bench_first = None
    #: start time of the batch currently being measured.
    bench_start = None
    #: time the most recent batch was completed.
    bench_last = None
    #: batch size: a report is printed every ``bench_every`` ready tasks.
    bench_every = C_BENCH_EVERY
    #: duration of every completed batch, in seconds.
    bench_sample = []

    # Keep references to the plain implementations; the names
    # ``task_reserved`` and ``task_ready`` are rebound to the timing
    # wrappers defined below.
    __reserved = task_reserved
    __ready = task_ready

    if current_process()._name == 'MainProcess':
        @atexit.register
        def on_shutdown():
            """Print the benchmark summary when the interpreter exits."""
            if bench_first is None or bench_last is None:
                # no complete batch was measured, nothing to report.
                return
            print('- Time spent in benchmark: %r' % (
                bench_last - bench_first))
            print('- Avg: %s' % (
                sum(bench_sample) / len(bench_sample)))
            memdump()

    def task_reserved(request):  # noqa
        """Timing wrapper: stamp the batch start, then reserve the task."""
        global bench_start, bench_first
        stamp = None
        if bench_start is None:
            stamp = time()
            bench_start = stamp
        if bench_first is None:
            bench_first = stamp
        return __reserved(request)

    def task_ready(request):  # noqa
        """Timing wrapper: report every ``bench_every`` tasks, then
        mark the task as ready."""
        global all_count, bench_start, bench_last
        all_count += 1
        if all_count % bench_every == 0:
            finished = time()
            elapsed = finished - bench_start
            print('- Time spent processing %s tasks (since first '
                  'task received): ~%.4fs\n' % (bench_every, elapsed))
            sys.stdout.flush()
            # the next batch starts where this one ended.
            bench_start = bench_last = finished
            bench_sample.append(elapsed)
            sample_mem()
        return __ready(request)
class Persistent(object):
    """Persistent worker state, stored when :option:`--statedb` is enabled.

    It currently only stores revoked task id's: on creation the stored
    revokes are merged into the module-level ``revoked`` set, and
    :meth:`save` writes that set back to the database.

    :param filename: name of the database file handed to ``storage.open``.

    """
    #: module (or object) providing ``open(filename, writeback=True)``.
    storage = shelve

    #: true only while a database opened through :attr:`db` is open.
    _is_open = False

    def __init__(self, filename):
        self.filename = filename
        self._load()

    def save(self):
        """Write the current revokes to the database, flush it and close it."""
        self.sync(self.db)
        self.db.sync()
        self.close()

    def merge(self, d):
        """Add the revokes stored in mapping ``d`` to the global ``revoked``.

        A missing or empty ``'revoked'`` entry is treated as no revokes.
        Returns ``d`` itself.
        """
        revoked.update(d.get('revoked') or {})
        return d

    def sync(self, d):
        """Store the global ``revoked`` set under ``d['revoked']``.

        Entries already stored in ``d`` are kept and updated with the
        in-memory ones (``revoked.as_dict()``).  Returns ``d`` itself.
        """
        prev = d.get('revoked') or {}
        prev.update(revoked.as_dict())
        d['revoked'] = prev
        return d

    def open(self):
        """Open and return the database for ``self.filename``.

        The storage is opened with ``writeback=True``.
        """
        return self.storage.open(self.filename, writeback=True)

    def close(self):
        """Close the database if it is open; otherwise do nothing."""
        if self._is_open:
            self.db.close()
            self._is_open = False

    def _load(self):
        """Merge the revokes stored in the database into global state."""
        self.merge(self.db)

    @cached_property
    def db(self):
        """The opened database.

        Wrapped in ``cached_property``, so the database is expected to be
        opened on first access only.
        """
        # Flag the database as open only once ``open()`` has succeeded:
        # if it raises, ``_is_open`` must stay false so that a later
        # ``close()`` is a no-op instead of attempting the open again.
        db = self.open()
        self._is_open = True
        return db