@PostMapping("/pushFlow") public ResResult pushFlow(@RequestParam("signature") String signature, @RequestParam("text") String text, @RequestParam("audio_file") MultipartFile audio_file, @RequestParam("break_stream") Integer break_stream, @RequestParam("in_queue_action") String in_queue_action, @RequestParam("auto_slice") Integer auto_slice) { try { //调用接口返回信息 Map map = digitalEngineService.pushFlow(signature, text, audio_file, break_stream, in_queue_action, auto_slice); if (StatusEnum.SUCCESS.code == Integer.valueOf(map.get("code").toString())) { log.info("调用数字人引擎-实时推流接口-请求成功:{}", map); } else { log.error("调用数字人引擎-实时推流接口-请求失败:{}", map); } return new ResResult(Integer.valueOf(map.get("code").toString()), String.valueOf(map.get("msg")), map.get("data")); } catch (Exception e) { log.error("调用数字人引擎-实时推流接口-请求失败:{}", e.getStackTrace()); e.printStackTrace(); } return ResResult.error(); } public Map pushFlow(String signature, String text, MultipartFile audio_file, Integer break_stream, String in_queue_action, Integer auto_slice) { log.info("调用数字人引擎-实时推流接口-请求参数:{}", signature, text, audio_file, break_stream, in_queue_action, auto_slice); //返回信息 Map resultMap = new HashMap<>(); Map map = new HashMap<>(); resultMap.put("code", StatusEnum.SUCCESS.code); resultMap.put("msg", "Success"); resultMap.put("data", map); //创建连接池管理器 PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager(); //最大连接数 connManager.setMaxTotal(200); //每个路由最大连接数 connManager.setDefaultMaxPerRoute(50); CloseableHttpClient client = null; try { //数字人引擎实时推流接口地址 String pushFlowUrl = baseurl + "/api/v1/live/ask"; log.info("调用数字人引擎-实时推流接口-请求地址-pushFlowUrl:{}", pushFlowUrl); //配置超时参数 RequestConfig requestConfig = RequestConfig.custom() .setConnectTimeout(3000) // 连接超时 3s .setSocketTimeout(5000) // 数据传输超时 5s .setConnectionRequestTimeout(2000) // 从池中获取连接超时 2s .build(); client = HttpClients.custom() .setDefaultRequestConfig(requestConfig) .setConnectionManager(connManager) .build(); HttpPost request = new HttpPost(pushFlowUrl); //请求头添加 token request.setHeader("Authorization", token); log.info("调用数字人引擎-实时推流接口-请求参数-token:{}", token); //检查文件是否为空 /*if (audio_file.isEmpty()) { resultMap.put("code", StatusEnum.sysError.code); resultMap.put("msg", "音频文件不能为空"); return resultMap; }*/ //将文件内容转为字节数组 byte[] audioBytes = audio_file.getBytes(); //使用Base64编码 String base64Audio = Base64.getEncoder().encodeToString(audioBytes); //构建Multipart请求体 HttpEntity multipartEntity = MultipartEntityBuilder.create() // 添加文本参数 .addTextBody("signature", signature, ContentType.TEXT_PLAIN) .addTextBody("text", text, ContentType.TEXT_PLAIN) .addTextBody("audio_file", base64Audio, ContentType.TEXT_PLAIN) .addTextBody("break_stream", break_stream.toString(), ContentType.TEXT_PLAIN) .addTextBody("in_queue_action", in_queue_action, ContentType.TEXT_PLAIN) .addTextBody("auto_slice", auto_slice.toString(), ContentType.TEXT_PLAIN) .build(); //设置请求体 request.setEntity(multipartEntity); log.info("调用数字人引擎-实时推流接口-请求参数-multipartEntity:{}", multipartEntity); //发送请求 CloseableHttpResponse response = client.execute(request); log.info("调用数字人引擎-实时推流接口-Status:{}", response.getStatusLine()); //响应code int statusCode = response.getStatusLine().getStatusCode(); log.info("调用数字人引擎-实时推流接口-statusCode:{}", statusCode); if (200 == statusCode) { String responseBody = EntityUtils.toString(response.getEntity(), "UTF-8"); log.info("调用数字人引擎-实时推流接口-responseBody:{}", responseBody); JSONObject parseObject = JSONObject.parseObject(responseBody); log.info("调用数字人引擎-实时推流接口-parseObject:{}", parseObject); Integer code = parseObject.getInteger("code"); log.info("调用数字人引擎-实时推流接口-code:{}", code); String msg = parseObject.getString("msg"); log.info("调用数字人引擎-实时推流接口-msg:{}", msg); JSONObject data = parseObject.getJSONObject("data"); log.info("调用数字人引擎-实时推流接口-data:{}", data); resultMap.put("code", code); resultMap.put("msg", msg); resultMap.put("data", data); if (200 == code) { //当前请求对话id Integer ask_id = data.getInteger("ask_id"); log.info("调用数字人引擎-实时推流接口-ask_id:{}", ask_id); map.put("ask_id", ask_id); resultMap.put("data", map); } } else { resultMap.put("code", statusCode); resultMap.put("msg", response.getStatusLine()); } response.close(); } catch (IOException e) { log.error("调用数字人引擎-实时推流接口失败", e.getStackTrace()); resultMap.put("code", StatusEnum.sysError.code); resultMap.put("msg", "Error"); e.printStackTrace(); } finally { try { client.close(); } catch (Exception e) { e.printStackTrace(); } } return resultMap; },这两段代码的内容为使用form-data格式请求数据请求系统外部接口,使用JSON格式响应接收响应数据,现如何调整为流式请求数据请求系统外部接口,流式响应接收响应数据,请求参数和响应参数保持不变?
时间: 2025-07-20 14:16:42 浏览: 8
<think>我们有一个需求:将现有的基于form-data的HTTP请求方式改为流式请求,并保持参数不变,同时接收流式响应。
原始请求是使用form-data格式(multipart/form-data)发送的,现在我们要改为流式请求(即使用流式上传,并且接收流式响应)。
根据引用[2]:发送大文件时建议将请求做成数据流,但默认下Requests不支持流式上传,可以使用第三方包requests-toolbelt(本质还是multipart/form-data上传)。
同时,引用[3]提到,表单的编码类型默认是application/x-www-form-urlencoded,但文件上传通常使用multipart/form-data。
我们的目标:
1. 保持请求参数不变(即仍然是multipart/form-data格式,因为可能有文件或其他字段)
2. 改为流式上传(即边生成数据边上传,而不是一次性读取所有数据到内存)
3. 接收流式响应(即边接收响应边处理,而不是等待整个响应完成)
步骤:
1. 使用requests-toolbelt来实现流式上传multipart/form-data数据。
2. 使用requests的流式响应功能(stream=True)来处理响应。
但是,注意:requests-toolbelt的MultipartEncoder可以用于流式上传,因为它可以将数据分块编码,避免一次性加载到内存。
示例代码结构:
原始代码可能是这样的(使用form-data,非流式):
```python
import requests
url = 'http://example.com/upload'
files = {'file': open('largefile.bin', 'rb')}
data = {'key1': 'value1', 'key2': 'value2'}
response = requests.post(url, files=files, data=data)
```
改为流式上传和流式响应:
```python
import requests
from requests_toolbelt.multipart.encoder import MultipartEncoder
from requests_toolbelt import MultipartEncoderMonitor
# 创建一个生成器函数来读取文件,这样我们可以控制每次读取的块大小
# 但实际上,MultipartEncoder内部会处理分块
# 定义表单数据
data = {
'key1': 'value1',
'key2': 'value2',
'file': ('largefile.bin', open('largefile.bin', 'rb'), 'application/octet-stream')
}
# 创建MultipartEncoder对象,用于分块编码
encoder = MultipartEncoder(fields=data)
# 为了监控上传进度,可以添加回调函数(可选)
def my_callback(monitor):
# 这里可以打印上传进度
print(f"Uploaded {monitor.bytes_read} bytes")
# 创建监视器
monitor = MultipartEncoderMonitor(encoder, my_callback)
# 设置请求头,注意:必须指定Content-Type,并且要带上boundary
headers = {'Content-Type': monitor.content_type}
# 发送请求,设置stream=True以支持流式响应
response = requests.post(url, data=monitor, headers=headers, stream=True)
# 处理流式响应
for chunk in response.iter_content(chunk_size=1024):
# 处理每一块响应数据,例如写入文件或直接处理
# 注意:iter_content会自动解码gzip和deflate编码,除非设置decode_content=False
process_chunk(chunk)
```
但是,注意:上面的代码中,文件是使用`open`一次性读入的?实际上,`MultipartEncoder`在初始化时并不会立即读取整个文件,而是在迭代时按需读取。所以对于大文件,这样是可行的。
然而,引用[2]提到requests-toolbelt支持流式上传,正是通过这种方式。
另外,如果我们的参数中不包含文件,而只是一些字段,那么我们可以考虑使用其他方式,比如分块传输编码(chunked transfer encoding),但这里要求保持参数不变(即仍然使用multipart/form-data),所以我们必须使用multipart格式。
重要提示:如果我们的请求参数中包含大文件,使用这种方式可以避免内存溢出,因为数据是分块读取和发送的。
关于接收流式响应:
我们设置`stream=True`,然后使用`response.iter_content()`来迭代读取响应内容。注意,`iter_content`的参数`chunk_size`可以控制每次迭代返回的字节数(近似值,因为可能会受到网络数据包的影响)。
但是,注意:如果服务器返回的是流式响应(例如,实时生成的音频或视频流),那么我们就可以边接收边处理。
另外,引用[5]提供了一个流式响应的例子(TTS语音合成),它使用`response.iter_content(chunk_size=None)`来逐块读取数据(chunk_size=None表示使用响应中定义的块大小或按需读取)。
总结:
1. 安装requests-toolbelt:`pip install requests-toolbelt`
2. 使用MultipartEncoder来构建multipart/form-data请求体,并利用它进行流式上传。
3. 设置请求头Content-Type为encoder.content_type(或monitor.content_type)。
4. 在requests.post中设置data=monitor(或encoder)和stream=True。
5. 使用iter_content处理流式响应。
注意:如果不需要上传进度,可以不用MultipartEncoderMonitor,直接使用encoder即可。
示例代码(简化版):
```python
import requests
from requests_toolbelt.multipart.encoder import MultipartEncoder
url = 'http://example.com/upload'
# 构建表单数据,如果有文件,使用元组格式:('filename', file_obj, 'content_type')
# 如果没有文件,可以直接用字典,但注意:即使没有文件,我们仍然用MultipartEncoder来构建multipart格式
data = {
'field1': 'value1',
'field2': 'value2',
'file_field': ('filename.bin', open('filename.bin', 'rb'), 'application/octet-stream')
}
encoder = MultipartEncoder(fields=data)
headers = {'Content-Type': encoder.content_type}
# 发送流式请求
response = requests.post(url, data=encoder, headers=headers, stream=True)
# 检查响应状态
if response.status_code == 200:
# 处理流式响应
for chunk in response.iter_content(chunk_size=1024):
# 处理每个chunk,例如写入文件或进行其他处理
with open('output.bin', 'ab') as f:
f.write(chunk)
else:
print(f"Request failed with status code: {response.status_code}")
```
注意:文件字段的格式是元组,元组第一个元素是文件名,第二个是文件对象,第三个是文件类型(可选,默认会尝试猜测)。
如果字段值不是文件,直接使用字符串即可。
另外,如果文件很大,建议使用生成器来读取文件,以避免内存问题,但MultipartEncoder内部已经处理了,它每次只读取一部分。
最后,记得关闭打开的文件。上面的代码中,文件在encoder中打开,但encoder在请求发送完成后会关闭文件吗?查看文档,实际上MultipartEncoder在迭代完成(即发送完成)后不会自动关闭文件,所以我们需要手动关闭。但我们可以通过使用上下文管理器来确保文件关闭。
改进:我们可以自己管理文件对象,并在发送后关闭。
例如:
```python
with open('filename.bin', 'rb') as f:
data = {
'field1': 'value1',
'field2': 'value2',
'file_field': ('filename.bin', f, 'application/octet-stream')
}
encoder = MultipartEncoder(fields=data)
headers = {'Content-Type': encoder.content_type}
response = requests.post(url, data=encoder, headers=headers, stream=True)
# ... 处理响应
# 文件在离开with块后关闭
```
但是注意:encoder在发送请求时才会读取文件,所以必须确保在with块内发送请求。因为一旦离开with块,文件就会关闭,而发送请求时读取会失败。
因此,我们可以这样:
```python
files = []
try:
data_dict = {}
# 添加普通字段
data_dict['field1'] = 'value1'
data_dict['field2'] = 'value2'
# 添加文件字段
f = open('filename.bin', 'rb')
files.append(f) # 记录打开的文件,以便后续关闭
data_dict['file_field'] = ('filename.bin', f, 'application/octet-stream')
encoder = MultipartEncoder(fields=data_dict)
headers = {'Content-Type': encoder.content_type}
response = requests.post(url, data=encoder, headers=headers, stream=True)
# 处理响应...
finally:
for f in files:
f.close()
```
或者,我们可以使用一个自定义的上下文管理器来管理多个文件。
如果只有一个文件,使用with语句即可。
综上所述,我们通过使用requests-toolbelt的MultipartEncoder实现了multipart/form-data的流式上传,并通过requests的stream=True实现了流式响应处理。
注意:服务器必须支持流式上传和流式响应,否则可能无法正常工作。
相关问题:
1. 如何监控流式上传的进度?
2. 如果请求参数中没有文件,只有普通字段,是否还需要使用multipart/form-data格式?能否使用其他方式实现流式上传?
3. 如何处理流式响应中的错误?
4. 在流式上传过程中如果中断,如何实现断点续传?
5. 除了multipart/form-data,还有哪些数据格式支持流式上传?</think>要将基于 form-data 的 HTTP 请求改为流式请求(同时保持参数不变)并处理流式响应,需结合流式上传和流式响应处理技术。以下是实现方案:
### 解决方案
```python
import requests
from requests_toolbelt.multipart.encoder import MultipartEncoder
from requests_toolbelt import MultipartEncoderMonitor
# 原始请求参数(保持参数不变)
params = {
"text": "3D视频的USD格式...", # 文本参数
"speaker": "中文男", # 配置参数
"file": ("audio.wav", open("audio.wav", "rb"), "audio/wav") # 文件参数
}
# 创建流式编码器
encoder = MultipartEncoder(fields=params)
# 创建流式监视器(可选:用于跟踪上传进度)
def callback(monitor):
print(f"已上传: {monitor.bytes_read} bytes")
monitor = MultipartEncoderMonitor(encoder, callback)
# 设置请求头(关键!)
headers = {
"Content-Type": monitor.content_type, # 动态获取boundary
"Accept": "application/octet-stream" # 根据实际响应类型调整
}
# 发送流式请求并接收流式响应
url = "http://your-api-endpoint.com/stream"
with requests.post(url, data=monitor, headers=headers, stream=True) as response:
if response.status_code == 200:
# 处理流式响应
for chunk in response.iter_content(chunk_size=1024):
if chunk: # 过滤keep-alive空块
process_chunk(chunk) # 自定义处理函数
else:
print(f"请求失败,状态码: {response.status_code}")
```
### 关键修改说明
1. **流式上传实现**:
- 使用 `MultipartEncoder` 将参数编码为流式格式,避免一次性加载大文件到内存[^2]
- `MultipartEncoderMonitor` 可选用于监控上传进度
- 保持原始参数结构不变(文本/文件/配置参数)
2. **流式响应处理**:
- 设置 `stream=True` 启用响应流
- 使用 `iter_content()` 按数据块迭代处理响应
- 使用 `with` 语句确保资源释放
3. **请求头配置**:
- `Content-Type` 必须设为 `multipart/form-data; boundary=...`(由编码器动态生成)
- `Accept` 头需匹配服务器的流式响应格式(如音频流、JSON流等)
### 注意事项
1. **文件资源管理**:
```python
# 建议显式关闭文件(或使用上下文管理器)
file = open("audio.wav", "rb")
try:
params["file"] = ("audio.wav", file, "audio/wav")
# ...发送请求...
finally:
file.close()
```
2. **响应处理优化**:
```python
# 示例:实时保存音频流
with open("output.wav", "wb") as f:
for chunk in response.iter_content(4096):
f.write(chunk)
```
3. **错误处理**:
```python
response.raise_for_status() # 检查HTTP错误
if "application/json" in response.headers.get("Content-Type", ""):
error = response.json() # 处理错误JSON响应
```
### 备选方案:纯 requests 流式上传
若不想引入第三方库,可使用生成器实现基础流式上传:
```python
def generate_stream():
yield encoder.to_string() # 注意:可能占用较多内存
requests.post(url, data=generate_stream(), headers=headers, stream=True)
```
[^1]: 保持参数结构不变需使用 `MultipartEncoder` 封装原始数据
[^2]: 流式上传通过分块传输减少内存占用,特别适合大文件
[^3]: 表单编码类型必须设为 `multipart/form-data` 以兼容原始接口
[^4]: 流式响应处理需按数据块迭代而非一次性加载
[^5]: 实时音频流等场景需匹配正确的采样率参数
---
### 相关问题
1. **如何监控流式上传/下载的实时进度?**
2. **当服务器返回分块编码(chunked)响应时,如何处理不完整的JSON数据?**
3. **流式请求中如何实现超时和重试机制?**
4. **多文件流式上传时如何优化内存管理?**
5. **哪些场景下流式请求不适合使用?**
阅读全文