diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/ZsyhCbsProcessor.java b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/ZsyhCbsProcessor.java index 2bd0ad2..e9a945f 100644 --- a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/ZsyhCbsProcessor.java +++ b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/ZsyhCbsProcessor.java @@ -1,5 +1,9 @@ package com.hzya.frame; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.hzya.frame.util.ApiTokenManager; import com.hzya.frame.util.Constants; import com.hzya.frame.util.CbsApiHandler; @@ -21,6 +25,7 @@ import org.bouncycastle.jce.provider.BouncyCastleProvider; import java.io.IOException; import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.security.Security; import java.util.*; @@ -96,6 +101,7 @@ public class ZsyhCbsProcessor extends AbstractProcessor { ApiTokenManager.getInstance().configure(context.getProperty(PROP_TOKEN_URL).getValue(), context.getProperty(PROP_REFRESH_URL).getValue(), context.getProperty(PROP_APP_ID).getValue(), context.getProperty(PROP_APP_SECRET).getValue(), getLogger()); } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { FlowFile flowFile = session.get(); @@ -103,52 +109,101 @@ public class ZsyhCbsProcessor extends AbstractProcessor { return; } - CloseableHttpClient client = HttpClients.custom().disableContentCompression().build(); + // 使用Jackson库来处理JSON + final ObjectMapper objectMapper = new ObjectMapper(); + + // 用于累加所有分页查询结果的列表 + List allRecords = new ArrayList<>(); + try { - // 读取FlowFile内容,作为业务请求体 -// byte[] requestDataBytes = IOUtils.toByteArray(session.read(flowFile)); - byte[] requestDataBytes; - try (InputStream in = session.read(flowFile)) { - requestDataBytes = IOUtils.toByteArray(in); - } + //读取初始FlowFile内容,作为后续分页请求的模板 + String initialRequestJson = IOUtils.toString(session.read(flowFile), StandardCharsets.UTF_8); + ObjectNode requestTemplate = (ObjectNode) objectMapper.readTree(initialRequestJson); - //从处理器属性中获取所有动态配置 - String targetUrl = context.getProperty(PROP_TARGET_URL).getValue(); - String bodyEncryptionKey = context.getProperty(PROP_BODY_ENCRYPTION_KEY).getValue(); - String signPrivateKey = context.getProperty(PROP_SIGN_PRIVATE_KEY).getValue(); - String bodyDecryptionKey = context.getProperty(PROP_BODY_DECRYPTION_KEY).getValue(); + // 获取初始页码 + int currentPage = requestTemplate.has("currentPage") ? requestTemplate.get("currentPage").asInt() : 1; - //从Token管理器获取有效Token - String bearerToken = ApiTokenManager.getInstance().getValidToken(); - String token = bearerToken.startsWith(Constants.BEARER) ? bearerToken.substring(Constants.BEARER.length()) : bearerToken; + //开始分页循环 + while (true) { + getLogger().info("正在请求第 {} 页数据...", currentPage); - //实例化业务请求类,并注入所有配置 - CbsApiHandler sm2Example = new CbsApiHandler(targetUrl, token, bodyEncryptionKey, signPrivateKey, bodyDecryptionKey, requestDataBytes); + //更新请求模板中的当前页码 + requestTemplate.put("currentPage", currentPage); + byte[] requestDataBytes = objectMapper.writeValueAsBytes(requestTemplate); - //调用业务类的方法,完成请求准备、发送和响应处理 - HttpPost httpPost = sm2Example.setupRequest(); - try (CloseableHttpResponse response = client.execute(httpPost)) { + //执行单次API请求 + byte[] decryptedResponseBytes; + CloseableHttpClient client = HttpClients.custom().disableContentCompression().build(); + try { + String targetUrl = context.getProperty(PROP_TARGET_URL).getValue(); + String bodyEncryptionKey = context.getProperty(PROP_BODY_ENCRYPTION_KEY).getValue(); + String signPrivateKey = context.getProperty(PROP_SIGN_PRIVATE_KEY).getValue(); + String bodyDecryptionKey = context.getProperty(PROP_BODY_DECRYPTION_KEY).getValue(); - byte[] finalResponseData = sm2Example.handleResponse(response); - String finalResponseDataStr = new String(finalResponseData); + String bearerToken = ApiTokenManager.getInstance().getValidToken(); + String token = bearerToken.startsWith(Constants.BEARER) ? bearerToken.substring(Constants.BEARER.length()) : bearerToken; - if (response.getStatusLine().getStatusCode() >= 300) { - throw new IOException("HTTP请求失败,状态码: " + response.getStatusLine().getStatusCode() + " 失败详情:" + finalResponseDataStr); + CbsApiHandler apiHandler = new CbsApiHandler(targetUrl, token, bodyEncryptionKey, signPrivateKey, bodyDecryptionKey, requestDataBytes); + + HttpPost httpPost = apiHandler.setupRequest(); + try (CloseableHttpResponse response = client.execute(httpPost)) { + if (response.getStatusLine().getStatusCode() >= 300) { + String errorBody = new String(IOUtils.toByteArray(response.getEntity().getContent()), StandardCharsets.UTF_8); + throw new IOException("HTTP请求失败,页码: " + currentPage + ", 状态码: " + response.getStatusLine().getStatusCode() + ", 详情: " + errorBody); + } + decryptedResponseBytes = apiHandler.handleResponse(response); + } + } finally { + client.close(); } - flowFile = session.write(flowFile, out -> out.write(finalResponseData)); - session.transfer(flowFile, REL_SUCCESS); + //解析解密后的响应体 + if (decryptedResponseBytes == null || decryptedResponseBytes.length == 0) { + getLogger().info("第 {} 页响应体为空,认定为分页结束。", currentPage); + break; + } + + JsonNode responseJson = objectMapper.readTree(new String(decryptedResponseBytes, StandardCharsets.UTF_8)); + + // 检查业务返回码 + String code = responseJson.path("code").asText(); + if (!"0".equals(code)) { + throw new ProcessException("API业务错误,页码: " + currentPage + ", 错误码: " + code + ", 消息: " + responseJson.path("msg").asText()); + } + + //提取当页的记录列表 + JsonNode listNode = responseJson.path("data").path("list"); + + //判断循环终止条件 + if (!listNode.isArray() || listNode.size() == 0) { + getLogger().info("第 {} 页返回的记录列表为空,认定为分页结束。", currentPage); + break; // 如果list为空或不存在,则结束循环 + } + + //累加结果 + for (JsonNode record : listNode) { + allRecords.add(record); + } + + //准备请求下一页 + currentPage++; } + + //所有分页成功后,合并最终结果 + ObjectNode finalResult = objectMapper.createObjectNode(); + ArrayNode finalArray = objectMapper.valueToTree(allRecords); + finalResult.set("allRecords", finalArray); // 将所有记录放入一个名为"allRecords"的数组中 + finalResult.put("totalFetched", allRecords.size()); + + //将合并后的JSON写入FlowFile并路由到success + flowFile = session.write(flowFile, out -> objectMapper.writerWithDefaultPrettyPrinter().writeValue(out, finalResult)); + flowFile = session.putAttribute(flowFile, "api.total.records.fetched", String.valueOf(allRecords.size())); + session.transfer(flowFile, REL_SUCCESS); + } catch (Exception e) { - getLogger().error("调用招行CBS接口失败: {}", e.getMessage(), e); + getLogger().error("调用招行CBS接口并处理分页时失败: {}", e.getMessage(), e); flowFile = session.putAttribute(flowFile, "api.error", e.getMessage()); session.transfer(session.penalize(flowFile), REL_FAILURE); - } finally { - try { - client.close(); - } catch (IOException e) { - getLogger().warn("关闭HttpClient时出错: {}", e.getMessage(), e); - } } } } \ No newline at end of file