
Commit 9005c6e

fix: Extend row hash to 128 bits to guarantee unique row id (#632)
* fix: Extend row hash to 128 bits to guarantee unique row id
* decide hash size based on row count
* fix read_gbq tests
* handle unknown row_num
1 parent: dfeaad0
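The size thresholds in this commit follow from the birthday bound: with a single 64-bit hash, collisions become plausible around a billion rows, while 128 bits keeps the probability negligible at any realistic table size. A quick back-of-the-envelope check (a standalone sketch, not part of the commit):

    import math

    def collision_probability(n_rows: int, hash_bits: int) -> float:
        # Birthday bound: P(any two of n_rows rows share a hash value),
        # assuming a uniform hash over 2**hash_bits values. expm1 keeps
        # tiny probabilities from underflowing to 0.0.
        space = 2.0 ** hash_bits
        return -math.expm1(-n_rows * (n_rows - 1) / (2.0 * space))

    print(collision_probability(100_000, 64))         # ~2.7e-10: one hash is safe
    print(collision_probability(1_000_000_000, 64))   # ~0.027: collisions plausible
    print(collision_probability(1_000_000_000, 128))  # ~1.5e-21: effectively never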

3 files changed (+62 -19 lines)

bigframes/session/__init__.py (+35 -15)
@@ -54,6 +54,7 @@
 import google.api_core.gapic_v1.client_info
 import google.auth.credentials
 import google.cloud.bigquery as bigquery
+import google.cloud.bigquery.table
 import google.cloud.bigquery_connection_v1
 import google.cloud.bigquery_storage_v1
 import google.cloud.functions_v2
@@ -693,7 +694,7 @@ def read_gbq_table(
 
     def _get_snapshot_sql_and_primary_key(
         self,
-        table_ref: bigquery.table.TableReference,
+        table: google.cloud.bigquery.table.Table,
         *,
         api_name: str,
         use_cache: bool = True,
@@ -709,7 +710,7 @@ def _get_snapshot_sql_and_primary_key(
             table,
         ) = bigframes_io.get_snapshot_datetime_and_table_metadata(
             self.bqclient,
-            table_ref=table_ref,
+            table_ref=table.reference,
             api_name=api_name,
             cache=self._df_snapshot,
             use_cache=use_cache,
@@ -735,7 +736,7 @@ def _get_snapshot_sql_and_primary_key(
 
         try:
             table_expression = self.ibis_client.sql(
-                bigframes_io.create_snapshot_sql(table_ref, snapshot_timestamp)
+                bigframes_io.create_snapshot_sql(table.reference, snapshot_timestamp)
             )
         except google.api_core.exceptions.Forbidden as ex:
             if "Drive credentials" in ex.message:
@@ -763,8 +764,9 @@ def _read_gbq_table(
             query, default_project=self.bqclient.project
         )
 
+        table = self.bqclient.get_table(table_ref)
         (table_expression, primary_keys,) = self._get_snapshot_sql_and_primary_key(
-            table_ref, api_name=api_name, use_cache=use_cache
+            table, api_name=api_name, use_cache=use_cache
         )
         total_ordering_cols = primary_keys
 
@@ -836,9 +838,13 @@ def _read_gbq_table(
                     ordering=ordering,
                 )
             else:
-                array_value = self._create_total_ordering(table_expression)
+                array_value = self._create_total_ordering(
+                    table_expression, table_rows=table.num_rows
+                )
         else:
-            array_value = self._create_total_ordering(table_expression)
+            array_value = self._create_total_ordering(
+                table_expression, table_rows=table.num_rows
+            )
 
         value_columns = [col for col in array_value.column_ids if col not in index_cols]
         block = blocks.Block(
@@ -1459,10 +1465,19 @@ def _create_empty_temp_table(
     def _create_total_ordering(
         self,
         table: ibis_types.Table,
+        table_rows: Optional[int],
     ) -> core.ArrayValue:
         # Since this might also be used as the index, don't use the default
         # "ordering ID" name.
+
+        # For small tables, 64 bits is enough to avoid collisions, 128 bits will never ever collide no matter what
+        # Assume table is large if table row count is unknown
+        use_double_hash = (
+            (table_rows is None) or (table_rows == 0) or (table_rows > 100000)
+        )
+
         ordering_hash_part = guid.generate_guid("bigframes_ordering_")
+        ordering_hash_part2 = guid.generate_guid("bigframes_ordering_")
         ordering_rand_part = guid.generate_guid("bigframes_ordering_")
 
         # All inputs into hash must be non-null or resulting hash will be null
@@ -1475,25 +1490,30 @@ def _create_total_ordering(
             else str_values[0]
         )
         full_row_hash = full_row_str.hash().name(ordering_hash_part)
+        # By modifying value slightly, we get another hash uncorrelated with the first
+        full_row_hash_p2 = (full_row_str + "_").hash().name(ordering_hash_part2)
         # Used to disambiguate between identical rows (which will have identical hash)
         random_value = ibis.random().name(ordering_rand_part)
 
+        order_values = (
+            [full_row_hash, full_row_hash_p2, random_value]
+            if use_double_hash
+            else [full_row_hash, random_value]
+        )
+
         original_column_ids = table.columns
         table_with_ordering = table.select(
-            itertools.chain(original_column_ids, [full_row_hash, random_value])
+            itertools.chain(original_column_ids, order_values)
         )
 
-        ordering_ref1 = order.ascending_over(ordering_hash_part)
-        ordering_ref2 = order.ascending_over(ordering_rand_part)
         ordering = order.ExpressionOrdering(
-            ordering_value_columns=(ordering_ref1, ordering_ref2),
-            total_ordering_columns=frozenset([ordering_hash_part, ordering_rand_part]),
+            ordering_value_columns=tuple(
+                order.ascending_over(col.get_name()) for col in order_values
+            ),
+            total_ordering_columns=frozenset(col.get_name() for col in order_values),
         )
         columns = [table_with_ordering[col] for col in original_column_ids]
-        hidden_columns = [
-            table_with_ordering[ordering_hash_part],
-            table_with_ordering[ordering_rand_part],
-        ]
+        hidden_columns = [table_with_ordering[col.get_name()] for col in order_values]
         return core.ArrayValue.from_ibis(
             self,
             table_with_ordering,
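The core trick in _create_total_ordering: the engine-side row hash yields a single 64-bit value, so the code derives a second, uncorrelated fingerprint by hashing the same concatenated row string with "_" appended, giving 128 bits of row identity in total (a random column still breaks ties between genuinely identical rows). A minimal standalone sketch of the idea, with hashlib's truncated SHA-256 standing in for the warehouse's 64-bit fingerprint function (an illustration, not the bigframes code):

    import hashlib

    def h64(s: str) -> int:
        # Stand-in for the engine's 64-bit row hash: truncate a stable
        # digest to 8 bytes so the sketch runs anywhere.
        return int.from_bytes(hashlib.sha256(s.encode()).digest()[:8], "big")

    def ordering_key(full_row_str: str, use_double_hash: bool) -> tuple:
        parts = [h64(full_row_str)]
        if use_double_hash:
            # Perturbing the input string produces a second hash
            # uncorrelated with the first: 128 bits of identity overall.
            parts.append(h64(full_row_str + "_"))
        return tuple(parts)

Two rows whose first 64-bit hashes collide would still almost surely differ in the second component; only truly identical rows fall through to the random tiebreaker.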

tests/unit/resources.py (+10 -0)
@@ -44,6 +44,12 @@ def create_bigquery_session(
             google.auth.credentials.Credentials, instance=True
         )
 
+    if anonymous_dataset is None:
+        anonymous_dataset = google.cloud.bigquery.DatasetReference(
+            "test-project",
+            "test_dataset",
+        )
+
     if bqclient is None:
         bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True)
         bqclient.project = "test-project"
@@ -53,6 +59,10 @@ def create_bigquery_session(
         table._properties = {}
         type(table).location = mock.PropertyMock(return_value="test-region")
         type(table).schema = mock.PropertyMock(return_value=table_schema)
+        type(table).reference = mock.PropertyMock(
+            return_value=anonymous_dataset.table("test_table")
+        )
+        type(table).num_rows = mock.PropertyMock(return_value=1000000000)
         bqclient.get_table.return_value = table
 
     if anonymous_dataset is None:
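The session now reads table.reference and table.num_rows, both read-only properties on google.cloud.bigquery.Table, so the test double stubs them on the mock's type rather than on the instance. That is the standard unittest.mock idiom for read-only properties; a minimal sketch:

    import unittest.mock as mock
    import google.cloud.bigquery

    table = mock.create_autospec(google.cloud.bigquery.Table, instance=True)
    # A property must be attached to the type: assigning to the instance
    # would not intercept lookups that go through the property descriptor.
    type(table).num_rows = mock.PropertyMock(return_value=1000000000)
    assert table.num_rows == 1000000000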

tests/unit/session/test_session.py (+17 -4)
@@ -49,6 +49,16 @@ def test_read_gbq_cached_table():
         table,
     )
 
+    def get_table_mock(table_ref):
+        table = google.cloud.bigquery.Table(
+            table_ref, (google.cloud.bigquery.SchemaField("col", "INTEGER"),)
+        )
+        table._properties["numRows"] = "1000000000"
+        table._properties["location"] = session._location
+        return table
+
+    session.bqclient.get_table = get_table_mock
+
     with pytest.warns(UserWarning, match=re.escape("use_cache=False")):
         df = session.read_gbq("my-project.my_dataset.my_table")
 
@@ -137,10 +147,13 @@ def query_mock(query, *args, **kwargs):
 
     session.bqclient.query = query_mock
 
-    def get_table_mock(dataset_ref):
-        dataset = google.cloud.bigquery.Dataset(dataset_ref)
-        dataset.location = session._location
-        return dataset
+    def get_table_mock(table_ref):
+        table = google.cloud.bigquery.Table(
+            table_ref, (google.cloud.bigquery.SchemaField("col", "INTEGER"),)
+        )
+        table._properties["numRows"] = 1000000000
+        table._properties["location"] = session._location
+        return table
 
     session.bqclient.get_table = get_table_mock
 