From 3d7a0d6f7172b8b38ac1825990e612ce03645743 Mon Sep 17 00:00:00 2001 From: Ashley Xu <139821907+ashleyxuu@users.noreply.github.com> Date: Wed, 1 Nov 2023 15:45:08 -0700 Subject: [PATCH 01/10] test: add code snippets for using bigframes.ml (#159) * test: add code snippets for using bigframes.ml --- samples/snippets/clustering_model_test.py | 35 ++++++++++++++ samples/snippets/gen_ai_model_test.py | 39 ++++++++++++++++ samples/snippets/regression_model_test.py | 57 +++++++++++++++++++++++ 3 files changed, 131 insertions(+) create mode 100644 samples/snippets/clustering_model_test.py create mode 100644 samples/snippets/gen_ai_model_test.py create mode 100644 samples/snippets/regression_model_test.py diff --git a/samples/snippets/clustering_model_test.py b/samples/snippets/clustering_model_test.py new file mode 100644 index 0000000000..a407fc7805 --- /dev/null +++ b/samples/snippets/clustering_model_test.py @@ -0,0 +1,35 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def test_clustering_model(): + # [START bigquery_dataframes_clustering_model] + from bigframes.ml.cluster import KMeans + import bigframes.pandas as bpd + + # Load data from BigQuery + query_or_table = "bigquery-public-data.ml_datasets.penguins" + bq_df = bpd.read_gbq(query_or_table) + + # Create the KMeans model + cluster_model = KMeans(n_clusters=10) + cluster_model.fit(bq_df["culmen_length_mm"], bq_df["sex"]) + + # Predict using the model + result = cluster_model.predict(bq_df) + # Score the model + score = cluster_model.score(bq_df) + # [END bigquery_dataframes_clustering_model] + assert result is not None + assert score is not None diff --git a/samples/snippets/gen_ai_model_test.py b/samples/snippets/gen_ai_model_test.py new file mode 100644 index 0000000000..7cbc90d4c0 --- /dev/null +++ b/samples/snippets/gen_ai_model_test.py @@ -0,0 +1,39 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
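# A note on the clustering snippet above: KMeans.fit follows the scikit-learn
# convention, so the second argument (bq_df["sex"]) is presumably accepted only
# for API consistency and does not guide the clustering itself. A minimal
# sketch of fitting on several numeric feature columns instead -- same penguins
# table, illustrative only, not part of the tests in this patch:
from bigframes.ml.cluster import KMeans
import bigframes.pandas as bpd

bq_df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")
features = bq_df[["culmen_length_mm", "culmen_depth_mm", "flipper_length_mm"]]
cluster_model = KMeans(n_clusters=10)
cluster_model.fit(features)  # unsupervised: no label column is needed
clusters = cluster_model.predict(features)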
+ + +def test_llm_model(): + PROJECT_ID = "bigframes-dev" + REGION = "us" + CONN_NAME = "bigframes-ml" + # [START bigquery_dataframes_gen_ai_model] + from bigframes.ml.llm import PaLM2TextGenerator + import bigframes.pandas as bpd + + # Create the LLM model + session = bpd.get_global_session() + connection = f"{PROJECT_ID}.{REGION}.{CONN_NAME}" + model = PaLM2TextGenerator(session=session, connection_name=connection) + + df_api = bpd.read_csv("gs://cloud-samples-data/vertex-ai/bigframe/df.csv") + + # Prepare the prompts and send them to the LLM model for prediction + df_prompt_prefix = "Generate Pandas sample code for DataFrame." + df_prompt = df_prompt_prefix + df_api["API"] + + # Predict using the model + df_pred = model.predict(df_prompt.to_frame(), max_output_tokens=1024) + # [END bigquery_dataframes_gen_ai_model] + assert df_pred["ml_generate_text_llm_result"] is not None + assert df_pred["ml_generate_text_llm_result"].iloc[0] is not None diff --git a/samples/snippets/regression_model_test.py b/samples/snippets/regression_model_test.py new file mode 100644 index 0000000000..7d1bde689c --- /dev/null +++ b/samples/snippets/regression_model_test.py @@ -0,0 +1,57 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
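# A note on the LLM snippet above: predict() expects a DataFrame, hence the
# .to_frame() call, and the prompt column is built by string broadcasting --
# adding a Python str to a Series prefixes every element. A plain-pandas
# sketch of that broadcasting (values illustrative, not part of the tests):
import pandas as pd

api_names = pd.Series(["head", "sort_values"])
prompts = "Generate Pandas sample code for DataFrame." + api_names
print(prompts[0])  # Generate Pandas sample code for DataFrame.head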
+ + +def test_regression_model(): + # [START bigquery_dataframes_regression_model] + from bigframes.ml.linear_model import LinearRegression + import bigframes.pandas as bpd + + # Load data from BigQuery + query_or_table = "bigquery-public-data.ml_datasets.penguins" + bq_df = bpd.read_gbq(query_or_table) + + # Filter down to the data to the Adelie Penguin species + adelie_data = bq_df[bq_df.species == "Adelie Penguin (Pygoscelis adeliae)"] + + # Drop the species column + adelie_data = adelie_data.drop(columns=["species"]) + + # Drop rows with nulls to get training data + training_data = adelie_data.dropna() + + # Specify your feature (or input) columns and the label (or output) column: + feature_columns = training_data[ + ["island", "culmen_length_mm", "culmen_depth_mm", "flipper_length_mm", "sex"] + ] + label_columns = training_data[["body_mass_g"]] + + test_data = adelie_data[adelie_data.body_mass_g.isnull()] + + # Create the linear model + model = LinearRegression() + model.fit(feature_columns, label_columns) + + # Score the model + score = model.score(feature_columns, label_columns) + + # Predict using the model + result = model.predict(test_data) + # [END bigquery_dataframes_regression_model] + assert test_data is not None + assert feature_columns is not None + assert label_columns is not None + assert model is not None + assert score is not None + assert result is not None From b9cb55c5b9354f9ff60de0aad66fe60049876055 Mon Sep 17 00:00:00 2001 From: TrevorBergeron Date: Wed, 1 Nov 2023 17:44:12 -0700 Subject: [PATCH 02/10] feat: add interpolate() to series and dataframe (#157) --- bigframes/core/block_transforms.py | 91 +++++++++++++++++++ bigframes/dataframe.py | 4 + bigframes/series.py | 4 + tests/system/small/test_dataframe.py | 16 ++++ tests/system/small/test_series.py | 26 ++++++ .../bigframes_vendored/pandas/core/frame.py | 37 ++++++++ .../bigframes_vendored/pandas/core/series.py | 32 +++++++ 7 files changed, 210 insertions(+) diff --git a/bigframes/core/block_transforms.py b/bigframes/core/block_transforms.py index 3706bf1681..917edac0de 100644 --- a/bigframes/core/block_transforms.py +++ b/bigframes/core/block_transforms.py @@ -105,6 +105,97 @@ def indicate_duplicates( ) +def interpolate(block: blocks.Block, method: str = "linear") -> blocks.Block: + if method != "linear": + raise NotImplementedError( + f"Only 'linear' interpolate method supported. 
{constants.FEEDBACK_LINK}" + ) + backwards_window = windows.WindowSpec(following=0) + forwards_window = windows.WindowSpec(preceding=0) + + output_column_ids = [] + + original_columns = block.value_columns + original_labels = block.column_labels + block, offsets = block.promote_offsets() + for column in original_columns: + # null in same places column is null + should_interpolate = block._column_type(column) in [ + pd.Float64Dtype(), + pd.Int64Dtype(), + ] + if should_interpolate: + block, notnull = block.apply_unary_op(column, ops.notnull_op) + block, masked_offsets = block.apply_binary_op( + offsets, notnull, ops.partial_arg3(ops.where_op, None) + ) + + block, previous_value = block.apply_window_op( + column, agg_ops.LastNonNullOp(), backwards_window + ) + block, next_value = block.apply_window_op( + column, agg_ops.FirstNonNullOp(), forwards_window + ) + block, previous_value_offset = block.apply_window_op( + masked_offsets, + agg_ops.LastNonNullOp(), + backwards_window, + skip_reproject_unsafe=True, + ) + block, next_value_offset = block.apply_window_op( + masked_offsets, + agg_ops.FirstNonNullOp(), + forwards_window, + skip_reproject_unsafe=True, + ) + + block, prediction_id = _interpolate( + block, + previous_value_offset, + previous_value, + next_value_offset, + next_value, + offsets, + ) + + block, interpolated_column = block.apply_binary_op( + column, prediction_id, ops.fillna_op + ) + # Pandas performs ffill-like behavior to extrapolate forwards + block, interpolated_and_ffilled = block.apply_binary_op( + interpolated_column, previous_value, ops.fillna_op + ) + + output_column_ids.append(interpolated_and_ffilled) + else: + output_column_ids.append(column) + + # Force reproject since used `skip_project_unsafe` perviously + block = block.select_columns(output_column_ids)._force_reproject() + return block.with_column_labels(original_labels) + + +def _interpolate( + block: blocks.Block, + x0_id: str, + y0_id: str, + x1_id: str, + y1_id: str, + xpredict_id: str, +) -> typing.Tuple[blocks.Block, str]: + """Applies linear interpolation equation to predict y values for xpredict.""" + block, x1x0diff = block.apply_binary_op(x1_id, x0_id, ops.sub_op) + block, y1y0diff = block.apply_binary_op(y1_id, y0_id, ops.sub_op) + block, xpredictx0diff = block.apply_binary_op(xpredict_id, x0_id, ops.sub_op) + + block, y1_weight = block.apply_binary_op(y1y0diff, x1x0diff, ops.div_op) + block, y1_part = block.apply_binary_op(xpredictx0diff, y1_weight, ops.mul_op) + + block, prediction_id = block.apply_binary_op(y0_id, y1_part, ops.add_op) + block = block.drop_columns([x1x0diff, y1y0diff, xpredictx0diff, y1_weight, y1_part]) + return block, prediction_id + + def drop_duplicates( block: blocks.Block, columns: typing.Sequence[str], keep: str = "first" ) -> blocks.Block: diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 3369fb4868..ffcaf0d613 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1434,6 +1434,10 @@ def _reindex_columns(self, columns): def reindex_like(self, other: DataFrame, *, validate: typing.Optional[bool] = None): return self.reindex(index=other.index, columns=other.columns, validate=validate) + def interpolate(self, method: str = "linear") -> DataFrame: + result = block_ops.interpolate(self._block, method) + return DataFrame(result) + def fillna(self, value=None) -> DataFrame: return self._apply_binop(value, ops.fillna_op, how="left") diff --git a/bigframes/series.py b/bigframes/series.py index 37d00d16f3..824757cf52 100644 --- a/bigframes/series.py +++ 
b/bigframes/series.py @@ -468,6 +468,10 @@ def replace( ) return Series(block.select_column(result_col)) + def interpolate(self, method: str = "linear") -> Series: + result = block_ops.interpolate(self._block, method) + return Series(result) + def dropna( self, *, diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index c96faa3526..2b710d692a 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -711,6 +711,22 @@ def test_df_dropna(scalars_dfs, axis, how, ignore_index): pandas.testing.assert_frame_equal(bf_result, pd_result) +def test_df_interpolate(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + columns = ["int64_col", "int64_too", "float64_col"] + bf_result = scalars_df[columns].interpolate().to_pandas() + # Pandas can only interpolate on "float64" columns + # https://github.com/pandas-dev/pandas/issues/40252 + pd_result = scalars_pandas_df[columns].astype("float64").interpolate() + + pandas.testing.assert_frame_equal( + bf_result, + pd_result, + check_index_type=False, + check_dtype=False, + ) + + def test_df_fillna(scalars_dfs): scalars_df, scalars_pandas_df = scalars_dfs df = scalars_df[["int64_col", "float64_col"]].fillna(3) diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index 05d8b84185..183ba01c0e 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -272,6 +272,32 @@ def test_series_replace_list_scalar(scalars_dfs): ) +@pytest.mark.parametrize( + ("values",), + ( + ([None, 1, 2, None, None, 16, None],), + ([None, None, 3.6, None],), + ([403.2, None, 352.1, None, None, 111.9],), + ), +) +def test_series_interpolate(values): + pd_series = pd.Series(values) + bf_series = series.Series(pd_series) + + # Pandas can only interpolate on "float64" columns + # https://github.com/pandas-dev/pandas/issues/40252 + pd_result = pd_series.astype("float64").interpolate() + bf_result = bf_series.interpolate().to_pandas() + + # pd uses non-null types, while bf uses nullable types + pd.testing.assert_series_equal( + pd_result, + bf_result, + check_index_type=False, + check_dtype=False, + ) + + @pytest.mark.parametrize( ("ignore_index",), ( diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index 013d170114..12bd053179 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -2756,6 +2756,43 @@ def value_counts( """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def interpolate(self, method: str = "linear"): + """ + Fill NaN values using an interpolation method. + + Args: + method (str, default 'linear'): + Interpolation technique to use. Only 'linear' supported. + 'linear': Ignore the index and treat the values as equally spaced. + This is the only method supported on MultiIndexes. + + Returns: + DataFrame: + Returns the same object type as the caller, interpolated at + some or all ``NaN`` values + + **Examples:** + + >>> import bigframes.pandas as bpd + >>> bpd.options.display.progress_bar = None + + >>> df = bpd.DataFrame({ + ... 'A': [1, 2, 3, None, None, 6], + ... 'B': [None, 6, None, 2, None, 3], + ... 
}) + >>> df.interpolate() + A B + 0 1.0 + 1 2.0 6.0 + 2 3.0 4.0 + 3 4.0 2.0 + 4 5.0 2.5 + 5 6.0 3.0 + + [6 rows x 2 columns] + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def fillna(self, value): """ Fill NA/NaN values using the specified method. diff --git a/third_party/bigframes_vendored/pandas/core/series.py b/third_party/bigframes_vendored/pandas/core/series.py index f0e13e16f5..b569e5699c 100644 --- a/third_party/bigframes_vendored/pandas/core/series.py +++ b/third_party/bigframes_vendored/pandas/core/series.py @@ -916,6 +916,38 @@ def droplevel(self, level, axis): """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def interpolate(self, method: str = "linear"): + """ + Fill NaN values using an interpolation method. + + Args: + method (str, default 'linear'): + Interpolation technique to use. Only 'linear' supported. + 'linear': Ignore the index and treat the values as equally spaced. + This is the only method supported on MultiIndexes. + + Returns: + Series: + Returns the same object type as the caller, interpolated at + some or all ``NaN`` values + + **Examples:** + + >>> import bigframes.pandas as bpd + >>> bpd.options.display.progress_bar = None + + >>> series = bpd.Series([1, 2, 3, None, None, 6]) + >>> series.interpolate() + 0 1.0 + 1 2.0 + 2 3.0 + 3 4.0 + 4 5.0 + 5 6.0 + dtype: Float64 + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def fillna( self, value=None, From e1817c9201ba4ea7fd2f8b6f4a667b010a6fec1b Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 2 Nov 2023 10:55:01 -0500 Subject: [PATCH 03/10] feat: `to_gbq` without a destination table writes to a temporary table (#158) * feat: `to_gbq` without a destination table writes to a temporary table * add unit test covering happy path for to_gbq * update to_gbq docs --- bigframes/constants.py | 4 ++ bigframes/dataframe.py | 47 +++++++++++---- bigframes/session/__init__.py | 11 ++++ bigframes/session/_io/bigquery.py | 20 +++++++ tests/unit/resources.py | 34 ++++++++++- tests/unit/session/test_io_bigquery.py | 26 +++++++- tests/unit/test_dataframe.py | 59 +++++++++++++++++++ tests/unit/test_pandas.py | 4 +- .../bigframes_vendored/pandas/core/frame.py | 40 +++++++++++-- 9 files changed, 225 insertions(+), 20 deletions(-) create mode 100644 tests/unit/test_dataframe.py diff --git a/bigframes/constants.py b/bigframes/constants.py index 90837c79eb..82b48dc967 100644 --- a/bigframes/constants.py +++ b/bigframes/constants.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import datetime + """Constants used across BigQuery DataFrames. This module should not depend on any others in the package. @@ -23,3 +25,5 @@ ) ABSTRACT_METHOD_ERROR_MESSAGE = f"Abstract method. You have likely encountered a bug. Please share this stacktrace and how you reached it with the BigQuery DataFrames team. {FEEDBACK_LINK}" + +DEFAULT_EXPIRATION = datetime.timedelta(days=1) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index ffcaf0d613..4932008f09 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -2289,25 +2289,51 @@ def to_json( def to_gbq( self, - destination_table: str, + destination_table: Optional[str] = None, *, - if_exists: Optional[Literal["fail", "replace", "append"]] = "fail", + if_exists: Optional[Literal["fail", "replace", "append"]] = None, index: bool = True, ordering_id: Optional[str] = None, - ) -> None: - if "." 
not in destination_table: - raise ValueError( - "Invalid Table Name. Should be of the form 'datasetId.tableId' or " - "'projectId.datasetId.tableId'" - ) - + ) -> str: dispositions = { "fail": bigquery.WriteDisposition.WRITE_EMPTY, "replace": bigquery.WriteDisposition.WRITE_TRUNCATE, "append": bigquery.WriteDisposition.WRITE_APPEND, } + + if destination_table is None: + # TODO(swast): If there have been no modifications to the DataFrame + # since the last time it was written (cached), then return that. + # For `read_gbq` nodes, return the underlying table clone. + destination_table = bigframes.session._io.bigquery.create_temp_table( + self._session.bqclient, + self._session._anonymous_dataset, + # TODO(swast): allow custom expiration times, probably via session configuration. + constants.DEFAULT_EXPIRATION, + ) + + if if_exists is not None and if_exists != "replace": + raise ValueError( + f"Got invalid value {repr(if_exists)} for if_exists. " + "When no destination table is specified, a new table is always created. " + "None or 'replace' are the only valid options in this case." + ) + if_exists = "replace" + + if "." not in destination_table: + raise ValueError( + f"Got invalid value for destination_table {repr(destination_table)}. " + "Should be of the form 'datasetId.tableId' or 'projectId.datasetId.tableId'." + ) + + if if_exists is None: + if_exists = "fail" + if if_exists not in dispositions: - raise ValueError("'{0}' is not valid for if_exists".format(if_exists)) + raise ValueError( + f"Got invalid value {repr(if_exists)} for if_exists. " + f"Valid options include None or one of {dispositions.keys()}." + ) job_config = bigquery.QueryJobConfig( write_disposition=dispositions[if_exists], @@ -2318,6 +2344,7 @@ def to_gbq( ) self._run_io_query(index=index, ordering_id=ordering_id, job_config=job_config) + return destination_table def to_numpy( self, dtype=None, copy=False, na_value=None, **kwargs diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 5a61ed534f..a1eae69715 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -223,6 +223,17 @@ def _create_and_bind_bq_session(self): query_job.result() # blocks until finished self._session_id = query_job.session_info.session_id + # The anonymous dataset is used by BigQuery to write query results and + # session tables. BigQuery DataFrames also writes temp tables directly + # to the dataset, no BigQuery Session required. Note: there is a + # different anonymous dataset per location. 
See: + # https://cloud.google.com/bigquery/docs/cached-results#how_cached_results_are_stored + query_destination = query_job.destination + self._anonymous_dataset = bigquery.DatasetReference( + query_destination.project, + query_destination.dataset_id, + ) + self.bqclient.default_query_job_config = bigquery.QueryJobConfig( connection_properties=[ bigquery.ConnectionProperty("session_id", self._session_id) diff --git a/bigframes/session/_io/bigquery.py b/bigframes/session/_io/bigquery.py index d47efbdddc..d200a9a861 100644 --- a/bigframes/session/_io/bigquery.py +++ b/bigframes/session/_io/bigquery.py @@ -18,10 +18,12 @@ import textwrap import types from typing import Dict, Iterable, Union +import uuid import google.cloud.bigquery as bigquery IO_ORDERING_ID = "bqdf_row_nums" +TEMP_TABLE_PREFIX = "bqdf{date}_{random_id}" def create_export_csv_statement( @@ -90,6 +92,24 @@ def create_snapshot_sql( ) +def create_temp_table( + bqclient: bigquery.Client, + dataset: bigquery.DatasetReference, + expiration: datetime.timedelta, +) -> str: + """Create an empty table with an expiration in the desired dataset.""" + now = datetime.datetime.now(datetime.timezone.utc) + random_id = uuid.uuid4().hex + table_id = TEMP_TABLE_PREFIX.format( + date=now.strftime("%Y%m%d"), random_id=random_id + ) + table_ref = dataset.table(table_id) + destination = bigquery.Table(table_ref) + destination.expires = now + expiration + bqclient.create_table(destination) + return f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}" + + # BigQuery REST API returns types in Legacy SQL format # https://cloud.google.com/bigquery/docs/data-types but we use Standard SQL # names diff --git a/tests/unit/resources.py b/tests/unit/resources.py index f660d774f0..8fc8acd175 100644 --- a/tests/unit/resources.py +++ b/tests/unit/resources.py @@ -19,17 +19,21 @@ import google.cloud.bigquery import ibis import pandas +import pytest import bigframes import bigframes.core as core import bigframes.core.ordering +import bigframes.dataframe import bigframes.session.clients """Utilities for creating test resources.""" def create_bigquery_session( - bqclient: Optional[google.cloud.bigquery.Client] = None, session_id: str = "abcxyz" + bqclient: Optional[mock.Mock] = None, + session_id: str = "abcxyz", + anonymous_dataset: Optional[google.cloud.bigquery.DatasetReference] = None, ) -> bigframes.Session: credentials = mock.create_autospec( google.auth.credentials.Credentials, instance=True @@ -39,6 +43,21 @@ def create_bigquery_session( bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) bqclient.project = "test-project" + if anonymous_dataset is None: + anonymous_dataset = google.cloud.bigquery.DatasetReference( + "test-project", + "test_dataset", + ) + + query_job = mock.create_autospec(google.cloud.bigquery.QueryJob) + type(query_job).destination = mock.PropertyMock( + return_value=anonymous_dataset.table("test_table"), + ) + type(query_job).session_info = google.cloud.bigquery.SessionInfo( + {"sessionInfo": {"sessionId": session_id}}, + ) + bqclient.query.return_value = query_job + clients_provider = mock.create_autospec(bigframes.session.clients.ClientsProvider) type(clients_provider).bqclient = mock.PropertyMock(return_value=bqclient) clients_provider._credentials = credentials @@ -51,6 +70,19 @@ def create_bigquery_session( return session +def create_dataframe( + monkeypatch: pytest.MonkeyPatch, session: Optional[bigframes.Session] = None +) -> 
bigframes.dataframe.DataFrame: + if session is None: + session = create_bigquery_session() + + # Since this may create a ReadLocalNode, the session we explicitly pass in + # might not actually be used. Mock out the global session, too. + monkeypatch.setattr(bigframes.core.global_session, "_global_session", session) + bigframes.options.bigquery._session_started = True + return bigframes.dataframe.DataFrame({}, session=session) + + def create_pandas_session(tables: Dict[str, pandas.DataFrame]) -> bigframes.Session: # TODO(tswast): Refactor to make helper available for all tests. Consider # providing a proper "local Session" for use by downstream developers. diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py index d2255d5edf..cb3003b1cc 100644 --- a/tests/unit/session/test_io_bigquery.py +++ b/tests/unit/session/test_io_bigquery.py @@ -14,6 +14,7 @@ import datetime from typing import Iterable +import unittest.mock as mock import google.cloud.bigquery as bigquery import pytest @@ -37,7 +38,7 @@ def test_create_snapshot_sql_doesnt_timetravel_anonymous_datasets(): assert "`my-test-project`.`_e8166e0cdb`.`anonbb92cd`" in sql -def test_create_snapshot_sql_doesnt_timetravel_session_datasets(): +def test_create_snapshot_sql_doesnt_timetravel_session_tables(): table_ref = bigquery.TableReference.from_string("my-test-project._session.abcdefg") sql = bigframes.session._io.bigquery.create_snapshot_sql( @@ -51,6 +52,29 @@ def test_create_snapshot_sql_doesnt_timetravel_session_datasets(): assert "my-test-project" not in sql +def test_create_temp_table_default_expiration(): + """Make sure the created table has an expiration.""" + bqclient = mock.create_autospec(bigquery.Client) + dataset = bigquery.DatasetReference("test-project", "test_dataset") + now = datetime.datetime.now(datetime.timezone.utc) + expiration = datetime.timedelta(days=3) + expected_expires = now + expiration + + bigframes.session._io.bigquery.create_temp_table(bqclient, dataset, expiration) + + bqclient.create_table.assert_called_once() + call_args = bqclient.create_table.call_args + table = call_args.args[0] + assert table.project == "test-project" + assert table.dataset_id == "test_dataset" + assert table.table_id.startswith("bqdf") + assert ( + (expected_expires - datetime.timedelta(minutes=1)) + < table.expires + < (expected_expires + datetime.timedelta(minutes=1)) + ) + + @pytest.mark.parametrize( ("schema", "expected"), ( diff --git a/tests/unit/test_dataframe.py b/tests/unit/test_dataframe.py new file mode 100644 index 0000000000..17a8290889 --- /dev/null +++ b/tests/unit/test_dataframe.py @@ -0,0 +1,59 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import google.cloud.bigquery +import pytest + +from . 
import resources + + +def test_dataframe_to_gbq_invalid_destination(monkeypatch: pytest.MonkeyPatch): + dataframe = resources.create_dataframe(monkeypatch) + + with pytest.raises(ValueError, match="no_dataset_or_project"): + dataframe.to_gbq("no_dataset_or_project") + + +def test_dataframe_to_gbq_invalid_if_exists(monkeypatch: pytest.MonkeyPatch): + dataframe = resources.create_dataframe(monkeypatch) + + with pytest.raises(ValueError, match="notreallyanoption"): + # Even though the type is annotated with the literals we accept, users + # might not be using a type checker, especially not in an interactive + # notebook. + dataframe.to_gbq(if_exists="notreallyanoption") # type: ignore + + +def test_dataframe_to_gbq_invalid_if_exists_no_destination( + monkeypatch: pytest.MonkeyPatch, +): + dataframe = resources.create_dataframe(monkeypatch) + + with pytest.raises(ValueError, match="append"): + dataframe.to_gbq(if_exists="append") + + +def test_dataframe_to_gbq_writes_to_anonymous_dataset( + monkeypatch: pytest.MonkeyPatch, +): + anonymous_dataset_id = "my-anonymous-project.my_anonymous_dataset" + anonymous_dataset = google.cloud.bigquery.DatasetReference.from_string( + anonymous_dataset_id + ) + session = resources.create_bigquery_session(anonymous_dataset=anonymous_dataset) + dataframe = resources.create_dataframe(monkeypatch, session=session) + + destination = dataframe.to_gbq() + + assert destination.startswith(anonymous_dataset_id) diff --git a/tests/unit/test_pandas.py b/tests/unit/test_pandas.py index 5d4f69c7c0..70c5441c68 100644 --- a/tests/unit/test_pandas.py +++ b/tests/unit/test_pandas.py @@ -116,7 +116,7 @@ def test_pandas_attribute(): assert bpd.ArrowDtype is pd.ArrowDtype -def test_close_session_after_bq_session_ended(monkeypatch): +def test_close_session_after_bq_session_ended(monkeypatch: pytest.MonkeyPatch): bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) bqclient.project = "test-project" session = resources.create_bigquery_session( @@ -141,7 +141,7 @@ def test_close_session_after_bq_session_ended(monkeypatch): google.api_core.exceptions.BadRequest, match="Session JUST_A_TEST has expired and is no longer available.", ): - bpd.read_gbq("SELECT 1") + bpd.read_gbq("SELECT 'ABC'") # Even though the query to stop the session raises an exception, we should # still be able to close it without raising an error to the user. diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index 12bd053179..e267fac0f7 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -125,12 +125,12 @@ def to_numpy(self, dtype=None, copy=False, na_value=None, **kwargs) -> np.ndarra def to_gbq( self, - destination_table: str, + destination_table: Optional[str], *, - if_exists: Optional[Literal["fail", "replace", "append"]] = "fail", + if_exists: Optional[Literal["fail", "replace", "append"]] = None, index: bool = True, ordering_id: Optional[str] = None, - ) -> None: + ) -> str: """Write a DataFrame to a BigQuery table. **Examples:** @@ -138,17 +138,40 @@ def to_gbq( >>> import bigframes.pandas as bpd >>> bpd.options.display.progress_bar = None + Write a DataFrame to a BigQuery table. + >>> df = bpd.DataFrame({'col1': [1, 2], 'col2': [3, 4]}) >>> # destination_table = PROJECT_ID + "." + DATASET_ID + "." 
+ TABLE_NAME >>> df.to_gbq("bigframes-dev.birds.test-numbers", if_exists="replace") + 'bigframes-dev.birds.test-numbers' + + Write a DataFrame to a temporary BigQuery table in the anonymous dataset. + + >>> df = bpd.DataFrame({'col1': [1, 2], 'col2': [3, 4]}) + >>> destination = df.to_gbq(ordering_id="ordering_id") + >>> # The table created can be read outside of the current session. + >>> bpd.close_session() # For demonstration, only. + >>> bpd.read_gbq(destination, index_col="ordering_id") + col1 col2 + ordering_id + 0 1 3 + 1 2 4 + + [2 rows x 2 columns] Args: - destination_table (str): + destination_table (Optional[str]): Name of table to be written, in the form ``dataset.tablename`` or ``project.dataset.tablename``. - if_exists (str, default 'fail'): - Behavior when the destination table exists. Value can be one of: + If no ``destination_table`` is set, a new temporary table is + created in the BigQuery anonymous dataset. + + if_exists (Optional[str]): + Behavior when the destination table exists. When + ``destination_table`` is set, this defaults to ``'fail'``. When + ``destination_table`` is not set, this field is not applicable. + A new table is always created. Value can be one of: ``'fail'`` If table exists raise pandas_gbq.gbq.TableCreationError. @@ -163,6 +186,11 @@ def to_gbq( ordering_id (Optional[str], default None): If set, write the ordering of the DataFrame as a column in the result table with this name. + + Returns: + str: + The fully-qualified ID for the written table, in the form + ``project.dataset.tablename``. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) From 5f0ea37fffff792fc3fbed65e6ace846d8ef6a06 Mon Sep 17 00:00:00 2001 From: Ashley Xu <139821907+ashleyxuu@users.noreply.github.com> Date: Thu, 2 Nov 2023 10:49:11 -0700 Subject: [PATCH 04/10] feat: support 32k text-generation and multilingual embedding models (#161) * feat: support 32k text-generation and embedding multilingual models --- bigframes/ml/llm.py | 47 ++++++++++++++++------ tests/system/small/ml/conftest.py | 18 +++++++++ tests/system/small/ml/test_llm.py | 65 +++++++++++++++++++++++++++++++ 3 files changed, 118 insertions(+), 12 deletions(-) diff --git a/bigframes/ml/llm.py b/bigframes/ml/llm.py index d78f467537..2e5a9a1e5e 100644 --- a/bigframes/ml/llm.py +++ b/bigframes/ml/llm.py @@ -16,7 +16,7 @@ from __future__ import annotations -from typing import cast, Optional, Union +from typing import cast, Literal, Optional, Union import bigframes from bigframes import clients, constants @@ -25,9 +25,11 @@ import bigframes.pandas as bpd _REMOTE_TEXT_GENERATOR_MODEL_CODE = "CLOUD_AI_LARGE_LANGUAGE_MODEL_V1" +_REMOTE_TEXT_GENERATOR_32K_MODEL_CODE = "text-bison-32k" _TEXT_GENERATE_RESULT_COLUMN = "ml_generate_text_llm_result" _REMOTE_EMBEDDING_GENERATOR_MODEL_CODE = "CLOUD_AI_TEXT_EMBEDDING_MODEL_V1" +_REMOTE_EMBEDDING_GENERATOR_MUlTILINGUAL_MODEL_CODE = "textembedding-gecko-multilingual" _EMBED_TEXT_RESULT_COLUMN = "text_embedding" @@ -35,19 +37,25 @@ class PaLM2TextGenerator(base.Predictor): """PaLM2 text generator LLM model. Args: + model_name (str, Default to "text-bison"): + The model for natural language tasks. “text-bison” returns model fine-tuned to follow natural language instructions + and is suitable for a variety of language tasks. "text-bison-32k" supports up to 32k tokens per request. + Default to "text-bison". session (bigframes.Session or None): BQ session to create the model. If None, use the global default session. 
connection_name (str or None): - connection to connect with remote service. str of the format ... + Connection to connect with remote service. str of the format ... if None, use default connection in session context. BigQuery DataFrame will try to create the connection and attach permission if the connection isn't fully setup. """ def __init__( self, + model_name: Literal["text-bison", "text-bison-32k"] = "text-bison", session: Optional[bigframes.Session] = None, connection_name: Optional[str] = None, ): + self.model_name = model_name self.session = session or bpd.get_global_session() self._bq_connection_manager = clients.BqConnectionManager( self.session.bqconnectionclient, self.session.resourcemanagerclient @@ -80,11 +88,14 @@ def _create_bqml_model(self): connection_id=connection_name_parts[2], iam_role="aiplatform.user", ) - - options = { - "remote_service_type": _REMOTE_TEXT_GENERATOR_MODEL_CODE, - } - + if self.model_name == "text-bison": + options = { + "remote_service_type": _REMOTE_TEXT_GENERATOR_MODEL_CODE, + } + else: + options = { + "endpoint": _REMOTE_TEXT_GENERATOR_32K_MODEL_CODE, + } return self._bqml_model_factory.create_remote_model( session=self.session, connection_name=self.connection_name, options=options ) @@ -118,7 +129,7 @@ def predict( top_k (int, default 40): Top-k changes how the model selects tokens for output. A top-k of 1 means the selected token is the most probable among all tokens - in the model’s vocabulary (also called greedy decoding), while a top-k of 3 means that the next token is selected from among the 3 most probable tokens (using temperature). + in the model's vocabulary (also called greedy decoding), while a top-k of 3 means that the next token is selected from among the 3 most probable tokens (using temperature). For each token selection step, the top K tokens with the highest probabilities are sampled. Then tokens are further filtered based on topP with the final token selected using temperature sampling. Specify a lower value for less random responses and a higher value for more random responses. Default 40. Possible values [1, 40]. @@ -175,6 +186,10 @@ class PaLM2TextEmbeddingGenerator(base.Predictor): """PaLM2 text embedding generator LLM model. Args: + model_name (str, Default to "textembedding-gecko"): + The model for text embedding. “textembedding-gecko” returns model embeddings for text inputs. + "textembedding-gecko-multilingual" returns model embeddings for text inputs which support over 100 languages + Default to "textembedding-gecko". session (bigframes.Session or None): BQ session to create the model. If None, use the global default session. 
connection_name (str or None): @@ -184,9 +199,13 @@ class PaLM2TextEmbeddingGenerator(base.Predictor): def __init__( self, + model_name: Literal[ + "textembedding-gecko", "textembedding-gecko-multilingual" + ] = "textembedding-gecko", session: Optional[bigframes.Session] = None, connection_name: Optional[str] = None, ): + self.model_name = model_name self.session = session or bpd.get_global_session() self._bq_connection_manager = clients.BqConnectionManager( self.session.bqconnectionclient, self.session.resourcemanagerclient @@ -219,10 +238,14 @@ def _create_bqml_model(self): connection_id=connection_name_parts[2], iam_role="aiplatform.user", ) - - options = { - "remote_service_type": _REMOTE_EMBEDDING_GENERATOR_MODEL_CODE, - } + if self.model_name == "textembedding-gecko": + options = { + "remote_service_type": _REMOTE_EMBEDDING_GENERATOR_MODEL_CODE, + } + else: + options = { + "endpoint": _REMOTE_EMBEDDING_GENERATOR_MUlTILINGUAL_MODEL_CODE, + } return self._bqml_model_factory.create_remote_model( session=self.session, connection_name=self.connection_name, options=options diff --git a/tests/system/small/ml/conftest.py b/tests/system/small/ml/conftest.py index 1dd1c813b8..c11445b79a 100644 --- a/tests/system/small/ml/conftest.py +++ b/tests/system/small/ml/conftest.py @@ -213,6 +213,13 @@ def palm2_text_generator_model(session, bq_connection) -> llm.PaLM2TextGenerator return llm.PaLM2TextGenerator(session=session, connection_name=bq_connection) +@pytest.fixture(scope="session") +def palm2_text_generator_32k_model(session, bq_connection) -> llm.PaLM2TextGenerator: + return llm.PaLM2TextGenerator( + model_name="text-bison-32k", session=session, connection_name=bq_connection + ) + + @pytest.fixture(scope="function") def ephemera_palm2_text_generator_model( session, bq_connection @@ -229,6 +236,17 @@ def palm2_embedding_generator_model( ) +@pytest.fixture(scope="session") +def palm2_embedding_generator_multilingual_model( + session, bq_connection +) -> llm.PaLM2TextEmbeddingGenerator: + return llm.PaLM2TextEmbeddingGenerator( + model_name="textembedding-gecko-multilingual", + session=session, + connection_name=bq_connection, + ) + + @pytest.fixture(scope="session") def time_series_bqml_arima_plus_model( session, time_series_arima_plus_model_name diff --git a/tests/system/small/ml/test_llm.py b/tests/system/small/ml/test_llm.py index b7257dde1b..79d3c40317 100644 --- a/tests/system/small/ml/test_llm.py +++ b/tests/system/small/ml/test_llm.py @@ -26,6 +26,12 @@ def test_create_text_generator_model(palm2_text_generator_model): assert palm2_text_generator_model._bqml_model is not None +def test_create_text_generator_32k_model(palm2_text_generator_32k_model): + # Model creation doesn't return error + assert palm2_text_generator_32k_model is not None + assert palm2_text_generator_32k_model._bqml_model is not None + + @pytest.mark.flaky(retries=2, delay=120) def test_create_text_generator_model_default_session(bq_connection, llm_text_pandas_df): import bigframes.pandas as bpd @@ -48,6 +54,30 @@ def test_create_text_generator_model_default_session(bq_connection, llm_text_pan assert all(series.str.len() > 20) +@pytest.mark.flaky(retries=2, delay=120) +def test_create_text_generator_32k_model_default_session( + bq_connection, llm_text_pandas_df +): + import bigframes.pandas as bpd + + bpd.close_session() + bpd.options.bigquery.bq_connection = bq_connection + bpd.options.bigquery.location = "us" + + model = llm.PaLM2TextGenerator(model_name="text-bison-32k") + assert model is not None + assert 
model._bqml_model is not None + assert model.connection_name.casefold() == "bigframes-dev.us.bigframes-rf-conn" + + llm_text_df = bpd.read_pandas(llm_text_pandas_df) + + df = model.predict(llm_text_df).to_pandas() + TestCase().assertSequenceEqual(df.shape, (3, 1)) + assert "ml_generate_text_llm_result" in df.columns + series = df["ml_generate_text_llm_result"] + assert all(series.str.len() > 20) + + @pytest.mark.flaky(retries=2, delay=120) def test_create_text_generator_model_default_connection(llm_text_pandas_df): from bigframes import _config @@ -127,6 +157,14 @@ def test_create_embedding_generator_model(palm2_embedding_generator_model): assert palm2_embedding_generator_model._bqml_model is not None +def test_create_embedding_generator_multilingual_model( + palm2_embedding_generator_multilingual_model, +): + # Model creation doesn't return error + assert palm2_embedding_generator_multilingual_model is not None + assert palm2_embedding_generator_multilingual_model._bqml_model is not None + + def test_create_text_embedding_generator_model_defaults(bq_connection): import bigframes.pandas as bpd @@ -139,6 +177,20 @@ def test_create_text_embedding_generator_model_defaults(bq_connection): assert model._bqml_model is not None +def test_create_text_embedding_generator_multilingual_model_defaults(bq_connection): + import bigframes.pandas as bpd + + bpd.close_session() + bpd.options.bigquery.bq_connection = bq_connection + bpd.options.bigquery.location = "us" + + model = llm.PaLM2TextEmbeddingGenerator( + model_name="textembedding-gecko-multilingual" + ) + assert model is not None + assert model._bqml_model is not None + + @pytest.mark.flaky(retries=2, delay=120) def test_embedding_generator_predict_success( palm2_embedding_generator_model, llm_text_df @@ -152,6 +204,19 @@ def test_embedding_generator_predict_success( assert value.size == 768 +@pytest.mark.flaky(retries=2, delay=120) +def test_embedding_generator_multilingual_predict_success( + palm2_embedding_generator_multilingual_model, llm_text_df +): + df = palm2_embedding_generator_multilingual_model.predict(llm_text_df).to_pandas() + TestCase().assertSequenceEqual(df.shape, (3, 1)) + assert "text_embedding" in df.columns + series = df["text_embedding"] + value = series[0] + assert isinstance(value, np.ndarray) + assert value.size == 768 + + @pytest.mark.flaky(retries=2, delay=120) def test_embedding_generator_predict_series_success( palm2_embedding_generator_model, llm_text_df From 6d1953b7b46b402a2ea52233141ed6fd338b0098 Mon Sep 17 00:00:00 2001 From: "gcf-owl-bot[bot]" <78513119+gcf-owl-bot[bot]@users.noreply.github.com> Date: Thu, 2 Nov 2023 20:32:49 -0700 Subject: [PATCH 05/10] chore: update docfx minimum Python version (#167) * chore: update docfx minimum Python version Source-Link: https://github.com/googleapis/synthtool/commit/bc07fd415c39853b382bcf8315f8eeacdf334055 Post-Processor: gcr.io/cloud-devrel-public-resources/owlbot-python:latest@sha256:30470597773378105e239b59fce8eb27cc97375580d592699206d17d117143d0 * chore: remove restriction on noxfile.py --------- Co-authored-by: Owl Bot Co-authored-by: Dan Lee <71398022+dandhlee@users.noreply.github.com> --- .github/.OwlBot.lock.yaml | 4 ++-- .github/workflows/docs.yml | 2 +- noxfile.py | 4 +--- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/.github/.OwlBot.lock.yaml b/.github/.OwlBot.lock.yaml index 7f291dbd5f..ec696b558c 100644 --- a/.github/.OwlBot.lock.yaml +++ b/.github/.OwlBot.lock.yaml @@ -13,5 +13,5 @@ # limitations under the 
License. docker: image: gcr.io/cloud-devrel-public-resources/owlbot-python:latest - digest: sha256:4f9b3b106ad0beafc2c8a415e3f62c1a0cc23cabea115dbe841b848f581cfe99 -# created: 2023-10-18T20:26:37.410353675Z + digest: sha256:30470597773378105e239b59fce8eb27cc97375580d592699206d17d117143d0 +# created: 2023-11-03T00:57:07.335914631Z diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index e97d89e484..221806cedf 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -28,7 +28,7 @@ jobs: - name: Setup Python uses: actions/setup-python@v4 with: - python-version: "3.9" + python-version: "3.10" - name: Install nox run: | python -m pip install --upgrade setuptools pip wheel diff --git a/noxfile.py b/noxfile.py index d0bbda80fd..34b055de44 100644 --- a/noxfile.py +++ b/noxfile.py @@ -451,9 +451,7 @@ def docs(session): ) -# docfx doesn't yet support Python 3.10. -# https://github.com/googleapis/sphinx-docfx-yaml/issues/305 -@nox.session(python="3.9") +@nox.session(python=DEFAULT_PYTHON_VERSION) def docfx(session): """Build the docfx yaml files for this library.""" From 031f253890f8a212309097554c3462980654201d Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 3 Nov 2023 13:20:36 -0500 Subject: [PATCH 06/10] fix: use table clone instead of system time for `read_gbq_table` (#109) * fix: use table clone instead of system time for `read_gbq_table` * accept expiration datetime instead of timedelta for easier testing * don't use table clone on _session tables * remove unnecessary assert * add docstrings --- bigframes/constants.py | 2 +- bigframes/dataframe.py | 4 +- bigframes/session/__init__.py | 59 +++++++---------- bigframes/session/_io/bigquery.py | 89 ++++++++++++++++++++------ tests/system/small/test_session.py | 3 - tests/unit/session/test_io_bigquery.py | 64 +++++++++++------- 6 files changed, 137 insertions(+), 84 deletions(-) diff --git a/bigframes/constants.py b/bigframes/constants.py index 82b48dc967..a1ffd2b755 100644 --- a/bigframes/constants.py +++ b/bigframes/constants.py @@ -26,4 +26,4 @@ ABSTRACT_METHOD_ERROR_MESSAGE = f"Abstract method. You have likely encountered a bug. Please share this stacktrace and how you reached it with the BigQuery DataFrames team. {FEEDBACK_LINK}" -DEFAULT_EXPIRATION = datetime.timedelta(days=1) +DEFAULT_EXPIRATION = datetime.timedelta(days=7) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 4932008f09..45dbcdc78d 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -16,6 +16,7 @@ from __future__ import annotations +import datetime import re import textwrap import typing @@ -2309,7 +2310,8 @@ def to_gbq( self._session.bqclient, self._session._anonymous_dataset, # TODO(swast): allow custom expiration times, probably via session configuration. 
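                # (create_temp_table now takes an absolute expiration timestamp
                # rather than a timedelta; DEFAULT_EXPIRATION is also raised
                # from one day to seven in the constants.py hunk above.)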
- constants.DEFAULT_EXPIRATION, + datetime.datetime.now(datetime.timezone.utc) + + constants.DEFAULT_EXPIRATION, ) if if_exists is not None and if_exists != "replace": diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index a1eae69715..12ee91a13a 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -16,6 +16,7 @@ from __future__ import annotations +import datetime import logging import os import re @@ -430,7 +431,9 @@ def _read_gbq_query( index_cols = list(index_col) destination, query_job = self._query_to_destination( - query, index_cols, api_name="read_gbq_query" + query, + index_cols, + api_name=api_name, ) # If there was no destination table, that means the query must have @@ -508,6 +511,12 @@ def _read_gbq_table_to_ibis_with_total_ordering( If we can get a total ordering from the table, such as via primary key column(s), then return those too so that ordering generation can be avoided. + + For tables that aren't already read-only, this creates Create a table + clone so that any changes to the underlying table don't affect the + DataFrame and break our assumptions, especially with regards to unique + index and ordering. See: + https://cloud.google.com/bigquery/docs/table-clones-create """ if table_ref.dataset_id.upper() == "_SESSION": # _SESSION tables aren't supported by the tables.get REST API. @@ -518,15 +527,24 @@ def _read_gbq_table_to_ibis_with_total_ordering( None, ) + now = datetime.datetime.now(datetime.timezone.utc) + destination = bigframes_io.create_table_clone( + table_ref, + self._anonymous_dataset, + # TODO(swast): Allow the default expiration to be configured. + now + constants.DEFAULT_EXPIRATION, + self, + api_name, + ) table_expression = self.ibis_client.table( - table_ref.table_id, - database=f"{table_ref.project}.{table_ref.dataset_id}", + destination.table_id, + database=f"{destination.project}.{destination.dataset_id}", ) # If there are primary keys defined, the query engine assumes these # columns are unique, even if the constraint is not enforced. We make # the same assumption and use these columns as the total ordering keys. - table = self.bqclient.get_table(table_ref) + table = self.bqclient.get_table(destination) # TODO(b/305264153): Use public properties to fetch primary keys once # added to google-cloud-bigquery. @@ -535,23 +553,7 @@ def _read_gbq_table_to_ibis_with_total_ordering( .get("primaryKey", {}) .get("columns") ) - - if not primary_keys: - return table_expression, None - else: - # Read from a snapshot since we won't have to copy the table data to create a total ordering. - job_config = bigquery.QueryJobConfig() - job_config.labels["bigframes-api"] = api_name - current_timestamp = list( - self.bqclient.query( - "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`", - job_config=job_config, - ).result() - )[0][0] - table_expression = self.ibis_client.sql( - bigframes_io.create_snapshot_sql(table_ref, current_timestamp) - ) - return table_expression, primary_keys + return table_expression, primary_keys def _read_gbq_table( self, @@ -662,20 +664,7 @@ def _read_gbq_table( total_ordering_columns=frozenset(index_cols), ) - # We have a total ordering, so query via "time travel" so that - # the underlying data doesn't mutate. - if is_total_ordering: - # Get the timestamp from the job metadata rather than the query - # text so that the query for determining uniqueness of the ID - # columns can be cached. 
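            # The timestamp-based SYSTEM_TIME snapshot below is removed:
            # consistent reads now come from the table clone created when the
            # table expression was built earlier in this function.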
- current_timestamp = query_job.started - - # The job finished, so we should have a start time. - assert current_timestamp is not None - table_expression = self.ibis_client.sql( - bigframes_io.create_snapshot_sql(table_ref, current_timestamp) - ) - else: + if not is_total_ordering: # Make sure when we generate an ordering, the row_number() # coresponds to the index columns. table_expression = table_expression.order_by(index_cols) diff --git a/bigframes/session/_io/bigquery.py b/bigframes/session/_io/bigquery.py index d200a9a861..fd3b1c59a7 100644 --- a/bigframes/session/_io/bigquery.py +++ b/bigframes/session/_io/bigquery.py @@ -14,14 +14,21 @@ """Private module: Helpers for I/O operations.""" +from __future__ import annotations + import datetime import textwrap import types +import typing from typing import Dict, Iterable, Union import uuid import google.cloud.bigquery as bigquery +if typing.TYPE_CHECKING: + import bigframes.session + + IO_ORDERING_ID = "bqdf_row_nums" TEMP_TABLE_PREFIX = "bqdf{date}_{random_id}" @@ -69,43 +76,83 @@ def create_export_data_statement( ) -def create_snapshot_sql( - table_ref: bigquery.TableReference, current_timestamp: datetime.datetime -) -> str: - """Query a table via 'time travel' for consistent reads.""" +def random_table(dataset: bigquery.DatasetReference) -> bigquery.TableReference: + """Generate a random table ID with BigQuery DataFrames prefix. + + Args: + dataset (google.cloud.bigquery.DatasetReference): + The dataset to make the table reference in. Usually the anonymous + dataset for the session. + + Returns: + google.cloud.bigquery.TableReference: + Fully qualified table ID of a table that doesn't exist. + """ + now = datetime.datetime.now(datetime.timezone.utc) + random_id = uuid.uuid4().hex + table_id = TEMP_TABLE_PREFIX.format( + date=now.strftime("%Y%m%d"), random_id=random_id + ) + return dataset.table(table_id) + - # If we have a _SESSION table, assume that it's already a copy. Nothing to do here. - if table_ref.dataset_id.upper() == "_SESSION": - return f"SELECT * FROM `_SESSION`.`{table_ref.table_id}`" +def table_ref_to_sql(table: bigquery.TableReference) -> str: + """Format a table reference as escaped SQL.""" + return f"`{table.project}`.`{table.dataset_id}`.`{table.table_id}`" + +def create_table_clone( + source: bigquery.TableReference, + dataset: bigquery.DatasetReference, + expiration: datetime.datetime, + session: bigframes.session.Session, + api_name: str, +) -> bigquery.TableReference: + """Create a table clone for consistent reads.""" # If we have an anonymous query results table, it can't be modified and # there isn't any BigQuery time travel. - if table_ref.dataset_id.startswith("_"): - return f"SELECT * FROM `{table_ref.project}`.`{table_ref.dataset_id}`.`{table_ref.table_id}`" + if source.dataset_id.startswith("_"): + return source - return textwrap.dedent( + fully_qualified_source_id = table_ref_to_sql(source) + destination = random_table(dataset) + fully_qualified_destination_id = table_ref_to_sql(destination) + + # Include a label so that Dataplex Lineage can identify temporary + # tables that BigQuery DataFrames creates. Googlers: See internal issue + # 296779699. 
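    # For illustration: with a hypothetical source `proj.ds.tbl`, destination
    # `proj.anon.bqdf20231109_abc123`, and api_name "read_gbq_table", the
    # statement built below renders roughly as:
    #
    #   CREATE OR REPLACE TABLE `proj`.`anon`.`bqdf20231109_abc123`
    #   CLONE `proj`.`ds`.`tbl`
    #   OPTIONS(
    #       expiration_timestamp=TIMESTAMP "2023-11-16T00:00:00+00:00",
    #       labels=[
    #           ("source", "bigquery-dataframes-temp"),
    #           ("bigframes-api", 'read_gbq_table')
    #       ]
    #   )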
+ ddl = textwrap.dedent( f""" - SELECT * - FROM `{table_ref.project}`.`{table_ref.dataset_id}`.`{table_ref.table_id}` - FOR SYSTEM_TIME AS OF TIMESTAMP({repr(current_timestamp.isoformat())}) + CREATE OR REPLACE TABLE + {fully_qualified_destination_id} + CLONE {fully_qualified_source_id} + OPTIONS( + expiration_timestamp=TIMESTAMP "{expiration.isoformat()}", + labels=[ + ("source", "bigquery-dataframes-temp"), + ("bigframes-api", {repr(api_name)}) + ] + ) """ ) + job_config = bigquery.QueryJobConfig() + job_config.labels = { + "source": "bigquery-dataframes-temp", + "bigframes-api": api_name, + } + session._start_query(ddl, job_config=job_config) + return destination def create_temp_table( bqclient: bigquery.Client, dataset: bigquery.DatasetReference, - expiration: datetime.timedelta, + expiration: datetime.datetime, ) -> str: """Create an empty table with an expiration in the desired dataset.""" - now = datetime.datetime.now(datetime.timezone.utc) - random_id = uuid.uuid4().hex - table_id = TEMP_TABLE_PREFIX.format( - date=now.strftime("%Y%m%d"), random_id=random_id - ) - table_ref = dataset.table(table_id) + table_ref = random_table(dataset) destination = bigquery.Table(table_ref) - destination.expires = now + expiration + destination.expires = expiration bqclient.create_table(destination) return f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}" diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index bf72e444eb..28486a1269 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -252,9 +252,6 @@ def test_read_gbq_w_primary_keys_table( sorted_result = result.sort_values(primary_keys) pd.testing.assert_frame_equal(result, sorted_result) - # Verify that we're working from a snapshot rather than a copy of the table. - assert "FOR SYSTEM_TIME AS OF TIMESTAMP" in df.sql - @pytest.mark.parametrize( ("query_or_table", "max_results"), diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py index cb3003b1cc..7a8691232b 100644 --- a/tests/unit/session/test_io_bigquery.py +++ b/tests/unit/session/test_io_bigquery.py @@ -19,46 +19,63 @@ import google.cloud.bigquery as bigquery import pytest +import bigframes.session import bigframes.session._io.bigquery -def test_create_snapshot_sql_doesnt_timetravel_anonymous_datasets(): - table_ref = bigquery.TableReference.from_string( +def test_create_table_clone_doesnt_clone_anonymous_datasets(): + session = mock.create_autospec(bigframes.session.Session) + source = bigquery.TableReference.from_string( "my-test-project._e8166e0cdb.anonbb92cd" ) - sql = bigframes.session._io.bigquery.create_snapshot_sql( - table_ref, datetime.datetime.now(datetime.timezone.utc) + destination = bigframes.session._io.bigquery.create_table_clone( + source, + bigquery.DatasetReference("other-project", "other_dataset"), + datetime.datetime(2023, 11, 2, 15, 43, 21, tzinfo=datetime.timezone.utc), + session, + "test_api", ) - # Anonymous query results tables don't support time travel. - assert "SYSTEM_TIME" not in sql + # Anonymous query results tables don't support CLONE + assert destination is source + session._start_query.assert_not_called() - # Need fully-qualified table name. 
- assert "`my-test-project`.`_e8166e0cdb`.`anonbb92cd`" in sql - -def test_create_snapshot_sql_doesnt_timetravel_session_tables(): - table_ref = bigquery.TableReference.from_string("my-test-project._session.abcdefg") - - sql = bigframes.session._io.bigquery.create_snapshot_sql( - table_ref, datetime.datetime.now(datetime.timezone.utc) +def test_create_table_clone_sets_expiration(): + session = mock.create_autospec(bigframes.session.Session) + source = bigquery.TableReference.from_string( + "my-test-project.test_dataset.some_table" ) - # We aren't modifying _SESSION tables, so don't use time travel. - assert "SYSTEM_TIME" not in sql + expiration = datetime.datetime( + 2023, 11, 2, 15, 43, 21, tzinfo=datetime.timezone.utc + ) + bigframes.session._io.bigquery.create_table_clone( + source, + bigquery.DatasetReference("other-project", "other_dataset"), + expiration, + session, + "test_api", + ) - # Don't need the project ID for _SESSION tables. - assert "my-test-project" not in sql + session._start_query.assert_called_once() + call_args = session._start_query.call_args + query = call_args.args[0] + assert "CREATE OR REPLACE TABLE" in query + assert "CLONE" in query + assert f'expiration_timestamp=TIMESTAMP "{expiration.isoformat()}"' in query + assert '("source", "bigquery-dataframes-temp")' in query + assert call_args.kwargs["job_config"].labels["bigframes-api"] == "test_api" def test_create_temp_table_default_expiration(): """Make sure the created table has an expiration.""" bqclient = mock.create_autospec(bigquery.Client) dataset = bigquery.DatasetReference("test-project", "test_dataset") - now = datetime.datetime.now(datetime.timezone.utc) - expiration = datetime.timedelta(days=3) - expected_expires = now + expiration + expiration = datetime.datetime( + 2023, 11, 2, 13, 44, 55, 678901, datetime.timezone.utc + ) bigframes.session._io.bigquery.create_temp_table(bqclient, dataset, expiration) @@ -68,10 +85,11 @@ def test_create_temp_table_default_expiration(): assert table.project == "test-project" assert table.dataset_id == "test_dataset" assert table.table_id.startswith("bqdf") + # TODO(swast): Why isn't the expiration exactly what we set it to? assert ( - (expected_expires - datetime.timedelta(minutes=1)) + (expiration - datetime.timedelta(minutes=1)) < table.expires - < (expected_expires + datetime.timedelta(minutes=1)) + < (expiration + datetime.timedelta(minutes=1)) ) From c065071028c2f4ac80ee7f84dbeb1df385c2a512 Mon Sep 17 00:00:00 2001 From: TrevorBergeron Date: Fri, 3 Nov 2023 13:42:14 -0700 Subject: [PATCH 07/10] feat: add __iter__, iterrows, itertuples, keys methods (#164) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/python-bigquery-dataframes/issues/new/choose) before writing your code! 
That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes # 🦕 --- bigframes/dataframe.py | 18 +++++ bigframes/series.py | 6 ++ tests/system/small/test_dataframe.py | 49 ++++++++++++ .../bigframes_vendored/pandas/core/frame.py | 79 +++++++++++++++++++ .../bigframes_vendored/pandas/core/generic.py | 31 +++++++- 5 files changed, 182 insertions(+), 1 deletion(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 45dbcdc78d..40f12671ae 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -304,6 +304,9 @@ def __len__(self): rows, _ = self.shape return rows + def __iter__(self): + return iter(self.columns) + def astype( self, dtype: Union[bigframes.dtypes.DtypeString, bigframes.dtypes.Dtype], @@ -1477,12 +1480,27 @@ def isin(self, values) -> DataFrame: f"isin(), you passed a [{type(values).__name__}]" ) + def keys(self) -> pandas.Index: + return self.columns + def items(self): column_ids = self._block.value_columns column_labels = self._block.column_labels for col_id, col_label in zip(column_ids, column_labels): yield col_label, bigframes.series.Series(self._block.select_column(col_id)) + def iterrows(self) -> Iterable[tuple[typing.Any, pandas.Series]]: + for df in self.to_pandas_batches(): + for item in df.iterrows(): + yield item + + def itertuples( + self, index: bool = True, name: typing.Optional[str] = "Pandas" + ) -> Iterable[tuple[typing.Any, ...]]: + for df in self.to_pandas_batches(): + for item in df.itertuples(index=index, name=name): + yield item + def dropna( self, *, diff --git a/bigframes/series.py b/bigframes/series.py index 824757cf52..032bdf6c42 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -16,6 +16,7 @@ from __future__ import annotations +import itertools import numbers import textwrap import typing @@ -148,6 +149,11 @@ def _set_internal_query_job(self, query_job: bigquery.QueryJob): def __len__(self): return self.shape[0] + def __iter__(self) -> typing.Iterator: + return itertools.chain.from_iterable( + map(lambda x: x.index, self._block.to_pandas_batches()) + ) + def copy(self) -> Series: return Series(self._block) diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 2b710d692a..bd5930e508 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -803,6 +803,55 @@ def test_apply_series_scalar_callable( pandas.testing.assert_series_equal(bf_result, pd_result) +def test_df_keys( + scalars_df_index, + scalars_pandas_df_index, +): + pandas.testing.assert_index_equal( + scalars_df_index.keys(), scalars_pandas_df_index.keys() + ) + + +def test_df_iter( + scalars_df_index, + scalars_pandas_df_index, +): + for bf_i, df_i in zip(scalars_df_index, scalars_pandas_df_index): + assert bf_i == df_i + + +def test_iterrows( + scalars_df_index, + scalars_pandas_df_index, +): + for (bf_index, bf_series), (pd_index, pd_series) in zip( + scalars_df_index.iterrows(), scalars_pandas_df_index.iterrows() + ): + assert bf_index == pd_index + pandas.testing.assert_series_equal(bf_series, pd_series) + + +@pytest.mark.parametrize( + ( + "index", + "name", + ), + [ + ( + True, + "my_df", + ), + (False, None), + ], +) +def test_itertuples(scalars_df_index, index, name): + # Numeric has slightly different representation as a result of conversions. 
+ bf_tuples = scalars_df_index.itertuples(index, name) + pd_tuples = scalars_df_index.to_pandas().itertuples(index, name) + for bf_tuple, pd_tuple in zip(bf_tuples, pd_tuples): + assert bf_tuple == pd_tuple + + def test_df_isin_list(scalars_dfs): scalars_df, scalars_pandas_df = scalars_dfs values = ["Hello, World!", 55555, 2.51, pd.NA, True] diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index e267fac0f7..6f4f6be35d 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -975,6 +975,85 @@ def isin(self, values): """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def keys(self): + """ + Get the 'info axis'. + + This is index for Series, columns for DataFrame. + + Returns: + Index: Info axis. + + **Examples:** + + >>> import bigframes.pandas as bpd + >>> bpd.options.display.progress_bar = None + + >>> df = bpd.DataFrame({ + ... 'A': [1, 2, 3], + ... 'B': [4, 5, 6], + ... }) + >>> df.keys() + Index(['A', 'B'], dtype='object') + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + + def iterrows(self): + """ + Iterate over DataFrame rows as (index, Series) pairs. + + Yields: + a tuple (index, data) where data contains row values as a Series + + **Examples:** + + >>> import bigframes.pandas as bpd + >>> bpd.options.display.progress_bar = None + >>> df = bpd.DataFrame({ + ... 'A': [1, 2, 3], + ... 'B': [4, 5, 6], + ... }) + >>> index, row = next(df.iterrows()) + >>> index + 0 + >>> row + A 1 + B 4 + Name: 0, dtype: object + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + + def itertuples(self, index: bool = True, name: str | None = "Pandas"): + """ + Iterate over DataFrame rows as namedtuples. + + Args: + index (bool, default True): + If True, return the index as the first element of the tuple. + name (str or None, default "Pandas"): + The name of the returned namedtuples or None to return regular + tuples. + + Returns: + iterator: + An object to iterate over namedtuples for each row in the + DataFrame with the first field possibly being the index and + following fields being the column values. + + + **Examples:** + + >>> import bigframes.pandas as bpd + >>> bpd.options.display.progress_bar = None + >>> df = bpd.DataFrame({ + ... 'A': [1, 2, 3], + ... 'B': [4, 5, 6], + ... }) + >>> next(df.itertuples(name="Pair")) + Pair(Index=0, A=1, B=4) + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def items(self): """ Iterate over (column name, Series) pairs. diff --git a/third_party/bigframes_vendored/pandas/core/generic.py b/third_party/bigframes_vendored/pandas/core/generic.py index 27d2e84537..127efe6a3d 100644 --- a/third_party/bigframes_vendored/pandas/core/generic.py +++ b/third_party/bigframes_vendored/pandas/core/generic.py @@ -1,7 +1,7 @@ # Contains code from https://github.com/pandas-dev/pandas/blob/main/pandas/core/generic.py from __future__ import annotations -from typing import Literal, Optional +from typing import Iterator, Literal, Optional from bigframes import constants from third_party.bigframes_vendored.pandas.core import indexing @@ -35,6 +35,35 @@ def size(self) -> int: """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def __iter__(self) -> Iterator: + """ + Iterate over info axis. + + Returns + iterator: Info axis as iterator. 
+ + **Examples:** + >>> import bigframes.pandas as bpd + >>> bpd.options.display.progress_bar = None + + >>> df = bpd.DataFrame({ + ... 'A': [1, 2, 3], + ... 'B': [4, 5, 6], + ... }) + >>> for x in df: + ... print(x) + A + B + + >>> series = bpd.Series(["a", "b", "c"], index=[10, 20, 30]) + >>> for x in series: + ... print(x) + 10 + 20 + 30 + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + # ------------------------------------------------------------------------- # Unary Methods From dfcc2d3f6918785a3048681c61e0f5f6c99d9d95 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 3 Nov 2023 21:44:23 -0500 Subject: [PATCH 08/10] Revert "fix: use table clone instead of system time for `read_gbq_table` (#109)" (#171) This reverts commit 031f253890f8a212309097554c3462980654201d. --- bigframes/constants.py | 2 +- bigframes/dataframe.py | 4 +- bigframes/session/__init__.py | 59 ++++++++++------- bigframes/session/_io/bigquery.py | 89 ++++++-------------------- tests/system/small/test_session.py | 3 + tests/unit/session/test_io_bigquery.py | 64 +++++++----------- 6 files changed, 84 insertions(+), 137 deletions(-) diff --git a/bigframes/constants.py b/bigframes/constants.py index a1ffd2b755..82b48dc967 100644 --- a/bigframes/constants.py +++ b/bigframes/constants.py @@ -26,4 +26,4 @@ ABSTRACT_METHOD_ERROR_MESSAGE = f"Abstract method. You have likely encountered a bug. Please share this stacktrace and how you reached it with the BigQuery DataFrames team. {FEEDBACK_LINK}" -DEFAULT_EXPIRATION = datetime.timedelta(days=7) +DEFAULT_EXPIRATION = datetime.timedelta(days=1) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 40f12671ae..04a5456e26 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -16,7 +16,6 @@ from __future__ import annotations -import datetime import re import textwrap import typing @@ -2328,8 +2327,7 @@ def to_gbq( self._session.bqclient, self._session._anonymous_dataset, # TODO(swast): allow custom expiration times, probably via session configuration. - datetime.datetime.now(datetime.timezone.utc) - + constants.DEFAULT_EXPIRATION, + constants.DEFAULT_EXPIRATION, ) if if_exists is not None and if_exists != "replace": diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 12ee91a13a..a1eae69715 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -16,7 +16,6 @@ from __future__ import annotations -import datetime import logging import os import re @@ -431,9 +430,7 @@ def _read_gbq_query( index_cols = list(index_col) destination, query_job = self._query_to_destination( - query, - index_cols, - api_name=api_name, + query, index_cols, api_name="read_gbq_query" ) # If there was no destination table, that means the query must have @@ -511,12 +508,6 @@ def _read_gbq_table_to_ibis_with_total_ordering( If we can get a total ordering from the table, such as via primary key column(s), then return those too so that ordering generation can be avoided. - - For tables that aren't already read-only, this creates Create a table - clone so that any changes to the underlying table don't affect the - DataFrame and break our assumptions, especially with regards to unique - index and ordering. See: - https://cloud.google.com/bigquery/docs/table-clones-create """ if table_ref.dataset_id.upper() == "_SESSION": # _SESSION tables aren't supported by the tables.get REST API. 
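For background on what this revert restores: rather than cloning the source table, read_gbq_table goes back to reading through BigQuery time travel, pinning each query to a fixed timestamp. Below is a minimal sketch of that query shape, following the same quoting convention as create_snapshot_sql in this patch; the table name and timestamp are illustrative placeholders, not values taken from the patch.

import datetime

import google.cloud.bigquery as bigquery


def snapshot_sql(table_ref: bigquery.TableReference, at: datetime.datetime) -> str:
    # Pin the read to a fixed point in time so concurrent DML on the base
    # table cannot change what the DataFrame observes.
    return (
        f"SELECT * "
        f"FROM `{table_ref.project}`.`{table_ref.dataset_id}`.`{table_ref.table_id}` "
        f"FOR SYSTEM_TIME AS OF TIMESTAMP('{at.isoformat()}')"
    )


# Hypothetical table reference used only for this example.
table = bigquery.TableReference.from_string("my-project.my_dataset.my_table")
print(snapshot_sql(table, datetime.datetime.now(datetime.timezone.utc)))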
@@ -527,24 +518,15 @@ def _read_gbq_table_to_ibis_with_total_ordering( None, ) - now = datetime.datetime.now(datetime.timezone.utc) - destination = bigframes_io.create_table_clone( - table_ref, - self._anonymous_dataset, - # TODO(swast): Allow the default expiration to be configured. - now + constants.DEFAULT_EXPIRATION, - self, - api_name, - ) table_expression = self.ibis_client.table( - destination.table_id, - database=f"{destination.project}.{destination.dataset_id}", + table_ref.table_id, + database=f"{table_ref.project}.{table_ref.dataset_id}", ) # If there are primary keys defined, the query engine assumes these # columns are unique, even if the constraint is not enforced. We make # the same assumption and use these columns as the total ordering keys. - table = self.bqclient.get_table(destination) + table = self.bqclient.get_table(table_ref) # TODO(b/305264153): Use public properties to fetch primary keys once # added to google-cloud-bigquery. @@ -553,7 +535,23 @@ def _read_gbq_table_to_ibis_with_total_ordering( .get("primaryKey", {}) .get("columns") ) - return table_expression, primary_keys + + if not primary_keys: + return table_expression, None + else: + # Read from a snapshot since we won't have to copy the table data to create a total ordering. + job_config = bigquery.QueryJobConfig() + job_config.labels["bigframes-api"] = api_name + current_timestamp = list( + self.bqclient.query( + "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`", + job_config=job_config, + ).result() + )[0][0] + table_expression = self.ibis_client.sql( + bigframes_io.create_snapshot_sql(table_ref, current_timestamp) + ) + return table_expression, primary_keys def _read_gbq_table( self, @@ -664,7 +662,20 @@ def _read_gbq_table( total_ordering_columns=frozenset(index_cols), ) - if not is_total_ordering: + # We have a total ordering, so query via "time travel" so that + # the underlying data doesn't mutate. + if is_total_ordering: + # Get the timestamp from the job metadata rather than the query + # text so that the query for determining uniqueness of the ID + # columns can be cached. + current_timestamp = query_job.started + + # The job finished, so we should have a start time. + assert current_timestamp is not None + table_expression = self.ibis_client.sql( + bigframes_io.create_snapshot_sql(table_ref, current_timestamp) + ) + else: # Make sure when we generate an ordering, the row_number() # coresponds to the index columns. table_expression = table_expression.order_by(index_cols) diff --git a/bigframes/session/_io/bigquery.py b/bigframes/session/_io/bigquery.py index fd3b1c59a7..d200a9a861 100644 --- a/bigframes/session/_io/bigquery.py +++ b/bigframes/session/_io/bigquery.py @@ -14,21 +14,14 @@ """Private module: Helpers for I/O operations.""" -from __future__ import annotations - import datetime import textwrap import types -import typing from typing import Dict, Iterable, Union import uuid import google.cloud.bigquery as bigquery -if typing.TYPE_CHECKING: - import bigframes.session - - IO_ORDERING_ID = "bqdf_row_nums" TEMP_TABLE_PREFIX = "bqdf{date}_{random_id}" @@ -76,83 +69,43 @@ def create_export_data_statement( ) -def random_table(dataset: bigquery.DatasetReference) -> bigquery.TableReference: - """Generate a random table ID with BigQuery DataFrames prefix. - - Args: - dataset (google.cloud.bigquery.DatasetReference): - The dataset to make the table reference in. Usually the anonymous - dataset for the session. 
- - Returns: - google.cloud.bigquery.TableReference: - Fully qualified table ID of a table that doesn't exist. - """ - now = datetime.datetime.now(datetime.timezone.utc) - random_id = uuid.uuid4().hex - table_id = TEMP_TABLE_PREFIX.format( - date=now.strftime("%Y%m%d"), random_id=random_id - ) - return dataset.table(table_id) - - -def table_ref_to_sql(table: bigquery.TableReference) -> str: - """Format a table reference as escaped SQL.""" - return f"`{table.project}`.`{table.dataset_id}`.`{table.table_id}`" +def create_snapshot_sql( + table_ref: bigquery.TableReference, current_timestamp: datetime.datetime +) -> str: + """Query a table via 'time travel' for consistent reads.""" + # If we have a _SESSION table, assume that it's already a copy. Nothing to do here. + if table_ref.dataset_id.upper() == "_SESSION": + return f"SELECT * FROM `_SESSION`.`{table_ref.table_id}`" -def create_table_clone( - source: bigquery.TableReference, - dataset: bigquery.DatasetReference, - expiration: datetime.datetime, - session: bigframes.session.Session, - api_name: str, -) -> bigquery.TableReference: - """Create a table clone for consistent reads.""" # If we have an anonymous query results table, it can't be modified and # there isn't any BigQuery time travel. - if source.dataset_id.startswith("_"): - return source - - fully_qualified_source_id = table_ref_to_sql(source) - destination = random_table(dataset) - fully_qualified_destination_id = table_ref_to_sql(destination) + if table_ref.dataset_id.startswith("_"): + return f"SELECT * FROM `{table_ref.project}`.`{table_ref.dataset_id}`.`{table_ref.table_id}`" - # Include a label so that Dataplex Lineage can identify temporary - # tables that BigQuery DataFrames creates. Googlers: See internal issue - # 296779699. - ddl = textwrap.dedent( + return textwrap.dedent( f""" - CREATE OR REPLACE TABLE - {fully_qualified_destination_id} - CLONE {fully_qualified_source_id} - OPTIONS( - expiration_timestamp=TIMESTAMP "{expiration.isoformat()}", - labels=[ - ("source", "bigquery-dataframes-temp"), - ("bigframes-api", {repr(api_name)}) - ] - ) + SELECT * + FROM `{table_ref.project}`.`{table_ref.dataset_id}`.`{table_ref.table_id}` + FOR SYSTEM_TIME AS OF TIMESTAMP({repr(current_timestamp.isoformat())}) """ ) - job_config = bigquery.QueryJobConfig() - job_config.labels = { - "source": "bigquery-dataframes-temp", - "bigframes-api": api_name, - } - session._start_query(ddl, job_config=job_config) - return destination def create_temp_table( bqclient: bigquery.Client, dataset: bigquery.DatasetReference, - expiration: datetime.datetime, + expiration: datetime.timedelta, ) -> str: """Create an empty table with an expiration in the desired dataset.""" - table_ref = random_table(dataset) + now = datetime.datetime.now(datetime.timezone.utc) + random_id = uuid.uuid4().hex + table_id = TEMP_TABLE_PREFIX.format( + date=now.strftime("%Y%m%d"), random_id=random_id + ) + table_ref = dataset.table(table_id) destination = bigquery.Table(table_ref) - destination.expires = expiration + destination.expires = now + expiration bqclient.create_table(destination) return f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}" diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index 28486a1269..bf72e444eb 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -252,6 +252,9 @@ def test_read_gbq_w_primary_keys_table( sorted_result = result.sort_values(primary_keys) pd.testing.assert_frame_equal(result, sorted_result) + # 
Verify that we're working from a snapshot rather than a copy of the table. + assert "FOR SYSTEM_TIME AS OF TIMESTAMP" in df.sql + @pytest.mark.parametrize( ("query_or_table", "max_results"), diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py index 7a8691232b..cb3003b1cc 100644 --- a/tests/unit/session/test_io_bigquery.py +++ b/tests/unit/session/test_io_bigquery.py @@ -19,63 +19,46 @@ import google.cloud.bigquery as bigquery import pytest -import bigframes.session import bigframes.session._io.bigquery -def test_create_table_clone_doesnt_clone_anonymous_datasets(): - session = mock.create_autospec(bigframes.session.Session) - source = bigquery.TableReference.from_string( +def test_create_snapshot_sql_doesnt_timetravel_anonymous_datasets(): + table_ref = bigquery.TableReference.from_string( "my-test-project._e8166e0cdb.anonbb92cd" ) - destination = bigframes.session._io.bigquery.create_table_clone( - source, - bigquery.DatasetReference("other-project", "other_dataset"), - datetime.datetime(2023, 11, 2, 15, 43, 21, tzinfo=datetime.timezone.utc), - session, - "test_api", + sql = bigframes.session._io.bigquery.create_snapshot_sql( + table_ref, datetime.datetime.now(datetime.timezone.utc) ) - # Anonymous query results tables don't support CLONE - assert destination is source - session._start_query.assert_not_called() + # Anonymous query results tables don't support time travel. + assert "SYSTEM_TIME" not in sql + # Need fully-qualified table name. + assert "`my-test-project`.`_e8166e0cdb`.`anonbb92cd`" in sql -def test_create_table_clone_sets_expiration(): - session = mock.create_autospec(bigframes.session.Session) - source = bigquery.TableReference.from_string( - "my-test-project.test_dataset.some_table" - ) - expiration = datetime.datetime( - 2023, 11, 2, 15, 43, 21, tzinfo=datetime.timezone.utc - ) - bigframes.session._io.bigquery.create_table_clone( - source, - bigquery.DatasetReference("other-project", "other_dataset"), - expiration, - session, - "test_api", +def test_create_snapshot_sql_doesnt_timetravel_session_tables(): + table_ref = bigquery.TableReference.from_string("my-test-project._session.abcdefg") + + sql = bigframes.session._io.bigquery.create_snapshot_sql( + table_ref, datetime.datetime.now(datetime.timezone.utc) ) - session._start_query.assert_called_once() - call_args = session._start_query.call_args - query = call_args.args[0] - assert "CREATE OR REPLACE TABLE" in query - assert "CLONE" in query - assert f'expiration_timestamp=TIMESTAMP "{expiration.isoformat()}"' in query - assert '("source", "bigquery-dataframes-temp")' in query - assert call_args.kwargs["job_config"].labels["bigframes-api"] == "test_api" + # We aren't modifying _SESSION tables, so don't use time travel. + assert "SYSTEM_TIME" not in sql + + # Don't need the project ID for _SESSION tables. 
+ assert "my-test-project" not in sql def test_create_temp_table_default_expiration(): """Make sure the created table has an expiration.""" bqclient = mock.create_autospec(bigquery.Client) dataset = bigquery.DatasetReference("test-project", "test_dataset") - expiration = datetime.datetime( - 2023, 11, 2, 13, 44, 55, 678901, datetime.timezone.utc - ) + now = datetime.datetime.now(datetime.timezone.utc) + expiration = datetime.timedelta(days=3) + expected_expires = now + expiration bigframes.session._io.bigquery.create_temp_table(bqclient, dataset, expiration) @@ -85,11 +68,10 @@ def test_create_temp_table_default_expiration(): assert table.project == "test-project" assert table.dataset_id == "test_dataset" assert table.table_id.startswith("bqdf") - # TODO(swast): Why isn't the expiration exactly what we set it to? assert ( - (expiration - datetime.timedelta(minutes=1)) + (expected_expires - datetime.timedelta(minutes=1)) < table.expires - < (expiration + datetime.timedelta(minutes=1)) + < (expected_expires + datetime.timedelta(minutes=1)) ) From 4ff26cdf862e9f9b91a3a1d2abfa7fbdf0af9c5b Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 6 Nov 2023 12:35:06 -0600 Subject: [PATCH 09/10] fix: update default temp table expiration to 7 days (#174) --- bigframes/constants.py | 2 +- bigframes/dataframe.py | 4 ++- bigframes/session/__init__.py | 4 ++- bigframes/session/_io/bigquery.py | 36 ++++++++++++++++++++------ tests/unit/session/test_io_bigquery.py | 10 +++---- 5 files changed, 40 insertions(+), 16 deletions(-) diff --git a/bigframes/constants.py b/bigframes/constants.py index 82b48dc967..a1ffd2b755 100644 --- a/bigframes/constants.py +++ b/bigframes/constants.py @@ -26,4 +26,4 @@ ABSTRACT_METHOD_ERROR_MESSAGE = f"Abstract method. You have likely encountered a bug. Please share this stacktrace and how you reached it with the BigQuery DataFrames team. {FEEDBACK_LINK}" -DEFAULT_EXPIRATION = datetime.timedelta(days=1) +DEFAULT_EXPIRATION = datetime.timedelta(days=7) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 04a5456e26..40f12671ae 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -16,6 +16,7 @@ from __future__ import annotations +import datetime import re import textwrap import typing @@ -2327,7 +2328,8 @@ def to_gbq( self._session.bqclient, self._session._anonymous_dataset, # TODO(swast): allow custom expiration times, probably via session configuration. 
- constants.DEFAULT_EXPIRATION, + datetime.datetime.now(datetime.timezone.utc) + + constants.DEFAULT_EXPIRATION, ) if if_exists is not None and if_exists != "replace": diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index a1eae69715..2537e81e19 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -430,7 +430,9 @@ def _read_gbq_query( index_cols = list(index_col) destination, query_job = self._query_to_destination( - query, index_cols, api_name="read_gbq_query" + query, + index_cols, + api_name=api_name, ) # If there was no destination table, that means the query must have diff --git a/bigframes/session/_io/bigquery.py b/bigframes/session/_io/bigquery.py index d200a9a861..06d240fec6 100644 --- a/bigframes/session/_io/bigquery.py +++ b/bigframes/session/_io/bigquery.py @@ -14,6 +14,8 @@ """Private module: Helpers for I/O operations.""" +from __future__ import annotations + import datetime import textwrap import types @@ -69,6 +71,29 @@ def create_export_data_statement( ) +def random_table(dataset: bigquery.DatasetReference) -> bigquery.TableReference: + """Generate a random table ID with BigQuery DataFrames prefix. + Args: + dataset (google.cloud.bigquery.DatasetReference): + The dataset to make the table reference in. Usually the anonymous + dataset for the session. + Returns: + google.cloud.bigquery.TableReference: + Fully qualified table ID of a table that doesn't exist. + """ + now = datetime.datetime.now(datetime.timezone.utc) + random_id = uuid.uuid4().hex + table_id = TEMP_TABLE_PREFIX.format( + date=now.strftime("%Y%m%d"), random_id=random_id + ) + return dataset.table(table_id) + + +def table_ref_to_sql(table: bigquery.TableReference) -> str: + """Format a table reference as escaped SQL.""" + return f"`{table.project}`.`{table.dataset_id}`.`{table.table_id}`" + + def create_snapshot_sql( table_ref: bigquery.TableReference, current_timestamp: datetime.datetime ) -> str: @@ -95,17 +120,12 @@ def create_snapshot_sql( def create_temp_table( bqclient: bigquery.Client, dataset: bigquery.DatasetReference, - expiration: datetime.timedelta, + expiration: datetime.datetime, ) -> str: """Create an empty table with an expiration in the desired dataset.""" - now = datetime.datetime.now(datetime.timezone.utc) - random_id = uuid.uuid4().hex - table_id = TEMP_TABLE_PREFIX.format( - date=now.strftime("%Y%m%d"), random_id=random_id - ) - table_ref = dataset.table(table_id) + table_ref = random_table(dataset) destination = bigquery.Table(table_ref) - destination.expires = now + expiration + destination.expires = expiration bqclient.create_table(destination) return f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}" diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py index cb3003b1cc..03470208e4 100644 --- a/tests/unit/session/test_io_bigquery.py +++ b/tests/unit/session/test_io_bigquery.py @@ -56,9 +56,9 @@ def test_create_temp_table_default_expiration(): """Make sure the created table has an expiration.""" bqclient = mock.create_autospec(bigquery.Client) dataset = bigquery.DatasetReference("test-project", "test_dataset") - now = datetime.datetime.now(datetime.timezone.utc) - expiration = datetime.timedelta(days=3) - expected_expires = now + expiration + expiration = datetime.datetime( + 2023, 11, 2, 13, 44, 55, 678901, datetime.timezone.utc + ) bigframes.session._io.bigquery.create_temp_table(bqclient, dataset, expiration) @@ -69,9 +69,9 @@ def test_create_temp_table_default_expiration(): 
assert table.dataset_id == "test_dataset" assert table.table_id.startswith("bqdf") assert ( - (expected_expires - datetime.timedelta(minutes=1)) + (expiration - datetime.timedelta(minutes=1)) < table.expires - < (expected_expires + datetime.timedelta(minutes=1)) + < (expiration + datetime.timedelta(minutes=1)) ) From 8b6b1c610750e2d75d465a36f829af0b8c835b19 Mon Sep 17 00:00:00 2001 From: "release-please[bot]" <55107282+release-please[bot]@users.noreply.github.com> Date: Tue, 7 Nov 2023 16:56:16 +0000 Subject: [PATCH 10/10] chore(main): release 0.13.0 (#165) :robot: I have created a release *beep* *boop* --- ## [0.13.0](https://togithub.com/googleapis/python-bigquery-dataframes/compare/v0.12.0...v0.13.0) (2023-11-07) ### Features * `to_gbq` without a destination table writes to a temporary table ([#158](https://togithub.com/googleapis/python-bigquery-dataframes/issues/158)) ([e1817c9](https://togithub.com/googleapis/python-bigquery-dataframes/commit/e1817c9201ba4ea7fd2f8b6f4a667b010a6fec1b)) * Add `DataFrame.__iter__`, `DataFrame.iterrows`, `DataFrame.itertuples`, and `DataFrame.keys` methods ([#164](https://togithub.com/googleapis/python-bigquery-dataframes/issues/164)) ([c065071](https://togithub.com/googleapis/python-bigquery-dataframes/commit/c065071028c2f4ac80ee7f84dbeb1df385c2a512)) * Add `Series.__iter__` method ([#164](https://togithub.com/googleapis/python-bigquery-dataframes/issues/164)) ([c065071](https://togithub.com/googleapis/python-bigquery-dataframes/commit/c065071028c2f4ac80ee7f84dbeb1df385c2a512)) * Add interpolate() to series and dataframe ([#157](https://togithub.com/googleapis/python-bigquery-dataframes/issues/157)) ([b9cb55c](https://togithub.com/googleapis/python-bigquery-dataframes/commit/b9cb55c5b9354f9ff60de0aad66fe60049876055)) * Support 32k text-generation and multilingual embedding models ([#161](https://togithub.com/googleapis/python-bigquery-dataframes/issues/161)) ([5f0ea37](https://togithub.com/googleapis/python-bigquery-dataframes/commit/5f0ea37fffff792fc3fbed65e6ace846d8ef6a06)) ### Bug Fixes * Update default temp table expiration to 7 days ([#174](https://togithub.com/googleapis/python-bigquery-dataframes/issues/174)) ([4ff26cd](https://togithub.com/googleapis/python-bigquery-dataframes/commit/4ff26cdf862e9f9b91a3a1d2abfa7fbdf0af9c5b)) --- This PR was generated with [Release Please](https://togithub.com/googleapis/release-please). See [documentation](https://togithub.com/googleapis/release-please#release-please). 
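As a quick usage sketch of the iteration methods called out in these release notes: the toy DataFrame below is made up for illustration, and running it assumes an authenticated BigQuery session, as in the docstring examples earlier in this series.

import bigframes.pandas as bpd

bpd.options.display.progress_bar = None

df = bpd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})

# __iter__ and keys() both expose the column labels (the "info axis").
assert list(df) == list(df.keys())

# iterrows() yields (index, pandas.Series) pairs, materialized in batches.
for idx, row in df.iterrows():
    print(idx, row["A"], row["B"])

# itertuples() yields namedtuples; pass name=None for plain tuples.
for pair in df.itertuples(index=True, name="Pair"):
    print(pair.Index, pair.A, pair.B)

# Series.__iter__ walks the index values, also batch by batch.
s = bpd.Series(["a", "b", "c"], index=[10, 20, 30])
print(list(s))  # [10, 20, 30]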
--- CHANGELOG.md | 16 ++++++++++++++++ bigframes/version.py | 2 +- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 845d3634bc..fc327b2e96 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,22 @@ [1]: https://pypi.org/project/bigframes/#history +## [0.13.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v0.12.0...v0.13.0) (2023-11-07) + + +### Features + +* `to_gbq` without a destination table writes to a temporary table ([#158](https://github.com/googleapis/python-bigquery-dataframes/issues/158)) ([e1817c9](https://github.com/googleapis/python-bigquery-dataframes/commit/e1817c9201ba4ea7fd2f8b6f4a667b010a6fec1b)) +* Add `DataFrame.__iter__`, `DataFrame.iterrows`, `DataFrame.itertuples`, and `DataFrame.keys` methods ([#164](https://github.com/googleapis/python-bigquery-dataframes/issues/164)) ([c065071](https://github.com/googleapis/python-bigquery-dataframes/commit/c065071028c2f4ac80ee7f84dbeb1df385c2a512)) +* Add `Series.__iter__` method ([#164](https://github.com/googleapis/python-bigquery-dataframes/issues/164)) ([c065071](https://github.com/googleapis/python-bigquery-dataframes/commit/c065071028c2f4ac80ee7f84dbeb1df385c2a512)) +* Add interpolate() to series and dataframe ([#157](https://github.com/googleapis/python-bigquery-dataframes/issues/157)) ([b9cb55c](https://github.com/googleapis/python-bigquery-dataframes/commit/b9cb55c5b9354f9ff60de0aad66fe60049876055)) +* Support 32k text-generation and multilingual embedding models ([#161](https://github.com/googleapis/python-bigquery-dataframes/issues/161)) ([5f0ea37](https://github.com/googleapis/python-bigquery-dataframes/commit/5f0ea37fffff792fc3fbed65e6ace846d8ef6a06)) + + +### Bug Fixes + +* Update default temp table expiration to 7 days ([#174](https://github.com/googleapis/python-bigquery-dataframes/issues/174)) ([4ff26cd](https://github.com/googleapis/python-bigquery-dataframes/commit/4ff26cdf862e9f9b91a3a1d2abfa7fbdf0af9c5b)) + ## [0.12.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v0.11.0...v0.12.0) (2023-11-01) diff --git a/bigframes/version.py b/bigframes/version.py index b324ed7234..0a5df27479 100644 --- a/bigframes/version.py +++ b/bigframes/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "0.12.0" +__version__ = "0.13.0"
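To make the temp-table expiration fix concrete: after this release, the private create_temp_table helper receives an absolute datetime, so callers add the 7-day DEFAULT_EXPIRATION to the current time themselves. A minimal sketch with a mocked client, in the style of the unit tests above; the project and dataset names are placeholders.

import datetime
from unittest import mock

import google.cloud.bigquery as bigquery

import bigframes.constants
import bigframes.session._io.bigquery

bqclient = mock.create_autospec(bigquery.Client)
dataset = bigquery.DatasetReference("test-project", "test_dataset")

# The caller computes an absolute expiration from the 7-day default...
expiration = (
    datetime.datetime.now(datetime.timezone.utc)
    + bigframes.constants.DEFAULT_EXPIRATION
)

# ...and the helper assigns it to Table.expires unchanged, creating an
# empty table in the given dataset with a bqdf{date}_{random_id} name.
full_table_id = bigframes.session._io.bigquery.create_temp_table(
    bqclient, dataset, expiration
)
print(full_table_id)  # e.g. "test-project.test_dataset.bqdf20231107_<random hex>"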