Skip to content

Commit 1d385be

Browse files
authored
fix: create session dataset for remote functions only when needed (#94)
With this change BigFrames will not create a dataset upfront at the time of session creation, but instead leave it to the components which need the dataset to create it. Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/python-bigquery-dataframes/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes #<issue_number_goes_here> 🦕
1 parent d39134d commit 1d385be

File tree

5 files changed

+90
-27
lines changed

5 files changed

+90
-27
lines changed

bigframes/remote_function.py

+28-11
Original file line numberDiff line numberDiff line change
@@ -202,10 +202,22 @@ def create_bq_remote_function(
202202
OPTIONS (
203203
endpoint = "{endpoint}"
204204
)"""
205+
205206
logger.info(f"Creating BQ remote function: {create_function_ddl}")
207+
208+
# Make sure the dataset exists
209+
dataset = bigquery.Dataset(
210+
bigquery.DatasetReference.from_string(
211+
self._bq_dataset, default_project=self._gcp_project_id
212+
)
213+
)
214+
dataset.location = self._bq_location
215+
self._bq_client.create_dataset(dataset, exists_ok=True)
216+
206217
# TODO: Use session._start_query() so we get progress bar
207218
query_job = self._bq_client.query(create_function_ddl) # Make an API request.
208219
query_job.result() # Wait for the job to complete.
220+
209221
logger.info(f"Created remote function {query_job.ddl_target_routine}")
210222

211223
def get_cloud_function_fully_qualified_parent(self):
@@ -465,17 +477,22 @@ def get_remote_function_specs(self, remote_function_name):
465477
routines = self._bq_client.list_routines(
466478
f"{self._gcp_project_id}.{self._bq_dataset}"
467479
)
468-
for routine in routines:
469-
if routine.reference.routine_id == remote_function_name:
470-
# TODO(shobs): Use first class properties when they are available
471-
# https://github.com/googleapis/python-bigquery/issues/1552
472-
rf_options = routine._properties.get("remoteFunctionOptions")
473-
if rf_options:
474-
http_endpoint = rf_options.get("endpoint")
475-
bq_connection = rf_options.get("connection")
476-
if bq_connection:
477-
bq_connection = os.path.basename(bq_connection)
478-
break
480+
try:
481+
for routine in routines:
482+
if routine.reference.routine_id == remote_function_name:
483+
# TODO(shobs): Use first class properties when they are available
484+
# https://github.com/googleapis/python-bigquery/issues/1552
485+
rf_options = routine._properties.get("remoteFunctionOptions")
486+
if rf_options:
487+
http_endpoint = rf_options.get("endpoint")
488+
bq_connection = rf_options.get("connection")
489+
if bq_connection:
490+
bq_connection = os.path.basename(bq_connection)
491+
break
492+
except google.api_core.exceptions.NotFound:
493+
# The dataset might not exist, in which case the http_endpoint doesn't, either.
494+
# Note: list_routines doesn't make an API request until we iterate on the response object.
495+
pass
479496
return (http_endpoint, bq_connection)
480497

481498

bigframes/session.py

+1-6
Original file line numberDiff line numberDiff line change
@@ -381,17 +381,12 @@ def _create_and_bind_bq_session(self):
381381
]
382382
)
383383

384-
# Dataset for storing BQML models and remote functions, which don't yet
384+
# Dataset for storing remote functions, which don't yet
385385
# support proper session temporary storage
386386
self._session_dataset = bigquery.Dataset(
387387
f"{self.bqclient.project}.bigframes_temp_{self._location.lower().replace('-', '_')}"
388388
)
389389
self._session_dataset.location = self._location
390-
self._session_dataset.default_table_expiration_ms = 24 * 60 * 60 * 1000
391-
392-
# TODO: handle case when the dataset does not exist and the user does
393-
# not have permission to create one (bigquery.datasets.create IAM)
394-
self.bqclient.create_dataset(self._session_dataset, exists_ok=True)
395390

396391
def close(self):
397392
"""Terminated the BQ session, otherwises the session will be terminated automatically after

tests/system/conftest.py

+18-5
Original file line numberDiff line numberDiff line change
@@ -134,15 +134,28 @@ def cleanup_datasets(bigquery_client: bigquery.Client) -> None:
134134
)
135135

136136

137+
def get_dataset_id(project_id: str):
138+
"Get a fully qualified dataset id belonging to the given project."
139+
dataset_id = f"{project_id}.{prefixer.create_prefix()}_dataset_id"
140+
return dataset_id
141+
142+
137143
@pytest.fixture(scope="session")
138144
def dataset_id(bigquery_client: bigquery.Client):
139145
"""Create (and cleanup) a temporary dataset."""
140-
project_id = bigquery_client.project
141-
dataset_id = f"{project_id}.{prefixer.create_prefix()}_dataset_id"
142-
dataset = bigquery.Dataset(dataset_id)
143-
bigquery_client.create_dataset(dataset)
146+
dataset_id = get_dataset_id(bigquery_client.project)
147+
bigquery_client.create_dataset(dataset_id)
148+
yield dataset_id
149+
bigquery_client.delete_dataset(dataset_id, delete_contents=True)
150+
151+
152+
@pytest.fixture
153+
def dataset_id_not_created(bigquery_client: bigquery.Client):
154+
"""Return a temporary dataset object without creating it, and clean it up
155+
after it has been used."""
156+
dataset_id = get_dataset_id(bigquery_client.project)
144157
yield dataset_id
145-
bigquery_client.delete_dataset(dataset, delete_contents=True)
158+
bigquery_client.delete_dataset(dataset_id, delete_contents=True)
146159

147160

148161
@pytest.fixture(scope="session")

tests/system/large/test_remote_function.py

+43
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,49 @@ def add_one(x):
408408
)
409409

410410

411+
@pytest.mark.flaky(retries=2, delay=120)
412+
def test_remote_function_explicit_dataset_not_created(
413+
session, scalars_dfs, dataset_id_not_created, bq_cf_connection, functions_client
414+
):
415+
try:
416+
417+
@session.remote_function(
418+
[int],
419+
int,
420+
dataset_id_not_created,
421+
bq_cf_connection,
422+
reuse=False,
423+
)
424+
def square(x):
425+
return x * x
426+
427+
scalars_df, scalars_pandas_df = scalars_dfs
428+
429+
bf_int64_col = scalars_df["int64_col"]
430+
bf_int64_col_filter = bf_int64_col.notnull()
431+
bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter]
432+
bf_result_col = bf_int64_col_filtered.apply(square)
433+
bf_result = (
434+
bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas()
435+
)
436+
437+
pd_int64_col = scalars_pandas_df["int64_col"]
438+
pd_int64_col_filter = pd_int64_col.notnull()
439+
pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter]
440+
pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x)
441+
# TODO(shobs): Figure why pandas .apply() changes the dtype, i.e.
442+
# pd_int64_col_filtered.dtype is Int64Dtype()
443+
# pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64.
444+
# For this test let's force the pandas dtype to be same as bigframes' dtype.
445+
pd_result_col = pd_result_col.astype(pandas.Int64Dtype())
446+
pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col)
447+
448+
assert_pandas_df_equal_ignore_ordering(bf_result, pd_result)
449+
finally:
450+
# clean up the gcp assets created for the remote function
451+
cleanup_remote_function_assets(session.bqclient, functions_client, square)
452+
453+
411454
@pytest.mark.flaky(retries=2, delay=120)
412455
def test_remote_udf_referring_outside_var(
413456
session, scalars_dfs, dataset_id, bq_cf_connection, functions_client

tests/system/small/test_session.py

-5
Original file line numberDiff line numberDiff line change
@@ -894,11 +894,6 @@ def test_session_id(session):
894894
# TODO(chelsealin): Verify the session id can be bound with a load job.
895895

896896

897-
def test_session_dataset_exists_and_configured(session: bigframes.Session):
898-
dataset = session.bqclient.get_dataset(session._session_dataset_id)
899-
assert dataset.default_table_expiration_ms == 24 * 60 * 60 * 1000
900-
901-
902897
@pytest.mark.flaky(retries=2)
903898
def test_to_close_session():
904899
session = bigframes.Session()

0 commit comments

Comments
 (0)