在 Dataproc 叢集中執行 Hadoop 字數計算工作

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本教學課程將說明如何使用 Cloud Composer 建立 Apache Airflow DAG (有向非循環圖),在 Dataproc 叢集中執行 Apache Hadoop 字數計數工作。

目標

  1. 存取 Cloud Composer 環境並使用 Airflow UI
  2. 建立及查看 Airflow 環境變數。
  3. 建立並執行包含下列工作的 DAG:
    1. 建立 Dataproc 叢集。
    2. 在叢集中執行 Apache Hadoop 字數計算工作。
    3. 將字數結果輸出至 Cloud Storage 值區。
    4. 刪除叢集。

費用

在本文件中,您會使用 Google Cloud的下列計費元件:

  • Cloud Composer
  • Dataproc
  • Cloud Storage

您可以使用 Pricing Calculator 根據預測用量產生預估費用。 新 Google Cloud 使用者可能符合申請免費試用的資格。

事前準備

  • 請確認專案已啟用下列 API:

    主控台

    Enable the Dataproc, Cloud Storage APIs.

    Enable the APIs

    gcloud

    Enable the Dataproc, Cloud Storage APIs:

    gcloud services enable dataproc.googleapis.com storage-component.googleapis.com

  • 在專案中,建立 Cloud Storage 值區,可儲存任何儲存類別和區域的 Hadoop 字數計數工作結果。

  • 請記下您建立的值區路徑,例如 gs://example-bucket。您將為這個路徑定義 Airflow 變數,並在本教學課程稍後的範例 DAG 中使用該變數。

  • 使用預設參數建立 Cloud Composer 環境。等待環境建立作業完成。完成後,環境名稱左側會顯示綠色勾號。

  • 請記下建立環境的區域,例如 us-central。您將為這個區域定義 Airflow 變數,並在範例 DAG 中使用該變數,在同一個地區執行 Dataproc 叢集。

設定 Airflow 變數

設定 Airflow 變數,以便稍後在範例 DAG 中使用。舉例來說,您可以在 Airflow UI 中設定 Airflow 變數。

Airflow 變數
gcp_project 您在本教學課程中使用的專案的專案 ID,例如 example-project
gcs_bucket 您為本教學課程建立的 Cloud Storage URI 值區,例如 gs://example-bucket.
gce_region 您建立環境的區域,例如 us-central1。這是 Dataproc 叢集的建立區域。

查看工作流程範例

Airflow DAG 是您要排程及執行的經過組織的工作集合。DAG 是在標準 Python 檔案中定義。hadoop_tutorial.py 中的程式碼是工作流程程式碼。

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = (
    os.path.join(
        "{{ var.value.gcs_bucket }}",
        "wordcount",
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    )
    + os.sep
)
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
# Arguments to pass to Cloud Dataproc job.
input_file = "gs://pub/shakespeare/rose.txt"
wordcount_args = ["wordcount", input_file, output_file]

HADOOP_JOB = {
    "reference": {"project_id": "{{ var.value.gcp_project }}"},
    "placement": {"cluster_name": "composer-hadoop-tutorial-cluster-{{ ds_nodash }}"},
    "hadoop_job": {
        "main_jar_file_uri": WORDCOUNT_JAR,
        "args": wordcount_args,
    },
}

CLUSTER_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{ var.value.gcp_project }}",
    "region": "{{ var.value.gce_region }}",
}


with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc.DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        cluster_config=CLUSTER_CONFIG,
        region="{{ var.value.gce_region }}",
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc.DataprocSubmitJobOperator(
        task_id="run_dataproc_hadoop", job=HADOOP_JOB
    )

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc.DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        region="{{ var.value.gce_region }}",
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

運算子

為了自動化調度管理範例工作流程中的三項工作,DAG 會匯入下列三個 Airflow 運算子:

  • DataprocClusterCreateOperator:建立 Dataproc 叢集。

  • DataProcHadoopOperator:提交 Hadoop 字數計數工作,並將結果寫入 Cloud Storage 值區。

  • DataprocClusterDeleteOperator:刪除叢集,避免產生持續性的 Compute Engine 費用。

依附元件

您可以按照工作之間的關係和依附元件,安排要執行的工作。這個 DAG 中的任務會依序執行。

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

排程

DAG 的名稱為 composer_hadoop_tutorial,且每天執行一次。由於傳入 default_dag_argsstart_date 已設為 yesterday,Cloud Composer 會在 DAG 上傳至環境的值區後,立即排程開始工作流程。

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

將 DAG 上傳至環境的值區

Cloud Composer 會將 DAG 儲存在環境值區的 /dags 資料夾中。

如何上傳 DAG:

  1. 在本機電腦上儲存 hadoop_tutorial.py

  2. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  3. 在環境清單中,按一下環境的「DAG 資料夾」欄中的「DAG」連結。

  4. 按一下「上傳」檔案。

  5. 選取本機上的 hadoop_tutorial.py,然後按一下「Open」

Cloud Composer 會將 DAG 新增至 Airflow,並自動排定 DAG。DAG 會在 3 到 5 分鐘內產生變化。

探索 DAG 執行作業

查看工作狀態

當您將 DAG 檔案上傳到 Cloud Storage 中的 dags/ 資料夾後,Cloud Composer 會剖析檔案。完成後,工作流程名稱會顯示在 DAG 清單中,且工作流程會排入要立即執行的作業佇列。

  1. 如要查看工作狀態,請前往 Airflow 網頁介面,然後按一下工具列中的「DAG」

  2. 如要開啟 DAG 詳細資料頁面,請按一下 composer_hadoop_tutorial。此頁面會透過圖形呈現工作流程工作和依附元件。

  3. 如要查看各項工作的狀態,請按一下「圖表檢視」,然後將滑鼠游標懸停在各項工作的圖表上。

再次將工作流程排入佇列

如要再次從「圖表檢視畫面」執行工作流程:

  1. 在 Airflow 使用者介面的圖表檢視畫面中,按一下 create_dataproc_cluster 圖示。
  2. 如要重設這三項工作,請按一下「清除」,然後點選「確定」確認。
  3. 再次在圖表檢視畫面中按一下 create_dataproc_cluster
  4. 如要再次排入工作流程,請按一下「執行」

查看工作結果

您也可以前往下列 Google Cloud 控制台頁面,查看 composer_hadoop_tutorial 工作流程的狀態和結果:

  • Dataproc 叢集:監控叢集建立和刪除作業。請注意,工作流程建立的叢集是暫時性的:只會在工作流程期間存在,並在最後一個工作流程工作結束時刪除。

    前往「Dataproc Clusters」(Dataproc 叢集)

  • Dataproc 工作:查看或監控 Apache Hadoop 字數計數工作。按一下工作 ID 即可查看工作記錄輸出內容。

    前往 Dataproc 工作

  • Cloud Storage 瀏覽器:查看您為本教學課程建立的 Cloud Storage 值區中 wordcount 資料夾中的 wordcount 結果。

    前往 Cloud Storage 瀏覽器

清除所用資源

刪除本教學課程中使用的資源

  1. 刪除 Cloud Composer 環境,包括手動刪除環境的值區。

  2. 刪除儲存 Hadoop 字數計數工作結果的 Cloud Storage 值區