This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
模型推理
模型推理 #
StreamingFlink SQL 提供了 ML_PREDICT
表值函数(TVF)来在 SQL 查询中执行模型推理。该函数允许您直接在 SQL 中对数据流应用机器学习模型。
请参阅模型创建了解如何创建模型。
ML_PREDICT 函数 #
ML_PREDICT
函数接收一个表输入,对其应用模型,并返回一个包含模型预测结果的新表。当底层模型允许时,该函数提供同步/异步推理模式支持。
语法 #
SELECT * FROM
ML_PREDICT(
TABLE input_table,
MODEL model_name,
DESCRIPTOR(feature_columns),
[CONFIG => MAP['key', 'value']]
)
参数 #
input_table
:包含待处理数据的输入表model_name
:用于推理的模型名称feature_columns
:指定输入表中哪些列应作为模型特征的描述符config
:(可选)模型推理的配置选项映射
配置选项 #
以下配置选项可以在配置映射中指定:
Key | Default | Type | Description |
---|---|---|---|
async |
(none) | Boolean | Value can be 'true' or 'false' to suggest the planner choose the corresponding predict function. If the backend predict function provider does not support the suggested mode, it will throw exception to notify users. |
max-concurrent-operations |
(none) | Integer | The max number of async i/o operation that the async ml predict can trigger. |
output-mode |
(none) | Enum |
Output mode for asynchronous operations which will convert to {@see AsyncDataStream.OutputMode}, ORDERED by default. If set to ALLOW_UNORDERED, will attempt to use {@see AsyncDataStream.OutputMode.UNORDERED} when it does not affect the correctness of the result, otherwise ORDERED will be still used. Possible values:
|
timeout |
(none) | Duration | Timeout from first invoke to final completion of asynchronous operation, may include multiple retries, and will be reset in case of failover. |
示例 #
-- 基本用法
SELECT * FROM ML_PREDICT(
TABLE input_table,
MODEL my_model,
DESCRIPTOR(feature1, feature2)
);
-- 使用配置选项
SELECT * FROM ML_PREDICT(
TABLE input_table,
MODEL my_model,
DESCRIPTOR(feature1, feature2),
MAP['async', 'true', 'timeout', '100s']
);
-- 使用命名参数
SELECT * FROM ML_PREDICT(
INPUT => TABLE input_table,
MODEL => MODEL my_model,
ARGS => DESCRIPTOR(feature1, feature2),
CONFIG => MAP['async', 'true']
);
输出 #
输出表包含输入表的所有列以及模型的预测列。预测列根据模型的输出模式添加。
注意事项 #
- 在使用
ML_PREDICT
之前,模型必须在目录中注册。 - 描述符中指定的特征列数量必须与模型的输入模式匹配。
- 如果输出中的列名与输入表中的现有列名冲突,将在输出列名中添加索引以避免冲突。例如,如果输出列名为
prediction
,且输入表中已存在同名列,则输出列将被重命名为prediction0
。 - 对于异步推理,模型提供者必须支持
AsyncPredictRuntimeProvider
接口。 ML_PREDICT
仅支持仅追加表。不支持 CDC(变更数据捕获)表,因为ML_PREDICT
的结果是非确定性的。
模型提供者 #
ML_PREDICT
函数使用 ModelProvider
执行实际的模型推理。提供者根据注册模型时指定的提供者标识符进行查找。有两种类型的模型提供者:
-
PredictRuntimeProvider
:用于同步模型推理- 实现
createPredictFunction
方法以创建同步预测函数 - 当配置中
async
设置为false
时使用
- 实现
-
AsyncPredictRuntimeProvider
:用于异步模型推理- 实现
createAsyncPredictFunction
方法以创建异步预测函数 - 当配置中
async
设置为true
时使用 - 需要额外的超时和缓冲区容量配置
- 实现
如果配置中未设置 async
,系统将选择同步或异步模型提供者,如果两者都存在,则优先使用异步模型提供者。
错误处理 #
在以下情况下,函数将抛出异常:
- 模型中不存在于目录中
- 特征列数量与模型的输入模式不匹配
- 缺少模型参数
- 提供的参数过少或过多
性能考虑 #
- 对于高吞吐量场景,考虑使用异步推理模式。
- 为异步推理配置适当的超时和缓冲区容量值。
- 函数的性能取决于底层模型提供者的实现。