排定及觸發 Airflow DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本頁面說明 Airflow 中的排程和 DAG 觸發功能運作方式、如何為 DAG 定義排程,以及如何手動觸發或暫停 DAG。

關於 Cloud Composer 中的 Airflow DAG

Cloud Composer 中的 Airflow DAG 會在專案中一或多個 Cloud Composer 環境中執行。您可以將 Airflow DAG 的來源檔案上傳至與環境相關聯的 Cloud Storage 值區。接著,環境的 Airflow 例項會剖析這些檔案,並依據各 DAG 的排程排定 DAG 執行作業。在 DAG 執行期間,Airflow 會依據 DAG 定義的順序,排定並執行組成 DAG 的個別工作。

如要進一步瞭解 Airflow 的核心概念,例如 Airflow DAG、DAG 執行作業、工作或運算子,請參閱 Airflow 說明文件中的「核心概念」頁面。

關於 Airflow 中的 DAG 排程

Airflow 提供下列排程機制概念:

邏輯日期

代表執行特定 DAG 執行作業的日期。

這不是 Airflow 執行 DAG 的實際日期,而是特定 DAG 執行作業必須處理的一段時間。舉例來說,如果 DAG 的排程是每天 12:00 執行,邏輯日期也會是特定日期的 12:00。由於這項作業每天執行兩次,因此必須處理的時間範圍是過去 12 小時。同時,DAG 中定義的邏輯可能完全不使用邏輯日期或時間間隔。舉例來說,DAG 可能會每天執行相同的指令碼一次,而不會使用邏輯日期的值。

在 2.2 之前的 Airflow 版本中,這個日期稱為「執行日期」

執行日期

代表執行特定 DAG 執行作業的日期。

舉例來說,如果 DAG 的排程是每天中午 12 點執行,實際執行時間可能會在邏輯日期過後的 12:05 發生。

排程間隔

以邏輯日期表示 DAG 必須執行的時機和頻率。

舉例來說,每日時間表表示 DAG 每天執行一次,且 DAG 執行作業的邏輯日期間隔為 24 小時。

開始日期

指定 Airflow 開始排程 DAG 的時間。

DAG 中的工作可以有個別的開始日期,也可以為所有工作指定單一開始日期。Airflow 會根據 DAG 中任務的最小開始日期和排程間隔,安排 DAG 執行作業。

追趕、回補和重試

執行過去日期的 DAG 執行作業的機制。

追趕會執行尚未執行的 DAG 執行作業,例如,如果 DAG 已暫停一段很長的時間,然後取消暫停,您可以使用補充作業,執行特定日期範圍內的 DAG 執行作業。重試次數可指定 Airflow 在執行 DAG 工作時必須嘗試幾次。

排程的運作方式如下:

  1. 開始日期過後,Airflow 會等待下次排程間隔發生。

  2. Airflow 會在這個排程間隔結束時,排定第一次 DAG 執行作業。

    舉例來說,如果 DAG 的排程為每小時執行一次,且開始日期為今天的 12:00,則第一次執行 DAG 的時間會是今天的 13:00。

本文的「安排 Airflow DAG」一節說明如何運用這些概念設定 DAG 的排程。如要進一步瞭解 DAG 執行作業和排程,請參閱 Airflow 說明文件中的「DAG 執行作業」。

關於觸發 DAG 的方式

Airflow 提供以下方式觸發 DAG:

  • 依排程觸發。Airflow 會根據 DAG 檔案中指定的排程,自動觸發 DAG。

  • 手動觸發。您可以透過Google Cloud 主控台、Airflow UI 手動觸發 DAG,也可以透過 Google Cloud CLI 執行 Airflow CLI 指令。

  • 在發生事件時觸發。如要根據事件觸發 DAG,標準做法是使用感應器

觸發 DAG 的其他方式:

事前準備

  • 請確認您的帳戶具有可管理環境 bucket 中的物件,以及查看及觸發 DAG 的角色。詳情請參閱「存取權控管」。

排定 Airflow DAG

您可以在 DAG 檔案中定義 DAG 的排程。請按照下列方式編輯 DAG 定義:

  1. 在電腦上找出並編輯 DAG 檔案。如果您沒有 DAG 檔案,可以從環境的儲存桶下載副本。針對新的 DAG,您可以在建立 DAG 檔案時定義所有參數。

  2. schedule_interval 參數中定義排程。您可以使用 Cron 運算式 (例如 0 0 * * *) 或預設值 (例如 @daily)。詳情請參閱 Airflow 說明文件中的「Cron 和時間間隔」。

    Airflow 會根據您設定的排程,決定 DAG 執行作業的邏輯日期。

  3. start_date 參數中定義開始日期。

    Airflow 會使用這個參數判斷第一次執行 DAG 的邏輯日期。

  4. (選用) 在 catchup 參數中,定義 Airflow 是否必須執行這個 DAG 從開始日期到目前日期之間所有先前未執行的執行作業。

    在追趕期間執行的 DAG 執行作業,其邏輯日期會是過去,而執行日期會反映 DAG 執行作業的實際執行時間。

  5. (選用) 在 retries 參數中,定義 Airflow 必須重試失敗工作的次數 (每個 DAG 包含一或多個個別工作)。根據預設,Cloud Composer 中的任務會重試兩次。

  6. 將新版 DAG 上傳至環境的值區。

  7. 等待 Airflow 成功剖析 DAG。舉例來說,您可以在Google Cloud 控制台或 Airflow UI 中,查看環境中的 DAG 清單

下列範例 DAG 定義會在每天 00:00 和 12:00 執行兩次,其開始日期設為 2024 年 1 月 1 日,但由於追趕功能已停用,因此 Airflow 不會在上傳或暫停後,針對過去日期執行這項作業。

DAG 包含一個名為 insert_query_job 的工作,該工作會使用 BigQueryInsertJobOperator 運算子將資料列插入資料表。這個運算子是 Google Cloud BigQuery 運算子之一,可用於管理資料集和資料表、執行查詢及驗證資料。如果這項工作的特定執行作業失敗,Airflow 會以預設重試間隔再重試四次。這些重試的邏輯日期保持不變。

這個資料列的 SQL 查詢會使用 Airflow 範本,將 DAG 的邏輯日期和名稱寫入該資料列。

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
  "bq_example_scheduling_dag",
  start_date=datetime.datetime(2024, 1, 1),
  schedule_interval='0 */12 * * *',
  catchup=False
  ) as dag:

  insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    retries=4,
    configuration={
        "query": {
            # schema: date (string), description (string)
            # example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
            "query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location="us-central1"
  )

  insert_query_job

如要測試這個 DAG,您可以手動觸發,然後查看工作執行記錄

更多排程參數範例

以下排程參數範例說明排程如何搭配不同參數組合運作:

  • 如果 start_datedatetime(2024, 4, 4, 16, 25),而 schedule_interval30 16 * * *,則第一個 DAG 會在 2024 年 4 月 5 日 16 點 30 分執行。

  • 如果 start_datedatetime(2024, 4, 4, 16, 35),而 schedule_interval30 16 * * *,則第一個 DAG 會在 2024 年 4 月 6 日 16:30 執行。由於開始日期在 2024 年 4 月 4 日的排程間隔之後,因此 DAG 不會在 2024 年 4 月 5 日執行。相反地,時間間隔會在 2024 年 4 月 5 日 16 點 35 分結束,因此下次 DAG 執行作業的時間會在隔天 16 點 30 分。

  • 如果 start_datedatetime(2024, 4, 4),而 schedule_interval@daily,則第一個 DAG 執行作業的時間會排定在 2024 年 4 月 5 日 00:00。

  • 如果 start_datedatetime(2024, 4, 4, 16, 30),而 schedule_interval0 * * * *,則第一個 DAG 執行作業的時間會排定在 2024 年 4 月 4 日 18:00。指定的日期和時間過後,Airflow 會排定 DAG 執行作業,在每小時的 0 分鐘執行。最接近的時間點是 17:00。此時,Airflow 會在排程間隔結束時 (也就是 18:00) 排定 DAG 執行作業。

手動觸發 DAG

手動觸發 Airflow DAG 時,Airflow 會執行 DAG 一次,不受 DAG 檔案中指定的排程影響。

主控台

如要從 Google Cloud 控制台觸發 DAG,請按照下列步驟操作:

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 選取環境即可查看詳細資料。

  3. 在「Environment details」頁面中,前往「DAGs」分頁。

  4. 按一下 DAG 名稱。

  5. 在「DAG 詳細資料」頁面中,按一下「觸發 DAG」。系統會建立新的 DAG 執行作業。

Airflow UI

如何透過 Airflow UI 觸發 DAG:

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在「Airflow 網路伺服器」欄中,按一下環境的「Airflow」連結。

  3. 使用具備適當權限的 Google 帳戶登入。

  4. 在 Airflow 網頁介面中的「DAG」頁面,點選 DAG 的「動作」欄中的「觸發 DAG」按鈕。

gcloud

執行 dags trigger Airflow CLI 指令:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags trigger -- DAG_ID

更改下列內容:

  • ENVIRONMENT_NAME:環境名稱。
  • LOCATION:環境所在的地區。
  • DAG_ID:DAG 的名稱。

如要進一步瞭解如何在 Cloud Composer 環境中執行 Airflow CLI 指令,請參閱「執行 Airflow CLI 指令」。

如要進一步瞭解可用的 Airflow CLI 指令,請參閱 gcloud composer environments run 指令參考資料

查看 DAG 執行記錄檔和詳細資料

在 Google Cloud 控制台中,您可以:

此外,Cloud Composer 還提供 Airflow UI 存取權,這是 Airflow 本身的網頁介面。

暫停 DAG

主控台

如要透過 Google Cloud 主控台暫停 DAG,請按照下列步驟操作:

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 選取環境即可查看詳細資料。

  3. 在「Environment details」頁面中,前往「DAGs」分頁。

  4. 按一下 DAG 名稱。

  5. 在「DAG 詳細資料」頁面上,按一下「暫停 DAG」

Airflow UI

如要透過 Airflow UI 暫停 DAG,請按照下列步驟操作:

  1. 前往 Google Cloud 控制台的「Environments」頁面。

前往「環境」

  1. 在「Airflow 網路伺服器」欄中,按一下環境的「Airflow」連結。

  2. 使用具備適當權限的 Google 帳戶登入。

  3. 在 Airflow 網頁介面的「DAG」頁面中,按一下 DAG 名稱旁的切換鈕。

gcloud

執行 dags pause Airflow CLI 指令:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags pause -- DAG_ID

更改下列內容:

  • ENVIRONMENT_NAME:環境名稱。
  • LOCATION:環境所在的地區。
  • DAG_ID:DAG 的名稱。

如要進一步瞭解如何在 Cloud Composer 環境中執行 Airflow CLI 指令,請參閱「執行 Airflow CLI 指令」。

如要進一步瞭解可用的 Airflow CLI 指令,請參閱 gcloud composer environments run 指令參考資料

後續步驟