feat(nifi): 增加创建模板、实例化代码片段和创建处理器的功能

- 新增创建 NiFi 模板的功能
- 实现将代码片段实例化到指定位置的方法
- 添加创建处理器的功能,支持 nifi 1.28.1 版本的 364 个处理器- 优化日志输出,使用 slf4j 替代 System.out.println
- 新增多个结果模型类,用于处理不同类型的响应数据
This commit is contained in:
liuy 2025-05-17 16:53:07 +08:00
parent 7a5e8bf6f2
commit b228ab67a5
23 changed files with 404 additions and 6 deletions

View File

@ -3,6 +3,8 @@ package com.hzya.frame.nifi.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hzya.frame.nifi.config.NifiServiceConfig;
import okhttp3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -24,12 +26,14 @@ import java.util.concurrent.atomic.AtomicReference;
*/
@Component
public class NifiClient {
private final NifiServiceConfig config;
private final OkHttpClient httpClient;
private final ObjectMapper objectMapper;
private final AtomicReference<String> accessToken = new AtomicReference<>();
private volatile long tokenIssueTime; // Token 发行时间毫秒
private volatile long tokenExpiration; // Token 过期时间毫秒
Logger logger = LoggerFactory.getLogger(NifiClient.class);
@Autowired
public NifiClient(NifiServiceConfig config, OkHttpClient httpClient, ObjectMapper objectMapper) {
@ -44,7 +48,7 @@ public class NifiClient {
accessToken.set(getAccessToken());
tokenIssueTime = System.currentTimeMillis();
tokenExpiration = tokenIssueTime + (12 * 60 * 60 * 1000); // 默认 12 小时
System.out.println("令牌已初始化,过期时间:" + new java.util.Date(tokenExpiration));
logger.info("令牌已初始化,过期时间:" + new java.util.Date(tokenExpiration));
} catch (Exception e) {
throw new RuntimeException("初始化NiFi访问令牌失败 " + e.getMessage());
}
@ -116,7 +120,7 @@ public class NifiClient {
if (attempt == maxRetries - 1 || !isTokenExpiredError(e)) {
throw e;
}
System.out.println("令牌可能已过期,请刷新并重试...");
logger.info("令牌可能已过期,请刷新并重试...");
try {
accessToken.set(getAccessToken());
tokenIssueTime = System.currentTimeMillis();
@ -136,7 +140,7 @@ public class NifiClient {
accessToken.set(getAccessToken());
tokenIssueTime = System.currentTimeMillis();
tokenExpiration = tokenIssueTime + (12 * 60 * 60 * 1000); // 默认 12 小时
System.out.println("令牌已刷新,到期时间: " + new java.util.Date(tokenExpiration));
logger.info("令牌已刷新,到期时间: " + new java.util.Date(tokenExpiration));
} catch (Exception e) {
throw new IOException("刷新令牌失败: " + e.getMessage());
}
@ -151,6 +155,7 @@ public class NifiClient {
private <T> T executeRequest(Request request, Class<T> responseType) throws IOException {
try (Response response = httpClient.newCall(request).execute()) {
String responseBody = response.body().string();
logger.info("响应体:" + responseBody);
if (!response.isSuccessful()) {
throw new IOException("意外的响应码: " + responseBody);
}

View File

@ -0,0 +1,18 @@
package com.hzya.frame.nifi.model.joinsnippetinstance;
import lombok.Data;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.model.joinsnippetinstance
* @Projectfw-nifi
* @nameSnippetInstanceJoin
* @Date2025/5/17 14:21
* @FilenameSnippetInstanceJoin
*/
@Data
public class SnippetInstanceJoin {
private String snippetId;
private String originX;
private String originY;
}

View File

@ -0,0 +1,17 @@
package com.hzya.frame.nifi.model.resultcreateprocessor;
import lombok.Data;
/**
* Auto-generated: 2025-05-17 14:50:7
*
* @author bejson.com (i@bejson.com)
* @website http://www.bejson.com/java2pojo/
*/
@Data
public class Component15 {
private String type;
private String name;
private Position15 position;
private Config15 config;
}

View File

@ -0,0 +1,15 @@
package com.hzya.frame.nifi.model.resultcreateprocessor;
import lombok.Data;
/**
* Auto-generated: 2025-05-17 14:50:7
*
* @author bejson.com (i@bejson.com)
* @website http://www.bejson.com/java2pojo/
*/
@Data
public class Config15 {
private String schedulingPeriod;
private Properties15 properties;
}

View File

@ -0,0 +1,15 @@
package com.hzya.frame.nifi.model.resultcreateprocessor;
import lombok.Data;
/**
* Auto-generated: 2025-05-17 14:50:7
*
* @author bejson.com (i@bejson.com)
* @website http://www.bejson.com/java2pojo/
*/
@Data
public class CreateProcessorJoin15 {
private Revision15 revision;
private Component15 component;
}

View File

@ -0,0 +1,16 @@
package com.hzya.frame.nifi.model.resultcreateprocessor;
import lombok.Data;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.model.resultcreateprocessor
* @Projectfw-nifi
* @nameCreateProcessorResult
* @Date2025/5/17 14:49
* @FilenameCreateProcessorResult
*/
@Data
public class CreateProcessorResult15 {
}

View File

@ -0,0 +1,15 @@
package com.hzya.frame.nifi.model.resultcreateprocessor;
import lombok.Data;
/**
* Auto-generated: 2025-05-17 14:50:7
*
* @author bejson.com (i@bejson.com)
* @website http://www.bejson.com/java2pojo/
*/
@Data
public class Position15 {
private String x;
private String y;
}

View File

@ -0,0 +1,16 @@
package com.hzya.frame.nifi.model.resultcreateprocessor;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
/**
* Auto-generated: 2025-05-17 14:50:7
*
* @author bejson.com (i@bejson.com)
* @website http://www.bejson.com/java2pojo/
*/
@Data
public class Properties15 {
@JsonProperty("File Size")
private String fileSize;
}

View File

@ -0,0 +1,14 @@
package com.hzya.frame.nifi.model.resultcreateprocessor;
import lombok.Data;
/**
* Auto-generated: 2025-05-17 14:50:7
*
* @author bejson.com (i@bejson.com)
* @website http://www.bejson.com/java2pojo/
*/
@Data
public class Revision15 {
private String version;
}

View File

@ -0,0 +1,21 @@
package com.hzya.frame.nifi.model.resultcreateprocessors;
import lombok.Data;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.model.resultcreateprocessors
* @Projectfw-nifi
* @nameComponent16
* @Date2025/5/17 16:02
* @FilenameComponent16
*/
@Data
public class Component16 {
private String id;
private String parentGroupId;
private String name;
private String type;
private String state;
private Config16 config;
}

View File

@ -0,0 +1,31 @@
package com.hzya.frame.nifi.model.resultcreateprocessors;
import lombok.Data;
import java.util.Map;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.model.resultcreateprocessors
* @Projectfw-nifi
* @nameConfig16
* @Date2025/5/17 16:09
* @FilenameConfig16
*/
@Data
public class Config16 {
private Map<String, String> properties;
private String schedulingPeriod;
private String schedulingStrategy;
private String executionNode;
private String penaltyDuration;
private String yieldDuration;
private String bulletinLevel;
private String runDurationMillis;
private String concurrentlySchedulableTaskCount;
private String comments;
private String lossTolerant;
private String retryCount;
private String backoffMechanism;
private String maxBackoffPeriod;
}

View File

@ -0,0 +1,17 @@
package com.hzya.frame.nifi.model.resultcreateprocessors;
import lombok.Data;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.model.resultcreateprocessors
* @Projectfw-nifi
* @nameCreateProcess
* @Date2025/5/17 16:01
* @FilenameCreateProcess
*/
@Data
public class CreateProcess16 {
private String id;
private Component16 component;
}

View File

@ -0,0 +1,28 @@
//package com.hzya.frame.nifi.model.resultcreateprocessors;
//
//import com.fasterxml.jackson.annotation.JsonProperty;
//
///**
// * @Authorliuyang
// * @Packagecom.hzya.frame.nifi.model.resultcreateprocessors
// * @Projectfw-nifi
// * @nameProperties16
// * @Date2025/5/17 16:10
// * @FilenameProperties16
// */
//public class Properties16 {
// @JsonProperty("File Size")
// private String fileSize;
//
// @JsonProperty("Batch Size")
// private String batchSize;
//
// @JsonProperty("Data Format")
// private String dataFormat;
//
// @JsonProperty("Unique FlowFiles")
// private String uniqueFlowFiles;
//
// @JsonProperty("character-set")
// private String characterSet;
//}

View File

@ -0,0 +1,17 @@
package com.hzya.frame.nifi.model.resultcreatesnippet;
import lombok.Data;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.model.resultcreatesnippet
* @Projectfw-nifi
* @nameComponent15
* @Date2025/5/17 14:38
* @FilenameComponent15
*/
@Data
public class Component15 {
private String id;
private String name;
}

View File

@ -0,0 +1,18 @@
package com.hzya.frame.nifi.model.resultcreatesnippet;
import lombok.Data;
import java.util.List;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.model.resultcreatesnippet
* @Projectfw-nifi
* @nameFlow15
* @Date2025/5/17 14:35
* @FilenameFlow15
*/
@Data
public class Flow15 {
private List<ProcessGroups15> processGroups;
}

View File

@ -0,0 +1,17 @@
package com.hzya.frame.nifi.model.resultcreatesnippet;
import lombok.Data;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.model.resultcreatesnippet
* @Projectfw-nifi
* @namePosition15
* @Date2025/5/17 14:37
* @FilenamePosition15
*/
@Data
public class Position15 {
private String x;
private String y;
}

View File

@ -0,0 +1,20 @@
package com.hzya.frame.nifi.model.resultcreatesnippet;
import lombok.Data;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.model.resultcreatesnippet
* @Projectfw-nifi
* @nameProcessGroups15
* @Date2025/5/17 14:35
* @FilenameProcessGroups15
*/
@Data
public class ProcessGroups15 {
private String id;
private Revision15 revision;
private Position15 position;
private Component15 component;
private Status15 status;
}

View File

@ -0,0 +1,16 @@
package com.hzya.frame.nifi.model.resultcreatesnippet;
import lombok.Data;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.model.resultcreatesnippet
* @Projectfw-nifi
* @nameRevision15
* @Date2025/5/17 14:36
* @FilenameRevision15
*/
@Data
public class Revision15 {
private String version;
}

View File

@ -0,0 +1,18 @@
package com.hzya.frame.nifi.model.resultcreatesnippet;
import lombok.Data;
import java.util.List;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.model.resultcreatesnippet
* @Projectfw-nifi
* @nameSnippetInstance
* @Date2025/5/17 14:34
* @FilenameSnippetInstance
*/
@Data
public class SnippetInstance15 {
private Flow15 flow;
}

View File

@ -0,0 +1,17 @@
package com.hzya.frame.nifi.model.resultcreatesnippet;
import lombok.Data;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.model.resultcreatesnippet
* @Projectfw-nifi
* @nameStatus15
* @Date2025/5/17 14:39
* @FilenameStatus15
*/
@Data
public class Status15 {
private String id;
private String name;
}

View File

@ -0,0 +1,14 @@
package com.hzya.frame.nifi.model.resultcreatetemplate;
import lombok.Data;
/**
* Auto-generated: 2025-05-17 14:16:2
*
* @author bejson.com (i@bejson.com)
* @website http://www.bejson.com/java2pojo/
*/
@Data
public class NewNifiTemplatete14 {
private Template14 template;
}

View File

@ -0,0 +1,15 @@
package com.hzya.frame.nifi.model.resultcreatetemplate;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
@Data
public class Template14 {
private String uri;
private String id;
private String groupId;
private String name;
private String timestamp;
@JsonProperty("encoding-version")
private String encodingVersion;
}

View File

@ -8,12 +8,16 @@ import com.hzya.frame.nifi.model.joincreatetemp.CreateTemplateJoin;
import com.hzya.frame.nifi.model.joingetcontroller.ControllerService12;
import com.hzya.frame.nifi.model.joinparametercontexts.ParameterContextsJoin;
import com.hzya.frame.nifi.model.joinprocessgroups.ProcessGroupsJoin;
import com.hzya.frame.nifi.model.joinsnippets.Snippet;
import com.hzya.frame.nifi.model.joinsnippetinstance.SnippetInstanceJoin;
import com.hzya.frame.nifi.model.joinsnippets.SnippetsJoin;
import com.hzya.frame.nifi.model.joinstartorstopprocessgroup.StartOrStopProcessGroupsInfoJoin10;
import com.hzya.frame.nifi.model.nifitemplates.NifiTemplates;
import com.hzya.frame.nifi.model.processgroupid.ProcessGroupsId;
import com.hzya.frame.nifi.model.processgrouproot.ProcessGroupsRoot;
import com.hzya.frame.nifi.model.resultcreateprocessor.CreateProcessorJoin15;
import com.hzya.frame.nifi.model.resultcreateprocessors.CreateProcess16;
import com.hzya.frame.nifi.model.resultcreatesnippet.SnippetInstance15;
import com.hzya.frame.nifi.model.resultcreatetemplate.NewNifiTemplatete14;
import com.hzya.frame.nifi.model.resultparametercontexts.ParameterContextsResult;
import com.hzya.frame.nifi.model.resultprocessgroups.ProcessgroupsResult;
import com.hzya.frame.nifi.model.resultprocessgroupsinfo.ProcessGroupsInfoResult9;
@ -184,8 +188,8 @@ public class NifiApiService {
/**
* 把指定的流程组生成模板
*/
public void createNifiTemplatete(String id, CreateTemplateJoin createTemplateJoin) throws Exception {
client.post(StrUtil.format("/process-groups/{}/templates", id), createTemplateJoin, null);
public NewNifiTemplatete14 createNifiTemplate(String id, CreateTemplateJoin createTemplateJoin) throws Exception {
return client.post(StrUtil.format("/process-groups/{}/templates", id), createTemplateJoin, NewNifiTemplatete14.class);
}
/**
@ -194,4 +198,18 @@ public class NifiApiService {
public SnippetResult13 createSnippets(SnippetsJoin snippetsJoin) throws Exception {
return client.post("/snippets", snippetsJoin, SnippetResult13.class);
}
/**
* 将代码片段实例化到指定位置
*/
public SnippetInstance15 createSnippetInstance(String groupId, SnippetInstanceJoin snippetInstanceJoin) throws Exception {
return client.post(StrUtil.format("/process-groups/{}/snippet-instance", groupId), snippetInstanceJoin, SnippetInstance15.class);
}
/**
* 创建处理器nifi1.28.1版本有364个处理器并且每个入参都有差异所以创建之前可以先查询一遍处理器得到对应的参数进行修改调整
*/
public CreateProcess16 createProcessor(String groupId, CreateProcessorJoin15 createProcessorJoin15) throws Exception {
return client.post(StrUtil.format("/process-groups/{}/processors", groupId), createProcessorJoin15, CreateProcess16.class);
}
}