diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/pom.xml index cd958ad..1f1c168 100644 --- a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/pom.xml +++ b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/pom.xml @@ -69,7 +69,7 @@ com.fasterxml.jackson.core jackson-databind 2.15.2 - test + diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/ZsyhCbsProcessor.java b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/ZsyhCbsProcessor.java index f15cbbd..7f82574 100644 --- a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/ZsyhCbsProcessor.java +++ b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/ZsyhCbsProcessor.java @@ -20,6 +20,7 @@ import org.apache.nifi.processor.util.StandardValidators; import org.bouncycastle.jce.provider.BouncyCastleProvider; import java.io.IOException; +import java.io.InputStream; import java.security.Security; import java.util.*; @@ -104,23 +105,27 @@ public class ZsyhCbsProcessor extends AbstractProcessor { CloseableHttpClient client = HttpClients.custom().disableContentCompression().build(); try { - // 1. 读取FlowFile内容,作为业务请求体 - byte[] requestDataBytes = IOUtils.toByteArray(session.read(flowFile)); + // 读取FlowFile内容,作为业务请求体 +// byte[] requestDataBytes = IOUtils.toByteArray(session.read(flowFile)); + byte[] requestDataBytes; + try (InputStream in = session.read(flowFile)) { + requestDataBytes = IOUtils.toByteArray(in); + } - // 2. 从处理器属性中获取所有动态配置 + //从处理器属性中获取所有动态配置 String targetUrl = context.getProperty(PROP_TARGET_URL).getValue(); String bodyEncryptionKey = context.getProperty(PROP_BODY_ENCRYPTION_KEY).getValue(); String signPrivateKey = context.getProperty(PROP_SIGN_PRIVATE_KEY).getValue(); String bodyDecryptionKey = context.getProperty(PROP_BODY_DECRYPTION_KEY).getValue(); - // 3. 从Token管理器获取有效Token + //从Token管理器获取有效Token String bearerToken = ApiTokenManager.getInstance().getValidToken(); String token = bearerToken.startsWith(Constants.BEARER) ? bearerToken.substring(Constants.BEARER.length()) : bearerToken; - // 4. 实例化您改造后的业务类,并注入所有配置 + //实例化业务请求类,并注入所有配置 CbsApiHandler sm2Example = new CbsApiHandler(targetUrl, token, bodyEncryptionKey, signPrivateKey, bodyDecryptionKey, requestDataBytes); - // 5. 调用业务类的方法,完成请求准备、发送和响应处理 (逻辑来自您的测试类) + //调用业务类的方法,完成请求准备、发送和响应处理 HttpPost httpPost = sm2Example.setupRequest(); try (CloseableHttpResponse response = client.execute(httpPost)) { @@ -130,11 +135,12 @@ public class ZsyhCbsProcessor extends AbstractProcessor { byte[] finalResponseData = sm2Example.handleResponse(response); - // 6. 将最终解密后的结果写入FlowFile并路由到success + //将最终解密后的结果写入FlowFile并路由到success + String finalResponseDataStr = new String(finalResponseData); + getLogger().info("接口返回结果:{}", finalResponseDataStr); flowFile = session.write(flowFile, out -> out.write(finalResponseData)); session.transfer(flowFile, REL_SUCCESS); } - } catch (Exception e) { getLogger().error("调用招行CBS接口失败: {}", e.getMessage(), e); flowFile = session.putAttribute(flowFile, "api.error", e.getMessage()); diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/util/ApiTokenManager.java b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/util/ApiTokenManager.java index 3a435b7..49f37aa 100644 --- a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/util/ApiTokenManager.java +++ b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/util/ApiTokenManager.java @@ -1,4 +1,4 @@ -package com.hzya.frame.util; // 请替换为您项目的包名 +package com.hzya.frame.util; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -15,8 +15,8 @@ import java.nio.charset.StandardCharsets; import java.util.stream.Collectors; /** - * 线程安全的API Token管理器(重构版)。 - * 遵循“一次配置,多次使用”的原则。 + * 线程安全的API Token管理器 + * 遵循“一次配置,多次使用”的原则 */ public class ApiTokenManager { diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/util/CbsApiHandler.java b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/util/CbsApiHandler.java index 04012e2..a8ba455 100644 --- a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/util/CbsApiHandler.java +++ b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/util/CbsApiHandler.java @@ -16,8 +16,10 @@ import java.nio.charset.StandardCharsets; import java.util.zip.GZIPInputStream; /** - * SM2请求加解密实例 (已改造为可配置版本) - * 这个类现在通过构造函数接收所有配置,不再硬编码任何值。 + * SM2请求加解密实例 + * 这个类现在通过构造函数接收所有配置 + * + * @author liuyang */ public class CbsApiHandler { diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/test/java/com/hzya/frame/ZsyhCbsProcessorTest.java b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/test/java/com/hzya/frame/ZsyhCbsProcessorTest.java index 5115945..70642e5 100644 --- a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/test/java/com/hzya/frame/ZsyhCbsProcessorTest.java +++ b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/test/java/com/hzya/frame/ZsyhCbsProcessorTest.java @@ -54,21 +54,24 @@ public class ZsyhCbsProcessorTest { @Test public void testProcessorHappyPath() throws Exception { // 配置处理器属性 - // 让所有URL都指向我们的模拟服务器 - runner.setProperty(ZsyhCbsProcessor.PROP_TOKEN_URL, ""); - runner.setProperty(ZsyhCbsProcessor.PROP_REFRESH_URL, ""); - runner.setProperty(ZsyhCbsProcessor.PROP_TARGET_URL, ""); + runner.setProperty(ZsyhCbsProcessor.PROP_TOKEN_URL, "https://cbs8-openapi-reprd.csuat.cmburl.cn/openapi/app/v1/app/token");//获取token + runner.setProperty(ZsyhCbsProcessor.PROP_REFRESH_URL, "https://cbs8-openapi-reprd.csuat.cmburl.cn/openapi/app/v1/app/refresh-token");//刷新token + runner.setProperty(ZsyhCbsProcessor.PROP_TARGET_URL, "https://cbs8-openapi-reprd.csuat.cmburl.cn/openapi/draft2/openapi/v2/trsinfo/query-trs-info-page");//交易明细查询 // 配置其他必要的密钥和ID - runner.setProperty(ZsyhCbsProcessor.PROP_APP_ID, ""); - runner.setProperty(ZsyhCbsProcessor.PROP_APP_SECRET, ""); + runner.setProperty(ZsyhCbsProcessor.PROP_APP_ID, "1P4AGrpz"); + runner.setProperty(ZsyhCbsProcessor.PROP_APP_SECRET, "2c2369ae5dc04382844bbe3a5abf39e1bea9cd3a"); runner.setProperty(ZsyhCbsProcessor.PROP_BODY_ENCRYPTION_KEY, MOCK_PLATFORM_PUBLIC_KEY); runner.setProperty(ZsyhCbsProcessor.PROP_SIGN_PRIVATE_KEY, MOCK_ENTERPRISE_SIGN_PRIVATE_KEY); runner.setProperty(ZsyhCbsProcessor.PROP_BODY_DECRYPTION_KEY, MOCK_ENTERPRISE_DECRYPT_PRIVATE_KEY); // 3. 准备输入的FlowFile,其内容就是原始的业务请求JSON -// String businessRequestJson = "{\"request_param\":\"some_value\"}"; -// runner.enqueue(businessRequestJson); + String businessRequestJson = "{\n" + + "\t\"applyDateStart\":\"2025-08-01\",\n" + + "\t\"applyDateEnd\":\"2025-08-31\",\n" + + "\t\"initAccountList\":\"591915131310106\"\n" + + "}"; + runner.enqueue(businessRequestJson); // --- 执行阶段 (Act) --- System.out.println("--- 开始运行处理器 ---");