Great video, it covered almost all of the points that cause performance issues. It's easy to code a Spark application because of the available APIs, but it's important to understand Spark's architecture to get the real value out of it.
Very deep technical explanation on Spark optimization. Nice stuff, thank you for sharing with the community.
One of the best videos on Spark job optimization.
@39:22 I did not understand why my job would fail if I used coalesce to reduce the number of partitions while writing output. Can anyone please explain? What happens when coalesce() is pushed up to the previous stage? How does that make the job fail?
Since Spark is lazy, it looks at what it has to do as the last step and works backwards from there. So, for example, it needs to write the data to 10 files because you used coalesce(10) at the end. In the earlier steps it has to do a huge join to calculate that data, but because you want it in only 10 files at the end, it will already do the shuffling for the join into those 10 partitions, which would be huge and would not fit into memory, failing the job.
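A minimal sketch of the difference (df, other, and the output paths are hypothetical, and spark is assumed to be the active SparkSession, as in spark-shell):

  // coalesce(10) avoids a shuffle, so it is folded into the previous stage:
  // the join itself then runs with only 10 partitions and can run out of memory
  df.join(other, "key")
    .coalesce(10)
    .write.parquet("/tmp/out_coalesce")        // hypothetical path

  // repartition(10) inserts its own shuffle after the join, so the join keeps
  // its larger partition count and only the final write uses 10 partitions
  df.join(other, "key")
    .repartition(10)
    .write.parquet("/tmp/out_repartition")     // hypothetical path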
I didn't understand the part @31:00 where he chooses 480 partitions instead of 540. Can anyone please explain why?
Because he wants to give every core the same number of partitions and avoid skew.
He has 96 cores, so 480 partitions (fewer, but a little bigger) will let him process them in 5 "batches". If he stayed with 540, it would take 6 "batches", and the last one would only utilize 60 cores with the remaining 36 idle, so it is not as optimal as the first solution.
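One way to do that rounding in code, as a sketch (the variable names and the use of spark.sql.shuffle.partitions are my assumptions about how the number gets applied; spark is the active SparkSession):

  // defaultParallelism is typically the total executor core count (96 in the video)
  val cores = spark.sparkContext.defaultParallelism
  // round the estimated partition count (540 here) down to a whole number of "batches"
  val target = (540 / cores) * cores                 // 5 * 96 = 480 when cores == 96
  spark.conf.set("spark.sql.shuffle.partitions", target.toString)
  // or, for an existing DataFrame:
  // df.repartition(target)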
29:20 If the shuffle is still 550 GB, why is columnar compression good?
Got the same question!
And what is the logic behind using a target size of 100 MB when your input is 54 GB and the shuffle spill is 550 GB?
He is saying that the compressed input to stage 21 is 54 GB, but the uncompressed size is 550 GB, so the data compresses very well (about 10-to-1), hence the need for more partitions than "normal", because the data "blows out" more than normal: ten times as much when uncompressed. So he decided to use 100 MB per input partition in his calculation rather than his normally recommended value of 200 MB of compressed input per (input) partition, to account for the larger-than-typical "blow out"/decompressed size. If your data instead compresses at a 5-to-1 ratio (which is likely more typical), then try the typical/normally-recommended 200 MB per partition in your calculations as a starting point.
I should note that the choice of 100 MB or 200 MB of compressed input per partition also depends on how much executor memory you have. He has 7.6 GB per core. If you have half that, then you want your input partitions to also be 50% smaller. I often run Spark jobs with just 1-1.5 GB per core, and my rule of thumb is therefore much lower: 30-50 MB per partition.
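Putting those rules of thumb together as a worked example (numbers taken from the video; the rounding to a multiple of the core count is from the 480-partition discussion above):

  compressed stage input       ≈ 54 GB ≈ 54,000 MB
  target per partition         = 100 MB   (200 MB for ~5:1 data, 30-50 MB with only 1-1.5 GB/core)
  estimated partition count    = 54,000 / 100 = 540
  rounded to a multiple of 96  = 5 * 96 = 480 partitions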
@@mrajcok Thanks for your replies. These are very helpful. Just one question though: do you assume only about 25-30% of the available memory per core is used for shuffle to get to that 30-50 MB figure?
Very nice explanation of the Spark architecture and terminology.
Awesome video. Where can we find the slides/reference material?
Are the suggested sizes (target shuffle size, target file write) compressed or uncompressed?
Normally you use compressed sizes for these. When Spark writes the output of a stage to disk, by default (config item spark.shuffle.compress is true by default) it will compress the data to reduce I/O, which is normally what you want. The Spark UI shows the compressed sizes, so that's another reason to be using compressed sizes in your calculations. As he discusses in the video, if your data compresses "more than average", then you'll want more partitions than average, since the decompressed data will consume more executor memory than average.
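For reference, shuffle compression is a core Spark setting rather than a runtime SQL conf, so it is typically set when the session is built or via spark-submit. A minimal sketch showing the defaults (app name is just illustrative):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .appName("shuffle-compress-defaults")
    .config("spark.shuffle.compress", "true")       // compress shuffle output (already the default)
    .config("spark.io.compression.codec", "lz4")    // codec used for shuffle data (default)
    .getOrCreate()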
Thanks a lot. @27 min, where did you get the Stage 21 input read from, i.e. 45.4 GB + 8.6 GB = 54 GB?
From stages 19 and 20, the "Shuffle Write" column numbers. These become input to stage 21, as indicated by the DAG graph.
@@mrajcok Can you please explain lazy loading?
@@harishreddyanam2078 Just do a Google search for "spark lazy loading". Look at the pages that come back from Stack Overflow and from the official Spark docs. It would be too much to try to explain in a comment.
Excellent presentation, but terrible screenshots... very hard to read what's written.
If you have several joins on one table, how do you set the shuffle partition count for a specific join? As I currently understand it, this config is only rendered into the physical plan when an action is triggered.
Nice question, this is the one I wanted to know about. I haven't gotten an answer yet.
spark.conf.set("spark.sql.shuffle.partitions", value), but Spark 3.0 should fix this kind of thing for you with AQE (see the sketch below).
@@josephkambourakis Really excited for AQE, we are upgrading our cluster in about a month
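A sketch of both approaches mentioned above (dfA/dfB/dfC/dfD, the partition counts, and the output paths are hypothetical; spark is the active SparkSession):

  // Spark 3.0+: let Adaptive Query Execution coalesce shuffle partitions per stage
  spark.conf.set("spark.sql.adaptive.enabled", "true")
  spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

  // Before 3.0: spark.sql.shuffle.partitions is only read when an action runs,
  // so the usual workaround is to change it between separate actions
  spark.conf.set("spark.sql.shuffle.partitions", "2000")
  dfA.join(dfB, "key").write.parquet("/tmp/big_join")      // hypothetical
  spark.conf.set("spark.sql.shuffle.partitions", "200")
  dfC.join(dfD, "key").write.parquet("/tmp/small_join")    // hypothetical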
@30 min, how did he get to the percentage that only 60% of the cluster is being utilized?
540 partitions / 96 cores = 5.625. He says that after running 5 full sets only about 62 percent of the cores will be utilized; that is because of the remainder. If he instead uses 480 partitions, then 480/96 gives a whole number, 5.
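The arithmetic, spelled out with the numbers from the video (540 tasks on 96 cores):

  540 / 96 = 5.625            -> 6 "sets" (waves) of tasks
  5 full waves * 96 cores     = 480 tasks
  last wave                   = 540 - 480 = 60 tasks
  60 / 96                     ≈ 62.5% of the cores busy in the last wave
  480 partitions              -> exactly 5 full waves, no partially idle wave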
Great video! I've been thinking about our Spark input read partitions since we have a lot of heavily compressed data using BZIP2. Sometimes this compression is 20-25X, so a 128 MB blah.csv.bz2 file is really a 2.5-3 GB blah.csv file. Should we reduce the value of spark.sql.files.maxPartitionBytes to accommodate this and have it result in more partitions created on read?
Most likely "yes", but it depends on how much executor memory you have per core, how many cores you have, and how much processing/CPU you use to process a partition. You want all of your cores busy (i.e., at least one input partition per core). If you do that, but you then run out of executor memory, you could try smaller (hence more) partitions by setting maxPartitionBytes even lower.