Skip to content

RabbitMQ: Message Queuing ve Microservices İletişimi

Published: 11:48 PM · Suggest an edit

Merhaba! Bugün modern distributed sistemlerin vazgeçilmez bileşeni RabbitMQ’yu inceleyeceğiz. Message queuing, microservices iletişimi ve asenkron processing konularını kod örnekleriyle öğreneceğiz.

RabbitMQ Nedir?

RabbitMQ, AMQP (Advanced Message Queuing Protocol) tabanlı açık kaynaklı bir message broker’dır. Uygulamalar arasında güvenilir mesaj iletişimi sağlar.

Temel Kavramlar

# RabbitMQ Components
"""
Producer  → Exchange → Queue → Consumer
    ↓         ↓        ↓        ↓
  Mesaj    Routing   Buffer   Process
  Gönder   Yap      Tut      Et
"""

Core Components:

Installation ve Setup

Docker ile RabbitMQ

# docker-compose.yml
# Three services on one bridge network: the broker plus an example producer
# and consumer that reach it through the hostname "rabbitmq".
version: "3.8"
services:
  rabbitmq:
    image: rabbitmq:3.12-management
    container_name: rabbitmq
    ports:
      - "5672:5672" # AMQP port
      - "15672:15672" # Management UI
    environment:
      # NOTE(review): credentials are hard-coded here and repeated inside both
      # RABBITMQ_URL values below; acceptable for a local demo only.
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: admin123
      RABBITMQ_DEFAULT_VHOST: /
    volumes:
      # Named volume (declared at the bottom) mounted at the broker data dir.
      - rabbitmq_data:/var/lib/rabbitmq
    networks:
      - app_network

  # Example producer service
  producer:
    build: ./producer
    # NOTE(review): presumably this only orders container start-up and does
    # not wait for the broker to accept connections; verify, and consider a
    # healthcheck or client-side retry.
    depends_on:
      - rabbitmq
    environment:
      RABBITMQ_URL: amqp://admin:admin123@rabbitmq:5672/
    networks:
      - app_network

  # Example consumer service
  consumer:
    build: ./consumer
    # NOTE(review): same start-up ordering caveat as the producer; confirm.
    depends_on:
      - rabbitmq
    environment:
      RABBITMQ_URL: amqp://admin:admin123@rabbitmq:5672/
    networks:
      - app_network

volumes:
  rabbitmq_data:

networks:
  app_network:
    driver: bridge

Python Setup

# Dependencies
# NOTE(review): of these packages, the Python examples shown in this article
# import only pika; python-dotenv, asyncio-mqtt and celery are presumably for
# material not shown here. Confirm they are needed.
pip install pika python-dotenv asyncio-mqtt celery

Basic Message Queuing

Simple Producer-Consumer

import pika
import json
import time
import logging
from typing import Dict, Any, Callable
from dataclasses import dataclass
from datetime import datetime

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class Message:
    """Envelope describing one message: identifier, payload and bookkeeping.

    The other examples visible in this article pass plain dicts and do not
    reference this class.
    """

    # Message identifier (note: the field name shadows the builtin ``id``).
    id: str
    # Payload as a string-keyed dictionary.
    content: Dict[str, Any]
    # Timestamp associated with the message; required, no default.
    timestamp: datetime
    # Retry counter; starts at 0. Presumably incremented on redelivery (confirm).
    retry_count: int = 0

class RabbitMQConnection:
    """Hold one blocking RabbitMQ connection and a channel opened on it.

    Attributes:
        connection_url: AMQP URL passed to ``pika.URLParameters``.
        connection: The ``pika.BlockingConnection``, or ``None`` until
            ``connect()`` succeeds.
        channel: Channel opened on ``connection``, or ``None``.
    """

    def __init__(self, connection_url: str):
        self.connection_url = connection_url
        # Nothing is opened here; connect() fills these in.
        self.connection = None
        self.channel = None

    def connect(self):
        """Open the connection and a channel on it.

        Returns:
            True when both were created; False when any exception was raised
            (the error is logged rather than propagated).
        """
        try:
            self.connection = pika.BlockingConnection(
                pika.URLParameters(self.connection_url)
            )
            self.channel = self.connection.channel()
            logger.info("RabbitMQ connection established")
        except Exception as exc:
            logger.error(f"Connection failed: {exc}")
            return False
        else:
            return True

    def disconnect(self):
        """Close the connection if one exists and is still open."""
        conn = self.connection
        if not conn or conn.is_closed:
            return
        conn.close()
        logger.info("RabbitMQ connection closed")

class SimpleProducer(RabbitMQConnection):
    """Publisher that sends JSON messages to named queues via the default exchange.

    A connection attempt is made on construction; the boolean result of
    ``connect()`` is not inspected here.
    """

    def __init__(self, connection_url: str):
        super().__init__(connection_url)
        self.connect()

    def declare_queue(self, queue_name: str, durable: bool = True):
        """Declare ``queue_name`` on the channel and log the declaration."""
        self.channel.queue_declare(queue=queue_name, durable=durable)
        logger.info(f"Queue declared: {queue_name}")

    def send_message(self, queue_name: str, message: Dict[str, Any],
                    persistent: bool = True):
        """Serialize ``message`` to JSON and publish it to ``queue_name``.

        Args:
            queue_name: Used as the routing key on the default ('') exchange.
            message: Payload; values JSON cannot encode are passed through ``str``.
            persistent: Selects delivery mode 2 (persistent) when true, 1 otherwise.

        Returns:
            True if publishing completed, False if any exception was raised
            (the error is logged, not propagated).
        """
        try:
            if persistent:
                mode = 2
            else:
                mode = 1

            props = pika.BasicProperties(
                delivery_mode=mode,
                timestamp=int(time.time()),
                message_id=f"msg_{int(time.time() * 1000)}"
            )
            payload = json.dumps(message, default=str)

            self.channel.basic_publish(
                exchange='',
                routing_key=queue_name,
                body=payload,
                properties=props
            )
            logger.info(f"Message sent to {queue_name}: {message}")
            return True
        except Exception as exc:
            logger.error(f"Failed to send message: {exc}")
            return False

class SimpleConsumer(RabbitMQConnection):
    """Consumer that dispatches incoming messages to per-queue handlers.

    Handlers are looked up by the delivery's routing key, which this class
    treats as the queue name.
    """

    def __init__(self, connection_url: str):
        super().__init__(connection_url)
        self.connect()
        # queue name -> callable(message_dict) returning truthy on success
        self.message_handlers = {}

    def declare_queue(self, queue_name: str, durable: bool = True):
        """Declare the queue and set the channel prefetch count to 1."""
        self.channel.queue_declare(queue=queue_name, durable=durable)
        # QoS: number of messages processed at the same time
        self.channel.basic_qos(prefetch_count=1)
        logger.info(f"Queue declared: {queue_name}")

    def register_handler(self, queue_name: str, handler: Callable):
        """Associate ``handler`` with ``queue_name``, replacing any previous one."""
        self.message_handlers[queue_name] = handler

    def process_message(self, channel, method, properties, body):
        """Delivery callback: decode the body, run the handler, ack or nack.

        Outcomes:
            * no handler registered -> message is acked (dropped) with a warning
            * handler returns truthy -> ack
            * handler returns falsy -> nack with requeue
            * any exception (decode, handler, ack) -> nack without requeue
        """
        tag = method.delivery_tag
        try:
            queue_name = method.routing_key
            message_data = json.loads(body.decode('utf-8'))
            logger.info(f"Processing message from {queue_name}: {message_data}")

            if queue_name not in self.message_handlers:
                logger.warning(f"No handler for queue: {queue_name}")
                channel.basic_ack(delivery_tag=tag)
                return

            succeeded = self.message_handlers[queue_name](message_data)
            if not succeeded:
                channel.basic_nack(delivery_tag=tag, requeue=True)
                logger.warning("Message rejected and requeued")
                return

            channel.basic_ack(delivery_tag=tag)
            logger.info("Message acknowledged")
        except Exception as exc:
            logger.error(f"Message processing failed: {exc}")
            channel.basic_nack(delivery_tag=tag, requeue=False)

    def start_consuming(self, queue_name: str):
        """Subscribe to ``queue_name`` and block until interrupted with Ctrl+C."""
        self.channel.basic_consume(
            queue=queue_name,
            on_message_callback=self.process_message
        )
        logger.info(f"Started consuming from {queue_name}")

        try:
            self.channel.start_consuming()
        except KeyboardInterrupt:
            logger.info("Stopping consumer...")
            self.channel.stop_consuming()

# Usage example
def example_message_handler(message_data: Dict, processing_delay: float = 2.0) -> bool:
    """Example order handler: simulate some work, then report success.

    Args:
        message_data: Decoded message body. ``order_id`` and
            ``customer_email`` are read with ``.get`` and may be absent.
        processing_delay: Seconds to sleep to simulate processing. Defaults
            to 2.0, the value that used to be hard-coded; pass 0 to skip the
            wait (e.g. in tests).

    Returns:
        True when processing finished, False if anything raised (the error
        is printed rather than propagated).
    """
    try:
        print(f"Processing order: {message_data}")

        # Simulate processing
        time.sleep(processing_delay)

        # Processing logic here
        order_id = message_data.get('order_id')
        customer_email = message_data.get('customer_email')

        print(f"Order {order_id} processed for {customer_email}")
        return True

    except Exception as e:
        print(f"Handler error: {e}")
        return False

# Producer example
def send_sample_messages():
    """Publish two sample orders to ``order_queue``, one second apart.

    The producer's connection is closed in a ``finally`` block so it is
    released even if declaring the queue or publishing raises.
    """
    # Sample orders
    orders = [
        {
            "order_id": "ORD001",
            "customer_email": "john@example.com",
            "items": [{"product": "Laptop", "quantity": 1}],
            "total": 999.99
        },
        {
            "order_id": "ORD002",
            "customer_email": "jane@example.com",
            "items": [{"product": "Mouse", "quantity": 2}],
            "total": 29.99
        }
    ]

    producer = SimpleProducer("amqp://admin:admin123@localhost:5672/")
    try:
        producer.declare_queue("order_queue")

        for order in orders:
            producer.send_message("order_queue", order)
            time.sleep(1)
    finally:
        producer.disconnect()

# Consumer example
def consume_messages():
    """Consume ``order_queue`` with the example handler until interrupted.

    ``start_consuming`` blocks; once it returns (Ctrl+C) or raises, the
    connection is closed in ``finally`` instead of being left open.
    """
    consumer = SimpleConsumer("amqp://admin:admin123@localhost:5672/")
    try:
        consumer.declare_queue("order_queue")
        consumer.register_handler("order_queue", example_message_handler)
        consumer.start_consuming()
    finally:
        consumer.disconnect()

# Run producer
# send_sample_messages()

# Run consumer
# consume_messages()

Exchange Types ve Routing

Direct Exchange

class DirectExchangeExample:
    """Route log messages through a durable direct exchange, one queue per severity."""

    def __init__(self, connection_url: str):
        params = pika.URLParameters(connection_url)
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()

        # Declare the exchange
        self.exchange_name = "direct_logs"
        self.channel.exchange_declare(
            exchange=self.exchange_name,
            exchange_type='direct',
            durable=True
        )

    def setup_queues(self):
        """Declare ``logs_<severity>`` queues and bind each with its severity as key."""
        for severity in ('info', 'warning', 'error', 'critical'):
            queue_name = f"logs_{severity}"

            # Create the queue, then bind it to the exchange
            self.channel.queue_declare(queue=queue_name, durable=True)
            self.channel.queue_bind(
                exchange=self.exchange_name,
                queue=queue_name,
                routing_key=severity
            )

            print(f"Queue {queue_name} bound to {self.exchange_name} with key {severity}")

    def send_log(self, severity: str, message: str):
        """Publish one log record, using ``severity`` as the routing key."""
        record = {
            'timestamp': datetime.now().isoformat(),
            'severity': severity,
            'message': message,
            'service': 'order_service'
        }
        payload = json.dumps(record)
        props = pika.BasicProperties(delivery_mode=2)

        self.channel.basic_publish(
            exchange=self.exchange_name,
            routing_key=severity,
            body=payload,
            properties=props
        )

        print(f"Sent {severity} log: {message}")

    def consume_logs(self, severity: str):
        """Print and ack every message from ``logs_<severity>`` until Ctrl+C."""

        def on_log(ch, method, properties, body):
            entry = json.loads(body.decode('utf-8'))
            print(f"[{severity.upper()}] {entry['timestamp']}: {entry['message']}")
            ch.basic_ack(delivery_tag=method.delivery_tag)

        self.channel.basic_consume(
            queue=f"logs_{severity}",
            on_message_callback=on_log
        )

        print(f"Consuming {severity} logs. Press Ctrl+C to stop...")
        try:
            self.channel.start_consuming()
        except KeyboardInterrupt:
            self.channel.stop_consuming()

# Usage
direct_example = DirectExchangeExample("amqp://admin:admin123@localhost:5672/")
direct_example.setup_queues()

# Send one sample log per severity, in the order listed
for _severity, _text in (
    ("info", "User logged in successfully"),
    ("warning", "High memory usage detected"),
    ("error", "Database connection failed"),
    ("critical", "System disk space critical"),
):
    direct_example.send_log(_severity, _text)

# Consume only error logs
# direct_example.consume_logs("error")

Topic Exchange

class TopicExchangeExample:
    """Route notifications through a durable topic exchange.

    Routing keys in this example have the shape ``<scope>.<channel>.<kind>``
    (e.g. ``urgent.email.security``). Queues are bound with topic patterns,
    where ``*`` matches exactly one word and ``#`` matches zero or more words.
    """

    def __init__(self, connection_url: str):
        self.connection = pika.BlockingConnection(pika.URLParameters(connection_url))
        self.channel = self.connection.channel()

        # Topic exchange
        self.exchange_name = "topic_notifications"
        self.channel.exchange_declare(
            exchange=self.exchange_name,
            exchange_type='topic',
            durable=True
        )

    def setup_notification_queues(self):
        """Declare the notification queues and bind their routing patterns."""

        # Queue configurations: (queue_name, routing_patterns)
        # The "*_all" queues use '#', not '*': a pattern such as "urgent.*"
        # matches only two-word keys and would never receive the three-word
        # keys used here (e.g. "urgent.email.security").
        queue_configs = [
            ("email_notifications", ["*.email.*", "urgent.email.*"]),
            ("sms_notifications", ["*.sms.*", "urgent.sms.*"]),
            ("push_notifications", ["*.push.*", "urgent.push.*"]),
            ("urgent_all", ["urgent.#"]),
            ("user_all", ["user.#"]),
            ("system_all", ["system.#"])
        ]

        for queue_name, patterns in queue_configs:
            # Create the queue
            self.channel.queue_declare(queue=queue_name, durable=True)

            # Bind each routing pattern
            for pattern in patterns:
                self.channel.queue_bind(
                    exchange=self.exchange_name,
                    queue=queue_name,
                    routing_key=pattern
                )
                print(f"Bound {queue_name} to pattern: {pattern}")

    def send_notification(self, routing_key: str, notification_data: Dict):
        """Publish ``notification_data`` wrapped with a timestamp and its routing key.

        Args:
            routing_key: Dot-separated topic key used for routing.
            notification_data: Payload; values JSON cannot encode are passed
                through ``str``.
        """
        message = {
            'timestamp': datetime.now().isoformat(),
            'routing_key': routing_key,
            'data': notification_data
        }

        self.channel.basic_publish(
            exchange=self.exchange_name,
            routing_key=routing_key,
            body=json.dumps(message, default=str),
            properties=pika.BasicProperties(delivery_mode=2)
        )

        print(f"Sent notification: {routing_key}")

    def consume_queue(self, queue_name: str):
        """Print and ack every message from ``queue_name`` until Ctrl+C."""
        def callback(channel, method, properties, body):
            message = json.loads(body.decode('utf-8'))
            print(f"[{queue_name}] Routing: {message['routing_key']}")
            print(f"    Data: {message['data']}")
            channel.basic_ack(delivery_tag=method.delivery_tag)

        self.channel.basic_consume(queue=queue_name, on_message_callback=callback)

        print(f"Consuming {queue_name}. Press Ctrl+C to stop...")
        try:
            self.channel.start_consuming()
        except KeyboardInterrupt:
            self.channel.stop_consuming()

# Usage
# Constructing the example opens a blocking connection and declares the
# "topic_notifications" exchange; the next call declares and binds the queues.
topic_example = TopicExchangeExample("amqp://admin:admin123@localhost:5672/")
topic_example.setup_notification_queues()

# Send different types of notifications
# Each entry is (routing_key, payload); every routing key below consists of
# three dot-separated words.
notifications = [
    ("user.email.welcome", {"user_id": 123, "subject": "Welcome!"}),
    ("user.sms.verification", {"phone": "+1234567890", "code": "123456"}),
    ("urgent.email.security", {"user_id": 456, "subject": "Security Alert"}),
    ("system.push.maintenance", {"message": "Scheduled maintenance"}),
    ("urgent.sms.payment", {"phone": "+1234567890", "amount": "$100"})
]

# Publish each payload under its routing key, in list order.
for routing_key, data in notifications:
    topic_example.send_notification(routing_key, data)

# Consume urgent notifications only
# topic_example.consume_queue("urgent_all")

Fanout Exchange (Pub/Sub)

class PubSubExample:
    """Broadcast events to every subscriber queue through a durable fanout exchange."""

    def __init__(self, connection_url: str):
        params = pika.URLParameters(connection_url)
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()

        # Fanout exchange: broadcasts to all bound queues
        self.exchange_name = "events_broadcast"
        self.channel.exchange_declare(
            exchange=self.exchange_name,
            exchange_type='fanout',
            durable=True
        )

    def setup_event_subscribers(self):
        """Declare one ``events_<service>`` queue per service and bind it."""
        for service in (
            "analytics_service",
            "notification_service",
            "audit_service",
            "recommendation_service",
        ):
            queue_name = f"events_{service}"

            self.channel.queue_declare(queue=queue_name, durable=True)
            # Bound without a routing key
            self.channel.queue_bind(
                exchange=self.exchange_name,
                queue=queue_name
            )

            print(f"Service {service} subscribed to events")

    def publish_event(self, event_type: str, event_data: Dict):
        """Wrap ``event_data`` in an event envelope and publish it to the exchange."""
        envelope = {
            'event_id': f"evt_{int(time.time() * 1000)}",
            'event_type': event_type,
            'timestamp': datetime.now().isoformat(),
            'data': event_data
        }
        payload = json.dumps(envelope, default=str)
        props = pika.BasicProperties(delivery_mode=2)

        self.channel.basic_publish(
            exchange=self.exchange_name,
            routing_key='',  # empty routing key, as the exchange is fanout
            body=payload,
            properties=props
        )

        print(f"Published event: {event_type}")

    def subscribe_service(self, service_name: str):
        """Consume ``events_<service_name>`` until Ctrl+C, acking every event.

        Events are printed, then passed to the matching ``process_*`` method
        when ``service_name`` is one of the three services handled below;
        other services only print and ack.
        """
        # service name -> name of the method that handles its events
        handler_names = {
            "analytics_service": "process_analytics",
            "notification_service": "process_notification",
            "audit_service": "process_audit",
        }

        def on_event(ch, method, properties, body):
            event = json.loads(body.decode('utf-8'))

            print(f"[{service_name.upper()}] Processing event: {event['event_type']}")
            print(f"    Event ID: {event['event_id']}")
            print(f"    Data: {event['data']}")

            # Service-specific processing
            if service_name in handler_names:
                getattr(self, handler_names[service_name])(event)

            ch.basic_ack(delivery_tag=method.delivery_tag)

        self.channel.basic_consume(
            queue=f"events_{service_name}",
            on_message_callback=on_event
        )

        print(f"{service_name} started. Press Ctrl+C to stop...")
        try:
            self.channel.start_consuming()
        except KeyboardInterrupt:
            self.channel.stop_consuming()

    def process_analytics(self, event: Dict):
        """Print the analytics line for ``event``."""
        print(f"    📊 Analytics: Recording {event['event_type']} metrics")

    def process_notification(self, event: Dict):
        """Print a notification line for registration and order events only."""
        if event['event_type'] not in ('user_registered', 'order_placed'):
            return
        print(f"    📧 Sending notification for {event['event_type']}")

    def process_audit(self, event: Dict):
        """Print the audit line for ``event``."""
        print(f"    📝 Audit: Logging {event['event_type']} event")

# Usage
# Constructing the example opens a blocking connection and declares the
# "events_broadcast" fanout exchange; the next call declares and binds one
# queue per subscriber service.
pubsub_example = PubSubExample("amqp://admin:admin123@localhost:5672/")
pubsub_example.setup_event_subscribers()

# Publish sample events
# Each entry is (event_type, payload).
events = [
    ("user_registered", {"user_id": 123, "email": "john@example.com"}),
    ("order_placed", {"order_id": "ORD001", "user_id": 123, "amount": 99.99}),
    ("payment_processed", {"payment_id": "PAY001", "order_id": "ORD001"}),
    ("product_viewed", {"product_id": "PROD001", "user_id": 123})
]

# Publish each event in list order.
for event_type, data in events:
    pubsub_example.publish_event(event_type, data)

# Start analytics service subscriber
# pubsub_example.subscribe_service("analytics_service")

Configuration Files

# rabbitmq.conf
# Clustering
# Static peer list: two nodes named explicitly.
cluster_formation.peer_discovery_backend = classic_config
cluster_formation.classic_config.nodes.1 = rabbit@rabbitmq-prod
cluster_formation.classic_config.nodes.2 = rabbit@rabbitmq-2

# Performance
# NOTE(review): both limits are relative values; confirm what each is
# relative to and that 2.0 is intended for the disk limit.
vm_memory_high_watermark.relative = 0.6
disk_free_limit.relative = 2.0

# Security
auth_mechanisms.1 = PLAIN
auth_mechanisms.2 = AMQPLAIN
# NOTE(review): peer verification is requested, but fail_if_no_peer_cert is
# false, so clients presenting no certificate are presumably still accepted.
# Confirm this is intended. No TLS listener or certificate paths are
# configured in this snippet.
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = false

# Management
management.tcp.port = 15672
management.ssl.port = 15671

# Logging
# Console logging at info level; file logging is switched off.
log.console = true
log.console.level = info
log.file = false

# Limits
# NOTE(review): heartbeat is presumably in seconds; confirm.
channel_max = 2047
connection_max = 1000
heartbeat = 60

Sonuç

RabbitMQ modern distributed sistemlerin kritik bir parçasıdır:

💡 Temel Özellikler:

🔧 Production Features:

🚀 Use Cases:

⚡ Best Practices:

RabbitMQ ile scalable ve reliable distributed sistemler inşa edebilirsiniz!

Kaynaklar



Previous Post
ActivityPub: Sosyal Medyanın Merkezi Olmayan Geleceği
Next Post
fsck: Linux Dosya Sisteminin Doktoru