你正在阅读 Celery 3.1 的文档。开发版本文档见: 此处.
# -*- coding: utf-8 -*-
"""
celery.backends.database
~~~~~~~~~~~~~~~~~~~~~~~~
SQLAlchemy result store backend.
"""
from __future__ import absolute_import
from functools import wraps
from celery import states
from celery.exceptions import ImproperlyConfigured
from celery.utils.timeutils import maybe_timedelta
from celery.backends.base import BaseDictBackend
from .models import Task, TaskSet
from .session import ResultSession
def _sqlalchemy_installed():
    """Import and return the ``sqlalchemy`` module.

    Raises:
        ImproperlyConfigured: if ``sqlalchemy`` cannot be imported.
    """
    try:
        import sqlalchemy
    except ImportError:
        # The trailing space after the period matters: the two literals
        # are concatenated into a single message.
        raise ImproperlyConfigured(
            'The database result backend requires SQLAlchemy to be installed. '
            'See http://pypi.python.org/pypi/SQLAlchemy')
    return sqlalchemy


# Checked at import time so a missing SQLAlchemy is reported as a
# configuration error before the ``sqlalchemy.exc`` import below runs.
_sqlalchemy_installed()
from sqlalchemy.exc import DatabaseError, OperationalError
def retry(fun):
    """Decorator that re-runs ``fun`` when a database error is raised.

    The wrapped callable accepts one extra keyword argument,
    ``max_retries`` (default 3), which is removed from the keyword
    arguments before ``fun`` is called.  ``fun`` is attempted up to
    ``max_retries + 1`` times; a ``DatabaseError`` or ``OperationalError``
    raised by any attempt other than the last is discarded and the call
    is repeated, while the error from the last attempt propagates.
    Any other exception propagates immediately.
    """
    @wraps(fun)
    def _inner(*args, **kwargs):
        max_retries = kwargs.pop('max_retries', 3)
        # Always make at least one attempt: with a negative value the
        # loop would otherwise be empty and the call silently skipped.
        attempts = max(max_retries, 0) + 1
        # ``range`` (not ``xrange``) so this runs on Python 2 and 3 alike.
        for attempt in range(attempts):
            try:
                return fun(*args, **kwargs)
            except (DatabaseError, OperationalError):
                # Out of attempts: let the caller see the real error.
                if attempt + 1 >= attempts:
                    raise
    return _inner
class DatabaseBackend(BaseDictBackend):
    """The database result backend.

    Task and group results are read and written through sessions
    obtained from :meth:`ResultSession`; every operation opens its own
    session and closes it when done.
    """
    # ResultSet.iterate should sleep this much between each poll,
    # to not bombard the database with queries.
    subpolling_interval = 0.5

    def __init__(self, dburi=None, expires=None,
                 engine_options=None, **kwargs):
        """Configure the backend from arguments and app configuration.

        :param dburi: connection string; falls back to
            ``CELERY_RESULT_DBURI`` when falsy.
        :param expires: passed through ``prepare_expires`` and
            ``maybe_timedelta`` and stored as ``self.expires``.
        :param engine_options: mapping merged with
            ``CELERY_RESULT_ENGINE_OPTIONS``.
        :keyword short_lived_sessions: overrides
            ``CELERY_RESULT_DB_SHORT_LIVED_SESSIONS``.
        :raises ImproperlyConfigured: if no connection string is available.
        """
        super(DatabaseBackend, self).__init__(**kwargs)
        conf = self.app.conf
        self.expires = maybe_timedelta(self.prepare_expires(expires))
        self.dburi = dburi or conf.CELERY_RESULT_DBURI
        # Keys from the configuration take precedence over same-named
        # keys given in the ``engine_options`` argument.
        self.engine_options = dict(
            engine_options or {},
            **conf.CELERY_RESULT_ENGINE_OPTIONS or {})
        self.short_lived_sessions = kwargs.get(
            'short_lived_sessions',
            conf.CELERY_RESULT_DB_SHORT_LIVED_SESSIONS,
        )
        if not self.dburi:
            raise ImproperlyConfigured(
                'Missing connection string! Do you have '
                'CELERY_RESULT_DBURI set to a real value?')

    def ResultSession(self):
        """Create a session using this backend's URI and engine options."""
        return ResultSession(
            dburi=self.dburi,
            short_lived_sessions=self.short_lived_sessions,
            **self.engine_options
        )

    @retry
    def _store_result(self, task_id, result, status,
                      traceback=None, max_retries=3):
        """Store return value and status of an executed task.

        Updates the row whose ``task_id`` matches, creating it first if
        none exists, and returns ``result``.

        ``max_retries`` is not used in the body; when passed by keyword
        it is consumed by the ``retry`` decorator before this method
        is called.
        """
        session = self.ResultSession()
        try:
            task = session.query(Task).filter(Task.task_id == task_id).first()
            if not task:
                # First result for this id: insert the row before
                # filling in its fields.
                task = Task(task_id)
                session.add(task)
                session.flush()
            task.result = result
            task.status = status
            task.traceback = traceback
            session.commit()
            return result
        finally:
            session.close()

    @retry
    def _get_task_meta_for(self, task_id):
        """Get task metadata for a task by id.

        Returns ``to_dict()`` of the stored task.  When no row matches,
        a new ``Task`` with status ``PENDING`` and result ``None`` is
        built for the return value only; it is not added to the session.
        """
        session = self.ResultSession()
        try:
            task = session.query(Task).filter(Task.task_id == task_id).first()
            if task is None:
                task = Task(task_id)
                task.status = states.PENDING
                task.result = None
            return task.to_dict()
        finally:
            session.close()

    @retry
    def _save_group(self, group_id, result):
        """Store the result of an executed group and return ``result``."""
        session = self.ResultSession()
        try:
            group = TaskSet(group_id, result)
            session.add(group)
            session.flush()
            session.commit()
            return result
        finally:
            session.close()

    @retry
    def _restore_group(self, group_id):
        """Get metadata for group by id.

        Returns ``to_dict()`` of the first matching ``TaskSet``, or
        ``None`` when there is no match.
        """
        session = self.ResultSession()
        try:
            group = session.query(TaskSet).filter(
                TaskSet.taskset_id == group_id).first()
            if group:
                return group.to_dict()
        finally:
            session.close()

    @retry
    def _delete_group(self, group_id):
        """Delete metadata for group by id."""
        session = self.ResultSession()
        try:
            session.query(TaskSet).filter(
                TaskSet.taskset_id == group_id).delete()
            session.flush()
            session.commit()
        finally:
            session.close()

    @retry
    def _forget(self, task_id):
        """Forget about result: delete the rows matching ``task_id``."""
        session = self.ResultSession()
        try:
            session.query(Task).filter(Task.task_id == task_id).delete()
            session.commit()
        finally:
            session.close()

    def cleanup(self):
        """Delete expired metadata.

        Removes every ``Task`` and ``TaskSet`` whose ``date_done`` is
        earlier than ``self.app.now() - self.expires``.
        """
        session = self.ResultSession()
        expires = self.expires
        now = self.app.now()
        try:
            session.query(Task).filter(
                Task.date_done < (now - expires)).delete()
            session.query(TaskSet).filter(
                TaskSet.date_done < (now - expires)).delete()
            session.commit()
        finally:
            session.close()

    def __reduce__(self, args=(), kwargs=None):
        """Delegate to the base ``__reduce__`` with this backend's settings.

        ``dburi``, ``expires`` and ``engine_options`` are added to
        ``kwargs``, overriding any same-named keys already present.
        """
        # Build a fresh dict: updating the argument in place would leak
        # state between calls through a shared default and would also
        # modify a dict owned by the caller.
        kwargs = dict(
            kwargs or {},
            dburi=self.dburi,
            expires=self.expires,
            engine_options=self.engine_options,
        )
        return super(DatabaseBackend, self).__reduce__(args, kwargs)