Skip to content

Commit a7e144b

Browse files
TobKedturbaszek
andauthored
Google Dataflow Hook to handle no Job Type (#14914)
Co-authored-by: Tomek Urbaszek <[email protected]>
1 parent 7c2ed53 commit a7e144b

File tree

2 files changed

+29
-1
lines changed

2 files changed

+29
-1
lines changed

airflow/providers/google/cloud/hooks/dataflow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ def _check_dataflow_job_state(self, job) -> bool:
404404
:raise: Exception
405405
"""
406406
if self._wait_until_finished is None:
407-
wait_for_running = job['type'] == DataflowJobType.JOB_TYPE_STREAMING
407+
wait_for_running = job.get('type') == DataflowJobType.JOB_TYPE_STREAMING
408408
else:
409409
wait_for_running = not self._wait_until_finished
410410

tests/providers/google/cloud/hooks/test_dataflow.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1414,6 +1414,34 @@ def test_check_dataflow_job_state_wait_until_finished(
14141414
result = dataflow_job._check_dataflow_job_state(job)
14151415
assert result == expected_result
14161416

1417+
# fmt: off
1418+
@parameterized.expand([
1419+
# RUNNING
1420+
(DataflowJobStatus.JOB_STATE_RUNNING, None, False),
1421+
(DataflowJobStatus.JOB_STATE_RUNNING, True, False),
1422+
(DataflowJobStatus.JOB_STATE_RUNNING, False, True),
1423+
# AWAITING STATE
1424+
(DataflowJobStatus.JOB_STATE_PENDING, None, False),
1425+
(DataflowJobStatus.JOB_STATE_PENDING, True, False),
1426+
(DataflowJobStatus.JOB_STATE_PENDING, False, True),
1427+
])
1428+
# fmt: on
1429+
def test_check_dataflow_job_state_without_job_type(self, job_state, wait_until_finished, expected_result):
1430+
job = {"id": "id-2", "name": "name-2", "currentState": job_state}
1431+
dataflow_job = _DataflowJobsController(
1432+
dataflow=self.mock_dataflow,
1433+
project_number=TEST_PROJECT,
1434+
name="name-",
1435+
location=TEST_LOCATION,
1436+
poll_sleep=0,
1437+
job_id=None,
1438+
num_retries=20,
1439+
multiple_jobs=True,
1440+
wait_until_finished=wait_until_finished,
1441+
)
1442+
result = dataflow_job._check_dataflow_job_state(job)
1443+
assert result == expected_result
1444+
14171445
# fmt: off
14181446
@parameterized.expand([
14191447
(DataflowJobType.JOB_TYPE_BATCH, DataflowJobStatus.JOB_STATE_FAILED,

0 commit comments

Comments
 (0)