Skip to content

Commit a5439ee

Browse files
deedmitrij (Dmytro Khimich)
and authored
Add index to the dataset name to have separate dataset for each example DAG (#18459)
Co-authored-by: Dmytro Khimich <[email protected]>
1 parent 9bf0ed2 commit a5439ee

File tree

1 file changed

+24
-25
lines changed

1 file changed

+24
-25
lines changed

airflow/providers/google/cloud/example_dags/example_bigquery_queries.py

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -43,50 +43,49 @@
4343
TABLE_1 = "table1"
4444
TABLE_2 = "table2"
4545

46-
47-
INSERT_DATE = datetime.now().strftime("%Y-%m-%d")
48-
49-
# [START howto_operator_bigquery_query]
50-
INSERT_ROWS_QUERY = (
51-
f"INSERT {DATASET_NAME}.{TABLE_1} VALUES "
52-
f"(42, 'monthy python', '{INSERT_DATE}'), "
53-
f"(42, 'fishy fish', '{INSERT_DATE}');"
54-
)
55-
# [END howto_operator_bigquery_query]
56-
5746
SCHEMA = [
5847
{"name": "value", "type": "INTEGER", "mode": "REQUIRED"},
5948
{"name": "name", "type": "STRING", "mode": "NULLABLE"},
6049
{"name": "ds", "type": "DATE", "mode": "NULLABLE"},
6150
]
6251

63-
for location in [None, LOCATION]:
52+
locations = [None, LOCATION]
53+
for index, location in enumerate(locations, 1):
6454
dag_id = "example_bigquery_queries_location" if location else "example_bigquery_queries"
55+
DATASET = DATASET_NAME + str(index)
56+
INSERT_DATE = datetime.now().strftime("%Y-%m-%d")
57+
# [START howto_operator_bigquery_query]
58+
INSERT_ROWS_QUERY = (
59+
f"INSERT {DATASET}.{TABLE_1} VALUES "
60+
f"(42, 'monthy python', '{INSERT_DATE}'), "
61+
f"(42, 'fishy fish', '{INSERT_DATE}');"
62+
)
63+
# [END howto_operator_bigquery_query]
6564

6665
with models.DAG(
6766
dag_id,
6867
schedule_interval='@once', # Override to match your needs
6968
start_date=days_ago(1),
7069
tags=["example"],
71-
user_defined_macros={"DATASET": DATASET_NAME, "TABLE": TABLE_1},
70+
user_defined_macros={"DATASET": DATASET, "TABLE": TABLE_1},
7271
) as dag_with_locations:
7372
create_dataset = BigQueryCreateEmptyDatasetOperator(
7473
task_id="create-dataset",
75-
dataset_id=DATASET_NAME,
74+
dataset_id=DATASET,
7675
location=location,
7776
)
7877

7978
create_table_1 = BigQueryCreateEmptyTableOperator(
8079
task_id="create_table_1",
81-
dataset_id=DATASET_NAME,
80+
dataset_id=DATASET,
8281
table_id=TABLE_1,
8382
schema_fields=SCHEMA,
8483
location=location,
8584
)
8685

8786
create_table_2 = BigQueryCreateEmptyTableOperator(
8887
task_id="create_table_2",
89-
dataset_id=DATASET_NAME,
88+
dataset_id=DATASET,
9089
table_id=TABLE_2,
9190
schema_fields=SCHEMA,
9291
location=location,
@@ -95,7 +94,7 @@
9594
create_dataset >> [create_table_1, create_table_2]
9695

9796
delete_dataset = BigQueryDeleteDatasetOperator(
98-
task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
97+
task_id="delete_dataset", dataset_id=DATASET, delete_contents=True
9998
)
10099

101100
# [START howto_operator_bigquery_insert_job]
@@ -140,8 +139,8 @@
140139
configuration={
141140
"query": {
142141
"query": [
143-
f"SELECT * FROM {DATASET_NAME}.{TABLE_2}",
144-
f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_2}",
142+
f"SELECT * FROM {DATASET}.{TABLE_2}",
143+
f"SELECT COUNT(*) FROM {DATASET}.{TABLE_2}",
145144
],
146145
"useLegacySql": False,
147146
}
@@ -153,11 +152,11 @@
153152
task_id="execute_query_save",
154153
configuration={
155154
"query": {
156-
"query": f"SELECT * FROM {DATASET_NAME}.{TABLE_1}",
155+
"query": f"SELECT * FROM {DATASET}.{TABLE_1}",
157156
"useLegacySql": False,
158157
"destinationTable": {
159158
'projectId': PROJECT_ID,
160-
'datasetId': DATASET_NAME,
159+
'datasetId': DATASET,
161160
'tableId': TABLE_2,
162161
},
163162
}
@@ -168,7 +167,7 @@
168167
# [START howto_operator_bigquery_get_data]
169168
get_data = BigQueryGetDataOperator(
170169
task_id="get_data",
171-
dataset_id=DATASET_NAME,
170+
dataset_id=DATASET,
172171
table_id=TABLE_1,
173172
max_results=10,
174173
selected_fields="value,name",
@@ -184,7 +183,7 @@
184183
# [START howto_operator_bigquery_check]
185184
check_count = BigQueryCheckOperator(
186185
task_id="check_count",
187-
sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_1}",
186+
sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
188187
use_legacy_sql=False,
189188
location=location,
190189
)
@@ -193,7 +192,7 @@
193192
# [START howto_operator_bigquery_value_check]
194193
check_value = BigQueryValueCheckOperator(
195194
task_id="check_value",
196-
sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_1}",
195+
sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
197196
pass_value=4,
198197
use_legacy_sql=False,
199198
location=location,
@@ -203,7 +202,7 @@
203202
# [START howto_operator_bigquery_interval_check]
204203
check_interval = BigQueryIntervalCheckOperator(
205204
task_id="check_interval",
206-
table=f"{DATASET_NAME}.{TABLE_1}",
205+
table=f"{DATASET}.{TABLE_1}",
207206
days_back=1,
208207
metrics_thresholds={"COUNT(*)": 1.5},
209208
use_legacy_sql=False,

0 commit comments

Comments (0)