The TriggerDagRunOperator in Airflow! Create DAG dependencies with ease

  • Published: 17 Oct 2024
  • DAG dependencies in Airflow are a tough topic. Or rather, were a tough topic. With the new version of the TriggerDagRunOperator in Airflow 2.0, it has never been so easy to create DAG dependencies!
    👍 Smash the like button to become an Airflow Super Hero!
    ❤️ Subscribe to my channel to become a master of Airflow
    🏆 Take my course: www.udemy.com/... to join the legends of Airflow
    🚨 My Patreon: / marclamberti to support my work and be friends for life
    Materials: www.notion.so/...
    1. Problem
    In Airflow 1.10.X we had several options: SubDAGs, the ExternalTaskSensor, or the TriggerDagRunOperator. One of the most common questions is: how can I trigger DAG B from DAG A and wait for its completion before executing the next task of DAG A? There wasn't an easy way to do that. The only answer was to create a new operator that checked the metadata database for the DAGRuns, and so on. It wasn't easy at all. Well, guess what, it's over! Airflow 2.0 comes with a new version of the TriggerDagRunOperator!
    2. Solution
    The new version of the TriggerDagRunOperator brings two long-awaited features: wait_for_completion and reset_dag_run!
    3. Benefits
    With the new TriggerDagRunOperator you can wait for the completion of the triggered DAG. No need to create your own custom operator anymore. It is as simple as setting the parameter wait_for_completion to True. Moreover, the parameter reset_dag_run lets you rerun and backfill triggered DAGRuns that already exist, which is absolutely CRITICAL.
    Enjoy!
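
    The two parameters described above can be sketched as follows. This is a minimal sketch, assuming the Airflow 2.0-era API (import path airflow.operators.trigger_dagrun, the execution_date and schedule_interval arguments); later releases rename some of these. The DAG ids parent_dag and target_dag, the helper names and the 30-second poke_interval are hypothetical choices for illustration, not taken from the video.

```python
from datetime import datetime


def trigger_kwargs(target_dag_id: str) -> dict:
    """Keyword arguments for a TriggerDagRunOperator that waits for its target."""
    return {
        "task_id": f"trigger_{target_dag_id}",
        "trigger_dag_id": target_dag_id,
        # Reuse the parent's logical date so a rerun addresses the same target DAGRun.
        "execution_date": "{{ ds }}",
        # Keep this task running until the triggered DAGRun reaches a final state.
        "wait_for_completion": True,
        # Seconds between two checks of the triggered DAGRun's state.
        "poke_interval": 30,
        # Clear an already existing DAGRun for that date instead of failing on it.
        "reset_dag_run": True,
    }


def build_parent_dag():
    """Build the parent DAG. Airflow is imported lazily so the helper above
    stays usable on machines where Airflow is not installed."""
    from airflow import DAG
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator

    with DAG(
        dag_id="parent_dag",  # hypothetical name
        start_date=datetime(2021, 1, 1),
        schedule_interval="@daily",
        catchup=False,
    ) as dag:
        # "target_dag" is a hypothetical DAG id that must exist in the DAG folder.
        TriggerDagRunOperator(**trigger_kwargs("target_dag"))
    return dag


try:
    dag = build_parent_dag()  # module-level object so the scheduler can discover it
except ImportError:
    dag = None  # Airflow (or this import path) is not available here
```

    Any task placed downstream of the trigger task only starts once target_dag has finished, because wait_for_completion keeps the trigger task running until then.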

Comments • 44

  • @aditikhisti4826
    @aditikhisti4826 1 year ago

    Thanks @Marc for explaining the core concepts. I have a question. Consider this example: DAG1-Task1 (scheduled @daily, 06:00 - 18:30 execution time) triggers DAG2-Task2 (scheduled @daily, 07:00 AM), so that when my DAG1 run for Monday finishes, DAG2-Task2 should be triggered only on Tuesday at 7:00 AM. Is this possible with the TriggerDagRunOperator? Can I use the following schedule definition for DAG2: schedule=datetime.timedelta(days=1)? Many thanks!

  • @sunil._sharma
    @sunil._sharma 3 years ago +1

    As always... You have come with such a strong operator that resolves critical issues in existing project airflows.
    Thanks

  • @Abdullahkbc
    @Abdullahkbc 10 months ago

    Hi Marc, is it possible with this Operator to fail the target dag? (suppose target dag is running). My purpose is to fail another dag if another DAG is triggered.

  • @thexndr5773
    @thexndr5773 3 years ago +2

    Great video, thank you for sharing your experience. Can you clarify one point? Is it possible to trigger a DAG with the same configuration JSON? In Airflow 2.x.x we're not allowed to push an XComArg in the conf parameter. I've already tried conf = '{{dag_run.conf}}', but it didn't work out for me.

  • @juanagudelo4154
    @juanagudelo4154 2 years ago

    Great video and a very powerful battle skin 🤙

  • @MrShockalex
    @MrShockalex 10 months ago

    Thank you, helped me a lot!

  • @KayVirals
    @KayVirals 3 years ago +1

    Hi Marc, great tutorial. I've tried to use the TriggerDagRunOperator within a for loop, giving each task a unique task_id, but it doesn't work. Is there any solution, or at least a reason why it doesn't work as expected within a loop while the same code works outside the loop?

  • @DarkShadow-321
    @DarkShadow-321 7 months ago

    Thank you😊

  • @sagarbhatt3346
    @sagarbhatt3346 2 years ago

    Hello Marc! Could you please make dedicated videos on the TaskFlow API?

  • @CD-hq9hd
    @CD-hq9hd 3 years ago

    I have a case where DAG 1 runs daily, DAG 2 weekly and DAG 3 daily. Then triggerdagrun won't work right with wait for completion set to true?

  • @danielfridman8317
    @danielfridman8317 6 months ago

    Hey,
    Is it possible to pass parameters to the newly triggered DAG?

  • @harmeetsingh-ch7pd
    @harmeetsingh-ch7pd 2 years ago

    Thanks a lot, that really helped.

  • @1984ssn
    @1984ssn 11 months ago

    Hi Marc could you please help me to understand which one is better TriggerDagRunOperator vs Dataset and why?

    • @MarcLamberti
      @MarcLamberti 11 months ago

      There isn’t one better than the other. Datasets allow you to trigger a DAG based on data updates, whereas the TriggerDagRunOperator triggers another DAG from a DAG.

  • @TheBlackSheepO
    @TheBlackSheepO 3 years ago

    for some reason I get this error when TriggerOperator is running:
    sqlalchemy.exc.IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed: dag_run.dag_id, dag_run.execution_date
    any clues?

  • @scorpion__8445
    @scorpion__8445 1 year ago

    How to stop running airflow dag task?
    I am running docker swarm operators in parallel mode, like [t1, t2 , t3].
    If any of t1, t2 and t3 fails, I want to kill other using python code.
    Please provide some guidance here.

  • @irohitvarma
    @irohitvarma 3 years ago

    The video is nice and helpful, but I have one question: what if I want to access (use) dagA from within dagB (I don't mean triggering dagA from dagB)? How can we do that? Please help me with this.

  • @shivamsofta6461
    @shivamsofta6461 3 years ago

    Thanks, This video is very helpful.

  • @neerajvyas3947
    @neerajvyas3947 3 years ago

    That's awesome. For earlier versions, I had to add extra code to keep track of the triggered DAG.

  • @jcosta.16
    @jcosta.16 3 years ago

    Great work Marc! Your videos are always very helpful. Thank you

  • @saritkumarsi4166
    @saritkumarsi4166 3 years ago

    Just a coincidence, yesterday I was playing with this operator. I did not realize I would learn so much more from this video. Just curious, regarding execution_date = {{ ds }}: can we send a macro with a timestamp, {{ ts }}, to the target DAG and not worry about setting reset_dag_run=True? If I understand correctly, {{ ds }} has no timestamp, hence the same date (yyyy-mm-dd) sent to the target is making it fail. Thanks Marc for this great video. Looking forward to more :)

    • @MarcLamberti
      @MarcLamberti 3 years ago +1

      You always have to set this parameter to true if you want to be able to retry past already triggered dag runs :)

    • @sunil._sharma
      @sunil._sharma 3 years ago +1

      @sarit
      Yes you can try giving the execution_date as {{ ts }}, and it will run if the dagrun with same timestamp is not present.
      But if that does exist, then as Marc suggested you will need to set reset_dag_run to true
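
      The difference between {{ ds }} and {{ ts }} discussed in this thread can be illustrated with a toy model. This is a sketch under assumptions and not Airflow's own code: FakeDagRunTable, DuplicateDagRun, render_ds, render_ts and can_trigger are hypothetical stand-ins that only mimic the rule that a (dag_id, execution_date) pair must be unique, the same rule behind the UNIQUE constraint error quoted in another comment.

```python
from datetime import datetime, timezone


class DuplicateDagRun(Exception):
    """Stand-in for the error raised on a duplicate (dag_id, execution_date)."""


class FakeDagRunTable:
    """Toy model of the uniqueness rule on (dag_id, execution_date)."""

    def __init__(self):
        self._runs = set()

    def trigger(self, dag_id, execution_date, reset_dag_run=False):
        key = (dag_id, execution_date)
        if key in self._runs and not reset_dag_run:
            raise DuplicateDagRun(f"{dag_id} @ {execution_date}")
        # With reset_dag_run=True the existing run is reused and run again.
        self._runs.add(key)


def render_ds(logical_date):
    """Date only, like the {{ ds }} macro (YYYY-MM-DD)."""
    return logical_date.strftime("%Y-%m-%d")


def render_ts(logical_date):
    """Full timestamp, like the {{ ts }} macro (ISO 8601)."""
    return logical_date.isoformat()


def can_trigger(table, dag_id, execution_date, reset_dag_run=False):
    """Return True if the trigger is accepted, False if it collides."""
    try:
        table.trigger(dag_id, execution_date, reset_dag_run)
        return True
    except DuplicateDagRun:
        return False


morning = datetime(2022, 7, 27, 6, 0, tzinfo=timezone.utc)
evening = datetime(2022, 7, 27, 18, 0, tzinfo=timezone.utc)

table = FakeDagRunTable()
print(can_trigger(table, "target_dag", render_ds(morning)))  # first run for that date
print(can_trigger(table, "target_dag", render_ds(evening)))  # same date string again
print(can_trigger(table, "target_dag", render_ts(evening)))  # distinct timestamp
print(can_trigger(table, "target_dag", render_ts(evening)))  # retry of the same run
print(can_trigger(table, "target_dag", render_ts(evening), reset_dag_run=True))
```

      In this model, two parent runs on the same day collide when the date alone is passed, while a full timestamp only collides when the very same parent run is retried, which is the case reset_dag_run is meant to cover.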

  • @sasmitanayak6840
    @sasmitanayak6840 2 years ago

    Hi Marc, thank you for your explanation. If I have 6-7 DAGs that get triggered from one trigger DAG script, is there a way I can run all the target DAGs serially?

    • @adityachouhan5054
      @adityachouhan5054 2 years ago

      Have you got the answer for your query ?? I am having a similar use case.
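
      One possible way to run several target DAGs one after another is to chain one trigger task per target and let each wait for completion. This is a sketch, assuming the Airflow 2.0-era import path and arguments; the DAG ids in TARGET_DAG_IDS, the controller_dag name and the helper functions are hypothetical.

```python
from datetime import datetime

TARGET_DAG_IDS = ["target_a", "target_b", "target_c"]  # hypothetical DAG ids


def serial_trigger_specs(target_dag_ids):
    """One set of operator arguments per target, each with a unique task_id."""
    return [
        {
            "task_id": f"trigger_{dag_id}",
            "trigger_dag_id": dag_id,
            # Without this, the next trigger would fire immediately.
            "wait_for_completion": True,
            "poke_interval": 30,
        }
        for dag_id in target_dag_ids
    ]


def build_controller_dag():
    """Build the controller DAG; Airflow is imported lazily on purpose."""
    from airflow import DAG
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator

    with DAG(
        dag_id="controller_dag",  # hypothetical name
        start_date=datetime(2021, 1, 1),
        schedule_interval=None,  # only runs when triggered manually
        catchup=False,
    ) as dag:
        previous = None
        for spec in serial_trigger_specs(TARGET_DAG_IDS):
            current = TriggerDagRunOperator(**spec)
            if previous is not None:
                previous >> current  # start this trigger only after the previous one
            previous = current
    return dag


try:
    dag = build_controller_dag()
except ImportError:
    dag = None  # Airflow (or this import path) is not available here
```

      Because every trigger task waits for its target to finish, the targets run strictly in list order. Dropping the `previous >> current` line would start them all in parallel instead.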

  • @jorgeespana482
    @jorgeespana482 2 years ago

    Great video, but how can we trigger target_dag with some config??
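
    Passing a configuration to the triggered DAG is usually done through the operator's conf argument, which the target reads from dag_run.conf. The sketch below assumes that mechanism; the key source_path, its values, the DAG id target_dag and the function read_source_path are hypothetical. TRIGGER_KWARGS is meant to be passed as TriggerDagRunOperator(**TRIGGER_KWARGS) in the parent DAG, and read_source_path as the python_callable of a PythonOperator in the target DAG.

```python
from types import SimpleNamespace

# Parent DAG side: arguments for TriggerDagRunOperator(**TRIGGER_KWARGS).
TRIGGER_KWARGS = {
    "task_id": "trigger_target",
    "trigger_dag_id": "target_dag",  # hypothetical DAG id
    # Must be JSON-serializable; it becomes dag_run.conf in the target DAG.
    "conf": {"source_path": "/tmp/input.csv"},
}


# Target DAG side: callable for a PythonOperator(python_callable=read_source_path).
def read_source_path(**context):
    """Read the value sent by the parent, with a default for scheduled runs."""
    conf = context["dag_run"].conf or {}
    return conf.get("source_path", "/tmp/default.csv")


# Local check with a stand-in for the DagRun object, no Airflow required.
fake_run = SimpleNamespace(conf=TRIGGER_KWARGS["conf"])
print(read_source_path(dag_run=fake_run))
```

    The `or {}` guard keeps the task working when the target DAG runs on its own schedule and no configuration was sent.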

  • @nikolaysokolov9027
    @nikolaysokolov9027 3 years ago

    Thanks Marc! Really useful thing.

  • @tamilpulit8622
    @tamilpulit8622 2 years ago

    Hi Marc, I have a use case where a DAG should run only if the previous day's run was successful. Can you please share your thoughts, or has anyone come across a scenario like this?

  • @akashhudge5735
    @akashhudge5735 3 years ago

    Thank you very much

  • @ateechakma568
    @ateechakma568 2 years ago

    I have set the schedule interval of the trigger DAG to every minute, and it still shows the "already exists" error for the DAG id, even after using reset_dag_run=True and wait_for_completion=True.

  • @rodrigofernandosilva1768
    @rodrigofernandosilva1768 2 years ago

    I tried using TriggerDagRunOperator and the DAG that is started has several subdags. And they are all starting at the same time. Have you seen this?

  • @mahesakurniawan7804
    @mahesakurniawan7804 3 years ago

    You're awesome.
    May goodness always be given to you.

  • @PrakashReddyK
    @PrakashReddyK 3 years ago

    Marc, thank you for providing absolutely awesome content related to Airflow. I subscribed and liked the video. I am looking for the link in the description, but I only see the Patreon and Udemy links. Can you please add the TriggerDagRunOperator link as well? If there is a link and I overlooked it, I apologize in advance.

  • @shivaprasadmpshivu1882
    @shivaprasadmpshivu1882 11 months ago

    We are using an older version of Airflow. 1.10.10 and I get an error when I try to import TriggerDagRunOperator
    No module named 'airflow.operators.trigger_dagrun'

    • @MarcLamberti
      @MarcLamberti 11 months ago

      Update 🥹🥹🥹🥹

    • @shivaprasadmpshivu1882
      @shivaprasadmpshivu1882 11 months ago

      @MarcLamberti thanks for taking the time to reply to my comments. By any chance, is there a way to trigger 3 DAGs inside a main DAG? This is specific to 1.10.10. Upgrading is the straightforward solution, but that cannot be done right now.

  • @dhanunjayar2511
    @dhanunjayar2511 2 years ago

    when the wait_for_completion=False, the target was triggered properly. But when the parameter was set to True, the trigger_task was in indefinite state and the target dag was not triggering at all. This is the error log
    [2022-07-27, 19:27:44 UTC] {trigger_dagrun.py:184} INFO - Waiting for target_dag on 2022-07-27T13:57:44.898572+00:00 to become allowed state [] ...
    Am I missing something?