In-depth intuition on different ways to send messages to Kafka Topic using Python
HTML-код
- Опубликовано: 10 фев 2025
- There are three primary methods of sending messages:
Fire-and-forget
Synchronous send
Asynchronous send
This video explains all the above techniques (using Python) with in-depth intuition.
Prerequisite:
-------------------------
• Apache Kafka for Pytho...
Code:
-----------
#Fire-and-forget
# from time import sleep
from json import dumps
from kafka import KafkaProducer
# topic_name='hello_world1'
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda x: dumps(x).encode('utf-8'))
# for e in range(100):
data = {'number' : e}
print(data)
producer.send(topic_name, value=data)
sleep(0.5)
#----------------------------------------------------------------------------------------------------------------------
#Synchronous send
# from time import sleep
from json import dumps
from kafka import KafkaProducer
# topic_name='hello_world1'
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda x: dumps(x).encode('utf-8'))
#
# for e in range(100):
data = {'number' : e}
print(data)
try:
record_metadata =producer.send(topic_name, value=data).get(timeout=10)
print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)
sleep(0.5)
except Exception as e:
print(e)
#
# producer.flush()
producer.close()
#----------------------------------------------------------------------------------------------------------------------
#Asynchronous send
#
# from json import dumps
from kafka import KafkaProducer
# topic_name='hello_world1'
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda x: dumps(x).encode('utf-8'))
#
# def on_send_success(record_metadata,message):
print()
print("""Successfully produced "{}" to topic {} and partition {} at offset {}""".format(message,record_metadata.topic,record_metadata.partition,record_metadata.offset))
print()
#
# def on_send_error(excp,message):
print()
print('Failed to write the message "{}" , error : {}'.format(message,excp))
print()
# for e in range(1000):
data = {'number' : e}
record_metadata =producer.send(topic_name, value=data).add_callback(on_send_success,message=data).add_errback(on_send_error,message=data)
print("Sent the message {} using send method".format(data))
#
# producer.flush()
producer.close()
#----------------------------------------------------------------------------------------------------------------------
#Asynchronous send
# from time import sleep
from json import dumps
from kafka import KafkaProducer
#
topic_name='hello_world1'
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda x: dumps(x).encode('utf-8'))
#
#
def on_send_success(record_metadata,message):
print("""Successfully produced "{}" to topic {} and partition {} at offset {}""".format(message,record_metadata.topic,record_metadata.partition,record_metadata.offset))
print()
#
#
def on_send_error(excp,message):
print('Failed to write the message "{}" , error : {}'.format(message,excp))
#
for e in range(100):
data = {'number' : e}
record_metadata =producer.send(topic_name, value=data).add_callback(on_send_success,message=data).add_errback(on_send_error,message=data)
print("Sent the message {} using send method".format(data))
print()
sleep(0.5)
producer.flush()
producer.close()
#----------------------------------------------------------------------------------------------------------------------
Check this playlist for more Data Engineering related videos:
• Demystifying Data Engi...
Snowflake Complete Course from scratch with End-to-End Project with in-depth explanation--
doc.clickup.co...
🙏🙏🙏🙏🙏🙏🙏🙏
YOU JUST NEED TO DO
3 THINGS to support my channel
LIKE
SHARE
&
SUBSCRIBE
TO MY RUclips CHANNEL
Insightful 👌
Thank you CodeWithSharath! Happy Learning
Just a suggestion, You should use f string, this will make ur code look better
Totally Agree Ayush Mandloi! Thanks for your suggestion .. will use this in upcoming videos ..
Thanks for your videos! a question: what to do incase of failure (eg: server down) when trying to send a message from producer.Does Kafka provide temporary storage for failed messages that can be retrieved later or else should we manually catch errors and store them in a db , retry them later manually ?
Hello kranthi kumar, this video is dedicated to you which explains the answer of your question in-depth --
ruclips.net/video/Sq9Idany-_o/видео.html
Hope it will be helpful! Happy Learning
@@KnowledgeAmplifier1 thank you so much.
This is really helpful
Can we send message to topic using kafka APIs?
Hi @keshav0844, yes, you can, here is a detail video on this topic -- ruclips.net/video/wZGtH0u-prg/видео.htmlsi=XfnsvGBEVpnQomVl
@@KnowledgeAmplifier1 can you please give me your email id/number is you are fine with it
@@KnowledgeAmplifier1 if possible, can you share your email id/phone number?
How can i contact you sir
Hello Naga Srinivasarao, please post your doubt here ...
I want matlab project on face recognition based new generation atm machine