Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本頁面說明如何在 Cloud Composer 中啟用 CeleryKubernetesExecutor,以及如何在 DAG 中使用 KubernetesExecutor。
關於 CeleryKubernetesExecutor
CeleryKubernetesExecutor 是一種執行緒,可同時使用 CeleryExecutor 和 KubernetesExecutor。Airflow 會根據您為工作定義的佇列,選取執行程。在一個 DAG 中,您可以使用 CeleryExecutor 執行部分工作,並使用 KubernetesExecutor 執行其他工作:
- CeleryExecutor 已針對快速且可擴充的工作執行作業進行最佳化。
- KubernetesExecutor 是專為執行資源密集型工作和隔離執行工作而設計。
Cloud Composer 中的 CeleryKubernetesExecutor
Cloud Composer 中的 CeleryKubernetesExecutor 可讓您使用 KubernetesExecutor 執行工作。您無法在 Cloud Composer 中單獨使用 CeleryKubernetesExecutor。
Cloud Composer 會在環境叢集中使用 KubernetesExecutor 執行的任務,在與 Airflow 工作站相同的命名空間中執行。這類工作與 Airflow 工作者具有相同的繫結,且可存取專案中的資源。
您使用 KubernetesExecutor 執行的工作會使用 Cloud Composer 定價模式,因為這些工作會在環境叢集中執行。這些 Pod 適用於 Cloud Composer 運算 SKU (適用於 CPU、記憶體和儲存空間)。
建議您在下列情況下使用 CeleryExecutor 執行工作:
- 任務啟動時間非常重要。
- 工作不需要執行階段隔離,也不需要大量資源。
建議您在下列情況下使用 KubernetesExecutor 執行工作:
- 工作需要執行階段隔離。舉例來說,這樣一來,工作就不會在自己的 Pod 中運作,因此不會爭奪記憶體和 CPU。
- 工作需要大量資源,您想控管可用的 CPU 和記憶體資源。
比較 KubernetesExecutor 與 KubernetesPodOperator
使用 KubernetesExecutor 執行工作的方式,與使用 KubernetesPodOperator 執行工作類似。工作會在 Pod 中執行,因此可提供 Pod 層級的工作隔離功能,並提供更完善的資源管理功能。
但兩者之間還是有一些重要差異:
- KubernetesExecutor 只會在環境的版本 Cloud Composer 命名空間中執行工作。您無法在 Cloud Composer 中變更這個命名空間。您可以指定 KubernetesPodOperator 執行 Pod 工作的工作命名空間。
- KubernetesExecutor 可使用任何內建的 Airflow 運算子。KubernetesPodOperator 只會執行容器的進入點定義的提供指令碼。
- KubernetesExecutor 會使用預設的 Cloud Composer Docker 映像檔,其中包含與 Cloud Composer 環境中定義的 Python、Airflow 設定選項覆寫值、環境變數和 PyPI 套件相同的內容。
關於 Docker 映像檔
根據預設,KubernetesExecutor 會使用 Cloud Composer 為 Celery 工作站使用的 Docker 映像檔啟動工作。這是環境的 Cloud Composer 映像檔,其中包含您為環境指定的所有變更,例如自訂 PyPI 套件或環境變數。
事前準備
您可以在 Cloud Composer 3 中使用 CeleryKubernetesExecutor。
在 Cloud Composer 3 中,您只能使用 CeleryKubernetesExecutor 以外的執行緒。也就是說,您可以在單一 DAG 中使用 CeleryExecutor、KubernetesExecutor 或兩者執行工作,但無法將環境設為只使用 KubernetesExecutor 或 CeleryExecutor。
設定 CeleryKubernetesExecutor
您可能需要覆寫與 KubernetesExecutor 相關的現有 Airflow 設定選項:
[kubernetes]worker_pods_creation_batch_size
這個選項會定義每個排程器迴圈的 Kubernetes 工作站 Pod 建立呼叫數量。預設值為
1
,因此每個調度器心跳都只會啟動一個 Pod。如果您大量使用 KubernetesExecutor,建議您提高這個值。[kubernetes]worker_pods_pending_timeout
這個選項會以秒為單位,定義 worker 可在
Pending
狀態 (Pod 正在建立) 中停留多久,之後就會視為失敗。預設值為 5 分鐘。
使用 KubernetesExecutor 或 CeleryExecutor 執行工作
您可以在單一 DAG 中使用 CeleryExecutor、KubernetesExecutor 或兩者來執行工作:
- 如要使用 KubernetesExecutor 執行工作,請在工作
queue
參數中指定kubernetes
值。 - 如要使用 CeleryExecutor 執行工作,請省略
queue
參數。
以下範例會使用 KubernetesExecutor 執行 task-kubernetes
工作,並使用 CeleryExecutor 執行 task-celery
工作:
import datetime
import airflow
from airflow.operators.python_operator import PythonOperator
with airflow.DAG(
"composer_sample_celery_kubernetes",
start_date=datetime.datetime(2022, 1, 1),
schedule_interval="@daily") as dag:
def kubernetes_example():
print("This task runs using KubernetesExecutor")
def celery_example():
print("This task runs using CeleryExecutor")
# To run with KubernetesExecutor, set queue to kubernetes
task_kubernetes = PythonOperator(
task_id='task-kubernetes',
python_callable=kubernetes_example,
dag=dag,
queue='kubernetes')
# To run with CeleryExecutor, omit the queue argument
task_celery = PythonOperator(
task_id='task-celery',
python_callable=celery_example,
dag=dag)
task_kubernetes >> task_celery
執行與 KubernetesExecutor 相關的 Airflow CLI 指令
您可以使用 gcloud
執行多個與 KubernetesExecutor 相關的 Airflow CLI 指令。
自訂 worker pod 規格
您可以透過在工作 executor_config
參數中傳遞 worker pod 規格,自訂 worker pod 規格。您可以使用這個值來定義自訂 CPU 和記憶體需求。
您可以覆寫用於執行工作的整個 worker pod 規格。如要擷取 KubernetesExecutor 使用的任務的 Pod 規格,您可以執行 kubernetes generate-dag-yaml
Airflow CLI 指令。
如要進一步瞭解如何自訂 worker pod 規格,請參閱 Airflow 說明文件。
Cloud Composer 3 支援下列資源需求值:
資源 | 下限 | 上限 | 步驟 |
---|---|---|---|
CPU | 0.25 | 32 | 間距值:0.25、0.5、1、2、4、6、8、10、...、32。系統會將要求的值無條件進位至最接近的支援步驟值 (例如 5 到 6)。 |
記憶體 | 2G (GB) | 128G (GB) | 步長值:2、3、4、5、...、128。系統會將要求的值無條件進位至最接近的支援步數值 (例如 3.5G 到 4G)。 |
儲存空間 | - | 100G (GB) | 任何值。如果要求的容量超過 100 GB,系統只會提供 100 GB。 |
如要進一步瞭解 Kubernetes 中的資源單位,請參閱「Kubernetes 中的資源單位」。
以下範例說明使用自訂 worker pod 規格的任務:
PythonOperator(
task_id='custom-spec-example',
python_callable=f,
dag=dag,
queue='kubernetes',
executor_config={
'pod_override': k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name='base',
resources=k8s.V1ResourceRequirements(requests={
'cpu': '0.5',
'memory': '2G',
})
),
],
),
)
},
)
查看工作記錄檔
您可以在「Logs」分頁中查看 KubernetesExecutor 執行的工作記錄,以及 CeleryExecutor 執行的工作記錄:
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。「環境詳細資料」頁面隨即開啟。
前往「Logs」分頁。
依序前往「All logs」>「Airflow logs」>「Workers」。
名為
airflow-k8s-worker
的工作站會執行 KubernetesExecutor 工作。如要查看特定工作記錄,您可以在搜尋中使用 DAG ID 或工作 ID 做為關鍵字。