Kafka Pattern Untuk Production

Pattern 1: Exactly-Once Semantics (Menjamin Kebenaran Data)

Tujuan utama pola ini adalah mengeliminasi kegagalan kebenaran (correctness failures), seperti pesan ganda (duplikasi) dan race conditions (kondisi balapan).

1.1 Layer 1: Idempotent Producer

Secara default, jika terjadi network timeout setelah pesan terkirim namun sebelum acknowledgment diterima, producer akan mengirim ulang pesan tersebut, sehingga menyebabkan duplikasi.

Solusi: Aktifkan idempotence. Kafka akan menyematkan sequence number pada setiap pesan, sehingga broker bisa mendeteksi dan menolak duplikasi secara otomatis.

Implementasi franz-go:

Library franz-go mengaktifkan idempotence secara default.

package main

import (
	"github.com/twmb/franz-go/pkg/kgo"
)

func NewProductionProducer() (*kgo.Client, error) {
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		// Idempotence is enabled by default.
		// Menjamin message tidak berubah urutan / tidak duplicate, khususnya saat retry

		// Menjamin durabilitas data (menunggu semua replika in-sync)
		kgo.RequiredAcks(kgo.AllISRAcks()),
	)
	return client, err
}

1.2 Layer 2: Partition Key Strategy

Tanpa partition key, pesan akan disebar secara acak (round-robin), menyebabkan hilangnya urutan (ordering). Ini memicu race conditions jika dua event terkait (misal: "Order Created" dan "Payment Success") diproses oleh consumer berbeda secara bersamaan.

Solusi: Gunakan Partition Key yang konsisten (misalnya OrderID). Ini menjamin semua pesan dengan ID yang sama masuk ke partisi yang sama dan diproses berurutan oleh satu consumer.

Implementasi franz-go:

func ProduceOrderEvent(client *kgo.Client, orderID string, payload []byte) {
	record := &kgo.Record{
		Topic: "orders",
		// Key sangat krusial untuk menjamin ordering
		Key:   []byte(orderID), 
		Value: payload,
	}

	// Pesan dengan orderID sama PASTI masuk partisi yang sama
	client.Produce(context.Background(), record, nil)
}

1.3 Layer 3 & 4: Transactional Semantics & App-Level Idempotency

Lapisan infrastruktur (Layer 1 & 2) tidak bisa mencegah duplikasi jika consumer crash setelah memproses pesan tapi sebelum commit offset.

Solusi:

  1. Transactions: Menjamin pembacaan dan penulisan ke Kafka bersifat atomik.

  2. App-Level Idempotency: Consumer harus mengecek apakah pesan sudah pernah diproses (misal cek ke DB) sebelum mengeksekusi logika bisnis.

Implementasi franz-go (Transactional):

func ProcessWithTransaction(client *kgo.Client) {
	// Memulai transaksi Kafka
	if err := client.BeginTransaction(); err != nil {
		log.Fatal(err)
	}

	// ... Konsumsi pesan ...
    // ... Proses Logika Bisnis ...

	// LAYER 4: App-Level Idempotency (Pseudo-code)
	// if database.HasProcessed(msg.Key) { return }
    
	// Commit record yang diproduksi sebagai bagian dari transaksi
	// client.Produce(...) 

	// Commit transaksi (Finalisasi offset dan pesan keluar)
	if err := client.EndTransaction(context.Background(), kgo.TryCommit); err != nil {
		log.Fatal("Transaction failed:", err)
	}
}

Pattern 2: Throughput Optimization (Optimalisasi Performa)

Kafka default dioptimalkan untuk latensi rendah (kirim secepatnya), yang justru membunuh throughput pada beban tinggi karena terlalu banyak network round-trips.

2.1 Layer 1: Producer Batching

Alih-alih mengirim pesan satu per satu, kumpulkan pesan dalam memori (batch) dan kirim sekaligus.

Konfigurasi:

  • batch.size: Ukuran batch dalam bytes.

  • linger.ms: Waktu tunggu maksimal sebelum batch dikirim (meski belum penuh)13131313.

2.2 Layer 2: Compression

Untuk Mengurangi beban bandwidth jaringan. Algoritma lz4 direkomendasikan karena keseimbangan terbaik antara rasio kompresi dan beban CPU.

Implementasi franz-go (Batching & Compression):

func NewHighThroughputProducer() (*kgo.Client, error) {
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),

		// LAYER 1: Batching Strategy
		// Tingkatkan ukuran batch (misal 100KB)
		kgo.ProducerBatchMaxBytes(100_000), 
		// Tunggu hingga 10ms untuk mengumpulkan pesan
		kgo.ProducerLinger(10 * time.Millisecond),

		// LAYER 2: Compression
		// Menggunakan LZ4 untuk efisiensi CPU/Bandwidth terbaik
		kgo.ProducerBatchCompression(kgo.Lz4Compression()),
	)
	return client, err
}

2.3 Layer 3: Consumer Fetch Optimization

Optimalkan cara consumer mengambil data. Jangan mengambil data byte demi byte. Paksa consumer menunggu sampai data cukup banyak terkumpul di broker sebelum dikirim melalui jaringan.

Implementasi franz-go:

func NewHighThroughputConsumer() (*kgo.Client, error) {
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumeTopics("orders"),

		// LAYER 3: Fetch Optimization
		// Server broker akan menunggu sampai ada 1MB data
		// ATAU 100ms berlalu sebelum mengirim respon ke consumer.
		kgo.FetchMinBytes(1 * 1024 * 1024), // 1MB
		kgo.FetchMaxWait(100 * time.Millisecond),
	)
	return client, err
}

Pattern 3: Failure Recovery (Ketahanan & Pemulihan)

Fitur auto-commit pada Kafka bekerja berdasarkan waktu (time-based), bukan penyelesaian tugas. Ini berbahaya karena bisa menyebabkan data hilang (jika crash setelah commit tapi sebelum proses selesai) atau duplikasi.

3.1 Manual Commit Strategy

Matikan auto-commit. Lakukan commit offset hanya setelah logika bisnis sukses dieksekusi sepenuhnya.

Implementasi franz-go:

func ConsumeReliably(client *kgo.Client) {
	// Disable Auto-Commit dilakukan via opsi client (default franz-go sebenarnya manual)
	// Namun pastikan kita tidak menggunakan auto-commit timer.
	
	for {
		fetches := client.PollFetches(context.Background())
		
		fetches.EachRecord(func(r *kgo.Record) {
			// 1. Proses Logika Bisnis
			err := processOrder(r.Value)
			if err != nil {
				// Handle error (retry/DLQ), JANGAN commit jika gagal
				log.Println("Processing failed", err)
				return
			}

			// 2. Manual Commit SETELAH sukses
			// Menandai pesan ini aman untuk dikommit
			client.MarkCommitRecords(r)
		})
		
		// Mengirim request commit ke broker secara asinkron namun aman
		// Ini jauh lebih aman dari auto-commit time-based
		client.AllowRebalance() 
	}
}

3.2 Static Group Membership

Saat consumer restart (misal saat deploy), Kafka biasanya memicu Rebalance (stop-the-world) yang menghentikan seluruh grup consumer selama beberapa detik.

Solusi: Gunakan Static Group Membership. Dengan memberikan ID statis, Kafka akan menoleransi ketidakhadiran consumer sementara (selama session.timeout.ms) tanpa memicu rebalance.

Implementasi franz-go:

func NewResilientConsumer() (*kgo.Client, error) {
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumerGroup("order-processor-group"),
		
		// PATTERN 3: Static Membership
		// ID ini harus unik per instance (misal dari env var HOSTNAME/POD_NAME)
		kgo.InstanceID("consumer-pod-01"),
		
		// Toleransi waktu down sebelum rebalance dipicu (misal 45 detik)
		// Memberi waktu consumer untuk restart tanpa menendang member lain
		kgo.SessionTimeout(45 * time.Second),
	)
	return client, err
}

Direkomendasikan untuk menerapkan pola ini secara bertahap:

  1. Correctness (Pattern 1): Wajib. Data yang salah tidak ada gunanya diproses cepat.

  2. Performance (Pattern 2): Terapkan saat volume data mulai meningkat.

  3. Resilience (Pattern 3): Terapkan untuk stabilitas operasional jangka panjang.


Studi Kasus: "TicketMaster Lite" – High-Concurrency Concert Booking System

Skenario: Anda sedang membangun backend untuk penjualan tiket konser artis papan atas (misal: Coldplay atau Taylor Swift).

  • Masalah: Ribuan pengguna menekan tombol "Beli" secara bersamaan ("War Tiket").

  • Risiko:

    1. Overselling: Menjual tiket lebih dari kapasitas kursi (Race Condition).

    2. Double Charge: Pengguna tidak sengaja ter-charge 2x karena menekan tombol berulang saat jaringan lambat (Idempotency failure).

    3. System Crash: Server down karena beban mendadak dan pemulihan yang lama (Resilience failure).

Tech Stack:

  • Language: Golang 1.23+

  • Web Framework: Go Chi (Lightweight & Fast)

  • Database: PostgreSQL (dengan pgx driver)

  • Message Broker: Kafka (dengan github.com/twmb/franz-go)

init-schema.sql
-- Mengaktifkan ekstensi pgcrypto untuk generate UUID (jika pakai Postgres versi lama, tapi aman untuk v13+)
CREATE EXTENSION IF NOT EXISTS "pgcrypto";

-- Tabel Utama: Tiket yang terjual
CREATE TABLE IF NOT EXISTS bookings (
    booking_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL,
    concert_id UUID NOT NULL,
    seat_number VARCHAR(10) NOT NULL,
    status VARCHAR(20) DEFAULT 'PENDING',
    created_at TIMESTAMP DEFAULT NOW()
);

-- Tabel Kunci Idempotensi (Jantung dari Exactly-Once Semantics)
CREATE TABLE IF NOT EXISTS idempotency_keys (
    idempotency_key VARCHAR(255) PRIMARY KEY, -- Diubah ke VARCHAR agar fleksibel menerima UUID string dari client
    booking_id UUID, -- Referensi ke booking jika sukses
    status VARCHAR(20), -- 'PROCESSING', 'COMPLETED', 'FAILED'
    created_at TIMESTAMP DEFAULT NOW()
);

-- Unique constraint untuk mencegah overselling (Race Condition Defense di DB Level)
-- Memastikan kombinasi ConcertID + SeatNumber hanya boleh ada satu
CREATE UNIQUE INDEX IF NOT EXISTS idx_concert_seat ON bookings(concert_id, seat_number);
compose.yaml
services:
    postgres:
        image: postgres:18-alpine
        container_name: ticket_db_container
        ports:
            - "5432:5432"
        environment:
            - POSTGRES_USER=user
            - POSTGRES_PASSWORD=pass
            - POSTGRES_DB=ticket_db
        volumes:
            # Persistensi data agar tidak hilang saat restart
            - postgres_data:/var/lib/postgresql/data

            # MAGIC HAPPENS HERE:
            # Mounting file SQL lokal ke folder initdb container
            - ./init-schema.sql:/docker-entrypoint-initdb.d/init-schema.sql
        restart: unless-stopped
        healthcheck:
            test: ["CMD-SHELL", "pg_isready -U user -d ticket_db"]
            interval: 5s
            timeout: 5s
            retries: 5

volumes:
    postgres_data:
ticketmasterlite.go
package ticketmasterlite

type BookingRequest struct {
	IdempotencyKey string `json:"idempotency_key"` // Client-generated UUID
	UserID         string `json:"user_id"`
	ConcertID      string `json:"concert_id"`
	SeatNumber     string `json:"seat_number"`
}
producer.go
package main

import (
	"context"
	"encoding/json"
	"log"
	"net/http"
	"time"

	"github.com/go-chi/chi/v5"
	ticketmasterlite "github.com/semmidev/belajar-kafka/1-basic/ticket-master-lite"
	"github.com/twmb/franz-go/pkg/kgo"
)

// Inisialisasi Kafka Client dengan Pattern 1 & 2
func NewProducer() (*kgo.Client, error) {
	return kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		// Pattern 1: Correctness
		kgo.RequiredAcks(kgo.AllISRAcks()),

		// Pattern 2: High Throughput Optimization
		kgo.ProducerBatchMaxBytes(100*1024),                // 100KB Batch
		kgo.ProducerLinger(10*time.Millisecond),            // Tunggu 10ms
		kgo.ProducerBatchCompression(kgo.Lz4Compression()), // LZ4 Compression
	)
}

func main() {
	kafkaClient, _ := NewProducer()
	defer kafkaClient.Close()

	r := chi.NewRouter()
	r.Post("/book-ticket", func(w http.ResponseWriter, r *http.Request) {
		var req ticketmasterlite.BookingRequest
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, "Invalid Body", 400)
			return
		}

		// Payload untuk Kafka
		payload, _ := json.Marshal(req)

		// Produce ke Kafka
		// Pattern 1 Layer 2: Partition Key Strategy
		// Key = req.ConcertID. Semua booking untuk konser X masuk partisi yang sama.
		record := &kgo.Record{
			Topic: "ticket-bookings",
			Key:   []byte(req.ConcertID),
			Value: payload,
		}

		// Asynchronous produce untuk throughput maksimal (handle error di callback/logging)
		kafkaClient.Produce(context.Background(), record, func(r *kgo.Record, err error) {
			if err != nil {
				log.Printf("Gagal kirim ke Kafka: %v", err)
			}
		})

		// Return 202 Accepted (Async processing)
		w.WriteHeader(http.StatusAccepted)
		w.Write([]byte(`{"status": "queued"}`))
	})

	http.ListenAndServe(":3000", r)
}
consumer.go
package main

import (
	"context"
	"encoding/json"
	"log"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	ticketmasterlite "github.com/semmidev/belajar-kafka/1-basic/ticket-master-lite"
	"github.com/twmb/franz-go/pkg/kgo"
)

// Koneksi DB (menggunakan pgxpool)
var db *pgxpool.Pool

func main() {
	// Init DB
	var err error
	db, err = pgxpool.New(context.Background(), "postgres://user:pass@localhost:5432/ticket_db")
	if err != nil {
		log.Fatal(err)
	}

	// Init Kafka Consumer
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumeTopics("ticket-bookings"),
		kgo.ConsumerGroup("booking-workers"),

		// Pattern 3: Static Group Membership (Fast Recovery)
		// InstanceID harus unik per pod/container (misal: worker-01)
		// kgo.InstanceID("worker-01"),
		// kgo.SessionTimeout(45*time.Second),

		// Pattern 2: Fetch Optimization
		kgo.FetchMinBytes(1*1024*1024), // 1MB
		kgo.FetchMaxWait(100*time.Millisecond),

		// Pattern 3: Manual Commit (Matikan Auto-Commit time-based)
		kgo.DisableAutoCommit(),

		// GANTI nama group ini, misal tambah suffix v2
		// kgo.ConsumerGroup("booking-workers-v2"),
		// Pastikan dia start dari message TERBARU, abaikan yang lama
		// kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Processing Loop
	for {
		// Fetch Batch
		fetches := client.PollFetches(context.Background())
		if fetches.IsClientClosed() {
			break
		}

		fetches.EachRecord(func(record *kgo.Record) {
			// Parse Message
			var req ticketmasterlite.BookingRequest // Struct sama dengan Producer
			if err := json.Unmarshal(record.Value, &req); err != nil {
				log.Printf("❌ JSON Error (Poison Pill): %v. Skipping...", err)
				// client.MarkCommitRecords(record) // COMMIT AGAR SKIP
				return
			}

			// Proses Bisnis dengan Transaksi DB (Atomic)
			err := processBookingTransactional(req)

			if err != nil {
				// Log error, mungkin kirim ke Dead Letter Queue (DLQ)
				log.Printf("Failed to process %s: %v", req.IdempotencyKey, err)
				// JANGAN commit offset jika error bersifat transient (DB down)
				// Jika error logic (data invalid), commit saja agar tidak stuck.
			} else {
				// Pattern 3: Manual Commit setelah sukses DB
				client.MarkCommitRecords(record)
			}
		})

		// Kirim sinyal commit ke broker
		client.AllowRebalance()
	}
}

// Core Logic: Menggabungkan Pattern 1 (Idempotency) & Pattern 3 (Atomic DB)
func processBookingTransactional(req ticketmasterlite.BookingRequest) error {
	ctx := context.Background()

	// 1. Mulai Transaksi DB
	tx, err := db.Begin(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback(ctx) // Rollback jika panic/error sebelum commit

	// 2. Cek App-Level Idempotency (Pattern 1 Layer 4)
	// "SELECT 1 FROM idempotency_keys WHERE key = $1 FOR UPDATE SKIP LOCKED"
	// FOR UPDATE mencegah race condition antar consumer jika ada duplikat pesan Kafka
	var exists bool
	err = tx.QueryRow(ctx,
		"SELECT EXISTS(SELECT 1 FROM idempotency_keys WHERE idempotency_key=$1)",
		req.IdempotencyKey).Scan(&exists)

	if exists {
		log.Printf("Duplicate Event Detected: %s. Skipping.", req.IdempotencyKey)
		return nil // Anggap sukses (sudah diproses sebelumnya)
	}

	// 3. Insert Booking (Bisnis Logic)
	var bookingID string
	err = tx.QueryRow(ctx,
		`INSERT INTO bookings (user_id, concert_id, seat_number, status)
		 VALUES ($1, $2, $3, 'CONFIRMED') RETURNING booking_id`,
		req.UserID, req.ConcertID, req.SeatNumber).Scan(&bookingID)

	if err != nil {
		// Handle Unique Violation (Overselling/Seat Taken)
		// Return nil agar offset di-commit (karena ini valid failure bisnis, bukan system error)
		log.Printf("Seat Taken: %v", err)
		return nil
	}

	// 4. Catat Idempotency Key
	_, err = tx.Exec(ctx,
		`INSERT INTO idempotency_keys (idempotency_key, booking_id, status)
		 VALUES ($1, $2, 'COMPLETED')`,
		req.IdempotencyKey, bookingID)
	if err != nil {
		return err
	}

	// 5. Commit Transaksi DB
	return tx.Commit(ctx)
}
Makefile
# Konfigurasi Target
API_URL := http://localhost:3000/book-ticket
CONCURRENCY := 100
REQUESTS := 1000
TIMEOUT := 10

# --- FIX FINAL: HAPUS TANDA KUTIP DI SINI ---
# Biarkan raw string, nanti tanda kutipnya ditambahkan di block 'hey' di bawah
VALID_USER_ID := a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11
VALID_CONCERT_ID := 123e4567-e89b-12d3-a456-426614174000
SEAT_NUMBER := A1

# Generate UUID unik
RANDOM_ID := $(shell uuidgen || echo "550e8400-e29b-41d4-a716-446655440000")

.PHONY: help check-tool load-idempotency load-throughput load-war-ticket

check-tool:
	@which hey > /dev/null || (echo "Error: 'hey' is not installed." && exit 1)

help:
	@echo "Available commands:"
	@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'

load-idempotency: check-tool ## Uji Idempotensi: 10k request, KEY SAMA.
	@echo "🚀 Starting Idempotency Test..."
	@echo "   Key: $(RANDOM_ID)"
	@hey \
		-n $(REQUESTS) \
		-c $(CONCURRENCY) \
		-t $(TIMEOUT) \
		-m POST \
		-H "Content-Type: application/json" \
		-d '{"user_id": "$(VALID_USER_ID)", "concert_id": "$(VALID_CONCERT_ID)", "seat_number": "$(SEAT_NUMBER)", "idempotency_key": "$(RANDOM_ID)"}' \
		$(API_URL)

load-throughput: check-tool ## Uji Throughput Producer.
	@echo "🚀 Starting Throughput Test..."
	@hey \
		-n $(REQUESTS) \
		-c $(CONCURRENCY) \
		-t $(TIMEOUT) \
		-m POST \
		-H "Content-Type: application/json" \
		-d '{"user_id": "$(VALID_USER_ID)", "concert_id": "$(VALID_CONCERT_ID)", "seat_number": "X99", "idempotency_key": "$(RANDOM_ID)"}' \
		$(API_URL)

load-war-ticket: check-tool ## Simulasi WAR TIKET.
	@echo "🔥 Starting WAR TIKET Simulation..."
	@hey \
		-z 15s \
		-c 200 \
		-q 500 \
		-t $(TIMEOUT) \
		-m POST \
		-H "Content-Type: application/json" \
		-d '{"user_id": "$(VALID_USER_ID)", "concert_id": "$(VALID_CONCERT_ID)", "seat_number": "VIP-1", "idempotency_key": "$(RANDOM_ID)"}' \
		$(API_URL)
war-ticker.js
import http from 'k6/http';
import { check, sleep } from 'k6';
import { randomItem } from 'https://jslib.k6.io/k6-utils/1.2.0/index.js';

// --- KONFIGURASI SKENARIO ---
export const options = {
  stages: [
    { duration: '5s', target: 100 },  // Fase 1: Warming up (100 user siap-siap)
    { duration: '10s', target: 1000 }, // Fase 2: WAR dimulai! (Naik ke 1000 user)
    { duration: '20s', target: 2000 }, // Fase 3: Puncak Kemarahan (2000 user spamming)
    { duration: '5s', target: 0 },    // Fase 4: Sold Out / Bubar
  ],
  // Ambang batas kelulusan tes
  thresholds: {
    http_req_failed: ['rate<0.01'], // Error harus di bawah 1%
    http_req_duration: ['p(95)<200'], // 95% request harus selesai di bawah 200ms
  },
};

// --- DATA POOL ---
// Kita asumsikan hanya ada 50 Kursi VIP (A1 - A50)
// Ribuan user akan berebut kursi-kursi ini.
const SEATS = [];
for (let i = 1; i <= 50; i++) {
  SEATS.push(`VIP-${i}`);
}

const CONCERT_ID = "123e4567-e89b-12d3-a456-426614174000";

// Helper function untuk generate UUID (RFC4122 version 4)
function uuidv4() {
  return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
    var r = Math.random() * 16 | 0, v = c == 'x' ? r : (r & 0x3 | 0x8);
    return v.toString(16);
  });
}

// --- FUNGSI VIRTUAL USER (VU) ---
export default function () {
  // 1. Setiap iterasi adalah "User Baru" atau "User Mencoba Lagi"
  // Kita generate ID unik untuk User dan Transaksi ini.
  const userID = uuidv4();
  const idempotencyKey = uuidv4(); // Transaksi unik

  // 2. User memilih kursi secara acak dari pool yang tersedia
  // Ini mensimulasikan user yang klik kursi mana saja yang terlihat kosong
  const selectedSeat = randomItem(SEATS);

  const payload = JSON.stringify({
    user_id: userID,
    concert_id: CONCERT_ID,
    seat_number: selectedSeat,
    idempotency_key: idempotencyKey,
  });

  const params = {
    headers: {
      'Content-Type': 'application/json',
    },
  };

  // 3. Eksekusi Request (Klik tombol "Beli")
  const res = http.post('http://localhost:3000/book-ticket', payload, params);

  // 4. Validasi Response API
  // Kita mengharapkan 202 Accepted (Async processing)
  check(res, {
    'is status 202': (r) => r.status === 202,
  });

  // 5. Human Behavior / Network Delay
  // User tidak mungkin request 1ms sekali. Kita kasih jeda acak 0.1s - 1s
  // Ini mensimulasikan "loading" di frontend atau jeda jari user.
  sleep(Math.random() * 1);
}

Last updated