Advancing Spark - Identity Columns in Delta

  • Published: 10 Jul 2024
  • A classic challenge in Data Warehousing is getting your surrogate key patterns right - but without the same tooling, how do we achieve it in a Lakehouse environment? We've had several patterns in the past, each with its drawbacks, but now we've got a brand new IDENTITY column type... so how does it measure up? (A minimal identity DDL sketch follows the chapter list below.)
    In this video Simon does a quick recap of the existing surrogate key methods within Spark-based ETL processes, before walking through the new Delta Identity functionality!
    As always, if you're beginning your lakehouse journey, or need an expert eye to guide you on your way, you can always get in touch with Advancing Analytics.
    00:00 - Hello
    01:37 - Existing Key Methods
    10:36 - New Identity Functionality
    15:18 - Testing a larger insert
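
    A minimal sketch of the identity DDL discussed in the video; the table and column names are illustrative, and the syntax assumes a Databricks Runtime (10.4+) / Delta version that supports identity columns:

      spark.sql("""
          CREATE TABLE IF NOT EXISTS demo.dim_customer (
              customer_key  BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1),
              customer_id   STRING,
              customer_name STRING
          ) USING DELTA
      """)

      # omit the identity column on insert and the key is generated for you
      spark.sql("""
          INSERT INTO demo.dim_customer (customer_id, customer_name)
          VALUES ('C001', 'Ada'), ('C002', 'Grace')
      """)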

Comments • 38

  • @JulesTuffrey
    @JulesTuffrey 2 years ago +1

    That's a great addition to the runtime. Thank you for the awesome vid!

  • @MrMikereeve89
    @MrMikereeve89 2 years ago

    I finally found a good use case for the identity function in Databricks! Typically we use hash keys so we can parallelise our jobs, but I needed to create a unique identifier for an XML output, which was limited to a maximum 30-character string. Our natural keys were all GUIDs and our hash keys were also too long - delta identity to the rescue! Now we have a nice little mapping table from our natural keys to a bigint identity which we use in our XML output :D
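
    A hedged sketch of the mapping-table pattern described above (not the commenter's actual code; table and column names are illustrative):

      spark.sql("""
          CREATE TABLE IF NOT EXISTS demo.natural_key_map (
              short_id    BIGINT GENERATED ALWAYS AS IDENTITY,
              natural_key STRING
          ) USING DELTA
      """)

      # register any GUID natural keys not mapped yet; the bigint short_id is generated automatically
      spark.sql("""
          INSERT INTO demo.natural_key_map (natural_key)
          SELECT DISTINCT s.natural_key
          FROM demo.source_rows s
          LEFT ANTI JOIN demo.natural_key_map m
            ON s.natural_key = m.natural_key
      """)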

  • @WastedFury
    @WastedFury 2 years ago +2

    'There's no Will' 😂 😂 😂
    Nicely done!

  • @joyo2122
    @joyo2122 2 years ago

    I'm gonna try it

  • @KristofDeMiddelaer
    @KristofDeMiddelaer 2 years ago +4

    Any idea if this is coming to Azure Synapse soon?

  • @alexischicoine2072
    @alexischicoine2072 2 years ago

    Nice video Simon. You can create the Delta table with the Python library called delta (delta-spark); it works pretty well. You can also set the table location there if you don't want it in the database default location, which will make it an external table.
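
    A minimal sketch of that builder approach, assuming the delta-spark Python package is available; the table name, columns and storage path below are illustrative:

      from delta.tables import DeltaTable
      from pyspark.sql.types import LongType, StringType

      (
          DeltaTable.createIfNotExists(spark)       # builder from the delta-spark package
          .tableName("demo.dim_customer")           # illustrative name
          .addColumn("customer_key", LongType())    # identity support through the builder depends on your Delta/runtime version
          .addColumn("customer_name", StringType())
          .location("abfss://lake@account.dfs.core.windows.net/gold/dim_customer")  # explicit path = external table
          .execute()
      )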

  • @89seanp
    @89seanp 2 years ago +2

    Some interesting ways to handle ids in the video. I used monotonically_increasing_id for a while but have moved to using zipWithUniqueId():
    from pyspark.sql.types import StructType, StructField, LongType
    # colName is the name of the new id column; offset is an optional starting value
    new_schema = StructType([StructField(colName, LongType(), True)] + df.schema.fields)  # prepend the id column
    zipped_rdd = df.rdd.zipWithUniqueId()
    new_rdd = zipped_rdd.map(lambda row: [row[1] + offset] + list(row[0]))  # id first, then the original columns
    df_with_id = spark.createDataFrame(new_rdd, new_schema)

    • @zycbrasil2618
      @zycbrasil2618 2 years ago

      Falling back to RDDs and then back to a DataFrame can be quite expensive

  • @rajkumarv5791
    @rajkumarv5791 1 year ago

    Good one!! Will save a lot of coding + execution time :)

  • @user-is3gq7sp1h
    @user-is3gq7sp1h 2 years ago +2

    Delta tables with identity columns lose the ability to be inserted into by several processes simultaneously. Only one process will insert data; the others will get a MetadataChangedException. Are there any workarounds?

  • @YogeshShivakumar
    @YogeshShivakumar 1 month ago

    Can this be used without Databricks as well? Or can I use Spark SQL to make use of the identity column feature outside Databricks, with open-source Delta Lake?
    Please tell.

  • @YogeshShivakumar
    @YogeshShivakumar 1 month ago

    Is this feature only available in Databricks, or can I use Spark SQL without Databricks and make use of the identity column feature with Delta Lake?
    Please tell.

  • @guilleromero3762
    @guilleromero3762 2 years ago

    Is this new feature only available for Delta tables, or can we use the identity option on Parquet tables? Reading the documentation, I think you can use it for several kinds of tables. Thanks for your super useful videos, I'm a big fan!

    • @AdvancingAnalytics
      @AdvancingAnalytics  2 years ago +1

      Only Delta I'm afraid! There's a note in the identity section of the CREATE TABLE docs saying "this functionality is only available in Delta Lake". The 10.4 release notes are also pretty specific about it being a Delta feature, sorry!

  • @rakeshreddybadam6173
    @rakeshreddybadam6173 2 years ago

    It's a great video. I have a small question: can we add an identity column by altering the table? I have tried a few different ways but it's not working: 'ALTER TABLE db.student ADD COLUMN student_id BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 100 INCREMENT BY 1)'

  • @surenderraja1304
    @surenderraja1304 11 months ago

    Very nice. Does IDENTITY auto-generate the number if I do df.write() instead of an INSERT statement?

  • @Kinnoshachi
    @Kinnoshachi 2 years ago

    Noiice!

  • @jefflingen8819
    @jefflingen8819 2 years ago

    Any known issues with merges? With .whenNotMatchedInsertAll() I'm getting the error: AnalysisException: cannot resolve Key in UPDATE clause given columns src.Id....

  • @flammenman5593
    @flammenman5593 1 year ago

    I'm now looking for a way to read from the metadata the details that were set for the identity. If I start at 100 and increase by 10, for example, that must have been defined at the creation of the table and must be stored somewhere in the metadata. But how can I get at that information? I already tried information_schema.columns, but for some reason (not the runtime) it does not work in my database; it doesn't recognise information_schema. Is there any other way to get this info from the metadata, maybe in Python or Scala?
    Please let me know.
    Otherwise great video. I quite enjoy your style of explaining.
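
    One hedged way to dig those settings out (illustrative, not a documented API): per the Delta protocol, identity details are stored as column metadata (keys like delta.identity.start and delta.identity.step) in the table's transaction log, so you can read the latest metaData action yourself. The table path and key names below are assumptions and may vary by runtime/Delta version:

      import json
      from pyspark.sql import functions as F

      log_path = "dbfs:/path/to/my_table/_delta_log"          # hypothetical table location
      meta = (spark.read.json(f"{log_path}/*.json")
              .where("metaData IS NOT NULL")                  # keep only metaData actions
              .withColumn("file", F.input_file_name())
              .orderBy(F.col("file").desc())                  # newest commit first
              .select("metaData.schemaString")
              .first())

      for field in json.loads(meta["schemaString"])["fields"]:
          identity = {k: v for k, v in field.get("metadata", {}).items()
                      if k.startswith("delta.identity")}      # e.g. delta.identity.start / .step
          if identity:
              print(field["name"], identity)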

  • @zycbrasil2618
    @zycbrasil2618 2 years ago +2

    Hi Simon. I see that the new feature works only with Spark SQL. Is there any similar approach using the DataFrame API? I don't want to have to create a table and then use it to load a DataFrame..

    • @AdvancingAnalytics
      @AdvancingAnalytics  2 years ago +1

      Not that I know of - we usually have a small function to create the table if it doesn't exist already, before we merge into the table!
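
      A rough sketch of that pattern (the function, table and column names are made up for illustration, and the DDL assumes a runtime with identity support):

        from delta.tables import DeltaTable

        def ensure_dim_customer(spark, table_name):
            # create-if-not-exists, so the identity definition only has to be declared once
            spark.sql(f"""
                CREATE TABLE IF NOT EXISTS {table_name} (
                    customer_key  BIGINT GENERATED ALWAYS AS IDENTITY,
                    customer_id   STRING,
                    customer_name STRING
                ) USING DELTA
            """)

        ensure_dim_customer(spark, "demo.dim_customer")

        # updates_df: incoming DataFrame with customer_id / customer_name columns
        (
            DeltaTable.forName(spark, "demo.dim_customer").alias("tgt")
            .merge(updates_df.alias("src"), "tgt.customer_id = src.customer_id")
            .whenNotMatchedInsert(values={
                "customer_id": "src.customer_id",
                "customer_name": "src.customer_name",
            })  # customer_key is omitted, so the identity value is generated
            .execute()
        )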

    • @zycbrasil2618
      @zycbrasil2618 2 years ago

      @@AdvancingAnalytics Same here. Tks

  • @sval4020
    @sval4020 2 years ago

    Does anyone know what kind of function/logic sits behind GENERATED ALWAYS AS IDENTITY...? Is it still doing windowing/sorting, or is it hashing? I'm not sure this is mentioned anywhere in the docs so far, so I'm wondering what it might be doing under the hood.

    • @AdvancingAnalytics
      @AdvancingAnalytics  2 years ago

      Haven't dug into the execution plans yet, definitely going to have a dig and see how it changes. I did see a new generateidentifiers() function (or something like that!) in the explain steps, just haven't seen what impact that has on shuffles etc.

  • @kyrilluk
    @kyrilluk 2 years ago

    Hi Simon, regarding this issue (ruclips.net/video/Gnn54rp5RWM/видео.html), I encounter it every time I use a temp view. So that might explain why your second query didn't have anything skipped.

  • @gamachu2000
    @gamachu2000 2 years ago

    Does this work with DLT?

    • @AdvancingAnalytics
      @AdvancingAnalytics  2 years ago

      Gooooood question, I'd assume not, as DLT assumes it will create and manage the table for you, so you wouldn't have the column definition supplied? You might be able to insert into a static (i.e. non-DLT) table as a final step, but that misses some of the point of a DLT-managed pipeline!

  • @alexischicoine2072
    @alexischicoine2072 2 years ago

    What if you did a second pass and ranked the monotonic ids?

    • @zycbrasil2618
      @zycbrasil2618 2 years ago

      monotonically_increasing_id() is distributed. row_number() over a window without partitionBy is not distributed: when we don't define partitionBy, all the data is sent to one executor to generate the row numbers. This can cause performance and memory issues.
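
      To make that trade-off concrete, a small illustrative sketch (not from the thread; df is whatever DataFrame you are keying):

        from pyspark.sql import functions as F, Window

        # pass 1: distributed, but the ids are sparse (the partition id sits in the high bits)
        df_tmp = df.withColumn("mono_id", F.monotonically_increasing_id())

        # pass 2: rank the monotonic ids to get dense keys; orderBy without
        # partitionBy pulls every row through a single partition, as noted above
        w = Window.orderBy("mono_id")
        df_ranked = df_tmp.withColumn("row_num", F.row_number().over(w))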

  • @andrewfogarty1447
    @andrewfogarty1447 2 years ago

    Is there no way to reset the identity column? Say, 6 months from now, our Identity column has value 10 billion?

    • @AdvancingAnalytics
      @AdvancingAnalytics  2 years ago

      I've not seen any syntax for resetting the seed/increment, no. That's not to say there aren't ways you can do it, but I've not seen any direct table update commands for it!

  • @22seb22
    @22seb22 2 years ago

    3:47 "there's no Will" lol and they say Data Engineers are boring

  • @m1nkeh
    @m1nkeh 2 years ago

    Chapters? 😆

    • @AdvancingAnalytics
      @AdvancingAnalytics  2 years ago +2

      Urgh, fiiiiine. Added some chapters - but that meant I had to actually watch my own video! I'll take a looksie if there's a way we can encourage the auto-chapters to work better, or let people add their own in the comments!

    • @m1nkeh
      @m1nkeh 2 years ago

      @@AdvancingAnalytics haha, only messing about.. probs only of most use on longer vids anyway. keep up the good work dude! 👍

  • @zycbrasil2618
    @zycbrasil2618 2 years ago +1

    My approach to avoid those big unique ids (Scala):

      import org.apache.spark.sql.functions._
      import org.apache.spark.sql.expressions.Window

      // split the monotonic id into its partition id and per-partition offset
      val df_base_row_id = df_base.withColumn("row_id", monotonically_increasing_id())
      val df_base_row_id_partition_offset = df_base_row_id
        .withColumn("partition_id", shiftright(col("row_id"), 33))
        .withColumn("row_offset", col("row_id").bitwiseAND(2147483647))

      // count the rows that landed in each partition...
      val partitions_size = df_base_row_id_partition_offset
        .groupBy("partition_id")
        .count()
        .withColumnRenamed("count", "partition_size")

      // ...and turn those counts into a running offset per partition
      val windowSpec = Window.orderBy("partition_id").rowsBetween(Window.unboundedPreceding, -1)
      val partitions_offset = partitions_size.withColumn("partition_offset",
        when(expr("partition_id = 0"), lit(0))
          .otherwise(sum("partition_size").over(windowSpec)))

      // contiguous row number = partition offset + row offset + 1
      val df_inc_rownum = df_base_row_id_partition_offset
        .join(broadcast(partitions_offset), "partition_id")
        .withColumn("row_num", col("partition_offset") + col("row_offset") + 1)