- Notifications
You must be signed in to change notification settings - Fork 31.7k
/
Copy path__main__.py
131 lines (110 loc) · 3.86 KB
/
__main__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
"""A simple SQLite CLI for the sqlite3 module.
Apart from using 'argparse' for the command-line interface,
this module implements the REPL as a thin wrapper around
the InteractiveConsole class from the 'code' stdlib module.
"""
importsqlite3
importsys
fromargparseimportArgumentParser
fromcodeimportInteractiveConsole
fromtextwrapimportdedent
defexecute(c, sql, suppress_errors=True):
"""Helper that wraps execution of SQL code.
This is used both by the REPL and by direct execution from the CLI.
'c' may be a cursor or a connection.
'sql' is the SQL string to execute.
"""
try:
forrowinc.execute(sql):
print(row)
exceptsqlite3.Errorase:
tp=type(e).__name__
try:
print(f"{tp} ({e.sqlite_errorname}): {e}", file=sys.stderr)
exceptAttributeError:
print(f"{tp}: {e}", file=sys.stderr)
ifnotsuppress_errors:
sys.exit(1)
classSqliteInteractiveConsole(InteractiveConsole):
"""A simple SQLite REPL."""
def__init__(self, connection):
super().__init__()
self._con=connection
self._cur=connection.cursor()
defrunsource(self, source, filename="<input>", symbol="single"):
"""Override runsource, the core of the InteractiveConsole REPL.
Return True if more input is needed; buffering is done automatically.
Return False is input is a complete statement ready for execution.
"""
matchsource:
case".version":
print(f"{sqlite3.sqlite_version}")
case".help":
print("Enter SQL code and press enter.")
case".quit":
sys.exit(0)
case _:
ifnotsqlite3.complete_statement(source):
returnTrue
execute(self._cur, source)
returnFalse
defmain(*args):
parser=ArgumentParser(
description="Python sqlite3 CLI",
prog="python -m sqlite3",
)
parser.add_argument(
"filename", type=str, default=":memory:", nargs="?",
help=(
"SQLite database to open (defaults to ':memory:'). "
"A new database is created if the file does not previously exist."
),
)
parser.add_argument(
"sql", type=str, nargs="?",
help=(
"An SQL query to execute. "
"Any returned rows are printed to stdout."
),
)
parser.add_argument(
"-v", "--version", action="version",
version=f"SQLite version {sqlite3.sqlite_version}",
help="Print underlying SQLite library version",
)
args=parser.parse_args(*args)
ifargs.filename==":memory:":
db_name="a transient in-memory database"
else:
db_name=repr(args.filename)
# Prepare REPL banner and prompts.
ifsys.platform=="win32"and"idlelib.run"notinsys.modules:
eofkey="CTRL-Z"
else:
eofkey="CTRL-D"
banner=dedent(f"""
sqlite3 shell, running on SQLite version {sqlite3.sqlite_version}
Connected to {db_name}
Each command will be run using execute() on the cursor.
Type ".help" for more information; type ".quit" or {eofkey} to quit.
""").strip()
sys.ps1="sqlite> "
sys.ps2=" ... "
con=sqlite3.connect(args.filename, isolation_level=None)
try:
ifargs.sql:
# SQL statement provided on the command-line; execute it directly.
execute(con, args.sql, suppress_errors=False)
else:
# No SQL provided; start the REPL.
console=SqliteInteractiveConsole(con)
try:
importreadline
exceptImportError:
pass
console.interact(banner, exitmsg="")
finally:
con.close()
sys.exit(0)
if__name__=="__main__":
main(sys.argv[1:])