Tujuan utama pola ini adalah mengeliminasi kegagalan kebenaran (correctness failures), seperti pesan ganda (duplikasi) dan race conditions (kondisi balapan).
1.1 Layer 1: Idempotent Producer
Secara default, jika terjadi network timeout setelah pesan terkirim namun sebelum acknowledgment diterima, producer akan mengirim ulang pesan tersebut, sehingga menyebabkan duplikasi.
Solusi: Aktifkan idempotence. Kafka akan menyematkan sequence number pada setiap pesan, sehingga broker bisa mendeteksi dan menolak duplikasi secara otomatis.
Implementasi franz-go:
Library franz-go mengaktifkan idempotence secara default.
packagemainimport("github.com/twmb/franz-go/pkg/kgo")funcNewProductionProducer()(*kgo.Client,error){client,err:=kgo.NewClient(kgo.SeedBrokers("localhost:9092"),// Idempotence is enabled by default.// Menjamin message tidak berubah urutan / tidak duplicate, khususnya saat retry// Menjamin durabilitas data (menunggu semua replika in-sync)kgo.RequiredAcks(kgo.AllISRAcks()),)returnclient,err}
1.2 Layer 2: Partition Key Strategy
Tanpa partition key, pesan akan disebar secara acak (round-robin), menyebabkan hilangnya urutan (ordering). Ini memicu race conditions jika dua event terkait (misal: "Order Created" dan "Payment Success") diproses oleh consumer berbeda secara bersamaan.
Solusi: Gunakan Partition Key yang konsisten (misalnya OrderID). Ini menjamin semua pesan dengan ID yang sama masuk ke partisi yang sama dan diproses berurutan oleh satu consumer.
Kafka default dioptimalkan untuk latensi rendah (kirim secepatnya), yang justru membunuh throughput pada beban tinggi karena terlalu banyak network round-trips.
2.1 Layer 1: Producer Batching
Alih-alih mengirim pesan satu per satu, kumpulkan pesan dalam memori (batch) dan kirim sekaligus.
Konfigurasi:
batch.size: Ukuran batch dalam bytes.
linger.ms: Waktu tunggu maksimal sebelum batch dikirim (meski belum penuh)13131313.
2.2 Layer 2: Compression
Untuk Mengurangi beban bandwidth jaringan. Algoritma lz4 direkomendasikan karena keseimbangan terbaik antara rasio kompresi dan beban CPU.
Implementasi franz-go (Batching & Compression):
2.3 Layer 3: Consumer Fetch Optimization
Optimalkan cara consumer mengambil data. Jangan mengambil data byte demi byte. Paksa consumer menunggu sampai data cukup banyak terkumpul di broker sebelum dikirim melalui jaringan.
Fitur auto-commit pada Kafka bekerja berdasarkan waktu (time-based), bukan penyelesaian tugas. Ini berbahaya karena bisa menyebabkan data hilang (jika crash setelah commit tapi sebelum proses selesai) atau duplikasi.
3.1 Manual Commit Strategy
Matikan auto-commit. Lakukan commit offset hanya setelah logika bisnis sukses dieksekusi sepenuhnya.
Implementasi franz-go:
3.2 Static Group Membership
Saat consumer restart (misal saat deploy), Kafka biasanya memicu Rebalance (stop-the-world) yang menghentikan seluruh grup consumer selama beberapa detik.
Solusi: Gunakan Static Group Membership. Dengan memberikan ID statis, Kafka akan menoleransi ketidakhadiran consumer sementara (selama session.timeout.ms) tanpa memicu rebalance.
Implementasi franz-go:
Direkomendasikan untuk menerapkan pola ini secara bertahap:
Correctness (Pattern 1): Wajib. Data yang salah tidak ada gunanya diproses cepat.
Performance (Pattern 2): Terapkan saat volume data mulai meningkat.
Resilience (Pattern 3): Terapkan untuk stabilitas operasional jangka panjang.
Studi Kasus: "TicketMaster Lite" – High-Concurrency Concert Booking System
Skenario: Anda sedang membangun backend untuk penjualan tiket konser artis papan atas (misal: Coldplay atau Taylor Swift).
Masalah: Ribuan pengguna menekan tombol "Beli" secara bersamaan ("War Tiket").
Risiko:
Overselling: Menjual tiket lebih dari kapasitas kursi (Race Condition).
Double Charge: Pengguna tidak sengaja ter-charge 2x karena menekan tombol berulang saat jaringan lambat (Idempotency failure).
System Crash: Server down karena beban mendadak dan pemulihan yang lama (Resilience failure).
func ProduceOrderEvent(client *kgo.Client, orderID string, payload []byte) {
record := &kgo.Record{
Topic: "orders",
// Key sangat krusial untuk menjamin ordering
Key: []byte(orderID),
Value: payload,
}
// Pesan dengan orderID sama PASTI masuk partisi yang sama
client.Produce(context.Background(), record, nil)
}
func ProcessWithTransaction(client *kgo.Client) {
// Memulai transaksi Kafka
if err := client.BeginTransaction(); err != nil {
log.Fatal(err)
}
// ... Konsumsi pesan ...
// ... Proses Logika Bisnis ...
// LAYER 4: App-Level Idempotency (Pseudo-code)
// if database.HasProcessed(msg.Key) { return }
// Commit record yang diproduksi sebagai bagian dari transaksi
// client.Produce(...)
// Commit transaksi (Finalisasi offset dan pesan keluar)
if err := client.EndTransaction(context.Background(), kgo.TryCommit); err != nil {
log.Fatal("Transaction failed:", err)
}
}
func NewHighThroughputProducer() (*kgo.Client, error) {
client, err := kgo.NewClient(
kgo.SeedBrokers("localhost:9092"),
// LAYER 1: Batching Strategy
// Tingkatkan ukuran batch (misal 100KB)
kgo.ProducerBatchMaxBytes(100_000),
// Tunggu hingga 10ms untuk mengumpulkan pesan
kgo.ProducerLinger(10 * time.Millisecond),
// LAYER 2: Compression
// Menggunakan LZ4 untuk efisiensi CPU/Bandwidth terbaik
kgo.ProducerBatchCompression(kgo.Lz4Compression()),
)
return client, err
}
func NewHighThroughputConsumer() (*kgo.Client, error) {
client, err := kgo.NewClient(
kgo.SeedBrokers("localhost:9092"),
kgo.ConsumeTopics("orders"),
// LAYER 3: Fetch Optimization
// Server broker akan menunggu sampai ada 1MB data
// ATAU 100ms berlalu sebelum mengirim respon ke consumer.
kgo.FetchMinBytes(1 * 1024 * 1024), // 1MB
kgo.FetchMaxWait(100 * time.Millisecond),
)
return client, err
}
func ConsumeReliably(client *kgo.Client) {
// Disable Auto-Commit dilakukan via opsi client (default franz-go sebenarnya manual)
// Namun pastikan kita tidak menggunakan auto-commit timer.
for {
fetches := client.PollFetches(context.Background())
fetches.EachRecord(func(r *kgo.Record) {
// 1. Proses Logika Bisnis
err := processOrder(r.Value)
if err != nil {
// Handle error (retry/DLQ), JANGAN commit jika gagal
log.Println("Processing failed", err)
return
}
// 2. Manual Commit SETELAH sukses
// Menandai pesan ini aman untuk dikommit
client.MarkCommitRecords(r)
})
// Mengirim request commit ke broker secara asinkron namun aman
// Ini jauh lebih aman dari auto-commit time-based
client.AllowRebalance()
}
}
func NewResilientConsumer() (*kgo.Client, error) {
client, err := kgo.NewClient(
kgo.SeedBrokers("localhost:9092"),
kgo.ConsumerGroup("order-processor-group"),
// PATTERN 3: Static Membership
// ID ini harus unik per instance (misal dari env var HOSTNAME/POD_NAME)
kgo.InstanceID("consumer-pod-01"),
// Toleransi waktu down sebelum rebalance dipicu (misal 45 detik)
// Memberi waktu consumer untuk restart tanpa menendang member lain
kgo.SessionTimeout(45 * time.Second),
)
return client, err
}
init-schema.sql
-- Mengaktifkan ekstensi pgcrypto untuk generate UUID (jika pakai Postgres versi lama, tapi aman untuk v13+)
CREATE EXTENSION IF NOT EXISTS "pgcrypto";
-- Tabel Utama: Tiket yang terjual
CREATE TABLE IF NOT EXISTS bookings (
booking_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL,
concert_id UUID NOT NULL,
seat_number VARCHAR(10) NOT NULL,
status VARCHAR(20) DEFAULT 'PENDING',
created_at TIMESTAMP DEFAULT NOW()
);
-- Tabel Kunci Idempotensi (Jantung dari Exactly-Once Semantics)
CREATE TABLE IF NOT EXISTS idempotency_keys (
idempotency_key VARCHAR(255) PRIMARY KEY, -- Diubah ke VARCHAR agar fleksibel menerima UUID string dari client
booking_id UUID, -- Referensi ke booking jika sukses
status VARCHAR(20), -- 'PROCESSING', 'COMPLETED', 'FAILED'
created_at TIMESTAMP DEFAULT NOW()
);
-- Unique constraint untuk mencegah overselling (Race Condition Defense di DB Level)
-- Memastikan kombinasi ConcertID + SeatNumber hanya boleh ada satu
CREATE UNIQUE INDEX IF NOT EXISTS idx_concert_seat ON bookings(concert_id, seat_number);
compose.yaml
services:
postgres:
image: postgres:18-alpine
container_name: ticket_db_container
ports:
- "5432:5432"
environment:
- POSTGRES_USER=user
- POSTGRES_PASSWORD=pass
- POSTGRES_DB=ticket_db
volumes:
# Persistensi data agar tidak hilang saat restart
- postgres_data:/var/lib/postgresql/data
# MAGIC HAPPENS HERE:
# Mounting file SQL lokal ke folder initdb container
- ./init-schema.sql:/docker-entrypoint-initdb.d/init-schema.sql
restart: unless-stopped
healthcheck:
test: ["CMD-SHELL", "pg_isready -U user -d ticket_db"]
interval: 5s
timeout: 5s
retries: 5
volumes:
postgres_data:
import http from 'k6/http';
import { check, sleep } from 'k6';
import { randomItem } from 'https://jslib.k6.io/k6-utils/1.2.0/index.js';
// --- KONFIGURASI SKENARIO ---
export const options = {
stages: [
{ duration: '5s', target: 100 }, // Fase 1: Warming up (100 user siap-siap)
{ duration: '10s', target: 1000 }, // Fase 2: WAR dimulai! (Naik ke 1000 user)
{ duration: '20s', target: 2000 }, // Fase 3: Puncak Kemarahan (2000 user spamming)
{ duration: '5s', target: 0 }, // Fase 4: Sold Out / Bubar
],
// Ambang batas kelulusan tes
thresholds: {
http_req_failed: ['rate<0.01'], // Error harus di bawah 1%
http_req_duration: ['p(95)<200'], // 95% request harus selesai di bawah 200ms
},
};
// --- DATA POOL ---
// Kita asumsikan hanya ada 50 Kursi VIP (A1 - A50)
// Ribuan user akan berebut kursi-kursi ini.
const SEATS = [];
for (let i = 1; i <= 50; i++) {
SEATS.push(`VIP-${i}`);
}
const CONCERT_ID = "123e4567-e89b-12d3-a456-426614174000";
// Helper function untuk generate UUID (RFC4122 version 4)
function uuidv4() {
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
var r = Math.random() * 16 | 0, v = c == 'x' ? r : (r & 0x3 | 0x8);
return v.toString(16);
});
}
// --- FUNGSI VIRTUAL USER (VU) ---
export default function () {
// 1. Setiap iterasi adalah "User Baru" atau "User Mencoba Lagi"
// Kita generate ID unik untuk User dan Transaksi ini.
const userID = uuidv4();
const idempotencyKey = uuidv4(); // Transaksi unik
// 2. User memilih kursi secara acak dari pool yang tersedia
// Ini mensimulasikan user yang klik kursi mana saja yang terlihat kosong
const selectedSeat = randomItem(SEATS);
const payload = JSON.stringify({
user_id: userID,
concert_id: CONCERT_ID,
seat_number: selectedSeat,
idempotency_key: idempotencyKey,
});
const params = {
headers: {
'Content-Type': 'application/json',
},
};
// 3. Eksekusi Request (Klik tombol "Beli")
const res = http.post('http://localhost:3000/book-ticket', payload, params);
// 4. Validasi Response API
// Kita mengharapkan 202 Accepted (Async processing)
check(res, {
'is status 202': (r) => r.status === 202,
});
// 5. Human Behavior / Network Delay
// User tidak mungkin request 1ms sekali. Kita kasih jeda acak 0.1s - 1s
// Ini mensimulasikan "loading" di frontend atau jeda jari user.
sleep(Math.random() * 1);
}