Delta Change Feed and Delta Merge pipeline (extended demo)

  • Published: Feb 9, 2025
  • This video shows an extended demo of a pipeline that loads refined (silver) and curated (gold) tables. It complements the demo from the session "Data Ingestion - Practical Data Loading with Azure" that was part of PASS Data Community Summit 2022. It shows the use of the Delta Lake Change Data Feed and the MERGE command to track and process only inserts, updates, and deletes; a rough sketch of that pattern follows the related links below.
    Related content:
    Concurrent data ingestion in Spark notebooks - • Parallel table ingesti...
    Databricks blog on change data feed - docs.databrick...
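
    A rough sketch of that pattern, assuming a hypothetical silver_customers source table, gold_customers target table, customer_id key, and a last-processed-version bookmark kept by the pipeline itself (none of these names come from the video):

    # Read only the rows that changed since the last processed commit version,
    # then MERGE them into the curated (gold) table.
    from delta.tables import DeltaTable

    last_processed_version = 12  # hypothetical bookmark tracked by the pipeline

    changes = (spark.read.format("delta")
        .option("readChangeFeed", "true")
        .option("startingVersion", last_processed_version + 1)
        .table("silver_customers")
        .filter("_change_type != 'update_preimage'"))  # keep inserts, post-images, deletes

    # In practice, keep only the latest change per key before merging so a key
    # touched in several commits does not match more than once.
    gold = DeltaTable.forName(spark, "gold_customers")
    (gold.alias("t")
        .merge(changes.alias("s"), "t.customer_id = s.customer_id")
        .whenMatchedDelete(condition="s._change_type = 'delete'")
        .whenMatchedUpdateAll(condition="s._change_type != 'delete'")
        .whenNotMatchedInsertAll(condition="s._change_type != 'delete'")
        .execute())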

Comments • 6

  • @gardnmi
    @gardnmi 2 years ago

    If you use Spark Structured Streaming for batch processing, you can just use the Delta tables themselves as sinks and you don't have to bother with keeping track of the state of the table yourself. There is some good documentation on Databricks if you just search "Delta table as a sink". My current go-to pattern is append-only ingest for bronze, then a streaming merge into silver with the change data feed turned on for that table. In the gold layer you can then read the change data feed, which is append-only as well, and provide the CDC updates to the gold aggregates.
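
    A minimal sketch of that bronze-to-silver step, assuming hypothetical bronze_customers and silver_customers tables keyed on customer_id and a placeholder checkpoint path (none of these names are from the comment):

    # Stream the append-only bronze table and MERGE each micro-batch into silver.
    # Enabling CDF on silver lets the gold layer later consume only the changes.
    from delta.tables import DeltaTable

    spark.sql("""
        ALTER TABLE silver_customers
        SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    """)

    def merge_batch(batch_df, batch_id):
        # In practice, deduplicate batch_df per key before merging.
        silver = DeltaTable.forName(spark, "silver_customers")
        (silver.alias("t")
            .merge(batch_df.alias("s"), "t.customer_id = s.customer_id")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute())

    (spark.readStream.format("delta")
        .table("bronze_customers")            # append-only ingest table
        .writeStream
        .foreachBatch(merge_batch)
        .option("checkpointLocation", "/checkpoints/silver_customers")
        .trigger(availableNow=True)           # batch-style run
        .start())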

    • @felipecastro3710
      @felipecastro3710 2 years ago +1

      Hi! I am doing the same process as you for bronze and silver ingestion. About using CDF for the gold layer, won't I need to keep checkpoints of the versions I have already loaded? Getting MAX(last_modified) seems like a heavy operation on big tables. Imagining a daily run, how are you usually filtering data when querying the CDF so that only the data that should actually be merged gets picked up?
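
      One hedged way to avoid both manual version bookmarks and a MAX(last_modified) scan is sketched below: read the change feed as a stream and let the streaming checkpoint remember which commits were already processed; with an availableNow trigger the job still behaves like a daily batch run. Table names, the key column, and the checkpoint path are placeholders:

      # Let the streaming checkpoint, rather than MAX(last_modified), track
      # which CDF versions have already been consumed.
      from delta.tables import DeltaTable

      def upsert_to_gold(batch_df, batch_id):
          changes = batch_df.filter("_change_type != 'update_preimage'")
          gold = DeltaTable.forName(spark, "gold_customers")
          (gold.alias("t")
              .merge(changes.alias("s"), "t.customer_id = s.customer_id")
              .whenMatchedDelete(condition="s._change_type = 'delete'")
              .whenMatchedUpdateAll(condition="s._change_type != 'delete'")
              .whenNotMatchedInsertAll(condition="s._change_type != 'delete'")
              .execute())

      (spark.readStream.format("delta")
          .option("readChangeFeed", "true")
          .table("silver_customers")
          .writeStream
          .foreachBatch(upsert_to_gold)
          .option("checkpointLocation", "/checkpoints/gold_customers")
          .trigger(availableNow=True)   # daily, batch-style run
          .start())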

  • @gallardorivilla
    @gallardorivilla 2 years ago +2

    Hi Dustin, great job! Do you have any notebook example in a GitHub repository? Thx!!

    • @DustinVannoy
      @DustinVannoy  1 year ago +3

      github.com/datakickstart/azure-data-engineer-databricks/blob/main/best_of_class_recruiting/nb_refined_table_load.py

  • @ThisIsFrederic
    @ThisIsFrederic 1 year ago

    Hi Dustin,
    Thank you so much for sharing this demo with us.
    While trying to adapt it to my environment (I am using Synapse), I am facing an issue that I hope you can help me resolve: when the target Delta table does not exist, I noticed that after I create it, CDF shows as enabled only from version 1, not version 0. Version 0 holds the initial WRITE only, with no CDF enabled.
    Consequently, I cannot use your trick of loading everything from version 0 when the table does not exist.
    I tried "SET spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;", but Synapse seems to ignore it completely.
    I also tried including the option to enable CDF while saving the Delta table, as shown below, but again CDF only gets enabled from version 1:
    df_records.write.format('delta').option("delta.enableChangeDataFeed", "true").save(target_path)
    Any clue?
    Thanks!

    • @ThisIsFrederic
      @ThisIsFrederic 1 year ago

      Well, I just discovered that when you create a Delta table, adding option("delta.enableChangeDataFeed", "true") is not enough. When creating the temp view to switch to SQL, you also need to add the delta.enableChangeDataFeed = true option to the TBLPROPERTIES when issuing the CREATE OR REPLACE TABLE statement, and then it works.
      Still, the question about enabling CDF by default in Synapse remains, if you ever have a clue.
      Thanks!
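
      A minimal sketch of that fix, reusing df_records and target_path from the earlier comment; the table name customers_silver and the temp view name are placeholders:

      # Create the table through SQL with CDF enabled in TBLPROPERTIES,
      # rather than relying only on the DataFrameWriter option.
      df_records.createOrReplaceTempView("records_vw")

      spark.sql(f"""
          CREATE OR REPLACE TABLE customers_silver
          USING DELTA
          LOCATION '{target_path}'
          TBLPROPERTIES (delta.enableChangeDataFeed = true)
          AS SELECT * FROM records_vw
      """)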