Skip to content
This repository was archived by the owner on May 7, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add cleanup by previous session id
  • Loading branch information
shobsi committed Jul 9, 2024
commit c713e468f6a61651741ce1e927e5799c52e9cca1
124 changes: 100 additions & 24 deletions bigframes/functions/remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
NamedTuple,
Optional,
Sequence,
Set,
Tuple,
TYPE_CHECKING,
Union,
Expand Down Expand Up @@ -69,6 +70,11 @@

logger = logging.getLogger(__name__)

# Naming convention for the remote function artifacts
_BIGFRAMES_REMOTE_FUNCTION_PREFIX = "bigframes"
_BQ_FUNCTION_NAME_SEPERATOR = "_"
_GCF_FUNCTION_NAME_SEPERATOR = "-"

# Protocol version 4 is available in python version 3.4 and above
# https://docs.python.org/3/library/pickle.html#data-stream-format
_pickle_protocol_version = 4
Expand All @@ -78,8 +84,8 @@
_session_artifacts_lock = threading.Lock()


def _update_session_artifacts(session_id: str, bqrf_routine: str, gcf_path: str):
"""Update remote function artifacts for a session id."""
def _update_alive_session_artifacts(session_id: str, bqrf_routine: str, gcf_path: str):
"""Update remote function artifacts for a session id in the current runtime."""
global _temp_session_artifacts, _session_artifacts_lock

with _session_artifacts_lock:
Comment thread
shobsi marked this conversation as resolved.
Outdated
Expand All @@ -88,12 +94,12 @@ def _update_session_artifacts(session_id: str, bqrf_routine: str, gcf_path: str)
_temp_session_artifacts[session_id][bqrf_routine] = gcf_path


def _clean_up_session_artifacts(
def _clean_up_alive_session(
bqclient: bigquery.Client,
gcfclient: functions_v2.FunctionServiceClient,
session_id: str,
):
"""Delete remote function artifacts for a session id."""
"""Delete remote function artifacts for a session id in the current runtime."""
global _temp_session_artifacts, _session_artifacts_lock

with _session_artifacts_lock:
Expand All @@ -113,6 +119,70 @@ def _clean_up_session_artifacts(
_temp_session_artifacts.pop(session_id)


def _clean_up_by_session_id(
bqclient: bigquery.Client,
gcfclient: functions_v2.FunctionServiceClient,
dataset: bigquery.DatasetReference,
session_id: str,
):
"""Delete remote function artifacts for a session id, where the session id
was not necessarily created in the current runtime. This is useful if the
user worked with a BigQuery DataFrames session previously and remembered the
session id, and now wants to clean up its temporary resources at a later
point in time.
"""

# First clean up the BQ remote functions and then the underlying
# cloud functions, so that at no point we are left with a remote function
# that is pointing to a cloud function that does not exist

endpoints_to_be_deleted: Set[str] = set()
match_prefix = "".join(
[
_BIGFRAMES_REMOTE_FUNCTION_PREFIX,
_BQ_FUNCTION_NAME_SEPERATOR,
session_id,
_BQ_FUNCTION_NAME_SEPERATOR,
]
)
for routine in bqclient.list_routines(dataset):
routine = cast(bigquery.Routine, routine)

# skip past the routines not belonging to the given session id, or
# non-remote-function routines
if (
routine.type_ != bigquery.RoutineType.SCALAR_FUNCTION
or not cast(str, routine.routine_id).startswith(match_prefix)
or not routine.remote_function_options
or not routine.remote_function_options.endpoint
):
continue

# Let's forgive the edge case possibility that the BQ remote function
# may have been deleted at the same time directly by the user
bqclient.delete_routine(routine, not_found_ok=True)
endpoints_to_be_deleted.add(routine.remote_function_options.endpoint)

# Now clean up the cloud functions
bq_location = bqclient.get_dataset(dataset).location
bq_location, gcf_location = get_remote_function_locations(bq_location)
parent_path = gcfclient.common_location_path(
project=dataset.project, location=gcf_location
)
for gcf in gcfclient.list_functions(parent=parent_path):
# skip past the cloud functions not attached to any BQ remote function
# belonging to the given session id
if gcf.service_config.uri not in endpoints_to_be_deleted:
continue

# Let's forgive the edge case possibility that the cloud function
# may have been deleted at the same time directly by the user
try:
gcfclient.delete_function(name=gcf.name)
except google.api_core.exceptions.NotFound:
pass


def get_remote_function_locations(bq_location):
"""Get BQ location and cloud functions region given a BQ client."""
# TODO(shobs, b/274647164): Find the best way to determine default location.
Expand Down Expand Up @@ -173,20 +243,20 @@ class IbisSignature(NamedTuple):
output_type: IbisDataType


def get_cloud_function_name(function_hash, uniq_suffix=None):
def get_cloud_function_name(function_hash, session_id, uniq_suffix=None):
"Get a name for the cloud function for the given user defined function."
cf_name = f"bigframes-{function_hash}" # for identification
parts = [_BIGFRAMES_REMOTE_FUNCTION_PREFIX, session_id, function_hash]
if uniq_suffix:
cf_name = f"{cf_name}-{uniq_suffix}"
return cf_name
parts.append(uniq_suffix)
return _GCF_FUNCTION_NAME_SEPERATOR.join(parts)


def get_remote_function_name(function_hash, uniq_suffix=None):
def get_remote_function_name(function_hash, session_id, uniq_suffix=None):
"Get a name for the BQ remote function for the given user defined function."
bq_rf_name = f"bigframes_{function_hash}" # for identification
parts = [_BIGFRAMES_REMOTE_FUNCTION_PREFIX, session_id, function_hash]
if uniq_suffix:
bq_rf_name = f"{bq_rf_name}_{uniq_suffix}"
return bq_rf_name
parts.append(uniq_suffix)
return _BQ_FUNCTION_NAME_SEPERATOR.join(parts)


class RemoteFunctionClient:
Expand Down Expand Up @@ -507,7 +577,7 @@ def _record_session_artifacts(
cloud_function_full_name = self.get_cloud_function_fully_qualified_name(
cloud_function_name
)
_update_session_artifacts(
_update_alive_session_artifacts(
self._session.session_id,
remote_function_full_name,
cloud_function_full_name,
Expand All @@ -529,15 +599,6 @@ def provision_bq_remote_function(
cloud_function_memory_mib,
):
"""Provision a BigQuery remote function."""
# If reuse of any existing function with the same name (indicated by the
# same hash of its source code) is not intended, then attach a unique
# suffix to the intended function name to make it unique.
uniq_suffix = None
if not reuse:
uniq_suffix = "".join(
random.choices(string.ascii_lowercase + string.digits, k=8)
)

# Augment user package requirements with any internal package
# requirements
package_requirements = _get_updated_package_requirements(
Expand All @@ -547,10 +608,23 @@ def provision_bq_remote_function(
# Compute a unique hash representing the user code
function_hash = _get_hash(def_, package_requirements)

# If reuse of any existing function with the same name (indicated by the
# same hash of its source code) is not intended, then attach a unique
# suffix to the intended function name to make it unique.
uniq_suffix = None
if not reuse:
# use 4 digits as a unique suffix which should suffice for
# uniqueness per session
uniq_suffix = "".join(
random.choices(string.ascii_lowercase + string.digits, k=4)
)

# Derive the name of the cloud function underlying the intended BQ
# remote function, also collect updated package requirements as
# determined in the name resolution
cloud_function_name = get_cloud_function_name(function_hash, uniq_suffix)
cloud_function_name = get_cloud_function_name(
function_hash, self._session.session_id, uniq_suffix
)
cf_endpoint = self.get_cloud_function_endpoint(cloud_function_name)

# Create the cloud function if it does not exist
Expand All @@ -573,7 +647,9 @@ def provision_bq_remote_function(
# Derive the name of the remote function
remote_function_name = name
if not remote_function_name:
remote_function_name = get_remote_function_name(function_hash, uniq_suffix)
remote_function_name = get_remote_function_name(
function_hash, self._session.session_id, uniq_suffix
)
rf_endpoint, rf_conn = self.get_remote_function_specs(remote_function_name)

# Create the BQ remote function in following circumstances:
Expand Down
10 changes: 7 additions & 3 deletions bigframes/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import bigframes.core.tools
import bigframes.dataframe
import bigframes.enums
import bigframes.functions.remote_function as bigframes_rf
import bigframes.operations as ops
import bigframes.series
import bigframes.session
Expand Down Expand Up @@ -794,7 +795,6 @@ def clean_up_by_session_id(
None
"""
session = get_global_session()
client = session.bqclient

if (location is None) != (project is None):
raise ValueError(
Expand All @@ -804,14 +804,18 @@ def clean_up_by_session_id(
dataset = session._anonymous_dataset
else:
dataset = bigframes.session._io.bigquery.create_bq_dataset_reference(
client,
session.bqclient,
location=location,
project=project,
api_name="clean_up_by_session_id",
)

bigframes.session._io.bigquery.delete_tables_matching_session_id(
client, dataset, session_id
session.bqclient, dataset, session_id
)

bigframes_rf._clean_up_by_session_id(
session.bqclient, session.cloudfunctionsclient, dataset, session_id
)


Expand Down
2 changes: 1 addition & 1 deletion bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ def _clean_up_tables(self):
def close(self):
"""Delete resources that were created with this session's session_id."""
self._clean_up_tables()
bigframes_rf._clean_up_session_artifacts(
bigframes_rf._clean_up_alive_session(
self.bqclient, self.cloudfunctionsclient, self.session_id
)

Expand Down
66 changes: 65 additions & 1 deletion tests/system/large/test_remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import bigframes
import bigframes.functions.remote_function as bigframes_rf
import bigframes.pandas as bpd
import bigframes.series
from tests.system.utils import (
assert_pandas_df_equal,
Expand Down Expand Up @@ -592,7 +593,9 @@ def add_one(x):
# Expected cloud function name for the unique udf
package_requirements = bigframes_rf._get_updated_package_requirements()
add_one_uniq_hash = bigframes_rf._get_hash(add_one_uniq, package_requirements)
add_one_uniq_cf_name = bigframes_rf.get_cloud_function_name(add_one_uniq_hash)
add_one_uniq_cf_name = bigframes_rf.get_cloud_function_name(
add_one_uniq_hash, session.session_id
)

# There should be no cloud function yet for the unique udf
cloud_functions = list(
Expand Down Expand Up @@ -1939,3 +1942,64 @@ def foo(x: int) -> int:
cleanup_remote_function_assets(
session.bqclient, session.cloudfunctionsclient, foo
)


def test_remote_function_clean_up_by_session_id():
# Use a brand new session to avoid conflict with other tests
session = bigframes.Session()
session_id = session.session_id
try:
# we will create remote functions, one with explicit name and another
# without it, and later confirm that the former is deleted when the session
# is cleaned up by session id, but the latter remains
## unnamed
@session.remote_function(reuse=False)
def foo_unnamed(x: int) -> int:
return x + 1

## named
rf_name = test_utils.prefixer.Prefixer("bigframes", "").create_prefix()

@session.remote_function(reuse=False, name=rf_name)
def foo_named(x: int) -> int:
return x + 2

# check that BQ remote functiosn were created with corresponding cloud
# functions
for foo in [foo_unnamed, foo_named]:
assert foo.bigframes_remote_function is not None
session.bqclient.get_routine(foo.bigframes_remote_function) is not None
assert foo.bigframes_cloud_function is not None
session.cloudfunctionsclient.get_function(
name=foo.bigframes_cloud_function
) is not None

# clean up using explicit session id
bpd.clean_up_by_session_id(
session_id, location=session._location, project=session._project
)

# ensure that the unnamed bq remote function is deleted along with its
# corresponding cloud function
with pytest.raises(google.cloud.exceptions.NotFound):
session.bqclient.get_routine(foo_unnamed.bigframes_remote_function)
try:
gcf = session.cloudfunctionsclient.get_function(
name=foo_unnamed.bigframes_cloud_function
)
assert gcf.state is functions_v2.Function.State.DELETING
except google.cloud.exceptions.NotFound:
pass

# ensure that the named bq remote function still exists along with its
# corresponding cloud function
session.bqclient.get_routine(foo_named.bigframes_remote_function) is not None
gcf = session.cloudfunctionsclient.get_function(
name=foo_named.bigframes_cloud_function
)
assert gcf.state is functions_v2.Function.State.ACTIVE
finally:
# clean up the gcp assets created for the remote function
cleanup_remote_function_assets(
session.bqclient, session.cloudfunctionsclient, foo_named
)
10 changes: 4 additions & 6 deletions tests/system/large/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import bigframes
import bigframes.pandas as bpd
import bigframes.session._io.bigquery


@pytest.mark.parametrize(
Expand Down Expand Up @@ -93,8 +94,7 @@ def test_clean_up_by_session_id():
session_id = session.session_id

# we will create two tables and confirm that they are deleted
# when the session is closed by id

# when the session is cleaned up by id
bqclient = session.bqclient
dataset = session._anonymous_dataset
expiration = (
Expand All @@ -110,9 +110,7 @@ def test_clean_up_by_session_id():
max_results=bigframes.session._io.bigquery._LIST_TABLES_LIMIT,
page_size=bigframes.session._io.bigquery._LIST_TABLES_LIMIT,
)
assert any(
[(session.session_id in table.full_table_id) for table in list(tables_before)]
)
assert any([(session.session_id in table.full_table_id) for table in tables_before])

bpd.clean_up_by_session_id(
session_id, location=session._location, project=session._project
Expand All @@ -125,5 +123,5 @@ def test_clean_up_by_session_id():
page_size=bigframes.session._io.bigquery._LIST_TABLES_LIMIT,
)
assert not any(
[(session.session_id in table.full_table_id) for table in list(tables_after)]
[(session.session_id in table.full_table_id) for table in tables_after]
)