Skip to content

Commit e0e2d83

Browse files
docs(samples): Update Topic with Kinesis Ingestion Settings (#1123)
Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 83dc9ff commit e0e2d83

File tree

2 files changed

+113
-2
lines changed

2 files changed

+113
-2
lines changed

samples/snippets/publisher.py

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ def create_topic_kinesis_ingestion(
6969
gcp_service_account: str,
7070
) -> None:
7171
"""Create a new Pub/Sub topic with AWS Kinesis Ingestion Settings."""
72-
# [START pubsub_quickstart_create_topic_kinesis_ingestion]
7372
# [START pubsub_create_topic_kinesis_ingestion]
7473
from google.cloud import pubsub_v1
7574
from google.pubsub_v1.types import Topic
@@ -101,10 +100,58 @@ def create_topic_kinesis_ingestion(
101100
topic = publisher.create_topic(request=request)
102101

103102
print(f"Created topic: {topic.name} with AWS Kinesis Ingestion Settings")
104-
# [END pubsub_quickstart_create_topic_kinesis_ingestion]
105103
# [END pubsub_create_topic_kinesis_ingestion]
106104

107105

106+
def update_topic_kinesis_ingestion(
107+
project_id: str,
108+
topic_id: str,
109+
stream_arn: str,
110+
consumer_arn: str,
111+
aws_role_arn: str,
112+
gcp_service_account: str,
113+
) -> None:
114+
"""Update Pub/Sub topic with AWS Kinesis Ingestion Settings."""
115+
# [START pubsub_update_topic_kinesis_ingestion]
116+
from google.cloud import pubsub_v1
117+
from google.pubsub_v1.types import Topic
118+
from google.pubsub_v1.types import IngestionDataSourceSettings
119+
from google.pubsub_v1.types import UpdateTopicRequest
120+
from google.protobuf import field_mask_pb2
121+
122+
# TODO(developer)
123+
# project_id = "your-project-id"
124+
# topic_id = "your-topic-id"
125+
# stream_arn = "your-stream-arn"
126+
# consumer_arn = "your-consumer-arn"
127+
# aws_role_arn = "your-aws-role-arn"
128+
# gcp_service_account = "your-gcp-service-account"
129+
130+
publisher = pubsub_v1.PublisherClient()
131+
topic_path = publisher.topic_path(project_id, topic_id)
132+
133+
update_request = UpdateTopicRequest(
134+
topic=Topic(
135+
name=topic_path,
136+
ingestion_data_source_settings=IngestionDataSourceSettings(
137+
aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
138+
stream_arn=stream_arn,
139+
consumer_arn=consumer_arn,
140+
aws_role_arn=aws_role_arn,
141+
gcp_service_account=gcp_service_account,
142+
)
143+
),
144+
),
145+
update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]),
146+
)
147+
148+
topic = publisher.update_topic(request=update_request)
149+
print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")
150+
151+
152+
# [END pubsub_update_topic_kinesis_ingestion]
153+
154+
108155
def delete_topic(project_id: str, topic_id: str) -> None:
109156
"""Deletes an existing Pub/Sub topic."""
110157
# [START pubsub_delete_topic]
@@ -484,6 +531,15 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
484531
create_topic_kinesis_ingestion_parser.add_argument("aws_role_arn")
485532
create_topic_kinesis_ingestion_parser.add_argument("gcp_service_account")
486533

534+
update_topic_kinesis_ingestion_parser = subparsers.add_parser(
535+
"update_kinesis_ingestion", help=update_topic_kinesis_ingestion.__doc__
536+
)
537+
update_topic_kinesis_ingestion_parser.add_argument("topic_id")
538+
update_topic_kinesis_ingestion_parser.add_argument("stream_arn")
539+
update_topic_kinesis_ingestion_parser.add_argument("consumer_arn")
540+
update_topic_kinesis_ingestion_parser.add_argument("aws_role_arn")
541+
update_topic_kinesis_ingestion_parser.add_argument("gcp_service_account")
542+
487543
delete_parser = subparsers.add_parser("delete", help=delete_topic.__doc__)
488544
delete_parser.add_argument("topic_id")
489545

@@ -553,6 +609,15 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
553609
args.aws_role_arn,
554610
args.gcp_service_account,
555611
)
612+
elif args.command == "update_kinesis_ingestion":
613+
update_topic_kinesis_ingestion(
614+
args.project_id,
615+
args.topic_id,
616+
args.stream_arn,
617+
args.consumer_arn,
618+
args.aws_role_arn,
619+
args.gcp_service_account,
620+
)
556621
elif args.command == "delete":
557622
delete_topic(args.project_id, args.topic_id)
558623
elif args.command == "publish":

samples/snippets/publisher_test.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,9 @@ def test_create(
123123
out, _ = capsys.readouterr()
124124
assert f"Created topic: {topic_path}" in out
125125

126+
# Clean up resource created for the test.
127+
publisher_client.delete_topic(request={"topic": topic_path})
128+
126129

127130
def test_create_kinesis_ingestion(
128131
publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str]
@@ -155,6 +158,49 @@ def test_create_kinesis_ingestion(
155158
out, _ = capsys.readouterr()
156159
assert f"Created topic: {topic_path} with AWS Kinesis Ingestion Settings" in out
157160

161+
# Clean up resource created for the test.
162+
publisher_client.delete_topic(request={"topic": topic_path})
163+
164+
165+
def test_update_kinesis_ingestion(
166+
publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str]
167+
) -> None:
168+
# The scope of `topic_path` is limited to this function.
169+
topic_path = publisher_client.topic_path(PROJECT_ID, TOPIC_ID)
170+
171+
# Outside of automated CI tests, these values must be of actual AWS resources for the test to pass.
172+
stream_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name"
173+
consumer_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name/consumer/consumer-1:1111111111"
174+
aws_role_arn = "arn:aws:iam::111111111111:role/fake-role-name"
175+
gcp_service_account = (
176+
177+
)
178+
179+
try:
180+
publisher_client.delete_topic(request={"topic": topic_path})
181+
except NotFound:
182+
pass
183+
184+
publisher.create_topic(PROJECT_ID, TOPIC_ID)
185+
186+
out, _ = capsys.readouterr()
187+
assert f"Created topic: {topic_path}" in out
188+
189+
publisher.update_topic_kinesis_ingestion(
190+
PROJECT_ID,
191+
TOPIC_ID,
192+
stream_arn,
193+
consumer_arn,
194+
aws_role_arn,
195+
gcp_service_account,
196+
)
197+
198+
out, _ = capsys.readouterr()
199+
assert f"Updated topic: {topic_path} with AWS Kinesis Ingestion Settings" in out
200+
201+
# Clean up resource created for the test.
202+
publisher_client.delete_topic(request={"topic": topic_path})
203+
158204

159205
def test_list(topic_path: str, capsys: CaptureFixture[str]) -> None:
160206
publisher.list_topics(PROJECT_ID)

0 commit comments

Comments
 (0)