-
Notifications
You must be signed in to change notification settings - Fork 196
/
Copy pathquery_task.py
115 lines (95 loc) · 3.59 KB
/
query_task.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""Given a model, view, and fields, create a query and then a query task that executes it asynchronously.
$ python query_task.py <model_name> <view_name> <field_1> <field_2> ...
Examples:
$ python query_task.py thelook users users.first_name users.last_name ...
Last modified: August 25
"""
import sys
import textwrap
import time
from typing import List
import looker_sdk
from looker_sdk import models40 as models
# Module-level SDK client (init40, i.e. the 4.0 API flavour) used by the
# functions below. It is configured from the ini file at "../../looker.ini";
# NOTE(review): the relative path is presumably resolved against the current
# working directory, so the script likely has to be run from its own folder.
sdk = looker_sdk.init40("../../looker.ini")
def main_models(model: str, view: str, fields: List[str]) -> str:
    """Run a query task built from SDK model class instances.

    Creates a query for the given model/view/fields, starts a query task
    requesting CSV output, polls the task every half second until it
    reports "complete", and returns the task results.

    Args:
        model: Name of the model to query.
        view: Name of the view (explore) within the model.
        fields: Fully qualified field names to select.

    Returns:
        The value returned by ``sdk.query_task_results`` for the task.

    Raises:
        Exception: If the task status becomes "failure" or "error".
    """
    print("Running model class instance form of SDK")

    write_query = models.WriteQuery(model=model, view=view, fields=fields)
    created_query = sdk.create_query(body=write_query)

    # result_format on WriteCreateQueryTask is an enum member, not a string.
    query_task = sdk.create_query_task(
        body=models.WriteCreateQueryTask(
            query_id=created_query.id, result_format=models.ResultFormat.csv
        ),
        limit=10,
    )

    poll_interval = 0.5  # seconds between status checks
    waited = 0.0  # accumulates sleep time only, not request latency

    status = sdk.query_task(query_task_id=query_task.id)
    while status.status != "complete":
        if status.status in ("failure", "error"):
            print(status)
            raise Exception("Query failed")
        time.sleep(poll_interval)
        waited += poll_interval
        status = sdk.query_task(query_task_id=query_task.id)

    print(f"query task completed in {waited} seconds")
    return sdk.query_task_results(query_task.id)
def main_dictionaries(model: str, view: str, fields: List[str]) -> str:
    """Run a query task using plain dictionaries instead of model classes.

    Creates a query for the given model/view/fields, starts a query task
    requesting CSV output, polls the task every half second until it
    reports "complete", and returns the task results.

    Args:
        model: Name of the model to query.
        view: Name of the view (explore) within the model.
        fields: Fully qualified field names to select.

    Returns:
        The value returned by ``sdk.query_task_results`` for the task.

    Raises:
        Exception: If the task status becomes "failure" or "error".
    """
    # Note - dictionary form does not adhere to the type safety in place
    # for the SDK. Here you will see the following mypy error:
    #
    # Error: mypy: Argument "body" to "create_query" of "Looker40SDK" has
    # incompatible type "Dict[str, Any]"; expected "WriteQuery"
    #
    # As long as you are respecting what the API can take as input you should
    # be fine. However, the SDK will send whatever you provide, including
    # extraneous input and give you back whatever the API server replies with.
    print("Running dictionary form of SDK")
    query = sdk.create_query(
        body={"model": model, "view": view, "fields": fields}  # type: ignore
    )
    # In dictionary form "result_format" key is just a string, not an enum.
    task = sdk.create_query_task(
        body={"query_id": query["id"], "result_format": "csv"},  # type: ignore
        limit=10,
    )
    elapsed = 0.0  # accumulates sleep time only, not request latency
    delay = 0.5  # wait .5 seconds
    while True:
        poll = sdk.query_task(query_task_id=task["id"])
        # Treat both terminal failure statuses as fatal, matching
        # main_models; checking only "error" would poll forever on "failure".
        if poll["status"] in ("failure", "error"):
            print(poll)
            raise Exception("Query failed")
        elif poll["status"] == "complete":
            break
        time.sleep(delay)
        elapsed += delay
    print(f"query task completed in {elapsed} seconds")
    return sdk.query_task_results(task["id"])
def main():
    """Parse command-line arguments, run the query task, and save the result.

    Expects ``sys.argv`` to hold ``<model> <view> <field1> [<field2> ...]``
    after the script name. The result is written to a file in the current
    directory named ``<model>--<view>--<fields joined by '-'>``.

    Raises:
        Exception: If fewer than three arguments (model, view and at least
            one field) are supplied.
    """
    args = sys.argv[1:]
    # Model, view and at least one field are required; the usage text below
    # already documents <field1> as mandatory.
    if len(args) < 3:
        raise Exception(
            textwrap.dedent(
                """
                Please provide: <model> <view> <field1> [<field2> ...]
                """
            )
        )
    model, view, *fields = args

    result = main_models(model, view, fields)
    # an alternate implementation using dictionaries
    # result = main_dictionaries(model, view, fields)

    filename = f"{model}--{view}--{('-').join(fields)}"
    with open(filename, "w") as f:
        f.write(result)
    print(f"Look saved to '{filename}'")
if __name__ == "__main__":
    # Run only when executed as a script; main()'s return value becomes the
    # exit status, exactly as with sys.exit().
    raise SystemExit(main())