From 0e2554bf2eb06ee8faf3bea750e82c9741c67c15 Mon Sep 17 00:00:00 2001 From: liuy <37787198+LiuyCodes@users.noreply.github.com> Date: Mon, 8 Sep 2025 16:07:58 +0800 Subject: [PATCH] =?UTF-8?q?feat(nifi):=20=E6=B7=BB=E5=8A=A0=E6=B5=99?= =?UTF-8?q?=E6=B1=9F=E5=86=9C=E5=95=86=E8=A1=8C=E8=B4=A2=E8=B5=84=E5=AE=9D?= =?UTF-8?q?=E9=80=9A=E7=94=A8=E6=8E=A5=E5=8F=A3=E5=A4=84=E7=90=86=E5=99=A8?= =?UTF-8?q?-=20=E6=96=B0=E5=A2=9E=20ZJRCGenericApiProcessor=20=E5=A4=84?= =?UTF-8?q?=E7=90=86=E5=99=A8=EF=BC=8C=E7=94=A8=E4=BA=8E=E8=B0=83=E7=94=A8?= =?UTF-8?q?=E6=B5=99=E6=B1=9F=E5=86=9C=E5=95=86=E8=A1=8C=E8=B4=A2=E8=B5=84?= =?UTF-8?q?=E5=AE=9D=E6=8E=A5=E5=8F=A3=20-=20=E5=AE=9E=E7=8E=B0=E4=BA=86?= =?UTF-8?q?=E5=88=86=E9=A1=B5=E6=9F=A5=E8=AF=A2=E9=80=BB=E8=BE=91=EF=BC=8C?= =?UTF-8?q?=E8=83=BD=E5=A4=9F=E8=87=AA=E5=8A=A8=E5=A4=84=E7=90=86=E5=A4=9A?= =?UTF-8?q?=E9=A1=B5=E6=95=B0=E6=8D=AE=20-=20=E6=B7=BB=E5=8A=A0=E4=BA=86?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E8=B6=85=E6=97=B6=E5=92=8C=E8=AF=BB=E5=8F=96?= =?UTF-8?q?=E8=B6=85=E6=97=B6=E7=9A=84=E9=85=8D=E7=BD=AE=E9=80=89=E9=A1=B9?= =?UTF-8?q?=20-=20=E4=BC=98=E5=8C=96=E4=BA=86=E9=94=99=E8=AF=AF=E5=A4=84?= =?UTF-8?q?=E7=90=86=EF=BC=8C=E6=94=AF=E6=8C=81=E7=89=B9=E5=AE=9A=E9=94=99?= =?UTF-8?q?=E8=AF=AF=E7=A0=81=E7=9A=84=E6=99=BA=E8=83=BD=E5=88=A4=E6=96=AD?= =?UTF-8?q?-=20=E5=A2=9E=E5=8A=A0=E4=BA=86=E5=8D=95=E5=85=83=E6=B5=8B?= =?UTF-8?q?=E8=AF=95=E7=94=A8=E4=BE=8B=EF=BC=8C=E7=A1=AE=E4=BF=9D=E5=A4=84?= =?UTF-8?q?=E7=90=86=E5=99=A8=E5=8A=9F=E8=83=BD=E7=9A=84=E6=AD=A3=E7=A1=AE?= =?UTF-8?q?=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../hzya-nifi-Zjnx-czb-nar/pom.xml | 34 +++ .../hzya-nifi-Zjnx-czb-processors/pom.xml | 54 +++++ .../hzya/frame/ZJRCGenericApiProcessor.java | 224 ++++++++++++++++++ .../org.apache.nifi.processor.Processor | 15 ++ .../frame/ZJRCGenericApiProcessorTest.java | 129 ++++++++++ nifi-hzyadev-bundle/pom.xml | 2 + 6 files changed, 458 insertions(+) create mode 100644 nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-nar/pom.xml create mode 100644 nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/pom.xml create mode 100644 nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/src/main/java/com/hzya/frame/ZJRCGenericApiProcessor.java create mode 100644 nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor create mode 100644 nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/src/test/java/com/hzya/frame/ZJRCGenericApiProcessorTest.java diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-nar/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-nar/pom.xml new file mode 100644 index 0000000..4a57fdf --- /dev/null +++ b/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-nar/pom.xml @@ -0,0 +1,34 @@ + + + + nifi-hzyadev-bundle + com.hzya + 1.0 + + 4.0.0 + + hzya-nifi-Zjnx-czb-nar + + nar + + + 8 + 8 + + + + + com.hzya + hzya-nifi-Zjnx-czb-processors + 1.0 + + + org.apache.nifi + nifi-standard-services-api-nar + ${nifi-revision} + nar + + + \ No newline at end of file diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/pom.xml new file mode 100644 index 0000000..dd2f7b5 --- /dev/null +++ b/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/pom.xml @@ -0,0 +1,54 @@ + + + + nifi-hzyadev-bundle + com.hzya + 1.0 + + 4.0.0 + + hzya-nifi-Zjnx-czb-processors + + + 8 + 8 + + + + + org.apache.nifi + nifi-api + ${nifi-revision} + + + org.apache.nifi + nifi-dbcp-service-api + ${nifi-revision} + + + org.apache.nifi + nifi-processor-utils + 1.15.3 + + + org.apache.nifi + nifi-mock + ${nifi-revision} + test + + + junit + junit + 4.13 + test + + + com.squareup.okhttp3 + mockwebserver + 4.9.3 + test + + + \ No newline at end of file diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/src/main/java/com/hzya/frame/ZJRCGenericApiProcessor.java b/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/src/main/java/com/hzya/frame/ZJRCGenericApiProcessor.java new file mode 100644 index 0000000..011e43f --- /dev/null +++ b/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/src/main/java/com/hzya/frame/ZJRCGenericApiProcessor.java @@ -0,0 +1,224 @@ +package com.hzya.frame; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.*; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.*; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@Tags({"zjrc", "bank", "api", "财资宝", "农商行", "pagination"}) +@CapabilityDescription("调用浙江农商联合银行财资宝的通用接口处理器,并自动处理分页查询。通过配置'交易码'属性来指定调用的具体接口。处理器接收一个包含reqBody的JSON作为输入FlowFile。") +@SeeAlso({}) +@ReadsAttributes({@ReadsAttribute(attribute = "", description = "")}) +@WritesAttributes({@WritesAttribute(attribute = "zjrc.api.error.message", description = "当API调用失败或银行返回错误时,此处会包含错误信息。"), @WritesAttribute(attribute = "zjrc.api.retCode", description = "银行返回的业务代码,例如 '0000'。"), @WritesAttribute(attribute = "zjrc.api.total.pages.fetched", description = "成功完成分页查询后,记录总共获取了多少页的数据。")}) +public class ZJRCGenericApiProcessor extends AbstractProcessor { + + // --- 新增超时属性 --- + public static final PropertyDescriptor PROP_CONNECT_TIMEOUT = new PropertyDescriptor.Builder().name("连接超时时间").displayName("连接超时时间").description("建立HTTP连接的超时时间。").defaultValue("10 s").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build(); + + public static final PropertyDescriptor PROP_READ_TIMEOUT = new PropertyDescriptor.Builder().name("读取超时时间").displayName("读取超时时间").description("等待HTTP响应的超时时间。").defaultValue("30 s").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build(); + + // --- 已有属性保持不变 --- + public static final PropertyDescriptor PROP_TRAN_CODE = new PropertyDescriptor.Builder().name("交易码 (tranCode)").displayName("交易码 (tranCode)").description("需要调用的接口交易码,例如 300004 (账户交易明细查询)。").required(true).defaultValue("300004").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + public static final PropertyDescriptor PROP_CLIENT_URL = new PropertyDescriptor.Builder().name("财资宝前置客户端URL").displayName("财资宝前置客户端URL").description("财资宝前置客户端的URL地址,不包含任何查询参数,例如:http://115.231.255.202:8433/ERPClient/receive").required(true).addValidator(StandardValidators.URL_VALIDATOR).build(); + + public static final PropertyDescriptor PROP_CUST_NO = new PropertyDescriptor.Builder().name("企业客户号 (custNo)").displayName("企业客户号 (custNo)").description("银行分配的企业网银客户号。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + public static final PropertyDescriptor PROP_USER_ID = new PropertyDescriptor.Builder().name("用户号 (userId)").displayName("用户号 (userId)").description("虚拟用户ID,通常以前缀'Y'开头。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("API调用成功且所有分页数据成功获取后,包含合并结果的FlowFile将被路由到此。").build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("API调用失败或银行返回非'0000'错误码的FlowFile将被路由到此。").build(); + + private List descriptors; + private Set relationships; + private final ObjectMapper objectMapper = new ObjectMapper(); + + // --- 核心修改点:将“查询结束”的错误码定义为常量 --- + private static final String END_OF_PAGES_RET_CODE = "CTM0000006"; + + @Override + protected void init(final ProcessorInitializationContext context) { + final List descriptors = new ArrayList<>(); + descriptors.add(PROP_CLIENT_URL); + descriptors.add(PROP_CUST_NO); + descriptors.add(PROP_USER_ID); + descriptors.add(PROP_TRAN_CODE); + descriptors.add(PROP_CONNECT_TIMEOUT); + descriptors.add(PROP_READ_TIMEOUT); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set getRelationships() { + return this.relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return descriptors; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + try { + final StringBuilder reqBodyBuilder = new StringBuilder(); + try (InputStream in = session.read(flowFile)) { + reqBodyBuilder.append(new BufferedReader(new InputStreamReader(in)).lines().collect(Collectors.joining("\n"))); + } + ObjectNode reqBodyTemplate = (ObjectNode) objectMapper.readTree(reqBodyBuilder.toString()); + + int currentPage = 1; + List allDetails = new ArrayList<>(); + + while (true) { + ObjectNode bizContent = (ObjectNode) reqBodyTemplate.path("bizContent"); + if (bizContent.isMissingNode()) { + bizContent = reqBodyTemplate; + } + bizContent.put("current", currentPage); + + String responseBody = sendRequest(context, reqBodyTemplate); + JsonNode responseJson = objectMapper.readTree(responseBody); + + JsonNode resHead = responseJson.path("resHead"); + String retCode = resHead.path("retCode").asText(); + + // --- 核心改造点:更智能的分页终止逻辑 --- + if ("0000".equals(retCode)) { + // 正常成功返回 + JsonNode resBody = responseJson.path("resBody"); + JsonNode contentNode = resBody.path("content"); + if (resBody.isNull() || contentNode.isNull() || contentNode.isMissingNode() || contentNode.asText().isEmpty()) { + getLogger().info("在第 {} 页未获取到content内容,分页结束。总共获取了 {} 页数据。", currentPage, currentPage - 1); + break; // 终止条件1:内容为空 + } + + String contentStr = contentNode.asText(); + JsonNode contentJson = objectMapper.readTree(contentStr); + JsonNode detailsList = contentJson.path("detailsResBodyList"); + if (detailsList.isArray()) { + for (JsonNode detail : detailsList) { + allDetails.add(detail); + } + } + } else if (END_OF_PAGES_RET_CODE.equals(retCode)) { + // 将特定的错误码视为分页结束的信号 + getLogger().info("在第 {} 页收到错误码 {},判定为分页已结束。", currentPage, retCode); + break; // 终止条件2:收到“已查尽”的特定错误码 + } else { + // 其他所有错误码均视为硬性失败 + String errMsg = resHead.path("errMsg").asText("未知错误"); + getLogger().error("银行API在第 {} 页返回不可恢复的错误: retCode={}, errMsg={}, FlowFile: {}", currentPage, retCode, errMsg, flowFile.getId()); + flowFile = session.putAttribute(flowFile, "zjrc.api.error.message", "Page " + currentPage + ": " + errMsg); + flowFile = session.putAttribute(flowFile, "zjrc.api.retCode", retCode); + session.transfer(flowFile, REL_FAILURE); + return; // 终止整个流程 + } + + currentPage++; + } + + ObjectNode finalResult = objectMapper.createObjectNode(); + ArrayNode finalArray = objectMapper.valueToTree(allDetails); + finalResult.set("allTransactionDetails", finalArray); + + flowFile = session.write(flowFile, out -> objectMapper.writerWithDefaultPrettyPrinter().writeValue(out, finalResult)); + flowFile = session.putAttribute(flowFile, "zjrc.api.total.pages.fetched", String.valueOf(currentPage - 1)); + session.transfer(flowFile, REL_SUCCESS); + + } catch (Exception e) { + getLogger().error("处理FlowFile时发生未知异常: {}", e.getMessage(), e); + flowFile = session.putAttribute(flowFile, "zjrc.api.error.message", e.getMessage()); + session.transfer(flowFile, REL_FAILURE); + } + } + + private String sendRequest(final ProcessContext context, final JsonNode reqBodyNode) throws Exception { + final String baseClientUrl = context.getProperty(PROP_CLIENT_URL).getValue(); + final String custNo = context.getProperty(PROP_CUST_NO).getValue(); + final String userId = context.getProperty(PROP_USER_ID).getValue(); + final String tranCode = context.getProperty(PROP_TRAN_CODE).getValue(); + final int connectTimeout = context.getProperty(PROP_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + final int readTimeout = context.getProperty(PROP_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + + ObjectNode rootNode = objectMapper.createObjectNode(); + ObjectNode reqHead = objectMapper.createObjectNode(); + Date now = new Date(); + + reqHead.put("custNo", custNo); + reqHead.put("userId", userId); + reqHead.put("tranCode", tranCode); + reqHead.put("serialNo", UUID.randomUUID().toString().replace("-", "")); + reqHead.put("reqDate", new SimpleDateFormat("yyyyMMdd").format(now)); + reqHead.put("reqTime", new SimpleDateFormat("HHmmssSSS").format(now)); + + rootNode.set("reqHead", reqHead); + rootNode.set("reqBody", reqBodyNode); + + String jsonPayload = objectMapper.writeValueAsString(rootNode); + String encodedJson = URLEncoder.encode(jsonPayload, "UTF-8"); + String finalUrl = baseClientUrl + "?reqJson=" + encodedJson; + + URL url = new URL(finalUrl); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setConnectTimeout(connectTimeout); + connection.setReadTimeout(readTimeout); + connection.setRequestProperty("Accept", "*/*"); + connection.setDoOutput(true); + + try (OutputStream os = connection.getOutputStream()) { + os.write("".getBytes()); + } + + int responseCode = connection.getResponseCode(); + if (responseCode == HttpURLConnection.HTTP_OK) { + try (InputStream is = connection.getInputStream()) { + return new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8)).lines().collect(Collectors.joining("\n")); + } + } else { + try (InputStream es = connection.getErrorStream()) { + // 即便HTTP>400,也需要返回body,因为里面可能包含retCode + if (es != null) { + return new BufferedReader(new InputStreamReader(es, StandardCharsets.UTF_8)).lines().collect(Collectors.joining("\n")); + } else { + throw new IOException("HTTP请求失败,状态码: " + responseCode + ",且没有错误信息返回。"); + } + } + } + } +} \ No newline at end of file diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000..a4433a6 --- /dev/null +++ b/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +com.hzya.frame.ZJRCGenericApiProcessor \ No newline at end of file diff --git a/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/src/test/java/com/hzya/frame/ZJRCGenericApiProcessorTest.java b/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/src/test/java/com/hzya/frame/ZJRCGenericApiProcessorTest.java new file mode 100644 index 0000000..25cd2fe --- /dev/null +++ b/nifi-hzyadev-bundle/hzya-nifi-Zjnx-czb-processors/src/test/java/com/hzya/frame/ZJRCGenericApiProcessorTest.java @@ -0,0 +1,129 @@ +package com.hzya.frame; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** + * @Author:liuyang + * @Package:com.hzya.frame + * @Project:nifi-hzyadev-bundle + * @name:ZJRCGenericApiProcessorTest + * @Date:2025/9/8 13:30 + * @Filename:ZJRCGenericApiProcessorTest + */ +public class ZJRCGenericApiProcessorTest { + private TestRunner runner; + private MockWebServer mockWebServer; + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Before + public void setup() throws IOException { + // 1. 初始化 MockWebServer,它会模拟银行的HTTP接口 + mockWebServer = new MockWebServer(); + mockWebServer.start(); + + // 2. 初始化 NiFi 的 TestRunner + runner = TestRunners.newTestRunner(ZJRCGenericApiProcessor.class); + } + + @After + public void teardown() throws IOException { + // 3. 测试结束后关闭 MockWebServer + mockWebServer.shutdown(); + } + + @Test + public void testSuccessfulApiCall() throws Exception { + // --- Arrange (准备阶段) --- + + // 1. 配置处理器的属性 + runner.setProperty(ZJRCGenericApiProcessor.PROP_CLIENT_URL, "http://115.231.255.202:8433/ERPClient/receive"); + runner.setProperty(ZJRCGenericApiProcessor.PROP_CUST_NO, "2100140132"); + runner.setProperty(ZJRCGenericApiProcessor.PROP_USER_ID, "Y0000000122032"); + runner.setProperty(ZJRCGenericApiProcessor.PROP_TRAN_CODE, "300004"); + + // 2. 准备一个模拟的成功响应 (retCode: "0000") + String successfulResponseJson = "{" + + " \"resHead\": {" + + " \"retCode\": \"0000\"," + + " \"errMsg\": \"\"," + + " \"resDate\": \"20250908\"," + + " \"resTime\": \"133000123\"" + + " }," + + " \"resBody\": {" + + " \"recordTotal\": 1," + + " \"detailResBodyList\": [" + + " {" + + " \"bankAcc\": \"123456789\"," + + " \"amt\": 100.00" + + " }" + + " ]" + + " }" + + "}"; + mockWebServer.enqueue(new MockResponse() + .setResponseCode(100) // HTTP 200 OK + .setBody(successfulResponseJson) + .addHeader("Content-Type", "application/json; charset=utf-8")); + + // 3. 准备输入的FlowFile内容 (即reqBody) + String reqBodyJson = "{\n" + + " \"erpSysFlag\": \"0000\",\n" + + " \"bizContent\": {\n" + + " \"bankAccNo\": \"201000270605501\",\n" + + " \"queryBeginDate\": \"2021-01-01\",\n" + + " \"queryEndDate\": \"2021-01-31\",\n" + + " \"current\": 1,\n" + + " \"size\": 100\n" + + " }\n" + + " }"; + runner.enqueue(reqBodyJson); + + // --- Act (执行阶段) --- + + // 4. 运行处理器 + runner.run(); + + // --- Assert (断言阶段) --- + + // 5. 验证FlowFile是否被正确路由到了success关系 + runner.assertTransferCount(ZJRCGenericApiProcessor.REL_SUCCESS, 1); + runner.assertTransferCount(ZJRCGenericApiProcessor.REL_FAILURE, 0); + + // 6. 获取输出的FlowFile并验证其内容和属性 + MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ZJRCGenericApiProcessor.REL_SUCCESS).get(0); + + // 验证输出内容是否是我们模拟的成功响应 + resultFlowFile.assertContentEquals(successfulResponseJson); + // 验证处理器是否添加了正确的属性 + resultFlowFile.assertAttributeEquals("zjrc.api.retCode", "0000"); + + // (可选) 验证处理器发送到银行的请求是否正确 + RecordedRequest recordedRequest = mockWebServer.takeRequest(); + assertNotNull(recordedRequest); + assertEquals("POST", recordedRequest.getMethod()); // 确认是POST请求 + + // 解析请求体,并验证reqHead中的字段是否由处理器自动生成并填充 + String receivedRequestBody = recordedRequest.getBody().readUtf8(); + JsonNode root = objectMapper.readTree(receivedRequestBody); + + JsonNode reqHead = root.path("reqHead"); + assertEquals("2100029034", reqHead.path("custNo").asText()); + assertEquals("Y001", reqHead.path("userId").asText()); + assertEquals("300004", reqHead.path("tranCode").asText()); + assertNotNull(reqHead.path("serialNo").asText()); // 确认serialNo已生成 + } +} \ No newline at end of file diff --git a/nifi-hzyadev-bundle/pom.xml b/nifi-hzyadev-bundle/pom.xml index a061eb5..cbdaf5b 100644 --- a/nifi-hzyadev-bundle/pom.xml +++ b/nifi-hzyadev-bundle/pom.xml @@ -25,6 +25,8 @@ hzya-nifi-AdvancedJoltTransformer-processors hzya-nifi-AutoAddOracleDatafile-nar hzya-nifi-AutoAddOracleDatafile-processors + hzya-nifi-Zjnx-czb-processors + hzya-nifi-Zjnx-czb-nar