feat: add streaming.StreamingDataFrame class #864

Merged: 15 commits, Jul 31, 2024
6 changes: 4 additions & 2 deletions bigframes/core/blocks.py
@@ -2354,7 +2354,7 @@ def is_monotonic_decreasing(
         return self._is_monotonic(column_id, increasing=False)
 
     def to_sql_query(
-        self, include_index: bool
+        self, include_index: bool, enable_cache: bool = True
     ) -> typing.Tuple[str, list[str], list[Label]]:
         """
         Compiles this DataFrame's expression tree to SQL, optionally
@@ -2388,7 +2388,9 @@ def to_sql_query(
             # the BigQuery unicode column name feature?
             substitutions[old_id] = new_id
 
-        sql = self.session._to_sql(array_value, col_id_overrides=substitutions)
+        sql = self.session._to_sql(
+            array_value, col_id_overrides=substitutions, enable_cache=enable_cache
+        )
         return (
             sql,
             new_ids[: len(idx_labels)],
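For context, the new enable_cache parameter is threaded from Block.to_sql_query through DataFrame._to_sql_query and into Session._to_sql (both changed below). A rough sketch of the effect, assuming an authenticated BigQuery session; _to_sql_query is an internal helper, and the table and filter here are illustrative only, not taken from this PR:

import bigframes.pandas as bpd

df = bpd.read_gbq_table("bigquery-public-data.ml_datasets.penguins")
df = df[df["body_mass_g"] > 4000]

# Default behaviour: compilation may substitute previously cached executions.
sql_cached, _, _ = df._to_sql_query(include_index=False)

# enable_cache=False compiles the expression tree against the source table only,
# which is what the streaming path below relies on.
sql_uncached, _, _ = df._to_sql_query(include_index=False, enable_cache=False)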
8 changes: 6 additions & 2 deletions bigframes/dataframe.py
@@ -105,6 +105,8 @@ def guarded_meth(df: DataFrame, *args, **kwargs):
 @log_adapter.class_logger
 class DataFrame(vendored_pandas_frame.DataFrame):
     __doc__ = vendored_pandas_frame.DataFrame.__doc__
+    # Internal flag to disable caching entirely.
+    _disable_cache_override: bool = False
 
     def __init__(
         self,
@@ -367,7 +369,7 @@ def astype(
         return self._apply_unary_op(ops.AsTypeOp(to_type=dtype))
 
     def _to_sql_query(
-        self, include_index: bool
+        self, include_index: bool, enable_cache: bool = True
     ) -> Tuple[str, list[str], list[blocks.Label]]:
         """Compiles this DataFrame's expression tree to SQL, optionally
         including index columns.
@@ -381,7 +383,7 @@ def _to_sql_query(
         If include_index is set to False, index_column_id_list and index_column_label_list
         return empty lists.
         """
-        return self._block.to_sql_query(include_index)
+        return self._block.to_sql_query(include_index, enable_cache=enable_cache)
 
     @property
     def sql(self) -> str:
@@ -3628,6 +3630,8 @@ def _cached(self, *, force: bool = False) -> DataFrame:
         No-op if the dataframe represents a trivial transformation of an existing materialization.
         Force=True is used for BQML integration, where we need to copy data rather than use a snapshot.
         """
+        if self._disable_cache_override:
+            return self
         self._block.cached(force=force)
         return self
 
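The _disable_cache_override flag turns _cached() into a no-op, presumably so the streaming wrapper always compiles against the live table rather than a materialized snapshot. A minimal sketch of the behaviour, assuming an authenticated session; setting the flag directly like this is only for illustration (the streaming code path sets it internally):

import bigframes.pandas as bpd

df = bpd.read_gbq_table("bigquery-public-data.ml_datasets.penguins")

# Normally _cached(force=True) materializes a snapshot of the current expression.
df._cached(force=True)

# With the override set, _cached() returns immediately without materializing anything.
df._disable_cache_override = True
assert df._cached() is df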
51 changes: 44 additions & 7 deletions bigframes/session/__init__.py
@@ -107,6 +107,7 @@
     import bigframes.core.indexes
     import bigframes.dataframe as dataframe
     import bigframes.series
+    import bigframes.streaming.dataframe as streaming_dataframe
 
 _BIGFRAMES_DEFAULT_CONNECTION_ID = "bigframes-default-connection"
 
@@ -749,6 +750,38 @@ def read_gbq_table(
             filters=filters,
         )
 
+    def read_gbq_table_streaming(
+        self, table: str
+    ) -> streaming_dataframe.StreamingDataFrame:
+        """Turn a BigQuery table into a StreamingDataFrame.
+
+        Note: The bigframes.streaming module is a preview feature, and subject to change.
+
+        **Examples:**
+
+            >>> import bigframes.streaming as bst
+            >>> import bigframes.pandas as bpd
+            >>> bpd.options.display.progress_bar = None
+
+            >>> sdf = bst.read_gbq_table("bigquery-public-data.ml_datasets.penguins")
+        """
+        warnings.warn(
+            "The bigframes.streaming module is a preview feature, and subject to change.",
+            stacklevel=1,
+            category=bigframes.exceptions.PreviewWarning,
+        )
+
+        import bigframes.streaming.dataframe as streaming_dataframe
+
+        df = self._read_gbq_table(
+            table,
+            api_name="read_gbq_table_streaming",
+            enable_snapshot=False,
+            index_col=bigframes.enums.DefaultIndexKind.NULL,
+        )
+
+        return streaming_dataframe.StreamingDataFrame._from_table_df(df)
+
     def _read_gbq_table(
         self,
         query: str,
@@ -759,6 +792,7 @@ def _read_gbq_table(
         api_name: str,
         use_cache: bool = True,
         filters: third_party_pandas_gbq.FiltersType = (),
+        enable_snapshot: bool = True,
     ) -> dataframe.DataFrame:
         import bigframes.dataframe as dataframe
 
@@ -877,7 +911,7 @@ def _read_gbq_table(
             else (*columns, *[col for col in index_cols if col not in columns])
         )
 
-        supports_snapshot = bf_read_gbq_table.validate_table(
+        enable_snapshot = enable_snapshot and bf_read_gbq_table.validate_table(
             self.bqclient, table_ref, all_columns, time_travel_timestamp, filter_str
         )
 
@@ -905,7 +939,7 @@ def _read_gbq_table(
             table,
             schema=schema,
             predicate=filter_str,
-            at_time=time_travel_timestamp if supports_snapshot else None,
+            at_time=time_travel_timestamp if enable_snapshot else None,
             primary_key=index_cols if is_index_unique else (),
             session=self,
         )
@@ -2056,17 +2090,20 @@ def _to_sql(
         offset_column: typing.Optional[str] = None,
         col_id_overrides: typing.Mapping[str, str] = {},
         ordered: bool = False,
+        enable_cache: bool = True,
     ) -> str:
         if offset_column:
             array_value = array_value.promote_offsets(offset_column)
-        node_w_cached = self._with_cached_executions(array_value.node)
+        node = (
+            self._with_cached_executions(array_value.node)
+            if enable_cache
+            else array_value.node
+        )
         if ordered:
             return self._compiler.compile_ordered(
-                node_w_cached, col_id_overrides=col_id_overrides
+                node, col_id_overrides=col_id_overrides
             )
-        return self._compiler.compile_unordered(
-            node_w_cached, col_id_overrides=col_id_overrides
-        )
+        return self._compiler.compile_unordered(node, col_id_overrides=col_id_overrides)
 
     def _get_table_size(self, destination_table):
         table = self.bqclient.get_table(destination_table)
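Taken together, read_gbq_table_streaming reads the table without a time-travel snapshot, uses a NULL default index, and returns a StreamingDataFrame that never caches and compiles its SQL with enable_cache=False. A rough usage sketch, assuming an authenticated session; the column selection and filter are illustrative only, and the exact set of operations supported by StreamingDataFrame is defined in bigframes/streaming/dataframe.py, which is not shown in this excerpt:

import bigframes.streaming as bst

sdf = bst.read_gbq_table("bigquery-public-data.ml_datasets.penguins")

# Operations build up a SQL view over the live source table.
sdf = sdf[["species", "island", "body_mass_g"]]
sdf = sdf[sdf["body_mass_g"] > 4000]

# No snapshot timestamp and no cached substitutions in the compiled SQL,
# so the query text is suitable for a continuous/streaming job.
print(sdf.sql)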