diff --git a/nifi-hzyadev-bundle/hzya-nifi-U8CInterface-nar/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-U8CInterface-nar/pom.xml new file mode 100644 index 0000000..06c121a --- /dev/null +++ b/nifi-hzyadev-bundle/hzya-nifi-U8CInterface-nar/pom.xml @@ -0,0 +1,34 @@ + + + + nifi-hzyadev-bundle + com.hzya + 1.0 + + 4.0.0 + nar + + hzya-nifi-U8CInterface-nar + + + 8 + 8 + + + + + com.hzya + hzya-nifi-U8CInterface-processors + 1.0 + + + org.apache.nifi + nifi-standard-services-api-nar + ${nifi-revision} + nar + + + + \ No newline at end of file diff --git a/nifi-hzyadev-bundle/hzya-nifi-U8CInterface-processors/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-U8CInterface-processors/pom.xml new file mode 100644 index 0000000..ae99b72 --- /dev/null +++ b/nifi-hzyadev-bundle/hzya-nifi-U8CInterface-processors/pom.xml @@ -0,0 +1,55 @@ + + + + nifi-hzyadev-bundle + com.hzya + 1.0 + + 4.0.0 + + hzya-nifi-U8CInterface-processors + + + 8 + 8 + + + + + org.apache.nifi + nifi-api + ${nifi-revision} + + + org.apache.nifi + nifi-dbcp-service-api + ${nifi-revision} + + + + org.apache.nifi + nifi-processor-utils + 1.15.3 + + + org.apache.nifi + nifi-mock + ${nifi-revision} + test + + + junit + junit + 4.13 + test + + + + + + + + + \ No newline at end of file diff --git a/nifi-hzyadev-bundle/hzya-nifi-U8CInterface-processors/src/main/java/com/hzya/frame/DevU8CInterfaceProcessor.java b/nifi-hzyadev-bundle/hzya-nifi-U8CInterface-processors/src/main/java/com/hzya/frame/DevU8CInterfaceProcessor.java new file mode 100644 index 0000000..9ab5466 --- /dev/null +++ b/nifi-hzyadev-bundle/hzya-nifi-U8CInterface-processors/src/main/java/com/hzya/frame/DevU8CInterfaceProcessor.java @@ -0,0 +1,196 @@ +package com.hzya.frame; + +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.*; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.StreamCallback; + +import java.io.*; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.concurrent.TimeUnit; + +/** + * @Author:liuyang + * @Package:com.hzya.frame + * @Project:nifi-hzyadev-bundle + * @name:DevU8CInterfaceProcessor + * @Date:2025/7/15 09:23 + * @Filename:DevU8CInterfaceProcessor + */ +@Tags({"http", "api", "u8c", "yonyou", "jdk1.8"}) +@CapabilityDescription("将FlowFile的内容作为HTTP POST请求发送到用友 U8C API端点,API参数通过FlowFile属性进行配置。原始JSON响应被写入输出FlowFile") +@ReadsAttributes({@ReadsAttribute(attribute = "u8cUrl", description = "U8C API终结点的URL"), @ReadsAttribute(attribute = "u8cApiUsercode", description = "API身份验证的用户代码。"), @ReadsAttribute(attribute = "u8cApiPassword", description = "API身份验证的密码。"), @ReadsAttribute(attribute = "u8cApiSystem", description = "API调用的系统标识符。"), @ReadsAttribute(attribute = "u8cApiTrantype", description = "API调用的事务类型"), @ReadsAttribute(attribute = "apiCode", description = "操作的特定API代码")}) +@WritesAttributes({@WritesAttribute(attribute = "invoke.status.code", description = "API调用返回的HTTP状态代码"), @WritesAttribute(attribute = "invoke.execution.time.ms", description = "API调用的执行时间(以毫秒为单位)")}) +public class DevU8CInterfaceProcessor extends AbstractProcessor { + + // NiFi关系成功 + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("此处路由API调用成功的FlowFiles(2xx状态代码)").build(); + + // NiFi故障关系 + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("此处路由API调用失败的FlowFiles").build(); + + private Set relationships; + + @Override + protected void init(final ProcessorInitializationContext context) { + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set getRelationships() { + return this.relationships; + } + + // 此处理器没有自己的属性,因为所有配置都来自FlowFile属性 + @Override + public final List getSupportedPropertyDescriptors() { + return Collections.emptyList(); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final ComponentLog logger = getLogger(); + final long startNanos = System.nanoTime(); + + // 从 FlowFile 属性中获取配置参数 + final Map requiredAttributes = new HashMap<>(); + requiredAttributes.put("u8c_url", flowFile.getAttribute("u8cUrl")); + requiredAttributes.put("usercode", flowFile.getAttribute("u8cApiUsercode")); + requiredAttributes.put("password", flowFile.getAttribute("u8cApiPassword")); + requiredAttributes.put("system", flowFile.getAttribute("u8cApiSystem")); + requiredAttributes.put("trantype", flowFile.getAttribute("u8cApiTrantype")); +// requiredAttributes.put("apiCode", flowFile.getAttribute("apiCode")); + + // 验证参数是否为空 + for (Map.Entry entry : requiredAttributes.entrySet()) { + // isBlank check + if (entry.getValue() == null || entry.getValue().trim().isEmpty()) { + logger.error("FlowFile {} 验证失败:必需属性“{}”缺失或为空", new Object[]{flowFile, entry.getKey()}); + session.transfer(flowFile, REL_FAILURE); + return; + } + } + + final String u8cUrl = requiredAttributes.get("u8c_url"); + + // 读取 FlowFile 内容作为请求体 + final StringBuilder requestBodyBuilder = new StringBuilder(); + session.read(flowFile, in -> { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) { + String line; + while ((line = reader.readLine()) != null) { + requestBodyBuilder.append(line); + } + } + }); + final String requestBody = requestBodyBuilder.toString(); + + if (requestBody.isEmpty()) { + logger.error("FlowFile {} 内容为空,无法发出正文为空的POST请求", new Object[]{flowFile}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + + HttpURLConnection conn = null; + try { + // 使用 JDK 1.8 的 HttpURLConnection 发送请求 + URL url = new URL(u8cUrl); + conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod("POST"); + + // 设置请求头 + conn.setRequestProperty("Content-Type", "application/json; charset=utf-8"); + conn.setRequestProperty("Accept", "application/json"); + conn.setRequestProperty("usercode", requiredAttributes.get("usercode")); + conn.setRequestProperty("password", requiredAttributes.get("password")); + conn.setRequestProperty("system", requiredAttributes.get("system")); + conn.setRequestProperty("trantype", requiredAttributes.get("trantype")); +// conn.setRequestProperty("apiCode", requiredAttributes.get("apiCode")); + + // 设置超时时间 (10 minutes) + conn.setConnectTimeout(600000); + conn.setReadTimeout(600000); + + // 允许发送请求体 + conn.setDoOutput(true); + + // 写入请求体 + try (OutputStream os = conn.getOutputStream()) { + byte[] input = requestBody.getBytes(StandardCharsets.UTF_8); + os.write(input, 0, input.length); + } + + // 获取响应码 + final int statusCode = conn.getResponseCode(); + + // 根据响应码选择输入流 (正常流或错误流) + boolean isSuccess = statusCode >= 200 && statusCode < 300; + InputStream responseStream = isSuccess ? conn.getInputStream() : conn.getErrorStream(); + + if (responseStream == null) { + logger.error("对 {} 的API调用失败,状态代码为 {},但未返回响应主体", new Object[]{u8cUrl, statusCode}); + flowFile = session.putAttribute(flowFile, "invoke.status.code", String.valueOf(statusCode)); + session.transfer(flowFile, REL_FAILURE); + return; + } + + // 创建一个新的 FlowFile 来存储响应结果 + FlowFile responseFlowFile = session.create(flowFile); + final long executionTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + + // 将响应流写入新的 FlowFile + responseFlowFile = session.write(responseFlowFile, out -> { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(responseStream, StandardCharsets.UTF_8))) { + try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))) { + char[] buffer = new char[4096]; + int length; + while ((length = reader.read(buffer)) != -1) { + writer.write(buffer, 0, length); + } + } + } + }); + + // 添加属性并路由 + responseFlowFile = session.putAttribute(responseFlowFile, "invoke.status.code", String.valueOf(statusCode)); + responseFlowFile = session.putAttribute(responseFlowFile, "invoke.execution.time.ms", String.valueOf(executionTime)); + + if (isSuccess) { + session.transfer(responseFlowFile, REL_SUCCESS); + } else { + session.transfer(responseFlowFile, REL_FAILURE); + } + session.remove(flowFile); // 消费原始 FlowFile + logger.info("已成功调用FlowFile {} 的U8C API,状态:{} 执行时间:{} 毫秒", new Object[]{flowFile.getAttribute("uuid"), statusCode, executionTime}); + + } catch (IOException e) { + logger.error("由于 {},无法调用FlowFile {} 的U8C API,路由失败", new Object[]{flowFile, e.getMessage()}, e); + session.transfer(flowFile, REL_FAILURE); + } finally { + // 确保关闭连接 + if (conn != null) { + conn.disconnect(); + } + } + } +} \ No newline at end of file diff --git a/nifi-hzyadev-bundle/hzya-nifi-U8CInterface-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-hzyadev-bundle/hzya-nifi-U8CInterface-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000..dfe1aac --- /dev/null +++ b/nifi-hzyadev-bundle/hzya-nifi-U8CInterface-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +com.hzya.frame.DevU8CInterfaceProcessor \ No newline at end of file diff --git a/nifi-hzyadev-bundle/hzya-nifi-U8CInterface-processors/src/test/java/com/hzya/frame/DevU8CInterfaceProcessorTest.java b/nifi-hzyadev-bundle/hzya-nifi-U8CInterface-processors/src/test/java/com/hzya/frame/DevU8CInterfaceProcessorTest.java new file mode 100644 index 0000000..8d85138 --- /dev/null +++ b/nifi-hzyadev-bundle/hzya-nifi-U8CInterface-processors/src/test/java/com/hzya/frame/DevU8CInterfaceProcessorTest.java @@ -0,0 +1,36 @@ +//package com.hzya.frame; +// +//import okhttp3.mockwebserver.MockResponse; +//import okhttp3.mockwebserver.MockWebServer; +//import okhttp3.mockwebserver.RecordedRequest; +//import org.apache.nifi.util.MockFlowFile; +//import org.apache.nifi.util.TestRunner; +//import org.apache.nifi.util.TestRunners; +//import org.junit.After; +//import org.junit.Before; +//import org.junit.Test; +// +//import java.io.IOException; +//import java.util.HashMap; +//import java.util.Map; +// +//import static org.junit.Assert.assertEquals; +// +// +///** +// * @Author:liuyang +// * @Package:com.hzya.frame +// * @Project:nifi-hzyadev-bundle +// * @name:DevU8CInterfaceProcessorTest +// * @Date:2025/7/15 09:52 +// * @Filename:DevU8CInterfaceProcessorTest +// */ +//public class DevU8CInterfaceProcessorTest { +// +// private TestRunner runner; +// private MockWebServer mockWebServer; +// +// @Test +// public void onTrigger() { +// } +//} \ No newline at end of file diff --git a/nifi-hzyadev-bundle/pom.xml b/nifi-hzyadev-bundle/pom.xml index 160103a..cc38ff8 100644 --- a/nifi-hzyadev-bundle/pom.xml +++ b/nifi-hzyadev-bundle/pom.xml @@ -17,6 +17,8 @@ hzya-nifi-JsonSplitter-nar hzya-nifi-DevGeneratePaginatedSqlProcessor-nar hzya-nifi-AutoJsonTableCreate-nar + hzya-nifi-U8CInterface-nar + hzya-nifi-U8CInterface-processors