We need more videos like this one... the whole series is very useful
The most important lesson in the series (yet), thanks a lot for this guide!
Great content! Thanks! Keep making this type of video. Best regards from Brazil!
Thanks Mark! I think it's just answered all the questions I have now I'm trying to improve the performance of my NiFi cluster. Now time to go switch everything to the Record processors!
Glad it was helpful!
Fantastic. I learned more in 25 minutes at the gym than in an hour at the terminal!
Thank you so much Mark 🥰. You have been excellent as always, and I always look forward to learning more from you 🙂. If you are starting a NiFi course then I'm going to be your first student, no doubt 😃. Fantastic video 😍 👌, in fact no words. Thanks again for all the help from you and the awesome community. I hope this can reach more NiFi developers.
Thanks Nadeem! Really appreciate you taking the time to watch!
thanks for this!! btw do you have the template?
Very well explained. I like your Anti-Patterns series. You have a new subscriber 🙂!!
Keep making this type of tutorial! Thanks Mark :)
what is the anti pattern? increasing the thread pool so abruptly that it overwhelms the system?
was Event-driven thread pool finally deprecated?
Thank you, it's excellent material.
I have a question: Does the Concurrent Tasks setting on a processor mean the number of threads on each node, or across all nodes? For example, with 10 nodes in the cluster and 10 concurrent tasks configured for a processor, will it be 100 threads, or 10 threads in total?
Thanks. It’s per node. So you’d have 100 threads total.
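To make the arithmetic concrete, here is a tiny sketch. The function name is made up for illustration; it is not part of NiFi.

```python
def total_threads(nodes: int, concurrent_tasks: int) -> int:
    """Concurrent Tasks applies per node, so the cluster-wide total is the product."""
    return nodes * concurrent_tasks


# The example from the question: 10 nodes, 10 concurrent tasks on the processor.
print(total_threads(10, 10))  # 100
```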
@@nifinotes5127 I saw the nice Run Duration feature in this video, but now I'm disappointed because this parameter isn't available for the MergeRecord processor.
I have about 300k single-row JSON FlowFiles and have to merge them into larger files before putting them to S3. I set up 2 MergeRecord processors to minimize pressure:
1) Schedule: 5 concurrent tasks | All nodes (10) | Run Schedule: 1 sec
Properties: Min Number of Records: 200 | Max Number of Records: 1000 | Min Bin Size: 5 KB | Max Bin Age: 120 sec | Max Number of Bins: 1000 (it is definitely greater than the number of threads 😀)
Balancing before the processor: partition by the attribute that MergeRecord uses. I tested with only one value of this attribute to get clear test results.
2) Schedule: 5 concurrent tasks | All nodes (10) | Run Schedule: 1 sec
Properties: Min Number of Records: 10000 | Max Number of Records: 100000 | Min Bin Size: 25 KB | Max Bin Age: 300 sec | Max Number of Bins: 1000
But after these two processors I still receive small FlowFiles. For example, ~300k files totaling 400 MB were merged into 30-35K files.
I've tried increasing the bin age to 15 and 30 minutes, with no result.
I don't understand why, but if I use MergeContent it works perfectly and merges into 12 files (twelve, Carl!).
Can you give some recommendations about what I'm doing wrong with MergeRecord?
So MergeRecord and MergeContent are incredibly powerful. But I'll admit they can be a bit complex to configure. If you're running into a specific issue I'd recommend asking on Apache Slack (see nifi.apache.org/mailing_lists.html for invite link and workspace link) because it's easier to engage there.
But some general guidelines:
- If you have the option, prefer MergeContent - it is much more efficient because it doesn't have to parse the data when it's read, it just smashes it together.
- You probably want to use a Run Schedule of 0 sec, rather than 1 sec. Use Max Bin Age to help control the max frequency of merging.
- With MergeRecord, you can use Data Provenance to look at a FlowFile that has few records and see why it was merged. If you look at the Provenance Event details, it will give an explanation. You can also enable debug logging (add a line to conf/logback.xml: ) and that will log at a DEBUG level the reason for each 'bin' that it merges (i.e., each FlowFile that it creates). So here you'll see explanations like "Bin has reached Max Bin Age" (been there long enough based on the Max Bin Age property), "Bin is full enough" (reached both min records and min bin size), "Bin is completely full" (reached the max number of records or max bin size), or "Maximum number of bins has been exceeded" (you may need to increase your Maximum Number of Bins property).
Hope this is helpful!
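The merge reasons listed above can be sketched as a small decision function. This is a simplified model for reasoning about a configuration, not NiFi's actual implementation: the names `BinLimits` and `merge_reason` are hypothetical, the order of the checks is an assumption, and the "Maximum number of bins has been exceeded" case is not modeled.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class BinLimits:
    min_records: int
    max_records: int
    min_bytes: int
    max_bytes: Optional[int]  # None means no upper size limit is configured
    max_age_sec: float


def merge_reason(records: int, size_bytes: int, age_sec: float,
                 limits: BinLimits) -> Optional[str]:
    """Return why a bin would be merged now, or None if it keeps waiting."""
    over_size = limits.max_bytes is not None and size_bytes >= limits.max_bytes
    if records >= limits.max_records or over_size:
        return "Bin is completely full"
    # "Full enough" requires BOTH minimums to be reached.
    if records >= limits.min_records and size_bytes >= limits.min_bytes:
        return "Bin is full enough"
    if age_sec >= limits.max_age_sec:
        return "Bin has reached Max Bin Age"
    return None


# Settings of the first MergeRecord from the question above.
first = BinLimits(min_records=200, max_records=1000,
                  min_bytes=5 * 1024, max_bytes=None, max_age_sec=120.0)
print(merge_reason(9, 12_000, 120.0, first))
print(merge_reason(200, 5 * 1024, 1.0, first))
```

If the DEBUG log or the Provenance Event shows mostly the Max Bin Age reason on small bins, that suggests records are arriving in a bin too slowly to reach the minimums before it ages out.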
Hello Sir, I am using the InvokeHTTP processor in NiFi for API calls. Is it possible to collect or generate logs of successful and failed API calls?
Thank you Mark
Do you have any recommendations for network based tasks, that may take up quite some time, e.g. downloading huge files via FetchS3Object? Is there a good way to parallelize the download of multiple files other than increasing the number of tasks the processor is allowed to run simultaneously? Does increasing the run time help in this case?
To increase parallelization you would increase the number of concurrent tasks, yes. How much to increase it can get a bit tricky. There's not much processing for FetchS3Object, but the disk and the network can both be bottlenecks. So if you give it too many tasks, you'll saturate one of those, and you'll end up having a lot of downloads going very slowly. Because of how TCP works, with a lot of back-and-forth chatter, you'll find that over a high-latency network, having several threads will help. Over a low-latency network, several threads typically just throttle the transfer rates of all downloads. Even with a high-latency network though, in my experience, I'd say that your performance doesn't improve much going beyond maybe 3-5 concurrent tasks. But your mileage may vary.
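A rough way to see the latency effect is the classic bound of one TCP window per round trip. The sketch below is a deliberately simplified model: the 64 KiB window, the RTT values, and the 1000 Mbps link are assumptions chosen for illustration, and it ignores window scaling, packet loss, congestion control, and disk speed.

```python
def per_connection_mbps(window_bytes: int, rtt_sec: float) -> float:
    """Upper bound for one connection: one window of data per round trip."""
    return window_bytes * 8 / rtt_sec / 1e6


def aggregate_mbps(connections: int, window_bytes: int,
                   rtt_sec: float, link_mbps: float) -> float:
    """Parallel connections add up until the shared link is saturated."""
    return min(link_mbps, connections * per_connection_mbps(window_bytes, rtt_sec))


WINDOW = 64 * 1024  # assumed 64 KiB window
LINK = 1000.0       # assumed 1000 Mbps link

for rtt in (0.100, 0.0005):  # assumed 100 ms (high latency) vs 0.5 ms (low latency)
    for n in (1, 4, 16):
        rate = aggregate_mbps(n, WINDOW, rtt, LINK)
        print(f"rtt={rtt * 1000:g} ms, connections={n}: {rate:.1f} Mbps")
```

Under these assumptions, extra connections raise the total at 100 ms RTT, while at 0.5 ms a single connection can already fill the link, so additional ones only split the same bandwidth.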
If the intent is to decrease the latency associated with downloading a huge file, and the protocol/source allow for it, it would be possible that a processor could be written that is capable of taking in a FlowFile, and then using several threads to concurrently download several "chunks" of the FlowFile and then re-assembling them together again. But that can get complicated to do efficiently. FetchS3Object specifically doesn't support this.
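The chunked approach described above might look roughly like this. It is only a sketch of the idea, not a NiFi processor: `fetch_range` is a hypothetical callable standing in for whatever ranged read the source supports (for example an HTTP Range request), and the whole object is held in memory, which a real implementation for huge files would want to avoid.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Tuple


def split_ranges(total_size: int, chunk_size: int) -> List[Tuple[int, int]]:
    """Return inclusive (start, end) byte ranges covering the whole object."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return [(start, min(start + chunk_size, total_size) - 1)
            for start in range(0, total_size, chunk_size)]


def download_in_chunks(fetch_range: Callable[[int, int], bytes],
                       total_size: int, chunk_size: int,
                       workers: int = 4) -> bytes:
    """Fetch chunks concurrently and reassemble them in the original order."""
    ranges = split_ranges(total_size, chunk_size)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Executor.map returns results in input order, even if chunks
        # finish out of order, so a plain join reassembles correctly.
        parts = pool.map(lambda r: fetch_range(r[0], r[1]), ranges)
        return b"".join(parts)


# Stand-in for a remote object so the sketch runs without a network.
blob = bytes(range(256)) * 10


def fake_fetch(start: int, end: int) -> bytes:
    return blob[start:end + 1]


assert download_in_chunks(fake_fetch, len(blob), chunk_size=300) == blob
```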
@@nifinotes5127 Thank you for the detailed response and the video series as well. :)
Luckily we avoided all the anti-patterns you've mentioned so far. Which is good to know being relatively new to NiFi and not having that much experience with it yet.
Looking forward to the upcoming videos.
Have a nice weekend. :)
Hi Mark!
I have a question about Run Duration. Please give me a recommendation if possible.
I use the InvokeHTTP processor as the final processor in my flow. It sends many HTTP requests.
When I try to set Run Duration > 0, I see a warning in the scheduling menu inside the processor's settings:
"Source Processors with a run duration greater than 0 ms and no incoming connections could lose data when NiFi is shutdown"
I suppose this is true only when InvokeHTTP reads data, and that it is a good decision to increase Run Duration when it writes. Is that right?
That warning shouldn't be concerning in your use case. The reason you see it is that you're configuring the Run Duration before you connect any source processor to it. If you first connect the incoming Connection to the Processor you won't see it. You should definitely be able to set the Run Duration > 0 when the processor is not the source of data.
@@nifinotes5127 Thanks. Actually, this warning shows up even when I have already set an incoming connection :) That is why I asked my question.
Is it possible to write a script for cron scheduling of NiFi processors?
Please help 🙏
I’m not sure that I understand the question. Processors do support CRON Scheduling natively.
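For anyone trying the CRON driven scheduling strategy: as far as I know, NiFi expects Quartz-style expressions, which start with a seconds field (unlike classic Unix cron) and allow an optional trailing year. The helper below is a hypothetical illustration that only labels the fields; it does not validate the values.

```python
FIELDS = ["seconds", "minutes", "hours", "day_of_month",
          "month", "day_of_week", "year"]


def label_cron(expr: str) -> dict:
    """Pair each field of a Quartz-style cron expression with its name."""
    parts = expr.split()
    if len(parts) not in (6, 7):
        raise ValueError("expected 6 fields, or 7 with the optional year")
    return dict(zip(FIELDS, parts))


# "0 0 13 * * ?" means: at 13:00:00 every day.
print(label_cron("0 0 13 * * ?"))
```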
How do I schedule a cron job to run NiFi Process Groups? Can anyone help me?
Please help me 🙏
I’d recommend shooting a note to users@nifi.apache.org or using the Slack channel. Makes it much easier to converse than YouTube comments :)
@@nifinotes5127 Please share the Slack channel link.
nifi.apache.org/mailing_lists.html You can go to this page, and at the bottom is the invite link for Slack.
@@nifinotes5127 Thank you so much for the help