Kafka Pattern Untuk Production
Pattern 1: Exactly-Once Semantics (Menjamin Kebenaran Data)
Tujuan utama pola ini adalah mengeliminasi kegagalan kebenaran (correctness failures), seperti pesan ganda (duplikasi) dan race conditions (kondisi balapan).
1.1 Layer 1: Idempotent Producer
Secara default, jika terjadi network timeout setelah pesan terkirim namun sebelum acknowledgment diterima, producer akan mengirim ulang pesan tersebut, sehingga menyebabkan duplikasi.
Solusi: Aktifkan idempotence. Kafka akan menyematkan sequence number pada setiap pesan, sehingga broker bisa mendeteksi dan menolak duplikasi secara otomatis.
Implementasi franz-go:
Library franz-go mengaktifkan idempotence secara default.
package main
import (
"github.com/twmb/franz-go/pkg/kgo"
)
func NewProductionProducer() (*kgo.Client, error) {
client, err := kgo.NewClient(
kgo.SeedBrokers("localhost:9092"),
// Idempotence is enabled by default.
// Menjamin message tidak berubah urutan / tidak duplicate, khususnya saat retry
// Menjamin durabilitas data (menunggu semua replika in-sync)
kgo.RequiredAcks(kgo.AllISRAcks()),
)
return client, err
}1.2 Layer 2: Partition Key Strategy
Tanpa partition key, pesan akan disebar secara acak (round-robin), menyebabkan hilangnya urutan (ordering). Ini memicu race conditions jika dua event terkait (misal: "Order Created" dan "Payment Success") diproses oleh consumer berbeda secara bersamaan.
Solusi: Gunakan Partition Key yang konsisten (misalnya OrderID). Ini menjamin semua pesan dengan ID yang sama masuk ke partisi yang sama dan diproses berurutan oleh satu consumer.
Implementasi franz-go:
func ProduceOrderEvent(client *kgo.Client, orderID string, payload []byte) {
record := &kgo.Record{
Topic: "orders",
// Key sangat krusial untuk menjamin ordering
Key: []byte(orderID),
Value: payload,
}
// Pesan dengan orderID sama PASTI masuk partisi yang sama
client.Produce(context.Background(), record, nil)
}1.3 Layer 3 & 4: Transactional Semantics & App-Level Idempotency
Lapisan infrastruktur (Layer 1 & 2) tidak bisa mencegah duplikasi jika consumer crash setelah memproses pesan tapi sebelum commit offset.
Solusi:
Transactions: Menjamin pembacaan dan penulisan ke Kafka bersifat atomik.
App-Level Idempotency: Consumer harus mengecek apakah pesan sudah pernah diproses (misal cek ke DB) sebelum mengeksekusi logika bisnis.
Implementasi franz-go (Transactional):
func ProcessWithTransaction(client *kgo.Client) {
// Memulai transaksi Kafka
if err := client.BeginTransaction(); err != nil {
log.Fatal(err)
}
// ... Konsumsi pesan ...
// ... Proses Logika Bisnis ...
// LAYER 4: App-Level Idempotency (Pseudo-code)
// if database.HasProcessed(msg.Key) { return }
// Commit record yang diproduksi sebagai bagian dari transaksi
// client.Produce(...)
// Commit transaksi (Finalisasi offset dan pesan keluar)
if err := client.EndTransaction(context.Background(), kgo.TryCommit); err != nil {
log.Fatal("Transaction failed:", err)
}
}Pattern 2: Throughput Optimization (Optimalisasi Performa)
Kafka default dioptimalkan untuk latensi rendah (kirim secepatnya), yang justru membunuh throughput pada beban tinggi karena terlalu banyak network round-trips.
2.1 Layer 1: Producer Batching
Alih-alih mengirim pesan satu per satu, kumpulkan pesan dalam memori (batch) dan kirim sekaligus.
Konfigurasi:
batch.size: Ukuran batch dalam bytes.linger.ms: Waktu tunggu maksimal sebelum batch dikirim (meski belum penuh)13131313.
2.2 Layer 2: Compression
Untuk Mengurangi beban bandwidth jaringan. Algoritma lz4 direkomendasikan karena keseimbangan terbaik antara rasio kompresi dan beban CPU.
Implementasi franz-go (Batching & Compression):
func NewHighThroughputProducer() (*kgo.Client, error) {
client, err := kgo.NewClient(
kgo.SeedBrokers("localhost:9092"),
// LAYER 1: Batching Strategy
// Tingkatkan ukuran batch (misal 100KB)
kgo.ProducerBatchMaxBytes(100_000),
// Tunggu hingga 10ms untuk mengumpulkan pesan
kgo.ProducerLinger(10 * time.Millisecond),
// LAYER 2: Compression
// Menggunakan LZ4 untuk efisiensi CPU/Bandwidth terbaik
kgo.ProducerBatchCompression(kgo.Lz4Compression()),
)
return client, err
}2.3 Layer 3: Consumer Fetch Optimization
Optimalkan cara consumer mengambil data. Jangan mengambil data byte demi byte. Paksa consumer menunggu sampai data cukup banyak terkumpul di broker sebelum dikirim melalui jaringan.
Implementasi franz-go:
func NewHighThroughputConsumer() (*kgo.Client, error) {
client, err := kgo.NewClient(
kgo.SeedBrokers("localhost:9092"),
kgo.ConsumeTopics("orders"),
// LAYER 3: Fetch Optimization
// Server broker akan menunggu sampai ada 1MB data
// ATAU 100ms berlalu sebelum mengirim respon ke consumer.
kgo.FetchMinBytes(1 * 1024 * 1024), // 1MB
kgo.FetchMaxWait(100 * time.Millisecond),
)
return client, err
}Pattern 3: Failure Recovery (Ketahanan & Pemulihan)
Fitur auto-commit pada Kafka bekerja berdasarkan waktu (time-based), bukan penyelesaian tugas. Ini berbahaya karena bisa menyebabkan data hilang (jika crash setelah commit tapi sebelum proses selesai) atau duplikasi.
3.1 Manual Commit Strategy
Matikan auto-commit. Lakukan commit offset hanya setelah logika bisnis sukses dieksekusi sepenuhnya.
Implementasi franz-go:
func ConsumeReliably(client *kgo.Client) {
// Disable Auto-Commit dilakukan via opsi client (default franz-go sebenarnya manual)
// Namun pastikan kita tidak menggunakan auto-commit timer.
for {
fetches := client.PollFetches(context.Background())
fetches.EachRecord(func(r *kgo.Record) {
// 1. Proses Logika Bisnis
err := processOrder(r.Value)
if err != nil {
// Handle error (retry/DLQ), JANGAN commit jika gagal
log.Println("Processing failed", err)
return
}
// 2. Manual Commit SETELAH sukses
// Menandai pesan ini aman untuk dikommit
client.MarkCommitRecords(r)
})
// Mengirim request commit ke broker secara asinkron namun aman
// Ini jauh lebih aman dari auto-commit time-based
client.AllowRebalance()
}
}3.2 Static Group Membership
Saat consumer restart (misal saat deploy), Kafka biasanya memicu Rebalance (stop-the-world) yang menghentikan seluruh grup consumer selama beberapa detik.
Solusi: Gunakan Static Group Membership. Dengan memberikan ID statis, Kafka akan menoleransi ketidakhadiran consumer sementara (selama session.timeout.ms) tanpa memicu rebalance.
Implementasi franz-go:
func NewResilientConsumer() (*kgo.Client, error) {
client, err := kgo.NewClient(
kgo.SeedBrokers("localhost:9092"),
kgo.ConsumerGroup("order-processor-group"),
// PATTERN 3: Static Membership
// ID ini harus unik per instance (misal dari env var HOSTNAME/POD_NAME)
kgo.InstanceID("consumer-pod-01"),
// Toleransi waktu down sebelum rebalance dipicu (misal 45 detik)
// Memberi waktu consumer untuk restart tanpa menendang member lain
kgo.SessionTimeout(45 * time.Second),
)
return client, err
}Direkomendasikan untuk menerapkan pola ini secara bertahap:
Correctness (Pattern 1): Wajib. Data yang salah tidak ada gunanya diproses cepat.
Performance (Pattern 2): Terapkan saat volume data mulai meningkat.
Resilience (Pattern 3): Terapkan untuk stabilitas operasional jangka panjang.
Studi Kasus: "TicketMaster Lite" – High-Concurrency Concert Booking System
Skenario: Anda sedang membangun backend untuk penjualan tiket konser artis papan atas (misal: Coldplay atau Taylor Swift).
Masalah: Ribuan pengguna menekan tombol "Beli" secara bersamaan ("War Tiket").
Risiko:
Overselling: Menjual tiket lebih dari kapasitas kursi (Race Condition).
Double Charge: Pengguna tidak sengaja ter-charge 2x karena menekan tombol berulang saat jaringan lambat (Idempotency failure).
System Crash: Server down karena beban mendadak dan pemulihan yang lama (Resilience failure).
Tech Stack:
Language: Golang 1.23+
Web Framework: Go Chi (Lightweight & Fast)
Database: PostgreSQL (dengan
pgxdriver)Message Broker: Kafka (dengan
github.com/twmb/franz-go)
-- Mengaktifkan ekstensi pgcrypto untuk generate UUID (jika pakai Postgres versi lama, tapi aman untuk v13+)
CREATE EXTENSION IF NOT EXISTS "pgcrypto";
-- Tabel Utama: Tiket yang terjual
CREATE TABLE IF NOT EXISTS bookings (
booking_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL,
concert_id UUID NOT NULL,
seat_number VARCHAR(10) NOT NULL,
status VARCHAR(20) DEFAULT 'PENDING',
created_at TIMESTAMP DEFAULT NOW()
);
-- Tabel Kunci Idempotensi (Jantung dari Exactly-Once Semantics)
CREATE TABLE IF NOT EXISTS idempotency_keys (
idempotency_key VARCHAR(255) PRIMARY KEY, -- Diubah ke VARCHAR agar fleksibel menerima UUID string dari client
booking_id UUID, -- Referensi ke booking jika sukses
status VARCHAR(20), -- 'PROCESSING', 'COMPLETED', 'FAILED'
created_at TIMESTAMP DEFAULT NOW()
);
-- Unique constraint untuk mencegah overselling (Race Condition Defense di DB Level)
-- Memastikan kombinasi ConcertID + SeatNumber hanya boleh ada satu
CREATE UNIQUE INDEX IF NOT EXISTS idx_concert_seat ON bookings(concert_id, seat_number);
services:
postgres:
image: postgres:18-alpine
container_name: ticket_db_container
ports:
- "5432:5432"
environment:
- POSTGRES_USER=user
- POSTGRES_PASSWORD=pass
- POSTGRES_DB=ticket_db
volumes:
# Persistensi data agar tidak hilang saat restart
- postgres_data:/var/lib/postgresql/data
# MAGIC HAPPENS HERE:
# Mounting file SQL lokal ke folder initdb container
- ./init-schema.sql:/docker-entrypoint-initdb.d/init-schema.sql
restart: unless-stopped
healthcheck:
test: ["CMD-SHELL", "pg_isready -U user -d ticket_db"]
interval: 5s
timeout: 5s
retries: 5
volumes:
postgres_data:
package ticketmasterlite
type BookingRequest struct {
IdempotencyKey string `json:"idempotency_key"` // Client-generated UUID
UserID string `json:"user_id"`
ConcertID string `json:"concert_id"`
SeatNumber string `json:"seat_number"`
}package main
import (
"context"
"encoding/json"
"log"
"net/http"
"time"
"github.com/go-chi/chi/v5"
ticketmasterlite "github.com/semmidev/belajar-kafka/1-basic/ticket-master-lite"
"github.com/twmb/franz-go/pkg/kgo"
)
// Inisialisasi Kafka Client dengan Pattern 1 & 2
func NewProducer() (*kgo.Client, error) {
return kgo.NewClient(
kgo.SeedBrokers("localhost:9092"),
// Pattern 1: Correctness
kgo.RequiredAcks(kgo.AllISRAcks()),
// Pattern 2: High Throughput Optimization
kgo.ProducerBatchMaxBytes(100*1024), // 100KB Batch
kgo.ProducerLinger(10*time.Millisecond), // Tunggu 10ms
kgo.ProducerBatchCompression(kgo.Lz4Compression()), // LZ4 Compression
)
}
func main() {
kafkaClient, _ := NewProducer()
defer kafkaClient.Close()
r := chi.NewRouter()
r.Post("/book-ticket", func(w http.ResponseWriter, r *http.Request) {
var req ticketmasterlite.BookingRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Invalid Body", 400)
return
}
// Payload untuk Kafka
payload, _ := json.Marshal(req)
// Produce ke Kafka
// Pattern 1 Layer 2: Partition Key Strategy
// Key = req.ConcertID. Semua booking untuk konser X masuk partisi yang sama.
record := &kgo.Record{
Topic: "ticket-bookings",
Key: []byte(req.ConcertID),
Value: payload,
}
// Asynchronous produce untuk throughput maksimal (handle error di callback/logging)
kafkaClient.Produce(context.Background(), record, func(r *kgo.Record, err error) {
if err != nil {
log.Printf("Gagal kirim ke Kafka: %v", err)
}
})
// Return 202 Accepted (Async processing)
w.WriteHeader(http.StatusAccepted)
w.Write([]byte(`{"status": "queued"}`))
})
http.ListenAndServe(":3000", r)
}
package main
import (
"context"
"encoding/json"
"log"
"time"
"github.com/jackc/pgx/v5/pgxpool"
ticketmasterlite "github.com/semmidev/belajar-kafka/1-basic/ticket-master-lite"
"github.com/twmb/franz-go/pkg/kgo"
)
// Koneksi DB (menggunakan pgxpool)
var db *pgxpool.Pool
func main() {
// Init DB
var err error
db, err = pgxpool.New(context.Background(), "postgres://user:pass@localhost:5432/ticket_db")
if err != nil {
log.Fatal(err)
}
// Init Kafka Consumer
client, err := kgo.NewClient(
kgo.SeedBrokers("localhost:9092"),
kgo.ConsumeTopics("ticket-bookings"),
kgo.ConsumerGroup("booking-workers"),
// Pattern 3: Static Group Membership (Fast Recovery)
// InstanceID harus unik per pod/container (misal: worker-01)
// kgo.InstanceID("worker-01"),
// kgo.SessionTimeout(45*time.Second),
// Pattern 2: Fetch Optimization
kgo.FetchMinBytes(1*1024*1024), // 1MB
kgo.FetchMaxWait(100*time.Millisecond),
// Pattern 3: Manual Commit (Matikan Auto-Commit time-based)
kgo.DisableAutoCommit(),
// GANTI nama group ini, misal tambah suffix v2
// kgo.ConsumerGroup("booking-workers-v2"),
// Pastikan dia start dari message TERBARU, abaikan yang lama
// kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()),
)
if err != nil {
log.Fatal(err)
}
defer client.Close()
// Processing Loop
for {
// Fetch Batch
fetches := client.PollFetches(context.Background())
if fetches.IsClientClosed() {
break
}
fetches.EachRecord(func(record *kgo.Record) {
// Parse Message
var req ticketmasterlite.BookingRequest // Struct sama dengan Producer
if err := json.Unmarshal(record.Value, &req); err != nil {
log.Printf("❌ JSON Error (Poison Pill): %v. Skipping...", err)
// client.MarkCommitRecords(record) // COMMIT AGAR SKIP
return
}
// Proses Bisnis dengan Transaksi DB (Atomic)
err := processBookingTransactional(req)
if err != nil {
// Log error, mungkin kirim ke Dead Letter Queue (DLQ)
log.Printf("Failed to process %s: %v", req.IdempotencyKey, err)
// JANGAN commit offset jika error bersifat transient (DB down)
// Jika error logic (data invalid), commit saja agar tidak stuck.
} else {
// Pattern 3: Manual Commit setelah sukses DB
client.MarkCommitRecords(record)
}
})
// Kirim sinyal commit ke broker
client.AllowRebalance()
}
}
// Core Logic: Menggabungkan Pattern 1 (Idempotency) & Pattern 3 (Atomic DB)
func processBookingTransactional(req ticketmasterlite.BookingRequest) error {
ctx := context.Background()
// 1. Mulai Transaksi DB
tx, err := db.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx) // Rollback jika panic/error sebelum commit
// 2. Cek App-Level Idempotency (Pattern 1 Layer 4)
// "SELECT 1 FROM idempotency_keys WHERE key = $1 FOR UPDATE SKIP LOCKED"
// FOR UPDATE mencegah race condition antar consumer jika ada duplikat pesan Kafka
var exists bool
err = tx.QueryRow(ctx,
"SELECT EXISTS(SELECT 1 FROM idempotency_keys WHERE idempotency_key=$1)",
req.IdempotencyKey).Scan(&exists)
if exists {
log.Printf("Duplicate Event Detected: %s. Skipping.", req.IdempotencyKey)
return nil // Anggap sukses (sudah diproses sebelumnya)
}
// 3. Insert Booking (Bisnis Logic)
var bookingID string
err = tx.QueryRow(ctx,
`INSERT INTO bookings (user_id, concert_id, seat_number, status)
VALUES ($1, $2, $3, 'CONFIRMED') RETURNING booking_id`,
req.UserID, req.ConcertID, req.SeatNumber).Scan(&bookingID)
if err != nil {
// Handle Unique Violation (Overselling/Seat Taken)
// Return nil agar offset di-commit (karena ini valid failure bisnis, bukan system error)
log.Printf("Seat Taken: %v", err)
return nil
}
// 4. Catat Idempotency Key
_, err = tx.Exec(ctx,
`INSERT INTO idempotency_keys (idempotency_key, booking_id, status)
VALUES ($1, $2, 'COMPLETED')`,
req.IdempotencyKey, bookingID)
if err != nil {
return err
}
// 5. Commit Transaksi DB
return tx.Commit(ctx)
}
# Konfigurasi Target
API_URL := http://localhost:3000/book-ticket
CONCURRENCY := 100
REQUESTS := 1000
TIMEOUT := 10
# --- FIX FINAL: HAPUS TANDA KUTIP DI SINI ---
# Biarkan raw string, nanti tanda kutipnya ditambahkan di block 'hey' di bawah
VALID_USER_ID := a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11
VALID_CONCERT_ID := 123e4567-e89b-12d3-a456-426614174000
SEAT_NUMBER := A1
# Generate UUID unik
RANDOM_ID := $(shell uuidgen || echo "550e8400-e29b-41d4-a716-446655440000")
.PHONY: help check-tool load-idempotency load-throughput load-war-ticket
check-tool:
@which hey > /dev/null || (echo "Error: 'hey' is not installed." && exit 1)
help:
@echo "Available commands:"
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'
load-idempotency: check-tool ## Uji Idempotensi: 10k request, KEY SAMA.
@echo "🚀 Starting Idempotency Test..."
@echo " Key: $(RANDOM_ID)"
@hey \
-n $(REQUESTS) \
-c $(CONCURRENCY) \
-t $(TIMEOUT) \
-m POST \
-H "Content-Type: application/json" \
-d '{"user_id": "$(VALID_USER_ID)", "concert_id": "$(VALID_CONCERT_ID)", "seat_number": "$(SEAT_NUMBER)", "idempotency_key": "$(RANDOM_ID)"}' \
$(API_URL)
load-throughput: check-tool ## Uji Throughput Producer.
@echo "🚀 Starting Throughput Test..."
@hey \
-n $(REQUESTS) \
-c $(CONCURRENCY) \
-t $(TIMEOUT) \
-m POST \
-H "Content-Type: application/json" \
-d '{"user_id": "$(VALID_USER_ID)", "concert_id": "$(VALID_CONCERT_ID)", "seat_number": "X99", "idempotency_key": "$(RANDOM_ID)"}' \
$(API_URL)
load-war-ticket: check-tool ## Simulasi WAR TIKET.
@echo "🔥 Starting WAR TIKET Simulation..."
@hey \
-z 15s \
-c 200 \
-q 500 \
-t $(TIMEOUT) \
-m POST \
-H "Content-Type: application/json" \
-d '{"user_id": "$(VALID_USER_ID)", "concert_id": "$(VALID_CONCERT_ID)", "seat_number": "VIP-1", "idempotency_key": "$(RANDOM_ID)"}' \
$(API_URL)
import http from 'k6/http';
import { check, sleep } from 'k6';
import { randomItem } from 'https://jslib.k6.io/k6-utils/1.2.0/index.js';
// --- KONFIGURASI SKENARIO ---
export const options = {
stages: [
{ duration: '5s', target: 100 }, // Fase 1: Warming up (100 user siap-siap)
{ duration: '10s', target: 1000 }, // Fase 2: WAR dimulai! (Naik ke 1000 user)
{ duration: '20s', target: 2000 }, // Fase 3: Puncak Kemarahan (2000 user spamming)
{ duration: '5s', target: 0 }, // Fase 4: Sold Out / Bubar
],
// Ambang batas kelulusan tes
thresholds: {
http_req_failed: ['rate<0.01'], // Error harus di bawah 1%
http_req_duration: ['p(95)<200'], // 95% request harus selesai di bawah 200ms
},
};
// --- DATA POOL ---
// Kita asumsikan hanya ada 50 Kursi VIP (A1 - A50)
// Ribuan user akan berebut kursi-kursi ini.
const SEATS = [];
for (let i = 1; i <= 50; i++) {
SEATS.push(`VIP-${i}`);
}
const CONCERT_ID = "123e4567-e89b-12d3-a456-426614174000";
// Helper function untuk generate UUID (RFC4122 version 4)
function uuidv4() {
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
var r = Math.random() * 16 | 0, v = c == 'x' ? r : (r & 0x3 | 0x8);
return v.toString(16);
});
}
// --- FUNGSI VIRTUAL USER (VU) ---
export default function () {
// 1. Setiap iterasi adalah "User Baru" atau "User Mencoba Lagi"
// Kita generate ID unik untuk User dan Transaksi ini.
const userID = uuidv4();
const idempotencyKey = uuidv4(); // Transaksi unik
// 2. User memilih kursi secara acak dari pool yang tersedia
// Ini mensimulasikan user yang klik kursi mana saja yang terlihat kosong
const selectedSeat = randomItem(SEATS);
const payload = JSON.stringify({
user_id: userID,
concert_id: CONCERT_ID,
seat_number: selectedSeat,
idempotency_key: idempotencyKey,
});
const params = {
headers: {
'Content-Type': 'application/json',
},
};
// 3. Eksekusi Request (Klik tombol "Beli")
const res = http.post('http://localhost:3000/book-ticket', payload, params);
// 4. Validasi Response API
// Kita mengharapkan 202 Accepted (Async processing)
check(res, {
'is status 202': (r) => r.status === 202,
});
// 5. Human Behavior / Network Delay
// User tidak mungkin request 1ms sekali. Kita kasih jeda acak 0.1s - 1s
// Ini mensimulasikan "loading" di frontend atau jeda jari user.
sleep(Math.random() * 1);
}
Last updated