How to build and automate your Python ETL pipeline with Airflow | Data pipeline | Python

  • Published: 29 Sep 2024

Comments • 100

  • @BiInsightsInc
    @BiInsightsInc  2 years ago +9

    Videos in this series:
    Build ETL pipeline: ruclips.net/video/dfouoh9QdUw/видео.html&t
    ETL Load Reference Data: ruclips.net/video/W-8tEFAWD5A/видео.html
    ETL Incremental Data Load (Source Change Detection): ruclips.net/video/32ErvH_m_no/видео.html&t
    ETL Incremental Data Load (Destination Change Comparison): ruclips.net/video/a_T8xRaCO60/видео.html
    How to install Apache Airflow: ruclips.net/video/t4h4vsULwFE/видео.html

    • @beastmaroc7585
      @beastmaroc7585  1 year ago

      How would you load big tables, for example 30M rows, from SQL Server to Postgres? I'm trying to solve for load speed and memory usage.

    • @BiInsightsInc
      @BiInsightsInc  1 year ago

      @@beastmaroc7585 You can horizontally scale your setup to a cluster if your data exceeds the resources of a single machine. This allows you to process large datasets, as the cluster nodes offer more compute and memory resources. You can also use a distributed engine like Spark and/or Kafka to process large datasets.
      I have discussed Airflow's execution and the cluster-based approach here. Feel free to check it out.
      ruclips.net/video/In7zwp0FDX4/видео.html&ab_channel=BIInsightsInc

  • @agussyarif8429
    @agussyarif8429  2 years ago

    This is powerful knowledge.

  • @beastmaroc7585
    @beastmaroc7585  1 year ago

    Can you share a tip for big tables (30M rows)? I have an issue with the DataFrame and memory usage.

    • @BiInsightsInc
      @BiInsightsInc  1 year ago

      I have done a video on this topic since a lot of folks were raising it. Also, look into big data processing frameworks designed for large-scale data processing, e.g. Dask, Kafka, Spark and Flink. I have covered Spark and Kafka on this channel. For a single machine, a chunked load (sketched below) also helps with memory.
      ruclips.net/video/8Awk8CpfeGc/видео.html&ab_channel=BIInsightsInc
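
      A minimal sketch of such a chunked load with pandas (connection strings and table names are placeholders, not the ones from the video):

      import pandas as pd
      from sqlalchemy import create_engine

      # placeholder connection strings -- adjust to your environment
      src_engine = create_engine("mssql+pyodbc://user:password@my_dsn")
      tgt_engine = create_engine("postgresql://user:password@localhost:5432/etl")

      # stream the source table in fixed-size chunks instead of one 30M-row DataFrame
      for chunk in pd.read_sql("SELECT * FROM dbo.big_table", src_engine, chunksize=100_000):
          chunk.to_sql("big_table", tgt_engine, if_exists="append", index=False)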

  • @eunheechoi3745
    @eunheechoi3745  11 months ago

    Hi, can you please make a video about a Python ETL pipeline with Airflow that extracts data from Oracle and loads it into Postgres?

    • @BiInsightsInc
      @BiInsightsInc  11 months ago

      I think all you need to do is connect to the Oracle database; the rest of the process should be similar. Here is a link to the oracledb library that shows how to establish a connection to an Oracle database, and a minimal sketch follows.
      python-oracledb.readthedocs.io/en/latest/user_guide/connection_handling.html
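
      A minimal connection sketch with python-oracledb (credentials, DSN and table name are placeholders):

      import oracledb
      import pandas as pd

      # placeholder credentials and DSN -- replace with your own
      conn = oracledb.connect(user="etl_user", password="secret", dsn="dbhost.example.com/orclpdb1")
      df = pd.read_sql("SELECT * FROM employees", conn)
      conn.close()
      # from here the transform/load steps are the same as in the video (e.g. df.to_sql via SQLAlchemy)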

    • @eunheechoi3745
      @eunheechoi3745  11 months ago

      @@BiInsightsInc Thanks for the reply. I already know how to connect to an Oracle DB in Python itself, but I was trying to connect Oracle directly to Airflow by using providers. No luck so far. Can you please make a video on how to install the Airflow provider for Oracle, and how to extract data from Oracle and insert it into Postgres?

    • @BiInsightsInc
      @BiInsightsInc  11 months ago +1

      @@eunheechoi3745 I will cover the Oracle provider with Airflow soon.

    • @eunheechoi3745
      @eunheechoi3745  11 months ago

      @@BiInsightsInc Thank you so much! Actually, I've figured it out. It was just a matter of including airflow.providers.oracle in the docker-compose.yaml file... but I suppose this is not recommended for production. Can you show a better approach to installing the Oracle provider, like how to install cx_Oracle with the Instant Client, etc.? Or alternatives?
      Also, after the connection to Oracle in Airflow is established, I am getting a UnicodeDecodeError (UTF-8) when I try to pull an entire table from Oracle... do you know how to fix it?

    • @BiInsightsInc
      @BiInsightsInc  11 months ago +1

      @@eunheechoi3745 You're on the right track with installing the Oracle provider. However, don't include it in the docker-compose file; rather, build a custom image with the additional libraries. I have covered how to build a custom image with additional libraries/providers here: ruclips.net/video/In7zwp0FDX4/видео.html&t

  • @saadlechhb3702
    @saadlechhb3702  4 months ago +1

    Thank you. I have a question: if the task is scheduled to run daily and new data has been inserted into the source since the last transfer, will only the new data get transferred on the next run, or all of the data again?

    • @BiInsightsInc
      @BiInsightsInc  4 months ago

      This would bring in all of the data; it is a truncate-and-load approach. If you need to bring in only the newly inserted data, then you want to look into the incremental data load approach(es), sketched below. I have covered those in the following videos:
      ruclips.net/video/a_T8xRaCO60/видео.html
      ruclips.net/video/32ErvH_m_no/видео.html
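
      For reference, a minimal watermark-based sketch of an incremental load (engine URLs, table and column names are placeholders):

      import pandas as pd
      from sqlalchemy import create_engine, text

      src_engine = create_engine("mssql+pyodbc://user:password@my_dsn")           # placeholder
      dw_engine = create_engine("postgresql://user:password@localhost:5432/dw")   # placeholder

      # highest watermark already present in the destination
      last_loaded = pd.read_sql("SELECT MAX(modified_date) AS wm FROM stg_orders", dw_engine)["wm"].iloc[0]

      # pull only the rows changed since the last load and append them
      query = text("SELECT * FROM orders WHERE modified_date > :wm")
      new_rows = pd.read_sql(query, src_engine, params={"wm": last_loaded})
      new_rows.to_sql("stg_orders", dw_engine, if_exists="append", index=False)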

    • @saadlechhb3702
      @saadlechhb3702  4 months ago

      @@BiInsightsInc Can I make a script that combines the incremental load with Airflow?

  • @ryanschraeder8681
    @ryanschraeder8681  3 months ago

    What happens if you kill the Airflow web server, or localhost? Will the DAG still run on the schedule you specified?

    • @BiInsightsInc
      @BiInsightsInc  3 months ago

      If the services are down then the DAG won't run. You want to make sure your server remains on for the DAG to execute on schedule.

  • @GiovanniDeCillis
    @GiovanniDeCillis  2 years ago +4

    Hey, this is really helpful. It would be even more insightful if you provided or suggested ways to run this process (along with those described in this recent series of tutorials) in the cloud or in a serverless environment. Thanks!

    • @BiInsightsInc
      @BiInsightsInc  2 years ago +3

      Hey Giovanni, thanks for stopping by. We've covered the AWS data lake in the last few videos and will cover ETL topics going forward. I will be sure to include Airflow and how to process data from S3 to Redshift with it. Stay tuned.

    • @GiovanniDeCillis
      @GiovanniDeCillis  2 years ago +1

      @@BiInsightsInc Thank you indeed. I completely missed the data lake video. Thanks, that is really helpful!

  • @demohub
    @demohub  1 year ago +1

    This video was a great resource. Thanks for the tutelage and your take on it.

  • @Obtemus
    @Obtemus  2 years ago +3

    Great playlist. Your method of building the videos is very practical and lovable.
    One question: how do you perform the "paste" line by line in the recording? Is it ctrl+y after so many ctrl+z operations?

    • @BiInsightsInc
      @BiInsightsInc  2 years ago +5

      Glad you find it helpful. Yes, you're right. I write the code and then it's ctrl+y and ctrl+z as I go through the code. It takes practice and sometimes a few retakes to get it right.

    • @Obtemus
      @Obtemus  2 years ago +3

      @@BiInsightsInc That's so nice. I asked because I found it a very useful way to build up the code during the video.

  • @tevintemu105
    @tevintemu105  2 years ago +2

    I want to thank you for posting this content. It is helpful in many ways.

  • @lasopainstantaneasuprema6861
    @lasopainstantaneasuprema6861  7 months ago +1

    Simple and incredibly understandable, 10/10.

  • @AbdelhadiChajia-b8d
    @AbdelhadiChajia-b8d  1 month ago

    Thank you, sir, you helped me understand Airflow. I did the same thing following the same process, but from MySQL (extract-load -> transformation -> load) with the free employees database, and I shared it on my GitHub and LinkedIn, tagging this video.

  • @mredmister3014
    @mredmister3014  22 days ago

    You have perfect cadence and clarity in your training. Short yet complex. Very cool and subscribed.

  • @crisybelkis
    @crisybelkis  1 year ago

    Thank you a lot. I'm trying to understand how to create a pipeline. I want to become an expert in this and be a good data engineering professional.

  • @franciscmoldovan2153
    @franciscmoldovan2153  2 years ago +1

    Very nice videos and blog! Keep up the good work!

  • @sharanchandrasekaran8399
    @sharanchandrasekaran8399  1 year ago +1

    Excellent video sir, thank you.

  • @abnormalirfan
    @abnormalirfan  2 years ago

    Hi, how about Airflow DAGs from MSSQL to GCS, please? I want to build a data warehouse in BigQuery.

  • @bandedecodeurs
    @bandedecodeurs  1 year ago

    Very good video. Thank you !

  • @rjrmatias
    @rjrmatias  1 year ago

    Excellent tutorial, thank you! It would be great if you could split the tasks into several files; I need to learn how to do this.

  • @eunheechoi3745
    @eunheechoi3745  11 months ago

    I was able to import airflow.providers.oracle and the Oracle hook. However, when I use OracleHook, it keeps throwing an error saying 'conn_id' not found, even though the connection has been configured via the Airflow UI. Do you have any idea what could be going wrong?

    • @BiInsightsInc
      @BiInsightsInc  11 months ago

      It's not able to find the connection configured in Airflow. You need to define the connection, test it, and try again; the conn_id passed to the hook has to match the Connection Id saved in the UI (see the sketch below).
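
      A minimal sketch of that match-up (the connection id and query are placeholders):

      from airflow.providers.oracle.hooks.oracle import OracleHook

      # "oracle_default" must match the Connection Id defined under Admin > Connections
      hook = OracleHook(oracle_conn_id="oracle_default")
      df = hook.get_pandas_df("SELECT * FROM employees")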

  • @WEN_the_cat_Astonot
    @WEN_the_cat_Astonot  2 years ago +1

    SQL Server to GCS with Airflow, please.

    • @BiInsightsInc
      @BiInsightsInc  2 years ago +2

      Will make a video on this topic soon. Thanks

    • @WEN_the_cat_Astonot
      @WEN_the_cat_Astonot  2 years ago

      ​@@BiInsightsInc
      I'm really waiting for that

    • @BiInsightsInc
      @BiInsightsInc  2 years ago

      @@WEN_the_cat_Astonot SQL Server to GCP video is up.
      ruclips.net/video/Nq8Td8h_szk/видео.html&t

  • @nabarajghimire610
    @nabarajghimire610  1 year ago

    Please make a video about the installation of Airflow. I faced many issues on Windows 10/11, like localhost:8080 not opening properly and lacking many GUI features.

    • @BiInsightsInc
      @BiInsightsInc  1 year ago

      I have covered Airflow's installation as a Docker container. You can follow the steps outlined in the following video. Docker takes the OS out of the equation, and you will get the same Airflow functionality no matter what system you are running.
      ruclips.net/video/t4h4vsULwFE/видео.html

  • @thalesdefaria
    @thalesdefaria  2 years ago +2

    Great content, so pragmatic!

  • @flyingdutchman9804
    @flyingdutchman9804  1 year ago +1

    Fantastic presentation!

  • @dzakwandaffahidayatullah3125
    @dzakwandaffahidayatullah3125  2 years ago

    Do you have any suggestions for running a Python ETL from the command line on an Ubuntu server? For reading a CSV file or connecting to a pre-existing MySQL database?

    • @BiInsightsInc
      @BiInsightsInc  2 years ago

      Hi Dzakwan, I have done a video on querying an existing MySQL database. You can follow along to establish the database connection and query the database. Once you write and test the script, save it as a Python file, e.g. etl.py. You can type "python etl.py" in the terminal to execute the script. Hope this helps.
      ruclips.net/video/mKnMY2GqaGI/видео.html

  • @CriticalThinker0890
    @CriticalThinker0890  1 year ago

    Why didn't you transform the data right after extracting it from MSSQL and then load the final data into PostgreSQL?

    • @BiInsightsInc
      @BiInsightsInc  1 year ago

      It's better to break each operation into a separate task so it is easier to manage and troubleshoot. This in turn results in a neater DAG structure: extract >> transform >> load. You could write the entire operation in a single function, but it would be hard to maintain and debug.
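
      A minimal TaskFlow-style sketch of that three-task structure (the task bodies are placeholders; adjust the schedule argument to your Airflow version):

      import pendulum
      from airflow.decorators import dag, task

      @dag(schedule="@daily", start_date=pendulum.datetime(2024, 1, 1), catchup=False)
      def etl_example():

          @task()
          def extract():
              # query the source here; return a small, serializable payload
              return [{"id": 1, "name": "widget"}]

          @task()
          def transform(rows):
              # apply business rules / cleanup
              return [{**row, "name": row["name"].upper()} for row in rows]

          @task()
          def load(rows):
              # persist to the destination (e.g. df.to_sql against Postgres)
              print(f"loaded {len(rows)} rows")

          # wiring the calls produces the extract >> transform >> load dependency
          load(transform(extract()))

      etl_example()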

  • @hellowillow_YT
    @hellowillow_YT  8 months ago

    Hi, Sir! Thanks for the insightful video! However, I'd like to ask: do we need to place the ETL Python file in a particular folder for it to be recognized as a DAG by Airflow?

    • @BiInsightsInc
      @BiInsightsInc  8 months ago +1

      Yes, you need to put the DAG files in the dags folder. This folder gets mounted into the Docker container.

    • @hellowillow_YT
      @hellowillow_YT  8 months ago

      Understood. Thank you, Sir! @@BiInsightsInc

  • @ChaimaHermi-zi8pq
    @ChaimaHermi-zi8pq  1 year ago

    Could you please provide more details on what you mean by "table fact of the datawarehouse"?

    • @BiInsightsInc
      @BiInsightsInc  1 year ago

      Can you please provide the timestamp in the video for context?

  • @shrutibangad8112
    @shrutibangad8112  2 years ago

    Question: do you have a video on building a pipeline to move data from Postgres to SQL Server?

    • @BiInsightsInc
      @BiInsightsInc  2 years ago

      Hi Shruti, if you reverse the order of the source and destination, that will give you the code to move data from Postgres to SQL Server. Hope this helps!

  • @parikshitchavan2211
    @parikshitchavan2211  1 year ago

    Hello Sir, thanks for such a great tutorial; you made everything smooth like butter. Just one question: whenever we make a new DAG, do we have to add docker-compose-CeleryExecutor, docker-compose-LocalExecutor, and config for that particular DAG?

    • @BiInsightsInc
      @BiInsightsInc  1 year ago

      Hi Parikshit, if you have mounted the host folder "dags" to the Docker container, just as shown in the tutorial, then DAGs created in this folder will automatically appear in the Airflow UI and will be copied over to the appropriate containers.

    • @parikshitchavan2211
      @parikshitchavan2211  1 year ago

      Thanks sir for clearing up the confusion; please keep uploading videos 😀🙏

  • @ulhasbhagwat4942
    @ulhasbhagwat4942  2 years ago

    In load_src_data, can we not use the Postgres connection object (conn) to load the data instead of using create_engine? Because we need to know all the same connection details again that we used to create the connection id in Airflow.

    • @BiInsightsInc
      @BiInsightsInc  2 years ago

      Hi Ulhas, when we use df.to_sql we need to pass in a SQLAlchemy engine, not a standard connection object (and not a cursor either). If you try it on your end it will throw an error. A minimal example is below. Hope this helps.
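
      For reference, a minimal sketch of the engine-based call (the URL and table name are placeholders):

      import pandas as pd
      from sqlalchemy import create_engine

      # the same details used for the Airflow connection id, expressed as a SQLAlchemy URL
      engine = create_engine("postgresql://etl_user:secret@localhost:5432/adventureworks")

      df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
      df.to_sql("stg_demo", engine, if_exists="replace", index=False)  # an engine, not a raw connection or cursor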

  • @eunheechoi3745
    @eunheechoi3745  11 months ago

    Is it better to extract the entire tables to staging and then build the final table with consolidated data? Why?
    Wouldn't you recommend querying the data first (extract) and then transforming and loading it into the final table?
    I am trying to understand why the first approach is better than the latter...

    • @BiInsightsInc
      @BiInsightsInc  11 months ago

      If you are referring to the technique showcased in the video, then it is following the dimensional modeling approach: after staging we build a proper dimension table. With basic ETL you would simply extract, transform and load the tables. I'd advise picking up a good book on ETL and dimensional modeling if you're curious. Hope this helps.

    • @eunheechoi3745
      @eunheechoi3745  11 months ago

      @@BiInsightsInc Thank you so much for the explanation and advice. Can you please recommend books/tutorials/courses with practical hands-on projects where I can delve into the data engineering space? I have worked in quant/data analytics for about 5+ years and am expanding my skills/knowledge into data engineering.

  • @babakbarzegari9228
    @babakbarzegari9228  1 year ago

    It was awesome, I had a blast. Could you create a video on ETL of SQL Server tables to Azure Blobs with Airflow? You haven't worked with Azure Blob yet.

    • @BiInsightsInc
      @BiInsightsInc  1 year ago +1

      Thanks Babak! I will try and explore Azure.

    • @babakbarzegari9228
      @babakbarzegari9228  1 year ago

      @@BiInsightsInc Thank you so much! Looking forward to it with great anticipation. Please ping me here when it's done.

  • @bigdataon-premorcloud7866
    @bigdataon-premorcloud7866  2 years ago

    How do I add the "Conn Type" for MSSQL? ruclips.net/video/eZfD6x9FJ4E/видео.html

    • @BiInsightsInc
      @BiInsightsInc  2 years ago

      Check out the Airflow installation with the SQL Server provider. I go over how to create a SQL Server connection.
      How to install Apache Airflow on Docker? | Build Custom Airflow Docker Image | Airflow | Docker
      ruclips.net/video/t4h4vsULwFE/видео.html

  • @isbakhullail6693
    @isbakhullail6693  2 years ago

    Do you use Docker for Airflow or not? Is there an installation video?

    • @BiInsightsInc
      @BiInsightsInc  2 years ago

      I have Airflow running on Docker. Here is the link to the Airflow installation video:
      ruclips.net/video/t4h4vsULwFE/видео.html

  • @alanamantinodasilva4362
    @alanamantinodasilva4362  2 years ago

    Can I ask you a question? I need to transfer data from Azure SQL Server to SQL Server (cloud to server). Can I do it directly from the source and load to the destination, or does it need to land in Postgres?

    • @BiInsightsInc
      @BiInsightsInc  2 years ago +1

      Hi Alan, you can directly load the data from Azure SQL to SQL Server. There’s no need to load data to PostgreSQL.

    • @alanamantinodasilva4362
      @alanamantinodasilva4362  2 years ago

      @@BiInsightsInc Can you point me to content that explains this? I loved your video, but it is quite complex for me; I just need to transfer data from source to destination. (Sorry about my English.)

    • @alanamantinodasilva4362
      @alanamantinodasilva4362  2 years ago

      @@BiInsightsInc I tried with the generic transfer, but it limits the transfer to 1000 rows per JSON file and takes a lot of time.

  • @mrg807
    @mrg807  1 year ago

    Thanks for the video. One question: why didn't you use Airflow's built-in PostgresHook or PostgresOperator instead of SQLAlchemy and Pandas? I think this would simplify the code and make it more consistent with the way the SQL Server connection is established using MsSqlHook.

    • @BiInsightsInc
      @BiInsightsInc  1 year ago

      This is a continuation of the Python ETL series. This session orchestrates what we developed previously, and the (E*T*L) transformations are carried out with Pandas. I am not sure PostgresHook offers the same capabilities as Pandas, so I went with Pandas. I could have used the PostgresHook to get a DataFrame from Postgres, but I'm not sure I could persist a DataFrame as easily as with SQLAlchemy. In addition, if I were to use the PostgresOperator then I would have to switch to SQL as opposed to Python.
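
      For comparison, a hedged sketch of the hook-based route, assuming the provider's DbApiHook methods get_pandas_df and get_sqlalchemy_engine (connection id and table names are placeholders):

      from airflow.providers.postgres.hooks.postgres import PostgresHook

      hook = PostgresHook(postgres_conn_id="postgres_default")

      # read into a DataFrame without hand-building a SQLAlchemy URL
      df = hook.get_pandas_df("SELECT * FROM public.src_table")

      # the hook can also hand back an engine, which df.to_sql accepts
      engine = hook.get_sqlalchemy_engine()
      df.to_sql("stg_table", engine, if_exists="replace", index=False)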

    • @alonsom.donayre1992
      @alonsom.donayre1992  1 year ago

      @@BiInsightsInc I guess this is just an example of how to use the hooks, because Airflow is not a processing framework but an orchestrator. Transformations should be handled by an external engine like Spark, a database engine, etc.

    • @BiInsightsInc
      @BiInsightsInc  1 year ago

      @@alonsom.donayre1992 Yes, it is an orchestrator; however, with the TaskFlow API in Airflow 2.0, which is used here, they're claiming you can carry out the execution (transformation) within Airflow. I am not sold on it because it is suited for small to medium jobs, unless you are working with a cluster. I am waiting for Dagster to mature, as it can handle both orchestration and processing. More to come on Dagster. So yes, I would advise processing large datasets with an external database engine or distributed engines like Spark and Glue.
      airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html

  • @rajansonvane488
    @rajansonvane488  1 year ago

    Thanks !!❤

  • @varshachauhan5185
    @varshachauhan5185  2 years ago

    Thank you so much for the video. I have one question: suppose I have 1000 different source files, how do I compare these files with one table present in the database?

    • @BiInsightsInc
      @BiInsightsInc  2 years ago

      Hi Varsha, thanks for stopping by. If the files' structure is the same then you can combine them into a DataFrame and then query the database. From there it is just a matter of comparing the two DataFrames, as in the sketch below. Hope this helps.
      Also, if you are trying to do change detection for an incremental load, I will be doing a video on how to perform incremental loads. Stay tuned.
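
      A minimal sketch of that comparison (the file path, connection URL and key column are placeholders):

      import glob
      import pandas as pd
      from sqlalchemy import create_engine

      # combine same-structured source files into one DataFrame
      files = glob.glob("data/source/*.csv")
      src_df = pd.concat((pd.read_csv(f) for f in files), ignore_index=True)

      # pull the database table to compare against
      engine = create_engine("postgresql://etl_user:secret@localhost:5432/etl")
      db_df = pd.read_sql("SELECT * FROM public.target_table", engine)

      # rows present in the files but missing from the table, compared on a key column
      new_rows = src_df[~src_df["id"].isin(db_df["id"])]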

    • @varshachauhan5185
      @varshachauhan5185  2 years ago

      @@BiInsightsInc Thank you for your reply. I will wait for your video, and I request you to make more videos on operations that can be performed using a Pandas DataFrame.

    • @BiInsightsInc
      @BiInsightsInc  2 years ago

      @@varshachauhan5185 I have a video on the Pandas library with various tips and tricks. Feel free to check it out: ruclips.net/video/-jerzfh2bS0/видео.html

  • @fashiuddin7911
    @fashiuddin7911  1 year ago

    Great great great

  • @mycousins896
    @mycousins896  1 year ago

    Why didn't you show how you connect to MySQL? Please make a video on that!

    • @BiInsightsInc
      @BiInsightsInc  1 year ago +1

      As you can see, I am using SQL Server as the source and Postgres as the target; therefore, MySQL is out of scope and is not covered. Here is a screenshot of how to connect to MySQL via Airflow (a code sketch follows it):
      github.com/hnawaz007/pythondataanalysis/blob/main/AirflowSession2/airflow_mysql.png
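
      If you prefer code to the screenshot, a minimal hook sketch (the connection id and query are placeholders, and the MySQL provider must be installed):

      from airflow.providers.mysql.hooks.mysql import MySqlHook

      # "mysql_default" must match the Connection Id you create under Admin > Connections
      hook = MySqlHook(mysql_conn_id="mysql_default")
      df = hook.get_pandas_df("SELECT * FROM employees")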

    • @mycousins896
      @mycousins896  1 year ago

      @@BiInsightsInc Thanks, how about SQL Server? Can you make that as well? And how did you host your DB at that IP address in the picture?
      I'm asking because there really aren't any good tutorials out there for things like this. Many people underestimate how difficult it sometimes is to make the connections work between these tools, while the actual development, on the other hand, is usually easy. That is a real gap you could fill by making videos on how to make it all work between Docker, Microsoft SQL Server/MySQL, etc., and Airflow, and on writing back to the connected database.

    • @BiInsightsInc
      @BiInsightsInc  1 year ago

      @@mycousins896 Check out the Airflow installation with the SQL Server provider. I go over how to install the SQL Server provider and how to create a SQL Server connection.
      How to install Apache Airflow on Docker? | Build Custom Airflow Docker Image | Airflow | Docker
      ruclips.net/video/t4h4vsULwFE/видео.html

  • @CSE-AshikAhamedP
    @CSE-AshikAhamedP  1 year ago

    But why are you importing Pandas?

    • @BiInsightsInc
      @BiInsightsInc  1 year ago

      Pandas is used to query the database and carry out the transformation.

  • @letsplaionline
    @letsplaionline  2 years ago

    Great explanation! Thanks!

  • @tahirrana786
    @tahirrana786  2 years ago

    👍

  • @esmailpaltasingh9691
    @esmailpaltasingh9691  1 year ago

    How do I connect to a Sybase DB?

    • @BiInsightsInc
      @BiInsightsInc  1 year ago

      You can use the Sybase library in Python to connect to a Sybase database. If you want to use SQLAlchemy, then use the following method to build the connection string.
      # Sybase library
      import Sybase

      db = Sybase.connect('server', 'name', 'pass', 'database')
      c = db.cursor()
      c.execute("sql statement")
      list1 = c.fetchall()

      # SQLAlchemy (via pyodbc)
      import urllib.parse

      # driver, server, database, port, user and password are defined elsewhere
      params = (
          "DRIVER=" + driver + ";"
          "SERVER=" + server + ";"
          "DATABASE=" + database + ";"
          "PORT=" + port + ";"
          "UID=" + user + ";"
          "PWD=" + password + ";"
      )
      params = urllib.parse.quote_plus(params)
      connection_string = 'sybase+pyodbc:///?odbc_connect=%s' % params

  • @Levy957
    @Levy957  2 years ago

    Amazing

  • @ardavanmoinzadeh801
    @ardavanmoinzadeh801  2 years ago

    Hi, can you share the source code?

    • @BiInsightsInc
      @BiInsightsInc  2 years ago

      Please check the description of the video. All the resources, including a link to the repo, are there. Thanks.

    • @ardavanmoinzadeh801
      @ardavanmoinzadeh801  2 years ago

      @@BiInsightsInc Yes... I commented too fast :) I found them and forgot to fix my comment! Thanks!