# Catatan Seekor: Kafka

Apache Kafka is a distributed streaming platform designed for high-throughput, fault-tolerant handling of real-time data feeds.

## Fundamental

### Kafka Concepts

* **Producer**: An application that sends data to Kafka
* **Consumer**: An application that reads data from Kafka
* **Topic**: A named category or feed in which records are stored
* **Partition**: Each topic is split into one or more partitions
* **Broker**: A Kafka server that stores data
* **Cluster**: A group of brokers working together
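
To make these terms concrete, here is a minimal in-memory sketch in plain Java. It is not the Kafka client API: `ToyTopic`, `append`, and `read` are hypothetical names used only for illustration. It models a topic as a fixed set of append-only partition logs, where every record receives an offset within its partition.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of a topic; illustration only, not the Kafka API.
final class ToyTopic {
    private final List<List<String>> partitions = new ArrayList<>();

    ToyTopic(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    // Producer side: append a record to one partition and return its offset.
    long append(int partition, String value) {
        List<String> log = partitions.get(partition);
        log.add(value);
        return log.size() - 1L;
    }

    // Consumer side: read every record at or after the given offset.
    List<String> read(int partition, long fromOffset) {
        List<String> log = partitions.get(partition);
        return new ArrayList<>(log.subList((int) fromOffset, log.size()));
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic(3);
        topic.append(0, "a");
        long offset = topic.append(0, "b");
        System.out.println("offset of b = " + offset);
        System.out.println("from offset 1: " + topic.read(0, 1));
    }
}
```

Offsets are per partition, so two partitions of the same topic can both contain a record at offset 0.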

### Kafka Architecture

```
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│  Producer   │───▶│    Topic    │───▶│  Consumer   │
└─────────────┘    └─────────────┘    └─────────────┘
                          │
                          ▼
                   ┌─────────────┐
                   │  Partition  │
                   └─────────────┘
                          │
                          ▼
                   ┌─────────────┐
                   │    Broker   │
                   └─────────────┘
```

## Topic and Partition

### Topic Configuration

```bash
# Create topic
kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 3 \
    --partitions 6 \
    --topic my-topic

# List topics
kafka-topics.sh --list \
    --bootstrap-server localhost:9092

# Describe topic
kafka-topics.sh --describe \
    --bootstrap-server localhost:9092 \
    --topic my-topic
```

### Partition Strategy

```java
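import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// Custom partitioner
public class CustomPartitioner implements Partitioner {
    @Override
    public void configure(Map<String, ?> configs) {
        // No custom settings needed
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                        Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();

        // Records without a key all go to partition 0
        if (keyBytes == null) {
            return 0;
        }

        // Hash-based partitioning; toPositive avoids the negative result
        // that Math.abs returns for Integer.MIN_VALUE
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {
        // Nothing to release
    }
}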
```
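
A partitioner only takes effect once the producer is told to use it through the `partitioner.class` setting. The sketch below builds such a configuration with plain string keys. The package name `com.example` is an assumption, so adjust it to wherever `CustomPartitioner` actually lives.

```java
import java.util.Properties;

// Builds producer settings that select a custom partitioner.
final class PartitionerConfigSketch {
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Assumption: CustomPartitioner is in the hypothetical package com.example.
        props.put("partitioner.class", "com.example.CustomPartitioner");
        return props;
    }

    public static void main(String[] args) {
        producerProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting `Properties` object can be passed to the `KafkaProducer` constructor in the same way as in the producer examples.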

## Producer

### Basic Producer

```java
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
                 "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
                 "org.apache.kafka.common.serialization.StringSerializer");
        
        Producer<String, String> producer = new KafkaProducer<>(props);
        
        // Send message
        ProducerRecord<String, String> record = 
            new ProducerRecord<>("my-topic", "key", "value");
        
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println("Message sent successfully");
                } else {
                    exception.printStackTrace();
                }
            }
        });
        
        // close() waits for buffered records to be sent before shutting down
        producer.close();
    }
}
```

### Producer with Configuration

```java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
         "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
         "org.apache.kafka.common.serialization.StringSerializer");

// Reliability settings
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);

// Performance settings
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
```

## Consumer

### Basic Consumer

```java
import org.apache.kafka.clients.consumer.*;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                 "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                 "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));

        try {
            while (true) {
                // poll(long) is deprecated; pass a Duration timeout instead
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", 
                                    record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
```

### Consumer Groups

```java
// Consumer group configuration
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

// Manual offset commit (requires: import java.time.Duration)
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // Process record
            processRecord(record);
        }
        // Commit offsets after processing
        consumer.commitSync();
    }
} finally {
    consumer.close();
}
```
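
Within a consumer group, each partition is read by only one member at a time, so the group's parallelism is capped by the partition count. The following plain-Java sketch shows a simplified contiguous split of partitions over members. It is not Kafka's assignor implementation, and `GroupAssignmentSketch` and `assign` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch of spreading partitions over group members.
final class GroupAssignmentSketch {
    static Map<String, List<Integer>> assign(List<String> members, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int base = numPartitions / members.size();   // partitions every member gets
        int extra = numPartitions % members.size();  // first 'extra' members get one more
        int next = 0;
        for (int i = 0; i < members.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            assignment.put(members.get(i), owned);
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of("c1", "c2"), 6));
        System.out.println(assign(List.of("c1", "c2", "c3", "c4"), 6));
    }
}
```

When a group has more members than the topic has partitions, the surplus members receive an empty list and sit idle.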

## Streams API

### Word Count Example

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

import java.util.Arrays;
import java.util.Properties;

public class WordCountApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                 Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                 Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Build topology
        KStream<String, String> source = builder.stream("input-topic");

        KTable<String, Long> wordCounts = source
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
            .groupBy((key, value) -> value)
            .count();

        // count() yields Long values, so override the default String value serde
        wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Register the hook before starting so a shutdown always closes the app
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
```

## Connect API

### File Source Connector

```properties
# File source connector configuration
name=file-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/tmp/test.txt
topic=connect-test
```

### JDBC Sink Connector

```properties
# JDBC sink connector configuration
name=jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=users
connection.url=jdbc:postgresql://localhost:5432/mydb
connection.user=username
connection.password=password
auto.create=true
auto.evolve=true
insert.mode=insert
```

## Configuration

### Broker Configuration

```properties
# server.properties
broker.id=0
listeners=PLAINTEXT://localhost:9092
log.dirs=/tmp/kafka-logs
num.partitions=1
default.replication.factor=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
```
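
The raw numbers in `server.properties` are easier to reason about once converted to human units. The small helper below does that conversion for the values shown above. `RetentionMath` and its methods are hypothetical names, and the class does nothing Kafka-specific.

```java
import java.time.Duration;

// Converts broker retention settings into human-readable units.
final class RetentionMath {
    static long retentionDays(long retentionHours) {
        return Duration.ofHours(retentionHours).toDays();
    }

    static long bytesToMiB(long bytes) {
        return bytes / (1024L * 1024L);
    }

    static long millisToMinutes(long millis) {
        return Duration.ofMillis(millis).toMinutes();
    }

    public static void main(String[] args) {
        System.out.println("log.retention.hours=168 is "
                + retentionDays(168) + " days");
        System.out.println("log.segment.bytes=1073741824 is "
                + bytesToMiB(1073741824L) + " MiB");
        System.out.println("log.retention.check.interval.ms=300000 is "
                + millisToMinutes(300000) + " minutes");
    }
}
```

So this configuration keeps data for 7 days, rolls log segments at 1 GiB, and checks for expired segments every 5 minutes.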

### Producer Configuration

```properties
# Producer settings
bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
acks=all
retries=3
batch.size=16384
linger.ms=1
buffer.memory=33554432
```

### Consumer Configuration

```properties
# Consumer settings
bootstrap.servers=localhost:9092
group.id=test-group
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
auto.offset.reset=earliest
enable.auto.commit=false
```

## Monitoring

### JMX Metrics

```bash
# Enable JMX
export KAFKA_OPTS="-Dcom.sun.management.jmxremote \
    -Dcom.sun.management.jmxremote.port=9999 \
    -Dcom.sun.management.jmxremote.authenticate=false \
    -Dcom.sun.management.jmxremote.ssl=false"

# Start Kafka with JMX
bin/kafka-server-start.sh config/server.properties
```

### Kafka Manager

```yaml
# docker-compose.yml (fragment; assumes a zookeeper service is defined in the same file)
version: '3'
services:
  kafka-manager:
    image: sheepkiller/kafka-manager
    ports:
      - "9000:9000"
    environment:
      ZK_HOSTS: zookeeper:2181
    depends_on:
      - zookeeper
```

## Best Practices

### Performance Tuning

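* **Partition Count**: Size the partition count to the target throughput and to the number of consumers in the largest group; partitions can be added later but not removed
* **Replication Factor**: Use 3 for production environments
* **Batch Size**: Tune `batch.size` together with `linger.ms`; larger batches raise throughput at the cost of latency
* **Buffer Memory**: Set `buffer.memory` high enough that `send()` does not block when brokers are slow
* **Compression**: Set `compression.type` (for example `lz4` or `zstd`) to reduce network and disk usage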
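
One commonly cited rule of thumb for the partition count is to divide the target throughput by the measured per-partition throughput on the producer side and on the consumer side, then take the larger result. The sketch below applies that heuristic. The throughput figures are hypothetical inputs that would have to be measured on real hardware, and `PartitionCountEstimate` is an illustrative name.

```java
// Rule-of-thumb estimate: partitions >= max(target / producerRate, target / consumerRate).
final class PartitionCountEstimate {
    static int estimate(double targetMBps,
                        double producerMBpsPerPartition,
                        double consumerMBpsPerPartition) {
        double forProducers = targetMBps / producerMBpsPerPartition;
        double forConsumers = targetMBps / consumerMBpsPerPartition;
        // Round up: a fractional partition is not possible.
        return (int) Math.ceil(Math.max(forProducers, forConsumers));
    }

    public static void main(String[] args) {
        // Hypothetical numbers: 100 MB/s target, 10 MB/s per partition when
        // producing, 20 MB/s per partition when consuming.
        System.out.println("partitions needed: " + estimate(100, 10, 20));
    }
}
```

Treat the result as a starting point only; key distribution, broker count, and expected growth also affect the final choice.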

### Reliability

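* **Acks**: Use `acks=all` for critical data
* **Retries**: Configure `retries` and `retry.backoff.ms` so that transient broker errors are retried
* **Replication**: Keep the replication factor high enough to survive broker failures
* **Monitoring**: Monitor consumer lag and throughput
* **Backup**: Back up critical topics regularly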
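
Two of these points reduce to simple arithmetic. Consumer lag for a partition is the log end offset minus the group's committed offset. With `acks=all`, writes keep succeeding while the number of in-sync replicas stays at or above the topic's `min.insync.replicas` setting, a setting not covered elsewhere in these notes. The helper below is a sketch with hypothetical names, not a Kafka API.

```java
// Small reliability calculations; names are illustrative only.
final class ReliabilityMath {
    // Records written to a partition but not yet committed by the group.
    static long lag(long logEndOffset, long committedOffset) {
        return Math.max(0L, logEndOffset - committedOffset);
    }

    // Replicas that can be lost while acks=all writes are still accepted.
    static int toleratedReplicaLoss(int replicationFactor, int minInSyncReplicas) {
        return Math.max(0, replicationFactor - minInSyncReplicas);
    }

    public static void main(String[] args) {
        System.out.println("lag = " + lag(1500, 1200));
        System.out.println("tolerated replica loss = " + toleratedReplicaLoss(3, 2));
    }
}
```

For example, a replication factor of 3 with `min.insync.replicas=2` keeps accepting `acks=all` writes with one replica down.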

### Security

* **Authentication**: Enable SASL authentication
* **Authorization**: Configure ACLs for topics and consumer groups
* **Encryption**: Enable SSL/TLS encryption for client and inter-broker traffic
* **Audit**: Enable audit logging
* **Network**: Restrict network access to the broker ports
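
On the client side, authentication and encryption come down to a handful of settings. The sketch below builds them for SASL/SCRAM over TLS. The host name, truststore path, and passwords are placeholders, the choice of `SCRAM-SHA-512` is an assumption about how the cluster is configured, and `SecureClientConfigSketch` is a hypothetical name.

```java
import java.util.Properties;

// Client-side security settings; every value below is a placeholder.
final class SecureClientConfigSketch {
    static Properties secureProps(String username, String password) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1.example.com:9093"); // hypothetical host
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"" + username + "\" password=\"" + password + "\";");
        props.put("ssl.truststore.location", "/path/to/client.truststore.jks");
        props.put("ssl.truststore.password", "changeit");
        return props;
    }

    public static void main(String[] args) {
        Properties props = secureProps("alice", "alice-secret");
        // Print only non-secret settings.
        System.out.println("security.protocol=" + props.getProperty("security.protocol"));
        System.out.println("sasl.mechanism=" + props.getProperty("sasl.mechanism"));
    }
}
```

In practice the credentials would come from a secret store or environment variables rather than being hard-coded.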

## References

### Kafka Resources

* [How to configure Kafka to behave like a FiFo queue?](https://stackoverflow.com/questions/62450323/how-to-configure-kafka-to-behave-like-a-fifo-queue)

### Additional Resources

* Apache Kafka Documentation: <https://kafka.apache.org/documentation/>
* Confluent Platform: <https://docs.confluent.io/>
* Kafka Streams: <https://kafka.apache.org/documentation/streams/>
* Kafka Connect: <https://kafka.apache.org/documentation/#connect>
