In-depth intuition on different ways to send messages to Kafka Topic using Python

Поделиться
HTML-код
  • Опубликовано: 10 фев 2025
  • There are three primary methods of sending messages:
    Fire-and-forget
    Synchronous send
    Asynchronous send
    This video explains all the above techniques (using Python) with in-depth intuition.
    Prerequisite:
    -------------------------
    • Apache Kafka for Pytho...
    Code:
    -----------
    #Fire-and-forget
    # from time import sleep
    from json import dumps
    from kafka import KafkaProducer
    # topic_name='hello_world1'
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda x: dumps(x).encode('utf-8'))
    # for e in range(100):
    data = {'number' : e}
    print(data)
    producer.send(topic_name, value=data)
    sleep(0.5)
    #----------------------------------------------------------------------------------------------------------------------
    #Synchronous send
    # from time import sleep
    from json import dumps
    from kafka import KafkaProducer
    # topic_name='hello_world1'
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda x: dumps(x).encode('utf-8'))
    #
    # for e in range(100):
    data = {'number' : e}
    print(data)
    try:
    record_metadata =producer.send(topic_name, value=data).get(timeout=10)
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)
    sleep(0.5)
    except Exception as e:
    print(e)
    #
    # producer.flush()
    producer.close()
    #----------------------------------------------------------------------------------------------------------------------
    #Asynchronous send
    #
    # from json import dumps
    from kafka import KafkaProducer
    # topic_name='hello_world1'
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda x: dumps(x).encode('utf-8'))
    #
    # def on_send_success(record_metadata,message):
    print()
    print("""Successfully produced "{}" to topic {} and partition {} at offset {}""".format(message,record_metadata.topic,record_metadata.partition,record_metadata.offset))
    print()
    #
    # def on_send_error(excp,message):
    print()
    print('Failed to write the message "{}" , error : {}'.format(message,excp))
    print()
    # for e in range(1000):
    data = {'number' : e}
    record_metadata =producer.send(topic_name, value=data).add_callback(on_send_success,message=data).add_errback(on_send_error,message=data)
    print("Sent the message {} using send method".format(data))
    #
    # producer.flush()
    producer.close()
    #----------------------------------------------------------------------------------------------------------------------
    #Asynchronous send
    # from time import sleep
    from json import dumps
    from kafka import KafkaProducer
    #
    topic_name='hello_world1'
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda x: dumps(x).encode('utf-8'))
    #
    #
    def on_send_success(record_metadata,message):
    print("""Successfully produced "{}" to topic {} and partition {} at offset {}""".format(message,record_metadata.topic,record_metadata.partition,record_metadata.offset))
    print()
    #
    #
    def on_send_error(excp,message):
    print('Failed to write the message "{}" , error : {}'.format(message,excp))
    #
    for e in range(100):
    data = {'number' : e}
    record_metadata =producer.send(topic_name, value=data).add_callback(on_send_success,message=data).add_errback(on_send_error,message=data)
    print("Sent the message {} using send method".format(data))
    print()
    sleep(0.5)
    producer.flush()
    producer.close()
    #----------------------------------------------------------------------------------------------------------------------
    Check this playlist for more Data Engineering related videos:
    • Demystifying Data Engi...
    Snowflake Complete Course from scratch with End-to-End Project with in-depth explanation--
    doc.clickup.co...
    🙏🙏🙏🙏🙏🙏🙏🙏
    YOU JUST NEED TO DO
    3 THINGS to support my channel
    LIKE
    SHARE
    &
    SUBSCRIBE
    TO MY RUclips CHANNEL

Комментарии •

  • @codewithsharath5988
    @codewithsharath5988 2 года назад +1

    Insightful 👌

  • @AyushMandloi
    @AyushMandloi Год назад +1

    Just a suggestion, You should use f string, this will make ur code look better

    • @KnowledgeAmplifier1
      @KnowledgeAmplifier1  Год назад

      Totally Agree Ayush Mandloi! Thanks for your suggestion .. will use this in upcoming videos ..

  • @kranthikumar610
    @kranthikumar610 2 года назад +1

    Thanks for your videos! a question: what to do incase of failure (eg: server down) when trying to send a message from producer.Does Kafka provide temporary storage for failed messages that can be retrieved later or else should we manually catch errors and store them in a db , retry them later manually ?

    • @KnowledgeAmplifier1
      @KnowledgeAmplifier1  2 года назад +1

      Hello kranthi kumar, this video is dedicated to you which explains the answer of your question in-depth --
      ruclips.net/video/Sq9Idany-_o/видео.html
      Hope it will be helpful! Happy Learning

    • @kranthikumar610
      @kranthikumar610 2 года назад +1

      @@KnowledgeAmplifier1 thank you so much.
      This is really helpful

  • @keshav0844
    @keshav0844 5 месяцев назад +1

    Can we send message to topic using kafka APIs?

    • @KnowledgeAmplifier1
      @KnowledgeAmplifier1  5 месяцев назад

      Hi @keshav0844, yes, you can, here is a detail video on this topic -- ruclips.net/video/wZGtH0u-prg/видео.htmlsi=XfnsvGBEVpnQomVl

    • @keshav0844
      @keshav0844 5 месяцев назад

      @@KnowledgeAmplifier1 can you please give me your email id/number is you are fine with it

    • @keshav0844
      @keshav0844 5 месяцев назад

      @@KnowledgeAmplifier1 if possible, can you share your email id/phone number?

  • @nagasrinivasarao1385
    @nagasrinivasarao1385 2 года назад +1

    How can i contact you sir

    • @KnowledgeAmplifier1
      @KnowledgeAmplifier1  2 года назад

      Hello Naga Srinivasarao, please post your doubt here ...

    • @nagasrinivasarao1385
      @nagasrinivasarao1385 2 года назад

      I want matlab project on face recognition based new generation atm machine