feat(nifi): 添加浙江农商行财资宝通用接口处理器- 新增 ZJRCGenericApiProcessor 处理器,用于调用浙江农商行财资宝接口

- 实现了分页查询逻辑,能够自动处理多页数据
- 添加了连接超时和读取超时的配置选项
- 优化了错误处理,支持特定错误码的智能判断- 增加了单元测试用例,确保处理器功能的正确性
This commit is contained in:
liuy 2025-09-08 16:07:58 +08:00
parent 66b6dbbf4a
commit 0e2554bf2e
6 changed files with 458 additions and 0 deletions

View File

@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-hzyadev-bundle</artifactId>
<groupId>com.hzya</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hzya-nifi-Zjnx-czb-nar</artifactId>
<packaging>nar</packaging>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.hzya</groupId>
<artifactId>hzya-nifi-Zjnx-czb-processors</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>${nifi-revision}</version>
<type>nar</type>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,54 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-hzyadev-bundle</artifactId>
<groupId>com.hzya</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hzya-nifi-Zjnx-czb-processors</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>${nifi-revision}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
<version>${nifi-revision}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
<version>1.15.3</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>${nifi-revision}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
<version>4.9.3</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,224 @@
package com.hzya.frame;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Tags({"zjrc", "bank", "api", "财资宝", "农商行", "pagination"})
@CapabilityDescription("调用浙江农商联合银行财资宝的通用接口处理器,并自动处理分页查询。通过配置'交易码'属性来指定调用的具体接口。处理器接收一个包含reqBody的JSON作为输入FlowFile。")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute = "", description = "")})
@WritesAttributes({@WritesAttribute(attribute = "zjrc.api.error.message", description = "当API调用失败或银行返回错误时此处会包含错误信息。"), @WritesAttribute(attribute = "zjrc.api.retCode", description = "银行返回的业务代码,例如 '0000'。"), @WritesAttribute(attribute = "zjrc.api.total.pages.fetched", description = "成功完成分页查询后,记录总共获取了多少页的数据。")})
public class ZJRCGenericApiProcessor extends AbstractProcessor {
// --- 新增超时属性 ---
public static final PropertyDescriptor PROP_CONNECT_TIMEOUT = new PropertyDescriptor.Builder().name("连接超时时间").displayName("连接超时时间").description("建立HTTP连接的超时时间。").defaultValue("10 s").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
public static final PropertyDescriptor PROP_READ_TIMEOUT = new PropertyDescriptor.Builder().name("读取超时时间").displayName("读取超时时间").description("等待HTTP响应的超时时间。").defaultValue("30 s").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
// --- 已有属性保持不变 ---
public static final PropertyDescriptor PROP_TRAN_CODE = new PropertyDescriptor.Builder().name("交易码 (tranCode)").displayName("交易码 (tranCode)").description("需要调用的接口交易码,例如 300004 (账户交易明细查询)。").required(true).defaultValue("300004").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
public static final PropertyDescriptor PROP_CLIENT_URL = new PropertyDescriptor.Builder().name("财资宝前置客户端URL").displayName("财资宝前置客户端URL").description("财资宝前置客户端的URL地址不包含任何查询参数例如http://115.231.255.202:8433/ERPClient/receive").required(true).addValidator(StandardValidators.URL_VALIDATOR).build();
public static final PropertyDescriptor PROP_CUST_NO = new PropertyDescriptor.Builder().name("企业客户号 (custNo)").displayName("企业客户号 (custNo)").description("银行分配的企业网银客户号。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
public static final PropertyDescriptor PROP_USER_ID = new PropertyDescriptor.Builder().name("用户号 (userId)").displayName("用户号 (userId)").description("虚拟用户ID通常以前缀'Y'开头。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("API调用成功且所有分页数据成功获取后包含合并结果的FlowFile将被路由到此。").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("API调用失败或银行返回非'0000'错误码的FlowFile将被路由到此。").build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
private final ObjectMapper objectMapper = new ObjectMapper();
// --- 核心修改点查询结束的错误码定义为常量 ---
private static final String END_OF_PAGES_RET_CODE = "CTM0000006";
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(PROP_CLIENT_URL);
descriptors.add(PROP_CUST_NO);
descriptors.add(PROP_USER_ID);
descriptors.add(PROP_TRAN_CODE);
descriptors.add(PROP_CONNECT_TIMEOUT);
descriptors.add(PROP_READ_TIMEOUT);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
try {
final StringBuilder reqBodyBuilder = new StringBuilder();
try (InputStream in = session.read(flowFile)) {
reqBodyBuilder.append(new BufferedReader(new InputStreamReader(in)).lines().collect(Collectors.joining("\n")));
}
ObjectNode reqBodyTemplate = (ObjectNode) objectMapper.readTree(reqBodyBuilder.toString());
int currentPage = 1;
List<JsonNode> allDetails = new ArrayList<>();
while (true) {
ObjectNode bizContent = (ObjectNode) reqBodyTemplate.path("bizContent");
if (bizContent.isMissingNode()) {
bizContent = reqBodyTemplate;
}
bizContent.put("current", currentPage);
String responseBody = sendRequest(context, reqBodyTemplate);
JsonNode responseJson = objectMapper.readTree(responseBody);
JsonNode resHead = responseJson.path("resHead");
String retCode = resHead.path("retCode").asText();
// --- 核心改造点更智能的分页终止逻辑 ---
if ("0000".equals(retCode)) {
// 正常成功返回
JsonNode resBody = responseJson.path("resBody");
JsonNode contentNode = resBody.path("content");
if (resBody.isNull() || contentNode.isNull() || contentNode.isMissingNode() || contentNode.asText().isEmpty()) {
getLogger().info("在第 {} 页未获取到content内容分页结束。总共获取了 {} 页数据。", currentPage, currentPage - 1);
break; // 终止条件1内容为空
}
String contentStr = contentNode.asText();
JsonNode contentJson = objectMapper.readTree(contentStr);
JsonNode detailsList = contentJson.path("detailsResBodyList");
if (detailsList.isArray()) {
for (JsonNode detail : detailsList) {
allDetails.add(detail);
}
}
} else if (END_OF_PAGES_RET_CODE.equals(retCode)) {
// 将特定的错误码视为分页结束的信号
getLogger().info("在第 {} 页收到错误码 {},判定为分页已结束。", currentPage, retCode);
break; // 终止条件2收到已查尽的特定错误码
} else {
// 其他所有错误码均视为硬性失败
String errMsg = resHead.path("errMsg").asText("未知错误");
getLogger().error("银行API在第 {} 页返回不可恢复的错误: retCode={}, errMsg={}, FlowFile: {}", currentPage, retCode, errMsg, flowFile.getId());
flowFile = session.putAttribute(flowFile, "zjrc.api.error.message", "Page " + currentPage + ": " + errMsg);
flowFile = session.putAttribute(flowFile, "zjrc.api.retCode", retCode);
session.transfer(flowFile, REL_FAILURE);
return; // 终止整个流程
}
currentPage++;
}
ObjectNode finalResult = objectMapper.createObjectNode();
ArrayNode finalArray = objectMapper.valueToTree(allDetails);
finalResult.set("allTransactionDetails", finalArray);
flowFile = session.write(flowFile, out -> objectMapper.writerWithDefaultPrettyPrinter().writeValue(out, finalResult));
flowFile = session.putAttribute(flowFile, "zjrc.api.total.pages.fetched", String.valueOf(currentPage - 1));
session.transfer(flowFile, REL_SUCCESS);
} catch (Exception e) {
getLogger().error("处理FlowFile时发生未知异常: {}", e.getMessage(), e);
flowFile = session.putAttribute(flowFile, "zjrc.api.error.message", e.getMessage());
session.transfer(flowFile, REL_FAILURE);
}
}
private String sendRequest(final ProcessContext context, final JsonNode reqBodyNode) throws Exception {
final String baseClientUrl = context.getProperty(PROP_CLIENT_URL).getValue();
final String custNo = context.getProperty(PROP_CUST_NO).getValue();
final String userId = context.getProperty(PROP_USER_ID).getValue();
final String tranCode = context.getProperty(PROP_TRAN_CODE).getValue();
final int connectTimeout = context.getProperty(PROP_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final int readTimeout = context.getProperty(PROP_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
ObjectNode rootNode = objectMapper.createObjectNode();
ObjectNode reqHead = objectMapper.createObjectNode();
Date now = new Date();
reqHead.put("custNo", custNo);
reqHead.put("userId", userId);
reqHead.put("tranCode", tranCode);
reqHead.put("serialNo", UUID.randomUUID().toString().replace("-", ""));
reqHead.put("reqDate", new SimpleDateFormat("yyyyMMdd").format(now));
reqHead.put("reqTime", new SimpleDateFormat("HHmmssSSS").format(now));
rootNode.set("reqHead", reqHead);
rootNode.set("reqBody", reqBodyNode);
String jsonPayload = objectMapper.writeValueAsString(rootNode);
String encodedJson = URLEncoder.encode(jsonPayload, "UTF-8");
String finalUrl = baseClientUrl + "?reqJson=" + encodedJson;
URL url = new URL(finalUrl);
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("POST");
connection.setConnectTimeout(connectTimeout);
connection.setReadTimeout(readTimeout);
connection.setRequestProperty("Accept", "*/*");
connection.setDoOutput(true);
try (OutputStream os = connection.getOutputStream()) {
os.write("".getBytes());
}
int responseCode = connection.getResponseCode();
if (responseCode == HttpURLConnection.HTTP_OK) {
try (InputStream is = connection.getInputStream()) {
return new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8)).lines().collect(Collectors.joining("\n"));
}
} else {
try (InputStream es = connection.getErrorStream()) {
// 即便HTTP>400也需要返回body因为里面可能包含retCode
if (es != null) {
return new BufferedReader(new InputStreamReader(es, StandardCharsets.UTF_8)).lines().collect(Collectors.joining("\n"));
} else {
throw new IOException("HTTP请求失败状态码: " + responseCode + ",且没有错误信息返回。");
}
}
}
}
}

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
com.hzya.frame.ZJRCGenericApiProcessor

View File

@ -0,0 +1,129 @@
package com.hzya.frame;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
/**
* @Authorliuyang
* @Packagecom.hzya.frame
* @Projectnifi-hzyadev-bundle
* @nameZJRCGenericApiProcessorTest
* @Date2025/9/8 13:30
* @FilenameZJRCGenericApiProcessorTest
*/
public class ZJRCGenericApiProcessorTest {
private TestRunner runner;
private MockWebServer mockWebServer;
private final ObjectMapper objectMapper = new ObjectMapper();
@Before
public void setup() throws IOException {
// 1. 初始化 MockWebServer它会模拟银行的HTTP接口
mockWebServer = new MockWebServer();
mockWebServer.start();
// 2. 初始化 NiFi TestRunner
runner = TestRunners.newTestRunner(ZJRCGenericApiProcessor.class);
}
@After
public void teardown() throws IOException {
// 3. 测试结束后关闭 MockWebServer
mockWebServer.shutdown();
}
@Test
public void testSuccessfulApiCall() throws Exception {
// --- Arrange (准备阶段) ---
// 1. 配置处理器的属性
runner.setProperty(ZJRCGenericApiProcessor.PROP_CLIENT_URL, "http://115.231.255.202:8433/ERPClient/receive");
runner.setProperty(ZJRCGenericApiProcessor.PROP_CUST_NO, "2100140132");
runner.setProperty(ZJRCGenericApiProcessor.PROP_USER_ID, "Y0000000122032");
runner.setProperty(ZJRCGenericApiProcessor.PROP_TRAN_CODE, "300004");
// 2. 准备一个模拟的成功响应 (retCode: "0000")
String successfulResponseJson = "{" +
" \"resHead\": {" +
" \"retCode\": \"0000\"," +
" \"errMsg\": \"\"," +
" \"resDate\": \"20250908\"," +
" \"resTime\": \"133000123\"" +
" }," +
" \"resBody\": {" +
" \"recordTotal\": 1," +
" \"detailResBodyList\": [" +
" {" +
" \"bankAcc\": \"123456789\"," +
" \"amt\": 100.00" +
" }" +
" ]" +
" }" +
"}";
mockWebServer.enqueue(new MockResponse()
.setResponseCode(100) // HTTP 200 OK
.setBody(successfulResponseJson)
.addHeader("Content-Type", "application/json; charset=utf-8"));
// 3. 准备输入的FlowFile内容 (即reqBody)
String reqBodyJson = "{\n" +
" \"erpSysFlag\": \"0000\",\n" +
" \"bizContent\": {\n" +
" \"bankAccNo\": \"201000270605501\",\n" +
" \"queryBeginDate\": \"2021-01-01\",\n" +
" \"queryEndDate\": \"2021-01-31\",\n" +
" \"current\": 1,\n" +
" \"size\": 100\n" +
" }\n" +
" }";
runner.enqueue(reqBodyJson);
// --- Act (执行阶段) ---
// 4. 运行处理器
runner.run();
// --- Assert (断言阶段) ---
// 5. 验证FlowFile是否被正确路由到了success关系
runner.assertTransferCount(ZJRCGenericApiProcessor.REL_SUCCESS, 1);
runner.assertTransferCount(ZJRCGenericApiProcessor.REL_FAILURE, 0);
// 6. 获取输出的FlowFile并验证其内容和属性
MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ZJRCGenericApiProcessor.REL_SUCCESS).get(0);
// 验证输出内容是否是我们模拟的成功响应
resultFlowFile.assertContentEquals(successfulResponseJson);
// 验证处理器是否添加了正确的属性
resultFlowFile.assertAttributeEquals("zjrc.api.retCode", "0000");
// (可选) 验证处理器发送到银行的请求是否正确
RecordedRequest recordedRequest = mockWebServer.takeRequest();
assertNotNull(recordedRequest);
assertEquals("POST", recordedRequest.getMethod()); // 确认是POST请求
// 解析请求体并验证reqHead中的字段是否由处理器自动生成并填充
String receivedRequestBody = recordedRequest.getBody().readUtf8();
JsonNode root = objectMapper.readTree(receivedRequestBody);
JsonNode reqHead = root.path("reqHead");
assertEquals("2100029034", reqHead.path("custNo").asText());
assertEquals("Y001", reqHead.path("userId").asText());
assertEquals("300004", reqHead.path("tranCode").asText());
assertNotNull(reqHead.path("serialNo").asText()); // 确认serialNo已生成
}
}

View File

@ -25,6 +25,8 @@
<module>hzya-nifi-AdvancedJoltTransformer-processors</module>
<module>hzya-nifi-AutoAddOracleDatafile-nar</module>
<module>hzya-nifi-AutoAddOracleDatafile-processors</module>
<module>hzya-nifi-Zjnx-czb-processors</module>
<module>hzya-nifi-Zjnx-czb-nar</module>
</modules>
<parent>