Outbox Pattern
Konsep Lengkap Outbox Pattern
1. Masalah: Jaminan Konsistensi pada Sistem Terdistribusi
Bayangkan sebuah skenario umum dalam arsitektur microservices:
Sebuah layanan UserService
bertanggung jawab untuk mendaftarkan pengguna baru. Setelah pengguna berhasil disimpan ke dalam database, layanan ini harus mempublikasikan sebuah event (pesan) bernama UserCreated
ke sebuah message broker (seperti RabbitMQ, Kafka, atau Google Pub/Sub). Layanan lain, misalnya NotificationService
, akan mendengarkan event ini dan mengirim email selamat datang.
Prosesnya terlihat sederhana:
Simpan data pengguna ke database.
Kirim pesan
UserCreated
ke message broker.
Masalahnya terletak pada kegagalan parsial. Apa yang terjadi jika:
Kasus A: Operasi simpan ke database berhasil, tetapi pengiriman pesan gagal (misalnya, message broker sedang down atau ada masalah jaringan)?
Hasil: Sistem menjadi tidak konsisten. Ada pengguna baru di database, tetapi tidak ada layanan lain yang tahu tentang itu. Email selamat datang tidak akan pernah terkirim.
Kasus B: Pengiriman pesan berhasil, tetapi transaksi database gagal di saat-saat terakhir (commit gagal atau terjadi rollback)?
Hasil: Sistem juga tidak konsisten.
NotificationService
menerima pesanUserCreated
dan mengirim email ke pengguna yang sebenarnya tidak ada di dalam database.
Masalah ini dikenal sebagai dual-write problem. Kita mencoba menulis ke dua sistem eksternal yang berbeda (database dan message broker) dan tidak ada cara mudah untuk menjamin kedua operasi ini terjadi secara atomik (keduanya berhasil atau keduanya gagal). Menggunakan distributed transaction (2PC - Two-Phase Commit) seringkali terlalu kompleks, lambat, dan tidak didukung oleh semua teknologi.
2. Solusi: Outbox Pattern
Outbox Pattern memecahkan masalah ini dengan cara yang cerdas dan andal. Idenya adalah mengubah dua operasi tulis yang terpisah menjadi satu operasi tulis atomik di dalam database lokal.
Bagaimana Cara Kerjanya?
Satu Transaksi Atomik:
Ketika sebuah permintaan untuk mendaftarkan pengguna baru datang, layanan
UserService
memulai sebuah transaksi database.Di dalam transaksi yang sama, layanan melakukan dua hal:
Menyisipkan data pengguna baru ke dalam tabel
users
.Menyisipkan data event (pesan yang akan dikirim) ke dalam tabel khusus bernama
outbox
.
Setelah itu, transaksi di-commit.
Karena kedua operasi
INSERT
(keusers
danoutbox
) berada dalam satu transaksi database, maka properti ACID (Atomicity, Consistency, Isolation, Durability) dari database menjamin bahwa keduanya akan berhasil atau keduanya akan gagal bersamaan. Tidak akan ada lagi status inkonsisten.Proses Asinkron (Message Relay):
Ada sebuah proses terpisah yang berjalan di latar belakang. Proses ini kita sebut Message Relay atau Outbox Processor.
Tugas Message Relay adalah:
Secara periodik (misalnya setiap beberapa detik), ia memeriksa tabel
outbox
untuk mencari event yang belum diproses.Untuk setiap event yang ditemukan, ia akan mencoba mempublikasikannya ke message broker.
Jika publikasi berhasil, Message Relay akan menandai event di tabel
outbox
sebagai "telah diproses" (atau menghapusnya). Ini untuk memastikan pesan tidak dikirim berulang kali.Jika publikasi gagal, event tersebut tetap ada di tabel
outbox
dan akan dicoba lagi pada siklus berikutnya.
3. Keuntungan Outbox Pattern
Reliabilitas dan Konsistensi: Menjamin bahwa sebuah event akan terkirim jika dan hanya jika transaksi bisnis utama berhasil. Ini adalah jaminan pengiriman "setidaknya sekali" (at-least-once delivery).
Kinerja: Transaksi bisnis utama menjadi lebih cepat karena tidak perlu menunggu respons dari message broker yang bisa lambat karena latensi jaringan. Penulisan ke tabel
outbox
sangat cepat karena berada di database yang sama.Pemisahan Tanggung Jawab (Decoupling): Layanan bisnis utama (misalnya
UserService
) tidak perlu tahu detail tentang message broker. Tanggung jawabnya hanya menulis ke tabeloutbox
. Logika pengiriman, retry, dll., diisolasi di dalam Message Relay.
4. Opsi Implementasi Message Relay
Polling Publisher: Ini adalah pendekatan yang paling umum dan akan kita implementasikan. Sebuah proses melakukan polling (query
SELECT
) ke tabeloutbox
secara berkala.Transaction Log Tailing (CDC - Change Data Capture): Pendekatan yang lebih canggih dan efisien. Menggunakan alat seperti Debezium yang "membaca" log transaksi database (WAL - Write-Ahead Log). Ketika ada
INSERT
baru ke tabeloutbox
, CDC akan menangkapnya secara real-time dan mempublikasikannya. Ini mengurangi beban pada database karena tidak ada polling dan latensinya sangat rendah.
Kode Produksi-Ready Menggunakan Golang
Berikut adalah contoh implementasi Outbox Pattern yang lengkap dan siap produksi dengan Go. Kode ini mencakup:
Struktur proyek yang jelas.
Penggunaan Dependency Injection untuk kemudahan pengujian.
Manajemen graceful shutdown.
Logika retry implisit pada poller.
Pemisahan antara logika bisnis, repositori, dan proses latar belakang.
Struktur Proyek
outbox-pattern-go/
├── go.mod
├── go.sum
├── main.go
├── schema.sql # Skema database
├── config/ # Konfigurasi (opsional)
├── domain/ # Model data utama (User, OutboxEvent)
│ └── model.go
├── repository/ # Akses data (database)
│ └── user_repo.go
├── service/ # Logika bisnis
│ └── user_service.go
├── outbox/ # Outbox Processor (Message Relay)
│ └── processor.go
└── messaging/ # Abstraksi untuk message broker
├── publisher.go
└── log_publisher.go # Implementasi mock untuk logging
Langkah 1: Skema Database (schema.sql
)
Kita butuh dua tabel: users
untuk data bisnis dan outbox_events
untuk pola outbox.
-- schema.sql
CREATE TABLE IF NOT EXISTS users (
id UUID PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
created_at TIMESTAMPTZ NOT NULL
);
CREATE TABLE IF NOT EXISTS outbox_events (
id BIGSERIAL PRIMARY KEY,
aggregate_id UUID NOT NULL, -- ID dari entitas terkait (misal: user_id)
topic VARCHAR(255) NOT NULL, -- Topik/channel di message broker
payload JSONB NOT NULL, -- Isi pesan dalam format JSON
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
processed_at TIMESTAMPTZ NULL -- Kapan event ini diproses. NULL berarti belum.
);
-- Indeks untuk membantu poller mencari event yang belum diproses dengan cepat.
CREATE INDEX IF NOT EXISTS idx_unprocessed_events ON outbox_events (processed_at) WHERE processed_at IS NULL;
Langkah 2: Domain Model (domain/model.go
)
// domain/model.go
package domain
import (
"encoding/json"
"time"
"github.com/google/uuid"
)
// User merepresentasikan model pengguna
type User struct {
ID uuid.UUID `db:"id"`
Email string `db:"email"`
CreatedAt time.Time `db:"created_at"`
}
// OutboxEvent merepresentasikan event yang akan dikirim
type OutboxEvent struct {
ID int64 `db:"id"`
AggregateID uuid.UUID `db:"aggregate_id"`
Topic string `db:"topic"`
Payload json.RawMessage `db:"payload"`
CreatedAt time.Time `db:"created_at"`
ProcessedAt *time.Time `db:"processed_at"`
}
Langkah 3: Abstraksi Message Publisher (messaging/
)
Ini penting agar kita tidak terikat pada satu jenis message broker.
// messaging/publisher.go
package messaging
import "context"
// EventPublisher adalah interface untuk mempublikasikan event.
type EventPublisher interface {
Publish(ctx context.Context, topic string, payload []byte) error
}
Implementasi mock yang hanya mencetak ke log. Di dunia nyata, ini akan diganti dengan klien Kafka atau RabbitMQ.
// messaging/log_publisher.go
package messaging
import (
"context"
"log"
)
type LogPublisher struct{}
func NewLogPublisher() *LogPublisher {
return &LogPublisher{}
}
func (p *LogPublisher) Publish(ctx context.Context, topic string, payload []byte) error {
log.Printf("PUBLISHING event to topic '%s': %s", topic, string(payload))
// Di dunia nyata, di sini ada logika untuk mengirim ke Kafka/RabbitMQ.
// Jika gagal, return error agar outbox processor mencoba lagi.
return nil
}
Langkah 4: Repository (repository/user_repo.go
)
Lapisan ini bertanggung jawab untuk berinteraksi dengan database. Perhatikan bagaimana metode-metodenya menerima sqlx.ExtContext
yang bisa berupa *sqlx.DB
atau *sqlx.Tx
, memungkinkan mereka untuk berpartisipasi dalam transaksi.
// repository/user_repo.go
package repository
import (
"context"
"outbox-pattern-go/domain"
"github.com/jmoiron/sqlx"
)
// DBTX adalah interface yang dipenuhi oleh *sqlx.DB dan *sqlx.Tx
type DBTX interface {
sqlx.ExtContext
sqlx.PreparerContext
}
type UserRepository struct{}
func NewUserRepository() *UserRepository {
return &UserRepository{}
}
// CreateUser menyimpan user baru ke database
func (r *UserRepository) CreateUser(ctx context.Context, db DBTX, user *domain.User) error {
query := `INSERT INTO users (id, email, created_at) VALUES ($1, $2, $3)`
_, err := db.ExecContext(ctx, query, user.ID, user.Email, user.CreatedAt)
return err
}
// CreateOutboxEvent menyimpan event ke tabel outbox
func (r *UserRepository) CreateOutboxEvent(ctx context.Context, db DBTX, event *domain.OutboxEvent) error {
query := `INSERT INTO outbox_events (aggregate_id, topic, payload) VALUES ($1, $2, $3)`
_, err := db.ExecContext(ctx, query, event.AggregateID, event.Topic, event.Payload)
return err
}
Langkah 5: Service Layer (service/user_service.go
)
Ini adalah inti dari logika atomik kita.
// service/user_service.go
package service
import (
"context"
"encoding/json"
"log"
"outbox-pattern-go/domain"
"outbox-pattern-go/repository"
"time"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
)
type UserService struct {
db *sqlx.DB
repo *repository.UserRepository
}
func NewUserService(db *sqlx.DB, repo *repository.UserRepository) *UserService {
return &UserService{db: db, repo: repo}
}
// RegisterUser mendaftarkan pengguna dan membuat event dalam satu transaksi.
func (s *UserService) RegisterUser(ctx context.Context, email string) (*domain.User, error) {
log.Printf("Registering user with email: %s", email)
// 1. Buat data user dan event
user := &domain.User{
ID: uuid.New(),
Email: email,
CreatedAt: time.Now(),
}
payload, err := json.Marshal(user)
if err != nil {
return nil, err
}
event := &domain.OutboxEvent{
AggregateID: user.ID,
Topic: "user.created",
Payload: payload,
}
// 2. Mulai transaksi database
tx, err := s.db.BeginTxx(ctx, nil)
if err != nil {
return nil, err
}
// Pastikan transaksi di-rollback jika ada error
defer tx.Rollback()
// 3. Simpan user (di dalam transaksi)
if err := s.repo.CreateUser(ctx, tx, user); err != nil {
return nil, err
}
// 4. Simpan event ke outbox (di dalam transaksi yang sama)
if err := s.repo.CreateOutboxEvent(ctx, tx, event); err != nil {
return nil, err
}
// 5. Commit transaksi. Ini adalah titik atomik.
// Jika ini berhasil, user dan event DIJAMIN tersimpan.
if err := tx.Commit(); err != nil {
return nil, err
}
log.Printf("User %s and outbox event created successfully in a single transaction", user.ID)
return user, nil
}
Langkah 6: Outbox Processor (outbox/processor.go
)
Ini adalah Message Relay yang berjalan di latar belakang.
// outbox/processor.go
package outbox
import (
"context"
"log"
"outbox-pattern-go/domain"
"outbox-pattern-go/messaging"
"time"
"github.com/jmoiron/sqlx"
)
type Processor struct {
db *sqlx.DB
publisher messaging.EventPublisher
interval time.Duration
}
func NewProcessor(db *sqlx.DB, publisher messaging.EventPublisher, interval time.Duration) *Processor {
return &Processor{
db: db,
publisher: publisher,
interval: interval,
}
}
// Start memulai proses polling outbox
func (p *Processor) Start(ctx context.Context) {
log.Println("Starting outbox processor...")
ticker := time.NewTicker(p.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := p.processEvents(ctx); err != nil {
log.Printf("Error processing outbox events: %v", err)
}
case <-ctx.Done():
log.Println("Stopping outbox processor...")
return
}
}
}
func (p *Processor) processEvents(ctx context.Context) error {
events, err := p.fetchUnprocessedEvents(ctx)
if err != nil {
return err
}
if len(events) > 0 {
log.Printf("Found %d unprocessed events", len(events))
}
for _, event := range events {
// 1. Publikasikan event
err := p.publisher.Publish(ctx, event.Topic, event.Payload)
if err != nil {
// Jika gagal, kita tidak update `processed_at`.
// Event akan diambil lagi di iterasi berikutnya.
log.Printf("Failed to publish event %d: %v. Will retry later.", event.ID, err)
continue // Lanjut ke event berikutnya
}
// 2. Tandai sebagai sudah diproses
if err := p.markEventAsProcessed(ctx, event.ID); err != nil {
// Kasus langka: event terkirim tapi gagal update DB.
// Ini bisa menyebabkan event terkirim ulang (at-least-once).
// Sistem consumer harus idempotensi.
log.Printf("CRITICAL: Failed to mark event %d as processed after publishing: %v", event.ID, err)
} else {
log.Printf("Successfully processed and published event %d", event.ID)
}
}
return nil
}
func (p *Processor) fetchUnprocessedEvents(ctx context.Context) ([]domain.OutboxEvent, error) {
var events []domain.OutboxEvent
// LIMIT ditambahkan untuk mencegah memuat terlalu banyak event sekaligus
query := `SELECT id, aggregate_id, topic, payload FROM outbox_events WHERE processed_at IS NULL ORDER BY created_at ASC LIMIT 10`
err := p.db.SelectContext(ctx, &events, query)
return events, err
}
func (p *Processor) markEventAsProcessed(ctx context.Context, eventID int64) error {
query := `UPDATE outbox_events SET processed_at = NOW() WHERE id = $1`
_, err := p.db.ExecContext(ctx, query, eventID)
return err
}
Langkah 7: Merangkai Semuanya (main.go
)
// main.go
package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"outbox-pattern-go/messaging"
"outbox-pattern-go/outbox"
"outbox-pattern-go/repository"
"outbox-pattern-go/service"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
)
func main() {
// --- Konfigurasi ---
// Di aplikasi nyata, ini diambil dari env var atau file config
dbConnectionString := "user=postgres password=password dbname=outbox_demo sslmode=disable"
pollingInterval := 5 * time.Second
// --- Setup Database ---
db, err := sqlx.Connect("postgres", dbConnectionString)
if err != nil {
log.Fatalf("Failed to connect to database: %v", err)
}
defer db.Close()
log.Println("Database connection successful")
// --- Inisialisasi Komponen (Dependency Injection) ---
userRepo := repository.NewUserRepository()
userService := service.NewUserService(db, userRepo)
logPublisher := messaging.NewLogPublisher() // Mock publisher
outboxProcessor := outbox.NewProcessor(db, logPublisher, pollingInterval)
// --- Menjalankan Outbox Processor di Goroutine terpisah ---
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go outboxProcessor.Start(ctx)
// --- Setup HTTP Server untuk menerima request (contoh) ---
http.HandleFunc("/register", func(w http.ResponseWriter, r *http.Request) {
email := r.URL.Query().Get("email")
if email == "" {
http.Error(w, "Email query parameter is required", http.StatusBadRequest)
return
}
user, err := userService.RegisterUser(r.Context(), email)
if err != nil {
log.Printf("ERROR: Failed to register user: %v", err)
http.Error(w, "Failed to register user", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
w.Write([]byte("User registered successfully. Outbox event created.\n"))
log.Printf("API: Handled registration for user %s", user.ID)
})
server := &http.Server{Addr: ":8080"}
// --- Graceful Shutdown ---
go func() {
log.Println("Starting HTTP server on :8080")
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("Could not listen on :8080: %v\n", err)
}
}()
stopChan := make(chan os.Signal, 1)
signal.Notify(stopChan, syscall.SIGINT, syscall.SIGTERM)
<-stopChan // Tunggu sinyal shutdown
log.Println("Shutting down server...")
// Beri waktu untuk request yang sedang berjalan
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()
if err := server.Shutdown(shutdownCtx); err != nil {
log.Fatalf("Server shutdown failed: %+v", err)
}
// Hentikan outbox processor
cancel()
// Beri waktu sedikit agar processor bisa berhenti dengan rapi
time.Sleep(1 * time.Second)
log.Println("Server gracefully stopped")
}
Cara Menjalankan
Siapkan Dependensi:
Bash
go mod init outbox-pattern-go go get github.com/jmoiron/sqlx go get github.com/lib/pq go get github.com/google/uuid
Buat Database: Pastikan Anda memiliki PostgreSQL yang berjalan dan buat database bernama
outbox_demo
.Jalankan Aplikasi:
Bash
go run .
Uji Coba:
Buka terminal baru.
Kirim request untuk mendaftarkan pengguna:
Bash
curl -X POST "http://localhost:8080/register?email=test1@example.com"
Lihat Log Server: Anda akan melihat log yang menunjukkan:
User dan outbox event dibuat dalam satu transaksi.
Outbox processor menemukan event baru.
Event dipublikasikan (ke log, dalam kasus ini).
Event ditandai sebagai sudah diproses.
Periksa Database: Anda bisa memeriksa tabel
users
danoutbox_events
untuk melihat datanya. Perhatikan bahwa kolomprocessed_at
di tabeloutbox_events
akan terisi.
Tentu. Mari kita lanjutkan pembahasan dengan mengimplementasikan Outbox Pattern menggunakan pendekatan yang lebih modern dan efisien: Change Data Capture (CDC) dengan Debezium.
Ini adalah pendekatan yang sangat direkomendasikan untuk lingkungan produksi karena kinerjanya yang tinggi dan beban yang minimal pada database.
Konsep: Outbox Pattern dengan CDC dan Debezium
Berbeda dengan pendekatan polling di mana kita secara aktif menanyakan database ("apakah ada event baru?"), pendekatan CDC bekerja secara pasif dengan "mendengarkan" log transaksi database.
Log Transaksi (Write-Ahead Log - WAL): Setiap database modern yang andal (seperti PostgreSQL, MySQL, SQL Server) memiliki log transaksi. Log ini mencatat setiap perubahan data (INSERT, UPDATE, DELETE) sebelum perubahan itu sendiri ditulis ke file data utama. Ini adalah mekanisme inti untuk recovery dan replikasi.
Debezium: Debezium adalah platform CDC open-source yang bertindak sebagai "agen" yang membaca log transaksi ini. Debezium berjalan di dalam Kafka Connect, sebuah framework untuk menghubungkan Kafka dengan sistem lain.
Alur Kerja:
Aplikasi Go (Sama Seperti Sebelumnya): Aplikasi Anda melakukan hal yang persis sama seperti pada contoh sebelumnya. Yaitu, dalam satu transaksi atomik, ia menyisipkan data ke tabel
users
danoutbox_events
. Tidak ada perubahan kode yang diperlukan di sisi aplikasi produsen ini.Konfigurasi Database: Anda mengaktifkan "logical replication" pada PostgreSQL. Ini menginstruksikan PostgreSQL untuk menulis informasi yang lebih detail ke dalam WAL, cukup untuk merekonstruksi perubahan baris per baris.
Debezium Connector: Anda men-deploy Debezium Connector untuk PostgreSQL ke cluster Kafka Connect. Konektor ini dikonfigurasi untuk:
Terhubung ke database PostgreSQL Anda.
Memonitor perubahan hanya pada tabel
outbox_events
.
Tailing Log: Debezium mulai membaca WAL PostgreSQL secara real-time.
Publikasi ke Kafka: Ketika Debezium mendeteksi
INSERT
baru pada tabeloutbox_events
, ia akan:Membaca data baris baru tersebut.
Mengubahnya menjadi pesan (event).
Mempublikasikannya ke sebuah topik di Apache Kafka.
Event Routing (Fitur Kunci): Debezium memiliki transformasi bawaan yang sangat kuat bernama Event Router. Kita akan menggunakannya untuk secara dinamis menentukan topik Kafka tujuan berdasarkan kolom di tabel
outbox_events
kita (yaitu kolomtopic
).Layanan Konsumen: Layanan lain (misalnya
NotificationService
) sekarang hanya perlu mendengarkan topik Kafka yang relevan (misalnyauser.created
) untuk menerima event.
Keuntungan Dibandingkan Polling
Near Real-time: Latensi sangat rendah, karena event ditangkap segera setelah transaksi di-commit.
Beban Database Rendah: Tidak ada lagi query
SELECT
yang berulang-ulang ke database. Debezium membaca stream log yang memang sudah ada, yang jauh lebih efisien.Skalabilitas dan Keandalan: Dibangun di atas Kafka dan Kafka Connect yang terbukti andal, dapat diskalakan, dan toleran terhadap kesalahan.
Tidak Perlu Kode Tambahan: Anda tidak perlu menulis dan memelihara kode poller (
outbox/processor.go
). Tanggung jawab itu dialihkan ke infrastruktur (Debezium).
Implementasi Produksi-Ready dengan Docker Compose
Cara terbaik untuk mendemonstrasikan ini adalah dengan menggunakan Docker Compose untuk menjalankan semua infrastruktur yang diperlukan.
Langkah 1: Siapkan Proyek Go (Gunakan yang Sebelumnya)
Anda dapat menggunakan proyek Go yang sama persis dari jawaban sebelumnya. Hal yang penting adalah:
Struktur
domain
,repository
, danservice
tetap sama.Logika
service/user_service.go
yang menulis keusers
danoutbox_events
dalam satu transaksi adalah kunci utamanya dan tidak berubah.Hapus atau jangan jalankan
outbox/processor.go
danmessaging/log_publisher.go
. Komponen-komponen ini sekarang digantikan oleh Debezium.
Langkah 2: Buat docker-compose.yml
File ini akan mendefinisikan seluruh stack kita: PostgreSQL, Zookeeper, Kafka, dan Kafka Connect (dengan Debezium).
YAML
# docker-compose.yml
version: '3.8'
services:
postgres:
image: postgres:15
container_name: postgres_db
ports:
- "5432:5432"
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: password
POSTGRES_DB: outbox_demo
# Baris ini SANGAT PENTING untuk Debezium
command: postgres -c wal_level=logical
healthcheck:
test: ["CMD-SHELL", "pg_isready -U user -d outbox_demo"]
interval: 10s
timeout: 5s
retries: 5
zookeeper:
image: confluentinc/cp-zookeeper:7.5.3
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.5.3
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
connect:
image: debezium/connect:2.6
container_name: kafka_connect
depends_on:
- kafka
- postgres
ports:
- "8083:8083"
environment:
BOOTSTRAP_SERVERS: 'kafka:29092'
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: my_connect_configs
OFFSET_STORAGE_TOPIC: my_connect_offsets
STATUS_STORAGE_TOPIC: my_connect_statuses
# Opsional: UI untuk melihat topik dan pesan Kafka, sangat membantu saat development
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka_ui
depends_on:
- kafka
- connect
ports:
- "8088:8080" # Akses UI di http://localhost:8088
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: connect
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://connect:8083
networks:
default:
name: outbox-cdc-net
Langkah 3: Konfigurasi Debezium Connector
Setelah semua layanan di docker-compose
berjalan, kita perlu memberitahu Kafka Connect untuk memulai Debezium Connector dengan konfigurasi yang kita inginkan. Buat file JSON untuk konfigurasi ini.
register-postgres-connector.json
:
JSON
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "user",
"database.password": "password",
"database.dbname": "outbox_demo",
"database.server.name": "dbserver1",
"table.include.list": "public.outbox_events",
"tombstones.on.delete": "false",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "aggregate_id",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.route.by.field": "topic"
}
}
Penjelasan Konfigurasi Kunci:
table.include.list
: "public.outbox_events" -> Hanya memonitor tabel ini.transforms
: "outbox" -> Memberi nama pada transformasi yang akan kita definisikan.transforms.outbox.type
: "io.debezium.transforms.outbox.EventRouter" -> Ini adalah "otak" dari pola ini. Transformasi ini secara khusus dirancang untuk Outbox Pattern.transforms.outbox.table.field.event.payload
: "payload" -> Memberitahu Event Router bahwa isi dari pesan (event) ada di kolompayload
.transforms.outbox.route.by.field
: "topic" -> Memberitahu Event Router untuk menggunakan nilai dari kolomtopic
sebagai nama topik Kafka tujuan.
Langkah 4: Jalankan dan Uji Coba
Jalankan Infrastruktur:
Buka terminal di direktori proyek Anda dan jalankan:
Bash
docker-compose up -d
Tunggu beberapa saat hingga semua kontainer, terutama
postgres
, dalam keadaan healthy. Anda bisa memeriksanya dengandocker ps
.Buat Tabel di Database:
Jalankan aplikasi Go Anda sekali untuk membuat tabel melalui main.go (jika Anda memiliki logika inisialisasi skema di sana), atau hubungkan ke database menggunakan psql atau DBeaver dan jalankan schema.sql dari contoh sebelumnya.
Daftarkan Debezium Connector:
Gunakan curl untuk mengirim konfigurasi JSON ke API Kafka Connect.
Bash
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \ localhost:8083/connectors/ \ -d @register-postgres-connector.json
Jika berhasil, Anda akan mendapatkan respons
HTTP/1.1 201 Created
. Anda dapat memverifikasi statusnya di Kafka UI (http://localhost:8088
) di bawah tab "Kafka Connect".Jalankan Aplikasi Go:
Sekarang, jalankan aplikasi Go Anda (yang berisi UserService dan endpoint HTTP).
Bash
go run .
Picu Event:
Kirim request ke aplikasi Go Anda untuk mendaftarkan pengguna baru.
Bash
curl -X POST "http://localhost:8080/register?email=cdc-test@example.com"
Verifikasi Hasilnya:
Buka Kafka UI di browser:
http://localhost:8088
.Navigasi ke bagian "Topics".
Anda akan melihat sebuah topik baru bernama
user.created
telah dibuat secara otomatis!Klik topik tersebut dan lihat pesannya. Anda akan menemukan payload JSON dari pengguna yang baru saja Anda daftarkan.
JSON
// Contoh payload yang akan Anda lihat di topik 'user.created' { "ID": "some-uuid-string", "Email": "cdc-test@example.com", "CreatedAt": "2025-06-27T14:30:00Z" }
Kesimpulan Akhir: Polling vs. CDC
Fitur
Polling Publisher
CDC (Debezium)
Pemenang
Kinerja
Beban tambahan pada DB karena query SELECT
yang konstan.
Beban sangat minimal, membaca dari log stream.
CDC
Latensi
Tergantung interval polling (misal: 5 detik).
Near real-time (milidetik).
CDC
Kompleksitas Kode
Perlu menulis dan memelihara kode poller (logika retry, backoff, dll).
Tidak perlu kode poller. Logika bisnis tetap sederhana.
CDC
Kompleksitas Infra
Sederhana. Hanya perlu aplikasi dan database.
Lebih kompleks. Membutuhkan Kafka, Zookeeper, Kafka Connect.
Polling
Keandalan
Cukup andal, tetapi ada kasus tepi (misal: gagal update processed_at
).
Sangat andal, dibangun di atas mekanisme inti database dan Kafka.
CDC
Rekomendasi:
Untuk proyek kecil, prototipe, atau di mana latensi bukan masalah kritis, Polling Publisher sudah cukup baik dan lebih mudah untuk dimulai.
Untuk sistem produksi yang serius, berskala besar, dan membutuhkan latensi rendah, CDC dengan Debezium adalah pilihan yang jauh lebih unggul dan merupakan standar industri modern. Meskipun memerlukan penyiapan infrastruktur awal, keuntungan jangka panjangnya dalam hal kinerja, skalabilitas, dan pemeliharaan sangat besar.
Last updated