# Source code for gino.strategies

import asyncio

from sqlalchemy.engine import url
from sqlalchemy import util
from sqlalchemy.engine.strategies import EngineStrategy

from .engine import GinoEngine


class GinoStrategy(EngineStrategy):
    """Engine strategy named ``'gino'`` that builds engines asynchronously.

    Unlike a regular strategy, :meth:`create` is a coroutine: it has to
    await the dialect's ``init_pool`` before the engine can be constructed.

    ``engine_cls`` is the class instantiated for the resulting engine;
    subclasses may override it.
    """

    name = 'gino'
    engine_cls = GinoEngine

    async def create(self, name_or_url, loop=None, **kwargs):
        """Build and return an ``engine_cls`` instance for ``name_or_url``.

        The keyword arguments are split, in this order, between the dialect
        constructor, the dialect's ``dbapi`` factory, the pool
        (``pool_class`` only) and the engine constructor.  Every keyword
        must be claimed by one of them.

        :param name_or_url: anything accepted by
            ``sqlalchemy.engine.url.make_url``.  The driver names
            ``postgresql`` and ``postgres`` are rewritten to
            ``postgresql+asyncpg``.
        :param loop: event loop handed to the pool and the engine; defaults
            to ``asyncio.get_event_loop()``.
        :param kwargs: extra dialect / dbapi / pool / engine arguments.  A
            ``module`` keyword is accepted and ignored.
        :return: the newly created engine.
        :raises TypeError: if any keyword argument is left unconsumed.  This
            is raised *before* the pool is created, so an invalid call does
            not leave an orphaned pool behind.
        """
        engine_cls = self.engine_cls

        u = url.make_url(name_or_url)
        if loop is None:
            loop = asyncio.get_event_loop()
        if u.drivername in {'postgresql', 'postgres'}:
            # NOTE(review): this assigns on the URL object in place; if
            # make_url() hands back a caller-supplied URL instance unchanged,
            # the caller would observe the rewrite -- confirm.
            u.drivername = 'postgresql+asyncpg'

        dialect_cls = u.get_dialect()

        # consume dialect arguments from kwargs
        dialect_args = {}
        dialect_arg_names = util.get_cls_kwargs(dialect_cls).union(
            getattr(dialect_cls, 'init_kwargs', set()))
        for k in dialect_arg_names:
            if k in kwargs:
                dialect_args[k] = kwargs.pop(k)

        kwargs.pop('module', None)  # unused

        # consume arguments for the dialect's dbapi factory
        dbapi_args = {}
        for k in util.get_func_kwargs(dialect_cls.dbapi):
            if k in kwargs:
                dbapi_args[k] = kwargs.pop(k)
        dialect_args['dbapi'] = dialect_cls.dbapi(**dbapi_args)

        dialect = dialect_cls(**dialect_args)

        pool_class = kwargs.pop('pool_class', None)

        # consume engine arguments from kwargs
        engine_args = dict(loop=loop)
        for k in util.get_cls_kwargs(engine_cls):
            if k in kwargs:
                engine_args[k] = kwargs.pop(k)

        # all kwargs should be consumed; check this before opening the pool
        # so a bad keyword cannot leak a freshly created pool
        if kwargs:
            raise TypeError(
                "Invalid argument(s) %s sent to create_engine(), "
                "using configuration %s/%s.  Please check that the "
                "keyword arguments are appropriate for this combination "
                "of components."
                % (','.join("'%s'" % k for k in kwargs),
                   dialect_cls.__name__,
                   engine_cls.__name__))

        pool = await dialect.init_pool(u, loop, pool_class=pool_class)

        engine = engine_cls(dialect, pool, **engine_args)

        dialect_cls.engine_created(engine)

        return engine
# Instantiate the strategy once at import time; the instance is not kept here.
# NOTE(review): presumably EngineStrategy.__init__ registers the instance under
# its ``name`` ('gino') so it can be selected via ``strategy='gino'`` --
# confirm against the SQLAlchemy version in use.
GinoStrategy()