# Catatan Seekor: RabbitMQ

RabbitMQ is an open-source message broker that implements the Advanced Message Queuing Protocol (AMQP) for reliable messaging.

## Fundamentals

### RabbitMQ Concepts

* **Producer**: Application that sends messages
* **Consumer**: Application that receives messages
* **Queue**: Buffer that stores messages
* **Exchange**: Receives messages from producers and routes them to queues
* **Binding**: Rule that links an exchange to a queue
* **Channel**: Virtual connection inside a connection

### AMQP Model

```
┌──────────┐     ┌──────────┐  binding   ┌──────────┐     ┌──────────┐
│ Producer │────▶│ Exchange │───────────▶│  Queue   │────▶│ Consumer │
└──────────┘     └──────────┘            └──────────┘     └──────────┘
```

## Exchange Types

### Direct Exchange

```python
import pika

# Producer
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare exchange
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# Send message with routing key
severity = 'error'
message = 'Error message'
channel.basic_publish(exchange='direct_logs',
                     routing_key=severity,
                     body=message)

# Consumer
channel.queue_declare(queue='error_queue')
channel.queue_bind(exchange='direct_logs',
                   queue='error_queue',
                   routing_key='error')
```

### Topic Exchange

```python
# Producer
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# Send message with routing key
routing_key = 'kern.critical'
message = 'A critical kernel error'
channel.basic_publish(exchange='topic_logs',
                     routing_key=routing_key,
                     body=message)

# Consumer
channel.queue_declare(queue='kernel_queue')
channel.queue_bind(exchange='topic_logs',
                   queue='kernel_queue',
                   routing_key='kern.*')
```
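
The `*` and `#` wildcards follow fixed semantics: `*` matches exactly one dot-separated word, `#` matches zero or more. The matching logic can be sketched without a broker (`topic_matches` is an illustrative helper, not part of pika):

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Illustrative re-implementation of topic-exchange matching:
    '*' matches exactly one word, '#' matches zero or more words."""
    def match(p, k):
        if not p:
            return not k          # both exhausted -> match
        if p[0] == '#':
            # '#' may absorb zero or more words
            return any(match(p[1:], k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if p[0] in ('*', k[0]):
            return match(p[1:], k[1:])
        return False
    return match(pattern.split('.'), routing_key.split('.'))
```

With the binding above, `kern.*` delivers `kern.critical` but not `kern.critical.disk`; a binding key of `#` behaves like a fanout exchange.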

### Fanout Exchange

```python
# Producer
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# Send message (no routing key needed)
message = 'Log message'
channel.basic_publish(exchange='logs',
                     routing_key='',
                     body=message)

# Consumer
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)
```

### Headers Exchange

```python
# Producer
channel.exchange_declare(exchange='headers_logs', exchange_type='headers')

# Send message with headers ('x-match' belongs in the binding
# arguments, not in the message headers)
properties = pika.BasicProperties(
    headers={'level': 'error', 'source': 'database'}
)
message = 'Database error'
channel.basic_publish(exchange='headers_logs',
                     routing_key='',
                     body=message,
                     properties=properties)

# Consumer
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='headers_logs',
                   queue=queue_name,
                   arguments={'x-match': 'all',
                             'level': 'error',
                             'source': 'database'})
```
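
The `x-match` rule can also be sketched in plain Python (`headers_match` is an illustrative helper, not broker code): `all` requires every non-`x-` binding argument to appear in the message headers with an equal value, while `any` requires at least one.

```python
def headers_match(binding_args: dict, message_headers: dict) -> bool:
    """Illustrative sketch of headers-exchange matching: 'x-match'
    chooses whether all or any of the non 'x-' binding arguments
    must appear in the message headers with equal values."""
    mode = binding_args.get('x-match', 'all')
    pairs = [(k, v) for k, v in binding_args.items()
             if not k.startswith('x-')]
    hits = [message_headers.get(k) == v for k, v in pairs]
    return all(hits) if mode == 'all' else any(hits)
```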

## Queue Management

### Queue Declaration

```python
# Basic queue
channel.queue_declare(queue='hello')

# Queue with arguments
channel.queue_declare(queue='task_queue',
                     durable=True,  # Survive broker restart
                     arguments={'x-message-ttl': 60000})  # TTL in ms

# Exclusive queue (deleted when connection closes)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
```

### Queue Properties

```python
# Durable queue
channel.queue_declare(queue='durable_queue', durable=True)

# Queue with TTL
channel.queue_declare(queue='ttl_queue',
                     arguments={'x-message-ttl': 300000})  # 5 minutes

# Queue with max length
channel.queue_declare(queue='max_length_queue',
                     arguments={'x-max-length': 1000})

# Queue with max priority
channel.queue_declare(queue='priority_queue',
                     arguments={'x-max-priority': 10})
```
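
Delivery order on a priority queue can be modeled without a broker: higher `priority` values are delivered first, and messages of equal priority stay FIFO. A toy model (the real broker also interleaves delivery with ongoing consumption):

```python
import heapq

def delivery_order(messages):
    """Toy model of priority-queue delivery: messages arrive as
    (priority, body) pairs in publish order; higher priority is
    delivered first, FIFO within the same priority."""
    heap = []
    for seq, (priority, body) in enumerate(messages):
        # negate priority for max-first; seq keeps FIFO ties stable
        heapq.heappush(heap, (-priority, seq, body))
    return [heapq.heappop(heap)[2] for _ in range(len(heap))]
```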

## Message Publishing

### Basic Publishing

```python
# Simple publish
channel.basic_publish(exchange='',
                     routing_key='hello',
                     body='Hello World!')

# Publish with properties
properties = pika.BasicProperties(
    delivery_mode=2,  # Make message persistent
    priority=5,       # Honored only on queues declared with x-max-priority
    headers={'source': 'producer'}
)

channel.basic_publish(exchange='logs',
                     routing_key='info',
                     body='Info message',
                     properties=properties)
```

### Publisher Confirms

```python
# Enable publisher confirms on the channel
channel.confirm_delivery()

# With confirms enabled, pika's BlockingChannel raises an
# exception instead of returning a success flag
try:
    channel.basic_publish(exchange='logs',
                          routing_key='info',
                          body='Info message',
                          mandatory=True)
    print("Message confirmed")
except pika.exceptions.UnroutableError:
    print("Message not confirmed")
```

## Message Consumption

### Basic Consumption

```python
def callback(ch, method, properties, body):
    print(f"Received {body}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Set up consumer
channel.basic_consume(queue='hello',
                     on_message_callback=callback,
                     auto_ack=False)

print('Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
```

### Consumer Acknowledgments

```python
def callback(ch, method, properties, body):
    print(f"Received {body}")
    
    # Process message
    try:
        process_message(body)
        # Acknowledge successful processing
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"Error processing message: {e}")
        # Reject message and requeue
        ch.basic_nack(delivery_tag=method.delivery_tag,
                      requeue=True)

# Set up consumer with manual acknowledgment
channel.basic_consume(queue='task_queue',
                     on_message_callback=callback,
                     auto_ack=False)
```

### Consumer Prefetch

```python
# Set prefetch count
channel.basic_qos(prefetch_count=1)

# This ensures that the worker won't receive a new message
# until it has processed and acknowledged the previous one
```
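
The effect can be sketched with a small simulation (a simplified model, not pika API): with unlimited prefetch the broker deals messages round-robin regardless of load, while `prefetch_count=1` approximates handing each message to the next free worker.

```python
def round_robin(costs, n_workers):
    """Unlimited prefetch: messages are dealt in turn, ignoring load."""
    loads = [0] * n_workers
    for i, cost in enumerate(costs):
        loads[i % n_workers] += cost
    return max(loads)  # finish time of the busiest worker

def fair_dispatch(costs, n_workers):
    """prefetch_count=1: each message goes to the next free
    (least-loaded) worker."""
    loads = [0] * n_workers
    for cost in costs:
        loads[loads.index(min(loads))] += cost
    return max(loads)
```

For task costs `[1, 10, 1, 10, 1, 10]` and two workers, round-robin piles every slow task on one worker (finish time 30) while fair dispatch finishes in 21.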

## Dead Letter Exchange

### Dead Letter Configuration

```python
# Declare dead letter exchange
channel.exchange_declare(exchange='dlx', exchange_type='direct')

# Declare dead letter queue
channel.queue_declare(queue='dlq')
channel.queue_bind(exchange='dlx', queue='dlq', routing_key='dead')

# Declare main queue with dead letter exchange
channel.queue_declare(queue='main_queue',
                     arguments={'x-dead-letter-exchange': 'dlx',
                               'x-dead-letter-routing-key': 'dead'})
```
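
When RabbitMQ dead-letters a message it records the event in an `x-death` header on the message, including the source queue and a delivery count. A consumer on the DLQ can read it, for example to cap redeliveries (a sketch; `death_count` is an illustrative helper):

```python
def death_count(headers, queue_name):
    """Return how many times a message was dead-lettered from
    queue_name, per the 'x-death' header RabbitMQ maintains."""
    for entry in (headers or {}).get('x-death', []):
        if entry.get('queue') == queue_name:
            return entry.get('count', 0)
    return 0
```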

### Message Rejection

```python
def callback(ch, method, properties, body):
    try:
        process_message(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"Error processing message: {e}")
        # Reject message without requeue (goes to DLQ)
        ch.basic_nack(delivery_tag=method.delivery_tag,
                      requeue=False)
```

## Clustering and High Availability

### Cluster Setup

```bash
# Join cluster
rabbitmqctl join_cluster rabbit@node1

# Check cluster status
rabbitmqctl cluster_status

# Set policy for classic-queue mirroring (deprecated in favor of
# quorum queues on modern RabbitMQ releases)
rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all","ha-sync-mode":"automatic"}'
```

### Queue Mirroring

```python
# Since RabbitMQ 3.0, mirroring is configured via policies
# (rabbitmqctl set_policy above), not queue arguments; the old
# 'x-ha-policy' argument no longer has any effect
channel.queue_declare(queue='mirrored_queue', durable=True)
```

## Monitoring and Management

### Management Plugin

```bash
# Enable management plugin
rabbitmq-plugins enable rabbitmq_management

# Access the management UI at http://localhost:15672
# Default credentials guest/guest (the guest user may only
# connect from localhost)
```

### Command Line Tools

```bash
# List queues
rabbitmqctl list_queues

# List exchanges
rabbitmqctl list_exchanges

# List bindings
rabbitmqctl list_bindings

# Check queue status
rabbitmqctl list_queues name messages_ready messages_unacknowledged
```

### Health Checks

```bash
# Check node health (node_health_check is deprecated in modern
# releases; prefer `rabbitmq-diagnostics ping` / `check_running`)
rabbitmqctl node_health_check

# Check cluster health
rabbitmqctl cluster_status

# Check queue health
rabbitmqctl list_queues name messages_ready messages_unacknowledged
```

## Performance Tuning

### Connection Pooling

```python
import pika
import threading

class RabbitMQConnectionPool:
    """Minimal connection pool; the lock protects the free list
    (individual BlockingConnection objects are still not
    thread-safe and must not be shared between threads)."""

    def __init__(self, host='localhost', max_connections=10):
        self.host = host
        self.max_connections = max_connections
        self._free = []
        self._lock = threading.Lock()

    def get_connection(self):
        with self._lock:
            while self._free:
                connection = self._free.pop()
                if connection.is_open:  # skip stale connections
                    return connection
        return pika.BlockingConnection(
            pika.ConnectionParameters(host=self.host)
        )

    def return_connection(self, connection):
        with self._lock:
            if connection.is_open and len(self._free) < self.max_connections:
                self._free.append(connection)
                return
        if connection.is_open:
            connection.close()
```

### Message Persistence

```python
# Make message persistent
properties = pika.BasicProperties(
    delivery_mode=2,  # Persistent delivery
)

channel.basic_publish(exchange='logs',
                     routing_key='info',
                     body='Persistent message',
                     properties=properties)

# Declare durable queue
channel.queue_declare(queue='durable_queue', durable=True)
```

## Error Handling

### Connection Recovery

```python
import pika
import time

def create_connection():
    while True:
        try:
            connection = pika.BlockingConnection(
                pika.ConnectionParameters('localhost')
            )
            return connection
        except pika.exceptions.AMQPConnectionError:
            print("Connection failed, retrying in 5 seconds...")
            time.sleep(5)

def publish_with_retry(channel, exchange, routing_key, body, max_retries=3):
    for attempt in range(max_retries):
        try:
            channel.basic_publish(exchange=exchange,
                                routing_key=routing_key,
                                body=body)
            return True
        except Exception as e:
            print(f"Publish attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # Exponential backoff
```

## References

### RabbitMQ Resources

* [RabbitMQ Tutorials](https://www.rabbitmq.com/getstarted.html)

### Additional Resources

* RabbitMQ Documentation: <https://www.rabbitmq.com/documentation.html>
* AMQP Specification: <https://www.amqp.org/specification/0-9-1/amqp-org-download>
* Pika Python Client: <https://pika.readthedocs.io/>

## Best Practices

### Message Design

* **Idempotency**: Design messages to be processed multiple times safely
* **Serialization**: Use efficient serialization formats (JSON, Protocol Buffers)
* **Size**: Keep messages small and focused
* **Headers**: Use message headers for metadata
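
The idempotency and serialization points can be combined in a small consumer-side sketch: deduplicate on a message id before processing, so redelivered messages are harmless (class and field names here are illustrative; a real deployment would persist the seen ids externally, e.g. in a database):

```python
import json

class IdempotentHandler:
    """Drops duplicates by message id so redelivered messages
    are processed at most once."""

    def __init__(self):
        self.seen_ids = set()
        self.processed = []

    def handle(self, body, message_id):
        if message_id in self.seen_ids:
            return False              # duplicate delivery, ignored
        self.seen_ids.add(message_id)
        self.processed.append(json.loads(body))
        return True
```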

### Performance

* **Connection Reuse**: Reuse connections and channels
* **Batch Processing**: Process messages in batches when possible
* **Async Processing**: Use asynchronous consumers for better throughput
* **Monitoring**: Monitor queue depths and consumer lag

### Reliability

* **Persistent Messages**: Use persistent delivery for critical messages
* **Dead Letter Queues**: Handle failed messages gracefully
* **Consumer Acknowledgments**: Use manual acknowledgments
* **Error Handling**: Implement proper error handling and retry logic
