# Source code for django.db.models.query
"""
The main QuerySet implementation. This provides the public API for the ORM.
"""
import copy
import operator
import warnings
from collections import OrderedDict, namedtuple
from functools import lru_cache
from itertools import chain
from django.conf import settings
from django.core import exceptions
from django.db import (
DJANGO_VERSION_PICKLE_KEY, IntegrityError, connections, router,
transaction,
)
from django.db.models import DateField, DateTimeField, sql
from django.db.models.constants import LOOKUP_SEP
from django.db.models.deletion import Collector
from django.db.models.expressions import Case, Expression, F, Value, When
from django.db.models.fields import AutoField
from django.db.models.functions import Cast, Trunc
from django.db.models.query_utils import FilteredRelation, InvalidQuery, Q
from django.db.models.sql.constants import CURSOR, GET_ITERATOR_CHUNK_SIZE
from django.db.utils import NotSupportedError
from django.utils import timezone
from django.utils.deprecation import RemovedInDjango30Warning
from django.utils.functional import cached_property, partition
from django.utils.version import get_version
# Maximum number of items shown by QuerySet.__repr__. That method fetches one
# extra item so it can tell when the output has to be truncated.
REPR_OUTPUT_SIZE = 20
# Re-export of sql.EmptyResultSet, kept in this namespace for backwards
# compatibility with code that imports it from here.
EmptyResultSet = sql.EmptyResultSet
class BaseIterable:
    """
    Base class for the row iterables defined in this module.

    It only records the queryset to iterate over together with the
    chunked-fetch settings; subclasses implement __iter__() and hand these
    settings to the SQL compiler.
    """

    def __init__(self, queryset, chunked_fetch=False, chunk_size=GET_ITERATOR_CHUNK_SIZE):
        # Plain attribute storage; no query is executed here.
        self.queryset, self.chunked_fetch, self.chunk_size = (
            queryset, chunked_fetch, chunk_size,
        )
class ModelIterable(BaseIterable):
    """Iterable that yields a model instance for each row."""

    def __iter__(self):
        qs = self.queryset
        using = qs.db
        compiler = qs.query.get_compiler(using=using)
        # Run the query first: executing it is what fills in compiler.select,
        # compiler.klass_info and compiler.annotation_col_map read below.
        results = compiler.execute_sql(
            chunked_fetch=self.chunked_fetch,
            chunk_size=self.chunk_size,
        )
        select = compiler.select
        klass_info = compiler.klass_info
        annotation_col_map = compiler.annotation_col_map

        model_cls = klass_info['model']
        select_fields = klass_info['select_fields']
        # Column window of the row that holds the model's own fields.
        first_col = select_fields[0]
        end_col = select_fields[-1] + 1
        init_list = [col[0].target.attname for col in select[first_col:end_col]]
        related_populators = get_related_populators(klass_info, select, using)

        # For every known related field, build a getter that extracts the
        # key under which the related object is stored in its lookup dict.
        known_related_objects = []
        for field, related_objs in qs._known_related_objects.items():
            attnames = [
                field.attname
                if from_field == 'self' else
                qs.model._meta.get_field(from_field).attname
                for from_field in field.from_fields
            ]
            known_related_objects.append(
                (field, related_objs, operator.attrgetter(*attnames))
            )

        for row in compiler.results_iter(results):
            obj = model_cls.from_db(using, init_list, row[first_col:end_col])
            for populator in related_populators:
                populator.populate(row, obj)
            if annotation_col_map:
                for attr_name, col_pos in annotation_col_map.items():
                    setattr(obj, attr_name, row[col_pos])

            # Attach the known related objects, but never overwrite objects
            # that were already loaded by, e.g., select_related().
            for field, rel_objs, rel_getter in known_related_objects:
                if not field.is_cached(obj):
                    rel_obj_id = rel_getter(obj)
                    try:
                        rel_obj = rel_objs[rel_obj_id]
                    except KeyError:
                        # May happen in qs1 | qs2 scenarios.
                        pass
                    else:
                        setattr(obj, field.name, rel_obj)

            yield obj
class ValuesIterable(BaseIterable):
    """
    Iterable returned by QuerySet.values() that yields a dict for each row.
    """

    def __iter__(self):
        qs = self.queryset
        query = qs.query
        compiler = query.get_compiler(qs.db)

        # Column names in row order; extra(select=...) cols are always at
        # the start of the row.
        names = list(query.extra_select)
        names.extend(query.values_select)
        names.extend(query.annotation_select)

        rows = compiler.results_iter(
            chunked_fetch=self.chunked_fetch,
            chunk_size=self.chunk_size,
        )
        for row in rows:
            # Only the first len(names) columns of the row are used.
            yield {name: row[pos] for pos, name in enumerate(names)}
class ValuesListIterable(BaseIterable):
    """
    Iterable returned by QuerySet.values_list(flat=False) that yields a tuple
    for each row.
    """

    def __iter__(self):
        qs = self.queryset
        query = qs.query
        compiler = query.get_compiler(qs.db)
        fetch_options = {
            'chunked_fetch': self.chunked_fetch,
            'chunk_size': self.chunk_size,
        }

        if qs._fields:
            # Names in the order the columns appear in a row; extra(select=...)
            # cols are always at the start of the row.
            names = [
                *query.extra_select,
                *query.values_select,
                *query.annotation_select,
            ]
            # Order requested by the caller, followed by any annotation that
            # was not explicitly requested.
            requested = list(qs._fields)
            requested.extend(
                f for f in query.annotation_select if f not in qs._fields
            )
            if requested != names:
                # Reorder each row according to the requested fields.
                position = {name: idx for idx, name in enumerate(names)}
                reorder = operator.itemgetter(*[position[f] for f in requested])
                return map(reorder, compiler.results_iter(**fetch_options))

        return compiler.results_iter(tuple_expected=True, **fetch_options)
class NamedValuesListIterable(ValuesListIterable):
    """
    Iterable returned by QuerySet.values_list(named=True) that yields a
    namedtuple for each row.
    """

    @staticmethod
    @lru_cache()
    def create_namedtuple_class(*names):
        """Return the (cached) 'Row' namedtuple class for the given names."""
        # namedtuple() is too slow to be called for every QuerySet
        # evaluation, hence the @lru_cache() above.
        return namedtuple('Row', names)

    def __iter__(self):
        qs = self.queryset
        names = qs._fields
        if not names:
            # No explicit fields: fall back to every selected column.
            query = qs.query
            names = [*query.extra_select, *query.values_select, *query.annotation_select]
        row_class = self.create_namedtuple_class(*names)
        # Build instances through tuple.__new__ directly from each row
        # produced by the parent iterable.
        make = tuple.__new__
        rows = super().__iter__()
        for row in rows:
            yield make(row_class, row)
class FlatValuesListIterable(BaseIterable):
    """
    Iterable returned by QuerySet.values_list(flat=True) that yields single
    values.
    """

    def __iter__(self):
        qs = self.queryset
        compiler = qs.query.get_compiler(qs.db)
        rows = compiler.results_iter(
            chunked_fetch=self.chunked_fetch,
            chunk_size=self.chunk_size,
        )
        # Only the first column of every row is exposed.
        yield from (row[0] for row in rows)
class QuerySet:
"""Represent a lazy database lookup for a set of objects."""
def __init__(self, model=None, query=None, using=None, hints=None):
    """
    Set up an unevaluated queryset.

    A new sql.Query for the model is created when no (truthy) ``query`` is
    given, and ``hints`` defaults to an empty dict.
    """
    self.model = model
    self._db = using
    self._hints = hints if hints else {}
    if query:
        self.query = query
    else:
        self.query = sql.Query(self.model)
    # Evaluation state: nothing has been fetched or prefetched yet.
    self._result_cache = None
    self._prefetch_related_lookups = ()
    self._prefetch_done = False
    self._sticky_filter = False
    self._for_write = False
    # Shape: {rel_field: {pk: rel_obj}}
    self._known_related_objects = {}
    # By default rows are turned into model instances.
    self._iterable_class = ModelIterable
    self._fields = None
def as_manager(cls):
    """Return a Manager instance built from this QuerySet class."""
    # Imported locally to address the circular dependency between
    # `Queryset` and `Manager`.
    from django.db.models.manager import Manager
    manager_class = Manager.from_queryset(cls)
    instance = manager_class()
    instance._built_with_as_manager = True
    return instance
# The flag is set on the plain function before it is wrapped, so it stays
# reachable through the resulting classmethod.
as_manager.queryset_only = True
as_manager = classmethod(as_manager)
########################
# PYTHON MAGIC METHODS #
########################
def __deepcopy__(self, memo):
    """Deep-copy every attribute except the result cache, which is reset."""
    clone = self.__class__()
    for name, value in self.__dict__.items():
        # Don't populate the copy's cache.
        clone.__dict__[name] = (
            None if name == '_result_cache' else copy.deepcopy(value, memo)
        )
    return clone
def __getstate__(self):
    """Return the instance dict plus the current Django version for pickling."""
    # Force the cache to be fully populated before the state is captured.
    self._fetch_all()
    state = dict(self.__dict__)
    state[DJANGO_VERSION_PICKLE_KEY] = get_version()
    return state
def __setstate__(self, state):
    """
    Restore pickled state, warning when the recorded Django version is
    missing or differs from the running one. The state is applied either way.
    """
    pickled_version = state.get(DJANGO_VERSION_PICKLE_KEY)
    if not pickled_version:
        msg = "Pickled queryset instance's Django version is not specified."
    else:
        current_version = get_version()
        if current_version == pickled_version:
            msg = None
        else:
            msg = (
                "Pickled queryset instance's Django version %s does not "
                "match the current version %s." % (pickled_version, current_version)
            )

    if msg:
        # The warning is issued from this frame so stacklevel=2 points at
        # the caller.
        warnings.warn(msg, RuntimeWarning, stacklevel=2)

    self.__dict__.update(state)
def __repr__(self):
    """Show at most REPR_OUTPUT_SIZE results, marking any truncation."""
    # One item more than the limit is requested to detect overflow.
    shown = list(self[:REPR_OUTPUT_SIZE + 1])
    if len(shown) > REPR_OUTPUT_SIZE:
        shown[-1] = "...(remaining elements truncated)..."
    return '<%s %r>' % (type(self).__name__, shown)
def __len__(self):
    """Evaluate the queryset and return the number of cached results."""
    self._fetch_all()
    cached = self._result_cache
    return len(cached)
def __iter__(self):
    """
    Evaluate the queryset and iterate over its cached results.

    In the default case the results are produced by three nested iterators:

    1. sql.compiler.execute_sql()
       - Returns 100 rows at a time (constants.GET_ITERATOR_CHUNK_SIZE)
         using cursor.fetchmany(). It does some column masking and returns
         the rows in chunks.
    2. sql.compiler.results_iter()
       - Returns one row at a time. The rows are still plain tuples here;
         in some cases values are converted to Python values at this point.
    3. self.iterator()
       - Turns the rows into model objects.
    """
    self._fetch_all()
    cached = self._result_cache
    return iter(cached)
def __bool__(self):
    """Evaluate the queryset; it is truthy when at least one result exists."""
    self._fetch_all()
    cached = self._result_cache
    return bool(cached)
def __getitem__(self, k):
"""Retrieve an item or slice from the set of results."""
if not isinstance(k, (int, slice)):
raise TypeError
assert ((not isinstance(k, slice) and (k >= 0)) or
(isinstance(k, slice) and (k.start is None or k.start >= 0) and
(k.stop is None or k.stop >= 0))), \
"Negative indexing is not supported."
if self._result_cache is not None:
return self._result_cache[k]
if isinstance(k, slice):
qs = self._chain()
if k.start is not None:
start = int(k.start)
else:
start = None
if k.stop is not None:
stop = int(k.stop)
else:
stop = None
qs.query.set_limits(start, stop)
return list(qs)[::k.step] if k.step else qs
qs = self._chain()
qs.query.set_limits(k, k + 1)
qs._fetch_all()
return qs._result_cache[0]
def __and__(self, other):
    """Combine two querysets with AND; an empty operand is returned as is."""
    self._merge_sanity_check(other)
    # `other` is checked before `self`, so it wins when both are empty.
    for operand in (other, self):
        if isinstance(operand, EmptyQuerySet):
            return operand
    result = self._chain()
    result._merge_known_related_objects(other)
    result.query.combine(other.query, sql.AND)
    return result
def __or__(self, other):
    """
    Combine two querysets with OR. When one side is empty, the other side
    is returned unchanged.
    """
    self._merge_sanity_check(other)
    if isinstance(self, EmptyQuerySet):
        return other
    if isinstance(other, EmptyQuerySet):
        return self

    # A side whose query cannot be filtered any further is replaced by a
    # pk__in lookup on the model's base manager.
    if self.query.can_filter():
        left = self
    else:
        left = self.model._base_manager.filter(pk__in=self.values('pk'))
    result = left._chain()
    result._merge_known_related_objects(other)
    if not other.query.can_filter():
        other = other.model._base_manager.filter(pk__in=other.values('pk'))
    result.query.combine(other.query, sql.OR)
    return result
####################################
# METHODS THAT DO DATABASE QUERIES #
####################################
def _iterator(self, use_chunked_fetch, chunk_size):
yield from self._iter