Skip to content

Commit f91756a

Browse files
fix: Fix erroneous window bounds removal during compilation (#1163)
1 parent c71ec09 commit f91756a

File tree

9 files changed

+84
-815
lines changed

9 files changed

+84
-815
lines changed

bigframes/core/block_transforms.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,10 @@ def indicate_duplicates(
8686
# Discard this value if there are copies ANYWHERE
8787
window_spec = windows.unbound(grouping_keys=tuple(columns))
8888
block, dummy = block.create_constant(1)
89+
# use row number as it will work even with partial ordering
8990
block, val_count_col_id = block.apply_window_op(
9091
dummy,
91-
agg_ops.count_op,
92+
agg_ops.sum_op,
9293
window_spec=window_spec,
9394
)
9495
block, duplicate_indicator = block.project_expr(

bigframes/core/compile/aggregate_compiler.py

+9
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,15 @@ def _(
479479
return _apply_window_if_present(column.dense_rank(), window) + 1
480480

481481

482+
@compile_unary_agg.register
483+
def _(
484+
op: agg_ops.RowNumberOp,
485+
column: ibis_types.Column,
486+
window=None,
487+
) -> ibis_types.IntegerValue:
488+
return _apply_window_if_present(ibis_api.row_number(), window)
489+
490+
482491
@compile_unary_agg.register
483492
def _(op: agg_ops.FirstOp, column: ibis_types.Column, window=None) -> ibis_types.Value:
484493
return _apply_window_if_present(column.first(), window)

bigframes/core/compile/compiled.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1330,7 +1330,7 @@ def _ibis_window_from_spec(
13301330
if require_total_order or isinstance(window_spec.bounds, RowsWindowBounds):
13311331
# Some operators need an unambiguous ordering, so the table's total ordering is appended
13321332
order_by = tuple([*order_by, *self._ibis_order])
1333-
elif isinstance(window_spec.bounds, RowsWindowBounds):
1333+
elif require_total_order or isinstance(window_spec.bounds, RowsWindowBounds):
13341334
# If window spec has following or preceding bounds, we need to apply an unambiguous ordering.
13351335
order_by = tuple(self._ibis_order)
13361336
else:

bigframes/operations/aggregations.py

+13
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,19 @@ def skips_nulls(self):
379379
return True
380380

381381

382+
# This should really be a NullaryWindowOp, but APIs don't support that yet.
383+
@dataclasses.dataclass(frozen=True)
384+
class RowNumberOp(UnaryWindowOp):
385+
name: ClassVar[str] = "rownumber"
386+
387+
@property
388+
def skips_nulls(self):
389+
return False
390+
391+
def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType:
392+
return dtypes.INT_DTYPE
393+
394+
382395
@dataclasses.dataclass(frozen=True)
383396
class RankOp(UnaryWindowOp):
384397
name: ClassVar[str] = "rank"

tests/system/small/test_unordered.py

+21
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,27 @@ def test_unordered_merge(unordered_session):
152152
assert_pandas_df_equal(bf_result.to_pandas(), pd_result, ignore_order=True)
153153

154154

155+
def test_unordered_drop_duplicates_ambiguous(unordered_session):
156+
pd_df = pd.DataFrame(
157+
{"a": [1, 1, 1], "b": [4, 4, 6], "c": [1, 1, 3]}, dtype=pd.Int64Dtype()
158+
)
159+
bf_df = bpd.DataFrame(pd_df, session=unordered_session)
160+
161+
# merge first to discard original ordering
162+
bf_result = (
163+
bf_df.merge(bf_df, left_on="a", right_on="c")
164+
.sort_values("c_y")
165+
.drop_duplicates()
166+
)
167+
pd_result = (
168+
pd_df.merge(pd_df, left_on="a", right_on="c")
169+
.sort_values("c_y")
170+
.drop_duplicates()
171+
)
172+
173+
assert_pandas_df_equal(bf_result.to_pandas(), pd_result, ignore_order=True)
174+
175+
155176
def test_unordered_mode_cache_preserves_order(unordered_session):
156177
pd_df = pd.DataFrame(
157178
{"a": [1, 2, 3, 4, 5, 6], "b": [4, 5, 9, 3, 1, 6]}, dtype=pd.Int64Dtype()

0 commit comments

Comments
 (0)