你正在阅读 Celery 3.1 的文档。开发版本文档见: 此处.
# -*- coding: utf-8 -*-
"""
celery.concurrency.base
~~~~~~~~~~~~~~~~~~~~~~~
TaskPool interface.
"""
from __future__ import absolute_import
import logging
import os
import time
from kombu.utils.encoding import safe_repr
from celery.utils import timer2
from celery.utils.log import get_logger
logger = get_logger('celery.concurrency')
[文档]def apply_target(target, args=(), kwargs={}, callback=None,
accept_callback=None, pid=None, **_):
if accept_callback:
accept_callback(pid or os.getpid(), time.time())
callback(target(*args, **kwargs))
[文档]class BasePool(object):
RUN = 0x1
CLOSE = 0x2
TERMINATE = 0x3
Timer = timer2.Timer
#: set to true if the pool can be shutdown from within
#: a signal handler.
signal_safe = True
#: set to true if pool supports rate limits.
#: (this is here for gevent, which currently does not implement
#: the necessary timers).
rlimit_safe = True
#: set to true if pool requires the use of a mediator
#: thread (e.g. if applying new items can block the current thread).
requires_mediator = False
#: set to true if pool uses greenlets.
is_green = False
_state = None
_pool = None
#: only used by multiprocessing pool
uses_semaphore = False
def __init__(self, limit=None, putlocks=True,
forking_enable=True, **options):
self.limit = limit
self.putlocks = putlocks
self.options = options
self.forking_enable = forking_enable
self._does_debug = logger.isEnabledFor(logging.DEBUG)
[文档] def terminate_job(self, pid):
raise NotImplementedError(
'%s does not implement kill_job' % (self.__class__, ))
[文档] def restart(self):
raise NotImplementedError(
'%s does not implement restart' % (self.__class__, ))
[文档] def apply_async(self, target, args=[], kwargs={}, **options):
"""Equivalent of the :func:`apply` built-in function.
Callbacks should optimally return as soon as possible since
otherwise the thread which handles the result will get blocked.
"""
if self._does_debug:
logger.debug('TaskPool: Apply %s (args:%s kwargs:%s)',
target, safe_repr(args), safe_repr(kwargs))
return self.on_apply(target, args, kwargs,
waitforslot=self.putlocks,
**options)
def _get_info(self):
return {}
@property
@property
@property
@property
@property
@property