メインコンテンツまでスキップ
バージョン: Latest-3.4

RisingWave から StarRocks へのデータシンク

RisingWave は、ストリーミングデータのシンプルで効率的かつ信頼性の高い処理を可能にする分散型 SQL ストリーミングデータベースです。RisingWave をすぐに始めるには、Get started を参照してください。

RisingWave はデータシンク機能を提供しており、ユーザーは他のサードパーティコンポーネントを必要とせずに直接データを StarRocks にシンクできます。この機能は、すべての StarRocks テーブルタイプ(Duplicate Key、主キーテーブル、Aggregate、ユニークキーテーブル)で動作します。

前提条件

  • v1.7 以降の RisingWave クラスターが稼働していること。
  • 対象の StarRocks テーブルにアクセスでき、StarRocks のバージョンが v2.5 以降であること。
  • StarRocks テーブルにデータをシンクするには、対象テーブルに対する SELECT および INSERT 権限が必要です。権限を付与するには、GRANT を参照してください。
ヒント

RisingWave は StarRocks Sink に対して少なくとも一度のセマンティクスのみをサポートしており、障害が発生した場合には重複したデータが書き込まれる可能性があります。データの重複排除とエンドツーエンドの冪等性のある書き込みを実現するために、StarRocks 主キーテーブル を使用することをお勧めします。

パラメータ

RisingWave から StarRocks にデータをシンクする際に設定する必要があるパラメータを以下に示します。特に指定がない限り、すべてのパラメータは必須です。

パラメータ説明
connectorstarrocks に設定します。
starrocks.hostStarRocks FE ノードの IP アドレス。
starrocks.query_portFE ノードのクエリポート。
starrocks.http_portFE ノードの HTTP ポート。
starrocks.userStarRocks クラスターにアクセスするためのユーザー名。
starrocks.passwordユーザー名に関連付けられたパスワード。
starrocks.database対象テーブルが存在する StarRocks データベース。
starrocks.tableデータをシンクしたい StarRocks テーブル。
starrocks.partial_update(オプション) StarRocks 部分更新機能を有効にするかどうか。この機能を有効にすると、更新が必要な列が少ない場合にシンクのパフォーマンスが向上します。
typeシンク中のデータ操作タイプ。
  • append-only: INSERT 操作のみを実行します。
  • upsert: Upsert 操作を実行します。この設定を使用する場合、StarRocks の対象テーブルは主キーテーブルでなければなりません。
force_append_only(オプション) typeappend-only に設定されているが、シンクプロセスに Upsert および Delete 操作が含まれている場合、この設定によりシンクタスクが append-only データを生成し、Upsert および Delete データを破棄することができます。
primary_key(オプション) StarRocks テーブルの主キー。typeupsert の場合に必要です。

データ型マッピング

RisingWave と StarRocks 間のデータ型マッピングを以下に示します。

RisingWaveStarRocks
BOOLEANBOOLEAN
SMALLINTSMALLINT
INTEGERINT
BIGINTBIGINT
REALFLOAT
DOUBLEDOUBLE
DECIMALDECIMAL
DATEDATE
VARCHARVARCHAR
TIME
(StarRocks にシンクする前に VARCHAR にキャスト)
サポートされていません
TIMESTAMPDATETIME
TIMESTAMP WITH TIME ZONE
(StarRocks にシンクする前に TIMESTAMP にキャスト)
サポートされていません
INTERVAL
(StarRocks にシンクする前に VARCHAR にキャスト)
サポートされていません
STRUCTJSON
ARRAYARRAY
BYTEA
(StarRocks にシンクする前に VARCHAR にキャスト)
サポートされていません
JSONBJSON
SERIALBIGINT

  1. StarRocks でデータベース demo を作成し、このデータベースに主キーテーブル score_board を作成します。

    CREATE DATABASE demo;
    USE demo;

    CREATE TABLE demo.score_board(
    id int(11) NOT NULL COMMENT "",
    name varchar(65533) NULL DEFAULT "" COMMENT "",
    score int(11) NOT NULL DEFAULT "0" COMMENT ""
    )
    PRIMARY KEY(id)
    DISTRIBUTED BY HASH(id);
  2. RisingWave から StarRocks にデータをシンクします。

    -- RisingWave でテーブルを作成します。
    CREATE TABLE score_board (
    id INT PRIMARY KEY,
    name VARCHAR,
    score INT
    );

    -- テーブルにデータを挿入します。
    INSERT INTO score_board VALUES (1, 'starrocks', 100), (2, 'risingwave', 100);

    -- このテーブルから StarRocks テーブルにデータをシンクします。
    CREATE SINK score_board_sink
    FROM score_board WITH (
    connector = 'starrocks',
    type = 'upsert',
    starrocks.host = 'starrocks-fe',
    starrocks.mysqlport = '9030',
    starrocks.httpport = '8030',
    starrocks.user = 'users',
    starrocks.password = '123456',
    starrocks.database = 'demo',
    starrocks.table = 'score_board',
    primary_key = 'id'
    );