- Notifications
You must be signed in to change notification settings - Fork 31.7k
/
Copy pathdump.py
113 lines (99 loc) · 4.14 KB
/
dump.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# Mimic the sqlite3 console shell's .dump command
# Author: Paul Kippes <kippesp@gmail.com>
# Every identifier in sql is quoted based on a comment in sqlite
# documentation "SQLite adds new keywords from time to time when it
# takes on new features. So to prevent your code from being broken by
# future enhancements, you should normally quote any identifier that
# is an English language word, even if you do not have to."
def_quote_name(name):
return'"{0}"'.format(name.replace('"', '""'))
def_quote_value(value):
return"'{0}'".format(value.replace("'", "''"))
def_iterdump(connection, *, filter=None):
"""
Returns an iterator to the dump of the database in an SQL text format.
Used to produce an SQL dump of the database. Useful to save an in-memory
database for later restoration. This function should not be called
directly but instead called from the Connection method, iterdump().
"""
writeable_schema=False
cu=connection.cursor()
cu.row_factory=None# Make sure we get predictable results.
# Disable foreign key constraints, if there is any foreign key violation.
violations=cu.execute("PRAGMA foreign_key_check").fetchall()
ifviolations:
yield('PRAGMA foreign_keys=OFF;')
yield('BEGIN TRANSACTION;')
iffilter:
# Return database objects which match the filter pattern.
filter_name_clause='AND "name" LIKE ?'
params= [filter]
else:
filter_name_clause=""
params= []
# sqlite_master table contains the SQL CREATE statements for the database.
q=f"""
SELECT "name", "type", "sql"
FROM "sqlite_master"
WHERE "sql" NOT NULL AND
"type" == 'table'
{filter_name_clause}
ORDER BY "name"
"""
schema_res=cu.execute(q, params)
sqlite_sequence= []
fortable_name, type, sqlinschema_res.fetchall():
iftable_name=='sqlite_sequence':
rows=cu.execute('SELECT * FROM "sqlite_sequence";')
sqlite_sequence= ['DELETE FROM "sqlite_sequence"']
sqlite_sequence+= [
f'INSERT INTO "sqlite_sequence" VALUES({_quote_value(table_name)},{seq_value})'
fortable_name, seq_valueinrows.fetchall()
]
continue
eliftable_name=='sqlite_stat1':
yield('ANALYZE "sqlite_master";')
eliftable_name.startswith('sqlite_'):
continue
elifsql.startswith('CREATE VIRTUAL TABLE'):
ifnotwriteable_schema:
writeable_schema=True
yield('PRAGMA writable_schema=ON;')
yield("INSERT INTO sqlite_master(type,name,tbl_name,rootpage,sql)"
"VALUES('table',{0},{0},0,{1});".format(
_quote_value(table_name),
_quote_value(sql),
))
else:
yield('{0};'.format(sql))
# Build the insert statement for each row of the current table
table_name_ident=_quote_name(table_name)
res=cu.execute(f'PRAGMA table_info({table_name_ident})')
column_names= [str(table_info[1]) fortable_infoinres.fetchall()]
q="SELECT 'INSERT INTO {0} VALUES('{1}')' FROM {0};".format(
table_name_ident,
"','".join(
"||quote({0})||".format(_quote_name(col)) forcolincolumn_names
)
)
query_res=cu.execute(q)
forrowinquery_res:
yield("{0};".format(row[0]))
# Now when the type is 'index', 'trigger', or 'view'
q=f"""
SELECT "name", "type", "sql"
FROM "sqlite_master"
WHERE "sql" NOT NULL AND
"type" IN ('index', 'trigger', 'view')
{filter_name_clause}
"""
schema_res=cu.execute(q, params)
forname, type, sqlinschema_res.fetchall():
yield('{0};'.format(sql))
ifwriteable_schema:
yield('PRAGMA writable_schema=OFF;')
# gh-79009: Yield statements concerning the sqlite_sequence table at the
# end of the transaction.
forrowinsqlite_sequence:
yield('{0};'.format(row))
yield('COMMIT;')