راه‌اندازی Apache Kafka با Docker و اجرای Producer/Consumer

در این مقاله، Kafka را به‌صورت عملی اجرا می‌کنیم: با Docker Compose یک کلاستر سبک Kafka/Zookeeper بالا می‌آوریم، سپس دو پروژهٔ مستقل Producer و Consumer در .NET می‌سازیم تا جریان دادهٔ را تجربه کنیم. جداسازی پروژه‌ها امکان استقرار و مقیاس‌پذیری مستقل، CI/CD تمیزتر و امنیت بهتر (حداقل سطح دسترسی) را فراهم می‌کند.

پیش‌نیازها

  • Docker و Docker Compose
  • .NET 8 SDK
  • آشنایی کلی با مفاهیم Kafka از مقالهٔ اول (Topic، Partition، Consumer Group)

راه‌اندازی کلاستر با Docker Compose

فایل docker-compose.yml زیر یک محیط توسعهٔ ساده ایجاد می‌کند. در ویندوز/WSL ممکن است برای دسترسی کلاینت‌ها به host.docker.internal نیاز داشته باشید.

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports: ["2181:2181"]

  kafka:
    image: confluentinc/cp-kafka:7.6.1
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
docker compose up -d
docker ps

ایجاد Topic و بررسی

برای ایجاد Topic اولیه با ۳ پارتیشن (جهت مصرف موازی):

docker exec -it $(docker ps -qf name=kafka) bash
kafka-topics --create --topic order-created \
  --bootstrap-server localhost:9092 \
  --partitions 3 --replication-factor 1
kafka-topics --describe --topic order-created --bootstrap-server localhost:9092

تعداد پارتیشن‌ها میزان موازی‌سازی مصرف را تعیین می‌کند. در Production مقدار Replication Factor را حداقل ۳ در نظر بگیرید.

ساختار سلوشن پروژه

سه پروژهٔ مستقل: تولیدکننده، مصرف‌کننده و قراردادها.

KafkaDemo/
├─ docker-compose.yml
└─ src/
   ├─ ProducerApp/
   │  ├─ ProducerApp.csproj
   │  ├─ Program.cs
   │  └─ appsettings.json
   ├─ ConsumerApp/
   │  ├─ ConsumerApp.csproj
   │  ├─ Program.cs
   │  └─ appsettings.json
   └─ Shared.Contracts/
      ├─ Shared.Contracts.csproj
      └─ OrderCreated.cs

پروژه Shared.Contracts محل DTO/Record مشترک است تا از ناهماهنگی اسکیمای پیام جلوگیری شود. در ادامه می‌توانید آن را با Schema Registry تکمیل کنید.

پیام مشترک: OrderCreated

namespace Shared.Contracts;

public record OrderCreated(
    string OrderId,
    string UserId,
    decimal Amount,
    DateTime OccurredAtUtc
);

ProducerApp

تنظیمات

{
  "Kafka": {
    "BootstrapServers": "localhost:9092",
    "Topic": "order-created"
  }
}

کد نمونه

using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Shared.Contracts;

var config = new ConfigurationBuilder()
    .AddJsonFile("appsettings.json", optional: true)
    .AddEnvironmentVariables(prefix: "PRODUCER_")
    .Build();

var bootstrap = config["Kafka:BootstrapServers"] ?? "localhost:9092";
var topic = config["Kafka:Topic"] ?? "order-created";

var producerConfig = new ProducerConfig
{
    BootstrapServers = bootstrap,
    Acks = Acks.All,
    EnableIdempotence = true
};

using var producer = new ProducerBuilder<string, string>(producerConfig).Build();

for (int i = 1; i <= 5; i++)
{
    var evt = new OrderCreated(
        OrderId: Guid.NewGuid().ToString("N"),
        UserId: "78",
        Amount: 249000 + i,
        OccurredAtUtc: DateTime.UtcNow
    );

    var payload = System.Text.Json.JsonSerializer.Serialize(evt);

    var result = await producer.ProduceAsync(topic, new Message<string, string>
    {
        Key = evt.OrderId,
        Value = payload
    });

    Console.WriteLine($"sent to {result.TopicPartitionOffset} :: {payload}");
}

producer.Flush(TimeSpan.FromSeconds(5));

نکته: با EnableIdempotence=true و Acks=All احتمال دوباره‌فرستی مخرب کاهش می‌یابد. کلید پیام (Key) پارتیشنینگ پایدار را تضمین می‌کند.

ConsumerApp

تنظیمات

{
  "Kafka": {
    "BootstrapServers": "localhost:9092",
    "Topic": "order-created",
    "GroupId": "order-created-consumer-1",
    "AutoOffsetReset": "Earliest"
  }
}

کد نمونه

using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Shared.Contracts;

var config = new ConfigurationBuilder()
    .AddJsonFile("appsettings.json", optional: true)
    .AddEnvironmentVariables(prefix: "CONSUMER_")
    .Build();

var bootstrap = config["Kafka:BootstrapServers"] ?? "localhost:9092";
var topic = config["Kafka:Topic"] ?? "order-created";
var groupId = config["Kafka:GroupId"] ?? "order-created-consumer-1";
var offsetReset = Enum.TryParse<AutoOffsetReset>(config["Kafka:AutoOffsetReset"], true, out var parsed)
    ? parsed : AutoOffsetReset.Earliest;

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = bootstrap,
    GroupId = groupId,
    AutoOffsetReset = offsetReset,
    EnableAutoCommit = false,
    PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky
};

using var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
consumer.Subscribe(topic);

Console.WriteLine("listening... Ctrl+C to exit");

try
{
    while (true)
    {
        var cr = consumer.Consume();
        var evt = System.Text.Json.JsonSerializer.Deserialize<OrderCreated>(cr.Message.Value);
        Console.WriteLine($"got: {evt?.OrderId} amount={evt?.Amount} at {evt?.OccurredAtUtc:O}");

        // پردازش شما ...

        consumer.Commit(cr); // فقط پس از موفقیت
    }
}
catch (OperationCanceledException) { }
finally
{
    consumer.Close();
}

نکته: با EnableAutoCommit=false کنترل کامل روی Commit دارید و الگوی «حداقل‌یک‌بار» (At-least-once) را پیاده می‌کنید.

اجرای پروژه‌ها

# ترمینال ۱
cd src/ConsumerApp
dotnet run

# ترمینال ۲
cd src/ProducerApp
dotnet run

عیب‌یابی سریع

  • اتصال برقرار نمی‌شود: مقدار KAFKA_ADVERTISED_LISTENERS را با هاست واقعی تطبیق دهید (در ویندوز: host.docker.internal).
  • پیام‌ها نمی‌رسد: AutoOffsetReset=Earliest و عضویت در Topic را بررسی کنید.
  • دریافت پیام‌ها کند است: پارتیشن‌ها را افزایش دهید و چند Consumer در یک Group اجرا کنید.

نکات Production

  • Producer: تنظیمات retries، linger.ms، batch.size، compression.type (مثلاً lz4).
  • Topic: تعداد پارتیشن‌ها طبق موازی‌سازی؛ Replication ≥ 3.
  • Schema Management: استفاده از Schema Registry و نسخه‌بندی تکاملی.
  • Observability: متریک‌ها (JMX/Prometheus) و داشبورد Grafana؛ پایش Lag.
  • امنیت: فعال‌سازی SSL/SASL و اعمال ACL جدا برای Producer/Consumer.

جمع‌بندی

با جداسازی Producer و Consumer و اجرای Kafka روی Docker، پایهٔ محکمی برای سیستم‌های رویدادمحور واقعی خواهید داشت. این الگو به شما اجازه می‌دهد هر جزء را مستقل توسعه، مقیاس و دیپلوی کنید و با افزودن مانیتورینگ و مدیریت اسکیمای پیام، مسیر Production را هموار سازید.