Get the source code for this video for FREE → the-dotnet-weekly.ck.page/outbox Want to master Clean Architecture? Go here: bit.ly/3PupkOJ Want to unlock Modular Monoliths? Go here: bit.ly/3SXlzSt
The greatest implementation of an outbox I ever seen. Great merge of knowledge about DDD and communication in distributed systems. Thanks to your videos I propose a similar solution to my team to secure consistency of transaction between updating a database and producing an event to a message queue. Glad you are on the YT. Thank you
Note that this isn't production ready, Radek. You need to add proper error handling, possibly retries. With retries, you need to think about idempotency of event handlers and so on. In any case, I'm glad you liked it. I hope it serves your team and you well 😁
@@MilanJovanovicTech Thank you for advice. I bear it in my mind. You have talked about retries in the video, does it cover the things that you mentioned above?: ruclips.net/video/xajVttkZntU/видео.html
@@MilanJovanovicTech Hi, thank you. Can you tell what approach is used for multiple application instances? Each instance can push same messages from the DB 😱
Год назад
@@omgoooddistributed locking, or a proper message queue service available at every cloud provider.
Excellent Job @Milan. Thanks for the video. I use to do the Background Job processing in a separate app. Will use Outbox Pattern from now on. Keep up the Good work.
Milan, great video as usual, following every one. Do you maybe have plans to do a video on AddScoped/Transient/Singleton with practical examples of when you should use which?
I think there are two use cases for events. One is internal (domain events) and one is external (integration events). Internal events should be handled before the save changes since it mutates state within the unit of work. Sometimes an internal event would do nothing more than raise an external event. Could use a message queue or service buss or whatever. My thoughts would be: Create the domain event in the command handler and add it to the aggregate. Before save changes (in interceptor) publish the domain events. The domain event handler can do it's thing, which might include adding a message to OutBox (an external event) SaveChanges Now outbox processor still works as you said and publishes the events. But, the subscribers could either handle this event itself (something the current app/service does) or it can create an IntegrationEvent and publish it to a message queue/buss. Use Case would be something like: When an PO is closed, an Invoice should be created. (internal domain event) When an invoice is created it is sent to the customer. (external notification service event) ClosePOCommand rasies a POClosedDomainEvent. ClosePOCommand SavesChanges() BeforeSaveChanges publishes the POClosedDomainEvent POClosedDomainEventHandler creates an invoice (should it call CreateInvoiceCommand idk?) and adds an OutBox event (InvoiceCreatedEvent) SaveChanges() Outbox picks it up and publishes the event. The handler creates a message with all the info for the notification message and adds it to the NotificationServiceQueue. This way we have DomainEvents handled atomically and then decoupled IntegrationEvents published that any service can listen too. Thoughts?
Consider this case. You want to publish internal domain events before we save the changes - or rather, before we commit the transaction. What happens when you publish a domain event which talks to any *external* service. Let's say: 1. We publish to a queue successfully. 2. We attempt to complete our database transaction and this fails. 3. But we still published our message to the queue? 3.1. This could trigger other behavior downstream. We have now left our system in an inconsistent. This is why I believe that you simply can't publish your domain events before completing your transaction. Look at it from this angle also. An event is something which is in the past. A Domain Event is supposed to represent a fact in your system. It does not become a fact until you complete your transaction.
@@MilanJovanovicTech As I said, some stuff needs to be part of the unit of work. Because if I close the PO, then raise an event that is published after saving the PO, and the invoice create fails, I have lost the atomic business process. So, now my system is in an invalid state. I have to deal with all types of compensating events, or have some distributed transaction handling. Your point is exactly what I want to avoid. The answer to your "what if" is it doesn't happen. I never said the domain event is exposed externally.
@@pilotboba In the case of the invoice creation failing, the outbox table would still have a null Completion date field, therefore it could be attempted again? Does the fact that we have a record of the incomplete domain event handler help mitigate the risk of the invoice creation not occurring within the same transaction as the original domain event?
A great video that explain Transactional Outbox Pattern with a clean way, thank you Milan! (thank's also for using Quartz instead of boring while loops in background services)
I just can't stop watching your Milan videos, your explanation is soooo good. I'll do my homework by adding try catch, and I'll study a little more about domain events. I'm reading Vlad's Learning DDD after reading Eric Evans and Vernon, have you read the Blue and Red books? Have you thought about making videos discussing these books? Ty again.
Thanks for your knowledge! I have a question. Given a thousands of events that has to be send into external providers mqtt, before the midnight, I would need to increase the publication events velocity into the rabbitmq , so my first approach will have more than one service that reads the outbox table parallely. With the idea of load balancing the process of reads ( a bunch of 10 events) *( 2 read services parallely) . So how can I manage some kind of lock for each 10 events that has been processing for each services in order to when other service has to query the events don't take the same events of the other services?
That's where you run into problems. And the locking would probably kill your performance anyhow. You would either have 1 process reading the outbox, and scale it up as much as possible. Or partition the Outbox somehow, and have a process per partition.
Something like this may work just playing around with it at the moment, this way each process can read messages from the table, but not the same messages. using (var transaction = _dbContext.Database.BeginTransaction()) { var messages2 = _dbContext.Set() .FromSqlRaw("SELECT top 2 * FROM dbo.DomainEvents WITH (UPDLOCK, ROWLOCK, READPAST) WHERE ProcessedOnUtc IS NULL AND RetryAttempts
Thanks, great video. I have a question about the job, wouldn't using built-in job cause multiple publish of the same message in a multi-pod environment?
Hi! I'm loving your videos! I just have one suggestion to add always on to your explanations. In this case, for example. What if I have 5 instances of this microservice running the backgroundjob at the same time? It would be nice if you always take in to account the scalability of every video you are showing and how it should be resolved. Thank you!
That's a valid concern, I appreciate that you brought it up. I will try to address this in the videos going forward :) As for your scenario. I would actually try to avoid it as much as possible. Why should we need 5 instance of our job, if we are scaling our application horizontally? Perhaps we should move the background job into a separate service, so that we only have one instance running. Now, suppose we want to actually scale our background job horizontally. This is where I kind of struggle with ideas also. One thing that came to mind was partitioning the outbox messages produced by the application services somehow. And the each service's background job would only process the outbox messages in that partition. This might work, but I'm just theorizing right now. I don't have proof this works in practice. Do you have any ideas?
@@MilanJovanovicTech You can have a service bus that you can publish and read from the same microservice and if you have several instances then once one instance has taken the message you already know that other services won't process it (also you don't lose information). I work for a saas company and scalability is the most important thing to think about any functionality we add. It's very common to scale horitzontally an application in a saas world. You can also make sure that one function or background job is kind of a "singleton" in the arquitecture if you are thinking in an horitonzally scaling if you don't want to duplicate the job. This way you don't have deadlocks in whatever db you use. Depending if you want a fast processing or you are not in a rush and depending if you want to make sure you don't want to lose information between deploys as a background job in the same microservice if you have a dead instance or you have to swap you will probbably lose information/actions. And I can promise you that in cloud, a dead instance or recurrently swaps for deployments are very common. You can also use tablestorage with good partitionkeys.. as I said, depending on the needs and purposes, as always. I want to make clear that I learn from your videos because we all know in companies code is not as we would like see and we end up losing our technique or not updating ourselves! Keep the work up, I will keep watching and always try to think about a scalability way of doing the same! (The way you showed us will work for a little application and it's perfect though)
I think I'm a bit confused. If you use a service bus to handle the issue on concurrency, are we skipping the inbox pattern and publishing events straight to the service bus? That's definitely an idea, but you lose the atomicity benefit of the outbox pattern. I do believe that larger service bus wrappers (nservicebus) have outbox pattern built in, so you could go that route. But could do a poor man's 2 phase commit using transactions: grab your batch of unprocessed messages, immediately flag them as pending and save the transaction. After all the publishing is done, set them to processed and save. Thoughts?
If the database is the single source of truth for all instances, then add one more column to the OutboxMessage model from 02:28. Let that be: bool IsBeingProcessed. Then the background service must get first X amount of OutboxMessage-s, where IsBeingProcessed = false, mark them as IsBeingProcessed = true, process them, mark them as IsBeingProcessed = false + set the ProcessedOnUtc date. All that must be done in a transaction. That's how you could coordinate handling the workload between multiple instances.
It is a choice. Built-in background services require you to write logic for the recurring tasks yourself - and handle bugs and errors yourself. Quartz has already thought of all that it gives you a convenient interface for coding.
It's easier for me than the built-in background service. Plus, Quartz is much more robust in the features that it gives you. Check it out, read the documentation. You can also use something like Hangfire.
Yes I do understand the idea of using it as I have been using Hangfire prior to .Net 5 Background services. But due to its hits load on the database server, I switched to using Native Background services whenever possible, and thought that for a case similar to publishing the domain events a background service could still be good enough, but would require some errors handling as you suggested.
What will happen in the situation when there are multiple instances of this application running. There will be a background service job on every instance, probably processing the same outbox item, thus duplicate processing.
@@boris.stanojevic distributed locking is costly. Rather have optimistic lock with a table where static string used as unique index. Whenever who is first to obtain it does the job, at the end then cleans the record of static string when job is finished.
This is for an exceptional explanation of concepts and use case for outbox pattern. I do have some questions regarding execution of the domain event. Assuming we successfully created and added the user to database and then commit the outbox to database, how would you handle the case when sending email operation fails when the background service executed the domain event? Do you revert the entire transaction (adding user to database) back? What is the common approach for handling this? Thanks Milan!
The change that triggered the domain event should remain in the database, that's non-negotiable for me. The domain event can be retried, dead-lettered, etc. Many ways to solve it, depending on the importance of the domain event.
How do you handle exceptions inside event handlers ? Because we will have already marked the outbox message as processed, is there another queue maintained for event handler failures ?
@@marna_li Great question! Let's consider: - We have 1 transaction open - Saving changes fails - Nothing is stored in database - Excellent! Because we're saving our business logic + domain events in the same transaction we don't have to think about this.
@@MilanJovanovicTech Ah. You mean thar since the entities and the domain events are saved together. Now I see! Btw. EF Core 7 has said that they will drop wrapping in a default Db transaction. For performance reasons.
Is it a good option to keep domain events and outbox messages separate? What I mean by that is I use domain events often to achieve eventual consistency between my aggregates, so these are the things that need to happen immediately when SaveChangesAsync is called. On the other hand my applications often need to perform some side effects while handling domain events, like sending welcome message via email to a newly created user. However, if the SaveChangesAsync encounters an error, and handler which sends an email has already executed, then my user is misinformed. So how I see that is that email sending handler shouldn't send an email per se, but rather add an outbox message to the db context. When everything is saved correctly, the background job will scan any pending outbox messages, and the welcome email is gonna be sent to the user. Is it a good approach?
Great video, Milan! Sorry if you covered this elsewhere, but I'm new to .NET and microservices and would like to get your opinion on using the MassTransit outbox implementation (without RabbitMQ) for domain events, particularly in regards to resiliency when dealing with one service running in multiple Kubernetes pods. Would this package be a good idea rather than rolling my own outbox implementation, or is this over-engineering? And, is there a better option out there? Also, do any of these implementations prevent a race condition wherein two pods might try to process the same event at the same time?
Let's break this down into two separate parts. 1) "Would this package be a good idea rather than rolling my own outbox implementation, or is this over-engineering?" - The MassTransit Outbox supports works pretty well, but at the end of the day you're still publishing to a queue. So these are two separate problems. The Outbox solves reliable publishing (which is what you want), but consumption is a different issue. Whether you roll your own (I have a nice article coming up), or use a ready-made solution, it's not much difference. Although, depending on your scale and how many messages you want to publish, one may be better than the other. You'll have to benchmark this. 2) "Also, do any of these implementations prevent a race condition wherein two pods might try to process the same event at the same time?" - Would your Outbox be in one central place, and you'd be processing it from multiple workers? In that case I would look into row-level locking to prevent competing consumers.
@@MilanJovanovicTech Thanks! That makes a lot of sense. I'll try both approaches (rolling my own vs MassTransit) in some demo applications, as well as implementing row-level locking to prevent conflicts. The reason I thought about MassTransit for domain events is because I was planning on adding the package to my infrastructure project for integration events anyway, so I thought it might be worth considering for both scenarios.
Hi Milan, great video. If we had this running on clustered environment would there be anything that we would need to consider? Would there be multiple instances of the job running, one on each server, possibly resulting in the same message being pulled from the outbox multiple times, or does the quartz attribute stop that?
@@MilanJovanovicTech Fantastic thanks, any recomendations on the queue solution, ideally I would like to keep it in the main Db (MSSQL) and not in Azure or Rabbit. This was to ensure that if a backup is restored, any outstanding events would be processed and not lost. I'm probably overthinking things here
Hello Milan! Have you used domain events and an outbox message pattern to communicate between multiple microservices through a message broker, instead of using a handler within the same service? Is it worth using domain events in this way, or is this type of event not going to be domain, and is it better to separate them?
I also wrote a more in-depth blog post explaining the WHY of the Outbox pattern: www.milanjovanovic.tech/blog/outbox-pattern-for-reliable-microservices-messaging
@@MilanJovanovicTech Feature request: add commenting functionality in your blog. I wanted to say that if you add transaction on your architecture diagram then its understandable without reading. :)
Great video! What is your motivation to make your domain events persistent in the first place? Why not just using an EventBus (in memory) to publish the domain events?
As soon as you introduce an external system in the transaction, you need to start thinking about: what if one of these systems is down. To avoid that, we persist everything in a single transaction. We trade off eventual consistency, to have atomic transactions.
@@AboutCleanCode imho the most important part here is that the outbox messages are saved in the same transaction with the state changes to the domain objects. This way it is guaranteed that the events will only published if persisting the domain changes succeeded and domain changes will only be persisted if the events are created. Handling/publishing of the events is another part but having persisted events or outbox messsages clearly helps making that more robust.
As far as I understand an interceptor will be called each time SaveChanges executed. Is there a way to filter out, such that it will not fetch DB on commands which don't affect its subject? *Edit:* sorry for my retarted question, for some reason I thought events are in DB, but they are actually in memory, so it will be alright.
Great. So am I correct that the publishing of a domain event is done by the background job? Not anymore by the Entity responsible for generating the actual event?
@@MilanJovanovicTech all the major guides from microsoft and other sources suggest the same way to get at least once delivery garanty + solve duplicates problems down the way of event flow. I was seeking other variations of publisher but didnt found any example.
Thanks for sharing your way of doing transaction pattern. I have a few questions (maybe because I missed the previous video) 1) The message in the outbox pattern, once published are handled by the same app or are sent to other micro service (I think it should be the same app) 2) When processing outbox something can go wrong between publishing and saving the date of publishing, this would result in sending the same message at least another time. Is there a way to do outbox pattern and sending exactly one message? Or is sending at least once part of the pattern? 3) Handling of domain event to store them in outbox is « hidden » in an infrastructure interceptor. Wouldn’t it be even more DDDish to handle these event in the infrastructure agnostic core (with abstraction of the outbox repository) 4) Shouldn’t the core of your command make explicit the need to be in the same transaction for the business repository and the event save with outbox pattern ? (By having an abstraction of the transaction explicitly surrounding both repository save)
1. Same app 2. Indeed, as I mentioned I did not cover error handling when publishing messages. You can't, and shouldn't, guarantee to send exactly once. But you can ensure that you receive it only once, on the consuming side. This will be a topic in one of the following videos. 3. How would you go about this? I think the outbox is an Infra concern and not the Domain concern. 4. It is in the same transaction. Not sure what you meant here.
@@MilanJovanovicTech In my domain code I like making explicit what’s implicit. For example if it is important for my domain to have the domain event saved in the same transaction as the domain objects, I explicitly inject inside the core a “transaction manager” that wraps those concern. (In the core it is just an interface so I do not break the rule of not having dependencies to infrastructure). if I change my infra (for example from eclipselink to hibernate or spring data in java) I won’t forget this important same tranasaction concern and inject a new appropriate transactionManager. Hence my questions 3 and 4
@@MilanJovanovicTech more often than the core. I don’t know stability of .net world but in Java I already had to change from plain Java EE to Spring or from Java EE to Quarkus
Hi Milan, how do you handle transient exceptions that may occur when publishing an event? I currently use Polly, but I would like to know what you would do?
In terms of the clean architecture, there is application, domain, presentation, and infrastructure. what layer does persistence fall into? it seems like it is almost like the outermost layer similar to the web project. I just thought it was confusing because you can only reference projects on layers lower down and the infrastructure project references the persistence project.
I consider Persistence to be part of Infrastructure, but sometimes it makes sense to create it as a separate project. If the projects are in the same layer they can also reference each other - WHEN it makes sense. I like to be pragmatic about my clean architecture.
So, I guess that the benefit, compared to directly emitting event, is that you decouple the event emitting process from state mutation, hence increase the speed and reliability of the mutation process, but pay with reduced speed overall?
Not just decoupling (which is great). The reliability is the major reason, since the transaction is atomic by design. The speed is identical. The only difference now is there is "delay" because we are introducing eventual consistency.
@@MilanJovanovicTech I'm not sure, how do you achieve atomicity? You need to emit event during your transaction. You delayed this part of the transaction, after saving changes. What's the difference with: context.BeginTransaction -> DbSet.Add -> context.SaveChanges() -> producer.Produce -> context.CommitTransaction (maybe transaction.Commit, can't remember). In fact, I can name an advantage - a guarantee of at least once event delivery. In your case, you can't guarantee it, since your system only hopes that it will be emitted eventually, no? That's why i was talking about reliability. In the example I mentioned - if broker is down, you can't complete the db transaction. In your example - you can, but you will need some way of knowing that the event wasn't emitted. ps: maybe it needs to be said, but I see 2 types of transactions in play. A database one and a business one. When i'm asking about atomicy - I'm asking it in relation to the business transaction. pps: genuine question, those distributed systems are one of the banes of my existence.
@@Lammot No - it's not after saving changes. It's *during saving changes* which is super important. This means that we are saving everything in the same transaction. At least once delivery can easily be implemented here, with error handling and retries. I just didn't cover it in the video. I like to leave a few topics open for the following videos.
Its not during saving changes. During saving changes you only commit a need to emit event later. The emission itself is handled by a job in a separate process.
Great video !!, if we scale this application wouldn't there be a contention for picking up outbox messages, and 2 scaled applications, might pick up the same record and process twice, meaning sending multiple emails.
Great video! Thanks again. I'm trying to make a "base" clean architecture solution with similar structure as yours and this videos are very helpful. What can we expect in future? 😀
The next 3 videos are set: - DDD Cap theorem - Repository pattern - CQRS After that I'm planning another one (or few) for the Outbox, there's more things I want to cover. And also how to do validation with CQRS in an elegant way (MediatR behavior)
Great video as always, Milan! The background jobs work well with a single instance which is not the case in most of scenarios. Any suggestions on how to deal with this concurrent problem when having multiple replicas? Like having Hangfire to control the concurrency of the jobs?
The question is: Do you even need multiple instances of the job running? At that point, you are probably running at such a massive scale that you need to rethink this implementation. But let's try to brainstorm a solution: - Each service can publish Outbox messages into a partition - The job for that service only queries this partition - The partition can be some fixed value in the table, assigned to a service through some mechanism This seems feasible to me. Probably not too complicated to implement. What do you think?
@@MilanJovanovicTech I think what Marco is saying is that you now have a Quartz job and your API in a single deployable application. If you want to run multiple instances of the application to accomplish zero downtime, then for each instance, you will also be running a Quartz job. The attribute [DisableConcurrentProcessing] won't help you in this case. To fix this, you would need a separate deployable application that reads the outbox and then publishes the messages using something like RabbitMQ instead of MediatR. This does make things more complicated to setup
@@johntyrrell5047 basically when you have multiple instances of the service we end up dealing with concurrency issues. There are many different ways to solve the problem. You can have an internal endpoint that is pinged by a job, a cloud function or use Hangfire to sync all the background workers. But it’s not as simple as people think 😁 because we might end up sending duplicate messages and the subscribers need to be aware of that.
@@Talento90 This should have been handled at Quartz level same as hangfire which makes sure multiple instances are not able to run the same job. or if that's not possible then it should be select and update with another column in the table to show it's already been picked and in-progress with the expiration time so in case of exception, other instances can pick after expiry.
But what will happen if SaveChangesAsync in Execute method inside Job fail ? By that time you already can publish an email , but the due this failure ProcessedOnUtc will not be set, so next time you scheduled job runs it publish email again… looks like it will get too complicated if we want to handle such cases as well
Yes, this is "at least once" messaging semantics :) So you can expect duplicate processing. For an email? Can't do much, as its not a reversible operation
Don’t think you get the idea. Of course in case we are good with “at least once” strategy your approach is ok. So My idea about idempotency key is just to save it somewhere , (for example in memory ) after you published an event for the first time and in case you try to publish same event again you can reject publishing and you can still try to call SaveChanges and hoping that this time it will work . We can use event id as idempotency key, or generate hash based on domain event itself. Do you see any issues with this approach?
Thanks for your content, I just buyed all of your courses. Can I ask if there is already a containerized implementation of the outbox pattern? something similar to RabbitMQ
Thank you for this video. This approach is awesome. Until now I just knew about an approach of raising Domain Events in a middleware on the request.Completed callback. But the error handling in this callback is kind of tricky. What do you think about raising domain events within a middleware in a request.Completed callback within a transaction? :) Great Work Milan!!
@@MilanJovanovicTech Sure... this is the code executed in the Middleware: var transaction = await dbContext.Database.BeginTransactionAsync(); context.Response.OnCompleted(async () => { try { if (context.Items.TryGetValue(DomainEventsKey, out var value) && value is Queue domainEvents) { while (domainEvents.TryDequeue(out var nextEvent)) { await publisher.Publish(nextEvent); } } await transaction.CommitAsync(); } catch (EventualConsistencyException e) { // Handle eventual consistency exceptions } finally { await transaction.DisposeAsync(); } }); so this happens after the user got the response and if something goes wrong a rollback will be done
Great video Milan! Thanks for your hard work. Do you have any suggestion on how to implement such thing without ORM? Is there a way to implement something similar to an interceptor? Maybe a different approach?
@@amirpeeri More on the service that is saving to database. Of course, you could take a separate route. Create an IOutboxDispatcher, and publish them manually
@@MilanJovanovicTech Why do you think it's another problem? Outbox is a microservices pattern. In a loaded distributed system, one relational database may not be able to handle the load, so you can use a distributed queue as an intermediate solution, and in the consumer, iteratively try to wait for data that appeared, if after N attempts no data, then at the level above, something went wrong in the database and the message is discarded.
@@MilanJovanovicTechThanks for the answer! Maybe you know how it is with the scalability of this type of solution in terms of deploying it on Azure and running the api on multiple instances. Can you recommend what would be the best way to achieve this?
Hi Milan! Thanks for the great content. One question here. When overriding SavedChangedAsync method, how would you track instances of type AggregateRoot instead of simple AggregateRoot? I would like to apply the outboxpattern to aggregate roots which make use of strongly typed IDs. Thanks and keep up with the good work!
What can I do when my Event Properties are value objects/non-primitive? I can save it and the types are also saved correctly but when deserializing I either receive a null value or default values for the classes.
What happens when background processor try to actually try to execute and couldnt process it. And another service depends on the result. Do you then put new events saying that didn't actually go through?
Well, Polly can retry but what if it fails still? I'm guessing adding all those rollback events is the only way. So there is a log of events that happen but the problem is how fast it happens when do you release the last event to depending system. Ex: one service user buy cup cakes another service used for payments other service keep cup cake stock maintained. If payment fail now all the events must get what happened and essentially rollback events and payment shouldnt be processed even. Is this the case?
@MilanJovanovicTech Atomicity being the whole theme here, I'm wondering how you would handle the last service in the chain failing in this scenario. Say after the email step there is a second Domain Event Handler for the same domain event and it raises an IntegrationEvent to another service or something. If that were to fail, the transaction wouldn't be atomic, right? Because the email will have sent. I might be missing something simple here but just curious what your thoughts are.
Since the domain event publish would be a fanout in that case (1 domain event, multiple handlers): - We can either have the first event handler publish another domain event (a bit strange, but we keep it 1 event - 1 handler) - Or we introduce another concept - event handler idempotency. Now, we record if each event handler processed the event. Which allows us to retry publishing the event without duplicating side effects (in theory). I talked about it in this video: ruclips.net/video/mGeEtokcjVQ/видео.html [even in this case, you get at least once publish :)] Happy to discuss this further.
Excellent video as always! It's possible that when an http request comes in, we want the domain events to complete before returning a response to the client. If we limit the batch size and how often the job runs, the client might be sitting for awhile before their request is complete. Any ideas on how to tackle this or is this a non-issue? I'm thinking about hundreds of users using the app.
Hi, thanks for a video 👍. Is it possible invoke domains even on save change after save change in DB, just wondering about a way without background jobs
Yes - but you are then coupling the execution of your domain events in the same HTTP request as your business transaction. What if you have long-running event handlers? Not to mention the problems if any of the event handlers fails and throws an exception.
@@MilanJovanovicTech yes, make sense, why i asked, because we used the approach when we do explicit rise event to rabbit in every handler, wondering how we can reduce this. Thank for answer.
@@vallJune You can await as handling the event is asynchronous and publishing the event itself should be fast. Your message broker will probably send an acknowledgment back as soon as it gets the message and your command continues to do whatever it needs to. But with that approach, you create temporal coupling and don't have transactional behavior. If that is something acceptable in your context, publishing an event in every handler is the simplest option.
Awesome vid ! but what if the program fails in the ProcessOutboxMessagesJob background job before _dbcontext.saveChangesAsync() but after we publish the event with mediatr ? (code in minute 15:54, line 42)
Hi, Milan. Thanks a lot for sharing your knowledge with us! I have a question for you: what if between calling GetDomainEvents and ClearDomainEvnets in this time somehow one new domain event will be added, how you can be sure that this DomainEvent will ever be processed? Wouldn't be better just remove the DomainEvent from the collection not clear the whole collection instead?
EF Core should be configured as per-request lifetime scope, that is “scoped” (more on that on learn.microsoft.com/en-us/ef/core/dbcontext-configuration/). Also, aggregate root is the one enforcing rules, so for given use case, we expect 0 or more events within a given request, depending on how you choose to utilize domain events for communication in the first place. There should not be possible to get any new events, as persisting those events will be the last thing to do, together with other tracking changes.
This can't happen in practice, Kirill. The code for creating the outbox messages is executed after we call SaveChangesAsync in the command handler. After that point we won't have anything raising domain events, so we will be fine.
Hi, Thanks for the great video. In my personal project, I followed same step however I got "Type is an interface or abstract class and cannot be instantiated" error while deserialization the IDomianEvent. A little help will be much appreciated.
@@MilanJovanovicTech solved, I used Assembly Full Name, instead of Name while saving into the outbox messages and it works. However, what if domain event needs to store entity Id (identity) while creating new entity. If the domain event was raised with in the entity, the Id will be remain 0. So I raised the event after calling the EF save changes function and then raise the LogActivity domain event. So, is this good approach?
@@MilanJovanovicTech but, that class (MemberRegistetedDomainEventHandler) is *NOT* even inherited from any of MediatR class. Then how come it will be called automatically?
hi milan . how do you Stor Entities in ef database in DDD approch ? for example n-to-n relationShip between entities .please create a content about mapping DDD approch with Ef Core
Cant we have a backgrownd job without any interval which is triggered by the insertion of event???? Even if the backgrownd job block fir a concurrent queue, or blocking collection? As a singleton service inside of backgrownd job. Infinitly running and blocking or bad idea
I think it's possible, but I didn't explore what you have to change in the configure. I think calling RepeatFovere() without WithInterval() should get the job done.
You cannot really have a job triggered by an external event such as the insertion of a record in the database, as in this example. A way to do this is another technology called Change Data Capture (CDC). Take a look at what Debezium does.
@@MilanJovanovicTech I am trying but need help with that interface, would it need to include both ClearDomainEvents() and RaiseDomainEvents() methods declaration in the interface? If yes, then would I need to keep RaiseDomainEvents() method public in AggregateRoot implementation class?
What happens if the SaveChangesAsync method in the ProcessOutboxMessageJob class encounters with an error and changes are not saved? Then another email will be sent in the next job processing. Is there a solution for such a scenario?
Hi Milan, Thanks for this. Got a question: Should we clear out all the domain events within the 'Save change Interceptor' or only the one that we select? I see we select only the outBoxMessage events and we delete the rest. happy to hear your thoughts - thanks
A very good explanation of the pattern. Actually, I'm addicted to this pattern, using in every part of the app. 🙂 I came across one problem implementing that, there is an api which is responsible for creating user, and at that time need to send email to the user with the password on it. so if I am saving that email event on database, it has password on it. how to handle such situations with the pattern. thanks in advance.
should we see the refactoring tips, as this insrastructural code is becoming a mess..? Not only in case of this pattern, but in general. e .g. Startup gets bulky as well
@@MilanJovanovicTech I was surprised with video about DB configuration, as there were a ways to simplify set up if anything, not just DB. Maybe you know some other tricks)
So how does the outbox pattern fit with something like RabbitMq, do you publish to a queue and once that message is acknowledged you mark it as process but what if the other service failed to process it, would you need an inbox pattern too. My mind is just rushing with ideas, it would be great to hear your take. . .
I would treat them as separate things, since they solve different problems. The Outbox is great for giving you an atomic transaction when you need to trigger behavior related to external services. You could definitely publish to a queue when processing the Outbox, and this would be considered an *integration event*. So we have: - Domain Events -> Outbox - Integration Events -> Queue Of course, on the consumer side it's up to you to ensure idempotency and "at least once", "at most once" or "exactly once" semantics. Definitely a topic for a future video!
Thanks you for wonderful and very detailed video. I have bit of struggle in interceptor class. My problem is that this part of code: var outboxMessages = dbContext.ChangeTracker .Entries() .Select(x => x.Entity) .SelectMany(aggregateRoot => in your example has . In my example AggregateRoot is generic and TKey is whatever the primary key type is. Can you help me out on this matter please? Thank you, and keep doing what you do because is AWESOME!
@@MilanJovanovicTech Did I understand you correctly? Namely, to create an Interface instead of a TKey? In my case, Tkey or Id is of record type like: public record UserId(Guid UserId). In this case this record should be: public record UserId(Guid UserId) : ISomeInterface. AND than inside interceptor i can have var outboxMessages = dataContext.ChangeTracker .Entries()
@@33bgatti33 This is how I solved it. public interface IAggregateRoot { IReadOnlyCollection GetDomainEvents(); void ClearDomainEvents(); } public abstract class AggregateRoot : Entity, IAggregateRoot where TId : notnull { ... removed for brevity } Then you can search for the entities which implements the interface and use the methods that it implements var outboxMessages = _context.ChangeTracker .Entries() .Select(x => x.Entity)
4:04 - You say that only aggregates can raise domain events. Why is that? Can't entities do that too? Or maybe my understanding of DDD is poor and I always assume an aggregate to be something that contains other entities. Maybe what you say implies that it's fine to have an aggregate consisting of one entity, it's still about enforcing invariants.
Yes, one entity is also fine and it can raise a domain event. What I'm trying to avoid here is having an aggregate raise a domain event, and a nested entity also raising a domain event. This is hard to manage and easily grows in complexity.
I would use a MessageBus: RabbitMQ, Kafka, Azure Service Bus, AWS SQS, etc for such a thing. The wheel is already implemented so why doing it manually? And it solves any of the possible issues when scaling
Get the source code for this video for FREE → the-dotnet-weekly.ck.page/outbox
Want to master Clean Architecture? Go here: bit.ly/3PupkOJ
Want to unlock Modular Monoliths? Go here: bit.ly/3SXlzSt
I watch every single video you've published. That's real gem.
Thank you!
The greatest implementation of an outbox I ever seen.
Great merge of knowledge about DDD and communication in distributed systems.
Thanks to your videos I propose a similar solution to my team to secure consistency of transaction between updating a database and producing an event to a message queue.
Glad you are on the YT.
Thank you
Note that this isn't production ready, Radek. You need to add proper error handling, possibly retries. With retries, you need to think about idempotency of event handlers and so on.
In any case, I'm glad you liked it. I hope it serves your team and you well 😁
@@MilanJovanovicTech Thank you for advice. I bear it in my mind. You have talked about retries in the video, does it cover the things that you mentioned above?: ruclips.net/video/xajVttkZntU/видео.html
@@MilanJovanovicTech Hi, thank you. Can you tell what approach is used for multiple application instances? Each instance can push same messages from the DB 😱
@@omgoooddistributed locking, or a proper message queue service available at every cloud provider.
I just yesterday read an article about this and found it very interesting. Funny to now run into your video. Good timing.
I'm reading your mind Martin 😂
Dear Milan, I absolutely love these videos! They are well structured, well preparedand you explain what you're doing. Amazing.
Thank you so much!
Very well explained without complications.
Thanks a lot!
Nice camera work in the introduction of your videos.
One sentence zoom in.
Other statement zoom out.
And then the code editor appears.
👌👌👌
Thanks. Something I thought to increase retention in the start of the video 😅
You always take these concepts to the next level!
Hah, far from it 😅 I need to do a V2 of this video with some improvements
Excellent Job @Milan. Thanks for the video. I use to do the Background Job processing in a separate app. Will use Outbox Pattern from now on. Keep up the Good work.
That approach is good for scaling
Thank you, finally I saw some ef interceptors in action
They aren't too complicated to work with. Just have to understand which method to use.
Like the practical and on point videos very much, looking forward to the next!
Thank you very much Constantin
You are really good explaining and showing concepts. Keep these way i love your videos.
Thanks a lot! I'm getting better with every video
Clear and concise. Good work! Thx! 😊
Glad it was helpful!
Milan, great video as usual, following every one.
Do you maybe have plans to do a video on AddScoped/Transient/Singleton with practical examples of when you should use which?
Sure thing! I'll add to my schedule :)
Great video - makes this concept very clear.
Glad it was helpful!
I think there are two use cases for events. One is internal (domain events) and one is external (integration events).
Internal events should be handled before the save changes since it mutates state within the unit of work.
Sometimes an internal event would do nothing more than raise an external event. Could use a message queue or service buss or whatever.
My thoughts would be:
Create the domain event in the command handler and add it to the aggregate.
Before save changes (in interceptor) publish the domain events.
The domain event handler can do it's thing, which might include adding a message to OutBox (an external event)
SaveChanges
Now outbox processor still works as you said and publishes the events. But, the subscribers could either handle this event itself (something the current app/service does) or it can create an IntegrationEvent and publish it to a message queue/buss.
Use Case would be something like:
When an PO is closed, an Invoice should be created. (internal domain event)
When an invoice is created it is sent to the customer. (external notification service event)
ClosePOCommand rasies a POClosedDomainEvent.
ClosePOCommand SavesChanges()
BeforeSaveChanges publishes the POClosedDomainEvent
POClosedDomainEventHandler creates an invoice (should it call CreateInvoiceCommand idk?) and adds an OutBox event (InvoiceCreatedEvent)
SaveChanges()
Outbox picks it up and publishes the event. The handler creates a message with all the info for the notification message and adds it to the NotificationServiceQueue.
This way we have DomainEvents handled atomically and then decoupled IntegrationEvents published that any service can listen too.
Thoughts?
Consider this case.
You want to publish internal domain events before we save the changes - or rather, before we commit the transaction.
What happens when you publish a domain event which talks to any *external* service.
Let's say:
1. We publish to a queue successfully.
2. We attempt to complete our database transaction and this fails.
3. But we still published our message to the queue?
3.1. This could trigger other behavior downstream.
We have now left our system in an inconsistent.
This is why I believe that you simply can't publish your domain events before completing your transaction.
Look at it from this angle also.
An event is something which is in the past.
A Domain Event is supposed to represent a fact in your system.
It does not become a fact until you complete your transaction.
@@MilanJovanovicTech As I said, some stuff needs to be part of the unit of work.
Because if I close the PO, then raise an event that is published after saving the PO, and the invoice create fails, I have lost the atomic business process. So, now my system is in an invalid state. I have to deal with all types of compensating events, or have some distributed transaction handling. Your point is exactly what I want to avoid.
The answer to your "what if" is it doesn't happen. I never said the domain event is exposed externally.
@@pilotboba In the case of the invoice creation failing, the outbox table would still have a null Completion date field, therefore it could be attempted again? Does the fact that we have a record of the incomplete domain event handler help mitigate the risk of the invoice creation not occurring within the same transaction as the original domain event?
A great video that explain Transactional Outbox Pattern with a clean way, thank you Milan! (thank's also for using Quartz instead of boring while loops in background services)
I might do a V2 of this soon
please cover how to rollback state after DE handlers fails @@MilanJovanovicTech
I just can't stop watching your Milan videos, your explanation is soooo good. I'll do my homework by adding try catch, and I'll study a little more about domain events.
I'm reading Vlad's Learning DDD after reading Eric Evans and Vernon, have you read the Blue and Red books? Have you thought about making videos discussing these books?
Ty again.
I haven't read Learning DDD, but I will have to. Not sure about book comparison video though 🤔
Thanks for your knowledge! I have a question. Given a thousands of events that has to be send into external providers mqtt, before the midnight, I would need to increase the publication events velocity into the rabbitmq , so my first approach will have more than one service that reads the outbox table parallely. With the idea of load balancing the process of reads ( a bunch of 10 events) *( 2 read services parallely) . So how can I manage some kind of lock for each 10 events that has been processing for each services in order to when other service has to query the events don't take the same events of the other services?
That's where you run into problems. And the locking would probably kill your performance anyhow.
You would either have 1 process reading the outbox, and scale it up as much as possible.
Or partition the Outbox somehow, and have a process per partition.
Something like this may work just playing around with it at the moment, this way each process can read messages from the table, but not the same messages.
using (var transaction = _dbContext.Database.BeginTransaction())
{
var messages2 = _dbContext.Set()
.FromSqlRaw("SELECT top 2 * FROM dbo.DomainEvents WITH (UPDLOCK, ROWLOCK, READPAST) WHERE ProcessedOnUtc IS NULL AND RetryAttempts
Thanks, great video. I have a question about the job, wouldn't using built-in job cause multiple publish of the same message in a multi-pod environment?
Yes
Great video, thanks Milan!
You're welcome :)
Awesome explanation👌. Thanks
You're very welcome! :)
Hi! I'm loving your videos! I just have one suggestion to add always on to your explanations. In this case, for example. What if I have 5 instances of this microservice running the backgroundjob at the same time? It would be nice if you always take in to account the scalability of every video you are showing and how it should be resolved. Thank you!
That's a valid concern, I appreciate that you brought it up. I will try to address this in the videos going forward :)
As for your scenario. I would actually try to avoid it as much as possible. Why should we need 5 instance of our job, if we are scaling our application horizontally? Perhaps we should move the background job into a separate service, so that we only have one instance running.
Now, suppose we want to actually scale our background job horizontally.
This is where I kind of struggle with ideas also.
One thing that came to mind was partitioning the outbox messages produced by the application services somehow.
And the each service's background job would only process the outbox messages in that partition.
This might work, but I'm just theorizing right now.
I don't have proof this works in practice.
Do you have any ideas?
@@MilanJovanovicTech You can have a service bus that you can publish and read from the same microservice and if you have several instances then once one instance has taken the message you already know that other services won't process it (also you don't lose information). I work for a saas company and scalability is the most important thing to think about any functionality we add. It's very common to scale horitzontally an application in a saas world. You can also make sure that one function or background job is kind of a "singleton" in the arquitecture if you are thinking in an horitonzally scaling if you don't want to duplicate the job. This way you don't have deadlocks in whatever db you use. Depending if you want a fast processing or you are not in a rush and depending if you want to make sure you don't want to lose information between deploys as a background job in the same microservice if you have a dead instance or you have to swap you will probbably lose information/actions. And I can promise you that in cloud, a dead instance or recurrently swaps for deployments are very common. You can also use tablestorage with good partitionkeys.. as I said, depending on the needs and purposes, as always. I want to make clear that I learn from your videos because we all know in companies code is not as we would like see and we end up losing our technique or not updating ourselves! Keep the work up, I will keep watching and always try to think about a scalability way of doing the same! (The way you showed us will work for a little application and it's perfect though)
I think I'm a bit confused. If you use a service bus to handle the issue on concurrency, are we skipping the inbox pattern and publishing events straight to the service bus?
That's definitely an idea, but you lose the atomicity benefit of the outbox pattern. I do believe that larger service bus wrappers (nservicebus) have outbox pattern built in, so you could go that route.
But could do a poor man's 2 phase commit using transactions: grab your batch of unprocessed messages, immediately flag them as pending and save the transaction. After all the publishing is done, set them to processed and save.
Thoughts?
If the database is the single source of truth for all instances, then add one more column to the OutboxMessage model from 02:28. Let that be: bool IsBeingProcessed.
Then the background service must get first X amount of OutboxMessage-s, where IsBeingProcessed = false, mark them as IsBeingProcessed = true, process them, mark them as IsBeingProcessed = false + set the ProcessedOnUtc date. All that must be done in a transaction.
That's how you could coordinate handling the workload between multiple instances.
Great Video, thakn you milan.
Just wondering why using Quartz instead of the built-in background services?
It is a choice. Built-in background services require you to write logic for the recurring tasks yourself - and handle bugs and errors yourself. Quartz has already thought of all that it gives you a convenient interface for coding.
It's easier for me than the built-in background service. Plus, Quartz is much more robust in the features that it gives you. Check it out, read the documentation. You can also use something like Hangfire.
Yes I do understand the idea of using it as I have been using Hangfire prior to .Net 5 Background services.
But due to its hits load on the database server, I switched to using Native Background services whenever possible, and thought that for a case similar to publishing the domain events a background service could still be good enough, but would require some errors handling as you suggested.
@@mohammad.daglas This Quartz job actually uses the native background service, so you can conside it almost identicap
Awesome explanation ❤
Could you please make a video using Hangfire?
Coming out in a few weeks :)
@@MilanJovanovicTech ❤❤❤
For this example, why is it beneficial to use the outbox pattern vs publishing the domain event handler right after creating a new member?
ruclips.net/video/5L2BUuSdZBI/видео.html
Great video
Thanks!
What will happen in the situation when there are multiple instances of this application running. There will be a background service job on every instance, probably processing the same outbox item, thus duplicate processing.
Right, you would need to think of a way to solve this. Or have a separate instance running only the background job, so that only one exists at a time.
See my comment to Gerard Uab , see if that works for you
I would use a MessageBus: RabbitMQ, Kafka, Azure Service Bus, AWS SQS, etc for such a thing and won't worry about multiple instances of my application
@@boris.stanojevic distributed locking is costly. Rather have optimistic lock with a table where static string used as unique index. Whenever who is first to obtain it does the job, at the end then cleans the record of static string when job is finished.
Persist events in redis stream and register job worker under a consumer group.
This is for an exceptional explanation of concepts and use case for outbox pattern. I do have some questions regarding execution of the domain event. Assuming we successfully created and added the user to database and then commit the outbox to database, how would you handle the case when sending email operation fails when the background service executed the domain event? Do you revert the entire transaction (adding user to database) back? What is the common approach for handling this? Thanks Milan!
The change that triggered the domain event should remain in the database, that's non-negotiable for me. The domain event can be retried, dead-lettered, etc. Many ways to solve it, depending on the importance of the domain event.
How do you handle exceptions inside event handlers ? Because we will have already marked the outbox message as processed, is there another queue maintained for event handler failures ?
You could bubble the exception and fail the message, have it retried
Great video!
Thanks, Robert!
Awesome! I’m stealing and improving on this 🙂
What will you improve on? 🧠
@@MilanJovanovicTech What happens if SaveChanges fails. Invalidate domain events. Or have I missed something about that? :)
@@marna_li Great question! Let's consider:
- We have 1 transaction open
- Saving changes fails
- Nothing is stored in database
- Excellent!
Because we're saving our business logic + domain events in the same transaction we don't have to think about this.
@@MilanJovanovicTech Ah. You mean thar since the entities and the domain events are saved together. Now I see!
Btw. EF Core 7 has said that they will drop wrapping in a default Db transaction. For performance reasons.
@@marna_li Only if there is a single insert statement I believe. In our case, it would have to remain inside a transaction, because it must be atomic
Is it a good option to keep domain events and outbox messages separate? What I mean by that is I use domain events often to achieve eventual consistency between my aggregates, so these are the things that need to happen immediately when SaveChangesAsync is called. On the other hand my applications often need to perform some side effects while handling domain events, like sending welcome message via email to a newly created user. However, if the SaveChangesAsync encounters an error, and handler which sends an email has already executed, then my user is misinformed. So how I see that is that email sending handler shouldn't send an email per se, but rather add an outbox message to the db context. When everything is saved correctly, the background job will scan any pending outbox messages, and the welcome email is gonna be sent to the user. Is it a good approach?
It is a good approach, but it's also probably overkill. You're increasing complexity a lot, for the slight chance that sending an email will fail.
Great video, Milan! Sorry if you covered this elsewhere, but I'm new to .NET and microservices and would like to get your opinion on using the MassTransit outbox implementation (without RabbitMQ) for domain events, particularly in regards to resiliency when dealing with one service running in multiple Kubernetes pods. Would this package be a good idea rather than rolling my own outbox implementation, or is this over-engineering? And, is there a better option out there? Also, do any of these implementations prevent a race condition wherein two pods might try to process the same event at the same time?
Let's break this down into two separate parts.
1) "Would this package be a good idea rather than rolling my own outbox implementation, or is this over-engineering?" - The MassTransit Outbox supports works pretty well, but at the end of the day you're still publishing to a queue. So these are two separate problems. The Outbox solves reliable publishing (which is what you want), but consumption is a different issue. Whether you roll your own (I have a nice article coming up), or use a ready-made solution, it's not much difference. Although, depending on your scale and how many messages you want to publish, one may be better than the other. You'll have to benchmark this.
2) "Also, do any of these implementations prevent a race condition wherein two pods might try to process the same event at the same time?" - Would your Outbox be in one central place, and you'd be processing it from multiple workers? In that case I would look into row-level locking to prevent competing consumers.
@@MilanJovanovicTech Thanks! That makes a lot of sense. I'll try both approaches (rolling my own vs MassTransit) in some demo applications, as well as implementing row-level locking to prevent conflicts. The reason I thought about MassTransit for domain events is because I was planning on adding the package to my infrastructure project for integration events anyway, so I thought it might be worth considering for both scenarios.
Hi Milan, great video. If we had this running on clustered environment would there be anything that we would need to consider? Would there be multiple instances of the job running, one on each server, possibly resulting in the same message being pulled from the outbox multiple times, or does the quartz attribute stop that?
Yes, that's a problem with this setup! Probably the simplest solution would be to extract the worker into a separate project, and publish to a queue.
@@MilanJovanovicTech Fantastic thanks, any recomendations on the queue solution, ideally I would like to keep it in the main Db (MSSQL) and not in Azure or Rabbit. This was to ensure that if a backup is restored, any outstanding events would be processed and not lost. I'm probably overthinking things here
Hello Milan! Have you used domain events and an outbox message pattern to communicate between multiple microservices through a message broker, instead of using a handler within the same service? Is it worth using domain events in this way, or is this type of event not going to be domain, and is it better to separate them?
You can use domain events to publish a message to the queue, that's perfectly acceptable.
This is very useful content
I also wrote a more in-depth blog post explaining the WHY of the Outbox pattern: www.milanjovanovic.tech/blog/outbox-pattern-for-reliable-microservices-messaging
@@MilanJovanovicTech Feature request: add commenting functionality in your blog. I wanted to say that if you add transaction on your architecture diagram then its understandable without reading. :)
Great video! What is your motivation to make your domain events persistent in the first place? Why not just using an EventBus (in memory) to publish the domain events?
As soon as you introduce an external system in the transaction, you need to start thinking about: what if one of these systems is down. To avoid that, we persist everything in a single transaction.
We trade off eventual consistency, to have atomic transactions.
@@MilanJovanovicTech and i assume the external system in this example is the mail service? ok - got it. thx for explaining!
@@AboutCleanCode imho the most important part here is that the outbox messages are saved in the same transaction with the state changes to the domain objects.
This way it is guaranteed that the events will only published if persisting the domain changes succeeded and domain changes will only be persisted if the events are created.
Handling/publishing of the events is another part but having persisted events or outbox messsages clearly helps making that more robust.
How to migrate domain event if domain model changed?
You dont lock event entry, other instance can be handle it in parallel
I always have one worker process running at a time
As far as I understand an interceptor will be called each time SaveChanges executed. Is there a way to filter out, such that it will not fetch DB on commands which don't affect its subject?
*Edit:* sorry for my retarted question, for some reason I thought events are in DB, but they are actually in memory, so it will be alright.
I didn't quite get your question?
how did u convert message into interface without getting an exception ?
Set up the serializer to include the type
Great. So am I correct that the publishing of a domain event is done by the background job? Not anymore by the Entity responsible for generating the actual event?
Correct. The entity just "raises" the event, but this is in-memory. We publish it separately.
Hey nice example. Im afraid on 15:44 we have the same dual write - publish + update "ProcessedOn"
Yes, should've made it more robust
@@MilanJovanovicTech all the major guides from microsoft and other sources suggest the same way to get at least once delivery garanty + solve duplicates problems down the way of event flow.
I was seeking other variations of publisher but didnt found any example.
Thanks for sharing your way of doing transaction pattern. I have a few questions (maybe because I missed the previous video)
1) The message in the outbox pattern, once published are handled by the same app or are sent to other micro service (I think it should be the same app)
2) When processing outbox something can go wrong between publishing and saving the date of publishing, this would result in sending the same message at least another time. Is there a way to do outbox pattern and sending exactly one message? Or is sending at least once part of the pattern?
3) Handling of domain event to store them in outbox is « hidden » in an infrastructure interceptor. Wouldn’t it be even more DDDish to handle these event in the infrastructure agnostic core (with abstraction of the outbox repository)
4) Shouldn’t the core of your command make explicit the need to be in the same transaction for the business repository and the event save with outbox pattern ? (By having an abstraction of the transaction explicitly surrounding both repository save)
1. Same app
2. Indeed, as I mentioned I did not cover error handling when publishing messages. You can't, and shouldn't, guarantee to send exactly once. But you can ensure that you receive it only once, on the consuming side. This will be a topic in one of the following videos.
3. How would you go about this? I think the outbox is an Infra concern and not the Domain concern.
4. It is in the same transaction. Not sure what you meant here.
@@MilanJovanovicTech In my domain code I like making explicit what’s implicit. For example if it is important for my domain to have the domain event saved in the same transaction as the domain objects, I explicitly inject inside the core a “transaction manager” that wraps those concern. (In the core it is just an interface so I do not break the rule of not having dependencies to infrastructure). if I change my infra (for example from eclipselink to hibernate or spring data in java) I won’t forget this important same tranasaction concern and inject a new appropriate transactionManager. Hence my questions 3 and 4
@@pierre-jeanmartin5621 But how often will you actually chane your infrastructure?
@@MilanJovanovicTech more often than the core. I don’t know stability of .net world but in Java I already had to change from plain Java EE to Spring or from Java EE to Quarkus
What do you think about using Hangfire to handle the outbox?
Same thing. Anything that can fire a background job gets the job done. Right?
Hi Milan, how do you handle transient exceptions that may occur when publishing an event?
I currently use Polly, but I would like to know what you would do?
Retry the publish a few times, and if it fails move it to a DLQ
Nice, dark mode in postman ;-)
Yes! I listen to suggestions in my comments 😅
In terms of the clean architecture, there is application, domain, presentation, and infrastructure. what layer does persistence fall into? it seems like it is almost like the outermost layer similar to the web project.
I just thought it was confusing because you can only reference projects on layers lower down and the infrastructure project references the persistence project.
I consider Persistence to be part of Infrastructure, but sometimes it makes sense to create it as a separate project. If the projects are in the same layer they can also reference each other - WHEN it makes sense. I like to be pragmatic about my clean architecture.
That makes sense! Thanks!
So, I guess that the benefit, compared to directly emitting event, is that you decouple the event emitting process from state mutation, hence increase the speed and reliability of the mutation process, but pay with reduced speed overall?
Not just decoupling (which is great). The reliability is the major reason, since the transaction is atomic by design.
The speed is identical. The only difference now is there is "delay" because we are introducing eventual consistency.
@@MilanJovanovicTech I'm not sure, how do you achieve atomicity? You need to emit event during your transaction. You delayed this part of the transaction, after saving changes. What's the difference with:
context.BeginTransaction -> DbSet.Add -> context.SaveChanges() -> producer.Produce -> context.CommitTransaction (maybe transaction.Commit, can't remember).
In fact, I can name an advantage - a guarantee of at least once event delivery. In your case, you can't guarantee it, since your system only hopes that it will be emitted eventually, no?
That's why i was talking about reliability. In the example I mentioned - if broker is down, you can't complete the db transaction. In your example - you can, but you will need some way of knowing that the event wasn't emitted.
ps: maybe it needs to be said, but I see 2 types of transactions in play. A database one and a business one. When i'm asking about atomicy - I'm asking it in relation to the business transaction.
pps: genuine question, those distributed systems are one of the banes of my existence.
@@Lammot No - it's not after saving changes. It's *during saving changes* which is super important. This means that we are saving everything in the same transaction.
At least once delivery can easily be implemented here, with error handling and retries. I just didn't cover it in the video.
I like to leave a few topics open for the following videos.
Its not during saving changes. During saving changes you only commit a need to emit event later. The emission itself is handled by a job in a separate process.
Great video !!, if we scale this application wouldn't there be a contention for picking up outbox messages, and 2 scaled applications, might pick up the same record and process twice, meaning sending multiple emails.
Indeed, you can't really scale this horizontally. You would have to create a separate service (application) that will execute the background job.
thank you, it was very interesting
Glad you enjoyed it
Is this not a performance plenty by reading the domain events from Db in background Job continueusly? how can we get rid of this ?
I don't think so, no. It's the price you pay to enable the Outbox.
Great video! Thanks again. I'm trying to make a "base" clean architecture solution with similar structure as yours and this videos are very helpful. What can we expect in future? 😀
The next 3 videos are set:
- DDD Cap theorem
- Repository pattern
- CQRS
After that I'm planning another one (or few) for the Outbox, there's more things I want to cover. And also how to do validation with CQRS in an elegant way (MediatR behavior)
Great video as always, Milan! The background jobs work well with a single instance which is not the case in most of scenarios. Any suggestions on how to deal with this concurrent problem when having multiple replicas? Like having Hangfire to control the concurrency of the jobs?
The question is: Do you even need multiple instances of the job running?
At that point, you are probably running at such a massive scale that you need to rethink this implementation.
But let's try to brainstorm a solution:
- Each service can publish Outbox messages into a partition
- The job for that service only queries this partition
- The partition can be some fixed value in the table, assigned to a service through some mechanism
This seems feasible to me. Probably not too complicated to implement. What do you think?
@@MilanJovanovicTech I think what Marco is saying is that you now have a Quartz job and your API in a single deployable application. If you want to run multiple instances of the application to accomplish zero downtime, then for each instance, you will also be running a Quartz job. The attribute [DisableConcurrentProcessing] won't help you in this case.
To fix this, you would need a separate deployable application that reads the outbox and then publishes the messages using something like RabbitMQ instead of MediatR.
This does make things more complicated to setup
@@johntyrrell5047 basically when you have multiple instances of the service we end up dealing with concurrency issues. There are many different ways to solve the problem. You can have an internal endpoint that is pinged by a job, a cloud function or use Hangfire to sync all the background workers. But it’s not as simple as people think 😁 because we might end up sending duplicate messages and the subscribers need to be aware of that.
@@Talento90 This should have been handled at Quartz level same as hangfire which makes sure multiple instances are not able to run the same job. or if that's not possible then it should be select and update with another column in the table to show it's already been picked and in-progress with the expiration time so in case of exception, other instances can pick after expiry.
Can we soft lock the row in table using boolean flag with an expiry using date column and deal with concurrency?
But what will happen if SaveChangesAsync in Execute method inside Job fail ? By that time you already can publish an email , but the due this failure ProcessedOnUtc will not be set, so next time you scheduled job runs it publish email again… looks like it will get too complicated if we want to handle such cases as well
Yes, this is "at least once" messaging semantics :) So you can expect duplicate processing. For an email? Can't do much, as its not a reversible operation
I think we should have an idempotency key, but the question is in which scope to create it and where to store it
@@VLazariv What if it's an external service for sending emails?
Don’t think you get the idea. Of course in case we are good with “at least once” strategy your approach is ok. So My idea about idempotency key is just to save it somewhere , (for example in memory ) after you published an event for the first time and in case you try to publish same event again you can reject publishing and you can still try to call SaveChanges and hoping that this time it will work . We can use event id as idempotency key, or generate hash based on domain event itself. Do you see any issues with this approach?
Of course it can be handled by message broker itself, so it can reject duplicate, but I’m talking in case it’s not
Great video Milan! It’s possible and would be good idea implement this pattern using Marten DB Library?
It's an option, I haven't used Marten's Outbox so I don't know the pros and cons
I'm certain you have seen that System.Text.Json is more performant (by a lot) than Newtonsoft. Have you tried implementing this using System.Text?
There is some feature parity issue that prevented me using System.Text.Json.
I'll check if things changed in recent releases.
Thanks for your content, I just buyed all of your courses. Can I ask if there is already a containerized implementation of the outbox pattern? something similar to RabbitMQ
Both courses showcase an Outbox implementation. The one in MMA is probably more robust.
Thank you for this video. This approach is awesome. Until now I just knew about an approach of raising Domain Events in a middleware on the request.Completed callback. But the error handling in this callback is kind of tricky.
What do you think about raising domain events within a middleware in a request.Completed callback within a transaction? :)
Great Work Milan!!
Do you have a small code sample for that? Not sure what callback you're talking about
@@MilanJovanovicTech Sure... this is the code executed in the Middleware: var transaction = await dbContext.Database.BeginTransactionAsync();
context.Response.OnCompleted(async () =>
{
try
{
if (context.Items.TryGetValue(DomainEventsKey, out var value) && value is Queue domainEvents)
{
while (domainEvents.TryDequeue(out var nextEvent))
{
await publisher.Publish(nextEvent);
}
}
await transaction.CommitAsync();
}
catch (EventualConsistencyException e)
{
// Handle eventual consistency exceptions
}
finally
{
await transaction.DisposeAsync();
}
});
so this happens after the user got the response and if something goes wrong a rollback will be done
How would you achieve the same without change tracker and AsNoTracking?
SQL
Great video Milan! Thanks for your hard work. Do you have any suggestion on how to implement such thing without ORM? Is there a way to implement something similar to an interceptor? Maybe a different approach?
There's always the option to create a generic decorator that can do something like this. What do you think?
@@MilanJovanovicTech not sure exactly how you mean... Can you please elaborate? You mean a decorator on the AggregateRoot?
@@amirpeeri More on the service that is saving to database.
Of course, you could take a separate route. Create an IOutboxDispatcher, and publish them manually
Cool! What about if you use two different database, for store business data and message?
Than that's an entirely different problem. You would need a distributed transaction which is a bad idea.
@@MilanJovanovicTech Why do you think it's another problem? Outbox is a microservices pattern. In a loaded distributed system, one relational database may not be able to handle the load, so you can use a distributed queue as an intermediate solution, and in the consumer, iteratively try to wait for data that appeared, if after N attempts no data, then at the level above, something went wrong in the database and the message is discarded.
Hi Milan, great video. I was wondering wouldn't it be better to implement publishing events via queue instead of background processor?
I should maybe make a video discussing domain and integration events. IMO domain events are handled in-memory, integration events go to the queue
@@MilanJovanovicTechThanks for the answer! Maybe you know how it is with the scalability of this type of solution in terms of deploying it on Azure and running the api on multiple instances. Can you recommend what would be the best way to achieve this?
Hi Milan! Thanks for the great content. One question here. When overriding SavedChangedAsync method, how would you track instances of type AggregateRoot instead of simple AggregateRoot? I would like to apply the outboxpattern to aggregate roots which make use of strongly typed IDs. Thanks and keep up with the good work!
Create an interface, you can always reference that regardless of strongly typed IDs
What can I do when my Event Properties are value objects/non-primitive? I can save it and the types are also saved correctly but when deserializing I either receive a null value or default values for the classes.
Just pass it a custom JsonSerializerOptions to use private setters/ ctors
@@MilanJovanovicTech btw, great Video! Would you generally prefer passing non-primitives as event payload or just the raw values/primitives?
Very good video , thanks :)
Glad you liked it!
Nice video, can I use some queue put my outbox object instead to save in database?
Yes, you can use a queue - but you still have an issue with atomicity, because you are introducing an external system (the queue).
Does the Quartz [DisallowConcurrentExecution] work only in-process, or will it guarantee single execution even when the app service scales out?
In-process only. Needs a more robust solution when scaled out. I'd look at Hangfire.
Thanks!
Np!
What happens when background processor try to actually try to execute and couldnt process it. And another service depends on the result. Do you then put new events saying that didn't actually go through?
I have another video about that with Polly
Well, Polly can retry but what if it fails still? I'm guessing adding all those rollback events is the only way. So there is a log of events that happen but the problem is how fast it happens when do you release the last event to depending system.
Ex: one service user buy cup cakes another service used for payments other service keep cup cake stock maintained. If payment fail now all the events must get what happened and essentially rollback events and payment shouldnt be processed even. Is this the case?
@@ravindranathwi If Retry fails you stop processing that message. Because if it fails, you either have a bug. Or there's a major service outage.
@MilanJovanovicTech Atomicity being the whole theme here, I'm wondering how you would handle the last service in the chain failing in this scenario. Say after the email step there is a second Domain Event Handler for the same domain event and it raises an IntegrationEvent to another service or something. If that were to fail, the transaction wouldn't be atomic, right? Because the email will have sent. I might be missing something simple here but just curious what your thoughts are.
Since the domain event publish would be a fanout in that case (1 domain event, multiple handlers):
- We can either have the first event handler publish another domain event (a bit strange, but we keep it 1 event - 1 handler)
- Or we introduce another concept - event handler idempotency. Now, we record if each event handler processed the event. Which allows us to retry publishing the event without duplicating side effects (in theory). I talked about it in this video: ruclips.net/video/mGeEtokcjVQ/видео.html [even in this case, you get at least once publish :)]
Happy to discuss this further.
Hey, why did you not use unitofwork in background job?
Background job is infrastructure, so there's no need
Hi Milan, a background job does not seem a good idea, as one year passed since you published this video, do you have a better solution?
Why doesn't it sound like a good idea?
Why you did not use hangfire instead of quartz,since you already have a video or simple background service?
I have a new video that uses Hangfire
Excellent video as always!
It's possible that when an http request comes in, we want the domain events to complete before returning a response to the client. If we limit the batch size and how often the job runs, the client might be sitting for awhile before their request is complete. Any ideas on how to tackle this or is this a non-issue? I'm thinking about hundreds of users using the app.
The clients request won't wait for the events to complete at all - that's kind of the whole point of using an outbox
Hi, thanks for a video 👍. Is it possible invoke domains even on save change after save change in DB, just wondering about a way without background jobs
Yes - but you are then coupling the execution of your domain events in the same HTTP request as your business transaction.
What if you have long-running event handlers?
Not to mention the problems if any of the event handlers fails and throws an exception.
@@MilanJovanovicTech yes, make sense, why i asked, because we used the approach when we do explicit rise event to rabbit in every handler, wondering how we can reduce this. Thank for answer.
@@vallJune I think you case is different. Sicne RabbitMQ consumer will trigger asynchronously.
@@MilanJovanovicTech can we await it in interceptor after save change, for instance?
@@vallJune You can await as handling the event is asynchronous and publishing the event itself should be fast. Your message broker will probably send an acknowledgment back as soon as it gets the message and your command continues to do whatever it needs to. But with that approach, you create temporal coupling and don't have transactional behavior. If that is something acceptable in your context, publishing an event in every handler is the simplest option.
Awesome vid ! but what if the program fails in the ProcessOutboxMessagesJob background job before _dbcontext.saveChangesAsync() but after we publish the event with mediatr ? (code in minute 15:54, line 42)
Take a look at the Polly video
Hi, Milan. Thanks a lot for sharing your knowledge with us! I have a question for you: what if between calling GetDomainEvents and ClearDomainEvnets in this time somehow one new domain event will be added, how you can be sure that this DomainEvent will ever be processed? Wouldn't be better just remove the DomainEvent from the collection not clear the whole collection instead?
EF Core should be configured as per-request lifetime scope, that is “scoped” (more on that on learn.microsoft.com/en-us/ef/core/dbcontext-configuration/). Also, aggregate root is the one enforcing rules, so for given use case, we expect 0 or more events within a given request, depending on how you choose to utilize domain events for communication in the first place.
There should not be possible to get any new events, as persisting those events will be the last thing to do, together with other tracking changes.
@@stjepankarin thank you for explanation!
This can't happen in practice, Kirill. The code for creating the outbox messages is executed after we call SaveChangesAsync in the command handler. After that point we won't have anything raising domain events, so we will be fine.
@@MilanJovanovicTech thanks a lot for explanation!!!
Hi, Thanks for the great video.
In my personal project, I followed same step however I got "Type is an interface or abstract class and cannot be instantiated" error while deserialization the IDomianEvent.
A little help will be much appreciated.
Seems like a JSON problem? What are you using?
@@MilanJovanovicTech solved, I used Assembly Full Name, instead of Name while saving into the outbox messages and it works.
However, what if domain event needs to store entity Id (identity) while creating new entity. If the domain event was raised with in the entity, the Id will be remain 0. So I raised the event after calling the EF save changes function and then raise the LogActivity domain event.
So, is this good approach?
How MemberRegisteredDomainEventHandler is being called automatically? Am I missing anything? Plz help me.
MediatR
@@MilanJovanovicTech but, that class (MemberRegistetedDomainEventHandler) is *NOT* even inherited from any of MediatR class. Then how come it will be called automatically?
hi milan . how do you Stor Entities in ef database in DDD approch ? for example n-to-n relationShip between entities .please create a content about mapping DDD approch with Ef Core
Yeah, I'm going to record a video about that. In the meantime, Amichai did a great video with that exact topic, you should check it out
Cant we have a backgrownd job without any interval which is triggered by the insertion of event????
Even if the backgrownd job block fir a concurrent queue, or blocking collection? As a singleton service inside of backgrownd job.
Infinitly running and blocking or bad idea
I think it's possible, but I didn't explore what you have to change in the configure.
I think calling RepeatFovere() without WithInterval() should get the job done.
You cannot really have a job triggered by an external event such as the insertion of a record in the database, as in this example. A way to do this is another technology called Change Data Capture (CDC). Take a look at what Debezium does.
Milan, Yours videos are amazing! Can you share this source code?
Send an email
Hello Milan, can you please confirm how we can get generic aggregate root from in context change tracker?
Create a marker interface
@@MilanJovanovicTech Got it, thank you for the response .
@@MilanJovanovicTech I am trying but need help with that interface, would it need to include both ClearDomainEvents() and RaiseDomainEvents() methods declaration in the interface? If yes, then would I need to keep RaiseDomainEvents() method public in AggregateRoot implementation class?
What happens if the SaveChangesAsync method in the ProcessOutboxMessageJob class encounters with an error and changes are not saved? Then another email will be sent in the next job processing. Is there a solution for such a scenario?
Idempotent consumers
Is That why you introduced guid identifier for the event?
How to deploy it on Azure Cloud and under which resource we should deploy it?
Azure App Service? But you'd need to prevent it from going idle.
Instead of saving in database why not push domain events to RabbitMQ?
That's another possibility, but it's an extra infrastructure component
Why you are not using CAP library instead of this?
I'm not familiar with it, and I prefer rolling my own implementation for something this important to my system
Hi Milan, Thanks for this.
Got a question:
Should we clear out all the domain events within the 'Save change Interceptor' or only the one that we select? I see we select only the outBoxMessage events and we delete the rest.
happy to hear your thoughts - thanks
The idea behind clearing the domain events is a "defensive" one, just so that we don't end up creating the same domain event multiple times.
A very good explanation of the pattern. Actually, I'm addicted to this pattern, using in every part of the app. 🙂
I came across one problem implementing that, there is an api which is responsible for creating user, and at that time need to send email to the user with the password on it. so if I am saving that email event on database, it has password on it. how to handle such situations with the pattern. thanks in advance.
How about sending a one-time login link instead of the password?
should we see the refactoring tips, as this insrastructural code is becoming a mess..? Not only in case of this pattern, but in general. e
.g. Startup gets bulky as well
There are ways to go about this. Like extracting the registration into a service or extension method
@@MilanJovanovicTech I was surprised with video about DB configuration, as there were a ways to simplify set up if anything, not just DB. Maybe you know some other tricks)
@@11r3start11 I'm preparing a video on the topic
I am little bit confused.
You did not add OutboxMessage DbSet in the context then how and when that table will be created in Database?
I added via Entity config class
@@MilanJovanovicTech Thanks for your reply
Won't calling cleardomainevents after getdomainevents clear both as list is a reference type? Just curious to know.
Entity returns a new list
@MilanJovanovicTech Was confused as tolist returns shallow copy.
What about Change Data Capture?
It's a different thing entirely
@@MilanJovanovicTech yes, but I meant CDC as a way to get events from db
So how does the outbox pattern fit with something like RabbitMq, do you publish to a queue and once that message is acknowledged you mark it as process but what if the other service failed to process it, would you need an inbox pattern too. My mind is just rushing with ideas, it would be great to hear your take. . .
I would treat them as separate things, since they solve different problems. The Outbox is great for giving you an atomic transaction when you need to trigger behavior related to external services. You could definitely publish to a queue when processing the Outbox, and this would be considered an *integration event*.
So we have:
- Domain Events -> Outbox
- Integration Events -> Queue
Of course, on the consumer side it's up to you to ensure idempotency and "at least once", "at most once" or "exactly once" semantics.
Definitely a topic for a future video!
@@MilanJovanovicTech Thank you, makes sense. Looking forward to future videos.
Thanks you for wonderful and very detailed video. I have bit of struggle in interceptor class. My problem is that this part of code:
var outboxMessages = dbContext.ChangeTracker
.Entries()
.Select(x => x.Entity)
.SelectMany(aggregateRoot =>
in your example has . In my example AggregateRoot is generic and TKey is whatever the primary key type is. Can you help me out on this matter please?
Thank you, and keep doing what you do because is AWESOME!
Create an interface with the methods you need, it'll work :)
@@MilanJovanovicTech Did I understand you correctly? Namely, to create an Interface instead of a TKey? In my case, Tkey or Id is of record type like: public record UserId(Guid UserId). In this case this record should be: public record UserId(Guid UserId) : ISomeInterface. AND than inside interceptor i can have
var outboxMessages = dataContext.ChangeTracker
.Entries()
@@33bgatti33 This is how I solved it.
public interface IAggregateRoot
{
IReadOnlyCollection GetDomainEvents();
void ClearDomainEvents();
}
public abstract class AggregateRoot : Entity, IAggregateRoot
where TId : notnull
{
... removed for brevity
}
Then you can search for the entities which implements the interface and use the methods that it implements
var outboxMessages = _context.ChangeTracker
.Entries()
.Select(x => x.Entity)
4:04 - You say that only aggregates can raise domain events. Why is that? Can't entities do that too? Or maybe my understanding of DDD is poor and I always assume an aggregate to be something that contains other entities. Maybe what you say implies that it's fine to have an aggregate consisting of one entity, it's still about enforcing invariants.
Yes, one entity is also fine and it can raise a domain event.
What I'm trying to avoid here is having an aggregate raise a domain event, and a nested entity also raising a domain event. This is hard to manage and easily grows in complexity.
@@MilanJovanovicTech Thanks for the reply. One more question - when do you actually delete these Outbox messages ?
@@iliyan-kulishev Either you don't, and leave them for archival reasons. Or you can delete it as it's processed.
Why not just use the .net CAP?
Go ahead
I would use a MessageBus: RabbitMQ, Kafka, Azure Service Bus, AWS SQS, etc for such a thing. The wheel is already implemented so why doing it manually? And it solves any of the possible issues when scaling
But it's an external component to the system
outbox provides at least one msg delivery guarantee
Can we get the github repo?
At the moment, I'm sharing the code with my Patreons. You can find something similar on my GItHub though