Kafka Producer Advanced: Kafka Producer Batching

Kafka Producer Batching

Dalam Apache Kafka, batching adalah teknik di mana produser mengelompokkan beberapa pesan ke dalam satu batch sebelum mengirimnya ke broker. Batching meningkatkan throughput, mengurangi latensi jaringan, dan meningkatkan efisiensi penyimpanan serta kompresi. Secara default, produser Kafka berusaha mengirim pesan secepat mungkin, tetapi dengan konfigurasi yang tepat, Anda dapat mengoptimalkan batching untuk mencapai performa tinggi. Artikel ini menjelaskan cara kerja batching, parameter utama seperti linger.ms dan batch.size, serta langkah-langkah praktis untuk mengonfigurasi produser menggunakan kode Java dan alat CLI. Kami juga menyertakan praktik terbaik untuk memastikan keseimbangan antara throughput, latensi, dan penggunaan sumber daya.

Cara Kerja Batching pada Produser Kafka

Produser Kafka mengelompokkan pesan ke dalam batch untuk mengurangi jumlah perjalanan jaringan (network trips) ke broker, sehingga meningkatkan efisiensi. Batching bekerja sebagai berikut:

  1. Pengelompokan Pesan:

    • Produser mengumpulkan pesan yang ditujukan ke partisi yang sama ke dalam satu batch.

    • Setiap batch dialokasikan per partisi, sehingga pesan untuk partisi berbeda tidak dicampur dalam satu batch.

  2. Pengiriman Batch:

    • Secara default, produser mengirim batch secepat mungkin (dengan linger.ms=0).

    • Produser dapat memiliki hingga 5 permintaan dalam penerbangan (in-flight requests), yang dikontrol oleh max.in.flight.requests.per.connection (default: 5).

    • Jika lebih banyak pesan harus dikirim sementara 5 permintaan sedang diproses, produser secara otomatis mengelompokkan pesan tambahan ke dalam batch baru hingga permintaan sebelumnya selesai.

  3. Efisiensi Batching:

    • Batching meningkatkan rasio kompresi jika compression.type diaktifkan (misalnya, snappy, lz4), karena batch yang lebih besar memiliki lebih banyak data untuk dikompresi.

    • Mengurangi overhead jaringan karena lebih sedikit permintaan dikirim.

    • Mengurangi penggunaan disk di broker karena batch yang dikompresi disimpan lebih kecil.

Keuntungan Batching

  • Throughput Tinggi: Mengirim lebih banyak pesan dalam satu permintaan meningkatkan jumlah pesan yang diproses per detik.

  • Latensi Jaringan Rendah: Mengurangi jumlah perjalanan jaringan.

  • Efisiensi Kompresi: Batch besar menghasilkan rasio kompresi yang lebih baik, terutama untuk data berbasis teks seperti JSON.

  • Penggunaan Disk Efisien: Batch yang dikompresi membutuhkan ruang penyimpanan lebih kecil di broker.

Kekurangan Batching

  • Latensi Tambahan: Menunggu batch terisi (dengan linger.ms > 0) dapat menambah sedikit latensi.

  • Penggunaan Memori: Batch besar (dengan batch.size tinggi) meningkatkan penggunaan memori di sisi produser.

  • Overhead CPU: Jika kompresi diaktifkan, batching memerlukan siklus CPU untuk kompresi.

Parameter Konfigurasi Batching

Dua parameter utama mengontrol batching pada produser Kafka:

1. linger.ms

  • Deskripsi: Jumlah milidetik yang bersedia ditunggu produser sebelum mengirim batch.

  • Default: 0 (kirim segera tanpa menunggu).

  • Efek:

    • Dengan linger.ms=0, pesan dikirim secepat mungkin, mengurangi peluang batching.

    • Dengan linger.ms=20, produser menunggu hingga 20ms untuk mengumpulkan lebih banyak pesan ke dalam batch, meningkatkan throughput dengan biaya latensi kecil.

    • Jika batch mencapai ukuran maksimum (batch.size) sebelum linger.ms habis, batch akan dikirim segera.

  • Rekomendasi: Gunakan linger.ms=5 hingga 20 untuk aplikasi dengan throughput tinggi.

2. batch.size

  • Deskripsi: Ukuran maksimum batch dalam byte sebelum dikirim.

  • Default: 16384 (16KB).

  • Efek:

    • Batch besar (misalnya, 32KB atau 64KB) meningkatkan kompresi, throughput, dan efisiensi jaringan.

    • Pesan yang lebih besar dari batch.size tidak akan di-batch dan dikirim sebagai pesan tunggal.

    • Setiap partisi memiliki batch sendiri, sehingga batch.size yang terlalu besar dapat meningkatkan penggunaan memori, terutama untuk topik dengan banyak partisi.

  • Rekomendasi: Gunakan batch.size=32768 (32KB) atau 65536 (64KB) untuk throughput tinggi, tetapi sesuaikan berdasarkan memori yang tersedia.

Konfigurasi Produser untuk Throughput Tinggi

Untuk produser dengan throughput tinggi, kombinasikan batching dengan kompresi. Berikut adalah konfigurasi yang disarankan:

Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024)); // 32KB
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
properties.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");

Penjelasan:

  • compression.type=snappy: Mengaktifkan kompresi untuk efisiensi jaringan dan penyimpanan.

  • linger.ms=20: Menunggu 20ms untuk mengisi batch, meningkatkan peluang batching.

  • batch.size=32768: Mengatur ukuran batch ke 32KB untuk kompresi dan throughput lebih baik.

  • Idempotence dan acks=all memastikan pengiriman yang andal tanpa duplikasi.

Praktik Konfigurasi dan Pengujian

Berikut adalah langkah-langkah praktis untuk mengonfigurasi produser dengan batching optimal menggunakan kode Java dan alat CLI, serta pengujian untuk memverifikasi performa.

Prasyarat

  • Pastikan klaster Kafka berjalan (versi ≥ 0.11, mode Zookeeper atau KRaft).

  • Gunakan ekstensi CLI yang sesuai: .sh untuk Linux/Mac, .bat untuk Windows.

  • Pastikan broker Kafka aktif di localhost:9092.

  • Klaster memiliki setidaknya tiga broker untuk mendukung replication.factor=3.

Langkah-langkah Praktis

1. Buat Topik

Buat topik bernama batch-topic dengan 3 partisi, faktor replikasi 3, dan min.insync.replicas=2:

kafka-topics.sh --bootstrap-server localhost:9092 --create --topic batch-topic \
--partitions 3 --replication-factor 3 \
--config min.insync.replicas=2 \
--config compression.type=producer

Verifikasi konfigurasi topik:

kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic batch-topic

Contoh Keluaran:

Topic: batch-topic TopicId: XYZ123 PartitionCount: 3 ReplicationFactor: 3 Configs: min.insync.replicas=2,compression.type=producer
Topic: batch-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: batch-topic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: batch-topic Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2

2. Implementasi Produser dengan Batching (Java)

Berikut adalah contoh kode Java untuk produser dengan batching dan kompresi:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class BatchProducer {
    public static void main(String[] args) {
        // Konfigurasi produser
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024)); // 32KB
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
        properties.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");

        // Buat produser
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // Kirim pesan JSON
        for (int i = 0; i < 1000; i++) {
            String key = "key-" + i;
            String value = "{\"id\": " + i + ", \"data\": \"High throughput JSON message\"}";
            ProducerRecord<String, String> record = new ProducerRecord<>("batch-topic", key, value);

            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.printf("Pesan dikirim: key=%s, partition=%d, offset=%d%n",
                            key, metadata.partition(), metadata.offset());
                } else {
                    System.err.printf("Gagal mengirim pesan: %s%n", exception.getMessage());
                }
            });
        }

        // Tutup produser
        producer.flush();
        producer.close();
    }
}

Penjelasan Kode:

  • Menggunakan linger.ms=20 dan batch.size=32768 untuk batching optimal.

  • Mengaktifkan kompresi snappy untuk efisiensi.

  • Mengirim 1000 pesan JSON untuk menguji throughput.

  • Idempotence memastikan tidak ada duplikasi.

3. Uji dengan Produser Konsol

Jalankan produser konsol dengan batching dan kompresi:

kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic batch-topic \
--producer-property compression.type=snappy \
--producer-property linger.ms=20 \
--producer-property batch.size=32768 \
--producer-property enable.idempotence=true \
--producer-property acks=all \
--producer-property retries=2147483647 \
--producer-property max.in.flight.requests.per.connection=5 \
--producer-property delivery.timeout.ms=120000 \
--property parse.key=true \
--property key.separator=,

Kirim beberapa pesan JSON:

key1,{"id": 1, "data": "High throughput message"}
key2,{"id": 2, "data": "Another batch message"}
key3,{"id": 3, "data": "Optimized batch data"}

4. Konsumsi Pesan

Jalankan konsumer untuk memverifikasi pesan:

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic batch-topic \
--from-beginning \
--property print.key=true \
--property key.separator=,

Contoh Keluaran:

key1,{"id": 1, "data": "High throughput message"}
key2,{"id": 2, "data": "Another batch message"}
key3,{"id": 3, "data": "Optimized batch data"}

5. Uji Performa Batching

  • Pantau Throughput: Gunakan metrik JMX seperti BytesOutPerSec untuk membandingkan throughput dengan dan tanpa batching.

  • Periksa Latensi: Gunakan metrik RequestLatencyAvg untuk memastikan latensi tetap rendah dengan linger.ms=20.

  • Periksa Penggunaan Memori: Gunakan alat seperti jvisualvm untuk memantau memori produser dengan batch.size=32768.

  • Uji Berbagai Nilai: Coba linger.ms=10 dan batch.size=65536 untuk melihat dampak pada performa.

Catatan

  • Pesan Besar: Pesan yang lebih besar dari batch.size tidak akan di-batch dan dikirim sebagai pesan tunggal, yang dapat mengurangi efisiensi.

  • Memori: Batch besar meningkatkan penggunaan memori, terutama untuk topik dengan banyak partisi. Sesuaikan batch.size berdasarkan kapasitas memori produser.

  • Mode KRaft: Batching bekerja sama di mode KRaft, tetapi pastikan compression.type=producer pada topik untuk efisiensi maksimum.

Praktik Terbaik

  1. Gunakan Batching untuk Throughput Tinggi:

    • Aktifkan batching dengan linger.ms=5 hingga 20 dan batch.size=32768 atau 65536 untuk aplikasi dengan throughput tinggi.

  2. Kombinasikan dengan Kompresi:

    • Gunakan compression.type=snappy atau lz4 untuk meningkatkan efisiensi batching, terutama untuk data JSON.

  3. Sesuaikan linger.ms dan batch.size:

    • Gunakan linger.ms=20 untuk menambah peluang batching tanpa latensi berlebihan.

    • Atur batch.size=32768 atau lebih tinggi, tetapi pastikan memori cukup untuk setiap partisi.

  4. Gunakan Idempotence:

    • Aktifkan enable.idempotence=true untuk mencegah duplikasi, terutama saat menggunakan batching dengan retries.

  5. Pantau Performa:

    • Gunakan metrik JMX seperti BytesOutPerSec, RequestLatencyAvg, dan BatchSizeAvg untuk memantau efisiensi batching.

    • Periksa penggunaan memori dan CPU untuk mendeteksi bottleneck.

  6. Uji di Lingkungan Non-Produksi:

    • Uji berbagai kombinasi linger.ms dan batch.size untuk menemukan pengaturan optimal untuk kasus penggunaan Anda.

  7. Hindari batch.size Terlalu Besar:

    • Jangan atur batch.size terlalu tinggi (misalnya, >1MB) untuk menghindari penggunaan memori berlebihan.

  8. Dokumentasikan Konfigurasi:

    • Catat pengaturan linger.ms, batch.size, dan compression.type untuk setiap produser, bersama dengan alasan pemilihannya.

Penjelasan Tambahan

Hubungan dengan Kompresi

  • Batching meningkatkan efektivitas kompresi karena batch besar memiliki lebih banyak data untuk dikompresi, menghasilkan rasio kompresi yang lebih baik.

  • Parameter linger.ms dan batch.size bekerja bersama dengan compression.type untuk mengoptimalkan efisiensi jaringan dan penyimpanan.

Pemecahan Masalah

Jika throughput rendah:

  • Periksa linger.ms:

    • Tingkatkan ke 20 atau lebih tinggi untuk batch lebih besar, tetapi pantau latensi.

  • Periksa batch.size:

    • Tingkatkan ke 65536 jika memori memadai, tetapi pastikan tidak melebihi kapasitas.

  • Periksa Kompresi:

    • Pastikan compression.type diatur (misalnya, snappy) untuk data berbasis teks.

Jika latensi tinggi:

  • Kurangi linger.ms ke 5 atau lebih rendah untuk mengurangi penundaan.

  • Gunakan metrik RequestLatencyAvg untuk mengidentifikasi bottleneck.

Jika penggunaan memori tinggi:

  • Turunkan batch.size ke 16384 atau sesuaikan jumlah partisi topik.

  • Gunakan alat seperti jvisualvm untuk memantau memori produser.

Kesimpulan

Batching pada produser Kafka adalah teknik penting untuk meningkatkan throughput, mengurangi latensi jaringan, dan meningkatkan efisiensi kompresi serta penyimpanan. Dengan mengatur linger.ms=20 dan batch.size=32768, serta mengaktifkan kompresi seperti snappy, Anda dapat mengoptimalkan performa untuk aplikasi dengan throughput tinggi. Contoh kode Java dan CLI menunjukkan cara menerapkan batching dengan konfigurasi aman, sementara praktik terbaik seperti pengujian, pemantauan metrik, dan dokumentasi memastikan operasi yang efisien. Batching sangat penting di lingkungan produksi, terutama jika dikombinasikan dengan kompresi dan idempotence.

Last updated