你正在阅读 Celery 3.1 的文档。开发版本文档见: 此处.

celery.utils.timer2 源代码

# -*- coding: utf-8 -*-
"""
    timer2
    ~~~~~~

    Scheduler for Python functions.

"""
from __future__ import absolute_import
from __future__ import with_statement

import atexit
import heapq
import os
import sys
import threading

from datetime import datetime
from functools import wraps
from itertools import count
from time import time, sleep

from celery.utils.compat import THREAD_TIMEOUT_MAX
from celery.utils.timeutils import timedelta_seconds, timezone
from kombu.log import get_logger

VERSION = (1, 0, 0)
__version__ = '.'.join(map(str, VERSION))
__author__ = 'Ask Solem'
__contact__ = 'ask@celeryproject.org'
__homepage__ = 'http://github.com/ask/timer2/'
__docformat__ = 'restructuredtext'

DEFAULT_MAX_INTERVAL = 2
TIMER_DEBUG = os.environ.get('TIMER_DEBUG')
EPOCH = datetime.utcfromtimestamp(0).replace(tzinfo=timezone.utc)

logger = get_logger('timer2')


[文档]class Entry(object): cancelled = False def __init__(self, fun, args=None, kwargs=None): self.fun = fun self.args = args or [] self.kwargs = kwargs or {} self.tref = self def __call__(self): return self.fun(*self.args, **self.kwargs)
[文档] def cancel(self): self.tref.cancelled = True
def __repr__(self): return '<TimerEntry: %s(*%r, **%r)' % ( self.fun.__name__, self.args, self.kwargs) if sys.version_info[0] == 3: # pragma: no cover def __hash__(self): return hash('|'.join(map(repr, (self.fun, self.args, self.kwargs)))) def __lt__(self, other): return hash(self) < hash(other) def __gt__(self, other): return hash(self) > hash(other) def __eq__(self, other): return hash(self) == hash(other)
[文档]def to_timestamp(d, default_timezone=timezone.utc): if isinstance(d, datetime): if d.tzinfo is None: d = d.replace(tzinfo=default_timezone) return timedelta_seconds(d - EPOCH) return d
[文档]class Schedule(object): """ETA scheduler.""" Entry = Entry on_error = None def __init__(self, max_interval=None, on_error=None, **kwargs): self.max_interval = float(max_interval or DEFAULT_MAX_INTERVAL) self.on_error = on_error or self.on_error self._queue = []
[文档] def apply_entry(self, entry): try: entry() except Exception, exc: if not self.handle_error(exc): logger.error('Error in timer: %r', exc, exc_info=True)
[文档] def handle_error(self, exc_info): if self.on_error: self.on_error(exc_info) return True
[文档] def stop(self): pass
[文档] def enter(self, entry, eta=None, priority=0): """Enter function into the scheduler. :param entry: Item to enter. :keyword eta: Scheduled time as a :class:`datetime.datetime` object. :keyword priority: Unused. """ if eta is None: eta = time() if isinstance(eta, datetime): try: eta = to_timestamp(eta) except Exception, exc: if not self.handle_error(exc): raise return return self._enter(eta, priority, entry)
def _enter(self, eta, priority, entry): heapq.heappush(self._queue, (eta, priority, entry)) return entry
[文档] def apply_at(self, eta, fun, args=(), kwargs={}, priority=0): return self.enter(self.Entry(fun, args, kwargs), eta, priority)
[文档] def enter_after(self, msecs, entry, priority=0): return self.enter(entry, time() + (msecs / 1000.0), priority)
[文档] def apply_after(self, msecs, fun, args=(), kwargs={}, priority=0): return self.enter_after(msecs, self.Entry(fun, args, kwargs), priority)
[文档] def apply_interval(self, msecs, fun, args=(), kwargs={}, priority=0): tref = self.Entry(fun, args, kwargs) secs = msecs * 1000.0 @wraps(fun) def _reschedules(*args, **kwargs): last, now = tref._last_run, time() lsince = (now - tref._last_run) * 1000.0 if last else msecs try: if lsince and lsince >= msecs: tref._last_run = now return fun(*args, **kwargs) finally: if not tref.cancelled: last = tref._last_run next = secs - (now - last) if last else secs self.enter_after(next / 1000.0, tref, priority) tref.fun = _reschedules tref._last_run = None return self.enter_after(msecs, tref, priority)
@property
[文档] def schedule(self): return self
def __iter__(self): """The iterator yields the time to sleep for between runs.""" # localize variable access nowfun = time pop = heapq.heappop max_interval = self.max_interval queue = self._queue while 1: if queue: eta, priority, entry = verify = queue[0] now = nowfun() if now < eta: yield min(eta - now, max_interval), None else: event = pop(queue) if event is verify: if not entry.cancelled: yield None, entry continue else: heapq.heappush(queue, event) else: yield None, None
[文档] def empty(self): """Is the schedule empty?""" return not self._queue
[文档] def clear(self): self._queue[:] = [] # used because we can't replace the object # and the operation is atomic.
[文档] def info(self): return ({'eta': eta, 'priority': priority, 'item': item} for eta, priority, item in self.queue)
[文档] def cancel(self, tref): tref.cancel()
@property
[文档] def queue(self): events = list(self._queue) return map(heapq.heappop, [events] * len(events))
[文档]class Timer(threading.Thread): Entry = Entry Schedule = Schedule running = False on_tick = None _timer_count = count(1).next if TIMER_DEBUG: # pragma: no cover def start(self, *args, **kwargs): import traceback print('- Timer starting') traceback.print_stack() super(Timer, self).start(*args, **kwargs) def __init__(self, schedule=None, on_error=None, on_tick=None, max_interval=None, **kwargs): self.schedule = schedule or self.Schedule(on_error=on_error, max_interval=max_interval) self.on_tick = on_tick or self.on_tick threading.Thread.__init__(self) self._is_shutdown = threading.Event() self._is_stopped = threading.Event() self.mutex = threading.Lock() self.not_empty = threading.Condition(self.mutex) self.setDaemon(True) self.setName('Timer-%s' % (self._timer_count(), )) def _next_entry(self): with self.not_empty: delay, entry = self.scheduler.next() if entry is None: if delay is None: self.not_empty.wait(1.0) return delay return self.schedule.apply_entry(entry) __next__ = next = _next_entry # for 2to3
[文档] def run(self): try: self.running = True self.scheduler = iter(self.schedule) while not self._is_shutdown.isSet(): delay = self._next_entry() if delay: if self.on_tick: self.on_tick(delay) if sleep is None: # pragma: no cover break sleep(delay) try: self._is_stopped.set() except TypeError: # pragma: no cover # we lost the race at interpreter shutdown, # so gc collected built-in modules. pass except Exception, exc: logger.error('Thread Timer crashed: %r', exc, exc_info=True) os._exit(1)
[文档] def stop(self): if self.running: self._is_shutdown.set() self._is_stopped.wait() self.join(THREAD_TIMEOUT_MAX) self.running = False
[文档] def ensure_started(self): if not self.running and not self.isAlive(): self.start()
def _do_enter(self, meth, *args, **kwargs): self.ensure_started() with self.mutex: entry = getattr(self.schedule, meth)(*args, **kwargs) self.not_empty.notify() return entry
[文档] def enter(self, entry, eta, priority=None): return self._do_enter('enter', entry, eta, priority=priority)
[文档] def apply_at(self, *args, **kwargs): return self._do_enter('apply_at', *args, **kwargs)
[文档] def enter_after(self, *args, **kwargs): return self._do_enter('enter_after', *args, **kwargs)
[文档] def apply_after(self, *args, **kwargs): return self._do_enter('apply_after', *args, **kwargs)
[文档] def apply_interval(self, *args, **kwargs): return self._do_enter('apply_interval', *args, **kwargs)
[文档] def exit_after(self, msecs, priority=10): self.apply_after(msecs, sys.exit, priority)
[文档] def cancel(self, tref): tref.cancel()
[文档] def clear(self): self.schedule.clear()
[文档] def empty(self): return self.schedule.empty()
@property
[文档] def queue(self): return self.schedule.queue
default_timer = _default_timer = Timer() apply_after = _default_timer.apply_after apply_at = _default_timer.apply_at apply_interval = _default_timer.apply_interval enter_after = _default_timer.enter_after enter = _default_timer.enter exit_after = _default_timer.exit_after cancel = _default_timer.cancel clear = _default_timer.clear atexit.register(_default_timer.stop)