Skip to content
This repository was archived by the owner on May 7, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 172 additions & 26 deletions bigframes/functions/remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@
import string
import sys
import tempfile
import threading
from typing import (
Any,
cast,
Dict,
List,
Mapping,
NamedTuple,
Optional,
Sequence,
Set,
Tuple,
TYPE_CHECKING,
Union,
Expand Down Expand Up @@ -67,10 +70,118 @@

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Naming convention for the remote function artifacts. Generated names consist
# of the prefix, the session id, the function hash and an optional unique
# suffix, joined with "_" for BQ remote function names and with "-" for cloud
# function names.
_BIGFRAMES_REMOTE_FUNCTION_PREFIX = "bigframes"
_BQ_FUNCTION_NAME_SEPERATOR = "_"
_GCF_FUNCTION_NAME_SEPERATOR = "-"

# Protocol version 4 is available in python version 3.4 and above
# https://docs.python.org/3/library/pickle.html#data-stream-format
_pickle_protocol_version = 4

# Module level mapping of session-id to remote function artifacts. The inner
# dict maps a BQ remote function routine to the path of its underlying cloud
# function. The helpers in this module that read or modify the mapping do so
# while holding `_session_artifacts_lock`.
_temp_session_artifacts: Dict[str, Dict[str, str]] = {}
_session_artifacts_lock = threading.Lock()


def _update_alive_session_artifacts(session_id: str, bqrf_routine: str, gcf_path: str):
    """Remember a remote function artifact pair created by a session.

    Registers ``gcf_path`` as the cloud function backing ``bqrf_routine`` in
    the module level registry for ``session_id``, creating the registry entry
    for the session on first use. An existing record for the same routine is
    overwritten.

    Args:
        session_id: Id of the session that created the artifacts.
        bqrf_routine: The BQ remote function routine, used as the key.
        gcf_path: Path of the cloud function the routine calls into.
    """
    # The registry is shared module state, so mutate it only under the lock.
    with _session_artifacts_lock:
        session_entries = _temp_session_artifacts.setdefault(session_id, {})
        session_entries[bqrf_routine] = gcf_path


def _clean_up_alive_session(
    bqclient: bigquery.Client,
    gcfclient: functions_v2.FunctionServiceClient,
    session_id: str,
):
    """Delete the remote function artifacts recorded for a session id.

    Only artifacts registered in the current runtime's module level registry
    are considered. For each recorded pair the BQ remote function is deleted
    first and its cloud function second; afterwards the session's registry
    entry is removed. Nothing happens if the session id has no entry.

    Args:
        bqclient: BigQuery client used to delete the remote functions.
        gcfclient: Cloud Functions client used to delete the cloud functions.
        session_id: Id of the session whose artifacts should be deleted.
    """
    with _session_artifacts_lock:
        if session_id not in _temp_session_artifacts:
            return

        recorded = _temp_session_artifacts[session_id]
        for routine_name, function_path in recorded.items():
            # The user may already have deleted the remote function
            # directly; treat that as success.
            bqclient.delete_routine(routine_name, not_found_ok=True)

            # Likewise, the cloud function may already be gone.
            try:
                gcfclient.delete_function(name=function_path)
            except google.api_core.exceptions.NotFound:
                pass

        # Forget the session only after every artifact has been handled.
        del _temp_session_artifacts[session_id]


def _clean_up_by_session_id(
    bqclient: bigquery.Client,
    gcfclient: functions_v2.FunctionServiceClient,
    dataset: bigquery.DatasetReference,
    session_id: str,
):
    """Delete remote function artifacts for a session id, where the session id
    was not necessarily created in the current runtime. This is useful if the
    user worked with a BigQuery DataFrames session previously and remembered the
    session id, and now wants to clean up its temporary resources at a later
    point in time.

    Artifacts are discovered by name: every scalar remote function in
    ``dataset`` whose routine id starts with ``bigframes_<session_id>_`` is
    deleted, followed by every cloud function whose URI is the endpoint of one
    of those remote functions.

    Args:
        bqclient: BigQuery client used to list and delete routines.
        gcfclient: Cloud Functions client used to list and delete functions.
        dataset: Dataset that is searched for the session's remote functions.
        session_id: Id of the session whose artifacts should be deleted.
    """

    # First clean up the BQ remote functions and then the underlying
    # cloud functions, so that at no point we are left with a remote function
    # that is pointing to a cloud function that does not exist

    endpoints_to_be_deleted: Set[str] = set()
    match_prefix = "".join(
        [
            _BIGFRAMES_REMOTE_FUNCTION_PREFIX,
            _BQ_FUNCTION_NAME_SEPERATOR,
            session_id,
            _BQ_FUNCTION_NAME_SEPERATOR,
        ]
    )
    for routine in bqclient.list_routines(dataset):
        routine = cast(bigquery.Routine, routine)

        # skip past the routines not belonging to the given session id, or
        # non-remote-function routines
        if (
            routine.type_ != bigquery.RoutineType.SCALAR_FUNCTION
            or not cast(str, routine.routine_id).startswith(match_prefix)
            or not routine.remote_function_options
            or not routine.remote_function_options.endpoint
        ):
            continue

        # Let's forgive the edge case possibility that the BQ remote function
        # may have been deleted at the same time directly by the user
        bqclient.delete_routine(routine, not_found_ok=True)
        endpoints_to_be_deleted.add(routine.remote_function_options.endpoint)

    # Nothing matched the session id, so no cloud function can be selected
    # below. Return now instead of fetching the dataset and paging through
    # every cloud function in the region for no effect.
    if not endpoints_to_be_deleted:
        return

    # Now clean up the cloud functions
    bq_location = bqclient.get_dataset(dataset).location
    # Only the cloud functions region is needed here.
    _, gcf_location = get_remote_function_locations(bq_location)
    parent_path = gcfclient.common_location_path(
        project=dataset.project, location=gcf_location
    )
    for gcf in gcfclient.list_functions(parent=parent_path):
        # skip past the cloud functions not attached to any BQ remote function
        # belonging to the given session id
        if gcf.service_config.uri not in endpoints_to_be_deleted:
            continue

        # Let's forgive the edge case possibility that the cloud function
        # may have been deleted at the same time directly by the user
        try:
            gcfclient.delete_function(name=gcf.name)
        except google.api_core.exceptions.NotFound:
            pass


def get_remote_function_locations(bq_location):
"""Get BQ location and cloud functions region given a BQ client."""
Expand Down Expand Up @@ -102,7 +213,9 @@ def _get_hash(def_, package_requirements=None):
return hashlib.md5(def_repr).hexdigest()


def _get_updated_package_requirements(package_requirements, is_row_processor):
def _get_updated_package_requirements(
package_requirements=None, is_row_processor=False
):
requirements = [f"cloudpickle=={cloudpickle.__version__}"]
if is_row_processor:
# bigframes remote function will send an entire row of data as json,
Expand Down Expand Up @@ -130,31 +243,20 @@ class IbisSignature(NamedTuple):
output_type: IbisDataType


def get_cloud_function_name(
def_, uniq_suffix=None, package_requirements=None, is_row_processor=False
):
def get_cloud_function_name(function_hash, session_id, uniq_suffix=None):
"Get a name for the cloud function for the given user defined function."

# Augment user package requirements with any internal package
# requirements
package_requirements = _get_updated_package_requirements(
package_requirements, is_row_processor
)

cf_name = _get_hash(def_, package_requirements)
cf_name = f"bigframes-{cf_name}" # for identification
parts = [_BIGFRAMES_REMOTE_FUNCTION_PREFIX, session_id, function_hash]
if uniq_suffix:
cf_name = f"{cf_name}-{uniq_suffix}"
return cf_name, package_requirements
parts.append(uniq_suffix)
return _GCF_FUNCTION_NAME_SEPERATOR.join(parts)


def get_remote_function_name(def_, uniq_suffix=None, package_requirements=None):
def get_remote_function_name(function_hash, session_id, uniq_suffix=None):
"Get a name for the BQ remote function for the given user defined function."
bq_rf_name = _get_hash(def_, package_requirements)
bq_rf_name = f"bigframes_{bq_rf_name}" # for identification
parts = [_BIGFRAMES_REMOTE_FUNCTION_PREFIX, session_id, function_hash]
if uniq_suffix:
bq_rf_name = f"{bq_rf_name}_{uniq_suffix}"
return bq_rf_name
parts.append(uniq_suffix)
return _BQ_FUNCTION_NAME_SEPERATOR.join(parts)


class RemoteFunctionClient:
Expand Down Expand Up @@ -272,6 +374,10 @@ def get_cloud_function_fully_qualified_name(self, name):
self._gcp_project_id, self._cloud_function_region, name
)

def get_remote_function_fully_qualilfied_name(self, name):
    """Return the fully qualified name of a BQ remote function.

    The result has the form ``<project id>.<dataset>.<name>``, built from
    this client's GCP project id and BQ dataset.
    """
    project_id, dataset = self._gcp_project_id, self._bq_dataset
    return "{}.{}.{}".format(project_id, dataset, name)

def get_cloud_function_endpoint(self, name):
"""Get the http endpoint of a cloud function if it exists."""
fully_qualified_name = self.get_cloud_function_fully_qualified_name(name)
Expand Down Expand Up @@ -462,6 +568,21 @@ def create_cloud_function(
)
return endpoint

def _record_session_artifacts(
    self, remote_function_name: str, cloud_function_name: str
):
    """Register a remote function and its cloud function for this session.

    Both names are expanded to their fully qualified forms and recorded under
    this client's session id in the module level artifact registry.

    Args:
        remote_function_name: Short name of the BQ remote function.
        cloud_function_name: Short name of the underlying cloud function.
    """
    bq_routine_path = self.get_remote_function_fully_qualilfied_name(
        remote_function_name
    )
    gcf_path = self.get_cloud_function_fully_qualified_name(cloud_function_name)
    session_id = self._session.session_id
    _update_alive_session_artifacts(session_id, bq_routine_path, gcf_path)

def provision_bq_remote_function(
self,
def_,
Expand All @@ -478,20 +599,31 @@ def provision_bq_remote_function(
cloud_function_memory_mib,
):
"""Provision a BigQuery remote function."""
# Augment user package requirements with any internal package
# requirements
package_requirements = _get_updated_package_requirements(
package_requirements, is_row_processor
)

# Compute a unique hash representing the user code
function_hash = _get_hash(def_, package_requirements)

# If reuse of any existing function with the same name (indicated by the
# same hash of its source code) is not intended, then attach a unique
# suffix to the intended function name to make it unique.
uniq_suffix = None
if not reuse:
# use 4 random alphanumeric characters (lowercase letters and digits) as a
# unique suffix, which should suffice for uniqueness per session
uniq_suffix = "".join(
random.choices(string.ascii_lowercase + string.digits, k=8)
random.choices(string.ascii_lowercase + string.digits, k=4)
)

# Derive the name of the cloud function underlying the intended BQ
# remote function, also collect updated package requirements as
# determined in the name resolution
cloud_function_name, package_requirements = get_cloud_function_name(
def_, uniq_suffix, package_requirements, is_row_processor
cloud_function_name = get_cloud_function_name(
function_hash, self._session.session_id, uniq_suffix
)
cf_endpoint = self.get_cloud_function_endpoint(cloud_function_name)

Expand All @@ -516,7 +648,7 @@ def provision_bq_remote_function(
remote_function_name = name
if not remote_function_name:
remote_function_name = get_remote_function_name(
def_, uniq_suffix, package_requirements
function_hash, self._session.session_id, uniq_suffix
)
rf_endpoint, rf_conn = self.get_remote_function_specs(remote_function_name)

Expand All @@ -540,6 +672,18 @@ def provision_bq_remote_function(
remote_function_name,
max_batching_rows,
)

# Update module level mapping of session id to the cloud artifacts
# created. This would be used to clean up any resources for a
# session. Note that we need to do this only for the case where an
# explicit name was not provided by the user and we used an internal
# name. For the cases where the user provided an explicit name, we
# are assuming that the user wants to persist them with that name
# and would directly manage their lifecycle.
if not name:
self._record_session_artifacts(
remote_function_name, cloud_function_name
)
else:
logger.info(f"Remote function {remote_function_name} already exists.")

Expand Down Expand Up @@ -926,7 +1070,7 @@ def remote_function(
" For more details see https://cloud.google.com/functions/docs/securing/cmek#before_you_begin"
)

bq_connection_manager = None if session is None else session.bqconnectionmanager
bq_connection_manager = session.bqconnectionmanager

def wrapper(func):
nonlocal input_types, output_type
Expand Down Expand Up @@ -1054,7 +1198,9 @@ def try_delattr(attr):
func.bigframes_cloud_function = (
remote_function_client.get_cloud_function_fully_qualified_name(cf_name)
)
func.bigframes_remote_function = str(dataset_ref.routine(rf_name)) # type: ignore
func.bigframes_remote_function = (
remote_function_client.get_remote_function_fully_qualilfied_name(rf_name)
)

func.output_dtype = (
bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype(
Expand Down
10 changes: 7 additions & 3 deletions bigframes/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import bigframes.core.tools
import bigframes.dataframe
import bigframes.enums
import bigframes.functions.remote_function as bigframes_rf
import bigframes.operations as ops
import bigframes.series
import bigframes.session
Expand Down Expand Up @@ -794,7 +795,6 @@ def clean_up_by_session_id(
None
"""
session = get_global_session()
client = session.bqclient

if (location is None) != (project is None):
raise ValueError(
Expand All @@ -804,14 +804,18 @@ def clean_up_by_session_id(
dataset = session._anonymous_dataset
else:
dataset = bigframes.session._io.bigquery.create_bq_dataset_reference(
client,
session.bqclient,
location=location,
project=project,
api_name="clean_up_by_session_id",
)

bigframes.session._io.bigquery.delete_tables_matching_session_id(
client, dataset, session_id
session.bqclient, dataset, session_id
)

bigframes_rf._clean_up_by_session_id(
session.bqclient, session.cloudfunctionsclient, dataset, session_id
)


Expand Down
16 changes: 11 additions & 5 deletions bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@
import bigframes.dtypes
import bigframes.exceptions
import bigframes.formatting_helpers as formatting_helpers
from bigframes.functions.remote_function import read_gbq_function as bigframes_rgf
from bigframes.functions.remote_function import remote_function as bigframes_rf
import bigframes.functions.remote_function as bigframes_rf
import bigframes.session._io.bigquery as bf_io_bigquery
import bigframes.session._io.bigquery.read_gbq_table as bf_read_gbq_table
import bigframes.session.clients
Expand Down Expand Up @@ -383,7 +382,7 @@ def __hash__(self):
# Stable hash needed to use in expression tree
return hash(str(self._anonymous_dataset))

def close(self):
def _clean_up_tables(self):
"""Delete tables that were created with this session's session_id."""
client = self.bqclient
project_id = self._anonymous_dataset.project
Expand All @@ -393,6 +392,13 @@ def close(self):
full_id = ".".join([project_id, dataset_id, table_id])
client.delete_table(full_id, not_found_ok=True)

def close(self):
    """Delete resources that were created with this session's session_id.

    The session's tables are removed first, then the remote function
    artifacts recorded for this session in the current runtime.
    """
    self._clean_up_tables()
    bigframes_rf._clean_up_alive_session(
        bqclient=self.bqclient,
        gcfclient=self.cloudfunctionsclient,
        session_id=self.session_id,
    )

def read_gbq(
self,
query_or_table: str,
Expand Down Expand Up @@ -1689,7 +1695,7 @@ def remote_function(

`bigframes_remote_function` - The bigquery remote function capable of calling into `bigframes_cloud_function`.
"""
return bigframes_rf(
return bigframes_rf.remote_function(
input_types,
output_type,
session=self,
Expand Down Expand Up @@ -1769,7 +1775,7 @@ def read_gbq_function(
not including the `bigframes_cloud_function` property.
"""

return bigframes_rgf(
return bigframes_rf.read_gbq_function(
function_name=function_name,
session=self,
)
Expand Down
Loading