Skip to content

Commit e1817c9

Browse files
authored
feat: to_gbq without a destination table writes to a temporary table (#158)
* feat: `to_gbq` without a destination table writes to a temporary table
* add unit test covering happy path for to_gbq
* update to_gbq docs
1 parent b9cb55c commit e1817c9

File tree

9 files changed

+225
-20
lines changed

9 files changed

+225
-20
lines changed

bigframes/constants.py

+4
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import datetime
16+
1517
"""Constants used across BigQuery DataFrames.
1618
1719
This module should not depend on any others in the package.
@@ -23,3 +25,5 @@
2325
)
2426

2527
ABSTRACT_METHOD_ERROR_MESSAGE = f"Abstract method. You have likely encountered a bug. Please share this stacktrace and how you reached it with the BigQuery DataFrames team. {FEEDBACK_LINK}"
28+
29+
DEFAULT_EXPIRATION = datetime.timedelta(days=1)

bigframes/dataframe.py

+37-10
Original file line numberDiff line numberDiff line change
@@ -2289,25 +2289,51 @@ def to_json(
22892289

22902290
def to_gbq(
22912291
self,
2292-
destination_table: str,
2292+
destination_table: Optional[str] = None,
22932293
*,
2294-
if_exists: Optional[Literal["fail", "replace", "append"]] = "fail",
2294+
if_exists: Optional[Literal["fail", "replace", "append"]] = None,
22952295
index: bool = True,
22962296
ordering_id: Optional[str] = None,
2297-
) -> None:
2298-
if "." not in destination_table:
2299-
raise ValueError(
2300-
"Invalid Table Name. Should be of the form 'datasetId.tableId' or "
2301-
"'projectId.datasetId.tableId'"
2302-
)
2303-
2297+
) -> str:
23042298
dispositions = {
23052299
"fail": bigquery.WriteDisposition.WRITE_EMPTY,
23062300
"replace": bigquery.WriteDisposition.WRITE_TRUNCATE,
23072301
"append": bigquery.WriteDisposition.WRITE_APPEND,
23082302
}
2303+
2304+
if destination_table is None:
2305+
# TODO(swast): If there have been no modifications to the DataFrame
2306+
# since the last time it was written (cached), then return that.
2307+
# For `read_gbq` nodes, return the underlying table clone.
2308+
destination_table = bigframes.session._io.bigquery.create_temp_table(
2309+
self._session.bqclient,
2310+
self._session._anonymous_dataset,
2311+
# TODO(swast): allow custom expiration times, probably via session configuration.
2312+
constants.DEFAULT_EXPIRATION,
2313+
)
2314+
2315+
if if_exists is not None and if_exists != "replace":
2316+
raise ValueError(
2317+
f"Got invalid value {repr(if_exists)} for if_exists. "
2318+
"When no destination table is specified, a new table is always created. "
2319+
"None or 'replace' are the only valid options in this case."
2320+
)
2321+
if_exists = "replace"
2322+
2323+
if "." not in destination_table:
2324+
raise ValueError(
2325+
f"Got invalid value for destination_table {repr(destination_table)}. "
2326+
"Should be of the form 'datasetId.tableId' or 'projectId.datasetId.tableId'."
2327+
)
2328+
2329+
if if_exists is None:
2330+
if_exists = "fail"
2331+
23092332
if if_exists not in dispositions:
2310-
raise ValueError("'{0}' is not valid for if_exists".format(if_exists))
2333+
raise ValueError(
2334+
f"Got invalid value {repr(if_exists)} for if_exists. "
2335+
f"Valid options include None or one of {dispositions.keys()}."
2336+
)
23112337

23122338
job_config = bigquery.QueryJobConfig(
23132339
write_disposition=dispositions[if_exists],
@@ -2318,6 +2344,7 @@ def to_gbq(
23182344
)
23192345

23202346
self._run_io_query(index=index, ordering_id=ordering_id, job_config=job_config)
2347+
return destination_table
23212348

23222349
def to_numpy(
23232350
self, dtype=None, copy=False, na_value=None, **kwargs

bigframes/session/__init__.py

+11
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,17 @@ def _create_and_bind_bq_session(self):
223223
query_job.result() # blocks until finished
224224
self._session_id = query_job.session_info.session_id
225225

226+
# The anonymous dataset is used by BigQuery to write query results and
227+
# session tables. BigQuery DataFrames also writes temp tables directly
228+
# to the dataset, no BigQuery Session required. Note: there is a
229+
# different anonymous dataset per location. See:
230+
# https://cloud.google.com/bigquery/docs/cached-results#how_cached_results_are_stored
231+
query_destination = query_job.destination
232+
self._anonymous_dataset = bigquery.DatasetReference(
233+
query_destination.project,
234+
query_destination.dataset_id,
235+
)
236+
226237
self.bqclient.default_query_job_config = bigquery.QueryJobConfig(
227238
connection_properties=[
228239
bigquery.ConnectionProperty("session_id", self._session_id)

bigframes/session/_io/bigquery.py

+20
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
import textwrap
1919
import types
2020
from typing import Dict, Iterable, Union
21+
import uuid
2122

2223
import google.cloud.bigquery as bigquery
2324

2425
IO_ORDERING_ID = "bqdf_row_nums"
26+
TEMP_TABLE_PREFIX = "bqdf{date}_{random_id}"
2527

2628

2729
def create_export_csv_statement(
@@ -90,6 +92,24 @@ def create_snapshot_sql(
9092
)
9193

9294

95+
def create_temp_table(
96+
bqclient: bigquery.Client,
97+
dataset: bigquery.DatasetReference,
98+
expiration: datetime.timedelta,
99+
) -> str:
100+
"""Create an empty table with an expiration in the desired dataset."""
101+
now = datetime.datetime.now(datetime.timezone.utc)
102+
random_id = uuid.uuid4().hex
103+
table_id = TEMP_TABLE_PREFIX.format(
104+
date=now.strftime("%Y%m%d"), random_id=random_id
105+
)
106+
table_ref = dataset.table(table_id)
107+
destination = bigquery.Table(table_ref)
108+
destination.expires = now + expiration
109+
bqclient.create_table(destination)
110+
return f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}"
111+
112+
93113
# BigQuery REST API returns types in Legacy SQL format
94114
# https://cloud.google.com/bigquery/docs/data-types but we use Standard SQL
95115
# names

tests/unit/resources.py

+33-1
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,21 @@
1919
import google.cloud.bigquery
2020
import ibis
2121
import pandas
22+
import pytest
2223

2324
import bigframes
2425
import bigframes.core as core
2526
import bigframes.core.ordering
27+
import bigframes.dataframe
2628
import bigframes.session.clients
2729

2830
"""Utilities for creating test resources."""
2931

3032

3133
def create_bigquery_session(
32-
bqclient: Optional[google.cloud.bigquery.Client] = None, session_id: str = "abcxyz"
34+
bqclient: Optional[mock.Mock] = None,
35+
session_id: str = "abcxyz",
36+
anonymous_dataset: Optional[google.cloud.bigquery.DatasetReference] = None,
3337
) -> bigframes.Session:
3438
credentials = mock.create_autospec(
3539
google.auth.credentials.Credentials, instance=True
@@ -39,6 +43,21 @@ def create_bigquery_session(
3943
bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True)
4044
bqclient.project = "test-project"
4145

46+
if anonymous_dataset is None:
47+
anonymous_dataset = google.cloud.bigquery.DatasetReference(
48+
"test-project",
49+
"test_dataset",
50+
)
51+
52+
query_job = mock.create_autospec(google.cloud.bigquery.QueryJob)
53+
type(query_job).destination = mock.PropertyMock(
54+
return_value=anonymous_dataset.table("test_table"),
55+
)
56+
type(query_job).session_info = google.cloud.bigquery.SessionInfo(
57+
{"sessionInfo": {"sessionId": session_id}},
58+
)
59+
bqclient.query.return_value = query_job
60+
4261
clients_provider = mock.create_autospec(bigframes.session.clients.ClientsProvider)
4362
type(clients_provider).bqclient = mock.PropertyMock(return_value=bqclient)
4463
clients_provider._credentials = credentials
@@ -51,6 +70,19 @@ def create_bigquery_session(
5170
return session
5271

5372

73+
def create_dataframe(
74+
monkeypatch: pytest.MonkeyPatch, session: Optional[bigframes.Session] = None
75+
) -> bigframes.dataframe.DataFrame:
76+
if session is None:
77+
session = create_bigquery_session()
78+
79+
# Since this may create a ReadLocalNode, the session we explicitly pass in
80+
# might not actually be used. Mock out the global session, too.
81+
monkeypatch.setattr(bigframes.core.global_session, "_global_session", session)
82+
bigframes.options.bigquery._session_started = True
83+
return bigframes.dataframe.DataFrame({}, session=session)
84+
85+
5486
def create_pandas_session(tables: Dict[str, pandas.DataFrame]) -> bigframes.Session:
5587
# TODO(tswast): Refactor to make helper available for all tests. Consider
5688
# providing a proper "local Session" for use by downstream developers.

tests/unit/session/test_io_bigquery.py

+25-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import datetime
1616
from typing import Iterable
17+
import unittest.mock as mock
1718

1819
import google.cloud.bigquery as bigquery
1920
import pytest
@@ -37,7 +38,7 @@ def test_create_snapshot_sql_doesnt_timetravel_anonymous_datasets():
3738
assert "`my-test-project`.`_e8166e0cdb`.`anonbb92cd`" in sql
3839

3940

40-
def test_create_snapshot_sql_doesnt_timetravel_session_datasets():
41+
def test_create_snapshot_sql_doesnt_timetravel_session_tables():
4142
table_ref = bigquery.TableReference.from_string("my-test-project._session.abcdefg")
4243

4344
sql = bigframes.session._io.bigquery.create_snapshot_sql(
@@ -51,6 +52,29 @@ def test_create_snapshot_sql_doesnt_timetravel_session_datasets():
5152
assert "my-test-project" not in sql
5253

5354

55+
def test_create_temp_table_default_expiration():
56+
"""Make sure the created table has an expiration."""
57+
bqclient = mock.create_autospec(bigquery.Client)
58+
dataset = bigquery.DatasetReference("test-project", "test_dataset")
59+
now = datetime.datetime.now(datetime.timezone.utc)
60+
expiration = datetime.timedelta(days=3)
61+
expected_expires = now + expiration
62+
63+
bigframes.session._io.bigquery.create_temp_table(bqclient, dataset, expiration)
64+
65+
bqclient.create_table.assert_called_once()
66+
call_args = bqclient.create_table.call_args
67+
table = call_args.args[0]
68+
assert table.project == "test-project"
69+
assert table.dataset_id == "test_dataset"
70+
assert table.table_id.startswith("bqdf")
71+
assert (
72+
(expected_expires - datetime.timedelta(minutes=1))
73+
< table.expires
74+
< (expected_expires + datetime.timedelta(minutes=1))
75+
)
76+
77+
5478
@pytest.mark.parametrize(
5579
("schema", "expected"),
5680
(

tests/unit/test_dataframe.py

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# Copyright 2023 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import google.cloud.bigquery
16+
import pytest
17+
18+
from . import resources
19+
20+
21+
def test_dataframe_to_gbq_invalid_destination(monkeypatch: pytest.MonkeyPatch):
22+
dataframe = resources.create_dataframe(monkeypatch)
23+
24+
with pytest.raises(ValueError, match="no_dataset_or_project"):
25+
dataframe.to_gbq("no_dataset_or_project")
26+
27+
28+
def test_dataframe_to_gbq_invalid_if_exists(monkeypatch: pytest.MonkeyPatch):
29+
dataframe = resources.create_dataframe(monkeypatch)
30+
31+
with pytest.raises(ValueError, match="notreallyanoption"):
32+
# Even though the type is annotated with the literals we accept, users
33+
# might not be using a type checker, especially not in an interactive
34+
# notebook.
35+
dataframe.to_gbq(if_exists="notreallyanoption") # type: ignore
36+
37+
38+
def test_dataframe_to_gbq_invalid_if_exists_no_destination(
39+
monkeypatch: pytest.MonkeyPatch,
40+
):
41+
dataframe = resources.create_dataframe(monkeypatch)
42+
43+
with pytest.raises(ValueError, match="append"):
44+
dataframe.to_gbq(if_exists="append")
45+
46+
47+
def test_dataframe_to_gbq_writes_to_anonymous_dataset(
48+
monkeypatch: pytest.MonkeyPatch,
49+
):
50+
anonymous_dataset_id = "my-anonymous-project.my_anonymous_dataset"
51+
anonymous_dataset = google.cloud.bigquery.DatasetReference.from_string(
52+
anonymous_dataset_id
53+
)
54+
session = resources.create_bigquery_session(anonymous_dataset=anonymous_dataset)
55+
dataframe = resources.create_dataframe(monkeypatch, session=session)
56+
57+
destination = dataframe.to_gbq()
58+
59+
assert destination.startswith(anonymous_dataset_id)

tests/unit/test_pandas.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ def test_pandas_attribute():
116116
assert bpd.ArrowDtype is pd.ArrowDtype
117117

118118

119-
def test_close_session_after_bq_session_ended(monkeypatch):
119+
def test_close_session_after_bq_session_ended(monkeypatch: pytest.MonkeyPatch):
120120
bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True)
121121
bqclient.project = "test-project"
122122
session = resources.create_bigquery_session(
@@ -141,7 +141,7 @@ def test_close_session_after_bq_session_ended(monkeypatch):
141141
google.api_core.exceptions.BadRequest,
142142
match="Session JUST_A_TEST has expired and is no longer available.",
143143
):
144-
bpd.read_gbq("SELECT 1")
144+
bpd.read_gbq("SELECT 'ABC'")
145145

146146
# Even though the query to stop the session raises an exception, we should
147147
# still be able to close it without raising an error to the user.

0 commit comments

Comments
 (0)