通过 Arrow Flight SQL 与 StarRocks 交互
从 v3.5.1 开始,StarRocks 支持通过 Apache Arrow Flight SQL 协议进行连接。
概述
通过 Arrow Flight SQL 协议,您可以使用 ADBC 驱动或 Arrow Flight SQL JDBC 驱动执行常规的 DDL、DML、DQL 语句,并通过 Python 或 Java 代码高效读取大规模数据。
该解决方案建立了一个从 StarRocks 列式执行引擎到客户端的全列式数据传输管道,消除了传统 JDBC 和 ODBC 接口中常见的频繁行列转换和序列化开销。这使得 StarRocks 能够实现零拷贝、低延迟和高吞吐量的数据传输。
应用场景
Arrow Flight SQL 集成让 StarRocks 特别适用于以下场景:
- 数据科学工作流:Pandas 和 Apache Arrow 等工具通常需要列式数据。
- 数据湖分析:需要以高吞吐、低延迟访问海量数据集。
- 机器学习:对快速迭代与处理速度有极高要求。
- 实时分析平台:需要尽可能低延迟地交付数据。
使用 Arrow Flight SQL,您将获得:
- 端到端列式数据传输,避免列式与行式格式之间的昂贵转换。
- 零拷贝数据移动,降低 CPU 和内存开销。
- 低延迟与极高吞吐量,加快分析和响应速度。
技术原理
在传统方式下,StarRocks 内部以列式 Block 结构组织查询结果。但使用 JDBC、ODBC 或 MySQL 协议时,数据必须经过:
- 在服务器端序列化为行式字节流。
- 通过网络传输。
- 在客户端反序列化,并经常重新转换为列式格式。
此三步过程带来了:
- 高昂的序列化/ 反序列化开销。
- 复杂的数据转换逻辑。
- 随数据量增长而加剧的延迟。
Arrow Flight SQL 的集成通过以下方式解决了这些问题:
- 全程保持列式格式,从 StarRocks 执行引擎直达客户端。
- 利用 Apache Arrow 的内存列式表示,针对分析型负载进行优化。
- 通过 Arrow Flight 协议实现高性能传输,无需中间格式转换。

这一设计实现了真正的零拷贝传输,速度更快,资源效率更高。
此外,StarRocks 提供了支持 Arrow Flight SQL 的通用 JDBC 驱动,应用程序可以在不牺牲 JDBC 兼容性或与其他 Arrow Flight 系统互操作性的前提下,轻松接入这一路径。
性能对比
大量测试表明,数据读取速度显著提升。在多种数据类型(整数、浮点、字符串、布尔值和混合列)下,Arrow Flight SQL 始终优于传统 PyMySQL 与 Pandas read_sql 接口。主要结果包括:
- 读取一千万行整数数据时,执行时间从约 35 秒降至 0.4 秒(提升约 85 倍)。
- 混合列表的性能提升达到 160 倍。
- 即使是简单查询(如单列字符串),性能也提升超过 12 倍。
总体来看,Arrow Flight SQL 实现了:
- 根据查询复杂度和数据类型,传输速度提升 20–160 倍。
- 通过消除冗余的序列化步骤,显著降低 CPU 和内存使用。
这些性能优势直接 带来更快的仪表盘、更高效的数据科学工作流,以及实时分析更大规模数据集的能力。
使用方法
按照以下步骤,通过 Arrow Flight SQL 协议使用 Python ADBC 驱动连接并与 StarRocks 交互。完整代码示例请参见附录。
Python 3.9 或更高版本是前提条件。
步骤 1. 安装库
使用 pip 从 PyPI 安装 adbc_driver_manager 和 adbc_driver_flightsql:
pip install adbc_driver_manager
pip install adbc_driver_flightsql
将以下模块或库导入到你的代码中:
- 必需的库:
import adbc_driver_manager
import adbc_driver_flightsql.dbapi as flight_sql
- 可选模块以提高可用性和调试:
import pandas as pd # 可选:使用 DataFrame 更好地显示结果
import traceback # 可选:在 SQL 执行期间提供详细的错误回溯
import time # 可选:用于测量 SQL 执行时间
步骤 2. 连接到 StarRocks
-
如果你想使用命令行启动 FE 服务,可以使用以下任一方式:
-
指定环境变量
JAVA_TOOL_OPTIONS。export JAVA_TOOL_OPTIONS="--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED" -
在 fe.conf 中指定 FE 配置项
JAVA_OPTS。这样,你可以追加其他JAVA_OPTS值。JAVA_OPTS="--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED ..."
-
-
如果你想在 IntelliJ IDEA 中运行服务,必须在
Run/Debug Configurations中的Build and run添加以下选项:--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED
配置 StarRocks
在通过 Arrow Flight SQL 连接到 StarRocks 之前,必须先配置 FE 和 BE 节点,以确保 Arrow Flight SQL 服务已启用并监听指定端口。
在 FE 配置文件 fe.conf 和 BE 配置文件 be.conf 中,将 arrow_flight_port 设置为可用端口。修改配置文件后,重启 FE 和 BE 服务以使修改生效。
你必须为 FE 和 BE 设置不同的 arrow_flight_port。
示例:
// fe.conf
arrow_flight_port = 9408
// be.conf
arrow_flight_port = 9419
建立连接
在客户端,使用以下信息创建一个 Arrow Flight SQL 客户端:
- StarRocks FE 的主机地址
- Arrow Flight 在 StarRocks FE 上监听的端口
- 拥有必要权限的 StarRocks 用户的用户名和密码
示例:
FE_HOST = "127.0.0.1"
FE_PORT = 9408
conn = flight_sql.connect(
uri=f"grpc://{FE_HOST}:{FE_PORT}",
db_kwargs={
adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
}
)
cursor = conn.cursor()
连接建立后,你可以通过返回的 Cursor 执行 SQL 语句与 StarRocks 交互。
步骤 3. (可选)预定义工具函数
这些函数用于格式化输出、标准化格式和简化调试。你可以在代码中可选地定义它们以进行测试。
# =============================================================================
# 工具函数,用于更好的输出格式化和 SQL 执行
# =============================================================================
# 打印章节标题
def print_header(title: str):
"""
打印章节标题以提高可读性。
"""
print("\n" + "=" * 80)
print(f"🟢 {title}")
print("=" * 80)
# 打印正在执行的 SQL 语句
def print_sql(sql: str):
"""
在执行前打印 SQL 语句。
"""
print(f"\n🟡 SQL:\n{sql.strip()}")
# 打印结果 DataFrame
def print_result(df: pd.DataFrame):
"""
以可读格式打印结果 DataFrame。
"""
if df.empty:
print("\n🟢 Result: (no rows returned)\n")
else:
print("\n🟢 Result:\n")
print(df.to_string(index=False))
# 打印错误回溯
def print_error(e: Exception):
"""
如果 SQL 执行失败,打印错误回溯。
"""
print("\n🔴 Error occurred:")
traceback.print_exc()
# 执行 SQL 语句并打印结果
def execute(sql: str):
"""
执行 SQL 语句并打印结果和执行时间。
"""
print_sql(sql)
try:
start = time.time() # 可选:开始时间,用于测量执行时间
cursor.execute(sql)
result = cursor.fetchallarrow() # Arrow 表
df = result.to_pandas() # 可选:转换为 DataFrame 以便更好地显示
print_result(df)
print(f"\n⏱️ Execution time: {time.time() - start:.3f} seconds")
except Exception as e:
print_error(e)
步骤 4. 与 StarRocks 交互
本节将指导你完成一些基本操作,例如创建表、导入数据、检查表结构、设置变量和运行查询。
以下列出的输出示例是基于前述步骤中描述的可选模块和工具函数实现的。
-
创建一个数据库和一个将导入数据的表,并检查表结构。
# Step 1: Drop and create database
print_header("Step 1: Drop and Create Database")
execute("DROP DATABASE IF EXISTS sr_arrow_flight_sql FORCE;")
execute("SHOW DATABASES;")
execute("CREATE DATABASE sr_arrow_flight_sql;")
execute("SHOW DATABASES;")
execute("USE sr_arrow_flight_sql;")
# Step 2: Create table
print_header("Step 2: Create Table")
execute("""
CREATE TABLE sr_arrow_flight_sql_test
(
k0 INT,
k1 DOUBLE,
k2 VARCHAR(32) NULL DEFAULT "" COMMENT "",
k3 DECIMAL(27,9) DEFAULT "0",
k4 BIGINT NULL DEFAULT '10',
k5 DATE
)
DISTRIBUTED BY HASH(k5) BUCKETS 5
PROPERTIES("replication_num" = "1");
""")
execute("SHOW CREATE TABLE sr_arrow_flight_sql_test;")示例输出:
================================================================================
🟢 Step 1: Drop and Create Database
================================================================================
🟡 SQL:
DROP DATABASE IF EXISTS sr_arrow_flight_sql FORCE;
/Users/starrocks/test/venv/lib/python3.9/site-packages/adbc_driver_manager/dbapi.py:307: Warning: Cannot disable autocommit; conn will not be DB-API 2.0 compliant
warnings.warn(
🟢 Result:
StatusResult
0
⏱️ Execution time: 0.025 seconds
🟡 SQL:
SHOW DATABASES;
🟢 Result:
Database
_statistics_
hits
information_schema
sys
⏱️ Execution time: 0.014 seconds
🟡 SQL:
CREATE DATABASE sr_arrow_flight_sql;
🟢 Result:
StatusResult
0
⏱️ Execution time: 0.012 seconds
🟡 SQL:
SHOW DATABASES;
🟢 Result:
Database
_statistics_
hits
information_schema
sr_arrow_flight_sql
sys
⏱️ Execution time: 0.005 seconds
🟡 SQL:
USE sr_arrow_flight_sql;
🟢 Result:
StatusResult
0
⏱️ Execution time: 0.006 seconds
================================================================================
🟢 Step 2: Create Table
================================================================================
🟡 SQL:
CREATE TABLE sr_arrow_flight_sql_test
(
k0 INT,
k1 DOUBLE,
k2 VARCHAR(32) NULL DEFAULT "" COMMENT "",
k3 DECIMAL(27,9) DEFAULT "0",
k4 BIGINT NULL DEFAULT '10',
k5 DATE
)
DISTRIBUTED BY HASH(k5) BUCKETS 5
PROPERTIES("replication_num" = "1");
🟢 Result:
StatusResult
0
⏱️ Execution time: 0.021 seconds
🟡 SQL:
SHOW CREATE TABLE sr_arrow_flight_sql_test;
🟢 Result:
Table Create Table
sr_arrow_flight_sql_test CREATE TABLE `sr_arrow_flight_sql_test` (\n `k0` int(11) NULL COMMENT "",\n `k1` double NULL COMMENT "",\n `k2` varchar(32) NULL DEFAULT "" COMMENT "",\n `k3` decimal(27, 9) NULL DEFAULT "0" COMMENT "",\n `k4` bigint(20) NULL DEFAULT "10" COMMENT "",\n `k5` date NULL COMMENT ""\n) ENGINE=OLAP \nDUPLICATE KEY(`k0`)\nDISTRIBUTED BY HASH(`k5`) BUCKETS 5 \nPROPERTIES (\n"compression" = "LZ4",\n"fast_schema_evolution" = "true",\n"replicated_storage" = "true",\n"replication_num" = "1"\n);
⏱️ Execution time: 0.005 seconds -
插入数据,运行一些查询,并设置变量 。
# Step 3: Insert data
print_header("Step 3: Insert Data")
execute("""
INSERT INTO sr_arrow_flight_sql_test VALUES
(0, 0.1, "ID", 0.0001, 1111111111, '2025-04-21'),
(1, 0.20, "ID_1", 1.00000001, 0, '2025-04-21'),
(2, 3.4, "ID_1", 3.1, 123456, '2025-04-22'),
(3, 4, "ID", 4, 4, '2025-04-22'),
(4, 122345.54321, "ID", 122345.54321, 5, '2025-04-22');
""")
# Step 4: Query data
print_header("Step 4: Query Data")
execute("SELECT * FROM sr_arrow_flight_sql_test ORDER BY k0;")
# Step 5: Session variables
print_header("Step 5: Session Variables")
execute("SHOW VARIABLES LIKE '%query_mem_limit%';")
execute("SET query_mem_limit = 2147483648;")
execute("SHOW VARIABLES LIKE '%query_mem_limit%';")
# Step 6: Aggregation query
print_header("Step 6: Aggregation Query")
execute("""
SELECT k5, SUM(k1) AS total_k1, COUNT(1) AS row_count, AVG(k3) AS avg_k3
FROM sr_arrow_flight_sql_test
GROUP BY k5
ORDER BY k5;
""")示例输出:
================================================================================
🟢 Step 3: Insert Data
================================================================================
🟡 SQL:
INSERT INTO sr_arrow_flight_sql_test VALUES
(0, 0.1, "ID", 0.0001, 1111111111, '2025-04-21'),
(1, 0.20, "ID_1", 1.00000001, 0, '2025-04-21'),
(2, 3.4, "ID_1", 3.1, 123456, '2025-04-22'),
(3, 4, "ID", 4, 4, '2025-04-22'),
(4, 122345.54321, "ID", 122345.54321, 5, '2025-04-22');