diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/pom.xml index 9eb5b51..cd958ad 100644 --- a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/pom.xml +++ b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/pom.xml @@ -65,8 +65,15 @@ bcprov-jdk18on 1.78.1 + + com.fasterxml.jackson.core + jackson-databind + 2.15.2 + test + - + + @@ -93,5 +100,14 @@ commons-logging 1.2 + + + commons-codec + commons-codec + 1.15 + + + + \ No newline at end of file diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/ZsyhCbs.java b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/ZsyhCbs.java deleted file mode 100644 index 8c28db8..0000000 --- a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/ZsyhCbs.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.hzya.frame; - -/** - * @Author:liuyang - * @Package:com.hzya.frame - * @Project:nifi-hzyadev-bundle - * @name:ZsyhCbs - * @Date:2025/9/9 14:14 - * @Filename:ZsyhCbs - */ -public class ZsyhCbs { -} diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/ZsyhCbsProcessor.java b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/ZsyhCbsProcessor.java new file mode 100644 index 0000000..f15cbbd --- /dev/null +++ b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/ZsyhCbsProcessor.java @@ -0,0 +1,150 @@ +package com.hzya.frame; + +import com.hzya.frame.util.ApiTokenManager; +import com.hzya.frame.util.Constants; +import com.hzya.frame.util.CbsApiHandler; +import org.apache.commons.io.IOUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.*; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.bouncycastle.jce.provider.BouncyCastleProvider; + +import java.io.IOException; +import java.security.Security; +import java.util.*; + +@Tags({"cbs", "sm2", "api", "招商银行"}) +@CapabilityDescription("调用招商银行CBS SM2加密接口的通用处理器。它负责管理Token并调用SM2Example类来执行核心的加解密和签名逻辑。") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +public class ZsyhCbsProcessor extends AbstractProcessor { + + // 静态代码块,在类加载时自动、安全地注册Bouncy Castle,避免环境问题 + static { + if (Security.getProvider(BouncyCastleProvider.PROVIDER_NAME) == null) { + Security.addProvider(new BouncyCastleProvider()); + } + } + + // --- 定义处理器属性 --- + // API 业务接口相关属性 + public static final PropertyDescriptor PROP_TARGET_URL = new PropertyDescriptor.Builder().name("业务接口URL (TARGET_URL)").displayName("业务接口URL").description("需要请求的最终业务接口URL。").required(true).addValidator(StandardValidators.URL_VALIDATOR).build(); + + // SM2 加解密相关属性 + public static final PropertyDescriptor PROP_BODY_ENCRYPTION_KEY = new PropertyDescriptor.Builder().name("平台公钥 (bodyEncryptionKey)").displayName("平台公钥").description("用于加密请求体的平台方公钥(Hex编码)。").required(true).sensitive(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + public static final PropertyDescriptor PROP_SIGN_PRIVATE_KEY = new PropertyDescriptor.Builder().name("企业签名私钥 (signEncryptionPrivateKey)").displayName("企业签名私钥").description("用于对请求进行签名的企业私钥(Hex编码)。").required(true).sensitive(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + public static final PropertyDescriptor PROP_BODY_DECRYPTION_KEY = new PropertyDescriptor.Builder().name("企业解密私钥 (bodyDecryptionKey)").displayName("企业解密私钥").description("用于解密响应体的企业私钥(Hex编码)。").required(true).sensitive(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + // Token 管理器相关属性 + public static final PropertyDescriptor PROP_TOKEN_URL = new PropertyDescriptor.Builder().name("Token获取URL").displayName("Token获取URL").required(true).addValidator(StandardValidators.URL_VALIDATOR).build(); + public static final PropertyDescriptor PROP_REFRESH_URL = new PropertyDescriptor.Builder().name("Token刷新URL").displayName("Token刷新URL").required(true).addValidator(StandardValidators.URL_VALIDATOR).build(); + public static final PropertyDescriptor PROP_APP_ID = new PropertyDescriptor.Builder().name("App ID").displayName("App ID").required(true).sensitive(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + public static final PropertyDescriptor PROP_APP_SECRET = new PropertyDescriptor.Builder().name("App Secret").displayName("App Secret").required(true).sensitive(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + // --- 处理器关系定义 --- + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("请求成功且响应成功处理后的FlowFile。").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("处理过程中发生任何错误的FlowFile。").build(); + + private List descriptors; + private Set relationships; + + @Override + protected void init(final ProcessorInitializationContext context) { + final List descriptors = new ArrayList<>(); + descriptors.add(PROP_TARGET_URL); + descriptors.add(PROP_BODY_ENCRYPTION_KEY); + descriptors.add(PROP_SIGN_PRIVATE_KEY); + descriptors.add(PROP_BODY_DECRYPTION_KEY); + descriptors.add(PROP_TOKEN_URL); + descriptors.add(PROP_REFRESH_URL); + descriptors.add(PROP_APP_ID); + descriptors.add(PROP_APP_SECRET); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set getRelationships() { + return this.relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return this.descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + // 在处理器启动或配置更新时,调用一次ApiTokenManager的configure方法进行初始化 + getLogger().info("正在为招行CBS接口配置API Token管理器..."); + ApiTokenManager.getInstance().configure(context.getProperty(PROP_TOKEN_URL).getValue(), context.getProperty(PROP_REFRESH_URL).getValue(), context.getProperty(PROP_APP_ID).getValue(), context.getProperty(PROP_APP_SECRET).getValue(), getLogger()); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + CloseableHttpClient client = HttpClients.custom().disableContentCompression().build(); + try { + // 1. 读取FlowFile内容,作为业务请求体 + byte[] requestDataBytes = IOUtils.toByteArray(session.read(flowFile)); + + // 2. 从处理器属性中获取所有动态配置 + String targetUrl = context.getProperty(PROP_TARGET_URL).getValue(); + String bodyEncryptionKey = context.getProperty(PROP_BODY_ENCRYPTION_KEY).getValue(); + String signPrivateKey = context.getProperty(PROP_SIGN_PRIVATE_KEY).getValue(); + String bodyDecryptionKey = context.getProperty(PROP_BODY_DECRYPTION_KEY).getValue(); + + // 3. 从Token管理器获取有效Token + String bearerToken = ApiTokenManager.getInstance().getValidToken(); + String token = bearerToken.startsWith(Constants.BEARER) ? bearerToken.substring(Constants.BEARER.length()) : bearerToken; + + // 4. 实例化您改造后的业务类,并注入所有配置 + CbsApiHandler sm2Example = new CbsApiHandler(targetUrl, token, bodyEncryptionKey, signPrivateKey, bodyDecryptionKey, requestDataBytes); + + // 5. 调用业务类的方法,完成请求准备、发送和响应处理 (逻辑来自您的测试类) + HttpPost httpPost = sm2Example.setupRequest(); + try (CloseableHttpResponse response = client.execute(httpPost)) { + + if (response.getStatusLine().getStatusCode() >= 300) { + throw new IOException("HTTP请求失败,状态码: " + response.getStatusLine().getStatusCode()); + } + + byte[] finalResponseData = sm2Example.handleResponse(response); + + // 6. 将最终解密后的结果写入FlowFile并路由到success + flowFile = session.write(flowFile, out -> out.write(finalResponseData)); + session.transfer(flowFile, REL_SUCCESS); + } + + } catch (Exception e) { + getLogger().error("调用招行CBS接口失败: {}", e.getMessage(), e); + flowFile = session.putAttribute(flowFile, "api.error", e.getMessage()); + session.transfer(session.penalize(flowFile), REL_FAILURE); + } finally { + try { + client.close(); + } catch (IOException e) { + getLogger().warn("关闭HttpClient时出错: {}", e.getMessage(), e); + } + } + } +} \ No newline at end of file diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/util/ApiTokenManager.java b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/util/ApiTokenManager.java new file mode 100644 index 0000000..3a435b7 --- /dev/null +++ b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/util/ApiTokenManager.java @@ -0,0 +1,186 @@ +package com.hzya.frame.util; // 请替换为您项目的包名 + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.logging.ComponentLog; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.stream.Collectors; + +/** + * 线程安全的API Token管理器(重构版)。 + * 遵循“一次配置,多次使用”的原则。 + */ +public class ApiTokenManager { + + // ------------------- 单例实现 ------------------- + private static volatile ApiTokenManager instance; + + private ApiTokenManager() { + } + + public static ApiTokenManager getInstance() { + if (instance == null) { + synchronized (ApiTokenManager.class) { + if (instance == null) { + instance = new ApiTokenManager(); + } + } + } + return instance; + } + + // ------------------- 配置属性 ------------------- + private String tokenUrl; + private String refreshUrl; + private String appId; + private String appSecret; + private ComponentLog logger; + + private String cachedToken; + private long expiryTimestamp; + private volatile boolean isConfigured = false; // 配置状态标志 + + private static final long REFRESH_BUFFER_MS = 10 * 60 * 1000L; // 10分钟容差 + private final ObjectMapper objectMapper = new ObjectMapper(); + + /** + * 【新增】配置Token管理器,且只允许配置一次。 + * 建议在应用启动时(如NiFi处理器的@OnScheduled方法中)调用。 + * + * @param tokenUrl 获取Token的URL + * @param refreshUrl 刷新Token的URL + * @param appId 应用ID + * @param appSecret 应用密钥 + * @param logger 用于记录日志的NiFi ComponentLog + */ + public synchronized void configure(String tokenUrl, String refreshUrl, String appId, String appSecret, ComponentLog logger) { + if (isConfigured) { + logger.warn("Token管理器已被配置,本次配置请求将被忽略。"); + return; + } + this.tokenUrl = tokenUrl; + this.refreshUrl = refreshUrl; + this.appId = appId; + this.appSecret = appSecret; + this.logger = logger; + this.isConfigured = true; + logger.info("Token管理器配置成功。"); + } + + /** + * 【简化】对外提供的唯一公共方法,不再需要传递参数。 + * + * @return 有效的Bearer Token字符串,格式为 "Bearer " + * @throws Exception 如果获取或刷新失败 + */ + public synchronized String getValidToken() throws Exception { + if (!isConfigured) { + throw new IllegalStateException("Token管理器尚未配置,请先调用configure方法进行初始化。"); + } + + long now = System.currentTimeMillis(); + + if (cachedToken == null) { + logger.info("Token为空,首次获取新Token..."); + fetchNewToken(); + } else if (now >= expiryTimestamp) { + logger.info("Token已过期或即将过期(在10分钟容差内),正在刷新Token..."); + refreshToken(); + } else { + logger.debug("从缓存中获取有效Token。"); + } + + return "Bearer " + this.cachedToken; + } + + // 内部方法现在直接使用类成员变量,不再需要传递参数 + private void fetchNewToken() throws Exception { + HttpURLConnection connection = null; + try { + URL url = new URL(this.tokenUrl); + connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setRequestProperty("Content-Type", "application/json; charset=utf-8"); + connection.setDoOutput(true); + + String requestBody = String.format("{\"app_id\":\"%s\",\"app_secret\":\"%s\",\"grant_type\":\"client_credentials\"}", this.appId, this.appSecret); + + try (OutputStream os = connection.getOutputStream()) { + os.write(requestBody.getBytes(StandardCharsets.UTF_8)); + } + + int responseCode = connection.getResponseCode(); + if (responseCode == HttpURLConnection.HTTP_OK) { + String responseBody = readResponse(connection.getInputStream()); + processTokenResponse(responseBody); + } else { + String errorBody = readResponse(connection.getErrorStream()); + throw new RuntimeException("获取Token失败,HTTP状态码: " + responseCode + ", 响应: " + errorBody); + } + } finally { + if (connection != null) { + connection.disconnect(); + } + } + } + + private void refreshToken() throws Exception { + if (this.cachedToken == null) { + throw new IllegalStateException("无法刷新Token,因为当前没有有效的Token。"); + } + HttpURLConnection connection = null; + try { + URL url = new URL(this.refreshUrl); + connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("GET"); + connection.setRequestProperty("Authorization", "Bearer " + this.cachedToken); + + int responseCode = connection.getResponseCode(); + if (responseCode == HttpURLConnection.HTTP_OK) { + String responseBody = readResponse(connection.getInputStream()); + processTokenResponse(responseBody); + } else { + String errorBody = readResponse(connection.getErrorStream()); + this.cachedToken = null; + this.expiryTimestamp = 0; + throw new RuntimeException("刷新Token失败,HTTP状态码: " + responseCode + ", 响应: " + errorBody); + } + } finally { + if (connection != null) { + connection.disconnect(); + } + } + } + + private void processTokenResponse(String responseBody) throws IOException { + JsonNode responseJson = objectMapper.readTree(responseBody); + String code = responseJson.path("code").asText(); + if (!"0".equals(code)) { + String msg = responseJson.path("msg").asText("未知错误"); + throw new RuntimeException("获取Token API返回业务错误: code=" + code + ", msg=" + msg); + } + JsonNode dataNode = responseJson.path("data"); + this.cachedToken = dataNode.path("token").asText(); + long expiresInSeconds = dataNode.path("expires").asLong(); + if (this.cachedToken == null || this.cachedToken.isEmpty() || expiresInSeconds <= 0) { + throw new RuntimeException("从API返回的JSON中未能获取到有效的Token或expires。响应: " + responseBody); + } + this.expiryTimestamp = System.currentTimeMillis() + (expiresInSeconds * 1000L) - REFRESH_BUFFER_MS; + logger.info("成功获取或刷新Token,Token将在 {} 秒后进行下一次刷新。", (this.expiryTimestamp - System.currentTimeMillis()) / 1000); + } + + private String readResponse(InputStream inputStream) throws IOException { + if (inputStream == null) return "No response body."; + try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) { + return reader.lines().collect(Collectors.joining("\n")); + } + } +} \ No newline at end of file diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/util/CbsApiHandler.java b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/util/CbsApiHandler.java new file mode 100644 index 0000000..04012e2 --- /dev/null +++ b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/util/CbsApiHandler.java @@ -0,0 +1,109 @@ +package com.hzya.frame.util; + +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.io.IOUtils; +import org.apache.http.Header; +import org.apache.http.HttpMessage; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.protocol.HTTP; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.zip.GZIPInputStream; + +/** + * SM2请求加解密实例 (已改造为可配置版本) + * 这个类现在通过构造函数接收所有配置,不再硬编码任何值。 + */ +public class CbsApiHandler { + + private final String bodyEncryptionKey; + private final String signEncryptionPrivateKey; + private final String bodyDecryptionKey; + private final String token; + private final String targetUrl; + private final byte[] requestDataBytes; + + /** + * @param targetUrl 业务接口URL + * @param token 认证Token (不含"Bearer ") + * @param bodyEncryptionKey 平台公钥 + * @param signEncryptionPrivateKey 企业签名私钥 + * @param bodyDecryptionKey 企业解密私钥 + * @param requestDataBytes 请求体字节数组 (来自FlowFile) + */ + public CbsApiHandler(String targetUrl, String token, String bodyEncryptionKey, String signEncryptionPrivateKey, String bodyDecryptionKey, byte[] requestDataBytes) { + this.targetUrl = targetUrl; + this.token = token; + this.bodyEncryptionKey = bodyEncryptionKey; + this.signEncryptionPrivateKey = signEncryptionPrivateKey; + this.bodyDecryptionKey = bodyDecryptionKey; + this.requestDataBytes = requestDataBytes; + } + + /** + * 根据实例配置生成请求报文 + */ + public HttpPost setupRequest() { + long timestamp = System.currentTimeMillis(); + + // 请求数据拼接:报文体+时间戳 + byte[] timestampBytes = ("×tamp=" + timestamp).getBytes(StandardCharsets.UTF_8); + byte[] newBytes = new byte[requestDataBytes.length + timestampBytes.length]; + System.arraycopy(requestDataBytes, 0, newBytes, 0, requestDataBytes.length); + System.arraycopy(timestampBytes, 0, newBytes, requestDataBytes.length, timestampBytes.length); + + // 生成签名 + byte[] signature = SM2Util.sign(this.signEncryptionPrivateKey, newBytes); + String sign = Base64.encodeBase64String(SM2Util.encodeDERSignature(signature)); + + // 设置请求URL和请求头 + HttpPost httpPost = new HttpPost(this.targetUrl); + httpPost.setHeader(Constants.SIGN_HEADER_NAME, sign); + httpPost.setHeader(Constants.TIMESTAMP_HEADER, Long.toString(timestamp)); + httpPost.setHeader(HTTP.CONTENT_TYPE, Constants.TARGET_CONTENT_TYPE); + httpPost.setHeader(Constants.AUTHORIZATION, Constants.BEARER + this.token); + + // 报文体加密并设置 + byte[] encryptedData = SM2Util.encrypt(this.bodyEncryptionKey, requestDataBytes); + httpPost.setEntity(new ByteArrayEntity(encryptedData)); + + return httpPost; + } + + /** + * 根据实例配置处理响应报文 + */ + public byte[] handleResponse(HttpResponse response) throws Exception { + InputStream content = response.getEntity().getContent(); + byte[] responseData = IOUtils.toByteArray(content); + + if (responseData == null || responseData.length == 0) { + return new byte[0]; + } + + if (getHeader(response, Constants.ENCRYPTION_ENABLED_HEADER_NAME)) { + responseData = SM2Util.decrypt(this.bodyDecryptionKey, responseData); + } + + if (getHeader(response, Constants.X_MBCLOUD_COMPRESS)) { + responseData = decompress(responseData); + } + return responseData; + } + + private boolean getHeader(HttpMessage message, String name) { + Header header = message.getFirstHeader(name); + return header != null; + } + + private byte[] decompress(byte[] data) throws IOException { + ByteArrayInputStream input = new ByteArrayInputStream(data); + GZIPInputStream gzipInput = new GZIPInputStream(input); + return IOUtils.toByteArray(gzipInput); + } +} \ No newline at end of file diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/util/SM2Example.java b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/util/SM2Example.java deleted file mode 100644 index e984d60..0000000 --- a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/util/SM2Example.java +++ /dev/null @@ -1,161 +0,0 @@ -package com.hzya.frame.util; - -//import com.cmbyc.security.util.SM2Util; - -import org.apache.commons.codec.binary.Base64; -import org.apache.commons.io.IOUtils; -import org.apache.http.Header; -import org.apache.http.HttpMessage; -import org.apache.http.HttpResponse; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.ByteArrayEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.protocol.HTTP; -//import org.slf4j.LoggerFactory; -//import org.slf4j.Logger; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.zip.GZIPInputStream; - -/** - * @author: KeXue - * @time: 2022/8/25 - * @description: SM2请求加解密实例 - */ -public class SM2Example { - - /** - * 财资管理云公钥(平台公钥) - */ - static final String bodyEncryptionKey = "04D0A5CDD879058F1D4DEFFFCE7F902402026B1C39F03FF426E6C0D43F0EBA5F8617D5ABC7501EA5E53038D93CD33036CEFF2F5A64D226144AB3D6D73CE901C3B8"; - - /** - * 企业私钥(加密) - */ - static final String signEncryptionPrivateKey = "D735102B815123AFB3B0779928FA8264B050BEB43071DEBD88BB115B28E4661C"; - - /** - * 企业私钥(解密) - */ - static final String bodyDecryptionKey = "D735102B815123AFB3B0779928FA8264B050BEB43071DEBD88BB115B28E4661C"; - - /** - * 根据appid和appsecert获取的token - */ - static final String token = "3152124a-2705-4cc0-b451-117047ef4d8d"; - - /** - * 接口路径 - */ -// static final String TARGET_URL = "http://cbs8-gateway-openapi-dev.paas.cmbchina.cn/openapi/account/accounts-current-balance/erp/query"; - static final String TARGET_URL = "https://cbs8-openapi-reprd.csuat.cmburl.cn/openapi/draft2/openapi/v2/trsinfo/query-trs-info-page"; - - /** - * 请求体数据 - */ -// static final String requestData = "\n" + "{\"accountNo\":\"\"}"; - static final String requestData = "{\n" + - "\t\"applyDateStart\":\"2025-08-01\",\n" + - "\t\"applyDateEnd\":\"2025-08-31\",\n" + - "\t\"initAccountList\":\"591915131310106\"\n" + - "}"; - -// private static final Logger log = LoggerFactory.getLogger(SM2Example.class); - -// public static void main(String[] args) throws Exception { -// CloseableHttpClient client = HttpClients.custom() -// // 禁止HttpClient自动解压缩 -// .disableContentCompression().build(); -// -// HttpPost httpPost = setupRequest(); -// try (CloseableHttpResponse response = client.execute(httpPost)) { -// byte[] finalResponseData = handleResponse(response); -//// log.info("\n返回结果:{}", new String(finalResponseData)); -// } catch (IOException ignored) { -// throw new IOException("网络连接失败或超时!"); -// } finally { -// client.close(); -// } -// } - - /** - * 生成请求报文 - */ - public static HttpPost setupRequest() { - long timestamp = System.currentTimeMillis(); - - // 请求数据拼接: 报文体+时间戳 - byte[] requestDataBytes = requestData.getBytes(StandardCharsets.UTF_8); - byte[] timestampBytes = ("×tamp=" + timestamp).getBytes(StandardCharsets.UTF_8); - byte[] newBytes = new byte[requestDataBytes.length + timestampBytes.length]; - System.arraycopy(requestDataBytes, 0, newBytes, 0, requestDataBytes.length); - System.arraycopy(timestampBytes, 0, newBytes, requestDataBytes.length, timestampBytes.length); - - // 生成签名 - byte[] signature = SM2Util.sign(signEncryptionPrivateKey, newBytes); - String sign = Base64.encodeBase64String(SM2Util.encodeDERSignature(signature)); -// log.info("签名:{}", sign); - - // 设置请求URL - HttpPost httpPost = new HttpPost(TARGET_URL); - // 请求头设置签名 - httpPost.setHeader(Constants.SIGN_HEADER_NAME, sign); - // 请求头设置时间戳 - httpPost.setHeader(Constants.TIMESTAMP_HEADER, Long.toString(timestamp)); - // 请求头设置请求参数格式,请根据实际情况改写 - httpPost.setHeader(HTTP.CONTENT_TYPE, Constants.TARGET_CONTENT_TYPE); - // 请求头设置TOKEN - httpPost.setHeader(Constants.AUTHORIZATION, Constants.BEARER + token); - - // 报文体加密 - byte[] encryptedData = SM2Util.encrypt(bodyEncryptionKey, requestDataBytes); - // 设置请求体 - httpPost.setEntity(new ByteArrayEntity(encryptedData)); - - return httpPost; - } - - /** - * 处理响应报文 - */ - public static byte[] handleResponse(HttpResponse response) throws Exception { - InputStream content = response.getEntity().getContent(); - byte[] responseData = IOUtils.toByteArray(content); - - if (responseData == null || responseData.length == 0) { - return responseData == null ? new byte[0] : responseData; - } - - // 步骤1 原始响应报文解密 如果服务网关获取加解密密钥失败,则无法解密请求报文,且无法加密响应报文。 这时候,网关会直接返回错误信息,响应报文是未加密状态。 - Boolean encryptionEnable = getHeader(response, Constants.ENCRYPTION_ENABLED_HEADER_NAME); - - if (Boolean.TRUE.equals(encryptionEnable)) { - responseData = SM2Util.decrypt(bodyDecryptionKey, responseData); - } - - Boolean xMbcloudCompress = getHeader(response, Constants.X_MBCLOUD_COMPRESS); - if (Boolean.TRUE.equals(xMbcloudCompress)) { - responseData = decompress(responseData); - } - - return responseData; - - - } - - private static Boolean getHeader(HttpMessage message, String name) { - Header header = message.getFirstHeader(name); - return header != null; - } - - public static byte[] decompress(byte[] data) throws IOException { - ByteArrayInputStream input = new ByteArrayInputStream(data); - GZIPInputStream gzipInput = new GZIPInputStream(input); - return IOUtils.toByteArray(gzipInput); - } -} diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/util/SM2Util.java b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/util/SM2Util.java index 8c46344..0899a10 100644 --- a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/util/SM2Util.java +++ b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/java/com/hzya/frame/util/SM2Util.java @@ -13,6 +13,8 @@ import org.bouncycastle.jce.spec.ECParameterSpec; import org.bouncycastle.math.ec.ECCurve; import org.bouncycastle.math.ec.ECPoint; import org.bouncycastle.util.encoders.Hex; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.ByteArrayInputStream; import java.math.BigInteger; @@ -27,6 +29,8 @@ import java.util.Enumeration; */ public class SM2Util { + private static final Logger log = LoggerFactory.getLogger(SM2Util.class); + private SM2Util() { throw new IllegalStateException("Utility class"); } @@ -61,7 +65,7 @@ public class SM2Util { byte[] cipherText = engine.processBlock(data, 0, data.length); bytes = C1C2C3ToC1C3C2(cipherText); } catch (Exception e) { -// log.warn("SM2加密时出现异常:" + e.getMessage()); + log.warn("SM2加密时出现异常:" + e.getMessage()); } return bytes; } @@ -83,7 +87,7 @@ public class SM2Util { cipherData = C1C3C2ToC1C2C3(cipherData); bytes = engine.processBlock(cipherData, 0, cipherData.length); } catch (Exception e) { -// log.warn("SM2解密时出现异常:" + e.getMessage()); + log.warn("SM2解密时出现异常:" + e.getMessage()); } return bytes; } @@ -106,7 +110,7 @@ public class SM2Util { try { signature = decodeDERSignature(signer.generateSignature()); } catch (Exception e) { -// log.warn("SM2签名时出现异常:" + e.getMessage()); + log.warn("SM2签名时出现异常:" + e.getMessage()); } return signature; } @@ -189,7 +193,7 @@ public class SM2Util { System.arraycopy(r, 0, bytes, 0, 32); System.arraycopy(s, 0, bytes, 32, 32); } catch (Exception e) { -// log.warn("decodeDERSignature时出现异常:" + e.getMessage()); + log.warn("decodeDERSignature时出现异常:" + e.getMessage()); } return bytes; } @@ -207,7 +211,7 @@ public class SM2Util { try { encoded = (new DERSequence(vector)).getEncoded(); } catch (Exception e) { -// log.warn("encodeDERSignature时出现异常:" + e.getMessage()); + log.warn("encodeDERSignature时出现异常:" + e.getMessage()); } return encoded; } diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 22938e5..39beb7c 100644 --- a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -12,4 +12,4 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -com.hzya.frame.ZsyhCbs \ No newline at end of file +com.hzya.frame.ZsyhCbsProcessor \ No newline at end of file diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/test/java/com/hzya/frame/ZsyhCbsProcessorTest.java b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/test/java/com/hzya/frame/ZsyhCbsProcessorTest.java new file mode 100644 index 0000000..5115945 --- /dev/null +++ b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/test/java/com/hzya/frame/ZsyhCbsProcessorTest.java @@ -0,0 +1,87 @@ +package com.hzya.frame; + +import com.hzya.frame.util.SM2Util; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.security.Security; +import java.util.List; + +/** + * ZsyhCbsProcessor 的Mock测试类 + */ +public class ZsyhCbsProcessorTest { + + private TestRunner runner; + private MockWebServer mockWebServer; + + // 定义用于测试的密钥和配置(请使用有效的Hex编码密钥对用于测试) + // 注意:这里的密钥仅用于测试,实际密钥应通过NiFi界面配置 + private static final String MOCK_PLATFORM_PUBLIC_KEY = "04D0A5CDD879058F1D4DEFFFCE7F902402026B1C39F03FF426E6C0D43F0EBA5F8617D5ABC7501EA5E53038D93CD33036CEFF2F5A64D226144AB3D6D73CE901C3B8";//财资管理云公钥(平台公钥) + private static final String MOCK_ENTERPRISE_SIGN_PRIVATE_KEY = "D735102B815123AFB3B0779928FA8264B050BEB43071DEBD88BB115B28E4661C";//企业私钥(加密) + private static final String MOCK_ENTERPRISE_DECRYPT_PRIVATE_KEY = "D735102B815123AFB3B0779928FA8264B050BEB43071DEBD88BB115B28E4661C";//企业私钥(解密) + + + @Before + public void setup() throws IOException { + // 为了确保测试环境稳定,手动注册Bouncy Castle Provider + if (Security.getProvider(BouncyCastleProvider.PROVIDER_NAME) == null) { + Security.addProvider(new BouncyCastleProvider()); + } + + // 初始化 MockWebServer,它会模拟所有外部HTTP接口 + mockWebServer = new MockWebServer(); + mockWebServer.start(); + + // 初始化 NiFi 的 TestRunner + runner = TestRunners.newTestRunner(ZsyhCbsProcessor.class); + } + + @After + public void teardown() throws IOException { + mockWebServer.shutdown(); + } + + @Test + public void testProcessorHappyPath() throws Exception { + // 配置处理器属性 + // 让所有URL都指向我们的模拟服务器 + runner.setProperty(ZsyhCbsProcessor.PROP_TOKEN_URL, ""); + runner.setProperty(ZsyhCbsProcessor.PROP_REFRESH_URL, ""); + runner.setProperty(ZsyhCbsProcessor.PROP_TARGET_URL, ""); + + // 配置其他必要的密钥和ID + runner.setProperty(ZsyhCbsProcessor.PROP_APP_ID, ""); + runner.setProperty(ZsyhCbsProcessor.PROP_APP_SECRET, ""); + runner.setProperty(ZsyhCbsProcessor.PROP_BODY_ENCRYPTION_KEY, MOCK_PLATFORM_PUBLIC_KEY); + runner.setProperty(ZsyhCbsProcessor.PROP_SIGN_PRIVATE_KEY, MOCK_ENTERPRISE_SIGN_PRIVATE_KEY); + runner.setProperty(ZsyhCbsProcessor.PROP_BODY_DECRYPTION_KEY, MOCK_ENTERPRISE_DECRYPT_PRIVATE_KEY); + + // 3. 准备输入的FlowFile,其内容就是原始的业务请求JSON +// String businessRequestJson = "{\"request_param\":\"some_value\"}"; +// runner.enqueue(businessRequestJson); + + // --- 执行阶段 (Act) --- + System.out.println("--- 开始运行处理器 ---"); + runner.run(); + System.out.println("--- 处理器运行结束 ---"); + + // --- 观察结果 --- + System.out.println("\n--- 运行结果分析 ---"); + + // 检查FlowFile的路由情况 + int successCount = runner.getFlowFilesForRelationship(ZsyhCbsProcessor.REL_SUCCESS).size(); + int failureCount = runner.getFlowFilesForRelationship(ZsyhCbsProcessor.REL_FAILURE).size(); + System.out.println("路由到 'success' 关系的FlowFile数量: " + successCount); + System.out.println("路由到 'failure' 关系的FlowFile数量: " + failureCount); + } +} \ No newline at end of file diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/test/java/com/hzya/frame/util/BCTest.java b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/test/java/com/hzya/frame/util/BCTest.java index 84a9047..4c5835a 100644 --- a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/test/java/com/hzya/frame/util/BCTest.java +++ b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/test/java/com/hzya/frame/util/BCTest.java @@ -2,6 +2,7 @@ package com.hzya.frame.util; //import org.bouncycastle.asn1.gm.GMNamedCurves; //import org.bouncycastle.asn1.x9.X9ECParameters; + import org.bouncycastle.asn1.gm.GMNamedCurves; import org.bouncycastle.asn1.x9.X9ECParameters; import org.bouncycastle.jce.ECNamedCurveTable; @@ -33,16 +34,16 @@ public class BCTest { // } // 2. 尝试获取曲线参数 - String curveName = "sm2p256v1"; - System.out.println("正在尝试获取曲线 '" + curveName + "' 的参数..."); - ECParameterSpec spec = ECNamedCurveTable.getParameterSpec(curveName); - - // 3. 检查结果 - if (spec != null) { - System.out.println("✅ 成功获取到曲线 '" + curveName + "' 的参数!spec对象不是null。"); - } else { - System.err.println("❌ 获取曲线 '" + curveName + "' 的参数失败!spec对象为null。"); - } +// String curveName = "sm2p256v1"; +// System.out.println("正在尝试获取曲线 '" + curveName + "' 的参数..."); +// ECParameterSpec spec = ECNamedCurveTable.getParameterSpec(curveName); +// +// // 3. 检查结果 +// if (spec != null) { +// System.out.println("✅ 成功获取到曲线 '" + curveName + "' 的参数!spec对象不是null。"); +// } else { +// System.err.println("❌ 获取曲线 '" + curveName + "' 的参数失败!spec对象为null。"); +// } // X9ECParameters params = GMNamedCurves.getByName(curveName); // System.out.println(params); diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/test/java/com/hzya/frame/util/SM2ExampleTest.java b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/test/java/com/hzya/frame/util/SM2ExampleTest.java index 3c816cd..8f288aa 100644 --- a/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/test/java/com/hzya/frame/util/SM2ExampleTest.java +++ b/nifi-hzyadev-bundle/hzya-nifi-Zsyh-cbs-processors/src/test/java/com/hzya/frame/util/SM2ExampleTest.java @@ -1,28 +1,28 @@ -//package com.hzya.frame.util; -// -//import org.apache.http.client.methods.CloseableHttpResponse; -//import org.apache.http.client.methods.HttpPost; -//import org.apache.http.impl.client.CloseableHttpClient; -//import org.apache.http.impl.client.HttpClients; -//import org.junit.Test; -// -//import java.io.IOException; -// -//import static org.junit.Assert.*; -// -///** -// * @Author:liuyang -// * @Package:com.hzya.frame.util -// * @Project:nifi-hzyadev-bundle -// * @name:SM2ExampleTest -// * @Date:2025/9/9 15:14 -// * @Filename:SM2ExampleTest -// */ -//public class SM2ExampleTest { -// -// @Test -// public void test1() { -// try { +package com.hzya.frame.util; + +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.*; + +/** + * @Author:liuyang + * @Package:com.hzya.frame.util + * @Project:nifi-hzyadev-bundle + * @name:SM2ExampleTest + * @Date:2025/9/9 15:14 + * @Filename:SM2ExampleTest + */ +public class SM2ExampleTest { + + @Test + public void test1() { + try { // SM2Example sm2Example = new SM2Example(); // // CloseableHttpClient client = HttpClients.custom().disableContentCompression().build(); @@ -36,8 +36,8 @@ // } finally { // client.close(); // } -// } catch (Exception e) { -// e.printStackTrace(); -// } -// } -//} \ No newline at end of file + } catch (Exception e) { + e.printStackTrace(); + } + } +} \ No newline at end of file