Skip to content

Commit 32274b1

Browse files
perf: Reduce CURRENT_TIMESTAMP queries (#1114)
* perf: Reduce CURRENT_TIMESTAMP queries * fix unit tests * add unit tests with freezegun
1 parent 3759c63 commit 32274b1

File tree

6 files changed

+144
-17
lines changed

6 files changed

+144
-17
lines changed

bigframes/session/_io/bigquery/read_gbq_table.py

+2-16
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@
4545
def get_table_metadata(
4646
bqclient: bigquery.Client,
4747
table_ref: google.cloud.bigquery.table.TableReference,
48+
bq_time: datetime.datetime,
4849
*,
49-
api_name: str,
5050
cache: Dict[bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]],
5151
use_cache: bool = True,
5252
) -> Tuple[datetime.datetime, google.cloud.bigquery.table.Table]:
@@ -76,23 +76,9 @@ def get_table_metadata(
7676
)
7777
return cached_table
7878

79-
# TODO(swast): It's possible that the table metadata is changed between now
80-
# and when we run the CURRENT_TIMESTAMP() query to see when we can time
81-
# travel to. Find a way to fetch the table metadata and BQ's current time
82-
# atomically.
8379
table = bqclient.get_table(table_ref)
8480

85-
# TODO(swast): Use session._start_query instead?
86-
# TODO(swast): Use query_and_wait since we know these are small results.
87-
job_config = bigquery.QueryJobConfig()
88-
bigframes.session._io.bigquery.add_labels(job_config, api_name=api_name)
89-
snapshot_timestamp = list(
90-
bqclient.query(
91-
"SELECT CURRENT_TIMESTAMP() AS `current_timestamp`",
92-
job_config=job_config,
93-
).result()
94-
)[0][0]
95-
cached_table = (snapshot_timestamp, table)
81+
cached_table = (bq_time, table)
9682
cache[table_ref] = cached_table
9783
return cached_table
9884

bigframes/session/loader.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import bigframes.session.metrics
6060
import bigframes.session.planner
6161
import bigframes.session.temp_storage
62+
import bigframes.session.time as session_time
6263
import bigframes.version
6364

6465
# Avoid circular imports.
@@ -128,6 +129,8 @@ def __init__(
128129
self._metrics = metrics
129130
# Unfortunate circular reference, but need to pass reference when constructing objects
130131
self._session = session
132+
self._clock = session_time.BigQuerySyncedClock(bqclient)
133+
self._clock.sync()
131134

132135
def read_pandas_load_job(
133136
self, pandas_dataframe: pandas.DataFrame, api_name: str
@@ -246,7 +249,7 @@ def read_gbq_table(
246249
time_travel_timestamp, table = bf_read_gbq_table.get_table_metadata(
247250
self._bqclient,
248251
table_ref=table_ref,
249-
api_name=api_name,
252+
bq_time=self._clock.get_time(),
250253
cache=self._df_snapshot,
251254
use_cache=use_cache,
252255
)

bigframes/session/time.py

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# Copyright 2024 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import datetime
16+
import threading
17+
import time
18+
from typing import cast, Optional
19+
20+
import google.cloud.bigquery as bigquery
21+
22+
MIN_RESYNC_SECONDS = 100
23+
24+
25+
class BigQuerySyncedClock:
26+
"""
27+
Local clock that attempts to synchronize its time with the bigquery service.
28+
"""
29+
30+
def __init__(self, bqclient: bigquery.Client):
31+
self._bqclient = bqclient
32+
self._sync_lock = threading.Lock()
33+
self._sync_remote_time: Optional[datetime.datetime] = None
34+
self._sync_monotonic_time: Optional[float] = None
35+
36+
def get_time(self):
37+
if (self._sync_monotonic_time is None) or (self._sync_remote_time is None):
38+
self.sync()
39+
assert self._sync_remote_time is not None
40+
assert self._sync_monotonic_time is not None
41+
return self._sync_remote_time + datetime.timedelta(
42+
seconds=time.monotonic() - self._sync_monotonic_time
43+
)
44+
45+
def sync(self):
46+
with self._sync_lock:
47+
if (self._sync_monotonic_time is not None) and (
48+
time.monotonic() - self._sync_monotonic_time
49+
) < MIN_RESYNC_SECONDS:
50+
return
51+
current_bq_time = list(
52+
next(
53+
self._bqclient.query_and_wait(
54+
"SELECT CURRENT_TIMESTAMP() AS `current_timestamp`",
55+
)
56+
)
57+
)[0]
58+
self._sync_remote_time = cast(datetime.datetime, current_bq_time)
59+
self._sync_monotonic_time = time.monotonic()

noxfile.py

+1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
UNIT_TEST_STANDARD_DEPENDENCIES = [
5252
"mock",
5353
"asyncmock",
54+
"freezegun",
5455
PYTEST_VERSION,
5556
"pytest-cov",
5657
"pytest-asyncio",

tests/unit/resources.py

+9
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,16 @@ def query_mock(query, *args, **kwargs):
9191

9292
return query_job
9393

94+
existing_query_and_wait = bqclient.query_and_wait
95+
96+
def query_and_wait_mock(query, *args, **kwargs):
97+
if query.startswith("SELECT CURRENT_TIMESTAMP()"):
98+
return iter([[datetime.datetime.now()]])
99+
else:
100+
return existing_query_and_wait(query, *args, **kwargs)
101+
94102
bqclient.query = query_mock
103+
bqclient.query_and_wait = query_and_wait_mock
95104

96105
clients_provider = mock.create_autospec(bigframes.session.clients.ClientsProvider)
97106
type(clients_provider).bqclient = mock.PropertyMock(return_value=bqclient)

tests/unit/session/test_time.py

+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
# Copyright 2024 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import datetime
16+
import unittest.mock as mock
17+
18+
import freezegun
19+
import google.cloud.bigquery
20+
import pytest
21+
22+
import bigframes.session.time
23+
24+
INITIAL_BQ_TIME = datetime.datetime(
25+
year=2020,
26+
month=4,
27+
day=24,
28+
hour=8,
29+
minute=55,
30+
second=29,
31+
tzinfo=datetime.timezone.utc,
32+
)
33+
34+
35+
@pytest.fixture()
36+
def bq_client():
37+
bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True)
38+
39+
def query_and_wait_mock(query, *args, **kwargs):
40+
if query.startswith("SELECT CURRENT_TIMESTAMP()"):
41+
return iter([[INITIAL_BQ_TIME]])
42+
else:
43+
return ValueError(f"mock cannot handle query : {query}")
44+
45+
bqclient.query_and_wait = query_and_wait_mock
46+
return bqclient
47+
48+
49+
def test_bqsyncedclock_get_time(bq_client):
50+
# this initial local time is actually irrelevant, only the ticks matter
51+
initial_local_datetime = datetime.datetime(
52+
year=1, month=7, day=12, hour=15, minute=6, second=3
53+
)
54+
55+
with freezegun.freeze_time(initial_local_datetime) as frozen_datetime:
56+
clock = bigframes.session.time.BigQuerySyncedClock(bq_client)
57+
58+
t1 = clock.get_time()
59+
assert t1 == INITIAL_BQ_TIME
60+
61+
frozen_datetime.tick(datetime.timedelta(seconds=3))
62+
t2 = clock.get_time()
63+
assert t2 == INITIAL_BQ_TIME + datetime.timedelta(seconds=3)
64+
65+
frozen_datetime.tick(datetime.timedelta(seconds=23529385))
66+
t3 = clock.get_time()
67+
assert t3 == INITIAL_BQ_TIME + datetime.timedelta(
68+
seconds=3
69+
) + datetime.timedelta(seconds=23529385)

0 commit comments

Comments
 (0)