Compare commits

...

4 Commits

Author SHA1 Message Date
liuy e535d45df3 feat(nifi): 添加参数上下文功能并优化错误处理
- 新增参数上下文相关模型类- 实现参数上下文创建、更新、删除和查询的API接口
- 优化错误处理,提高异常信息的可读性
- 移除未使用的ProcessorConfig类
2025-05-14 19:09:55 +08:00
liuy 5d9fb90fb1 refactor(fw-nifi):重构 Nifi 相关代码- 更新 Jackson 配置,增加忽略未知属性的功能
- 新增多个 Nifi 模型类,包括 Breadcrumb2、Breadcrumb3、ParentBreadcrumb2 和 Permissions2
- 修改现有模型类,增加新字段以适应 Nifi API 变更
- 调整 NifiApiService 中的 queryFlowProcessGroupsRoot 方法,支持返回不同类型的响应
2025-05-14 18:27:58 +08:00
liuy 10aeddff70 (nfeatifi): 添加 NiFi API 客户端和相关模型类
- 新增 NiFi API客户端类 NifiClient,实现与 NiFi 服务器的通信
- 添加多个 NiFi 模型类,用于解析和处理 NiFi API 响应数据
- 新增 Jackson 配置类和 JSON 工具类,用于 JSON 数据处理
- 移除无用的 Main 类
2025-05-14 17:49:37 +08:00
xiang2lin 7262d4f9a3 fw-nifi模块初始化 2025-05-12 14:17:58 +08:00
120 changed files with 1886 additions and 1 deletions

61
fw-nifi/pom.xml Normal file
View File

@ -0,0 +1,61 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.hzya.frame</groupId>
<artifactId>kangarooDataCenterV3</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>fw-nifi</artifactId>
<packaging>jar</packaging>
<version>${revision}</version>
<dependencies>
<dependency>
<groupId>com.hzya.frame</groupId>
<artifactId>base-service</artifactId>
<version>${revision}</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>mysql</groupId>-->
<!-- <artifactId>mysql-connector-java</artifactId>-->
<!-- <version>${mysql-connector-java}</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>com.ibm.jsonata4java</groupId>-->
<!-- <artifactId>JSONata4Java</artifactId>-->
<!-- </dependency>-->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<!-- <version>4.10.0</version>-->
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<mainClass>none</mainClass> <!-- 取消查找本项目下的Main方法为了解决Unable to find main class的问题 -->
<classifier>execute</classifier> <!-- 为了解决依赖模块找不到此模块中的类或属性 -->
<skip>true</skip>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,11 @@
//package com.hzya.frame.nifi;
//
//import org.springframework.boot.SpringApplication;
//import org.springframework.boot.autoconfigure.SpringBootApplication;
//
//@SpringBootApplication(scanBasePackages = "com.hzya.frame.nifi")
//public class NifiApplication {
// public static void main(String[] args) {
// SpringApplication.run(NifiApplication.class, args);
// }
//}

View File

@ -0,0 +1,169 @@
package com.hzya.frame.nifi.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hzya.frame.nifi.config.NifiServiceConfig;
import okhttp3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import java.util.concurrent.atomic.AtomicReference;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.client
* @Projectfw-nifi
* @nameNifiClient
* @Date2025/5/14 10:00
* @FilenameNifiClient
*/
@Component
public class NifiClient {
private final NifiServiceConfig config;
private final OkHttpClient httpClient;
private final ObjectMapper objectMapper;
private final AtomicReference<String> accessToken = new AtomicReference<>();
private volatile long tokenIssueTime; // Token 发行时间毫秒
private volatile long tokenExpiration; // Token 过期时间毫秒
@Autowired
public NifiClient(NifiServiceConfig config, OkHttpClient httpClient, ObjectMapper objectMapper) {
this.config = config;
this.httpClient = httpClient;
this.objectMapper = objectMapper;
initializeToken();
}
private void initializeToken() {
try {
accessToken.set(getAccessToken());
tokenIssueTime = System.currentTimeMillis();
tokenExpiration = tokenIssueTime + (12 * 60 * 60 * 1000); // 默认 12 小时
System.out.println("令牌已初始化,过期时间:" + new java.util.Date(tokenExpiration));
} catch (Exception e) {
throw new RuntimeException("初始化NiFi访问令牌失败 " + e.getMessage());
}
}
public <T> T get(String path, Class<T> responseType) throws IOException {
return executeRequestWithRetry(() -> {
Request request = new Request.Builder().url(config.getApiUrl() + path).get().header("Authorization", "Bearer " + accessToken.get()).build();
return executeRequest(request, responseType);
});
}
public <T> T post(String path, Object requestBody, Class<T> responseType) throws IOException {
String jsonBody = objectMapper.writeValueAsString(requestBody);
RequestBody body = RequestBody.create(jsonBody, MediaType.get("application/json; charset=utf-8"));
return executeRequestWithRetry(() -> {
Request request = new Request.Builder().url(config.getApiUrl() + path).post(body).header("Authorization", "Bearer " + accessToken.get()).build();
return executeRequest(request, responseType);
});
}
private <T> T executeRequestWithRetry(IOExceptionRunnable<T> runnable) throws IOException {
int maxRetries = 2;
for (int attempt = 0; attempt < maxRetries; attempt++) {
try {
checkAndRefreshToken();
return runnable.run();
} catch (IOException e) {
if (attempt == maxRetries - 1 || !isTokenExpiredError(e)) {
throw e;
}
System.out.println("令牌可能已过期,请刷新并重试。。。");
try {
accessToken.set(getAccessToken());
tokenIssueTime = System.currentTimeMillis();
tokenExpiration = tokenIssueTime + (12 * 60 * 60 * 1000); // 默认 12 小时
} catch (Exception ex) {
throw new IOException("刷新令牌失败: " + ex.getMessage(), e);
}
}
}
throw new IOException("已达到最大重试次数");
}
private void checkAndRefreshToken() throws IOException {
long currentTime = System.currentTimeMillis();
if (currentTime > tokenExpiration - 300000) { // 提前 5 分钟刷新
try {
accessToken.set(getAccessToken());
tokenIssueTime = System.currentTimeMillis();
tokenExpiration = tokenIssueTime + (12 * 60 * 60 * 1000); // 默认 12 小时
System.out.println("令牌已刷新,到期时间: " + new java.util.Date(tokenExpiration));
} catch (Exception e) {
throw new IOException("刷新令牌失败: " + e.getMessage());
}
}
}
private boolean isTokenExpiredError(IOException e) {
// 判断是否为 401 错误Token 过期
return e.getMessage().contains("401") || e.getMessage().contains("Unauthorized");
}
private <T> T executeRequest(Request request, Class<T> responseType) throws IOException {
try (Response response = httpClient.newCall(request).execute()) {
String responseBody = response.body().string();
if (!response.isSuccessful()) {
throw new IOException("意外的响应码: " + responseBody);
}
return objectMapper.readValue(responseBody, responseType);
}
}
public synchronized String getAccessToken() throws Exception {
// 创建信任所有证书的 OkHttpClient
// OkHttpClient client = httpClient.newBuilder().sslSocketFactory(createTrustAllSslSocketFactory(), createTrustAllTrustManager()).hostnameVerifier((hostname, session) -> true).build();
MediaType mediaType = MediaType.parse("application/x-www-form-urlencoded");
RequestBody body = RequestBody.create(mediaType, "username=" + config.getUsername() + "&password=" + config.getPassword());
Request request = new Request.Builder().url(config.getApiUrl() + "/access/token").post(body).addHeader("Content-Type", "application/x-www-form-urlencoded").addHeader("Accept", "*/*").addHeader("User-Agent", "fw-nifi-client/1.0").addHeader("Connection", "keep-alive").build();
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful()) {
throw new IOException("获取访问令牌失败,响应码: " + response.code() + " - " + response.message());
}
byte[] bytes = response.body().bytes();
String newToken = new String(bytes, StandardCharsets.UTF_8);
return newToken;
}
}
// private SSLSocketFactory createTrustAllSslSocketFactory() throws Exception {
// SSLContext sslContext = SSLContext.getInstance("SSL");
// sslContext.init(null, new TrustManager[]{createTrustAllTrustManager()}, new SecureRandom());
// return sslContext.getSocketFactory();
// }
// private X509TrustManager createTrustAllTrustManager() {
// return new X509TrustManager() {
// @Override
// public void checkClientTrusted(X509Certificate[] chain, String authType) {
// }
//
// @Override
// public void checkServerTrusted(X509Certificate[] chain, String authType) {
// }
//
// @Override
// public X509Certificate[] getAcceptedIssuers() {
// return new X509Certificate[0];
// }
// };
// }
// 功能接口用于重试逻辑
@FunctionalInterface
private interface IOExceptionRunnable<T> {
T run() throws IOException;
}
}

View File

@ -0,0 +1,59 @@
package com.hzya.frame.nifi.config;
import okhttp3.OkHttpClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import java.util.concurrent.TimeUnit;
/**
* OkHttpClient配置类
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.config
* @Projectfw-nifi
* @nameHttpClientConfig
* @Date2025/5/14 11:45
* @FilenameHttpClientConfig
*/
@Configuration
public class HttpClientConfig {
@Bean
public OkHttpClient okHttpClient() throws Exception {
return new OkHttpClient.Builder().sslSocketFactory(createTrustAllSslSocketFactory(), createTrustAllTrustManager())
.hostnameVerifier((hostname, session) -> true)
.connectTimeout(60, TimeUnit.SECONDS) // 连接超时时间
.readTimeout(60, TimeUnit.SECONDS) // 读取超时时间
.writeTimeout(60, TimeUnit.SECONDS) // 写入超时时间
.build();
}
private SSLSocketFactory createTrustAllSslSocketFactory() throws Exception {
SSLContext sslContext = SSLContext.getInstance("SSL");
sslContext.init(null, new TrustManager[]{createTrustAllTrustManager()}, new SecureRandom());
return sslContext.getSocketFactory();
}
private X509TrustManager createTrustAllTrustManager() {
return new X509TrustManager() {
@Override
public void checkClientTrusted(X509Certificate[] chain, String authType) {
}
@Override
public void checkServerTrusted(X509Certificate[] chain, String authType) {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
};
}
}

View File

@ -0,0 +1,29 @@
package com.hzya.frame.nifi.config;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* jackson配置类
*
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.config
* @Projectfw-nifi
* @nameJacksonConfig
* @Date2025/5/14 11:54
* @FilenameJacksonConfig
*/
@Configuration
public class JacksonConfig {
@Bean
public ObjectMapper objectMapper() {
ObjectMapper objectMapper = new ObjectMapper();
// 自动注册模块
objectMapper.findAndRegisterModules();
// 忽略 JSON 中存在但实体类中缺少的字段
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return objectMapper;
}
}

View File

@ -0,0 +1,51 @@
package com.hzya.frame.nifi.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
/**
* nifi service配置类
*
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.config
* @Projectfw-nifi
* @nameNifiConfig
* @Date2025/5/14 09:59
* @FilenameNifiConfig
*/
@Configuration
public class NifiServiceConfig {
@Value("${nifi.api.url:https://192.168.2.233:8443/nifi-api}")
private String apiUrl;
@Value("${nifi.api.username:hzya}")
private String username;
@Value("${nifi.api.password:hzya1314*nifi}")
private String password;
public String getApiUrl() {
return apiUrl;
}
public void setApiUrl(String apiUrl) {
this.apiUrl = apiUrl;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}

View File

@ -0,0 +1,17 @@
package com.hzya.frame.nifi.model.joinparametercontexts;
import lombok.Data;
import java.util.List;
/**
* Auto-generated: 2025-05-14 18:33:21
*
* @author bejson.com (i@bejson.com)
* @website http://www.bejson.com/java2pojo/
*/
@Data
public class Component {
private String name;
private List<Parameters> parameters;
}

View File

@ -0,0 +1,11 @@
package com.hzya.frame.nifi.model.joinparametercontexts;
import lombok.Data;
@Data
public class Parameter {
private String name;
private String value;
private boolean sensitive;
private String description;
}

View File

@ -0,0 +1,9 @@
package com.hzya.frame.nifi.model.joinparametercontexts;
import lombok.Data;
@Data
public class ParameterContextsJoin {
private Revision revision;
private Component component;
}

View File

@ -0,0 +1,11 @@
/**
* Copyright 2025 bejson.com
*/
package com.hzya.frame.nifi.model.joinparametercontexts;
import lombok.Data;
@Data
public class Parameters {
private Parameter parameter;
}

View File

@ -0,0 +1,8 @@
package com.hzya.frame.nifi.model.joinparametercontexts;
import lombok.Data;
@Data
public class Revision {
private int version;
}

View File

@ -0,0 +1,30 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
@Data
public class AggregateSnapshot {
private String id;
private String groupId;
private String name;
private String type;
private String runStatus;
private String executionNode;
private String bytesRead;
private String bytesWritten;
private String read;
private String written;
private String flowFilesIn;
private String bytesIn;
private String input;
private String flowFilesOut;
private String bytesOut;
private String output;
private String taskCount;
private String tasksDurationNanos;
private String tasks;
private String tasksDuration;
private String activeThreadCount;
private String terminatedThreadCount;
private ProcessingPerformanceStatus processingPerformanceStatus;
}

View File

@ -0,0 +1,10 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
@Data
public class AllowableValue {
private String displayName;
private String value;
private String description;
}

View File

@ -0,0 +1,9 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
@Data
public class AllowableValues {
private AllowableValue allowableValue;
private String canRead;
}

View File

@ -0,0 +1,18 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
import java.util.List;
@Data
public class AttributesToIgnore {
private String name;
private String displayName;
private String description;
private String required;
private String sensitive;
private String dynamic;
private String supportsEl;
private String expressionLanguageScope;
private List<String> dependencies;
}

View File

@ -0,0 +1,18 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
import java.util.List;
@Data
public class AttributesToIgnoreRegex {
private String name;
private String displayName;
private String description;
private String required;
private String sensitive;
private String dynamic;
private String supportsEl;
private String expressionLanguageScope;
private List<String> dependencies;
}

View File

@ -0,0 +1,18 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
import java.util.List;
@Data
public class AttributesToLog {
private String name;
private String displayName;
private String description;
private String required;
private String sensitive;
private String dynamic;
private String supportsEl;
private String expressionLanguageScope;
private List<String> dependencies;
}

View File

@ -0,0 +1,19 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
import java.util.List;
@Data
public class AttributesToLogRegex {
private String name;
private String displayName;
private String description;
private String defaultValue;
private String required;
private String sensitive;
private String dynamic;
private String supportsEl;
private String expressionLanguageScope;
private List<String> dependencies;
}

View File

@ -0,0 +1,9 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
@Data
public class Breadcrumb {
private String id;
private String name;
}

View File

@ -0,0 +1,11 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
@Data
public class Breadcrumb2 {
private String id;
private Permissions2 permissions;
private Breadcrumb3 breadcrumb;
private ParentBreadcrumb2 parentBreadcrumb;
}

View File

@ -0,0 +1,17 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.model.processgroupid
* @Projectfw-nifi
* @nameBreadcrumb3
* @Date2025/5/14 17:57
* @FilenameBreadcrumb3
*/
@Data
public class Breadcrumb3 {
private String id;
private String name;
}

View File

@ -0,0 +1,15 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
@Data
public class Bulletin {
private String id;
private String category;
private String groupId;
private String sourceId;
private String sourceName;
private String level;
private String message;
private String timestamp;
}

View File

@ -0,0 +1,13 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
@Data
public class Bulletins {
private String id;
private String groupId;
private String sourceId;
private String timestamp;
private String canRead;
private Bulletin bulletin;
}

View File

@ -0,0 +1,10 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
@Data
public class Bundle {
private String group;
private String artifact;
private String version;
}

View File

@ -0,0 +1,19 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
import java.util.List;
@Data
public class CharacterSet {
private String name;
private String displayName;
private String description;
private String defaultValue;
private String required;
private String sensitive;
private String dynamic;
private String supportsEl;
private String expressionLanguageScope;
private List<String> dependencies;
}

View File

@ -0,0 +1,9 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
@Data
public class Component {
private String id;
private String name;
}

View File

@ -0,0 +1,32 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
import java.util.List;
@Data
public class Component2 {
private String id;
private String name;
private String versionedComponentId;
private String parentGroupId;
private Position position;
private String type;
private Bundle bundle;
private String state;
private Object style;
private List<Relationships> relationships;
private String supportsParallelProcessing;
private String supportsEventDriven;
private String supportsBatching;
private String supportsSensitiveDynamicProperties;
private String persistsState;
private String restricted;
private String deprecated;
private String executionNodeRestricted;
private String multipleVersionsAvailable;
private String inputRequirement;
private String validationStatus;
private String extensionMissing;
private Config config;
}

View File

@ -0,0 +1,27 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
import java.util.List;
@Data
public class Config {
private Properties properties;
private Descriptors descriptors;
private String schedulingPeriod;
private String schedulingStrategy;
private String executionNode;
private String penaltyDuration;
private String yieldDuration;
private String bulletinLevel;
private String runDurationMillis;
private String concurrentlySchedulableTaskCount;
private String comments;
private String lossTolerant;
private DefaultConcurrentTasks defaultConcurrentTasks;
private DefaultSchedulingPeriod defaultSchedulingPeriod;
private String retryCount;
private List<String> retriedRelationships;
private String backoffMechanism;
private String maxBackoffPeriod;
}

View File

@ -0,0 +1,24 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
import java.util.List;
@Data
public class Connections {
private Revision revision;
private String id;
private String uri;
private Permissions permissions;
private Component component;
private Status status;
private List<String> bends;
private String labelIndex;
private String zIndex;
private String sourceId;
private String sourceGroupId;
private String sourceType;
private String destinationId;
private String destinationGroupId;
private String destinationType;
}

View File

@ -0,0 +1,14 @@
package com.hzya.frame.nifi.model.processgroupid;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
@Data
public class DefaultConcurrentTasks {
@JsonProperty("TIMER_DRIVEN")
private String tIMERDRIVEN;
@JsonProperty("EVENT_DRIVEN")
private String eVENTDRIVEN;
@JsonProperty("CRON_DRIVEN")
private String cRONDRIVEN;
}

View File

@ -0,0 +1,12 @@
package com.hzya.frame.nifi.model.processgroupid;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
@Data
public class DefaultSchedulingPeriod {
@JsonProperty("TIMER_DRIVEN")
private String tIMERDRIVEN;
@JsonProperty("CRON_DRIVEN")
private String cRONDRIVEN;
}

View File

@ -0,0 +1,37 @@
package com.hzya.frame.nifi.model.processgroupid;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
@Data
public class Descriptors {
@JsonProperty("Log Level")
private LogLevel logLevel;
@JsonProperty("Log Payload")
private LogPayload logPayload;
@JsonProperty("Attributes to Log")
private AttributesToLog attributesToLog;
@JsonProperty("attributes-to-log-regex")
private AttributesToLogRegex attributesToLogRegex;
@JsonProperty("Attributes to Ignore")
private AttributesToIgnore attributesToIgnore;
@JsonProperty("attributes-to-ignore-regex")
private AttributesToIgnoreRegex attributesToIgnoreRegex;
@JsonProperty("Log FlowFile Properties")
private LogFlowFileProperties logFlowFileProperties;
@JsonProperty("Output Format")
private OutputFormat outputFormat;
@JsonProperty("Log prefix")
private LogPrefix logprefix;
@JsonProperty("character-set")
private CharacterSet characterSet;
}

View File

@ -0,0 +1,14 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
@Data
public class Destination {
private String id;
private String versionedComponentId;
private String type;
private String groupId;
private String name;
private String running;
private String comments;
}

View File

@ -0,0 +1,9 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
@Data
public class Dimensions {
private String width;
private String height;
}

View File

@ -0,0 +1,17 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
import java.util.List;
@Data
public class Flow {
private List<String> processGroups;
private List<String> remoteProcessGroups;
private List<Processors> processors;
private List<InputPorts> inputPorts;
private List<OutputPorts> outputPorts;
private List<Connections> connections;
private List<Labels> labels;
private List<String> funnels;
}

View File

@ -0,0 +1,19 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
import java.util.List;
@Data
public class InputPorts {
private Revision revision;
private String id;
private String uri;
private Position position;
private Permissions permissions;
private List<String> bulletins;
private Component component;
private Status status;
private String portType;
private OperatePermissions operatePermissions;
}

View File

@ -0,0 +1,15 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
@Data
public class Labels {
private Revision revision;
private String id;
private String uri;
private Position position;
private Permissions permissions;
private Dimensions dimensions;
private String zIndex;
private Component component;
}

View File

@ -0,0 +1,20 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
import java.util.List;
@Data
public class LogFlowFileProperties {
private String name;
private String displayName;
private String description;
private String defaultValue;
private List<AllowableValues> allowableValues;
private String required;
private String sensitive;
private String dynamic;
private String supportsEl;
private String expressionLanguageScope;
private List<String> dependencies;
}

View File

@ -0,0 +1,20 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
import java.util.List;
@Data
public class LogLevel {
private String name;
private String displayName;
private String description;
private String defaultValue;
private List<AllowableValues> allowableValues;
private String required;
private String sensitive;
private String dynamic;
private String supportsEl;
private String expressionLanguageScope;
private List<String> dependencies;
}

View File

@ -0,0 +1,20 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
import java.util.List;
@Data
public class LogPayload {
private String name;
private String displayName;
private String description;
private String defaultValue;
private List<AllowableValues> allowableValues;
private String required;
private String sensitive;
private String dynamic;
private String supportsEl;
private String expressionLanguageScope;
private List<String> dependencies;
}

View File

@ -0,0 +1,18 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
import java.util.List;
@Data
public class LogPrefix {
private String name;
private String displayName;
private String description;
private String required;
private String sensitive;
private String dynamic;
private String supportsEl;
private String expressionLanguageScope;
private List<String> dependencies;
}

View File

@ -0,0 +1,9 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
@Data
public class OperatePermissions {
private String canRead;
private String canWrite;
}

View File

@ -0,0 +1,20 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
import java.util.List;
@Data
public class OutputFormat {
private String name;
private String displayName;
private String description;
private String defaultValue;
private List<AllowableValues> allowableValues;
private String required;
private String sensitive;
private String dynamic;
private String supportsEl;
private String expressionLanguageScope;
private List<String> dependencies;
}

View File

@ -0,0 +1,19 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
import java.util.List;
@Data
public class OutputPorts {
private Revision revision;
private String id;
private String uri;
private Position position;
private Permissions permissions;
private List<String> bulletins;
private Component component;
private Status status;
private String portType;
private OperatePermissions operatePermissions;
}

View File

@ -0,0 +1,10 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
@Data
public class ParameterContext {
private String id;
private Permissions permissions;
private Component component;
}

View File

@ -0,0 +1,10 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
@Data
public class ParentBreadcrumb {
private String id;
private Permissions permissions;
private Breadcrumb breadcrumb;
}

View File

@ -0,0 +1,19 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.model.processgroupid
* @Projectfw-nifi
* @nameParentBreadcrumb2
* @Date2025/5/14 17:58
* @FilenameParentBreadcrumb2
*/
@Data
public class ParentBreadcrumb2 {
private String id;
private Permissions2 permissions;
private Breadcrumb3 breadcrumb;
private ParentBreadcrumb2 parentBreadcrumb;
}

View File

@ -0,0 +1,9 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
@Data
public class Permissions {
private String canRead;
private String canWrite;
}

View File

@ -0,0 +1,17 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.model.processgroupid
* @Projectfw-nifi
* @namePermissions2
* @Date2025/5/14 17:57
* @FilenamePermissions2
*/
@Data
public class Permissions2 {
private String canRead;
private String canWrite;
}

View File

@ -0,0 +1,9 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
@Data
public class Position {
private String x;
private String y;
}

View File

@ -0,0 +1,14 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
@Data
public class ProcessGroupFlow {
private String id;
private String uri;
private String parentGroupId;
private ParameterContext parameterContext;
private Breadcrumb2 breadcrumb;
private Flow flow;
private String lastRefreshed;
}

View File

@ -0,0 +1,9 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
@Data
public class ProcessGroupsId {
private Permissions permissions;
private ProcessGroupFlow processGroupFlow;
}

View File

@ -0,0 +1,13 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
@Data
public class ProcessingPerformanceStatus {
private String identifier;
private String cpuDuration;
private String contentReadDuration;
private String contentWriteDuration;
private String sessionCommitDuration;
private String garbageCollectionDuration;
}

View File

@ -0,0 +1,19 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
import java.util.List;
@Data
public class Processors {
private Revision revision;
private String id;
private String uri;
private Position position;
private Permissions permissions;
private List<Bulletins> bulletins;
private Component2 component;
private String inputRequirement;
private Status status;
private OperatePermissions operatePermissions;
}

View File

@ -0,0 +1,47 @@
package com.hzya.frame.nifi.model.processgroupid;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
/**
* Auto-generated: 2025-05-14 16:51:34
*
* @author lzltool.com
* @website https://www.lzltool.com/JsonToJava
*/
@Data
public class Properties {
@JsonProperty("Log Level")
private String logLevel;
@JsonProperty("Log Payload")
private String logPayload;
@JsonProperty("Attributes to Log")
private String attributesToLog;
@JsonProperty("attributes-to-log-regex")
private String attributesToLogRegex;
@JsonProperty("Attributes to Ignore")
private String attributesToIgnore;
@JsonProperty("attributes-to-ignore-regex")
private String attributesToIgnoreRegex;
@JsonProperty("Log FlowFile Properties")
private String logFlowFileProperties;
@JsonProperty("Output Format")
private String outputFormat;
@JsonProperty("Log prefix")
private String logPrefix;
@JsonProperty("character-set")
private String characterSet;
@JsonProperty("Database Connection Pooling Service")
private String databaseConnectionPoolingService;
}

View File

@ -0,0 +1,11 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
@Data
public class Relationships {
private String name;
private String description;
private String autoTerminate;
private String retry;
}

View File

@ -0,0 +1,8 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
@Data
public class Revision {
private int version;
}

View File

@ -0,0 +1,14 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
@Data
public class Source {
private String id;
private String versionedComponentId;
private String type;
private String groupId;
private String name;
private String running;
private String comments;
}

View File

@ -0,0 +1,13 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
@Data
public class Status {
private String groupId;
private String id;
private String name;
private String runStatus;
private String statsLastRefreshed;
private AggregateSnapshot aggregateSnapshot;
}

View File

@ -0,0 +1,7 @@
package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
@Data
public class Style {
}

View File

@ -0,0 +1,43 @@
package com.hzya.frame.nifi.model.processgrouproot;
import lombok.Data;
/**
* Auto-generated: 2025-05-14 15:30:8
*
* @author bejson.com (i@bejson.com)
* @website http://www.bejson.com/java2pojo/
*/
@Data
public class AggregateSnapshot {
private String id;
private String name;
private String flowFilesIn;
private String bytesIn;
private String input;
private String flowFilesQueued;
private String bytesQueued;
private String queued;
private String queuedCount;
private String queuedSize;
private String bytesRead;
private String read;
private String bytesWritten;
private String written;
private String flowFilesOut;
private String bytesOut;
private String output;
private String flowFilesTransferred;
private String bytesTransferred;
private String transferred;
private String bytesReceived;
private String flowFilesReceived;
private String received;
private String bytesSent;
private String flowFilesSent;
private String sent;
private String activeThreadCount;
private String terminatedThreadCount;
private String processingNanos;
private ProcessingPerformanceStatus processingPerformanceStatus;
}

View File

@ -0,0 +1,10 @@
package com.hzya.frame.nifi.model.processgrouproot;
import lombok.Data;
@Data
public class Breadcrumb {
private String id;
private Permissions permissions;
private Breadcrumb2 breadcrumb;
}

View File

@ -0,0 +1,9 @@
package com.hzya.frame.nifi.model.processgrouproot;
import lombok.Data;
@Data
public class Breadcrumb2 {
private String id;
private String name;
}

View File

@ -0,0 +1,23 @@
package com.hzya.frame.nifi.model.processgrouproot;
import lombok.Data;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.model.processgroup
* @Projectfw-nifi
* @nameBulletin
* @Date2025/5/14 15:59
* @FilenameBulletin
*/
@Data
public class Bulletin {
private String id;
private String category;
private String groupId;
private String sourceId;
private String sourceName;
private String level;
private String message;
private String timestamp;
}

View File

@ -0,0 +1,21 @@
package com.hzya.frame.nifi.model.processgrouproot;
import lombok.Data;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.model.processgroup
* @Projectfw-nifi
* @nameBulletin
* @Date2025/5/14 15:57
* @FilenameBulletin
*/
@Data
public class Bulletins {
private String id;
private String groupId;
private String sourceId;
private String timestamp;
private String canRead;
private Bulletin bulletin;
}

View File

@ -0,0 +1,37 @@
package com.hzya.frame.nifi.model.processgrouproot;
import lombok.Data;
@Data
public class Component {
private String id;
private String versionedComponentId;
private String parentGroupId;
private Position position;
private String name;
private String comments;
private Variables variables;
private String flowfileConcurrency;
private String flowfileOutboundPolicy;
private String defaultFlowFileExpiration;
private String defaultBackPressureObjectThreshold;
private String defaultBackPressureDataSizeThreshold;
private String logFileSuffix;
private String runningCount;
private String stoppedCount;
private String invalidCount;
private String disabledCount;
private String activeRemotePortCount;
private String inactiveRemotePortCount;
private String upToDateCount;
private String locallyModifiedCount;
private String staleCount;
private String locallyModifiedAndStaleCount;
private String syncFailureCount;
private String localInputPortCount;
private String localOutputPortCount;
private String publicInputPortCount;
private String publicOutputPortCount;
private String inputPortCount;
private String outputPortCount;
}

View File

@ -0,0 +1,9 @@
package com.hzya.frame.nifi.model.processgrouproot;
import lombok.Data;
@Data
public class Component2 {
private String id;
private String name;
}

View File

@ -0,0 +1,17 @@
package com.hzya.frame.nifi.model.processgrouproot;
import lombok.Data;
import java.util.List;
@Data
public class Flow {
private List<ProcessGroups> processGroups;
private List<String> remoteProcessGroups;
private List<String> processors;
private List<String> inputPorts;
private List<String> outputPorts;
private List<String> connections;
private List<String> labels;
private List<String> funnels;
}

View File

@ -0,0 +1,18 @@
package com.hzya.frame.nifi.model.processgrouproot;
import lombok.Data;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.model.processgrouproot
* @Projectfw-nifi
* @nameParameterContext
* @Date2025/5/14 16:43
* @FilenameParameterContext
*/
@Data
public class ParameterContext {
private String id;
private Permissions permissions;
private Component2 component;
}

View File

@ -0,0 +1,9 @@
package com.hzya.frame.nifi.model.processgrouproot;
import lombok.Data;
@Data
public class Permissions {
private String canRead;
private String canWrite;
}

View File

@ -0,0 +1,9 @@
package com.hzya.frame.nifi.model.processgrouproot;
import lombok.Data;
@Data
public class Position {
private String x;
private String y;
}

View File

@ -0,0 +1,16 @@
package com.hzya.frame.nifi.model.processgrouproot;
import lombok.Data;
@Data
public class ProcessGroupFlow {
private String id;
private String uri;
private Breadcrumb breadcrumb;
private Flow flow;
private String lastRefreshed;
//查询单个流程组
// private String parentGroupId;
//查询单个流程组
// private ParameterContext parameterContext;
}

View File

@ -0,0 +1,34 @@
package com.hzya.frame.nifi.model.processgrouproot;
import lombok.Data;
import java.util.List;
@Data
public class ProcessGroups {
private Revision revision;
private String id;
private String uri;
private Position position;
private Permissions permissions;
private List<Bulletins> bulletins;
private Component component;
private Status status;
private String runningCount;
private String stoppedCount;
private String invalidCount;
private String disabledCount;
private String activeRemotePortCount;
private String inactiveRemotePortCount;
private String upToDateCount;
private String locallyModifiedCount;
private String staleCount;
private String locallyModifiedAndStaleCount;
private String syncFailureCount;
private String localInputPortCount;
private String localOutputPortCount;
private String publicInputPortCount;
private String publicOutputPortCount;
private String inputPortCount;
private String outputPortCount;
}

View File

@ -0,0 +1,9 @@
package com.hzya.frame.nifi.model.processgrouproot;
import lombok.Data;
@Data
public class ProcessGroupsRoot {
private Permissions permissions;
private ProcessGroupFlow processGroupFlow;
}

View File

@ -0,0 +1,13 @@
package com.hzya.frame.nifi.model.processgrouproot;
import lombok.Data;
@Data
public class ProcessingPerformanceStatus {
private String identifier;
private String cpuDuration;
private String contentReadDuration;
private String contentWriteDuration;
private String sessionCommitDuration;
private String garbageCollectionDuration;
}

View File

@ -0,0 +1,8 @@
package com.hzya.frame.nifi.model.processgrouproot;
import lombok.Data;
@Data
public class Revision {
private String version;
}

View File

@ -0,0 +1,11 @@
package com.hzya.frame.nifi.model.processgrouproot;
import lombok.Data;
@Data
public class Status {
private String id;
private String name;
private String statsLastRefreshed;
private AggregateSnapshot aggregateSnapshot;
}

View File

@ -0,0 +1,8 @@
package com.hzya.frame.nifi.model.processgrouproot;
import lombok.Data;
@Data
public class Variables {
}

View File

@ -0,0 +1,14 @@
package com.hzya.frame.nifi.model.resultparametercontexts;
import lombok.Data;
import java.util.List;
@Data
public class Component {
private String name;
private List<Parameters> parameters;
private List<String> boundProcessGroups;
private List<String> inheritedParameterContexts;
private String id;
}

View File

@ -0,0 +1,22 @@
package com.hzya.frame.nifi.model.resultparametercontexts;
import lombok.Data;
import java.util.List;
/**
* Auto-generated: 2025-05-14 18:44:23
*
* @author bejson.com (i@bejson.com)
* @website http://www.bejson.com/java2pojo/
*/
@Data
public class Parameter {
private String name;
private String description;
private String sensitive;
private String value;
private String provided;
private List<String> referencingComponents;
private ParameterContext parameterContext;
}

View File

@ -0,0 +1,10 @@
package com.hzya.frame.nifi.model.resultparametercontexts;
import lombok.Data;
@Data
public class ParameterContext {
private String id;
private Permissions permissions;
private Component component;
}

View File

@ -0,0 +1,12 @@
package com.hzya.frame.nifi.model.resultparametercontexts;
import lombok.Data;
@Data
public class ParameterContextsResult {
private Revision revision;
private String id;
private String uri;
private Permissions permissions;
private Component component;
}

View File

@ -0,0 +1,9 @@
package com.hzya.frame.nifi.model.resultparametercontexts;
import lombok.Data;
@Data
public class Parameters {
private boolean canWrite;
private Parameter parameter;
}

View File

@ -0,0 +1,9 @@
package com.hzya.frame.nifi.model.resultparametercontexts;
import lombok.Data;
@Data
public class Permissions {
private boolean canRead;
private boolean canWrite;
}

View File

@ -0,0 +1,9 @@
package com.hzya.frame.nifi.model.resultparametercontexts;
import lombok.Data;
@Data
public class Revision {
private int version;
private String lastModifier;
}

View File

@ -0,0 +1,75 @@
package com.hzya.frame.nifi.service;
import cn.hutool.core.util.StrUtil;
import com.hzya.frame.nifi.client.NifiClient;
import com.hzya.frame.nifi.model.joinparametercontexts.ParameterContextsJoin;
import com.hzya.frame.nifi.model.processgroupid.ProcessGroupsId;
import com.hzya.frame.nifi.model.processgrouproot.ProcessGroupsRoot;
import com.hzya.frame.nifi.model.resultparametercontexts.ParameterContextsResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* nifi api 服务类
*
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.service
* @Projectfw-nifi
* @nameNifiService
* @Date2025/5/14 10:02
* @FilenameNifiService
*/
@Service
public class NifiApiService {
private final NifiClient client;
@Autowired
public NifiApiService(NifiClient client) {
this.client = client;
}
/**
* 获取token
*/
public String getAccessToken() throws Exception {
return client.getAccessToken();
}
/**
* 查询全部流程组或指定流程组
*/
public Object queryFlowProcessGroupsRoot(String flowGroupId) throws Exception {
if (flowGroupId != null) {
return client.get(StrUtil.format("/flow/process-groups/{}", flowGroupId), ProcessGroupsId.class);
} else {
return client.get("/flow/process-groups/root", ProcessGroupsRoot.class);
}
}
/**
* 创建参数上下文
*/
public ParameterContextsResult createParameterContexts(ParameterContextsJoin parameterContextsJoin) throws Exception {
return client.post("/parameter-contexts", parameterContextsJoin, ParameterContextsResult.class);
}
/**
* 更新参数上下文
*/
public ParameterContextsResult updateParameterContexts(ParameterContextsResult parameterContextsJoin) throws Exception {
return client.post(StrUtil.format("/parameter-contexts/{contextId}/update-requests", parameterContextsJoin.getId()), parameterContextsJoin, ParameterContextsResult.class);
}
/**
* 删除参数上下文
*/
/**
* 查询参数上下文
*/
public ParameterContextsResult getParameterContexts(String paramContextsId) throws Exception {
return client.get(StrUtil.format("/parameter-contexts/{}", paramContextsId), ParameterContextsResult.class);
}
}

View File

@ -0,0 +1,31 @@
package com.hzya.frame.nifi.util;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Component;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.util
* @Projectfw-nifi
* @nameJsonUtil
* @Date2025/5/14 10:03
* @FilenameJsonUtil
*/
@Component
public class JsonUtil {
private final ObjectMapper objectMapper;
public JsonUtil(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
public String toJson(Object obj) throws Exception {
return objectMapper.writeValueAsString(obj);
}
public <T> T fromJson(String json, Class<T> clazz) throws Exception {
return objectMapper.readValue(json, clazz);
}
}

View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd"
version="4.0">
</web-app>

View File

@ -0,0 +1,43 @@
//import com.bazaarvoice.jolt.Chainr;
//import com.bazaarvoice.jolt.JsonUtils;
//import com.fasterxml.jackson.databind.JsonNode;
//import com.fasterxml.jackson.databind.ObjectMapper;
//
//import java.io.File;
//import java.io.FileInputStream;
//import java.io.InputStream;
//import java.util.List;
//import java.util.Map;
//
///**
// * @Authorliuyang
// * @PackagePACKAGE_NAME
// * @Projectfw-nifi
// * @nameJSONata4Java
// * @Date2025/5/13 15:42
// * @FilenameJSONata4Java
// */
//public class JSONata4Java {
// public static void main(String[] args) throws Exception {
// // 原始 JSON 数据
// String aJson = "{ \"id\": 1, \"name\": \"ObjectA\" }";
// String bJson = "[{ \"id\": 1, \"value\": \"DataB1\" }, { \"id\": 2, \"value\": \"DataB2\" }]";
// String cJson = "[{ \"id\": 1, \"value\": \"DataC1\" }, { \"id\": 3, \"value\": \"DataC3\" }]";
//
// // 合并为 Jolt 输入格式
// String inputJson = "{ \"a\": " + aJson + ", \"b\": " + bJson + ", \"c\": " + cJson + " }";
//
// // 加载 Jolt 转换规则
// File newFile = new File("/Users/liuyang/workspaces/hzya/fw-nifi/fw-nifi/src/test/java/spec.json");
// InputStream inputStream = new FileInputStream(newFile);
// List<Object> spec = JsonUtils.jsonToList(inputStream);
// Chainr chainr = Chainr.fromSpec(spec);
//
// // 执行转换
// Map<String, Object> input = JsonUtils.jsonToMap(inputJson);
// Map<String, Object> output = (Map<String, Object>) chainr.transform(input);
//
// // 输出结果
// System.out.println(JsonUtils.toPrettyJsonString(output));
// }
//}

View File

@ -0,0 +1,28 @@
import com.hzya.frame.nifi.service.NifiApiService;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* @Authorliuyang
* @PackagePACKAGE_NAME
* @Projectfw-nifi
* @nameTestNifiConfig
* @Date2025/5/14 11:26
* @FilenameTestNifiConfig
*/
@SpringBootTest
public class TestNifiConfig {
// @Autowired
// private NifiApiService nifiService;
//
// @Test
// public void testCreateProcessor() {
// try {
// nifiService.getAccessToken();
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
}

View File

@ -0,0 +1,35 @@
[
{
"operation": "shift",
"spec": {
"a": {
"id": "id",
"name": "name"
},
"b": {
"*": {
"id": {
"@(2,a.id)": {
"@2": {
"value": "matchedB.value",
"#B": "matchedB.source"
}
}
}
}
},
"c": {
"*": {
"id": {
"@(2,a.id)": {
"@2": {
"value": "matchedC.value",
"#C": "matchedC.source"
}
}
}
}
}
}
}
]

Some files were not shown because too many files have changed in this diff Show More