- Notifications
You must be signed in to change notification settings - Fork 196
/
Copy pathquery_task.py
115 lines (95 loc) · 3.59 KB
/
query_task.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""Given a model, view, and fields, create a query, and then a query task to asynchronously execute.
$ python query_task.py <model_name> <view_name> <field_1> <field_2> ...
Examples:
$ python query_task.py thelook users users.first_name users.last_name ...
Last modified: August 25
"""
import sys
import textwrap
import time
from typing import List

import looker_sdk
from looker_sdk import models40 as models
# Shared SDK client used by every function in this script.
# NOTE(review): the settings path is relative, so the script presumably has
# to be launched from a directory where ../../looker.ini resolves - confirm.
sdk=looker_sdk.init40("../../looker.ini")
def main_models(model: str, view: str, fields: List[str]) -> str:
    """Run a query as a query task, driving the SDK with model class instances.

    Creates a query for the given model/view/fields, starts a CSV query task
    for it, polls the task every half second until it reports completion,
    and then fetches the task results.

    Args:
        model: Name of the model to query.
        view: Name of the view (explore) within the model.
        fields: Fully qualified field names to select.

    Returns:
        Whatever ``sdk.query_task_results`` returns for the finished task
        (annotated as ``str``; CSV was the requested result format).

    Raises:
        Exception: If the task reports a "failure" or "error" status.
    """
    print("Running model class instance form of SDK")
    write_query = models.WriteQuery(model=model, view=view, fields=fields)
    query = sdk.create_query(body=write_query)

    # WriteCreateQueryTask.result_format takes an enum member, not a string.
    task_request = models.WriteCreateQueryTask(
        query_id=query.id, result_format=models.ResultFormat.csv
    )
    task = sdk.create_query_task(body=task_request, limit=10)

    poll_interval = 0.5  # seconds to wait between status checks
    elapsed = 0.0  # total time spent sleeping, not wall-clock time
    while True:
        poll = sdk.query_task(query_task_id=task.id)
        status = poll.status
        if status == "complete":
            break
        if status in ("failure", "error"):
            # Dump the whole poll response so the cause is visible.
            print(poll)
            raise Exception("Query failed")
        time.sleep(poll_interval)
        elapsed += poll_interval

    print(f"query task completed in {elapsed} seconds")
    return sdk.query_task_results(task.id)
def main_dictionaries(model: str, view: str, fields: List[str]) -> str:
    """Run a query as a query task, driving the SDK with plain dictionaries.

    Same flow as ``main_models``: create a query, start a CSV query task,
    poll every half second until the task completes, then fetch the results.
    Request bodies and responses are handled as dictionaries instead of
    model class instances.

    Args:
        model: Name of the model to query.
        view: Name of the view (explore) within the model.
        fields: Fully qualified field names to select.

    Returns:
        Whatever ``sdk.query_task_results`` returns for the finished task.

    Raises:
        Exception: If the task reports a "failure" or "error" status.
    """
    # Note - dictionary form does not adhere to the type safety in place
    # for the SDK. Here you will see the following mypy error:
    #
    # Error: mypy: Argument "body" to "create_query" of "Looker40SDK" has
    # incompatible type "Dict[str, Any]"; expected "WriteQuery"
    #
    # As long as you are respecting what the API can take as input you should
    # be fine. However, the SDK will send whatever you provide, including
    # extraneous input and give you back whatever the API server replies with.
    print("Running dictionary form of SDK")
    query = sdk.create_query(
        body={"model": model, "view": view, "fields": fields}  # type: ignore
    )
    # In dictionary form "result_format" key is just a string, not an enum.
    task = sdk.create_query_task(
        body={"query_id": query["id"], "result_format": "csv"},  # type: ignore
        limit=10,
    )

    elapsed = 0.0  # total time spent sleeping, not wall-clock time
    delay = 0.5  # wait .5 seconds between status checks
    while True:
        poll = sdk.query_task(query_task_id=task["id"])
        # Treat "failure" as terminal too, matching main_models; checking
        # only "error" would leave this loop polling forever on "failure".
        if poll["status"] in ("failure", "error"):
            print(poll)
            raise Exception("Query failed")
        if poll["status"] == "complete":
            break
        time.sleep(delay)
        elapsed += delay

    print(f"query task completed in {elapsed} seconds")
    return sdk.query_task_results(task["id"])
def main():
    """Command-line entry point: run a query task and save its results.

    Expects ``<model> <view> <field1> [<field2> ...]`` in ``sys.argv``,
    runs the query through ``main_models`` and writes the returned text to a
    file named ``<model>--<view>--<field1>-<field2>...`` in the current
    directory.

    Raises:
        Exception: If fewer than three arguments (model, view and at least
            one field) are supplied.
    """
    cli_args = sys.argv[1:]
    # A query needs at least one field, so model + view alone is not enough.
    if len(cli_args) < 3:
        raise Exception(
            textwrap.dedent(
                """
                Please provide: <model> <view> <field1> [<field2> ...]
                """
            )
        )
    model, view, *fields = cli_args

    result = main_models(model, view, fields)
    # an alternate implementation using dictionaries
    # result = main_dictionaries(model, view, fields)

    filename = f"{model}--{view}--{('-').join(fields)}"
    # Explicit encoding so non-ASCII query results do not depend on the
    # platform's default text encoding.
    with open(filename, "w", encoding="utf-8") as f:
        f.write(result)
    print(f"Look saved to '{filename}'")


if __name__ == "__main__":
    sys.exit(main())