Motivated by the first commenter's suggestion on my MS Access question, "Python SQL insert to MSAccess with VBScript",
I am moving to SQLite databases for a Python application, and I created this query builder class to help implement the SQL logic. One constraint is that I have to use Python 3.6, which is why I used .format()
instead of the more modern f-strings for string formatting. I'm looking for an overall review of my code quality, as well as any suggested improvements to the code.
import logging
import sqlite3
from typing import List, Optional, Union


class QueryBuilder:
    '''
    Query Builder for SQLite databases

    Values are always bound through "?" placeholders.  Table names, column
    names and raw clause strings (where / join / group_by / order_by) are
    interpolated into the SQL text unchanged, so they must come from trusted
    code and never from user input.

    The object can also be used as a context manager, in which case the
    connection is closed when the "with" block exits.

    Usage:
        query_builder = QueryBuilder("database.db")
        query_builder.insert("users", {"username": "John", "password": "johnspassword"})
        query_builder.update("users", {"password": "new_password"}, {"username": "John"})
        query_builder.select("users", ["username", "password"], "username = ?", params=("John",))
        query_builder.delete("users", {"username": "John"})
        query_builder.close_connection()
    '''

    def __init__(self, db_name: str, log_file_path: str = 'querybuilder.log'):
        '''
        Establish database connection and cursor creation and configure logging.

        Arguments:
            - db_name: The name of the SQLite database
            - log_file_path: File that the root logger is configured to write to

        Usage:
            query_builder = QueryBuilder("database.db")
        '''
        # NOTE: basicConfig configures the root logger and does nothing when
        # the root logger already has handlers.
        logging.basicConfig(filename=log_file_path, level=logging.INFO,
                            format='%(asctime)s:%(levelname)s:%(message)s')
        # isolation_level=None puts the connection in autocommit mode, so
        # every statement is committed as soon as it is executed.
        self.conn = sqlite3.connect(db_name, isolation_level=None)
        self.cursor = self.conn.cursor()

    def __enter__(self):
        '''
        Enter a "with" block and return this query builder.
        '''
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        '''
        Close the connection when the "with" block exits.

        Exceptions raised inside the block are not suppressed.
        '''
        self.close_connection()

    def __execute_query(self, query: str, params: Union[tuple, list] = None):
        '''
        Execute SQL query

        Arguments:
            - query: SQL query as a string
            - params: Parameters for the SQL query

        Returns:
            All rows produced by the statement (an empty list when the
            statement produces none).

        Raises:
            sqlite3.Error: logged, then re-raised unchanged.
        '''
        try:
            if params:
                self.cursor.execute(query, params)
            else:
                self.cursor.execute(query)
            return self.cursor.fetchall()
        except sqlite3.Error as error:
            logging.error("Error while executing SQLite Script: %s", error)
            raise

    def select(self, table: str, fields: Optional[List[str]] = None, where: Optional[str] = None,
               join: Optional[str] = None, group_by: Optional[str] = None,
               order_by: Optional[str] = None,
               params: Optional[Union[tuple, list]] = None):
        '''
        Select records from a table with various clauses

        Arguments:
            - table: Table to select from
            - fields: List of fields to be selected (all columns when omitted)
            - where: WHERE clause
            - join: JOIN clause
            - group_by: GROUP BY clause
            - order_by: ORDER BY clause
            - params: Values bound, in order, to the "?" placeholders used
              in the clauses above

        Returns:
            List of row tuples.

        Usage:
            result = query_builder.select("users", ["username", "password"], "username = ?",
                                          "LEFT JOIN orders ON users.id = orders.user_id",
                                          "username", "orders.id DESC", params=("John",))
        '''
        # A None default replaces the former mutable ['*'] default argument.
        selected_fields = ', '.join(['*'] if fields is None else fields)
        base_query = "SELECT {} FROM {}".format(selected_fields, table)
        if join:
            base_query += " {}".format(join)
        if where:
            base_query += " WHERE {}".format(where)
        if group_by:
            base_query += " GROUP BY {}".format(group_by)
        if order_by:
            base_query += " ORDER BY {}".format(order_by)
        return self.__execute_query(base_query, params)

    def insert(self, table: str, data: dict):
        '''
        Insert a new record into a table

        Arguments:
            - table: Table to insert into
            - data: Dictionary of columns and values to insert

        Usage:
            query_builder.insert("users", {"username": "John", "password": "johnspassword"})

            This will execute: INSERT INTO users (username, password) VALUES (?, ?)
            with parameters ('John', 'johnspassword')
        '''
        fields = ', '.join(data)
        placeholder = ', '.join(['?'] * len(data))
        query = "INSERT INTO {} ({}) VALUES ({})".format(table, fields, placeholder)
        return self.__execute_query(query, list(data.values()))

    def update(self, table: str, data: Optional[dict] = None, where: Optional[dict] = None):
        '''
        Update an existing record in a table

        Arguments:
            - table: Table to update
            - data: Dictionary of new data for the record
            - where: Dictionary for WHERE clause to select the record; the
              entries are compared with "=" and combined with AND.  When it
              is omitted or empty, every row is updated.

        Returns:
            The result of the executed statement, or None (after logging an
            error) when no data was supplied.

        Usage:
            query_builder.update("users", {"password": "new_password"}, {"username": "John"})

            This will execute: UPDATE users SET password = ? WHERE username = ?
            with parameters ('new_password', 'John')
        '''
        if not data:
            # Reported through logging, like every other error in this class.
            logging.error("No data provided to update.")
            return None

        set_query = ', '.join("{} = ?".format(column) for column in data)
        query = "UPDATE {} SET {}".format(table, set_query)
        params = list(data.values())
        if where:
            where_query = ' AND '.join("{} = ?".format(column) for column in where)
            query += " WHERE {}".format(where_query)
            params.extend(where.values())
        return self.__execute_query(query, tuple(params))

    def delete(self, table: str, where: Optional[dict] = None):
        '''
        Delete an existing record from a table

        Arguments:
            - table: Table to delete from
            - where: Dictionary for WHERE clause to select the record; the
              entries are compared with "=" and combined with AND.  Pass
              None (the default) to delete every row of the table.

        Usage:
            query_builder.delete("users", {"username": "John"})

            This will execute: DELETE FROM users WHERE username = ?
            with parameter ('John')
        '''
        query = "DELETE FROM {}".format(table)
        # Always defined, so the unfiltered form no longer fails on an
        # unbound variable.
        params = ()
        if where is not None:
            condition = ' AND '.join("{} = ?".format(column) for column in where)
            query += " WHERE {}".format(condition)
            params = tuple(where.values())
        # The statement is executed exactly once.
        return self.__execute_query(query, params)

    def close_connection(self):
        '''
        Close the connection to the database

        Usage:
            query_builder.close_connection()
        '''
        self.conn.close()

    def __del__(self):
        '''
        Class destructor. Closes the connection to the database
        '''
        # __init__ may have failed before self.conn was assigned.
        conn = getattr(self, 'conn', None)
        if conn is not None:
            conn.close()