建立 Amazon Managed Streaming for Apache Kafka 匯入主題

您可以使用 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 匯入主題,持續從 Amazon MSK 擷取資料,並將資料匯入 Pub/Sub。接著,您可以將資料串流至 Pub/Sub 支援的任何目的地。

本文將說明如何建立及管理 Amazon MSK 匯入主題。如要建立標準主題,請參閱「建立標準主題」一文。

如要進一步瞭解匯入主題,請參閱「關於匯入主題」。

事前準備

必要角色和權限

如要取得建立及管理 Amazon MSK 匯入主題所需的權限,請要求管理員為您授予主題或專案的 Pub/Sub 編輯者 (roles/pubsub.editor) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

這個預先定義的角色包含建立及管理 Amazon MSK 匯入主題所需的權限。如要查看確切的必要權限,請展開「必要權限」部分:

所需權限

您必須具備下列權限,才能建立及管理 Amazon MSK 匯入主題:

  • 建立匯入主題: pubsub.topics.create
  • 刪除匯入主題: pubsub.topics.delete
  • 取得匯入主題: pubsub.topics.get
  • 列出匯入主題: pubsub.topics.list
  • 發布至匯入主題: pubsub.topics.publish
  • 更新匯入主題: pubsub.topics.update
  • 取得匯入主題的身分與存取權管理政策: pubsub.topics.getIamPolicy
  • 為匯入主題設定 IAM 政策 pubsub.topics.setIamPolicy

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

您可以在專案層級和個別資源層級設定存取權控管。

設定聯合身分,以便存取 Amazon MSK

Workload Identity 聯盟可讓 Google Cloud 服務存取在 Google Cloud以外執行的工作負載。有了身分聯盟,您就不必維護或傳遞憑證,就能 Google Cloud 存取其他雲端中的資源。您可以改用工作負載本身的識別資訊,驗證 Google Cloud 並存取資源。

在 Google Cloud中建立服務帳戶

這是選用步驟。如果您已有服務帳戶,可以在這項程序中使用該帳戶,而不需要建立新的服務帳戶。如果您使用現有的服務帳戶,請參閱記錄服務帳戶的唯一 ID,瞭解下一個步驟。

對於 Amazon MSK 匯入主題,Pub/Sub 會使用服務帳戶做為身分,以便存取 AWS 的資源。

如要進一步瞭解如何建立服務帳戶,包括必要條件、必要角色和權限,以及命名規範,請參閱「建立服務帳戶」一文。建立服務帳戶後,您可能需要等待 60 秒或更長的時間,才能使用服務帳戶。這種行為的發生,是因為讀取作業最終會一致;新服務帳戶可能需要一段時間才會顯示。

記下服務帳戶專屬 ID

您需要服務帳戶專屬 ID,才能在 AWS 控制台中設定角色。

  1. 前往 Google Cloud 控制台的「Service account」詳細資料頁面。

    前往服務帳戶

  2. 按一下您剛建立的服務帳戶,或您打算使用的服務帳戶。

  3. 在「服務帳戶詳細資料」頁面中記下專屬 ID 編號。

    您需要這組 ID 才能在工作流程中在 AWS 控制台設定角色

為 Pub/Sub 服務帳戶新增服務帳戶權杖建立者角色

服務帳戶權杖建立者角色 (roles/iam.serviceAccountTokenCreator) 可讓實體為服務帳戶建立短期憑證。這些權杖或憑證用於模擬服務帳戶。

如要進一步瞭解服務帳戶模擬功能,請參閱「服務帳戶模擬功能」。

您也可以在這個程序中新增 Pub/Sub 發布者角色 (roles/pubsub.publisher)。如要進一步瞭解這個角色以及為何要新增這個角色,請參閱「將 Pub/Sub 發布者角色新增至 Pub/Sub 服務帳戶」一文。

  1. 前往 Google Cloud 控制台的「IAM」頁面。

    前往身分與存取權管理頁面

  2. 按一下「包含 Google提供的角色授予項目」核取方塊。

  3. 找出格式為 service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com 的服務帳戶。

  4. 針對這個服務帳戶,按一下「Edit Principal」(編輯主體) 按鈕。

  5. 如有需要,請按一下「新增其他角色」

  6. 搜尋並點選「服務帳戶憑證建立者角色」圖示 (roles/iam.serviceAccountTokenCreator)。

  7. 按一下 [儲存]

在 AWS 中建立政策

您需要在 AWS 中建立政策,讓 Pub/Sub 驗證 AWS,以便 Pub/Sub 擷取 Amazon MSK 的資料。

  • 如要進一步瞭解如何在 AWS 中建立政策,請參閱「建立 IAM 政策」一文。

如要在 AWS 中建立政策,請執行下列步驟:

  1. 登入 AWS 管理主控台並開啟 IAM 主控台

  2. IAM 控制台的導覽窗格中,依序點選「Access Management」(存取權管理) >「Policies」(政策)

  3. 按一下「建立政策」

  4. 在「點選服務」部分,按一下「MSK」

  5. 如要設定允許的動作,請依序點選「讀取」 >「GetBootstrapBrokers」

    這項動作會授予權限,可取得 Pub/Sub 用於連線至 MSK 叢集的 Bootstrap 中介軟體。

  6. 按一下「新增更多權限」

  7. 在「選取服務」部分,按一下「適用於 MSK 的 Apache Kafka API」

  8. 在「允許的動作」中選取下列選項:

    • 清單 > DescribeTopic

      這項動作會授予權限,允許 Pub/Sub 攝入主題取得 Amazon MSK Kafka 主題的詳細資料。

    • Read > ReadData

      這項操作會授予讀取 Amazon MSK Kafka 主題資料的權限。

    • 「Write」 >「Connect」

      這項動作會授予連線至 Amazon MSK Kafka 叢集並驗證的權限。

  9. 針對「資源」,請指定叢集 ARN (如果您想將政策限制在特定叢集,這是建議做法)。

  10. 按一下「新增更多權限」

  11. 在「選取服務」部分,按一下「STS」

  12. 針對「允許的動作」,依序點選「寫入」 >「AssumeRoleWithWebIdentity」AssumeRoleWithWebIdentity

    這項操作會授予權限,讓 Pub/Sub 透過身分聯盟取得一組臨時安全性憑證,以便向 Amazon MSK 進行驗證。

  13. 點按「Next」

  14. 輸入政策名稱和說明。

  15. 按一下「建立政策」

使用自訂信任政策在 AWS 中建立角色

您必須在 AWS 中建立角色,讓 Pub/Sub 能夠驗證 AWS,以便擷取 Amazon MSK 中的資料。

  1. 登入 AWS 管理主控台並開啟 IAM 主控台

  2. IAM 控制台的導覽窗格中,按一下「角色」

  3. 按一下「建立角色」

  4. 在「選取信任的實體」中,按一下「自訂信任政策」

  5. 在「自訂信任政策」部分輸入或貼上下列內容:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
         "Effect": "Allow",
         "Principal": {
            "Federated": "accounts.google.com"
         },
         "Action": "sts:AssumeRoleWithWebIdentity",
         "Condition": {
             "StringEquals": {
               "accounts.google.com:sub": "<SERVICE_ACCOUNT_UNIQUE_ID>"
             }
          }
        }
      ]
    }
    

    <SERVICE_ACCOUNT_UNIQUE_ID> 替換為您在「記錄服務帳戶專屬 ID」一節中記錄的服務帳戶專屬 ID。

  6. 點按「Next」

  7. 針對「新增權限」,搜尋並按一下剛建立的自訂政策。

  8. 點按「Next」

  9. 輸入角色名稱和說明。

  10. 按一下「建立角色」

將 Pub/Sub 發布者角色新增至 Pub/Sub 授權對象

如要啟用發布功能,您必須為 Pub/Sub 服務帳戶指派發布者角色,讓 Pub/Sub 能夠發布至 Amazon MSK 匯入主題。

啟用從所有主題發布內容的權限

如果您尚未建立任何 Amazon MSK 匯入主題,請使用這個方法。

  1. 前往 Google Cloud 控制台的「IAM」頁面。

    前往身分與存取權管理頁面

  2. 按一下「包含 Google提供的角色授予項目」核取方塊。

  3. 找出格式為 service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com 的服務帳戶。

  4. 針對這個服務帳戶,按一下「Edit Principal」(編輯主體) 按鈕。

  5. 如有需要,請按一下「新增其他角色」

  6. 搜尋並點選「Pub/Sub 發布者角色」 (roles/pubsub.publisher)。

  7. 按一下 [儲存]

啟用單一主題的發布功能

只有在 Amazon MSK 匯入主題已存在時,才使用這個方法。

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 執行 gcloud pubsub topics add-iam-policy-binding 指令:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID \
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com" \
       --role="roles/pubsub.publisher"

    更改下列內容:

    • TOPIC_ID:Amazon MSK 匯入主題的主題 ID。

    • PROJECT_NUMBER:專案編號。如要查看專案編號,請參閱「識別專案」。

將服務帳戶使用者角色新增至服務帳戶

「服務帳戶使用者」角色 (roles/iam.serviceAccountUser) 包含 iam.serviceAccounts.actAs 權限,可讓授權實體將服務帳戶連結至 Amazon MSK 匯入主題的擷取設定,並使用該服務帳戶建立聯合式身分。

  1. 前往 Google Cloud 控制台的「IAM」頁面。

    前往身分與存取權管理頁面

  2. 針對發出建立或更新主題呼叫的主體,按一下「Edit Principal」按鈕。

  3. 如有需要,請按一下「新增其他角色」

  4. 搜尋並點選「服務帳戶使用者角色」(roles/iam.serviceAccountUser)。

  5. 按一下 [儲存]

使用 Amazon MSK 匯入主題

您可以建立新的匯入主題,或編輯現有主題。

注意事項

  • 即使快速依序建立主題和訂閱項目,也可能導致資料遺失。在訂閱前,您可以短暫地查看該主題。如果在這個時間內有任何資料傳送至主題,就會遺失。請先建立主題、建立訂閱項目,然後再將主題轉換為匯入主題,這樣就能確保在匯入程序中不會遺漏任何訊息。

  • 如果您需要重新建立現有匯入主題的 Kafka 主題,且兩者名稱相同,則不可以只刪除 Kafka 主題並重新建立。這項操作可能會使 Pub/Sub 的偏移管理無效,進而導致資料遺失。如要減輕這種情況,請按照下列步驟操作:

    • 刪除 Pub/Sub 匯入主題。
    • 刪除 Kafka 主題。
    • 建立 Kafka 主題。
    • 建立 Pub/Sub 匯入主題。
  • 系統一律會從最早的偏移值讀取 Amazon MSK Kafka 主題的資料。

建立 Amazon MSK 匯入主題

如要進一步瞭解與主題相關聯的屬性,請參閱「主題的屬性」。

請確認你已完成下列程序:

如要建立 Amazon MSK 匯入主題,請按照下列步驟操作:

控制台

  1. 前往 Google Cloud 控制台的「Topics」頁面。

    前往「主題」

  2. 按一下「建立主題」

  3. 在「主題 ID」欄位中,輸入 Amazon MSK 匯入主題的 ID。如要進一步瞭解主題命名方式,請參閱命名規範

  4. 選取「Add a default subscription」

  5. 選取「啟用擷取」

  6. 在擷取來源中,選取「Amazon MSK」

  7. 輸入下列詳細資訊:

    • 叢集 ARN:您要擷取至 Pub/Sub 的 Amazon MSK 的 ARN。ARN 格式如下:arn:aws:kafka:${Region}:${Account}:cluster/${ClusterName}/${ClusterId}

    • 主題:您要擷取至 Pub/Sub 的 Amazon MSK Kafka 主題名稱。

    • AWS 角色 ARN:AWS 角色的 ARN。角色的 ARN 格式如下:arn:aws:iam::${Account}:role/${RoleName}

    • 服務帳戶:您在 Google Cloud中建立服務帳戶時所建立的服務帳戶。

  8. 按一下「建立主題」

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 執行 gcloud pubsub topics create 指令:

    gcloud pubsub topics create TOPIC_ID \
         --aws-msk-ingestion-cluster-arn MSK_CLUSTER_ARN \
         --aws-msk-ingestion-topic MSK_TOPIC \
         --aws-msk-ingestion-aws-role-arn MSK_ROLE_ARN \
         --aws-msk-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    更改下列內容:

    • TOPIC_ID:Pub/Sub 主題的名稱或 ID。
    • MSK_CLUSTER_ARN:您要擷取至 Pub/Sub 的 Amazon MSK 叢集 ARN。ARN 格式如下:arn:aws:kafka:${Region}:${Account}:cluster/${ClusterName}/${ClusterId}
    • MSK_TOPIC:您要擷取至 Pub/Sub 的 Amazon MSK Kafka 主題名稱。
    • MSK_ROLE_ARN:AWS 角色的 ARN。角色的 ARN 格式如下:arn:aws:iam::${Account}:role/${RoleName}
    • PUBSUB_SERVICE_ACCOUNT:您在「在 Google Cloud 中建立服務帳戶」中建立的服務帳戶。

Go

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Go。詳情請參閱 Pub/Sub Go API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createTopicWithAWSMSKIngestion(w io.Writer, projectID, topicID, clusterARN, mskTopic, awsRoleARN, gcpSA string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"

	// // AWS MSK ingestion settings.
	// clusterARN := "cluster-arn"
	// mskTopic := "msk-topic"
	// awsRoleARN := "aws-role-arn"
	// gcpSA := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	cfg := &pubsub.TopicConfig{
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceAmazonMSK{
				ClusterARN:        clusterARN,
				Topic:             mskTopic,
				AWSRoleARN:        awsRoleARN,
				GCPServiceAccount: gcpSA,
			},
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Created topic with AWS MSK ingestion settings: %v\n", t)
	return nil
}

Java

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Java。詳情請參閱 Pub/Sub Java API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithAwsMskIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // AWS MSK ingestion settings.
    String clusterArn = "cluster-arn";
    String mskTopic = "msk-topic";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    createTopicWithAwsMskIngestionExample(
        projectId, topicId, clusterArn, mskTopic, awsRoleArn, gcpServiceAccount);
  }

  public static void createTopicWithAwsMskIngestionExample(
      String projectId,
      String topicId,
      String clusterArn,
      String mskTopic,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsMsk awsMsk =
          IngestionDataSourceSettings.AwsMsk.newBuilder()
              .setClusterArn(clusterArn)
              .setTopic(mskTopic)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsMsk(awsMsk).build();

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println("Created topic with AWS MSK ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Node.js。詳情請參閱 Pub/Sub Node.js API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const clusterArn = 'arn:aws:kafka:...';
// const mskTopic = 'YOUR_MSK_TOPIC';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithAwsMskIngestion(
  topicNameOrId,
  clusterArn,
  mskTopic,
  awsRoleArn,
  gcpServiceAccount,
) {
  // Creates a new topic with AWS MSK ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      awsMsk: {
        clusterArn,
        topic: mskTopic,
        awsRoleArn,
        gcpServiceAccount,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with AWS MSK ingestion.`);
}

Python

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Python。詳情請參閱 Pub/Sub Python API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# cluster_arn = "your-cluster-arn"
# msk_topic = "your-msk-topic"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        aws_msk=IngestionDataSourceSettings.AwsMsk(
            cluster_arn=cluster_arn,
            topic=msk_topic,
            aws_role_arn=aws_role_arn,
            gcp_service_account=gcp_service_account,
        )
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with AWS MSK Ingestion Settings")

C++

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 C++。詳情請參閱 Pub/Sub C++ API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string const& cluster_arn,
   std::string const& msk_topic, std::string const& aws_role_arn,
   std::string const& gcp_service_account) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_msk =
      request.mutable_ingestion_data_source_settings()->mutable_aws_msk();
  aws_msk->set_cluster_arn(cluster_arn);
  aws_msk->set_topic(msk_topic);
  aws_msk->set_aws_role_arn(aws_role_arn);
  aws_msk->set_gcp_service_account(gcp_service_account);

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Node.js 環境。 詳情請參閱 Pub/Sub Node.js API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const clusterArn = 'arn:aws:kafka:...';
// const mskTopic = 'YOUR_MSK_TOPIC';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithAwsMskIngestion(
  topicNameOrId: string,
  clusterArn: string,
  mskTopic: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
) {
  // Creates a new topic with AWS MSK ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      awsMsk: {
        clusterArn,
        topic: mskTopic,
        awsRoleArn,
        gcpServiceAccount,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with AWS MSK ingestion.`);
}

如要進一步瞭解 ARN,請參閱「Amazon 資源名稱 (ARN)」和「IAM 識別碼」。

如果發生問題,請參閱「排解 Amazon MSK 匯入主題問題」。

編輯 Amazon MSK 匯入主題

如要編輯 Amazon MSK 匯入主題的攝入資料來源設定,請按照下列步驟操作:

控制台

  1. 前往 Google Cloud 控制台的「Topics」頁面。

    前往「主題」

  2. 按一下 Amazon MSK 匯入主題。

  3. 在主題詳細資料頁面中,按一下「編輯」

  4. 更新要變更的欄位。

  5. 按一下 [Update]

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 執行 gcloud pubsub topics update 指令,並使用下列範例中提到的所有標記:

    gcloud pubsub topics update TOPIC_ID \
        --aws-msk-ingestion-cluster-arn MSK_CLUSTER_ARN \
        --aws-msk-ingestion-topic MSK_TOPIC \
        --aws-msk-ingestion-aws-role-arn MSK_ROLE_ARN \
        --aws-msk-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    更改下列內容:

    • TOPIC_ID:Pub/Sub 主題的名稱或 ID。
    • MSK_CLUSTER_ARN:您要擷取至 Pub/Sub 的 Amazon MSK 叢集 ARN。ARN 格式如下:arn:aws:kafka:${Region}:${Account}:cluster/${ClusterName}/${ClusterId}
    • MSK_TOPIC:您要擷取至 Pub/Sub 的 Amazon MSK Kafka 主題名稱。
    • MSK_ROLE_ARN:AWS 角色的 ARN。角色的 ARN 格式如下:arn:aws:iam::${Account}:role/${RoleName}
    • PUBSUB_SERVICE_ACCOUNT:您在「在 Google Cloud 中建立服務帳戶」中建立的服務帳戶。

配額與限制

匯入主題的發布者傳送量受限於主題的發布配額。詳情請參閱「Pub/Sub 配額與限制」。

後續步驟

Apache Kafka® 是 Apache 軟體基金會 (Apache Software Foundation) 或其關聯企業在美國與/或其他國家/地區的註冊商標。