Brief Outline
00:00:22 Amazon
00:00:57 Problem Requirements
00:02:02 Capacity Estimates
00:03:37 High Level Overview
00:04:23 Product DB
00:06:24 Building a Cart
00:08:10 Building a Cart Continued
00:09:36 Avoiding Contention
00:12:16 Observe Remove Set
00:15:22 Orders Service
00:16:58 Avoiding Contention on Orders
00:19:42 Order Processing With Streams
00:22:43 Optimizing Reads - Popular Items
00:25:56 Optimizing Reads - Search Indices
00:28:22 Search Index Local Indexing
00:30:48 Populating Caches and Indices
00:32:40 Final Diagram - Amazon
Thanks, Jordan~
Thanks for continuous delivery of great content!
Thanks for the comment! I only want to do it insofar as it is useful to people :)
Hi Jordan. Thanks for clarifying my previous question. I have another question here. For the order service, I see that we avoid querying the products DB to know how many items of a product are available and rely only on the data present in Flink. I have a couple of questions about this approach.
1. The products DB is huge, and hence we have partitioned it. Can Flink hold all of that data?
2. Let's say we partition Flink as well to accommodate the huge amount of data. What happens if Flink goes down? Will the other Flink instances continue serving requests? If so, when the first Flink node comes back up it relies on its local state, while there might have been changes to the order count in the DB. How do we ensure the data in Flink stays consistent with the DB?
3. If we have a solution for 1 and 2, do we even need a DB? Isn't Flink taking care of everything?
I am sorry if I was not able to articulate my question clearly.
1) Flink is also partitioned; we have one Flink consumer per Kafka partition.
2) Either another instance of Flink, or one of the other partitions. See Flink checkpointing. All of the DB changes are still in Kafka, so Flink can replay them. (I have a video dedicated to Flink, multiple in fact.) There's a toy sketch of this per-partition state after this reply.
3) Our DB is still our source of truth for product data, we just want inventory count in flink.
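Not from the video - just a toy Python stand-in for points 1) and 2): one stateful consumer per Kafka partition holding inventory counts only for the products routed to it. The topic name, message schema, and broker address are assumptions for illustration.

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "inventory-changes",                    # assumed topic name
    bootstrap_servers="localhost:9092",
    group_id="inventory-counters",
    value_deserializer=lambda b: json.loads(b.decode("utf-8")),
)

stock = {}  # product_id -> count, only for this partition's products

for msg in consumer:
    event = msg.value
    stock[event["product_id"]] = stock.get(event["product_id"], 0) + event["delta"]
    # Flink would checkpoint this state and replay Kafka from the last committed
    # offset after a failure; a plain consumer like this one loses its in-memory
    # state, which is why the reply above points to Flink checkpointing.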
Hi Jordan - in the order service, for the second Flink, which is sharded by product ID - how does it know that it has received all the products of the cart before sending the email to the user?
I hadn't really made that my intention, and in the diagram we're willing to send multiple emails. If we wanted to do this, we'd probably have to split the products like we do here, include the original order ID and its number of products in each message, and then send to one final Kafka queue + consumer to aggregate on order ID.
If you split orders, you now have to handle new problems, like one of the products being unavailable, so you have to undo all the other products' amount decreases. Of course you would have to undo the order anyway if, for example, payment fails, so it might not be a big issue.
Another problem: if you decreased the amount of a product for order A and then found out another product is missing, so you undo it - meanwhile, other orders that could have passed were failed.
Isn't it simpler to just keep the orders as they are, not split them, and use atomic operations to increment/decrement?
For example, in MongoDB you have atomic operations: you can decrement the amount you need atomically while providing a filter with amount > 0 for each item. This way, if your update operation fails, you know at least one of the items in the order is out of stock, and therefore the order cannot be processed.
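A minimal sketch of the conditional decrement described above, assuming a pymongo client and a "products" collection with an "amount" field (collection and field names are illustrative, not from the video):

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
products = client.shop.products   # assumed database/collection names

def try_decrement(product_id, qty):
    # Single-document conditional update: matches only if enough stock remains,
    # and the filter + $inc apply atomically, so a miss means the item is out of stock.
    result = products.update_one(
        {"_id": product_id, "amount": {"$gte": qty}},
        {"$inc": {"amount": -qty}},
    )
    return result.modified_count == 1

For a multi-item order you would wrap the per-item decrements in a multi-document transaction (client.start_session()) and abort on the first miss, which is exactly the contention trade-off discussed in the reply below.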
It's absolutely simpler - it just becomes a question of whether it scales! If you use atomics for all order operations you risk a lot of contention due to grabbing locks there. It's probably fine IRL, but for the sake of taking everything to the max for these videos, I've chosen to discuss the tradeoffs of having to revert pieces of orders after the fact. Presumably, you wouldn't cancel the entire order, but just the part that was out of stock.
Thanks Jordan for your work! I hope Megan keeps posting you messages :) and meanwhile I have a question about CRDT:
What if client one adds 12 of product A, client two adds 3 of product A, and client three deletes, for example, 7? It looks like CRDTs are not going to manage such situations?
Thanks! I actually ghosted Megan fox for Corinna kopf.
Anyways, it depends whether the person deleting sees all of the tags for product A at the time of deletion. In your case, it sounds like they probably wouldn't, so product A would still be in the cart.
That being said, assuming each client writes to the same leader, client 1 can't just add A to the set 12 times, it should already be present.
7:15 I don't think the proposed scenario of overwriting "eggs, ham" with "milk, carrot" would happen given how the shopping cart works. There are only 2 operations when it comes to shopping carts - add or remove. So each request is incremental. I don't think there's a way to replace an entire cart by accident.
E.g. when the user is at the "milk" item page, he can click on "add to cart". That adds that single item to the cart. Even if that user currently doesn't have the eggs and ham loaded in his browser UI (someone else added them there unbeknownst to him while he's looking at the milk page), this operation wouldn't cause a replacement, only an addition.
Also 12:00 - instead of using leaderless replication + crdt, why not just lock the row (as suggested in previous slide)? Given how rare it is to have more than one person editing the same item in the same cart at the exact same millisecond, I think scaling for it wouldn't be a good idea.
In fact, I don't even think locking is needed. If say I have a "milk" item in the cart, and I try to increment its count to 2 while personB tries to remove it at the same millisecond, it doesn't really matter which one is the final outcome, they are equally valid.
1) That first example was meant to be an extremely naive approach to the problem (e.g. representing a cart as a full object in the DB and updating that full object at once, based on what the user sees as the full cart value, rather than incrementally adding to or removing from it, which, as you alluded to, is what we should be doing).
2) That being said, even naive set additions need to be synchronized via some sort of lock, or else we're prone to read-modify-update race conditions (toy illustration after this reply).
3) Agreed locking is better. In reality, I believe Amazon kinda does use a CRDT here because they use something like cassandra for their cart service to be highly available and want to be able to rectify writes from different leaders.
I do think row level locking is needed (think about concurrent updates to a set on the same node). The whole "count of an item" thing is only useful for CRDTs.
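A toy illustration of the read-modify-write race from 2): the cart is stored as one whole set, two writers read the same snapshot, and the second write-back silently drops the first addition. The in-memory dict here just stands in for the cart table.

cart_db = {"cart:42": {"eggs", "ham"}}

# Request 1 and request 2 both read the cart before either writes it back.
snapshot1 = set(cart_db["cart:42"]); snapshot1.add("milk")      # read + modify
snapshot2 = set(cart_db["cart:42"]); snapshot2.add("carrots")   # read + modify
cart_db["cart:42"] = snapshot1                                  # write back
cart_db["cart:42"] = snapshot2                                  # write back, clobbers "milk"
print(cart_db["cart:42"])   # {'eggs', 'ham', 'carrots'} - the "milk" add was lost

A row-level lock (or a compare-and-set on a version column) around the read-modify-write closes this window, per 3) above.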
The usage of tags for solving the race condition is not quite clear. I believe what you wanted is to implement optimistic locking - while reading, we read the record with a version number, and when updating the record we check if the version or tag number is still the same. If yes, the update goes through; if not, it doesn't and a retry is needed.
Nope, I'm trying to implement convergence via multiple leaders. Using the tag ensures that deletes on one leader do not delete additions that have yet to make it to that leader.
See: en.wikipedia.org/wiki/Conflict-free_replicated_data_type#OR-Set_(Observed-Remove_Set)
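For reference, a minimal Python sketch of the OR-Set idea from that link (names illustrative): adds carry unique tags, removes tombstone only the tags the remover has observed, and the merge is a plain union, so concurrent unseen adds survive.

import uuid

class ORSet:
    def __init__(self):
        self.adds = set()      # {(item, tag)}
        self.removed = set()   # {tag}

    def add(self, item):
        self.adds.add((item, uuid.uuid4().hex))   # fresh unique tag per add

    def remove(self, item):
        # tombstone only the tags this replica has observed for the item
        self.removed |= {tag for (i, tag) in self.adds if i == item}

    def merge(self, other):
        # commutative and idempotent, so replicas converge in any merge order
        self.adds |= other.adds
        self.removed |= other.removed

    def value(self):
        return {i for (i, tag) in self.adds if tag not in self.removed}

# A delete on one leader can't wipe out an add it never saw:
a, b = ORSet(), ORSet()
a.add("eggs"); b.merge(a)
b.remove("eggs")               # b removes the tag it observed
a.add("eggs")                  # concurrent re-add on a, with a new tag
a.merge(b); b.merge(a)
print(a.value(), b.value())    # both converge to {'eggs'}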
Hi Jordan!
Thank you as always for the great content and the nice job done on these videos.
1. Could you please explain why it is optimal to shard the products DB by productID?
Let's say a user opens Amazon and goes to some category like "Gadgets". We will have to fetch some products to show the user. We can go to the preliminary Elasticsearch since it is sharded by product category, yet we will still have to eventually fetch the products from the DB, and all the IDs of these products can be spread across shards. Doesn't it make more sense to shard this DB on product category as well?
2. Either I have poor eyesight or I don't see where orders are being saved after all the pipelines they go through. Do we assume a DB before the first Kafka for full orders?
1) Realistically, there are too many search index terms (which can overlap in all types of ways), and the search index itself likely contains ids anyways. Additionally, you can add some additional metadata to the search index per listing just for the thumbnail page, so that you only have to fetch the full db row when you actually click a listing.
2) You can just sink them to any arbitrary "orders db".
I’m nodding like I understand, but I’m not so sure I do…
I'd recommend asking a specific question in that case
Can we implement a product clustering approach using Natural Language Processing (NLP) techniques to group similar types of products together on the same node, thereby optimizing search query performance?
Yep!
Hi Jordan, thank you for another great video. Can you help me understand the slide "Order processing with streams" at 22:33? Each Flink server is responsible for a subset of products, so each order will be processed by multiple Flink instances. How do we gather the information that all parts (products) of a particular order were processed? As I understand it, the moment all products of the order have been processed, we need to perform some action, for example update the order status. Thank you
So in this part of the problem I am assuming that you can fill partial orders. Otherwise, I'd think you'd want to use a second set of aggregation queues sharded on order id as opposed to product ids, have one initial message with the number of items in the order, and then wait for messages coming in from each queue before sending out an email to the user.
I use a similar pattern to this for uploading in the youtube systems design video with each chunk file if you want to check that out.
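A rough sketch of that aggregation step, assuming the first message for an order carries its total item count and each per-product consumer later emits one "item processed" message; the field names and the email call are hypothetical.

from collections import defaultdict

expected = {}                  # order_id -> number of items in the order
processed = defaultdict(int)   # order_id -> per-product results seen so far

def send_confirmation_email(order_id):
    print(f"order {order_id} fully processed")   # stand-in for the real notification

def on_message(msg):
    order_id = msg["order_id"]
    if msg["type"] == "order_header":
        expected[order_id] = msg["num_items"]
    else:                      # one message per product from the upstream consumers
        processed[order_id] += 1
    if order_id in expected and processed[order_id] == expected[order_id]:
        send_confirmation_email(order_id)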
@@jordanhasnolife5163 Thank you Jordan, I didn't know Amazon treats every product as a separate order at the end.
@@andreybraslavskiy522 They may not, it's just a video. That being said, you could use the approach I described if you want to do an aggregation.
Hi Jordan
Thanks for the video.
Generally when we click on buy now, we go for payments.
You mentioned that we will take the order, put it in Kafka, split the order in Flink, put the order items in different Kafka queues again, and finally the Flink node will check inventory and respond via EMAIL. --> This is an async process, isn't it? This doesn't happen in the general case, correct?
The screen itself may show that your order is confirmed, but sometimes Amazon will go back on your order. Perhaps instead you'll receive emails if your order couldn't be fulfilled, rather than if it could. Either way, you could do an additional aggregation step with more Kafka and flink queues.
Thanks for the great video as always. I am a bit lost on the cart part: why can't a CRDT already solve the add/delete for the same item (we just need a counter on it?) compared with the tag approach? Also, on the observed-remove set page, I'm not sure why the second person reads from DB1 to know a232 exists but writes/reads later on DB2 (we said the same user should read and write from the same DB - is there anything I am missing?). Thanks in advance!
1) Using tagging preserves causality, in the sense that I can be sure the person deleting tag a232 is actually seeing the state from the person that added a232 as opposed to just hitting the delete button ten separate times.
2) The whole point of CRDTs is that I can read/write from/to any of the leaders. Depending on implementation details, I suppose a nice optimization would be to just have each user go to the same instance, but for CRDTs in general this may not be the case.
Amazing content as always. I really appreciated your videos! QQ about the ordering processing design:
Why do we need an order splitter that dumps orders into Kafka queues sharded by productId? Can't we just shard the initial order queue by productId? If the initial queue is already sharded by productID, won't the Flink instance that processes the order already be set up to process on a product basis?
Yep! I think my reasoning is to ensure atomicity of orders into our system without needing a two phase commit. Otherwise we have to ensure that all writes to Kafka go through, as there could be many (one per relevant partition). With flink/Kafka once a message is in the system it will be processed.
Please clarify my doubts
So, regarding your Kafka solution to avoid contention related to adding/removing products concurrently to a cart: in order to ensure correctness, Kafka has to be partitioned based on a hash of cart_id and also ensure exactly-once processing. If we allow at-least-once processing, then the cart service has to be idempotent in nature. Suppose we design that idempotent cart service; then we have to keep a transaction_id, and the schema will look something like this:
CartTable(cart_id, transaction_id, product_id, user_updation_id, timestamp)
This table structure doesn't quickly tell us whether a product is in the cart or not, and if it is, what quantity of that product is in it.
We have to write a somewhat complex query like this to get the quantity and whether it is in the cart or not:
SELECT cart_id, product_id, SUM(CASE WHEN isRemoved = false THEN 1 ELSE 0 END) AS quantity
FROM Cart_table
GROUP BY cart_id, product_id;
Maybe I can use 2 indexes: one on (cart_id, transaction_id) for idempotency and another on (cart_id, product_id) for the query part.
OR
Maybe I can construct another table Cart(cart_id, product_id, quantity) from this CartTable(cart_id, transaction_id, product_id, user_updation_id, timestamp) table by using stream processing and the tumbling window approach to speed up the query part.
Of course you propose a better design, but I want to know: if you went with this approach, how would you solve it?
Realistically, I think you're overthinking this one a bit.
For a given user, we'll have like max 20 items in the cart.
Just index on cart ID and product ID, and run the exact query that you mentioned, you can use this for idempotency too. It'll be fast enough due to the index
Hi @jordan, do you have a video on how to configure a Spark cluster? Things like partition size, number of cores/partitions (the same thing, I believe), number of executors, number of cores per executor, executor memory, etc.
I do not - I don't think this is overly relevant for the systems design interview though
For the order handling part, if I have multiple items in a single order, how do the Flink nodes coordinate with each other to figure out whether the whole order can be fulfilled? What happens if one of the items is out of stock? Are we sending out one email per item in the order?
Unfortunately, we give up atomicity here. The only way to do this would actually be to have a second queue that takes all of the events, once again grouped by order id, and combining them (almost like my design for youtube videos). In my current design though, I'm okay with partial orders.
Why did we not use the saga pattern, given that we did talk about two-phase commit? Any particular disadvantage you see?
I'm fairly concerned with the rollback idea in general, but you could try it out. I prefer change data capture to either two phase commit or compensating transactions where possible.
Hi Jordan
How do we handle quantities while implementing a CRDT for the cart service?
Deleting or removing 1 unit would be the same operation, but that doesn't mean the product is removed from the cart - that product could have multiple quantities in it...
When I remove my product from the cart with the CRDT approach, I will create a remove operation for all of the tags on a given product that I currently see. If there happen to be other tags that I don't currently see, then those additions of the product will not be removed.
@@jordanhasnolife5163 Hey Jordan, do you mean multiple quantities of the same item can have the same tag when you add to the CRDT? When you want to remove, do you remove all instances of the add operation with that tag? In that case, what would our cart DB look like? Would it have different IDs for each add operation but the same tag?
@@martinwindsor4424 Each item that we add (every individual 1 lot quantity) can have a tag, or you can make it so that as an optimization when you add say 5 at a time the CRDT says ("eggs", 5, "kshfskjfdh"). Either will work.
When someone removes, they read from their database all mentions of that product from their cart, and remove them all, including the associated tag.
Hey Jordan, won't ensuring Quorum reads and writes help in our cart service case?
Is it possible to get quorum reads with R+W>N scenario from Riak? Any downsides in this?
2nd question: isn't the inventory data stream Flink a SPOF here? Also, since it's all in-memory, if it goes down, how do we plan to get all the inventory data back into the memory of the new Flink consumer? (I am thinking it'll have to reconstruct the in-memory data structure by making calls to the inventory DB and orders DB, but that'd be very costly. Another option would be to have a background thread snapshot the data to disk, but that'd miss out on some data - what if the consumer processes some order and crashes before the snapshot?)
3rd question:
I see Flink writing something to the products DB - what data is being written, since this Flink node is just capturing inventory info?
1) I'm not sure what you mean "help". I assume you mean not having any conflicts. That would help, but now you need a quorum, which means your latency is limited by the slowest responder. You can also still have version conflicts due to things like a sloppy quorum.
2) You can always have multiple flink instances listening to the same kafka partition. Anyways, you should read up on flink a bit - it periodically checkpoints state to S3 so that if it goes down we can easily restore its state from a certain offset in kafka and then resume from there.
3) A boolean of whether the product is out of stock or not.
How does the order service ensure that the cart has eventually converged before submitting it to the Kafka queue?
I guess the answer is that we don't. Whatever is on your local replica is what's getting sent
Hi Jordan. I have one question regarding the cart using the multi-leader setup. I do agree this helps solve contention, and eventually we see the correct items in the cart. However, isn't the cart supposed to be strongly consistent? Because suppose user A adds a couple of items on node1 and user B adds some other item on node2. What happens when user A clicks checkout before the 2 nodes have exchanged data and sees the final state of the cart?
I was mostly just using this as an example to stretch the boundaries here. It doesn't *have* to be strongly consistent, but I generally agree that it should. With that being said, I think using a single leader to perform writes is totally fine.
Hi @jordan. Awesome video. One question on using leaderless replication for the cart service: will the CRDT add read latency to the cart service, since it is doing some aggregation here? What is the CRDT here anyway - I mean, is it just a method of aggregation or a predefined solution for such problems? How much added latency would this be? I understand your video assumes we are pushing the limits on the number of concurrent events on the cart, but I think this might not be what an interviewer would agree with, and it could add noise during the interview. What do you think?
Nope! Reading from a CRDT is just from one node. The anti entropy process between the multiple leaders is when the merging gets done.
I've been going through designing data intensive applications and using it to fill in the holes in the notes I took from your videos. It follows the same content in the same order 😅
Oh yeah I shamelessly ripped it off for the first 20 concepts videos no doubt about it
Thank you, very nice and clear explanation, I am just wondering which data store are we going to use to store the orders?
Seems pretty reasonable to me to just use SQL
Hi Jordan, while implementing, can we not just use an epoch timestamp for events and then decide the eventual cart picture? Thanks for all your videos, they are super cool 😊
Yep! Keep in mind that epoch timestamps across distributed nodes are not perfect though due to clock drift.
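A tiny sketch of what that timestamp-based (last-write-wins) merge would look like, with each cart entry carrying a (present, epoch_ts) pair; the clock-drift caveat is that a write stamped by a fast clock can incorrectly beat a genuinely later write. Names and values here are illustrative.

def lww_merge(a, b):
    # a, b: dicts of item -> (present, epoch_ts) from two replicas
    merged = dict(a)
    for item, (present, ts) in b.items():
        if item not in merged or ts > merged[item][1]:
            merged[item] = (present, ts)   # keep whichever write has the later timestamp
    return merged

node1 = {"milk": (True, 1710000000.2)}    # add
node2 = {"milk": (False, 1710000000.5)}   # remove, stamped slightly later
print(lww_merge(node1, node2))            # the remove wins - even if node2's clock was just fast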
Thanks for the video. Could you do a video on Leetcode? With a focus on the online contest and its leaderboard features.
Interesting. To me this seems like a combo of the job scheduler (for running the code in the cloud) as well as the leaderboard problem, which I'm doing next. Do you think that there are unique aspects to the problem outside of those?
@@jordanhasnolife5163 Thank you for doing the leaderboard! Other unique aspects could be plagiarism detection and handling time limits fairly. That time limit issue might get complex with different languages and hardware, but IDK, the ICPC has the same execution time limits no matter which language you use to solve their problems.
The reason I'm asking for a video on this problem, is because Meta is currently asking it:)
For the cart service, since it’s not really a critical service, can we merge all the items in all the leaders and return that to the user when there’s a conflict and allow the user to resolve the conflict. I think Dynamo does this
cart service is not critical service how ?
@@tanaygupta632 It's not so critical in the sense that having a conflict there won't be the end of the world
Yes you could store siblings and have the user merge them if we couldn't do the merging logic in the database. Keep in mind that this is still eventually consistent though, there will be a period where each leader is only aware of its own write and not the impending merge conflict.
For the popular items read part, can we rely on LFU caching strategies keyed on productId everywhere reads are happening, like product feature display, similar items related to a product, etc.?
I don't see why not - any particular reason to opt for LFU over LRU?
@@jordanhasnolife5163 My thought process was that, let's say based on some capacity estimations, we can store info in the cache for only 10k popular items, so we limit the cache to ~10k keys; any key which is more frequent (i.e. more popular) will stay, and less frequent ones will be dropped.
I get that with this we will not have the popularity score, which is used later for the search index.
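A rough sketch of that bounded LFU idea (capacity, class, and field names are illustrative): keep a read counter per product and evict the least-read key when the cache is full.

from collections import Counter

class LFUProductCache:
    def __init__(self, capacity=10_000):
        self.capacity = capacity
        self.data = {}          # product_id -> cached product info
        self.reads = Counter()  # product_id -> read count

    def get(self, product_id):
        if product_id in self.data:
            self.reads[product_id] += 1
            return self.data[product_id]
        return None             # cache miss: caller falls back to the DB

    def put(self, product_id, value):
        if product_id not in self.data and len(self.data) >= self.capacity:
            coldest = min(self.data, key=lambda k: self.reads[k])  # least frequently read
            del self.data[coldest]
            del self.reads[coldest]
        self.data[product_id] = value
        self.reads.setdefault(product_id, 0)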
Hi Jordan,
Thanks for the video! do you mind sharing your notes please? if that's feasible?
Hey! Will do this eventually, though probably in batch in a couple of months once this current series comes to a conclusion
Hold your horses Bezos, this will 10x $AMZN! jk. I liked your approach of using Kafka to ensure atomicity and scale the order service. I have a similar idea, though more heuristic and complex. The idea builds on top of a write-back cache: we can keep the inventory of very popular items that are running low on stock (tbd) in Redis, and when an order comes in, the count is first decremented from there and then (eventually) propagated to the database. My only worry here (apart from the complexity of this solution, e.g. needing to know popular items in advance and deciding when to put them into the cache) is: what if Redis goes down? I guess we could have a passive replica replicating everything, or a WAL. Argh, anyway, I am sure there are many flaws in my design; it's really hard to wrap my head around all the corner cases in transactions.
At the end of the day, unless you have strong consistency, nothing can ever be perfect :)
In reality, my solution is pretty convoluted, and I think that yours probably is more practical. You can figure out from experience which items are popular, or just look at the ones that had the most orders the previous day or something and cache those.
Where the F did Elasticsearch come from? I didn't hear it mentioned until the last slide. Also, what happened to the Spark consumer for counting orders/clicks in the last slide?
I talk about inverted elastic search pretty frequently, so I feel pretty justified in saying that at this point not spending 10 minutes a video talking about it is a good thing. That being said, I've brought it up in a variety of these other videos!
It's very clear how deeply you've read the DDIA book and its references - you can see all of its concepts being applied in great detail in your videos. How long did it take you to go through all of that in a way you could digest the material?
Great delivery style :) . Rooting for your content to emerge as the de-facto source for system design knowledge!
Hey! Thank you, that's what I'm hoping to go for!
I'd say it took me two full passthroughs of DDIA + taking notes + googling what I didn't understand further to really feel like I had an okay grasp of it. Even then, trying to apply those principles in my work and in any designs that I make for this channel have made me feel like I've gotten to wrap my head around some of these principles even more!
Hi Jordan, thank you for your videos
I want to ask one thing: you don't need the Kafka queue to ensure atomicity, as afterwards you are checking each product separately, so atomicity is not ensured, right?
I care about the atomicity of writes. Not sure what you mean about the part of checking each product separately. I just want to make sure that when I place an order, the stocks of all of the items are decreased atomically.
What does Flink actually do here? Why aren't the downstream things just connected to the Kafka topics?
Flink is good for performing aggregation, and we don't have to worry about fault tolerance very much given the way that it checkpoints state. We can use a normal server and have it listen to kafka, but if it goes down the state that we've kept in memory gets lost forever.
Thanks for not having a life and answering so quickly. I also watched some other videos and it's interesting because Flink seems different to most of these infrastructure projects since it's concerned more with processing the data than storing it. That's what was confusing me.
Hey Jordan, could you also do a video on Airbnb, please. Your videos are amazing.
How do you find that Airbnb differs from Yelp in the core design
I haven't yet seen the Yelp's video. If it is more or less the same then a video on Airbnb won't be needed. Thank you!
Hi Jordan, what is your recommendation for synchronizing the product queues with the inventory changes queue? What I mean is: how should we deal with lag on the inventory changes queue when we don't want orders to get rejected because of that lag? How long do you think our waiting period should be if we add a hold, and is it okay for the customer to get emails x minutes later? What is your opinion on this?
Not entirely sure what you mean here, feel free to clarify. The second a customer order comes into flink, we check whether we have inventory for it. If we don't we'll reject them right away. In reality, kafka should not have a significant lag period, but if for whatever reason it went down that's why we have replicas! The bigger thing is to just make sure to update the product db when we see that we don't have stock left so that people can stop placing orders.
Hey Jordan, I was wondering what your process was for coming up with these videos / where do you get your knowledge. Are you reading engineering blogs / textbooks / papers / farting around on wikipedia? It seems like you've built up an interesting set of knowledge I haven't seen in one place elsewhere.
Hey!
Yeah, just read about concepts as I come across them! I started out by reading DDIA, felt that it was great but didn't get a sense of which actual products used which pieces of ddia, started going from there, and then as I continue to make videos try and fix any holes in my knowledge. When you come across a term you don't understand, which will happen often, spend 5 mins reading about it!
For what it's worth, I admittedly only have a pretty high level understanding of all of this stuff. I don't have any advanced degrees, and if you deeply questioned me on many of these topics, I wouldn't be able to answer you. That being said, I think that I know enough for the purposes of having a youtube channel that is focused on being a generalist software engineer.
I spent the day mainlining your earlier videos and rereading chapters of DDIA, which I had forgotten. It made the newer videos make more sense. I almost wonder since the first 18 videos come before you do any Database comparisons, if they could make a sort of fundamentals playlist.
@@i_want_youtube_anonymity7099 (they do)
What's the difference between the Spark and the Flink processing? are they going to be 2 different consumers (flink and spark) or is the spark replacing the flink in the inventory quantity processing?
Hey yeah, in the final diagram I'm fully using Flink; however, generally speaking Flink and Spark Streaming are basically the same, except Spark Streaming processes things in mini-batches.
@@jordanhasnolife5163 Thank you for clarifying, I just didn't know where the Spark went. Do you see any advantages in terms of infra or maintenance between Spark and Flink?
@@managerbmr none specifically that I've heard of in my own research!
Can you do a video on figma?
Eventually, sure - I have to do some more research into how it actually works I'd say
+1
Great job as always
Is there a payment in this process? how is that handled in this design?
I tend not to touch on payment systems too much, as I suppose that's a video of its own, but I'd imagine you'd just use something like Stripe and reach out to a bunch of third-party APIs to handle this for you.
Awesome!
Thanks for these videos!
Is the Full Orders Kafka sharded?
I believe by user id if I recall correctly
I refuse to believe that there's no relationship between Kleenex tissues and lotion
I plead the 5th
Why do you want to buy tissues?
For my runny nose of course
You should start a discord one day
Curious at 19:19 how we use two-phase commit to ensure idempotency for the email notification on order confirmation? Shouldn't we keep the idempotency key somewhere here? Or does using two-phase commit alone feel okay (we don't need to replay the email if we already use two-phase commit, right)?
Yep that's the idea! Partial failures can happen, but in theory if you use two phase commit you're only acknowledging the kafka message if you send the email and vice versa.