Apache Kafka Event-Driven Architecture
Build scalable event-driven systems with Apache Kafka - from local development to production deployment.
Core Concepts
Kafka is a distributed event streaming platform with three core capabilities:
- Publish and Subscribe - Read and write streams of events (records)
- Store - Durably store event streams with fault tolerance
- Process - Process event streams in real-time or retrospectively
Key Components
- Producers - Applications that publish events to topics
- Consumers - Applications that subscribe to topics and process events
- Brokers - Kafka servers that store and serve data
- Topics - Named event streams organized into partitions
- Partitions - Ordered, immutable sequences of events within a topic
- Consumer Groups - Load-balanced consumption across multiple consumers
- Replication - Data redundancy across brokers for fault tolerance
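These components interact through partitioning: a record's key determines which partition it lands in, and ordering is guaranteed only within a partition. A minimal sketch of the key-to-partition mapping (note: Kafka's real `DefaultPartitioner` uses murmur2 hashing over the serialized key; `hashCode()` is used here only to keep the sketch dependency-free):

```java
// Simplified illustration of key-based partitioning.
// Kafka's DefaultPartitioner uses murmur2 over the serialized key bytes;
// hashCode() here only demonstrates the idea.
public class PartitionSketch {
    static int partitionFor(String key, int numPartitions) {
        // Mask off the sign bit so the result is non-negative
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // Records with the same key always land in the same partition,
        // which is what preserves per-key ordering
        System.out.println(partitionFor("order-42", 3) == partitionFor("order-42", 3));
    }
}
```

Because the mapping is deterministic, all events for a given key (e.g. one order ID) stay in order relative to each other.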
Quick Start Workflows
1. Hello World - Local Setup
Start Kafka with Docker:
```bash
# Pull and run Apache Kafka (runs in KRaft mode - no ZooKeeper needed)
docker pull apache/kafka:latest
docker run -p 9092:9092 apache/kafka:latest
```
Create a topic:
```bash
# Using Kafka CLI tools (if installed locally)
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic hello-world \
  --partitions 3 --replication-factor 1

# Using Docker
docker exec <container-id> /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create --topic hello-world \
  --partitions 3 --replication-factor 1
```
Produce messages:
```bash
bin/kafka-console-producer.sh --topic hello-world \
  --bootstrap-server localhost:9092
```
Consume messages:
```bash
bin/kafka-console-consumer.sh --topic hello-world \
  --from-beginning --bootstrap-server localhost:9092
```
2. Basic Producer (Java)
See scripts/simple_producer.java for a complete working example.
Key configuration:
```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");   // Wait for all in-sync replicas
props.put("retries", 3);    // Retry on transient errors

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
```
3. Basic Consumer (Java)
See scripts/simple_consumer.java for a complete working example.
Key configuration:
```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false"); // Manual offset management
props.put("auto.offset.reset", "earliest"); // Start from beginning if no offset

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
```
Advanced Patterns
Stream Processing with Kafka Streams
For stateful stream processing, windowing, joins, and aggregations, see scripts/word_count_streams.java for a working Kafka Streams example.
When to use Kafka Streams:
- Real-time aggregations and analytics
- Stream enrichment via joins (stream-stream, stream-table)
- Windowed computations (tumbling, hopping, sliding)
- Stateful transformations requiring local state stores
Data Integration with Kafka Connect
For integrating Kafka with databases, file systems, and external systems, see scripts/source_connector.java for a custom connector example.
When to use Kafka Connect:
- Import data from databases (CDC patterns)
- Export Kafka data to data warehouses
- File-based integrations
- Managed, scalable data pipelines without custom code
Transactions and Exactly-Once Semantics
For mission-critical applications requiring strong guarantees:
```java
// Producer configuration
props.put("enable.idempotence", true);
props.put("transactional.id", "my-transactional-producer-1");

producer.initTransactions();

try {
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // Fatal: another producer with the same transactional.id took over
    producer.close();
} catch (KafkaException e) {
    producer.abortTransaction();
}
```
See scripts/transactional_producer.java for complete implementation.
Production Deployment
Topic Configuration
Critical settings for production topics:
```bash
# Create topic with production settings (retention.ms=604800000 is 7 days)
kafka-topics.sh --bootstrap-server localhost:9092 --create \
  --topic production-events \
  --partitions 12 \
  --replication-factor 3 \
  --config min.insync.replicas=2 \
  --config retention.ms=604800000 \
  --config compression.type=lz4
```
Key parameters:
- `partitions` - Parallelism (more partitions = higher throughput)
- `replication-factor` - Data redundancy (typically 3)
- `min.insync.replicas` - Minimum in-sync replicas required for writes (typically 2)
- `retention.ms` - How long to keep data
- `compression.type` - Compression algorithm (`lz4`, `snappy`, `gzip`, `zstd`)
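The `retention.ms` value above is easy to misread; spelling out the arithmetic behind the 7-day figure (a sketch for verification, not Kafka API code):

```java
// Converts a retention period in days to the milliseconds value
// expected by the retention.ms topic config
public class RetentionCalc {
    static long days(long n) {
        return n * 24 * 60 * 60 * 1000; // hours, minutes, seconds, ms
    }

    public static void main(String[] args) {
        System.out.println(days(7)); // 604800000, as used above
    }
}
```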
Producer Configuration for Production
```java
props.put("acks", "all");                // Wait for all in-sync replicas
props.put("retries", Integer.MAX_VALUE); // Retry indefinitely
props.put("max.in.flight.requests.per.connection", 5);
props.put("enable.idempotence", true);   // Prevent duplicates
props.put("compression.type", "lz4");    // Compress messages
props.put("batch.size", 32768);          // Batch size in bytes
props.put("linger.ms", 10);              // Wait time for batching
props.put("buffer.memory", 67108864);    // 64MB buffer
```
Consumer Configuration for Production
```java
props.put("enable.auto.commit", false);     // Manual commit control
props.put("max.poll.records", 500);         // Records per poll
props.put("max.poll.interval.ms", 300000);  // 5 minutes
props.put("session.timeout.ms", 10000);     // 10 seconds
props.put("heartbeat.interval.ms", 3000);   // 3 seconds
props.put("fetch.min.bytes", 1024);         // Minimum fetch size
props.put("fetch.max.wait.ms", 500);        // Max wait for fetch
```
Monitoring and Observability
See OPERATIONS_GUIDE.md for comprehensive monitoring setup.
Key metrics to monitor:
- Producer: `record-send-rate`, `request-latency-avg`, `record-error-rate`
- Consumer: `records-consumed-rate`, `records-lag-max`, `commit-latency-avg`
- Broker: `under-replicated-partitions`, `offline-partitions-count`, `request-queue-size`
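Consumer lag (what `records-lag-max` reports) is simply the gap between a partition's log-end offset and the group's committed offset. A dependency-free sketch of the computation, useful when building custom lag checks:

```java
// Consumer lag for one partition:
// how many records the group still has to read
public class LagSketch {
    static long lag(long logEndOffset, long committedOffset) {
        // Clamp at zero: a committed offset can briefly exceed a
        // cached log-end offset in monitoring snapshots
        return Math.max(0, logEndOffset - committedOffset);
    }

    public static void main(String[] args) {
        System.out.println(lag(1_500, 1_200)); // 300 records behind
    }
}
```

In practice the two offsets come from `AdminClient.listOffsets` and `AdminClient.listConsumerGroupOffsets`; the arithmetic is the same.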
Security Configuration
For SSL/TLS and SASL authentication:
```java
// SSL configuration
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "password");
props.put("ssl.keystore.location", "/path/to/keystore.jks");
props.put("ssl.keystore.password", "password");

// SASL/PLAIN configuration
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username=\"admin\" password=\"secret\";");
```
Schema Management
Use a schema registry to enforce data contracts and manage schema evolution.
Schema Registry integration (when using Confluent Schema Registry):
```java
// Avro producer
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
```
Architecture Decision Guide
Choosing Number of Partitions
Factors to consider:
- Target throughput (more partitions = higher throughput)
- Number of consumers in consumer group (max parallelism)
- File handles per broker (more partitions = more file descriptors)
Rule of thumb: Start with max(target_throughput / partition_throughput, num_consumers)
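The rule of thumb above can be turned into a quick calculation. The throughput figures here are illustrative assumptions (per-partition throughput varies widely with hardware, message size, and replication), not Kafka guarantees:

```java
// Partition-count rule of thumb:
// max(target throughput / per-partition throughput, number of consumers)
public class PartitionSizing {
    static int suggestedPartitions(double targetMBs, double perPartitionMBs, int numConsumers) {
        int forThroughput = (int) Math.ceil(targetMBs / perPartitionMBs);
        // Never fewer partitions than consumers, or some consumers sit idle
        return Math.max(forThroughput, numConsumers);
    }

    public static void main(String[] args) {
        // Assumed: 100 MB/s target, ~10 MB/s per partition, 8 consumers
        System.out.println(suggestedPartitions(100, 10, 8)); // 10
    }
}
```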
Consumer Group vs. Independent Consumers
Use Consumer Groups when:
- You need load balancing across multiple instances
- Each message should be processed once by the group
- You want automatic partition rebalancing
Use Independent Consumers when:
- Each consumer needs to process all messages
- You need broadcast semantics
- You want manual partition assignment
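The load balancing that consumer groups provide comes from partition assignment. A simplified sketch of range-style assignment (Kafka's real assignors live in the `org.apache.kafka.clients.consumer` package and handle multiple topics and rebalances; this only shows how one topic's partitions divide among group members):

```java
import java.util.*;

public class RangeAssignSketch {
    // Assign partitions 0..numPartitions-1 to consumers in contiguous
    // ranges, the way Kafka's RangeAssignor does for a single topic
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int per = numPartitions / consumers.size();    // base partitions per consumer
        int extra = numPartitions % consumers.size();  // first `extra` consumers get one more
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = per + (i < extra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int j = 0; j < count; j++) parts.add(next++);
            result.put(consumers.get(i), parts);
        }
        return result;
    }

    public static void main(String[] args) {
        // 3 consumers over 7 partitions -> group sizes 3, 2, 2
        System.out.println(assign(List.of("c1", "c2", "c3"), 7));
    }
}
```

This also shows why a consumer group cannot usefully exceed the partition count: with more consumers than partitions, some members receive an empty assignment.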
Kafka Streams vs. Custom Consumer
Use Kafka Streams when:
- You need stateful processing (aggregations, joins, windowing)
- You want built-in fault tolerance and state management
- Processing logic is complex with multiple transformations
Use Custom Consumer when:
- Simple message-by-message processing
- Integration with non-Kafka systems
- Fine-grained control over consumption logic
Error Handling Patterns
Producer Error Handling
```java
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        if (exception instanceof RetriableException) {
            // Automatic retry by producer
            logger.warn("Retriable error: {}", exception.getMessage());
        } else {
            // Non-retriable - handle explicitly
            logger.error("Failed to send: {}", exception.getMessage());
            // Dead letter queue, alert, etc.
        }
    } else {
        logger.info("Sent to partition {} offset {}",
            metadata.partition(), metadata.offset());
    }
});
```
Consumer Error Handling
```java
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, String> record : records) {
        try {
            processRecord(record);
        } catch (TransientException e) {
            // Retry with backoff
            retryWithBackoff(record);
        } catch (PermanentException e) {
            // Send to DLQ
            sendToDeadLetterQueue(record, e);
        }
    }

    consumer.commitSync(); // Commit after successful processing
}
```
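The `retryWithBackoff` helper above is left abstract; a common choice is exponential backoff with a cap. A sketch of the delay schedule (the method name and parameters are illustrative, not part of any Kafka API):

```java
// Exponential backoff with a ceiling, as commonly used when
// retrying transient consumer-side failures
public class BackoffSketch {
    // Delay before attempt n (0-based): base * 2^n, capped at maxMs
    static long backoffMs(int attempt, long baseMs, long maxMs) {
        long delay = baseMs * (1L << Math.min(attempt, 30)); // clamp shift to avoid overflow
        return Math.min(delay, maxMs);
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            System.out.println(backoffMs(i, 100, 5_000)); // 100, 200, 400, 800, 1600
        }
    }
}
```

Keep the total retry time well under `max.poll.interval.ms`, or the broker will consider the consumer dead and trigger a rebalance mid-retry.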
Multi-Language Support
Python
See scripts/python_producer.py and scripts/python_consumer.py for working examples using confluent-kafka-python.
Quick start:
```python
from confluent_kafka import Producer, Consumer

# Producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('topic', key='key', value='value')
producer.flush()

# Consumer
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['topic'])

while True:
    msg = consumer.poll(1.0)
    if msg and not msg.error():
        print(f'Received: {msg.value().decode("utf-8")}')
```
Node.js
See scripts/nodejs_producer.js and scripts/nodejs_consumer.js for working examples using kafkajs.
Quick start:
```javascript
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ brokers: ['localhost:9092'] });

// Producer
const producer = kafka.producer();
await producer.connect();
await producer.send({
  topic: 'topic',
  messages: [{ key: 'key', value: 'value' }]
});

// Consumer
const consumer = kafka.consumer({ groupId: 'my-group' });
await consumer.connect();
await consumer.subscribe({ topic: 'topic' });
await consumer.run({
  eachMessage: async ({ message }) => {
    console.log(message.value.toString());
  }
});
```
Common Patterns and Anti-Patterns
✅ Best Practices
- Always set `acks=all` for producers in production
- Use consumer groups for scalable consumption
- Disable auto-commit for at-least-once or exactly-once semantics
- Set appropriate `retention.ms` based on replay requirements
- Monitor consumer lag to detect processing bottlenecks
- Use compression (`lz4` or `snappy`) to reduce network bandwidth
- Configure `min.insync.replicas=2` with `replication-factor=3`
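The last bullet has a concrete consequence worth spelling out: with `replication-factor=3` and `min.insync.replicas=2`, writes using `acks=all` keep succeeding as long as at most one replica is down. The arithmetic, as a sketch:

```java
// How many broker failures a topic tolerates while still
// accepting acks=all writes
public class DurabilitySketch {
    static int tolerableWriteFailures(int replicationFactor, int minInsyncReplicas) {
        return replicationFactor - minInsyncReplicas;
    }

    public static void main(String[] args) {
        // RF=3, min ISR=2 -> survives 1 broker failure for writes
        System.out.println(tolerableWriteFailures(3, 2));
    }
}
```

Setting `min.insync.replicas` equal to the replication factor would make a single broker outage block all `acks=all` writes, which is why the 3/2 combination is the usual recommendation.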
❌ Anti-Patterns
- Creating too many topics - Use partitions for parallelism, not topics
- Small batch sizes - Reduces throughput; increase `batch.size` and `linger.ms`
- Synchronous sends without batching - Kills performance
- Auto-commit without error handling - Can lead to message loss
- Not monitoring consumer lag - Silent degradation
- Single partition topics - No parallelism or load balancing
- Hardcoding broker addresses - Use DNS or service discovery
Troubleshooting
Producer Issues
Slow throughput:
- Increase `batch.size` and `linger.ms`
- Enable compression
- Check network latency to brokers
- Increase `buffer.memory`
Messages not arriving:
- Check producer error callbacks
- Verify topic exists and is accessible
- Check broker logs for errors
- Verify the `acks` configuration
Consumer Issues
High lag:
- Increase consumer instances (up to partition count)
- Optimize processing logic
- Increase `max.poll.records` if processing is fast
- Check for rebalancing issues
Rebalancing storms:
- Increase `max.poll.interval.ms`
- Reduce processing time per batch
- Decrease `max.poll.records`
- Check session timeout settings
Reference Files
For detailed implementation guidance, see the scripts below.
Script Directory
All scripts are tested and ready to use:
- `simple_producer.java` - Basic producer with error handling
- `simple_consumer.java` - Basic consumer with manual commit
- `transactional_producer.java` - Exactly-once semantics
- `word_count_streams.java` - Kafka Streams word count
- `source_connector.java` - Custom Kafka Connect source connector
- `python_producer.py` - Python producer example
- `python_consumer.py` - Python consumer example
- `nodejs_producer.js` - Node.js producer example
- `nodejs_consumer.js` - Node.js consumer example