你正在阅读 Celery 3.1 的文档。开发版本文档见: 此处.

celery.backends.database.session 源代码

# -*- coding: utf-8 -*-
"""
    celery.backends.database.session
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    SQLAlchemy sessions.

"""
from __future__ import absolute_import

from collections import defaultdict

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

ResultModelBase = declarative_base()

_SETUP = defaultdict(lambda: False)
_ENGINES = {}
_SESSIONS = {}


[文档]def get_engine(dburi, **kwargs): if dburi not in _ENGINES: _ENGINES[dburi] = create_engine(dburi, **kwargs) return _ENGINES[dburi]
[文档]def create_session(dburi, short_lived_sessions=False, **kwargs): engine = get_engine(dburi, **kwargs) if short_lived_sessions or dburi not in _SESSIONS: _SESSIONS[dburi] = sessionmaker(bind=engine) return engine, _SESSIONS[dburi]
[文档]def setup_results(engine): if not _SETUP['results']: ResultModelBase.metadata.create_all(engine) _SETUP['results'] = True
[文档]def ResultSession(dburi, **kwargs): engine, session = create_session(dburi, **kwargs) setup_results(engine) return session()