Kafka Producer Advanced: Konfigurasi Retries Produser Kafka

Konfigurasi Retries Produser Kafka

Dalam Apache Kafka, produser dapat menghadapi kesalahan saat mengirim pesan ke broker, seperti kegagalan jaringan atau replika yang tidak tersedia. Kesalahan ini dapat bersifat retriable (dapat dicoba ulang) atau non-retriable (tidak dapat dicoba ulang). Mengonfigurasi retries dengan benar sangat penting untuk memastikan pesan tidak hilang, sambil menjaga pengurutan dan performa sistem. Artikel ini akan menjelaskan jenis-jenis kesalahan, parameter konfigurasi retries, cara mengonfigurasi produser untuk retries yang aman, serta praktik terbaik untuk menyeimbangkan ketahanan (durability), pengurutan (ordering), dan throughput. Contoh lengkap menggunakan kode Java juga disertakan untuk mengilustrasikan implementasi.

Jenis Kesalahan Produser

Saat produser mengirim pesan ke broker Kafka, broker dapat mengembalikan kode sukses atau kode kesalahan. Kesalahan ini terbagi menjadi dua kategori:

  1. Kesalahan Retriable:

    • Kesalahan yang dapat diselesaikan dengan mencoba ulang pengiriman pesan.

    • Contoh: NotEnoughReplicasException, yang terjadi ketika jumlah replika In-Sync Replicas (ISR) di bawah min.insync.replicas. Mencoba ulang dapat berhasil jika replika kembali online.

    • Produser dapat dikonfigurasi untuk mencoba ulang secara otomatis untuk kesalahan ini.

  2. Kesalahan Non-Retriable:

    • Kesalahan yang tidak akan terselesaikan meskipun dicoba ulang.

    • Contoh: InvalidConfigException, yang menunjukkan masalah konfigurasi yang tidak dapat diperbaiki dengan pengiriman ulang.

    • Produser akan langsung menandai pesan sebagai gagal untuk kesalahan ini.

Mengaktifkan retries sangat dianjurkan untuk mencegah kehilangan pesan akibat kesalahan retriable, tetapi konfigurasi harus diatur dengan hati-hati untuk menghindari masalah seperti pengurutan pesan yang salah atau penurunan performa.

Parameter Konfigurasi Retries

Berikut adalah parameter konfigurasi utama untuk mengelola retries pada produser Kafka:

1. retries

  • Deskripsi: Menentukan jumlah percobaan ulang yang dilakukan produser sebelum menandai pesan sebagai gagal.

  • Default:

    • Kafka ≤ 2.0: 0 (tidak ada retries).

    • Kafka ≥ 2.1: 2147483647 (nilai maksimum integer, Integer.MAX_VALUE).

  • Rekomendasi: Biarkan retries tidak diatur secara eksplisit dan gunakan delivery.timeout.ms untuk mengontrol perilaku retries, karena ini lebih intuitif dan fleksibel.

2. delivery.timeout.ms

  • Deskripsi: Batas waktu total (dalam milidetik) untuk pengiriman pesan, termasuk semua percobaan ulang. Jika pesan tidak dapat dikirim dalam waktu ini, pesan akan ditandai sebagai gagal.

  • Default: 120000 (2 menit).

  • Contoh: Dengan delivery.timeout.ms=120000, produser akan berhenti mencoba setelah 2 menit, meskipun retries diatur ke nilai besar.

  • Catatan: Diperkenalkan melalui KIP-91 (Kafka 2.1) untuk memberikan kontrol yang lebih baik atas batas waktu pengiriman.

3. retry.backoff.ms

  • Deskripsi: Waktu tunggu (dalam milidetik) antara percobaan ulang.

  • Default: 100 (100 milidetik).

  • Rekomendasi: Sesuaikan nilai ini berdasarkan latensi jaringan dan frekuensi kesalahan. Nilai yang terlalu kecil dapat membebani broker, sedangkan nilai yang terlalu besar meningkatkan latensi pengiriman.

4. max.in.flight.requests.per.connection

  • Deskripsi: Jumlah maksimum permintaan yang belum diakui (in-flight) yang dapat dikirim produser ke satu koneksi broker.

  • Default: 5.

  • Penting:

    • Jika diatur ke 1, menjamin pengurutan pesan berdasarkan kunci, karena hanya satu batch yang diproses pada satu waktu.

    • Jika lebih dari 1 (misalnya, 5), batch kedua dapat berhasil sebelum batch pertama yang gagal dicoba ulang, menyebabkan pengurutan pesan berubah. Ini bermasalah jika aplikasi bergantung pada pengurutan kunci.

    • Dengan enable.idempotence=true, max.in.flight.requests.per.connection harus ≤ 5 untuk menjaga pengurutan pesan.

5. enable.idempotence

  • Deskripsi: Mengaktifkan produser idempoten, yang mencegah duplikasi pesan akibat retries, memastikan pengiriman exactly-once untuk pesan dalam satu sesi produser.

  • Default: false.

  • Persyaratan:

    • acks=all harus diatur.

    • max.in.flight.requests.per.connection harus ≤ 5.

    • retries harus > 0 (default Integer.MAX_VALUE pada Kafka ≥ 2.1 sudah cukup).

  • Keuntungan: Menjamin tidak ada duplikasi pesan dan pengurutan tetap terjaga, bahkan dengan retries.

Konfigurasi Produser Aman untuk Retries

Untuk memastikan retries aman, hindari kehilangan pesan, dan menjaga pengurutan, gunakan konfigurasi berikut:

Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
properties.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");
properties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");

Penjelasan:

  • enable.idempotence=true: Mengaktifkan produser idempoten untuk mencegah duplikasi dan menjaga pengurutan.

  • acks=all: Memastikan pesan ditulis ke semua replika ISR, meningkatkan ketahanan.

  • retries=Integer.MAX_VALUE: Memungkinkan percobaan ulang maksimum, dibatasi oleh delivery.timeout.ms.

  • max.in.flight.requests.per.connection=5: Menjaga pengurutan dengan idempotence (≤ 5 diperlukan).

  • delivery.timeout.ms=120000: Membatasi waktu pengiriman hingga 2 menit.

  • retry.backoff.ms=100: Waktu tunggu standar antara retries.

Trade-off: Keamanan vs. Throughput

  • Keamanan Maksimum:

    • Gunakan enable.idempotence=true, acks=all, dan max.in.flight.requests.per.connection=1 untuk menjamin pengurutan dan pengiriman exactly-once.

    • Kekurangan: Throughput menurun karena hanya satu permintaan yang diproses pada satu waktu.

  • Throughput Tinggi:

    • Gunakan max.in.flight.requests.per.connection=5 (dengan idempotence) untuk meningkatkan throughput sambil tetap menjaga pengurutan.

    • Jika pengurutan tidak penting, acks=1 atau acks=0 dapat digunakan untuk throughput lebih tinggi, tetapi dengan risiko kehilangan data.

  • Catatan: Dengan enable.idempotence=true, pengurutan terjaga untuk max.in.flight.requests.per.connection5, sehingga ini adalah kompromi yang baik.

Praktik Konfigurasi Retries

Berikut adalah langkah-langkah praktis untuk mengonfigurasi produser dengan retries menggunakan CLI dan kode Java, dengan contoh pengujian.

Prasyarat

  • Pastikan klaster Kafka berjalan (mode Zookeeper atau KRaft).

  • Gunakan ekstensi CLI yang sesuai: .sh untuk Linux/Mac, .bat untuk Windows.

  • Pastikan broker Kafka aktif di localhost:9092.

  • Klaster memiliki setidaknya tiga broker untuk mendukung replication.factor=3.

Langkah-langkah Praktis

1. Buat Topik

Buat topik bernama retry-topic dengan 3 partisi, faktor replikasi 3, dan min.insync.replicas=2:

kafka-topics.sh --bootstrap-server localhost:9092 --create --topic retry-topic \
--partitions 3 --replication-factor 3 \
--config min.insync.replicas=2

Verifikasi konfigurasi topik:

kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic retry-topic

Contoh Keluaran:

Topic: retry-topic TopicId: XYZ123 PartitionCount: 3 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: retry-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: retry-topic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: retry-topic Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2

2. Implementasi Produser dengan Java

Berikut adalah contoh kode Java untuk produser dengan konfigurasi retries aman:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class SafeRetryProducer {
    public static void main(String[] args) {
        // Konfigurasi produser
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
        properties.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");
        properties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");

        // Buat produser
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // Kirim pesan
        for (int i = 0; i < 10; i++) {
            String key = "key-" + i;
            String value = "Message " + i;
            ProducerRecord<String, String> record = new ProducerRecord<>("retry-topic", key, value);

            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.printf("Pesan dikirim: key=%s, value=%s, partition=%d, offset=%d%n",
                            key, value, metadata.partition(), metadata.offset());
                } else {
                    System.err.printf("Gagal mengirim pesan: %s%n", exception.getMessage());
                }
            });
        }

        // Tutup produser
        producer.flush();
        producer.close();
    }
}

Penjelasan Kode:

  • Menggunakan konfigurasi retries aman dengan idempotence.

  • Mengirim 10 pesan dengan kunci dan nilai untuk pengujian.

  • Callback menangani hasil pengiriman, mencetak sukses atau kesalahan.

3. Uji dengan Produser Konsol

Jalankan produser konsol dengan konfigurasi retries:

kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic retry-topic \
--producer-property acks=all \
--producer-property retries=2147483647 \
--producer-property delivery.timeout.ms=120000 \
--producer-property max.in.flight.requests.per.connection=5 \
--producer-property enable.idempotence=true

Kirim beberapa pesan:

key1,Message 1
key2,Message 2
key3,Message 3

4. Konsumsi Pesan

Jalankan konsumer untuk memverifikasi pesan:

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic retry-topic \
--from-beginning \
--property print.key=true \
--property key.separator=,

Contoh Keluaran:

key1,Message 1
key2,Message 2
key3,Message 3

5. Simulasi Kesalahan

Untuk menguji retries:

  • Matikan satu broker untuk memicu NotEnoughReplicasException.

  • Amati apakah produser mencoba ulang secara otomatis (periksa log produser atau callback).

  • Pastikan semua pesan akhirnya dikirim setelah broker kembali online, selama dalam delivery.timeout.ms.

Catatan

  • Idempotence: Dengan enable.idempotence=true, Kafka secara otomatis menangani duplikasi pesan, tetapi memerlukan acks=all dan max.in.flight.requests.per.connection5.

  • Pengurutan: Untuk menjamin pengurutan kunci, atur max.in.flight.requests.per.connection=1, tetapi ini mengurangi throughput.

  • Mode KRaft: Dalam mode KRaft, perilaku retries tetap sama, tetapi manajemen replika lebih efisien.

Praktik Terbaik

  1. Aktifkan Idempotence untuk Keamanan:

    • Gunakan enable.idempotence=true untuk mencegah duplikasi dan menjaga pengurutan, terutama untuk data kritis.

    • Pastikan acks=all dan max.in.flight.requests.per.connection5.

  2. Gunakan delivery.timeout.ms:

    • Alih-alih mengatur retries secara eksplisit, gunakan delivery.timeout.ms (misalnya, 120000) untuk membatasi waktu pengiriman.

  3. Sesuaikan retry.backoff.ms:

    • Atur retry.backoff.ms berdasarkan latensi jaringan (misalnya, 100 untuk jaringan cepat, lebih tinggi untuk jaringan lambat).

  4. Pilih max.in.flight.requests.per.connection dengan Hati-hati:

    • Gunakan 1 untuk pengurutan ketat, atau 5 dengan idempotence untuk kompromi antara pengurutan dan throughput.

  5. Pantau Metrik:

    • Gunakan metrik JMX seperti RequestLatencyAvg dan RetryCount untuk memantau performa retries.

    • Periksa log produser untuk kesalahan seperti NotEnoughReplicasException.

  6. Uji di Lingkungan Non-Produksi:

    • Simulasikan kegagalan broker untuk memastikan retries bekerja sesuai harapan tanpa kehilangan pesan.

  7. Kombinasikan dengan min.insync.replicas:

    • Gunakan min.insync.replicas=2 dengan acks=all untuk memastikan ketahanan, terutama untuk topik dengan replication.factor=3.

  8. Dokumentasikan Konfigurasi:

    • Catat pengaturan retries dan alasan pemilihannya untuk memudahkan pemeliharaan dan debugging.

Penjelasan Tambahan

Hubungan dengan Pengurutan

  • Retries tanpa batasan max.in.flight.requests.per.connection=1 dapat mengganggu pengurutan kunci. Misalnya, jika batch pertama gagal tetapi batch kedua berhasil, pesan dalam batch kedua dapat muncul lebih dulu di partisi.

  • Dengan enable.idempotence=true, Kafka menangani pengurutan dengan benar hingga max.in.flight.requests.per.connection=5.

Pemecahan Masalah

Jika pesan hilang atau pengurutan salah:

  • Periksa Konfigurasi:

    kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type topics --entity-name retry-topic

    Pastikan min.insync.replicas cukup tinggi.

  • Periksa Log Produser: Cari kesalahan seperti NotEnoughReplicasException atau timeout.

  • Uji dengan max.in.flight.requests.per.connection=1: Jika pengurutan bermasalah, turunkan ke 1 untuk debugging.

Jika throughput rendah:

  • Tinjau max.in.flight.requests.per.connection dan tingkatkan ke 5 jika idempotence diaktifkan.

  • Pertimbangkan acks=1 untuk data non-kritis.

Kesimpulan

Konfigurasi retries pada produser Kafka adalah kunci untuk memastikan pesan tidak hilang akibat kesalahan retriable seperti NotEnoughReplicasException. Dengan mengatur enable.idempotence=true, acks=all, retries=Integer.MAX_VALUE, delivery.timeout.ms=120000, dan max.in.flight.requests.per.connection=5, Anda dapat mencapai pengiriman yang aman dengan pengurutan terjaga. Untuk aplikasi yang memerlukan pengurutan ketat, gunakan max.in.flight.requests.per.connection=1, tetapi waspadai penurunan throughput. Dengan mengikuti praktik terbaik seperti pengujian, pemantauan metrik, dan dokumentasi, Anda dapat mengelola retries secara efektif untuk mendukung aplikasi Kafka yang andal.

Last updated