Merhaba! Bugün modern distributed sistemlerin vazgeçilmez bileşeni RabbitMQ’yu inceleyeceğiz. Message queuing, microservices iletişimi ve asenkron processing konularını kod örnekleriyle öğreneceğiz.
RabbitMQ Nedir?
RabbitMQ, AMQP (Advanced Message Queuing Protocol) tabanlı açık kaynaklı bir message broker’dır. Uygulamalar arasında güvenilir mesaj iletişimi sağlar.
Temel Kavramlar
# RabbitMQ Components
"""
Producer → Exchange → Queue → Consumer
↓ ↓ ↓ ↓
Mesaj Routing Buffer Process
Gönder Yap Tut Et
"""
Core Components:
- Producer: Mesaj gönderen uygulama
- Exchange: Mesajları route eden component
- Queue: Mesajları tutan buffer
- Consumer: Mesajları işleyen uygulama
- Binding: Exchange ve Queue arası bağlantı
Installation ve Setup
Docker ile RabbitMQ
# docker-compose.yml
version: "3.8"
services:
rabbitmq:
image: rabbitmq:3.12-management
container_name: rabbitmq
ports:
- "5672:5672" # AMQP port
- "15672:15672" # Management UI
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: admin123
RABBITMQ_DEFAULT_VHOST: /
volumes:
- rabbitmq_data:/var/lib/rabbitmq
networks:
- app_network
# Example producer service
producer:
build: ./producer
depends_on:
- rabbitmq
environment:
RABBITMQ_URL: amqp://admin:admin123@rabbitmq:5672/
networks:
- app_network
# Example consumer service
consumer:
build: ./consumer
depends_on:
- rabbitmq
environment:
RABBITMQ_URL: amqp://admin:admin123@rabbitmq:5672/
networks:
- app_network
volumes:
rabbitmq_data:
networks:
app_network:
driver: bridge
Python Setup
# Dependencies
pip install pika python-dotenv asyncio-mqtt celery
Basic Message Queuing
Simple Producer-Consumer
import pika
import json
import time
import logging
from typing import Dict, Any, Callable
from dataclasses import dataclass
from datetime import datetime
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class Message:
id: str
content: Dict[str, Any]
timestamp: datetime
retry_count: int = 0
class RabbitMQConnection:
def __init__(self, connection_url: str):
self.connection_url = connection_url
self.connection = None
self.channel = None
def connect(self):
"""RabbitMQ bağlantısı kur"""
try:
parameters = pika.URLParameters(self.connection_url)
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
logger.info("RabbitMQ connection established")
return True
except Exception as e:
logger.error(f"Connection failed: {e}")
return False
def disconnect(self):
"""Bağlantıyı kapat"""
if self.connection and not self.connection.is_closed:
self.connection.close()
logger.info("RabbitMQ connection closed")
class SimpleProducer(RabbitMQConnection):
def __init__(self, connection_url: str):
super().__init__(connection_url)
self.connect()
def declare_queue(self, queue_name: str, durable: bool = True):
"""Queue oluştur"""
self.channel.queue_declare(queue=queue_name, durable=durable)
logger.info(f"Queue declared: {queue_name}")
def send_message(self, queue_name: str, message: Dict[str, Any],
persistent: bool = True):
"""Basit mesaj gönder"""
try:
# Message properties
properties = pika.BasicProperties(
delivery_mode=2 if persistent else 1, # Persistent message
timestamp=int(time.time()),
message_id=f"msg_{int(time.time() * 1000)}"
)
# Send message
self.channel.basic_publish(
exchange='',
routing_key=queue_name,
body=json.dumps(message, default=str),
properties=properties
)
logger.info(f"Message sent to {queue_name}: {message}")
return True
except Exception as e:
logger.error(f"Failed to send message: {e}")
return False
class SimpleConsumer(RabbitMQConnection):
def __init__(self, connection_url: str):
super().__init__(connection_url)
self.connect()
self.message_handlers = {}
def declare_queue(self, queue_name: str, durable: bool = True):
"""Queue oluştur"""
self.channel.queue_declare(queue=queue_name, durable=durable)
# QoS ayarı - aynı anda işlenecek mesaj sayısı
self.channel.basic_qos(prefetch_count=1)
logger.info(f"Queue declared: {queue_name}")
def register_handler(self, queue_name: str, handler: Callable):
"""Message handler register et"""
self.message_handlers[queue_name] = handler
def process_message(self, channel, method, properties, body):
"""Mesaj işleme callback"""
try:
queue_name = method.routing_key
message_data = json.loads(body.decode('utf-8'))
logger.info(f"Processing message from {queue_name}: {message_data}")
# Handler çağır
if queue_name in self.message_handlers:
handler = self.message_handlers[queue_name]
success = handler(message_data)
if success:
# Acknowledge message
channel.basic_ack(delivery_tag=method.delivery_tag)
logger.info("Message acknowledged")
else:
# Reject and requeue
channel.basic_nack(
delivery_tag=method.delivery_tag,
requeue=True
)
logger.warning("Message rejected and requeued")
else:
logger.warning(f"No handler for queue: {queue_name}")
channel.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
logger.error(f"Message processing failed: {e}")
# Reject message
channel.basic_nack(
delivery_tag=method.delivery_tag,
requeue=False
)
def start_consuming(self, queue_name: str):
"""Mesaj dinlemeye başla"""
self.channel.basic_consume(
queue=queue_name,
on_message_callback=self.process_message
)
logger.info(f"Started consuming from {queue_name}")
try:
self.channel.start_consuming()
except KeyboardInterrupt:
logger.info("Stopping consumer...")
self.channel.stop_consuming()
# Usage example
def example_message_handler(message_data: Dict) -> bool:
"""Örnek mesaj işleyici"""
try:
print(f"Processing order: {message_data}")
# Simulate processing
time.sleep(2)
# Processing logic here
order_id = message_data.get('order_id')
customer_email = message_data.get('customer_email')
print(f"Order {order_id} processed for {customer_email}")
return True
except Exception as e:
print(f"Handler error: {e}")
return False
# Producer example
def send_sample_messages():
producer = SimpleProducer("amqp://admin:admin123@localhost:5672/")
producer.declare_queue("order_queue")
# Sample orders
orders = [
{
"order_id": "ORD001",
"customer_email": "john@example.com",
"items": [{"product": "Laptop", "quantity": 1}],
"total": 999.99
},
{
"order_id": "ORD002",
"customer_email": "jane@example.com",
"items": [{"product": "Mouse", "quantity": 2}],
"total": 29.99
}
]
for order in orders:
producer.send_message("order_queue", order)
time.sleep(1)
producer.disconnect()
# Consumer example
def consume_messages():
consumer = SimpleConsumer("amqp://admin:admin123@localhost:5672/")
consumer.declare_queue("order_queue")
consumer.register_handler("order_queue", example_message_handler)
consumer.start_consuming()
# Run producer
# send_sample_messages()
# Run consumer
# consume_messages()
Exchange Types ve Routing
Direct Exchange
class DirectExchangeExample:
def __init__(self, connection_url: str):
self.connection = pika.BlockingConnection(pika.URLParameters(connection_url))
self.channel = self.connection.channel()
# Exchange declare et
self.exchange_name = "direct_logs"
self.channel.exchange_declare(
exchange=self.exchange_name,
exchange_type='direct',
durable=True
)
def setup_queues(self):
"""Severity-based queues"""
severities = ['info', 'warning', 'error', 'critical']
for severity in severities:
queue_name = f"logs_{severity}"
# Queue oluştur
self.channel.queue_declare(queue=queue_name, durable=True)
# Exchange'e bind et
self.channel.queue_bind(
exchange=self.exchange_name,
queue=queue_name,
routing_key=severity
)
print(f"Queue {queue_name} bound to {self.exchange_name} with key {severity}")
def send_log(self, severity: str, message: str):
"""Log mesajı gönder"""
log_data = {
'timestamp': datetime.now().isoformat(),
'severity': severity,
'message': message,
'service': 'order_service'
}
self.channel.basic_publish(
exchange=self.exchange_name,
routing_key=severity,
body=json.dumps(log_data),
properties=pika.BasicProperties(delivery_mode=2)
)
print(f"Sent {severity} log: {message}")
def consume_logs(self, severity: str):
"""Belirli severity'deki logları dinle"""
queue_name = f"logs_{severity}"
def callback(channel, method, properties, body):
log_data = json.loads(body.decode('utf-8'))
print(f"[{severity.upper()}] {log_data['timestamp']}: {log_data['message']}")
channel.basic_ack(delivery_tag=method.delivery_tag)
self.channel.basic_consume(queue=queue_name, on_message_callback=callback)
print(f"Consuming {severity} logs. Press Ctrl+C to stop...")
try:
self.channel.start_consuming()
except KeyboardInterrupt:
self.channel.stop_consuming()
# Usage
direct_example = DirectExchangeExample("amqp://admin:admin123@localhost:5672/")
direct_example.setup_queues()
# Send different severity logs
direct_example.send_log("info", "User logged in successfully")
direct_example.send_log("warning", "High memory usage detected")
direct_example.send_log("error", "Database connection failed")
direct_example.send_log("critical", "System disk space critical")
# Consume only error logs
# direct_example.consume_logs("error")
Topic Exchange
class TopicExchangeExample:
def __init__(self, connection_url: str):
self.connection = pika.BlockingConnection(pika.URLParameters(connection_url))
self.channel = self.connection.channel()
# Topic exchange
self.exchange_name = "topic_notifications"
self.channel.exchange_declare(
exchange=self.exchange_name,
exchange_type='topic',
durable=True
)
def setup_notification_queues(self):
"""Notification routing patterns"""
# Queue configurations: (queue_name, routing_patterns)
queue_configs = [
("email_notifications", ["*.email.*", "urgent.email.*"]),
("sms_notifications", ["*.sms.*", "urgent.sms.*"]),
("push_notifications", ["*.push.*", "urgent.push.*"]),
("urgent_all", ["urgent.*"]),
("user_all", ["user.*"]),
("system_all", ["system.*"])
]
for queue_name, patterns in queue_configs:
# Queue oluştur
self.channel.queue_declare(queue=queue_name, durable=True)
# Routing patterns bind et
for pattern in patterns:
self.channel.queue_bind(
exchange=self.exchange_name,
queue=queue_name,
routing_key=pattern
)
print(f"Bound {queue_name} to pattern: {pattern}")
def send_notification(self, routing_key: str, notification_data: Dict):
"""Notification gönder"""
message = {
'timestamp': datetime.now().isoformat(),
'routing_key': routing_key,
'data': notification_data
}
self.channel.basic_publish(
exchange=self.exchange_name,
routing_key=routing_key,
body=json.dumps(message, default=str),
properties=pika.BasicProperties(delivery_mode=2)
)
print(f"Sent notification: {routing_key}")
def consume_queue(self, queue_name: str):
"""Queue dinle"""
def callback(channel, method, properties, body):
message = json.loads(body.decode('utf-8'))
print(f"[{queue_name}] Routing: {message['routing_key']}")
print(f" Data: {message['data']}")
channel.basic_ack(delivery_tag=method.delivery_tag)
self.channel.basic_consume(queue=queue_name, on_message_callback=callback)
print(f"Consuming {queue_name}. Press Ctrl+C to stop...")
try:
self.channel.start_consuming()
except KeyboardInterrupt:
self.channel.stop_consuming()
# Usage
topic_example = TopicExchangeExample("amqp://admin:admin123@localhost:5672/")
topic_example.setup_notification_queues()
# Send different types of notifications
notifications = [
("user.email.welcome", {"user_id": 123, "subject": "Welcome!"}),
("user.sms.verification", {"phone": "+1234567890", "code": "123456"}),
("urgent.email.security", {"user_id": 456, "subject": "Security Alert"}),
("system.push.maintenance", {"message": "Scheduled maintenance"}),
("urgent.sms.payment", {"phone": "+1234567890", "amount": "$100"})
]
for routing_key, data in notifications:
topic_example.send_notification(routing_key, data)
# Consume urgent notifications only
# topic_example.consume_queue("urgent_all")
Fanout Exchange (Pub/Sub)
class PubSubExample:
def __init__(self, connection_url: str):
self.connection = pika.BlockingConnection(pika.URLParameters(connection_url))
self.channel = self.connection.channel()
# Fanout exchange - tüm queues'e broadcast
self.exchange_name = "events_broadcast"
self.channel.exchange_declare(
exchange=self.exchange_name,
exchange_type='fanout',
durable=True
)
def setup_event_subscribers(self):
"""Event subscriber services"""
services = [
"analytics_service",
"notification_service",
"audit_service",
"recommendation_service"
]
for service in services:
queue_name = f"events_{service}"
# Queue oluştur
self.channel.queue_declare(queue=queue_name, durable=True)
# Fanout exchange'e bind et (routing key gerekmez)
self.channel.queue_bind(
exchange=self.exchange_name,
queue=queue_name
)
print(f"Service {service} subscribed to events")
def publish_event(self, event_type: str, event_data: Dict):
"""Event publish et - tüm subscribers'a gider"""
event = {
'event_id': f"evt_{int(time.time() * 1000)}",
'event_type': event_type,
'timestamp': datetime.now().isoformat(),
'data': event_data
}
self.channel.basic_publish(
exchange=self.exchange_name,
routing_key='', # Fanout'ta routing key önemli değil
body=json.dumps(event, default=str),
properties=pika.BasicProperties(delivery_mode=2)
)
print(f"Published event: {event_type}")
def subscribe_service(self, service_name: str):
"""Service event subscription"""
queue_name = f"events_{service_name}"
def process_event(channel, method, properties, body):
event = json.loads(body.decode('utf-8'))
print(f"[{service_name.upper()}] Processing event: {event['event_type']}")
print(f" Event ID: {event['event_id']}")
print(f" Data: {event['data']}")
# Service-specific processing
if service_name == "analytics_service":
self.process_analytics(event)
elif service_name == "notification_service":
self.process_notification(event)
elif service_name == "audit_service":
self.process_audit(event)
channel.basic_ack(delivery_tag=method.delivery_tag)
self.channel.basic_consume(queue=queue_name, on_message_callback=process_event)
print(f"{service_name} started. Press Ctrl+C to stop...")
try:
self.channel.start_consuming()
except KeyboardInterrupt:
self.channel.stop_consuming()
def process_analytics(self, event: Dict):
"""Analytics service processing"""
print(f" 📊 Analytics: Recording {event['event_type']} metrics")
def process_notification(self, event: Dict):
"""Notification service processing"""
if event['event_type'] in ['user_registered', 'order_placed']:
print(f" 📧 Sending notification for {event['event_type']}")
def process_audit(self, event: Dict):
"""Audit service processing"""
print(f" 📝 Audit: Logging {event['event_type']} event")
# Usage
pubsub_example = PubSubExample("amqp://admin:admin123@localhost:5672/")
pubsub_example.setup_event_subscribers()
# Publish sample events
events = [
("user_registered", {"user_id": 123, "email": "john@example.com"}),
("order_placed", {"order_id": "ORD001", "user_id": 123, "amount": 99.99}),
("payment_processed", {"payment_id": "PAY001", "order_id": "ORD001"}),
("product_viewed", {"product_id": "PROD001", "user_id": 123})
]
for event_type, data in events:
pubsub_example.publish_event(event_type, data)
# Start analytics service subscriber
# pubsub_example.subscribe_service("analytics_service")
Configuration Files
# rabbitmq.conf
# Clustering
cluster_formation.peer_discovery_backend = classic_config
cluster_formation.classic_config.nodes.1 = rabbit@rabbitmq-prod
cluster_formation.classic_config.nodes.2 = rabbit@rabbitmq-2
# Performance
vm_memory_high_watermark.relative = 0.6
disk_free_limit.relative = 2.0
# Security
auth_mechanisms.1 = PLAIN
auth_mechanisms.2 = AMQPLAIN
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = false
# Management
management.tcp.port = 15672
management.ssl.port = 15671
# Logging
log.console = true
log.console.level = info
log.file = false
# Limits
channel_max = 2047
connection_max = 1000
heartbeat = 60
Sonuç
RabbitMQ modern distributed sistemlerin kritik bir parçasıdır:
💡 Temel Özellikler:
- Message Queuing: Asenkron iletişim
- Exchange Types: Direct, Topic, Fanout, Headers
- Routing: Flexible message yönlendirme
- Durability: Persistent messaging
- Clustering: High availability
🔧 Production Features:
- Dead Letter Queues: Error handling
- Priority Queues: Message importance
- TTL: Message expiration
- Publisher Confirms: Delivery guarantees
🚀 Use Cases:
- Microservices communication
- Background job processing
- Event-driven architecture
- Load balancing
- Decoupling systems
⚡ Best Practices:
- Message idempotency sağlayın
- Proper error handling implement edin
- Connection pooling kullanın
- Monitoring ve alerting kurun
- Clustering ile HA sağlayın
RabbitMQ ile scalable ve reliable distributed sistemler inşa edebilirsiniz!