Kafka e Event-Driven Architecture
Event-Driven Architecture
Eventos vs Comandos vs Queries
Antes de qualquer coisa, é preciso distinguir três conceitos que frequentemente são confundidos. Essa distinção não é acadêmica — ela determina como você modela a comunicação entre serviços.
EVENTO COMANDO QUERY
────────────────────────────── ────────────────────────────── ──────────────────────────────
"Algo aconteceu" "Faça algo" "Me diga algo"
Passado (imutável) Imperativo (pode ser rejeitado) Interrogativo (sem side effects)
OrderPlaced PlaceOrder GetOrderById
Broadcast (0..N consumers) Endereçado (1 handler) Endereçado (1 handler)
Publisher não sabe quem consome Handler pode rejeitar Handler retorna dados
Sem expectativa de resposta Espera confirmação ou erro Espera resposta com dados
Exemplo concreto:
Comando: PlaceOrder { userId: "u1", items: [...], total: 250.00 }
Evento: OrderPlaced { orderId: "o1", userId: "u1", total: 250.00, at: "2025-01-15T10:30:00Z" }
Query: GetOrder { orderId: "o1" } → { orderId: "o1", status: "placed", ... }
Um evento é um fato imutável sobre algo que já aconteceu. Um comando é uma intenção que pode ser aceita ou rejeitada. Uma query é uma leitura sem side effects. Essa tríade é a base de CQRS e Event Sourcing.
Tipos de Event-Driven Architecture
Martin Fowler descreve três padrões distintos de uso de eventos. Cada um tem trade-offs diferentes.
1. EVENT NOTIFICATION
─────────────────────────────────────────────────────────────
Evento carrega MÍNIMO de dados — apenas o que mudou e um ID.
O consumer precisa chamar o producer de volta para obter detalhes.
OrderService ──► { event: "OrderPlaced", orderId: "o1" } ──► ShippingService
│
◄── GET /orders/o1 ────────────────────────────────┘
✓ Acoplamento mínimo no payload
✗ Acoplamento temporal: consumer depende do producer estar online
✗ Mais latência (ida + volta)
2. EVENT-CARRIED STATE TRANSFER
─────────────────────────────────────────────────────────────
Evento carrega TODOS os dados necessários. Consumer não precisa
chamar ninguém de volta — é autossuficiente.
OrderService ──► { event: "OrderPlaced", orderId: "o1",
userId: "u1", items: [...], total: 250.00,
shippingAddress: {...} } ──► ShippingService
✓ Consumer totalmente desacoplado (temporal e espacialmente)
✓ Menor latência (sem round-trip)
✗ Payload grande, possível duplicação de dados
✗ Consumer pode ficar com dados stale
3. EVENT SOURCING
─────────────────────────────────────────────────────────────
Eventos são a FONTE PRIMÁRIA de verdade. Estado é derivado
fazendo replay de todos os eventos.
[OrderCreated] → [ItemAdded] → [ItemAdded] → [OrderPaid] → [OrderShipped]
│ │ │ │ │
└───────────────┴──────────────┴──────────────┴──────────────┘
Estado atual = f(eventos)
✓ Auditoria completa, time-travel debugging, replay
✗ Complexidade significativa, event versioning, eventual consistency
Arquitetura Event-Driven vs Request-Response
REQUEST-RESPONSE (síncrono)
═══════════════════════════════════════════════════════════════
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ Client │────►│ Order │────►│ Payment │────►│ Shipping │
│ │◄────│ Service │◄────│ Service │◄────│ Service │
└──────────┘ └──────────┘ └──────────┘ └──────────┘
│
├────────────────────────────►┌──────────┐
│◄────────────────────────────│Inventory │
│ └──────────┘
• Latência = soma de todas as chamadas (serial)
• Uma falha propaga em cascata
• Forte acoplamento temporal
EVENT-DRIVEN (assíncrono)
═══════════════════════════════════════════════════════════════
┌──────────┐ ┌──────────┐
│ Client │────►│ Order │──publish──►┌─────────────────────┐
│ │◄────│ Service │ │ │
└──────────┘ └──────────┘ │ Event Backbone │
(202 Accepted) │ (Kafka / NATS) │
│ │
└──┬──────┬──────┬───┘
│ │ │
▼ ▼ ▼
┌────────┐ ┌────┐ ┌─────────┐
│Payment │ │Inv.│ │Shipping │
│Worker │ │Wkr │ │Worker │
└────────┘ └────┘ └─────────┘
• Latência do client = apenas o tempo do Order Service
• Cada worker escala independentemente
• Falha de um não afeta os outros
• Trade-off: eventual consistency, complexidade operacional
Vantagens: loose coupling, temporal decoupling, scalability horizontal independente por consumer, absorção natural de picos (backpressure via partitions/queues), auditoria nativa se eventos são persistidos.
Desvantagens: eventual consistency (read-your-own-writes é difícil), debugging distribuído é complexo (precisa de correlation IDs + distributed tracing), ordem de eventos pode ser desafiadora, infraestrutura de mensageria é mais um componente crítico para operar.
Kafka Internals
Arquitetura: Brokers, Topics, Partitions, Segments
Kafka não é uma message queue. É um distributed commit log — um log append-only, durável, particionado e replicado. Essa distinção é fundamental.
KAFKA CLUSTER — TOPOLOGIA
═══════════════════════════════════════════════════════════════════════
┌─────────────────────────────────────────────────────────────────┐
│ KAFKA CLUSTER │
│ │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │
│ │ Broker 0 │ │ Broker 1 │ │ Broker 2 │ │
│ │ │ │ │ │ │ │
│ │ orders-p0 (L)│ │ orders-p0 (F)│ │ orders-p0 (F)│ │
│ │ orders-p1 (F)│ │ orders-p1 (L)│ │ orders-p1 (F)│ │
│ │ orders-p2 (F)│ │ orders-p2 (F)│ │ orders-p2 (L)│ │
│ │ │ │ │ │ │ │
│ │ payments-p0(L)│ │ payments-p1(L)│ │ │ │
│ │ payments-p1(F)│ │ payments-p0(F)│ │ │ │
│ └───────────────┘ └───────────────┘ └───────────────┘ │
│ │
│ (L) = Leader (F) = Follower │
│ Cada partition tem exatamente 1 leader e N-1 followers │
│ Reads e writes vão APENAS para o leader (por padrão) │
└─────────────────────────────────────────────────────────────────┘
Producers ──► Leaders (writes)
Consumers ──► Leaders (reads) — ou followers com rack-aware fetch (KIP-392)
Broker: um processo Kafka rodando em uma máquina. Um cluster tem N brokers. Cada broker armazena um subconjunto das partitions.
Topic: categoria lógica de eventos (ex: orders, payments, user-events). É apenas um nome — os dados vivem nas partitions.
Partition: unidade fundamental de paralelismo. É um log append-only, ordenado, imutável. Cada mensagem dentro de uma partition tem um offset sequencial monotonicamente crescente.
Partitions e Offsets
TOPIC "orders" — 3 PARTITIONS
═══════════════════════════════════════════════════════════════
Partition 0: [0] [1] [2] [3] [4] [5] [6] [7] ──► writes (append-only)
│ │
oldest newest (high watermark)
Partition 1: [0] [1] [2] [3] [4] [5] ──► writes
▲
│
consumer offset (grupo "payment-service")
Partition 2: [0] [1] [2] [3] [4] [5] [6] [7] [8] [9] ──► writes
CONSUMER GROUPS
─────────────────────────────────────────────────────────
Group "payment-service":
Consumer A ←── Partition 0, Partition 1
Consumer B ←── Partition 2
Group "analytics-service":
Consumer C ←── Partition 0
Consumer D ←── Partition 1
Consumer E ←── Partition 2
• Cada partition é consumida por EXATAMENTE 1 consumer dentro de um grupo
• Consumers > Partitions = consumers ociosos (nunca escale além do nº de partitions)
• Grupos diferentes são independentes — cada um mantém seus próprios offsets
Cada consumer group rastreia seu progresso por partition via offsets armazenados no topic interno __consumer_offsets. Isso permite que cada grupo leia no seu próprio ritmo, e permite replay — basta resetar o offset para o início.
Segments: Como Dados São Armazenados em Disco
Uma partition não é um único arquivo. Ela é dividida em segments — arquivos físicos no disco.
PARTITION 0 NO DISCO
═══════════════════════════════════════════════════════════════
/var/kafka-logs/orders-0/
├── 00000000000000000000.log ← segment file (offsets 0-999)
├── 00000000000000000000.index ← sparse index (offset → posição no .log)
├── 00000000000000000000.timeindex ← timestamp index
├── 00000000000000001000.log ← segment file (offsets 1000-1999)
├── 00000000000000001000.index
├── 00000000000000001000.timeindex
└── 00000000000000002000.log ← ACTIVE segment (writes aqui)
• segment.bytes (default 1GB): tamanho máximo de cada segment
• O segment ativo recebe writes. Quando atinge o limite, fecha e um novo abre.
• Segments antigos são deletados por retention policy (time ou size)
• Index é sparse: aponta para cada N-ésimo offset (não para todos)
→ Para encontrar offset 1050: busca no index o entry mais próximo ≤ 1050,
depois faz sequential scan no .log a partir dali
Log Compaction
Diferente de retenção por tempo, log compaction retém apenas o último valor por key. Ideal para changelogs e snapshots de estado.
LOG COMPACTION
═══════════════════════════════════════════════════════════════
ANTES (log cru):
offset key value
0 user-1 { name: "Alice", email: "a@x.com" }
1 user-2 { name: "Bob", email: "b@x.com" }
2 user-1 { name: "Alice", email: "alice@new.com" } ← atualização
3 user-3 { name: "Charlie", email: "c@x.com" }
4 user-2 null ← tombstone (delete)
5 user-1 { name: "Alice Z.", email: "alice@new.com" } ← atualização
DEPOIS (compactado):
offset key value
3 user-3 { name: "Charlie", email: "c@x.com" }
4 user-2 null ← tombstone retido temporariamente
5 user-1 { name: "Alice Z.", email: "alice@new.com" }
• Apenas o ÚLTIMO valor por key sobrevive
• Tombstones (value=null) são retidos por delete.retention.ms, depois removidos
• Ideal para: __consumer_offsets, KTable changelogs, CDC snapshots
• cleanup.policy=compact (ou compact,delete para combinar ambos)
Replication: Leader, Followers e ISR
REPLICAÇÃO — PARTITION "orders-0" COM replication.factor=3
═══════════════════════════════════════════════════════════════
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Broker 0 │ │ Broker 1 │ │ Broker 2 │
│ │ │ │ │ │
│ orders-0 │ │ orders-0 │ │ orders-0 │
│ [LEADER] │────►│ [FOLLOWER] │ │ [FOLLOWER] │
│ │ │ (ISR) │ │ (ISR) │
│ offset: 50 │ │ offset: 50 │ │ offset: 48 │ ← lag!
└──────────────┘ └──────────────┘ └──────────────┘
▲ ▲ ▲
│ │ │
writes fetch from fetch from
+ reads leader leader
ISR (In-Sync Replicas) = { Broker 0, Broker 1 }
Broker 2 caiu do ISR (lag > replica.lag.time.max.ms = 30s)
min.insync.replicas=2:
Se ISR < 2, o leader RECUSA writes (NotEnoughReplicasException)
Garante que pelo menos 2 cópias existem antes de ACK
Se o Leader cai:
Novo leader eleito APENAS entre ISR members
unclean.leader.election.enable=false (default) → sem eleição de follower out-of-sync
unclean.leader.election.enable=true → aceita perda de dados para priorizar disponibilidade
A combinação recomendada para durabilidade em produção: replication.factor=3, min.insync.replicas=2, acks=all no producer. Isso garante que toda mensagem escrita é confirmada por pelo menos 2 réplicas antes do ACK.
KRaft: Sem ZooKeeper
A partir do Kafka 3.3+, o KRaft mode (Kafka Raft) substitui o ZooKeeper para metadata management. O controller é agora um quorum interno de brokers.
KRAFT MODE
═══════════════════════════════════════════════════════════════
ANTES (com ZooKeeper): DEPOIS (KRaft):
┌──────────┐ ┌──────────┐ ┌──────────────────────┐
│ ZooKeeper│◄───►│ Broker │ │ Broker (Controller) │
│ Ensemble │ │ │ │ Quorum via Raft │
│ (3 nodes)│ │ │ │ │
└──────────┘ └──────────┘ └──────────────────────┘
• Metadata agora armazenado em um topic interno: __cluster_metadata
• Controller quorum: 3 brokers votam (Raft consensus)
• Vantagem: menos componentes, startup mais rápido, sem split-brain do ZK
• ZooKeeper mode deprecated desde Kafka 3.5, removido no 4.0
Producer
Anatomia de uma Mensagem Kafka
KAFKA RECORD (ProducerRecord)
═══════════════════════════════════════════════════════════════
┌──────────────────────────────────────────────────────┐
│ Key (opcional) │ Value │ Headers │
│ bytes[] │ bytes[] │ key-value │
│ ex: "order-123" │ ex: JSON payload │ metadata │
├────────────────────┴─────────────────────┴───────────┤
│ Topic: "orders" │
│ Partition: determinado pela key (ou round-robin) │
│ Timestamp: create time ou log append time │
└───────────────────────────────────────────────────────┘
Key determina a partition:
hash(key) % numPartitions → partition number
Mesma key → sempre mesma partition → ordenação garantida por key
Key null → round-robin entre partitions
Partitioning Strategy
ESTRATÉGIAS DE PARTICIONAMENTO
═══════════════════════════════════════════════════════════════
KEY-BASED (default):
hash(key) % numPartitions
✓ Garante ordenação por key (ex: todos os eventos de um orderId na mesma partition)
✗ Hot partition se keys não são uniformemente distribuídas
ROUND-ROBIN (key = null):
Distribui uniformemente entre partitions
✓ Máximo throughput, distribuição uniforme
✗ Sem garantia de ordenação
CUSTOM PARTITIONER:
Implementa lógica customizada (ex: por região geográfica, por tenant)
✓ Controle total
✗ Complexidade extra, risco de skew
ACKs Configuration
acks=0 FIRE AND FORGET
Producer envia e não espera confirmação.
Máximo throughput. Perda de mensagens aceita.
Use case: métricas, logs não críticos.
acks=1 LEADER ACKNOWLEDGED
Producer espera ACK do leader.
Se leader cai ANTES de replicar → mensagem perdida.
Bom equilíbrio throughput/durabilidade para maioria dos casos.
acks=all ALL ISR ACKNOWLEDGED (acks=-1)
Producer espera ACK de TODOS os ISR.
Combinado com min.insync.replicas=2 → máxima durabilidade.
Mais latência, menos throughput.
Use case: transações financeiras, eventos críticos.
Idempotent Producer
Sem idempotência, retries podem causar mensagens duplicadas. O idempotent producer resolve isso com producer ID e sequence numbers.
IDEMPOTENT PRODUCER
═══════════════════════════════════════════════════════════════
SEM idempotência:
Producer ──► msg(seq=1) ──► Broker (write OK)
Producer ──► msg(seq=1) ──► Broker (timeout, mas write OK)
Producer ──► msg(seq=1) ──► Broker (retry → DUPLICATA!)
COM idempotência (enable.idempotence=true):
Producer(PID=42) ──► msg(seq=1) ──► Broker (write OK, registra PID+seq)
Producer(PID=42) ──► msg(seq=1) ──► Broker (retry → detecta duplicata → ignora, retorna ACK)
O Broker mantém: { PID: 42, partition: 0, lastSeq: 1 }
Se recebe seq ≤ lastSeq → duplicata → ignora
Se recebe seq > lastSeq + 1 → out of order → rejeita (OutOfOrderSequenceException)
Producer: Código TypeScript com KafkaJS
// producer.ts — Kafka producer com KafkaJS
import { Kafka, CompressionTypes, logLevel } from 'kafkajs';
const kafka = new Kafka({
clientId: 'order-service',
brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
logLevel: logLevel.WARN,
retry: {
initialRetryTime: 100, // ms antes do primeiro retry
retries: 8, // número máximo de retries
maxRetryTime: 30000, // máximo backoff entre retries
factor: 2, // multiplicador exponencial
},
});
const producer = kafka.producer({
allowAutoTopicCreation: false, // nunca criar topics automaticamente em produção
idempotent: true, // enable.idempotence — elimina duplicatas
maxInFlightRequests: 5, // com idempotência, até 5 é seguro (ordering mantido)
transactionalId: 'order-tx-producer', // necessário para transações
});
interface OrderEvent {
orderId: string;
userId: string;
items: Array<{ productId: string; quantity: number; price: number }>;
total: number;
occurredAt: string;
}
async function publishOrderPlaced(event: OrderEvent): Promise<void> {
await producer.connect();
// Mensagem com key = orderId → garante que todos os eventos do mesmo
// pedido vão para a mesma partition → ordenação por pedido
const result = await producer.send({
topic: 'orders',
compression: CompressionTypes.LZ4, // LZ4: melhor trade-off velocidade/compressão
acks: -1, // acks=all
timeout: 30000, // timeout em ms
messages: [
{
key: event.orderId, // partition key
value: JSON.stringify(event),
headers: {
'event-type': 'OrderPlaced',
'schema-version': '2',
'correlation-id': crypto.randomUUID(),
'source-service': 'order-service',
},
timestamp: Date.now().toString(),
},
],
});
// result: [{ topicName, partition, errorCode, baseOffset, logAppendTime, logStartOffset }]
console.log(`Published to partition ${result[0].partition}, offset ${result[0].baseOffset}`);
}
// ─── TRANSACTIONAL PRODUCER ─────────────────────────────────────────
// Escreve em múltiplos topics atomicamente (tudo ou nada)
async function placeOrderTransaction(
orderEvent: OrderEvent,
inventoryReservation: { productId: string; quantity: number }
): Promise<void> {
const transaction = await producer.transaction();
try {
await transaction.send({
topic: 'orders',
messages: [{ key: orderEvent.orderId, value: JSON.stringify(orderEvent) }],
});
await transaction.send({
topic: 'inventory-reservations',
messages: [{
key: inventoryReservation.productId,
value: JSON.stringify(inventoryReservation),
}],
});
// Commit: ambas as mensagens ficam visíveis atomicamente
await transaction.commit();
} catch (error) {
// Abort: nenhuma mensagem fica visível
await transaction.abort();
throw error;
}
}
Consumer
Consumer Groups e Rebalancing
Quando um consumer entra ou sai de um grupo, ou quando partitions são adicionadas, ocorre um rebalance — redistribuição de partitions entre consumers.
REBALANCING PROTOCOLS
═══════════════════════════════════════════════════════════════
EAGER REBALANCE (legado):
1. TODOS os consumers param de consumir
2. TODOS revogam TODAS as partitions
3. Coordinator redistribui
4. Todos recomeçam
✗ Stop-the-world: downtime total durante rebalance
✗ Pode levar segundos em clusters grandes
COOPERATIVE-STICKY REBALANCE (padrão moderno):
1. Coordinator calcula o diff (quais partitions precisam mover)
2. APENAS as partitions afetadas são revogadas
3. Consumers que mantêm suas partitions NÃO param
✓ Incremental: maioria dos consumers não é afetada
✓ Muito mais rápido e menos disruptivo
partition.assignment.strategy=CooperativeStickyAssignor
Offset Management
AUTO-COMMIT (enable.auto.commit=true, auto.commit.interval.ms=5000)
Consumer commita offsets automaticamente a cada 5s.
Se crashar entre auto-commits → reprocessa mensagens → AT-LEAST-ONCE.
Se processar e commitar antes de completar → perde mensagens → AT-MOST-ONCE.
⚠ Não recomendado para processamento crítico.
MANUAL COMMIT — DUAS VARIANTES:
commitSync(): Bloqueia até o broker confirmar o commit.
Mais lento, mas garante que offset está salvo.
commitAsync(): Non-blocking. Callback para sucesso/falha.
Mais rápido, mas retry em caso de falha é perigoso
(pode commitar offset antigo DEPOIS de um mais recente).
Padrão recomendado: commitAsync() no loop normal, commitSync() no shutdown.
Delivery Semantics
AT-MOST-ONCE:
Commit offset ANTES de processar.
Se crashar durante processamento → mensagem perdida.
Use case: dados descartáveis.
AT-LEAST-ONCE:
Processa ANTES de commitar offset.
Se crashar depois de processar mas antes de commitar → reprocessa.
Requer IDEMPOTÊNCIA no consumer.
Use case: maioria dos sistemas (com idempotent handler).
EXACTLY-ONCE (end-to-end):
Requer TODOS os três:
1. Idempotent producer (enable.idempotence=true)
2. Transactional API (begin/commit/abort)
3. Consumer com isolation.level=read_committed
O consumer só vê mensagens de transações COMMITTED.
Mensagens de transações abortadas são invisíveis.
⚠ Exactly-once só é nativo dentro do ecossistema Kafka (Kafka → Kafka).
Para Kafka → sistema externo, você PRECISA de idempotência no consumer.
Consumer: Código TypeScript com KafkaJS
// consumer.ts — Kafka consumer com manual commit e error handling
import { Kafka, EachMessagePayload, logLevel } from 'kafkajs';
const kafka = new Kafka({
clientId: 'payment-service',
brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
logLevel: logLevel.WARN,
});
const consumer = kafka.consumer({
groupId: 'payment-service-group',
sessionTimeout: 30000, // tempo sem heartbeat antes de rebalance
heartbeatInterval: 3000, // intervalo de heartbeat (≤ sessionTimeout / 3)
maxWaitTimeInMs: 500, // long-polling: espera até 500ms por novas mensagens
maxBytesPerPartition: 1048576, // 1MB max por partition por fetch
retry: { retries: 5 },
});
// ─── PROCESSAMENTO COM IDEMPOTÊNCIA ─────────────────────────────────
const processedEvents = new Set<string>(); // Em produção: Redis ou tabela no banco
async function handleOrderPlaced(payload: EachMessagePayload): Promise<void> {
const { topic, partition, message, heartbeat } = payload;
const eventId = message.headers?.['correlation-id']?.toString();
if (!eventId) throw new Error('Missing correlation-id header');
// Idempotência: verificar se já processou este evento
if (processedEvents.has(eventId)) {
console.log(`Duplicate event ${eventId}, skipping`);
return;
}
const event = JSON.parse(message.value!.toString());
// Processar pagamento
await processPayment(event);
// Marcar como processado (em produção: dentro da mesma transação do banco)
processedEvents.add(eventId);
console.log(
`Processed: topic=${topic}, partition=${partition}, ` +
`offset=${message.offset}, key=${message.key?.toString()}`
);
// Heartbeat manual em processamentos longos (evita session timeout)
await heartbeat();
}
// ─── DEAD LETTER QUEUE (DLQ) ────────────────────────────────────────
const dlqProducer = kafka.producer();
async function sendToDLQ(
originalMessage: EachMessagePayload,
error: Error
): Promise<void> {
await dlqProducer.send({
topic: 'orders.dlq',
messages: [{
key: originalMessage.message.key,
value: originalMessage.message.value,
headers: {
...originalMessage.message.headers,
'dlq-reason': error.message,
'dlq-original-topic': originalMessage.topic,
'dlq-original-partition': originalMessage.partition.toString(),
'dlq-original-offset': originalMessage.message.offset,
'dlq-timestamp': Date.now().toString(),
},
}],
});
}
// ─── RETRY COM BACKOFF + DLQ ────────────────────────────────────────
async function processWithRetry(
payload: EachMessagePayload,
maxRetries = 3
): Promise<void> {
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
await handleOrderPlaced(payload);
return; // sucesso
} catch (error) {
if (attempt === maxRetries) {
console.error(`Failed after ${maxRetries} attempts, sending to DLQ`);
await sendToDLQ(payload, error as Error);
return; // não relança — mensagem vai para DLQ, consumer avança
}
// Backoff exponencial: 1s, 2s, 4s
const delay = Math.pow(2, attempt - 1) * 1000;
await new Promise((resolve) => setTimeout(resolve, delay));
}
}
}
// ─── INICIALIZAÇÃO ──────────────────────────────────────────────────
async function startConsumer(): Promise<void> {
await consumer.connect();
await dlqProducer.connect();
await consumer.subscribe({
topics: ['orders'],
fromBeginning: false, // começa do último offset commitado (ou latest se novo grupo)
});
await consumer.run({
autoCommit: false, // manual commit!
eachMessage: async (payload) => {
await processWithRetry(payload);
// Commit APÓS processamento bem-sucedido (ou envio para DLQ)
await consumer.commitOffsets([{
topic: payload.topic,
partition: payload.partition,
offset: (BigInt(payload.message.offset) + 1n).toString(),
}]);
},
});
}
// Graceful shutdown
async function shutdown(): Promise<void> {
await consumer.disconnect();
await dlqProducer.disconnect();
}
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
Kafka Streams e Stream Processing
KTable vs KStream
Kafka Streams opera sobre dois conceitos fundamentais: KStream (fluxo de eventos) e KTable (tabela materializada, changelog).
KStream — FLUXO DE EVENTOS (append-only)
═══════════════════════════════════════════════════════════════
Cada record é um evento independente.
Inserir "Alice comprou X" e depois "Alice comprou Y" = 2 eventos.
key=alice, value=comprou-X (t1)
key=alice, value=comprou-Y (t2)
→ KStream vê: [comprou-X, comprou-Y] — ambos existem
KTable — TABELA MATERIALIZADA (changelog)
═══════════════════════════════════════════════════════════════
Cada record é um upsert. Última value por key ganha.
Inserir "Alice saldo=100" e depois "Alice saldo=200" = saldo é 200.
key=alice, value=100 (t1)
key=alice, value=200 (t2)
→ KTable vê: { alice: 200 } — apenas o último valor
Internamente backed por log compaction.
Windowing
WINDOWING EM KAFKA STREAMS
═══════════════════════════════════════════════════════════════
TUMBLING WINDOW (janelas fixas, sem sobreposição)
────────────────────────────────────────────
| Window 1 | Window 2 | Window 3 |
| [e1,e2,e3] | [e4,e5] | [e6,e7,e8] |
0s 10s 20s 30s
HOPPING WINDOW (janelas fixas, COM sobreposição)
────────────────────────────────────────────
| Window 1 (0-10s) |
| Window 2 (5-15s) |
| Window 3 (10-20s) |
size=10s, advance=5s → sobreposição de 5s
SLIDING WINDOW (janela que desliza com cada evento)
────────────────────────────────────────────
Diferença máxima entre timestamps de dois eventos na mesma janela.
Sem alinhamento com o relógio — definido pela presença de eventos.
SESSION WINDOW (gap-based)
────────────────────────────────────────────
|e1 e2 e3| gap > 5min |e4 e5| gap > 5min |e6|
|session-1| |session-2| |session-3|
Sessão encerra quando não há eventos por inactivityGap.
Joins e State Stores
Joins: KStream-KStream (windowed, ambos lados são eventos), KStream-KTable (enriquecimento — join do stream com tabela de referência), KTable-KTable (join materializado, ambos lados são changelogs).
State stores: Kafka Streams mantém estado local usando RocksDB. Para fault tolerance, o state store é backed por um changelog topic — se o processo cai, o novo consumer reconstrói o estado a partir do changelog.
Alternativas: Apache Flink (processamento stateful distribuído de grande escala, exatamente-once nativo, SQL support), Spark Streaming (micro-batching, integração com ecossistema Spark). Flink é o padrão da indústria para stream processing complexo; Kafka Streams é ideal quando o processamento é acoplado a uma aplicação Kafka-centric.
Schema Registry
Por Que Schemas Importam
Sem schema enforcement, qualquer producer pode publicar qualquer JSON. O consumer descobre que o schema mudou apenas quando quebra em produção. Schema Registry resolve isso com contratos formais.
SEM SCHEMA REGISTRY:
Producer v1: { "orderId": "o1", "total": 100 }
Producer v2: { "order_id": "o1", "amount": 100 } ← campo renomeado!
Consumer: JSON.parse(msg).orderId → undefined 💥
COM SCHEMA REGISTRY:
Producer v1: registra schema v1 no Registry
Producer v2: tenta registrar schema v2 → REJEITADO (breaking change!)
Consumer: confia que o schema é compatível → sem surpresas
Avro e Compatibility Modes
COMPATIBILITY MODES
═══════════════════════════════════════════════════════════════
BACKWARD (default):
Consumer novo pode ler dados escritos pelo producer antigo.
Pode: adicionar campos com default, remover campos.
Não pode: adicionar campos sem default, mudar tipo.
FORWARD:
Consumer antigo pode ler dados escritos pelo producer novo.
Pode: remover campos, adicionar campos com default.
Oposto do BACKWARD.
FULL:
BACKWARD + FORWARD. Ambas as direções.
Mais restritivo. Mais seguro.
NONE:
Sem verificação. Qualquer mudança aceita.
⚠ Apenas para desenvolvimento.
AVRO SCHEMA EXAMPLE:
{
"type": "record",
"name": "OrderPlaced",
"namespace": "com.brewnary.events",
"fields": [
{ "name": "orderId", "type": "string" },
{ "name": "userId", "type": "string" },
{ "name": "total", "type": "double" },
{ "name": "currency", "type": "string", "default": "BRL" }, ← campo com default (backward-compatible)
{ "name": "occurredAt", "type": "long", "logicalType": "timestamp-millis" }
]
}
Alternativa: Protobuf com Schema Registry (suporte nativo no Confluent Schema Registry). Protobuf é mais eficiente em tamanho e mais common em ecossistemas gRPC. Avro é mais natural em ecossistemas Kafka/Hadoop.
Event Sourcing
Conceito: Estado = Replay de Todos os Eventos
Em vez de armazenar o estado atual de uma entidade (linha no banco), Event Sourcing armazena a sequência completa de eventos que levaram ao estado atual. O estado é derivado fazendo replay (fold) dos eventos.
TRADITIONAL CRUD EVENT SOURCING
═══════════════════════════════ ═══════════════════════════════
orders table: order_events stream:
┌────────┬────────┬────────┐ ┌───┬──────────────┬──────────────┐
│order_id│ status │ total │ │seq│ event_type │ data │
├────────┼────────┼────────┤ ├───┼──────────────┼──────────────┤
│ o1 │shipped │ 250.00 │ │ 1 │OrderCreated │{total:200} │
└────────┴────────┴────────┘ │ 2 │ItemAdded │{+50.00} │
│ 3 │OrderPaid │{method:pix} │
Você sabe o estado atual, │ 4 │OrderShipped │{tracking:X} │
mas NÃO sabe como chegou lá. └───┴──────────────┴──────────────┘
Estado atual = fold(eventos)
Você sabe EXATAMENTE como chegou lá.
Pode reconstruir estado em qualquer ponto no tempo.
Event Store, Snapshots e Projections
Event Store: banco append-only onde cada stream (aggregate) tem sua sequência de eventos. Em produção: EventStoreDB, ou uma tabela Postgres com constraints de append-only.
Snapshots: quando um aggregate tem milhares de eventos, replay fica lento. Um snapshot salva o estado materializado periodicamente. O replay começa do snapshot mais recente.
SNAPSHOTS
═══════════════════════════════════════════════════════════════
Sem snapshot: replay de 10.000 eventos para reconstruir estado → lento
Com snapshot: snapshot no evento 9.900 + replay dos últimos 100 → rápido
[e1] [e2] ... [e9900] [SNAPSHOT] [e9901] ... [e10000]
▲
│
Estado materializado até aqui.
Replay começa daqui.
Projections: materializam read models a partir do event stream. São consumers que leem eventos e escrevem em tabelas denormalizadas, otimizadas para queries específicas.
Implementação Prática: Event Sourcing em TypeScript
// event-sourcing.ts — Implementação simplificada de Event Sourcing
// ─── DOMAIN EVENTS ──────────────────────────────────────────────────
type OrderEvent =
| { type: 'OrderCreated'; orderId: string; userId: string; createdAt: string }
| { type: 'ItemAdded'; orderId: string; productId: string; quantity: number; price: number }
| { type: 'ItemRemoved'; orderId: string; productId: string }
| { type: 'OrderPaid'; orderId: string; method: string; paidAt: string }
| { type: 'OrderShipped'; orderId: string; trackingCode: string; shippedAt: string }
| { type: 'OrderCancelled'; orderId: string; reason: string; cancelledAt: string };
// ─── AGGREGATE STATE ────────────────────────────────────────────────
interface OrderState {
orderId: string;
userId: string;
status: 'created' | 'paid' | 'shipped' | 'cancelled';
items: Map<string, { quantity: number; price: number }>;
total: number;
version: number;
}
function initialState(): OrderState {
return {
orderId: '',
userId: '',
status: 'created',
items: new Map(),
total: 0,
version: 0,
};
}
// ─── REDUCER (evolve) ───────────────────────────────────────────────
// Função pura: dado um estado e um evento, retorna o novo estado.
// Sem side effects. Sem I/O. Testável trivialmente.
function evolve(state: OrderState, event: OrderEvent): OrderState {
switch (event.type) {
case 'OrderCreated':
return { ...state, orderId: event.orderId, userId: event.userId, status: 'created', version: state.version + 1 };
case 'ItemAdded': {
const items = new Map(state.items);
items.set(event.productId, { quantity: event.quantity, price: event.price });
const total = [...items.values()].reduce((sum, item) => sum + item.quantity * item.price, 0);
return { ...state, items, total, version: state.version + 1 };
}
case 'ItemRemoved': {
const items = new Map(state.items);
items.delete(event.productId);
const total = [...items.values()].reduce((sum, item) => sum + item.quantity * item.price, 0);
return { ...state, items, total, version: state.version + 1 };
}
case 'OrderPaid':
return { ...state, status: 'paid', version: state.version + 1 };
case 'OrderShipped':
return { ...state, status: 'shipped', version: state.version + 1 };
case 'OrderCancelled':
return { ...state, status: 'cancelled', version: state.version + 1 };
}
}
// ─── REBUILD STATE FROM EVENTS ──────────────────────────────────────
function rebuildState(events: OrderEvent[]): OrderState {
return events.reduce(evolve, initialState());
}
// ─── COMMAND HANDLERS (com validação) ───────────────────────────────
// Commands validam regras de negócio ANTES de emitir eventos.
function addItem(
state: OrderState,
productId: string,
quantity: number,
price: number
): OrderEvent {
if (state.status !== 'created') {
throw new Error(`Cannot add items to order in status: ${state.status}`);
}
if (quantity <= 0 || price <= 0) {
throw new Error('Quantity and price must be positive');
}
return {
type: 'ItemAdded',
orderId: state.orderId,
productId,
quantity,
price,
};
}
function payOrder(state: OrderState, method: string): OrderEvent {
if (state.status !== 'created') {
throw new Error(`Cannot pay order in status: ${state.status}`);
}
if (state.items.size === 0) {
throw new Error('Cannot pay for an empty order');
}
return {
type: 'OrderPaid',
orderId: state.orderId,
method,
paidAt: new Date().toISOString(),
};
}
// ─── USAGE ──────────────────────────────────────────────────────────
const events: OrderEvent[] = [
{ type: 'OrderCreated', orderId: 'o1', userId: 'u1', createdAt: '2025-01-15T10:00:00Z' },
{ type: 'ItemAdded', orderId: 'o1', productId: 'p1', quantity: 2, price: 50.00 },
{ type: 'ItemAdded', orderId: 'o1', productId: 'p2', quantity: 1, price: 150.00 },
{ type: 'OrderPaid', orderId: 'o1', method: 'pix', paidAt: '2025-01-15T10:05:00Z' },
{ type: 'OrderShipped', orderId: 'o1', trackingCode: 'BR123456789', shippedAt: '2025-01-16T08:00:00Z' },
];
const currentState = rebuildState(events);
// → { orderId: 'o1', status: 'shipped', total: 250.00, items: Map(2), version: 5 }
// Time-travel: reconstruir estado em qualquer ponto
const stateAfterPayment = rebuildState(events.slice(0, 4));
// → { orderId: 'o1', status: 'paid', total: 250.00, version: 4 }
Quando Usar e Quando NAO Usar Event Sourcing
USE EVENT SOURCING QUANDO:
✓ Auditoria completa é requisito regulatório (financeiro, saúde)
✓ Precisa de time-travel debugging ou replay
✓ Domínio é naturalmente event-driven (pedidos, transações, workflows)
✓ Precisa projetar múltiplas read views a partir dos mesmos dados
✓ Equipe tem maturidade para lidar com eventual consistency
NÃO USE EVENT SOURCING QUANDO:
✗ CRUD simples resolve o problema (cadastro de usuários, configurações)
✗ Equipe não tem experiência com sistemas distribuídos
✗ Requisitos de consistência forte (read-your-writes obrigatório)
✗ Volume de eventos por aggregate é muito baixo (overhead não compensa)
✗ Não há necessidade real de auditoria ou replay
CQRS (Command Query Responsibility Segregation)
Separação de Write Model e Read Model
CQRS separa a aplicação em dois lados: o write side (recebe comandos, aplica domain logic, gera eventos) e o read side (materializa views otimizadas para queries, eventualmente consistentes).
CQRS + EVENT SOURCING — ARQUITETURA
═══════════════════════════════════════════════════════════════════════
WRITE SIDE READ SIDE
┌───────────────┐ ┌───────────────┐
PlaceOrder ─────►│ Command │ │ Query │◄── GetOrderById
AddItem ────────►│ Handler │ │ Handler │◄── ListOrdersByUser
PayOrder ───────►│ │ │ │◄── GetOrderStats
└──────┬────────┘ └───────┬───────┘
│ ▲
│ validate │ read
│ + emit events │
▼ │
┌───────────────┐ ┌───────────────┐
│ Event Store │────────────►│ Read DB │
│ (append-only)│ projections │ (Postgres, │
│ │ (consumers) │ Elasticsearch│
│ Kafka / │ │ Redis, etc.) │
│ EventStoreDB │ │ │
└───────────────┘ └───────────────┘
Write path: Command → Validate → Event(s) → Event Store → ACK
Read path: Query → Read DB → Response (denormalized, fast)
Eventual consistency: write events são projetados para o read DB
de forma assíncrona. Lag típico: milliseconds a segundos.
Write Side vs Read Side
Write side: contém toda a domain logic. Valida invariantes de negócio. Gera eventos. O model é otimizado para proteger regras (aggregates, entities, value objects). Pode ser complexo (DDD).
Read side: zero domain logic. Tabelas denormalizadas, otimizadas para queries específicas. Pode ter múltiplas projeções para diferentes use cases (ex: uma view para o admin, outra para analytics, outra para o dashboard do usuário). Cada projeção é um consumer group independente.
EXEMPLO: ORDER READ MODELS (múltiplas projeções)
═══════════════════════════════════════════════════════════════
Event Stream: [OrderCreated, ItemAdded, ItemAdded, OrderPaid, OrderShipped]
Projeção 1 — "order_details" (para API do cliente):
┌────────┬────────┬────────┬──────────┬───────────────┐
│order_id│user_id │ total │ status │ tracking_code │
├────────┼────────┼────────┼──────────┼───────────────┤
│ o1 │ u1 │ 250.00 │ shipped │ BR123456789 │
└────────┴────────┴────────┴──────────┴───────────────┘
Projeção 2 — "daily_revenue" (para dashboard analytics):
┌────────────┬───────────┬──────────┐
│ date │ total_rev │ num_orders│
├────────────┼───────────┼──────────┤
│ 2025-01-15 │ 12500.00 │ 47 │
└────────────┴───────────┴──────────┘
Projeção 3 — "user_order_history" (para Elasticsearch, full-text search):
Documento JSON indexado com items, status, datas — search-optimized.
Quando CQRS Vale a Pena
CQRS VALE A PENA QUANDO:
✓ Read e write patterns são MUITO diferentes
(ex: writes complexos com validação, reads simples mas de alto volume)
✓ Read side precisa de múltiplas representações (SQL, Elasticsearch, cache)
✓ Escala de leitura >> escala de escrita (escala cada lado independentemente)
✓ Combinado com Event Sourcing (projeções são o read model natural)
✓ Domain logic é complexa (DDD com aggregates grandes)
CQRS É OVERHEAD DESNECESSÁRIO QUANDO:
✗ CRUD simples (read e write models são idênticos)
✗ Poucos usuários, baixo volume (over-engineering)
✗ Equipe pequena sem experiência em sistemas distribuídos
✗ Requisito de strong consistency imediata (sem tolerance para eventual consistency)
✗ Um único banco relacional atende todas as queries eficientemente
Outbox Pattern
O Problema: Dual-Write
O problema mais traiçoeiro de sistemas event-driven: você precisa salvar no banco E publicar um evento. Se fizer os dois separadamente, pode ficar inconsistente.
DUAL-WRITE PROBLEM
═══════════════════════════════════════════════════════════════
Cenário 1 — Banco primeiro, depois Kafka:
1. BEGIN transaction
2. INSERT INTO orders (...) ✓ sucesso
3. COMMIT ✓ sucesso
4. kafka.publish("OrderPlaced", {...}) ✗ Kafka fora do ar!
→ Dado salvo no banco, MAS evento nunca publicado.
→ Consumers nunca ficam sabendo. Sistema inconsistente.
Cenário 2 — Kafka primeiro, depois banco:
1. kafka.publish("OrderPlaced", {...}) ✓ sucesso
2. BEGIN transaction
3. INSERT INTO orders (...) ✗ constraint violation!
→ Evento publicado, MAS dado não salvo.
→ Consumers processam evento fantasma.
Cenário 3 — Ambos dentro de "transação":
Kafka NÃO participa de transações distribuídas (XA/2PC).
Não existe transação atômica entre Postgres e Kafka.
Solução: Transactional Outbox
A solução é escrever o evento em uma tabela outbox dentro da mesma transação do banco. Um processo separado (CDC ou polling) lê a outbox e publica no Kafka.
OUTBOX PATTERN — FLUXO COMPLETO
═══════════════════════════════════════════════════════════════
┌─────────────────────────────────────────────────────────┐
│ APPLICATION │
│ │
│ 1. BEGIN TRANSACTION │
│ 2. INSERT INTO orders (...) VALUES (...) │
│ 3. INSERT INTO outbox (aggregate_type, aggregate_id, │
│ event_type, payload) │
│ 4. COMMIT │
│ │
│ ✓ Ambas as escritas na MESMA transação ACID │
└────────────────────────────┬────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ DATABASE │
│ │
│ orders table: outbox table: │
│ ┌──────┬──────┐ ┌────┬───────────┬──────────┬──────┐ │
│ │ id │ ... │ │ id │event_type │ payload │ sent │ │
│ ├──────┼──────┤ ├────┼───────────┼──────────┼──────┤ │
│ │ o1 │ ... │ │ 1 │OrderPlaced│ {...} │ false│ │
│ └──────┴──────┘ └────┴───────────┴──────────┴──────┘ │
└────────────────────────────┬────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ CDC (Debezium) ou Polling │
│ │
│ Debezium: lê WAL do Postgres (logical replication) │
│ Polling: SELECT * FROM outbox WHERE sent = false │
│ │
│ Publica evento no Kafka, marca como enviado. │
└────────────────────────────┬────────────────────────────┘
│
▼
┌─────────────┐
│ KAFKA │
│ topic: │
│ "orders" │
└─────────────┘
CDC com Debezium
Debezium é um conector CDC (Change Data Capture) que lê o WAL (Write-Ahead Log) do banco de dados e publica as mudanças como eventos no Kafka. Não precisa de polling — é push-based via logical replication.
DEBEZIUM — COMO FUNCIONA
═══════════════════════════════════════════════════════════════
Postgres (WAL) ──logical replication──► Debezium Connector ──► Kafka
1. App faz INSERT na tabela outbox (dentro da transação)
2. Postgres escreve no WAL (Write-Ahead Log)
3. Debezium lê o WAL via logical replication slot
4. Transforma a row change em um evento Kafka
5. Publica no topic configurado
Vantagens vs Polling:
• Sem queries repetitivas no banco (zero impacto em produção)
• Latência sub-segundo (lê WAL em near real-time)
• Não perde eventos (WAL é a fonte de verdade)
• Detecta DELETE, UPDATE, INSERT automaticamente
Implementação Prática do Outbox Pattern
// outbox-pattern.ts — Implementação com Prisma + Postgres
import { PrismaClient } from '@prisma/client';
const prisma = new PrismaClient();
interface OutboxEvent {
aggregateType: string;
aggregateId: string;
eventType: string;
payload: Record<string, unknown>;
}
// ─── WRITE SIDE: Salvar dado + outbox na mesma transação ────────────
async function placeOrder(
userId: string,
items: Array<{ productId: string; quantity: number; price: number }>
): Promise<string> {
const orderId = crypto.randomUUID();
const total = items.reduce((sum, item) => sum + item.quantity * item.price, 0);
// Transação ACID: ambas as operações são atômicas
await prisma.$transaction(async (tx) => {
// 1. Salva o pedido
await tx.order.create({
data: {
id: orderId,
userId,
status: 'CREATED',
total,
items: { create: items },
},
});
// 2. Salva o evento na outbox (mesma transação!)
await tx.outboxEvent.create({
data: {
id: crypto.randomUUID(),
aggregateType: 'Order',
aggregateId: orderId,
eventType: 'OrderPlaced',
payload: {
orderId,
userId,
items,
total,
occurredAt: new Date().toISOString(),
},
createdAt: new Date(),
},
});
});
return orderId;
}
// ─── POLLING PUBLISHER (alternativa simples ao Debezium) ────────────
// Roda como um background job (cron, setInterval, ou worker dedicado)
import { Kafka } from 'kafkajs';
const kafka = new Kafka({ clientId: 'outbox-publisher', brokers: ['kafka:9092'] });
const producer = kafka.producer({ idempotent: true });
async function publishOutboxEvents(): Promise<void> {
// Busca eventos não publicados, ordenados por criação
const events = await prisma.outboxEvent.findMany({
where: { publishedAt: null },
orderBy: { createdAt: 'asc' },
take: 100, // batch size
});
if (events.length === 0) return;
for (const event of events) {
try {
await producer.send({
topic: `${event.aggregateType.toLowerCase()}.events`,
messages: [{
key: event.aggregateId,
value: JSON.stringify(event.payload),
headers: {
'event-type': event.eventType,
'aggregate-type': event.aggregateType,
'event-id': event.id,
},
}],
});
// Marca como publicado
await prisma.outboxEvent.update({
where: { id: event.id },
data: { publishedAt: new Date() },
});
} catch (error) {
console.error(`Failed to publish event ${event.id}:`, error);
break; // para de publicar para manter ordem
}
}
}
// Executa a cada 1 segundo
setInterval(publishOutboxEvents, 1000);
// ─── SQL: Schema da tabela outbox ───────────────────────────────────
/*
CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(100) NOT NULL,
aggregate_id VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ NULL,
-- Index para o polling publisher
-- Partial index: só eventos não publicados
CREATE INDEX idx_outbox_unpublished ON outbox_events (created_at)
WHERE published_at IS NULL;
);
*/
Comparacao: Kafka vs RabbitMQ vs SQS vs NATS vs Pulsar
┌──────────────────┬────────────┬────────────┬────────────┬────────────┬────────────┐
│ │ KAFKA │ RABBITMQ │ AWS SQS │ NATS │ PULSAR │
├──────────────────┼────────────┼────────────┼────────────┼────────────┼────────────┤
│ Model │ Log │ Queue │ Queue │ Pub/Sub │ Log+Queue │
│ │ (commit │ (AMQP │ (managed │ (cloud- │ (unificado)│
│ │ log) │ broker) │ queue) │ native) │ │
├──────────────────┼────────────┼────────────┼────────────┼────────────┼────────────┤
│ Ordering │ Per- │ Per-queue │ Best- │ Nenhuma │ Per- │
│ │ partition │ (FIFO) │ effort │ (JetStream:│ partition │
│ │ │ │ (FIFO opt.)│ per-stream)│ │
├──────────────────┼────────────┼────────────┼────────────┼────────────┼────────────┤
│ Delivery │ At-least- │ At-least- │ At-least- │ At-most- │ At-least- │
│ Semantics │ once │ once │ once │ once │ once │
│ │ (exactly- │ (com ACK │ │ (JetStream:│ (exactly- │
│ │ once c/ │ manual) │ │ at-least- │ once c/ │
│ │ tx API) │ │ │ once) │ tx API) │
├──────────────────┼────────────┼────────────┼────────────┼────────────┼────────────┤
│ Throughput │ Milhões │ ~50K │ ~3K (SQS │ Milhões │ Milhões │
│ │ msg/s │ msg/s │ Standard) │ msg/s │ msg/s │
│ │ │ │ ~300 (FIFO)│ │ │
├──────────────────┼────────────┼────────────┼────────────┼────────────┼────────────┤
│ Latency (p99) │ ~5-15ms │ ~1-5ms │ ~20-50ms │ <1ms │ ~5-10ms │
├──────────────────┼────────────┼────────────┼────────────┼────────────┼────────────┤
│ Retention │ Configurá- │ Até │ 14 dias │ JetStream: │ Configurá- │
│ │ vel (dias, │ consumo │ max │ configurá- │ vel │
│ │ infinito) │ (TTL opt.) │ │ vel │ (tiered │
│ │ │ │ │ │ storage) │
├──────────────────┼────────────┼────────────┼────────────┼────────────┼────────────┤
│ Replay │ ✓ (reset │ ✗ │ ✗ │ ✓ (Jet- │ ✓ │
│ │ offset) │ │ │ Stream) │ │
├──────────────────┼────────────┼────────────┼────────────┼────────────┼────────────┤
│ Use Cases │ Event │ Task │ Serverless │ Microser- │ Multi- │
│ Ideais │ streaming, │ queues, │ decoupling,│ vices │ tenant, │
│ │ CDC, log │ RPC async, │ AWS-native │ comms, │ geo-repli- │
│ │ aggregation│ routing │ workloads │ IoT, edge │ cation, │
│ │ analytics │ complexo │ │ │ Kafka │
│ │ │ │ │ │ migration │
├──────────────────┼────────────┼────────────┼────────────┼────────────┼────────────┤
│ Operacional │ Alto │ Médio │ Zero │ Baixo │ Alto │
│ Complexity │ (ou use │ │ (managed) │ │ (ou use │
│ │ Confluent) │ │ │ │ StreamNat.)│
└──────────────────┴────────────┴────────────┴────────────┴────────────┴────────────┘
Regra prática: Kafka quando precisa de event streaming, replay, CDC e retenção. RabbitMQ quando precisa de roteamento flexível e task distribution. SQS quando já está na AWS e precisa de zero ops. NATS quando precisa de latência ultra-baixa e simplicidade. Pulsar quando precisa de multi-tenancy e tiered storage nativos.
Kafka em Producao
Partition Strategy
O número de partitions determina o paralelismo máximo de consumers. Escolher errado causa problemas difíceis de resolver depois (reparticionamento requer migração).
COMO ESCOLHER O NÚMERO DE PARTITIONS
═══════════════════════════════════════════════════════════════
Fórmula base:
numPartitions ≥ max(throughput_desejado / throughput_por_consumer,
throughput_desejado / throughput_por_producer)
Exemplo:
Target: 100K msg/s
Cada consumer processa: 10K msg/s
→ Mínimo 10 partitions (10 consumers paralelos)
Regras práticas:
• Comece com 6-12 partitions para topics de alto volume
• 3-6 para topics de volume moderado
• Mais partitions = mais file handles, mais memória, rebalance mais lento
• Nunca reduza partitions depois (só aumentar)
• Partition count excessivo: overhead de metadata, leader election mais lento
• max.message.bytes vs message.max.bytes: limites por partition e por broker
Monitoring
MÉTRICAS CRÍTICAS PARA MONITORAR
═══════════════════════════════════════════════════════════════
CONSUMER LAG (a mais importante):
Diferença entre o latest offset e o committed offset do consumer.
Lag crescente = consumer não está acompanhando produção.
Alerta: lag > threshold por > 5 minutos.
UNDER-REPLICATED PARTITIONS:
Partitions onde alguma réplica não está no ISR.
> 0 = risco de perda de dados se o leader cair.
Alerta: qualquer valor > 0.
ISR SHRINK/EXPAND RATE:
Frequência com que réplicas saem/entram do ISR.
Alta taxa = brokers instáveis, rede saturada, disco lento.
REQUEST LATENCY (produce/fetch):
Latência do broker para processar requests.
p99 > 100ms = investigar.
DISK USAGE:
Retention policies + throughput = consumo de disco previsível.
Alerta: > 80% de uso.
Ferramentas:
• Conduktor: UI comercial, excelente para dev/staging
• AKHQ (open-source): visualização de topics, consumers, schemas
• Kafka UI (Provectus): open-source, leve, boa UX
• Burrow (LinkedIn): monitoring especializado de consumer lag
• Prometheus + Grafana: JMX metrics via JMX Exporter
MirrorMaker 2: Cross-Datacenter Replication
MirrorMaker 2 (MM2) replica topics entre clusters Kafka em diferentes datacenters. Baseado em Kafka Connect, suporta active-active e active-passive.
MIRRORMAKER 2 — CROSS-DATACENTER
═══════════════════════════════════════════════════════════════
┌───────────────────┐ ┌───────────────────┐
│ Cluster DC-1 │ │ Cluster DC-2 │
│ (primary) │ │ (secondary) │
│ │ │ │
│ topic: orders │──MM2────►│ topic: dc1.orders│
│ topic: payments │──MM2────►│ topic: dc1.payments│
│ │ │ │
└───────────────────┘ └───────────────────┘
• Topics replicados com prefixo do cluster de origem
• Offset translation: mantém mapeamento de offsets entre clusters
• Heartbeats e checkpoints para monitorar replication lag
• Active-active: ambos clusters produzem e consomem, MM2 replica bidirecionalmente
Exercicios Praticos
Exercicio 1 — Producer com Garantias
Implemente um producer KafkaJS que publique eventos UserRegistered com as seguintes configurações: idempotent: true, acks: -1, compression LZ4, key = userId. O producer deve fazer batching de até 100 mensagens antes de enviar. Adicione headers com event-type, schema-version e correlation-id. Teste publicando 10.000 eventos e verifique que não há duplicatas no topic (use kafka-console-consumer com --from-beginning).
Exercicio 2 — Consumer Resiliente com DLQ
Implemente um consumer que processe eventos OrderPlaced com manual commit. O consumer deve: (1) verificar idempotência usando um Set ou Redis, (2) retry até 3 vezes com backoff exponencial em caso de falha, (3) enviar para um topic DLQ (orders.dlq) após esgotar retries. Simule falhas aleatórias (Math.random() < 0.3) no handler para validar o fluxo de retry + DLQ.
Exercicio 3 — Event Sourcing: Conta Bancária
Implemente um sistema de Event Sourcing para uma conta bancária com os eventos: AccountOpened, MoneyDeposited, MoneyWithdrawn, AccountClosed. Implemente: (1) a função evolve (reducer) que reconstrói o estado, (2) command handlers com validação (ex: saldo insuficiente para saque, conta fechada não aceita depósitos), (3) uma função de snapshot que salva o estado a cada 100 eventos. Escreva testes unitários para o reducer — ele é puro, portanto trivialmente testável.
Exercicio 4 — Outbox Pattern com Polling
Implemente o Outbox Pattern usando Prisma + Postgres: (1) crie a migration com a tabela outbox_events, (2) implemente a função de negócio que salva na tabela principal + outbox na mesma transação, (3) implemente o polling publisher que lê eventos não publicados e envia para Kafka. Garanta idempotência no publisher usando o event-id como header e verificando duplicatas no consumer.
Exercicio 5 — CQRS: Read Models
Usando os eventos do Exercicio 3 (conta bancária), implemente dois read models: (1) account_balances — tabela com saldo atual de cada conta (projeção simples), (2) monthly_statements — tabela com extrato mensal agregado (total de depositos, total de saques, saldo final por mes). Implemente os projectors como consumers Kafka que materializam essas views em Postgres.
Exercicio 6 — Monitoring de Consumer Lag
Configure um ambiente local com Docker Compose (Kafka + ZooKeeper ou KRaft) e Kafka UI. Crie um producer que publique 1.000 msg/s e um consumer que processe 500 msg/s (simule com setTimeout). Monitore o consumer lag crescendo no Kafka UI. Escale para 2 consumers e observe o lag diminuir. Experimente matar um consumer e observe o rebalancing.
Referencias
- “Designing Event-Driven Systems” — Ben Stopford (Confluent, gratuito). Melhor introdução a event-driven com Kafka. Cobre event sourcing, CQRS, e patterns de streaming.
- “Kafka: The Definitive Guide” — Neha Narkhede, Gwen Shapira, Todd Palino (O’Reilly). Referência completa sobre Kafka internals, operações e best practices.
- “Building Event-Driven Microservices” — Adam Bellemare (O’Reilly). Foco em microserviços event-driven com Kafka.
- Martin Fowler — Event Sourcing: https://martinfowler.com/eaaDev/EventSourcing.html
- Martin Fowler — CQRS: https://martinfowler.com/bliki/CQRS.html
- Confluent Documentation: https://docs.confluent.io/ — referência oficial, excelente cobertura de Kafka Streams, Schema Registry e Connect.
- KafkaJS Documentation: https://kafka.js.org/ — client Node.js/TypeScript para Kafka.
- Debezium Documentation: https://debezium.io/documentation/ — CDC com Kafka Connect.
- KIP-500 (KRaft): https://cwiki.apache.org/confluence/display/KAFKA/KIP-500 — remoção do ZooKeeper.