添加和更新 DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本页介绍了如何在 Cloud Composer 环境中管理 DAG。

Cloud Composer 使用 Cloud Storage 存储分区来存储 Cloud Composer 环境的 DAG。您的环境会将 DAG 从此存储分区同步到 Airflow 组件,例如 Airflow 工作器和调度器。

准备工作

  • 由于 Apache Airflow 不提供强大的 DAG 隔离功能,建议您分开维护生产环境和测试环境,以防止产生 DAG 干扰。如需了解详情,请参阅测试 DAG
  • 确保您的账号具有足够的权限来管理 DAG。
  • 对 DAG 的更改会在 3 到 5 分钟内传播到 Airflow。您可以在 Airflow 网页界面中查看任务状态。

访问您环境的存储分区

如需访问与您的环境关联的存储分区,请执行以下操作:

控制台

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,找到包含环境名称的行,然后在 DAG 文件夹列中,点击 DAG 链接。存储分区详情页面随即打开。它会显示环境存储分区中 /dags 文件夹的内容。

gcloud

gcloud CLI 提供了用于在环境存储分区中添加删除 DAG 的单独命令。

如果您想与环境的存储分区进行交互,还可以使用 Google Cloud CLI。如需获取环境存储分区的地址,请运行以下 gcloud CLI 命令:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

您需要进行如下替换:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。

例如:

gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"

API

构建 environments.get API 请求。在 Environment 资源的 EnvironmentConfig 资源的 dagGcsPrefix 资源中,是环境存储分区的地址。

示例:

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment

Python

使用 google-auth 库获取凭据,并使用 requests 库来调用 REST API。

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
response = authed_session.request("GET", environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data["config"]["dagGcsPrefix"])

添加或更新 DAG

如需添加或更新 DAG,请将 DAG 的 Python .py 文件移至环境存储分区中的 /dags 文件夹。

控制台

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,找到包含环境名称的行,然后在 DAG 文件夹列中,点击 DAG 链接。存储分区详情页面随即打开。它会显示环境存储分区中 /dags 文件夹的内容。

  3. 点击上传文件。然后,使用浏览器的对话框选择 DAG 的 Python .py 文件,并进行确认。

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

替换:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。
  • LOCAL_FILE_TO_UPLOAD 是 DAG 的 Python .py 文件。

示例:

gcloud composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"

更新具有活跃 DAG 运行作业的 DAG

如果您更新具有活跃 DAG 运行作业的 DAG:

  • 所有当前正在执行的任务都会使用原始 DAG 文件继续运行。
  • 已安排但尚未运行的所有任务都会使用更新后的 DAG 文件。
  • 更新后的 DAG 文件中不再存在的所有任务都会被标记为已移除。

更新频繁运行的 DAG

上传 DAG 文件后,Airflow 需要一些时间来加载此文件并更新此 DAG。如果您的 DAG 频繁运行,您可能需要确保 DAG 使用更新后的 DAG 文件版本。为此,请执行以下操作:

  1. 在 Airflow 界面中暂停 DAG。

  2. 上传更新后的 DAG 文件。

  3. 请等待,直到您在 Airflow 界面中看到更新。这意味着调度器已正确解析 DAG,并在 Airflow 数据库中进行了更新。

    即使 Airflow 界面显示更新后的 DAG,也无法保证 Airflow 工作器拥有更新后的 DAG 文件版本。这是因为 DAG 文件不与调度器和工作器一起同步。

  4. 您可能需要延长等待时间,以确保 DAG 文件与环境中的所有工作器同步。系统每分钟执行多次同步。在运行状况良好的环境中,等待大约 20-30 秒足以让所有工作器完成同步。

  5. (可选)如果要完全确保所有工作器都具有新版 DAG 文件,请检查每个工作器的日志。为此,请执行以下操作:

    1. 在 Google Cloud 控制台中打开环境的日志标签页。

    2. 依次前往 Composer 日志 > 基础架构 > Cloud Storage 同步项,然后检查环境中每个工作器的日志。查找时间戳在您上传新 DAG 文件之后的最新 Syncing dags directory 日志项。如果您看到后面跟随 Finished syncing 项,则相应 DAG 在此工作器上成功同步。

  6. 取消暂停 DAG。

重新解析 DAG

由于 DAG 存储在环境的存储分区中,因此每个 DAG 都会先同步到 DAG 处理器,然后 DAG 处理器会稍微延迟进行解析。如果您手动重新解析 DAG(例如通过 Airflow 界面),则 DAG 处理器会重新解析可供其使用的当前 DAG 版本,该版本可能不是您上传到环境存储分区的最新 DAG 版本。

我们建议仅在遇到解析时间过长时使用按需重新解析。例如,如果您的环境中有大量文件,或者在 Airflow 配置选项中配置了较长的 DAG 解析间隔时间,就可能会发生这种情况。

在您的环境中删除 DAG

如需删除 DAG,请从环境存储分区内的环境 /dags 文件夹中移除 DAG 的 Python .py 文件。

控制台

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,找到包含环境名称的行,然后在 DAG 文件夹列中,点击 DAG 链接。存储分区详情页面随即打开。它会显示环境存储分区中的 /dags 文件夹的内容。

  3. 选择 DAG 文件,点击删除,然后确认操作。

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

替换:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。
  • DAG_FILE 替换为 DAG 的 Python .py 文件。

示例:

gcloud composer environments storage dags delete \
    --environment example-environment \
    --location us-central1 \
    example_dag.py

从 Airflow 界面中移除 DAG

如需从 Airflow 网页界面中移除 DAG 元数据,请执行以下操作:

Airflow 界面

  1. 前往您的环境的 Airflow 界面
  2. 对于 DAG,请点击删除 DAG

gcloud

在 gcloud CLI 中运行以下命令:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

您需要进行如下替换:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。
  • DAG_NAME 是要删除的 DAG 的名称。

后续步骤