Skip to content

Commit 9386373

Browse files
authored
fix: read_gbq_table respects primary keys even when filters are set (#689)
* fix: `read_gbq_table` respects primary keys even when `filters` are set.
  Closes internal issues 338039517 (primary key inconsistency), 338037499
  (LIMIT for max_results), 340540991 (avoid running query immediately if time
  travel is supported), and 337925142 (push down column filters to when we
  create the time travel subquery).
* feat: `read_gbq` suggests a correct column name when one is not found.
* feat: `read_gbq_query` supports `filters`.
* perf: use a `LIMIT` clause when `max_results` is set.
* perf: don't run query immediately from `read_gbq_table` if `filters` is set.
1 parent 412f28b commit 9386373

File tree

11 files changed

+364
-184
lines changed

11 files changed

+364
-184
lines changed

.pre-commit-config.yaml

+2-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ repos:
3535
hooks:
3636
- id: flake8
3737
- repo: https://github.com/pre-commit/mirrors-mypy
38-
rev: v1.1.1
38+
rev: v1.10.0
3939
hooks:
4040
- id: mypy
4141
additional_dependencies: [types-requests, types-tabulate, pandas-stubs]
42+
args: ["--check-untyped-defs", "--explicit-package-bases", '--exclude="^third_party"', "--ignore-missing-imports"]

bigframes/core/blocks.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,10 @@ def __init__(
122122

123123
# If no index columns are set, create one.
124124
#
125-
# Note: get_index_cols_and_uniqueness in
125+
# Note: get_index_cols in
126126
# bigframes/session/_io/bigquery/read_gbq_table.py depends on this
127127
# being as sequential integer index column. If this default behavior
128-
# ever changes, please also update get_index_cols_and_uniqueness so
128+
# ever changes, please also update get_index_cols so
129129
# that users who explicitly request a sequential integer index can
130130
# still get one.
131131
if len(index_columns) == 0:

bigframes/core/sql.py

+3-5
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333

3434

3535
### Writing SQL Values (literals, column references, table references, etc.)
36-
def simple_literal(value: str | int | bool | float):
36+
def simple_literal(value: str | int | bool | float | datetime.datetime):
3737
"""Return quoted input string."""
3838
# https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical#literals
3939
if isinstance(value, str):
@@ -50,6 +50,8 @@ def simple_literal(value: str | int | bool | float):
5050
if value == -math.inf:
5151
return 'CAST("-inf" as FLOAT)'
5252
return str(value)
53+
if isinstance(value, datetime.datetime):
54+
return f"TIMESTAMP('{value.isoformat()}')"
5355
else:
5456
raise ValueError(f"Cannot produce literal for {value}")
5557

@@ -156,7 +158,3 @@ def ordering_clause(
156158
part = f"`{ordering_expr.id}` {asc_desc} {null_clause}"
157159
parts.append(part)
158160
return f"ORDER BY {' ,'.join(parts)}"
159-
160-
161-
def snapshot_clause(time_travel_timestamp: datetime.datetime):
162-
return f"FOR SYSTEM_TIME AS OF TIMESTAMP({repr(time_travel_timestamp.isoformat())})"

bigframes/pandas/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,7 @@ def read_gbq_query(
549549
max_results: Optional[int] = None,
550550
use_cache: Optional[bool] = None,
551551
col_order: Iterable[str] = (),
552+
filters: vendored_pandas_gbq.FiltersType = (),
552553
) -> bigframes.dataframe.DataFrame:
553554
_set_default_session_location_if_possible(query)
554555
return global_session.with_default_session(
@@ -560,6 +561,7 @@ def read_gbq_query(
560561
max_results=max_results,
561562
use_cache=use_cache,
562563
col_order=col_order,
564+
filters=filters,
563565
)
564566

565567

bigframes/session/__init__.py

+111-66
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import ibis
6363
import ibis.backends.bigquery as ibis_bigquery
6464
import ibis.expr.types as ibis_types
65+
import jellyfish
6566
import numpy as np
6667
import pandas
6768
from pandas._typing import (
@@ -339,19 +340,6 @@ def read_gbq(
339340
elif col_order:
340341
columns = col_order
341342

342-
filters = list(filters)
343-
if len(filters) != 0 or bf_io_bigquery.is_table_with_wildcard_suffix(
344-
query_or_table
345-
):
346-
# TODO(b/338111344): This appears to be missing index_cols, which
347-
# are necessary to be selected.
348-
# TODO(b/338039517): Refactor this to be called inside both
349-
# _read_gbq_query and _read_gbq_table (after detecting primary keys)
350-
# so we can make sure index_col/index_cols reflects primary keys.
351-
query_or_table = bf_io_bigquery.to_query(
352-
query_or_table, _to_index_cols(index_col), columns, filters
353-
)
354-
355343
if bf_io_bigquery.is_query(query_or_table):
356344
return self._read_gbq_query(
357345
query_or_table,
@@ -361,6 +349,7 @@ def read_gbq(
361349
max_results=max_results,
362350
api_name="read_gbq",
363351
use_cache=use_cache,
352+
filters=filters,
364353
)
365354
else:
366355
if configuration is not None:
@@ -377,6 +366,7 @@ def read_gbq(
377366
max_results=max_results,
378367
api_name="read_gbq",
379368
use_cache=use_cache if use_cache is not None else True,
369+
filters=filters,
380370
)
381371

382372
def _query_to_destination(
@@ -451,6 +441,7 @@ def read_gbq_query(
451441
max_results: Optional[int] = None,
452442
use_cache: Optional[bool] = None,
453443
col_order: Iterable[str] = (),
444+
filters: third_party_pandas_gbq.FiltersType = (),
454445
) -> dataframe.DataFrame:
455446
"""Turn a SQL query into a DataFrame.
456447
@@ -517,6 +508,7 @@ def read_gbq_query(
517508
max_results=max_results,
518509
api_name="read_gbq_query",
519510
use_cache=use_cache,
511+
filters=filters,
520512
)
521513

522514
def _read_gbq_query(
@@ -529,6 +521,7 @@ def _read_gbq_query(
529521
max_results: Optional[int] = None,
530522
api_name: str = "read_gbq_query",
531523
use_cache: Optional[bool] = None,
524+
filters: third_party_pandas_gbq.FiltersType = (),
532525
) -> dataframe.DataFrame:
533526
import bigframes.dataframe as dataframe
534527

@@ -557,6 +550,21 @@ def _read_gbq_query(
557550

558551
index_cols = _to_index_cols(index_col)
559552

553+
filters = list(filters)
554+
if len(filters) != 0 or max_results is not None:
555+
# TODO(b/338111344): If we are running a query anyway, we might as
556+
# well generate ROW_NUMBER() at the same time.
557+
query = bf_io_bigquery.to_query(
558+
query,
559+
index_cols,
560+
columns,
561+
filters,
562+
max_results=max_results,
563+
# We're executing the query, so we don't need time travel for
564+
# determinism.
565+
time_travel_timestamp=None,
566+
)
567+
560568
destination, query_job = self._query_to_destination(
561569
query,
562570
index_cols,
@@ -580,12 +588,14 @@ def _read_gbq_query(
580588
session=self,
581589
)
582590

583-
return self.read_gbq_table(
591+
return self._read_gbq_table(
584592
f"{destination.project}.{destination.dataset_id}.{destination.table_id}",
585593
index_col=index_col,
586594
columns=columns,
587-
max_results=max_results,
588595
use_cache=configuration["query"]["useQueryCache"],
596+
api_name=api_name,
597+
# max_results and filters are omitted because they are already
598+
# handled by to_query(), above.
589599
)
590600

591601
def read_gbq_table(
@@ -621,31 +631,14 @@ def read_gbq_table(
621631
elif col_order:
622632
columns = col_order
623633

624-
filters = list(filters)
625-
if len(filters) != 0 or bf_io_bigquery.is_table_with_wildcard_suffix(query):
626-
# TODO(b/338039517): Refactor this to be called inside both
627-
# _read_gbq_query and _read_gbq_table (after detecting primary keys)
628-
# so we can make sure index_col/index_cols reflects primary keys.
629-
query = bf_io_bigquery.to_query(
630-
query, _to_index_cols(index_col), columns, filters
631-
)
632-
633-
return self._read_gbq_query(
634-
query,
635-
index_col=index_col,
636-
columns=columns,
637-
max_results=max_results,
638-
api_name="read_gbq_table",
639-
use_cache=use_cache,
640-
)
641-
642634
return self._read_gbq_table(
643635
query=query,
644636
index_col=index_col,
645637
columns=columns,
646638
max_results=max_results,
647639
api_name="read_gbq_table",
648640
use_cache=use_cache,
641+
filters=filters,
649642
)
650643

651644
def _read_gbq_table(
@@ -657,6 +650,7 @@ def _read_gbq_table(
657650
max_results: Optional[int] = None,
658651
api_name: str,
659652
use_cache: bool = True,
653+
filters: third_party_pandas_gbq.FiltersType = (),
660654
) -> dataframe.DataFrame:
661655
import bigframes.dataframe as dataframe
662656

@@ -673,6 +667,9 @@ def _read_gbq_table(
673667
query, default_project=self.bqclient.project
674668
)
675669

670+
columns = list(columns)
671+
filters = list(filters)
672+
676673
# ---------------------------------
677674
# Fetch table metadata and validate
678675
# ---------------------------------
@@ -684,62 +681,110 @@ def _read_gbq_table(
684681
cache=self._df_snapshot,
685682
use_cache=use_cache,
686683
)
684+
table_column_names = {field.name for field in table.schema}
687685

688686
if table.location.casefold() != self._location.casefold():
689687
raise ValueError(
690688
f"Current session is in {self._location} but dataset '{table.project}.{table.dataset_id}' is located in {table.location}"
691689
)
692690

693-
# -----------------------------------------
694-
# Create Ibis table expression and validate
695-
# -----------------------------------------
696-
697-
# Use a time travel to make sure the DataFrame is deterministic, even
698-
# if the underlying table changes.
699-
table_expression = bf_read_gbq_table.get_ibis_time_travel_table(
700-
self.ibis_client,
701-
table_ref,
702-
time_travel_timestamp,
703-
)
704-
705691
for key in columns:
706-
if key not in table_expression.columns:
692+
if key not in table_column_names:
693+
possibility = min(
694+
table_column_names,
695+
key=lambda item: jellyfish.levenshtein_distance(key, item),
696+
)
707697
raise ValueError(
708-
f"Column '{key}' of `columns` not found in this table."
698+
f"Column '{key}' of `columns` not found in this table. Did you mean '{possibility}'?"
709699
)
710700

711-
# ---------------------------------------
712-
# Create a non-default index and validate
713-
# ---------------------------------------
714-
715-
# TODO(b/337925142): Move index_cols creation to before we create the
716-
# Ibis table expression so we don't have a "SELECT *" subquery in the
717-
# query that checks for index uniqueness.
718-
719-
index_cols, is_index_unique = bf_read_gbq_table.get_index_cols_and_uniqueness(
720-
bqclient=self.bqclient,
721-
ibis_client=self.ibis_client,
701+
# Converting index_col into a list of column names requires
702+
# the table metadata because we might use the primary keys
703+
# when constructing the index.
704+
index_cols = bf_read_gbq_table.get_index_cols(
722705
table=table,
723-
table_expression=table_expression,
724706
index_col=index_col,
725-
api_name=api_name,
726707
)
727708

728709
for key in index_cols:
729-
if key not in table_expression.columns:
710+
if key not in table_column_names:
711+
possibility = min(
712+
table_column_names,
713+
key=lambda item: jellyfish.levenshtein_distance(key, item),
714+
)
730715
raise ValueError(
731-
f"Column `{key}` of `index_col` not found in this table."
716+
f"Column '{key}' of `index_col` not found in this table. Did you mean '{possibility}'?"
732717
)
733718

734-
# TODO(b/337925142): We should push down column filters when we get the time
735-
# travel table to avoid "SELECT *" subqueries.
736-
if columns:
737-
table_expression = table_expression.select([*index_cols, *columns])
719+
# -----------------------------
720+
# Optionally, execute the query
721+
# -----------------------------
722+
723+
# max_results introduces non-determinism and limits the cost on
724+
# clustered tables, so fallback to a query. We do this here so that
725+
# the index is consistent with tables that have primary keys, even
726+
# when max_results is set.
727+
# TODO(b/338419730): We don't need to fallback to a query for wildcard
728+
# tables if we allow some non-determinism when time travel isn't supported.
729+
if max_results is not None or bf_io_bigquery.is_table_with_wildcard_suffix(
730+
query
731+
):
732+
# TODO(b/338111344): If we are running a query anyway, we might as
733+
# well generate ROW_NUMBER() at the same time.
734+
query = bf_io_bigquery.to_query(
735+
query,
736+
index_cols=index_cols,
737+
columns=columns,
738+
filters=filters,
739+
max_results=max_results,
740+
# We're executing the query, so we don't need time travel for
741+
# determinism.
742+
time_travel_timestamp=None,
743+
)
744+
745+
return self._read_gbq_query(
746+
query,
747+
index_col=index_cols,
748+
columns=columns,
749+
api_name="read_gbq_table",
750+
use_cache=use_cache,
751+
)
752+
753+
# -----------------------------------------
754+
# Create Ibis table expression and validate
755+
# -----------------------------------------
756+
757+
# Use a time travel to make sure the DataFrame is deterministic, even
758+
# if the underlying table changes.
759+
# TODO(b/340540991): If a dry run query fails with time travel but
760+
# succeeds without it, omit the time travel clause and raise a warning
761+
# about potential non-determinism if the underlying tables are modified.
762+
table_expression = bf_read_gbq_table.get_ibis_time_travel_table(
763+
ibis_client=self.ibis_client,
764+
table_ref=table_ref,
765+
index_cols=index_cols,
766+
columns=columns,
767+
filters=filters,
768+
time_travel_timestamp=time_travel_timestamp,
769+
)
738770

739771
# ----------------------------
740772
# Create ordering and validate
741773
# ----------------------------
742774

775+
# TODO(b/337925142): Generate a new subquery with just the index_cols
776+
# in the Ibis table expression so we don't have a "SELECT *" subquery
777+
# in the query that checks for index uniqueness.
778+
# TODO(b/338065601): Provide a way to assume uniqueness and avoid this
779+
# check.
780+
is_index_unique = bf_read_gbq_table.are_index_cols_unique(
781+
bqclient=self.bqclient,
782+
ibis_client=self.ibis_client,
783+
table=table,
784+
index_cols=index_cols,
785+
api_name=api_name,
786+
)
787+
743788
if is_index_unique:
744789
array_value = bf_read_gbq_table.to_array_value_with_total_ordering(
745790
session=self,

0 commit comments

Comments
 (0)