feat(nifi): 添加 U8C 接口处理器

- 新增 hzya-nifi-U8CInterface-nar 模块
- 新增 hzya-nifi-U8CInterface-processors 模块
- 实现 DevU8CInterfaceProcessor 用于调用用友 U8C API
- 添加相关依赖和测试用例
This commit is contained in:
liuy 2025-07-18 14:06:50 +08:00
parent b8b44952ae
commit c016d949b6
6 changed files with 338 additions and 0 deletions

View File

@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-hzyadev-bundle</artifactId>
<groupId>com.hzya</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>nar</packaging>
<artifactId>hzya-nifi-U8CInterface-nar</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.hzya</groupId>
<artifactId>hzya-nifi-U8CInterface-processors</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>${nifi-revision}</version>
<type>nar</type>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,55 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-hzyadev-bundle</artifactId>
<groupId>com.hzya</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hzya-nifi-U8CInterface-processors</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>${nifi-revision}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
<version>${nifi-revision}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
<version>1.15.3</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>${nifi-revision}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.squareup.okhttp3</groupId>-->
<!-- <artifactId>mockwebserver</artifactId> &lt;!&ndash; 注意是mockwebserver而非okhttp &ndash;&gt;-->
<!-- <version>5.1.0</version> &lt;!&ndash; 版本与主库一致 &ndash;&gt;-->
<!-- <scope>test</scope> &lt;!&ndash; 测试专用 &ndash;&gt;-->
<!-- </dependency>-->
</dependencies>
</project>

View File

@ -0,0 +1,196 @@
package com.hzya.frame;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* @Authorliuyang
* @Packagecom.hzya.frame
* @Projectnifi-hzyadev-bundle
* @nameDevU8CInterfaceProcessor
* @Date2025/7/15 09:23
* @FilenameDevU8CInterfaceProcessor
*/
@Tags({"http", "api", "u8c", "yonyou", "jdk1.8"})
@CapabilityDescription("将FlowFile的内容作为HTTP POST请求发送到用友 U8C API端点API参数通过FlowFile属性进行配置。原始JSON响应被写入输出FlowFile")
@ReadsAttributes({@ReadsAttribute(attribute = "u8cUrl", description = "U8C API终结点的URL"), @ReadsAttribute(attribute = "u8cApiUsercode", description = "API身份验证的用户代码。"), @ReadsAttribute(attribute = "u8cApiPassword", description = "API身份验证的密码。"), @ReadsAttribute(attribute = "u8cApiSystem", description = "API调用的系统标识符。"), @ReadsAttribute(attribute = "u8cApiTrantype", description = "API调用的事务类型"), @ReadsAttribute(attribute = "apiCode", description = "操作的特定API代码")})
@WritesAttributes({@WritesAttribute(attribute = "invoke.status.code", description = "API调用返回的HTTP状态代码"), @WritesAttribute(attribute = "invoke.execution.time.ms", description = "API调用的执行时间以毫秒为单位")})
public class DevU8CInterfaceProcessor extends AbstractProcessor {
// NiFi关系成功
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("此处路由API调用成功的FlowFiles2xx状态代码").build();
// NiFi故障关系
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("此处路由API调用失败的FlowFiles").build();
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
// 此处理器没有自己的属性因为所有配置都来自FlowFile属性
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Collections.emptyList();
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final ComponentLog logger = getLogger();
final long startNanos = System.nanoTime();
// FlowFile 属性中获取配置参数
final Map<String, String> requiredAttributes = new HashMap<>();
requiredAttributes.put("u8c_url", flowFile.getAttribute("u8cUrl"));
requiredAttributes.put("usercode", flowFile.getAttribute("u8cApiUsercode"));
requiredAttributes.put("password", flowFile.getAttribute("u8cApiPassword"));
requiredAttributes.put("system", flowFile.getAttribute("u8cApiSystem"));
requiredAttributes.put("trantype", flowFile.getAttribute("u8cApiTrantype"));
// requiredAttributes.put("apiCode", flowFile.getAttribute("apiCode"));
// 验证参数是否为空
for (Map.Entry<String, String> entry : requiredAttributes.entrySet()) {
// isBlank check
if (entry.getValue() == null || entry.getValue().trim().isEmpty()) {
logger.error("FlowFile {} 验证失败:必需属性“{}”缺失或为空", new Object[]{flowFile, entry.getKey()});
session.transfer(flowFile, REL_FAILURE);
return;
}
}
final String u8cUrl = requiredAttributes.get("u8c_url");
// 读取 FlowFile 内容作为请求体
final StringBuilder requestBodyBuilder = new StringBuilder();
session.read(flowFile, in -> {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
requestBodyBuilder.append(line);
}
}
});
final String requestBody = requestBodyBuilder.toString();
if (requestBody.isEmpty()) {
logger.error("FlowFile {} 内容为空无法发出正文为空的POST请求", new Object[]{flowFile});
session.transfer(flowFile, REL_FAILURE);
return;
}
HttpURLConnection conn = null;
try {
// 使用 JDK 1.8 HttpURLConnection 发送请求
URL url = new URL(u8cUrl);
conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST");
// 设置请求头
conn.setRequestProperty("Content-Type", "application/json; charset=utf-8");
conn.setRequestProperty("Accept", "application/json");
conn.setRequestProperty("usercode", requiredAttributes.get("usercode"));
conn.setRequestProperty("password", requiredAttributes.get("password"));
conn.setRequestProperty("system", requiredAttributes.get("system"));
conn.setRequestProperty("trantype", requiredAttributes.get("trantype"));
// conn.setRequestProperty("apiCode", requiredAttributes.get("apiCode"));
// 设置超时时间 (10 minutes)
conn.setConnectTimeout(600000);
conn.setReadTimeout(600000);
// 允许发送请求体
conn.setDoOutput(true);
// 写入请求体
try (OutputStream os = conn.getOutputStream()) {
byte[] input = requestBody.getBytes(StandardCharsets.UTF_8);
os.write(input, 0, input.length);
}
// 获取响应码
final int statusCode = conn.getResponseCode();
// 根据响应码选择输入流 (正常流或错误流)
boolean isSuccess = statusCode >= 200 && statusCode < 300;
InputStream responseStream = isSuccess ? conn.getInputStream() : conn.getErrorStream();
if (responseStream == null) {
logger.error("对 {} 的API调用失败状态代码为 {},但未返回响应主体", new Object[]{u8cUrl, statusCode});
flowFile = session.putAttribute(flowFile, "invoke.status.code", String.valueOf(statusCode));
session.transfer(flowFile, REL_FAILURE);
return;
}
// 创建一个新的 FlowFile 来存储响应结果
FlowFile responseFlowFile = session.create(flowFile);
final long executionTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
// 将响应流写入新的 FlowFile
responseFlowFile = session.write(responseFlowFile, out -> {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(responseStream, StandardCharsets.UTF_8))) {
try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))) {
char[] buffer = new char[4096];
int length;
while ((length = reader.read(buffer)) != -1) {
writer.write(buffer, 0, length);
}
}
}
});
// 添加属性并路由
responseFlowFile = session.putAttribute(responseFlowFile, "invoke.status.code", String.valueOf(statusCode));
responseFlowFile = session.putAttribute(responseFlowFile, "invoke.execution.time.ms", String.valueOf(executionTime));
if (isSuccess) {
session.transfer(responseFlowFile, REL_SUCCESS);
} else {
session.transfer(responseFlowFile, REL_FAILURE);
}
session.remove(flowFile); // 消费原始 FlowFile
logger.info("已成功调用FlowFile {} 的U8C API状态{} 执行时间:{} 毫秒", new Object[]{flowFile.getAttribute("uuid"), statusCode, executionTime});
} catch (IOException e) {
logger.error("由于 {}无法调用FlowFile {} 的U8C API,路由失败", new Object[]{flowFile, e.getMessage()}, e);
session.transfer(flowFile, REL_FAILURE);
} finally {
// 确保关闭连接
if (conn != null) {
conn.disconnect();
}
}
}
}

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
com.hzya.frame.DevU8CInterfaceProcessor

View File

@ -0,0 +1,36 @@
//package com.hzya.frame;
//
//import okhttp3.mockwebserver.MockResponse;
//import okhttp3.mockwebserver.MockWebServer;
//import okhttp3.mockwebserver.RecordedRequest;
//import org.apache.nifi.util.MockFlowFile;
//import org.apache.nifi.util.TestRunner;
//import org.apache.nifi.util.TestRunners;
//import org.junit.After;
//import org.junit.Before;
//import org.junit.Test;
//
//import java.io.IOException;
//import java.util.HashMap;
//import java.util.Map;
//
//import static org.junit.Assert.assertEquals;
//
//
///**
// * @Authorliuyang
// * @Packagecom.hzya.frame
// * @Projectnifi-hzyadev-bundle
// * @nameDevU8CInterfaceProcessorTest
// * @Date2025/7/15 09:52
// * @FilenameDevU8CInterfaceProcessorTest
// */
//public class DevU8CInterfaceProcessorTest {
//
// private TestRunner runner;
// private MockWebServer mockWebServer;
//
// @Test
// public void onTrigger() {
// }
//}

View File

@ -17,6 +17,8 @@
<module>hzya-nifi-JsonSplitter-nar</module> <module>hzya-nifi-JsonSplitter-nar</module>
<module>hzya-nifi-DevGeneratePaginatedSqlProcessor-nar</module> <module>hzya-nifi-DevGeneratePaginatedSqlProcessor-nar</module>
<module>hzya-nifi-AutoJsonTableCreate-nar</module> <module>hzya-nifi-AutoJsonTableCreate-nar</module>
<module>hzya-nifi-U8CInterface-nar</module>
<module>hzya-nifi-U8CInterface-processors</module>
</modules> </modules>
<parent> <parent>