import inspect
import itertools
import weakref
import sqlalchemy as sa
from sqlalchemy.sql import ClauseElement
from . import json_support
from .declarative import Model, InvertDict
from .exceptions import NoSuchRowError
from .loader import AliasLoader, ModelLoader
DEFAULT = object()
class _Create:
def __get__(self, instance, owner):
if instance is None:
# noinspection PyProtectedMember
return owner._create_without_instance
else:
# noinspection PyProtectedMember
return instance._create
class _Query:
def __get__(self, instance, owner):
# noinspection PyProtectedMember
owner._check_abstract()
q = sa.select([owner.__table__])
if instance is not None:
q = q.where(instance.lookup())
return q.execution_options(model=weakref.ref(owner))
class _Select:
def __get__(self, instance, owner):
def select(*args):
q = sa.select([getattr(owner, x) for x in args])
if instance is not None:
q = q.where(instance.lookup())
return q.execution_options(model=weakref.ref(owner),
return_model=False)
return select
class _Update:
def __get__(self, instance, owner):
if instance is None:
# noinspection PyProtectedMember
owner._check_abstract()
q = owner.__table__.update()
return q.execution_options(model=weakref.ref(owner))
else:
# noinspection PyProtectedMember
return instance._update
class _Delete:
def __get__(self, instance, owner):
if instance is None:
# noinspection PyProtectedMember
owner._check_abstract()
q = owner.__table__.delete()
return q.execution_options(model=weakref.ref(owner))
else:
# noinspection PyProtectedMember
return instance._delete
[文档]class UpdateRequest:
"""
A collection of attributes and their new values to update on one model
instance.
:class:`.UpdateRequest` instances are created by :attr:`.CRUDModel.update`,
don't instantiate manually unless required. Every :class:`.UpdateRequest`
instance is bound to one model instance, all updates are for that one
specific model instance and its database row.
"""
def __init__(self, instance: 'CRUDModel'):
self._instance = instance
self._values = {}
self._props = {}
self._literal = True
self._locator = None
if instance.__table__ is not None:
try:
self._locator = instance.lookup()
except LookupError:
# apply() will fail anyway, but still allow update()
pass
def _set(self, key, value):
self._values[key] = value
def _set_prop(self, prop, value):
if isinstance(value, ClauseElement):
self._literal = False
self._props[prop] = value
async def apply(self, bind=None, timeout=DEFAULT):
"""
Apply pending updates into database by executing an ``UPDATE`` SQL.
:param bind: A :class:`~gino.engine.GinoEngine` to execute the SQL, or
``None`` (default) to use the bound engine in the metadata.
:param timeout: Seconds to wait for the database to finish executing,
``None`` for wait forever. By default it will use the ``timeout``
execution option value if unspecified.
:return: ``self`` for chaining calls.
"""
if self._locator is None:
raise TypeError(
'Model {} has no table, primary key or custom lookup()'.format(
self._instance.__class__.__name__))
cls = type(self._instance)
values = self._values.copy()
# handle JSON columns
json_updates = {}
for prop, value in self._props.items():
value = prop.save(self._instance, value)
updates = json_updates.setdefault(prop.prop_name, {})
if self._literal:
updates[prop.name] = value
else:
if isinstance(value, int):
value = sa.cast(value, sa.BigInteger)
elif not isinstance(value, ClauseElement):
value = sa.cast(value, sa.Unicode)
updates[sa.cast(prop.name, sa.Unicode)] = value
for prop_name, updates in json_updates.items():
prop = getattr(cls, prop_name)
from .dialects.asyncpg import JSONB
if isinstance(prop.type, JSONB):
if self._literal:
values[prop_name] = prop.concat(updates)
else:
values[prop_name] = prop.concat(
sa.func.jsonb_build_object(
*itertools.chain(*updates.items())))
else:
raise TypeError('{} is not supported to update json '
'properties in Gino. Please consider using '
'JSONB.'.format(prop.type))
opts = dict(return_model=False)
if timeout is not DEFAULT:
opts['timeout'] = timeout
clause = type(self._instance).update.where(
self._locator,
).values(
**self._instance._get_sa_values(values),
).returning(
*[getattr(cls, key) for key in values],
).execution_options(**opts)
if bind is None:
bind = cls.__metadata__.bind
row = await bind.first(clause)
if not row:
raise NoSuchRowError()
for k, v in row.items():
self._instance.__values__[
self._instance._column_name_map.invert_get(k)] = v
for prop in self._props:
prop.reload(self._instance)
return self
[文档] def update(self, **values):
"""
Set given attributes on the bound model instance, and add them into
the update collections for :meth:`.apply`.
Given keyword-only arguments are pairs of attribute names and values to
update. This is not a coroutine, calling :meth:`.update` will have
instant effect on the bound model instance - its in-memory values will
be updated immediately. Therefore this can be used individually as a
shortcut to update several attributes in a batch::
user.update(age=32, disabled=True)
:meth:`.update` returns ``self`` for chaining calls to either
:meth:`.apply` or another :meth:`.update`. If one attribute is updated
several times by the same :class:`.UpdateRequest`, then only the last
value is remembered for :meth:`.apply`.
Updated values can be SQLAlchemy expressions, for example an atomic
increment for user balance looks like this::
await user.update(balance=User.balance + 100).apply()
.. note::
Expression values will not affect the in-memory attribute value on
:meth:`.update` before :meth:`.apply`, because it has no knowledge
of the latest value in the database. After :meth:`.apply` the new
value will be automatically reloaded from database with
``RETURNING`` clause.
"""
cls = type(self._instance)
for key, value in values.items():
prop = cls.__dict__.get(key)
if isinstance(prop, json_support.JSONProperty):
value_from = '__profile__'
method = self._set_prop
k = prop
else:
value_from = '__values__'
method = self._set
k = key
if not isinstance(value, ClauseElement):
setattr(self._instance, key, value)
value = getattr(self._instance, value_from)[key]
method(k, value)
return self
[文档]class Alias:
"""
Experimental proxy for table alias on model.
"""
def __init__(self, model, *args, **kwargs):
# noinspection PyProtectedMember
model._check_abstract()
self.model = model
self.alias = model.__table__.alias(*args, **kwargs)
def __getattr__(self, item):
rv = getattr(self.alias.columns, item,
getattr(self.alias, item,
getattr(self.model, item, DEFAULT)))
if rv is DEFAULT:
raise AttributeError
return rv
def __iter__(self):
return iter(self.alias.columns)
def __call__(self, *args, **kwargs):
return self.model(*args, **kwargs)
[文档] def load(self, *column_names, **relationships):
return AliasLoader(self, *column_names, **relationships)
[文档] def on(self, on_clause):
return self.load().on(on_clause)
# noinspection PyProtectedMember
@sa.inspection._inspects(Alias)
def _inspect_alias(target):
return sa.inspection.inspect(target.alias)
[文档]class CRUDModel(Model):
"""
The base class for models with CRUD support.
Don't inherit from this class directly, because it has no metadata. Use
:attr:`db.Model <gino.api.Gino.Model>` instead.
"""
create = _Create()
"""
This ``create`` behaves a bit different on model classes compared to model
instances.
On model classes, ``create`` will create a new model instance and insert
it into database. On model instances, ``create`` will just insert the
instance into the database.
Under the hood :meth:`.create` uses ``INSERT ... RETURNING ...`` to
create the new model instance and load it with database default data if
not specified.
Some examples::
user1 = await User.create(name='fantix', age=32)
user2 = User(name='gino', age=42)
await user2.create()
:param bind: A :class:`~gino.engine.GinoEngine` to execute the
``INSERT`` statement with, or ``None`` (default) to use the bound
engine on the metadata (:class:`~gino.api.Gino`).
:param timeout: Seconds to wait for the database to finish executing,
``None`` for wait forever. By default it will use the ``timeout``
execution option value if unspecified.
:param values: Keyword arguments are pairs of attribute names and their
initial values. Only available when called on a model class.
:return: The instance of this model class (newly created or existing).
"""
query = _Query()
"""
Get a SQLAlchemy query clause of the table behind this model. This equals
to :func:`sqlalchemy.select([self.__table__])
<sqlalchemy.sql.expression.select>`. If this attribute is retrieved on a
model instance, then a where clause to locate this instance by its primary
key is appended to the returning query clause. This model type is set as
the execution option ``model`` in the returning clause, so by default the
query yields instances of this model instead of database rows.
"""
update = _Update()
"""
This ``update`` behaves quite different on model classes rather than model
instances.
On model classes, ``update`` is an attribute of type
:class:`~sqlalchemy.sql.expression.Update` for massive updates, for
example::
await User.update.values(enabled=True).where(...).gino.status()
Like :attr:`.query`, the update query also has the ``model`` execution
option of this model, so if you use the
:meth:`~sqlalchemy.sql.expression.Update.returning` clause, the query shall
return model objects.
However on model instances, ``update()`` is a method which accepts keyword
arguments only and returns an :class:`.UpdateRequest` to update this single
model instance. The keyword arguments are pairs of attribute names and new
values. This is the same as :meth:`.UpdateRequest.update`, feel free to
read more about it. A normal usage example would be like this::
await user.update(name='new name', age=32).apply()
Here, the :meth:`await ... apply() <.UpdateRequest.apply>` executes the
actual ``UPDATE`` SQL in the database, while ``user.update()`` only makes
changes in the memory, and collect all changes into an
:class:`.UpdateRequest` instance.
"""
delete = _Delete()
"""
Similar to :meth:`.update`, this ``delete`` is also different on model
classes than on model instances.
On model classes ``delete`` is an attribute of type
:class:`~sqlalchemy.sql.expression.Delete` for massive deletes, for
example::
await User.delete.where(User.enabled.is_(False)).gino.status()
Similarly you can add a :meth:`~sqlalchemy.sql.expression.Delete.returning`
clause to the query and it shall return the deleted rows as model objects.
And on model instances, ``delete()`` is a method to remove the
corresponding row in the database of this model instance. and returns the
status returned from the database::
print(await user.delete()) # e.g. prints DELETE 1
.. note::
``delete()`` only removes the row from database, it does not affect the
current model instance.
:param bind: An optional :class:`~gino.engine.GinoEngine` if current
metadata (:class:`~gino.api.Gino`) has no bound engine, or specifying a
different :class:`~gino.engine.GinoEngine` to execute the ``DELETE``.
:param timeout: Seconds to wait for the database to finish executing,
``None`` for wait forever. By default it will use the ``timeout``
execution option value if unspecified.
"""
select = _Select()
"""
Build a query to retrieve only specified columns from this table.
This method accepts positional string arguments as names of attributes to
retrieve, and returns a :class:`~sqlalchemy.sql.expression.Select` for
query. The returning query object is always set with two execution options:
1. ``model`` is set to this model type
2. ``return_model`` is set to ``False``
So that by default it always return rows instead of model instances, while
column types can be inferred correctly by the ``model`` option.
For example::
async for row in User.select('id', 'name').gino.iterate():
print(row['id'], row['name'])
If :meth:`.select` is invoked on a model instance, then a ``WHERE`` clause
to locate this instance by its primary key is appended to the returning
query clause. This is useful when you want to retrieve a latest value of a
field on current model instance from database::
db_age = await user.select('age').gino.scalar()
.. seealso::
:meth:`~gino.engine.GinoConnection.execution_options`
"""
_update_request_cls = UpdateRequest
_column_name_map = InvertDict()
def __init__(self, **values):
super().__init__()
self.__profile__ = None
self._update_request_cls(self).update(**values)
@classmethod
def _init_table(cls, sub_cls):
rv = Model._init_table(sub_cls)
for each_cls in sub_cls.__mro__[::-1]:
for k, v in each_cls.__dict__.items():
if isinstance(v, json_support.JSONProperty):
if not hasattr(sub_cls, v.prop_name):
raise AttributeError(
'Requires "{}" JSON[B] column.'.format(
v.prop_name))
v.name = k
if rv is not None:
rv.__model__ = weakref.ref(sub_cls)
return rv
@classmethod
async def _create_without_instance(cls, bind=None, timeout=DEFAULT,
**values):
return await cls(**values)._create(bind=bind, timeout=timeout)
async def _create(self, bind=None, timeout=DEFAULT):
# handle JSON properties
cls = type(self)
# noinspection PyUnresolvedReferences,PyProtectedMember
cls._check_abstract()
profile_keys = set(self.__profile__.keys() if self.__profile__ else [])
for key in profile_keys:
cls.__dict__.get(key).save(self)
# initialize default values
for key, prop in cls.__dict__.items():
if key in profile_keys:
continue
if isinstance(prop, json_support.JSONProperty):
if prop.default is None or prop.after_get.method is not None:
continue
setattr(self, key, getattr(self, key))
prop.save(self)
# insert into database
opts = dict(return_model=False, model=cls)
if timeout is not DEFAULT:
opts['timeout'] = timeout
# noinspection PyArgumentList
q = cls.__table__.insert().values(
**self._get_sa_values(self.__values__)
).returning(
*cls
).execution_options(**opts)
if bind is None:
bind = cls.__metadata__.bind
row = await bind.first(q)
for k, v in row.items():
self.__values__[self._column_name_map.invert_get(k)] = v
self.__profile__ = None
return self
def _get_sa_values(self, instance_values: dict) -> dict:
values = {}
for k, v in instance_values.items():
values[self._column_name_map[k]] = v
return values
[文档] @classmethod
async def get(cls, ident, bind=None, timeout=DEFAULT):
"""
Get an instance of this model class by primary key.
For example::
user = await User.get(request.args.get('user_id'))
:param ident: Value of the primary key. For composite primary keys this
should be a tuple of values for all keys in database order, or a dict
of names (or position numbers in database order starting from zero)
of all primary keys to their values.
:param bind: A :class:`~gino.engine.GinoEngine` to execute the
``INSERT`` statement with, or ``None`` (default) to use the bound
engine on the metadata (:class:`~gino.api.Gino`).
:param timeout: Seconds to wait for the database to finish executing,
``None`` for wait forever. By default it will use the ``timeout``
execution option value if unspecified.
:return: An instance of this model class, or ``None`` if no such row.
"""
# noinspection PyUnresolvedReferences,PyProtectedMember
cls._check_abstract()
if not isinstance(ident, (list, tuple, dict)):
ident_ = [ident]
else:
ident_ = ident
columns = cls.__table__.primary_key.columns
if len(ident_) != len(columns):
raise ValueError(
'Incorrect number of values as primary key: '
'expected {}, got {}.'.format(
len(columns), len(ident_)))
clause = cls.query
for i, c in enumerate(columns):
try:
val = ident_[i]
except KeyError:
val = ident_[c.name]
clause = clause.where(c == val)
if timeout is not DEFAULT:
clause = clause.execution_options(timeout=timeout)
if bind is None:
bind = cls.__metadata__.bind
return await bind.first(clause)
[文档] def append_where_primary_key(self, q):
"""
Append where clause to locate this model instance by primary on the
given query, and return the new query.
This is mostly used internally in GINO, but also available for such
usage::
await user.append_where_primary_key(User.query).gino.first()
which is identical to::
await user.query.gino.first()
.. deprecated:: 0.7.6
Use :meth:`.lookup` instead.
"""
return q.where(self.lookup()) # pragma: no cover
[文档] def lookup(self):
"""
Generate where-clause expression to locate this model instance.
By default this method uses current values of all primary keys, and you
can override it to behave differently. Most instance-level CRUD
operations depend on this method internally. Particularly while
:meth:`.lookup` is called in :meth:`.update`, the where condition is
used in :meth:`.UpdateRequest.apply`, so that queries like ``UPDATE ...
SET id = NEW WHERE id = OLD`` could work correctly.
:return:
.. versionadded:: 0.7.6
"""
exps = []
for c in self.__table__.primary_key.columns:
exps.append(c == getattr(self, c.name))
if exps:
return sa.and_(*exps)
else:
raise LookupError('Instance-level CRUD operations not allowed on '
'models without primary keys or lookup(), please'
' use model-level CRUD operations instead.')
def _update(self, **values):
return self._update_request_cls(self).update(**values)
async def _delete(self, bind=None, timeout=DEFAULT):
cls = type(self)
# noinspection PyUnresolvedReferences,PyProtectedMember
cls._check_abstract()
clause = cls.delete.where(self.lookup())
if timeout is not DEFAULT:
clause = clause.execution_options(timeout=timeout)
if bind is None:
bind = self.__metadata__.bind
return (await bind.status(clause))[0]
[文档] def to_dict(self):
"""
Convenient method to generate a dict from this model instance.
Keys will be attribute names, while values are loaded from memory (not
from database). If there are :class:`~gino.json_support.JSONProperty`
attributes in this model, their source JSON field will not be included
in the returning dict - instead the JSON attributes will be.
.. seealso::
:mod:`.json_support`
"""
cls = type(self)
# noinspection PyTypeChecker
keys = set(cls._column_name_map.invert_get(c.name) for c in cls)
for key, prop in cls.__dict__.items():
if isinstance(prop, json_support.JSONProperty):
keys.add(key)
keys.discard(prop.prop_name)
return dict((k, getattr(self, k)) for k in keys)
[文档] @classmethod
def load(cls, *column_names, **relationships):
"""
Populates a :class:`.loader.Loader` instance to be used by the
``loader`` execution option in order to customize the loading behavior
to load specified fields into instances of this model.
The basic usage of this method is to provide the ``loader`` execution
option (if you are looking for reloading
the instance from database, check :meth:`.get` or :attr:`.query`) for a
given query.
This method takes both positional arguments and keyword arguments with
very different meanings. The positional arguments should be column
names as strings, specifying only these columns should be loaded into
the model instance (other values are discarded even if they are
retrieved from database). Meanwhile, the keyword arguments should be
loaders for instance attributes. For example::
u = await User.query.gino.load(User.load('id', 'name')).first()
.. tip::
``gino.load`` is a shortcut for setting the execution option
``loader``.
This will populate a ``User`` instance with only ``id`` and ``name``
values, all the rest are simply ``None`` even if the query actually
returned all the column values.
::
q = User.join(Team).select()
u = await q.gino.load(User.load(team=Team)).first()
This will load two instances of model ``User`` and ``Team``, returning
the ``User`` instance with ``u.team`` set to the ``Team`` instance.
Both positional and keyword arguments can be used ath the same time. If
they are both omitted, like ``Team.load()``, it is equivalent to just
``Team`` as a loader.
Additionally, a :class:`.loader.Loader` instance can also be used to
generate queries, as its structure is usually the same as the query::
u = await User.load(team=Team).query.gino.first()
This generates a query like this::
SELECT users.xxx, ..., teams.xxx, ...
FROM users LEFT JOIN teams
ON ...
The :class:`~.loader.Loader` delegates attributes on the ``query``, so
``.query`` can be omitted. The ``LEFT JOIN`` is built-in behavior,
while the ``ON`` clause is generated based on foreign key. If there is
no foreign key, or the condition should be customized, you can use
this::
u = await User.load(
team=Team.on(User.team_id == Team.id)).gino.first()
And you can use both :meth:`~.load` and :meth:`~.on` at the same time
in a chain, in whatever order suits you.
.. seealso::
:meth:`~gino.engine.GinoConnection.execution_options`
"""
return ModelLoader(cls, *column_names, **relationships)
[文档] @classmethod
def on(cls, on_clause):
"""
Customize the on-clause for the auto-generated outer join query.
.. note::
This has no effect when provided as the ``loader`` execution option
for a given query.
.. seealso::
:meth:`.load`
"""
return cls.load().on(on_clause)
[文档] @classmethod
def distinct(cls, *columns):
"""
Experimental loader feature to yield only distinct instances by given
columns.
"""
return cls.load().distinct(*columns)
[文档] @classmethod
def none_as_none(cls, enabled=True):
return cls.load().none_as_none(enabled)
[文档] @classmethod
def alias(cls, *args, **kwargs):
"""
Experimental proxy for table alias on model.
"""
return Alias(cls, *args, **kwargs)
[文档] @classmethod
def in_query(cls, query):
"""
Convenient method to get a Model object when using subqueries.
Though with filters and aggregations, subqueries often return same
columns as the original table, but SQLAlchemy could not recognize them
as the columns are in subqueries, so technically they're columns in the
new "table".
With this method, the columns are loaded into the origintal models when
being used in subquries. For example::
query = query.alias('users')
MyUser = User.in_query(query)
loader = MyUser.distinct(User1.id).load()
users = await query.gino.load(loader).all()
"""
return _get_query_model(cls, query)
def _get_query_model(model, query):
return QueryModel(model.__name__, (), dict(_model=model, _query=query))
[文档]class QueryModel(type):
"""
Metaclass of Model classes used for subqueries.
"""
def __getattr__(self, item):
rv = getattr(self._query.columns, item,
getattr(self._model.__table__.columns, item,
getattr(self._model, item, DEFAULT)))
# replace `cls` in classmethod in models to `self`
if inspect.ismethod(rv) and inspect.isclass(rv.__self__):
return lambda *args, **kwargs: rv.__func__(self, *args, **kwargs)
if rv is DEFAULT:
raise AttributeError
return rv
def __iter__(self):
return iter(self._query.columns)
def __call__(self, *args, **kwargs):
return self._model(*args, **kwargs)