Kafka Consumer Advanced: Poll and Internal Threads Behavior

Pengaturan Penting Konsumer Kafka: Perilaku Poll dan Thread Internal

Dalam Apache Kafka, konsumer menggunakan metode poll() untuk mengambil batch data dari broker, mengelola koordinasi grup, rebalancing partisi, dan pengiriman heartbeat untuk menjaga keanggotaan dalam grup konsumer. Perilaku poll dan thread internal seperti heartbeat thread dan poll thread sangat penting untuk memastikan konsumer tetap aktif, responsif, dan efisien dalam memproses pesan. Artikel ini menjelaskan cara kerja poll, pengaturan utama seperti heartbeat.interval.ms, session.timeout.ms, max.poll.interval.ms, fetch.min.bytes, fetch.max.wait.ms, max.partition.fetch.bytes, fetch.max.bytes, dan max.poll.records, serta bagaimana mengonfigurasi konsumer untuk performa optimal. Kami menyertakan contoh kode Java, langkah-langkah pengujian dengan CLI, dan praktik terbaik untuk lingkungan produksi.

Perilaku Poll Konsumer Kafka

Metode poll() adalah inti dari pengoperasian konsumer Kafka. Setelah konsumer berlangganan ke topik, poll loop menangani:

  • Koordinasi Grup: Mengelola keanggotaan dalam grup konsumer.

  • Rebalancing Partisi: Menangani redistribusi partisi saat konsumer bergabung atau keluar dari grup.

  • Heartbeat: Mengirim sinyal ke koordinator grup untuk menunjukkan konsumer masih aktif.

  • Pengambilan Data: Mengambil batch pesan dari partisi yang ditugaskan.

Optimasi Internal

Konsumer Kafka dioptimalkan untuk efisiensi:

  • Jika konsumer berhasil mengambil data, ia akan memulai permintaan fetch berikutnya secara asinkronus (pre-fetching) saat memproses batch saat ini.

  • Ini mengurangi waktu tunggu pada pemanggilan poll() berikutnya, meningkatkan throughput dan mengurangi latensi.

Diagram Perilaku Poll:

[poll() -> Ambil Batch -> Proses Batch | Pre-fetch Batch Berikutnya]
[poll() berikutnya -> Kembalikan Batch yang Sudah Di-fetch]

Kontrol melalui Poll

Metode poll() memungkinkan konsumer untuk mengontrol:

  • Posisi Konsumsi: Mengatur dari mana dalam log konsumer mulai membaca (misalnya, earliest, latest).

  • Kecepatan Konsumsi: Mengatur jumlah data yang diambil per poll (via max.poll.records, fetch.max.bytes).

  • Replay Pesan: Memungkinkan konsumsi ulang pesan dengan mengatur ulang offset.

Thread Internal Konsumer

Konsumer Kafka menggunakan dua thread utama untuk operasinya:

  • Heartbeat Thread: Mengirim heartbeat untuk menjaga keanggotaan grup.

  • Poll Thread: Menangani pengambilan data melalui poll().

Diagram Thread Internal:

[Heartbeat Thread -> Kirim Heartbeat ke Koordinator]
[Poll Thread -> Panggil poll() -> Ambil & Proses Pesan]

Heartbeat Thread

  • Fungsi: Mengirim heartbeat secara periodik ke koordinator grup untuk menunjukkan konsumer masih aktif.

  • Pengaturan:

    • heartbeat.interval.ms (default: 3000 ms atau 3 detik): Waktu antara pengiriman heartbeat. Menentukan seberapa sering konsumer melaporkan statusnya.

    • session.timeout.ms (Kafka ≥ 3.0: 45000 ms atau 45 detik; Kafka ≤ 2.8: 10000 ms atau 10 detik): Waktu maksimum konsumer dianggap aktif tanpa heartbeat. Jika melebihi waktu ini, koordinator menganggap konsumer mati dan memicu rebalance.

  • Aturan:

    • heartbeat.interval.ms harus lebih kecil dari session.timeout.ms, biasanya tidak lebih dari sepertiga dari nilai session.timeout.ms (misalnya, 3 detik untuk 10 detik).

    • Nilai default biasanya cukup, tetapi dapat disesuaikan untuk kasus khusus.

  • Implikasi:

    • Heartbeat yang terlalu jarang dapat menyebabkan rebalance prematur.

    • Session timeout yang terlalu pendek meningkatkan risiko rebalance yang tidak perlu.

Poll Thread

  • Fungsi: Mengambil batch pesan dari partisi menggunakan poll() dan memprosesnya di thread utama.

  • Pengaturan:

    • max.poll.interval.ms (default: 300000 ms atau 5 menit): Waktu maksimum antara dua pemanggilan poll(). Jika waktu ini terlampaui, konsumer dianggap gagal (livelock), dan koordinator memicu rebalance.

    • max.poll.records (default: 500): Jumlah maksimum record yang dikembalikan per pemanggilan poll(). Mengontrol jumlah data yang diproses dalam satu iterasi.

  • Implikasi:

    • max.poll.interval.ms yang terlalu rendah dapat menyebabkan rebalance jika pengolahan lambat (misalnya, di Apache Spark).

    • max.poll.records yang terlalu besar meningkatkan penggunaan memori dan waktu pengolahan, berisiko melebihi max.poll.interval.ms.

Perilaku Fetch Konsumer

Saat poll() dipanggil, konsumer mengambil data dari partisi dan menyimpannya dalam cache internal untuk diproses. Perilaku fetch dikontrol oleh:

  • fetch.min.bytes (default: 1 byte): Jumlah minimum data yang diinginkan konsumer per permintaan fetch. Broker menunda pengiriman hingga data mencapai jumlah ini atau waktu tunggu habis.

  • fetch.max.wait.ms (default: 500 ms): Waktu maksimum broker menunda respons fetch jika fetch.min.bytes belum terpenuhi.

  • max.partition.fetch.bytes (default: 1048576 atau 1MB): Jumlah maksimum data per partisi per fetch. Jika batch pertama melebihi batas ini, batch tetap dikirim untuk memastikan kemajuan.

  • fetch.max.bytes (default: 57671680 atau 55MB): Jumlah maksimum data yang dikembalikan per permintaan fetch dari semua partisi.

Implikasi:

  • fetch.min.bytes dan fetch.max.wait.ms mengurangi overhead jaringan dengan menunggu batch besar.

  • max.partition.fetch.bytes dan fetch.max.bytes membatasi penggunaan memori konsumer.

  • max.poll.records tidak memengaruhi fetch tetapi membatasi jumlah record yang dikembalikan ke aplikasi per poll().

Konfigurasi Konsumer untuk Performa Optimal

Berikut adalah konfigurasi konsumer untuk performa tinggi dengan perilaku poll dan fetch yang dioptimalkan:

Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
properties.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
properties.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024"); // 1KB
properties.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
properties.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, Integer.toString(1 * 1024 * 1024)); // 1MB
properties.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, Integer.toString(55 * 1024 * 1024)); // 55MB

Penjelasan:

  • enable.auto.commit=false: Menggunakan komit manual untuk semantik At Least Once.

  • heartbeat.interval.ms=3000 dan session.timeout.ms=10000: Menjaga keanggotaan grup dengan heartbeat reguler.

  • max.poll.interval.ms=300000: Memberikan waktu cukup untuk pengolahan batch.

  • max.poll.records=500: Membatasi jumlah record per poll untuk pengolahan terkendali.

  • fetch.min.bytes=1024 dan fetch.max.wait.ms=500: Mengoptimalkan fetch untuk batch besar.

  • max.partition.fetch.bytes=1MB dan fetch.max.bytes=55MB: Menjaga penggunaan memori wajar.

Praktik Konfigurasi dan Pengujian

Berikut adalah langkah-langkah praktis untuk mengonfigurasi konsumer dengan pengaturan poll dan fetch yang optimal menggunakan kode Java dan alat CLI, serta pengujian untuk memverifikasi performa dan keandalan.

Prasyarat

  • Pastikan klaster Kafka berjalan (versi ≥ 0.11, mode Zookeeper atau KRaft).

  • Gunakan ekstensi CLI yang sesuai: .sh untuk Linux/Mac, .bat untuk Windows.

  • Pastikan broker Kafka aktif di localhost:9092.

  • Klaster memiliki setidaknya tiga broker untuk mendukung replication.factor=3.

Langkah-langkah Praktis

1. Buat Topik

Buat topik bernama poll-topic dengan 10 partisi, faktor replikasi 3, dan min.insync.replicas=2:

kafka-topics.sh --bootstrap-server localhost:9092 --create --topic poll-topic \
--partitions 10 --replication-factor 3 \
--config min.insync.replicas=2 \
--config compression.type=producer

Verifikasi konfigurasi topik:

kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic poll-topic

Contoh Keluaran:

Topic: poll-topic TopicId: XYZ123 PartitionCount: 10 ReplicationFactor: 3 Configs: min.insync.replicas=2,compression.type=producer
Topic: poll-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
...
Topic: poll-topic Partition: 9 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1

2. Produksi Pesan untuk Pengujian

Gunakan produser konsol untuk mengirim pesan:

kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic poll-topic \
--property parse.key=true \
--property key.separator=,

Kirim beberapa pesan:

key1,{"id": 1, "data": "Poll test message 1"}
key2,{"id": 2, "data": "Poll test message 2"}
key3,{"id": 3, "data": "Poll test message 3"}

3. Implementasi Konsumer dengan Poll Optimal (Java)

Berikut adalah contoh kode Java untuk konsumer dengan pengaturan poll dan fetch yang dioptimalkan:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class PollOptimizedConsumer {
    public static void main(String[] args) {
        // Konfigurasi konsumer
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
        properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
        properties.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
        properties.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
        properties.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
        properties.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, Integer.toString(1 * 1024 * 1024));
        properties.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, Integer.toString(55 * 1024 * 1024));

        // Buat konsumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("poll-topic"));

        try {
            while (true) {
                // Poll pesan
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                
                // Proses setiap pesan
                for (ConsumerRecord<String, String> record : records) {
                    String key = record.key();
                    String value = record.value();
                    System.out.printf("Menerima pesan: key=%s, value=%s, partition=%d, offset=%d%n",
                            key, value, record.partition(), record.offset());
                    
                    // Simulasi pengolahan idempoten
                    try {
                        processMessage(key, value);
                    } catch (Exception e) {
                        System.err.printf("Gagal memproses pesan: %s%n", e.getMessage());
                        continue;
                    }
                }
                
                // Komit offset setelah semua pesan diproses
                if (!records.isEmpty()) {
                    try {
                        consumer.commitSync();
                        System.out.println("Offset dikomit");
                    } catch (Exception e) {
                        System.err.printf("Gagal mengomit offset: %s%n", e.getMessage());
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }

    private static void processMessage(String key, String value) {
        // Simulasi pengolahan idempoten
        System.out.printf("Memproses pesan: key=%s, value=%s%n", key, value);
    }
}

Penjelasan Kode:

  • Menggunakan pengaturan poll dan fetch yang dioptimalkan untuk throughput dan keandalan.

  • enable.auto.commit=false untuk semantik At Least Once dengan komit manual.

  • fetch.min.bytes=1024 dan fetch.max.wait.ms=500 untuk batch besar.

  • max.poll.records=500 untuk mengontrol jumlah record per poll.

  • Heartbeat diatur dengan heartbeat.interval.ms=3000 dan session.timeout.ms=10000.

4. Uji Konsumer dengan CLI

Jalankan konsumer konsol untuk membandingkan perilaku:

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic poll-topic \
--from-beginning \
--group consumer-group \
--property print.key=true \
--property key.separator=, \
--consumer-property fetch.min.bytes=1024 \
--consumer-property fetch.max.wait.ms=500 \
--consumer-property max.partition.fetch.bytes=1048576 \
--consumer-property fetch.max.bytes=57671680 \
--consumer-property max.poll.records=500

Contoh Keluaran:

key1,{"id": 1, "data": "Poll test message 1"}
key2,{"id": 2, "data": "Poll test message 2"}
key3,{"id": 3, "data": "Poll test message 3"}

5. Uji Performa dan Keandalan

  • Simulasi Pengolahan Lambat:

    • Tambahkan penundaan (misalnya, Thread.sleep(1000)) di processMessage untuk menguji max.poll.interval.ms.

    • Pastikan konsumer tidak memicu rebalance selama pengolahan dalam batas 5 menit.

  • Simulasi Kegagalan Heartbeat:

    • Hentikan konsumer sementara untuk melebihi session.timeout.ms (10 detik).

    • Periksa apakah rebalance terjadi menggunakan:

      kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group consumer-group --describe
  • Pantau Performa:

    • Gunakan metrik JMX seperti RecordsConsumedTotal, FetchSizeAvg, dan FetchLatencyAvg untuk mengukur efisiensi fetch.

    • Periksa ConsumerCoordinatorMetrics untuk memantau heartbeat dan rebalance.

  • Uji Fetch Behavior:

    • Kurangi fetch.min.bytes ke 1 dan amati peningkatan frekuensi fetch (dengan latensi lebih rendah).

    • Tingkatkan fetch.max.bytes ke 100MB untuk menguji kapasitas memori konsumer.

Catatan

  • Default Biasanya Cukup: Nilai default untuk heartbeat.interval.ms, session.timeout.ms, dan lainnya biasanya cukup untuk sebagian besar kasus. Jangan ubah tanpa pengujian.

  • Mode KRaft: Perilaku poll dan heartbeat serupa di mode KRaft, tetapi pastikan controller stabil untuk koordinasi grup.

  • Pengolahan Berat: Untuk aplikasi seperti Apache Spark, tingkatkan max.poll.interval.ms untuk mengakomodasi waktu pengolahan yang lama.

Praktik Terbaik

  1. Gunakan Komit Manual untuk At Least Once:

    • Atur enable.auto.commit=false dan gunakan commitSync() setelah pengolahan untuk menjamin semantik At Least Once.

  2. Jaga Heartbeat Konsisten:

    • Gunakan heartbeat.interval.ms=3000 dan session.timeout.ms=10000 untuk klaster dengan latensi rendah.

    • Pastikan heartbeat.interval.ms ≤ 1/3 dari session.timeout.ms.

  3. Sesuaikan max.poll.interval.ms untuk Pengolahan Berat:

    • Tingkatkan ke 600000 (10 menit) untuk aplikasi seperti Spark yang memerlukan waktu pengolahan lama.

    • Pastikan pengolahan selesai dalam batas ini untuk menghindari rebalance.

  4. Optimalkan Fetch Behavior:

    • Gunakan fetch.min.bytes=1024 dan fetch.max.wait.ms=500 untuk batch besar dan efisiensi jaringan.

    • Sesuaikan max.partition.fetch.bytes dan fetch.max.bytes berdasarkan kapasitas memori konsumer.

  5. Kontrol Jumlah Record:

    • Gunakan max.poll.records=500 untuk pengolahan terkendali. Kurangi ke 100 untuk aplikasi dengan memori terbatas.

  6. Pantau Performa dan Rebalance:

    • Gunakan kafka-consumer-groups.sh untuk memeriksa lag dan status grup.

    • Pantau metrik JMX seperti FetchSizeAvg, FetchLatencyAvg, dan RebalanceRate untuk mendeteksi masalah.

  7. Uji di Lingkungan Non-Produksi:

    • Simulasikan pengolahan lambat, kegagalan jaringan, atau rebalance untuk menguji pengaturan poll dan heartbeat.

    • Bandingkan performa dengan berbagai nilai fetch.min.bytes dan max.poll.records.

  8. Dokumentasikan Konfigurasi:

    • Catat pengaturan heartbeat.interval.ms, session.timeout.ms, max.poll.interval.ms, dan lainnya, serta alasan pemilihannya.

Penjelasan Tambahan

Hubungan dengan Semantik Pengiriman

  • At Least Once: Gunakan komit manual (commitSync) dengan enable.auto.commit=false untuk memastikan pesan diproses sebelum offset dikomit.

  • At Most Once: Gunakan auto-commit (enable.auto.commit=true) hanya jika kehilangan pesan dapat ditoleransi.

  • Exactly Once: Gunakan Kafka Streams (processing.guarantee=exactly_once_v2) untuk alur Kafka ke Kafka, dengan pengaturan poll yang konsisten.

Pemecahan Masalah

Jika konsumer memicu rebalance:

  • Periksa Heartbeat:

    • Pastikan heartbeat.interval.ms cukup kecil relatif terhadap session.timeout.ms.

    • Periksa koneksi jaringan ke koordinator grup:

      kafka-broker-api-versions.sh --bootstrap-server localhost:9092
  • Periksa max.poll.interval.ms:

    • Tingkatkan jika pengolahan lambat (misalnya, ke 600000).

    • Optimalkan logika pengolahan untuk mempercepat poll().

Jika throughput rendah:

  • Periksa Fetch Settings:

    • Tingkatkan fetch.min.bytes ke 10240 (10KB) untuk batch lebih besar.

    • Kurangi fetch.max.wait.ms ke 200 untuk latensi lebih rendah.

  • Periksa max.poll.records:

    • Tingkatkan ke 1000 jika memori memadai, tetapi pantau waktu pengolahan.

Jika penggunaan memori tinggi:

  • Kurangi max.partition.fetch.bytes ke 512KB atau fetch.max.bytes ke 10MB.

  • Kurangi max.poll.records ke 100 untuk mengurangi beban memori.

Kesimpulan

Perilaku poll dan thread internal (heartbeat dan poll thread) pada konsumer Kafka sangat penting untuk menjaga keanggotaan grup, pengambilan data yang efisien, dan keandalan pengolahan pesan. Pengaturan seperti heartbeat.interval.ms, session.timeout.ms, max.poll.interval.ms, fetch.min.bytes, fetch.max.wait.ms, max.partition.fetch.bytes, fetch.max.bytes, dan max.poll.records memungkinkan kontrol halus atas performa konsumer. Contoh kode Java dan CLI menunjukkan cara menerapkan konfigurasi ini, sementara praktik terbaik seperti pengujian, pemantauan metrik, dan dokumentasi memastikan operasi yang andal di lingkungan produksi. Untuk sebagian besar aplikasi, gunakan pengaturan default dengan komit manual untuk semantik At Least Once, dan sesuaikan parameter fetch untuk throughput tinggi.

Last updated