Advancing Spark - Delta Merging with Structured Streaming Data

Поделиться
HTML-код
  • Опубликовано: 3 ноя 2020
  • Delta's merge functionality is amazingly valuable when dealing with largely varied and changeable datasets, however there is a growing movement towards streaming data as more and more source systems over live event feeds as a method of ingesting data. But how can we take advantage of both/ Using the full delta merge functionality AND spark streaming? At the same time?!?
    In this week's video, Simon shows us how to take an existing Delta streaming query, appending data directly from one delta table to another, and update it with full Delta merge capabilities.
    As always, stop by the advancing analytics website if you want to know more about our Databricks training, or how we can help you build a next generation spark-based data platform!
    www.advancinganalytics.co.uk

Комментарии • 59

  • @NeumsFor9
    @NeumsFor9 3 года назад +1

    Mohit Batra's Pluralsight course on this subject is a great complement to this video. Good work. Whatley + Batra = All Bases Covered :)

  • @jonathanbergenblom9888
    @jonathanbergenblom9888 3 года назад +4

    You’re targeting everything I could ever ask for. Best azure/dataengineer topic channel out there.

    • @AdvancingAnalytics
      @AdvancingAnalytics  3 года назад +1

      Wahey! Glad to hear it's hitting the mark - hit us up if there are any topics we should add to the backlog!
      Simon

  • @taikoktsui_sithlord
    @taikoktsui_sithlord 11 месяцев назад

    newbie data engineer here, just the thing I needed for this task I'm assigned!

  • @alifrad
    @alifrad 2 года назад

    very happy that I found your great channel... very educational and... thank you Advancing Analytics

  • @richardgriffiths3277
    @richardgriffiths3277 3 года назад

    Awesome. Something was nagging me all this week after a hack in work to get data streaming into our lake, for all append data great... but we had one regular batch process from a SQL db. I thought "well everything listening to this now further downstream in the lake can't take advantage of the self-producing watermarks of delta streams...there must be a way" This is the way! Thank you
    Basically one off historical loads use batch to get it in to delta, ongoing batches to keep that fluid stream to the next process, use this :)

  • @mnseshan
    @mnseshan 2 года назад

    Brilliant video my friend. Necessary & sufficient ..👍🙏

  • @AvrtblrBM
    @AvrtblrBM 10 месяцев назад

    Awesome work. Keep going 👍

  • @PakIslam2012
    @PakIslam2012 3 года назад

    Hi, Great work...Love the channels, keep doing the videos you make it so entertaining and easy to understand
    Can we do a video on how to determine the best and most cosft effective cluster configuration on Azure Databricks when running our Batch or Streaming job over the platform?

  • @avnish.dixit_
    @avnish.dixit_ 3 года назад

    Very Helpful...

  • @zhakunigor
    @zhakunigor Год назад

    Finally I got the solution for deduplication in streaming!! Omg! Thanks!

  • @anandladda
    @anandladda 2 года назад +3

    Hi Simon - Great videos & helpful content in understanding core spark and Db concepts. Do you maintain a repo someplace of the various notebooks you use in your videos?

  • @fb-gu2er
    @fb-gu2er 3 года назад +2

    I feel the deduplication might cause issues. You should only de duplicate on the primary key alone, not both, primary and change key, because you may end up writing to the same target row and delta will throw an exception. I know it’s a simple example. It’s best in production to append a timestamp to every row and deduplicate only by the primary key by selecting the most recent row with the largest timestamp

    • @AdvancingAnalytics
      @AdvancingAnalytics  3 года назад +2

      Aha, yep, that's very true. Our standard pattern, if we de-dupe at all, just uses the Primary Key. At the time I made this video, a client specifically had issues with conflicting updates and the received file was the only timestamp - therefore we couldn't determine a "winner" and they wanted the update to fail. Lazy example, just to demonstrate what can be done inside the microbatching function :)
      Simon

  • @nikolaychalkanov896
    @nikolaychalkanov896 9 месяцев назад

    Learning a lot from your channel Simon. Thank you! Is there a way we can further improve the merge into the foreachBatch func - like partition pruning towards destination write?

  • @NORCupcake
    @NORCupcake 3 года назад

    If I wish to fetch data through API's using GET Request (e.g. from an accounting system). Would you create a script in Databricks that handles the ETL process on a scheduled basis, or is there any other platforms that would be more efficient and suffice better? Thank you for sharing great content!

  • @saeedrahman8362
    @saeedrahman8362 3 года назад

    This is very helpful, thanks.
    But one issue I found is that I can't save the delta with parttiion if we are using foreachBatch, so how can we make sure that the data is still partitioned when saved.

  • @koleaby4
    @koleaby4 2 года назад +1

    a very useful approach - thanks for sharing 👍
    One issue I am facing in our systems is merge conflicts - when there are several different updates in the source, which are targeting the same record in the target table.
    Have you been in a similar situation? If so - how did you approach it?
    An idea for future topics (something I found very limited documentation for) - joining streaming DFs.
    * How to join 2 streaming DFs before writing them into sink?
    * Is it possible to aggregate one of the streams before joining the two?
    * etc.
    Thanks again

    • @AdvancingAnalytics
      @AdvancingAnalytics  2 года назад +1

      Yep, that scenario comes up fairly often with engineered merges. You need to remove the duplicate matches somehow - either by changing the match key, or removing them from the incoming stream - as long as you put the dedupe logic (dropduplicates or a target filter, depending on requirements) inside the forEachBatch function you'll be fine!

    • @koleaby4
      @koleaby4 2 года назад

      @@AdvancingAnalytics yes, that's our current approach - thanks for validating my thought process 👍🏼

  • @stvv5546
    @stvv5546 3 года назад +3

    Hey there, been following for months now, this is indeed the best channel on all things spark/databricks/delta lake for sure! Many thanks for providing the knowledge! Appreciate it!
    I got a situation in which we were extracting data from a delta table in a batch mode, saving as delta in a target table and then writing to an azure sql server database table. Right now the situation is about to change and we should start streaming from the same source table (since it will get refreshed every 3-4min or so), then save the data in the same target delta table and from there load into the same sql table we used in above batch scenario.
    So is it actually possible to write the results of the streaming query directly to a database like for example SQL server on Azure and will it bottleneck at the time of writing (especially when dealing with huge number of rows streamed)?
    Also is it possible to tell spark streaming from which point in time I want my former batch table (now a streaming source) to be streamed from since I do not want to start from scratch and stream it fully, I just want to start the stream where the old batch process has ended? (we are talking of a billion record source table here!)
    Many thanks!

    • @AdvancingAnalytics
      @AdvancingAnalytics  3 года назад +2

      Hey hey - glad the videos are hitting the spot!
      So - writing to the SQL DB will indeed bottleneck, we tend to find that you quickly hit DTU limits quicker than you hit databricks cluster limits, but both can be tweaked to overcome.
      Streaming doesn't support SQLDB as a native sink, but you can use the foreachBatch() approach to do this, it's described in this page: docs.databricks.com/spark/latest/structured-streaming/foreach.html
      In terms of starting points, you can provide a timestamp and/or delta transaction log version that the stream should start from, then all progress is logged in the checkpoint anyway, so it should be a pretty efficient cut over.
      Simon

    • @stvv5546
      @stvv5546 3 года назад

      @@AdvancingAnalytics Thank you!

  • @Nehmaiz
    @Nehmaiz 2 года назад

    Thank you for this video, super informative! one question though, where do I see the output of the print statements in the mergetoDF() function?

    • @AdvancingAnalytics
      @AdvancingAnalytics  2 года назад

      They're not exposed in the notebook output directly, but you can either 1) view the outputs in the logs, or 2) call a logging function to pass the message to log analytics etc, or even just an audit delta table

    • @Nehmaiz
      @Nehmaiz 2 года назад

      @@AdvancingAnalytics Thanks, they don't show up the logs hence my question. I actually created an audit delta table as you mentioned.

  • @danielguillermopericosanch9103
    @danielguillermopericosanch9103 2 года назад

    I'm curious about the print statement that you added in the foreachBatch function. I don't see it anywhere... 🤔. Thank you Simon!

    • @AdvancingAnalytics
      @AdvancingAnalytics  2 года назад

      Yeah, you don't see the output of that nested function, that was an oversight when I put the demo together. But you can output the statement to the spark logs, or call a logging function to pass the data back to log analytics etc!
      Would be nice if any cell output fed directly into the streaming microbatch outputs...

  • @marellasrikanth8497
    @marellasrikanth8497 3 года назад

    Thanks for sharing knowledge!
    Can this approach applies to other targets like 'Snowflake' or Azure SQL as well?

    • @AdvancingAnalytics
      @AdvancingAnalytics  3 года назад

      Hey! So from Databricks to other services, the foreachBatch can use any valid datawriter. So yep, you can microbatch to SQLDB, Snowflake, whatever you fancy.
      Whether the streaming capabilities of snowflake have a similar function to spark streaming, I have no idea! :)
      Simon

  • @marcocaviezel2672
    @marcocaviezel2672 3 года назад

    Hi Simon!
    Great video!
    A more general question.
    Do you know if there is a “when not matched by source” in Databricks?
    Because I want to achieve that data which isn’t in the “new data” is deleted in the original table.
    Unfortunately I don’t have a delete flag in the new data.

    • @AdvancingAnalytics
      @AdvancingAnalytics  3 года назад +2

      Hey Marco, sorry, missed this one! There isn't a "not matched by source" in the default merge criteria, which is a little annoying. If you're not maintaining history, then you could just overwrite the table rather than merging, but I'm assuming you want something more along the SCD Type 2 route.
      The main scenario I've seen is a little messy - You do an initial query to bring back any records in the destination that aren't in the update, append those logical deletions into the update dataframe, then handle those as one of the matched elements in your merge statement. Not the cleanest and feels "hacky", but the examples I've seen from Databricks use that pattern!
      Their SCD 2 example notebook is a good starting point - docs.databricks.com/_static/notebooks/merge-in-scd-type-2.html
      Simon

    • @marcocaviezel2672
      @marcocaviezel2672 3 года назад

      Hey Simon!
      Thanks a lot for this source!
      So far I worked with a logical delete after the merge statement, but I will try to implement your suggestion.
      -- Logical delete
      UPDATE testdb.test_data_delta
      SET DeleteFlag = 1
      WHERE testdb.test_data_delta.Index in
      (
      SELECT
      D.index
      FROM testdb.test_data_delta AS D
      LEFT JOIN upsert_data AS S ON (S.Index = D.Index)
      WHERE S.Index is null
      )
      Thanks a lot for you great channel! It really helps to learn databricks :-)

  • @kasthurisomshekar3717
    @kasthurisomshekar3717 Год назад

    hey Simon ..Iwatched all the videos of your's on autoloader. it's awesome work. I need one help. I want use drop duplicate dataframe in autoloader with watermark value. can you please provide example syntax for that. it would be really helpful

  • @mannammanojkumar2822
    @mannammanojkumar2822 2 года назад

    Hi, excellent presentation.
    if we create multiple dataframes beween and read and write streams, is replication will continuously reflect in data frames as well?.

    • @AdvancingAnalytics
      @AdvancingAnalytics  2 года назад

      You can join together different streaming/non-streaming dataframes but there are complexities around state. Each time you call an action on a streaming dataframe (ie: if you wanted multiple writes), each will create it's own streaming query and trigger as specified

  • @sid0000009
    @sid0000009 3 года назад

    If we add Ignorechanges = True during the read stream would be make it more optimize? File Level CDC read before update it on the target? ( Needless to say I am big fan of your videos :))

    • @AdvancingAnalytics
      @AdvancingAnalytics  3 года назад +1

      Without ignore changes, you'll get errors if you ever need to re-write files in the source delta. Basically kicks out the whole updated file with unchanged and updated rows combined - it's not particularly efficient as you might get 10,000 unchanged rows with a single updated row, but yes, you would only see the files that were changed in the source table.
      As with most things delta, there's a lot of data re-writing going on under the hood to make things nice and simple from a code perspective!

  • @gauravkumar796
    @gauravkumar796 3 года назад

    Hi buddy, I have been following your channel and it has been very helpful with my Databricks journey in organization. Currently we are using Databricks for data ingestion and creating delta tables as part of batch jobs. Now business users want to use this data for BI and analytics .. primarily in Tableau or any interface (Might be Redash in future). My question is that if Databricks is sufficient for BI , does it provide good performance.. or do we need any other tool like Qubole or Dremio which provides a semantic later on top of ADLS data without any data ingestion. Please let me know.

    • @AdvancingAnalytics
      @AdvancingAnalytics  3 года назад +2

      Depends what "good performance" is - it's still a distributed compute platform, so it'll always take a couple of seconds to return data - if your users are looking for millisecond latency, they'll need some kind of cached semantic layer (either inside Tableau server, Power BI etc). If they're happy that queries may take a few seconds to return, then you can likely tweak and tune a cluster to be sufficient directly. All depends on requirements - the real benefit of Databricks is the ability to run HUGE queries over massive datasets in reasonable times :)

    • @bhaveshpatelaus
      @bhaveshpatelaus 3 года назад

      @@AdvancingAnalytics We are using the delta lake with Power BI Premium for our BI and Analytics workload and the performance is pretty good so far.

    • @gauravkumar796
      @gauravkumar796 2 года назад

      @@bhaveshpatelaus do you connect powerbi via serveless cluster or SQL endpoint?

  • @avnish.dixit_
    @avnish.dixit_ 3 года назад

    What if we want to modify data present in DataFrame but we don't want large amount of latency. I mean is there any other approach as well which we can use instead of ForeachBatch

    • @fb-gu2er
      @fb-gu2er 3 года назад

      You shouldn’t be updating too much in this case. The best use case for delta merges is to deduplicate, maybe add some metadata columns like timestamp, etc, and merge the results. For what you want you should do it before you get to the foreachbatch on a merge

  • @sid0000009
    @sid0000009 3 года назад

    In ForeachBatch , how do we control the batch size we want it to read at a given time...like can we set the threshold in someway.. thank you

    • @AdvancingAnalytics
      @AdvancingAnalytics  3 года назад +3

      Same way you work with normal streaming queries! You can use the .trigger() function on the writeStream action and the maxFilesPerTrigger/maxBytesPerTrigger readStream options to tweak how the batches are structured.
      Simon

    • @sid0000009
      @sid0000009 3 года назад

      @@AdvancingAnalytics thanks

  • @kuldipjoshi1406
    @kuldipjoshi1406 3 года назад

    is there any way to directly upsert data to adls gen 2 with structured streaming.?

    • @AdvancingAnalytics
      @AdvancingAnalytics  3 года назад

      In this example, the delta table is stored within ADLS Gen 2, so it's upserting via the merge clause.
      If you want to put the data directly to the lake without a delta table, you can write to one of the native output sinks in adls gen 2, but you'll need to be careful about the output mode (append/update/complete) depending on the aggregations you're using. The databricks streaming examples are a pretty good starting place - docs.databricks.com/spark/latest/structured-streaming/demo-notebooks.html#structured-streaming-demo-python-notebook

    • @kuldipjoshi1406
      @kuldipjoshi1406 3 года назад

      @@AdvancingAnalytics Yeh, this was really helpful. If I want to send data realtime to other downstream kafka or event hub, should i hit the final merged table or should i consider using 1)one staging table having append mode and write rows in the downstream directly and parallelly merge it into merge table OR 2)hit merged table and and write to downstream sink (with ignorechange=true and i will need to handle bunch of data as doc. says into downstram)

  • @giovanicamargodosanjos6659
    @giovanicamargodosanjos6659 2 года назад

    What if I want to pass another param to mergetoDF function?

    • @AdvancingAnalytics
      @AdvancingAnalytics  2 года назад

      Hrm, haven't tried overloading it with additional parameters - it should have context for other variables defined in the session though, without you needing to pass them in explicitly? Not great from a coding standpoint but should work :)

  • @shanhuahuang3063
    @shanhuahuang3063 Год назад

    why I got errors like
    AttributeError: 'DataFrame' object has no attribute 'merge'?

    • @AdvancingAnalytics
      @AdvancingAnalytics  Год назад

      DataFrames don't have the merge function - you need to create a DeltaTable object using DeltaTable.forPath() then run the merge command on that new object!

    • @shanhuahuang3063
      @shanhuahuang3063 Год назад

      @@AdvancingAnalytics Thanks! I suggest if you can make some vedio on how to get key vault for databricks that will be great. In my working place, setting up key vaults using dutils.getkeys() then print out something I am sure a lot of people want to know that.

    • @shanhuahuang3063
      @shanhuahuang3063 Год назад

      @@AdvancingAnalytics It is like setting up storage account string, key valut and run some set up script. Please let me know if you can make a vedio of it. I will share around as I am very interesting in it. We are using databrisk at work.