Brief Outline
00:00:22 Unique Users Problem
00:00:55 Problem Requirements
00:01:29 Terminology/Paradigm Overview
00:03:42 Session Expiration
00:05:21 Change Data Capture
00:06:37 Calculating Number of Unique Active Users - Naive
00:08:42 Calculating Unique Number of Users - Better
00:10:50 Calculating Unique Number of Users - Best
00:11:58 Calculating Number of Users - Best Approach
00:14:00 Generating Derived Data
00:16:11 The Problem With Snapshots
00:17:49 HyperLogLog
00:19:35 HyperLogLog Example
00:20:45 Building Our HLLs
00:22:46 Reading Our HLLs
00:23:59 Final Diagram - Count Unique Active Users
Thanks, Jordan~
Thanks Leon!!
Thank you for the videos. CDC is such a powerful tool and can be used in many system designs.
In the derived data where you build the TSDB table to find sessions based on start/end time, you are partitioning based on start/end time.
-> If we partition based on a hash of start/end time (hash-based partitioning), wouldn't the query to fetch sessions by start/end time be scatter/gather in nature, meaning you have to read from multiple partitions? Reads will be slow.
-> If we partition based on ranges of start/end time (range-based partitioning), wouldn't we create a hotspot partition where all the writes go?
Here too, we have to read from multiple partitions if the start/end time range doesn't fall on one partition (less common), but reads are better in this case compared to the previous one.
BTW, the query for finding the unique count is somewhat similar to Range Sum Query with Range Update (maybe).
1) Agreed, range-based partitioning is the way to go here.
2) This is why I propose using derived data: we can buffer the writes and upload them in batches. There will be hot partitions, certainly, but we can also introduce either caching or re-partitioning within a given range to keep the amount of scatter/gather low.
Based on the examples here, if snapshots theoretically happen every 10 minutes, and the query is something like 11:14-11:18, then a combination of snapshots (11:10 & 11:20) plus a time series DB query (11:14-11:18 start & end times) will still miss a session that started at 11:12 and ended at 11:19.
I guess realistically, either we take snapshots at minute granularity, or we ensure that user queries span whole snapshot intervals (e.g. multiples of 10 minutes in this example).
See 12:24. Get snapshots at the start and end of the granularity (11:10 and 11:20). Then use the time series DB to find all starts and ends from 11:10 to 11:20. Then filter from there.
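A minimal sketch of that filtering step, assuming the snapshot and the time series reads have already been merged into (user_id, start, end) tuples (all names here are illustrative, not from the video):

```python
def unique_active_users(sessions, q_start, q_end):
    """Count distinct users with a session overlapping [q_start, q_end].

    sessions: iterable of (user_id, start, end) tuples, where end is None
    for sessions that are still open. A session overlaps the window iff it
    starts before the window ends and ends after the window starts.
    """
    return len({
        user_id
        for user_id, start, end in sessions
        if start <= q_end and (end is None or end >= q_start)
    })
```

This also covers the 11:12-11:19 session from the question above: its start event falls inside the snapshot interval, so it appears in the merged list, and the overlap check keeps it.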
Struggler completes epic RUclips vid with severe case of ligma 🙏🏻 thankful for your work as always
Thanks I'm gonna have to get over my ligma rn
At ~23:00, I believe the right way of aggregating HLLs across multiple snapshots is, step 1: aggregate each bucket separately by taking the max value of that bucket across all snapshots, and only then, step 2: average the aggregated bucket values (call it X) and compute 2^X as the approximate number of unique users in the range. In your example it seems like you do these two steps in reverse order, which can lead to an approximation that is way off the longer the range is. E.g., imagine you want to approximate unique active users across 30 days through 30 daily snapshots, where in reality your app has ~50m daily actives and ~500m monthly actives. With your way of aggregating snapshots, it will approximate ~50m monthly actives instead of ~500m. Am I missing something?
I'm confused with what you're saying because I don't see myself ever performing step 1. I'm just doing step 2.
"aggregating for each bucket separately by taking max value of the given bucket across all snapshots" I agree with what you're saying if we have multiple buckets. In my example we only have one.
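For readers following along, the bucket-wise merge described in this thread can be sketched as below. The registers and the 2^mean estimator are deliberately simplified; real HyperLogLog applies a harmonic mean with bias correction.

```python
def merge_hlls(snapshots):
    """Element-wise max over equal-length HLL register arrays, one array
    per snapshot. Merging by max is lossless: it yields the same registers
    you would get by hashing the union of all users directly."""
    return [max(bucket) for bucket in zip(*snapshots)]

def estimate(registers):
    """Simplified estimator from the video: 2 ** (mean register value)."""
    return 2 ** (sum(registers) / len(registers))
```

The key point of the thread is the ordering: max-merge the registers first, then estimate once, rather than estimating each snapshot and combining the estimates.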
Another randomized idea: when the clients send their heartbeats, we forward them to a single server which, every minute, inserts each user ID into a hash set with probability 0.001 (1/1000). This way, even if we have a billion users, the expected size of the hash set is only 1 million (fits in RAM). At the end of each minute, the approximate number of unique users is 1000 times the size of the hash set, and we clear the hash set afterwards. If we're worried about too many requests going to a single server, we can do this across more servers partitioned by user ID and just sum the results.
Seems reasonable to me (though my stats knowledge isn't great lol) - my one suggestion would be to do the randomization client side so that you don't overload the network on your single server.
@@jordanhasnolife5163 Nice, makes sense.
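A sketch of the sampling estimator from this exchange. One tweak, which is my assumption rather than part of the comment: the keep/drop decision hashes the user ID instead of flipping an independent coin per heartbeat, so a user's inclusion probability stays p no matter how many heartbeats they send in the minute.

```python
import hashlib

def keep(uid, p):
    """Deterministic 'coin flip': the same user ID always gets the same
    decision, and roughly a fraction p of all IDs are kept."""
    h = int(hashlib.sha256(str(uid).encode()).hexdigest(), 16)
    return h % 10_000 < int(p * 10_000)

def estimate_uniques(heartbeat_uids, p=0.001):
    """Dedupe the sampled IDs, then scale the count back up by 1/p."""
    sampled = {uid for uid in heartbeat_uids if keep(uid, p)}
    return len(sampled) / p
```

As the reply suggests, keep() can also run client-side, so only roughly 0.1% of heartbeats ever reach the counting server.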
stunning! I'm becoming kind of addicted...
That mustache is straight fire! How many feet pics did you score with that bad boy?
I feel like I'm always sending them and no one ever sends them back 😞
@@nobodyismyself empty body
What do you think would be more accurate? 1) (as in the video) averaging the HLL values and then doing 2^avg, or 2) calculating 2^HLL and then taking the average?
I'm no mathematician, but I'm guessing the former, feel free to read the HLL paper though as they'll be able to explain way better than I ever could
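A toy comparison of the two orderings from the question, using made-up register values. Neither is the true HLL estimator; the HLL paper uses a normalized harmonic mean of the per-bucket 2^register values, precisely because an arithmetic mean is dominated by outlier buckets.

```python
# Toy register values (max observed leading-zero counts per bucket).
regs = [2, 4, 6]

# Option 1 (as in the video): average the registers, then exponentiate.
avg_then_pow = 2 ** (sum(regs) / len(regs))           # 2^4

# Option 2: exponentiate each register, then average.
pow_then_avg = sum(2 ** r for r in regs) / len(regs)  # (4 + 16 + 64) / 3
```

Option 2 is skewed upward by the largest register; the paper's harmonic mean leans the other way, damping outliers, which is part of why it outperforms both of these simple forms.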
Jordan, many thanks for all your efforts. Why can't we add a key to Redis with a time to live (equal to the session time)? With each heartbeat we can extend the TTL, and Redis gives us a command to count the keys. Everything seems less complicated this way.
1) How does this get us historical counts over arbitrary windows?
2) You still need to run a scatter/gather query over all of the Redis nodes, but that's inevitable if we're trying to get an exact count at a given moment. And if we want it for a given moment, do we use locking to keep the snapshots of counts from each Redis node consistent? Then we can't make any writes.
@@jordanhasnolife5163 You are correct. Thinking about alternative solutions again (I do think this one is fairly complex, but it works, and I would never have come up with it on my own, so no negative thoughts, it is great work): what if we store all pings in a DB? These would be massive writes, so what if we filter them? Something like a Bloom filter or cuckoo filter should help reduce writes if the session was already stored in the last minute. Thoughts? I think this is similar to your Flink (in-memory) approach, where you keep everything in memory; a Bloom/cuckoo filter could be a small optimization on top of it.
@@nalamda3682 Sure, can you elaborate on the optimization part? As I see things, we're using heartbeats to basically only write a "session started" and a "session ended" event, so I'm trying to see how this cuts down on the number of writes.
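For what it's worth, the mechanics of the Redis-TTL idea from this thread (for the current count only) can be simulated in a few lines. In Redis itself this would be roughly a SET uid 1 EX ttl on each heartbeat plus a key count for the read; the class below is a pure-Python stand-in, not Redis code.

```python
import time

class TtlSet:
    """Heartbeats refresh a per-user expiry timestamp; counting the
    unexpired entries approximates the current active-user count."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock  # injectable clock, handy for testing
        self.expiry = {}    # uid -> expiry timestamp

    def heartbeat(self, uid):
        # Each heartbeat extends the user's TTL, as the comment proposes.
        self.expiry[uid] = self.clock() + self.ttl

    def active_count(self):
        now = self.clock()
        # Lazily evict expired users, then count the survivors.
        self.expiry = {u: t for u, t in self.expiry.items() if t > now}
        return len(self.expiry)
```

As the replies note, this only answers "how many are active right now"; it gives no history for arbitrary past windows.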
I am wondering why we cannot just +1 for any new user's start, and -1 for any user's end (if this is the last session we are maintaining for that user)?
Lock contention! You can partition the counters, but that basically describes what we're doing for the realtime count. Unless of course you mean individual rows with +1 and -1.
In that case, you can aggregate the +1s and -1s over the selected timestamps, but then you have to actually do all of that work anytime that someone makes a query.
Basically I think you'd end up just reimplementing my solution eventually.
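The individual-rows variant from this exchange reads like this: every session start is a (timestamp, +1) row, every end a (timestamp, -1) row, and the concurrent count at time t is the running sum up to t, which is exactly the per-query aggregation work being described. A sketch, with illustrative names:

```python
def active_count_at(events, t):
    """events: iterable of (timestamp, delta) rows, delta in {+1, -1}
    for session starts and ends. Returns the concurrent-session count
    at time t by summing every delta stamped at or before t."""
    return sum(delta for ts, delta in events if ts <= t)
```

Note this is a full scan per query, and it counts concurrent sessions, not unique users over a range, which is why the discussion circles back to derived data.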
Hey Jordan. Thank you for the video.
I'm wondering, could we use two continuous HyperLogLogs? The first HLL, "Positive", would count all logins.
The second, "Negative", would count all logouts and session expirations.
I assume the session server would send these two events.
Would the delta of the "Positive" and "Negative" HLLs give us the number of active/online users?
At that point, I wonder why you'd even need a hyper log log as opposed to just using a normal counter.
I could see it being totally fine for the current count, but I don't think it helps us much with arbitrary time-range queries.
Cool idea!
Hey Jordan! Great video! I'm curious why we're using a time series DB here to store the start and end index data (also assuming the user ID and/or session ID is in the TSDB?) instead of just putting the start and end timestamps in the MySQL sessions_snapshot table and querying that table to get all the session counts? Thanks!
Hey! We could use MySQL, but at the end of the day this is time series data, so I think it is very reasonable to use a time series database.
@@jordanhasnolife5163 thank you. good sir!
If we are querying for unique users, do we need to query the session start and end DBs?
If yes, in case a user has multiple sessions, will the active user service fire a 'select distinct userId' query against the DBs?
Yeah when you combine all of the sessions together you can deduplicate the userIds. If it's too big to fit in memory you'll need some sort of shuffle phase based on the hash of the user id to do this.
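The shuffle phase mentioned here can be sketched as below (the partition count is hypothetical; in practice each partition would live on a separate worker): route every user ID by hash so all copies of an ID collide on one partition, dedupe locally, then sum.

```python
def count_distinct_sharded(user_ids, num_partitions=4):
    """Exact distinct count via hash partitioning: every ID lands in
    exactly one partition, so the per-partition set sizes sum to the
    true distinct count without any cross-partition deduplication."""
    partitions = [set() for _ in range(num_partitions)]
    for uid in user_ids:
        partitions[hash(uid) % num_partitions].add(uid)
    return sum(len(p) for p in partitions)
```

This is the same idea a distributed shuffle (e.g. in Spark or Flink) uses, just collapsed into one process for illustration.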
Are the Session DB sharded by user ID and the Session DBs indexed by start time and end time three different databases, or are they the same DB with multiple indexes on the columns?
Most likely different DBs I'd say; it becomes tough to make them the same DB when we have tons of sessions, since otherwise we need to use local indexes.
Great video!
Hey Jordan, thank you for the video 🙏
WDYT about solving the start and end time like the Twitter follower/followee problem?
We'd have two tables in Cassandra,
(userid, startTime, endTime), both partitioned by userid, but one with a sort key on startTime and one on endTime, where we sync the endTime table from the startTime table using CDC (Kafka + Flink).
Then we can just query both of them to get the count.
I think that the derived data is fine but you then miss the case where a user is active before the time range starts and after it ends
By taking snapshots at 1:05 and 1:25, aren't we still failing to count the sessions that start before 1:05 or end after 1:25?
Why? I just want the user counts from 1:08 to 1:23.
@@jordanhasnolife5163 I mean there is a user, for example, whose session starts at 1:00 and ends at 1:30; they are still an active user in the range 1:08 to 1:23, but they are not counted.
Awesome great vid! Can you do a system design for a banking system?
What do you see the functional requirements/complexity of a banking/payment system being?
How does the session server know that a given session has expired? I mean, I get the part about not having received a ping from the user, but the server has to be stateful and know all the users it is waiting for pings from, am I right?
yep - we need to make sure to route the heartbeats to the same server.
Does this work for the Facebook status light system design question?
I'm going to say no. You'd probably want to actually be using something like TTLs to keep that status going. Probably plenty of overlap though.