airflow中EmrStepSensor
时间: 2025-02-12 11:21:32 浏览: 39
### 使用 EmrStepOperator 和 EmrStepSensor 配合监控 EMR 步骤执行状态
在 Airflow 中,`EmrAddStepsOperator` 可用于向 Amazon EMR 发送步骤请求,而 `EmrStepSensor` 则可以用来轮询这些步骤的状态直到完成。这允许工作流暂停并等待特定的 EMR 处理任务结束再继续后续的任务。
#### 创建 EMR Step 并提交给集群
为了启动一个 EMR step,在 DAG 文件中定义如下操作符:
```python
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
add_steps_to_emr_cluster = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps=[
{
'Name': 'calculate_pi',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': '/path/to/jar',
'Args': [
'--arg1=value1',
'--arg2=value2'
]
}
}
],
)
```
此部分代码展示了如何配置 `EmrAddStepsOperator` 来增加新的处理步骤至指定 ID 的 EMR 集群上[^1]。
#### 添加传感器来监测 EMR Steps 执行情况
一旦步骤被加入到 EMR 集群之后,就可以利用 `EmrStepSensor` 对其进行监视直至完成:
```python
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor
watch_step_completion = EmrStepSensor(
task_id='watch_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}", # 假设只添加了一个step
aws_conn_id='aws_default',
)
```
这里设置了 `EmrStepSensor` 实例化对象,并指定了要监听的具体步骤 ID 和关联的 JobFlow (即 EMR Cluster)。
上述两个组件共同作用下实现了对 EMR 上运行作业的有效管理和跟踪,确保只有当先前的操作完成后才会触发下一个动作。
阅读全文
相关推荐


















