59. Databricks PySpark: Slowly Changing Dimension | SCD Type 1 | Merge using PySpark and Spark SQL

  • Published: 28 Dec 2024

Comments • 120

  • @sohelsayyad5572
    @sohelsayyad5572 1 year ago +1

    Informative video... and the comment section too.
    Thanks Raja sir 💐

  • @tanushreenagar3116
    @tanushreenagar3116 1 year ago +1

    Superb, sir. Now this concept is clear to me.

  • @parameshgosula5510
    @parameshgosula5510 3 years ago +1

    Very informative... keep going.

  • @MrVivekc
    @MrVivekc 2 years ago +1

    Very good content. Keep it up.

    • @rajasdataengineering7585
      @rajasdataengineering7585  2 years ago

      Thanks Vivek

    • @MrVivekc
      @MrVivekc 2 years ago +1

      @rajasdataengineering7585 Wanted to confirm one thing: is the Delta Lake feature available in Spark 3.x onwards?

    • @rajasdataengineering7585
      @rajasdataengineering7585  2 years ago

      Yes, it is available. (A sketch of enabling it on open-source Spark follows at the end of this thread.)

    • @MrVivekc
      @MrVivekc 2 years ago

      @rajasdataengineering7585 OK, and can we implement Delta Lake in Spark 2.3.x?
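
      A minimal sketch (an assumption, not from the video) of enabling Delta Lake on open-source Spark 3.x via the delta-spark pip package; on Databricks itself, Delta ships pre-installed:

      # pip install delta-spark
      from delta import configure_spark_with_delta_pip
      from pyspark.sql import SparkSession

      builder = (
          SparkSession.builder.appName("delta-demo")
          # Register Delta's SQL extension and catalog so MERGE and other
          # Delta commands become available.
          .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
          .config("spark.sql.catalog.spark_catalog",
                  "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      )
      spark = configure_spark_with_delta_pip(builder).getOrCreate()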

  • @shubhamalsunde3230
    @shubhamalsunde3230 11 months ago +1

    Nice information.

  • @RadhaJhaq
    @RadhaJhaq 2 years ago +1

    Very well explained

  • @joyo2122
    @joyo2122 2 years ago +1

    Great video for a data scientist like me.

  • @ravinarang6865
    @ravinarang6865 1 year ago +1

    Your videos are nice.

  • @abhaybisht101
    @abhaybisht101 3 years ago +1

    Nice content, Raja 👍

  • @kartikeshsaurkar4353
    @kartikeshsaurkar4353 2 years ago +5

    I think the video title should be changed to "How to implement SCD 1 in Databricks". It'll reach a larger audience.

  • @rambabuposa5082
    @rambabuposa5082 1 year ago +1

    Hi Raja, nice videos. I have gone through all of them.
    You have titled this video SCD Type 1, but as far as I know, it's a Delta Lake with all kinds of history (versions). I think it should be SCD Type 2.

    • @rajasdataengineering7585
      @rajasdataengineering7585  1 year ago

      Hi Rambabu, thanks for your comment. I believe I explained the merge statement, which overwrites the previous version and does not maintain history. Anyway, I will check the video and make corrections if needed.
      Also, I have posted another video on SCD Type 2.
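
      For reference, a minimal SCD Type 1 merge along the lines described above; the path and the names source_df, id, name, city are illustrative assumptions:

      from delta.tables import DeltaTable

      target = DeltaTable.forPath(spark, "/FileStore/tables/delta_merge")

      (target.alias("t")
          .merge(source_df.alias("s"), "t.id = s.id")                   # match on the business key
          .whenMatchedUpdate(set={"name": "s.name", "city": "s.city"})  # overwrite in place, no history
          .whenNotMatchedInsertAll()                                    # new keys become new rows
          .execute())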

  • @sanjayss8564
    @sanjayss8564 2 years ago +1

    Great explanation 👏👏, thanks

  • @awasthi4948
    @awasthi4948 2 years ago +1

    Truly appreciate your efforts!!
    Can you please share the script you used, so that we can do hands-on practice with the same?

  • @surenderraja1304
    @surenderraja1304 1 year ago +2

    Very nice. Is it possible to supply the column names dynamically from somewhere? Currently the column names in the ON condition are hardcoded as id, and the set columns are hardcoded too. Can we pull those columns dynamically from a list, array, or config file?
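
    A hedged sketch of one way to do that, reusing the illustrative target/source_df names from the sketch earlier in the thread; key_cols and update_cols are assumed to come from a config file:

    key_cols = ["id"]                    # e.g. loaded from a JSON/YAML config
    update_cols = ["name", "city"]

    # Build the ON condition and the SET mapping from the lists.
    on_clause = " AND ".join(f"t.{c} = s.{c}" for c in key_cols)
    set_clause = {c: f"s.{c}" for c in update_cols}

    (target.alias("t")
        .merge(source_df.alias("s"), on_clause)
        .whenMatchedUpdate(set=set_clause)
        .whenNotMatchedInsertAll()
        .execute())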

  • @sravankumar1767
    @sravankumar1767 3 years ago

    Nice explanation 👌

  • @muvvalabhaskar3948
    @muvvalabhaskar3948 8 months ago +1

    Hi, in this example there is only one table.
    If there are multiple tables with multiple columns, and the primary key is different for each table, how do we generalize this?

  • @pritamsuryavanshi2582
    @pritamsuryavanshi2582 2 years ago +2

    Could you make a video on "How to implement SCD 2 using PySpark/Spark SQL in Databricks"? Thanks.

  • @MrinalDas3107
    @MrinalDas3107 3 years ago +1

    Loved the content. Thanks, brother.

    • @rajasdataengineering7585
      @rajasdataengineering7585  3 years ago

      Thank you Mrinal.

    • @MrinalDas3107
      @MrinalDas3107 3 years ago +1

      @rajasdataengineering7585 I am planning to make a transition to Data Engineering and would love to have someone like you guide me in my journey. Could we connect in some way?

    • @rajasdataengineering7585
      @rajasdataengineering7585  3 years ago +1

      Sure, please contact me at audaciousazure@gmail.com

    • @MrinalDas3107
      @MrinalDas3107 3 years ago

      @rajasdataengineering7585 Sure, let me do that right away. Thanks a lot :-)

  • @leviettung2000
    @leviettung2000 2 years ago +1

    Hi Raja,
    I am also doing an upsert with Structured Streaming into an Azure SQL database, and things are not working as they should. I can upload via an ODBC connection in a normal batch write, but not in writeStream: I get an error that ODBC is not installed (but it is). I upsert with foreach.
    Can you give me some advice? Many thanks.

  • @JL-qc5gq
    @JL-qc5gq 2 years ago +2

    How do we update records in a database table via JDBC in Databricks? I tried read and write (overwrite/append), but not update.

    • @rajasdataengineering7585
      @rajasdataengineering7585  2 years ago

      You can use the presql and postsql statement options with a JDBC connection.

    • @JL-qc5gq
      @JL-qc5gq 2 years ago +1

      @rajasdataengineering7585 Do you have an example? Let's say updating records using the presql and postsql options.

    • @rajasdataengineering7585
      @rajasdataengineering7585  2 years ago

      I haven't yet created a video on it

    • @bachannigam4332
      @bachannigam4332 5 months ago

      I have a suggestion: create a stored procedure to delete the records which are found, call the procedure before the insert, then insert all the values as new rows.
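
      A hedged sketch of running such a statement from PySpark (Spark's own JDBC writer only appends or overwrites, so arbitrary UPDATEs and stored-procedure calls are commonly run through the JVM's java.sql.DriverManager via py4j); the URL, credentials, and procedure name are placeholders:

      # Borrow the JVM's DriverManager to run arbitrary SQL against the database.
      jvm = spark.sparkContext._gateway.jvm
      conn = jvm.java.sql.DriverManager.getConnection(
          "jdbc:sqlserver://<server>:1433;databaseName=<db>", "<user>", "<password>")
      stmt = conn.createStatement()
      stmt.execute("EXEC dbo.delete_matched_records")  # run the stored procedure
      stmt.close()
      conn.close()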

  • @ashishsharan6503
    @ashishsharan6503 3 years ago +1

    What is the syntax for inserting a record manually into a Delta Lake table or a DataFrame using PySpark?

    • @rajasdataengineering7585
      @rajasdataengineering7585  3 years ago +1

      Hi Ashish, it is the same SQL syntax, but use the %sql magic command if you are using a Python or Scala notebook.
      If you want to insert based on a DataFrame, convert the DataFrame into a temp view first; then you can use the SQL insert syntax, as sketched below.
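
      A short sketch of that temp-view approach; the DataFrame and view names are illustrative:

      new_rows_df.createOrReplaceTempView("new_rows")
      spark.sql("INSERT INTO delta.`/FileStore/tables/delta_merge` SELECT * FROM new_rows")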

  • @Ek_e_Alfaaz
    @Ek_e_Alfaaz 1 year ago +2

    Sir, please make a playlist on streaming.

  • @kamaltheja
    @kamaltheja 3 months ago

    Can you please share all the notebooks in this series?

  • @perryliu6573
    @perryliu6573 2 years ago +1

    How do we manage it if one of the rows in the source table got deleted and we also want to delete this row in the target table?

    • @rajasdataengineering7585
      @rajasdataengineering7585  2 years ago +1

      There is an option for whenMatchedDelete as well, but that is for matching cases. In your case, if the source DataFrame contains the latest snapshot, you would be better off with truncate and load; see the sketch below.
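
      A hedged sketch of both options; the table path and the names target, deleted_rows_df, and source_df are illustrative:

      # Option 1: delete target rows that match the (deleted) source rows.
      (target.alias("t")
          .merge(deleted_rows_df.alias("s"), "t.id = s.id")
          .whenMatchedDelete()
          .execute())

      # Option 2: if source_df is the full latest snapshot, truncate and load.
      (source_df.write.format("delta")
          .mode("overwrite")
          .save("/FileStore/tables/delta_merge"))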

  • @cajaykiran
    @cajaykiran 2 years ago +1

    Thank you

  • @rajeshmanepalli7367
    @rajeshmanepalli7367 9 months ago +1

    In real-time projects, which one do we use, PySpark or SQL? Which one is more effective?

  • @ashishsharan6503
    @ashishsharan6503 3 years ago +1

    Do we have SCD Type 1 and Type 2 videos in PySpark and Spark SQL?

    • @rajasdataengineering7585
      @rajasdataengineering7585  3 years ago

      Hi Ashish, this tutorial can be used for SCD Type 1, and I will post another video for SCD Type 2.

  • @ashishsharan6503
    @ashishsharan6503 3 years ago +1

    Where can I get the scripts you have shown in the tutorials? I liked them very much.

  • @suhassajjan4056
    @suhassajjan4056 1 year ago +1

    In my case the table is in Hive. Can I implement the same solution?

  • @kunalmishra348
    @kunalmishra348 1 year ago +1

    Hello,
    Can you please tell me how to change the data type of columns of the created Delta table?
    For ex: In this video you have created

  • @yogeshgavali5238
    @yogeshgavali5238 8 months ago

    How can we delete the data which is not in the source, in the same merge statement, in PySpark?

  • @keerthanavijayakumar9754
    @keerthanavijayakumar9754 10 months ago +1

    @rajasdataengineering7585 Hi sir,
    I have data in an RDBMS SQL source. I do some transformation and write that data to a Postgres DB using PySpark. The job is triggered on an hourly basis but fetches data from the source over an 8-hour interval, so there are many duplicates in the Postgres table. How do I overcome that? Please explain.

    • @rajasdataengineering7585
      @rajasdataengineering7585  10 months ago +1

      Hi Keerthana, it is better to create a view on the Postgres side with the duplicate-handling logic and ingest data from that view into Databricks.

    • @keerthanavijayakumar9754
      @keerthanavijayakumar9754 10 months ago

      @rajasdataengineering7585 I need to move that data to a warehouse. Only the transformed data can be written, but the existing data and the transformed data have duplication. How do I write without entering the same data into Postgres again?

  • @yourtuber6367
    @yourtuber6367 10 months ago

    Please upload a Delta Live Tables series.

  • @krishnamurthy9720
    @krishnamurthy9720 2 years ago +1

    I am getting an error when inserting specific columns instead of all columns, saying a column is missing in the insert.

    • @rajasdataengineering7585
      @rajasdataengineering7585  2 years ago

      For update, you can give specific columns, but for insert the complete list of columns has to be provided; see the sketch below.
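
      A sketch of that rule; target, source_df, and the column names are illustrative:

      (target.alias("t")
          .merge(source_df.alias("s"), "t.id = s.id")
          .whenMatchedUpdate(set={"city": "s.city"})  # a subset of columns is fine for update
          .whenNotMatchedInsertAll()                  # insert must cover every column
          .execute())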

  • @sanjaynath7206
    @sanjaynath7206 2 years ago +1

    Has the SCD Type 2 video been removed or made private? Could you please make it public? Awesome videos!

  • @krishnamurthy9720
    @krishnamurthy9720 3 years ago

    How do we write the count of inserted records into some audit table?

    • @rajasdataengineering7585
      @rajasdataengineering7585  3 years ago +3

      Hi Krishna,
      Very good question.
      In my example, the delta table is located at /FileStore/tables/delta_merge.
      After performing the merge operation on this delta table, you can follow the steps below:

      from delta.tables import *
      from pyspark.sql.functions import explode  # needed to unpack operationMetrics

      delta_df = DeltaTable.forPath(spark, "/FileStore/tables/delta_merge")
      lastOperationDF = delta_df.history(1)  # get the last operation
      display(lastOperationDF)
      explode_df = lastOperationDF.select(lastOperationDF.operation, explode(lastOperationDF.operationMetrics))
      display(explode_df)

      The column operationMetrics contains all the metrics, including the number of records inserted, the number of records updated, etc.
      explode_df can be used to retrieve these metrics and write them to your audit table.
      Hope it helps.

    • @krishnamurthy9720
      @krishnamurthy9720 3 years ago

      @rajasdataengineering7585 Thank you for the instant reply.

    • @rajasdataengineering7585
      @rajasdataengineering7585  3 years ago

      You are welcome

  • @sivaani37
    @sivaani37 1 year ago +1

    Can we directly update a table in SQL Server instead of a delta table?

  • @gyan_chakra
    @gyan_chakra 2 years ago +1

    Which join is equivalent to merge?

    • @rajasdataengineering7585
      @rajasdataengineering7585  2 years ago +2

      Merge is an upsert operation, not a joining operation. However, internally it is the equivalent of an outer join.

    • @gyan_chakra
      @gyan_chakra 2 years ago +1

      @rajasdataengineering7585 Thank you so much 😊👌

  • @keerthanavijayakumar9754
    @keerthanavijayakumar9754 11 months ago +1

    Hi sir,
    Is that possible through standalone PySpark? Please explain.

  • @DevelopingI
    @DevelopingI 1 year ago

    Hey, thank you for the video. I am using Method 1 to perform a merge on a big table (1 TB), and it takes 3+ hours.
    Can you please suggest how I can improve that?
    Also, is it possible and advisable to perform merges on Parquet rather than converting to Delta?
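
    A hedged sketch of the usual first steps (an assumption, not an answer from this thread): compact and Z-order the table, and narrow the merge with a partition filter in the ON clause, here assuming partitioning by an event_date column. Note that MERGE itself requires a Delta table, so plain Parquet would first need CONVERT TO DELTA. The path and updates_df are placeholders:

    spark.sql("OPTIMIZE delta.`/mnt/big_table` ZORDER BY (id)")  # co-locate rows by key

    (DeltaTable.forPath(spark, "/mnt/big_table").alias("t")
        .merge(updates_df.alias("s"),
               "t.id = s.id AND t.event_date >= '2024-01-01'")  # lets Delta prune untouched partitions
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())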

  • @dataanalyst3210
    @dataanalyst3210 5 months ago +1

    Please add one production-ready, live project.

  • @midhunssivan139
    @midhunssivan139 1 year ago +1

    Can you please explain updating an Oracle table from PySpark? I am able to do inserts.

    • @rajasdataengineering7585
      @rajasdataengineering7585  1 year ago

      We need to use a JDBC driver for the Oracle database as well. The process is the same as for the MS SQL table in this example; see the sketch below.
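
      A hedged sketch of that JDBC write pointed at Oracle; the URL, table, and credentials are placeholders, and the ojdbc driver must be installed on the cluster:

      (df.write.format("jdbc")
          .option("url", "jdbc:oracle:thin:@//<host>:1521/<service>")
          .option("dbtable", "SCHEMA.TARGET_TABLE")
          .option("user", "<user>")
          .option("password", "<password>")
          .option("driver", "oracle.jdbc.driver.OracleDriver")
          .mode("append")  # the JDBC writer supports append/overwrite, not update
          .save())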

    • @midhunssivan139
      @midhunssivan139 1 year ago

      I tried using JDBC. It inserts, but the update statement is not supported; it would overwrite the full data instead. Is there any way to execute a merge statement on an RDBMS using PySpark?

  • @sravankumar1767
    @sravankumar1767 2 years ago +1

    How do we merge Spark DataFrames with complex types? If we have two JSON files and the json1 schema and json2 schema are different, how can we merge them using PySpark? Can you please explain this scenario?

    • @rajasdataengineering7585
      @rajasdataengineering7585  2 years ago +1

      When you say merge, I assume you mean a union of the two DataFrames from the JSON files.
      Please let me know if you mean the SQL merge operation, not union.
      For a union, the number of columns and the data types should match, so you need to alter the DataFrames first to meet these two conditions; then they can be combined.

    • @sravankumar1767
      @sravankumar1767 2 years ago +1

      @rajasdataengineering7585 Yes, we can use union, but how can we handle complex JSON when the schema is different, so that finally we can use UNION? Can you please explain this scenario?

    • @rajasdataengineering7585
      @rajasdataengineering7585  2 years ago +1

      First, we need to flatten the nested JSON fields, remove unwanted columns, and make the schema the same for both DataFrames.
      Then we can apply the union, as sketched below.
      If you have any sample dataset, you can share it to my mailbox and I can help you.
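
      A hedged sketch of that flatten-then-union approach; df1/df2 and the nested field names are illustrative:

      from pyspark.sql.functions import col

      # Flatten each nested schema down to a common set of columns.
      df1_flat = df1.select("id", col("payload.amount").alias("amount"))
      df2_flat = df2.select("id", col("details.total").alias("amount"))

      # unionByName matches columns by name; allowMissingColumns (Spark 3.1+)
      # fills columns absent on one side with nulls.
      combined = df1_flat.unionByName(df2_flat, allowMissingColumns=True)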

    • @sravankumar1767
      @sravankumar1767 2 years ago

      @rajasdataengineering7585 Can you please share your mail ID?

    • @rajasdataengineering7585
      @rajasdataengineering7585  2 years ago +1

      audaciousazure@gmail.com

  • @rohitkumar-nk6sd
    @rohitkumar-nk6sd 2 years ago +1

    Hi, how do we merge on 2 columns?

    • @rajasdataengineering7585
      @rajasdataengineering7585  2 years ago

      I have posted another video on SCD Type 2 where I explain merging on multiple columns. Kindly refer to that video.

    • @rohitkumar-nk6sd
      @rohitkumar-nk6sd 2 years ago +1

      @rajasdataengineering7585 Hi sir,
      In my case I need to merge on 2 columns with 2 merge keys.
      Please help me out 🙏

    • @rajasdataengineering7585
      @rajasdataengineering7585  2 years ago

      Please refer to this video: ruclips.net/video/GhBlup-8JbE/видео.html
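
      A short sketch of such a composite-key merge; target, source_df, and the column names are illustrative:

      (target.alias("t")
          .merge(source_df.alias("s"), "t.id = s.id AND t.region = s.region")  # both keys must match
          .whenMatchedUpdateAll()
          .whenNotMatchedInsertAll()
          .execute())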

  • @shalinikumari-qx9tn
    @shalinikumari-qx9tn 3 years ago

    Please make a video on Delta Lake.

  • @mohanishsingh9786
    @mohanishsingh9786 2 years ago

    Sir, can you share the notebook please?

  • @tejadeep-u2n
    @tejadeep-u2n 1 year ago

    Can you provide the full script so that we can use it to practice?

  • @AK-ff7xt
    @AK-ff7xt 1 year ago

    Hi,
    I need this notebook. Can you please share it?

  • @DebayanKar7
    @DebayanKar7 1 year ago

    Please add the HTML version of your notebook.

  • @weldervicentemoreiramartin9467
    @weldervicentemoreiramartin9467 2 years ago

    04:40

  • @weldervicentemoreiramartin9467
    @weldervicentemoreiramartin9467 2 years ago +1

    Hello, I copied the template from the Databricks documentation and saw that there are some differences from your example. Why doesn't the documentation's version work?

    from delta.tables import *

    deltaTableVendas = DeltaTable.forPath(spark, 'dbfs:/mnt/bronze/vendas/')
    deltaTableVendasUpdates = DeltaTable.forPath(spark, 'dbfs:/mnt/silver/vendas/')
    dfUpdates = deltaTableVendasUpdates.toDF()

    deltaTableVendas.alias('vendas') \
        .merge(
            dfUpdates.alias('updates'),
            'vendas.numero_transacao = updates.numero_transacao'
        ) \
        .whenMatchedUpdate(set =
            {
                "numero_transacao": "updates.numero_transacao",
                "numped": "updates.numped",
                "codcli": "updates.codcli",
                "codprod": "updates.codprod",
                "data_venda": "updates.data_venda",
                "quantidade": "updates.quantidade",
                "valor": "updates.valor"
            }
        ) \
        .whenNotMatchedInsert(values =
            {
                "numero_transacao": "updates.numero_transacao",
                "numped": "updates.numped",
                "codcli": "updates.codcli",
                "codprod": "updates.codprod",
                "data_venda": "updates.data_venda",
                "quantidade": "updates.quantidade",
                "valor": "updates.valor"
            }
        ) \
        .execute()

    • @rajasdataengineering7585
      @rajasdataengineering7585  2 years ago +1

      Ideally this code should work as well. I would need to look at the details of any specific error. Could you elaborate on the error you get?

  • @nagamanickam6604
    @nagamanickam6604 8 months ago +1

    Thank you
