diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-nar/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-nar/pom.xml
new file mode 100644
index 0000000..4a57fdf
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-nar/pom.xml
@@ -0,0 +1,34 @@
+
+
+
+ nifi-hzyadev-bundle
+ com.hzya
+ 1.0
+
+ 4.0.0
+
+ hzya-nifi-Zjnx-czb-nar
+
+ nar
+
+
+ 8
+ 8
+
+
+
+
+ com.hzya
+ hzya-nifi-Zjnx-czb-processors
+ 1.0
+
+
+ org.apache.nifi
+ nifi-standard-services-api-nar
+ ${nifi-revision}
+ nar
+
+
+
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/pom.xml
new file mode 100644
index 0000000..dd2f7b5
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/pom.xml
@@ -0,0 +1,54 @@
+
+
+
+ nifi-hzyadev-bundle
+ com.hzya
+ 1.0
+
+ 4.0.0
+
+ hzya-nifi-Zjnx-czb-processors
+
+
+ 8
+ 8
+
+
+
+
+ org.apache.nifi
+ nifi-api
+ ${nifi-revision}
+
+
+ org.apache.nifi
+ nifi-dbcp-service-api
+ ${nifi-revision}
+
+
+ org.apache.nifi
+ nifi-processor-utils
+ 1.15.3
+
+
+ org.apache.nifi
+ nifi-mock
+ ${nifi-revision}
+ test
+
+
+ junit
+ junit
+ 4.13
+ test
+
+
+ com.squareup.okhttp3
+ mockwebserver
+ 4.9.3
+ test
+
+
+
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/src/main/java/com/hzya/frame/ZJRCGenericApiProcessor.java b/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/src/main/java/com/hzya/frame/ZJRCGenericApiProcessor.java
new file mode 100644
index 0000000..011e43f
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/src/main/java/com/hzya/frame/ZJRCGenericApiProcessor.java
@@ -0,0 +1,224 @@
+package com.hzya.frame;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.*;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.*;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@Tags({"zjrc", "bank", "api", "财资宝", "农商行", "pagination"})
+@CapabilityDescription("调用浙江农商联合银行财资宝的通用接口处理器,并自动处理分页查询。通过配置'交易码'属性来指定调用的具体接口。处理器接收一个包含reqBody的JSON作为输入FlowFile。")
+@SeeAlso({})
+@ReadsAttributes({@ReadsAttribute(attribute = "", description = "")})
+@WritesAttributes({@WritesAttribute(attribute = "zjrc.api.error.message", description = "当API调用失败或银行返回错误时,此处会包含错误信息。"), @WritesAttribute(attribute = "zjrc.api.retCode", description = "银行返回的业务代码,例如 '0000'。"), @WritesAttribute(attribute = "zjrc.api.total.pages.fetched", description = "成功完成分页查询后,记录总共获取了多少页的数据。")})
+public class ZJRCGenericApiProcessor extends AbstractProcessor {
+
+ // --- 新增超时属性 ---
+ public static final PropertyDescriptor PROP_CONNECT_TIMEOUT = new PropertyDescriptor.Builder().name("连接超时时间").displayName("连接超时时间").description("建立HTTP连接的超时时间。").defaultValue("10 s").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
+
+ public static final PropertyDescriptor PROP_READ_TIMEOUT = new PropertyDescriptor.Builder().name("读取超时时间").displayName("读取超时时间").description("等待HTTP响应的超时时间。").defaultValue("30 s").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
+
+ // --- 已有属性保持不变 ---
+ public static final PropertyDescriptor PROP_TRAN_CODE = new PropertyDescriptor.Builder().name("交易码 (tranCode)").displayName("交易码 (tranCode)").description("需要调用的接口交易码,例如 300004 (账户交易明细查询)。").required(true).defaultValue("300004").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+ public static final PropertyDescriptor PROP_CLIENT_URL = new PropertyDescriptor.Builder().name("财资宝前置客户端URL").displayName("财资宝前置客户端URL").description("财资宝前置客户端的URL地址,不包含任何查询参数,例如:http://115.231.255.202:8433/ERPClient/receive").required(true).addValidator(StandardValidators.URL_VALIDATOR).build();
+
+ public static final PropertyDescriptor PROP_CUST_NO = new PropertyDescriptor.Builder().name("企业客户号 (custNo)").displayName("企业客户号 (custNo)").description("银行分配的企业网银客户号。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+ public static final PropertyDescriptor PROP_USER_ID = new PropertyDescriptor.Builder().name("用户号 (userId)").displayName("用户号 (userId)").description("虚拟用户ID,通常以前缀'Y'开头。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("API调用成功且所有分页数据成功获取后,包含合并结果的FlowFile将被路由到此。").build();
+
+ public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("API调用失败或银行返回非'0000'错误码的FlowFile将被路由到此。").build();
+
+ private List descriptors;
+ private Set relationships;
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ // --- 核心修改点:将“查询结束”的错误码定义为常量 ---
+ private static final String END_OF_PAGES_RET_CODE = "CTM0000006";
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List descriptors = new ArrayList<>();
+ descriptors.add(PROP_CLIENT_URL);
+ descriptors.add(PROP_CUST_NO);
+ descriptors.add(PROP_USER_ID);
+ descriptors.add(PROP_TRAN_CODE);
+ descriptors.add(PROP_CONNECT_TIMEOUT);
+ descriptors.add(PROP_READ_TIMEOUT);
+ this.descriptors = Collections.unmodifiableList(descriptors);
+
+ final Set relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ public Set getRelationships() {
+ return this.relationships;
+ }
+
+ @Override
+ public final List getSupportedPropertyDescriptors() {
+ return descriptors;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ try {
+ final StringBuilder reqBodyBuilder = new StringBuilder();
+ try (InputStream in = session.read(flowFile)) {
+ reqBodyBuilder.append(new BufferedReader(new InputStreamReader(in)).lines().collect(Collectors.joining("\n")));
+ }
+ ObjectNode reqBodyTemplate = (ObjectNode) objectMapper.readTree(reqBodyBuilder.toString());
+
+ int currentPage = 1;
+ List allDetails = new ArrayList<>();
+
+ while (true) {
+ ObjectNode bizContent = (ObjectNode) reqBodyTemplate.path("bizContent");
+ if (bizContent.isMissingNode()) {
+ bizContent = reqBodyTemplate;
+ }
+ bizContent.put("current", currentPage);
+
+ String responseBody = sendRequest(context, reqBodyTemplate);
+ JsonNode responseJson = objectMapper.readTree(responseBody);
+
+ JsonNode resHead = responseJson.path("resHead");
+ String retCode = resHead.path("retCode").asText();
+
+ // --- 核心改造点:更智能的分页终止逻辑 ---
+ if ("0000".equals(retCode)) {
+ // 正常成功返回
+ JsonNode resBody = responseJson.path("resBody");
+ JsonNode contentNode = resBody.path("content");
+ if (resBody.isNull() || contentNode.isNull() || contentNode.isMissingNode() || contentNode.asText().isEmpty()) {
+ getLogger().info("在第 {} 页未获取到content内容,分页结束。总共获取了 {} 页数据。", currentPage, currentPage - 1);
+ break; // 终止条件1:内容为空
+ }
+
+ String contentStr = contentNode.asText();
+ JsonNode contentJson = objectMapper.readTree(contentStr);
+ JsonNode detailsList = contentJson.path("detailsResBodyList");
+ if (detailsList.isArray()) {
+ for (JsonNode detail : detailsList) {
+ allDetails.add(detail);
+ }
+ }
+ } else if (END_OF_PAGES_RET_CODE.equals(retCode)) {
+ // 将特定的错误码视为分页结束的信号
+ getLogger().info("在第 {} 页收到错误码 {},判定为分页已结束。", currentPage, retCode);
+ break; // 终止条件2:收到“已查尽”的特定错误码
+ } else {
+ // 其他所有错误码均视为硬性失败
+ String errMsg = resHead.path("errMsg").asText("未知错误");
+ getLogger().error("银行API在第 {} 页返回不可恢复的错误: retCode={}, errMsg={}, FlowFile: {}", currentPage, retCode, errMsg, flowFile.getId());
+ flowFile = session.putAttribute(flowFile, "zjrc.api.error.message", "Page " + currentPage + ": " + errMsg);
+ flowFile = session.putAttribute(flowFile, "zjrc.api.retCode", retCode);
+ session.transfer(flowFile, REL_FAILURE);
+ return; // 终止整个流程
+ }
+
+ currentPage++;
+ }
+
+ ObjectNode finalResult = objectMapper.createObjectNode();
+ ArrayNode finalArray = objectMapper.valueToTree(allDetails);
+ finalResult.set("allTransactionDetails", finalArray);
+
+ flowFile = session.write(flowFile, out -> objectMapper.writerWithDefaultPrettyPrinter().writeValue(out, finalResult));
+ flowFile = session.putAttribute(flowFile, "zjrc.api.total.pages.fetched", String.valueOf(currentPage - 1));
+ session.transfer(flowFile, REL_SUCCESS);
+
+ } catch (Exception e) {
+ getLogger().error("处理FlowFile时发生未知异常: {}", e.getMessage(), e);
+ flowFile = session.putAttribute(flowFile, "zjrc.api.error.message", e.getMessage());
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ }
+
+ private String sendRequest(final ProcessContext context, final JsonNode reqBodyNode) throws Exception {
+ final String baseClientUrl = context.getProperty(PROP_CLIENT_URL).getValue();
+ final String custNo = context.getProperty(PROP_CUST_NO).getValue();
+ final String userId = context.getProperty(PROP_USER_ID).getValue();
+ final String tranCode = context.getProperty(PROP_TRAN_CODE).getValue();
+ final int connectTimeout = context.getProperty(PROP_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+ final int readTimeout = context.getProperty(PROP_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+
+ ObjectNode rootNode = objectMapper.createObjectNode();
+ ObjectNode reqHead = objectMapper.createObjectNode();
+ Date now = new Date();
+
+ reqHead.put("custNo", custNo);
+ reqHead.put("userId", userId);
+ reqHead.put("tranCode", tranCode);
+ reqHead.put("serialNo", UUID.randomUUID().toString().replace("-", ""));
+ reqHead.put("reqDate", new SimpleDateFormat("yyyyMMdd").format(now));
+ reqHead.put("reqTime", new SimpleDateFormat("HHmmssSSS").format(now));
+
+ rootNode.set("reqHead", reqHead);
+ rootNode.set("reqBody", reqBodyNode);
+
+ String jsonPayload = objectMapper.writeValueAsString(rootNode);
+ String encodedJson = URLEncoder.encode(jsonPayload, "UTF-8");
+ String finalUrl = baseClientUrl + "?reqJson=" + encodedJson;
+
+ URL url = new URL(finalUrl);
+ HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+ connection.setRequestMethod("POST");
+ connection.setConnectTimeout(connectTimeout);
+ connection.setReadTimeout(readTimeout);
+ connection.setRequestProperty("Accept", "*/*");
+ connection.setDoOutput(true);
+
+ try (OutputStream os = connection.getOutputStream()) {
+ os.write("".getBytes());
+ }
+
+ int responseCode = connection.getResponseCode();
+ if (responseCode == HttpURLConnection.HTTP_OK) {
+ try (InputStream is = connection.getInputStream()) {
+ return new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8)).lines().collect(Collectors.joining("\n"));
+ }
+ } else {
+ try (InputStream es = connection.getErrorStream()) {
+ // 即便HTTP>400,也需要返回body,因为里面可能包含retCode
+ if (es != null) {
+ return new BufferedReader(new InputStreamReader(es, StandardCharsets.UTF_8)).lines().collect(Collectors.joining("\n"));
+ } else {
+ throw new IOException("HTTP请求失败,状态码: " + responseCode + ",且没有错误信息返回。");
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..a4433a6
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+com.hzya.frame.ZJRCGenericApiProcessor
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/src/test/java/com/hzya/frame/ZJRCGenericApiProcessorTest.java b/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/src/test/java/com/hzya/frame/ZJRCGenericApiProcessorTest.java
new file mode 100644
index 0000000..25cd2fe
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/src/test/java/com/hzya/frame/ZJRCGenericApiProcessorTest.java
@@ -0,0 +1,129 @@
+package com.hzya.frame;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * @Author:liuyang
+ * @Package:com.hzya.frame
+ * @Project:nifi-hzyadev-bundle
+ * @name:ZJRCGenericApiProcessorTest
+ * @Date:2025/9/8 13:30
+ * @Filename:ZJRCGenericApiProcessorTest
+ */
+public class ZJRCGenericApiProcessorTest {
+ private TestRunner runner;
+ private MockWebServer mockWebServer;
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ @Before
+ public void setup() throws IOException {
+ // 1. 初始化 MockWebServer,它会模拟银行的HTTP接口
+ mockWebServer = new MockWebServer();
+ mockWebServer.start();
+
+ // 2. 初始化 NiFi 的 TestRunner
+ runner = TestRunners.newTestRunner(ZJRCGenericApiProcessor.class);
+ }
+
+ @After
+ public void teardown() throws IOException {
+ // 3. 测试结束后关闭 MockWebServer
+ mockWebServer.shutdown();
+ }
+
+ @Test
+ public void testSuccessfulApiCall() throws Exception {
+ // --- Arrange (准备阶段) ---
+
+ // 1. 配置处理器的属性
+ runner.setProperty(ZJRCGenericApiProcessor.PROP_CLIENT_URL, "http://115.231.255.202:8433/ERPClient/receive");
+ runner.setProperty(ZJRCGenericApiProcessor.PROP_CUST_NO, "2100140132");
+ runner.setProperty(ZJRCGenericApiProcessor.PROP_USER_ID, "Y0000000122032");
+ runner.setProperty(ZJRCGenericApiProcessor.PROP_TRAN_CODE, "300004");
+
+ // 2. 准备一个模拟的成功响应 (retCode: "0000")
+ String successfulResponseJson = "{" +
+ " \"resHead\": {" +
+ " \"retCode\": \"0000\"," +
+ " \"errMsg\": \"\"," +
+ " \"resDate\": \"20250908\"," +
+ " \"resTime\": \"133000123\"" +
+ " }," +
+ " \"resBody\": {" +
+ " \"recordTotal\": 1," +
+ " \"detailResBodyList\": [" +
+ " {" +
+ " \"bankAcc\": \"123456789\"," +
+ " \"amt\": 100.00" +
+ " }" +
+ " ]" +
+ " }" +
+ "}";
+ mockWebServer.enqueue(new MockResponse()
+ .setResponseCode(100) // HTTP 200 OK
+ .setBody(successfulResponseJson)
+ .addHeader("Content-Type", "application/json; charset=utf-8"));
+
+ // 3. 准备输入的FlowFile内容 (即reqBody)
+ String reqBodyJson = "{\n" +
+ " \"erpSysFlag\": \"0000\",\n" +
+ " \"bizContent\": {\n" +
+ " \"bankAccNo\": \"201000270605501\",\n" +
+ " \"queryBeginDate\": \"2021-01-01\",\n" +
+ " \"queryEndDate\": \"2021-01-31\",\n" +
+ " \"current\": 1,\n" +
+ " \"size\": 100\n" +
+ " }\n" +
+ " }";
+ runner.enqueue(reqBodyJson);
+
+ // --- Act (执行阶段) ---
+
+ // 4. 运行处理器
+ runner.run();
+
+ // --- Assert (断言阶段) ---
+
+ // 5. 验证FlowFile是否被正确路由到了success关系
+ runner.assertTransferCount(ZJRCGenericApiProcessor.REL_SUCCESS, 1);
+ runner.assertTransferCount(ZJRCGenericApiProcessor.REL_FAILURE, 0);
+
+ // 6. 获取输出的FlowFile并验证其内容和属性
+ MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ZJRCGenericApiProcessor.REL_SUCCESS).get(0);
+
+ // 验证输出内容是否是我们模拟的成功响应
+ resultFlowFile.assertContentEquals(successfulResponseJson);
+ // 验证处理器是否添加了正确的属性
+ resultFlowFile.assertAttributeEquals("zjrc.api.retCode", "0000");
+
+ // (可选) 验证处理器发送到银行的请求是否正确
+ RecordedRequest recordedRequest = mockWebServer.takeRequest();
+ assertNotNull(recordedRequest);
+ assertEquals("POST", recordedRequest.getMethod()); // 确认是POST请求
+
+ // 解析请求体,并验证reqHead中的字段是否由处理器自动生成并填充
+ String receivedRequestBody = recordedRequest.getBody().readUtf8();
+ JsonNode root = objectMapper.readTree(receivedRequestBody);
+
+ JsonNode reqHead = root.path("reqHead");
+ assertEquals("2100029034", reqHead.path("custNo").asText());
+ assertEquals("Y001", reqHead.path("userId").asText());
+ assertEquals("300004", reqHead.path("tranCode").asText());
+ assertNotNull(reqHead.path("serialNo").asText()); // 确认serialNo已生成
+ }
+}
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/pom.xml b/nifi-hzyadev-bundle/pom.xml
index a061eb5..cbdaf5b 100644
--- a/nifi-hzyadev-bundle/pom.xml
+++ b/nifi-hzyadev-bundle/pom.xml
@@ -25,6 +25,8 @@
hzya-nifi-AdvancedJoltTransformer-processors
hzya-nifi-AutoAddOracleDatafile-nar
hzya-nifi-AutoAddOracleDatafile-processors
+ hzya-nifi-Zjnx-czb-processors
+ hzya-nifi-Zjnx-czb-nar