Apache Kafkaって何?大量データを処理する仕組み

最近のWebサービスやアプリケーションでは、膨大な量のデータがリアルタイムで発生しますよね。
Apache Kafka(アパッチ カフカ)は、大量のデータをリアルタイムで処理・配信するための分散ストリーミングプラットフォームです。
身近な例で理解しよう
Kafkaを理解するために、まずは郵便配達に例えてみましょう。
従来の方法(直接配達):
送信者A → 受信者X
送信者B → 受信者Y
送信者C → 受信者Z
それぞれが直接やり取りするため、送信者が多いと管理が複雑になります。
Kafkaの方法(郵便局を経由):
送信者A ┐
送信者B ├→ Kafka(郵便局)→ 受信者X
送信者C ┘ └→ 受信者Y
└→ 受信者Z
Kafkaが中継地点となり、データを一時的に保管して配送します。
これにより、送信者と受信者を分離でき、システムが柔軟になるんです。
Kafkaが解決する課題
問題1:大量のデータをどう処理する?
例えば、1秒間に10万件のアクセスログが発生するWebサイトがあったとします。
すべてのログをリアルタイムで処理するのは大変ですよね。
Kafkaの解決策:
- データを高速に受け取って一時保管
- 処理側は自分のペースで取り出せる
- スケールアウトで処理能力を増やせる
問題2:システム間の依存関係が複雑
複数のシステムが互いにデータをやり取りすると、依存関係が複雑になります。
Kafkaの解決策:
- データの送信側と受信側を分離
- 新しいシステムを簡単に追加できる
- 1つのシステムが停止しても他に影響しない
問題3:データの欠損を防ぎたい
一時的にシステムがダウンした時、データが消えてしまうと困ります。
Kafkaの解決策:
- データをディスクに永続化
- 複数のサーバーに複製して保管
- 後から過去のデータを再処理できる
Kafkaが生まれた背景と歴史
Kafkaがなぜ作られたのか、その背景を知ると理解が深まります。
LinkedInでの誕生
2011年:
- LinkedInの社内プロジェクトとして開発
- ユーザーの行動データをリアルタイムで処理する必要があった
- 既存のメッセージングシステムでは性能が不足
開発の目的:
- 毎秒数百万のイベントを処理
- 低遅延(数ミリ秒)での配信
- 高い信頼性とスケーラビリティ
2011年:Apache Softwareプロジェクトとしてオープンソース化
現在の活用状況
利用している主な企業:
- LinkedIn(発祥の地)
- Netflix(視聴データの処理)
- Uber(位置情報の処理)
- Twitter(ツイートストリームの処理)
- Spotify(ユーザー行動の分析)
世界中の大規模Webサービスで採用されています。
名前の由来
「Kafka」という名前は、チェコの作家フランツ・カフカから取られています。
特に深い意味はなく、開発者が好きな作家の名前を付けたそうです。
Kafkaの基本的な構成要素
Kafkaを理解するために、重要な構成要素を見ていきましょう。
Producer(プロデューサー)
役割:データを送信する側
- アプリケーションからKafkaにデータを送る
- データの生産者
具体例:
- Webサーバーがアクセスログを送信
- IoTセンサーが測定データを送信
- ユーザーのクリック情報を送信
イメージ:
Webアプリ(Producer) → データ送信 → Kafka
Consumer(コンシューマー)
役割:データを受信する側
- Kafkaからデータを取得して処理
- データの消費者
具体例:
- ログ分析システムがデータを取得
- データベースに保存するプログラム
- リアルタイムダッシュボードへの配信
イメージ:
Kafka → データ取得 → 分析システム(Consumer)
Topic(トピック)
役割:データのカテゴリ分け
- データを種類ごとに分類する単位
- 「チャンネル」や「郵便受け」のようなもの
具体例:
access-logs:アクセスログ用のトピックuser-clicks:クリック情報用のトピックorders:注文データ用のトピック
イメージ:
Producer → Topic「access-logs」 → Consumer(ログ分析)
Producer → Topic「user-clicks」 → Consumer(行動分析)
Producer → Topic「orders」 → Consumer(注文処理)
それぞれのトピックに関心のあるConsumerだけがデータを取得します。
Partition(パーティション)
役割:トピックをさらに分割
- 1つのトピックを複数のパーティションに分割
- 並列処理でスループットを向上
なぜ分割するのか:
1つのトピックに大量のデータが流れる場合、1台のサーバーでは処理しきれません。
パーティションに分割することで、複数のサーバーで分散処理できます。
イメージ:
Topic「access-logs」
├─ Partition 0(サーバーA)
├─ Partition 1(サーバーB)
└─ Partition 2(サーバーC)
データの分配方法:
- キー指定:同じキーのデータは同じパーティションへ
- ラウンドロビン:順番に振り分け
Broker(ブローカー)
役割:Kafkaサーバー本体
- データを受け取って保管するサーバー
- パーティションを管理
- 複数のブローカーでKafkaクラスターを構成
クラスター構成の利点:
- 高可用性:1台が故障しても継続稼働
- スケーラビリティ:サーバーを追加して性能向上
- 負荷分散:複数サーバーで処理を分散
イメージ:
Kafkaクラスター
├─ Broker 1(リーダー)
├─ Broker 2(レプリカ)
└─ Broker 3(レプリカ)
ZooKeeper(ズーキーパー)
役割:Kafkaクラスターの管理
- ブローカーの状態を監視
- リーダー選出を管理
- メタデータの保管
注意:
最新版のKafka(2.8以降)では、ZooKeeperなしでも動作するようになりました。
これを「KRaft(Kafka Raft)モード」と呼びます。
Kafkaの動作原理を詳しく解説

実際にデータがどう流れるか、ステップバイステップで見ていきましょう。
データの送信フロー
ステップ1:Producerがメッセージを送信
Producer → メッセージ作成
├─ トピック:access-logs
├─ キー:user-123
└─ 値:{"url": "/home", "time": "10:30:00"}
ステップ2:パーティションの決定
Kafkaは、キーのハッシュ値を使ってパーティションを決定します。
キー「user-123」のハッシュ値 % パーティション数 = Partition 1
同じキーのメッセージは、常に同じパーティションに送られます。
ステップ3:ブローカーがメッセージを保存
Broker 1(Partition 1のリーダー)
└─ ディスクに書き込み
└─ レプリカにも複製
ステップ4:Producerに確認応答
Broker → Producer:「メッセージを受信しました」
データの受信フロー
ステップ1:Consumerがトピックを購読
Consumer → Kafka:「access-logsトピックのデータをください」
ステップ2:Consumerグループの管理
複数のConsumerをConsumer Groupにまとめることで、並列処理ができます。
Consumer Group「log-processors」
├─ Consumer 1 → Partition 0を担当
├─ Consumer 2 → Partition 1を担当
└─ Consumer 3 → Partition 2を担当
各Consumerが異なるパーティションを担当するため、効率的に処理できます。
ステップ3:メッセージの取得
Consumer → Broker:「オフセット100から読み取り開始」
Broker → Consumer:メッセージを配信
オフセット(Offset)は、パーティション内のメッセージの位置を示す番号です。
ステップ4:オフセットの保存
Consumer → Kafka:「オフセット150まで処理完了」
Consumerが処理の進捗を記録することで、障害時に途中から再開できます。
レプリケーション(複製)の仕組み
データの信頼性を高めるため、Kafkaは複数のブローカーにデータを複製します。
レプリケーション構成:
Partition 0
├─ リーダー(Broker 1) ← 読み書きを担当
├─ フォロワー(Broker 2) ← コピーを保持
└─ フォロワー(Broker 3) ← コピーを保持
リーダーの役割:
- すべての読み書きリクエストを処理
- フォロワーにデータを複製
フォロワーの役割:
- リーダーからデータをコピー
- リーダーが故障したら、代わりにリーダーになる
障害時の動作:
Broker 1(リーダー)が故障
↓
ZooKeeperが検出
↓
Broker 2がリーダーに昇格
↓
サービス継続
これにより、高可用性が実現されます。
Kafkaの主な使い道と実例
Kafkaは、様々な場面で活用されています。
ログ収集と分析
シナリオ:Webサービスのアクセスログ収集
複数のWebサーバー
├─ サーバー1 → Kafka(Topic: access-logs)
├─ サーバー2 → Kafka(Topic: access-logs)
└─ サーバー3 → Kafka(Topic: access-logs)
↓
┌───────┴────────┐
↓ ↓
リアルタイム分析 長期保存
(Elasticsearch) (Hadoop)
メリット:
- すべてのログを1箇所に集約
- リアルタイムで異常を検知
- 過去のログも保管して後から分析
ストリーム処理
シナリオ:Twitterのトレンド分析
ツイートデータ → Kafka → ストリーム処理
├─ キーワード集計
├─ 感情分析
└─ トレンド検出
↓
ダッシュボード表示
リアルタイムで処理できること:
- 急上昇中のキーワード検出
- ユーザーの感情傾向分析
- バズっているトピックの特定
イベントソーシング
シナリオ:ECサイトの注文処理
注文イベント → Kafka(Topic: orders)
├→ 在庫システム(在庫を減らす)
├→ 決済システム(請求処理)
├→ 配送システム(配送手配)
└→ データ分析(売上集計)
メリット:
- すべてのイベントを記録
- 後から過去のイベントを再生可能
- システム間の結合度が低い
メトリクス収集
シナリオ:システム監視
各サーバーのメトリクス
├─ CPU使用率
├─ メモリ使用率
└─ ディスクI/O
↓
Kafka(Topic: metrics)
↓
監視システム(Grafana、Prometheus)
リアルタイムで監視:
- サーバーの負荷状況
- 異常値の検出
- アラート通知
データパイプライン
シナリオ:データウェアハウスへの投入
各種データソース
├─ 営業データ(Salesforce)
├─ ユーザーデータ(MySQL)
└─ ログデータ(ファイル)
↓
Kafka(データハブ)
↓
変換処理(Kafka Streams)
↓
データウェアハウス(BigQuery、Redshift)
メリット:
- 複数のデータソースを統合
- データの変換・加工が容易
- リアルタイムでデータ投入
IoTデータ処理
シナリオ:スマートシティのセンサーデータ
数千台のセンサー
├─ 温度センサー
├─ 湿度センサー
└─ 人流センサー
↓
Kafka(高速処理)
↓
リアルタイム分析
├─ 異常検知
├─ 予測モデル
└─ ダッシュボード
処理できる規模:
- 毎秒数百万件のイベント
- 低遅延(数ミリ秒)
- 高い信頼性
Kafkaと他のメッセージングシステムとの違い
Kafkaは独自の特徴を持っています。他のシステムと比較してみましょう。
RabbitMQとの比較
RabbitMQ:
- 伝統的なメッセージキュー
- メッセージを配信したら削除
- 複雑なルーティングが得意
Kafka:
- ログベースのストレージ
- メッセージを一定期間保持
- 高スループットが得意
使い分け:
- RabbitMQ:少量のメッセージを確実に配信したい
- Kafka:大量のデータをリアルタイムで処理したい
Amazon Kinesis(AWS)との比較
共通点:
- どちらもストリーミングデータ処理
- リアルタイム分析が可能
違い:
| 項目 | Kafka | Kinesis |
|---|---|---|
| 運用 | 自分で管理 | AWSが管理 |
| カスタマイズ | 高い | 限定的 |
| コスト | 固定(サーバー代) | 従量課金 |
| ベンダーロックイン | なし | AWS依存 |
使い分け:
- Kafka:オンプレミスや自由度が必要な場合
- Kinesis:AWSで手軽に始めたい場合
Apache Pulsarとの比較
Apache Pulsar:
- Kafkaの次世代を目指すプロジェクト
- ストレージとコンピューティングの分離
- より柔軟なアーキテクチャ
Kafka:
- 成熟したエコシステム
- 豊富な実績と情報
- 安定性が高い
使い分け:
- Kafka:実績を重視、安定性優先
- Pulsar:最新技術を試したい、特定の機能が必要
Kafkaの基本的なセットアップ
実際にKafkaを動かしてみましょう。
必要な環境
システム要件:
- Java 8以上(Java 11推奨)
- メモリ:最低4GB(本番は16GB以上推奨)
- ディスク:SSD推奨
- OS:Linux、macOS、Windows
Kafkaのインストール(Linux)
ステップ1:Javaのインストール
# Ubuntu/Debian
sudo apt update
sudo apt install openjdk-11-jdk
# CentOS/RHEL
sudo yum install java-11-openjdk
# バージョン確認
java -version
ステップ2:Kafkaのダウンロード
# Kafkaの最新版をダウンロード
cd /opt
sudo wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
# 解凍
sudo tar -xzf kafka_2.13-3.6.0.tgz
sudo mv kafka_2.13-3.6.0 kafka
# ディレクトリ移動
cd kafka
ステップ3:KRaftモードでの起動(ZooKeeper不要)
# ユニークなクラスターIDを生成
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
# ログディレクトリをフォーマット
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
# Kafkaサーバーの起動
bin/kafka-server-start.sh config/kraft/server.properties
Kafkaが起動すると、ポート9092でリスニングします。
トピックの作成
# トピックの作成
bin/kafka-topics.sh --create \
--topic test-topic \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1
# トピックの一覧表示
bin/kafka-topics.sh --list \
--bootstrap-server localhost:9092
# トピックの詳細表示
bin/kafka-topics.sh --describe \
--topic test-topic \
--bootstrap-server localhost:9092
パラメータの説明:
--partitions 3:パーティション数を3に設定--replication-factor 1:レプリカを1つ作成(単一サーバーの場合)
メッセージの送受信テスト
メッセージの送信(Producer):
# コンソールプロデューサーの起動
bin/kafka-console-producer.sh \
--topic test-topic \
--bootstrap-server localhost:9092
# プロンプトが表示されたら、メッセージを入力
> Hello Kafka
> This is a test message
> Press Ctrl+C to exit
メッセージの受信(Consumer):
別のターミナルを開いて:
# コンソールコンシューマーの起動
bin/kafka-console-consumer.sh \
--topic test-topic \
--bootstrap-server localhost:9092 \
--from-beginning
# Producerで送信したメッセージが表示される
Hello Kafka
This is a test message
--from-beginningを付けると、トピックの最初からすべてのメッセージを読み取ります。
プログラムからKafkaを使う
実際のアプリケーションでKafkaを使う方法を見ていきましょう。
Javaでの実装
Producer(送信側):
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class SimpleProducer {
public static void main(String[] args) {
// 設定
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Producerの作成
Producer<String, String> producer = new KafkaProducer<>(props);
// メッセージの送信
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record =
new ProducerRecord<>("test-topic", "key-" + i, "value-" + i);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.println("送信成功: " + metadata.offset());
} else {
exception.printStackTrace();
}
});
}
producer.close();
}
}
Consumer(受信側):
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;
public class SimpleConsumer {
public static void main(String[] args) {
// 設定
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Consumerの作成
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));
// メッセージの受信
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("受信: key=%s, value=%s, offset=%d%n",
record.key(), record.value(), record.offset());
}
}
}
}
Pythonでの実装
インストール:
pip install kafka-python
Producer(送信側):
from kafka import KafkaProducer
import json
# Producerの作成
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# メッセージの送信
for i in range(10):
data = {'number': i, 'message': f'Hello {i}'}
producer.send('test-topic', value=data)
print(f'送信: {data}')
producer.flush()
producer.close()
Consumer(受信側):
from kafka import KafkaConsumer
import json
# Consumerの作成
consumer = KafkaConsumer(
'test-topic',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
group_id='python-group',
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
# メッセージの受信
for message in consumer:
print(f'受信: {message.value}')
Node.js (JavaScript)での実装
インストール:
npm install kafkajs
Producer(送信側):
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
});
const producer = kafka.producer();
async function sendMessages() {
await producer.connect();
for (let i = 0; i < 10; i++) {
await producer.send({
topic: 'test-topic',
messages: [
{ key: `key-${i}`, value: `value-${i}` }
]
});
console.log(`送信: ${i}`);
}
await producer.disconnect();
}
sendMessages();
Consumer(受信側):
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
});
const consumer = kafka.consumer({ groupId: 'node-group' });
async function receiveMessages() {
await consumer.connect();
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
受信: message.value.toString(),
key: message.key.toString(),
offset: message.offset
});
}
});
}
receiveMessages();
Kafka Streamsでストリーム処理
Kafka Streamsを使うと、データを加工しながらリアルタイム処理できます。
Kafka Streamsとは
特徴:
- Kafkaに特化したストリーム処理ライブラリ
- 軽量で使いやすい
- 別のクラスターが不要(Kafkaだけで完結)
簡単なストリーム処理の例
シナリオ:文字列を大文字に変換
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
public class StreamExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
StreamsBuilder builder = new StreamsBuilder();
// 入力トピックからデータを読み取り
KStream<String, String> source = builder.stream("input-topic");
// 大文字に変換
KStream<String, String> transformed = source.mapValues(value -> value.toUpperCase());
// 出力トピックに書き込み
transformed.to("output-topic");
// ストリームの起動
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// シャットダウンフック
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
集計処理の例
シナリオ:単語の出現回数をカウント
// 文字列を単語に分割してカウント
KStream<String, String> source = builder.stream("text-input");
KTable<String, Long> wordCounts = source
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count();
// 結果を出力トピックへ
wordCounts.toStream().to("word-count-output");
このコードで、リアルタイムに単語の出現回数を集計できます。
Kafkaの監視と運用
本番環境では、適切な監視が重要です。
重要な監視メトリクス
ブローカーレベル:
UnderReplicatedPartitions:レプリケーションが遅れているパーティション数ActiveControllerCount:アクティブなコントローラー数(1であるべき)OfflinePartitionsCount:オフラインのパーティション数(0であるべき)RequestsPerSecond:秒間リクエスト数
トピックレベル:
MessagesInPerSec:受信メッセージ数/秒BytesInPerSec:受信バイト数/秒BytesOutPerSec:送信バイト数/秒
Consumer Groupレベル:
Lag:処理の遅延(未処理メッセージ数)Commit Rate:オフセットのコミット頻度
JMXでのメトリクス取得
Kafkaは、JMX(Java Management Extensions)でメトリクスを公開しています。
JMXの有効化:
# 環境変数を設定してKafkaを起動
export JMX_PORT=9999
bin/kafka-server-start.sh config/server.properties
JConsoleで接続:
jconsole localhost:9999
Prometheus + Grafanaでの監視
JMX Exporterの設定:
# JMX Exporterのダウンロード
wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.19.0/jmx_prometheus_javaagent-0.19.0.jar
# Kafka起動時にエージェントを指定
export KAFKA_OPTS="-javaagent:/path/to/jmx_prometheus_javaagent-0.19.0.jar=7071:/path/to/kafka-broker.yml"
Prometheusの設定(prometheus.yml):
scrape_configs:
- job_name: 'kafka'
static_configs:
- targets: ['localhost:7071']
Grafanaダッシュボード:
Grafana公式で、Kafka用のダッシュボードテンプレートが提供されています。
インポートするだけで、すぐに可視化できます。
ログの確認
ブローカーのログ:
# ログファイルの場所
tail -f /opt/kafka/logs/server.log
# エラーの確認
grep ERROR /opt/kafka/logs/server.log
重要なログメッセージ:
Shutting down:ブローカーのシャットダウンUnderReplicated:レプリケーションの遅延OfflinePartitions:パーティションのオフライン
よくあるトラブルと解決方法
Kafkaの運用でよく遭遇する問題と対処法です。
問題1:Consumer Lagが増え続ける
症状:
Consumerの処理が追いつかず、未処理メッセージが溜まっていく
原因:
- Consumerの処理速度が遅い
- Consumerの数が少ない
- Producerの送信量が多すぎる
解決方法:
# Consumer Groupの状態確認
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group my-group
# 結果の例
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
test-topic 0 100 1000 900 ← Lag が大きい
対策:
- Consumerの数を増やす(パーティション数まで)
- 処理ロジックを最適化
- バッチ処理のサイズを調整
// バッチサイズの調整
props.put("max.poll.records", 100); // 一度に取得する件数を減らす
問題2:ディスク容量不足
症状:
ブローカーがメッセージを受け付けなくなる
原因:
古いログファイルが削除されていない
解決方法:
# server.properties
# ログの保持期間を設定(デフォルトは7日)
log.retention.hours=168
# またはサイズで制限
log.retention.bytes=10737418240 # 10GB
# ログセグメントのサイズ
log.segment.bytes=1073741824 # 1GB
手動でのログ削除:
# 古いログセグメントを確認
ls -lh /opt/kafka/data/kafka-logs/test-topic-0/
# Kafkaを停止してから削除(推奨)
bin/kafka-server-stop.sh
rm -rf /opt/kafka/data/kafka-logs/test-topic-0/*.log
bin/kafka-server-start.sh config/server.properties
問題3:レプリケーションの同期が遅れる
症状:UnderReplicatedPartitionsの値が0以外
原因:
- ネットワーク遅延
- ブローカーの負荷が高い
- ディスクI/Oの遅延
解決方法:
# レプリケーション状態の確認
bin/kafka-topics.sh --describe --topic test-topic \
--bootstrap-server localhost:9092
# パーティションの再割り当て
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
--reassignment-json-file reassignment.json --execute
レプリケーション設定の調整:
# server.properties
# レプリケーションの遅延許容時間
replica.lag.time.max.ms=10000
# フェッチのバッファサイズ
replica.fetch.max.bytes=1048576
問題4:Producerのタイムアウト
症状:
メッセージ送信時にTimeoutExceptionが発生
原因:
- ブローカーの応答が遅い
- ネットワークの問題
- バッファが満杯
解決方法:
// タイムアウト時間を延長
props.put("request.timeout.ms", 30000); // 30秒
props.put("delivery.timeout.ms", 120000); // 2分
// バッファサイズを増やす
props.put("buffer.memory", 67108864); // 64MB
// 再試行回数を設定
props.put("retries", 3);
問題5:Consumer Rebalanceが頻発
症状:
Consumerが頻繁に再割り当てされて、処理が進まない
原因:
- ハートビートのタイムアウト
- 処理時間が長すぎる
解決方法:
// ハートビート間隔の調整
props.put("session.timeout.ms", 30000); // 30秒
props.put("heartbeat.interval.ms", 10000); // 10秒
// 最大ポーリング間隔を延長
props.put("max.poll.interval.ms", 600000); // 10分
Kafkaのベストプラクティス
効率的で安定したKafka運用のためのヒントです。
パーティション数の決定
考慮すべき要素:
- スループット要件
- 必要なスループット ÷ 1パーティションの処理能力
- 例:1000MB/s必要、1パーティション100MB/s → 10パーティション以上
- Consumer数
- Consumerの最大数 = パーティション数
- 並列度を考慮して決定
- ディスク容量
- パーティション数 × レプリカ数 × 保持期間
- 多すぎるとメタデータの管理負荷が増える
推奨:
- 小規模:3~6パーティション
- 中規模:10~30パーティション
- 大規模:50~100パーティション
後から増やせますが、減らせませんので、最初は控えめに設定しましょう。
レプリケーション係数の設定
推奨値:
レプリケーション係数 = 3
理由:
- 1台故障しても2台で継続稼働
- 2台故障しても1台でデータ保持
- 可用性と性能のバランスが良い
開発環境:
レプリケーション係数 = 1
リソースを節約できますが、本番環境では避けましょう。
Producerの設定最適化
信頼性重視の設定:
props.put("acks", "all"); // すべてのレプリカの確認を待つ
props.put("retries", Integer.MAX_VALUE); // 無限リトライ
props.put("enable.idempotence", true); // 冪等性を保証
スループット重視の設定:
props.put("acks", "1"); // リーダーの確認だけ待つ
props.put("linger.ms", 100); // バッチ送信のための待機時間
props.put("batch.size", 163840); // バッチサイズを大きく
props.put("compression.type", "snappy"); // 圧縮を有効化
Consumerの設定最適化
確実な処理のための設定:
props.put("enable.auto.commit", false); // 手動コミット
props.put("isolation.level", "read_committed"); // コミット済みのみ読む
// メッセージ処理後にコミット
consumer.commitSync();
高速処理のための設定:
props.put("enable.auto.commit", true); // 自動コミット
props.put("fetch.min.bytes", 1048576); // 最小フェッチサイズ(1MB)
props.put("max.poll.records", 500); // 一度に多く取得
トピックの命名規則
推奨:
<環境>.<システム名>.<データ種別>.<詳細>
例:
prod.order-system.orders.created
prod.order-system.orders.updated
dev.user-system.events.login
メリット:
- 一目で用途が分かる
- 検索しやすい
- 環境ごとに分離できる
セキュリティの設定
SSL/TLS暗号化:
# server.properties
listeners=SSL://0.0.0.0:9093
ssl.keystore.location=/path/to/keystore.jks
ssl.keystore.password=password
ssl.key.password=password
SASL認証:
# server.properties
listeners=SASL_SSL://0.0.0.0:9093
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
本番環境では、必ず暗号化と認証を設定しましょう。
まとめ:Kafkaで大規模データ処理を実現しよう
この記事では、Apache Kafkaについて詳しく解説してきました。
重要なポイントのおさらい:
- Kafkaは大量データをリアルタイムで処理する分散ストリーミングプラットフォーム
- Producer、Consumer、Broker、Topic、Partitionが主要な構成要素
- データを永続化し、高可用性を実現
- ログ収集、ストリーム処理、イベントソーシングなど幅広い用途
- パーティション分割で並列処理とスケーラビリティを実現
- レプリケーションで障害に強いシステムを構築
- Java、Python、Node.jsなど多くの言語からライブラリ利用可能
- 適切な監視と運用が本番環境では重要
Kafkaが特に向いている場面:
- 毎秒数千~数百万件のイベント処理
- リアルタイムデータ分析
- マイクロサービス間のデータ連携
- ログ集約と監視
- IoTデータの収集
まずは小さく始めよう:
- 単一ブローカーでローカル環境にセットアップ
- コンソールツールでProducer/Consumerを試す
- 簡単なプログラムでメッセージ送受信
- Kafka Streamsでストリーム処理に挑戦
- 本番環境への展開を計画
Kafkaは最初は複雑に感じるかもしれませんが、基本を押さえれば非常に強力なツールです。
大量のデータをリアルタイムで処理したい時は、ぜひKafkaを検討してみてください!

コメント