Message Queues e Eventos

Message Queues e Arquitetura Orientada a Eventos

O Problema: Acoplamento Síncrono e Cascading Failures

Em arquiteturas baseadas em chamadas síncronas (HTTP/gRPC), cada serviço depende da disponibilidade imediata dos serviços downstream. Quando um serviço falha ou fica lento, a degradação propaga-se em cascata — o chamador bloqueia threads, acumula conexões e eventualmente falha também.

ACOPLAMENTO SÍNCRONO — CASCADING FAILURE

  Client ──► API Gateway ──► Order Service ──► Payment Service ✖ (timeout 30s)

                                    ├──► Inventory Service ✖ (thread pool exausto)

                                    └──► Notification Service ✖ (connection refused)

  Resultado: uma falha no Payment Service derruba TODA a cadeia.
  Threads bloqueadas → pool exausto → 503 em cascata → sistema inteiro fora.

  Agravantes:
  • Retry storms: cada chamador retenta, multiplicando a carga no serviço já degradado
  • Thundering herd: quando o serviço volta, todos os clientes reconectam simultaneamente
  • Backpressure inexistente: sem fila, não há como absorver picos — é "processa ou morre"

A solução fundamental é desacoplar temporalmente os serviços. Em vez de “chamo e espero”, o modelo passa a ser “publico e esqueço” — o produtor envia uma mensagem para um intermediário (broker/queue), e o consumidor processa no seu próprio ritmo.

DESACOPLAMENTO VIA MESSAGE QUEUE

  Order Service ──► [  Message Broker  ] ──► Payment Worker
                          │                  (processa no próprio ritmo)
                          ├──────────────► Inventory Worker
                          │                  (escala independente)
                          └──────────────► Notification Worker
                                             (falha sem afetar o pedido)

  ✓ Order Service responde imediatamente (202 Accepted)
  ✓ Se Payment Worker cai, mensagens ficam na fila até ele voltar
  ✓ Pico de carga? Fila absorve. Workers escalam horizontalmente.
  ✓ Cada serviço falha e recupera de forma independente

Message Queue vs Event Stream: Dois Paradigmas Diferentes

Existe uma diferença arquitetural fundamental entre message queues (RabbitMQ, SQS) e event streams (Kafka, Kinesis). Não são sinônimos.

┌─────────────────────────┬─────────────────────────────────────┐
│     MESSAGE QUEUE       │         EVENT STREAM                │
│   (RabbitMQ, SQS)       │       (Kafka, Kinesis)              │
├─────────────────────────┼─────────────────────────────────────┤
│ Point-to-point           │ Pub/Sub com múltiplos consumers     │
│ Consumo destrutivo       │ Log imutável (append-only)          │
│ Mensagem entregue a 1    │ Cada consumer group lê tudo         │
│ consumer                 │                                     │
│ Sem replay               │ Replay por offset                   │
│ Roteamento flexível      │ Particionamento por key             │
│ (exchanges, bindings)    │                                     │
│ Mensagem deletada após   │ Retenção configurável               │
│ ACK                      │ (horas, dias, infinito)             │
│ Smart broker,            │ Dumb broker,                        │
│ dumb consumer            │ smart consumer                      │
│                          │                                     │
│ Ideal para: task queues, │ Ideal para: event sourcing,         │
│ work distribution, RPC   │ analytics, CDC, replay              │
└─────────────────────────┴─────────────────────────────────────┘

A escolha não é “um ou outro” — sistemas maduros usam ambos. Kafka para o event backbone, RabbitMQ/SQS para distribuição de tarefas.


RabbitMQ: O Protocolo AMQP e o Roteamento de Mensagens

RabbitMQ implementa o protocolo AMQP 0-9-1 (Advanced Message Queuing Protocol). A arquitetura central gira em torno de três conceitos: exchanges, queues e bindings.

ARQUITETURA AMQP NO RABBITMQ

  Producer ──► Exchange ──binding──► Queue ──► Consumer
                  │       (routing     │
                  │        key)        │
                  │                    └──► Consumer (competing consumers)

                  ├──binding──► Queue 2 ──► Consumer

                  └──binding──► Queue 3 ──► Consumer

  O Producer NUNCA publica diretamente na Queue.
  Ele publica no Exchange com uma routing key.
  O Exchange decide para quais Queues rotear baseado no tipo e nos bindings.

Tipos de Exchange

DIRECT EXCHANGE
  Routing exato: routing_key == binding_key

  Producer ─► Exchange (direct) ─┬─ binding_key="order.created" ─► Queue Orders
                                 └─ binding_key="order.paid"    ─► Queue Payments

  exchange.publish("order.created", msg) → vai APENAS para Queue Orders


TOPIC EXCHANGE
  Routing por padrão com wildcards (* = uma palavra, # = zero ou mais)

  Producer ─► Exchange (topic) ─┬─ "order.*"         ─► Queue A (order.created, order.paid...)
                                ├─ "order.created"    ─► Queue B (só order.created)
                                └─ "#"                ─► Queue C (TUDO — audit log)

  Mais flexível. Ideal para event routing granular.


FANOUT EXCHANGE
  Broadcast: ignora routing key, envia para TODAS as queues vinculadas

  Producer ─► Exchange (fanout) ─┬─► Queue Notifications
                                 ├─► Queue Analytics
                                 └─► Queue Audit

  Equivalente a pub/sub puro. Cada queue recebe uma cópia.


HEADERS EXCHANGE
  Roteamento por headers da mensagem (não por routing key)
  Suporta match "all" (AND) ou "any" (OR) nos headers
  Raramente usado na prática — topic exchange cobre a maioria dos casos.

Exemplo Prático: RabbitMQ com amqplib

// producer.js — publica evento de pedido criado
const amqp = require('amqplib');

async function publishOrderCreated(order) {
  const conn = await amqp.connect('amqp://localhost');
  const channel = await conn.createChannel();

  // Declara exchange do tipo topic (idempotente)
  await channel.assertExchange('orders', 'topic', { durable: true });

  const routingKey = `order.${order.status}`; // ex: "order.created"

  channel.publish('orders', routingKey, Buffer.from(JSON.stringify({
    orderId: order.id,
    userId: order.userId,
    items: order.items,
    total: order.total,
    timestamp: new Date().toISOString(),
  })), {
    persistent: true,        // Mensagem sobrevive a restart do broker (delivery mode 2)
    contentType: 'application/json',
    messageId: crypto.randomUUID(), // Idempotency key
    headers: {
      'x-source-service': 'order-service',
      'x-schema-version': '2',
    },
  });

  await channel.close();
  await conn.close();
}
// consumer.js — processa pedidos com manual ACK
const amqp = require('amqplib');

async function startConsumer() {
  const conn = await amqp.connect('amqp://localhost');
  const channel = await conn.createChannel();

  // Prefetch: consumer recebe no máximo 10 mensagens sem ACK por vez
  // Sem prefetch, o broker despeja TUDO no consumer → memória explode
  await channel.prefetch(10);

  await channel.assertExchange('orders', 'topic', { durable: true });

  // Queue com Dead Letter Exchange configurado
  const q = await channel.assertQueue('payment-processing', {
    durable: true,
    arguments: {
      'x-dead-letter-exchange': 'orders.dlx',    // Mensagens rejeitadas vão para cá
      'x-dead-letter-routing-key': 'order.failed',
      'x-message-ttl': 300000,                    // TTL: 5 minutos (em ms)
    },
  });

  await channel.bindQueue(q.queue, 'orders', 'order.created');

  channel.consume(q.queue, async (msg) => {
    if (!msg) return;

    try {
      const order = JSON.parse(msg.content.toString());
      await processPayment(order);

      // ACK manual: mensagem removida da queue SOMENTE após sucesso
      channel.ack(msg);
    } catch (error) {
      // NACK com requeue=false → mensagem vai para Dead Letter Queue
      // Se requeue=true, volta para a mesma fila (cuidado com loop infinito)
      channel.nack(msg, false, false);
    }
  }, {
    noAck: false, // CRÍTICO: desabilitar auto-ack para garantir processamento
  });
}

RabbitMQ Avançado: Prefetch, DLQ, TTL e Priority Queues

PREFETCH (QoS) — Controle de fluxo no consumer

  prefetch=1   → consumer recebe 1 msg por vez. Lento, mas distribui trabalho igualmente.
  prefetch=10  → consumer recebe até 10 msgs sem ACK. Bom para throughput.
  prefetch=0   → SEM LIMITE. Broker despeja tudo. ⚠ Perigoso em produção.

  Regra: prefetch ≈ tempo_processamento × throughput_desejado
  Se cada msg leva 100ms e você quer 50 msg/s → prefetch ≈ 5


ACKNOWLEDGEMENTS

  auto-ack (noAck: true)   → Mensagem removida ao ser ENTREGUE ao consumer.
                              Se o consumer crashar, mensagem perdida para sempre.
                              Use apenas para dados descartáveis (métricas, logs).

  manual-ack (noAck: false) → Mensagem removida apenas quando consumer envia ACK.
                               channel.ack(msg)  — processou com sucesso
                               channel.nack(msg) — falhou, pode requeue ou DLQ
                               channel.reject(msg) — rejeita uma mensagem específica


DEAD LETTER QUEUE (DLQ)

  Mensagens vão para a DLQ quando:
  1. Consumer faz NACK/REJECT sem requeue
  2. TTL da mensagem expira
  3. Queue atinge max-length

  Exchange original ──► Queue ──(falha)──► DLX ──► Dead Letter Queue

                                                       └─► Monitor/Alert/Retry manual

  DLQ NÃO é lixeira. É um mecanismo de observabilidade.
  Monitore a profundidade da DLQ — mensagens acumulando = bug no consumer.


PRIORITY QUEUES

  const q = await channel.assertQueue('tasks', {
    durable: true,
    arguments: { 'x-max-priority': 10 }, // Prioridade de 0 a 10
  });

  // Mensagem com prioridade alta
  channel.publish('', 'tasks', msg, { priority: 9 });

  ⚠ Priority queues usam mais memória e CPU.
  ⚠ Não usam se todos os consumers estão idle (sem fila acumulada, prioridade não importa).

Apache Kafka: Distributed Commit Log

Kafka não é uma fila — é um log distribuído e imutável. Essa distinção é crucial. Mensagens não são deletadas após consumo; elas ficam no log pelo tempo de retenção configurado. Consumidores controlam sua própria posição (offset).

ARQUITETURA DO KAFKA

  Producers ──► Kafka Cluster ──► Consumers

          ┌─────────┴──────────┐
          │     Topic: orders  │
          │                    │
          │  Partition 0: [0][1][2][3][4][5]──► Consumer A (grupo: payments)
          │  Partition 1: [0][1][2][3]────────► Consumer B (grupo: payments)
          │  Partition 2: [0][1][2][3][4]─────► Consumer C (grupo: payments)
          │                    │
          │                    │               Consumer D (grupo: analytics)
          │  (cada partition   │               ↑ lê TODAS as partitions
          │   é um log         │               (grupo diferente = leitura independente)
          │   append-only      │
          │   ordenado)        │
          └────────────────────┘

  Regras fundamentais:
  • 1 partition → 1 consumer por grupo (máximo de paralelismo = nº de partitions)
  • Ordenação garantida APENAS dentro de uma partition
  • Mensagens com a mesma key vão SEMPRE para a mesma partition
  • Consumer groups permitem pub/sub: cada grupo lê tudo independentemente

Kafka Internamente: Por Que É Tão Rápido

ESTRUTURA NO DISCO

  Topic "orders", Partition 0:
  /kafka-logs/orders-0/
    ├── 00000000000000000000.log    ← segmento ativo (append-only)
    ├── 00000000000000000000.index  ← índice offset → posição no arquivo
    ├── 00000000000000000000.timeindex ← índice timestamp → offset
    ├── 00000000000524288000.log    ← segmento antigo (roll a cada segment.bytes)
    └── ...

  Cada segmento é um arquivo sequencial no disco.
  Append-only: NUNCA modifica dados existentes → I/O sequencial (10x mais rápido que random I/O)


ZERO-COPY (sendfile syscall)

  Fluxo TRADICIONAL (4 cópias, 4 context switches):
  Disco → Kernel Buffer → User Buffer → Socket Buffer → NIC

  Fluxo com ZERO-COPY (2 cópias, 2 context switches):
  Disco → Kernel Buffer ───────────────────────────► NIC
                    (sendfile() — dados nunca vão para user space)

  Kafka usa sendfile() para transferir dados do disco direto para a rede.
  Combinado com page cache do OS, mensagens recentes são servidas da RAM sem I/O de disco.


LOG COMPACTION

  Modo padrão (delete): segmentos antigos são deletados após retention.ms
  Modo compaction: mantém APENAS a última mensagem por key

  Antes da compaction:     Depois da compaction:
  [K1:v1][K2:v1][K1:v2]   [K2:v1][K1:v2][K3:v1]
  [K3:v1][K1:v3][K2:v2]   [K1:v3][K2:v2]

  Ideal para: CDC (Change Data Capture), tabelas de estado, configurações.
  O topic vira efetivamente uma "tabela" — último estado de cada entidade.

Exemplo Prático: Kafka com kafkajs

// producer.js — producer idempotente com particionamento por chave
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
});

const producer = kafka.producer({
  idempotent: true,         // Habilita idempotência (exactly-once no producer)
  maxInFlightRequests: 5,   // Máx 5 em paralelo (exigido pelo idempotent)
  transactionalId: 'order-producer-txn', // Para transações cross-partition
});

async function publishOrderEvent(order) {
  await producer.connect();

  // A key determina a partition → todas as mensagens do mesmo orderId
  // vão para a MESMA partition → ordenação garantida por pedido
  await producer.send({
    topic: 'orders',
    messages: [{
      key: order.id,                           // Partition key
      value: JSON.stringify({
        eventType: 'ORDER_CREATED',
        orderId: order.id,
        payload: order,
        timestamp: Date.now(),
      }),
      headers: {
        'event-type': 'ORDER_CREATED',
        'schema-version': '2',
        'correlation-id': order.correlationId,
      },
    }],
    acks: -1,       // acks=all → líder + todas as ISRs confirmam (máxima durabilidade)
    timeout: 30000,
  });
}
// consumer.js — consumer group com commit manual de offsets
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'payment-service',
  brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
});

const consumer = kafka.consumer({
  groupId: 'payment-processors',      // Consumer group
  sessionTimeout: 30000,              // Se não enviar heartbeat em 30s, rebalance
  heartbeatInterval: 3000,            // Heartbeat a cada 3s
  maxWaitTimeInMs: 500,               // Long polling: espera até 500ms por batch
  maxBytesPerPartition: 1048576,      // 1MB por partition por fetch
});

async function startConsumer() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'orders', fromBeginning: false });

  await consumer.run({
    autoCommit: false, // Commit manual para at-least-once
    eachMessage: async ({ topic, partition, message, heartbeat }) => {
      const event = JSON.parse(message.value.toString());

      // Verificação de idempotência no consumer
      const alreadyProcessed = await redis.get(`processed:${message.headers['correlation-id']}`);
      if (alreadyProcessed) {
        // Mensagem já processada — commita offset e ignora
        await consumer.commitOffsets([{
          topic, partition, offset: (BigInt(message.offset) + 1n).toString(),
        }]);
        return;
      }

      try {
        await processPayment(event);

        // Marca como processado (com TTL para não crescer infinitamente)
        await redis.set(`processed:${message.headers['correlation-id']}`, '1', 'EX', 86400);

        // Commit offset APÓS processamento bem-sucedido
        await consumer.commitOffsets([{
          topic, partition, offset: (BigInt(message.offset) + 1n).toString(),
        }]);
      } catch (error) {
        // NÃO commita offset → mensagem será reprocessada (at-least-once)
        // Heartbeat para evitar rebalance durante processamento longo
        await heartbeat();
        throw error;
      }
    },
  });
}

Garantias de Entrega: O Trade-off Fundamental

Não existe almoço grátis. Cada garantia tem um custo.

AT-MOST-ONCE (no máximo uma vez)
  ┌──────────┐         ┌────────┐         ┌──────────┐
  │ Producer │──msg──► │ Broker │──msg──► │ Consumer │
  └──────────┘         └────────┘         └──────────┘
       │                                        │
       └─ fire and forget                       └─ auto-commit antes de processar
          (acks=0)                                 (autoCommit: true)

  • Mensagem pode ser PERDIDA (producer não confirma, consumer commita antes de processar)
  • ZERO duplicatas
  • Máximo throughput, mínima latência
  • Use para: métricas, logs, dados de telemetria onde perder 0,1% é aceitável


AT-LEAST-ONCE (no mínimo uma vez)
  ┌──────────┐         ┌────────┐         ┌──────────┐
  │ Producer │──msg──► │ Broker │──msg──► │ Consumer │
  └──────────┘  ack ◄──┘        │  ack    └────┬─────┘
       │                        │     ◄────────┘
       └─ retenta se            └─ commit APÓS processar
          não receber ack          (pode crashar após processar,
                                    antes de commitar → reprocessa)

  • Mensagem NUNCA perdida
  • PODE haver duplicatas (consumer processa, crasha antes do commit, reprocessa)
  • Consumer DEVE ser idempotente
  • Use para: a maioria dos casos (pagamentos, pedidos, notificações)


EXACTLY-ONCE (exatamente uma vez)
  ┌──────────────────┐     ┌────────┐     ┌──────────────────────┐
  │ Idempotent       │     │ Broker │     │ Transactional        │
  │ Producer         │────►│        │────►│ Consumer             │
  │ (PID + SeqNum)   │     │        │     │ (read-process-write  │
  └──────────────────┘     └────────┘     │  numa transação)     │
                                          └──────────────────────┘

  Kafka: idempotent producer (PID + sequence number por partition) +
         transactional consumer (consume-transform-produce atômico)

  • Sem perdas, sem duplicatas
  • Maior latência, menor throughput
  • Complexidade MUITO maior
  • Trade-off: exactly-once entre Kafka e sistema externo (banco de dados)
    NÃO existe nativamente — requer idempotent consumer pattern

Idempotent Consumer Pattern

// O consumer DEVE produzir o mesmo resultado independente de quantas vezes
// receba a mesma mensagem. Duas abordagens:

// 1. Deduplicação por message ID (simples)
async function processIdempotent(event) {
  const lockKey = `lock:${event.messageId}`;

  // SET NX (set if not exists) + EX (expiry) — mutex distribuído
  const acquired = await redis.set(lockKey, '1', 'NX', 'EX', 3600);
  if (!acquired) return; // Já processado ou em processamento

  await processEvent(event);
}

// 2. Operações naturalmente idempotentes (melhor)
// Em vez de: INSERT INTO transactions (amount) VALUES (100)  ← duplica
// Use:       INSERT INTO transactions (id, amount) VALUES ('txn-123', 100)
//            ON CONFLICT (id) DO NOTHING                     ← idempotente

// Em vez de: UPDATE balance SET amount = amount + 100        ← duplica
// Use:       UPDATE balance SET amount = 100                 ← idempotente
//            WHERE version = expected_version                (optimistic lock)

Ordering Guarantees

KAFKA — Ordenação por Partition

  Partition 0: [msg1] [msg2] [msg3]  ← ordenação GARANTIDA dentro da partition
  Partition 1: [msg4] [msg5]         ← ordenação GARANTIDA dentro da partition

  MAS: msg1 pode ser processada DEPOIS de msg4 (partitions diferentes)

  Solução: use a mesma partition key para mensagens que precisam de ordem
  Ex: userId como key → todas as ações de um usuário na mesma partition → ordem preservada


SQS FIFO — Ordenação por Message Group ID

  Message Group "user-123": [msg1] [msg2] [msg3]  ← FIFO estrita por grupo
  Message Group "user-456": [msg4] [msg5]          ← FIFO estrita por grupo

  Grupos diferentes processados em paralelo.
  Dentro do grupo: estritamente ordenado.

  ⚠ SQS FIFO: máx 300 msgs/s por grupo (3.000 com batching de alta throughput)


RABBITMQ — Sem Garantia Nativa de Ordenação

  RabbitMQ garante FIFO por queue com 1 consumer e prefetch=1.
  Com múltiplos consumers (competing consumers), ordenação NÃO é garantida.
  Para ordenar: use consistent hashing exchange ou Kafka.

AWS SQS/SNS: Managed Queuing na Nuvem

SQS STANDARD vs SQS FIFO

  ┌──────────────────────┬──────────────────────────┐
  │    SQS Standard      │       SQS FIFO           │
  ├──────────────────────┼──────────────────────────┤
  │ Throughput ilimitado  │ 300 msg/s (3.000 batch)  │
  │ At-least-once        │ Exactly-once processing  │
  │ Best-effort ordering │ FIFO estrita por grupo   │
  │ Pode duplicar msgs   │ Deduplicação nativa (5m) │
  │ Mais barato           │ ~2x o preço              │
  └──────────────────────┴──────────────────────────┘


VISIBILITY TIMEOUT

  Consumer A ──poll──► SQS ──► msg1 (invisível por 30s para outros consumers)

                          ┌──────┴───────┐
                          │ Processando  │ ← Se não deletar em 30s,
                          │   (≤ 30s)    │   msg1 fica visível de novo
                          └──────────────┘   → outro consumer pega → DUPLICATA

  Regra: visibility_timeout > tempo_máximo_de_processamento
  Se processamento demora 25s, use visibility_timeout = 60s (2x margem)
  Se ultrapassar, chame ChangeMessageVisibility para estender


LONG POLLING vs SHORT POLLING

  Short polling (padrão): SQS responde imediatamente, mesmo se vazio → custo alto
  Long polling (WaitTimeSeconds=20): SQS espera até 20s por mensagem → reduz custo em 90%+

  SEMPRE use long polling em produção.


FAN-OUT PATTERN: SNS + SQS

  Order Service ──► SNS Topic ──┬──► SQS Queue (Payment)   ──► Payment Worker
                                ├──► SQS Queue (Inventory) ──► Inventory Worker
                                ├──► SQS Queue (Email)     ──► Email Worker
                                └──► Lambda Function       ──► Analytics

  SNS faz fan-out (broadcast), cada SQS absorve independentemente.
  Se Email Worker cair, Payment e Inventory continuam funcionando.
  Cada fila tem sua própria DLQ, retry policy e scaling.

Event-Driven Architecture: Event Sourcing e CQRS

Event Sourcing

Em vez de armazenar o estado atual de uma entidade (UPDATE), armazene todos os eventos que levaram àquele estado (INSERT-only). O estado é derivado por replay dos eventos.

CRUD TRADICIONAL                    EVENT SOURCING

  orders table:                      order_events table:
  ┌──────┬────────┬────────┐        ┌─────┬──────────────────┬────────────┐
  │ id   │ status │ total  │        │ seq │ event_type       │ data       │
  ├──────┼────────┼────────┤        ├─────┼──────────────────┼────────────┤
  │ 123  │ paid   │ 150.00 │        │ 1   │ ORDER_CREATED    │ {total:150}│
  └──────┴────────┴────────┘        │ 2   │ ITEM_ADDED       │ {sku: ABC} │
                                    │ 3   │ PAYMENT_RECEIVED │ {amt: 150} │
  Problema: COMO chegou             │ 4   │ ORDER_CONFIRMED  │ {}         │
  no status "paid"?                 └─────┴──────────────────┴────────────┘
  Não sabemos. O UPDATE apagou      Estado atual = replay(eventos 1..4)
  o histórico.                       Auditoria completa. Pode reconstruir qualquer ponto no tempo.

CQRS (Command Query Responsibility Segregation)

                    ┌─────────────────┐
  Commands ────────►│  Write Model    │──eventos──► Event Store (Kafka/EventStoreDB)
  (create, update)  │  (normalizado,  │                   │
                    │   consistente)  │                   ▼
                    └─────────────────┘            ┌──────────────┐
                                                   │  Projeções   │
  Queries ─────────────────────────────────────────│  (Read Model) │
  (listas, relatórios,                             │  desnormaliz.  │
   dashboards)                                     │  otimizado p/  │
                                                   │  leitura)      │
                                                   └──────────────┘

  Write Model: otimizado para consistência (domínio, regras de negócio)
  Read Model: otimizado para queries (views materializadas, denormalizadas)
  Projeções consomem eventos e atualizam o read model assincronamente.

  Complexidade MUITO alta. Use apenas quando read e write têm requisitos
  radicalmente diferentes (ex: write simples, mas queries complexas sobre agregações).

Backpressure: Controlando o Fluxo

Backpressure é o mecanismo que impede um produtor rápido de sobrecarregar um consumidor lento.

SEM BACKPRESSURE                    COM BACKPRESSURE

  Producer (1000 msg/s)              Producer (1000 msg/s)
       │                                  │
       ▼                                  ▼
  [Queue: 0 → 1M → OOM → ✖]        [Queue: 0 → 10K → flow control]
       │                                  │
       ▼                                  ▼
  Consumer (100 msg/s)               Consumer (100 msg/s)
  → Queue cresce infinitamente       → Queue estabiliza
  → Broker fica sem memória          → Producer é desacelerado ou
  → Mensagens perdidas                 consumers escalam


MECANISMOS DE BACKPRESSURE POR TECNOLOGIA:

  RabbitMQ:
  • Memory alarm: quando memória atinge threshold (40% padrão), bloqueia publishers
  • Disk alarm: quando espaço em disco < threshold, bloqueia publishers
  • Prefetch: limita quantas msgs o consumer recebe sem ACK
  • Queue max-length: rejeita ou dead-letter msgs quando fila atinge limite

  Kafka:
  • Producer: buffer.memory (32MB padrão) — se cheio, .send() bloqueia ou lança exceção
  • Consumer: consumer lag monitoring — diferença entre último offset e offset do consumer
  • Broker: quotas por client ID (bytes/s de produce e fetch)

  SQS:
  • Sem backpressure nativo — queue escala "infinitamente" (por design)
  • Controle via autoscaling de consumers baseado em ApproximateNumberOfMessages

Consumer Lag Monitoring e Autoscaling

// Monitoramento de consumer lag no Kafka com kafkajs
async function monitorLag(admin, groupId, topic) {
  const offsets = await admin.fetchTopicOffsets(topic);
  const groupOffsets = await admin.fetchOffsets({ groupId, topics: [topic] });

  let totalLag = 0;
  for (const partition of offsets) {
    const groupOffset = groupOffsets.find(
      g => g.topic === topic
    )?.partitions.find(p => p.partition === partition.partition);

    const lag = BigInt(partition.offset) - BigInt(groupOffset?.offset || '0');
    totalLag += Number(lag);

    // Emite métrica para Prometheus/CloudWatch
    metrics.gauge('kafka_consumer_lag', Number(lag), {
      topic, partition: partition.partition, group: groupId,
    });
  }

  // Autoscaling: se lag > threshold, escala consumers
  if (totalLag > 100000) {
    await scaleConsumers(Math.min(Math.ceil(totalLag / 10000), MAX_CONSUMERS));
  }
}

Padrões Avançados

Saga Orchestration via Queues

SAGA PATTERN — Coordenação de transações distribuídas

  Saga Orchestrator

       ├──► [Queue: reserve-inventory] ──► Inventory Service
       │         ◄── inventory.reserved ──┘       │
       │                                          │ (se falhar)
       ├──► [Queue: process-payment] ──► Payment Service
       │         ◄── payment.processed ──┘        │
       │                                          │ (se falhar)
       ├──► [Queue: ship-order] ──► Shipping Service
       │         ◄── order.shipped ──┘

       └── Se qualquer passo falhar:
           Publica comandos de COMPENSAÇÃO na ordem inversa
           • cancel-shipment → refund-payment → release-inventory

  Cada serviço:
  1. Executa sua operação local
  2. Publica evento de sucesso/falha
  3. Orquestrador decide próximo passo ou compensação

  Alternativa: Saga Choreography (sem orquestrador central, cada serviço
  ouve eventos e decide autonomamente — mais desacoplado, mais difícil de debugar)

Transactional Outbox Pattern

PROBLEMA: Dual write — preciso salvar no banco E publicar na fila atomicamente.
Se o app crasha entre as duas operações, banco e fila ficam inconsistentes.

  ✖ ERRADO:
  await db.save(order);          // ← sucesso
  await queue.publish(event);    // ← app crasha aqui → banco tem, fila não


SOLUÇÃO: Transactional Outbox

  ┌─────────────────────────────────────────────┐
  │  MESMA TRANSAÇÃO DO BANCO                    │
  │                                              │
  │  BEGIN TRANSACTION;                          │
  │  INSERT INTO orders (...) VALUES (...);      │
  │  INSERT INTO outbox (                        │
  │    id, aggregate_type, aggregate_id,         │
  │    event_type, payload, created_at           │
  │  ) VALUES (...);                             │
  │  COMMIT;                                     │
  │                                              │
  └─────────────────────────────────────────────┘


  Outbox Publisher (poll ou CDC):
  • Poll: SELECT * FROM outbox WHERE published = false (cron a cada 500ms)
  • CDC: Debezium lê o WAL do Postgres e publica no Kafka automaticamente

  Vantagem: atomicidade garantida pelo banco de dados.
  Desvantagem (poll): latência adicional, carga no banco.
  Desvantagem (CDC): complexidade operacional (Debezium, Kafka Connect).
// Implementação do Transactional Outbox com polling
async function createOrderWithOutbox(order) {
  const client = await pool.connect();
  try {
    await client.query('BEGIN');

    // Operação de negócio
    const result = await client.query(
      'INSERT INTO orders (id, user_id, total, status) VALUES ($1, $2, $3, $4) RETURNING *',
      [order.id, order.userId, order.total, 'created']
    );

    // Evento na outbox — MESMA transação
    await client.query(
      `INSERT INTO outbox (id, aggregate_type, aggregate_id, event_type, payload)
       VALUES ($1, $2, $3, $4, $5)`,
      [
        crypto.randomUUID(),
        'Order',
        order.id,
        'ORDER_CREATED',
        JSON.stringify(result.rows[0]),
      ]
    );

    await client.query('COMMIT');
  } catch (error) {
    await client.query('ROLLBACK');
    throw error;
  } finally {
    client.release();
  }
}

// Outbox publisher — roda como worker separado
async function publishOutboxEvents() {
  const events = await pool.query(
    'SELECT * FROM outbox WHERE published = false ORDER BY created_at LIMIT 100 FOR UPDATE SKIP LOCKED'
  );

  for (const event of events.rows) {
    await kafkaProducer.send({
      topic: `${event.aggregate_type.toLowerCase()}.events`,
      messages: [{ key: event.aggregate_id, value: event.payload }],
    });

    await pool.query('UPDATE outbox SET published = true WHERE id = $1', [event.id]);
  }
}

// Executa a cada 500ms
setInterval(publishOutboxEvents, 500);

Observabilidade em Sistemas de Mensageria

MÉTRICAS CRÍTICAS PARA MONITORAR

  ┌─────────────────────────────────────────────────────────────────┐
  │ Métrica                    │ O que indica           │ Alerta    │
  ├────────────────────────────┼────────────────────────┼───────────┤
  │ Consumer Lag               │ Consumers não acompa-  │ > 10K     │
  │ (Kafka)                    │ nham os producers      │ mensagens │
  ├────────────────────────────┼────────────────────────┼───────────┤
  │ Queue Depth                │ Mensagens acumulando   │ > 5K ou   │
  │ (RabbitMQ/SQS)             │ na fila                │ crescente │
  ├────────────────────────────┼────────────────────────┼───────────┤
  │ DLQ Depth                  │ Mensagens falhando     │ > 0       │
  │                            │ permanentemente        │ (qualquer)│
  ├────────────────────────────┼────────────────────────┼───────────┤
  │ Publish Rate               │ Throughput de entrada  │ Desvio    │
  │ (msgs/s)                   │                        │ > 2σ      │
  ├────────────────────────────┼────────────────────────┼───────────┤
  │ Consume Rate               │ Throughput de saída    │ < publish │
  │ (msgs/s)                   │                        │ rate      │
  ├────────────────────────────┼────────────────────────┼───────────┤
  │ End-to-End Latency         │ Tempo total: publish   │ > p99     │
  │ (ms)                       │ → process → complete   │ SLA       │
  ├────────────────────────────┼────────────────────────┼───────────┤
  │ Redelivery Rate            │ Mensagens reprocessa-  │ > 5%      │
  │                            │ das (duplicatas)       │           │
  ├────────────────────────────┼────────────────────────┼───────────┤
  │ Consumer Rebalances        │ Instabilidade no       │ > 2/hora  │
  │ (Kafka)                    │ consumer group         │           │
  └─────────────────────────────────────────────────────────────────┘


INSTRUMENTAÇÃO PRÁTICA

  // Middleware de métricas para consumer
  async function instrumentedConsumer(handler) {
    return async (message) => {
      const start = performance.now();
      const publishedAt = parseInt(message.headers?.['published-at'] || '0');

      try {
        await handler(message);

        metrics.counter('messages_processed_total', 1, {
          topic: message.topic,
          status: 'success',
        });
      } catch (error) {
        metrics.counter('messages_processed_total', 1, {
          topic: message.topic,
          status: 'error',
        });
        throw error;
      } finally {
        // Latência de processamento
        metrics.histogram('message_processing_duration_ms',
          performance.now() - start,
          { topic: message.topic }
        );

        // Latência end-to-end (publish → process)
        if (publishedAt > 0) {
          metrics.histogram('message_e2e_latency_ms',
            Date.now() - publishedAt,
            { topic: message.topic }
          );
        }
      }
    };
  }

Resumo: Quando Usar O Quê

DECISÃO RÁPIDA

  "Preciso distribuir tarefas entre workers"
  → RabbitMQ ou SQS

  "Preciso de event streaming com replay e múltiplos consumers"
  → Kafka

  "Preciso de fila managed sem infra"
  → SQS/SNS (AWS) ou Cloud Pub/Sub (GCP)

  "Preciso de FIFO estrita com deduplicação"
  → SQS FIFO ou Kafka (1 partition)

  "Preciso de event sourcing"
  → Kafka com log compaction ou EventStoreDB

  "Preciso publicar + salvar no banco atomicamente"
  → Transactional Outbox + CDC (Debezium)

  "Preciso de transação distribuída"
  → Saga Pattern (não 2PC — 2PC não escala)

  "Preciso garantir que consumer não duplique processamento"
  → Idempotent consumer pattern (dedup key + upsert)

Referencias e Fontes

  • RabbitMQ Documentationhttps://www.rabbitmq.com/docs — Documentacao oficial do RabbitMQ, cobrindo exchanges, queues, bindings e patterns de mensageria
  • Apache Kafka Documentationhttps://kafka.apache.org/documentation/ — Documentacao oficial do Kafka, incluindo arquitetura, consumers, producers e Kafka Streams
  • “Designing Data-Intensive Applications” — Martin Kleppmann — Analise aprofundada de sistemas de mensageria, log-based messaging e garantias de entrega