Advancing Spark - Identity Columns in Delta
- Published: 10 Jul 2024
- A classic challenge in Data Warehousing is getting your surrogate key patterns right - but without the same tooling, how do we achieve it in a Lakehouse Environment? We've had several patterns in the past, each with their drawbacks, but now we've got a brand new IDENTITY column type... so how does it size up?
In this video Simon does a quick recap of the existing surrogate key methods within Spark-based ETL processes, before looking through the new Delta Identity functionality!
As always, if you're beginning your lakehouse journey, or need an expert eye to guide you on your way, you can always get in touch with Advancing Analytics.
00:00 - Hello
01:37 - Existing Key Methods
10:36 - New Identity Functionality
15:18 - Testing a larger insert
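For anyone skimming before watching: the new syntax the video walks through looks roughly like this (table and column names here are hypothetical; see the Databricks CREATE TABLE docs for the exact options):

```sql
-- Hypothetical dimension table with an identity surrogate key
CREATE TABLE dim_customer (
  customer_sk BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1),
  customer_natural_key STRING,
  customer_name STRING
) USING DELTA;

-- Insert without supplying the identity column; Delta generates the values
INSERT INTO dim_customer (customer_natural_key, customer_name)
VALUES ('CUST-001', 'Example Ltd');
```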
That's a great addition to the runtime. Thank you for the awesome vid!
I finally find a good use case for the identity function in Databricks! Typically we use hash keys so we can parallelise our jobs but I needed to create a unique identifier for an XML output, which was limited to a maximum 30 character string. Our natural keys were all GUIDs and our hash keys were also too long - delta identity to the rescue! Now we have a nice little mapping table from our natural keys to a bigint identity which we use in our XML output :D
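The mapping-table pattern described above could be sketched like this (all names are made up for illustration; a left anti join keeps already-mapped natural keys out of the insert):

```sql
-- Mapping table: short surrogate id per natural key (GUID)
CREATE TABLE natural_to_short_id (
  short_id BIGINT GENERATED ALWAYS AS IDENTITY,
  natural_key STRING
) USING DELTA;

-- Register any natural keys we haven't seen before
INSERT INTO natural_to_short_id (natural_key)
SELECT DISTINCT s.natural_key
FROM staging s
LEFT ANTI JOIN natural_to_short_id m
  ON s.natural_key = m.natural_key;
```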
'There's no Will' 😂 😂 😂
Nicely done!
Shhhh, I think I got away with that one :D
I'm gonna try it
Any idea if this is coming to Azure Synapse soon?
Nice video Simon. You can create the Delta table with the Python library called delta; it works pretty well. You can also set the table location there if you don't want it in the database default, which will make it an external table.
Some interesting ways to handle ids in the video. I used monotonically_increasing_id for a while but have moved to using zipWithUniqueId():

from pyspark.sql.types import StructType, StructField, LongType

# Prepend a long id column to the existing schema
new_schema = StructType([StructField(colName, LongType(), True)] + df.schema.fields)
# Pair each row with a unique (but not consecutive) long id
zipped_rdd = df.rdd.zipWithUniqueId()
# Apply an offset and put the id first in each row
new_rdd = zipped_rdd.map(lambda row: [row[1] + offset] + list(row[0]))
new_df = spark.createDataFrame(new_rdd, new_schema)
Falling back to RDDs and then back to a DataFrame can be quite expensive.
Good one!! Will save a lot of coding + execution time :)
Delta tables with identity columns lose the ability to be inserted into by several processes simultaneously. Only one process will insert data; the others will get a MetadataChangedException. Are there any workarounds?
Can this be used without Databricks as well? Or can I use Spark SQL with Delta Lake, outside Databricks, and still make use of the identity column feature? Please tell.
Is this new feature only available for Delta tables, or can we use the identity option on Parquet tables? Reading the documentation, I think you can use it for several kinds of tables. Thanks for your super useful videos, I'm a big fan!
Only Delta I'm afraid! There's a note in the identity section of the CREATE TABLE docs saying this functionality is only available in Delta Lake, and the 10.4 release notes are also pretty specific about it being a Delta release, sorry!
It's a great video. I have a small question: can we add the identity column by altering the table? I have tried a few different ways but it's not working: 'ALTER TABLE db.student ADD COLUMN student_id BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 100 INCREMENT BY 1)'
Very nice! Does IDENTITY auto-generate the numbers if I do df.write() instead of an INSERT statement?
Noiice!
Any known issues with merges? With .whenNotMatchedInsertAll() I'm getting this error: AnalysisException: cannot resolve Key in UPDATE clause given columns src.Id....
I'm looking for a way to read these identity details from the metadata. If I start at 100 and increment by 10, for example, that must have been defined at table creation and stored somewhere in the metadata, but how can I get to that information? I already tried information_schema.columns, but for some reason (not the runtime) it does not work in my database; it doesn't recognise information_schema. Is there any other way to get this info from the metadata, maybe in Python or Scala? Please let me know. Otherwise, great video. I quite enjoy your style of explaining.
Hi Simon. I see that the new feature works only with Spark SQL. Is there any similar approach using the DataFrame API? I don't want to create a table first and then use it to load a DataFrame.
Not that I know of - we usually have a small function to create the table if it doesn't exist already, before we merge into the table!
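For what it's worth, a minimal sketch of such a helper, here only building the CREATE TABLE IF NOT EXISTS statement you would then pass to spark.sql() (the table and column names are placeholders, and the identity clause follows the Databricks syntax shown in the video):

```python
def identity_table_ddl(table: str, columns: dict, id_col: str = "id") -> str:
    """Build a CREATE TABLE IF NOT EXISTS statement with a BIGINT
    identity surrogate key, intended to be run via spark.sql()."""
    col_defs = ", ".join(f"{name} {dtype}" for name, dtype in columns.items())
    return (
        f"CREATE TABLE IF NOT EXISTS {table} ("
        f"{id_col} BIGINT GENERATED ALWAYS AS IDENTITY, "
        f"{col_defs}) USING DELTA"
    )

# Hypothetical usage: run this once before merging into the table
ddl = identity_table_ddl("silver.customers", {"name": "STRING", "email": "STRING"})
print(ddl)
```

Calling spark.sql(ddl) before the merge makes the load idempotent: the table is created on the first run and left untouched afterwards.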
@@AdvancingAnalytics Same here. Tks
Anyone know what kind of function/logic sits behind GENERATED ALWAYS AS IDENTITY? Is it still doing windowing/sorting, or is it hashing? I'm not sure this is mentioned anywhere in the docs so far, so I'm wondering what it might be doing under the hood.
Haven't dug into the execution plans yet, definitely going to have a dig and see how it changes. I did see a new generateidentifiers() function (or something like that!) in the explain steps, just haven't seen what impact that has on shuffles etc.
Hi Simon, regarding this issue (ruclips.net/video/Gnn54rp5RWM/видео.html), I encounter it every time I use a temp view, so that might explain why your second query didn't have anything skipped.
Does this work with DLT?
Gooooood question, I'd assume not as DLT assumes it will create and manage the table for you, so you wouldn't have the column definition supplied? Might be able to insert into a static (IE: non-dlt) table as a final step, but that misses some of the dlt managed pipeline point!
What if you did a second pass and ranked the monotonic ids?
monotonically_increasing_id is distributed. row_number() using a Window function without partitionBy is not: when we don't define partitionBy, all the data is sent to one executor to generate the row numbers, which can cause performance and memory issues.
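To illustrate why monotonically_increasing_id parallelises: each executor can generate ids independently, because the 64-bit value encodes the partition id in the upper 31 bits and the record number within the partition in the lower 33 bits. A plain-Python sketch of that layout (not Spark code):

```python
def monotonic_id(partition_id: int, record_number: int) -> int:
    # Mirrors Spark's documented layout: upper 31 bits = partition id,
    # lower 33 bits = record number within the partition.
    return (partition_id << 33) | record_number

# First row of partition 0 is id 0; first row of partition 1 jumps to 2**33.
print(monotonic_id(0, 0))  # 0
print(monotonic_id(1, 0))  # 8589934592
```

The ids are therefore unique and increasing, but not consecutive across partitions, which is exactly why people reach for row_number() or the new identity columns when they need dense keys.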
Is there no way to reset the identity column? Say, six months from now, our identity column has reached a value of 10 billion?
I've not seen any syntax for resetting the seed/increment, no. That's not to say there aren't ways you can do it, but I've not seen any direct table update commands for it!
3:47 "there's no Will" lol and they say Data Engineers are boring
Lol, who says that? 😅
Chapters? 😆
Urgh, fiiiiine. Added some chapters - but that meant I had to actually watch my own video! I'll take a looksie if there's a way we can encourage the auto-chapters to work better, or let people add their own in the comments!
@@AdvancingAnalytics haha, only messing about.. probs only of most use on longer vids anyway. keep up the good work dude! 👍
My approach to avoid those big unique ids:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._  // for the 'colName column syntax

// Tag each row with a sparse unique id: the upper 31 bits hold the
// partition id, the lower 33 bits the record number within the partition
val df_base_row_id = df_base.withColumn("row_id", monotonically_increasing_id())
val df_base_row_id_partition_offset = df_base_row_id
  .withColumn("partition_id", shiftright('row_id, 33))
  .withColumn("row_offset", 'row_id.bitwiseAND(8589934591L)) // mask the lower 33 bits (2^33 - 1)

// Count the rows in each partition
val partitions_size = df_base_row_id_partition_offset
  .groupBy("partition_id")
  .count()
  .withColumnRenamed("count", "partition_size")

// Running total of the sizes of all preceding partitions
val windowSpec = Window.orderBy("partition_id")
  .rowsBetween(Window.unboundedPreceding, -1)
val partitions_offset = partitions_size.withColumn("partition_offset",
  when(expr("partition_id = 0"), lit(0))
    .otherwise(sum("partition_size").over(windowSpec)))

// Dense row number = total of preceding partitions + offset within partition + 1
val df_inc_rownum = df_base_row_id_partition_offset
  .join(broadcast(partitions_offset), "partition_id")
  .withColumn("row_num", 'partition_offset + 'row_offset + 1)