Informative video... and the comment section too.
Thanks Raja sir 💐
Thanks and welcome!
Superb sir now I have cleared this concept
Great to hear👍🏻
Very informative..keep continuing..
Thanks Paramesh
Very good content. keep it up.
Thanks Vivek
@@rajasdataengineering7585 Wanted to confirm one thing. Is the Delta Lake feature available in Spark 3.x onwards?
Yes available
@@rajasdataengineering7585 OK, and can we implement Delta Lake in Spark 2.3.x?
nice information
Thanks
Very well explained
Thank you
Great Video for data scientist like me
Thank you
Your videos are nice.
Glad you like them!
Nice content Raja👍
Thanks Abhay
I think the video title should be changed to "how to implement SCD 1 in databricks". It'll reach a larger audience.
Sure, will change it as suggested
Hi Raja, nice videos. I have gone through all of your videos.
You have titled this video SCD Type 1. As per my knowledge, it's Delta Lake with all kinds of history (versions), so I think it should be SCD Type 2.
Hi Rambabu, thanks for your comment. I believe I explained the merge statement, which overwrites the previous version and does not maintain history. Anyway, I will check the video and make corrections if needed.
Also, I have posted another video on SCD Type 2.
Great explanation 👏👏, thanks
Thanks Sanjay!
Truly appreciate your efforts!!
Can you please share the script you have used, so that we can do hands-on with the same?
Very nice. Is it possible to supply the column names dynamically from somewhere? Currently the column name in the ON condition is hardcoded as id, and the set columns are hardcoded as well. Can we pull those columns dynamically from a list, array, or config file?
Yes that's possible
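For example, here is a minimal sketch (the update column names and the source dataframe source_df are assumptions, not from the video) that builds the ON condition and the SET mapping from lists, which could equally be loaded from a config file:

from delta.tables import DeltaTable

# hypothetical config - could also be read from a JSON/YAML config file
key_columns = ["id"]
update_columns = ["name", "city"]

target = DeltaTable.forPath(spark, "/FileStore/tables/delta_merge")

# build "tgt.id = src.id AND ..." from the key column list
merge_condition = " AND ".join(f"tgt.{c} = src.{c}" for c in key_columns)
# build {"name": "src.name", ...} from the update column list
set_expr = {c: f"src.{c}" for c in update_columns}

(target.alias("tgt")
    .merge(source_df.alias("src"), merge_condition)
    .whenMatchedUpdate(set=set_expr)
    .whenNotMatchedInsertAll()
    .execute())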
Nice explanation 👌
Thank you Sravan
Hi, in this example there is only one table.
If there are multiple tables with multiple columns, and the primary key is also different for each table, how do we generalize this?
We can create a parameterized (user-defined) function to make it generic.
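As a rough sketch of that idea (the table paths, key columns and the source_dataframes lookup below are made up for illustration), a parameterized helper lets the same merge logic serve tables with different primary keys:

from delta.tables import DeltaTable

def upsert_delta(target_path, source_df, key_columns):
    # generic upsert: the merge condition is derived from each table's own key columns
    target = DeltaTable.forPath(spark, target_path)
    condition = " AND ".join(f"tgt.{c} = src.{c}" for c in key_columns)
    (target.alias("tgt")
        .merge(source_df.alias("src"), condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

# hypothetical per-table config with different primary keys
table_config = {
    "/mnt/delta/customers": ["customer_id"],
    "/mnt/delta/orders": ["order_id", "line_no"],
}
for path, keys in table_config.items():
    upsert_delta(path, source_dataframes[path], keys)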
Could you make a video on "How to implement SCD 2 using PySpark/Spark SQL in Databricks" ? Thanks.
Sure Pritam, it would be my next video as per your request
loved the content. Thanks Brother
Thank you Mrinal.
@@rajasdataengineering7585 I am planning to make a transition to Data Engineering and would love to have someone like you guide me in my journey. Could we connect in some way?
Sure, please contact me at audaciousazure@gmail.com
@@rajasdataengineering7585 Sure, let me do that right away. Thanks a lot :-)
Hi Raja,
I am also doing an upsert with Structured Streaming into an Azure SQL database. Everything is not working as it should: I can upload via an ODBC connection in a normal (batch) write, but not in writeStream, where I get an error that ODBC is not installed (but it is). I do the upsert with foreach.
Can you give me some advice? Many thanks.
How do we update records in a DB table via JDBC in Databricks? I tried read and write (overwrite/append), but not update.
You can use the presql and postsql statement options with the JDBC connection.
@@rajasdataengineering7585 Do you have an example? Let's say updating records using the presql and postsql options.
I haven't yet created a video on it
I have a suggestion: create a stored procedure to delete the records which are found, call the procedure before the insert, then insert all the values as new values.
What will be the syntax for inserting a record manually into a Delta Lake table and a dataframe using PySpark?
Hi Ashish, it is the same SQL syntax, but use the %sql magic command if you are using a Python or Scala notebook.
If you want to insert based on a dataframe, convert the dataframe into a temp view first; then you can use the SQL insert syntax.
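As a small sketch of both options (the sample values and the new_rows_df dataframe are made up; the path is the one used in the video):

# direct SQL insert (same statement you would run in a %sql cell)
spark.sql("INSERT INTO delta.`/FileStore/tables/delta_merge` VALUES (101, 'Ravi', 'Chennai')")

# insert based on a dataframe: register it as a temp view first, then use SQL insert syntax
new_rows_df.createOrReplaceTempView("new_rows")
spark.sql("INSERT INTO delta.`/FileStore/tables/delta_merge` SELECT * FROM new_rows")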
Sir please make playlist on streaming
Sure, will create a playlist for streaming concepts
Can you please share all the notebooks in this series?
How do we manage it if one of the rows in the source table got deleted and we also want to delete that row in the target table?
There is a whenMatchedDelete option as well, but that is for matching cases. In your case, if the source dataframe contains the latest snapshot, you are better off going for truncate and load.
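For reference, a minimal sketch of whenMatchedDelete (the src_df dataframe and its is_deleted flag are assumptions, not something from the video), plus the truncate-and-load alternative:

from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "/FileStore/tables/delta_merge")

# delete matched target rows that the source marks as deleted (hypothetical is_deleted flag)
(target.alias("tgt")
    .merge(src_df.alias("src"), "tgt.id = src.id")
    .whenMatchedDelete(condition="src.is_deleted = true")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())

# truncate and load: simplest when the source dataframe is the full latest snapshot
src_df.write.format("delta").mode("overwrite").save("/FileStore/tables/delta_merge")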
Thank you
In real time, which one should we use, PySpark or SQL? Which one is more effective?
Both perform at the same level. It's all about the developer's convenience.
Do we have SCD type 1 and Type 2 videos in PySpark and Spark SQL ?
Hi Ashish, this tutorial can be used for SCD Type 1, and I will post another video for SCD Type 2.
Where can I get the scripts you have shown in the tutorials? I liked them very much.
Will share the scripts
In my case the table is in Hive; can I implement the same solution?
Yes, you can implement it for a Hive table also.
Hello
Can you please tell how to change the data type of the columns of the created delta table?
For example: in this video you have created
Sure, will create a video on this requirement
How can we delete the data which is not in the source, in the same merge statement, using PySpark?
@rajasdataengineering7585 hi sir,
I have data in an RDBMS SQL database (source).
I do some transformations and write that data into a Postgres DB using PySpark. As this job is triggered on an hourly basis and fetches the data from the source in an 8-hour interval, there are so many duplicates in the Postgres table. How do I overcome that? Please explain.
Hi Keerthana, it is better to create a view on the Postgres side with the logic for handling duplicates, and ingest data from that view into Databricks.
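A minimal sketch of that ingestion (host, database, view name and credentials are placeholders):

# read from a de-duplicating view created on the Postgres side
dedup_df = (spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://<host>:5432/<database>")
    .option("dbtable", "public.sales_dedup_view")
    .option("user", "<user>")
    .option("password", "<password>")
    .option("driver", "org.postgresql.Driver")
    .load())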
@@rajasdataengineering7585 I need to move that data to a warehouse. The transformed data can only be written, but the existing data and the transformed data have duplication. How do I write without entering the same data into Postgres again?
Please upload a Delta Live Tables series.
I am getting an error when inserting specific columns instead of all columns, saying a column is missing in the insert.
For update, you can give specific columns, but for insert the complete list of columns has to be provided.
Has the SCD Type 2 video been removed or made private? Could you please make it public? Awesome videos!
Sure Sanjay, will add that one
@@rajasdataengineering7585 please upload SCD type 2 video
How do we write the count of inserted records into some audit table?
Hi Krishna,
Very good question.
In my example, the delta table is located at /FileStore/tables/delta_merge.
So after performing the merge operation on this delta table, you can follow the steps below:
from delta.tables import *
from pyspark.sql.functions import explode  # needed for exploding the operationMetrics map
delta_df = DeltaTable.forPath(spark, "/FileStore/tables/delta_merge")
lastOperationDF = delta_df.history(1)  # get the last operation
display(lastOperationDF)
explode_df = lastOperationDF.select(lastOperationDF.operation, explode(lastOperationDF.operationMetrics))
display(explode_df)
The column operationMetrics contains all the metrics, including the number of records inserted, the number of records updated, etc.
explode_df can be used to retrieve these metrics.
Hope it helps
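As a follow-up sketch for the audit-table part (the audit table path and its columns below are assumptions): the insert/update counts can be pulled out of the operationMetrics map and appended to a Delta audit table:

from pyspark.sql.functions import col, current_timestamp

# extract the counts from the operationMetrics map (keys as reported by Delta for MERGE operations)
audit_df = (lastOperationDF
    .select(
        col("operation"),
        col("operationMetrics")["numTargetRowsInserted"].alias("rows_inserted"),
        col("operationMetrics")["numTargetRowsUpdated"].alias("rows_updated"))
    .withColumn("audit_time", current_timestamp()))

# hypothetical audit table location
audit_df.write.format("delta").mode("append").save("/FileStore/tables/merge_audit")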
@@rajasdataengineering7585 thank you for instant reply..
You are welcome
Can we directly update a table in SQL Server instead of a delta table?
Yes, we can do that.
@@rajasdataengineering7585 can you post a video on that ?
Sure will create a video on this request
Which join is equivalent to merge?
Merge is an upsert operation, not a join operation. However, internally it is equivalent to an outer join.
@@rajasdataengineering7585 thank you so much 😊👌
Hi sir,
Is that possible through standalone PySpark? Please explain.
Hi Keerthana, yes, it is possible through standalone PySpark as well.
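For standalone PySpark (outside Databricks), a minimal sketch, assuming the delta-spark pip package is installed and its version matches the installed PySpark:

# pip install delta-spark
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

builder = (SparkSession.builder.appName("delta-standalone")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"))

# adds the Delta Lake jars and returns a session that can read, write and merge Delta tables
spark = configure_spark_with_delta_pip(builder).getOrCreate()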
Hey, thank you for the video. I am using Method 1 to perform a merge on a big table (1 TB). It takes 3+ hours to do that.
Can you please suggest how I can improve that?
Also, is it possible and advisable to perform merges on Parquet rather than converting it to Delta?
Have you got the solution?
Please add one production-ready, live project.
Sure, I will create a project for this requirement
Can you please explain updating an Oracle table from PySpark? Insert I am able to do.
We need to use the JDBC driver for the Oracle database as well. The process is the same as for the MS SQL table in this example.
I tried using JDBC. It is inserting, but the update statement is not supported; it will just overwrite the full data instead. Is there any way to execute a merge statement on an RDBMS using PySpark?
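For reference on the JDBC approach mentioned above, a rough sketch of the Oracle write (URL, schema and credentials are placeholders); as noted, the plain JDBC writer only appends or overwrites, it does not do row-level updates or merges:

# append rows into an Oracle table over JDBC
(df.write.format("jdbc")
    .option("url", "jdbc:oracle:thin:@//<host>:1521/<service_name>")
    .option("dbtable", "SCHEMA_NAME.TARGET_TABLE")
    .option("user", "<user>")
    .option("password", "<password>")
    .option("driver", "oracle.jdbc.OracleDriver")
    .mode("append")
    .save())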
How to merge Spark DataFrames of complex type: if we have two JSON files and the json1 schema and json2 schema are different, how can we merge them using PySpark? Can you please explain this scenario?
When you say merge, I assume you mean a union of the 2 dataframes from the JSON files.
Please let me know if you mean the SQL merge operation, not union.
For a union, the number of columns and the datatypes should match. So you need to alter the dataframes first to meet these 2 conditions, and then they can be combined.
@@rajasdataengineering7585 Yes, we can use union, but how can we handle complex JSON when the schemas are different, so that finally we can use UNION? Can you please explain this scenario?
First, we need to flatten the nested JSON fields, remove unwanted columns, and make the schema the same for both dataframes.
Then we can apply the union.
If you have any sample dataset, you can share it to my mailbox and I can help you.
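A small sketch of that flow (the paths and column names are made up): flatten the nested fields on both sides, then union; unionByName with allowMissingColumns (Spark 3.1+) can fill columns missing on one side with nulls, or the schemas can be made identical first as described above:

from pyspark.sql.functions import col

df1 = spark.read.json("/path/json1")   # placeholder paths
df2 = spark.read.json("/path/json2")

# flatten the nested fields needed from each file (hypothetical column names)
flat1 = df1.select(col("id"), col("customer.name").alias("customer_name"), col("amount"))
flat2 = df2.select(col("id"), col("customer.name").alias("customer_name"))

# union by column name, filling the column missing in df2 with nulls
combined = flat1.unionByName(flat2, allowMissingColumns=True)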
@@rajasdataengineering7585 can you please share your mail id
audaciousazure@gmail.com
Hi, how do we merge on 2 columns?
I have posted another video on SCD Type 2 and explained merging on multiple columns. Kindly refer to that video.
@@rajasdataengineering7585
Hi sir
In my case
I need to merge on 2 columns with 2 merge keys
Please help me out 🙏
Please refer to this video: ruclips.net/video/GhBlup-8JbE/видео.html
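For the two-key case asked about above, a minimal sketch (updates_df and the column names id and country are assumptions): the merge condition simply combines both keys with AND:

from delta.tables import DeltaTable

# merge on two key columns by ANDing them in the merge condition
(DeltaTable.forPath(spark, "/FileStore/tables/delta_merge").alias("tgt")
    .merge(updates_df.alias("src"), "tgt.id = src.id AND tgt.country = src.country")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())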
Please make a video on delta lake
Sure Shalini, will make videos on delta lake
Sir can you share the notebook please
Can you provide the complete script so we can use it to practice?
Hi,
I need this notebook. Can you please share it?
Please add the HTML version of your notebook.
04:40
Hello, I copied the template from the Databricks documentation and saw that there are some differences from your example. Why doesn't the documentation's version work?
from delta.tables import *

deltaTableVendas = DeltaTable.forPath(spark, 'dbfs:/mnt/bronze/vendas/')
deltaTableVendasUpdates = DeltaTable.forPath(spark, 'dbfs:/mnt/silver/vendas/')

dfUpdates = deltaTableVendasUpdates.toDF()

deltaTableVendas.alias('vendas') \
    .merge(
        dfUpdates.alias('updates'),
        'vendas.numero_transacao = updates.numero_transacao'
    ) \
    .whenMatchedUpdate(set =
        {
            "numero_transacao": "updates.numero_transacao",
            "numped": "updates.numped",
            "codcli": "updates.codcli",
            "codprod": "updates.codprod",
            "data_venda": "updates.data_venda",
            "quantidade": "updates.quantidade",
            "valor": "updates.valor"
        }
    ) \
    .whenNotMatchedInsert(values =
        {
            "numero_transacao": "updates.numero_transacao",
            "numped": "updates.numped",
            "codcli": "updates.codcli",
            "codprod": "updates.codprod",
            "data_venda": "updates.data_venda",
            "quantidade": "updates.quantidade",
            "valor": "updates.valor"
        }
    ) \
    .execute()
Ideally this code should work as well. I would need to look into the details of any specific error. Could you elaborate more if you get an error?
Thank you
You're welcome