Say if we have deptid 111 in emp table a million times and deptid 111 in dept table over 500k times. During the shuffle spark would create 200 partitions. So deptid 111 of emptable may split across 20 partitions and deptid 111 of depttable may split across 10 partitions and if the sort and merge is performed on these partitions, then this would result in partial join. How does spark handle it internally?
But in the third stage its not completed right lets say there is one more filter operation on the data frame it will still be in that stage only but if the data frame encounters a shuffle operation like join there will be another stage correct ?
@@rajasdataengineering7585 One more question : How can we use broadcast if the small df couldn't occupy the memory? Wouldn't the data spill from the memory?
Hello! 1 executor unit is not 1 worker node unit? Maybe this worker node 1 is rack or little cluster? Or maybe this executors is actually containers (cores) on 1 executor (worker)?
Good explanation Raja. Few questions 1)Does number of partitions determined by number of cores in the cluster or input split size for example s3 bucket 128MB 2)what happens if the partition size greater than the executor size . Does it spill to the disk ? Is that impacts the performance ?
Thanks Venkat. 1. Number of partitions are determined by various factors. If the input file is in splittable format, each core will start read the data in parallel and each core can produce one partition at 128 mb size. If the input file is much bigger, each core will produce multiple partitions of 128 mb. So number of partitions will be in multiples of number of cores. 2. Usually partition size does not exceed executor onheap memory. If Dataframe (multiple distributed partitions across cluster) size is exceeding total size of on heap memory, it leads to data spill. So few partitions will be stored in local disk of worker node. Splilled data hits the performance as it needs to be recalculated every time. Hope it helps
You are here to make our lives simple. Thank you so much !!
Thank you Omprakash
No one can explain better than this..Thanks raja for your efforts and time.
Thanks for your comment. Glad it helps you
to-the-point explanation, thanks!
Glad it was helpful! Thanks
Best channel for data bricks
Thank you
hats of to you sir g ur explanation is next level.
Thank you, Suresh!
Excellent Explanation!!!
The best explanation i´ve seen.
Thank you
Say if we have deptid 111 in emp table a million times and deptid 111 in dept table over 500k times.
During the shuffle spark would create 200 partitions. So deptid 111 of emptable may split across 20 partitions and deptid 111 of depttable may split across 10 partitions and if the sort and merge is performed on these partitions, then this would result in partial join. How does spark handle it internally?
Very good explanation
Thank you
Excellence explanation thank you
Glad it was helpful! Thanks Prathap
Thank you for clear explaination
Is this the same as the Sort-Merge-Bucket (SMB) join?
Hats off
Thank you
Thank You Sir
Most welcome
Sir i have seen multiple join strategies are there . I could find in ur playlist.
That's great
ur awsome Spark Guru
Thanks
But in the third stage its not completed right lets say there is one more filter operation on the data frame it will still be in that stage only but if the data frame encounters a shuffle operation like join there will be another stage correct ?
Yes that's right. Only when there is shuffle through wide transformation, new stage would be created
@@rajasdataengineering7585 thanks Raja
Thanks, Helpful
Thanks
Is this why we use BROADCAST join? Because normal joins are expensive?
Exactly, this is the reason why we need to use broadcast join to avoid expensive sort merge join
@@rajasdataengineering7585 One more question : How can we use broadcast if the small df couldn't occupy the memory? Wouldn't the data spill from the memory?
Hello!
1 executor unit is not 1 worker node unit? Maybe this worker node 1 is rack or little cluster? Or maybe this executors is actually containers (cores) on 1 executor (worker)?
Good explanation Raja. Few questions 1)Does number of partitions determined by number of cores in the cluster or input split size for example s3 bucket 128MB 2)what happens if the partition size greater than the executor size . Does it spill to the disk ? Is that impacts the performance ?
Thanks Venkat.
1. Number of partitions are determined by various factors. If the input file is in splittable format, each core will start read the data in parallel and each core can produce one partition at 128 mb size. If the input file is much bigger, each core will produce multiple partitions of 128 mb. So number of partitions will be in multiples of number of cores.
2. Usually partition size does not exceed executor onheap memory. If Dataframe (multiple distributed partitions across cluster) size is exceeding total size of on heap memory, it leads to data spill. So few partitions will be stored in local disk of worker node. Splilled data hits the performance as it needs to be recalculated every time.
Hope it helps
@@rajasdataengineering7585 thanks raja