Skip to content

Commit 75bb240

Browse files
authored
feat: support primary key(s) in read_gbq by using as the index_col by default (#625)
* feat: support primary key(s) in `read_gbq` by using as the `index_col` by default * revert WIP commit * address type error in tests
1 parent 70015b7 commit 75bb240

File tree

7 files changed

+68
-23
lines changed

7 files changed

+68
-23
lines changed

bigframes/session/__init__.py

+14-11
Original file line numberDiff line numberDiff line change
@@ -708,13 +708,15 @@ def _get_snapshot_sql_and_primary_key(
708708
f"Current session is in {self._location} but dataset '{table.project}.{table.dataset_id}' is located in {table.location}"
709709
)
710710

711-
# TODO(b/305264153): Use public properties to fetch primary keys once
712-
# added to google-cloud-bigquery.
713-
primary_keys = (
714-
table._properties.get("tableConstraints", {})
715-
.get("primaryKey", {})
716-
.get("columns")
717-
)
711+
primary_keys = None
712+
if (
713+
(table_constraints := getattr(table, "table_constraints", None)) is not None
714+
and (primary_key := table_constraints.primary_key) is not None
715+
# This will be False for either None or empty list.
716+
# We want primary_keys = None if no primary keys are set.
717+
and (columns := primary_key.columns)
718+
):
719+
primary_keys = columns
718720

719721
job_config = bigquery.QueryJobConfig()
720722
job_config.labels["bigframes-api"] = api_name
@@ -777,12 +779,13 @@ def _read_gbq_table(
777779
query, default_project=self.bqclient.project
778780
)
779781

780-
(
781-
table_expression,
782-
total_ordering_cols,
783-
) = self._get_snapshot_sql_and_primary_key(
782+
(table_expression, primary_keys,) = self._get_snapshot_sql_and_primary_key(
784783
table_ref, api_name=api_name, use_cache=use_cache
785784
)
785+
total_ordering_cols = primary_keys
786+
787+
if not index_col and primary_keys is not None:
788+
index_col = primary_keys
786789

787790
for key in columns:
788791
if key not in table_expression.columns:

setup.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
"gcsfs >=2023.3.0",
3939
"geopandas >=0.12.2",
4040
"google-auth >=2.15.0,<3.0dev",
41-
"google-cloud-bigquery[bqstorage,pandas] >=3.10.0",
41+
"google-cloud-bigquery[bqstorage,pandas] >=3.16.0",
4242
"google-cloud-functions >=1.12.0",
4343
"google-cloud-bigquery-connection >=1.12.0",
4444
"google-cloud-iam >=2.12.1",

testing/constraints-3.9.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ fsspec==2023.3.0
44
gcsfs==2023.3.0
55
geopandas==0.12.2
66
google-auth==2.15.0
7-
google-cloud-bigquery==3.10.0
7+
google-cloud-bigquery==3.16.0
88
google-cloud-functions==1.12.0
99
google-cloud-bigquery-connection==1.12.0
1010
google-cloud-iam==2.12.1

tests/system/small/test_session.py

+6-7
Original file line numberDiff line numberDiff line change
@@ -236,14 +236,13 @@ def test_read_gbq_w_anonymous_query_results_table(session: bigframes.Session):
236236
def test_read_gbq_w_primary_keys_table(
237237
session: bigframes.Session, usa_names_grouped_table: bigquery.Table
238238
):
239+
# Validate that the table we're querying has a primary key.
239240
table = usa_names_grouped_table
240-
# TODO(b/305264153): Use public properties to fetch primary keys once
241-
# added to google-cloud-bigquery.
242-
primary_keys = (
243-
table._properties.get("tableConstraints", {})
244-
.get("primaryKey", {})
245-
.get("columns")
246-
)
241+
table_constraints = table.table_constraints
242+
assert table_constraints is not None
243+
primary_key = table_constraints.primary_key
244+
assert primary_key is not None
245+
primary_keys = primary_key.columns
247246
assert len(primary_keys) != 0
248247

249248
df = session.read_gbq(f"{table.project}.{table.dataset_id}.{table.table_id}")

tests/unit/resources.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# limitations under the License.
1414

1515
import datetime
16-
from typing import Dict, List, Optional
16+
from typing import Dict, List, Optional, Sequence
1717
import unittest.mock as mock
1818

1919
import google.auth.credentials
@@ -37,6 +37,7 @@
3737
def create_bigquery_session(
3838
bqclient: Optional[mock.Mock] = None,
3939
session_id: str = "abcxyz",
40+
table_schema: Sequence[google.cloud.bigquery.SchemaField] = TEST_SCHEMA,
4041
anonymous_dataset: Optional[google.cloud.bigquery.DatasetReference] = None,
4142
) -> bigframes.Session:
4243
credentials = mock.create_autospec(
@@ -51,7 +52,7 @@ def create_bigquery_session(
5152
table = mock.create_autospec(google.cloud.bigquery.Table, instance=True)
5253
table._properties = {}
5354
type(table).location = mock.PropertyMock(return_value="test-region")
54-
type(table).schema = mock.PropertyMock(return_value=TEST_SCHEMA)
55+
type(table).schema = mock.PropertyMock(return_value=table_schema)
5556
bqclient.get_table.return_value = table
5657

5758
if anonymous_dataset is None:
@@ -72,7 +73,7 @@ def query_mock(query, *args, **kwargs):
7273
if query.startswith("SELECT CURRENT_TIMESTAMP()"):
7374
query_job.result = mock.MagicMock(return_value=[[datetime.datetime.now()]])
7475
else:
75-
type(query_job).schema = mock.PropertyMock(return_value=TEST_SCHEMA)
76+
type(query_job).schema = mock.PropertyMock(return_value=table_schema)
7677

7778
return query_job
7879

tests/unit/session/test_session.py

+39
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919

2020
import google.api_core.exceptions
2121
import google.cloud.bigquery
22+
import google.cloud.bigquery.table
2223
import pytest
2324

2425
import bigframes
26+
import bigframes.exceptions
2527

2628
from .. import resources
2729

@@ -50,6 +52,43 @@ def test_read_gbq_cached_table():
5052
assert "1999-01-02T03:04:05.678901" in df.sql
5153

5254

55+
def test_read_gbq_clustered_table_ok_default_index_with_primary_key():
56+
"""If a primary key is set on the table, we use that as the index column
57+
by default, no error should be raised in this case.
58+
59+
See internal issue 335727141.
60+
"""
61+
table = google.cloud.bigquery.Table("my-project.my_dataset.my_table")
62+
table.clustering_fields = ["col1", "col2"]
63+
table.schema = (
64+
google.cloud.bigquery.SchemaField("pk_1", "INT64"),
65+
google.cloud.bigquery.SchemaField("pk_2", "INT64"),
66+
google.cloud.bigquery.SchemaField("col_1", "INT64"),
67+
google.cloud.bigquery.SchemaField("col_2", "INT64"),
68+
)
69+
70+
# TODO(b/305264153): use setter for table_constraints in client library
71+
# when available.
72+
table._properties["tableConstraints"] = {
73+
"primaryKey": {
74+
"columns": ["pk_1", "pk_2"],
75+
},
76+
}
77+
bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True)
78+
bqclient.project = "test-project"
79+
bqclient.get_table.return_value = table
80+
session = resources.create_bigquery_session(
81+
bqclient=bqclient, table_schema=table.schema
82+
)
83+
table._properties["location"] = session._location
84+
85+
df = session.read_gbq("my-project.my_dataset.my_table")
86+
87+
# There should be no analytic operators to prevent row filtering pushdown.
88+
assert "OVER" not in df.sql
89+
assert tuple(df.index.names) == ("pk_1", "pk_2")
90+
91+
5392
@pytest.mark.parametrize(
5493
"not_found_table_id",
5594
[("unknown.dataset.table"), ("project.unknown.table"), ("project.dataset.unknown")],

third_party/bigframes_vendored/pandas/io/gbq.py

+3
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,9 @@ def read_gbq(
109109
In that case, will read all the matched table as one DataFrame.
110110
index_col (Iterable[str] or str):
111111
Name of result column(s) to use for index in results DataFrame.
112+
113+
**New in bigframes version 1.3.0**: If ``index_col`` is not
114+
set, the primary key(s) of the table are used as the index.
112115
columns (Iterable[str]):
113116
List of BigQuery column names in the desired order for results
114117
DataFrame.

0 commit comments

Comments
 (0)