
This is just an example of a (still incomplete) real-world project written in Rust using a clean architecture: https://github.com/frederikhors/rust-clean-architecture-with-db-transactions.

Goals

My intent is to have an app built in four layers:

  • entities:
    • some call this layer "domain"; not important for now, just the minimum
  • services:
    • some call this layer "use cases"; this is where the business logic lives (just CRUD methods for now)
  • repositories:
    • some call this layer "adapters"; this is where the concrete implementations of the DB/cache/mail drivers live
  • ports:
    • some call this layer "controllers" or "presenters"; still not present and not important for now, I'm using main.rs for this

Reproduction

https://codesandbox.io/p/github/frederikhors/rust-clean-architecture-with-db-transactions/main

The issue

This issue is about the usage of a DB transaction in a service (of the same bounded context):

    async fn execute(&self, input: &PlayerInput) -> Result<Player, String> {
        let player = self
            .deps
            .commands_repo
            .player_update(input, &|args| {
                Box::pin(async {
                    // I want to verify if there is any place for my player before updating it
                    // by using a method like the below, but I wanna check this in a DB transaction.
                    // I cannot pass the transaction using a lambda function because in the service
                    // layer I don't want to specify which DB I'm using and which crate.
                    // So one way to do this is by passing the team in the lambda args in `PlayerUpdateLambdaArgs`.
                    // The `team` is queried using the DB transaction on the repository level,
                    // but as you can imagine this is a mess: I'm writing code here and there, back and forth.
                    let team = self
                        .deps
                        .queries_repo
                        .team_by_id(&input.team_id)
                        .await
                        .unwrap();

                    if let Some(team) = team {
                        if team.missing_players == 0 {
                            return Err("no place for your player!".to_string());
                        }
                    }

                    let obj = Player {
                        id: args.actual.id,
                        name: input.name.to_owned(),
                        team_id: input.team_id.to_owned(),
                    };

                    Ok(obj)
                })
            })
            .await?;

        Ok(player)
    }

As you can see, I'm using a lambda function with a struct as its argument, because this is the only way I can fetch, at the repository level, the objects I need at the business-logic level.

But as you can imagine, the code is not linear and I have to jump back and forth.

I think I need something (but I don't know what) in the service layer that lets me start (and commit/roll back) a DB transaction from there. But, as the rules of clean architecture dictate, the service layer cannot know the implementation details of the underlying layers (the repositories).

In my services I would like to write something like this (pseudocode):

    // Start a new DB transaction now to use with the below methods
    let transaction = [DONT_KNOW_HOW_PLEASE_START_A_NEW_DB_TRANSACTION]();

    let team = self.repo.team_by_id(transaction, team_id).await?;

    if !team.has_free_places() {
        return;
    }

    let mut player = self.repo.player_by_id(transaction, player_id).await?;
    player.team_id = team.id;

    let player = self.repo.player_update(player).await?;

    Ok(player)

Is there a way to fix this?

Maybe yes: I found a project while searching for this, but its code is too complex for me to fully understand how to do this in my project, whether there is something better, or even whether I'm wrong and why.

The (maybe) interesting code is here: https://github.com/dpc/sniper/blob/master/src/persistence.rs.

Another way I found to fix this is by using state machines. I created a dedicated branch that uses one state machine for the player_create method, like this:

    // in the repository

    pub struct PlayerCreate<'a> {
        tx: sqlx::Transaction<'a, sqlx::Postgres>,
        pub input: &'a PlayerInput,
    }

    #[async_trait::async_trait]
    impl<'a> PlayerCreateTrait for PlayerCreate<'a> {
        async fn check_for_team_free_spaces(&mut self, team_id: &str) -> Result<bool, String> {
            let team = self::Repo::team_by_id_using_tx(&mut self.tx, team_id).await?;

            Ok(team.missing_players > 0)
        }

        async fn commit(mut self, _player: &Player) -> Result<Player, String> {
            // update the player here
            let saved_player = Player {
                ..Default::default()
            };

            self.tx.commit().await.unwrap();

            Ok(saved_player)
        }
    }

    #[async_trait::async_trait]
    impl commands::RepoPlayer for Repo {
        type PlayerCreate<'a> = PlayerCreate<'a>;

        async fn player_create_start<'a>(
            &self,
            input: &'a PlayerInput,
        ) -> Result<PlayerCreate<'a>, String> {
            let tx = self.pool.begin().await.unwrap();

            Ok(PlayerCreate { tx, input })
        }
    }

    // in the service

    async fn execute(&self, input: &PlayerInput) -> Result<Player, String> {
        let mut state_machine = self.deps.commands_repo.player_create_start(input).await?;

        if !(state_machine.check_for_team_free_spaces(&input.team_id)).await? {
            return Err("no free space available for this team".to_string());
        }

        let obj = Player {
            id: "new_id".to_string(),
            name: input.name.to_owned(),
            team_id: input.team_id.to_owned(),
        };

        let res = state_machine.commit(&obj).await?;

        Ok(res)
    }

But there are three big cons to this:

  • a lot of code to write (also very repetitive);

  • the same concepts must be repeated in both the repository layer and the service layer; and any effort spent condensing them doesn't go into the business logic, only into finding a clever way to avoid repeating code;

  • you have to write repository methods that are almost identical, the only difference being that some take a DB transaction as an argument and others don't.

Alternative ways

Bloom legacy

I found the post: https://kerkour.com/rust-web-application-clean-architecture with the code here: https://github.com/skerkour/bloom-legacy.

I really like this code except for:

  1. the service layer knows about repository implementation details

  2. (and for this reason) it is impossible to swap the DB driver at runtime (or even just mock it during tests).

I opened the issue: https://github.com/skerkour/bloom-legacy/issues/70 and I'm waiting for the author.

Questions

  1. Can you help me fix this issue?

  2. Can you suggest an alternative way?

  • @RikD Taking the example, how would you test that if two players are added to the team at once, only one gets added? Lots of properties are pretty heavily tied into the persistence layer. – Commented Apr 1, 2023 at 2:46
  • An Aggregate is a consistency boundary, for example Team. Within Team there is a collection of players. When AddPlayer is invoked on Team, check if it's allowed. Team can be tested without a database. The TeamRepository returns a team, fully loaded with players. – Rik D, Apr 1, 2023 at 8:34
  • The application service is responsible for handling a command by fetching the appropriate aggregate from the repository, then invoking a method on the aggregate and finally saving the changes. – Rik D, Apr 1, 2023 at 8:37
  • @RikD So are you suggesting that the answer is to have a single global repository? Or one repository per combination of datasets needed? How does that scale to more complex apps? – Commented Apr 1, 2023 at 14:21
  • @RikD If you just have AddPlayer check if it's allowed, there's a possibility for RMW interleaving to allow 2 players to be added to a team. Something has to ensure that when writing back to the persistence layer we still have a view consistent with when we read it. – Commented Apr 1, 2023 at 14:25

1 Answer


Using Repositories is often motivated by abstracting away storage-specific details, but transactions depend a lot on those underlying details and also on your consistency requirements. This can be difficult, and particularly so in Rust, where features like the borrow checker sometimes prevent an architecture from composing the way it would in another language. Sometimes, abstractions are more trouble than they are worth.

When implementing transactions, there are two general strategies. The transaction can immediately apply changes, but try to roll them back in case of failure. Or, the transaction can buffer all proposed changes, and only apply them once the transaction is committed. In either case, the transaction will track information about the proposed changes. In some cases, this can be delegated to the underlying database.
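As a toy illustration of the second strategy (buffer the changes, apply them on commit), here is a minimal sketch over a purely in-memory store; InMemoryTransaction and the Player fields are made-up placeholders, not types from the question's codebase:

    use std::collections::HashMap;

    #[derive(Clone)]
    struct Player {
        id: String,
        team_id: String,
    }

    struct InMemoryTransaction<'a> {
        store: &'a mut HashMap<String, Player>,
        // Proposed changes; not visible in `store` until commit().
        pending: Vec<Player>,
    }

    impl<'a> InMemoryTransaction<'a> {
        fn save(&mut self, player: Player) {
            self.pending.push(player);
        }

        fn commit(self) {
            // Only now do the buffered changes reach the underlying store.
            for player in self.pending {
                self.store.insert(player.id.clone(), player);
            }
        }

        // Dropping the transaction without calling commit() simply discards
        // `pending`, which is the rollback.
    }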

In Rust, a potential strategy for representing transactional changes is to implement each repository trait twice (once for direct changes, once for transactional changes), or to force all usage of the repository object through a transaction.
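For illustration, a minimal sketch of the "implement the trait twice" option; PlayerRepository, DirectRepo and TransactionalRepo are made-up names and the method bodies are left as todo!():

    struct Player {
        id: String,
        name: String,
        team_id: String,
    }

    trait PlayerRepository {
        fn player_by_id(&mut self, id: &str) -> Option<Player>;
        fn player_save(&mut self, player: Player);
    }

    // Applies changes immediately, e.g. straight against a connection pool.
    struct DirectRepo { /* pool handle */ }

    // Runs the same operations through an open database transaction.
    struct TransactionalRepo { /* open transaction */ }

    impl PlayerRepository for DirectRepo {
        fn player_by_id(&mut self, id: &str) -> Option<Player> { todo!() }
        fn player_save(&mut self, player: Player) { todo!() }
    }

    impl PlayerRepository for TransactionalRepo {
        fn player_by_id(&mut self, id: &str) -> Option<Player> { todo!() }
        fn player_save(&mut self, player: Player) { todo!() }
    }

The service layer then only sees a &mut dyn PlayerRepository and doesn't care which of the two implementations it was handed.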

The safest way to provide an object (like a transaction) for a limited time is to require the code using this transaction to live within a callback. Here, the borrow checker can prevent that transaction from escaping the callback. When the callback returns an error, rollback could be triggered automatically. Roughly:

    fn apply_transaction<F, R, E>(&self, callback: F) -> Result<R, E>
    where
        F: FnOnce(&mut dyn Repository) -> Result<R, E>,
    {
        let mut transactional_repo = ...;

        let result = callback(&mut transactional_repo);

        match &result {
            Ok(_) => transactional_repo.commit(),
            Err(_) => transactional_repo.rollback(),
        }

        result
    }

    context.apply_transaction(|repo| {
        let mut entity = repo.get();
        if !entity.condition() {
            return Err(...);
        }
        entity.change()?;
        repo.save(entity);
        Ok(())
    })?

An interesting consequence of this pattern is that, depending on the underlying guarantees of the transaction, it would be possible to automatically retry the transaction until it succeeds. Such a retry loop is appropriate in particular when the underlying data source doesn't support transactions, but does support atomic CAS operations of the form “here's the data that I read during the transaction. If this is still up to date, apply the following changes”.
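A self-contained sketch of that retry loop, where the "data source" is just an atomic counter of a team's free slots (a real store would compare a row version or an ETag rather than a plain integer):

    use std::sync::atomic::{AtomicU64, Ordering};

    fn reserve_slot(free_slots: &AtomicU64) -> Result<(), String> {
        loop {
            // "Here's the data that I read during the transaction."
            let observed = free_slots.load(Ordering::Acquire);
            if observed == 0 {
                return Err("no place for your player!".to_string());
            }
            // "If this is still up to date, apply the following changes."
            match free_slots.compare_exchange(
                observed,
                observed - 1,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return Ok(()),
                // Someone else changed the value in the meantime:
                // retry with a fresh read.
                Err(_) => continue,
            }
        }
    }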

Alternatively, we could give the calling code ownership of a transactional repository, and require the user to explicitly commit or rollback. We can ensure that at most one of these options is chosen by making both methods consume self by value. However, Rust doesn't have "linear types" and cannot force the user to make exactly one of these choices. Thus, one of them should be provided by the transactional repository's destructor (drop() method). Since early returns in a function typically relate to errors, it tends to be safer if the default action in the destructor is to roll back the transaction, and to require an explicit commit() to be invoked if that is not desired. Then:

    let mut repo = ...;

    let mut entity = repo.get();
    if !entity.condition() {
        // will automatically drop() the repo
        return Err(...);
    }
    entity.change()?;
    repo.save(entity);
    Ok(())

This still leaves open the question of how to actually implement that transaction. As mentioned, in many cases you can delegate this to the underlying database. Then, the transaction object might internally contain a database transaction:

    struct TransactionalRepo {
        inner: PostgresTransaction,
    }

    impl TransactionalRepo {
        fn new(db: &mut ConnectionPool) -> Result<Self> {
            let inner = db.new_transaction()?;
            Ok(Self { inner })
        }

        fn get(&mut self) -> Entity { ... }
        fn save(&mut self, entity: Entity) { ... }

        fn commit(self) { ... }
    }

    impl Drop for TransactionalRepo {
        fn drop(&mut self) {
            ... // roll back, or panic on failure
        }
    }

In practice, if your database connector library already provides a dedicated Transaction object, it is likely that it will already provide exactly this explicit-commit()/rollback-via-drop() behaviour. Then, you wouldn't need an explicit Drop implementation, and your commit() could be implemented as:

    impl TransactionalRepo {
        ...

        fn commit(self) {
            self.inner.commit();
        }
    }
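For example, sqlx (which the question already uses) behaves this way: sqlx::Transaction rolls back when it is dropped without being committed, and commit() takes the transaction by value. A rough sketch, assuming sqlx 0.7 with Postgres and guessing at the schema (a teams table with an id column and a BIGINT missing_players column; adjust the types to your actual schema):

    struct TransactionalRepo {
        inner: sqlx::Transaction<'static, sqlx::Postgres>,
    }

    impl TransactionalRepo {
        async fn new(pool: &sqlx::PgPool) -> Result<Self, sqlx::Error> {
            Ok(Self { inner: pool.begin().await? })
        }

        async fn team_missing_players(&mut self, team_id: &str) -> Result<Option<i64>, sqlx::Error> {
            // Runs inside the open transaction.
            let row: Option<(i64,)> =
                sqlx::query_as("SELECT missing_players FROM teams WHERE id = $1")
                    .bind(team_id)
                    .fetch_optional(&mut *self.inner)
                    .await?;
            Ok(row.map(|(n,)| n))
        }

        async fn commit(self) -> Result<(), sqlx::Error> {
            self.inner.commit().await
        }

        // No Drop impl needed: dropping `inner` without commit() rolls back.
    }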

If you have multiple repositories, you may want to perform cross-repository transactions. This is generally impossible. However, if you guarantee that all involved repositories use the same underlying data source, you could make the repositories share a transaction. Compare also the "unit of work" pattern. In Rust, such sharing would likely require an Arc<Transaction> or Arc<Mutex<Transaction>>. Making sure that the underlying database transaction is only committed or rolled back once is easier when neither repository owns the database transaction. This is most easily achievable in the callback-based design. It might be adapted to multiple repositories as follows:

    fn unit_of_work_a_b<F, R, E>(&self, pool: &mut DbPool, callback: F) -> Result<R, E>
    where
        F: FnOnce(RepoA, RepoB) -> Result<R, E>,
    {
        let tx = Mutex::new(pool.new_transaction()?);

        let result = callback(RepoA::new(&tx), RepoB::new(&tx));

        let tx = tx.into_inner().unwrap();
        match &result {
            Ok(_) => tx.commit(),
            Err(_) => tx.rollback(),
        }

        result
    }

    unit_of_work_a_b(|repo_a, repo_b| {
        ... // business logic here
    })?

    struct RepoA<'a> {
        tx: &'a Mutex<Transaction>,
    }

    impl<'a> RepoA<'a> {
        fn new(tx: &'a Mutex<Transaction>) -> Self {
            Self { tx }
        }

        fn get(&self) -> EntityA {
            self.tx.lock().query(...)
        }
    }

Depending on the guarantees of the database transaction, a mutex may or may not be necessary.
