Skip to content
This repository was archived by the owner on May 7, 2026. It is now read-only.
Prev Previous commit
Next Next commit
internalize pandas conversion to result object
  • Loading branch information
TrevorBergeron committed Apr 28, 2025
commit ca643f43c8ae8a7ca349501ecf4a4214b1032422
18 changes: 4 additions & 14 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
Optional,
Sequence,
Tuple,
TYPE_CHECKING,
Union,
)
import warnings
Expand Down Expand Up @@ -68,13 +67,8 @@
import bigframes.core.window_spec as windows
import bigframes.dtypes
import bigframes.exceptions as bfe
import bigframes.features
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
import bigframes.session._io.pandas as io_pandas

if TYPE_CHECKING:
import bigframes.session.executor

# Type constraint for wherever column labels are used
Label = typing.Hashable
Expand Down Expand Up @@ -580,7 +574,7 @@ def try_peek(
result = self.session._executor.peek(
self.expr, n, use_explicit_destination=allow_large_results
)
df = io_pandas.arrow_to_pandas(result.to_arrow_table(), self.expr.schema)
df = result.to_pandas()
self._copy_index_to_pandas(df)
return df
else:
Expand All @@ -604,8 +598,7 @@ def to_pandas_batches(
page_size=page_size,
max_results=max_results,
)
for record_batch in execute_result.arrow_batches():
df = io_pandas.arrow_to_pandas(record_batch, self.expr.schema)
for df in execute_result.to_pandas_batches():
self._copy_index_to_pandas(df)
if squeeze:
yield df.squeeze(axis=1)
Expand Down Expand Up @@ -690,8 +683,7 @@ def _materialize_local(
MaterializationOptions(ordered=materialize_options.ordered)
)
else:
arrow = execute_result.to_arrow_table()
df = io_pandas.arrow_to_pandas(arrow, schema=self.expr.schema)
df = execute_result.to_pandas()
self._copy_index_to_pandas(df)

return df, execute_result.query_job
Expand Down Expand Up @@ -1571,9 +1563,7 @@ def retrieve_repr_request_results(
head_result = self.session._executor.head(self.expr, max_results)
row_count = self.session._executor.execute(self.expr.row_count()).to_py_scalar()

df = io_pandas.arrow_to_pandas(
head_result.to_arrow_table(), schema=head_result.schema
)
df = head_result.to_pandas()
self._copy_index_to_pandas(df)
return df, row_count, head_result.query_job

Expand Down
12 changes: 12 additions & 0 deletions bigframes/session/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@

import abc
import dataclasses
import functools
import itertools
from typing import Callable, Iterator, Literal, Mapping, Optional, Sequence, Union

from google.cloud import bigquery
import pandas as pd
import pyarrow

import bigframes.core
import bigframes.core.schema
import bigframes.session._io.pandas as io_pandas


@dataclasses.dataclass(frozen=True)
Expand All @@ -49,6 +52,15 @@ def to_arrow_table(self) -> pyarrow.Table:
else:
return self.schema.to_pyarrow().empty_table()

def to_pandas(self) -> pd.DataFrame:
return io_pandas.arrow_to_pandas(self.to_arrow_table(), self.schema)

def to_pandas_batches(self) -> Iterator[pd.DataFrame]:
yield from map(
functools.partial(io_pandas.arrow_to_pandas, schema=self.schema),
self.arrow_batches(),
)

def to_py_scalar(self):
columns = list(self.to_arrow_table().to_pydict().values())
if len(columns) != 1:
Expand Down