Stream your PostgreSQL changes into Kafka with Debezium
- Published: Sep 18, 2024
- Watch your Postgres changes stream into Kafka in realtime using Debezium! End to end example of CDC from Postgres all the way into Kafka in realtime.
In this video we go over a tutorial where we stream PostgreSQL changes into Kafka using the Debezium Connector. We go over the docker containers necessary and we demonstrate end to end how the whole thing works.
This is a real life example of how CDC works with Postgres & Kafka.
Code: github.com/irt...
Theory of CDC: • Change Data Capture (C...
#kafka #postgres #streaming #realtime #debezium #cdc #systemDesign #tutorial #programming
Visit me at: irtizahafiz.com?
Contact me at: irtizahafiz9@gmail.com
This is an extremely rich and high quality video. I stumbled upon this by chance when I was searching for some info about Debezium for a project we might be interested in. Interestingly enough, the files provided in the video description did not quite work on Mac, and it took about 2 days to make the required adjustments, including getting the correct images, port configuration, network configuration, etc., but in the end things worked. I learned a lot as I went through the process, not just about Debezium but also a great deal about docker-compose structure, network dependencies, port issues and all that. So it was worth it. Kudos to Irtiza for making the effort to put together such an amazing tutorial!
Thanks, it is helpful. "Weird" thing with psql shell is just because of missing semicolon at the end.
Ahh I see! That’s a good callout. I swear it used to work before without the semicolon :/
Glad you found it helpful regardless : )
Really like your teaching style. Thank you.
Thanks for watching!
Very good presentation short and to the point. I got the information I needed. Thank you
Awesome concise explanation. Appreciate your work!
Thank you! I will start posting again soon, so please let me know what type of content interests you the most.
Thanks for the comprehensive explanation.
You are welcome!
Thanks a lot, simple, straightforward and clear.
I have a question: when you did the update, the event on the Kafka topic had '{"before": null, '. Why is that null if the row already had information before?
Hi! That's a very good observation. I was actually running into that intermittently. Like 8/10 updates were working properly, but for 2/10 the before was NULL for some reason. Resetting the WAL of the database fixed it every time, but I am not sure why it happens in the first place.
@irtizahafiz Hi! "before" is null because you missed the semicolon ';' after ALTER TABLE REPLICA IDENTITY FULL
Just what i was searching for👏
Glad : )
THIS WAS GREAT! Thank you!
Your 'Trevor' update is showing up in Kafka not as an update, but as a new record ("before": null)
Any idea why the last update had a 'before' value of 'null'? Since the last thing was an update, shouldn't the 'before' have the values as they were before the update (id:2, name:mike)?
same question
Hi! Thank you for bringing this up. I think I answered this in another comment.
I was running into this bug intermittently. I think it had to do with some misconfiguration of the postgres cluster.
I had the same issue but resolved it quickly. Make sure you use "alter table replica identity full".
Explained really well
Thank you! I will start posting again soon, so please let me know what type of content interests you the most.
Nice explanation ! Thanks
Glad you found it helpful : )
Thanks! Clear explanation in this video. Do you have a PostgreSQL Kafka Sink connector tutorial similar to this?
Hi! Unfortunately, I don't! I do plan on getting to it at some point though.
Meanwhile, I am doing most of this data pipeline work using AWS. So if you are interested, please check out those videos.
How to deal with the concept of foreign key in debezium?
Great video BTW! Life saviour
It's very informative
So nice of you
Really helpful ! thanks
Why after the sql update command, event has `before: null` in Kafka?
Nice video. Like your teaching style. Quick question - How stable is Debezium if you want to run this at scale in production?
Many companies use it at scale.
nice tutorial. you missed ; in some commands, that's why the output was not visible.
Sorry about that!
Great introduction 👍 I have a question: how do I load the data into a target database, or consume the data from the topic using Kafka Python or any other API?
For consuming data into a target database, you should be able to do it with a Sink Connector of your respective database. Almost all Sink Connectors should be able to read from a Kafka topic.
As for consuming Kafka using Python, I have an end to end example of that here in this video. I think you will find it helpful: ruclips.net/video/qi7uR3ItaOY/видео.html
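For readers who want a starting point in Python, here is a minimal sketch using the kafka-python package. It assumes the connector was configured with the JSON converter; the setup in the video reads the topic as Avro through a schema registry, so this would not work against it unchanged. The broker address `localhost:9092` and the helper names `decode_value` and `consume` are illustrative assumptions, not taken from the video's code.

```python
import json


def decode_value(raw):
    """Turn a raw Kafka message value into a Python dict.

    Returns None for tombstones (messages with a null value), which
    Debezium emits after a delete by default.
    """
    if raw is None:
        return None
    document = json.loads(raw)
    # With schemas enabled, the JSON converter wraps the event in
    # {"schema": ..., "payload": ...}; unwrap it when present.
    if isinstance(document, dict) and "payload" in document:
        return document["payload"]
    return document


def consume(topic="postgres.public.student", servers="localhost:9092"):
    """Print every change event on the topic (blocks until interrupted)."""
    # Third-party dependency: pip install kafka-python
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=servers,      # assumed address, adjust to your setup
        auto_offset_reset="earliest",   # start from the oldest retained message
    )
    for record in consumer:
        print(decode_value(record.value))
```

Calling `consume()` needs a reachable broker; `decode_value` can be tried on its own with any JSON byte string.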
Thanks a lot!
Glad it was helpful. I added the idea to my “future videos” list. The next few weeks I will be focusing on the System Design course, so unfortunately it will take some time :(
Awesome tutorial! I wanted to understand one more piece. There are 2 patterns I am now aware of for streaming CDC to Kafka. One is via the Debezium connector that you talked about; the other is via the Outbox pattern, where the application service writes to an Outbox table in the same commit in which it writes to other application tables, after which tools like Ceres can stream the data to Kafka. What are the core differences between these 2 approaches, and is there a recommendation of one over the other?
I am actually not familiar with Outbox :(
Typically the outbox pattern is used to guarantee at-least-once delivery of a message, not to perform CDC (imagine a critical service where message delivery must be guaranteed). Your application does not send messages to a Kafka topic but instead writes them to a DB table. The outbox service is then responsible for seeing which messages in your table (the outbox) have been successfully published to Kafka and retrying the ones that haven't. There is a chance that the outbox publishes an event more than once, so your consumers must be idempotent.
On the consumer side you may have what would be called an inbox pattern, which blocks the same message from being consumed more than once; otherwise the consumers must be okay with receiving the same message more than once.
What you may have is CDC with Debezium, but instead of publishing directly to Kafka, the events are published to a DB table via the outbox.
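To make the idempotency point concrete, here is a minimal sketch of a consumer-side handler that ignores repeated deliveries. It assumes every message carries a unique id, and it keeps the seen ids in memory; a real inbox would persist them, typically in the same transaction as the side effect. The class and field names are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class IdempotentHandler:
    """Applies each message at most once, keyed by a unique message id."""

    seen_ids: set = field(default_factory=set)
    applied: list = field(default_factory=list)

    def handle(self, message_id: str, payload: dict) -> bool:
        """Return True if the message was applied, False if it was a duplicate."""
        if message_id in self.seen_ids:
            return False
        # Stand-in for the real side effect (for example a database write).
        self.applied.append(payload)
        self.seen_ids.add(message_id)
        return True


handler = IdempotentHandler()
handler.handle("msg-1", {"student": "mike"})
handler.handle("msg-1", {"student": "mike"})  # redelivery, ignored
print(len(handler.applied))
```

Because the duplicate check happens before the side effect, the publisher can retry as often as it needs without the consumer applying the same change twice.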
please help me I'm getting the error '% ERROR: Invalid token 'k' in key pack-format, see -s usage."
Thanks for the explanation. Can you please tell me how we can store the output of the Kafka connector that we are getting from the database?
I don't think I follow the question.
If you mean the CDC data from kafka, you can route it to whatever you want to - a different DB, application logic, cache, etc.
Hi, Thanks for the video... Whenever I insert or update the data in postgres, those changes are not showing in the docker run command
I would recommend restarting Postgres after you change its configuration.
@irtizahafiz Thank you, sir... Now it is working as expected....
when i run the command to fire up dbz, i get "empty reply from server". kindly assist
Great explanations, I just have a few questions. The first one is how the configuration should be written in the debezium.json file if the source is an API rather than a database. The last one is how to allow Debezium to listen to changes from multiple database tables. Thanks
I can consider making future videos on these. Thank you!
Sorry, when I ran the docker-compose command it just kept spinning forever... it does not work.
Hi! Sorry it's difficult for me to help without some more context.
Hello, I need some help. I was able to implement everything you showed in the video,
but I cannot consume messages from my local kafka-python. Can you please help?
Hi! It's difficult to help without more details.
Nice video, do you have an example for Spring boot with Apache kafka and Debezium connector(MySQL, MS SQL Server)?
Unfortunately, I do not. Maybe something in the future.
If I want to use YugabyteDB (PostgreSQL-compatible) for this migration, does it work the same way as with PostgreSQL?
It should be similar as long as there's a connector available.
Great introduction. Could you share your scripts in your video?
Hi! I can share as much as my iterm command history allows me. If you need anything in particular, let me know.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 127.0.0.1:8083/connectors/ --data "@debezium.json"
docker run --tty --network postgres_debezium_cdc_default confluentinc/cp-kafkacat kafkacat -b kafka:9092 \
-C -s key=s -s value=avro -r schema-registry:8081 -t postgres.public.student
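The curl call above posts a connector definition to the Kafka Connect REST API. As a rough Python equivalent, the sketch below builds a definition and posts it with the standard library. The contents of the video's debezium.json are not shown here, so every value in the config (host, credentials, connector name, table names) is a placeholder assumption; only the property names are standard Debezium PostgreSQL connector settings. Note that `table.include.list` is a single comma-separated string, not a JSON array.

```python
import json
import urllib.request


def build_connector_config(name, tables):
    """Build a Debezium PostgreSQL connector definition (placeholder values)."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "database.hostname": "postgres",   # assumed container/service name
            "database.port": "5432",
            "database.user": "postgres",       # placeholder credentials
            "database.password": "postgres",
            "database.dbname": "postgres",
            # Debezium 2.x property; older releases use database.server.name.
            "topic.prefix": "postgres",
            # Comma-separated list of schema-qualified tables to capture.
            "table.include.list": ",".join(tables),
        },
    }


def register_connector(config, url="http://127.0.0.1:8083/connectors/"):
    """POST the definition to Kafka Connect and return the HTTP status code."""
    request = urllib.request.Request(
        url,
        data=json.dumps(config).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status


definition = build_connector_config("student-connector", ["public.student"])
print(json.dumps(definition, indent=2))
```

Calling `register_connector(definition)` only makes sense once the Connect container is up and listening on port 8083.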
Hi I am trying to add skipped.operations = "c" in debezium.json but its still showing create/update/delete?
Hi! Unfortunately, I don't think I can be too helpful when it comes to debezium specifics.
If we have airflow in docker-compose, we don't need Debezium, right?
I think you can do a lot with Airflow, but not sure about the specifics.
table.include.list: can this accept array or list separated by ,?
Hi,
I am getting this error when running kafkacat command
ERROR: Failed to format message in postgres.public.student [0] at offset 0: Avro/Schema-registry message deserialization: REST request failed (code -1): HTTP request failed: URL using bad/illegal format or missing URL : terminating
Please tell the next step how to solve this.
thanks!
Not sure without more details. Can you provide with examples of your payload?
How come on the update it doesn't show the before data?
That was happening intermittently. I believe there are some discussions about this in the comments section.
Is it possible to listen to join queries?
I'm getting this error after running the final kafkacat command. I'm using windows to run this. I checked in both cmd and powershell.
Error: file/topic list only allowed in producer(-P)/kafkaconsumer(-G) mode
docker run --tty --network bin_default confluentinc/cp-kafkacat kafkacat -b kafka:9092 -C -s key=s -s value=avro -r schema-registry:8081 -t postgres.public.student
this command works for me now.
I guess the "ALTER TABLE ..." part didn't work, due to the lack of a semicolon at the end. @4.20
That's weird, the messages that end up in the Kafka topic are not in JSON, do you have a clue why? I'm using Kafdrop for inspecting topics. Could it be that your kafkacat command also parses messages to JSON?
Potentially. I know that the Kafkacat command has lots of "quality of life improvements" baked in.
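One likely explanation, given that the kafkacat command is run with `-s value=avro -r schema-registry:8081`: the values on the topic are Avro-encoded, and kafkacat decodes them into JSON for display. Messages written through Confluent's Schema Registry serializers start with a 5-byte header (a zero magic byte followed by a 4-byte big-endian schema id) and then the Avro binary payload. The sketch below only splits off that header; decoding the payload itself would need the schema from the registry. The function name and the sample bytes are made up for illustration.

```python
import struct


def split_confluent_frame(message: bytes):
    """Split a Schema Registry framed message into (schema_id, avro_payload)."""
    if len(message) < 5:
        raise ValueError("message is too short to carry the 5-byte header")
    # ">bI" = big-endian, 1 signed byte (magic) + 4-byte unsigned int (schema id).
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != 0:
        raise ValueError("unexpected magic byte, not Schema Registry framing")
    return schema_id, message[5:]


# Hand-built frame: magic byte 0, schema id 42, then placeholder payload bytes.
frame = b"\x00" + (42).to_bytes(4, "big") + b"avro-bytes"
print(split_confluent_frame(frame))
```

A viewer that shows raw bytes will therefore display binary data for these messages unless it is pointed at the schema registry.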
docker: Error response from daemon: network postgres_debezium_cdc_master_default not found. I am getting this error on the docker run -tty ..... command. Please help.
Make sure you define the network in the docker-compose file.
Is there any way at all to do a major version DB upgrade without manually stopping writes, then killing the Debezium connector, etc. ? For a microservice architecture it really is quite costly to have such prolonged downtimes whenever a major upgrade is done. I don’t see a solution anywhere
I don’t have any experience with it :( So unfortunately I won’t be of much help here sorry.
I don't think so; a major version upgrade of a database typically requires some level of downtime. But if you can set up a replica and divert the traffic there for the time being, that might be a solution.
Is it possible to implement this without using docker environment? If it's possible can you demonstrate how to perform that?
Hi! Yes it is possible! I might be able to demonstrate that some time in the future, but it’s currently not in my plans unfortunately.
Good one brother. Do you have a video to write to a source database from this Kafka source topic?
I don’t have a video yet, but I do plan on making one soon. : )
@irtizahafiz Did you happen to create one?
Hi, I wonder, how about deletes? Does Debezium also support delete?
Yes. It supports delete too.
In PostgreSQL, we often forget to append ";" at the end of the SQL statement 🤪
Thank you for the correction! I will start posting again soon, so please let me know what type of content interests you the most.
Hi Irtiza,
Do you have anything related to SQL Server source and sink connectors?
I do have a few SQL videos coming up soon.
Great, man! Could you please give me the docker command to start a consumer on the topic?
Unfortunately I don't have the code for this anymore :(
is there any way I can provide a topic name to which it should be published?
Yes definitely!
Hello, I'm doing this currently and I'm stuck at 8:20. Can anyone please tell me what --network is, and also what postgres_debezium_default is? It's saying it doesn't exist.
Depends on the app you are building. I tried keeping things generic here. The ranking can be based on some score generated by a machine learning model, etc.
Thanks, it helped me a lot but the weird thing is you updated the data in the database in the end, and still Kafka "before key" is null. Anyone has any thoughts on this?😁
That has to do with some kind of Postgres configuration. I remember fixing it immediately after the video, but can't remember :(
@irtizahafiz It works after running the command below. In the video it was not executed because you missed the semicolon.
ALTER TABLE public.student REPLICA IDENTITY FULL;
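To see what this changes on the Kafka side, here is a small sketch that inspects the "before", "after" and "op" fields of a change event and flags the situation discussed in this thread. With PostgreSQL's default replica identity, an update that does not touch the primary key generally arrives with "before" set to null; with REPLICA IDENTITY FULL the previous row is included. The sample event is written by hand for illustration, not captured from the video.

```python
import json

# Hypothetical, hand-written payload of a Debezium change event.
SAMPLE_UPDATE = json.dumps({
    "before": None,
    "after": {"id": 2, "name": "Trevor"},
    "op": "u",
})

# Debezium operation codes: create, update, delete, snapshot read.
OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def describe_change(raw_event: str) -> str:
    """Summarise a change event and flag updates/deletes with no 'before' image."""
    event = json.loads(raw_event)
    op = OPERATIONS.get(event.get("op"), "unknown")
    missing_before = event.get("op") in ("u", "d") and event.get("before") is None
    if missing_before:
        return f"{op}: 'before' is null, check the table's REPLICA IDENTITY"
    return f"{op}: before={event.get('before')} after={event.get('after')}"


print(describe_change(SAMPLE_UPDATE))
```

An insert legitimately has a null "before", which is why only updates and deletes are flagged.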
Hi, I need your help. I tried to run docker run --tty --network postgres_debezium_cdc_default but it shows ERROR: Failed to query metadata for topic postgres.public.student: Local: Broker transport failure. Please help, thank you.
Are you sure the Kafka topic was created successfully? I would recommend listing the available topics in the broker first.
@irtizahafiz I'm not sure whether the Kafka topic was created successfully; I just followed all the instructions in your video :D How do I list the available topics in the broker? Thank you
You can run something like this in your CLI to list the Kafka topics:
./usr/bin/kafka-topics --zookeeper zookeeper:2181 --list
@irtizahafiz I have the same problem, and the topic postgres.public.student exists.
It works!! It is enough to run the command two or three times.
Even with the update, you get null!
I know :( I was getting null every now and then. I believe it was because my Postgres docker container wasn't retaining the WAL setting every time I exited out of it.
Please make a video on postgresql -> debezium -> pubsub, thank you
Hi! I do plan on doing that in the near future. After the system design videos that is.
Can we do the same using Airflow?
Yup for sure! Products like AirFlow make data engineering super easy by abstracting away all these.
A delete would be nice to see.
I am trying to build a project on top of this tutorial and got stuck on the issue below. Could you help? :)
I ran the compose file and I can see from Docker Desktop that all containers are up.
From the Go code I was trying to communicate with Kafka to create a topic, and it throws the error below:
"panic: failed to dial: failed to open connection to kafka:9092: dial tcp: lookup kafka: no such host"
Go code :
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", "my-topic", 0)
if err != nil {
    panic(err.Error())
}
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
Hi! I am not really familiar with the project.
Just make sure both the Go project and your kafka containers are running on the same network.
Hi, I need your help. I tried to run docker run --tty --network debezium_postgres_cdc_default (my app name starts with debezium_postgres_cdc_default) but it shows ERROR: file/topic list only allowed in producer(-P)/kafkaconsumer(-G)
I have faced the same issue in windows command prompt, but it works in powershell.
Are you trying it out in Windows or Mac or Linux?
@prasadkintali6560 I am having the same issue in both Command Prompt and PowerShell. I am using a Windows machine. Can anyone help?
Hi Irtiza
I am facing an issue - ERROR: Failed to query metadata for topic postgres.public.student: Local: Broker transport failure
./usr/bin/kafka-topics --zookeeper zookeeper:2181 --list
This gives ->
-bash: ./usr/bin/kafka-topics: No such file or directory
I would really appreciate it if you could help me with this issue.
Thank you!
Have you created the Kafka topics successfully? You have to register the topic first, before you can consume from it.
Hi, I'm getting "Unable to find image 'confluentinc/cp-kafka:latest' locally". How do I resolve this? Please help.
Error response from daemon: failed to create shim: OCI runtime create failed: container_linux.go:380: starting container process caused: exec: "-b": executable file not found in $PATH: unknown
Hi! I can look at this later on in the week when I have some time.
docker: Error response from daemon: network postgres_debezium_cdc_default not found. I am getting this error while running this command: docker run --tty --network postgres_debezium_cdc_default confluentinc/cp-kafkacat -b kafka:9092 -C -s key=s value=avro -r schema-registry:8001 -t postgres.public.student ...
Please help me. What does --network postgres_debezium_cdc_default denote?
Please, I need help...
That's because the network has another name. Run "docker network ls" on the command line and find the network in the list. In my case it was "debezium-default".
@iuriivdovin731 thanks bro
Thanks for helping out! : )
I got the following error: % ERROR: Topic postgres.public.student error: Broker: Leader not available
after running: docker run --tty --network postgres_debezium_cdc_default confluentinc/cp-kafkacat kafkacat -b kafka:9092 -C -s key=s -s value=avro -r schema-registry:8081 -t postgres.public.student
Maybe try resetting Docker if it's running already?