
Commit b47f32d

milkshakeiii and tswast authored
feat: Add bigframes.streaming.to_pubsub method to create continuous query that writes to Pub/Sub (#801)
* Add to_pubsub streaming method
* fix comment
* fix comment further
* remove accidental files
* remove more accidental files
* remove another accidental file
* use service account
* fix return value
* pass session
* have the user provide the service account
* fix mypy error
* address comments
* update service account name
* fix invalid character in label
* rename service_account to service_account_email
* move streaming tests to own tables

---------

Co-authored-by: Tim Sweña (Swast) <[email protected]>
1 parent 87e6018 commit b47f32d

6 files changed, +213 −13 lines changed


bigframes/streaming/__init__.py

+132 −7
@@ -16,6 +16,7 @@
 
 import json
 from typing import Optional
+import warnings
 
 from google.cloud import bigquery
 
@@ -24,9 +25,11 @@
 
 def to_bigtable(
     query: str,
+    *,
     instance: str,
     table: str,
-    bq_client: Optional[bigquery.Client] = None,
+    service_account_email: Optional[str] = None,
+    session: Optional[bigframes.Session] = None,
     app_profile: Optional[str] = None,
     truncate: bool = False,
     overwrite: bool = False,
@@ -53,10 +56,15 @@ def to_bigtable(
             The name of the bigtable instance to export to.
         table (str):
             The name of the bigtable table to export to.
-        bq_client (str, default None):
-            The Client object to use for the query. This determines
+        service_account_email (str):
+            Full name of the service account to run the continuous query.
+
+            If not provided, the user account will be used, but this
+            limits the lifetime of the continuous query.
+        session (bigframes.Session, default None):
+            The session object to use for the query. This determines
             the project id and location of the query. If None, will
-            default to the bigframes global session default client.
+            default to the bigframes global session.
         app_profile (str, default None):
             The bigtable app profile to export to. If None, no app
             profile will be used.
@@ -90,9 +98,16 @@
             For example, the job can be cancelled or its error status
             can be examined.
     """
+    warnings.warn(
+        "The bigframes.streaming module is a preview feature, and subject to change.",
+        stacklevel=1,
+        category=bigframes.exceptions.PreviewWarning,
+    )
+
     # get default client if not passed
-    if bq_client is None:
-        bq_client = bigframes.get_global_session().bqclient
+    if session is None:
+        session = bigframes.get_global_session()
+    bq_client = session.bqclient
 
     # build export string from parameters
     project = bq_client.project
@@ -123,7 +138,117 @@ def to_bigtable(
 
     # override continuous http parameter
     job_config = bigquery.job.QueryJobConfig()
-    job_config_filled = job_config.from_api_repr({"query": {"continuous": True}})
+
+    job_config_dict: dict = {"query": {"continuous": True}}
+    if service_account_email is not None:
+        job_config_dict["query"]["connectionProperties"] = {
+            "key": "service_account",
+            "value": service_account_email,
+        }
+    job_config_filled = job_config.from_api_repr(job_config_dict)
+    job_config_filled.labels = {"bigframes-api": "streaming_to_bigtable"}
+
+    # begin the query job
+    query_job = bq_client.query(
+        sql,
+        job_config=job_config_filled,  # type:ignore
+        # typing error above is in bq client library
+        # (should accept abstract job_config, only takes concrete)
+        job_id=job_id,
+        job_id_prefix=job_id_prefix,
+    )
+
+    # return the query job to the user for lifetime management
+    return query_job
+
+
+def to_pubsub(
+    query: str,
+    *,
+    topic: str,
+    service_account_email: str,
+    session: Optional[bigframes.Session] = None,
+    job_id: Optional[str] = None,
+    job_id_prefix: Optional[str] = None,
+) -> bigquery.QueryJob:
+    """Launches a BigQuery continuous query and returns a
+    QueryJob object for some management functionality.
+
+    This method requires an existing pubsub topic. For instructions
+    on creating a pubsub topic, see
+    https://cloud.google.com/pubsub/docs/samples/pubsub-quickstart-create-topic?hl=en
+
+    Note that a service account is a requirement for continuous queries
+    exporting to pubsub.
+
+    Args:
+        query (str):
+            The sql statement to execute as a continuous function.
+            For example: "SELECT * FROM dataset.table"
+            This will be wrapped in an EXPORT DATA statement to
+            launch a continuous query writing to pubsub.
+        topic (str):
+            The name of the pubsub topic to export to.
+            For example: "taxi-rides"
+        service_account_email (str):
+            Full name of the service account to run the continuous query.
+
+        session (bigframes.Session, default None):
+            The session object to use for the query. This determines
+            the project id and location of the query. If None, will
+            default to the bigframes global session.
+        job_id (str, default None):
+            If specified, replace the default job id for the query,
+            see job_id parameter of
+            https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query
+        job_id_prefix (str, default None):
+            If specified, a job id prefix for the query, see
+            job_id_prefix parameter of
+            https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query
+
+    Returns:
+        google.cloud.bigquery.QueryJob:
+            See https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob
+            The ongoing query job can be managed using this object.
+            For example, the job can be cancelled or its error status
+            can be examined.
+    """
+    warnings.warn(
+        "The bigframes.streaming module is a preview feature, and subject to change.",
+        stacklevel=1,
+        category=bigframes.exceptions.PreviewWarning,
+    )
+
+    # get default client if not passed
+    if session is None:
+        session = bigframes.get_global_session()
+    bq_client = session.bqclient
+
+    # build export string from parameters
+    sql = (
+        "EXPORT DATA\n"
+        "OPTIONS (\n"
+        "format = 'CLOUD_PUBSUB',\n"
+        f'uri = "https://pubsub.googleapis.com/projects/{bq_client.project}/topics/{topic}"\n'
+        ")\n"
+        "AS (\n"
+        f"{query});"
+    )
+
+    # override continuous http parameter
+    job_config = bigquery.job.QueryJobConfig()
+    job_config_filled = job_config.from_api_repr(
+        {
+            "query": {
+                "continuous": True,
+                "connectionProperties": {
+                    "key": "service_account",
+                    "value": service_account_email,
+                },
+            }
+        }
+    )
+    job_config_filled.labels = {"bigframes-api": "streaming_to_pubsub"}
 
     # begin the query job
     query_job = bq_client.query(
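
Taken together, this file's diff makes `to_bigtable` keyword-only after `query` and adds the new `to_pubsub` exporter. A minimal usage sketch, not part of the commit — the instance, table, and service account values below are hypothetical placeholders, the topic name is the docstring's own example, and the Pub/Sub topic must already exist:

import bigframes.streaming

# Continuous query exporting to Bigtable; everything after `query` is now
# keyword-only. Resource names here are hypothetical placeholders.
bigtable_job = bigframes.streaming.to_bigtable(
    "SELECT body_mass_g, island as rowkey FROM birds.penguins",
    instance="my-instance",
    table="my-table",
    service_account_email="[email protected]",  # hypothetical
    truncate=True,
    overwrite=True,
)

# Continuous query exporting to Pub/Sub; the service account is required here.
pubsub_job = bigframes.streaming.to_pubsub(
    "SELECT island FROM birds.penguins",
    topic="taxi-rides",  # the docstring's example topic; must already exist
    service_account_email="[email protected]",  # hypothetical
)

# Both calls return google.cloud.bigquery.QueryJob objects; cancel() stops
# the otherwise-unbounded continuous queries.
pubsub_job.cancel()
bigtable_job.cancel()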

scripts/create_bigtable.py

−3
@@ -16,13 +16,10 @@
 # bigframes.streaming testing if they don't already exist
 
 import os
-import pathlib
 import sys
 
 import google.cloud.bigtable as bigtable
 
-REPO_ROOT = pathlib.Path(__file__).parent.parent
-
 PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT")
 
 if not PROJECT_ID:

scripts/create_pubsub.py

+49
@@ -0,0 +1,49 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This script creates the pubsub resources required for
+# bigframes.streaming testing if they don't already exist
+
+import os
+import sys
+
+from google.cloud import pubsub_v1
+
+PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT")
+
+if not PROJECT_ID:
+    print(
+        "Please set GOOGLE_CLOUD_PROJECT environment variable before running.",
+        file=sys.stderr,
+    )
+    sys.exit(1)
+
+
+def create_topic(topic_id):
+    # based on
+    # https://cloud.google.com/pubsub/docs/samples/pubsub-quickstart-create-topic?hl=en
+
+    publisher = pubsub_v1.PublisherClient()
+    topic_path = publisher.topic_path(PROJECT_ID, topic_id)
+
+    topic = publisher.create_topic(request={"name": topic_path})
+    print(f"Created topic: {topic.name}")
+
+
+def main():
+    create_topic("penguins")
+
+
+if __name__ == "__main__":
+    main()
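
One caveat: despite the "if they don't already exist" comment, `PublisherClient.create_topic` raises `google.api_core.exceptions.AlreadyExists` when the topic is already present, so rerunning the script against an existing topic fails. A sketch of an idempotent variant — the helper name is ours, not the script's:

from google.api_core import exceptions as api_exceptions
from google.cloud import pubsub_v1


def create_topic_if_missing(project_id: str, topic_id: str) -> None:
    # Same calls as create_topic above, but tolerant of reruns.
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    try:
        topic = publisher.create_topic(request={"name": topic_path})
        print(f"Created topic: {topic.name}")
    except api_exceptions.AlreadyExists:
        print(f"Topic already exists: {topic_path}")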

setup.py

+1
@@ -40,6 +40,7 @@
     "geopandas >=0.12.2",
     "google-auth >=2.15.0,<3.0dev",
     "google-cloud-bigtable >=2.24.0",
+    "google-cloud-pubsub >=2.21.4",
     "google-cloud-bigquery[bqstorage,pandas] >=3.16.0",
     "google-cloud-functions >=1.12.0",
     "google-cloud-bigquery-connection >=1.12.0",

testing/constraints-3.9.txt

+1
@@ -5,6 +5,7 @@ gcsfs==2023.3.0
 geopandas==0.12.2
 google-auth==2.15.0
 google-cloud-bigtable==2.24.0
+google-cloud-pubsub==2.21.4
 google-cloud-bigquery==3.16.0
 google-cloud-functions==1.12.0
 google-cloud-bigquery-connection==1.12.0

tests/system/large/test_streaming.py

+30 −3
@@ -22,11 +22,12 @@ def test_streaming_to_bigtable():
     job_id_prefix = "test_streaming_"
     sql = """SELECT
     body_mass_g, island as rowkey
-    FROM birds.penguins"""
+    FROM birds.penguins_bigtable_streaming"""
     query_job = bigframes.streaming.to_bigtable(
         sql,
-        "streaming-testing-instance",
-        "table-testing",
+        instance="streaming-testing-instance",
+        table="table-testing",
+        service_account_email="[email protected]",
        app_profile=None,
         truncate=True,
         overwrite=True,
@@ -46,3 +47,29 @@ def test_streaming_to_bigtable():
         assert str(query_job.job_id).startswith(job_id_prefix)
     finally:
         query_job.cancel()
+
+
+def test_streaming_to_pubsub():
+    # launch a continuous query
+    job_id_prefix = "test_streaming_pubsub_"
+    sql = """SELECT
+    island
+    FROM birds.penguins_pubsub_streaming"""
+    query_job = bigframes.streaming.to_pubsub(
+        sql,
+        topic="penguins",
+        service_account_email="[email protected]",
+        job_id=None,
+        job_id_prefix=job_id_prefix,
+    )
+
+    try:
+        # wait 100 seconds in order to ensure the query doesn't stop
+        # (i.e. it is continuous)
+        time.sleep(100)
+        assert query_job.error_result is None
+        assert query_job.errors is None
+        assert query_job.running()
+        assert str(query_job.job_id).startswith(job_id_prefix)
+    finally:
+        query_job.cancel()
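
The fixed `time.sleep(100)` is how these tests check that the query is genuinely continuous. A sketch of a polling alternative, not part of the commit — `QueryJob.reload()` refreshes job state from the API, and the helper name is ours:

import time

from google.cloud import bigquery


def still_running_after(query_job: bigquery.QueryJob, seconds: float = 100.0) -> bool:
    # Poll until the deadline; return False as soon as the job errors out
    # or stops running, True if it is still running when time expires.
    deadline = time.monotonic() + seconds
    while time.monotonic() < deadline:
        query_job.reload()
        if query_job.error_result is not None or not query_job.running():
            return False
        time.sleep(5)
    return True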
