refactor(hzya-nifi-Zsyh-cbs-processors):重构 API 处理逻辑

- 优化了 ApiTokenManager 类的注释
- 更新了 CbsApiHandler 类的注释和作者信息
- 改进了 ZsyhCbsProcessor 类的代码结构和异常处理
- 更新了 ZsyhCbsProcessorTest 类的测试用例
- 调整了 pom.xml 中的依赖范围
This commit is contained in:
liuy 2025-09-10 09:40:04 +08:00
parent 72b71f2084
commit 1dfbe07f4c
5 changed files with 33 additions and 22 deletions

View File

@ -69,7 +69,7 @@
<groupId>com.fasterxml.jackson.core</groupId> <groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId> <artifactId>jackson-databind</artifactId>
<version>2.15.2</version> <version>2.15.2</version>
<scope>test</scope> <!-- <scope>test</scope>-->
</dependency> </dependency>

View File

@ -20,6 +20,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.bouncycastle.jce.provider.BouncyCastleProvider;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.security.Security; import java.security.Security;
import java.util.*; import java.util.*;
@ -104,23 +105,27 @@ public class ZsyhCbsProcessor extends AbstractProcessor {
CloseableHttpClient client = HttpClients.custom().disableContentCompression().build(); CloseableHttpClient client = HttpClients.custom().disableContentCompression().build();
try { try {
// 1. 读取FlowFile内容作为业务请求体 // 读取FlowFile内容作为业务请求体
byte[] requestDataBytes = IOUtils.toByteArray(session.read(flowFile)); // byte[] requestDataBytes = IOUtils.toByteArray(session.read(flowFile));
byte[] requestDataBytes;
try (InputStream in = session.read(flowFile)) {
requestDataBytes = IOUtils.toByteArray(in);
}
// 2. 从处理器属性中获取所有动态配置 //从处理器属性中获取所有动态配置
String targetUrl = context.getProperty(PROP_TARGET_URL).getValue(); String targetUrl = context.getProperty(PROP_TARGET_URL).getValue();
String bodyEncryptionKey = context.getProperty(PROP_BODY_ENCRYPTION_KEY).getValue(); String bodyEncryptionKey = context.getProperty(PROP_BODY_ENCRYPTION_KEY).getValue();
String signPrivateKey = context.getProperty(PROP_SIGN_PRIVATE_KEY).getValue(); String signPrivateKey = context.getProperty(PROP_SIGN_PRIVATE_KEY).getValue();
String bodyDecryptionKey = context.getProperty(PROP_BODY_DECRYPTION_KEY).getValue(); String bodyDecryptionKey = context.getProperty(PROP_BODY_DECRYPTION_KEY).getValue();
// 3. 从Token管理器获取有效Token //从Token管理器获取有效Token
String bearerToken = ApiTokenManager.getInstance().getValidToken(); String bearerToken = ApiTokenManager.getInstance().getValidToken();
String token = bearerToken.startsWith(Constants.BEARER) ? bearerToken.substring(Constants.BEARER.length()) : bearerToken; String token = bearerToken.startsWith(Constants.BEARER) ? bearerToken.substring(Constants.BEARER.length()) : bearerToken;
// 4. 实例化您改造后的业务类并注入所有配置 //实例化业务请求并注入所有配置
CbsApiHandler sm2Example = new CbsApiHandler(targetUrl, token, bodyEncryptionKey, signPrivateKey, bodyDecryptionKey, requestDataBytes); CbsApiHandler sm2Example = new CbsApiHandler(targetUrl, token, bodyEncryptionKey, signPrivateKey, bodyDecryptionKey, requestDataBytes);
// 5. 调用业务类的方法完成请求准备发送和响应处理 (逻辑来自您的测试类) //调用业务类的方法完成请求准备发送和响应处理
HttpPost httpPost = sm2Example.setupRequest(); HttpPost httpPost = sm2Example.setupRequest();
try (CloseableHttpResponse response = client.execute(httpPost)) { try (CloseableHttpResponse response = client.execute(httpPost)) {
@ -130,11 +135,12 @@ public class ZsyhCbsProcessor extends AbstractProcessor {
byte[] finalResponseData = sm2Example.handleResponse(response); byte[] finalResponseData = sm2Example.handleResponse(response);
// 6. 将最终解密后的结果写入FlowFile并路由到success //将最终解密后的结果写入FlowFile并路由到success
String finalResponseDataStr = new String(finalResponseData);
getLogger().info("接口返回结果:{}", finalResponseDataStr);
flowFile = session.write(flowFile, out -> out.write(finalResponseData)); flowFile = session.write(flowFile, out -> out.write(finalResponseData));
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
} }
} catch (Exception e) { } catch (Exception e) {
getLogger().error("调用招行CBS接口失败: {}", e.getMessage(), e); getLogger().error("调用招行CBS接口失败: {}", e.getMessage(), e);
flowFile = session.putAttribute(flowFile, "api.error", e.getMessage()); flowFile = session.putAttribute(flowFile, "api.error", e.getMessage());

View File

@ -1,4 +1,4 @@
package com.hzya.frame.util; // 请替换为您项目的包名 package com.hzya.frame.util;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
@ -15,8 +15,8 @@ import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
* 线程安全的API Token管理器重构版 * 线程安全的API Token管理器
* 遵循一次配置多次使用的原则 * 遵循一次配置多次使用的原则
*/ */
public class ApiTokenManager { public class ApiTokenManager {

View File

@ -16,8 +16,10 @@ import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream; import java.util.zip.GZIPInputStream;
/** /**
* SM2请求加解密实例 (已改造为可配置版本) * SM2请求加解密实例
* 这个类现在通过构造函数接收所有配置不再硬编码任何值 * 这个类现在通过构造函数接收所有配置
*
* @author liuyang
*/ */
public class CbsApiHandler { public class CbsApiHandler {

View File

@ -54,21 +54,24 @@ public class ZsyhCbsProcessorTest {
@Test @Test
public void testProcessorHappyPath() throws Exception { public void testProcessorHappyPath() throws Exception {
// 配置处理器属性 // 配置处理器属性
// 让所有URL都指向我们的模拟服务器 runner.setProperty(ZsyhCbsProcessor.PROP_TOKEN_URL, "https://cbs8-openapi-reprd.csuat.cmburl.cn/openapi/app/v1/app/token");//获取token
runner.setProperty(ZsyhCbsProcessor.PROP_TOKEN_URL, ""); runner.setProperty(ZsyhCbsProcessor.PROP_REFRESH_URL, "https://cbs8-openapi-reprd.csuat.cmburl.cn/openapi/app/v1/app/refresh-token");//刷新token
runner.setProperty(ZsyhCbsProcessor.PROP_REFRESH_URL, ""); runner.setProperty(ZsyhCbsProcessor.PROP_TARGET_URL, "https://cbs8-openapi-reprd.csuat.cmburl.cn/openapi/draft2/openapi/v2/trsinfo/query-trs-info-page");//交易明细查询
runner.setProperty(ZsyhCbsProcessor.PROP_TARGET_URL, "");
// 配置其他必要的密钥和ID // 配置其他必要的密钥和ID
runner.setProperty(ZsyhCbsProcessor.PROP_APP_ID, ""); runner.setProperty(ZsyhCbsProcessor.PROP_APP_ID, "1P4AGrpz");
runner.setProperty(ZsyhCbsProcessor.PROP_APP_SECRET, ""); runner.setProperty(ZsyhCbsProcessor.PROP_APP_SECRET, "2c2369ae5dc04382844bbe3a5abf39e1bea9cd3a");
runner.setProperty(ZsyhCbsProcessor.PROP_BODY_ENCRYPTION_KEY, MOCK_PLATFORM_PUBLIC_KEY); runner.setProperty(ZsyhCbsProcessor.PROP_BODY_ENCRYPTION_KEY, MOCK_PLATFORM_PUBLIC_KEY);
runner.setProperty(ZsyhCbsProcessor.PROP_SIGN_PRIVATE_KEY, MOCK_ENTERPRISE_SIGN_PRIVATE_KEY); runner.setProperty(ZsyhCbsProcessor.PROP_SIGN_PRIVATE_KEY, MOCK_ENTERPRISE_SIGN_PRIVATE_KEY);
runner.setProperty(ZsyhCbsProcessor.PROP_BODY_DECRYPTION_KEY, MOCK_ENTERPRISE_DECRYPT_PRIVATE_KEY); runner.setProperty(ZsyhCbsProcessor.PROP_BODY_DECRYPTION_KEY, MOCK_ENTERPRISE_DECRYPT_PRIVATE_KEY);
// 3. 准备输入的FlowFile其内容就是原始的业务请求JSON // 3. 准备输入的FlowFile其内容就是原始的业务请求JSON
// String businessRequestJson = "{\"request_param\":\"some_value\"}"; String businessRequestJson = "{\n" +
// runner.enqueue(businessRequestJson); "\t\"applyDateStart\":\"2025-08-01\",\n" +
"\t\"applyDateEnd\":\"2025-08-31\",\n" +
"\t\"initAccountList\":\"591915131310106\"\n" +
"}";
runner.enqueue(businessRequestJson);
// --- 执行阶段 (Act) --- // --- 执行阶段 (Act) ---
System.out.println("--- 开始运行处理器 ---"); System.out.println("--- 开始运行处理器 ---");