feat(ZsyhCbsProcessor): 实现分页查询并合并结果

- 添加分页查询逻辑,支持多页数据的请求和合并
- 使用Jackson库处理JSON数据,提高代码可读性和维护性
- 优化错误处理,增加详细的日志记录和异常信息- 改进代码结构,提高可扩展性和重用性
This commit is contained in:
liuy 2025-09-10 18:38:05 +08:00
parent d44e73556f
commit ca263155c0
1 changed files with 88 additions and 33 deletions

View File

@ -1,5 +1,9 @@
package com.hzya.frame;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.hzya.frame.util.ApiTokenManager;
import com.hzya.frame.util.Constants;
import com.hzya.frame.util.CbsApiHandler;
@ -21,6 +25,7 @@ import org.bouncycastle.jce.provider.BouncyCastleProvider;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.Security;
import java.util.*;
@ -96,6 +101,7 @@ public class ZsyhCbsProcessor extends AbstractProcessor {
ApiTokenManager.getInstance().configure(context.getProperty(PROP_TOKEN_URL).getValue(), context.getProperty(PROP_REFRESH_URL).getValue(), context.getProperty(PROP_APP_ID).getValue(), context.getProperty(PROP_APP_SECRET).getValue(), getLogger());
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
@ -103,52 +109,101 @@ public class ZsyhCbsProcessor extends AbstractProcessor {
return;
}
CloseableHttpClient client = HttpClients.custom().disableContentCompression().build();
// 使用Jackson库来处理JSON
final ObjectMapper objectMapper = new ObjectMapper();
// 用于累加所有分页查询结果的列表
List<JsonNode> allRecords = new ArrayList<>();
try {
// 读取FlowFile内容作为业务请求体
// byte[] requestDataBytes = IOUtils.toByteArray(session.read(flowFile));
byte[] requestDataBytes;
try (InputStream in = session.read(flowFile)) {
requestDataBytes = IOUtils.toByteArray(in);
}
//读取初始FlowFile内容作为后续分页请求的模板
String initialRequestJson = IOUtils.toString(session.read(flowFile), StandardCharsets.UTF_8);
ObjectNode requestTemplate = (ObjectNode) objectMapper.readTree(initialRequestJson);
//从处理器属性中获取所有动态配置
String targetUrl = context.getProperty(PROP_TARGET_URL).getValue();
String bodyEncryptionKey = context.getProperty(PROP_BODY_ENCRYPTION_KEY).getValue();
String signPrivateKey = context.getProperty(PROP_SIGN_PRIVATE_KEY).getValue();
String bodyDecryptionKey = context.getProperty(PROP_BODY_DECRYPTION_KEY).getValue();
// 获取初始页码
int currentPage = requestTemplate.has("currentPage") ? requestTemplate.get("currentPage").asInt() : 1;
//从Token管理器获取有效Token
String bearerToken = ApiTokenManager.getInstance().getValidToken();
String token = bearerToken.startsWith(Constants.BEARER) ? bearerToken.substring(Constants.BEARER.length()) : bearerToken;
//开始分页循环
while (true) {
getLogger().info("正在请求第 {} 页数据...", currentPage);
//实例化业务请求类并注入所有配置
CbsApiHandler sm2Example = new CbsApiHandler(targetUrl, token, bodyEncryptionKey, signPrivateKey, bodyDecryptionKey, requestDataBytes);
//更新请求模板中的当前页码
requestTemplate.put("currentPage", currentPage);
byte[] requestDataBytes = objectMapper.writeValueAsBytes(requestTemplate);
//调用业务类的方法完成请求准备发送和响应处理
HttpPost httpPost = sm2Example.setupRequest();
try (CloseableHttpResponse response = client.execute(httpPost)) {
//执行单次API请求
byte[] decryptedResponseBytes;
CloseableHttpClient client = HttpClients.custom().disableContentCompression().build();
try {
String targetUrl = context.getProperty(PROP_TARGET_URL).getValue();
String bodyEncryptionKey = context.getProperty(PROP_BODY_ENCRYPTION_KEY).getValue();
String signPrivateKey = context.getProperty(PROP_SIGN_PRIVATE_KEY).getValue();
String bodyDecryptionKey = context.getProperty(PROP_BODY_DECRYPTION_KEY).getValue();
byte[] finalResponseData = sm2Example.handleResponse(response);
String finalResponseDataStr = new String(finalResponseData);
String bearerToken = ApiTokenManager.getInstance().getValidToken();
String token = bearerToken.startsWith(Constants.BEARER) ? bearerToken.substring(Constants.BEARER.length()) : bearerToken;
if (response.getStatusLine().getStatusCode() >= 300) {
throw new IOException("HTTP请求失败状态码: " + response.getStatusLine().getStatusCode() + " 失败详情:" + finalResponseDataStr);
CbsApiHandler apiHandler = new CbsApiHandler(targetUrl, token, bodyEncryptionKey, signPrivateKey, bodyDecryptionKey, requestDataBytes);
HttpPost httpPost = apiHandler.setupRequest();
try (CloseableHttpResponse response = client.execute(httpPost)) {
if (response.getStatusLine().getStatusCode() >= 300) {
String errorBody = new String(IOUtils.toByteArray(response.getEntity().getContent()), StandardCharsets.UTF_8);
throw new IOException("HTTP请求失败页码: " + currentPage + ", 状态码: " + response.getStatusLine().getStatusCode() + ", 详情: " + errorBody);
}
decryptedResponseBytes = apiHandler.handleResponse(response);
}
} finally {
client.close();
}
flowFile = session.write(flowFile, out -> out.write(finalResponseData));
session.transfer(flowFile, REL_SUCCESS);
//解析解密后的响应体
if (decryptedResponseBytes == null || decryptedResponseBytes.length == 0) {
getLogger().info("第 {} 页响应体为空,认定为分页结束。", currentPage);
break;
}
JsonNode responseJson = objectMapper.readTree(new String(decryptedResponseBytes, StandardCharsets.UTF_8));
// 检查业务返回码
String code = responseJson.path("code").asText();
if (!"0".equals(code)) {
throw new ProcessException("API业务错误页码: " + currentPage + ", 错误码: " + code + ", 消息: " + responseJson.path("msg").asText());
}
//提取当页的记录列表
JsonNode listNode = responseJson.path("data").path("list");
//判断循环终止条件
if (!listNode.isArray() || listNode.size() == 0) {
getLogger().info("第 {} 页返回的记录列表为空,认定为分页结束。", currentPage);
break; // 如果list为空或不存在则结束循环
}
//累加结果
for (JsonNode record : listNode) {
allRecords.add(record);
}
//准备请求下一页
currentPage++;
}
//所有分页成功后合并最终结果
ObjectNode finalResult = objectMapper.createObjectNode();
ArrayNode finalArray = objectMapper.valueToTree(allRecords);
finalResult.set("allRecords", finalArray); // 将所有记录放入一个名为"allRecords"的数组中
finalResult.put("totalFetched", allRecords.size());
//将合并后的JSON写入FlowFile并路由到success
flowFile = session.write(flowFile, out -> objectMapper.writerWithDefaultPrettyPrinter().writeValue(out, finalResult));
flowFile = session.putAttribute(flowFile, "api.total.records.fetched", String.valueOf(allRecords.size()));
session.transfer(flowFile, REL_SUCCESS);
} catch (Exception e) {
getLogger().error("调用招行CBS接口失败: {}", e.getMessage(), e);
getLogger().error("调用招行CBS接口并处理分页时失败: {}", e.getMessage(), e);
flowFile = session.putAttribute(flowFile, "api.error", e.getMessage());
session.transfer(session.penalize(flowFile), REL_FAILURE);
} finally {
try {
client.close();
} catch (IOException e) {
getLogger().warn("关闭HttpClient时出错: {}", e.getMessage(), e);
}
}
}
}