Concorrência Avançada
Concorrência Avançada
1. Concorrência vs Paralelismo
Rob Pike (co-criador de Go) formalizou a distinção de forma precisa:
- Concorrência é sobre estrutura — lidar com múltiplas coisas ao mesmo tempo. Propriedade do design.
- Paralelismo é sobre execução — fazer múltiplas coisas ao mesmo tempo. Propriedade do runtime.
Um programa pode ser concorrente sem ser paralelo (um core alternando entre tasks), paralelo sem ser concorrente (SIMD processando um vetor), ou ambos.
Concorrência (um core): Paralelismo (múltiplos cores):
Thread A: ████░░░░████░░░░ Core 0: ████████████████
Thread B: ░░░░████░░░░████ Core 1: ████████████████
──────────────→ t ──────────────→ t
Interleaving em um core Execução simultânea real
Concorrência + Paralelismo:
Core 0: ██A██B██A██C██A██B██
Core 1: ██B██C██B██A██C██A██ Múltiplos cores, cada um alternando
Concurrency enablers
Preemptive scheduling: O SO interrompe uma thread a qualquer momento via timer interrupt (~1-10ms). A thread não controla quando é pausada. Modelo de OS threads.
Cooperative scheduling: A task decide quando ceder controle (yield). Se não coopera, bloqueia as outras. Modelo de coroutines, async/await e event loops. Vantagem: sem necessidade de sincronização entre yield points.
// Cooperative: yield explícito
async function fetchData() {
const response = await fetch("/api/data"); // yield point
const json = await response.json(); // yield point
return json;
// Entre os awaits, NENHUMA outra task executa no mesmo thread
}
2. Modelos de Concorrência
2.1 OS Threads (preemptive, 1:1)
Cada thread mapeia 1:1 para uma kernel thread. No Linux, threads são task_struct que compartilham mm_struct via CLONE_VM.
- Stack: ~8MB (Linux). Context switch: ~1-10μs (salvar registradores + TLB flush + cache misses).
- Criação: ~10-50μs. Limite prático: ~10k threads antes de degradação significativa.
2.2 Green Threads (M:N)
M threads de usuário mapeadas para N threads do SO (M >> N). O runtime gerencia escalonamento.
Go goroutines: stack inicial 2KB (cresce até 1GB), switch ~200ns, criação ~300ns, suporta milhões simultâneas.
Erlang/BEAM processes: stack ~300B, preemptive via reduction counting, isolamento total, GC per-process.
Java Virtual Threads (Loom, JDK 21+): mounted/unmounted em carrier threads, yield automático em I/O, backward compatible.
// Go: 100k goroutines trivialmente — ~200MB (100k × 2KB)
// Com OS threads (8MB stack): 800GB — impossível
func main() {
var wg sync.WaitGroup
for i := 0; i < 100_000; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
time.Sleep(time.Second)
}(i)
}
wg.Wait()
}
2.3 Coroutines (cooperative)
Funções que podem ser suspensas e retomadas. Suspensão explícita — o programador decide os yield points.
| Aspecto | Stackful | Stackless |
|---|---|---|
| Stack própria | Sim (heap) | Não (state machine) |
| Suspensão | Qualquer ponto da call stack | Apenas na função async |
| Overhead | ~KB (stack) | ~bytes (estado) |
| Exemplo | Go goroutines, Lua | Rust async/await, Python asyncio |
Stackless coroutines (async/await) são transformadas pelo compilador em state machines: cada await vira um estado, e a função é uma switch(state) que avança a cada resumption.
2.4 Actor Model
O Actor Model (Hewitt, 1973): cada actor possui estado privado, recebe mensagens via mailbox, processa uma por vez. Sem memória compartilhada — elimina locks.
Supervision Tree: Application Supervisor
├── Worker (GenServer)
├── Worker Pool
└── Sub-Supervisor
├── Worker
└── Worker
one_for_one strategy: worker crasha → supervisor reinicia APENAS ele.
"Let it crash" — design for failure, not prevention.
2.5 CSP (Communicating Sequential Processes)
CSP (Hoare, 1978): comunicação síncrona por default via channels nomeados (não mailboxes).
// Go — CSP: fan-out/fan-in
func fanOutFanIn() {
jobs := make(chan int, 100)
results := make(chan int, 100)
// Fan-out: 5 workers consumindo do mesmo channel
var wg sync.WaitGroup
for w := 0; w < 5; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for job := range jobs {
results <- job * job
}
}()
}
go func() { // Produtor
for i := 0; i < 50; i++ { jobs <- i }
close(jobs)
}()
go func() { wg.Wait(); close(results) }() // Fan-in
for result := range results { fmt.Println(result) }
}
Tabela comparativa
| Modelo | Comunicação | Escalonamento | Overhead/unit | Exemplos |
|---|---|---|---|---|
| OS Threads | Shared memory + locks | Preemptive (kernel) | ~8MB + ~10μs switch | pthreads, Java |
| Green Threads | Runtime-dependent | Preemptive (runtime) | ~2KB + ~200ns switch | Go, Erlang |
| Coroutines | Runtime-dependent | Cooperative | ~bytes (state machine) | async/await |
| Actors | Message passing | Preemptive (runtime) | ~300B (BEAM) | Erlang, Akka |
| CSP | Channels | Preemptive (runtime) | ~2KB (Go) | Go, Clojure |
3. Memory Model
Por que o reordenamento acontece
Instruções NÃO executam na ordem escrita. Reordenamento ocorre em três níveis:
- Compilador: reordena para otimizar registradores, eliminar loads redundantes.
- CPU (out-of-order execution): executa fora de ordem para maximizar pipeline. Apple M2: ~600 instruções in-flight.
- Store buffers: escritas vão para buffer local ao core — visíveis para o próprio core imediatamente, para outros cores somente após flush.
Bug real de reordenamento:
Thread 1: Thread 2:
x = 42 (store) while (!ready) {} (load)
ready = true (store) print(x) (load)
Intuição: imprime 42. Realidade possível: imprime 0!
- Compilador pode reordenar: ready=true ANTES de x=42
- CPU pode flush ready antes de x no store buffer
Sem memory barriers, NÃO há garantia de ordenação entre cores.
Happens-before relation
Se A happens-before B (A → B), os efeitos de A são garantidamente visíveis quando B executa. É uma ordem parcial — eventos sem relação happens-before estão em data race.
Regras universais: (1) program order, (2) unlock → lock do mesmo mutex, (3) escrita atômica → leitura subsequente, (4) thread.start() → primeira instrução, (5) última instrução → join(), (6) transitividade.
Memory ordering levels
Sequential Consistency (SC) — ordem global visível para todas as threads. Default: Java volatile.
│
Acquire/Release — acquire impede reordenamento para baixo; release impede para cima.
│ Suficiente para critical sections. Default: C++ mutex, Rust AcqRel.
│
Relaxed — apenas atomicidade, sem ordering. Útil para contadores isolados.
Java Memory Model
class VolatileExample {
private volatile boolean ready = false; // SC ordering
private int x = 0;
void writer() {
x = 42; // escrita normal
ready = true; // volatile write = RELEASE barrier
}
void reader() {
if (ready) { // volatile read = ACQUIRE barrier
assert x == 42; // GARANTIDO pelo JMM
}
}
}
Go Memory Model
// Channel send happens-before receive correspondente
var data int
func main() {
done := make(chan struct{})
go func() {
data = 42 // escrita
done <- struct{}{} // send = RELEASE
}()
<-done // receive = ACQUIRE
fmt.Println(data) // GARANTIDO: 42
}
// CUIDADO: sem sincronização, Go NÃO garante nada
// Thread 1: x = 1; y = 2 | Thread 2: r1 = y; r2 = x
// Resultado r1=2, r2=0 é LEGAL pelo Go memory model
C++ memory_order
std::atomic<bool> ready{false};
std::atomic<int> data{0};
void producer() {
data.store(42, std::memory_order_relaxed); // relaxed ok — publicado pelo release
ready.store(true, std::memory_order_release); // release barrier
}
void consumer() {
while (!ready.load(std::memory_order_acquire)) {} // acquire barrier
assert(data.load(std::memory_order_relaxed) == 42); // garantido pelo pair
}
// Opções: seq_cst (default), acquire, release, acq_rel, relaxed, consume (deprecated)
4. Lock-Free Data Structures
CAS (Compare-And-Swap)
Operação atômica fundamental: CAS(addr, expected, desired) — se *addr == expected, seta desired e retorna true; senão false. No x86: LOCK CMPXCHG (protocolo MESI — cache line em estado Modified). Em ARM: LL/SC (LDXR/STXR).
Lock-free stack (Treiber stack, 1986)
type LockFreeStack[T any] struct {
top atomic.Pointer[node[T]] // node: { value T; next *node[T] }
}
func (s *LockFreeStack[T]) Push(value T) {
n := &node[T]{value: value}
for {
old := s.top.Load()
n.next = old
if s.top.CompareAndSwap(old, n) { return } // CAS falhou → retry
}
}
func (s *LockFreeStack[T]) Pop() (T, bool) {
for {
old := s.top.Load()
if old == nil { var zero T; return zero, false }
if s.top.CompareAndSwap(old, old.next) { return old.value, true }
}
}
Lock-free queue (Michael-Scott, 1996)
head tail
│ │
▼ ▼
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│sentinel│──▶│ val=A │──▶│ val=B │──▶│ val=C │──▶ nil
└────────┘ └────────┘ └────────┘ └────────┘
Enqueue(D): CAS(tail.next, nil, D) → CAS(tail, old, D)
Dequeue: CAS(head, old, head.next) → retorna valor de head.next
Usado em ConcurrentLinkedQueue (Java) e muitos runtimes.
ABA Problem
Estado inicial: Stack top → A → B → C
Thread 1 (Pop): Thread 2:
1. Lê top=A, next=B
2. Prepara CAS(top, A, B)
--- PREEMPTED ---
3. Pop A → top=B
4. Pop B → top=C
5. Push A → top=A→C (A reutilizado!)
--- RESUMES ---
6. CAS(top, A, B) → SUCESSO! (top==A ✓, mas A.next agora→C, não B)
7. top=B... mas B já foi freed! → CORRUPÇÃO
Soluções: (1) Tagged pointers — counter monotônico acoplado ao ponteiro, CAS de 128 bits (CMPXCHG16B). (2) Hazard pointers — threads publicam ponteiros em uso. (3) Epoch-based reclamation — crossbeam (Rust).
Hierarquia de progresso
Wait-free → todo thread completa em passos finitos. Ex: atomic fetch-and-add.
Lock-free → ao menos um thread progride. Ex: Treiber stack, MS queue.
Obstruction-free → progresso em isolamento. Pode livelocked sob contention.
Blocking → deadlock possível. Ex: mutex, semaphore.
5. Concurrent Patterns
Producer-Consumer com bounded buffer
func producerConsumer() {
ch := make(chan Task, 10) // bounded — backpressure natural
var prodWg sync.WaitGroup
for i := 0; i < 3; i++ { // 3 producers
prodWg.Add(1)
go func(id int) {
defer prodWg.Done()
for j := 0; j < 100; j++ { ch <- Task{ID: id*100 + j} }
}(i)
}
var consWg sync.WaitGroup
for i := 0; i < 5; i++ { // 5 consumers
consWg.Add(1)
go func() { defer consWg.Done(); for task := range ch { process(task) } }()
}
go func() { prodWg.Wait(); close(ch) }()
consWg.Wait()
}
Readers-Writer Lock
sync.RWMutex: múltiplos RLock() simultâneos para reads, Lock() exclusivo para writes. Go dá prioridade a writers pendentes (evita starvation).
Work Stealing
Cada worker tem uma deque. LIFO para próprio trabalho (cache locality), roubo FIFO do fundo da deque de outro worker (baixa contention). Usado em Go scheduler, Tokio, Java ForkJoinPool.
Double-Checked Locking (broken)
// BROKEN sem volatile (pre-Java 5)
private static Singleton instance; // SEM volatile
static Singleton getInstance() {
if (instance == null) {
synchronized (Singleton.class) {
if (instance == null) {
instance = new Singleton(); // 1.aloca 2.constrói 3.atribui
// CPU reordena: 1→3→2. Thread B vê instance!=null, campos zerados!
}
}
}
return instance;
}
// Fix: private static volatile Singleton instance;
6. Async Runtimes Internals
Node.js — libuv event loop
Event Loop (libuv) — fases:
┌────────────────────────────────────┐
│ timers (setTimeout/setInterval — min-heap)
├────────────────────────────────────┤
│ pending callbacks (I/O adiados)
├────────────────────────────────────┤
│ poll (epoll_wait/kqueue — bloqueia aqui)
├────────────────────────────────────┤
│ check (setImmediate)
├────────────────────────────────────┤
│ close callbacks (socket.on('close'))
└────────────────────────────────────┘
Entre fases: nextTick queue + microtask queue (Promises)
Thread pool (UV_THREADPOOL_SIZE=4): fs, dns.lookup, crypto, zlib
Go Scheduler — modelo G-M-P
- G (Goroutine): unidade de trabalho (stack, IP, estado)
- M (Machine): OS thread
- P (Processor): contexto lógico com run queue local
Global Run Queue: [G7, G8, ...] GOMAXPROCS = 3
P0 [G1,G2] P1 [G4,G5] P2 [G6] ← Local Run Queues (lock-free)
│ │ │
▼ ▼ ▼
M0 (run G3) M1 (run G0) M2 (run G10) ← OS Threads
Fluxo: local queue O(1) → steal metade de outro P → global (com lock) → park
Preemption (1.14+): SIGURG a cada 10ms. Syscalls: M detaches do P → P migra.
Tokio (Rust)
┌──────────────────────────────────────┐
│ I/O Driver (epoll/kqueue) │
│ Timer Driver (hierarchical wheel) │
├──────────────────────────────────────┤
│ Task Scheduler (work-stealing) │
│ Cada worker tem deque + injection │
├──────────────────────────────────────┤
│ Worker 0 │ Worker 1 │ Worker 2 │ (OS threads)
└──────────────────────────────────────┘
Tasks = Futures: poll() → Ready(val) | Pending (registra Waker)
Sem stack por task — apenas state machine.
Cada task é polled pelo worker thread. Arc<Mutex<T>> é o padrão para shared mutable state — Rust garante em compile time (Send/Sync traits) a ausência de data races.
7. Paralelismo: Leis e Limites
Lei de Amdahl
Speedup(N) = 1 / (S + (1-S)/N) S = fração serial, N = cores
Quando N → ∞: Speedup_max = 1/S
Se 5% serial: máx 20x. Com 1000 cores: ~19.6x (980 cores extras = +0.4x!)
Speedup
▲
20 │ ───── S=5% (máx 20x)
│ ─────────
│ ──────
│ ─────
10 │ ─── ───── S=10% (máx 10x)
│ ── ─────────
│ ── ──────
5 │ ── ───── ───── S=20% (máx 5x)
│ ─ ──── ─────────────
2 │─ ─── ───── S=50% (máx 2x)
1 │────────────────────────────────────
└──┬──┬──┬──┬───┬───┬────┬────┬────▶ N
1 2 4 8 16 32 64 128 256
Lei de Gustafson
Scaled Speedup(N) = N - S × (N - 1)
S=0.05, N=100: Speedup = 100 - 0.05×99 = 95.05x
Amdahl: "Problema fixo, quantos cores ajudam?" → retornos decrescentes.
Gustafson: "Mais cores, problema maior." → speedup quase linear.
Na prática, Gustafson é mais realista (rendering, ML training, simulação).
Overhead de paralelização
Communication: cache line transfer entre cores L1: ~30-70 ciclos. NUMA cross-socket: ~100-300 ciclos.
False sharing: variáveis diferentes na mesma cache line (64B) causam ping-pong:
// False sharing — 10-50x mais lento que o esperado
type Counters struct {
a int64 // offset 0-7 ← mesma cache line
b int64 // offset 8-15 ← mesma cache line!
}
// Fix: padding
type CountersPadded struct {
a int64
_ [56]byte // 64-8=56 → a e b em cache lines diferentes
b int64
}
// Java: @Contended | Rust: #[repr(align(64))]
Synchronization: mutex uncontended ~25ns; contended ~μs.
8. Implementações Práticas
Go: pipeline concorrente completo
// Pipeline: generate → filterPrimes → fanOut (N workers) → fanIn → collect
// Cada estágio é uma goroutine conectada por channels. Context para cancelamento.
func generate(ctx context.Context, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select { case out <- n: case <-ctx.Done(): return }
}
}()
return out
}
func filterPrimes(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if isPrime(n) {
select { case out <- n: case <-ctx.Done(): return }
}
}
}()
return out
}
func fanOut(ctx context.Context, in <-chan int, n int) []<-chan int {
chs := make([]<-chan int, n)
for i := range chs {
ch := make(chan int)
go func() {
defer close(ch)
for v := range in {
select { case ch <- v * v: case <-ctx.Done(): return }
}
}()
chs[i] = ch
}
return chs
}
func fanIn(ctx context.Context, chs ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
for _, c := range chs {
wg.Add(1)
go func(ch <-chan int) {
defer wg.Done()
for v := range ch {
select { case out <- v: case <-ctx.Done(): return }
}
}(c)
}
go func() { wg.Wait(); close(out) }()
return out
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
nums := make([]int, 1000)
for i := range nums { nums[i] = i + 2 }
results := fanIn(ctx, fanOut(ctx, filterPrimes(ctx, generate(ctx, nums...)), runtime.NumCPU())...)
count := 0
for range results { count++ }
fmt.Printf("Processed %d primes with %d workers\n", count, runtime.NumCPU())
}
Rust: async com ownership safety
use tokio::sync::mpsc;
struct WorkItem { id: u64, payload: String }
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel::<WorkItem>(100); // bounded channel
// Produtor — tx é MOVIDO (ownership transfer)
tokio::spawn(async move {
for i in 0..50 {
let item = WorkItem { id: i, payload: format!("data-{}", i) };
if tx.send(item).await.is_err() { break; }
} // tx dropped → channel closed
});
// Consumer — ownership de cada WorkItem transferida via channel
let mut handles = vec![];
while let Some(item) = rx.recv().await {
handles.push(tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
item.id // retorna id após processamento
}));
}
let total: u64 = futures::future::join_all(handles)
.await.into_iter().map(|r| r.unwrap()).sum();
println!("Sum: {}", total);
}
// Rust garante em compile time: sem data races (Send/Sync), sem use-after-free.
Java: Virtual Threads vs CompletableFuture
// Virtual Threads (JDK 21+) — código sequencial, concorrência automática
public List<OrderResult> processOrders(List<Order> orders) throws Exception {
try (var exec = Executors.newVirtualThreadPerTaskExecutor()) {
var futures = orders.stream()
.map(order -> exec.submit(() -> {
var inv = inventoryService.check(order); // yield no I/O
var pay = paymentService.charge(order); // yield no I/O
return new OrderResult(order, inv, pay);
})).toList();
return futures.stream().map(f -> {
try { return f.get(); }
catch (Exception e) { throw new RuntimeException(e); }
}).toList();
}
}
// CompletableFuture — composição funcional (mais verboso)
public CompletableFuture<List<OrderResult>> processAsync(List<Order> orders) {
var futures = orders.stream().map(order ->
supplyAsync(() -> inventoryService.check(order))
.thenCombine(supplyAsync(() -> paymentService.charge(order)),
(inv, pay) -> new OrderResult(order, inv, pay))
).toList();
return allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream().map(CompletableFuture::join).toList());
}
// Virtual Threads: simples, debugável. CompletableFuture: callback hell.
// JDK 21+: prefira Virtual Threads.
9. Exercícios
Exercício 1: Detectar data race
Identifique o data race e proponha três soluções (mutex, channel, atomic):
func main() {
counter := 0
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // DATA RACE: read-modify-write sem sincronização
}()
}
wg.Wait()
fmt.Println(counter) // Provavelmente < 1000
}
// Use `go run -race` para verificar. Qual solução é mais rápida? Por quê?
Exercício 2: Token bucket rate limiter
Implemente um rate limiter thread-safe com token bucket. Sem mutex — use channels.
type TokenBucket struct { /* defina campos */ }
func NewTokenBucket(capacity int, refillRate time.Duration) *TokenBucket
func (tb *TokenBucket) Allow() bool // Consome token, false se vazio
func (tb *TokenBucket) Stop()
// Thread-safe, tokens não excedem capacity.
Exercício 3: Memory ordering
Quais outputs são possíveis segundo o Go memory model? Justifique.
var x, y int
func main() {
go func() { x = 1; y = 1 }()
go func() {
r1 := y; r2 := x
fmt.Println(r1, r2)
}()
time.Sleep(time.Second)
}
// "0 0"? "0 1"? "1 0"? "1 1"? Qual é surpreendente?
Exercício 4: Lock-free counter benchmark
Compare sync/atomic vs sync.Mutex sob contention alta (10 goroutines, 1M incrementos). Use testing.Benchmark. Qual vence sob contention alta vs baixa? Como false sharing afeta?
Exercício 5: Pipeline com cancelamento
Pipeline de 4 estágios: (1) lê URLs, (2) HTTP GET com concurrency limit de 5, (3) extrai Content-Length, (4) soma total. Requisitos: context.Context cancelamento, timeout 10s/request, error handling sem parar pipeline.
10. Referências
- “The Art of Multiprocessor Programming” — Herlihy & Shavit. Lock-free/wait-free algorithms com provas formais.
- “Java Concurrency in Practice” — Goetz et al. Happens-before, safe publication, JMM. Caps 3 e 16.
- Go Memory Model — https://go.dev/ref/mem. Happens-before para Go.
- “Is Parallel Programming Hard?” — Paul McKenney. Memory barriers, RCU, kernel patterns.
- “Concurrency is not Parallelism” — Rob Pike (talk). A distinção formalizada.
- Rust Atomics and Locks — Mara Bos. Hardware memory model até lock-free structures.
- C++ Concurrency in Action — Anthony Williams.
std::atomic, memory ordering detalhado. - LMAX Disruptor — Martin Thompson. Ring buffer lock-free, 6M+ eventos/seg.