Hi Jordan, at 12:54 why do we need to update run_timestamp and add 5 minutes? What I understand is that we are updating it to make sure we don't re-run it in case it is already in execution. Can't we track it using status and some error message field?
@@jordanhasnolife5163 But that would be a new record for the same job id in the table, right? As per my understanding, that table maintains different runs of jobs at different timestamps, so we might not need to update any timestamp here. As soon as a job's instance runs at time 12:01, it will create a new entry in the table, whether it runs successfully or errors out. The instance corresponding to the new entry will start when that timestamp is crossed.
Hey Jordan, thank you for your video, it's very inspiring. For the scheduler table, could you please clarify the rationale behind using a combination of time_range and random_number as the partition key? At the end of the day, wouldn’t it achieve the same result as using time_range alone?
Hi Jordan, thanks for the amazing videos, I've really learned a lot from you. One thing I'm confused about: there are many nodes in the design, like the task queuing node and scheduler node. To me they are just services, like a task enqueuing service or a scheduling service. Is there any reason you draw them as "nodes"? Thank you!
at 22:12 about indexing, I am wondering if we index by status then when we want to update the delivery has been succeeded, don't we need to search that job id without the index (which would take a lot of time)?
Do you think if it makes more sense on just creating schedules whenever it gets to the scheduled time? Executor could possibly take a long time to execute a heavy job and therefore the scheduling will be delayed and users might be confused on why the job was not kicked off on the scheduling window.
Not entirely sure what you mean here, feel free to elaborate. When the job gets to the executor has nothing to do with the scheduling time, once the job gets to the executor, we'll increase the retry timestamp as well
Is it a good idea to serialize the DAG in application code (topological sort) and treat it as a single task (containing a bunch of subtasks, which are the serialized DAG tasks), with one worker executing the subtasks in order?
Probably not because people may still have other constraints to starting subtasks such as a time, so then the worker has to sit idle. Plus they may have different CPU requirements.
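To make that trade-off concrete, here is a minimal sketch using Python's standard-library graphlib. The DAG itself (build, lint, test, deploy) is a made-up example, not something from the video. It groups jobs into "waves" whose members have no unmet dependencies, which shows that some subtasks could run on separate executors rather than waiting in line on a single worker.

```python
from graphlib import TopologicalSorter

# Hypothetical DAG: each key depends on the jobs in its set.
dag = {
    "build": set(),
    "lint": set(),
    "test": {"build"},
    "deploy": {"test", "lint"},
}

def ready_waves(graph):
    """Group jobs into waves; jobs in one wave have no unmet dependencies."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        wave = sorted(sorter.get_ready())  # everything runnable right now
        waves.append(wave)
        sorter.done(*wave)                 # mark the whole wave as finished
    return waves

waves = ready_waves(dag)
print(waves)  # [['build', 'lint'], ['test'], ['deploy']]
```

A single worker would run these four jobs one after another, while a scheduler that sees the waves can hand "build" and "lint" to two different executors at once.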
Another question about SchedulingTable: How do we query to get all rows before the current time (STATUS_StartTime is the partition key and JobId is the sort key), since it would require a SCAN operation on this huge table? As far as I know, we can't execute a range query on a partition key without a scan. Even if we take STATUS_JobId as the partition key and StartTime as the sort key, it will still require a SCAN operation in NoSQL and SQL DBs. Shouldn't we just keep "STATUS_StartTime" as the partition key (where StartTime is truncated to minute-level precision) and JobId as the sort key? The SchedulerNode would then poll, every minute, all data from the partition for the current minute.
We aren't partitioning by start time, we're indexing by start time (partitioning by start time will probably lead to hot partitions). Then you're just doing a normal index read on each partition, where the entries you care about are right at the front of the DB.
@@jordanhasnolife5163 Understood. Assume it's NoSQL (say DynamoDB), where an index is different from a SQL DB index. In DynamoDB, indexes are created based on a partition key and a sort key. So what would be the partition key/sort key?
@@shahnawazalam9939 Partition key would probably just be some sort of randomly generated job id, and the sort key would be the timestamp at which we want to run it.
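A rough sketch of that key layout, using plain in-memory lists rather than any real database: rows are spread across partitions by hashing the job id, and each partition is kept sorted by run timestamp so "what is due now?" is a range read at the front of the partition. The class name, the partition count, and the integer timestamps are all assumptions made for illustration.

```python
import bisect
import hashlib

NUM_PARTITIONS = 4  # assumed shard count

def partition_for(job_id: str) -> int:
    """Hash the job id so rows spread evenly instead of piling onto one time range."""
    digest = hashlib.sha256(job_id.encode()).digest()
    return int.from_bytes(digest[:8], "big") % NUM_PARTITIONS

class SchedulingTable:
    def __init__(self):
        # Each partition keeps (run_timestamp, job_id) tuples sorted by time.
        self.partitions = [[] for _ in range(NUM_PARTITIONS)]

    def put(self, job_id: str, run_timestamp: int) -> None:
        rows = self.partitions[partition_for(job_id)]
        bisect.insort(rows, (run_timestamp, job_id))

    def due(self, partition: int, now: int):
        """Range read on the sort key: every row with run_timestamp <= now."""
        rows = self.partitions[partition]
        # (now + 1,) sorts before any row stamped now + 1 and after all rows <= now.
        end = bisect.bisect_left(rows, (now + 1,))
        return rows[:end]

table = SchedulingTable()
table.put("job-a", 100)
table.put("job-b", 200)
table.put("job-c", 300)
due_now = [row for p in range(NUM_PARTITIONS) for row in table.due(p, now=200)]
print(sorted(due_now))  # [(100, 'job-a'), (200, 'job-b')]
```

Each poller only touches the head of its own partitions, so no full scan is needed and no single partition holds all the jobs for a given minute.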
Thanks Jordan for your video! But have one question: Whether DAG jobs and cron jobs will have some overlapping? I understand that for simplifying the design, we can see that in most cases DAG jobs rely on job dependency finish and cron jobs rely on the time. But if it is possible that some DAG jobs may also be the cron jobs? If this is true, whether that means we need more cols in cron table for this? Or may need an extra table for this? Thanks a lot.
Typically the first nodes in the dag will be on some cron schedule, so yeah I would agree there would be additional logic to do there! I don't know that we'd need more logic in the cron table to do this, I think it's more so just what timestamp you throw on the dag job when you put it in the scheduler table (for the next time that it should run)
I'm less familiar with redis distributed locks and whether or not they are consensus based. If they are, that means they persist on hardware failures (which I know zookeeper locks do). I'm sure they're faster than zookeeper though.
I am wondering how we know the job in scheduler task needs to be run? are we going to have the executor repeatedly query on the scheduler DB to check if current_time > run_timestamp? and this should be as frequent as even every one second (if we want an accurate one)?
Great content, one question about the "cron-table". Is it used in your final solution? I can't understand when it is used, except maybe for the first scheduling, since you are rescheduling the heads of the DAGs by putting them as the dependencies of the tails. Am I missing something?
Dah yeah I mean you basically want to ensure that if Cron schedule changes you can update that in the scheduling table, so tasks should read from the Cron table when they schedule their next instance
@jordanhasnolife5163 . At 18:16, we mentioned that we can't use kafka partitions technique since earlier messages will be blocked to get picked up due to long running Executors. Broker technique works. But can't we still use Kafka partition technique? So, can we make "Kafka TO Executors" stateless (async) instead of sync using a Master Slave architecture where Master (replicated as all Leaders) which will quickly pull messages from one of the Kafka partitions as per Executors (workers) availability and just send it to one of the Executors? Honeycomb handles over 1M transactions via 70 Kafka partitions with a single Kafka topic
If we make the root node dependent on its child nodes, wouldn’t this make the graph no longer acyclic? How would we be able to figure out which one is the root node in this case?
The node will still have higher epoch number I presume but yeah, not sure what all issues it can create. We can take a look at how argo scheduler works and use that idea as well probably.
Hi Jordan, I am a newbie to system design and have a couple of questions. I assume that the executor is a pool of cron jobs scheduled to run every minute. I also assume that only one cron job will pull the scheduled tasks eligible for running. My questions are: 1- What if we have many tasks scheduled at a particular interval and all of them get picked up? What is the likelihood of this scenario, and should we even care about throttling the executor? 2- Is running the task exactly at the specified time a non-functional requirement, or do we allow a margin?
1) The executor is basically a bunch of random nodes responsible for running a task, that is passed to it from the message broker. I'm not sure what you mean by this question, we'll absolutely have a lot of tasks scheduled at once. 2) I suppose that's up to your interviewer, the more that you partition those scheduling tables the faster you can get jobs in the queue, but this doesn't guarantee when they'll be run if there aren't enough executors available.
Thank you, Jordan! Your videos are really helpful. I have a request for one of the amazon's most asked HLD system design interview questions - traffic control system. Would be really helpful if you could make a video on this🙏
Hi Jordan, if a DAG update isn't needed (as in, it's a simple cron job), does the executor directly update the scheduling table, since there won't be CDC in this case?
schedule(s3_link_to_binary, cron_schedule) -> returns job id
schedule(some_graph_specification_with_specific_binaries, cron_schedule) -> returns list of job id
get_status(job_id)
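For anyone who wants to poke at that API shape, here is a small in-memory stand-in. It is only a sketch: the class name JobApi, the method name schedule_graph (the graph overload of schedule), the "SCHEDULED" status string, and the example S3 links are all invented for illustration, and dependency edges between graph tasks are left out.

```python
import uuid
from typing import Dict, List

class JobApi:
    """In-memory stand-in for the three calls listed above."""

    def __init__(self):
        self._status: Dict[str, str] = {}

    def schedule(self, s3_link_to_binary: str, cron_schedule: str) -> str:
        job_id = str(uuid.uuid4())
        self._status[job_id] = "SCHEDULED"  # status names are assumed
        return job_id

    def schedule_graph(self, binaries_by_task: Dict[str, str], cron_schedule: str) -> List[str]:
        # One job id per task in the graph; dependency edges are omitted here.
        return [self.schedule(link, cron_schedule) for link in binaries_by_task.values()]

    def get_status(self, job_id: str) -> str:
        return self._status[job_id]

api = JobApi()
job_id = api.schedule("s3://example-bucket/job.bin", "*/5 * * * *")
graph_ids = api.schedule_graph(
    {"build": "s3://example-bucket/build.bin", "test": "s3://example-bucket/test.bin"},
    "0 2 * * *",
)
print(api.get_status(job_id), len(graph_ids))
```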
I still prefer reading compared to watching videos for tech stuff. Wondering whether you can also publish your content as writing somewhere. There are also platforms writers get paid for their content. Or probably a book like Alex Xu's.
On the DAG Table, you mentioned "When all dependency task have an equal epoch for a given row, schedule that task". By epoch, do you mean just a counter? If we use an actual Unix epoch (the number of seconds elapsed since 1 January 1970), they won't be the same, because two tasks will finish at different times.
@18:42 If our system has a billion jobs per day, that's roughly 10K jobs/second. It probably needs at least several hundred executors listening to the in-memory broker (or even Kafka). Would it not overwhelm the broker when thousands of executors keep polling it? And when there are jobs available to execute, which design pattern (or method) can ensure there is no concurrency bottleneck and the same job is not picked up by multiple executors?
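A quick back-of-the-envelope check of those numbers, assuming the figure means one billion jobs per day. The 50 ms average job duration is purely an assumption chosen to illustrate the calculation; the executor estimate uses Little's law (concurrent jobs = arrival rate x average duration).

```python
import math

SECONDS_PER_DAY = 24 * 60 * 60  # 86,400

def jobs_per_second(jobs_per_day: int) -> float:
    return jobs_per_day / SECONDS_PER_DAY

def executors_needed(jobs_per_day: int, avg_job_seconds: float) -> int:
    """Little's law: concurrent jobs = arrival rate * average duration."""
    return math.ceil(jobs_per_second(jobs_per_day) * avg_job_seconds)

rate = jobs_per_second(1_000_000_000)
print(round(rate))                                             # 11574
print(executors_needed(1_000_000_000, avg_job_seconds=0.05))   # 579
```

So "roughly 10K jobs/second" holds up, and the executor count scales linearly with how long a typical job runs: at one second per job the same load would need on the order of 11,600 single-job executors.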
Excellent stuff, and a lot of detail covered in a short time. I always need to watch your videos multiple times to grasp all the intricacies since your content covers so much depth.👏
One of the excellent System design videos I have ever seen, Touched all the concepts in 30 minutes.
Bookmarks
Talking points
4:02 Job types - Cron, DAG (build>test>deploy), Can include SLA Guarantees, binary size, job sandbox, security timeouts.
5:46 DAG scheduling, data schemas and sharding (need ACID, single leader, replication for reads)
Execution
12:04 Scheduler table, may use status with locks (scheduled, running, complete)
14:20 Scheduling Performance, can use scheduler, use queue with priority with consumer groups
21:00 Job completion, run at least once or only once, idempotent jobs
28:00 Diagram
Your videos are gem, many so-called paid courses do not have this level of quality.
Another Jordan classic - great learning material as always! Thank you Sir!
Excellent video Jordan
1. I have a few doubts about how the system would scale given this requirement:
R1. For a high priority job scheduled at 2pm, I want it to get executed within 200ms of the scheduled time.
Constraint: The S3 binary for the job itself might be 100 MB, and downloading that would take 5 sec.
Here is my high level approach.
Three components here:
1. Resource manager
2. Execution planner
3. Executor
The execution planner starts at 1:30 pm and sees which tasks are planned for 2:00 pm.
It categorizes them into high resource, medium resource, and low resource,
and how much.
It talks to the resource manager to pre-identify appropriate workers and pre-warm the nodes:
1. Pre-download the S3 binary
It creates the task execution to worker node mapping.
Any changes, e.g. cancellation, are communicated to the worker nodes.
Now at 2:00 pm, this can again result in a thundering herd problem where the database gets inundated with queries.
To avoid that, we can push the jobs to the workers beforehand, along with a local cron job,
so each job runs exactly at 2:00 pm, since the binary is already downloaded.
Seems fairly reasonable to me. I think if any tasks came in like this you could just ensure that they were split into a binary pre cache step and a run step. You'd either then have to ensure that those steps run on the same physical node, or the physical node would basically have to remain idle from 1:30 to 2
@@jordanhasnolife5163 Thanks, that should be better IMO. Using the existing system, just divide the job into two parts and preschedule with a constraint like job schedule time < T+30min, and schedule.
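A tiny sketch of the "pre-cache step plus run step" idea from this thread. Everything here is hypothetical: the Step record, the 30-minute lead time, and the pinned_node field that forces both steps onto the same machine are just one way to express the constraint discussed above.

```python
from dataclasses import dataclass
from typing import List

PRECACHE_LEAD_SECONDS = 30 * 60  # assumed: start warming 30 minutes early

@dataclass
class Step:
    job_id: str
    kind: str         # "precache" or "run"
    run_at: int       # Unix seconds
    pinned_node: str  # both steps must land on the same machine

def split_job(job_id: str, run_at: int, node: str) -> List[Step]:
    """Turn one latency-sensitive job into a download step and a run step."""
    return [
        Step(job_id, "precache", run_at - PRECACHE_LEAD_SECONDS, node),
        Step(job_id, "run", run_at, node),
    ]

steps = split_job("job-42", run_at=1_700_000_000, node="worker-7")
for step in steps:
    print(step.kind, step.run_at, step.pinned_node)
```

Both steps can then go through the normal scheduling table; the cost, as noted above, is that the pinned node either sits idle or has to be reserved between the two timestamps.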
A lot of other SD YouTubers and coaches never go into the depth you do, and with so little experience. This is definitely L6 stuff.
I'm a chronic procrastubater myself. Thanks for taking the time to create this Jordan.
Thanks for taking the time to watch it, hopefully it didn't stop you from beating the wood for too long
Hey Jordan! This is super helpful. Thanks so much! Quick question about ensuring jobs only run once - the retry logic via the scheduling node a la timestamps is intentional, right? As in, we intend to run the same job more than once, so I was a bit confused by that concern at around 23:05. Even if an executor goes down, we want to retry that job, right? I guess I was a bit confused why "running jobs once" is even a concern, as that is expected behavior.
I think the gist is
1) you don't want two of the same job running at the same time
2) you don't want the same invocation/iteration of the same job running more than once if it isn't idempotent (e.g. sending an email)
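Point 2 can be sketched with an idempotency key. This is an illustration, not the video's implementation: the key is assumed to be (job id, invocation timestamp), and the in-memory set stands in for a durable store. In a real distributed setup the check and the write would also need to be atomic.

```python
class EmailSender:
    """Skips an invocation it has already handled, so retries are harmless."""

    def __init__(self):
        self.completed = set()  # in production this would be a durable store
        self.sent = []

    def run(self, job_id: str, invocation_ts: int, recipient: str) -> bool:
        key = (job_id, invocation_ts)  # identifies one iteration of one job
        if key in self.completed:
            return False               # duplicate delivery: do nothing
        self.sent.append(recipient)    # the non-idempotent side effect
        self.completed.add(key)
        return True

sender = EmailSender()
first = sender.run("daily-digest", 1400, "a@example.com")
retry = sender.run("daily-digest", 1400, "a@example.com")   # same invocation, retried
next_day = sender.run("daily-digest", 2840, "a@example.com")  # a new invocation
print(first, retry, next_day, len(sender.sent))  # True False True 2
```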
Thank you, Jordan!
I still have some clarifications to get a better understanding:
1. What does "step" mean in the context of updating the run_timestamp each time we process the job? For example, if we update the job's run_timestamp from 2:01 to 2:06, is this just a one-time update, or do we continue to update it at subsequent steps, say from 2:06 to 2:11?
2. I'm struggling to understand the need for the run_timestamp according to "increase the run_ts to reflect how much time we should wait before rescheduling the job".
Especially when we already have a status column. Typically, we can determine which jobs to queue by checking the status field, for example, moving jobs from "READY" to "PROCESSING". For scenarios involving failure and retry, if a job fails and the executor is still operational, we could simply update the status to "FAILED". If the executor fails, couldn't another executor pick up the job via a message queue and handle the status updates accordingly?
3. Concerning priority scheduling, is there a risk of resource wastage? It appears that long-running jobs might successively occupy all executor resources connected to the low, mid, and high-level message queues, since every job always starts from the lowest level.
1) Steps: Job is read by scheduling cron, job gets put in message queue, job reaches executor. Nope, we'll continue to update it in the future if we retry a job!
2) If we don't have a run timestamp, we will just constantly retry the job every time that we poll our scheduling table. If we instead use some sort of enum like a status to say whether a job is completed, in progress, or failed, then we may not retry the job if the node running it goes down and can never tell us that it failed.
3) Yes, but that's typically why you have the lowest queues have a pretty small timeout. In theory, we could also have users submit a minimum priority to run at when they submit a job.
@@jordanhasnolife5163 Thanks a lot!!
@@jordanhasnolife5163
Continuing on this, when exactly do we update the run_timestamp? If we do it every time, won't we end up running the job again even though it finished in an earlier run?
@@rakeshvarma8091 The run timestamp is updated to say our restart time if we reach it. In the case of finishing the job, we can remove our entry from the table upon completion, or use a separate status column to say don't run it again.
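Putting this thread together, here is a minimal sketch of the run_timestamp mechanism as described in the replies. The in-memory dict and list stand in for the scheduling table and the message broker, and the 5-minute retry window is an assumption taken from the 2:01 to 2:06 example above.

```python
RETRY_AFTER_SECONDS = 5 * 60  # assumed timeout, matching the 2:01 -> 2:06 example

class Scheduler:
    def __init__(self):
        self.table = {}  # job_id -> run_timestamp (seconds)
        self.queue = []  # stand-in for the message broker

    def add(self, job_id, run_timestamp):
        self.table[job_id] = run_timestamp

    def poll(self, now):
        """Enqueue every due job and push its run_timestamp into the future."""
        for job_id, run_ts in list(self.table.items()):
            if run_ts <= now:
                self.queue.append(job_id)
                # If nobody reports completion, the job becomes due again later.
                self.table[job_id] = now + RETRY_AFTER_SECONDS

    def complete(self, job_id):
        """A finished job is removed so it is never picked up again."""
        self.table.pop(job_id, None)

s = Scheduler()
s.add("a", 100)
s.add("b", 100)
s.poll(now=100)   # both jobs enqueued, both pushed out to 400
s.poll(now=160)   # nothing is due, so nothing is re-enqueued
s.complete("a")   # executor for "a" reports success
s.poll(now=400)   # "b" never reported back, so it is retried
print(s.queue)    # ['a', 'b', 'b']
```

The point is that a crashed executor never has to tell anyone it failed: silence alone causes the retry once the timestamp is reached, which a status enum by itself cannot do.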
QQ Jordan: Why have we put 2 Kafkas to stream job events from "DAG Table" and "Cron Table" to "SchedulingTable"? Why didn't we put 1 Kafka after JobEnqueueService to stream events (Cron or DAG) into the Scheduling Table? I believe this would simplify the design and also make JobEnqueueService asynchronous. One possible advantage I can think of in your design is that by committing to the DB first, we assure clients that we have successfully accepted their job requests, whereas Kafka messages can be dropped even though Kafka has a long retention policy.
I'm putting things in a database first so that I can reschedule them if need be. This is especially relevant for the DAG table. I need to know when all of my dependency jobs have been completed before enqueuing an object again. This could probably be done with kafka and flink but I think that overcomplicates things a bit.
awesome content! qq - at the end, where you show the high level diagram, for the cron case which component is responsible for continually adding the "next/future" schedule?
I envisioned it as the worker running the previous invocation of the task would schedule the next. That certainly has pros and cons though, especially for graphs, and perhaps a dedicated component would be best.
Thanks for the great content! Regarding the idea of assigning a job to a worker (as you named it "dumb" :-)). Couldn’t we use a workers’ status table (which they update via heartbeats) to find an available "proper (priority)" machine, assign the job to it, and place it in the queue? That way, only the designated worker processes the job. Once the job is completed (either successfully or with failure), the result is sent to the job creator service, and the status field in the job table is updated for the scheduler. In the case of a failure, the scheduler would re-queue the job according to its retry policy, potentially assigning it to a different machine based on the updated status. What do you think?
jobs => (1) create/submit job service => job DB (job table, job schedule table, DAG table) => (2) job scheduler => SQS => (3) executors => go back to (1) (create/submit job service)
(2) job scheduler -- read/poll --> machine DB (executor store table)
(3) executors -- write: heartbeat to DB --> machine DB (executor store table)
Yeah that's fine, I don't think you'd need a queue then though.
I'd also say to beware of potential contention on that database table, since hundreds of jobs will be looking for the first available worker with a certain priority.
I'd be curious to see how such a design plays out in practice and where bottlenecks might come up.
@@jordanhasnolife5163
a- The priority queue helps balance the load on the executors, instead of pushing a new task to them, allowing them to pick up assigned jobs as soon as they are ready. b- Regarding contention: The task scheduler acts as a load balancer, routing jobs to executors based on real-time information (heartbeats) about their availability and capacity. Without stamping the jobs with the worker_id, the workers still need to decide which job to take (Reverse contention)
Great video! In the part about scheduling DAG jobs, why do we have to declare that jobs 1 and 2 depend on jobs 4 and 5? The execution of jobs 1 and 2 is based on the current timestamp, and if it is their turn, they can run without treating 4 and 5 as dependencies. On the other hand, knowing that jobs 4 and 5 are not dependencies of any other job, we can tell that they are the last jobs to run and can mark the DAG as succeeded? So with jobs 1 and 2 declared as dependent on 4 and 5, how can we tell that a DAG is finished?
Yeah this was just my convenience method of being able to tell that all leaf nodes of a dag job had been run and therefore automatically scheduling the root nodes again. In reality, a lot of DAGs start on a cron anyways, so you can basically just have a cron job to enqueue the root node for scheduling and then go from there
Just wanna say I really like the addition of the initial high level design! Definitely wouldn’t say it was incomprehensible before (I think your other videos are great too, thanks for all the content!), but this style definitely feels a little more like interview style and helps to better understand where your deeper explanations fit in the system.
Thanks Owen!
Question at time 22:42: if we have the table with the partition key (run_time + random number), we cannot update the run_time for the next retry (at 12:45), am I right? NoSQL DBs don't allow updating the partition key.
Sorry we're partitioning on the random number, the run_time is the sort key
Awesome jordan ! Can you share the slides which you use - so that we can take a print of the notes and refer back when required.
Check channel description
In the slide starting at minute 6:00, I'm curious as to what the best strategy for the database logic is to schedule a job based on its dependencies, e.g. for job 3, when 1: 1 and 2: 1. Is the logic dependent on the epochs of the parent nodes becoming unequal and then equal again to trigger job 3?
Yep basically
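Here is one way that trigger could look in code, as a sketch only. It assumes the epoch is a simple per-run counter (not a wall-clock time) and that each DAG row stores the last completed epoch of every dependency; the class and method names are made up.

```python
class DagTable:
    def __init__(self, dependencies):
        # job -> {dependency: last completed epoch of that dependency}
        self.rows = {job: {dep: 0 for dep in deps} for job, deps in dependencies.items()}

    def on_complete(self, finished_job, epoch):
        """Record a completion; return jobs whose dependencies now share this epoch."""
        runnable = []
        for job, dep_epochs in self.rows.items():
            if finished_job in dep_epochs:
                dep_epochs[finished_job] = epoch
                if all(e == epoch for e in dep_epochs.values()):
                    runnable.append(job)
        return runnable

dag = DagTable({3: [1, 2]})             # job 3 depends on jobs 1 and 2
first = dag.on_complete(1, epoch=1)     # row for job 3 is {1: 1, 2: 0}: unequal
second = dag.on_complete(2, epoch=1)    # row for job 3 is {1: 1, 2: 1}: equal again
print(first, second)                    # [] [3]
```

The epochs go unequal when the first parent finishes a new run and become equal again when the last parent catches up, and that second transition is what schedules job 3.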
Jordan, excellent video. For the case of using Kafka as the message queue, one consumer in a consumer group processes the jobs in a partition sequentially. What if a job takes a long time to run? Will it block all following jobs in this partition? Should we wait, kill the job after some time, or move it to a retry queue?
You're correct, but that's why we don't use kafka here and instead opt for a normal in memory broker.
One approach is to use a distributed lock in Redis to prevent multiple executors from rescheduling the same job. An executor can acquire a lock on a job ID by creating an entry in Redis with a TTL. Do you think this is a good idea?
Redis doesn't use distributed consensus, so it will be faster than zookeeper. But it can also go down, so I suppose you'll have to make that decision for yourself!
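A rough sketch of the lock-with-TTL idea from the question, using an in-memory stand-in rather than a real Redis client so it runs on its own. `TtlLockTable` and its methods are hypothetical; against Redis the acquire step would correspond to `SET key value NX PX <ttl>`. The clock is passed in explicitly, which is an assumption made to keep the example deterministic.

```python
class TtlLockTable:
    """In-memory stand-in for per-job locks that expire after a TTL."""

    def __init__(self):
        self._locks = {}  # job_id -> (owner, expires_at)

    def acquire(self, job_id, owner, ttl_s, now):
        """Take the lock if it is free or expired; return True on success."""
        holder = self._locks.get(job_id)
        if holder is not None and holder[1] > now:
            return False  # someone else still holds an unexpired lock
        self._locks[job_id] = (owner, now + ttl_s)
        return True

    def release(self, job_id, owner):
        """Release only if the caller is the current holder."""
        holder = self._locks.get(job_id)
        if holder is not None and holder[0] == owner:
            del self._locks[job_id]
            return True
        return False


locks = TtlLockTable()
a_got_it = locks.acquire("job-7", "exec-a", ttl_s=30, now=0)
b_blocked = locks.acquire("job-7", "exec-b", ttl_s=30, now=10)   # exec-a's lock is still live
b_after_ttl = locks.acquire("job-7", "exec-b", ttl_s=30, now=31)  # exec-a's lock expired at t=30
```

The TTL is what lets another executor pick the job up if the first one dies, but it also means a slow executor can lose the lock mid-run, which is why idempotent jobs still matter.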
Every time I rewatch the video I feel like I learn something new. One new question: at 16:40, on the load balancing problem (the LB needs to know the status of all executors and can be a single point of failure), isn't this a problem for all LBs, not only for the job scheduler? For the point of failure we can use multiple LBs (active-active/active-passive etc.). If it is a common problem for LBs, why are they used so frequently in most other designs?
You're correct, load balancing can *be* a single point of failure, and we mitigate that through active-active/active-passive configurations of load balancing.
Basically Jenkins master and slave set up deployed on Kubernetes for scalability
Hi Jordan, thanks for the great content! I have a question on the job scheduler/poller. Is it possible to make job scheduler poll a partition range by reading some partition/consistent hashing ring assignment from ZK (or like a kafka consumer to handle a particular partition), so each job scheduler could run independently in parallel and we don't need to worry about the same job being picked up by different job scheduler?
I believe that's what I proposed in this video.
Have multiple scheduler shards, each of which has a poller process reading it on some interval.
Hi Jordan, at 12:54, why do we need to update run_timestamp and add 5 minutes? As I understand it, we update it to make sure we don't rerun the job while it is already executing. Can't we track that using the status and some error message field?
Yeah that's probably sufficient, but if you do rerun the job you'll want to update the run timestamp with your new run time
@jordanhasnolife5163 But that would be a new record for the same job id in the table, right? As I understand it, that table maintains the different runs of jobs at different timestamps, so we might not need to update any timestamp here. As soon as a job's instance runs at 12:01, it will create a new entry in the table whether it succeeds or errors out. The instance corresponding to the new entry will start when that timestamp is crossed.
@amanpaliwal2132 You could do it as a new row as well, or if it's a retry of the same job you can do the same row, the choice is yours there
@jordanhasnolife5163 got it, thanks.
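A small sketch of the same-row variant discussed in this thread, assuming the poller pushes `run_timestamp` forward by a fixed delay whenever it hands a row to an executor. `ScheduledRun`, `poll_due`, and the 5-minute constant are illustrative names and values, not the exact schema from the video.

```python
from dataclasses import dataclass

RETRY_DELAY_S = 5 * 60  # assumed 5-minute window before a retry


@dataclass
class ScheduledRun:
    job_id: str
    run_timestamp: int  # seconds; next time the poller may pick this row up
    status: str = "scheduled"


def poll_due(rows, now):
    """Return rows that are due, pushing each row's run_timestamp out by the retry delay."""
    due = [r for r in rows if r.status != "complete" and r.run_timestamp <= now]
    for r in due:
        r.status = "running"
        # The bumped timestamp doubles as the retry time if the run never completes.
        r.run_timestamp = now + RETRY_DELAY_S
    return due


def mark_complete(row):
    row.status = "complete"


rows = [ScheduledRun("job-a", run_timestamp=100)]
picked_first = [r.job_id for r in poll_due(rows, now=100)]  # picked up at t=100
picked_soon = [r.job_id for r in poll_due(rows, now=160)]   # still inside the retry window
picked_later = [r.job_id for r in poll_due(rows, now=400)]  # window expired, so it is retried
```

With a status column alone, a crashed executor would leave the row stuck in "running"; the bumped timestamp is what brings it back to the poller without extra bookkeeping.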
Hey Jordan, thank you for your video, it's very inspiring. For the scheduler table, could you please clarify the rationale behind using a combination of time_range and random_number as the partition key? At the end of the day, wouldn’t it achieve the same result as using time_range alone?
Gotta load balance somehow, right? Otherwise if there are a million jobs starting at noon today we can overload that DB.
@jordanhasnolife5163 Didn't think about that one! Very nice thinking :)
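To illustrate the load-balancing point, here is a rough sketch in which every new row gets a random shard number as its partition key, so jobs that all start at noon spread across shards instead of piling onto one. `NUM_SHARDS` and `partition_key` are assumed names, and the job count is scaled down from the million in the example to keep it quick.

```python
import random
from collections import Counter

NUM_SHARDS = 8  # assumed shard count


def partition_key(rng: random.Random) -> int:
    """Pick a random shard for a new row; run_timestamp stays the sort key within it."""
    return rng.randrange(NUM_SHARDS)


rng = random.Random(42)  # seeded only so the sketch is repeatable
noon = 12 * 3600
rows = [(partition_key(rng), noon, f"job-{i}") for i in range(10_000)]
rows_per_shard = Counter(shard for shard, _, _ in rows)
```

Each shard ends up with roughly `10_000 / NUM_SHARDS` rows even though every row has the same run time, so each shard's poller only has to handle its own slice.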
Hi Jordan, thanks for the amazing videos, I've really learned a lot from you. One thing that confuses me: there are many nodes in the design, like the task queuing node and the scheduler node. To me they are just services, like a task enqueuing service or a scheduling service. Is there any reason you draw them as "nodes"? Thank you!
I think that service is a fine word to use here
At 22:12, about indexing: if we index by status, then when we want to update a job to record that it succeeded, don't we need to look up that job id without the index (which would take a lot of time)?
Fair point! I think this might be a good use case for a local secondary index on the job id.
Do you think it makes more sense to just create the schedule entries when the scheduled time arrives? An executor could take a long time to execute a heavy job, so scheduling will be delayed and users might be confused about why the job was not kicked off in its scheduling window.
Not entirely sure what you mean here, feel free to elaborate. When the job gets to the executor has nothing to do with the scheduling time. Once the job gets to the executor, we'll increase the retry timestamp as well.
Is it a good idea to serialize the DAG in application code (topological sort) and treat it as a single task (containing a bunch of subtasks, which are the serialized DAG tasks), with one worker executing the subtasks in order?
Probably not because people may still have other constraints to starting subtasks such as a time, so then the worker has to sit idle. Plus they may have different CPU requirements.
Another question about the SchedulingTable: how do we query for all rows before the current time (STATUS_StartTime is the partition key and JobId is the sort key), since that would require a SCAN operation on this huge table? As far as I know, we can't execute a range query on a partition key without a scan. Even if we take STATUS_JobId as the partition key and StartTime as the sort key, it will still require a SCAN operation in NoSQL and SQL DBs. Shouldn't we just keep "STATUS_StartTime" as the partition key (where StartTime is the time at minute-level precision) and JobId as the sort key? The SchedulerNode would then poll, every minute, all data from the partition for the current minute interval.
We aren't partitioning by start time, we're indexing by start time (partitioning by start time will probably lead to hot partitions). Then you're just doing a normal index read on each partition, where the entries you care about are right at the front of the DB.
@jordanhasnolife5163 Understood. Assuming it's NoSQL (say DynamoDB), where an index is different from a SQL DB index: in DynamoDB, indexes are defined by a partition key and a sort key. So what would the partition key and sort key be?
@shahnawazalam9939 Partition key would probably just be some sort of randomly generated job id, and the sort key would be the timestamp at which we want to run it.
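A sketch of why the poll is an index read rather than a scan, assuming each partition keeps its rows ordered by the run timestamp (the sort key). The `Partition` class is a hypothetical in-memory stand-in for one shard of the scheduler table, not a real DynamoDB client.

```python
import bisect


class Partition:
    """One scheduler-table partition with rows kept sorted by (run_timestamp, job_id)."""

    def __init__(self):
        self._rows = []  # sorted list of (run_timestamp, job_id)

    def put(self, run_timestamp, job_id):
        # insort keeps the list ordered, like an index on the sort key.
        bisect.insort(self._rows, (run_timestamp, job_id))

    def pop_due(self, now):
        """Remove and return every row with run_timestamp <= now."""
        # Due rows form a prefix of the sorted list, so we stop at the first future row.
        cut = 0
        while cut < len(self._rows) and self._rows[cut][0] <= now:
            cut += 1
        due, self._rows = self._rows[:cut], self._rows[cut:]
        return due

    def pending(self):
        return list(self._rows)


p = Partition()
p.put(300, "job-c")
p.put(100, "job-a")
p.put(200, "job-b")
due_now = p.pop_due(now=200)
```

Each poller does this against its own partition, so the work per poll is proportional to the number of due rows rather than the size of the table.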
Thanks Jordan for your video! I have one question: will DAG jobs and cron jobs overlap? I understand that, to simplify the design, we can say that in most cases DAG jobs wait on their job dependencies finishing and cron jobs wait on the time. But is it possible that some DAG jobs are also cron jobs? If so, does that mean we need more columns in the cron table, or an extra table? Thanks a lot.
Typically the first nodes in the dag will be on some cron schedule, so yeah I would agree there would be additional logic to do there! I don't know that we'd need more logic in the cron table to do this, I think it's more so just what timestamp you throw on the dag job when you put it in the scheduler table (for the next time that it should run)
Curious if there is any specific reason we choose zookeeper as distributed lock instead of redis for this case?
I'm less familiar with redis distributed locks and whether or not they are consensus based. If they are, that means they persist on hardware failures (which I know zookeeper locks do). I'm sure they're faster than zookeeper though.
I am wondering how we know that the job in scheduler task needs to be run. Are we going to have the executor repeatedly query the scheduler DB to check whether current_time > run_timestamp? And should this be as frequent as every second (if we want an accurate one)?
Yes, basically.
Great content! One question about the "cron-table": is it used in your final solution? I can't understand when it is used, except maybe for the first scheduling, since you are rescheduling the heads of the DAGs by putting them as the dependencies of the tails. Am I missing something?
Dah yeah I mean you basically want to ensure that if Cron schedule changes you can update that in the scheduling table, so tasks should read from the Cron table when they schedule their next instance
@jordanhasnolife5163 At 18:16, we mentioned that we can't use the Kafka partitions technique, since messages will be blocked from being picked up due to long-running executors. The broker technique works. But can't we still use the Kafka partition technique? That is, can we make "Kafka to executors" stateless (async) instead of sync, using a master-slave architecture where a master (replicated, with all replicas acting as leaders) quickly pulls messages from one of the Kafka partitions according to executor (worker) availability and just sends each one to one of the executors?
Honeycomb handles over 1M transactions via 70 Kafka partitions with a single Kafka topic
Yep you can use Kafka partitions, you probably just have to be smarter about having one partition per consumer.
25:33 Alternatively, Redis can be used for a distributed lock with a TTL.
Fair enough, though your consistency guarantees I imagine get diminished ever so slightly if the lock gets dropped on a redis leader failover
If we make the root node dependent on its child nodes, wouldn’t this make the graph no longer acyclic? How would we be able to figure out which one is the root node in this case?
Yeah. Came to the comments section to ask the same. 1->2->4->1
The node will still have higher epoch number I presume but yeah, not sure what all issues it can create. We can take a look at how argo scheduler works and use that idea as well probably.
The root nodes have a non-null Cron schedule, so should be fairly easy to identify for a given dag
Thanks for sharing! Could you please share the content doc?
Yeah I've been procrasturbating, will likely upload everything in batch in like 8 weeks when this series is done
@jordanhasnolife5163 What's the next series that you're planning?
Wouldn't indexing on run_timestamp degrade the performance of the DB, since ideally we create indexes on columns that don't change often?
Most likely, but I don't see much of another option
Hi Jordan, I am a newbie to system design and I have a couple of questions. I assume that the executor is a pool of cron jobs scheduled to run every minute. I also assume that only one cron job will pull the scheduled tasks eligible for running. My questions are:
1- What if we have many tasks scheduled at a particular interval and all of them get picked up? What is the likelihood of this scenario, and should we even care about throttling the executor?
2- Is running the task exactly at the specified time a non functional requirement? Or do we allow a margin?
1) The executor is basically a bunch of random nodes responsible for running a task, that is passed to it from the message broker. I'm not sure what you mean by this question, we'll absolutely have a lot of tasks scheduled at once.
2) I suppose that's up to your interviewer, the more that you partition those scheduling tables the faster you can get jobs in the queue, but this doesn't guarantee when they'll be run if there aren't enough executors available.
Do you think using Redis instead of DB could be beneficial here for any of the tables?
Probably all of them if you want to pay for it, given it is faster
Thank you, Jordan! Your videos are really helpful. I have a request for one of Amazon's most-asked HLD system design interview questions: a traffic control system. It would be really helpful if you could make a video on this 🙏
Hopefully at some point I'll have time to do so!
Hi Jordan,
If a DAG update isn't needed (as in, it's a simple cron job), does the executor update the schedules table directly, since there won't be CDC in this case?
Seems reasonable to me
I propose using Temporal to simplify and abstract away all the retry logic, locking, and ensure idempotency.
This is new to me - I'll take a look, thanks!
What would the APIs look like for this design?
schedule(s3_link_to_binary, cron_schedule) -> returns job id
schedule(some_graph_specification_with_specific_binaries, cron_schedule) -> returns list of job id
get_status(job_id)
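As a hedged sketch, the three calls above could be typed roughly like this. `SchedulerApi`, the `schedule_dag` name, the graph format (node name mapped to the names it depends on), and the `s3://example-bucket/...` links are all assumptions for illustration; the in-memory bodies only stand in for a real service.

```python
import itertools
from typing import Dict, List, Optional


class SchedulerApi:
    """In-memory stand-in for the three calls listed above (hypothetical, not a real service)."""

    def __init__(self) -> None:
        self._next_id = itertools.count(1)
        self._status: Dict[int, str] = {}

    def _new_job(self) -> int:
        job_id = next(self._next_id)
        self._status[job_id] = "scheduled"
        return job_id

    def schedule(self, s3_link_to_binary: str, cron_schedule: Optional[str] = None) -> int:
        """Register a single job and return its job id."""
        return self._new_job()

    def schedule_dag(self, graph: Dict[str, List[str]], binaries: Dict[str, str],
                     cron_schedule: Optional[str] = None) -> List[int]:
        """Register one job per DAG node and return the list of job ids."""
        missing = [node for node in graph if node not in binaries]
        if missing:
            raise ValueError(f"no binary for nodes: {missing}")
        return [self._new_job() for _ in graph]

    def get_status(self, job_id: int) -> str:
        return self._status[job_id]


api = SchedulerApi()
single_id = api.schedule("s3://example-bucket/job.bin", "*/5 * * * *")
dag_ids = api.schedule_dag(
    graph={"build": [], "test": ["build"], "deploy": ["test"]},
    binaries={"build": "s3://example-bucket/build.bin",
              "test": "s3://example-bucket/test.bin",
              "deploy": "s3://example-bucket/deploy.bin"},
    cron_schedule="0 2 * * *",
)
```

Splitting the DAG variant into its own method is a design choice made here for type clarity; the original overloads a single `schedule` call.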
I still prefer reading to watching videos for tech stuff. I wonder whether you could also publish your content in written form somewhere. There are also platforms where writers get paid for their content. Or perhaps a book like Alex Xu's.
I will likely do this eventually! Though as you alluded to, I may try and get paid for it lol
Thanks Jordan! For writing notes, do you just use Apple notes? Or this is a different app?
OneNote
What’s another word for epoch in this context?
I don't know what you're referring to, do I need another word for it?
Bro this was next level! Love you bruh
Got this in Amazon interview today. Was LLD though but your overall video helped a lot!
Can you share the notes via a Google Drive link, or some other way such as iCloud?
I will do this eventually, but it will realistically be a couple of months
@jordanhasnolife5163 What do you plan to start once this series is over?
Thanks Jordan for these wonderful videos!
On the DAG Table, you mentioned "When all dependency task have an equal epoch for a given row, schedule that task". By epoch, do you mean just a counter?
If we use an actual Unix epoch timestamp (the number of seconds elapsed since 1 January 1970), they won't be the same, because two tasks will finish at different times.
Yes, just an epoch. Unix time is "seconds since the epoch", where the epoch means 1970, but yeah, I just mean a monotonically increasing sequence number.
@18:42 If our system has a billion jobs per day, that's roughly 10K jobs/second (10^9 / 86,400 s ≈ 11.6K). It probably needs at least several hundred executors listening to the in-memory broker (or even Kafka). Would thousands of executors polling it not overwhelm the broker? And when there are jobs available to execute, which design pattern (or method) can ensure there is no concurrency bottleneck and that the same job is not picked up by multiple executors?
1) Partition the scheduler table
2) Partition the brokers
3) Each broker is listened to by a subset of the executor nodes
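The three steps above can be sketched roughly as follows, assuming jobs are routed to a broker by a stable hash and each executor listens to exactly one broker. The counts and helper names (`broker_for_job`, `broker_for_executor`) are made up for illustration, and plain deques stand in for the in-memory brokers.

```python
import zlib
from collections import deque

NUM_BROKERS = 4     # assumed broker count
NUM_EXECUTORS = 12  # assumed executor count


def broker_for_job(job_id: str) -> int:
    """Route a job to exactly one broker with a stable hash."""
    return zlib.crc32(job_id.encode("utf-8")) % NUM_BROKERS


def broker_for_executor(executor_index: int) -> int:
    """Each executor listens to a single broker, so a broker sees only a subset."""
    return executor_index % NUM_BROKERS


queues = [deque() for _ in range(NUM_BROKERS)]
for i in range(100):
    job_id = f"job-{i}"
    queues[broker_for_job(job_id)].append(job_id)

listeners = {b: [e for e in range(NUM_EXECUTORS) if broker_for_executor(e) == b]
             for b in range(NUM_BROKERS)}

# Executors drain only their own broker; popleft hands each job out once.
delivered = []
for b, executors in listeners.items():
    turn = 0
    while queues[b]:
        executor = executors[turn % len(executors)]
        delivered.append((executor, queues[b].popleft()))
        turn += 1
```

Because a job lives in one queue and is removed when handed out, no two executors receive it, and each broker only has to serve its own three listeners rather than all twelve.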
Amazing one Jordan, learned a lot from this!!
Thanks Jordan! Very nice video!
Thought I'd be clapping cheeks on a weekend, but I'm making notes from Jordans videos. fml.
You and me both brother
Thank You!
helll yeah
bad hand writing bro
Sorry my wrists are always tired for some reason