使用 Cloud Tasks 將函式排入佇列


工作佇列函式會運用 Google Cloud Tasks,協助應用程式在主要應用程式流程之外,以非同步方式執行耗時、耗用大量資源或頻寬受限的工作。

舉例來說,假設您想備份目前在 API 上託管的大量圖片檔案,但 API 有速率限制。如要成為該 API 的負責任消費者,您必須遵守其速率限制。此外,這類長時間執行的工作可能會因逾時和記憶體限制而失敗。

為減輕這項複雜度,您可以編寫工作佇列函式,設定 scheduleTimedispatchDeadline 等基本工作選項,然後將函式交給 Cloud Tasks 中的佇列。Cloud Tasks 環境專門用於確保這類作業的有效壅塞控制和重試政策。

Firebase SDK for Cloud Functions for Firebase 3.20.1 以上版本可與 Firebase Admin SDK 10.2.0 以上版本互通,支援工作佇列函式。

在 Firebase 中使用工作佇列函式可能會產生 Cloud Tasks 處理費用。詳情請參閱Cloud Tasks定價

建立工作佇列函式

如要使用工作佇列函式,請按照下列工作流程操作:

  1. 使用 Firebase SDK for Cloud Functions 編寫工作佇列函式。
  2. 使用 HTTP 要求觸發函式,藉此測試函式。
  3. 使用 Firebase CLI 部署函式。首次部署工作佇列函式時,CLI 會在 Cloud Tasks 中建立工作佇列,並使用來源程式碼中指定的選項 (速率限制和重試)。
  4. 將工作新增至新建立的工作佇列,並視需要傳遞參數來設定執行時間表。如要達成這個目標,請使用 Admin SDK 編寫程式碼,然後部署至 Cloud Functions for Firebase

編寫工作佇列函式

本節中的程式碼範例是以應用程式為基礎,該應用程式會設定服務,備份 NASA「每日天文圖片」中的所有圖片。首先,請匯入必要模組:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

請使用 onTaskDispatchedon_task_dispatched 函式,編寫工作佇列函式時,您可以設定每個佇列的重試和速率限制設定。

設定工作佇列函式

工作佇列函式提供強大的設定組合,可精確控制工作佇列的速率限制和重試行為:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5:工作佇列中的每個工作都會自動重試最多 5 次。這有助於減輕暫時性錯誤 (例如網路錯誤或依附的外部服務暫時中斷) 的影響。

  • retryConfig.minBackoffSeconds=60:每次重試間隔至少 60 秒。這會在每次嘗試之間提供大量緩衝區,因此我們不會急著用完 5 次重試嘗試。

  • rateLimits.maxConcurrentDispatch=6:一次最多可調度 6 項工作。這有助於確保基礎函式能持續接收要求,並減少有效執行個體數量和冷啟動次數。

測試工作佇列函式

在大多數情況下,Cloud Functions 模擬器是測試工作佇列函式的最佳方式。請參閱 Emulator Suite 說明文件,瞭解如何為工作佇列函式模擬功能設定應用程式

此外,工作佇列 functions_sdk 會在 Firebase Local Emulator Suite 中公開為簡單的 HTTP 函式。您可以傳送含有 JSON 資料酬載的 HTTP POST 要求,測試模擬工作函式:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

部署工作佇列函式

使用 Firebase CLI 部署工作佇列函式:

$ firebase deploy --only functions:backupapod

首次部署工作佇列函式時,CLI 會在 Cloud Tasks 中建立工作佇列,並使用來源程式碼中指定的選項 (速率限制和重試)。

如果在部署函式時發生權限錯誤,請確認執行部署指令的使用者已獲派適當的 IAM 角色

將工作佇列函式排入佇列

您可以使用 Node.js 適用的 Firebase Admin SDK 或 Python 適用的 Google Cloud 程式庫,從受信任的伺服器環境 (例如 Cloud Functions for Firebase) 將工作佇列函式排入 Cloud Tasks。如果您是 Admin SDK 新手,請參閱「在伺服器中新增 Firebase」一文,瞭解如何開始使用。

一般流程會建立新工作、將工作加入 Cloud Tasks 的佇列,並設定工作:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • 範例程式碼會為第 N 個工作關聯 N 分鐘的延遲時間,盡量分散工作執行時間。這表示每分鐘會觸發約 1 項工作。請注意,如要讓工作在特定時間觸發,也可以使用 scheduleTime (Node.js) 或 schedule_time (Python)。Cloud Tasks

  • 範例程式碼會設定等待工作完成的時間上限。Cloud TasksCloud Tasks 會根據佇列的重試設定重試工作,或直到達到這個期限為止。在範例中,佇列設定為最多重試工作 5 次,但如果整個程序 (包括重試嘗試) 超過 5 分鐘,工作就會自動取消。

擷取並加入目標 URI

由於 Cloud Tasks 會建立驗證權杖,以驗證對基礎工作佇列函式的要求,因此您必須在將工作加入佇列時,指定函式的 Cloud Run 網址。建議您以程式輔助方式擷取函式的網址,如下所示:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

疑難排解

開啟 Cloud Tasks 記錄功能

Cloud Tasks 的記錄包含實用的診斷資訊,例如與工作相關聯的要求狀態。根據預設,系統會停用 Cloud Tasks 的記錄,因為這項服務可能會在專案中產生大量記錄。建議您在積極開發及偵錯工作佇列函式時,開啟偵錯記錄。請參閱「開啟記錄」。

IAM 權限

排隊工作時,或 Cloud Tasks 嘗試叫用工作佇列函式時,可能會出現 PERMISSION DENIED 錯誤。請確認專案具有下列 IAM 繫結:

  • 用來將工作加入佇列至 Cloud Tasks 的身分需要 cloudtasks.tasks.create IAM 權限。

    在範例中,這是App Engine預設服務帳戶

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • 將工作加入 Cloud Tasks 時使用的身分,必須具備使用 Cloud Tasks 中與工作相關聯服務帳戶的權限。

    在範例中,這是App Engine預設服務帳戶

如要瞭解如何將App Engine預設服務帳戶新增為App Engine預設服務帳戶的使用者,請參閱 Google Cloud IAM 說明文件

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker