Generic Repository with SQLAlchemy and Python






4.27/5 (5 votes)
Example of Generic Repository Class with SQLAlchemy and Python
Introduction
The Repository Pattern, together with the Unit of Work Pattern, creates an abstraction layer between the data access layer and the business logic layer of an application. The purpose of this layer is to isolate data access so that changes made to it do not directly affect the business logic layer. Most of the time, generic repository classes are used to avoid redundant code.
Here, we will focus on creating a generic repository class for SQLAlchemy using Python, and will use it in a FastAPI project.
Generic Repository
Table
- AppBaseModelOrm: base class for common properties or columns
- TaskQueue: DB table model
- GroupQueue: DB table model
models.py:
import datetime

from sqlalchemy import (
    Boolean,
    Column,
    Integer,
    String,
    DateTime,
    PickleType,
    Enum as EnumType,
    JSON,
)
from sqlalchemy.dialects.postgresql import UUID

from db.database import Base


def _utc_now():
    """Return the current moment as a timezone-aware UTC datetime.

    The timestamp columns below are declared with ``timezone=True``.
    ``datetime.datetime.utcnow`` returns a *naive* value (and is deprecated
    since Python 3.12), so an aware value is produced here instead.
    """
    return datetime.datetime.now(datetime.timezone.utc)


# common fields for all entities
class AppBaseModelOrm:
    """Mixin declaring the columns shared by every table model in this module."""

    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    is_active = Column(Boolean, default=True)  # soft delete
    created_by = Column(Integer)
    updated_by = Column(Integer, default=None)
    # Filled in on INSERT when no explicit value is supplied.
    created_datetime = Column(DateTime(timezone=True), default=_utc_now)
    # Starts empty on INSERT; refreshed on every UPDATE via ``onupdate``.
    updated_datetime = Column(
        DateTime(timezone=True),
        default=None,
        onupdate=_utc_now,
    )
    account_id = Column(Integer)


# tables
class TaskQueue(AppBaseModelOrm, Base):
    """ORM model mapped to the ``task_queues`` table."""

    __tablename__ = "task_queues"

    name = Column(String, index=True)


class GroupQueue(AppBaseModelOrm, Base):
    """ORM model mapped to the ``group_queues`` table."""

    __tablename__ = "group_queues"

    name = Column(String, index=True)
Repository Class
TableRepository is the base repository class, which contains some basic operations/methods, such as:
- Read data from a table
- Add/update/delete rows from/to a table
At the constructor level, the repository class expects:
- db:Session: DB session object
- entity:object: Table entity
table_repo.py:
from datetime import datetime

from sqlalchemy import and_
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import false


class TableRepository:
    """Generic repository wrapping a session and one mapped entity class.

    Read helpers that return several rows hand back the un-executed query
    object, so the caller decides how to materialise it (e.g. ``.all()``).
    Write helpers only stage changes on the session / entity; they never
    commit.
    """

    # Class-level placeholders; the real values are assigned in __init__.
    entity:object = NotImplementedError
    db:Session = NotImplementedError

    def __init__(self, db:Session, entity:object):
        """Store the session and the entity class every query is built on."""
        self.db = db
        self.entity = entity

    def _base_query(self):
        """Start a fresh query over the repository's entity."""
        return self.db.query(self.entity)

    def get_all(self):
        """Return a query over every row of the entity."""
        return self._base_query()

    def get_by_id(self, id:int):
        """Return the row whose ``id`` matches, fetched with ``.one()``.

        ``.one()`` raises when the query does not yield exactly one row.
        """
        matches_id = self.entity.id == id
        return self._base_query().filter(matches_id).one()

    def find_by_id(self, id:int):
        """Return the row whose ``id`` matches, or ``None`` (``.first()``)."""
        matches_id = self.entity.id == id
        return self._base_query().filter(matches_id).first()

    def get_actives(self):
        """Return a query over rows whose ``is_active`` flag is true."""
        # ``== True`` is intentional: it builds a SQL expression.
        only_active = self.entity.is_active == True  # noqa: E712
        return self._base_query().filter(only_active)

    def get_by_account_id(self, account_id:int):
        """Return a query over rows belonging to ``account_id``."""
        same_account = self.entity.account_id == account_id
        return self._base_query().filter(same_account)

    def get_actives_by_account_id(self, account_id:int):
        """Return a query over active rows belonging to ``account_id``."""
        only_active = self.entity.is_active == True  # noqa: E712
        same_account = self.entity.account_id == account_id
        return self._base_query().filter(only_active, same_account)

    def get_by_create_datetime_range(self, from_datetime:datetime, to_datetime:datetime):
        """Return a query over rows created within the inclusive range."""
        created_at = self.entity.created_datetime
        not_before = created_at >= from_datetime
        not_after = created_at <= to_datetime
        return self._base_query().filter(not_before, not_after)

    def add(self, entity, created_by_user_id:int = None):
        """Stamp ``created_by`` on ``entity`` and add it to the session."""
        entity.created_by = created_by_user_id
        self.db.add(entity)

    def update(self, entity, updated_by_user_id:int = None):
        """Stamp ``updated_by`` on ``entity``; nothing else is touched here."""
        entity.updated_by = updated_by_user_id

    def delete(self, entity, deleted_by_user_id:int = None):
        """Soft delete: clear ``is_active`` and record who did it via update()."""
        entity.is_active = False
        self.update(entity, updated_by_user_id=deleted_by_user_id)

    def permanent_delete(self, entity):
        """Mark ``entity`` for deletion on the session."""
        self.db.delete(entity)
Using Repository Class
Here, we use the TableRepository class directly, without any inheritance, for the TaskQueue entity, to read from and write to the task_queues table.
task_queue.py:
from typing import Optional, List

from fastapi import FastAPI, Request, Depends
from fastapi_utils.cbv import cbv
from fastapi_utils.inferring_router import InferringRouter
from sqlalchemy.orm import Session

from app import schemas
from app.depends.db_depend import get_db
from app.depends.auth_depend import get_current_user, CurrentUser
from db import models
from db.table_repo import TableRepository

router = InferringRouter()


# Class-based view for the task queue endpoints. Every handler builds a
# TableRepository bound to the request's session and the TaskQueue model.
# NOTE: endpoint docstrings are left exactly as they were, because the web
# framework may surface them in the generated API description.
@cbv(router)
class TaskQueue:
    db: Session = Depends(get_db)
    current_user:CurrentUser = Depends(get_current_user)

    @router.get("/", response_model=List[schemas.TaskQueueSchema])
    def get_all(self):
        # Every row, regardless of the is_active flag.
        return TableRepository(self.db, models.TaskQueue).get_all().all()

    @router.get("/actives", response_model=List[schemas.TaskQueueSchema])
    def get_actives(self):
        # Only rows that are still active.
        return TableRepository(self.db, models.TaskQueue).get_actives().all()

    @router.get("/account/{account_id}", response_model=List[schemas.TaskQueueSchema])
    def get_by_account(self, account_id: int):
        # Every row that belongs to the given account.
        repo = TableRepository(self.db, models.TaskQueue)
        return repo.get_by_account_id(account_id).all()

    @router.get(
        "/account/{account_id}/actives",
        response_model=List[schemas.TaskQueueSchema],
    )
    def get_actives_by_account(self, account_id: int):
        # Active rows that belong to the given account.
        repo = TableRepository(self.db, models.TaskQueue)
        return repo.get_actives_by_account_id(account_id).all()

    @router.get("/{id}", response_model=schemas.TaskQueueSchema)
    def get_by_id(self, id: int):
        # Delegates to the repository's strict lookup (get_by_id).
        return TableRepository(self.db, models.TaskQueue).get_by_id(id)

    @router.get("/find/{id}", response_model=schemas.TaskQueueSchema)
    def find_by_id(self, id: int):
        '''can be null'''
        return TableRepository(self.db, models.TaskQueue).find_by_id(id)

    @router.post("/", response_model=schemas.TaskQueueSchema)
    def post_item(self, model: schemas.TaskQueueCreate):
        # Build the row, stage it with the current user as creator, then
        # commit and reload it before returning.
        new_row = models.TaskQueue(name=model.name, account_id=model.account_id)
        TableRepository(self.db, models.TaskQueue).add(new_row, self.current_user.id)
        self.db.commit()
        self.db.refresh(new_row)
        return new_row

    @router.put("/{id}", response_model=schemas.TaskQueueSchema)
    def put_item(self, id:int, model: schemas.TaskQueueUpdate):
        '''can be null'''
        repo = TableRepository(self.db, models.TaskQueue)
        row = repo.find_by_id(id)
        if not row:
            # Nothing matched: hand back the falsy lookup result untouched.
            return row
        row.name = model.name
        repo.update(row, self.current_user.id)
        self.db.commit()
        self.db.refresh(row)
        return row

    @router.delete("/{id}", response_model=schemas.TaskQueueSchema)
    def delete_item(self, id: int):
        '''can be null'''
        repo = TableRepository(self.db, models.TaskQueue)
        row = repo.find_by_id(id)
        if not row:
            return row
        # Repository-level delete (not permanent_delete); row is reloaded after commit.
        repo.delete(row, self.current_user.id)
        self.db.commit()
        self.db.refresh(row)
        return row

    @router.delete("/permanent/{id}", response_model=schemas.TaskQueueSchema)
    def permanent_delete_item(self, id: int):
        '''can be null'''
        repo = TableRepository(self.db, models.TaskQueue)
        row = repo.find_by_id(id)
        if not row:
            return row
        # Permanent delete: no refresh afterwards, unlike the other writes.
        repo.permanent_delete(row)
        self.db.commit()
        return row
Using Repository Class as Base
GroupQueueCrud inherits from the repository class TableRepository for the GroupQueue entity, to read from and write to the group_queues table:
Inheriting Repository Class
group_queue_crud.py:
from sqlalchemy.orm import Session

from app import schemas
from db import models
from db.table_repo import TableRepository


class GroupQueueCrud(TableRepository):
    """TableRepository specialised for the ``models.GroupQueue`` entity."""

    def __init__(self, db:Session):
        """Bind the repository to ``db`` with the entity fixed to GroupQueue."""
        group_queue_entity = models.GroupQueue
        super().__init__(db=db, entity=group_queue_entity)
Using the CRUD Class
group_queue.py:
from datetime import datetime
from typing import Optional, List

from fastapi import FastAPI, Request, Depends, Query
from fastapi_utils.cbv import cbv
from fastapi_utils.inferring_router import InferringRouter
from sqlalchemy.orm import Session

from app import schemas
from app.depends.db_depend import get_db
from app.depends.auth_depend import get_current_user, CurrentUser
from app.cruds.group_queue_crud import GroupQueueCrud
from db import models

router = InferringRouter()


# Class-based view for the group queue endpoints. Each handler builds a
# GroupQueueCrud repository on the request's session.
# NOTE: endpoint docstrings are left exactly as they were, because the web
# framework may surface them in the generated API description.
@cbv(router)
class GroupQueue:
    db: Session = Depends(get_db)
    current_user:CurrentUser = Depends(get_current_user)

    @router.get("/{id}", response_model=schemas.GroupQueueSchema)
    def get_by_id(self, id: int):
        # Uses find_by_id, i.e. the lookup that fetches with ``.first()``.
        return GroupQueueCrud(self.db).find_by_id(id)

    @router.post("/", response_model=schemas.GroupQueueSchema)
    def post_item(self, model: schemas.GroupQueueCreate):
        # Build the row, stage it with the current user as creator, then
        # commit and reload it before returning.
        new_row = models.GroupQueue(name=model.name, account_id=model.account_id)
        GroupQueueCrud(self.db).add(new_row, self.current_user.id)
        self.db.commit()
        self.db.refresh(new_row)
        return new_row

    @router.put("/{id}", response_model=schemas.GroupQueueSchema)
    def put_item(self, id:int, model: schemas.GroupQueueUpdate):
        '''can be null'''
        repo = GroupQueueCrud(self.db)
        row = repo.find_by_id(id)
        if not row:
            # Nothing matched: hand back the falsy lookup result untouched.
            return row
        row.name = model.name
        repo.update(row, self.current_user.id)
        self.db.commit()
        self.db.refresh(row)
        return row

    @router.delete("/{id}", response_model=schemas.GroupQueueSchema)
    def delete_item(self, id: int):
        '''can be null'''
        repo = GroupQueueCrud(self.db)
        row = repo.find_by_id(id)
        if not row:
            return row
        # Repository-level delete; the row is reloaded after the commit.
        repo.delete(row, self.current_user.id)
        self.db.commit()
        self.db.refresh(row)
        return row
Using the Code
- Go to the backend folder
- Open cmd
- Type docker-compose up -d

\backend> docker-compose up -d

The project will run at http://localhost:4003
Go to the API docs at http://localhost:4003/docs#/
Check Task Queue and Group Queue sections:
References
History
- 5th July, 2022: Initial version