Commit 61f18cb

fix: Improve to_pandas_batches for large results (#746)
* fix: Improve to_pandas_batches for large results
* remove page_size and use 1tb
* don't pass bqstorage client
* use page_size
* still pass storage client
* use max_results instead of iterator trick
1 parent 4915424 commit 61f18cb
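
For context, this is roughly how the new parameters are meant to be used from the public API; a minimal sketch mirroring the updated load test further down (the table name comes from that test and is illustrative):

```python
import bigframes.pandas as bpd

# Read a large table lazily; nothing is downloaded yet.
# "load_testing.scalars_1tb" is the table used by the load test below;
# substitute any table you have access to.
df = bpd.read_gbq("load_testing.scalars_1tb")

# Stream results in pages of up to 500 rows, capping the download at
# 1500 rows total, so a huge result never has to fit in memory at once.
for batch in df.to_pandas_batches(page_size=500, max_results=1500):
    print(batch.shape)  # each batch is a regular pandas DataFrame
```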

3 files changed (+49 −21 lines)

bigframes/core/blocks.py

+16 −3
@@ -508,11 +508,24 @@ def try_peek(
         else:
             return None

-    def to_pandas_batches(self):
-        """Download results one message at a time."""
+    def to_pandas_batches(
+        self, page_size: Optional[int] = None, max_results: Optional[int] = None
+    ):
+        """Download results one message at a time.
+
+        page_size and max_results determine the size and number of batches,
+        see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob#google_cloud_bigquery_job_QueryJob_result"""
         dtypes = dict(zip(self.index_columns, self.index.dtypes))
         dtypes.update(zip(self.value_columns, self.dtypes))
-        results_iterator, _ = self.session._execute(self.expr, sorted=True)
+        _, query_job = self.session._query_to_destination(
+            self.session._to_sql(self.expr, sorted=True),
+            list(self.index_columns),
+            api_name="cached",
+            do_clustering=False,
+        )
+        results_iterator = query_job.result(
+            page_size=page_size, max_results=max_results
+        )
         for arrow_table in results_iterator.to_arrow_iterable(
             bqstorage_client=self.session.bqstoragereadclient
         ):
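
Under the hood, Block.to_pandas_batches now lets the BigQuery client library do the paging: the new parameters are forwarded verbatim to QueryJob.result(), and the resulting RowIterator is consumed page by page via to_arrow_iterable(). Below is a standalone sketch of that client-library pattern, using a hypothetical query and omitting the bigframes session plumbing (_query_to_destination, bqstoragereadclient):

```python
from google.cloud import bigquery

client = bigquery.Client()
query_job = client.query("SELECT * FROM `my_project.my_dataset.my_table`")

# page_size bounds rows per page, max_results caps total rows returned;
# both are passed straight through to QueryJob.result() by the new code.
results_iterator = query_job.result(page_size=500, max_results=1500)

# RowIterator.to_arrow_iterable yields one pyarrow.Table per page; the
# Block then converts each table to pandas with the expected dtypes.
for arrow_table in results_iterator.to_arrow_iterable():
    pandas_batch = arrow_table.to_pandas()
    print(len(pandas_batch))
```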

bigframes/dataframe.py

+23 −3
@@ -1215,10 +1215,30 @@ def to_pandas(
         self._set_internal_query_job(query_job)
         return df.set_axis(self._block.column_labels, axis=1, copy=False)

-    def to_pandas_batches(self) -> Iterable[pandas.DataFrame]:
-        """Stream DataFrame results to an iterable of pandas DataFrame"""
+    def to_pandas_batches(
+        self, page_size: Optional[int] = None, max_results: Optional[int] = None
+    ) -> Iterable[pandas.DataFrame]:
+        """Stream DataFrame results to an iterable of pandas DataFrame.
+
+        page_size and max_results determine the size and number of batches,
+        see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob#google_cloud_bigquery_job_QueryJob_result
+
+        Args:
+            page_size (int, default None):
+                The size of each batch.
+            max_results (int, default None):
+                If given, only download this many rows at maximum.
+
+        Returns:
+            Iterable[pandas.DataFrame]:
+                An iterable of smaller dataframes which combine to
+                form the original dataframe. Results stream from bigquery,
+                see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.table.RowIterator#google_cloud_bigquery_table_RowIterator_to_arrow_iterable
+        """
         self._optimize_query_complexity()
-        return self._block.to_pandas_batches()
+        return self._block.to_pandas_batches(
+            page_size=page_size, max_results=max_results
+        )

     def _compute_dry_run(self) -> bigquery.QueryJob:
         return self._block._compute_dry_run()
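
A quick note on how the two new arguments compose, as a hypothetical sizing sketch reusing the illustrative table from above: page_size bounds the rows per batch, while max_results caps the total rows downloaded.

```python
import bigframes.pandas as bpd

df = bpd.read_gbq("load_testing.scalars_1tb")  # illustrative table name

# With page_size=500 and max_results=1500, at most 1500 rows are downloaded,
# arriving as roughly ceil(1500 / 500) = 3 pandas DataFrames.
# max_results=None (the default) streams the whole result set in ~500-row
# pages; page_size=None lets the BigQuery client choose its own page size.
total_rows = sum(
    len(batch) for batch in df.to_pandas_batches(page_size=500, max_results=1500)
)
assert total_rows <= 1500
```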

tests/system/load/test_large_tables.py

+10 −15
@@ -75,22 +75,17 @@ def test_index_repr_large_table():


 def test_to_pandas_batches_large_table():
-    df = bpd.read_gbq("load_testing.scalars_10gb")
-    # df will be downloaded locally
-    expected_row_count, expected_column_count = df.shape
-
-    row_count = 0
-    # TODO(b/340890167): fix type error
-    for df in df.to_pandas_batches():  # type: ignore
-        batch_row_count, batch_column_count = df.shape
+    df = bpd.read_gbq("load_testing.scalars_1tb")
+    _, expected_column_count = df.shape
+
+    # download only a few batches, since 1tb would be too much
+    iterable = df.to_pandas_batches(page_size=500, max_results=1500)
+    # use page size since client library doesn't support
+    # streaming only part of the dataframe via bqstorage
+    for pdf in iterable:
+        batch_row_count, batch_column_count = pdf.shape
         assert batch_column_count == expected_column_count
-        row_count += batch_row_count
-
-        # Attempt to save on memory by manually removing the batch df
-        # from local memory after finishing with processing.
-        del df
-
-    assert row_count == expected_row_count
+        assert batch_row_count > 0


 @pytest.mark.skip(reason="See if it caused kokoro build aborted.")
