Skip to content

Commit 72e9583

Browse files
authored
feat: add slot_millis and add stats to session object (#725)
* feat: add slot_millis and add stats to session object * fix none handling * fix indent * make incrementor internal * update noxfile * update comment * add comment
1 parent d850da6 commit 72e9583

File tree

3 files changed

+107
-32
lines changed

3 files changed

+107
-32
lines changed

bigframes/session/__init__.py

+24-2
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,10 @@ def __init__(
289289
nodes.BigFrameNode, nodes.BigFrameNode
290290
] = weakref.WeakKeyDictionary()
291291

292+
# performance logging
293+
self._bytes_processed_sum = 0
294+
self._slot_millis_sum = 0
295+
292296
@property
293297
def bqclient(self):
294298
return self._clients_provider.bqclient
@@ -338,6 +342,24 @@ def objects(
338342
def _project(self):
339343
return self.bqclient.project
340344

345+
@property
def bytes_processed_sum(self):
    """Total bytes processed so far by the BigQuery jobs run through this session.

    Read-only view of the running counter that ``_add_bytes_processed``
    increments.
    """
    total_bytes = self._bytes_processed_sum
    return total_bytes
349+
350+
@property
def slot_millis_sum(self):
    """Total slot milliseconds used so far by the BigQuery jobs in this session.

    Read-only view of the running counter that ``_add_slot_millis``
    increments.
    """
    total_millis = self._slot_millis_sum
    return total_millis
354+
355+
def _add_bytes_processed(self, amount: int):
    """Add ``amount`` to the session's running bytes-processed total.

    Internal helper; the public read-only view is ``bytes_processed_sum``.
    """
    self._bytes_processed_sum = self._bytes_processed_sum + amount
358+
359+
def _add_slot_millis(self, amount: int):
    """Add ``amount`` to the session's running slot-milliseconds total.

    Internal helper; the public read-only view is ``slot_millis_sum``.
    """
    self._slot_millis_sum = self._slot_millis_sum + amount
362+
341363
def __hash__(self):
342364
# Stable hash needed to use in expression tree
343365
return hash(str(self._anonymous_dataset))
@@ -1825,7 +1847,7 @@ def _start_query(
18251847
"""
18261848
job_config = self._prepare_query_job_config(job_config)
18271849
return bigframes.session._io.bigquery.start_query_with_client(
1828-
self.bqclient,
1850+
self,
18291851
sql,
18301852
job_config,
18311853
max_results,
@@ -1849,7 +1871,7 @@ def _start_query_ml_ddl(
18491871
job_config.destination_encryption_configuration = None
18501872

18511873
return bigframes.session._io.bigquery.start_query_with_client(
1852-
self.bqclient, sql, job_config
1874+
self, sql, job_config
18531875
)
18541876

18551877
def _cache_with_cluster_cols(

bigframes/session/_io/bigquery/__init__.py

+40-11
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ def add_labels(job_config, api_name: Optional[str] = None):
219219

220220

221221
def start_query_with_client(
222-
bq_client: bigquery.Client,
222+
session: bigframes.session.Session,
223223
sql: str,
224224
job_config: bigquery.job.QueryJobConfig,
225225
max_results: Optional[int] = None,
@@ -229,6 +229,7 @@ def start_query_with_client(
229229
"""
230230
Starts query job and waits for results.
231231
"""
232+
bq_client: bigquery.Client = session.bqclient
232233
add_labels(job_config, api_name=api_name)
233234

234235
try:
@@ -246,14 +247,41 @@ def start_query_with_client(
246247
else:
247248
results_iterator = query_job.result(max_results=max_results)
248249

249-
if LOGGING_NAME_ENV_VAR in os.environ:
250-
# when running notebooks via pytest nbmake
251-
pytest_log_job(query_job)
250+
stats = get_performance_stats(query_job)
251+
if stats is not None:
252+
bytes_processed, slot_millis = stats
253+
session._add_bytes_processed(bytes_processed)
254+
session._add_slot_millis(slot_millis)
255+
if LOGGING_NAME_ENV_VAR in os.environ:
256+
# when running notebooks via pytest nbmake
257+
write_stats_to_disk(bytes_processed, slot_millis)
252258

253259
return results_iterator, query_job
254260

255261

256-
def pytest_log_job(query_job: bigquery.QueryJob):
262+
def get_performance_stats(query_job: bigquery.QueryJob) -> Optional[Tuple[int, int]]:
    """Parse the query job for performance stats.

    Args:
        query_job: The job whose ``total_bytes_processed`` and
            ``slot_millis`` attributes are read.

    Returns:
        A ``(bytes_processed, slot_millis)`` tuple, or None if the stats do
        not reflect real work done in bigquery, i.e. when either value is
        not an ``int`` (as happens with mocked jobs). Dry-run jobs yield
        ``(0, 0)`` because their stats are only predictions.
    """
    bytes_processed = query_job.total_bytes_processed
    slot_millis = query_job.slot_millis
    if not isinstance(bytes_processed, int) or not isinstance(slot_millis, int):
        return None  # filter out mocks

    if query_job.configuration.dry_run:
        # dry run stats are just predictions of the real run
        return 0, 0

    return bytes_processed, slot_millis
282+
283+
284+
def write_stats_to_disk(bytes_processed: int, slot_millis: int):
257285
"""For pytest runs only, log information about the query job
258286
to a file in order to create a performance report.
259287
"""
@@ -265,16 +293,17 @@ def pytest_log_job(query_job: bigquery.QueryJob):
265293
)
266294
test_name = os.environ[LOGGING_NAME_ENV_VAR]
267295
current_directory = os.getcwd()
268-
bytes_processed = query_job.total_bytes_processed
269-
if not isinstance(bytes_processed, int):
270-
return # filter out mocks
271-
if query_job.configuration.dry_run:
272-
# dry runs don't process their total_bytes_processed
273-
bytes_processed = 0
296+
297+
# store bytes processed
274298
bytes_file = os.path.join(current_directory, test_name + ".bytesprocessed")
275299
with open(bytes_file, "a") as f:
276300
f.write(str(bytes_processed) + "\n")
277301

302+
# store slot milliseconds
303+
bytes_file = os.path.join(current_directory, test_name + ".slotmillis")
304+
with open(bytes_file, "a") as f:
305+
f.write(str(slot_millis) + "\n")
306+
278307

279308
def delete_tables_matching_session_id(
280309
client: bigquery.Client, dataset: bigquery.DatasetReference, session_id: str

noxfile.py

+43-19
Original file line numberDiff line numberDiff line change
@@ -800,33 +800,57 @@ def notebook(session: nox.Session):
800800
for process in processes:
801801
process.join()
802802

803-
# when run via pytest, notebooks output a .bytesprocessed report
803+
# when the environment variable is set as it is above,
804+
# notebooks output a .bytesprocessed and .slotmillis report
804805
# collect those reports and print a summary
805-
_print_bytes_processed_report()
806+
_print_performance_report()
806807

807808

808-
def _print_bytes_processed_report():
809-
"""Add an informational report about http queries and bytes
810-
processed to the testlog output for purposes of measuring
811-
bigquery-related performance changes.
809+
def _print_performance_report():
    """Add an informational report about http queries, bytes
    processed, and slot time to the testlog output for purposes
    of measuring bigquery-related performance changes.

    Reads every ``*.bytesprocessed`` and ``*.slotmillis`` file matching
    ``notebooks/*/``. Each line of a report file is parsed as one integer;
    the number of lines in the ``.bytesprocessed`` file is used as the
    query count. One summary line is printed per report, followed by
    cumulative totals.

    Raises:
        IOError: If a report has only one of the two expected files.
    """
    print("---BIGQUERY USAGE REPORT---")

    # Key on the report path without its suffix so the two files written for
    # the same notebook pair up, while same-named notebooks in different
    # sub-directories stay separate.
    results_dict = {}
    for bytes_report in Path("notebooks/").glob("*/*.bytesprocessed"):
        lines = bytes_report.read_text().splitlines()
        query_count = len(lines)
        total_bytes = sum(int(line) for line in lines)
        results_dict[bytes_report.with_suffix("")] = [query_count, total_bytes]
    for millis_report in Path("notebooks/").glob("*/*.slotmillis"):
        lines = millis_report.read_text().splitlines()
        total_slot_millis = sum(int(line) for line in lines)
        # setdefault: an orphaned .slotmillis file must reach the explicit
        # mismatch check below rather than fail here with a KeyError.
        results_dict.setdefault(millis_report.with_suffix(""), []).append(
            total_slot_millis
        )

    cumulative_queries = 0
    cumulative_bytes = 0
    cumulative_slot_millis = 0
    for report_path, results in results_dict.items():
        if len(results) != 3:
            raise IOError(
                "Mismatch in performance logging output. "
                "Expected one .bytesprocessed and one .slotmillis "
                "file for each notebook."
            )
        query_count, total_bytes, total_slot_millis = results
        cumulative_queries += query_count
        cumulative_bytes += total_bytes
        cumulative_slot_millis += total_slot_millis
        # Label each row with its own report name, not a variable left over
        # from the glob loops above.
        print(
            f"{report_path.name} - query count: {query_count},"
            f" bytes processed sum: {total_bytes},"
            f" slot millis sum: {total_slot_millis}"
        )

    print(
        f"---total queries: {cumulative_queries}, "
        f"total bytes: {cumulative_bytes}, "
        f"total slot millis: {cumulative_slot_millis}---"
    )
831855

832856

0 commit comments

Comments
 (0)