メインコンテンツまでスキップ
バージョン: Latest-3.4

AutoMQ Kafka

AutoMQ for Kafka は、クラウド環境向けに再設計された Kafka のクラウドネイティブバージョンです。AutoMQ Kafka は オープンソース であり、Kafka プロトコルと完全に互換性があり、クラウドの利点を最大限に活用しています。自己管理型の Apache Kafka と比較して、AutoMQ Kafka はクラウドネイティブなアーキテクチャにより、容量の自動スケーリング、ネットワークトラフィックの自己バランス、パーティションの数秒での移動などの機能を提供します。これらの機能は、ユーザーにとっての総所有コスト (TCO) を大幅に削減します。

この記事では、StarRocks Routine Load を使用して AutoMQ Kafka にデータをインポートする方法を説明します。Routine Load の基本原則については、Routine Load Fundamentals のセクションを参照してください。

環境の準備

StarRocks とテストデータの準備

StarRocks クラスターが稼働していることを確認してください。

テスト用にデータベースと主キーテーブルを作成します。

create database automq_db;
create table users (
id bigint NOT NULL,
name string NOT NULL,
timestamp string NULL,
status string NULL
) PRIMARY KEY (id)
DISTRIBUTED BY HASH(id)
PROPERTIES (
"enable_persistent_index" = "true"
);
注記

ステージング環境の StarRocks クラスターに BE が 1 つしか含まれていない場合、PROPERTIES 句でレプリカの数を 1 に設定できます。例えば、PROPERTIES( "replication_num" = "1" ) のようにします。デフォルトのレプリカ数は 3 であり、これは本番環境の StarRocks クラスターにも推奨される数です。デフォルトの数を使用したい場合は、replication_num パラメータを設定する必要はありません。

AutoMQ Kafka とテストデータの準備

AutoMQ Kafka 環境とテストデータを準備するには、AutoMQ Quick Start ガイドに従って AutoMQ Kafka クラスターをデプロイします。StarRocks が AutoMQ Kafka サーバーに直接接続できることを確認してください。

AutoMQ Kafka で example_topic という名前のトピックをすばやく作成し、テスト JSON データを書き込むには、次の手順に従います。

トピックの作成

Kafka のコマンドラインツールを使用してトピックを作成します。Kafka 環境にアクセスでき、Kafka サービスが稼働していることを確認してください。トピックを作成するためのコマンドは次のとおりです。

./kafka-topics.sh --create --topic example_topic --bootstrap-server 10.0.96.4:9092 --partitions 1 --replication-factor 1

注: topicbootstrap-server を Kafka サーバーのアドレスに置き換えてください。

トピック作成の結果を確認するには、次のコマンドを使用します。

./kafka-topics.sh --describe example_topic --bootstrap-server 10.0.96.4:9092

テストデータの生成

シンプルな JSON 形式のテストデータを生成します。

{
"id": 1,
"name": "testuser",
"timestamp": "2023-11-10T12:00:00",
"status": "active"
}

テストデータの書き込み

Kafka のコマンドラインツールまたはプログラミング手法を使用して、example_topic にテストデータを書き込みます。コマンドラインツールを使用した例は次のとおりです。

echo '{"id": 1, "name": "testuser", "timestamp": "2023-11-10T12:00:00", "status": "active"}' | sh kafka-console-producer.sh --broker-list 10.0.96.4:9092 --topic example_topic

注: topicbootstrap-server を Kafka サーバーのアドレスに置き換えてください。

最近書き込まれたトピックデータを表示するには、次のコマンドを使用します。

sh kafka-console-consumer.sh --bootstrap-server 10.0.96.4:9092 --topic example_topic --from-beginning

Routine Load タスクの作成

StarRocks コマンドラインで、AutoMQ Kafka トピックからデータを継続的にインポートする Routine Load タスクを作成します。

CREATE ROUTINE LOAD automq_example_load ON users
COLUMNS(id, name, timestamp, status)
PROPERTIES
(
"desired_concurrent_number" = "5",
"format" = "json",
"jsonpaths" = "[\"$.id\",\"$.name\",\"$.timestamp\",\"$.status\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "10.0.96.4:9092",
"kafka_topic" = "example_topic",
"kafka_partitions" = "0",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

注: kafka_broker_list を Kafka サーバーのアドレスに置き換えてください。

パラメータの説明

データ形式

PROPERTIES 句の "format" = "json" でデータ形式を JSON と指定します。

データの抽出と変換

ソースデータとターゲットテーブル間のマッピングと変換の関係を指定するには、COLUMNS および jsonpaths パラメータを設定します。COLUMNS の列名はターゲットテーブルの列名に対応し、その順序はソースデータの列順に対応します。jsonpaths パラメータは、JSON データから必要なフィールドデータを抽出するために使用され、新しく生成された CSV データに似ています。その後、COLUMNS パラメータは一時的に jsonpaths のフィールドに順番に名前を付けます。データ変換の詳細については、Data Transformation during Import を参照してください。

注: 各 JSON オブジェクトの各行が、ターゲットテーブルの列に対応するキー名と数量(順序は不要)を持っている場合、COLUMNS を設定する必要はありません。

データインポートの確認

まず、Routine Load インポートジョブを確認し、Routine Load インポートタスクのステータスが RUNNING ステータスであることを確認します。

show routine load\G

次に、StarRocks データベース内の対応するテーブルをクエリすることで、データが正常にインポートされたことを確認できます。

StarRocks > select * from users;
+------+--------------+---------------------+--------+
| id | name | timestamp | status |
+------+--------------+---------------------+--------+
| 1 | testuser | 2023-11-10T12:00:00 | active |
| 2 | testuser | 2023-11-10T12:00:00 | active |
+------+--------------+---------------------+--------+
2 rows in set (0.01 sec)