高级服务与应用管道技术解析
立即解锁
发布时间: 2025-09-01 00:10:53 阅读量: 21 订阅数: 14 AIGC 


MLOps实战:从零到生产
# 高级服务与应用管道技术解析
## 1. MLRun 服务概述
MLRun serving 提供了丰富的用户界面,它能与平台的其他元素进行原生集成。其主要作用是将模型服务的概念扩展到应用管道的快速交付上,加速 AI 应用的部署。同时,它的无服务器架构能够降低基础设施成本和工程开销,实现持续运营。
在构建 AI 应用时,需要考虑多方面因素:
- **功能集成**:要处理 API 集成、数据丰富化、验证、处理和存储等问题。
- **结果处理**:同一应用往往需要对多个模型的结果进行路由、级联或合并,并执行一个或多个操作。
- **全面监控**:必须对各个方面进行监控,包括资源使用情况、数据、模型性能和应用关键绩效指标(KPI)。
因此,需要围绕应用管道的设计、实现和维护来定义部署目标。在模型开发流程中,作业执行时间或频率可能不是关键因素,但在生产环境中,应用可能需要扩展以处理数千个请求和数 TB 的数据,有时客户端需要即时响应,这就要求更加关注性能和延迟。所以,启用并行性并考虑优化数据管道和模型性能的技术是很有必要的。另外,在某个阶段可能需要升级模型或增强应用管道,但当应用为在线客户端或关键业务服务时,升级并非易事,新模型在生产环境中的表现也可能不同。因此,在向所有客户端推出更改之前,应先进行隔离测试或仅向部分客户端公开最新版本。生产部署应包括实时升级、A/B 测试、故障恢复和回滚的策略和实现。
## 2. 可扩展应用管道的实现
### 2.1 应用管道活动及类型
服务应用管道执行一系列活动,例如拦截事件、丰富和处理数据、使用一个或多个模型进行预测以及返回响应或执行操作。这些活动可以按顺序执行(一个接一个)、并行执行或结合顺序和并行活动。管道可以是同步的(客户端等待响应)或异步的(客户端不等待)。
### 2.2 简单顺序实现示例
简单的顺序实现使用单个进程依次调用不同的活动。以下是一个使用 FastAPI 的顺序应用管道示例:
```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
app = FastAPI()
# Define the prediction request data (json) structure
class PredictRequest(BaseModel):
user: str
# ...
# API to get model endpoint status
@app.get("/")
async def get_status():
return {"model": "my-model", "version": 1.0, "status": "ok"}
# API to process data and make a prediction
@app.post("/predict")
async def predict(req: PredictRequest):
enriched_data = enrich_user(req)
data = pre_process(enriched_data)
prediction = model_predict(data)
return post_process(prediction, req)
def enrich_user(req: PredictRequest):
...
```
该示例的步骤如下:
1. 读取并丰富传入请求。
2. 数据预处理以生成特征向量。
3. 模型预测。
4. 处理模型结果并向客户端返回响应。
### 2.3 分布式架构及并行性
如果想将工作分配到多个微服务或避免包依赖,可以使用主进程通过 REST API 调用执行不同活动(通过单独的微服务实现)。但分布式架构需要处理额外的复杂性,如部分故障、重试、服务认证和跨多个微服务的滚动升级。在本地和分布式架构中,流程通常是顺序和同步的,这可能导致性能较慢。可以通过添加并行性来提高性能,例如主进程可以使用线程或异步方式并行执行多个活动,但这会增加代码的复杂性。
### 2.4 异步或流式管道框架
实现分布式处理、并行性和简单性的一种方法是使用异步或流式管道框架,在其中定义活动图(有向无环图,DAG)。然后,框架使用分布式计算资源执行、扩展和跟踪活动。此外,应用管道作为一个托管服务进行监控、部署和升级。常见的应用管道架构选项如下:
- 同一进程中的顺序活动。
- 使用几个进程结合顺序和并行活动。
- 异步流管道。
### 2.5 常见分布式管道框架
行业中有多种商业和开源的分布式管道框架,这里介绍几种常见的:
| 框架名称 | 特点 | 优势 | 劣势 |
| ---- | ---- | ---- | ---- |
| AWS Step Functions | 执行单个步骤的状态机,步骤可调用无服务器 Lambda 函数或 AWS 服务 | 功能丰富,可动态扩展资源,用户界面出色 | 每个步骤都需要创建新的 Lambda 函数,存在网络和数据序列化的性能开销 |
| Apache Beam | 专注于在线结构化数据处理的开源流处理框架 | 开源、可扩展,包含强大的数据操作符 | 对底层资源和每个步骤的包的灵活性和控制不如 AWS Step Functions |
| MLRun serving graphs | 结合了 AWS Step Functions 的多功能性和 Apache Beam 的并行数据处理能力 | 增加了机器学习和深度学习功能及内置组件,易于构建和调试可扩展管道 | - |
下面详细介绍这几种框架:
#### 2.5.1 AWS Step Functions
AWS Step Functions 是一个工作流服务,它执行单个步骤的状态机。步骤可以调用无服务器 Lambda 函数或 AWS 服务,该服务控制工作流的执行,其图形控制台将应用的工作流显示为一系列事件驱动的步骤。它有两种工作流类型:
- **标准工作流**:适用于长时间运行、可审计的工作流,因为它们显示执行历史记录和可视化调试信息。
- **快速工作流**:适用于高事件率的工作负载,如流数据处理和物联网数据摄取。快速工作流可以是同步的(等待工作流完成后返回结果)或异步的(不等待工作流完成)。
要构建具有数据准备和模型服务步骤的 ML 应用管道,需要先为每个步骤创建无服务器 Lambda 函数。以下是一个 TensorFlow 模型服务 Lambda 函数示例:
```python
import json
import boto3
import tensorflow.keras.models as models
s3 = boto3.client('s3')
# Load the Keras model from S3 during initialization
model_path = '<your-s3-bucket>/path/to/model.h5'
response = s3.get_object(Bucket='<your-s3-bucket>', Key=model_path)
model_bytes = response['Body'].read()
model = models.load_model(model_bytes)
def lambda_handler(event, context):
# Load input data from event
input_data = event['input_data']
# Make predictions using the preloaded model
predictions = model.predict(input_data)
# Return predictions as JSON
return json.dumps(predictions.tolist())
```
创建好所有函数后,可以定义多阶段工作流。以下是一个简单的异步管道示例,包含数据准备、模型预测和后处理:
```python
from stepfunctions.steps import LambdaStep, PassStep, Chain
from stepfunctions.workflow import Workflow
from stepfunctions.inputs import ExecutionInput
def create_workflow(input_data):
# Define the workflow using the stepfunctions library
with Workflow('MyWorkflow') as workflow:
# Define the Lambda function to preprocess the data
preprocess_data_step = LambdaStep(
'PreprocessData',
parameters={
'FunctionName': '<function-arn>',
'Payload': ExecutionInput(input=input_data)
}
)
# Define the Lambda function to load and run the model
run_model_step = LambdaStep(
'RunModel',
parameters={
'FunctionName': '<function-arn>',
'Payload': ExecutionInput(
input=preprocess_data_step.output()['Payload'])
}
)
# Data post processing
```
0
0
复制全文
相关推荐









