Great work on this series so far, man! I don't have any interview plans but I'm just watching to encourage your effort. And it feels fun watching these over other YouTube content. Way to go bro.
Another great video, thanks. A couple of clarification questions:
1. [23:14] If the chat members DB is sharded/indexed by userId, how will the CDC data be converted to the Kafka partitioning, which is by chatId? Will there be a conversion Flink node in between, or will it be similar to Twitter's follower-followee relation, where both relations exist in the DB?
2. [Generic question] How is Flink's cache recreated in general, in failover scenarios? Do we use any snapshotting to something like S3?
3. For the chats DB, can we use a single-leader key-value DB instead, like DDB, which gives high write/read throughput? What would be the downside, since we're using Flink to write to it anyway? Assuming we use a separate table for metadata.
4. How would chat across data centers or around the globe work? Are there any specific constraints there, like latency? Would ZooKeeper / the load balancer be responsible for connecting across data centers?
5. [25:12] Does Flink use 2PC to write to both the LB and the DB? If Flink goes down after sending messages to the LB, will a new Flink node send them one more time to users? Probably, even if we send them again, the client can handle it - just want to confirm.
1) Yeah, probably some intermediate node to do the forwarding
2) Yep, through S3 checkpointing
3) You can do this, I think FB opts for Cassandra since they want high availability on DB failures
4) Same deal pretty much
5) The chat server can tell Flink the ID of the last message it's seen, so that Flink can backfill it with more messages if a write to the DB succeeds but the send to the chat server fails
Hi Jordan, First of all, thank you for your amazing videos! I have a couple of questions regarding your design choices: 1.) You chose Apache HBase for storing chats. Could you elaborate on how it is read-optimized, considering it follows the same LSM tree structure and occasionally flushes data to disk? How does the column family model contribute to its performance, specifically for this problem? Are we grouping data based on specific columns in chats? 2.) Google Bigtable also offers a column family data model and uses LSM tree indexes. Are the underlying architectures of Google Bigtable and Apache HBase the same?
1) I'd imagine each message has a lot of metadata associated with it, which we don't need when fetching it. Maybe column oriented storage will help us out here polling the table. Even though we are using SSTables, since each message is basically immutable, we can pretty much read a message the first time we've seen it, and since the primary key is basically a timestamp I think we'd be able to get some nice benefits from sparse indexing here. Maybe things degrade a bit if you can edit messages. This is a sort of convoluted explanation on my part, and IIRC facebook actually uses cassandra here for better availability in the face of node failures. 2) Yeah HBase is an open source version of BigTable.
Question around the ZooKeeper + Consistent Hashing Ring + Load Balancing setup (17:35). I'm having trouble understanding how these components are connected and what aspects we would actually have to "stand up" if we were implementing this design. My baseline understanding is the following:
- ZooKeeper is used as a coordination service and it would hold the nodes in the cluster
- Consistent hashing is used to distribute data across nodes
- The load balancer distributes incoming requests to available nodes (round-robin, etc.)
What is ZooKeeper storing, and what is the "hashing function" that it stores? Is consistent hashing used within the load balancer or is it separate? How are these components connected and what is the flow of interaction?
I'd say ZooKeeper probably isn't storing the hash function itself; rather, there's an external component that, when a new chat server registers, accounts for this and updates the binding in ZooKeeper as to the range of IDs that each chat server should be responsible for. That external service can look at the existing bindings and perform consistent hashing to figure out the range of IDs that a new server should handle.
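To make that a bit more concrete, here's a rough sketch of that external component (my own made-up znode paths and hash scheme, using the kazoo ZooKeeper client; nothing here is from the video):

```python
import hashlib
import json
from kazoo.client import KazooClient

zk = KazooClient(hosts="zk1:2181")
zk.start()
zk.ensure_path("/chat-servers")   # chat servers register ephemeral znodes here
zk.ensure_path("/hash-ring")      # load balancers read the published ring from here

def ring_position(name: str) -> int:
    # place each server on a 0..2^32 ring by hashing its name
    return int(hashlib.md5(name.encode()).hexdigest(), 16) % (2 ** 32)

@zk.ChildrenWatch("/chat-servers")
def rebuild_ring(servers):
    # whenever a server joins or dies (its ephemeral node disappears),
    # recompute the ring and publish it for the load balancers to consume
    ring = sorted((ring_position(s), s) for s in servers)
    zk.set("/hash-ring", json.dumps(ring).encode())
```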
This is a great video! Thank you, I learn a lot from your videos. I can think of an interesting scenario with using app server timestamps for the sort key: it's possible for an older message to have a larger timestamp than a newer message in a Kafka partition. This means that the Flink processor will process the message with the larger timestamp first. Say the destination user goes temporarily offline - the Flink processor could miss the older message and only deliver the later message with the *smaller* timestamp. This could result in a "missing message" problem for the client. The client can't just say "give me all the messages older than this message I just got" - that won't contain the missed message. We could tackle this by storing these missed messages in a separate database (an "outbox" DB?) which the client can query on connect? There could still be a race: 1. flink - message delivery failed 2. client connects 3. client reads the "outbox" db - which is empty 4. flink - stores the missed message in the "outbox" .. client still missed the message.. do you see an elegant way to accommodate this failure?
So I'm going to challenge your first assumption here: "It's possible for an older message to have a newer timestamp in a kafka partition" -> Let's just assign the timestamps on ingestion by the DB. Keep in mind that messages are only sent out to users after they are uploaded to the database. In this case, I can be certain that the ordering of the messages in the database is consistent with the ordering that I'll see them in on my device. As long as this is the case, any queries that I make to the database based on timestamp will have the messages that I need.
> Let's just assign the timestamps on ingestion by the DB
Are you suggesting that you can have the DB auto-generate the timestamp? Not sure this is possible in a typical NoSQL store. Also, I just understood the design - the message flow is: user --> chatSvc --> kafka partition --> flink --> MessagesDB, then flink (same worker) --> LB --> chatSvc --> user. So we could also just store the log_id from the kafka partition in the DB and use that to order chat messages (and the user queries for missed messages using that, not timestamps), so I think there is no race then!
Love this series! Question: why? "We would care about writes more if all of our messages were first being sent to a database, and then after we sent them to a database we would do additional processing to make sure they get delivered to every user's device." around 9:42. Thanks!
If we have to first send every message to a database before it can be processed and sent out to all servers corresponding to a given chatId, then the database becomes a bottleneck in our message delivery times. If we first send it just to Kafka, and then to our database + other clients at a similar time down the line, then it does not!
Great video Jordan. Thank you! Follow-up question: you mentioned the load balancer is listening to consistent hashing ring updates on ZooKeeper and decides to use the respective chat server for the associated user request. What type of load balancer would have all this logic: L4, L7, or an API gateway? I'm always confused between these three.
Very nice explanation!!! Thanks for making it! I am wondering how the capacity estimation would be used in any later design decision - my guess is we can use it for partition count / database count / replication count decisions etc. But I rarely see that in any examples, and I also don't know what numbers would be standard for a normal network / CPU / disk, so that I can divide by the estimate.
Yeah this is very fair, I mainly use it for storage (e.g. can we keep this all on a single system, can we process a data model synchronously or do we need to process it in the background, is it going to be cheap to keep this dataset in memory or do we have to keep it on disk). I think that it's pretty handwavy to do any type of CPU/networking estimations without some amount of actual prototyping and benchmarking.
Hey Jordan, thank you for the great explanation. I have a question: when Flink gets the users in a chat, where does Flink store this enormous data? In RocksDB? Kafka may delete the info after the retention period, right? Let's say there is a chat room created 1 month ago, no one is active in it, no CDC happened, and thus no data is available about this chat room in Flink - then how will Flink manage to get the users in this chat room? Am I missing anything here? I understand from your design that maybe Flink is duplicating the whole chat-id to users list again somewhere on disk or in RocksDB, but can we rely on streaming platforms for getting the actual data when our source of truth is already stored in the chat metadata DB?
1) Yup, RocksDB seems reasonable. 2) Flink can always hold on to this data forever, or if need be not hold on to it, let it expire, and then cache it the first time it asks the DB for chat members. Keep in mind all chat info is in the database too
Nice video Jordan. 1. Can you explain the flow once for how a historical chat lookup would look in your design? 2. How would user1 sending a message to user2 look?
Sure, our chat db is partitioned by chat id and ordered by timestamp. So just provide a chat id, a start timestamp, and an end timestamp when you want to fetch historic messages. User 1 sending a message to user 2 could be implemented the same way (where the chat between them is represented as a group chat), or you could opt to just send the messages directly from one server to the next, but you'd still have to put the message in the db at some point.
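For what it's worth, a minimal sketch of that history fetch, assuming an HBase row key of the form chatId#epoch_millis and the happybase client (table and column names here are made up for illustration):

```python
import happybase

conn = happybase.Connection("hbase-thrift-host")
messages = conn.table("messages")

def fetch_history(chat_id: str, start_ms: int, end_ms: int):
    # row keys sort lexicographically, so zero-pad the timestamp component
    start_key = f"{chat_id}#{start_ms:013d}".encode()
    stop_key = f"{chat_id}#{end_ms:013d}~".encode()  # '~' sorts after the digits
    for row_key, cols in messages.scan(row_start=start_key, row_stop=stop_key):
        yield row_key, cols.get(b"m:body")
```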
Thank you so much for all the videos, very useful and I'm binge watching them. I am not very good at numbers and it's really hard for me to understand the capacity estimates. Can you please make a video about how to come up with capacity estimate numbers, like converting bytes to GB/TB/PB? Thanks
Hey, I appreciate the suggestion! However, this is more of a prefix learning thing than anything else. Mega = million, giga = billion, tera = trillion, peta = quadrillion
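To make it concrete with made-up numbers: 1 billion users × 100 messages/day × 200 bytes per message ≈ 2 × 10^13 bytes, i.e. 20 TB/day, or roughly 7 PB/year. The only "math" is counting the zeros and mapping them to the right prefix.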
It was a great video - it dove deep and logically introduced each component before attaching the final design. In my experience, CDC doesn't play well for sequential / synchronous cases. So I was thinking, what would be another alternative for the group chat membership updates? Just fetch from the DB?
Thanks Jordan. It is really a good explanation. But, I still have one question. How to address the multi-colo/multi-datacenter challenge as a global service? Let's say European Eddie wants to chat with Kai in China.
This makes sense. I don't really think there's a better option than to just..."send the message". If it was so prohibitive, perhaps the international messages could be polled instead.
Hey Jordan, nicely done! One thing that I wanted to point out which could have been elaborated is the scenario where the user is offline and not connected to any chat server. They still get the message notifications on their app. Based on this feature, I think an online presence service may be justified that flink or whatever sender would want to refer to in order to figure out the offline users and then send them a notification instead of a real time update. What do you think?
That's a fair point! This is something they cover in grokking, but I guess the gist of it would be to have the client write some row in a database saying they're "online" or "offline" when they open or close the app, and then the notification service could read this accordingly.
Thanks for the great video Jordan! For your design, could you please share how a feature such as searching within a chat would work? Would that be part of the chat service? Or is this something that could be done in Flink?
Hi Jordan, thanks for your videos on system design. Just wanted to check with you how would you handle end-to-end encryption of messages for an application in this design? Thanks!
I made a video on HTTPS/SSL a while back. Then I'd read this. www.reddit.com/r/cryptography/comments/j75ub8/how_group_chats_can_be_end_to_end_encrypted/
Thanks Jordan for such a clear explanation. How many topics will there be in Kafka for the chat service - will it be one topic partitioned on chatIds? If so, will there be a billion partitions?
Agreed, one topic with partitions. "Partition by chatId" just means all messages with the same chatId go to the same partition, not necessarily that there's one partition per chat id.
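A tiny illustrative sketch of that with kafka-python (the topic name and message shape are my own assumptions, not from the video):

```python
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="kafka:9092",
    key_serializer=str.encode,
    value_serializer=lambda v: json.dumps(v).encode(),
)

def send_chat_message(chat_id: str, sender_id: str, body: str):
    # same key -> same partition -> per-chat ordering preserved
    producer.send("messages", key=chat_id,
                  value={"chatId": chat_id, "senderId": sender_id, "body": body})
```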
Thanks again for the informative videos! One question -> I didn't quite get how using Kafka + Flink increases write throughput. Curious why this can speed up writes over writing directly to HBase?
Hey! I wouldn't say we're increasing our write "throughput"; we're taking the write burden off of the client (we no longer have to wait for a synchronous write to HBase) and instead allow the client to just write to Kafka (faster since this is just a write to the end of the log). We then handle things as fast or slow as we please in the background.
Hey Jordan, thanks for the amazing system design video as usual!! I have one doubt about the usage of Flink. Whenever at least one Flink compute node goes down or restarts, the Flink job fails and it has to restore the entire state across all the nodes from S3. This whole restoration process can take a few minutes, so our message delivery will be delayed for that many minutes, affecting the entire user base. Is that understanding correct?
I don't think this would require restoring all nodes from S3. I would just expect no messages to be processed from that partition during that point in time.
You could always just create a chat Id with two people in it. But if you wanted to optimize, just having 1:1 chats go from the user chat server to the other user chat server could be fine too.
Probably the worst part of this video, and I need to remake/amend it. Same thing as Twitter basically: keep a database that holds messages per user. Popular chats should be pulled from a database sharded by chat Id and not fanned out.
@@jordanhasnolife5163 in the DB, the status can say 'undelivered' for those messages. When a client comes online - how will clients pull these messages? or would we have a service - say presence service that watches when user comes online, and then delivers all undelivered messages to the user ?
Thanks Jordan for all this awesome content. I have one quick question on CDC. The CDC events in Kafka would eventually be gone from Kafka, but in Flink we need the full snapshot/state of the DB in order to do anything with the messages. So how is this issue solved? Maybe I'm missing some details here. Thanks
For the load balancer, you mention using Zookeeper. How would this work in practice? Are there existing cloud solutions that allow you to connect load balancers with Zookeeper? I assume we wouldn't want to spend time implementing our own load balancer. I know AWS allows you to do Layer 7 load balancing, could we use that? Thank you for the great content!
Not sure on the AWS piece, but ZooKeeper has a bunch of server bindings that register there when they start up, and the "load balancer" is a proxy that subscribes to those and routes requests accordingly. You can see something like the service discovery recipe in the Apache Curator framework docs.
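Since this comes up a lot, here's a bare-bones sketch of that register/subscribe flow using kazoo rather than Curator (paths, node names, and hostnames are placeholders I made up):

```python
from kazoo.client import KazooClient

zk = KazooClient(hosts="zk1:2181")
zk.start()

# --- on each chat server, at startup ---
zk.ensure_path("/services/chat")
zk.create("/services/chat/chat-7", b"10.0.0.7:8080", ephemeral=True)
# ephemeral: if this server dies, its registration disappears automatically

# --- on the load balancer / proxy ---
routing_table = {}

@zk.ChildrenWatch("/services/chat")
def refresh(children):
    # rebuild the routing table whenever the set of live chat servers changes
    routing_table.clear()
    for child in children:
        addr, _ = zk.get(f"/services/chat/{child}")
        routing_table[child] = addr.decode()
```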
I don't recall saying to sort it by UUID. Gist is we just want to partition by chat Id and sort by timestamp, so that we can quickly fetch the messages for a given chat.
It was mentioned at 26:37 - Jordan said it was for providing idempotency for Flink. Two messages can have the same timestamp due to a race condition where both of them arrive at exactly the same time on two servers. That is my understanding.
Hi Jordan, great video. I did want to know: what is the relationship between chats and messages? A chat can have one or more messages, and I found it a bit hard to understand that relationship while watching the video. Thanks
Hey Jordan, When you say sharding kafka based on chat ID you mean within a single topic you have partitions which have partition key as chat ID. So, it means ordering within a chat will be maintained and all the messages from a single chat will go to single partition and it will be read by one flink consumer. Please elaborate in case I'm totally wrong.
Could you do a design of Google Sheets that can potentially support hundreds of concurrent users? Got asked that for a "frontend" system design interview.
I'll think about this one! Should mention though, the nice thing about sheets is that each cell should (in theory) be edited independently of one another, so perhaps less concurrency control is needed. Though that being said, I suppose you can bulk edit cells by changing an equation :). Make sure those edits are atomic!
Hi Jordan, thanks for making the video. I have a question about the choice of HBase: in the video it says the reason we choose HBase is its column-oriented storage, which is a good choice when we have a high percentage of metadata. But Cassandra also supports fetching specific columns without loading the entire row into memory. So why not just use Cassandra?
Great video. qq: we did not talk about what happens if users are offline. Once they connect, where are we going to keep the undelivered messages for a device until then? Are they going to live in Flink until then?
No, they live in the chat database, which you read from upon startup. You could also maintain a cache of messages that each user is interested in, from a database
Thanks for the video; at 13:45 it's mentioned that messages are being overwritten when there is the same UUID and timestamp. I thought every message should have a unique UUID? Would it be easier if we just stopped processing the incoming message if there is already a message with the same UUID in HBase?
This is the same message, it's just being replayed from kafka by flink, because of the "at least once" semantics. You can also choose to do what you suggested, it's basically equally expensive.
Would pub/sub be a good option here as well? Flink can publish to a topic for that chat ID, and all the chat servers which have users in those chat ids can be subscribed.
Isn't the schema for the chats table problematic, since all messages for a given chat will be on the same partition? I believe Discord chose to go with chatID + timeRange as the partitioning key to ensure a partition doesn't fill up (if that makes sense)
Yep, I think I assumed any amount of messages probably wouldn't fill up a given disk, but as data models get bigger and we collect more data I'm sure they could. Your solution seems perfectly reasonable to me.
Thanks! Yeah most likely, a lot of these technologies are pretty interchangeable, and the main thing to note is just when I say technology x that could be one company's version of technology y. Amazon has basically remade all of this stuff to be hosted on aws but with different names.
Jordan, burning question - how well does Flink scale if it has to store / cache all chats that have ever taken place? E.g. if we look since the inception of WhatsApp we might have some multi billion / trillion(s) chats which have taken place. How could Flink scale to such a huge data set? How many Flink nodes would your system need to support WhatsApp level load? Should it ever expire older chats from its cache? And finally, how would it ever restore from a known state in case a Flink node goes down? Thanks!
1) At the end of the day, you can always use a TTL on what flink retains and consult the database for chats that haven't been active for a large amount of time. Option 2 is to just keep adding more flink nodes lol, but that seems wasteful considering how many chats are largely inactive. 2) Flink checkpoints its state to S3 every once in a while alongside its offset in kafka.
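Not Flink API, just a plain-Python sketch of point 1 (a TTL on the locally cached membership, falling back to the metadata DB); the TTL value and the db.fetch_members call are assumptions of mine:

```python
import time

TTL_SECONDS = 7 * 24 * 3600          # assumption: expire chats idle for a week
members_cache = {}                   # chat_id -> (members, last_touched)

def get_members(chat_id, db):
    entry = members_cache.get(chat_id)
    if entry and time.time() - entry[1] < TTL_SECONDS:
        members = entry[0]
    else:
        # the metadata DB stays the source of truth; this is only a cache
        members = db.fetch_members(chat_id)   # hypothetical DB call
    members_cache[chat_id] = (members, time.time())
    return members
```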
Thanks for the video! It looks like you're leveraging message queues to asynchronously write every new chat message to the database before rerouting it to the other chat members. Would data inconsistency potentially be a problem if the write to the database fails, but the rerouting does not? Is this worth making the writes synchronous, or would the retries + idempotency key be a sufficient bandaid for this potential inconsistency in an interview?
Yoo Jordan! I've come up with a pretty similar design, and the one exception is message retrieval. I also would go with sending messages like you did, through a stream sharded/partitioned by chat_id, but not for reading messages. Isn't this sendRequest(user) for each user in your last Flink chat_id: [users] mapping going to be spammy for the load balancers (we are not ignoring possibly inactive people in the chat, unless we already plan for some push notifications)? Could we do a solution around user_id, so that after a user establishes a connection with the app via a websocket (and we now know their userid), we are looking out for the newest changes for that userid? Another great one Jordan! Thanks for your work
Hey! Yeah this is probably how it would work in reality, however my thinking is that considering that we only have one chat open at a time, it is fine to be connected to the server where the chat itself is sent. I'd say that for something like a notification service, where you can be getting notified for many chats at once, I'd probably have Flink redirect the messages to another node which the user is actively connected to.
Great work!! I’m not able to understand how to scale this to multi region for message delivery when both users are connected to chat servers via web sockets ie. support chatting between 2 users in different regions (Asia and North America)?
Not quite sure why anything would change in the design? It is true that latency will go up, but flink will just have to send the message over to a user's particular chat service.
Is there a failure scenario with Flink where the chat service fails to send a message to some of the chat group members? I guess Flink/the chat service can give up after a few retries and push those undelivered messages to a dead letter queue, or just forget about them. But what's bothering me is that the probability of this happening goes up with a large number of chat participants. Would it be a better option to join against the chat members and submit per-recipient delivery jobs to Kafka (like a fan-out) instead?
This is a part of the problem that I admittedly should have devoted more time to. Every message in the chat should have some sort of sequence number so that if one particular user fails to receive a message from its chat server, it can go to the database and backfill all of the messages that it missed.
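A sketch of what that could look like on the client, assuming a per-chat sequence number; fetch_range and render here are hypothetical callbacks, not anything from the video:

```python
def on_message_received(chat_id, msg, last_seen_seq, fetch_range, render):
    # render() hands a message to the UI; fetch_range() queries the chat DB/history API
    expected = last_seen_seq.get(chat_id, 0) + 1
    if msg["seq"] > expected:
        # we missed expected..seq-1, so backfill them from the database
        for missed in fetch_range(chat_id, expected, msg["seq"] - 1):
            render(missed)
    render(msg)
    last_seen_seq[chat_id] = msg["seq"]
```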
@@jordanhasnolife5163 That's a great solution! Having a sequence number totally solves the problem. Feels like by focusing a lot on the backend we tend to forget that the client can also play a role in building a robust system (other example would be deduping through idempotency checks). BTW thanks for all the great work you are doing in this channel!
Hi Jordan, nice video. Is the load balancer for the user service the same as the multiple load balancers for the message service? Why do we need two tables here indexed by user id? Can't we just use one with the chat id in it?
Can you touch on how the clients have a web socket to the chat server? Presumably the client actually needs to go through the load balancer to not expose our internal chat server. So they would actually have a connection with the load balancer and the load balancer has a connection to the chat server?
I don't see why our client couldn't have a websocket to a chat server, assuming that these "chat servers" solely exist for the purpose of delivering chats to clients (and sending them to other clients).
@@jordanhasnolife5163 Thanks! That's good to know. I just wanted to make sure that this was an instance where we were OK with exposing an internal server to the client.
Hey Jordan! Thanks for the video. I have a question around the part where the Flink consumer sends the message to the other users in a group chat. The consumer checks with the load balancer to find the hosts to which the other users are connected, and then the consumer sends the message to those hosts. But how does the communication between the Flink consumer and the chat service host occur, for the consumer to pass those messages to the host so it can send them on to users?
Why do you hash on the user id rather than just storing the session state in separate storage like Redis? Sticky sessions will not guarantee an even distribution of traffic and might be a bottleneck for a very high amount of traffic
I didn't fully understand the decision to use stream processing. Is it because HBase is write constrained, so the stream processing acts as a way to queue up the requests? If that's the reason, does it really solve the problem? Wouldn't that just create a backlog in the stream that HBase still would not be able to eat through fast enough? I'm also not clear why we went for a NoSQL solution for the message table. Could MySQL have worked? Since we're not designing around a write-optimized database, are there any additional advantages to using HBase over MySQL?
1) No, the stream processing is done because it's expensive to fan out chat messages to so many recipients for group chats. 2) You could certainly use SQL, but in practice probably whatever gets you the fastest write throughput may be best.
Thanks for the amazing content Jordan! Is it fair to say from the proposed design that when user A wants to create a new chat with user B, they hit the metadata service, which drops the chat id in the ChatMembers table? When the user actually sends a message, it is sent from the chat server -> kafka -> flink. Could this lead to race conditions where a message reaches the Flink node before the chat id information does? Do you have any proposals to handle this?
Hey Jordan! Thanks for making this video. Quick question - it sounds like you're using sharding and partitioning interchangeably to mean the same thing (around 7:00 and 23:00)? Or do you just mean that you would both shard AND partition based on user_id?
Nice work, it's pretty impressive. However, do you mind explaining how a timestamp can help us order messages? How will clients handle the client buffer? I was under the impression that clients have a client buffer: when a message is delivered, they do not show all the messages until the entire ordered sequence is received. This way they buffer all the messages and then show them in order (something like strict or total ordering). I believe they use some total ordering + FIFO to achieve this. Do you have any idea how it works? Could you please help me understand, or maybe make a video on it?
I'm not quite sure what you mean by the above, however my proposal was to have the message be assigned a timestamp by the database. Then, we can use the database timestamp to help us achieve a total ordering of all of the messages within a chat, since they'll all live on the same database partition.
@@jordanhasnolife5163 Sure, that does make sense. However, Flink will send the message to all the members of the group asynchronously while writing to the DB. So in a scenario, let's say m = message and t = time: M1 at T1, M2 at T2, where M2 is received first by a member and then M1. Are we going to show M2 and then M1 and re-order later? Or will M2 be buffered until M1 is delivered, and only then deliver M2? In the earlier scenario the user might view M2, and when M1 is delivered the client will re-order, but this will not be strict ordering. In the latter case, how will that be achieved in your solution? Thanks for the explanation though
Hey Jordan, I really appreciate the videos and have learned a lot from you. Do you use the words "partitioning" and "sharding" interchangeably? I thought partitioning means splitting a table within the same database instance, but was unsure when you explained the User database at 6:30. Thanks for the content, I have been watching it non-stop lately.
@@jordanhasnolife5163 In SQL context at least, sharding is when data is physically separated across different machines. Partitioning, on the other hand, is just a way to organize data within the same machine. For example, something like order database, orders of user A and user B are sharded into database 1 and database 2. The orders within database 1 can be partitioned based on year of order.
@@knightbird00 yeah as I read more of these papers I see that people partition the same table as well and distribute those so I'm probably gonna continue to piss people off by using them interchangeably lol
Hi @Jordan, thanks for the detailed video. I am a bit confused: the chat-members table is partitioned on userId, and from there we are doing CDC to Kafka, which is sharded on chat ID. Can you please tell what will come in the CDC, and how the topics would look in Kafka - will it be like one topic for each chat ID?
The kafka topic would look something like "chat-member-changes", where a row has one of INSERT/DELETE, chatId, userId, and the topic is partitioned by the chatId
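For illustration, an assumed shape for one of those records (the exact field names depend on your CDC tool, e.g. Debezium, so treat these as placeholders):

```python
import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="kafka:9092",
                         key_serializer=str.encode,
                         value_serializer=lambda v: json.dumps(v).encode())

membership_event = {
    "op": "INSERT",        # or "DELETE" when a user leaves the chat
    "chatId": "chat-42",
    "userId": "user-7",
}

# keyed by chatId so membership changes for a chat land on the same partition
# (and therefore the same Flink consumer) as that chat's messages
producer.send("chat-member-changes", key="chat-42", value=membership_event)
```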
Hi Jordan, since you talked about showing the messages of a particular chat to the user: if we have the sorting of the messages done on the server side, then instead of returning all the messages and sorting on the device, we could have lazy loading of the messages. The server can send a paginated kind of response for all the messages and load them as the user scrolls. Does that make sense?
Hey Jordan, thanks for the amazing content. Just one doubt: I understand that in the case of group chats, Flink will do the fanout and will tell the load balancer that it wants to send messages to (2, 4, 6), but how does the load balancer know which chat servers 2, 4, and 6 are connected to? Where is that information stored?
In the final design, messages are stored in HBase using the partition key chatId. What if the data for a single chat exceeds the capacity of a single partition?
Hi Jordan, I don't understand why having a message queue at 14:14 as a buffer will be enough to solve the throughput problem. If the throughput is consistently higher than what HBase can handle, doesn't the buffer just get bigger and bigger? But I get your point about sharding the single-leader HBase setup. Are you implying that both the message queue as a buffer and partitioned HBase together solve the throughput problem?
The reason that we use the message queue here is not because HBase is our bottleneck, it's because the fan out to multiple different consumers is our bottleneck. So we'd rather perform that process asynchronously.
Hey Jordan, thanks for another useful piece of content. I have one question, might be a stupid one :( Why does the client device actually need to connect to any of the servers? To receive messages from someone when you are not online, or are there any different reasons as well, like receiving notifications?
I think the presentation of this one is a little bit confusing. I lose sight of the overall picture (what pieces are required in order to make a messaging system work) in all the discussions of how particular pieces should be set up and the tradeoffs of using one technology vs. another for them. The ZooKeeper / consistent-hashing / load-balancer part is a good example of this: it sounds like the overall goal is just to ensure that users are evenly distributed across message servers, so why not start with that very high level goal and then delve into the details (how ZooKeeper can solve this problem) as needed. It feels to me like it would probably be better in an interview as well to start by being able to present the high level picture and then show you can fill the sketch out with more detail as the interviewer requires it.

Similarly, when you get to the overall architecture diagram, I feel like I'm missing the connection between the different use cases and the flow through the diagram. What parts of the architecture is the user going to hit when they log on to the service? What parts of the architecture are going to be hit when the user sends a message? Receives a message? How are more advanced use cases such as the removal of a user from the chat going to be handled? (For that matter, what does chat administration look like?)

Another thing that I was wondering about when I was trying to reflect on the example after I watched the video: what function is the chat-user table (recording who is part of the chat) playing in the system? Feels like the motivation for this would be two-fold: access control (which I was wondering about above and isn't much discussed) and also (I think this is a bigger omission) management of open servers.

Expanding on that point: when I use Facebook Messenger, all my open chats are available to me -- I can look over the history of any chat session at any time. But of course not all these sessions are going to be open. So there probably also needs to be some service that is going to spin up chat servers when the chat is "active" and then spin them down when the chat is no longer active. I send you and Techlead a message one day and we have a brief conversation. Two weeks pass and I send you another message. Seems like when that new message is sent, something is going to have to initiate a "create chat" -- a new chat server has to be spun up, web socket connections established for all participants, the history potentially queried (if it is no longer local on the clients of all participants). Then to save on costs and manage scale, chat servers will be spun down every so often. (Maybe this was discussed with the consistent hashing scheme a little bit, just could perhaps be better motivated.)
1) I don't think there is any need for "active" chat servers. Users connect when they're online, or they poll a database from when they first sign on.
2) When I'm a user, and I log in, I get connected to a chat server. When I send a message, I send it to kafka, which sends it to flink, which sends it to the chat servers that other active users in the chat are connected to. This is how access control is performed.
3) The reason I have a table outlining which chats a given user is in, partitioned and indexed by user id, is so that I can quickly figure out all of the chats I'm in, and poll the db for new messages in that chat when I come online.
4) Removal of a user from a chat isn't particularly complicated - remove them from the user-chats db, let the change data flow through the db, and now they're no longer subscribed to those messages. Additionally, that chat will no longer be an option when they're polling messages.
5) When the user receives a message, it is because a message came to the chat server that they're connected to, and it relays it to the user via a web socket.
@@jordanhasnolife5163 thanks. On the first point I still think I'm missing something. The chat server creates a web socket with users so it can push messages to them and receive messages from them, right? So when users send a message, is it right to say that is going from user to server to kafka? And then that message goes from kafka to flink to servers to users? So that a chat server is mediating all messages? for (3), it still seems to me like users could become involved in many chats with many different people over time (just imagine if this were like a hookup app with group chat) -- are users going to poll the database for all those chat groups every time they log in -- to see if there are new messages? would that be expensive? or am i missing something about how that would work? i.e. do i only poll the database for the most recent chats i was involved in on the assumption that those are chats i might have been disconnected from? and then at any time anyone in one of the other chat groups can "reopen" the chat and the message will be delivered to all participants?
Can you expand on how you'd design the Kafka topics? Is it a topic per user or chat, or something else? I initially thought you had a topic for each chat but looking into Kafka a bit more, it looks like you might be referring to how you can partition Kafka topics.
@@jordanhasnolife5163 Thanks for the response. I suspected this may have been the case. Are there any downsides to doing one topic for all chats? I understand that we're partitioning the topic itself and those can be spread across different brokers.
In the final design, a Flink node sends a request to the load balancer for each message recipient. The load balancer, using the cached consistent hash ring, fans out the message to the pertinent chat servers. Is the implication here that the load balancer is forwarding requests based on the verb, path, and recipientId?
flink noob here - when you say "sharding" do you mean "keyed stream on key 'chatId'" / partitioning on chatId? e.g. 23:30 (also possibly relevant to kafka too) (don't wanna use the wrong term in an interview and get caught flat-footed)
Fair point! It basically means to use the chatId as your kafka partitioning key and then have one flink consumer per kafka queue, which effectively means they're partitioned the same way.
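A tiny stand-in for what the keyed partitioning means in practice (this is not Kafka's exact partitioner, just the idea that the key deterministically picks the partition, and each partition has exactly one consumer):

```python
import hashlib

NUM_PARTITIONS = 64   # assumption

def partition_for(chat_id: str) -> int:
    # stable hash of the key decides the partition
    return int(hashlib.md5(chat_id.encode()).hexdigest(), 16) % NUM_PARTITIONS

# "chat-abc" always maps to the same partition, so the single consumer that owns
# that partition sees every message and membership change for that chat, in order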
6:50 - is there any issue in storing the users table, considering we might not have write conflicts? Although now that I think about it, the user name can be a write conflict; it's a bad experience to fail a write and ask the user to change their user name. Is this the right reason to stick with single leader?
Yeah honestly I just don't think there's enough throughput needed here/availability concerns that we need leaderless/multi leader, I'd just do some partitioning and stick with single leader, perhaps with some geographic awareness of which user is writing to the table (aka use different leaders for US vs. China users)
It's something you have to pay for - "Grokking the System Design Interview" on Educative (or you can try finding a free PDF). I don't think it's particularly great, but this is just my opinion
@@jordanhasnolife5163 Thank you, already bought it. I thought maybe you were talking about something else. In my opinion the explanations are too short. Some cases are good, others too short. Feels like each case has a different author.
Hey Jordan, I've one question here regarding the websocket connections between the client and our backend servers. Since we do not have a load balancer layer in between once the WS connection is established, how are we going to protect our system from spam calls or a DDoS attack?
Not sure what you mean here, we have a load balancer determining which server a client should connect to. You can perform rate limiting there. If you just mean sending too many messages over the web socket, you can perform rate limiting on the chat server and boot the client if need be.
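If it helps, a minimal token-bucket sketch for that per-connection rate limiting (the rate and burst numbers are arbitrary assumptions):

```python
import time

class TokenBucket:
    def __init__(self, rate_per_sec=10, burst=20):
        self.rate, self.capacity = rate_per_sec, burst
        self.tokens, self.last = burst, time.monotonic()

    def allow(self) -> bool:
        # refill proportionally to elapsed time, capped at the bucket size
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False   # over the limit: drop the message or boot the client
```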
Hi Jordan, just want to understand what the behaviour will be when Flink is trying to send the messages to multiple users using the LB and chat service, and some of the users don't have an active connection with the chat server or don't have an active internet connection. In this case, how will we keep track of messages yet to be delivered, and how will the remaining messages be sent again once an active connection is established again between the client and the server?
Yeah this is a fault of mine in this video. The gist is, it doesn't get delivered. Each client keeps track of a last seen timestamp for message delivery, and then when they next connect to the chat server they can reconnect and fetch all messages since that timestamp.
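Roughly, the reconnect path could look like this (get_my_chats and fetch_since are hypothetical calls against the chat-members table and the messages DB):

```python
def on_reconnect(user_id, last_seen_ts, get_my_chats, fetch_since, render):
    # last_seen_ts: per-chat newest timestamp the client has already rendered
    for chat_id in get_my_chats(user_id):
        since = last_seen_ts.get(chat_id, 0)
        for msg in fetch_since(chat_id, since):
            render(msg)
            last_seen_ts[chat_id] = max(last_seen_ts.get(chat_id, 0), msg["ts"])
```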
Hey Jordan! One more question (can't promise it's the last). Why are we caching the chat members table in Flink? Why don't we cache them in a separate Redis cluster keyed by chat_id, for example? Then Flink can query Redis. Also, I understand partitioning the cache using chat_id, but what do you mean when you say shard Flink on chat_id? Can we even shard Flink?
Using redis means that we have to make an extra network call to fetch our data, caching in flink keeps everything local. And yes, basically you'd have a kafka queue that is sharded on chat_id, and then one flink consumer per kafka partition.
You could do some smart partitioning within chatIds such that those that mostly contain users from x region go to a flink node in x region. But yeah, sometimes messages are just going to take longer if they have to go cross dc.
Hi Jordan, great content! Thank you. One question though: the send message path is clear (the user, through a websocket, and its server puts the message in Kafka, and from there the message goes through Flink to the chat table), but the receive message path is not... So Flink talks to the LBs and finds the corresponding chat server to reach the receiver... but how is the message sent? Where is that component (or arrow)? Is there another Kafka for receivers? I just see one arrow to the LBs, which is for finding the corresponding chat server... we don't send the message to the LB, and it should go through the chat server. Appreciate it if you can clarify that
@@jordanhasnolife5163 Thanks! So to summarize the steps for "receive message path": 1- Flink connects to load balancers in parallel (as mentioned in the video) for users 2, 4, and 6 (in your example). Flink should maintain this information (user 2 => LB 2) to facilitate parallel requests? 2- Assuming Flink is aware of the corresponding load balancer for these users, each load balancer responds to Flink using its cache or Zookeeper with the respective chat server details. 3- Flink subsequently utilizes HTTP (POST) to send messages directly to the identified chat servers and chat server send the message to the user through WS? Regarding number 1, why Flink doesn't directly ask Zookeeper to get the corresponding chat-server for a user? Why do we have to ask the LBs? Since Flink doesn't know a potential recipient (user) is connected to which LB... The only module that knows all these (up to date) is the ZooKeeper.
How do you handle new nodes coming online that change which server a client is supposed to be mapped to? Do you force them to disconnect and reconnect? Or just maintain a key value store of where they are currently connected? I feel like this would conflict with the reason we are using zookeeper in the first place. Might as well not use it at that point
I would propose that they disconnect and re-connect yes, though the number of servers that have to do this should be minimal because we are using consistent hashing. My proposal is that zookeeper should just store the current hash ring (which the load balancer uses), and then route all messages accordingly. You could maintain state for where everything is stored, but I feel that this isn't necessary so long as we follow our consistent hashing schema.
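For anyone unsure why only a few clients need to reconnect, here's a small toy consistent-hash-ring sketch (my own illustration, not what zookeeper itself stores):

```python
import bisect
import hashlib

def h(s: str) -> int:
    return int(hashlib.md5(s.encode()).hexdigest(), 16) % (2 ** 32)

class Ring:
    def __init__(self, servers):
        self.points = sorted((h(s), s) for s in servers)

    def server_for(self, user_id: str) -> str:
        keys = [p for p, _ in self.points]
        i = bisect.bisect(keys, h(user_id)) % len(self.points)
        return self.points[i][1]

# adding "chat-4" to Ring(["chat-1", "chat-2", "chat-3"]) only remaps users whose
# hashes fall on the new server's arc; everyone else keeps their existing connection
```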
1) I think that these would be separate tables. Do we have to put them on different nodes IRL? Perhaps not. That being said, at facebook's scale, I bet they are. 2) My guess is that facebook actually probably does a per user cache of all messages, meaning they fan them out, meaning there are a lot of DB writes per message created. Having a database that plays nicely with write speeds (LSM trees) could be nice. Besides that, Cassandra is highly available - if we use things like sloppy quorums in our database we can likely ensure that a message will always be received by our system once sent, even if it takes a while to propagate. 3) It doesn't, it goes through the chat server, which hits the database (or optionally a per user cache of messages).
How does receiving a message happen once it reaches the Flink node? Is someone subscribing to this Flink node? You mentioned the load balancer receiving the message - isn't that going to cause a thundering herd problem?
Flink sends it to the chat servers associated with (actively connected to) the recipients of the messages For q2, in theory a load balancer can cause a SPOF, or you could run many in an active-active configuration, listening to zookeeper.
If storing messages is not a requirement, then partitioning the chat servers by chatId would be the correct approach, right? This way, all members will be connected to the same chat server, which can directly share messages.
Hey Jordan - I’m having a hard time with your 1.0 vs 2.0 for this system design. 1.0 seems simpler and seems to accomplish all the objectives, would it be fair to say that you did this just to provide another perspective and 1.0 would work just as well in an interview setting, or would you say your 1.0 design just seems wrong to you now and this design should be the only one we see?
Hey! The fact that 1.0 was simpler is the reason why I felt the need for 2.0. You may be able to get away with less depth in some interviews, but other interviewers may press you for more. I wouldn't say that 1.0 said many incorrect things, but looking back on it I think that repeating some of those solutions word for word would make you susceptible to an interviewer wanting more detail.
At 13:15, what do you mean by partition by chat-id in Kafka and also Flink? Do we need to partition data to put it in Kafka? Would there be multiple Kafka queues per chat-id? I understand partition by chat-id in HBase but didn't get the concept in the message broker and processor (Flink)
Nope! I mean partitioning messages within a given topic based on their chatId such that messages with the same chatId are on the same physical kafka queue so that they are handled in order. Each kafka partition will then have one consumer, basically making it so that the consumers are partitioned the same way.
Hey Jordan! Thanks for the great content! I want to ask about Flink: you propose to use it in order to store the relations between users and the chats they are in, but is that really possible? Let's say we have 10B chats overall and the id of each one is an int64, with 50 users on average per chat where each user also has an int64 id. Then we would have (8 bytes chat id * 10B groups) * (50 members * 8 bytes per id) ≈ 30TB of memory. Are my calculations incorrect, or is my understanding of the world incorrect regarding the amount of memory a company can afford?
I don't think it would be this bad in reality as I think you're overestimating, but even if you weren't you can throw that on 1000 servers and now use 30gb for caching on each. If it really was so bad, nbd, we don't have to cache, we can throw them in a sharded database and make an API call. It'll be slower, but who cares haha
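For what it's worth, redoing the arithmetic with those same assumptions: each chat costs roughly 8 bytes for the chat id plus 50 × 8 = 400 bytes of member ids, so ~408 bytes per chat, and 408 bytes × 10B chats ≈ 4 TB (the 30 TB figure comes from multiplying the two terms instead of adding them). Spread across 1,000 flink nodes that's only about 4 GB of state each.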
@@jordanhasnolife5163 Thanks for the answer, makes sense. I was making assumptions based on your capacity estimates where we have 1B users, so I decided that 10B chats is a reasonable amount :)
I rewatched this part many times. Did you say that we will reload old messages via Flink by reaching out to HBase? Isn't it a bad pattern to use an asynchronous process to load old messages when we could do this with simple API calls?
Hey Jordan, I am not clear on why we are using both Kafka and Flink together for message passing between chat servers. I understand that Kafka and Flink are both stream processing tools and only one is enough. Do you have a video that gives some more detail on this?
It's not clear to me what the purpose of Flink is here. It's a stream processing engine where multiple messages are collated and a few transformations are done over a window. I didn't see any messages being collated and aggregated; each message is processed separately, one by one. So it looks like a standard message consumer use case to me. Why not use simple consumers and a consumer group, which can do the same job of pushing to clients as well as storing in the db?
Agreed on your first point. However I'm using Flink here because we want to aggregate the results of the change data capture to create an in-memory map of chat membership (which users are in which chat). That's the part we want to collate, not the incoming messages.
Hey Jordan, love your videos man! Keep it up! I have a question though: since chat ids would be random and a user can have any number of chats, would we be able to shard kafka queues with so many chat ids? Can't we just shard by user id instead?
Hey Ulfat! So it wouldn't be one kafka queue per chat id, but rather each kafka queue is assigned a set of chatIds. You could send these messages based on the user that sent them, but then you'll basically have to separately figure out who is in this group chat and on top of that send the message to all of those users.
@@jordanhasnolife5163 Is it possible to partition the topic into 100s of millions of partitions, given the recommended limit is around 200,000 partitions per cluster? Actually I'm working on my own project where I have to implement chat, and I don't have any idea if I can assign a partition per user by user id, cuz then the partitions are going to be in the millions. And also we have to define how many partitions we want per topic beforehand, and later adding more partitions is a whole lot of mess.
@@mymobile550 Hundreds of millions seems unlikely (I'm not proposing one per userId to be clear, what's important is making sure the same flink node is always consuming the same partition). But if you have 1 billion users and 1000 partitions that feels pretty reasonable to me.
At around 15:23: can't the chat servers which have web socket connections with the intended message recipients directly subscribe to the given kafka topic (via chatId) and read from kafka based on individual offsets? Also, kafka itself can be the message repository of sorts and we don't really need hbase? One can always start from the required offset and reread kafka messages if required, etc.?
1) Yes, albeit then each chat server now has to connect to thousands of kafka topics, which could very heavily increase the load on it. In designs like these, I think it generally helps for components to specialize: let flink deal with kafka, let chat servers deal with users, and then have a middleware that lets one push to the other. 2) Yes, albeit kafka typically has a retention period which is not infinite. If I want to read messages that are a week old, there's a good shot they've been dumped from that broker.
What happens if the client is not connected? A component for that case seems to be missing. When a client comes back online, if messages have snowflake ids with a timestamp, the client can say "send me messages after x for this chat id" and retrieve them from a cache in front of the db, or the db itself.
So Flink here, when it receives a new message, will do 2 things: 1) persist the msg to HBase, 2) send that message to the other chat group members. How do we guarantee that both of them happen without 2PC? Or will Flink keep track of the state and retry on failure?
Yeah. So we're only considering the message successfully processed if all of these operations succeed. Because flink reads from kafka, if one of these fails, we can just reread the kafka message and try again. Since all of these operations are idempotent I'm content to retry them as many times as needed.
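Spelled out in plain Python with kafka-python (Flink gives you this same guarantee via checkpointing; put_if_absent, members_of, and push_to_chat_server are hypothetical callables, not real APIs):

```python
import json
from kafka import KafkaConsumer

def delivery_loop(put_if_absent, members_of, push_to_chat_server):
    consumer = KafkaConsumer("messages", bootstrap_servers="kafka:9092",
                             group_id="chat-delivery", enable_auto_commit=False,
                             value_deserializer=lambda v: json.loads(v))
    for record in consumer:
        msg = record.value
        put_if_absent("messages_table", msg["messageId"], msg)  # idempotent DB write
        for user in members_of(msg["chatId"]):
            push_to_chat_server(user, msg)    # receivers dedupe on messageId
        # commit only after both steps succeed; a crash before this just means
        # the record is re-read from kafka and the idempotent steps are retried
        consumer.commit()
```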
Sorry if i missed it in the video but how would we design for quickly getting a set of most recent messages from the backend assuming messenger like app where we store messages on the server?
It allows us to not have to worry about a bunch of different failure scenarios that can occur when we don't use a combination of persistent streams and stream consumers. If, as opposed to using Flink, we just sent messages directly between chat servers, I could send a message to a chat server, it could get to the DB, and then the chat server goes down. Now, it's never getting sent to the other chat servers. Stream processing ensures that each message will be handled at least once, and then we can build in idempotence within our chat servers to ensure that new message notifications are handled only once.
@@saber3112 Yeah I think it's just a matter of whether you want to be processing results as they come in, or whether doing a batch job (once per day) will suffice. For the sake of these problems, most of them thus far have wanted instant delivery to users, which is why you see me continue to use flink.
What properties of flink is beneficial over here? Let’s say I have my own stream consumer which consumes from Kafka, sends it to chat server load balancer, saves it to chat table and only after that commits to Kafka. Will this suffice or is there any other guarantee which flink provides which is lacking in this use case?
@@sbahety92 I don't actually have any experience with Flink, I'm just a humble engineer reading about things so I can prepare for interviews. But my understanding of Flink's main advantage is that it has checkpointing built in. You could write your own checkpointing to S3, but I would imagine that's akin to implementing your own synchronized data structures: ripe opportunities for insidious bugs. I would expect Flink to provide a way to interface with checkpointed data structures in a way that involves your kafka/kinesis sequence number, so that you know what your state is after processing a bit of data. I doubt that Flink checkpoints after every single stream record processed. That's where kinesis/kafka streams being replayable is key, and why knowing that records can be replayed in failure scenarios is key for the design. So in short, you can reinvent the wheel if you want to, but someone has already gone through the trouble.
I don't really understand how Flink works - I've never used it before. It needs to complete writing the message to the db before it's done processing and can move on to the next kafka message. Doesn't this slow down kafka message processing, so users' messages will be delayed? So the single leader aspect of HBase is still a potential bottleneck. Or do we have multiple Flink instances - one to send user messages to queues and one to write to the db?
It is probably true that synchronously writing to the database before delivering messages to the users will slow down the amount of time before a message reaches the user. That being said, it also guarantees that said message actually gets to the database without requiring a two phase commit. I've opted for correctness here over speed, but if you wanted to deliver writes to users as soon as possible without a bunch of concern over whether they make it to the database you could do that too. In practice the delay for message delivery isn't something that I feel I'm able to predict without some amount of benchmarking. I think that worst comes to worst though, an increased amount of partitions is always a good way to increase our write throughput.
Hi Jordan, I wanted to clarify a point regarding HBase's storage model. HBase is actually a column-family-oriented database, rather than a true columnar storage system like those used in analytical databases. This distinction is important because it affects data access patterns and use cases. Both HBase and Cassandra organize data into column families, where operations are optimized around row keys rather than individual columns across rows. Both HBase and Cassandra use column families for data organization, but Cassandra often provides higher write throughput. Considering this, along with your use case and scalability needs, Cassandra may be the better choice for message table.
Perhaps so, thanks for the distinction!
> Cassandra often provides higher write throughput
In Jordan's design, write throughput is addressed using Kafka.
@@DarwinLo Kafka can serve as a buffer to protect the server from spikes, but it doesn't really address consistently high traffic
It's worth noting that Cassandra's consistency guarantees when it is optimized for high write throughput lead to an inconsistent user experience where users might see different state across their devices until consistency is reached. In practice, this makes cassandra a poor choice for a messaging application where trust in the consistency model is important. You want to avoid a situation where some messages are temporarily missing or out of order even for a few minutes
Hey coming back to this, HBase is column oriented within a column family
Thank you so much for building this community to discuss advanced, in-practice system design. I discovered that this is not just another channel for sharing interview tips, but a space for in-depth discussions on real-world scenarios. How amazing! Keep going! Thank you!
I'm going to like every video I watch on this channel. It's the least I can do. Jordan, you're amazing. THANK YOU so much. Gonna watch every video in this series multiple times. Hope to become as great as you are someday.
You already are man - I appreciate the help with the algo ♥️
Well, one thing I got after the interviews (and I had this exact question in a recent interview) is that it's not that hard to understand it, and see all the bottlenecks and everything watching a video. But it's freaking hard to come to any near-perfect solution on the interview within 35 minutes. I guess more practice can solve this, though 🙂
Great video, it's cool to see so many important aspects of a system design in such an informative way 🔥
Makes sense! Definitely by no means easy to cover everything in such a short time frame!
Hi, overall great. I want to point out that we missed some key part here. The way the messages reach to each user is basically assumes that every user is connected thru some websocket connection. That is not the case mostly.
In this flow, we have a data flow in which user comes and asks for all his pending messages across all the chat groups he is part of.
Now we can do it by establising the connection and putting a status on the chat database of status and marking it "pending" when we receive the message in flink, and once the message gets delivered marking it "delivered".
Here if the user is not online, we can put the pending messages in some cache like redis, so when the user does come back online, the websocket connection will pull all the pending messages and delivery it to his device.
Also user can be using multiple devices, but that can be out of scope for this design.
Absolutely agreed, missed that part. All messages that should be read by a given user should be placed in some sort of cache for them to read. From there, they can make long polls to the cache or something to that extent.
Great work on this series so far, man ! I don’t have any interview plans but Im just watching to encourage your effort. And it feels fun watching these over other RUclips content. Way to go bro.
Thanks man means a lot!
same
Another great video, thanks.
Couple of clarification questions:
1. [23:14] If Chats member DB is sharded/indexed by userId, how does CDC data will be converted to kafka partition, which is by chatId? Will there be a conversion Flink node in between or will it be similar to tweeter's follower-followee relation, where both relation db exists?
2. [Generic Question] How does Flink's cache recreated in general, in fail over scenarios? Do we use any snapshotting in something like S3?
3. For chats db, can we use single leader key value DB instead, like DDB, which gives high write/read throughput? What will be the downside, we anyways are using Flink to write to it? Considering we use separate table for metadata?
4. How does chat across data center or globe would work? Will there be any specific constraint there, like latency or anything else? Will zookeeper/load balancer be responsible to connect across data centers?
5. [25:12] Does flink use 2PC to write to both LB and DB? If flink goes down after sending messages to LB, can new flink node will send them one more time to users? Probably, even if we send them again to users, may be Client can handle it. Just want to confirm?
1) yeah probably some intermediate node to do the forwarding
2) yep, through S3 checkpointing
3) you can do this, I think FB opted for Cassandra since they want high availability on DB failures
4) same deal pretty much
5) the chat server can tell flink the ID of the last message it's seen so that flink can backfill it with more messages if a write to the DB succeeds but the send to the chat server fails
Hi Jordan,
First of all, thank you for your amazing videos!
I have three questions regarding your design choices:
1.) You chose Apache HBase for storing chats. Could you elaborate on how it is read-optimized, considering it follows the same LSM tree structure and occasionally dumps data to disk? How does the column family model contribute to its performance, specifically for this problem? Are we grouping data based on specific columns in chats?
2.) Google Bigtable also offers a column family data model and uses LSM tree indexes. Are the underlying architectures of Google Bigtable and Apache HBase the same?
1) I'd imagine each message has a lot of metadata associated with it, which we don't need when fetching it. Maybe column-oriented storage will help us out here when polling the table. Even though we are using SSTables, since each message is basically immutable, we can pretty much read a message the first time we've seen it, and since the primary key is basically a timestamp I think we'd be able to get some nice benefits from sparse indexing here. Maybe things degrade a bit if you can edit messages. This is a sort of convoluted explanation on my part, and IIRC Facebook actually uses Cassandra here for better availability in the face of node failures.
2) Yeah HBase is an open source version of BigTable.
the content is so good, it takes me hours to research and understand lol, love it thank you!
Question around the ZooKeeper + Consistent Hashing Ring + Load Balancing setup (17:35)
I'm having trouble understanding how these components are connected and what aspects we would actually have to "stand up" if we were implementing this design.
My baseline understanding is the following
- ZooKeeper is used as a coordination service and it would hold the nodes in the cluster
- Consistent hashing is used to distribute data across nodes
- Load Balancer distributes incoming requests to available nodes (round-robin, etc.)
What is ZooKeeper storing and what is the "hashing function" that it stores?
Is Consistent Hashing used within the Load Balancer or is it separate?
How are these components connected and what is the flow of interaction?
I'd say zookeeper probably isn't storing the hash function, but rather there's an external component that, upon registering a new chat server, the service component accounts for this and updates the binding in zookeeper as to the range of IDs that each chat server should be responsible for.
That external service can look at the existing bindings and perform consistent hashing to figure out the range of IDs that a new server should handle.
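A bare-bones version of that ring (pure Python, hypothetical server names; the resulting bindings are what you'd persist in ZooKeeper for the load balancer to read):

import bisect, hashlib

def _hash(key):
    return int(hashlib.md5(key.encode()).hexdigest(), 16)

class ConsistentHashRing:
    def __init__(self, servers, vnodes=100):
        # Each server gets many virtual points so load stays even when
        # servers join or leave.
        self.ring = sorted((_hash(f"{s}#{i}"), s)
                           for s in servers for i in range(vnodes))
        self.keys = [h for h, _ in self.ring]

    def server_for(self, user_id):
        idx = bisect.bisect(self.keys, _hash(user_id)) % len(self.ring)
        return self.ring[idx][1]

ring = ConsistentHashRing(["chat-1", "chat-2", "chat-3"])
print(ring.server_for("user-42"))   # the same user always maps to the same server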
This is a great video! Thank you, I learn a lot from your videos.
I can think of an interesting scenario with using appserver timestamps for the sort key: it's possible for an older message to have a larger timestamp than a newer message in a Kafka partition. This means that the Flink processor will process the message with the larger timestamp first. Say the destination user goes temporarily offline, so the Flink processor could miss the older message and only deliver the later message with the *smaller* timestamp. This could result in a "missing message" problem for the client. The client can't just say "give me all the messages older than this message I just got", since that won't contain the missed message.
We could tackle this by storing these missed messages in a separate database (an "outbox" db?) which the client can query on connect? There could still be a race:
1. flink- message delivery failed
2. client connects
3. client reads "outbox" db- which is empty
4. flink- store missed message in "outbox"
.. client still missed the message..
do you see an elegant way to accommodate this failure?
So I'm going to challenge your first assumption here:
"It's possible for an older message to have a newer timestamp in a kafka partition"
-> Let's just assign the timestamps on ingestion by the DB
Keep in mind that messages are only sent out to users after they are uploaded to the database. In this case, I can be certain that the ordering of the messages in the database is consistent with the ordering that I'll see them on my device. As long as this is the case, any queries that I make to the database based on timestamp will have the messages that I need.
> Let's just assign the timestamps on ingestion by the DB
Are you suggesting that you can have the DB auto generate the timestamp? Not sure this is possible in a typical nosql store
Also I just understood the design- the message flow is:
user --> chatSvc --> kafka partition --> flink --> MessagesDB
then,
flink (same worker) --> LB --> chatSvc --> user
So we could also just store the log_id from the kafka partition in the DB- and use that to order chat messages (and the user queries for missed messages using that- not timestamps), so I think there is no race then!
@@ShortGiant1 yep agreed, you can use kafka ordering or a timestamp generated by flink if the db doesn't let you create one.
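A tiny sketch of that idea (names assumed): the ordering key is assigned at a single choke point, either the Kafka offset or a timestamp taken when the message is written to the DB, so the stored order matches the delivery order and a client can ask for "everything after seq N" without gaps.

def store_message(rows, chat_id, body, kafka_offset):
    # The sort key comes from one choke point, not from the sender's clock.
    rows.append({"chat_id": chat_id, "seq": kafka_offset, "body": body})

def messages_after(rows, chat_id, last_seen_seq):
    # Gap-free backfill query a client can make after reconnecting.
    return sorted((r for r in rows
                   if r["chat_id"] == chat_id and r["seq"] > last_seen_seq),
                  key=lambda r: r["seq"])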
Love this series! Question: why? "We would care about writes more if all of our messages were first being sent to a database, and then after we sent them to a database we would do additional processing to make sure they get delivered to every user's device." around 9:42. Thanks!
If we have to first send every message to a database before it can be processed and sent out to all servers corresponding to a given chatId, then the database becomes a bottleneck in our message delivery times.
If we first send it just to Kafka, and then to our database + other clients at a similar time down the line, then it does not!
Hi Jordan,
Amazing explanation!!! It would be very helpful for revision if you can share the slides link for all your system design interview problems
Should have these up in the next few days
Great video Jordan. Thank you!
Follow-up question:
You mentioned Load balancer is listening to Consistent hashing ring updates on Zookeeper and decides to use respective chat server for associated user request. What type of load balancer is this to have all these logics? L4, L7 or is it API gateway? I am always confused between these 3.
I guess this would count as an L7 load balancer, though I'm not really proposing any specific terminology, just a custom server that does this.
Very nice explanation!!! Thanks for making it! I am wondering how capacity estimation would be used in any later design decision - my guess is we can use it in partition number / database number / replication number decisions etc. But rarely see them in any examples, also don't know what number would be standard for a normal network / cpu / disk, so I can divide by the estimate.
Yeah this is very fair, I mainly use it for storage (e.g. can we keep this all on a single system, can we process a data model synchronously or do we need to process it in the background, is it going to be cheap to keep this dataset in memory or do we have to use it on disk). I think that it's pretty handwavy to do any type of CPU/networking estimations without some amount of actual prototyping and benchmarking.
Hey Jordan, thank you for the great explanation. I have a question: when Flink is getting the users in a chat, where does Flink store this enormous dataset? In RocksDB? Kafka may delete the info after the retention period, right? Let's say there is a chat room created a month ago, no one is active in it, and no CDC happened, so no data about this chat room is available in Flink. Then how will Flink manage to get the users in this chat room? Am I missing anything here? From your design I understand that Flink may be duplicating the whole chat-id to users list again somewhere on disk or in RocksDB.
But can we rely on streaming platforms for getting the actual data, given our source of truth is already stored in the chat metadata DB?
1) Yup, RocksDB seems reasonable.
2) Flink can always hold on to this data forever, or if need be not hold on to it, let it expire, and then cache it the first time it asks the DB for chat members.
Keep in mind all chat info is in the database too
Nice video Jordan..
1. Can you explain the flow once more:
What would a historical chat lookup look like in your design?
What would user1 sending a message to user2 look like?
Sure, our chat db is partitioned by chat id and ordered by timestamp. So just provide a chat id, a start timestamp, and an end timestamp when you want to fetch historic messages.
User 1 sending a message to user 2 could be implemented the same way (where the chat between them is represented as a group chat), or you could opt to just send the messages directly from one server to the next, but you'd still have to put the message in the db at some point.
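If the chat table happened to live in Cassandra, the read path could look roughly like this (keyspace, table, and column names are my assumptions, not the video's):

from cassandra.cluster import Cluster

# Assumed schema:
# CREATE TABLE messages (
#   chat_id text, ts timestamp, message_id uuid, sender_id text, body text,
#   PRIMARY KEY ((chat_id), ts, message_id));

session = Cluster(["127.0.0.1"]).connect("chat_app")

def fetch_history(chat_id, start_ts, end_ts):
    # One partition per chat, clustered by timestamp: a cheap ranged read.
    return session.execute(
        "SELECT ts, sender_id, body FROM messages "
        "WHERE chat_id = %s AND ts >= %s AND ts < %s",
        (chat_id, start_ts, end_ts))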
Thank you so much for all the videos. Very useful, and I'm binge watching them. I am not very good at numbers and it's really hard for me to understand the capacity estimates. Can you please make a video about how to make capacity estimate numbers, like converting bytes to TB/GB/PB? Thanks
Hey, I appreciate the suggestion! However, this is more of a prefix learning thing than anything else. Mega = million, giga = billion, tera = trillion, peta = quadrillion
It was a great video, diving deep and logically introducing each component before assembling the final design. From personal experience, CDC doesn't play well with sequential/synchronous cases. So I was thinking, what would be another alternative for the group chat users update? Just fetch from the DB?
Yeah, CDC and flink saves us a DB fetch. What in your personal experience makes you say it doesn't work well?
Thanks Jordan. It is really a good explanation. But, I still have one question. How to address the multi-colo/multi-datacenter challenge as a global service? Let's say European Eddie wants to chat with Kai in China.
This makes sense. I don't really think there's a better option than to just..."send the message". If it was so prohibitive, perhaps the international messages could be polled instead.
Hey Jordan, nicely done! One thing that I wanted to point out which could have been elaborated is the scenario where the user is offline and not connected to any chat server. They still get the message notifications on their app. Based on this feature, I think an online presence service may be justified that flink or whatever sender would want to refer to in order to figure out the offline users and then send them a notification instead of a real time update. What do you think?
That's a fair point! This is something they cover in grokking, but I guess the gist of it would be to have the client write some row in a database saying they're "online" or "offline" when they open or close the app, and then the notification service could read this accordingly.
Hey man, thanks for existing. Your teachings are too good.
Thanks for the great video Jordan! For your design could you please share how a feature such as searching within a chat would work? Would that be part of the chat service ? Or is the something that could be done in Flink?
I'd probably run a CDC from Cassandra to elastic search and partition the latter on chatId
Hi Jordan, thanks for your videos on system design. Just wanted to check with you how would you handle end-to-end encryption of messages for an application in this design? Thanks!
I made a video on HTTPS/SSL a while back.
Then I'd read this.
www.reddit.com/r/cryptography/comments/j75ub8/how_group_chats_can_be_end_to_end_encrypted/
Thanks Jordan for such a clear explanation. How many topics will there be in Kafka for the chat service? Will it be one topic, partitioned on chatIds? If so, will it be a billion partitions?
Agree one topic with partitions. "Partition by chatId" just means all messages with the same chatId in the same partition, not necessarily that there's one partition per chat id.
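A small producer-side sketch of that (using the kafka-python client; the topic name and payload shape are assumptions): the message key is the chatId, so Kafka's key hashing sends every message of a chat to the same partition and per-chat ordering is preserved.

import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    key_serializer=lambda k: k.encode(),
    value_serializer=lambda v: json.dumps(v).encode(),
)

def send_chat_message(chat_id, sender_id, body):
    # One topic for all chats; the key decides the partition.
    producer.send("chat-messages", key=chat_id,
                  value={"sender_id": sender_id, "body": body})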
Thanks again for the informative videos! One question -> I didn't quite get it by using Kafka + flink to increase write throughput. Curious why this can speed up write over directly write to HBase?
Hey! I wouldn't say we're increasing our write "throughput", we're taking the write burden off of the client (we no longer have to wait for a synchronous write to HBase) and instead allow the client to just write to Kafka (faster since this is just a write to the end of the log). We then handle things as fast or slow as we please in the background.
Hey Jordan, thanks for the amazing system design video as usual !!
I have one doubt on the usage of Flink. Whenever at least one Flink compute node goes down or restarts, the Flink job fails and it has to restore the entire state across all the nodes from S3. So this whole restoration process can take a few minutes, and our message delivery will be delayed for that many minutes, affecting the entire user base. Is that understanding correct?
I don't think this would require restoring all nodes from S3. I would just expect no messages to be processed from that partition during that point in time.
Thank you so much for the video. I am still trying to figure out why we need consistent hashing here for sticky sessions.
We don't really, it's just a nice optimization if you're logged in on phone and PC, but round robin works fine!
Kafka sharding is called partitioning. I think it is better to use the right terms.
Thank you so much for posting such great content. One question, how does the schema handles 1-1 chat ?
You could always just create a chat Id with two people in it. But if you wanted to optimize, just having 1:1 chats go from the user chat server to the other user chat server could be fine too.
can you comment more on how the delivery of messages to offline users is handled? thanks for the great video !
Probably the worst part of this video, and I need to remake/amend it. Same thing as Twitter basically: keep a database that holds messages per user. Popular ones should be pulled from a database sharded by chat Id and not fanned out.
@@jordanhasnolife5163 in the DB, the status can say 'undelivered' for those messages.
When a client comes online - how will clients pull these messages? or would we have a service - say presence service that watches when user comes online, and then delivers all undelivered messages to the user ?
Thanks Jordan for all these awesome contents. I have one quick question on CDC. The CDC events in Kafka would eventually be gone from Kafka but in Flink we need full snapshot/state of the DB in order to do anything with the messages. So how is this issue solved? May be I'm missing some details here. Thanks
Hey, Flink checkpoints its state to S3 occasionally so it's ok to eventually purge some messages from kafka. You just need those snapshots to exist.
For the load balancer, you mention using Zookeeper. How would this work in practice? Are there existing cloud solutions that allow you to connect load balancers with Zookeeper? I assume we wouldn't want to spend time implementing our own load balancer. I know AWS allows you to do Layer 7 load balancing, could we use that?
Thank you for the great content!
Not sure on the AWS piece, but zookeeper has a bunch of server bindings that register there when they start up, and the "load balancer" is a proxy that subscribes to those and routes requests accordingly.
You can see something like the service discovery recipe of the apache curator framework docs.
@@jordanhasnolife5163 I was not aware of this project. I can see how it would make developing the LB a lot simpler, thanks!
Thanks Jordan for the video. Can you clarify what you meant by sorting chat table by timestamp and message UUID? What does sorting by UUID provide?
I don't recall saying to sort it by UUID. Gist is we just want to partition by chat Id and sort by timestamp, so that we can quickly fetch the messages for a given chat.
It was mentioned at 26:37; Jordan mentioned it was for providing idempotency for Flink. Two messages can have the same timestamp due to some race condition when both of them arrive at exactly the same time on two servers. That is my understanding.
Hi Jordan, Great video. I did want to know what the relationship between chat and messages is? A chat can have one or more messages and I found it a bit hard understanding that relationship while watching the video
Thanks
There's a chat table, a chat users table, and a chat messages table - which should encompass the relationship you're describing.
@@jordanhasnolife5163 thank you
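A rough sketch of how those three tables could be keyed (names and columns are my guesses), shown as sample rows:

# chats: partitioned by chat_id -- chat-level metadata.
chat = {"chat_id": "c1", "name": "trip planning", "created_at": 1700000000}

# chat_members: partitioned/indexed by user_id -- answers "which chats am I in?"
membership = {"user_id": "u42", "chat_id": "c1", "joined_at": 1700000100}

# messages: partitioned by chat_id and sorted by timestamp, so one chat's
# history lives together and ranged reads are cheap.
message = {"chat_id": "c1", "ts": 1700000200, "sender_id": "u42", "body": "hey!"}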
Hey Jordan,
When you say sharding kafka based on chat ID you mean within a single topic you have partitions which have partition key as chat ID. So, it means ordering within a chat will be maintained and all the messages from a single chat will go to single partition and it will be read by one flink consumer. Please elaborate in case I'm totally wrong.
Nope on the contrary you're completely correct
Could you do a design of Google Sheets that can potentially support hundreds of concurrent users? Got asked that for a "frontend" system design interview.
I'll think about this one! Should mention though, the nice thing about sheets is that each cell should (in theory) be edited independently of one another, so perhaps less concurrency control is needed.
Though that being said, I suppose you can bulk edit cells by changing an equation :). Make sure those edits are atomic!
Hi Jordan, thanks for making the video. I have a question on the choice of HBase: in the video it says the reason we choose HBase is its column-oriented storage, which is a good choice when a high percentage of the row is metadata. But Cassandra also supports fetching specific columns without loading the entire row into memory. So why not just use Cassandra?
Can you link me to where we do that? Not loading the whole thing into memory is different than not even reading that file from disk.
great video. qq : we did not talk about if you users are offline and once they connect, where are we going to keep undelivered messages for a device until then? is it going to live in flink until then?
No, they live in the chat database, which you read from upon startup. You could also maintain a cache of messages that each user is interested in, from a database
Thanks for the video; at 13:45 it's mentioned that messages are being overwritten when there is the same UUID and timestamp. I thought every message should have a unique UUID? Would it be easier if we just stop processing the incoming message when there is already a message with the same UUID in HBase?
This is the same message, it's just being replayed from kafka by flink, because of the "at least once" semantics. You can also choose to do what you suggested, it's basically equally expensive.
Should pub/sub be a good option as well here? Flink can publish to a topic of that chat ID and all the chat servers who have users in those chat ids can be subscribed
That's effectively what we're doing, no? What technology does pub sub use?
Isn't the schema for the chats table problematic since all messages for a given chat will be on the same partition? I believe Discord chose to go with chatID + timeRange as the partitioning key to ensure a partition doesn't fill up (if that makes sense)
Yep I think that I assumed any amount of messages probably wouldn't fill up a given disk, but as datamodels get bigger and we collect more data I'm sure they could, your solution seems perfectly reasonable to me
Great explanation, thanks for making these videos
What's happening around the 15th/16th minute is something that can be achieved with DDB streams too, yes? Phenomenal explanation.
Thanks! Yeah most likely, a lot of these technologies are pretty interchangeable, and the main thing to note is just when I say technology x that could be one company's version of technology y. Amazon has basically remade all of this stuff to be hosted on aws but with different names.
Jordan, burning question - how well does Flink scale if it has to store / cache all chats that have ever taken place? E.g. if we look since the inception of WhatsApp we might have some multi billion / trillion(s) chats which have taken place. How could Flink scale to such a huge data set? How many Flink nodes would your system need to support WhatsApp level load? Should it ever expire older chats from its cache? And finally how would it ever restore from a known state in case a Flink node goes down? Thanks!
1) At the end of the day, you can always use a TTL on what flink retains and consult the database for chats that haven't been active for a large amount of time. Option 2 is to just keep adding more flink nodes lol, but that seems wasteful considering how many chats are largely inactive.
2) Flink checkpoints its state to S3 every once in a while alongside its offset in kafka.
@@jordanhasnolife5163 Thanks Jordan!
@@jordanhasnolife5163 Thanks!
Thanks for the video! It looks like you're leveraging message queues to asynchronously write every new chat message to the database before rerouting it to the other chat members. Would data inconsistency potentially be a problem if the write to the database fails, but the rerouting does not? Is this worth making the writes synchronous, or would the retries + idempotency key be a sufficient bandaid for this potential inconsistency in an interview?
To be clear, the write to the database is synchronous in my design here. Once that completes we can do the fanout
Yoo Jordan! I've figured out a pretty similar design, and the one exception is the message retrieval. I would also go with sending messages like you did, through a stream sharded/partitioned by chat_id, but I'd take a different approach for reading messages.
Isn't this sendrequest(user) for each user in your last Flink chat_id: [users] mapping going to be spammy for the load balancers (we are not ignoring possibly inactive people in the chat, unless we already plan for some push notifications, smth smth)? Could we do a solution around user_id, so that after a user establishes a connection with the app via a websocket (and now knowing their userid), we look out for the newest changes for that userid?
another great one Jordan! thanks for your work
Hey! Yeah this is probably how it would work in reality, however my thinking is that considering that we only have one chat open at a time, it is fine to be connected to the server where the chat itself is sent. I'd say that for something like a notification service, where you can be getting notified for many chats at once, I'd probably have Flink redirect the messages to another node which the user is actively connected to.
Other than it was the choice made in Grokking, can you clarify why you went with HBase over Cassandra?
Column oriented storage means I don't have to read a ton of metadata when fetching messages.
I can see the Cassandra argument though.
Great work!! I’m not able to understand how to scale this to multi region for message delivery when both users are connected to chat servers via web sockets ie. support chatting between 2 users in different regions (Asia and North America)?
Not quite sure why anything would change in the design? It is true that latency will go up, but flink will just have to send the message over to a user's particular chat service.
Is there a failure scenario with Flink where chat service fails to send message to some of the chat group members? I guess Flink/chat service can give up after a few retries and push those undelivered messages to dead letter queue or just forget about it. But, what's bothering me is that the probability of this happening goes up with large number of chat participants. Would it be a better option to join against in the chat receive and submit jobs to Kafka (like fan-out) instead of jobs?
This is a part of the problem that I admittedly should have devoted more time to. Every message in the chat should have some sort of sequence number so that if one particular user fails to receive a message from its chat server, it can go to the database and backfill all of the messages that it missed.
@@jordanhasnolife5163 That's a great solution! Having a sequence number totally solves the problem. Feels like by focusing a lot on the backend we tend to forget that the client can also play a role in building a robust system (other example would be deduping through idempotency checks). BTW thanks for all the great work you are doing in this channel!
Hi Jordan
Nice video
Is the load balancer for the user service the same as the multiple load balancers for the message service?
Why do we need two tables here indexed by user id? Can't we just use one with the chat id in it?
I need to know, as a user, which chat I'm subscribed to. And no, they should be distinct load balancers
Can you touch on how the clients have a web socket to the chat server? Presumably the client actually needs to go through the load balancer to not expose our internal chat server. So they would actually have a connection with the load balancer and the load balancer has a connection to the chat server?
I don't see why our client couldn't have a websocket to a chat server, assuming that these "chat servers" solely exist for the purpose of delivering chats to clients (and sending them to other clients).
@@jordanhasnolife5163 Thanks! That's good to know. I just wanted to make sure that this was an instance where we were OK with exposing an internal server to the client.
Hey Jordan! Thanks for the video. I have a question about the part where the Flink consumer sends the message to the other users in a group chat. The consumer checks with the load balancer to learn the hosts to which the other users are connected, and then the consumer sends the message to those hosts. But how does the communication between the Flink consumer and the chat service host occur, so that the consumer can hand those messages to the host to send them on to users?
Chat server can just expose an http endpoint that the flink node can hit
Why do you hash on the user id instead of just storing the session state in separate storage like Redis? Sticky sessions will not guarantee an even distribution of traffic, and they might be a bottleneck for a very high amount of traffic
Yeah at the end of the day this is fine too, if you have to deliver a user notification to two servers it's not a big deal
Thanks for the video, i had a question, in above design, how are we going to handle retries if a user goes offline and then comes back online 🤔
Poll missed messages from the chat history db, and reconnect to a web server to consume messages.
I didn't fully understand the decision to use stream processing. Is it because HBase is write constrained so the stream processing acts as a way to queue up the requests? If that's the reason, does it really solve the problem, because wouldn't that just create a backlog in the stream that HBase still would not be able to eat through fast enough?
I'm also not clear why we went for a NoSQL solution for the message table. Could MySQL have worked? Since we're not designing around a write optimized database, is there any additional advantages to using HBase over MySQL?
1) No the stream processing is done because it's expensive to fan out chat messages to so many receipients for group chats.
2) You could certainly use SQL, but in practice probably whatever gets you the fastest write throughput may be best.
Thanks for the amazing content Jordan! Is it fair to say from the proposed design that when the user A wants to create a new chat with User B they hit the metadata service which would drop the chat id in ChatMembers table. When the user actually sends the message this is sent from the chat server -> kafka -> flink. Could this lead to race conditions where message reaches Flink node before the chat id information? Do you have any proposals to handle this.
I think if there's an unrecognized chat ID flink will know that it has to buffer the message until the CDC row of the chat metadata table comes in.
14:02 isn't it the same UUId and different timestamp for replacing the message?
it's the sender timestamp, not the flink timestamp
Hey Jordan! Thanks for making this video. Quick question - it sounds like you're using sharding and partitioning interchangeably to mean the same thing (around 7:00 and 23:00)? Or do you just mean that you would both shard AND partition based on user_id?
I use them interchangeably yep
Nice work, it's pretty impressive.
However, do you mind explaining how a timestamp can help us order messages?
How will clients handle the client buffer? I was under the impression that clients have a client buffer: when a message is delivered, they do not show all the messages until the entire order is received. This way they buffer all the messages and then show them in order (something like strict or total ordering).
I believe they use some total ordering + FIFO to achieve this. Do you have any idea how it works? Could you please help me understand, or maybe make a video on it?
I'm not quite sure what you mean by the above, however my proposal was the have the message be assigned a timestamp by the database.
Then, we can use the database timestamp to help us achieve a total ordering of all of the messages within a chat, since they'll all live on the same database partition.
@@jordanhasnolife5163 Sure, that does make sense. However, Flink will send the message to all the members of the group asynchronously while writing to the DB.
So in a scenario, let's say
m = message and t = time
M1 at T1
M2 at T2
where M2 is received first by a member and then M1. So are we going to show M2 and then M1 and re-order later? Or will M2 be buffered until M1 is delivered, and only then deliver M2?
In the earlier scenario the user might view M2, and when M1 is delivered the client will re-order. But this will not be strict ordering.
In the latter case, how would that be achieved in your solution?
thanks for the explanation though
@@Summer-qs7rq Keep in mind that all messages are going through Kafka so they are buffered until the consumer handles the previous one!
Hey Jordan, I really appreciate the videos and have learned a lot from you. Do you use the words "partitioning" and "sharding" interchangeably? I thought partitioning means splitting a table within the same database instance, but was unsure when you explained the User database at 6:30. Thanks for the content, I have been watching it non-stop lately.
I do basically, apparently they have slightly different meanings but no-one ever seems to really properly convey that to me lol
@@jordanhasnolife5163 In SQL context at least, sharding is when data is physically separated across different machines. Partitioning, on the other hand, is just a way to organize data within the same machine. For example, something like order database, orders of user A and user B are sharded into database 1 and database 2. The orders within database 1 can be partitioned based on year of order.
@@knightbird00 yeah as I read more of these papers I see that people partition the same table as well and distribute those so I'm probably gonna continue to piss people off by using them interchangeably lol
Hi @Jordan, thanks for the detailed video. I am a bit confused: the chat-members table is partitioned on userId, and from there we are doing CDC to Kafka, which is sharded on chat ID.
Can you please tell what will come through CDC, and how the topics would be set up in Kafka? Will it be like one topic for each chat ID?
The kafka topic would look something like "chat-member-changes", where a row has one of INSERT/DELETE, chatId, userId, and the topic is partitioned by the chatId
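A small sketch of how a Flink-style consumer could fold that CDC stream into its chatId -> members map (the record shape follows the reply above; everything else is assumed):

from collections import defaultdict

chat_members = defaultdict(set)   # chat_id -> set of user_ids

def apply_cdc_event(event):
    # event example: {"op": "INSERT", "chat_id": "c1", "user_id": "u42"}
    members = chat_members[event["chat_id"]]
    if event["op"] == "INSERT":
        members.add(event["user_id"])
    elif event["op"] == "DELETE":
        members.discard(event["user_id"])

def recipients(chat_id, sender_id):
    # Fan-out targets for a new message in this chat.
    return chat_members[chat_id] - {sender_id}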
Hi Jordan, since you have talked about showing the messages of a particular chat to the user: if sorting of the messages is done on the server side, then instead of returning all the messages and sorting on the device, we could have lazy loading of the messages. The server can send a paginated response for all the messages and load them as the user scrolls.
Does that make sense?
I believe I'm proposing the same thing. That being said when you're at the bottom of the messages you need to see them come in real time.
Hey Jordan, thanks for the amazing content. Just one doubt: I understand that in the case of group chats, Flink will do the fanout and will tell the load balancer that it wants to send messages to (2, 4, 6), but how does the load balancer know which chat servers 2, 4, 6 are connected to? Where is that information stored?
We use our consistent hashing schema (stored in zookeeper) to help route our requests so we know which user should be mapped to which server.
In the final design, messages are stored in HBase using the partition key chatId. What if the data for a single chat exceeds the capacity of a single partition?
I doubt that'll happen in this case, but you could make the partition key chatId + date_id
Hi Jordan, I don't understand why having a message queue at 14:14 as a buffer will be enough to solve the throughput problem. If the throughput is higher than what HBase can handle, consistently, doesn't the buffer just get bigger and bigger?
But I get your point to shard the HBase single leader replication. Are you implying that both the message queue as buffer and partitioned HBase can solve the throughput problem?
The reason that we use the message queue here is not because HBase is our bottleneck, it's because the fan out to multiple different consumers is our bottleneck. So we'd rather perform that process asynchronously.
Hey Jordan, thanks for another piece of useful content. I have one question, might be a stupid one :(
Why does the client device actually need to connect to any of the servers?
To receive messages from someone when you are not online, or are there other reasons as well, like receiving notifications?
I don't know what you mean here, we need the client to be connected to a server so that it can receive any messages.
I think the presentation of this one is a little bit confusing. I lose sight of the overall picture (what pieces are required in order to make a messaging system work) in all the discussions of how particular pieces should be set up and the tradeoffs of using one technology vs. another for them. The ZooKeeper / consistent-hashing / load-balancer part is a good example of this: it sounds like the overall goal is just to ensure that users are evenly distributed across message servers, so why not start with that very high level goal and then delve into the details (how ZooKeeper can solve this problem) as needed. It feels to me like it would probably be better in an interview as well to start by being able to present the high level picture and then show you can fill the sketch out with more detail as the interviewer requires it.
Similarly, when you get to the overall architecture diagram, I feel like I'm missing the connection between the different use cases and the flow through the diagram. What parts of the architecture is the user going to hit when they log on to the service? What parts of the architecture are going to be hit when the user sends a message? Receives a message? How are more advanced use cases such as the removal of a user from the chat going to be handled. (For that matter, what does chat administration look like?)
Another thing that I was wondering about when I was trying to reflect on the example after I watched the video: what function is the chat-user table (recording who is part of the chat) playing in the system? Feels like the motivation for this would be two-fold: access control (which I was wondering about above and isn't much discussed) and also (I think this is a bigger omission) management of open servers. Expanding on that point: when I use Facebook messenger, all my open chats are available to me -- I can look over the history of any chat session at any time. But of course not all these sessions are going to be open. So there probably also needs to be some service that is going to spin up chat servers when the chat is "active" and then spin them down when the chat is no longer active. I send you and Techlead a message one day and we have a brief conversation. Two weeks pass and I send you another message. Seems like when that new message is sent, something is going to have initiate a "create chat" -- a new chat server has to be spun up, web socket connections established for all participants, the history potentially queried (if it is no longer local on the clients of all participants). Then to save on costs and manage scale, chat servers will be spun down every so often. (Maybe this was discussed with the consistent hashing scheme a little bit, just could perhaps be better motivated.)
1) I don't think there is any need for "active" chat servers. Users connect when they're online, or they poll a database from when they first sign on.
2) When I'm a user, and I log in, I get connected to a chat server. When I send a message, I send it to kafka, which sends it to flink, which sends it to the chat servers that other active users in the chat are connected to. This is how access control is performed.
3) The reason I have a table outlining which chats a given user is in, partitioned and indexed by user id, is so that I can quickly figure out all of the chats I'm in, and poll the db for new messages in those chats when I come online.
4) Removal of a user from a chat isn't particularly complicated - remove them from the user-chats db, let the change data flow through the db, and now they're no longer subscribed to those messages. Additionally, that chat will no longer be an option when they're polling messages.
5) When the user receives a message, it is because a message came to the chat server that they're connected to, and it relays it to the user via a web socket.
@@jordanhasnolife5163 thanks. On the first point I still think I'm missing something. The chat server creates a web socket with users so it can push messages to them and receive messages from them, right? So when users send a message, is it right to say that is going from user to server to kafka? And then that message goes from kafka to flink to servers to users? So that a chat server is mediating all messages?
for (3), it still seems to me like users could become involved in many chats with many different people over time (just imagine if this were like a hookup app with group chat) -- are users going to poll the database for all those chat groups every time they log in -- to see if there are new messages? would that be expensive? or am i missing something about how that would work?
i.e. do i only poll the database for the most recent chats i was involved in on the assumption that those are chats i might have been disconnected from? and then at any time anyone in one of the other chat groups can "reopen" the chat and the message will be delivered to all participants?
Can you expand on how you'd design the Kafka topics? Is it a topic per user or chat, or something else? I initially thought you had a topic for each chat but looking into Kafka a bit more, it looks like you might be referring to how you can partition Kafka topics.
It's one topic for all chats which itself is partitioned based on some hashing function.
@@jordanhasnolife5163 Thanks for the response. I suspected this may have been the case. Are there any downsides to doing one topic for all chats? I understand that we're partitioning the topic itself and those can be spread across different brokers.
@@itsslo To my knowledge, no, I think the problems come when you have too many topics, not when a partitioned topic has too many messages.
In the final design, a Flink node sends a request to the load balancer for each message recipient. The load balancer, using the cached consistent hash ring, fans out the message to the pertinent chat servers.
Is the implication here that the load balancer is forwarding requests based on the verb, path, and recipientId?
Not sure what you mean by verb but yeah, based on the recipient Id we should be able to figure out the chat server they're connected to.
flink noob here - when you say "sharding" do you mean "keyed stream on key 'chatId'" / partitioning on chatId? e.g. 23:30
(also possibly relevant to kafka too)
(don't wanna use the wrong term in an interview and get caught flat-footed)
Fair point! It basically means to use the chatId as your kafka partitioning key and then have one flink consumer per kafka queue, which effectively means they're partitioned the same way.
6:50 Is there any issue in storing the users table, considering we might not have write conflicts?
Although now that I think about it, user name can be a write conflict; it's a bad experience to fail one write and ask the user to change their user name.
Is this the right reason to stick with single leader?
Yeah honestly I just don't think there's enough throughput needed here/availability concerns that we need leaderless/multi leader, I'd just do some partitioning and stick with single leader, perhaps with some geographic awareness of which user is writing to the table (aka use different leaders for US vs. China users)
Hi Jordan, here you mentioned grokking system design resource. Can you share the link please? Thank you for the content
It's a book that you have to pay for "grokking the system design" on educative (or you can try finding a free pdf). I don't think it's particularly great, but this is just my opinion
@@jordanhasnolife5163 Thank you, already bought it. I thought maybe you were talking about something else. In my opinion they are too short in explanation. Some cases are good, others too short. Feels like each case has a different author.
Hey Jordan, I've one question here regarding the websocket connections between client and our backend servers. Since we do not have a load balancer layer in between once the WS connection is established, how are we going to prevent our system from spam calls or a DDOS attack?
Not sure what you mean here, we have a load balancer determining which server a client should connect to. You can perform rate limiting there.
If you just mean sending too many messages over the web socket, you can perform rate limiting on the chat server and boot the client if need be.
I have a senior level interview in 36 hours and I am literally watching all of these and taking notes before then
I am so fucked dude
Best of luck amigo
@@jordanhasnolife5163 Thank you brother. I'm on video 11 and taking pre-workout with Brandy and some fresh fruit
@@jordanhasnolife5163 Got an offer.
I am pogging (currently).
How'd it go?
@@3rd_iimpact got an offer. Turned it down. Whole thing turned out to be a bit of a bait and switch.
Hi Jordan, just want to understand what the behaviour will be when Flink is trying to send the messages to multiple users via the LB and chat service, and some of the users don't have an active connection with the chat server or don't have an active internet connection. In this case how will we keep track of the messages yet to be delivered, and how will the remaining messages be sent again once an active connection is re-established between the client and the server?
Yeah this is a fault of mine in this video.
The gist is, it doesn't get delivered. Each client keeps track of a last seen timestamp for message delivery, and then when they next connect to the chat server they can reconnect and fetch all messages since that timestamp.
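A minimal sketch of that client-side recovery (endpoints and field names are hypothetical): track the newest timestamp rendered per chat, and on reconnect backfill everything after it before the live websocket feed takes over again.

class ChatClient:
    def __init__(self, api):
        self.api = api          # hypothetical backend client
        self.last_seen = {}     # chat_id -> newest timestamp rendered

    def on_message(self, msg):
        self.render(msg)
        prev = self.last_seen.get(msg["chat_id"], 0)
        self.last_seen[msg["chat_id"]] = max(prev, msg["ts"])

    def on_reconnect(self, my_chat_ids):
        # Catch up on anything missed while offline, then resume live delivery.
        for chat_id in my_chat_ids:
            since = self.last_seen.get(chat_id, 0)
            for msg in self.api.get_messages(chat_id, after_ts=since):
                self.on_message(msg)

    def render(self, msg):
        print(msg["ts"], msg["body"])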
Hey Jordan! One more question (can't promise it's the last). Why are we caching the chat members table in Flink? Why don't we cache it in a separate Redis cluster keyed by chat_id, for example? Then Flink can query Redis. Also, I understand partitioning the cache using chat_id, but what do you mean when you say shard Flink on chat_id? Can we even shard Flink?
Using redis means that we have to make an extra network call to fetch our data, caching in flink keeps everything local. And yes, basically you'd have a kafka queue that is sharded on chat_id, and then one flink consumer per kafka partition.
How does this scale to multiple data centers? Users in different regions are connected to different DCs; how would we interact among data centers?
You could do some smart partitioning within chatIds such that those that mostly contain users from x region go to a flink node in x region. But yeah, sometimes messages are just going to take longer if they have to go cross dc.
Hi Jordan, great content! thank you.
One question though: the send message path is clear (the user goes through the websocket and its server puts the message in Kafka, and from there the message goes through Flink to the chat table), but the receive message path is not... So Flink talks to the LBs and finds the corresponding chat server connected to the receiver... but how is the message sent? Where is that component (or arrow)? Is there another Kafka for receivers? I just see one arrow to the LBs, which is for finding the corresponding chat server... we don't send the message to the LB, and it should go through the chat server. Appreciate it if you can clarify that
Flink can just send the message to the chat servers via a normal http request
@@jordanhasnolife5163 Thanks! So to summarize the steps for "receive message path":
1- Flink connects to load balancers in parallel (as mentioned in the video) for users 2, 4, and 6 (in your example). Flink should maintain this information (user 2 => LB 2) to facilitate parallel requests?
2- Assuming Flink is aware of the corresponding load balancer for these users, each load balancer responds to Flink using its cache or Zookeeper with the respective chat server details.
3- Flink subsequently utilizes HTTP (POST) to send messages directly to the identified chat servers and chat server send the message to the user through WS?
Regarding number 1, why Flink doesn't directly ask Zookeeper to get the corresponding chat-server for a user? Why do we have to ask the LBs? Since Flink doesn't know a potential recipient (user) is connected to which LB... The only module that knows all these (up to date) is the ZooKeeper.
@@PoRBvG Flink can absolutely just cache the user mappings to chat server in zookeeper
How do you handle new nodes coming online that change which server a client is supposed to be mapped to? Do you force them to disconnect and reconnect? Or just maintain a key value store of where they are currently connected? I feel like this would conflict with the reason we are using zookeeper in the first place. Might as well not use it at that point
I would propose that they disconnect and re-connect yes, though the number of servers that have to do this should be minimal because we are using consistent hashing. My proposal is that zookeeper should just store the current hash ring (which the load balancer uses), and then route all messages accordingly.
You could maintain state for where everything is stored, but I feel that this isn't necessary so long as we follow our consistent hashing schema.
How will Flink send the messages to the load balancer or the server?
Aren't we supposed to use Kafka here between Flink and the load balancer?
1) HTTP
2) Why?
@@jordanhasnolife5163 You mean we'll send each and every message to the user using HTTP going through the load balancer, not even a websocket ???
Suggestions on handling popular chat rooms vs smaller chat rooms?
I think this would start to look very similar to Google docs/Twitter then where we'd occasionally poll the super popular chat room
Why did we create two separate databases for users and chat metadata tables or is this the same db just represented this way in the drawing?
Also, why not use a timeseries database like timescaledb for chat messages?
And, why does the request to get historic messages in a chat need to go through Flink?
1) I think that these would be separate tables. Do we have to put them on different nodes IRL? Perhaps not. That being said, at facebook's scale, I bet they are.
2) My guess is that Facebook actually probably does a per user cache of all messages, meaning they fan them out, meaning there are a lot of DB writes per message created. Having a database that plays nicely with write speeds (LSM trees) could be nice. Besides that, Cassandra is highly available - if we use things like sloppy quorums in our database we can likely ensure that a message will always be received by our system once sent, even if it takes a while to propagate.
3) It doesn't, it goes through the chat server, which hits the database (or optionally a per user cache of messages).
How does receiving a message happen once it reaches the Flink node? Is someone subscribing to this Flink node?
You mentioned the load balancer receiving the message; isn't that going to cause a thundering herd problem?
Flink sends it to the chat servers associated with (actively connected to) the recipients of the messages
For q2, in theory a load balancer can be a SPOF, or you could run many in an active-active configuration, listening to zookeeper.
If storing messages is not a requirement, then partitioning the chat servers by chatId would be the correct approach, right? This way, all members will be connected to the same chat server can directly share messages.
While you're correct about that, what if I'm in many group chats? :) We don't want to have too many open persistent connections at once.
Hey Jordan - I’m having a hard time with your 1.0 vs 2.0 for this system design. 1.0 seems simpler and seems to accomplish all the objectives, would it be fair to say that you did this just to provide another perspective and 1.0 would work just as well in an interview setting, or would you say your 1.0 design just seems wrong to you now and this design should be the only one we see?
Hey! The fact that 1.0 was simpler is the reason why I felt the need for 2.0. You may be able to get away with lesser depth in some interviews, but other interviewers may press you for more. I wouldn't say that 1.0 said many incorrect things, but looking back on it I think that repeating some of those solutions word for word would make you susceptible to an interviewer wanting more detail.
At 13:15, what do you mean by partition by chat-id in Kafka and also Flink? Do we need to partition data to put it in Kafka? Would there be multiple Kafka queues per chat-id? I understand partition by chat-id in HBase but didn't get the concept in the message broker and processor (Flink)
Nope! I mean partitioning messages within a given topic based on their chatId such that messages with the same chatId are on the same physical kafka queue so that they are handled in order. Each kafka partition will then have one consumer, basically making it so that the consumers are partitioned the same way.
Hey Jordan!
thanks for a great content!
I want to ask about Flink: you propose to use it in order to store relations between users and chats they are in but is it really possible?
Let's say we have 10B chats overall and id of each one is int64 and 50 users on average per chat where each user also has int64 id
Then we would have (8 bytes chat id * 10B groups) * (50 members * 8 bytes per id) ≈ 30TB of memory
Are my calculations incorrect, or is my understanding of the world incorrect regarding the amount of memory a company can afford?
I don't think it would be this bad in reality as I think you're overestimating, but even if you weren't you can throw that on 1000 servers and now use 30gb for caching on each.
If it really was so bad, nbd, we don't have to cache, we can throw them in a sharded database and make an API call. It'll be slower, but who cares haha
@@jordanhasnolife5163 Thanks for an answer, makes sense
I was making assumptions upon your capacity estimates where we have 1B users thus I decided that 10B chats is reasonable amount :)
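For what it's worth, redoing that arithmetic suggests the estimate is high: the 8-byte chat id is paid once per chat rather than multiplied by the per-member bytes. Per chat that's roughly 8 bytes + 50 members x 8 bytes ≈ 408 bytes, and 10B chats x 408 bytes ≈ 4 TB in total, which spread over, say, 1,000 Flink nodes is about 4 GB of state per node before overhead.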
I rewatched this part many times. Did you say that we will reload old messages from Flink by reaching out to HBase? Isn't it a bad pattern to use an asynchronous process to load old messages when we could do this with simple API calls?
Nope, maybe I misspoke - HBase is there for clients that missed messages in real time to backfill historic ones before subscribing to new messages
Hey Jordan, I am not clear on why we are using both Kafka and Flink together for message passing between chat servers. My understanding is that Kafka and Flink are both stream processing systems, so only one should be enough. Do you have a video that gives some more detail on this?
Kafka and Flink are not the same - Kafka is a persistent message broker, while Flink is a stream processor that consumes from it. I have multiple videos devoted to both Kafka and Flink that you can watch.
It's not clear to me what the purpose of Flink is here. It's a stream processing engine where multiple messages are collated and a few transformations are done over a window. I didn't see any messages being collated or aggregated; each message is processed separately, one by one. So this looks like a standard message consumer use case to me. Why not use simple consumers and a consumer group, which could do the same job of pushing to clients as well as storing in the DB?
Agreed on your first point. However, I'm using Flink here because we want to aggregate the results of the change data capture to create an in-memory map of chat membership (the analogue of follower relationships in the Twitter design).
That's the part we want to collate, not the incoming messages themselves.
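To make that concrete, here is a minimal Python sketch of the idea (illustrative names and event shapes, not actual Flink APIs): the CDC stream keeps an in-memory chatId → members map up to date, and the message stream is fanned out using it.

```python
from collections import defaultdict

# Conceptual state of the stream processor: built from CDC on the
# chat-membership table, consulted when fanning out each new message.
chat_members: dict[int, set[int]] = defaultdict(set)

def on_membership_cdc(event: dict) -> None:
    """event = {"chat_id": ..., "user_id": ..., "op": "add" | "remove"} (illustrative shape)."""
    if event["op"] == "add":
        chat_members[event["chat_id"]].add(event["user_id"])
    else:
        chat_members[event["chat_id"]].discard(event["user_id"])

def on_chat_message(message: dict, send_to_chat_server) -> None:
    """Fan a message out to every member of its chat; delivery is a stubbed callback here."""
    for user_id in chat_members.get(message["chat_id"], set()):
        send_to_chat_server(user_id, message)

# Example usage with a stubbed delivery function:
on_membership_cdc({"chat_id": 7, "user_id": 1, "op": "add"})
on_membership_cdc({"chat_id": 7, "user_id": 2, "op": "add"})
on_chat_message({"chat_id": 7, "text": "hi"}, lambda uid, m: print("deliver to", uid, m["text"]))
```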
Hey Jordan, love your videos man! Keep it up! I have a question though: since chat ids would be random and a user can be in any number of chats, would we be able to shard the Kafka queues with so many chat ids? Can't we just shard by user id instead?
Hey Ulfat! So it wouldn't be one Kafka queue per chat id; rather, each Kafka partition is assigned a set of chatIds.
You could route these messages based on the user that sent them, but then you'd basically have to separately figure out who is in the group chat and, on top of that, send the message to all of those users.
@@jordanhasnolife5163
Is it possible to partition the topic into hundreds of millions of partitions, given that the recommended maximum is around 200,000 partitions per cluster?
Actually, I'm working on a project where I have to implement chat, and I have no idea whether I can assign a partition per user by user id, because then the partition count would be in the millions.
Also, we have to define how many partitions we want per topic beforehand, and adding more partitions later is a whole lot of mess.
@@mymobile550 Hundreds of millions seems unlikely (to be clear, I'm not proposing one partition per userId; what's important is making sure the same Flink node is always consuming the same partition). But if you have 1 billion users and 1000 partitions, that feels pretty reasonable to me.
At around 15:23:
Can't the chat servers which have websocket connections with the intended message recipients directly subscribe to the given Kafka topic (via chatId) and read from Kafka based on their individual offsets?
Also, couldn't Kafka itself be the message repository of sorts, so we don't really need HBase? One can always start from the required offset and re-read Kafka messages if required, etc.?
1) Yes, although then each chat server has to connect to thousands of Kafka topics, which could very heavily increase the load on it. In designs like these, I think it generally helps for components to specialize: let Flink deal with Kafka, let chat servers deal with users, and then have a middleware that lets one push to the other.
2) Yes, although Kafka typically has a retention period, which is not infinite. If I want to read messages that are a week old, there's a good shot they've already been dropped from that broker.
What happens if the client is not connected? That part of the flow seems to be missing. When a client comes back online, if message ids are snowflake ids with an embedded timestamp, the client can say "send me messages after X for this chat id" and retrieve them from a cache in front of the DB, or from the DB itself.
Seems reasonable to me.
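A minimal sketch of that snowflake-style id idea (the 41/10/12 bit layout follows the common Twitter-style scheme and is only illustrative; the point is that ids sort by time, so "messages after X" is enough for a reconnecting client):

```python
import time

EPOCH_MS = 1_288_834_974_657  # assumed custom epoch

def make_id(machine_id: int, sequence: int) -> int:
    # 41 bits of milliseconds since the epoch, 10 bits of machine id, 12 bits of sequence.
    ts = int(time.time() * 1000) - EPOCH_MS
    return (ts << 22) | ((machine_id & 0x3FF) << 12) | (sequence & 0xFFF)

def timestamp_of(message_id: int) -> int:
    # Recover the millisecond timestamp embedded in the id.
    return (message_id >> 22) + EPOCH_MS

# A reconnecting client only needs its last-seen id per chat, then asks the
# backend for messages with id > last_seen for that chat.
last_seen = make_id(machine_id=3, sequence=0)
print(timestamp_of(last_seen))
```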
So when Flink receives a new message here, it will do two things: 1) persist the message to HBase, and 2) send that message to the other chat group members. How do we guarantee that both of them happen without 2PC? Or will Flink keep track of the state and retry on failure?
Yeah. We only consider the message successfully processed if all of these operations succeed. Because Flink reads from Kafka, if one of them fails, we can just re-read the Kafka message and try again. Since all of these operations are idempotent, I'm content to retry them as many times as needed.
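Roughly, that retry logic amounts to something like the following hedged sketch (stub objects stand in for the DB and the fan-out path; none of these names are Flink or Kafka APIs):

```python
class _StubDB:
    def __init__(self):
        self.rows = {}
    def upsert(self, key, value):
        # Idempotent: writing the same message id twice leaves the same row.
        self.rows[key] = value

class _StubFanout:
    def deliver(self, message):
        # Receivers dedupe on message id, so redelivery is harmless.
        print("delivered", message["message_id"])

def process_one(message, db, fanout, max_attempts=5):
    """Only report success (and commit the Kafka offset) once every step succeeded."""
    for attempt in range(max_attempts):
        try:
            db.upsert(key=message["message_id"], value=message)
            fanout.deliver(message)
            return True
        except Exception as err:
            print(f"attempt {attempt + 1} failed: {err}; retrying")
    return False  # offset stays uncommitted, so the message will be redelivered

process_one({"message_id": 101, "chat_id": 7, "text": "hi"}, _StubDB(), _StubFanout())
```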
Sorry if I missed it in the video, but how would we design for quickly getting a set of the most recent messages from the backend, assuming a Messenger-like app where we store messages on the server?
For sure! Just reach out to Cassandra: all of the messages from a given chat will be on the same partition, so this should be easy.
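As a sketch of why that's a single-partition read, here is an assumed Cassandra-style table (keyspace, table, and column names are assumptions; this needs a running cluster and the DataStax cassandra-driver package):

```python
from cassandra.cluster import Cluster

# Assumed keyspace name; adjust to whatever the deployment actually uses.
session = Cluster(["127.0.0.1"]).connect("chat_app")

# A chat's messages share one partition and are clustered newest-first.
session.execute("""
    CREATE TABLE IF NOT EXISTS messages (
        chat_id    bigint,
        message_id bigint,
        sender_id  bigint,
        body       text,
        PRIMARY KEY ((chat_id), message_id)
    ) WITH CLUSTERING ORDER BY (message_id DESC)
""")

# Most recent 50 messages for one chat = one partition read, already in order.
rows = session.execute(
    "SELECT message_id, sender_id, body FROM messages WHERE chat_id = %s LIMIT 50",
    (7,),
)
for row in rows:
    print(row.message_id, row.body)
```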
Hey Jordan, what happens when a user is offline and a message is sent by someone to a chat id that the offline user is part of?
They don't receive the message, and then they poll for it when they come back online
What is a “user” and “metadata” service? Are those servers or actual microservices?
In reality it's probably a microservice; for the sake of me not drawing all day, it's a server.
why flink in everything?
It allows us to not have to worry about a bunch of different failure scenarios that can occur when we don't use a combination of persistent streams and stream consumers.
If, as opposed to using Flink, we just sent messages directly between chat servers, I could send a message to a chat server, it could get to the DB, and then the chat server goes down. Now, it's never getting sent to the other chat servers.
Stream processing ensures that each message will be handled at least once, and then we can build in idempotence within our chat servers to ensure that new message notifications are handled only once.
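A minimal sketch of what that receiver-side idempotence could look like (purely illustrative: a bounded set of recently seen message ids, since at-least-once delivery upstream means the same id can arrive twice):

```python
from collections import OrderedDict

class Deduper:
    def __init__(self, capacity: int = 100_000):
        self.capacity = capacity
        self.seen = OrderedDict()

    def should_deliver(self, message_id: int) -> bool:
        # Drop anything we've already pushed to the user recently.
        if message_id in self.seen:
            return False
        self.seen[message_id] = True
        if len(self.seen) > self.capacity:
            self.seen.popitem(last=False)  # evict the oldest id
        return True

dedupe = Deduper()
print(dedupe.should_deliver(101))  # True: first delivery
print(dedupe.should_deliver(101))  # False: duplicate redelivery is dropped
```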
@@jordanhasnolife5163 Thanks! Any comments on when we should use Flink and when Spark or Hadoop? Could you cover that?
@@saber3112 Yeah I think it's just a matter of whether you want to be processing results as they come in or doing a batch job (once per day) will suffice. For the sake of these problems, most of them thus far have wanted instant delivery to users which is why you see me continue to use flink.
What properties of Flink are beneficial here? Let's say I have my own stream consumer which consumes from Kafka, sends the message to the chat server load balancer, saves it to the chat table, and only after that commits to Kafka. Will this suffice, or is there some other guarantee which Flink provides that is lacking in this use case?
@@sbahety92 I don't actually have any experience with Flink; I'm just a humble engineer reading about things so I can prepare for interviews.
But my understanding is that Flink's main advantage is that it has checkpointing built in. You could write your own checkpointing to S3, but I would imagine that's akin to implementing your own synchronized data structures: ripe opportunities for insidious bugs.
I would expect Flink to provide a way to interface with checkpointed data structures that ties in your Kafka/Kinesis sequence number, so that you know what your state reflects after processing a bit of data.
I doubt that Flink checkpoints after every single stream record processed. That's where Kafka/Kinesis streams being replayable is key, and why knowing that records can be replayed in failure scenarios is key for the design.
So in short, you can reinvent the wheel if you want to, but someone has already gone through the trouble.
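For intuition, a naive hand-rolled checkpoint looks something like this (a local file stands in for S3; Flink's built-in checkpointing coordinates this consistently across operators and source offsets, which is exactly the hard part you would otherwise own):

```python
import json, os

CHECKPOINT_PATH = "checkpoint.json"  # stand-in for an S3 object

def save_checkpoint(state: dict, offsets: dict) -> None:
    # Persist state *together with* the Kafka offsets it reflects.
    tmp = CHECKPOINT_PATH + ".tmp"
    with open(tmp, "w") as f:
        json.dump({"state": state, "offsets": offsets}, f)
    os.replace(tmp, CHECKPOINT_PATH)  # atomic swap, so we never read a half-written file

def load_checkpoint() -> tuple[dict, dict]:
    if not os.path.exists(CHECKPOINT_PATH):
        return {}, {}
    with open(CHECKPOINT_PATH) as f:
        snap = json.load(f)
    return snap["state"], snap["offsets"]

# On restart: reload state, seek the consumer back to these offsets, and accept
# that anything processed after the checkpoint will simply be replayed.
state, offsets = load_checkpoint()
```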
I don't really understand how Flink works; I've never used it before.
It needs to finish writing the message to the DB before it's done processing and can move on to the next Kafka message. Doesn't this slow down Kafka message processing, so that users' messages will be delayed? So the single-leader aspect of HBase is still a potential bottleneck.
Or do we have multiple Flink instances, one to send user messages to the queues and one to write to the DB?
It is probably true that synchronously writing to the database before delivering messages to the users will increase the time before a message reaches the user. That being said, it also guarantees that said message actually gets to the database without requiring a two-phase commit.
I've opted for correctness here over speed, but if you wanted to deliver writes to users as soon as possible without a bunch of concern over whether they make it to the database you could do that too.
In practice, the delay for message delivery isn't something that I feel able to predict without some amount of benchmarking. I think that worst comes to worst, though, an increased number of partitions is always a good way to increase our write throughput.