65.9K
CodeProject is changing. Read more.
Home

Generic Repository with SQLAlchemy and Python

Jul 5, 2022

CPOL

1 min read

viewsIcon

14471

downloadIcon

139

Example of Generic Repository Class with SQLAlchemy and Python

Introduction

The Repository Pattern, as well as the Unit of Work Pattern, allows creating an abstraction layer between the data access layer and the business logic layer of an application. The purpose of creating this layer is to isolate the data access layer so that the changes we may operate cannot affect the business logic layer directly. Most of the time, generic repository classes are used, to avoid redundant codes.

Here, we will focus more to create a generic repository class for SQLAlchemy using Python and will use it in a FastAPI project.

Generic Repository

Table

  • AppBaseModelOrm base class for common property or columns
  • TaskQueue Db table model
  • GroupQueue Db table model

models.py:

 import datetime from sqlalchemy import Boolean, Column, Integer, String, \ DateTime, PickleType, Enum as EnumType, JSON from sqlalchemy.dialects.postgresql import UUID from db.database import Base # common fields for all entities class AppBaseModelOrm: id = Column(Integer, primary_key=True, index=True, autoincrement=True) is_active = Column(Boolean, default=True) # soft delete created_by = Column(Integer) updated_by = Column(Integer, default=None) created_datetime = Column(DateTime(timezone=True), default=datetime.datetime.utcnow) updated_datetime = Column(DateTime(timezone=True), \ default=None, onupdate=datetime.datetime.utcnow) account_id = Column(Integer) # tables class TaskQueue(AppBaseModelOrm, Base): __tablename__ = "task_queues" name = Column(String, index=True) class GroupQueue(AppBaseModelOrm, Base): __tablename__ = "group_queues" name = Column(String, index=True)

Repository Class

TableRepository is the base repo class which contains some basic operations/methods, like:

  • Read data from a table
  • Add/update/delete rows from/to a table

At the constructor level, the repository class expects:

  • db:Session DB session object
  • entity:object Table entity

table_repo.py:

 from sqlalchemy import and_ from sqlalchemy.orm import Session from sqlalchemy.sql.expression import false from datetime import datetime class TableRepository: entity:object = NotImplementedError db:Session = NotImplementedError def __init__(self, db:Session, entity:object): self.db = db self.entity = entity def get_all(self): return self.db.query(self.entity) def get_by_id(self, id:int): return self.db.query(self.entity).filter(self.entity.id==id).one() def find_by_id(self, id:int): return self.db.query(self.entity).filter(self.entity.id==id).first() def get_actives(self): return self.db.query(self.entity).filter(self.entity.is_active==True) def get_by_account_id(self, account_id:int): return self.db.query(self.entity).filter(self.entity.account_id==account_id) def get_actives_by_account_id(self, account_id:int): return self.db.query(self.entity).filter\ (self.entity.is_active==True, self.entity.account_id==account_id) def get_by_create_datetime_range(self, from_datetime:datetime, to_datetime:datetime): data = self.db.query(self.entity).filter\ (self.entity.created_datetime >= from_datetime, \ self.entity.created_datetime <= to_datetime) return data def add(self, entity, created_by_user_id:int = None): entity.created_by = created_by_user_id self.db.add(entity) def update(self, entity, updated_by_user_id:int = None): entity.updated_by = updated_by_user_id def delete(self, entity, deleted_by_user_id:int = None): entity.is_active = False self.update(entity, updated_by_user_id=deleted_by_user_id) def permanent_delete(self, entity): self.db.delete(entity) 

Using Repository Class

Here, we are using TableRepository repo class without any inheritance, for TaskQueue entity and read/write table task_queues.

task_queue.py:

 from typing import Optional, List from fastapi import FastAPI, Request, Depends from fastapi_utils.cbv import cbv from fastapi_utils.inferring_router import InferringRouter from sqlalchemy.orm import Session from app import schemas from app.depends.db_depend import get_db from app.depends.auth_depend import get_current_user, CurrentUser from db import models from db.table_repo import TableRepository router = InferringRouter() @cbv(router) class TaskQueue: db: Session = Depends(get_db) current_user:CurrentUser = Depends(get_current_user) @router.get("/", response_model=List[schemas.TaskQueueSchema]) def get_all(self): repo = TableRepository(self.db, models.TaskQueue) items = repo.get_all().all() return items @router.get("/actives", response_model=List[schemas.TaskQueueSchema]) def get_actives(self): repo = TableRepository(self.db, models.TaskQueue) items = repo.get_actives().all() return items @router.get("/account/{account_id}", response_model=List[schemas.TaskQueueSchema]) def get_by_account(self, account_id: int): repo = TableRepository(self.db, models.TaskQueue) items = repo.get_by_account_id(account_id).all() return items @router.get("/account/{account_id}/actives", \ response_model=List[schemas.TaskQueueSchema]) def get_actives_by_account(self, account_id: int): repo = TableRepository(self.db, models.TaskQueue) items = repo.get_actives_by_account_id(account_id).all() return items @router.get("/{id}", response_model=schemas.TaskQueueSchema) def get_by_id(self, id: int): repo = TableRepository(self.db, models.TaskQueue) item = repo.get_by_id(id) return item @router.get("/find/{id}", response_model=schemas.TaskQueueSchema) def find_by_id(self, id: int): '''can be null''' repo = TableRepository(self.db, models.TaskQueue) item = repo.find_by_id(id) return item @router.post("/", response_model=schemas.TaskQueueSchema) def post_item(self, model: schemas.TaskQueueCreate): item = models.TaskQueue(name=model.name, account_id=model.account_id) repo = TableRepository(self.db, models.TaskQueue) repo.add(item, self.current_user.id) self.db.commit() self.db.refresh(item) return item @router.put("/{id}", response_model=schemas.TaskQueueSchema) def put_item(self, id:int, model: schemas.TaskQueueUpdate): '''can be null''' repo = TableRepository(self.db, models.TaskQueue) item = repo.find_by_id(id) if item: item.name = model.name repo.update(item, self.current_user.id) self.db.commit() self.db.refresh(item) return item @router.delete("/{id}", response_model=schemas.TaskQueueSchema) def delete_item(self, id: int): '''can be null''' repo = TableRepository(self.db, models.TaskQueue) item = repo.find_by_id(id) if item: repo.delete(item, self.current_user.id) self.db.commit() self.db.refresh(item) return item @router.delete("/permanent/{id}", response_model=schemas.TaskQueueSchema) def permanent_delete_item(self, id: int): '''can be null''' repo = TableRepository(self.db, models.TaskQueue) item = repo.find_by_id(id) if item: repo.permanent_delete(item) self.db.commit() return item

Using Repository Class as Base

GroupQueueCrud is inheriting the repo class TableRepository for GroupQueue entity and read/write table group_queues:

Inheriting Repository Class

group_queue_crud.py:

 from sqlalchemy.orm import Session from app import schemas from db import models from db.table_repo import TableRepository class GroupQueueCrud(TableRepository): def __init__(self, db:Session): super().__init__(db=db, entity=models.GroupQueue)

Using the CRUD Class

group_queue.py:

 from datetime import datetime from typing import Optional, List from fastapi import FastAPI, Request, Depends, Query from fastapi_utils.cbv import cbv from fastapi_utils.inferring_router import InferringRouter from sqlalchemy.orm import Session from app import schemas from app.depends.db_depend import get_db from app.depends.auth_depend import get_current_user, CurrentUser from app.cruds.group_queue_crud import GroupQueueCrud from db import models router = InferringRouter() @cbv(router) class GroupQueue: db: Session = Depends(get_db) current_user:CurrentUser = Depends(get_current_user) @router.get("/{id}", response_model=schemas.GroupQueueSchema) def get_by_id(self, id: int): repo = GroupQueueCrud(self.db) item = repo.find_by_id(id) return item @router.post("/", response_model=schemas.GroupQueueSchema) def post_item(self, model: schemas.GroupQueueCreate): item = models.GroupQueue(name=model.name, account_id=model.account_id) repo = GroupQueueCrud(self.db) repo.add(item, self.current_user.id) self.db.commit() self.db.refresh(item) return item @router.put("/{id}", response_model=schemas.GroupQueueSchema) def put_item(self, id:int, model: schemas.GroupQueueUpdate): '''can be null''' repo = GroupQueueCrud(self.db) item = repo.find_by_id(id) if item: item.name = model.name repo.update(item, self.current_user.id) self.db.commit() self.db.refresh(item) return item @router.delete("/{id}", response_model=schemas.GroupQueueSchema) def delete_item(self, id: int): '''can be null''' repo = GroupQueueCrud(self.db) item = repo.find_by_id(id) if item: repo.delete(item, self.current_user.id) self.db.commit() self.db.refresh(item) return item 

Using the Code

 Go to backend folder Open cmd Type docker-compose up -d \backend> docker-compose up -d project will run http://localhost:4003 Go to Api Doc http://localhost:4003/docs#/

Check Task Queue and Group Queue sections:

References

History

  • 5th July, 2022: Initial version
close