The TriggerDagRunOperator in Airflow! Create DAG dependencies with ease
- Published: 17 Oct 2024
- DAG dependencies in Airflow are a tough topic. Or rather, they were. With the new version of the TriggerDagRunOperator in Airflow 2.0, it has never been so easy to create DAG dependencies!
👍 Smash the like button to become an Airflow Super Hero!
❤️ Subscribe to my channel to become a master of Airflow
🏆 Take my course: www.udemy.com/... to join the legends of Airflow
🚨 My Patreon: / marclamberti to support my work and be friends for life
Materials: www.notion.so/...
1. Problem
In Airflow 1.10.x we had multiple choices: SubDAGs, the ExternalTaskSensor, or the TriggerDagRunOperator. One of the most common questions is: how can I trigger another DAG B from my DAG A and wait for its completion before executing the next task of DAG A? Well, there wasn't an easy way to do that. The only answer was to create a new operator that checked the metadata database for the DAGRuns, and so on. It wasn't easy at all. Well, guess what, it's over! In Airflow 2.0, there is a new version of the TriggerDagRunOperator!
2. Solution
The new version of the TriggerDagRunOperator brings two long-awaited features: wait_for_completion and reset_dag_run!
3. Benefits
With the new TriggerDagRunOperator you can wait for the triggered DAG to complete. You no longer need to write your own custom operator: just set the wait_for_completion parameter to True. Moreover, the reset_dag_run parameter clears an already existing DAGRun for the same execution date instead of failing, so you can backfill or re-run your triggered DAGRuns, which is absolutely CRITICAL.
Enjoy!
Thanks @Marc for explaining the core concepts. I have a question. Consider this example: DAG1-Task1 (scheduled @daily, 06:00 - 18:30 execution time) triggers DAG2-Task2 (scheduled @daily, 07:00 AM). I want DAG2-Task2 to be triggered only on Tuesday at 7:00 AM once my DAG1 run for Monday finishes. Is this possible with the TriggerDagRunOperator? Can I give DAG2 the following schedule definition: schedule=datetime.timedelta(days=1)? Many thanks!
As always... you have come up with such a strong operator, one that resolves critical issues in existing Airflow projects.
Thanks
Thank you 🤩
Hi Marc, is it possible to use this operator to fail the target DAG (suppose the target DAG is running)? My goal is to fail one DAG if another DAG is triggered.
Great video, thank you for sharing your experience. Can you clarify one point? Is it possible to trigger a DAG with the same configuration JSON? In Airflow 2.x.x we're not allowed to pass an XComArg in the conf parameter. I've already tried conf = '{{dag_run.conf}}', but it didn't work for me.
Great video and a very powerful battle skin 🤙
Thank you, it helped me a lot!
Hi Marc, great tutorial. I've tried to use TriggerDagRunOperator within a for loop, giving each task a unique task_id, but it doesn't work. Is there a solution, or at least an explanation of why it doesn't work as expected within a loop while the same code works outside the loop?
Thank you😊
Hello Marc! Could you please make a dedicated video on the TaskFlow API?
I have a case where DAG 1 runs daily, DAG 2 weekly, and DAG 3 daily. Then the TriggerDagRunOperator won't work correctly with wait_for_completion set to True, right?
Hey,
Is it possible to pass parameters to the newly triggered DAG?
Thanks a lot, that really helped.
Hi Marc, could you please help me understand which one is better, the TriggerDagRunOperator or Datasets, and why?
There isn't one that is better than the other. Datasets let you trigger a DAG based on data updates, whereas the TriggerDagRunOperator triggers another DAG from within a DAG.
For some reason I get this error when the TriggerDagRunOperator is running:
sqlalchemy.exc.IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed: dag_run.dag_id, dag_run.execution_date
Any clues?
How do I stop a running Airflow DAG task?
I am running Docker Swarm operators in parallel, like [t1, t2, t3].
If any of t1, t2, or t3 fails, I want to kill the others using Python code.
Please provide some guidance here.
The video is nice and helpful, but I have one question. If I want to access (use) dagA inside dagB (I don't mean triggering dagA from dagB), how can I do it? Please help me with this.
Thanks, this video is very helpful.
That's awesome! For earlier versions, I had to add extra code to keep track of the triggered DAG.
Not anymore 🚀
Great work Marc! Your videos are always very helpful. Thank you
You're welcome 😁
Just a coincidence: yesterday I was playing with this operator, and I didn't realize I would learn so much more from this video. Just curious, regarding execution_date = {{ds}}: can we send a macro with a timestamp, {{ ts }}, to the target DAG and not worry about setting reset_dag_run=True? If I understand correctly, {{ ds }} has no time component, so sending the same date (yyyy-mm-dd) to the target is what makes it fail. Thanks Marc for this great video. Looking forward to more :)
You always have to set this parameter to True if you want to be able to retry DAG runs that were already triggered :)
@sarit
Yes, you can try passing {{ ts }} as the execution_date, and it will run as long as no DAG run with the same timestamp exists.
But if one does exist, then, as Marc suggested, you will need to set reset_dag_run to True.
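The collision discussed above comes from a uniqueness rule on (dag_id, execution_date). This rule appears in the `UNIQUE constraint failed: dag_run.dag_id, dag_run.execution_date` error quoted earlier. Here is a minimal simulation with Python's built-in `sqlite3`. It assumes a simplified stand-in table with only those two columns, not Airflow's real metadata schema, and the run times are hypothetical:

```python
import sqlite3
from datetime import datetime, timezone

# Simplified stand-in for the dag_run table: only the two columns of the UNIQUE constraint.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_run (dag_id TEXT, execution_date TEXT, "
    "UNIQUE (dag_id, execution_date))"
)


def trigger(dag_id: str, execution_date: str) -> bool:
    """Insert a run; return False if (dag_id, execution_date) already exists."""
    try:
        with conn:  # commits on success, rolls back if the INSERT raises
            conn.execute("INSERT INTO dag_run VALUES (?, ?)", (dag_id, execution_date))
        return True
    except sqlite3.IntegrityError:
        return False


# Two hypothetical controller runs on the same day, one hour apart.
run_1 = datetime(2024, 10, 17, 6, 0, tzinfo=timezone.utc)
run_2 = datetime(2024, 10, 17, 7, 0, tzinfo=timezone.utc)

# {{ ds }}-style value: date only, so both runs produce the same key.
ds_results = [trigger("target_dag", r.strftime("%Y-%m-%d")) for r in (run_1, run_2)]

# {{ ts }}-style value: full ISO timestamp, so the keys differ.
ts_results = [trigger("target_dag", r.isoformat()) for r in (run_1, run_2)]

# Retrying run_1 renders the same timestamp again, so it collides even with ts.
retry_result = trigger("target_dag", run_1.isoformat())

print(ds_results)    # [True, False]
print(ts_results)    # [True, True]
print(retry_result)  # False
```

The last case is why reset_dag_run=True is still needed for retries. A retried task renders the same timestamp, so only clearing the existing run avoids the conflict.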
Hi Marc, thank you for your explanation... If I have 6-7 DAGs that are triggered from one trigger DAG script, is there a way to run all the target DAGs serially?
Did you get an answer to your question? I have a similar use case.
Great video, but how can we trigger target_dag with some config?
Thanks Marc! Really useful thing.
You're welcome :)
Hi Marc, I have a use case where a DAG should run only if the previous day's run was successful. Can you please share your thoughts, or has anyone come across a scenario like this?
Thank you very much
I have scheduled the trigger DAG to run every minute, and it still shows the "already exists" error for the DAG ID, even after using reset_dag_run=True and wait_for_completion=True.
I tried using TriggerDagRunOperator, and the DAG it starts has several SubDAGs. They all start at the same time. Have you seen this?
You're awesome
may goodness always be given to you
Wow, thank you
Marc, thank you for providing absolutely awesome content related to Airflow. I subscribed and liked the video. I am looking for the link in the description, but I only see the Patreon and Udemy links. Can you please add the TriggerDagRunOperator link as well? If there is a link and I overlooked it, I apologize in advance.
Oh, you're absolutely right, I forgot the link 😱
Fixed 😁
We are using an older version of Airflow, 1.10.10, and I get an error when I try to import TriggerDagRunOperator:
No module named 'airflow.operators.trigger_dagrun'
Update 🥹🥹🥹🥹
@MarcLamberti Thanks for taking the time to reply to my comments. By any chance, is there a way to trigger 3 DAGs inside a main DAG? This is specific to 1.10.10. Upgrading is the straightforward solution, but that cannot be done right now.
When wait_for_completion=False, the target was triggered properly. But when the parameter was set to True, trigger_task stayed in an indefinite state and the target DAG was not triggered at all. This is the error log:
[2022-07-27, 19:27:44 UTC] {trigger_dagrun.py:184} INFO - Waiting for target_dag on 2022-07-27T13:57:44.898572+00:00 to become allowed state [] ...
Am I missing something?