你正在阅读 Celery 3.1 的文档。开发版本文档见: 此处.
# -*- coding: utf-8 -*-
"""
celery.apps.beat
~~~~~~~~~~~~~~~~
This module is the 'program-version' of :mod:`celery.beat`.
It does everything necessary to run that module
as an actual application, like installing signal handlers
and so on.
"""
from __future__ import absolute_import
import socket
import sys
from celery import VERSION_BANNER, platforms, beat
from celery.app import app_or_default
from celery.app.abstract import configurated, from_config
from celery.utils.imports import qualname
from celery.utils.log import LOG_LEVELS, get_logger
from celery.utils.timeutils import humanize_seconds
STARTUP_INFO_FMT = """
Configuration ->
. broker -> %(conninfo)s
. loader -> %(loader)s
. scheduler -> %(scheduler)s
%(scheduler_info)s
. logfile -> %(logfile)s@%(loglevel)s
. maxinterval -> %(hmax_interval)s (%(max_interval)ss)
""".strip()
logger = get_logger('celery.beat')
[文档]class Beat(configurated):
Service = beat.Service
app = None
loglevel = from_config('log_level')
logfile = from_config('log_file')
schedule = from_config('schedule_filename')
scheduler_cls = from_config('scheduler')
redirect_stdouts = from_config()
redirect_stdouts_level = from_config()
def __init__(self, max_interval=None, app=None,
socket_timeout=30, pidfile=None, no_color=None, **kwargs):
"""Starts the celerybeat task scheduler."""
self.app = app = app_or_default(app or self.app)
self.setup_defaults(kwargs, namespace='celerybeat')
self.max_interval = max_interval
self.socket_timeout = socket_timeout
self.no_color = no_color
self.colored = app.log.colored(
self.logfile,
enabled=not no_color if no_color is not None else no_color,
)
self.pidfile = pidfile
if not isinstance(self.loglevel, int):
self.loglevel = LOG_LEVELS[self.loglevel.upper()]
[文档] def run(self):
print(str(self.colored.cyan(
'celerybeat v%s is starting.' % VERSION_BANNER)))
self.init_loader()
self.set_process_title()
self.start_scheduler()
[文档] def setup_logging(self, colorize=None):
if colorize is None and self.no_color is not None:
colorize = not self.no_color
self.app.log.setup(self.loglevel, self.logfile,
self.redirect_stdouts, self.redirect_stdouts_level,
colorize=colorize)
[文档] def start_scheduler(self):
c = self.colored
if self.pidfile:
platforms.create_pidlock(self.pidfile)
beat = self.Service(app=self.app,
max_interval=self.max_interval,
scheduler_cls=self.scheduler_cls,
schedule_filename=self.schedule)
print(str(c.blue('__ ', c.magenta('-'),
c.blue(' ... __ '), c.magenta('-'),
c.blue(' _\n'),
c.reset(self.startup_info(beat)))))
self.setup_logging()
if self.socket_timeout:
logger.debug('Setting default socket timeout to %r',
self.socket_timeout)
socket.setdefaulttimeout(self.socket_timeout)
try:
self.install_sync_handler(beat)
beat.start()
except Exception, exc:
logger.critical('celerybeat raised exception %s: %r',
exc.__class__, exc,
exc_info=True)
[文档] def init_loader(self):
# Run the worker init handler.
# (Usually imports task modules and such.)
self.app.loader.init_worker()
self.app.finalize()
[文档] def startup_info(self, beat):
scheduler = beat.get_scheduler(lazy=True)
return STARTUP_INFO_FMT % {
'conninfo': self.app.connection().as_uri(),
'logfile': self.logfile or '[stderr]',
'loglevel': LOG_LEVELS[self.loglevel],
'loader': qualname(self.app.loader),
'scheduler': qualname(scheduler),
'scheduler_info': scheduler.info,
'hmax_interval': humanize_seconds(beat.max_interval),
'max_interval': beat.max_interval,
}
[文档] def set_process_title(self):
arg_start = 'manage' in sys.argv[0] and 2 or 1
platforms.set_process_title(
'celerybeat', info=' '.join(sys.argv[arg_start:]),
)
[文档] def install_sync_handler(self, beat):
"""Install a `SIGTERM` + `SIGINT` handler that saves
the celerybeat schedule."""
def _sync(signum, frame):
beat.sync()
raise SystemExit()
platforms.signals.update(SIGTERM=_sync, SIGINT=_sync)