Commit 9dcf1aa

perf: update df.corr, df.cov to be used with more than 30 columns case. (#1161)

* perf: update df.corr, df.cov to be used with more than 30 columns case.
* add large test
* remove print
* fix_index
* fix index
* test fix
* fix test
* fix test
* slightly improve multi_apply_unary_op to avoid RecursionError
* update recursion limit for nox session
* skip the test in e2e/python 3.12
* simplify code
* simplify code

1 parent 3072d38 commit 9dcf1aa
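In short: the old Block.calculate_pairwise_metric emitted a separate BinaryAggregation for every ordered pair of value columns in a single aggregation node, which grows quadratically with frame width (30 columns already means 900 aggregate expressions; 150 columns would mean 22,500) and was therefore capped at 30 columns. This change removes the cap by melting the frame to long format, self-joining it on a per-row offset, and computing the correlation or covariance for all column pairs with ordinary grouped sums; a small multi_apply_unary_op tweak batches column drops so the internal expression tree stays shallow enough to avoid RecursionError.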

File tree: 6 files changed, +268 −54 lines changed

bigframes/core/blocks.py (+4 −45)
bigframes/dataframe.py (+190 −2)
tests/system/conftest.py (+22 −0)
tests/system/large/test_dataframe.py (+42 −0)
tests/system/small/test_dataframe.py (+4 −5)
tests/system/small/test_multiindex.py (+6 −2)

bigframes/core/blocks.py (+4 −45)

@@ -912,14 +912,17 @@ def multi_apply_unary_op(
         input_varname = input_varnames[0]
 
         block = self
+
+        result_ids = []
         for col_id in columns:
             label = self.col_id_to_label[col_id]
             block, result_id = block.project_expr(
                 expr.bind_variables({input_varname: ex.deref(col_id)}),
                 label=label,
             )
             block = block.copy_values(result_id, col_id)
-            block = block.drop_columns([result_id])
+            result_ids.append(result_id)
+        block = block.drop_columns(result_ids)
         # Special case, we can preserve transpose cache for full-frame unary ops
         if (self._transpose_cache is not None) and set(self.value_columns) == set(
             columns
@@ -1317,50 +1320,6 @@ def summarize(
             index_columns=index_cols,
         )
 
-    def calculate_pairwise_metric(self, op=agg_ops.CorrOp()):
-        """
-        Returns a block object to compute pairwise metrics among all value columns in this block.
-
-        The metric to be computed is specified by the `op` parameter, which can be either a
-        correlation operation (default) or a covariance operation.
-        """
-        if len(self.value_columns) > 30:
-            raise NotImplementedError(
-                "This function supports dataframes with 30 columns or fewer. "
-                f"Provided dataframe has {len(self.value_columns)} columns. {constants.FEEDBACK_LINK}"
-            )
-
-        aggregations = [
-            (
-                ex.BinaryAggregation(op, ex.deref(left_col), ex.deref(right_col)),
-                f"{left_col}-{right_col}",
-            )
-            for left_col in self.value_columns
-            for right_col in self.value_columns
-        ]
-        expr = self.expr.aggregate(aggregations)
-
-        input_count = len(self.value_columns)
-        unpivot_columns = tuple(
-            tuple(expr.column_ids[input_count * i : input_count * (i + 1)])
-            for i in range(input_count)
-        )
-        labels = self._get_labels_for_columns(self.value_columns)
-
-        # TODO(b/340896143): fix type error
-        expr, (index_col_ids, _, _) = unpivot(
-            expr,
-            row_labels=labels,
-            unpivot_columns=unpivot_columns,
-        )
-
-        return Block(
-            expr,
-            column_labels=self.column_labels,
-            index_columns=index_col_ids,
-            index_labels=self.column_labels.names,
-        )
-
     def explode(
         self,
         column_ids: typing.Sequence[str],
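The multi_apply_unary_op change matters for wide frames because every Block operation wraps the previous plan in a new immutable node: dropping each result column inside the loop added one nesting level per column, and traversing a 150-column plan could then blow past Python's recursion limit. A toy sketch of the effect (the Plan class below is hypothetical, not BigFrames internals):

from dataclasses import dataclass
from typing import Optional

@dataclass(frozen=True)
class Plan:
    """Hypothetical stand-in for an immutable query plan node."""
    parent: Optional["Plan"] = None

    def drop(self, ids):
        # One new node per call, no matter how many ids are dropped.
        return Plan(parent=self)

def depth(plan: Plan) -> int:
    return 0 if plan.parent is None else 1 + depth(plan.parent)

ids = [f"col_{i}" for i in range(150)]

per_column = Plan()
for col in ids:
    per_column = per_column.drop([col])  # old pattern: 150 extra levels

batched = Plan().drop(ids)  # new pattern: 1 extra level

print(depth(per_column), depth(batched))  # 150 1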

bigframes/dataframe.py (+190 −2)

@@ -1197,15 +1197,203 @@ def corr(self, method="pearson", min_periods=None, numeric_only=False) -> DataFr
         else:
             frame = self._drop_non_numeric()
 
-        return DataFrame(frame._block.calculate_pairwise_metric(op=agg_ops.CorrOp()))
+        orig_columns = frame.columns
+        # Replace column names with 0 to n - 1 to keep order
+        # and avoid the influence of duplicated column name
+        frame.columns = pandas.Index(range(len(orig_columns)))
+        frame = frame.astype(bigframes.dtypes.FLOAT_DTYPE)
+        block = frame._block
+
+        # A new column that uniquely identifies each row
+        block, ordering_col = frame._block.promote_offsets(label="_bigframes_idx")
+
+        val_col_ids = [
+            col_id for col_id in block.value_columns if col_id != ordering_col
+        ]
+
+        block = block.melt(
+            [ordering_col], val_col_ids, ["_bigframes_variable"], "_bigframes_value"
+        )
+
+        block = block.merge(
+            block,
+            left_join_ids=[ordering_col],
+            right_join_ids=[ordering_col],
+            how="inner",
+            sort=False,
+        )
+
+        frame = DataFrame(block).dropna(
+            subset=["_bigframes_value_x", "_bigframes_value_y"]
+        )
+
+        paired_mean_frame = (
+            frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"])
+            .agg(
+                _bigframes_paired_mean_x=bigframes.pandas.NamedAgg(
+                    column="_bigframes_value_x", aggfunc="mean"
+                ),
+                _bigframes_paired_mean_y=bigframes.pandas.NamedAgg(
+                    column="_bigframes_value_y", aggfunc="mean"
+                ),
+            )
+            .reset_index()
+        )
+
+        frame = frame.merge(
+            paired_mean_frame, on=["_bigframes_variable_x", "_bigframes_variable_y"]
+        )
+        frame["_bigframes_value_x"] -= frame["_bigframes_paired_mean_x"]
+        frame["_bigframes_value_y"] -= frame["_bigframes_paired_mean_y"]
+
+        frame["_bigframes_dividend"] = (
+            frame["_bigframes_value_x"] * frame["_bigframes_value_y"]
+        )
+        frame["_bigframes_x_square"] = (
+            frame["_bigframes_value_x"] * frame["_bigframes_value_x"]
+        )
+        frame["_bigframes_y_square"] = (
+            frame["_bigframes_value_y"] * frame["_bigframes_value_y"]
+        )
+
+        result = (
+            frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"])
+            .agg(
+                _bigframes_dividend_sum=bigframes.pandas.NamedAgg(
+                    column="_bigframes_dividend", aggfunc="sum"
+                ),
+                _bigframes_x_square_sum=bigframes.pandas.NamedAgg(
+                    column="_bigframes_x_square", aggfunc="sum"
+                ),
+                _bigframes_y_square_sum=bigframes.pandas.NamedAgg(
+                    column="_bigframes_y_square", aggfunc="sum"
+                ),
+            )
+            .reset_index()
+        )
+        result["_bigframes_corr"] = result["_bigframes_dividend_sum"] / (
+            (
+                result["_bigframes_x_square_sum"] * result["_bigframes_y_square_sum"]
+            )._apply_unary_op(ops.sqrt_op)
+        )
+        result = result._pivot(
+            index="_bigframes_variable_x",
+            columns="_bigframes_variable_y",
+            values="_bigframes_corr",
+        )
+
+        map_data = {
+            f"_bigframes_level_{i}": orig_columns.get_level_values(i)
+            for i in range(orig_columns.nlevels)
+        }
+        map_data["_bigframes_keys"] = range(len(orig_columns))
+        map_df = bigframes.dataframe.DataFrame(
+            map_data,
+            session=self._get_block().expr.session,
+        ).set_index("_bigframes_keys")
+        result = result.join(map_df).sort_index()
+        index_columns = [f"_bigframes_level_{i}" for i in range(orig_columns.nlevels)]
+        result = result.set_index(index_columns)
+        result.index.names = orig_columns.names
+        result.columns = orig_columns
+
+        return result
 
     def cov(self, *, numeric_only: bool = False) -> DataFrame:
         if not numeric_only:
             frame = self._raise_on_non_numeric("corr")
         else:
             frame = self._drop_non_numeric()
 
-        return DataFrame(frame._block.calculate_pairwise_metric(agg_ops.CovOp()))
+        orig_columns = frame.columns
+        # Replace column names with 0 to n - 1 to keep order
+        # and avoid the influence of duplicated column name
+        frame.columns = pandas.Index(range(len(orig_columns)))
+        frame = frame.astype(bigframes.dtypes.FLOAT_DTYPE)
+        block = frame._block
+
+        # A new column that uniquely identifies each row
+        block, ordering_col = frame._block.promote_offsets(label="_bigframes_idx")
+
+        val_col_ids = [
+            col_id for col_id in block.value_columns if col_id != ordering_col
+        ]
+
+        block = block.melt(
+            [ordering_col], val_col_ids, ["_bigframes_variable"], "_bigframes_value"
+        )
+        block = block.merge(
+            block,
+            left_join_ids=[ordering_col],
+            right_join_ids=[ordering_col],
+            how="inner",
+            sort=False,
+        )
+
+        frame = DataFrame(block).dropna(
+            subset=["_bigframes_value_x", "_bigframes_value_y"]
+        )
+
+        paired_mean_frame = (
+            frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"])
+            .agg(
+                _bigframes_paired_mean_x=bigframes.pandas.NamedAgg(
+                    column="_bigframes_value_x", aggfunc="mean"
+                ),
+                _bigframes_paired_mean_y=bigframes.pandas.NamedAgg(
+                    column="_bigframes_value_y", aggfunc="mean"
+                ),
+            )
+            .reset_index()
+        )
+
+        frame = frame.merge(
+            paired_mean_frame, on=["_bigframes_variable_x", "_bigframes_variable_y"]
+        )
+        frame["_bigframes_value_x"] -= frame["_bigframes_paired_mean_x"]
+        frame["_bigframes_value_y"] -= frame["_bigframes_paired_mean_y"]
+
+        frame["_bigframes_dividend"] = (
+            frame["_bigframes_value_x"] * frame["_bigframes_value_y"]
+        )
+
+        result = (
+            frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"])
+            .agg(
+                _bigframes_dividend_sum=bigframes.pandas.NamedAgg(
+                    column="_bigframes_dividend", aggfunc="sum"
+                ),
+                _bigframes_dividend_count=bigframes.pandas.NamedAgg(
+                    column="_bigframes_dividend", aggfunc="count"
+                ),
+            )
+            .reset_index()
+        )
+        result["_bigframes_cov"] = result["_bigframes_dividend_sum"] / (
+            result["_bigframes_dividend_count"] - 1
+        )
+        result = result._pivot(
+            index="_bigframes_variable_x",
+            columns="_bigframes_variable_y",
+            values="_bigframes_cov",
+        )
+
+        map_data = {
+            f"_bigframes_level_{i}": orig_columns.get_level_values(i)
+            for i in range(orig_columns.nlevels)
+        }
+        map_data["_bigframes_keys"] = range(len(orig_columns))
+        map_df = bigframes.dataframe.DataFrame(
+            map_data,
+            session=self._get_block().expr.session,
+        ).set_index("_bigframes_keys")
+        result = result.join(map_df).sort_index()
+        index_columns = [f"_bigframes_level_{i}" for i in range(orig_columns.nlevels)]
+        result = result.set_index(index_columns)
+        result.index.names = orig_columns.names
+        result.columns = orig_columns
+
+        return result
 
     def to_arrow(
         self,
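For intuition, the long-format strategy above can be reproduced in plain pandas: melt to one row per (row id, column, value), self-join on the row id to form every column pair, center each pair on its pairwise-complete means, then reduce with grouped sums and pivot back to a square matrix. The sketch below is illustrative (plain pandas stands in for BigFrames, and the helper name pairwise_corr is made up); it checks out against pandas' built-in corr:

import numpy as np
import pandas as pd

def pairwise_corr(df: pd.DataFrame) -> pd.DataFrame:
    # Long format: one row per (row id, column, value).
    long = df.reset_index(drop=True).reset_index().melt(
        id_vars="index", var_name="variable", value_name="value"
    )
    # Self-join on the row id so each row pairs every column with every other.
    pairs = long.merge(long, on="index", suffixes=("_x", "_y")).dropna(
        subset=["value_x", "value_y"]
    )
    keys = ["variable_x", "variable_y"]
    # Center both sides on the per-pair means (pairwise-complete observations).
    means = pairs.groupby(keys)[["value_x", "value_y"]].transform("mean")
    dx = pairs["value_x"] - means["value_x"]
    dy = pairs["value_y"] - means["value_y"]
    by = [pairs[k] for k in keys]
    num = (dx * dy).groupby(by).sum()
    den = np.sqrt((dx * dx).groupby(by).sum() * (dy * dy).groupby(by).sum())
    # Pivot the (x, y) pairs back into a square matrix.
    return (num / den).unstack()

df = pd.DataFrame(np.random.rand(100, 5), columns=list("abcde"))
assert np.allclose(pairwise_corr(df), df.corr())

cov follows the same pipeline; only the final reduction differs, dividing the summed centered products by the pair count minus one (sample covariance). Continuing the sketch:

def pairwise_cov(df: pd.DataFrame) -> pd.DataFrame:
    long = df.reset_index(drop=True).reset_index().melt(
        id_vars="index", var_name="variable", value_name="value"
    )
    pairs = long.merge(long, on="index", suffixes=("_x", "_y")).dropna(
        subset=["value_x", "value_y"]
    )
    keys = ["variable_x", "variable_y"]
    means = pairs.groupby(keys)[["value_x", "value_y"]].transform("mean")
    prod = (pairs["value_x"] - means["value_x"]) * (
        pairs["value_y"] - means["value_y"]
    )
    grouped = prod.groupby([pairs[k] for k in keys])
    # Sample covariance: sum of centered products over (pair count - 1).
    return (grouped.sum() / (grouped.count() - 1)).unstack()

assert np.allclose(pairwise_cov(df), df.cov())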

tests/system/conftest.py (+22 −0)

@@ -611,6 +611,28 @@ def scalars_dfs_maybe_ordered(
     )
 
 
+@pytest.fixture(scope="session")
+def scalars_df_numeric_150_columns_maybe_ordered(
+    maybe_ordered_session,
+    scalars_pandas_df_index,
+):
+    """DataFrame pointing at test data."""
+    # TODO(b/379911038): After the error fixed, add numeric type.
+    pandas_df = scalars_pandas_df_index.reset_index(drop=False)[
+        [
+            "rowindex",
+            "rowindex_2",
+            "float64_col",
+            "int64_col",
+            "int64_too",
+        ]
+        * 30
+    ]
+
+    df = maybe_ordered_session.read_pandas(pandas_df)
+    return (df, pandas_df)
+
+
 @pytest.fixture(scope="session")
 def hockey_df(
     hockey_table_id: str, session: bigframes.Session
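One detail worth noting in the fixture above: the 150 columns come from plain Python list repetition, since indexing a DataFrame with a list that contains duplicates duplicates the selected columns. A quick check of the arithmetic:

cols = ["rowindex", "rowindex_2", "float64_col", "int64_col", "int64_too"]
assert len(cols * 30) == 150  # pandas_df[cols * 30] yields a 150-column frame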

tests/system/large/test_dataframe.py (+42 −0, new file)

@@ -0,0 +1,42 @@
+import sys
+
+import pandas as pd
+import pytest
+
+
+@pytest.mark.skipif(
+    sys.version_info >= (3, 12),
+    # See: https://github.com/python/cpython/issues/112282
+    reason="setrecursionlimit has no effect on the Python C stack since Python 3.12.",
+)
+def test_corr_w_numeric_only(scalars_df_numeric_150_columns_maybe_ordered):
+    scalars_df, scalars_pandas_df = scalars_df_numeric_150_columns_maybe_ordered
+    bf_result = scalars_df.corr(numeric_only=True).to_pandas()
+    pd_result = scalars_pandas_df.corr(numeric_only=True)
+
+    pd.testing.assert_frame_equal(
+        bf_result,
+        pd_result,
+        check_dtype=False,
+        check_index_type=False,
+        check_column_type=False,
+    )
+
+
+@pytest.mark.skipif(
+    sys.version_info >= (3, 12),
+    # See: https://github.com/python/cpython/issues/112282
+    reason="setrecursionlimit has no effect on the Python C stack since Python 3.12.",
+)
+def test_cov_w_numeric_only(scalars_df_numeric_150_columns_maybe_ordered):
+    scalars_df, scalars_pandas_df = scalars_df_numeric_150_columns_maybe_ordered
+    bf_result = scalars_df.cov(numeric_only=True).to_pandas()
+    pd_result = scalars_pandas_df.cov(numeric_only=True)
+
+    pd.testing.assert_frame_equal(
+        bf_result,
+        pd_result,
+        check_dtype=False,
+        check_index_type=False,
+        check_column_type=False,
+    )
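Both tests are skipped on Python 3.12+ because the usual mitigation for deep plan traversal, raising the interpreter's recursion limit (which the commit message says the nox session now does; the exact value below is illustrative, not taken from this commit), stops being sufficient there: per the linked CPython issue, Python 3.12 can exhaust the C stack before the Python-level limit is ever reached.

import sys

# Pre-3.12 mitigation (value illustrative): allow deeper traversal of a
# heavily nested expression tree before RecursionError is raised.
sys.setrecursionlimit(10_000)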

tests/system/small/test_dataframe.py (+4 −5)

@@ -2075,8 +2075,8 @@ def test_combine_first(
         ),
     ],
 )
-def test_corr_w_numeric_only(scalars_dfs, columns, numeric_only):
-    scalars_df, scalars_pandas_df = scalars_dfs
+def test_corr_w_numeric_only(scalars_dfs_maybe_ordered, columns, numeric_only):
+    scalars_df, scalars_pandas_df = scalars_dfs_maybe_ordered
 
     bf_result = scalars_df[columns].corr(numeric_only=numeric_only).to_pandas()
     pd_result = scalars_pandas_df[columns].corr(numeric_only=numeric_only)
@@ -2115,11 +2115,10 @@ def test_corr_w_invalid_parameters(scalars_dfs):
         ),
     ],
 )
-def test_cov_w_numeric_only(scalars_dfs, columns, numeric_only):
-    scalars_df, scalars_pandas_df = scalars_dfs
+def test_cov_w_numeric_only(scalars_dfs_maybe_ordered, columns, numeric_only):
+    scalars_df, scalars_pandas_df = scalars_dfs_maybe_ordered
     bf_result = scalars_df[columns].cov(numeric_only=numeric_only).to_pandas()
     pd_result = scalars_pandas_df[columns].cov(numeric_only=numeric_only)
-
     # BigFrames and Pandas differ in their data type handling:
     # - Column types: BigFrames uses Float64, Pandas uses float64.
     # - Index types: BigFrames uses strign, Pandas uses object.
tests/system/small/test_multiindex.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -910,7 +910,9 @@ def test_column_multi_index_unstack(scalars_df_index, scalars_pandas_df_index):
910910

911911
def test_corr_w_multi_index(scalars_df_index, scalars_pandas_df_index):
912912
columns = ["int64_too", "float64_col", "int64_col"]
913-
multi_columns = pandas.MultiIndex.from_tuples(zip(["a", "b", "b"], [1, 2, 2]))
913+
multi_columns = pandas.MultiIndex.from_tuples(
914+
zip(["a", "b", "b"], [1, 2, 2]), names=[None, "level_2"]
915+
)
914916

915917
bf = scalars_df_index[columns].copy()
916918
bf.columns = multi_columns
@@ -931,7 +933,9 @@ def test_corr_w_multi_index(scalars_df_index, scalars_pandas_df_index):
931933

932934
def test_cov_w_multi_index(scalars_df_index, scalars_pandas_df_index):
933935
columns = ["int64_too", "float64_col", "int64_col"]
934-
multi_columns = pandas.MultiIndex.from_tuples(zip(["a", "b", "b"], [1, 2, 2]))
936+
multi_columns = pandas.MultiIndex.from_tuples(
937+
zip(["a", "b", "b"], [1, 2, 2]), names=["level_1", None]
938+
)
935939

936940
bf = scalars_df_index[columns].copy()
937941
bf.columns = multi_columns
