How to manage Airflow Dags in Production | Dags versioning & Deployment

  • Published: Nov 2, 2024

Comments • 17

  • @BalzaelRisingDark · 1 year ago

    Very nice engineering of the life cycle.
    I'm wondering whether releasing or modifying a DAG, and therefore generating a new Docker image for Airflow, means rolling out the installation again every time. In other words, how do you manage cases where the current Airflow installation is already running some DAGs and you want to update your DAG repository?

    • @dchandrateja · 1 year ago +1

      Hi Mattia, to my knowledge, if you are using the Kubernetes executor, the ReplicaSets of the pods are updated so that the deployment stays fault tolerant. Kubernetes keeps your present image running in a separate replica while it updates the other replicas, and completes the whole rollout step by step.

  • @mithunmanoharmithun · 2 years ago +1

    How does Airflow pick up the DAG from each of the submodules, given that on checkout each submodule will have the folder structure of its source repo?

    • @maxcoteclearning · 2 years ago

      Airflow automatically scans for Python scripts containing an airflow.DAG() definition, and if it finds one, it treats the script as a DAG. Normally we keep a single main.py file (with the DAG() code) at the root of every submodule repository.
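
      A minimal sketch of what such a main.py might look like (the DAG id, schedule and task below are illustrative, not taken from the video):

      # main.py at the root of a submodule repository (illustrative sketch).
      # The scheduler parses this file because it defines an airflow.DAG object.
      from datetime import datetime

      from airflow import DAG
      from airflow.operators.bash import BashOperator

      with DAG(
          dag_id="my_submodule_pipeline",        # hypothetical DAG id
          start_date=datetime(2024, 1, 1),
          schedule_interval="@daily",
          catchup=False,
      ) as dag:
          BashOperator(task_id="extract", bash_command="echo 'extracting...'")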

  • @vaib5917 · 2 years ago +1

    Hi, I really need to know: if we put the Python script into a container and run it using DockerOperator, how can we pass the values of Variables from the Airflow Admin UI to the container? Please help.

    • @maxcoteclearning · 2 years ago

      If you want to use DockerOperator for one or all of the DAG tasks, then yes: keep the business-logic script (Python, Bash, Java, Go or any other language) inside a Docker container image, e.g. my_task_1:latest. Then write DAG code in Python which initializes this DockerOperator task (or tasks). This DAG code will be containerized inside the Airflow image. While initializing the DockerOperator in your DAG code, you can pass Airflow Variables, secrets, connections and XComs to my_task_1:latest in two ways: 1. via environment variables, or 2. via command args. See the DockerOperator details here: airflow.apache.org/docs/apache-airflow-providers-docker/stable/_api/airflow/providers/docker/operators/docker/index.html
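
      A hedged sketch of both options (my_task_1:latest and the my_var Variable are hypothetical names; whether `environment` is Jinja-templated depends on the docker provider version):

      # Inside a DAG definition: pass an Airflow Variable into the container
      # via an environment variable and via the command args.
      from airflow.providers.docker.operators.docker import DockerOperator

      run_task = DockerOperator(
          task_id="run_my_task_1",
          image="my_task_1:latest",
          # Option 1: environment variables
          environment={"MY_VAR": "{{ var.value.my_var }}"},
          # Option 2: command args
          command=["--my-var", "{{ var.value.my_var }}"],
      )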

  • @samsid5223 · 1 year ago

    Did you face any issues with parallelism?
    I have set Airflow core parallelism to 512, max DAG runs to 1, and max task runs for one DAG to 128. I have a default pool with 128 slots, but my Airflow still does not run more than 32 tasks at a time.
    I'm using the Helm chart to deploy Airflow.

    • @maxcoteclearning · 1 year ago +1

      No, we haven't. Since you have provided very few hints, I'd assume there could be multiple factors causing your issue:
      1. The machine where you are running Airflow may not have enough resources (CPU and memory). Check the scheduler logs to get more hints.
      2. Different executors (e.g., SequentialExecutor, LocalExecutor, CeleryExecutor, etc.) behave differently with regard to task parallelism. Make sure you are using an executor that supports the desired level of parallelism; e.g., in the case of Celery, check that your task queue is not throttled.
      3. Could it be that your tasks depend on each other, which is why you are not seeing the desired number of tasks running in parallel?
      4. What is the value set for AIRFLOW__CORE__DAG_CONCURRENCY? It is the maximum number of task instances allowed to run concurrently FOR A SINGLE SPECIFIC DAG. The sketch after this list shows how the same per-DAG limits can also be set on the DAG object itself.
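
      A hedged sketch of those per-DAG limits in code (parameter names are from Airflow 2.2+; older releases used `concurrency` instead of `max_active_tasks`):

      from datetime import datetime

      from airflow import DAG

      with DAG(
          dag_id="parallelism_check",      # hypothetical DAG id
          start_date=datetime(2024, 1, 1),
          schedule_interval="@daily",
          catchup=False,
          max_active_runs=1,               # "max dag run is 1"
          max_active_tasks=128,            # per-DAG task concurrency
      ) as dag:
          ...                              # tasks go here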

    • @samsid5223 · 1 year ago

      Thanks for your prompt response.
      We have enough resources on Kubernetes.
      I triggered individual DAGs to check the concurrency.
      I'm using CeleryExecutor, and for task concurrency I have set AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG to 128 (AIRFLOW__CORE__DAG_CONCURRENCY has been renamed to max_active_tasks_per_dag in newer versions).
      I thought that having only 128 pool slots might be causing the issue, since I was setting parallelism higher than the slots available, but I have now decreased it to 56 and it still does not go beyond 32 tasks.

    • @samsid5223 · 1 year ago

      What about the scheduler log? I don't see any issue; do you want me to check for specific errors/warnings?
      I listed the configuration in the Airflow UI and in the pod as well, and in both places I can see that parallelism is set to 512.
      As for the Celery queue throttling, how can I view that? Also, do you know the typical memory usage for running Airflow with 50-100 DAGs? How much memory does it usually consume across all the pods?

    • @maxcoteclearning · 1 year ago

      @@samsid5223 I haven't got much experience with CeleryExecutor, to be honest. But have you tried looking at the Celery concurrency settings? Specifically, the max-tasks-per-child setting in celeryconfig.py (docs.celeryq.dev/en/stable/userguide/workers.html#max-tasks-per-child-setting), which can also be passed as `--max-tasks-per-child`; I believe this controls how many tasks a Celery worker child process executes before it is replaced. Have you also tried increasing the Celery worker concurrency (docs.celeryq.dev/en/stable/userguide/workers.html#concurrency)?
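
      A hedged sketch of where those knobs could live, assuming you point Airflow's [celery] celery_config_options setting (or the AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS env var) at a dict like this one; the numbers are illustrative, not recommendations:

      # custom_celery_config.py (hypothetical module referenced from celery_config_options)
      from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG

      CELERY_CONFIG = {
          **DEFAULT_CELERY_CONFIG,
          "worker_concurrency": 56,            # parallel task slots per Celery worker
          "worker_max_tasks_per_child": 100,   # recycle a child process after N tasks
      }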

    • @samsid5223 · 1 year ago

      Thank you for all your responses.
      I will take a look at celeryconfig, but IMHO max tasks per child is the same as max tasks per DAG and the other parallelism settings. This could also be a bug in Airflow version 2.5.3; I will try different versions.

  • @oluwatobitobias · 2 years ago +1

    If Airflow is running via a virtualenv, shouldn't it automatically use the lib/python/site-packages of that venv?
    And if not, how do I point Airflow to the lib directory to look for packages?

    • @maxcoteclearning · 2 years ago

      Yes, Airflow will use the venv's lib/python/site-packages (if it is running from a venv). But here I am explaining NOT to use that same Airflow venv for the packages required by individual DAGs. Doing so, we would end up with a gigantic Python venv with loads of packages used by several different DAGs. That becomes a mess and hard to manage, especially when cleaning up the venv during DAG removal or upgrade.
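
      One hedged sketch of keeping per-DAG dependencies out of the shared Airflow venv at the task level (not necessarily the approach in the video; the package pin is illustrative) is PythonVirtualenvOperator, which builds a throwaway venv per task run:

      from airflow.operators.python import PythonVirtualenvOperator

      def transform():
          # Imported inside the callable so it resolves in the task's own venv.
          import pandas as pd
          print(pd.__version__)

      transform_task = PythonVirtualenvOperator(   # (inside a DAG context)
          task_id="transform",
          python_callable=transform,
          requirements=["pandas==2.1.0"],          # isolated from Airflow's venv
          system_site_packages=False,
      )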

  • @rahulbhandari1 · 1 year ago

    So if you have 2000+ DAGs, are you saying to have 2000+ repos?
    That sounds like a nightmare to manage, especially during Airflow version upgrades, when some variables/operators get deprecated and need to be updated in the repo of each DAG. Imagine creating 2000 separate PRs, one for each DAG repo. Also, what about common operators/libs/utils which are used across a subset of DAGs and may have to be updated? That will mean updating code for each DAG repo, whether the common code is added as a separate repo or is part of the base image.

    • @maxcoteclearning · 1 year ago

      Not necessarily. Usually a single repo generates multiple DAGs; we have around 120 DAGs from 50 repos (dynamic DAGs generated from configs, roughly as sketched below). Making code compatible with newer versions is always part of life-cycle maintenance.
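
      A hedged sketch of that dynamic-DAG pattern, one repo producing many DAGs (the configs/ directory and its field names are illustrative):

      import json
      from datetime import datetime
      from pathlib import Path

      from airflow import DAG
      from airflow.operators.bash import BashOperator

      for config_file in Path(__file__).parent.joinpath("configs").glob("*.json"):
          config = json.loads(config_file.read_text())

          with DAG(
              dag_id=config["dag_id"],
              start_date=datetime(2024, 1, 1),
              schedule_interval=config.get("schedule", "@daily"),
              catchup=False,
          ) as dag:
              BashOperator(task_id="run", bash_command=config["command"])

          # Expose each DAG at module level so the scheduler can discover it.
          globals()[config["dag_id"]] = dag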

    • @rahulbhandari1 · 1 year ago

      @@maxcoteclearning We have about 2500 DAGs running on our Airflow cluster. Not all DAGs can be generated from a simple config; if that works for you, you have a very narrow use case. We do have common factories that we use for some of our similar DAGs, but having 50+ repos to manage just 120 DAGs is bonkers.
      "Making code compatible with newer versions is always part of life cycle maintenance." -> Yes, and the idea is to make that lifecycle management scalable. If you have 1000 repos to manage your ETL, good luck with that. There is a reason why, at scale, a mono repo is the preferred approach, at least per application: it democratizes the use of common libs, code quality and tests, and scales your CI processes. Check the implementations of big tech companies like Airbnb, Meta and Google. Proper tooling and a custom CI can achieve everything you mentioned even with a single repo.