This project implements a Kafka-based notification system using Python with confluent-kafka. It includes a Kafka producer to publish messages and a Kafka consumer to process them.
- 📝 Overview
- 📂 Project Structure
- 🚀 Kafka Producer (Publishing Messages)
- 🎯 Kafka Consumer (Processing Messages)
- 🛠 Kafka Topic Management
- ▶️ How to Run
- ⚠️ Error Handling
- 🔗 Download & Setup Links
This system consists of two main components:
- Kafka Producer - Publishes messages to a Kafka topic.
- Kafka Consumer - Consumes messages from the Kafka topic and processes them.
Each component ensures that Kafka topics exist before producing or consuming messages. The messages are serialized as JSON before publishing.
project/
│── publish_to_kafka.py # Defines code to publish the payload
│── kafka_service.py # Defines the Kafka Producer
│── consumer.py # Defines the Kafka Consumer
│── .env # Environment variables
│── config.py # Configuration settings
│── README.md # Documentation
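The tree lists config.py only as "Configuration settings", so its real contents are not shown here. As a rough sketch, it might read the two variables from the .env file (KAFKA_BOOTSTRAP_SERVERS and TOPIC_NAME) along these lines. The names KafkaSettings and load_settings are hypothetical, and the fallback defaults are assumptions.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class KafkaSettings:
    """Hypothetical container for the values kept in .env."""
    bootstrap_servers: str
    topic_name: str

    def client_config(self) -> dict:
        # "bootstrap.servers" is the key confluent-kafka clients expect.
        return {"bootstrap.servers": self.bootstrap_servers}


def load_settings(env: Mapping[str, str] = os.environ) -> KafkaSettings:
    # With python-dotenv installed, calling dotenv.load_dotenv() beforehand
    # copies the .env entries into os.environ.
    return KafkaSettings(
        bootstrap_servers=env.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        topic_name=env.get("TOPIC_NAME", "notifications"),
    )
```

Keeping the topic name outside client_config() matters because confluent-kafka clients reject configuration keys they do not recognise.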
The kafka_service.py file defines a producer that publishes messages to a Kafka topic.
- publish_to_kafka(payload):
  - Ensures the topic exists using ensure_kafka_topic().
  - Serializes the payload as JSON.
  - Publishes the message to Kafka.
  - Calls flush() to block until the queued message has been delivered.
- ensure_kafka_topic(topic_name):
  - Checks if the Kafka topic exists.
  - If not, creates the topic with default partitions and replication factor.
- acked(err, msg):
  - Callback function to check if a message was successfully published.
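The publishing steps above can be sketched roughly as follows. This is not the project's actual code: the function here takes the producer and topic as explicit arguments (an assumption made so the sketch can run without a broker), the 10-second flush timeout is arbitrary, and the topic check is left out. Any object with confluent-kafka's produce() and flush() methods will do, for example confluent_kafka.Producer({"bootstrap.servers": "localhost:9092"}).

```python
import json
import logging

logger = logging.getLogger(__name__)


def acked(err, msg):
    """Delivery callback: confluent-kafka calls it once per produced message."""
    if err is not None:
        logger.error("Delivery failed: %s", err)
    else:
        logger.info("Delivered to %s [partition %s]", msg.topic(), msg.partition())


def publish_to_kafka(producer, topic, payload):
    """Serialize payload as JSON, publish it, and wait for delivery."""
    value = json.dumps(payload).encode("utf-8")
    producer.produce(topic, value=value, on_delivery=acked)
    # flush() blocks until queued messages are delivered or the timeout
    # expires, and returns the number of messages still waiting.
    remaining = producer.flush(10)
    if remaining:
        logger.warning("%d message(s) not delivered before the timeout", remaining)
    return remaining == 0
```

Flushing after every message keeps the example simple but limits throughput; a high-volume producer would usually flush once at shutdown instead.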
try:
    publish_to_kafka(payload)
except Exception as e:
    # Pass the exception as a %s argument so the logger can format it
    logger.error("Kafka error while publishing payload: %s", e)

The consumer.py file defines a consumer that reads messages from the Kafka topic.
- consume_messages():
  - Ensures the topic exists before consuming.
  - Subscribes to the topic and continuously polls for new messages.
  - Calls process_message() for each received message.
  - If successful, commits the message offset.
- process_message(message):
  - Decodes and processes the message.
  - Logs any errors that occur during processing.
- commit_offsets(message):
  - Commits the message offset after successful processing.
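A minimal sketch of the poll, process, and commit cycle described above, under a few assumptions: the consumer is passed in as an argument, messages carry UTF-8 JSON, the commit is done inline rather than in a separate commit_offsets() helper, and max_polls is a hypothetical parameter added only so the loop can be stopped. With a real confluent_kafka.Consumer, this pattern presumes "enable.auto.commit" is set to False so that only the explicit commit advances the offset.

```python
import asyncio
import json
import logging

logger = logging.getLogger(__name__)


def process_message(message) -> bool:
    """Decode one message; return True only if processing succeeded."""
    try:
        payload = json.loads(message.value().decode("utf-8"))
        logger.info("Processing notification: %s", payload)
        return True
    except Exception as exc:  # log any processing failure
        logger.error("Could not process message: %s", exc)
        return False


async def consume_messages(consumer, topic, max_polls=None):
    """Poll the topic and commit offsets only for processed messages."""
    consumer.subscribe([topic])
    polls = 0
    try:
        while max_polls is None or polls < max_polls:
            polls += 1
            # poll() blocks, so run it in a worker thread (Python 3.9+).
            message = await asyncio.to_thread(consumer.poll, 1.0)
            if message is None:
                continue  # nothing arrived within the timeout
            if message.error():
                logger.error("Kafka error: %s", message.error())
                continue
            if process_message(message):
                # Synchronous commit of this message's offset.
                consumer.commit(message=message, asynchronous=False)
    finally:
        consumer.close()
```

Because the offset is skipped when processing fails, a later successful commit on the same partition still moves past the failed message; a dead-letter topic or retry step would be needed to avoid losing it.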
if __name__ == "__main__":
    asyncio.run(consume_messages())

Both producer and consumer include the ensure_kafka_topic() function to:
- ✅ Verify if the topic exists.
- ➕ If not, create it dynamically.
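One way the check-then-create logic could look, sketched against the confluent-kafka admin API. The extra parameters are assumptions for illustration: admin is expected to behave like confluent_kafka.admin.AdminClient, and make_topic is a hypothetical callable that builds the topic specification, which keeps the partition and replication choices out of the function.

```python
import logging

logger = logging.getLogger(__name__)


def ensure_kafka_topic(admin, topic_name, make_topic, timeout=10.0):
    """Create topic_name if the broker does not list it; return True if created."""
    metadata = admin.list_topics(timeout=timeout)
    if topic_name in metadata.topics:
        return False
    # create_topics() is asynchronous and returns {topic name: future}.
    futures = admin.create_topics([make_topic(topic_name)])
    # result() returns None on success and raises if creation failed.
    futures[topic_name].result()
    logger.info("Created topic %s", topic_name)
    return True


# Wiring with confluent-kafka (assumes a broker at localhost:9092):
#   from confluent_kafka.admin import AdminClient, NewTopic
#   admin = AdminClient({"bootstrap.servers": "localhost:9092"})
#   ensure_kafka_topic(
#       admin, "notifications",
#       lambda name: NewTopic(name, num_partitions=1, replication_factor=1),
#   )
```

If the producer and consumer start at the same moment, both may see the topic as missing and one create call can fail with a "topic already exists" error, so real code may want to treat that error as success.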
Ensure that Kafka is installed and running.
# Start Zookeeper in a new terminal
bin/zookeeper-server-start.sh config/zookeeper.properties
# Start Kafka Broker in another terminal
bin/kafka-server-start.sh config/server.properties

Create a .env file and set the following variables:
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
TOPIC_NAME=notifications

# Delete an existing topic
bin/kafka-topics.sh --delete --topic notifications --bootstrap-server localhost:9092
# Create a new topic
bin/kafka-topics.sh --create --topic notifications --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

# Run the producer
python kafka_service.py

# Run the consumer
python consumer.py

# Verify messages with the console consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic notifications --from-beginning

- Producer Errors:
  - ❌ If message publishing fails, an error is logged.
  - 🔄 The producer flushes messages to ensure they are sent.
- Consumer Errors:
  - ❌ If message processing fails, the error is logged and the offset is not committed.
  - ⚙️ Kafka errors are logged and handled appropriately.
- Download Apache Kafka: Kafka Official Download
- Install Java (Required for Kafka): Download Java
- Install Python Dependencies:
pip install confluent-kafka python-dotenv
- Kafka Documentation: Kafka Official Docs
This system publishes and consumes messages using Kafka. It ensures that topics exist before publishing or consuming, logs publishing and processing failures, and commits offsets only after a message has been processed successfully.