編寫 Airflow DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本指南說明如何編寫在 Cloud Composer 環境中執行的 Apache Airflow 有向非循環圖 (DAG)。

Apache Airflow 不提供嚴密的 DAG 和工作隔離機制,因此建議您使用個別的實際工作環境和測試環境,以免發生 DAG 干擾情形。詳情請參閱測試 DAG 一文。

建構 Airflow DAG

Airflow DAG 是在 Python 檔案中定義,並且是由下列元件組成:

  • DAG 定義
  • Airflow 運算子
  • 運算子關係

下列程式碼片段會顯示去脈絡化的各元件範例。

DAG 定義

以下範例說明 Airflow DAG 定義:

import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

運算子和工作

Airflow 運算子會描述待完成的工作。工作task 是運算子的特定例項。

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = PythonOperator(task_id="hello", python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = BashOperator(task_id="bye", bash_command="echo Goodbye.")

工作關係

工作關係:描述工作必須完成的順序。

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

Python 中的完整 DAG 工作流程範例

下列工作流程是完整的有效 DAG 範本,其中包含兩項工作:hello_python 工作和 goodbye_bash 工作:


import datetime

from airflow import models

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator



default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = PythonOperator(task_id="hello", python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = BashOperator(task_id="bye", bash_command="echo Goodbye.")

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

如要進一步瞭解如何定義 Airflow DAG,請參閱 Airflow 教學課程Airflow 概念

Airflow 運算子

以下是幾個常用 Airflow 運算子的範例。如需 Airflow 運算子的權威性參考資料,請參閱「運算子和鉤子參考資料」和「供應者索引」。

BashOperator

使用 BashOperator 執行指令列程式。

from airflow.operators import bash

    # Create BigQuery output dataset.
    make_bq_dataset = bash.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

Cloud Composer 會在 Airflow 工作站上執行以 Bash 指令碼提供的指令。工作站是 Debian 式的 Docker 容器,並包含數個套件。

PythonOperator

您可以使用 PythonOperator 執行任意 Python 程式碼。

Cloud Composer 會在包含環境中使用的 Cloud Composer 映像檔版本套件的容器中執行 Python 程式碼。

如要安裝其他 Python 套件,請參閱「安裝 Python 依附元件」一文。

Google Cloud 運算子

如要執行使用 Google Cloud 產品的工作,請使用Google Cloud Airflow 運算子。舉例來說,BigQuery 運算子會在 BigQuery 中查詢及處理資料。

Google Cloud 和 Google Cloud提供的個別服務還有許多其他 Airflow 運算子。如需完整清單,請參閱「Google Cloud 運算子」。

from airflow.providers.google.cloud.operators import bigquery
from airflow.providers.google.cloud.transfers import bigquery_to_gcs

    bq_recent_questions_query = bigquery.BigQueryInsertJobOperator(
        task_id="bq_recent_questions_query",
        configuration={
            "query": {
                "query": RECENT_QUESTIONS_QUERY,
                "useLegacySql": False,
                "destinationTable": {
                    "projectId": project_id,
                    "datasetId": bq_dataset_name,
                    "tableId": bq_recent_questions_table_id,
                },
            }
        },
        location=location,
    )

EmailOperator

您可以使用 EmailOperator 從 DAG 傳送電子郵件。如要從 Cloud Composer 環境傳送電子郵件,請將環境設定為使用 SendGrid

from airflow.operators import email

    # Send email confirmation (you will need to set up the email operator
    # See https://cloud.google.com/composer/docs/how-to/managing/creating#notification
    # for more info on configuring the email operator in Cloud Composer)
    email_summary = email.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

作業員失敗通知

email_on_failure 設為 True,即可在 DAG 中的運算子失敗時傳送電子郵件通知。如要從 Cloud Composer 環境傳送電子郵件通知,您必須將環境設定為使用 SendGrid

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": project_id,
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

DAG 工作流程指南

  • 請將任何自訂 Python 程式庫放到巢狀目錄的 DAG ZIP 封存檔中,而不要放到 DAG 目錄頂層。

    當 Airflow 掃描 dags/ 資料夾時,Airflow 只會檢查 DAG 資料夾頂層和同樣位在頂層 dags/ 資料夾中的 ZIP 封存檔頂層,看看當中的 Python 模組是否包含 DAG。如果 Airflow 碰到的 Python 模組所屬的 ZIP 封存檔不包含 airflowDAG 子字串,Airflow 就會停止處理該 ZIP 封存檔。Airflow 只會傳回停止處理為止所找到的 DAG。

  • 為了容錯,請勿在相同 Python 模組中定義多個 DAG 物件。

  • 請勿使用子 DAG。請改為在 DAG 中分組工作

  • 請將剖析 DAG 時需要的檔案放在 dags/ 資料夾,而非 data/ 資料夾。

  • 為 DAG 實作單元測試

  • 按照測試 DAG 的操作說明所述,測試開發或修改的 DAG。

  • Composer 本機開發 CLI 工具可在本機執行 Airflow 環境,簡化 Cloud Composer 2 的 Apache Airflow DAG 開發作業。這個本機 Airflow 環境會使用特定 Cloud Composer 2 版本的映像檔。

  • 確認開發的 DAG 不會過度增加DAG 剖析時間

  • Airflow 工作可能會因多種原因而失敗。為避免整個 DAG 執行作業失敗,建議您啟用工作重試功能。將重試次數上限設為 0 表示不會執行重試。

    建議您覆寫 default_task_retries 選項,使用 0 以外的值來重試工作。此外,您也可以在工作層級設定 retries 參數

  • 如果您想在 Airflow 工作中使用 GPU,請根據使用 GPU 的機器建立個別的 GKE 叢集。使用 GKEStartPodOperator 執行工作。

  • 請勿在叢集的節點集區中執行 CPU 和記憶體密集的作業,因為該集區會執行其他 Airflow 元件 (排程器、工作站、網路伺服器)。請改用 KubernetesPodOperatorGKEStartPodOperator

  • 將 DAG 部署至環境時,請只將解讀及執行 DAG 所需的檔案上傳至 /dags 資料夾。

  • 限制 /dags 資料夾中的 DAG 檔案數量。

    Airflow 會持續剖析 /dags 資料夾中的 DAG。剖析是一種循環處理 DAG 資料夾的程序,而需要載入的檔案數量 (連同其依附元件) 會影響 DAG 剖析和任務排程的效能。使用 100 個檔案 (每個檔案含 100 個 DAG) 比使用 10000 個檔案 (每個檔案含 1 個 DAG) 更有效率,因此建議進行這類最佳化。這項最佳化功能可在解析時間與 DAG 撰寫和管理效率之間取得平衡。

    例如,如果要部署 10000 個 DAG 檔案,您可以考慮建立 100 個 ZIP 檔案,每個檔案包含 100 個 DAG 檔案。

    除了上述提示之外,如果您有超過 10000 個 DAG 檔案,建議以程式輔助方式產生 DAG。舉例來說,您可以實作單一 Python DAG 檔案,產生一定數量的 DAG 物件 (例如 20、100 個 DAG 物件)。

  • 請避免使用已淘汰的 Airflow 運算子。請改為使用最新的替代方案

編寫 DAG 的常見問題

如要在多個 DAG 中執行相同或類似的工作,我該如何降低程式碼重複的情形?

建議您定義程式庫和包裝函式,以減少程式碼重複的情形。

如何在不同 DAG 檔案之間重複使用程式碼?

請將您的公用程式函式放到本機 Python 程式庫中並匯入函式。您可以在環境儲存桶的 dags/ 資料夾中,參照任何 DAG 中的函式。

如何降低產生不同定義的風險?

舉例來說,假設您有兩個團隊要將原始資料匯總成收益指標。這兩個團隊編寫了兩個稍有差異但用途相同的工作。您可以定義程式庫來處理收益資料,這樣 DAG 實作者就必須提供明確的匯總收益定義。

如何設定 DAG 之間的相依性?

這取決於您要如何定義相依性。

如果您有兩個 DAG (DAG A 和 DAG B),並想讓 DAG B 在 DAG A 之後觸發,您可以在 DAG A 的結尾加上 TriggerDagRunOperator

如果 DAG B 只依賴 DAG A 產生的成果 (例如 Pub/Sub 訊息),則或許較適合使用感應器。

如果 DAG B 與 DAG A 緊密整合,您或許能夠將這兩個 DAG 合併為單一 DAG。

如何將專屬執行 ID 傳送至 DAG 及其工作?

舉例來說,假設您要傳送 Dataproc 叢集名稱和檔案路徑。

您可以在 PythonOperator 中傳回 str(uuid.uuid4()),藉此產生隨機專屬 ID。這樣做會將 ID 加到 XComs 中,讓您可在其他運算子中透過範本欄位參照這個 ID。

產生 uuid 之前,請想想看 DagRun 專屬 ID 是否更有幫助。您也可以使用巨集在 Jinja 替換作業中參照這些 ID。

如何分隔 DAG 中的任務?

每項工作都應為整體作業的一部分,並具有冪等性質,因此請避免將多步驟工作流程封裝在單一工作中,例如在 PythonOperator 中執行的複雜程式。

我該在單一 DAG 中定義多項工作,以匯總來自多個來源的資料嗎?

舉例來說,假設您有多個包含原始資料的資料表,並想為每個資料表建立每日匯總數據。這些工作彼此不相關。您應該為每個資料表分別建立一項工作和 DAG 嗎?還是要建立一個通用的 DAG?

如果您允許每項工作共用相同的 DAG 層級屬性 (例如 schedule_interval),那麼在單一 DAG 中定義多項工作就很有意義。否則,為了盡量減少程式碼重複,您可以將多個 DAG 放入模組的 globals(),從單一 Python 模組產生多個 DAG。

如何限制 DAG 中執行的並行工作數量?

例如,您想避免超過 API 使用限制/配額,或避免同時執行太多個程序。

您可以在 Airflow 網頁 UI 中定義 Airflow 集區,並將工作與 DAG 中的現有集區建立關聯。

使用運算子的常見問題

我該使用 DockerOperator 嗎?

除非用於在遠端 Docker 安裝 (不在環境叢集中) 上啟動容器,否則不建議使用 DockerOperator。在 Cloud Composer 環境中,操作員無法存取 Docker 守護程序。

請改用 KubernetesPodOperatorGKEStartPodOperator。這些運算子會分別在 Kubernetes 或 GKE 叢集中啟動 Kubernetes Pod。請注意,我們不建議將 Pod 發布至環境的叢集,因為這可能會導致資源競爭。

我該使用 SubDagOperator 嗎?

我們不建議使用 SubDagOperator

請參考「分組工作」一節的建議,使用其他方法。

我應該只在 PythonOperators 中執行 Python 程式碼,以完全區隔 Python 運算子嗎?

視您的目標而定,您有幾種選項。

如果您只是要維持獨立的 Python 相依性,可以使用 PythonVirtualenvOperator

建議您使用 KubernetesPodOperator。這個運算子可讓您定義 Kubernetes pod,並在其他叢集中執行這些 pod。

如何新增自訂二進位檔或非 PyPI 套件?

您可以安裝私人套件存放區中託管的套件

如何將引數統一傳送至 DAG 及其工作?

您可以使用 Airflow 內建對 Jinja 範本的支援,傳遞可用於範本欄位的引數。

範本替換作業的發生時機為何?

呼叫運算子的 pre_execute 函式之前,系統會在 Airflow 工作站上替換範本。從實際執行的層面來看,即代表系統要到執行工作的前一刻才會替換範本。

如何確認哪些運算子引數支援範本替換作業?

運算子引數如果支援 Jinja2 範本替換作業,就會明確標示。

請查看運算子定義中的 template_fields 欄位,其中包含會經過範本替換作業的引數名稱清單。

例如,請查看 BashOperator,該運算子支援 bash_commandenv 引數的範本。

已淘汰及移除的 Airflow 運算子

下表列出的 Airflow 運算子已淘汰:

  • 避免在 DAG 中使用這些運算子。請改用提供的最新替換運算子。

  • 如果運算子列為「已移除」,表示該運算子已在某個 Cloud Composer 2 已發布版本中移除。

  • 如果運算子列為「已計劃移除」,表示該運算子已淘汰,並將於日後的 Cloud Composer 2 版本中移除。

  • 如果運算子列為「已在最新的 Google 供應端中移除」,表示該運算子已從最新版本的 apache-airflow-providers-google 套件中移除。同時,Cloud Composer 仍會使用此套件中未移除運算子的版本。

已淘汰的運算子 狀態 替換運算子 換貨服務開放時間:
CreateAutoMLTextTrainingJobOperator 已移除 SupervisedFineTuningTrainOperator composer-2.9.5-airflow-2.9.3
composer-2.9.5-airflow-2.9.1
GKEDeploymentHook 已移除 GKEKubernetesHook composer-2.7.1-airflow-2.7.3
GKECustomResourceHook 已移除 GKEKubernetesHook composer-2.7.1-airflow-2.7.3
GKEPodHook 已移除 GKEKubernetesHook composer-2.7.1-airflow-2.7.3
GKEJobHook 已移除 GKEKubernetesHook composer-2.7.1-airflow-2.7.3
GKEPodAsyncHook 已移除 GKEKubernetesAsyncHook composer-2.7.1-airflow-2.7.3
SecretsManagerHook 已移除 GoogleCloudSecretManagerHook composer-2.8.3-airflow-2.7.3
BigQueryExecuteQueryOperator 已移除 BigQueryInsertJobOperator 所有版本
BigQueryPatchDatasetOperator 已移除 BigQueryUpdateDatasetOperator 所有版本
DataflowCreateJavaJobOperator 已移除 beam.BeamRunJavaPipelineOperator 所有版本
DataflowCreatePythonJobOperator 已移除 beam.BeamRunPythonPipelineOperator 所有版本
DataprocSubmitPigJobOperator 已移除 DataprocSubmitJobOperator 所有版本
DataprocSubmitHiveJobOperator 已移除 DataprocSubmitJobOperator 所有版本
DataprocSubmitSparkSqlJobOperator 已移除 DataprocSubmitJobOperator 所有版本
DataprocSubmitSparkJobOperator 已移除 DataprocSubmitJobOperator 所有版本
DataprocSubmitHadoopJobOperator 已移除 DataprocSubmitJobOperator 所有版本
DataprocSubmitPySparkJobOperator 已移除 DataprocSubmitJobOperator 所有版本
BigQueryTableExistenceAsyncSensor 已移除 BigQueryTableExistenceSensor composer-2.3.0-airflow-2.5.1、composer-2.3.0-airflow-2.4.3
BigQueryTableExistencePartitionAsyncSensor 已移除 BigQueryTablePartitionExistenceSensor composer-2.3.0-airflow-2.5.1、composer-2.3.0-airflow-2.4.3
CloudComposerEnvironmentSensor 已移除 CloudComposerCreateEnvironmentOperator、CloudComposerDeleteEnvironmentOperator、CloudComposerUpdateEnvironmentOperator composer-2.3.0-airflow-2.5.1、composer-2.3.0-airflow-2.4.3
GCSObjectExistenceAsyncSensor 已移除 GCSObjectExistenceSensor composer-2.3.0-airflow-2.5.1、composer-2.3.0-airflow-2.4.3
GoogleAnalyticsHook 已移除 GoogleAnalyticsAdminHook composer-2.3.0-airflow-2.5.1、composer-2.3.0-airflow-2.4.3
GoogleAnalyticsListAccountsOperator 已移除 GoogleAnalyticsAdminListAccountsOperator composer-2.3.0-airflow-2.5.1、composer-2.3.0-airflow-2.4.3
GoogleAnalyticsGetAdsLinkOperator 已移除 GoogleAnalyticsAdminGetGoogleAdsLinkOperator composer-2.3.0-airflow-2.5.1、composer-2.3.0-airflow-2.4.3
GoogleAnalyticsRetrieveAdsLinksListOperator 已移除 GoogleAnalyticsAdminListGoogleAdsLinksOperator composer-2.3.0-airflow-2.5.1、composer-2.3.0-airflow-2.4.3
GoogleAnalyticsDataImportUploadOperator 已移除 GoogleAnalyticsAdminCreateDataStreamOperator composer-2.3.0-airflow-2.5.1、composer-2.3.0-airflow-2.4.3
GoogleAnalyticsDeletePreviousDataUploadsOperator 已移除 GoogleAnalyticsAdminDeleteDataStreamOperator composer-2.3.0-airflow-2.5.1、composer-2.3.0-airflow-2.4.3
DataPipelineHook 已移除 DataflowHook composer-2.8.6-airflow-2.9.1
composer-2.8.6-airflow-2.7.3
CreateDataPipelineOperator 已移除 DataflowCreatePipelineOperator composer-2.8.6-airflow-2.9.1
composer-2.8.6-airflow-2.7.3
RunDataPipelineOperator 已移除 DataflowRunPipelineOperator composer-2.8.6-airflow-2.9.1
composer-2.8.6-airflow-2.7.3
AutoMLDatasetLink 已淘汰,預計移除 TranslationLegacyDatasetLink composer-2.8.6-airflow-2.9.1
composer-2.8.6-airflow-2.7.3
AutoMLDatasetListLink 已淘汰,預計移除 TranslationDatasetListLink composer-2.8.6-airflow-2.9.1
composer-2.8.6-airflow-2.7.3
AutoMLModelLink 已淘汰,預計移除 TranslationLegacyModelLink composer-2.8.6-airflow-2.9.1
composer-2.8.6-airflow-2.7.3
AutoMLModelTrainLink 已淘汰,預計移除 TranslationLegacyModelTrainLink composer-2.8.6-airflow-2.9.1
composer-2.8.6-airflow-2.7.3
AutoMLModelPredictLink 已淘汰,預計移除 TranslationLegacyModelPredictLink composer-2.8.6-airflow-2.9.1
composer-2.8.6-airflow-2.7.3
AutoMLBatchPredictOperator 已移除 vertex_ai.batch_prediction_job composer-2.9.8-airflow-2.9.3
AutoMLPredictOperator 已淘汰,預計移除 vertex_aigenerative_model。TextGenerationModelPredictOperator、translate.TranslateTextOperator composer-2.8.3-airflow-2.7.3
PromptLanguageModelOperator 已移除 TextGenerationModelPredictOperator composer-2.8.6-airflow-2.9.1
composer-2.8.6-airflow-2.7.3
GenerateTextEmbeddingsOperator 已移除 TextEmbeddingModelGetEmbeddingsOperator composer-2.8.6-airflow-2.9.1
composer-2.8.6-airflow-2.7.3
PromptMultimodalModelOperator 已移除 GenerativeModelGenerateContentOperator composer-2.8.6-airflow-2.9.1
composer-2.8.6-airflow-2.7.3
PromptMultimodalModelWithMediaOperator 已移除 GenerativeModelGenerateContentOperator composer-2.8.6-airflow-2.9.1
composer-2.8.6-airflow-2.7.3
DataflowStartSqlJobOperator 已移除 DataflowStartYamlJobOperator composer-2.9.5-airflow-2.9.3
composer-2.9.5-airflow-2.9.1
LifeSciencesHook 已淘汰,預計移除 Google Cloud 批次運算子的掛鉤 敬請期待
DataprocScaleClusterOperator 已淘汰,預計移除 DataprocUpdateClusterOperator 敬請期待
MLEngineStartBatchPredictionJobOperator 已淘汰,預計移除 CreateBatchPredictionJobOperator 敬請期待
MLEngineManageModelOperator 已淘汰,預計移除 MLEngineCreateModelOperator、MLEngineGetModelOperator 敬請期待
MLEngineGetModelOperator 已淘汰,預計移除 GetModelOperator 敬請期待
MLEngineDeleteModelOperator 已淘汰,預計移除 DeleteModelOperator 敬請期待
MLEngineManageVersionOperator 已淘汰,預計移除 MLEngineCreateVersion、MLEngineSetDefaultVersion、MLEngineListVersions、MLEngineDeleteVersion 敬請期待
MLEngineCreateVersionOperator 已淘汰,預計移除 Vertex AI 運算子的 parent_model 參數 敬請期待
MLEngineSetDefaultVersionOperator 已淘汰,預計移除 SetDefaultVersionOnModelOperator 敬請期待
MLEngineListVersionsOperator 已淘汰,預計移除 ListModelVersionsOperator 敬請期待
MLEngineDeleteVersionOperator 已淘汰,預計移除 DeleteModelVersionOperator 敬請期待
MLEngineStartTrainingJobOperator 已淘汰,預計移除 CreateCustomPythonPackageTrainingJobOperator 敬請期待
MLEngineTrainingCancelJobOperator 已淘汰,預計移除 CancelCustomTrainingJobOperator 敬請期待
LifeSciencesRunPipelineOperator 已淘汰,預計移除 Google Cloud Batch 運算子 敬請期待
MLEngineCreateModelOperator 已淘汰,預計移除 對應的 VertexAI 運算子 敬請期待

後續步驟