Outbox Pattern

Konsep Lengkap Outbox Pattern

1. Masalah: Jaminan Konsistensi pada Sistem Terdistribusi

Bayangkan sebuah skenario umum dalam arsitektur microservices:

Sebuah layanan UserService bertanggung jawab untuk mendaftarkan pengguna baru. Setelah pengguna berhasil disimpan ke dalam database, layanan ini harus mempublikasikan sebuah event (pesan) bernama UserCreated ke sebuah message broker (seperti RabbitMQ, Kafka, atau Google Pub/Sub). Layanan lain, misalnya NotificationService, akan mendengarkan event ini dan mengirim email selamat datang.

Prosesnya terlihat sederhana:

  1. Simpan data pengguna ke database.

  2. Kirim pesan UserCreated ke message broker.

Masalahnya terletak pada kegagalan parsial. Apa yang terjadi jika:

  • Kasus A: Operasi simpan ke database berhasil, tetapi pengiriman pesan gagal (misalnya, message broker sedang down atau ada masalah jaringan)?

    • Hasil: Sistem menjadi tidak konsisten. Ada pengguna baru di database, tetapi tidak ada layanan lain yang tahu tentang itu. Email selamat datang tidak akan pernah terkirim.

  • Kasus B: Pengiriman pesan berhasil, tetapi transaksi database gagal di saat-saat terakhir (commit gagal atau terjadi rollback)?

    • Hasil: Sistem juga tidak konsisten. NotificationService menerima pesan UserCreated dan mengirim email ke pengguna yang sebenarnya tidak ada di dalam database.

Masalah ini dikenal sebagai dual-write problem. Kita mencoba menulis ke dua sistem eksternal yang berbeda (database dan message broker) dan tidak ada cara mudah untuk menjamin kedua operasi ini terjadi secara atomik (keduanya berhasil atau keduanya gagal). Menggunakan distributed transaction (2PC - Two-Phase Commit) seringkali terlalu kompleks, lambat, dan tidak didukung oleh semua teknologi.

2. Solusi: Outbox Pattern

Outbox Pattern memecahkan masalah ini dengan cara yang cerdas dan andal. Idenya adalah mengubah dua operasi tulis yang terpisah menjadi satu operasi tulis atomik di dalam database lokal.

Bagaimana Cara Kerjanya?

  1. Satu Transaksi Atomik:

    • Ketika sebuah permintaan untuk mendaftarkan pengguna baru datang, layanan UserService memulai sebuah transaksi database.

    • Di dalam transaksi yang sama, layanan melakukan dua hal:

      1. Menyisipkan data pengguna baru ke dalam tabel users.

      2. Menyisipkan data event (pesan yang akan dikirim) ke dalam tabel khusus bernama outbox.

    • Setelah itu, transaksi di-commit.

    Karena kedua operasi INSERT (ke users dan outbox) berada dalam satu transaksi database, maka properti ACID (Atomicity, Consistency, Isolation, Durability) dari database menjamin bahwa keduanya akan berhasil atau keduanya akan gagal bersamaan. Tidak akan ada lagi status inkonsisten.

  2. Proses Asinkron (Message Relay):

    • Ada sebuah proses terpisah yang berjalan di latar belakang. Proses ini kita sebut Message Relay atau Outbox Processor.

    • Tugas Message Relay adalah:

      1. Secara periodik (misalnya setiap beberapa detik), ia memeriksa tabel outbox untuk mencari event yang belum diproses.

      2. Untuk setiap event yang ditemukan, ia akan mencoba mempublikasikannya ke message broker.

      3. Jika publikasi berhasil, Message Relay akan menandai event di tabel outbox sebagai "telah diproses" (atau menghapusnya). Ini untuk memastikan pesan tidak dikirim berulang kali.

      4. Jika publikasi gagal, event tersebut tetap ada di tabel outbox dan akan dicoba lagi pada siklus berikutnya.

3. Keuntungan Outbox Pattern

  • Reliabilitas dan Konsistensi: Menjamin bahwa sebuah event akan terkirim jika dan hanya jika transaksi bisnis utama berhasil. Ini adalah jaminan pengiriman "setidaknya sekali" (at-least-once delivery).

  • Kinerja: Transaksi bisnis utama menjadi lebih cepat karena tidak perlu menunggu respons dari message broker yang bisa lambat karena latensi jaringan. Penulisan ke tabel outbox sangat cepat karena berada di database yang sama.

  • Pemisahan Tanggung Jawab (Decoupling): Layanan bisnis utama (misalnya UserService) tidak perlu tahu detail tentang message broker. Tanggung jawabnya hanya menulis ke tabel outbox. Logika pengiriman, retry, dll., diisolasi di dalam Message Relay.

4. Opsi Implementasi Message Relay

  • Polling Publisher: Ini adalah pendekatan yang paling umum dan akan kita implementasikan. Sebuah proses melakukan polling (query SELECT) ke tabel outbox secara berkala.

  • Transaction Log Tailing (CDC - Change Data Capture): Pendekatan yang lebih canggih dan efisien. Menggunakan alat seperti Debezium yang "membaca" log transaksi database (WAL - Write-Ahead Log). Ketika ada INSERT baru ke tabel outbox, CDC akan menangkapnya secara real-time dan mempublikasikannya. Ini mengurangi beban pada database karena tidak ada polling dan latensinya sangat rendah.


Kode Produksi-Ready Menggunakan Golang

Berikut adalah contoh implementasi Outbox Pattern yang lengkap dan siap produksi dengan Go. Kode ini mencakup:

  • Struktur proyek yang jelas.

  • Penggunaan Dependency Injection untuk kemudahan pengujian.

  • Manajemen graceful shutdown.

  • Logika retry implisit pada poller.

  • Pemisahan antara logika bisnis, repositori, dan proses latar belakang.

Struktur Proyek

outbox-pattern-go/
├── go.mod
├── go.sum
├── main.go
├── schema.sql                # Skema database
├── config/                   # Konfigurasi (opsional)
├── domain/                   # Model data utama (User, OutboxEvent)
│   └── model.go
├── repository/               # Akses data (database)
│   └── user_repo.go
├── service/                  # Logika bisnis
│   └── user_service.go
├── outbox/                   # Outbox Processor (Message Relay)
│   └── processor.go
└── messaging/                # Abstraksi untuk message broker
    ├── publisher.go
    └── log_publisher.go      # Implementasi mock untuk logging

Langkah 1: Skema Database (schema.sql)

Kita butuh dua tabel: users untuk data bisnis dan outbox_events untuk pola outbox.

-- schema.sql

CREATE TABLE IF NOT EXISTS users (
    id UUID PRIMARY KEY,
    email VARCHAR(255) UNIQUE NOT NULL,
    created_at TIMESTAMPTZ NOT NULL
);

CREATE TABLE IF NOT EXISTS outbox_events (
    id BIGSERIAL PRIMARY KEY,
    aggregate_id UUID NOT NULL,          -- ID dari entitas terkait (misal: user_id)
    topic VARCHAR(255) NOT NULL,         -- Topik/channel di message broker
    payload JSONB NOT NULL,              -- Isi pesan dalam format JSON
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    processed_at TIMESTAMPTZ NULL        -- Kapan event ini diproses. NULL berarti belum.
);

-- Indeks untuk membantu poller mencari event yang belum diproses dengan cepat.
CREATE INDEX IF NOT EXISTS idx_unprocessed_events ON outbox_events (processed_at) WHERE processed_at IS NULL;

Langkah 2: Domain Model (domain/model.go)

// domain/model.go
package domain

import (
	"encoding/json"
	"time"

	"github.com/google/uuid"
)

// User merepresentasikan model pengguna
type User struct {
	ID        uuid.UUID `db:"id"`
	Email     string    `db:"email"`
	CreatedAt time.Time `db:"created_at"`
}

// OutboxEvent merepresentasikan event yang akan dikirim
type OutboxEvent struct {
	ID           int64           `db:"id"`
	AggregateID  uuid.UUID       `db:"aggregate_id"`
	Topic        string          `db:"topic"`
	Payload      json.RawMessage `db:"payload"`
	CreatedAt    time.Time       `db:"created_at"`
	ProcessedAt  *time.Time      `db:"processed_at"`
}

Langkah 3: Abstraksi Message Publisher (messaging/)

Ini penting agar kita tidak terikat pada satu jenis message broker.

// messaging/publisher.go
package messaging

import "context"

// EventPublisher adalah interface untuk mempublikasikan event.
type EventPublisher interface {
	Publish(ctx context.Context, topic string, payload []byte) error
}

Implementasi mock yang hanya mencetak ke log. Di dunia nyata, ini akan diganti dengan klien Kafka atau RabbitMQ.

// messaging/log_publisher.go
package messaging

import (
	"context"
	"log"
)

type LogPublisher struct{}

func NewLogPublisher() *LogPublisher {
	return &LogPublisher{}
}

func (p *LogPublisher) Publish(ctx context.Context, topic string, payload []byte) error {
	log.Printf("PUBLISHING event to topic '%s': %s", topic, string(payload))
	// Di dunia nyata, di sini ada logika untuk mengirim ke Kafka/RabbitMQ.
	// Jika gagal, return error agar outbox processor mencoba lagi.
	return nil
}

Langkah 4: Repository (repository/user_repo.go)

Lapisan ini bertanggung jawab untuk berinteraksi dengan database. Perhatikan bagaimana metode-metodenya menerima sqlx.ExtContext yang bisa berupa *sqlx.DB atau *sqlx.Tx, memungkinkan mereka untuk berpartisipasi dalam transaksi.

// repository/user_repo.go
package repository

import (
	"context"
	"outbox-pattern-go/domain"

	"github.com/jmoiron/sqlx"
)

// DBTX adalah interface yang dipenuhi oleh *sqlx.DB dan *sqlx.Tx
type DBTX interface {
	sqlx.ExtContext
	sqlx.PreparerContext
}

type UserRepository struct{}

func NewUserRepository() *UserRepository {
	return &UserRepository{}
}

// CreateUser menyimpan user baru ke database
func (r *UserRepository) CreateUser(ctx context.Context, db DBTX, user *domain.User) error {
	query := `INSERT INTO users (id, email, created_at) VALUES ($1, $2, $3)`
	_, err := db.ExecContext(ctx, query, user.ID, user.Email, user.CreatedAt)
	return err
}

// CreateOutboxEvent menyimpan event ke tabel outbox
func (r *UserRepository) CreateOutboxEvent(ctx context.Context, db DBTX, event *domain.OutboxEvent) error {
	query := `INSERT INTO outbox_events (aggregate_id, topic, payload) VALUES ($1, $2, $3)`
	_, err := db.ExecContext(ctx, query, event.AggregateID, event.Topic, event.Payload)
	return err
}

Langkah 5: Service Layer (service/user_service.go)

Ini adalah inti dari logika atomik kita.

// service/user_service.go
package service

import (
	"context"
	"encoding/json"
	"log"
	"outbox-pattern-go/domain"
	"outbox-pattern-go/repository"
	"time"

	"github.com/google/uuid"
	"github.com/jmoiron/sqlx"
)

type UserService struct {
	db   *sqlx.DB
	repo *repository.UserRepository
}

func NewUserService(db *sqlx.DB, repo *repository.UserRepository) *UserService {
	return &UserService{db: db, repo: repo}
}

// RegisterUser mendaftarkan pengguna dan membuat event dalam satu transaksi.
func (s *UserService) RegisterUser(ctx context.Context, email string) (*domain.User, error) {
	log.Printf("Registering user with email: %s", email)

	// 1. Buat data user dan event
	user := &domain.User{
		ID:        uuid.New(),
		Email:     email,
		CreatedAt: time.Now(),
	}

	payload, err := json.Marshal(user)
	if err != nil {
		return nil, err
	}

	event := &domain.OutboxEvent{
		AggregateID: user.ID,
		Topic:       "user.created",
		Payload:     payload,
	}

	// 2. Mulai transaksi database
	tx, err := s.db.BeginTxx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// Pastikan transaksi di-rollback jika ada error
	defer tx.Rollback()

	// 3. Simpan user (di dalam transaksi)
	if err := s.repo.CreateUser(ctx, tx, user); err != nil {
		return nil, err
	}

	// 4. Simpan event ke outbox (di dalam transaksi yang sama)
	if err := s.repo.CreateOutboxEvent(ctx, tx, event); err != nil {
		return nil, err
	}

	// 5. Commit transaksi. Ini adalah titik atomik.
	// Jika ini berhasil, user dan event DIJAMIN tersimpan.
	if err := tx.Commit(); err != nil {
		return nil, err
	}

	log.Printf("User %s and outbox event created successfully in a single transaction", user.ID)

	return user, nil
}

Langkah 6: Outbox Processor (outbox/processor.go)

Ini adalah Message Relay yang berjalan di latar belakang.

// outbox/processor.go
package outbox

import (
	"context"
	"log"
	"outbox-pattern-go/domain"
	"outbox-pattern-go/messaging"
	"time"

	"github.com/jmoiron/sqlx"
)

type Processor struct {
	db        *sqlx.DB
	publisher messaging.EventPublisher
	interval  time.Duration
}

func NewProcessor(db *sqlx.DB, publisher messaging.EventPublisher, interval time.Duration) *Processor {
	return &Processor{
		db:        db,
		publisher: publisher,
		interval:  interval,
	}
}

// Start memulai proses polling outbox
func (p *Processor) Start(ctx context.Context) {
	log.Println("Starting outbox processor...")
	ticker := time.NewTicker(p.interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := p.processEvents(ctx); err != nil {
				log.Printf("Error processing outbox events: %v", err)
			}
		case <-ctx.Done():
			log.Println("Stopping outbox processor...")
			return
		}
	}
}

func (p *Processor) processEvents(ctx context.Context) error {
	events, err := p.fetchUnprocessedEvents(ctx)
	if err != nil {
		return err
	}

	if len(events) > 0 {
		log.Printf("Found %d unprocessed events", len(events))
	}

	for _, event := range events {
		// 1. Publikasikan event
		err := p.publisher.Publish(ctx, event.Topic, event.Payload)
		if err != nil {
			// Jika gagal, kita tidak update `processed_at`.
			// Event akan diambil lagi di iterasi berikutnya.
			log.Printf("Failed to publish event %d: %v. Will retry later.", event.ID, err)
			continue // Lanjut ke event berikutnya
		}

		// 2. Tandai sebagai sudah diproses
		if err := p.markEventAsProcessed(ctx, event.ID); err != nil {
			// Kasus langka: event terkirim tapi gagal update DB.
			// Ini bisa menyebabkan event terkirim ulang (at-least-once).
			// Sistem consumer harus idempotensi.
			log.Printf("CRITICAL: Failed to mark event %d as processed after publishing: %v", event.ID, err)
		} else {
			log.Printf("Successfully processed and published event %d", event.ID)
		}
	}

	return nil
}

func (p *Processor) fetchUnprocessedEvents(ctx context.Context) ([]domain.OutboxEvent, error) {
	var events []domain.OutboxEvent
	// LIMIT ditambahkan untuk mencegah memuat terlalu banyak event sekaligus
	query := `SELECT id, aggregate_id, topic, payload FROM outbox_events WHERE processed_at IS NULL ORDER BY created_at ASC LIMIT 10`
	err := p.db.SelectContext(ctx, &events, query)
	return events, err
}

func (p *Processor) markEventAsProcessed(ctx context.Context, eventID int64) error {
	query := `UPDATE outbox_events SET processed_at = NOW() WHERE id = $1`
	_, err := p.db.ExecContext(ctx, query, eventID)
	return err
}

Langkah 7: Merangkai Semuanya (main.go)

// main.go
package main

import (
	"context"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"outbox-pattern-go/messaging"
	"outbox-pattern-go/outbox"
	"outbox-pattern-go/repository"
	"outbox-pattern-go/service"

	"github.com/jmoiron/sqlx"
	_ "github.com/lib/pq"
)

func main() {
	// --- Konfigurasi ---
	// Di aplikasi nyata, ini diambil dari env var atau file config
	dbConnectionString := "user=postgres password=password dbname=outbox_demo sslmode=disable"
	pollingInterval := 5 * time.Second

	// --- Setup Database ---
	db, err := sqlx.Connect("postgres", dbConnectionString)
	if err != nil {
		log.Fatalf("Failed to connect to database: %v", err)
	}
	defer db.Close()
	log.Println("Database connection successful")

	// --- Inisialisasi Komponen (Dependency Injection) ---
	userRepo := repository.NewUserRepository()
	userService := service.NewUserService(db, userRepo)
	logPublisher := messaging.NewLogPublisher() // Mock publisher
	outboxProcessor := outbox.NewProcessor(db, logPublisher, pollingInterval)

	// --- Menjalankan Outbox Processor di Goroutine terpisah ---
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go outboxProcessor.Start(ctx)

	// --- Setup HTTP Server untuk menerima request (contoh) ---
	http.HandleFunc("/register", func(w http.ResponseWriter, r *http.Request) {
		email := r.URL.Query().Get("email")
		if email == "" {
			http.Error(w, "Email query parameter is required", http.StatusBadRequest)
			return
		}
		user, err := userService.RegisterUser(r.Context(), email)
		if err != nil {
			log.Printf("ERROR: Failed to register user: %v", err)
			http.Error(w, "Failed to register user", http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusCreated)
		w.Write([]byte("User registered successfully. Outbox event created.\n"))
		log.Printf("API: Handled registration for user %s", user.ID)
	})

	server := &http.Server{Addr: ":8080"}

	// --- Graceful Shutdown ---
	go func() {
		log.Println("Starting HTTP server on :8080")
		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Fatalf("Could not listen on :8080: %v\n", err)
		}
	}()

	stopChan := make(chan os.Signal, 1)
	signal.Notify(stopChan, syscall.SIGINT, syscall.SIGTERM)

	<-stopChan // Tunggu sinyal shutdown

	log.Println("Shutting down server...")

	// Beri waktu untuk request yang sedang berjalan
	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer shutdownCancel()

	if err := server.Shutdown(shutdownCtx); err != nil {
		log.Fatalf("Server shutdown failed: %+v", err)
	}

	// Hentikan outbox processor
	cancel()
	// Beri waktu sedikit agar processor bisa berhenti dengan rapi
	time.Sleep(1 * time.Second)

	log.Println("Server gracefully stopped")
}

Cara Menjalankan

  1. Siapkan Dependensi:

    Bash

    go mod init outbox-pattern-go
    go get github.com/jmoiron/sqlx
    go get github.com/lib/pq
    go get github.com/google/uuid
  2. Buat Database: Pastikan Anda memiliki PostgreSQL yang berjalan dan buat database bernama outbox_demo.

  3. Jalankan Aplikasi:

    Bash

    go run .
  4. Uji Coba:

    • Buka terminal baru.

    • Kirim request untuk mendaftarkan pengguna:

      Bash

      curl -X POST "http://localhost:8080/register?email=test1@example.com"
    • Lihat Log Server: Anda akan melihat log yang menunjukkan:

      1. User dan outbox event dibuat dalam satu transaksi.

      2. Outbox processor menemukan event baru.

      3. Event dipublikasikan (ke log, dalam kasus ini).

      4. Event ditandai sebagai sudah diproses.

    • Periksa Database: Anda bisa memeriksa tabel users dan outbox_events untuk melihat datanya. Perhatikan bahwa kolom processed_at di tabel outbox_events akan terisi.

Tentu. Mari kita lanjutkan pembahasan dengan mengimplementasikan Outbox Pattern menggunakan pendekatan yang lebih modern dan efisien: Change Data Capture (CDC) dengan Debezium.

Ini adalah pendekatan yang sangat direkomendasikan untuk lingkungan produksi karena kinerjanya yang tinggi dan beban yang minimal pada database.


Konsep: Outbox Pattern dengan CDC dan Debezium

Berbeda dengan pendekatan polling di mana kita secara aktif menanyakan database ("apakah ada event baru?"), pendekatan CDC bekerja secara pasif dengan "mendengarkan" log transaksi database.

  1. Log Transaksi (Write-Ahead Log - WAL): Setiap database modern yang andal (seperti PostgreSQL, MySQL, SQL Server) memiliki log transaksi. Log ini mencatat setiap perubahan data (INSERT, UPDATE, DELETE) sebelum perubahan itu sendiri ditulis ke file data utama. Ini adalah mekanisme inti untuk recovery dan replikasi.

  2. Debezium: Debezium adalah platform CDC open-source yang bertindak sebagai "agen" yang membaca log transaksi ini. Debezium berjalan di dalam Kafka Connect, sebuah framework untuk menghubungkan Kafka dengan sistem lain.

  3. Alur Kerja:

    1. Aplikasi Go (Sama Seperti Sebelumnya): Aplikasi Anda melakukan hal yang persis sama seperti pada contoh sebelumnya. Yaitu, dalam satu transaksi atomik, ia menyisipkan data ke tabel users dan outbox_events. Tidak ada perubahan kode yang diperlukan di sisi aplikasi produsen ini.

    2. Konfigurasi Database: Anda mengaktifkan "logical replication" pada PostgreSQL. Ini menginstruksikan PostgreSQL untuk menulis informasi yang lebih detail ke dalam WAL, cukup untuk merekonstruksi perubahan baris per baris.

    3. Debezium Connector: Anda men-deploy Debezium Connector untuk PostgreSQL ke cluster Kafka Connect. Konektor ini dikonfigurasi untuk:

      • Terhubung ke database PostgreSQL Anda.

      • Memonitor perubahan hanya pada tabel outbox_events.

    4. Tailing Log: Debezium mulai membaca WAL PostgreSQL secara real-time.

    5. Publikasi ke Kafka: Ketika Debezium mendeteksi INSERT baru pada tabel outbox_events, ia akan:

      • Membaca data baris baru tersebut.

      • Mengubahnya menjadi pesan (event).

      • Mempublikasikannya ke sebuah topik di Apache Kafka.

    6. Event Routing (Fitur Kunci): Debezium memiliki transformasi bawaan yang sangat kuat bernama Event Router. Kita akan menggunakannya untuk secara dinamis menentukan topik Kafka tujuan berdasarkan kolom di tabel outbox_events kita (yaitu kolom topic).

    7. Layanan Konsumen: Layanan lain (misalnya NotificationService) sekarang hanya perlu mendengarkan topik Kafka yang relevan (misalnya user.created) untuk menerima event.

Keuntungan Dibandingkan Polling

  • Near Real-time: Latensi sangat rendah, karena event ditangkap segera setelah transaksi di-commit.

  • Beban Database Rendah: Tidak ada lagi query SELECT yang berulang-ulang ke database. Debezium membaca stream log yang memang sudah ada, yang jauh lebih efisien.

  • Skalabilitas dan Keandalan: Dibangun di atas Kafka dan Kafka Connect yang terbukti andal, dapat diskalakan, dan toleran terhadap kesalahan.

  • Tidak Perlu Kode Tambahan: Anda tidak perlu menulis dan memelihara kode poller (outbox/processor.go). Tanggung jawab itu dialihkan ke infrastruktur (Debezium).


Implementasi Produksi-Ready dengan Docker Compose

Cara terbaik untuk mendemonstrasikan ini adalah dengan menggunakan Docker Compose untuk menjalankan semua infrastruktur yang diperlukan.

Langkah 1: Siapkan Proyek Go (Gunakan yang Sebelumnya)

Anda dapat menggunakan proyek Go yang sama persis dari jawaban sebelumnya. Hal yang penting adalah:

  • Struktur domain, repository, dan service tetap sama.

  • Logika service/user_service.go yang menulis ke users dan outbox_events dalam satu transaksi adalah kunci utamanya dan tidak berubah.

  • Hapus atau jangan jalankan outbox/processor.go dan messaging/log_publisher.go. Komponen-komponen ini sekarang digantikan oleh Debezium.

Langkah 2: Buat docker-compose.yml

File ini akan mendefinisikan seluruh stack kita: PostgreSQL, Zookeeper, Kafka, dan Kafka Connect (dengan Debezium).

YAML

# docker-compose.yml
version: '3.8'

services:
  postgres:
    image: postgres:15
    container_name: postgres_db
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
      POSTGRES_DB: outbox_demo
    # Baris ini SANGAT PENTING untuk Debezium
    command: postgres -c wal_level=logical
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U user -d outbox_demo"]
      interval: 10s
      timeout: 5s
      retries: 5

  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.3
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.5.3
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  connect:
    image: debezium/connect:2.6
    container_name: kafka_connect
    depends_on:
      - kafka
      - postgres
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: 'kafka:29092'
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      STATUS_STORAGE_TOPIC: my_connect_statuses

  # Opsional: UI untuk melihat topik dan pesan Kafka, sangat membantu saat development
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka_ui
    depends_on:
      - kafka
      - connect
    ports:
      - "8088:8080" # Akses UI di http://localhost:8088
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: connect
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://connect:8083

networks:
  default:
    name: outbox-cdc-net

Langkah 3: Konfigurasi Debezium Connector

Setelah semua layanan di docker-compose berjalan, kita perlu memberitahu Kafka Connect untuk memulai Debezium Connector dengan konfigurasi yang kita inginkan. Buat file JSON untuk konfigurasi ini.

register-postgres-connector.json:

JSON

{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "user",
    "database.password": "password",
    "database.dbname": "outbox_demo",
    "database.server.name": "dbserver1",
    "table.include.list": "public.outbox_events",
    "tombstones.on.delete": "false",

    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "aggregate_id",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.by.field": "topic"
  }
}

Penjelasan Konfigurasi Kunci:

  • table.include.list: "public.outbox_events" -> Hanya memonitor tabel ini.

  • transforms: "outbox" -> Memberi nama pada transformasi yang akan kita definisikan.

  • transforms.outbox.type: "io.debezium.transforms.outbox.EventRouter" -> Ini adalah "otak" dari pola ini. Transformasi ini secara khusus dirancang untuk Outbox Pattern.

  • transforms.outbox.table.field.event.payload: "payload" -> Memberitahu Event Router bahwa isi dari pesan (event) ada di kolom payload.

  • transforms.outbox.route.by.field: "topic" -> Memberitahu Event Router untuk menggunakan nilai dari kolom topic sebagai nama topik Kafka tujuan.

Langkah 4: Jalankan dan Uji Coba

  1. Jalankan Infrastruktur:

    Buka terminal di direktori proyek Anda dan jalankan:

    Bash

    docker-compose up -d

    Tunggu beberapa saat hingga semua kontainer, terutama postgres, dalam keadaan healthy. Anda bisa memeriksanya dengan docker ps.

  2. Buat Tabel di Database:

    Jalankan aplikasi Go Anda sekali untuk membuat tabel melalui main.go (jika Anda memiliki logika inisialisasi skema di sana), atau hubungkan ke database menggunakan psql atau DBeaver dan jalankan schema.sql dari contoh sebelumnya.

  3. Daftarkan Debezium Connector:

    Gunakan curl untuk mengirim konfigurasi JSON ke API Kafka Connect.

    Bash

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
    localhost:8083/connectors/ \
    -d @register-postgres-connector.json

    Jika berhasil, Anda akan mendapatkan respons HTTP/1.1 201 Created. Anda dapat memverifikasi statusnya di Kafka UI (http://localhost:8088) di bawah tab "Kafka Connect".

  4. Jalankan Aplikasi Go:

    Sekarang, jalankan aplikasi Go Anda (yang berisi UserService dan endpoint HTTP).

    Bash

    go run .
  5. Picu Event:

    Kirim request ke aplikasi Go Anda untuk mendaftarkan pengguna baru.

    Bash

    curl -X POST "http://localhost:8080/register?email=cdc-test@example.com"
  6. Verifikasi Hasilnya:

    • Buka Kafka UI di browser: http://localhost:8088.

    • Navigasi ke bagian "Topics".

    • Anda akan melihat sebuah topik baru bernama user.created telah dibuat secara otomatis!

    • Klik topik tersebut dan lihat pesannya. Anda akan menemukan payload JSON dari pengguna yang baru saja Anda daftarkan.

    JSON

    // Contoh payload yang akan Anda lihat di topik 'user.created'
    {
      "ID": "some-uuid-string",
      "Email": "cdc-test@example.com",
      "CreatedAt": "2025-06-27T14:30:00Z"
    }

Kesimpulan Akhir: Polling vs. CDC

Fitur

Polling Publisher

CDC (Debezium)

Pemenang

Kinerja

Beban tambahan pada DB karena query SELECT yang konstan.

Beban sangat minimal, membaca dari log stream.

CDC

Latensi

Tergantung interval polling (misal: 5 detik).

Near real-time (milidetik).

CDC

Kompleksitas Kode

Perlu menulis dan memelihara kode poller (logika retry, backoff, dll).

Tidak perlu kode poller. Logika bisnis tetap sederhana.

CDC

Kompleksitas Infra

Sederhana. Hanya perlu aplikasi dan database.

Lebih kompleks. Membutuhkan Kafka, Zookeeper, Kafka Connect.

Polling

Keandalan

Cukup andal, tetapi ada kasus tepi (misal: gagal update processed_at).

Sangat andal, dibangun di atas mekanisme inti database dan Kafka.

CDC

Rekomendasi:

  • Untuk proyek kecil, prototipe, atau di mana latensi bukan masalah kritis, Polling Publisher sudah cukup baik dan lebih mudah untuk dimulai.

  • Untuk sistem produksi yang serius, berskala besar, dan membutuhkan latensi rendah, CDC dengan Debezium adalah pilihan yang jauh lebih unggul dan merupakan standar industri modern. Meskipun memerlukan penyiapan infrastruktur awal, keuntungan jangka panjangnya dalam hal kinerja, skalabilitas, dan pemeliharaan sangat besar.

Last updated