در این مقاله، Kafka را بهصورت عملی اجرا میکنیم: با Docker Compose یک کلاستر سبک Kafka/Zookeeper بالا میآوریم، سپس دو پروژهٔ مستقل Producer و Consumer در .NET میسازیم تا جریان دادهٔ را تجربه کنیم. جداسازی پروژهها امکان استقرار و مقیاسپذیری مستقل، CI/CD تمیزتر و امنیت بهتر (حداقل سطح دسترسی) را فراهم میکند.
پیشنیازها
- Docker و Docker Compose
- .NET 8 SDK
- آشنایی کلی با مفاهیم Kafka از مقالهٔ اول (Topic، Partition، Consumer Group)
راهاندازی کلاستر با Docker Compose
فایل docker-compose.yml زیر یک محیط توسعهٔ ساده ایجاد میکند. در ویندوز/WSL ممکن است برای دسترسی کلاینتها به host.docker.internal نیاز داشته باشید.
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.6.1
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports: ["2181:2181"]
kafka:
image: confluentinc/cp-kafka:7.6.1
depends_on: [zookeeper]
ports: ["9092:9092"]
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
docker compose up -d
docker ps
ایجاد Topic و بررسی
برای ایجاد Topic اولیه با ۳ پارتیشن (جهت مصرف موازی):
docker exec -it $(docker ps -qf name=kafka) bash
kafka-topics --create --topic order-created \
--bootstrap-server localhost:9092 \
--partitions 3 --replication-factor 1
kafka-topics --describe --topic order-created --bootstrap-server localhost:9092
تعداد پارتیشنها میزان موازیسازی مصرف را تعیین میکند. در Production مقدار Replication Factor را حداقل ۳ در نظر بگیرید.
ساختار سلوشن پروژه
سه پروژهٔ مستقل: تولیدکننده، مصرفکننده و قراردادها.
KafkaDemo/
├─ docker-compose.yml
└─ src/
├─ ProducerApp/
│ ├─ ProducerApp.csproj
│ ├─ Program.cs
│ └─ appsettings.json
├─ ConsumerApp/
│ ├─ ConsumerApp.csproj
│ ├─ Program.cs
│ └─ appsettings.json
└─ Shared.Contracts/
├─ Shared.Contracts.csproj
└─ OrderCreated.cs
پروژه Shared.Contracts محل DTO/Record مشترک است تا از ناهماهنگی اسکیمای پیام جلوگیری شود. در ادامه میتوانید آن را با Schema Registry تکمیل کنید.
پیام مشترک: OrderCreated
namespace Shared.Contracts;
public record OrderCreated(
string OrderId,
string UserId,
decimal Amount,
DateTime OccurredAtUtc
);
ProducerApp
تنظیمات
{
"Kafka": {
"BootstrapServers": "localhost:9092",
"Topic": "order-created"
}
}
کد نمونه
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Shared.Contracts;
var config = new ConfigurationBuilder()
.AddJsonFile("appsettings.json", optional: true)
.AddEnvironmentVariables(prefix: "PRODUCER_")
.Build();
var bootstrap = config["Kafka:BootstrapServers"] ?? "localhost:9092";
var topic = config["Kafka:Topic"] ?? "order-created";
var producerConfig = new ProducerConfig
{
BootstrapServers = bootstrap,
Acks = Acks.All,
EnableIdempotence = true
};
using var producer = new ProducerBuilder<string, string>(producerConfig).Build();
for (int i = 1; i <= 5; i++)
{
var evt = new OrderCreated(
OrderId: Guid.NewGuid().ToString("N"),
UserId: "78",
Amount: 249000 + i,
OccurredAtUtc: DateTime.UtcNow
);
var payload = System.Text.Json.JsonSerializer.Serialize(evt);
var result = await producer.ProduceAsync(topic, new Message<string, string>
{
Key = evt.OrderId,
Value = payload
});
Console.WriteLine($"sent to {result.TopicPartitionOffset} :: {payload}");
}
producer.Flush(TimeSpan.FromSeconds(5));
نکته: با EnableIdempotence=true و Acks=All احتمال دوبارهفرستی مخرب کاهش مییابد. کلید پیام (Key) پارتیشنینگ پایدار را تضمین میکند.
ConsumerApp
تنظیمات
{
"Kafka": {
"BootstrapServers": "localhost:9092",
"Topic": "order-created",
"GroupId": "order-created-consumer-1",
"AutoOffsetReset": "Earliest"
}
}
کد نمونه
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Shared.Contracts;
var config = new ConfigurationBuilder()
.AddJsonFile("appsettings.json", optional: true)
.AddEnvironmentVariables(prefix: "CONSUMER_")
.Build();
var bootstrap = config["Kafka:BootstrapServers"] ?? "localhost:9092";
var topic = config["Kafka:Topic"] ?? "order-created";
var groupId = config["Kafka:GroupId"] ?? "order-created-consumer-1";
var offsetReset = Enum.TryParse<AutoOffsetReset>(config["Kafka:AutoOffsetReset"], true, out var parsed)
? parsed : AutoOffsetReset.Earliest;
var consumerConfig = new ConsumerConfig
{
BootstrapServers = bootstrap,
GroupId = groupId,
AutoOffsetReset = offsetReset,
EnableAutoCommit = false,
PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky
};
using var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
consumer.Subscribe(topic);
Console.WriteLine("listening... Ctrl+C to exit");
try
{
while (true)
{
var cr = consumer.Consume();
var evt = System.Text.Json.JsonSerializer.Deserialize<OrderCreated>(cr.Message.Value);
Console.WriteLine($"got: {evt?.OrderId} amount={evt?.Amount} at {evt?.OccurredAtUtc:O}");
// پردازش شما ...
consumer.Commit(cr); // فقط پس از موفقیت
}
}
catch (OperationCanceledException) { }
finally
{
consumer.Close();
}
نکته: با EnableAutoCommit=false کنترل کامل روی Commit دارید و الگوی «حداقلیکبار» (At-least-once) را پیاده میکنید.
اجرای پروژهها
# ترمینال ۱
cd src/ConsumerApp
dotnet run
# ترمینال ۲
cd src/ProducerApp
dotnet run
عیبیابی سریع
- اتصال برقرار نمیشود: مقدار
KAFKA_ADVERTISED_LISTENERSرا با هاست واقعی تطبیق دهید (در ویندوز:host.docker.internal). - پیامها نمیرسد:
AutoOffsetReset=Earliestو عضویت در Topic را بررسی کنید. - دریافت پیامها کند است: پارتیشنها را افزایش دهید و چند Consumer در یک Group اجرا کنید.
نکات Production
- Producer: تنظیمات
retries،linger.ms،batch.size،compression.type(مثلاًlz4). - Topic: تعداد پارتیشنها طبق موازیسازی؛ Replication ≥ 3.
- Schema Management: استفاده از Schema Registry و نسخهبندی تکاملی.
- Observability: متریکها (JMX/Prometheus) و داشبورد Grafana؛ پایش Lag.
- امنیت: فعالسازی SSL/SASL و اعمال ACL جدا برای Producer/Consumer.
جمعبندی
با جداسازی Producer و Consumer و اجرای Kafka روی Docker، پایهٔ محکمی برای سیستمهای رویدادمحور واقعی خواهید داشت. این الگو به شما اجازه میدهد هر جزء را مستقل توسعه، مقیاس و دیپلوی کنید و با افزودن مانیتورینگ و مدیریت اسکیمای پیام، مسیر Production را هموار سازید.