- Notifications
You must be signed in to change notification settings - Fork 196
/
Copy pathdownload_dashboard_csv.py
115 lines (84 loc) · 3.28 KB
/
download_dashboard_csv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
""" Given a dashboard id, download each of its tiles as csv files
$ python download_dashboard_csv.py <dashboard_id>
Note: dashboard_id is required
Example:
$ python download_dashboard_csv.py 1234
"""
importos
importtime
importsys
importtextwrap
fromtypingimportSequence
fromlooker_sdk.sdk.api40.modelsimport (
DashboardElement,
WriteCreateQueryTask,
ResultFormat,
)
importlooker_sdk
# Module-level Looker SDK client shared by the functions below that call the
# Looker API (fetching dashboard elements, creating and polling query tasks).
# NOTE(review): presumably an API 4.0 client configured from the looker.ini at
# this relative path -- confirm how looker_sdk resolves the path.
sdk=looker_sdk.init40("../../looker.ini")
def main():
    """Read the dashboard id from the command line and download its tiles.

    The first command-line argument is used as the dashboard id.

    Raises:
        Exception: if the first argument is missing or empty.
    """
    cli_args = sys.argv[1:]
    dashboard_id = cli_args[0] if cli_args else ""

    # Happy path first: a non-empty id goes straight to the download.
    if dashboard_id:
        download_dashboard_csv(dashboard_id)
        return

    usage = textwrap.dedent(
        """
        Please provide: <dashboard_id>"""
    )
    raise Exception(usage)
def get_all_dashboard_elements(dashboard_id: str) -> Sequence[DashboardElement]:
    """Fetch the elements of a dashboard, requesting only their ``query`` field.

    Args:
        dashboard_id: Id of the dashboard whose elements are requested.

    Returns:
        The non-empty sequence of elements returned by the SDK.

    Raises:
        Exception: if the SDK returns no elements for ``dashboard_id``.
    """
    found = sdk.dashboard_dashboard_elements(dashboard_id, fields="query")
    if len(found) > 0:
        return found
    raise Exception(f'Dashboard "{dashboard_id}" not found')
def save_data_to_csv(data: str, folder: str, filename: str) -> None:
    """Write ``data`` to ``<folder>/<filename>.csv``, replacing any existing file.

    Args:
        data: CSV text to write.
        folder: Existing directory that receives the file.
        filename: File name without the ``.csv`` extension.
    """
    target = os.path.join(folder, f"{filename}.csv")
    # Pin the encoding so non-ASCII cell values do not depend on the platform
    # locale, and disable newline translation so line endings already present
    # in the CSV text are written unchanged (no "\r\r\n" on Windows).
    with open(target, "w", encoding="utf-8", newline="") as f:
        f.write(data)
def create_folder(folder_name: str) -> str:
    """Ensure ``folder_name`` exists as a directory and return its path.

    Args:
        folder_name: Directory to create; may contain nested components.

    Returns:
        ``folder_name`` joined onto the current working directory.
    """
    # exist_ok avoids the check-then-create race of exists() + mkdir(), and
    # makedirs also creates any missing intermediate directories.
    os.makedirs(folder_name, exist_ok=True)
    return os.path.join(os.getcwd(), folder_name)
def _create_csv_query_tasks(elements: Sequence[DashboardElement]) -> list:
    """Start one async CSV query task per element with a query; return the task ids."""
    task_ids = []
    for element in elements:
        # Elements without a query (or without a query id) have nothing to run.
        if not element.query or not element.query.id:
            continue
        body = WriteCreateQueryTask(
            query_id=element.query.id, result_format=ResultFormat("csv")
        )
        query_task = sdk.create_query_task(body=body)
        if query_task.id:
            task_ids.append(query_task.id)
    return task_ids


def download_dashboard_csv(
    dashboard_id: str, *, max_polls: int = 60, poll_interval: float = 1.0
):
    """Download every query-backed tile of a dashboard as a CSV file.

    An async query task is created for each dashboard element that has a
    query. The tasks are then polled; each task that reports "complete" has
    its results written to ``<dashboard_id>/<query_task_id>.csv``. A task that
    reports "error" is logged and dropped, and the remaining tasks are still
    processed. "Process complete" is printed only when every task was
    downloaded.

    Args:
        dashboard_id: Id of the dashboard; also used as the output folder name.
        max_polls: Maximum number of polling rounds before giving up.
        poll_interval: Seconds to wait between polling rounds.

    Raises:
        Exception: propagated from ``get_all_dashboard_elements`` when no
            elements are returned for ``dashboard_id``.
    """
    # Reuse the shared helper so an empty result is reported instead of
    # silently producing an empty folder.
    elements = get_all_dashboard_elements(dashboard_id)
    pending = _create_csv_query_tasks(elements)
    failed = []

    folder = create_folder(dashboard_id)

    polls_left = max_polls
    while pending and polls_left > 0:
        # Iterate over a copy because finished tasks are removed from `pending`.
        for task_id in list(pending):
            status = sdk.query_task(task_id).status
            if status == "complete":
                results = sdk.query_task_results(task_id)
                save_data_to_csv(results, folder, task_id)
                pending.remove(task_id)
            elif status == "error":
                print(f"Query Task {task_id} failed with an error")
                failed.append(task_id)
                pending.remove(task_id)

        if pending:
            polls_left -= 1
            # No point sleeping after the final poll.
            if polls_left > 0:
                time.sleep(poll_interval)

    if pending:
        # Polling budget exhausted: report it rather than claiming success.
        unfinished = ", ".join(str(task_id) for task_id in pending)
        print(f"Timed out waiting for {len(pending)} Query Task(s): {unfinished}")
        return
    if failed:
        return

    print("Process complete")
# Run only when executed as a script, so importing this module (e.g. to reuse
# download_dashboard_csv) does not parse sys.argv or trigger a download.
if __name__ == "__main__":
    main()