1616
1717import bigframes
1818from bigframes .core import identifiers , local_data , nodes
19- from bigframes .session import polars_executor , semi_executor
19+ from bigframes .session import polars_executor
20+ from tests .system .small .engines .engine_utils import assert_equivalence_execution
2021
2122pytest .importorskip ("polars" )
2223
2324# Polars used as reference as its fast and local. Generally though, prefer gbq engine where they disagree.
2425REFERENCE_ENGINE = polars_executor .PolarsExecutor ()
2526
2627
27- def ensure_equivalence (
28- node : nodes .BigFrameNode ,
29- engine1 : semi_executor .SemiExecutor ,
30- engine2 : semi_executor .SemiExecutor ,
31- ):
32- e1_result = engine1 .execute (node , ordered = True )
33- e2_result = engine2 .execute (node , ordered = True )
34- assert e1_result is not None
35- assert e2_result is not None
36- # Schemas might have extra nullity markers, normalize to node expected schema, which should be looser
37- e1_table = e1_result .to_arrow_table ().cast (node .schema .to_pyarrow ())
38- e2_table = e2_result .to_arrow_table ().cast (node .schema .to_pyarrow ())
39- assert e1_table .equals (e2_table ), f"{ e1_table } is not equal to { e2_table } "
40-
41-
4228def test_engines_read_local (
4329 fake_session : bigframes .Session ,
4430 managed_data_source : local_data .ManagedArrowTable ,
@@ -51,7 +37,7 @@ def test_engines_read_local(
5137 local_node = nodes .ReadLocalNode (
5238 managed_data_source , scan_list , fake_session , offsets_col = None
5339 )
54- ensure_equivalence (local_node , REFERENCE_ENGINE , engine )
40+ assert_equivalence_execution (local_node , REFERENCE_ENGINE , engine )
5541
5642
5743def test_engines_read_local_w_offsets (
@@ -69,7 +55,7 @@ def test_engines_read_local_w_offsets(
6955 fake_session ,
7056 offsets_col = identifiers .ColumnId ("offsets" ),
7157 )
72- ensure_equivalence (local_node , REFERENCE_ENGINE , engine )
58+ assert_equivalence_execution (local_node , REFERENCE_ENGINE , engine )
7359
7460
7561def test_engines_read_local_w_col_subset (
@@ -84,7 +70,7 @@ def test_engines_read_local_w_col_subset(
8470 local_node = nodes .ReadLocalNode (
8571 managed_data_source , scan_list , fake_session , offsets_col = None
8672 )
87- ensure_equivalence (local_node , REFERENCE_ENGINE , engine )
73+ assert_equivalence_execution (local_node , REFERENCE_ENGINE , engine )
8874
8975
9076def test_engines_read_local_w_zero_row_source (
@@ -99,7 +85,7 @@ def test_engines_read_local_w_zero_row_source(
9985 local_node = nodes .ReadLocalNode (
10086 zero_row_source , scan_list , fake_session , offsets_col = None
10187 )
102- ensure_equivalence (local_node , REFERENCE_ENGINE , engine )
88+ assert_equivalence_execution (local_node , REFERENCE_ENGINE , engine )
10389
10490
10591def test_engines_read_local_w_nested_source (
@@ -114,7 +100,7 @@ def test_engines_read_local_w_nested_source(
114100 local_node = nodes .ReadLocalNode (
115101 nested_data_source , scan_list , fake_session , offsets_col = None
116102 )
117- ensure_equivalence (local_node , REFERENCE_ENGINE , engine )
103+ assert_equivalence_execution (local_node , REFERENCE_ENGINE , engine )
118104
119105
120106def test_engines_read_local_w_repeated_source (
@@ -129,4 +115,4 @@ def test_engines_read_local_w_repeated_source(
129115 local_node = nodes .ReadLocalNode (
130116 repeated_data_source , scan_list , fake_session , offsets_col = None
131117 )
132- ensure_equivalence (local_node , REFERENCE_ENGINE , engine )
118+ assert_equivalence_execution (local_node , REFERENCE_ENGINE , engine )
0 commit comments