So glad to see Simon on the Databricks YouTube channel so frequently lately.
Clearly explained all the concepts and features for Autoloader. Pretty cool!
Excellent, Simon's content never disappoints 👏🏼. Any place I can get the Notebooks used in the demo from?
Comprehensive and well articulated.
Very interesting topic and a passionate teacher ;-) , thank you!
Amazing way of explanation. Thank you so much
The kind of content I needed, Champ!
Very impressive! I want to use Autoloader and the medallion architecture in my project, but I just don't understand how Autoloader handles upsert or merge for incremental file loads. If I have a file landed in the landing zone that I want to merge into the staging zone, how does it do that? How does it decide whether a row will be an update or an insert, and based on which business keys?
Amazing explanation 👏👏👏👏
Only issue I had with merging the stream into a Delta table is handling merge errors. If the merge fails, Autoloader still counts the file as read, and unless I handle the error in the merge function and write the read stream somewhere, I'll lose sight of that data. For this reason I just wound up writing to a table and then merging that into my final table. Anyone handle this better/differently?
Great job. Keep going on.
Thanks so much Simon for another great video :)
Can you please go ahead with more videos on Azure Databricks in terms of designing a pipeline from a Notebook itself, so that we don't need ADF (to execute the Notebook) for big data engineering? I prefer to do data transformation from a Databricks Notebook using Autoloader.
Thanks for the suggestions Muhammad - we can definitely work on adding this to the next Build A Thing - thanks!
Wow, you are great!!
I get an error saying 'foreachBatch' does not support partitioning. How do I get around that?
I'm getting this error - Failed to create an Event Grid subscription. Please make sure that your service principal has 'read' permissions (e.g., assign it a Contributor role) in order to list Event Grid Subscriptions.
This is great, and thanks for the information. How do we complete the steps to gather the secrets? I did not really understand things like serviceprincipalsecret and DirectoryID. Thanks in anticipation of a response.
Great video content !
Is it possible to alter the data type of a column in a non-empty Databricks table without deleting the checkpoint metadata?
Thanks a ton! very nicely explained. So informative!
Nice explanation, thanks.
I'm new to ADB, just wondering:
1. Where can I find the sample data and the notebooks, please?
2. You mentioned a few configuration changes for optimisation. Where and how do I get these changed, please?
3. Do you run any boot camps or training sessions for a fee?
I recommend the Udemy course "Azure Databricks & Spark For Data Engineers (PySpark / SQL)" by Ramesh Retnasamy.
Impressive! What if we have hierarchies on ADLS gen2 container like - YYYY/MM/DD/data.csv. Can we use wildcards when reading (YYYY/*/*/*.csv)? I believe it's very rare to have one big folder with millions of files.
btw I spent 3 hours setting up all configs but it was worth it 😃
Hey! So you can absolutely set up wildcards, but we'd usually just point Autoloader at the root folder; it'll automatically bring in any new files it finds in the subfolders. So as long as your date hierarchies are within each data entity, it'll be fine (i.e. CustomerData/YYYY/MM/DD/file.csv).
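A minimal sketch of that root-folder pattern, assuming hypothetical mount paths and CSV files; the dated subfolders sit under each entity and Autoloader discovers new files in them on its own:

df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("cloudFiles.schemaLocation", "/mnt/landing/_schemas/CustomerData")  # hypothetical schema store
      .load("/mnt/landing/CustomerData/"))  # entity root, not a dated subfolder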
Excellent explanation, could you please share the data file and notebook. Thanks in advance.
Thanks a lot, it was a very well explained video!
Thank you! Has there been any change in recent updates?
Thank you very much for the video, it helped me a lot, but I have a doubt:
The second CSV you load has 5 lines, correct? (Min 27:27)
Why, when you run the query, is the number of input rows (numInputRows, min 31:07) 10?
In my implementation this number is always 2x the number of rows in the file, and I can't find an answer about it.
I have used Autoloader but couldn't get it to work right. I had problems with it finding new files and dealing with the Parquet format. I attached a schema, but it still didn't work.
Contact your Databricks rep, they can give you 1:1 support for issues like these.
When I run the stream I get an error: [DELTA_INVALID_CHARACTERS_IN_COLUMN_NAMES] Found invalid character(s) among ' ,;{}()\t=' in the column names. My column names don't have any of those characters; how can I fix this?
@Databricks what if we do a Trigger.Once load and we've had two files for the same Delta table primary key in the same day?
I want to change the schema of my existing table from int to bigint. Will this be allowed if I use foreachBatch()? Is there any way to preserve the checkpoint state of the Autoloader stream load if I want to alter the target table?
Question - will the reader detect new subfolders created in S3 and read files from those subfolders?
Yes, it will.
Did they add anything new? In the rescued data column I'm getting another field called _file_path that contains the file path.
Where can I get the notebook for this content?
from delta.tables import *

def upsertToDelta(microBatchOutputDF, batchId):
    # Look up the target Delta table by name (replace "NAME" with your table)
    deltadf = DeltaTable.forName(spark, "NAME")
    (deltadf.alias("t")
        .merge(
            microBatchOutputDF.alias("s"),
            "s.employee_id = t.employee_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()   # insert unmatched rows; there is no whenNotMatchedUpdateAll
        .execute()
    )
just typed it here, really needed this code
Is there a schema check on new files that can send an email if a new schema is found?
Thanks for the great explanation. Can AutoLoader be used for updating the previous records? (for example if a data correction happened to a field, would it correct the previously loaded data?). Thanks!
While you can infer and evolve your schema with Autoloader, to run a merge statement you would want to utilize .foreachBatch.
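A minimal sketch of wiring that together, reusing an upsertToDelta function like the one typed out above; the landing path, schema location, and checkpoint location here are hypothetical:

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", "/mnt/autoloader/_schemas/employees")  # hypothetical
    .load("/mnt/landing/employees/")                                            # hypothetical landing path
    .writeStream
    .foreachBatch(upsertToDelta)   # each micro-batch is merged rather than blindly appended
    .option("checkpointLocation", "/mnt/autoloader/_checkpoints/employees")
    .outputMode("update")
    .start())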
Question for Simon: Does Marcellus Wallace look like a batch to you!?
Clearly explained, thanks.
I have a query, can someone help?
Can the options used in readStream and writeStream be in any order, or should we follow a certain order, like first .format, then .outputMode, and so on?
I understand that .load and .start should be at the end and .format("cloudFiles") should be at the start, but other than those, can we follow any order or do we need to follow a specific one?
You can follow any order as long as the object that's being returned accepts that method. So df dot something: if it returns a df, you can then chain another method that returns a df. Here the order does not matter.
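For example, a minimal sketch (assuming mySchema is defined elsewhere and the landing path is made up): these two readers are equivalent, because each .option()/.schema()/.format() call just returns the same DataStreamReader.

reader_a = (spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("header", "true")
            .schema(mySchema))

reader_b = (spark.readStream
            .schema(mySchema)
            .option("header", "true")
            .option("cloudFiles.format", "csv")
            .format("cloudFiles"))

df = reader_a.load("/mnt/landing/sales/")  # .load() always comes last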
Is there any way to not have the rescued data column?
Awesome Simon, I searched and read quite a few blogs, but found your video the best, and this is right on top. Great explanation of Autoloader with schema evolution and upsert logic; it makes ETL very simple. One thing I am not clear on is the way the schema is being defined. Can we not just use StructType and StructField to define the schema for a simple CSV file? Why do we need a JSON schema?
Hey! I use a JSON schema when I'm passing it in from a separate metadata store, but you can use Struct elements, SQL definitions, whatever method of schema definition you prefer :)
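For a simple CSV, a minimal sketch of the StructType route (the column names and path are made up for illustration):

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

csv_schema = StructType([
    StructField("employee_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("department", StringType(), True),
])

df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .schema(csv_schema)                 # explicit schema, so no inference/schemaLocation needed
      .load("/mnt/landing/employees/"))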
Thanks for your videos, they were really helpful.
I have a few queries, can someone help?
1. Do we need to follow any specific order when using options in readStream and writeStream? For example: dataframe.readStream.format("cloudFiles").option("cloudFiles.format", "avro").option("multiline", True).schema(schema).load(path)
2. Delta table creation with both the tableName and location options, is that right? If I use both, I can see the files (.parquet, _delta_log, checkpoint) in the specified path, and if I use tableName only, I can see the table in the Hive metastore / spark_catalog.bronze in the SQL editor in Databricks.
The syntax I use, is it OK to use both the .tableName() and .location() options?

(DeltaTable.createIfNotExists(spark)
    .tableName("%s.%s_%s" % (layer, domain, deltaTable))
    .addColumn("x", "INTEGER")
    .location(path)
    .execute())
I have millions of small cloud files to process. I wanted to control how many files are read in one batch with maxFilesPerTrigger=100000, but it is completely ignoring it and resulting in very slow streaming. Could you please advise on this?
"Azure Databricks consumes up to the lower limit of cloudFiles.maxFilesPerTrigger or cloudFiles.maxBytesPerTrigger, whichever is reached first. This option has no effect when used with Trigger.Once()."
Any chance you can also help create an MQTT example, reading from an MQTT topic?
This is a great idea for a future video!
Hello Sir, great explanation. I have one query regarding a scenario I am facing while reading real-time incoming files using Spark file streaming. The JSON files are being written to ADLS storage in such a way that each takes at least 5 to 10 seconds to be completely written (initially the file is created with 0 bytes, and it is only modified and committed 5 to 10 seconds later). Spark streaming is running on top of this folder, so it picks up the new file as soon as it is created (with 0 bytes), and since there is no data at that instant, it updates its checkpoint and I lose my data. But when I tried the Autoloader (cloudFiles with JSON) option in the same scenario, the stream was able to pick the file up and no data loss occurred. So I wanted to know whether I can rely on or trust Autoloader for this, because the documentation nowhere mentions that cloudFiles works on a commit protocol. Can you tell me whether I can trust Autoloader for this behaviour or not?
In case my source is AWS S3 with Autoloader, can anyone share the best source to refer to?
Jai Shree Ram
13:57