
2025/12/09 2:25
Show HN: DuckDB for Kafka Stream Processing
RSS: https://news.ycombinator.com/rss
要約▶
Japanese Translation:
SQLFlowは、通常のSQLクエリをリアルタイム分析パイプラインに変換するDockerベースのストリームプロセッサです。ローカルで実行するには:
- 前提条件 – Dockerをインストールし、
をクローンしてhttps://github.com/turbolytics/sql-flow
を実行します。pip install -r requirements.txt - イメージ – 最新のイメージを
で取得します。docker pull turbolytics/sql-flow:latest - Kafka – Docker Compose(
)を使ってシングルノードKafkaインスタンスを起動します。docker-compose -f dev/kafka-single.yml up -d - テスト設定 – 設定ファイルを確認します:
出力には都市の件数(例:New York、Baltimore)が表示されます。docker run -v $(pwd)/dev:/tmp/conf \ -v /tmp/sqlflow:/tmp/sqlflow \ turbolytics/sql-flow:latest \ dev invoke /tmp/conf/config/examples/basic.agg.mem.yml \ /tmp/conf/fixtures/simple.json - データ公開 – テストメッセージをKafkaにロードします:
。python3 cmd/publish-test-data.py --num-messages=10000 --topic="input-simple-agg-mem" - 結果消費 – SQLFlowの出力を確認するには:
docker exec -it kafka1 kafka-console-consumer \ --bootstrap-server=kafka1:9092 \ --topic=output-simple-agg-mem - フルジョブ実行 – ストリームに対してSQLFlowを実行します:
コンシューマーはdocker run -v $(pwd)/dev:/tmp/conf \ -v /tmp/sqlflow:/tmp/sqlflow \ -e SQLFLOW_KAFKA_BROKERS=host.docker.internal:29092 \ turbolytics/sql-flow:latest \ run /tmp/conf/config/examples/basic.agg.mem.yml \ --max-msgs-to-process=10000
のようなJSONレコードを表示します。{"city":"San Francisco504","city_count":1}
SQLFlowはデーモンとして動作し、Kafkaから継続的に読み取り、SQLロジックを実行し、結果をコンソールへ書き込みます。デモ後は設定ファイルを調整したり、Kafkaをスケールアップしたり、SQLFlowを本番パイプラインに組み込んで、馴染みのあるSQL構文でリアルタイム分析をプロトタイプできます。
本文
5 分以内に Kafka からデータを読み取るストリームプロセッサの作成
はじめに
SQL を実行して Kafka ストリームからデータを取得し、結果をコンソールへ書き出すストリームプロセッサを起動します。
必要なもの
| 項目 | 説明 |
|---|---|
| Docker | Docker Engine がインストールされていること |
| sql‑flow リポジトリ | をローカルにクローンします: |
git clone https://github.com/turbolytics/sql-flow.git ``` | | **Python の依存関係** | リポジトリディレクトリでインストールします: ```bash cd path/to/sql-flow && pip install -r requirements.txt ``` | | **sql‑flow Docker イメージ** | 最新版をプルします: ```bash docker pull turbolytics/sql-flow:latest ``` | | **Kafka** | Docker Compose を使ってローカルで起動します: ```bash cd path/to/sql-flow && docker-compose -f dev/kafka-single.yml up -d ``` | --- ### SQLFlow 設定ファイルをテストする SQLFlow は、設定ファイルがフィクスチャデータに対して正しく機能するかどうかを確認する CLI を提供します。 ```bash docker run \ -v $(pwd)/dev:/tmp/conf \ -v /tmp/sqlflow:/tmp/sqlflow \ turbolytics/sql-flow:latest \ dev invoke /tmp/conf/config/examples/basic.agg.mem.yml /tmp/conf/fixtures/simple.json
期待される出力
[ {"city":"New York","city_count":28672}, {"city":"Baltimore","city_count":28672} ]
Kafka ストリームに対して SQLFlow を実行する
-
テストメッセージを Kafka トピックへ送信
python3 cmd/publish-test-data.py --num-messages=10000 --topic="input-simple-agg-mem" -
Kafka コンソールコンシューマーを起動(SQLFlow の出力を見るため)
docker exec -it kafka1 \ kafka-console-consumer --bootstrap-server=kafka1:9092 \ --topic=output-simple-agg-mem -
SQLFlow を起動
docker run \ -v $(pwd)/dev:/tmp/conf \ -v /tmp/sqlflow:/tmp/sqlflow \ -e SQLFLOW_KAFKA_BROKERS=host.docker.internal:29092 \ turbolytics/sql-flow:latest \ run /tmp/conf/config/examples/basic.agg.mem.yml --max-msgs-to-process=10000
期待されるコンソール出力
...{"city":"San Francisco504","city_count":1} {"city":"San Francisco735","city_count":1} {"city":"San Francisco533","city_count":1} {"city":"San Francisco556","city_count":1} ...