feat(nifi): 新增应用实例化功能

- 新增 Oracle 和 MySQL 应用实例化功能- 实现创建流程组、处理器和连接的功能
- 添加计算新组件坐标的逻辑
- 封装基础操作和 NiFi 操作接口
This commit is contained in:
liuy 2025-05-21 18:20:50 +08:00
parent 675b4a7925
commit 58ff225c47
15 changed files with 422 additions and 2 deletions

View File

@ -62,6 +62,21 @@ public class NifiClient {
});
}
/**
* 获取原始JSON响应字符串不进行反序列化
*/
public String getRaw(String path) throws IOException {
return executeRequestWithRetry(() -> {
Request request = new Request.Builder().url(config.getApiUrl() + path).get().header("Authorization", "Bearer " + accessToken.get()).build();
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful()) {
throw new IOException("意外的响应码: " + response.code());
}
return response.body().string();
}
});
}
/**
* 执行GET请求并返回文件流供调用者处理如保存到文件或浏览器下载
*/

View File

@ -0,0 +1,22 @@
package com.hzya.frame.nifiapi.model.basemodel;
import lombok.Data;
/**
* 基类封转创建应用操作公共的字段
*
* @Authorliuyang
* @Packagecom.hzya.frame.nifiapi.model.joincreateoracleapp
* @Projectfw-nifi
* @nameCreateOracleApp
* @Date2025/5/21 17:44
* @FilenameCreateOracleApp
*/
@Data
public class JoinBashModel {
private String appProcessGroupId;
private String parentProcessGroupId;
private String copyTargetProcessGroupId;
// private String copyTargetParentProcessGroupId;
}

View File

@ -0,0 +1,16 @@
package com.hzya.frame.nifiapi.model.joincreatemysqlapp;
import com.hzya.frame.nifiapi.model.basemodel.JoinBashModel;
import lombok.Data;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifiapi.model.joincreateoracleapp
* @Projectfw-nifi
* @nameCreateOracleApp
* @Date2025/5/21 17:44
* @FilenameCreateOracleApp
*/
@Data
public class CreateMysqlApp extends JoinBashModel {
}

View File

@ -0,0 +1,16 @@
package com.hzya.frame.nifiapi.model.joincreateoracleapp;
import com.hzya.frame.nifiapi.model.basemodel.JoinBashModel;
import lombok.Data;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifiapi.model.joincreateoracleapp
* @Projectfw-nifi
* @nameCreateOracleApp
* @Date2025/5/21 17:44
* @FilenameCreateOracleApp
*/
@Data
public class CreateOracleApp extends JoinBashModel {
}

View File

@ -5,4 +5,5 @@ import lombok.Data;
@Data
public class Component7 {
private String name;
private Position7 position;
}

View File

@ -0,0 +1,25 @@
package com.hzya.frame.nifiapi.model.joinprocessgroups;
import lombok.Data;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifiapi.model.joinprocessgroups
* @Projectfw-nifi
* @namePosition7
* @Date2025/5/21 14:57
* @FilenamePosition7
*/
@Data
public class Position7 {
private String x;
private String y;
public Position7() {
}
public Position7(String x, String y) {
this.x = x;
this.y = y;
}
}

View File

@ -1,5 +1,6 @@
package com.hzya.frame.nifiapi.model.resultcreateprocessor;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
/**
@ -12,4 +13,5 @@ import lombok.Data;
public class Config15 {
private String schedulingPeriod;
private Properties15 properties;
private String bulletinLevel;
}

View File

@ -12,4 +12,13 @@ import lombok.Data;
public class Position15 {
private String x;
private String y;
public Position15(String x, String y) {
this.x = x;
this.y = y;
}
public Position15() {
}
}

View File

@ -13,4 +13,22 @@ import lombok.Data;
public class Properties15 {
@JsonProperty("File Size")
private String fileSize;
@JsonProperty("Log Level")
private String logLevel;
@JsonProperty("Log Payload")
private String logPayload;
@JsonProperty("attributes-to-log-regex")
private String attributesToLogRegex;
@JsonProperty("Log FlowFile Properties")
private String LogFlowFileProperties;
@JsonProperty("Output Format")
private String outputFormat;
@JsonProperty("character-set")
private String characterSet;
}

View File

@ -17,7 +17,7 @@ public class ProcessGroupsInfoResult9 {
private String uri;
private Position9 position;
private Permissions9 permissions;
private List<String> bulletins;
// private List<String> bulletins;
private Component9 component;
private Status9 status;
private String runningCount;

View File

@ -0,0 +1,21 @@
package com.hzya.frame.nifiapi.operation;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifiapi.operation
* @Projectfw-nifi
* @nameBaseNifiOperation
* @Date2025/5/21 17:50
* @FilenameBaseNifiOperation
*/
public interface BaseOperation {
/**
* 实例化某个应用对应nifi流程组id
*
* @param appProcessGroupId 需要复制的流程组id(应用id)比如Oracle应用idMysql应用id
* @param parentProcessGroupId 流程组id(应用id)的父流程组id
* @param copyTargetProcessGroupId 指定创建的流程组id
* @return 返回流程组实例化id
*/
String instantiateApp(String appProcessGroupId, String parentProcessGroupId, String copyTargetProcessGroupId) throws Exception;
}

View File

@ -0,0 +1,37 @@
package com.hzya.frame.nifiapi.operation;
import com.hzya.frame.nifiapi.model.joincreatemysqlapp.CreateMysqlApp;
import com.hzya.frame.nifiapi.model.joincreateoracleapp.CreateOracleApp;
/**
* NiFi应用实例化业务操作接口
*
* @Authorliuyang
* @Packagecom.hzya.frame.nifiapi.operation
* @Projectfw-nifi
* @nameNifiOperation
* @Date2025/5/21 17:05
* @FilenameNifiOperation
*/
public interface NifiOperation {
/**
* Oracle应用创建并实例化
* 主要包含一系列nifi接口调用操作其他应用开发封转类以此类推
* 1.创建nifi片段
* 2.把片段实例化
* 3.修改nifi控制器
* 4.新建上下文环境参数
* 5.绑定上下文环境参数到流程组
* 6.激活当前流程组内的所有控制器服务
*
* @param createOracleApp 封装创建Oracle应用所需要的入参
*/
void createOracleApp(CreateOracleApp createOracleApp) throws Exception;
/**
* Mysql应用创建并实例化
*
* @param createMysqlApp 封装创建Mysql应用所需要的入参
*/
void createMysqlApp(CreateMysqlApp createMysqlApp) throws Exception;
}

View File

@ -0,0 +1,80 @@
package com.hzya.frame.nifiapi.operation.impl;
import com.hzya.frame.nifiapi.model.joinsnippetinstance.SnippetInstanceJoin;
import com.hzya.frame.nifiapi.model.joinsnippets.Snippet;
import com.hzya.frame.nifiapi.model.joinsnippets.SnippetsJoin;
import com.hzya.frame.nifiapi.model.resultcreatesnippet.Flow15;
import com.hzya.frame.nifiapi.model.resultcreatesnippet.ProcessGroups15;
import com.hzya.frame.nifiapi.model.resultcreatesnippet.SnippetInstance15;
import com.hzya.frame.nifiapi.model.resultprocessgroupsinfo.ProcessGroupsInfoResult9;
import com.hzya.frame.nifiapi.model.resultprocessgroupsinfo.Revision9;
import com.hzya.frame.nifiapi.model.resultsnippets.Snippet13;
import com.hzya.frame.nifiapi.model.resultsnippets.SnippetResult13;
import com.hzya.frame.nifiapi.operation.BaseOperation;
import com.hzya.frame.nifiapi.service.NifiApiService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import java.util.HashMap;
import java.util.Map;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifiapi.operation.impl
* @Projectfw-nifi
* @nameBaseOperationImpl
* @Date2025/5/21 18:07
* @FilenameBaseOperationImpl
*/
@Repository(value = "BaseOperationImpl")
public class BaseOperationImpl implements BaseOperation {
@Autowired
private NifiApiService nifiApiService;
@Override
public String instantiateApp(String appProcessGroupId, String parentProcessGroupId, String copyTargetProcessGroupId) throws Exception {
String processGroups15Id = null;
try {
//1.创建流程组对应的片段得到片段id
//查询目标流程组版本
ProcessGroupsInfoResult9 processGroups = nifiApiService.getProcessGroups(appProcessGroupId);
Revision9 revision = processGroups.getRevision();
Map<String, String> processGroupsMap2 = new HashMap<>();
processGroupsMap2.put("version", revision.getVersion());//目标流程组版本号
Map<String, Map<String, String>> processGroupsMap = new HashMap<>();
processGroupsMap.put(appProcessGroupId, processGroupsMap2);//目标流程组id
Snippet snippet = new Snippet();
snippet.setParentGroupId(parentProcessGroupId);//目标流程组id的父流程组id
snippet.setProcessGroups(processGroupsMap);
SnippetsJoin snippetsJoin = new SnippetsJoin();
snippetsJoin.setSnippet(snippet);
SnippetResult13 snippets = nifiApiService.createSnippets(snippetsJoin);
Snippet13 snippet1 = snippets.getSnippet();
//2.将片段实例化到指定位置,得到实例化后的应用id
ProcessGroupsInfoResult9 processGroups2 = nifiApiService.getProcessGroups(copyTargetProcessGroupId);
Revision9 revision2 = processGroups2.getRevision();
Map<String, String> processGroupsMap3 = new HashMap<>();
processGroupsMap3.put("version", revision2.getVersion());//目标流程组版本号
Map<String, Map<String, String>> processGroupId = new HashMap<>();
processGroupId.put(copyTargetProcessGroupId, processGroupsMap3);//目标流程组id
SnippetInstanceJoin snippetInstanceJoin = new SnippetInstanceJoin();
snippetInstanceJoin.setSnippetId(snippet1.getId());
SnippetInstance15 snippetInstance = nifiApiService.createSnippetInstance(copyTargetProcessGroupId, snippetInstanceJoin);
Flow15 flow = snippetInstance.getFlow();
ProcessGroups15 processGroups15 = flow.getProcessGroups().get(0);
processGroups15Id = processGroups15.getId();
} catch (Exception e) {
throw new Exception(e);
}
return processGroups15Id;
}
}

View File

@ -0,0 +1,77 @@
package com.hzya.frame.nifiapi.operation.impl;
import com.hzya.frame.nifiapi.client.NifiClient;
import com.hzya.frame.nifiapi.model.joincreatemysqlapp.CreateMysqlApp;
import com.hzya.frame.nifiapi.model.joincreateoracleapp.CreateOracleApp;
import com.hzya.frame.nifiapi.model.joinsnippetinstance.SnippetInstanceJoin;
import com.hzya.frame.nifiapi.model.joinsnippets.Snippet;
import com.hzya.frame.nifiapi.model.joinsnippets.SnippetsJoin;
import com.hzya.frame.nifiapi.model.resultcreatesnippet.Flow15;
import com.hzya.frame.nifiapi.model.resultcreatesnippet.ProcessGroups15;
import com.hzya.frame.nifiapi.model.resultcreatesnippet.SnippetInstance15;
import com.hzya.frame.nifiapi.model.resultprocessgroupsinfo.ProcessGroupsInfoResult9;
import com.hzya.frame.nifiapi.model.resultprocessgroupsinfo.Revision9;
import com.hzya.frame.nifiapi.model.resultsnippets.Snippet13;
import com.hzya.frame.nifiapi.model.resultsnippets.SnippetResult13;
import com.hzya.frame.nifiapi.operation.BaseOperation;
import com.hzya.frame.nifiapi.operation.NifiOperation;
import com.hzya.frame.nifiapi.service.NifiApiService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import java.util.HashMap;
import java.util.Map;
/**
* NiFi 操作实现类封装通用操作
*
* @Authorliuyang
* @Packagecom.hzya.frame.nifiapi.operation.impl
* @Projectfw-nifi
* @nameNifiOperationImpl
* @Date2025/5/21 17:06
* @FilenameNifiOperationImpl
*/
@Repository(value = "NifiOperationImpl")
public class NifiOperationImpl implements NifiOperation {
Logger logger = LoggerFactory.getLogger(NifiOperationImpl.class);
@Autowired
private NifiApiService nifiApiService;
@Autowired
private BaseOperation baseOperation;
@Override
public void createOracleApp(CreateOracleApp createOracleApp) throws Exception {
try {
//1.实例化Oracle应用
String appProcessGroupId = createOracleApp.getAppProcessGroupId();
String appProcessGroupId1 = createOracleApp.getParentProcessGroupId();
String copyTargetProcessGroupId = createOracleApp.getCopyTargetProcessGroupId();
String processId = baseOperation.instantiateApp(appProcessGroupId, appProcessGroupId1, copyTargetProcessGroupId);
logger.info("Oracle应用实例化后的流程组id:{}", processId);
//2.新增应用对应的上下文参数
} catch (Exception e) {
throw new Exception(e);
}
}
@Override
public void createMysqlApp(CreateMysqlApp createMysqlApp) throws Exception {
try {
//1.实例化Mysql应用
String appProcessGroupId = createMysqlApp.getAppProcessGroupId();
String appProcessGroupId1 = createMysqlApp.getParentProcessGroupId();
String copyTargetProcessGroupId = createMysqlApp.getCopyTargetProcessGroupId();
String processId = baseOperation.instantiateApp(appProcessGroupId, appProcessGroupId1, copyTargetProcessGroupId);
logger.info("Mysql应用实例化后的流程组id:{}", processId);
} catch (Exception e) {
throw new Exception(e);
}
}
}

View File

@ -1,6 +1,8 @@
package com.hzya.frame.nifiapi.service;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hzya.frame.nifiapi.client.NifiClient;
import com.hzya.frame.nifiapi.model.joinbindparametercontexts.BindParameterContextsJoin11;
import com.hzya.frame.nifiapi.model.joincontrollerenabled.EnOrDiControllerServices12;
@ -9,6 +11,8 @@ import com.hzya.frame.nifiapi.model.joincreateconnections.CreateConnections18;
import com.hzya.frame.nifiapi.model.joincreatetemp.CreateTemplateJoin;
import com.hzya.frame.nifiapi.model.joingetcontroller.ControllerService12;
import com.hzya.frame.nifiapi.model.joinparametercontexts.ParameterContextsJoin;
import com.hzya.frame.nifiapi.model.joinprocessgroups.Component7;
import com.hzya.frame.nifiapi.model.joinprocessgroups.Position7;
import com.hzya.frame.nifiapi.model.joinprocessgroups.ProcessGroupsJoin;
import com.hzya.frame.nifiapi.model.joinsnippetinstance.SnippetInstanceJoin;
import com.hzya.frame.nifiapi.model.joinsnippets.SnippetsJoin;
@ -17,7 +21,9 @@ import com.hzya.frame.nifiapi.model.joinupdateprocessor.RunStatusOrStop17;
import com.hzya.frame.nifiapi.model.nifitemplates.NifiTemplates;
import com.hzya.frame.nifiapi.model.processgroupid.ProcessGroupsId;
import com.hzya.frame.nifiapi.model.processgrouproot.ProcessGroupsRoot;
import com.hzya.frame.nifiapi.model.resultcreateprocessor.Component15;
import com.hzya.frame.nifiapi.model.resultcreateprocessor.CreateProcessorJoin15;
import com.hzya.frame.nifiapi.model.resultcreateprocessor.Position15;
import com.hzya.frame.nifiapi.model.resultcreateprocessors.CreateProcess16;
import com.hzya.frame.nifiapi.model.resultcreatesnippet.SnippetInstance15;
import com.hzya.frame.nifiapi.model.resultcreatetemplate.NewNifiTemplatete14;
@ -30,6 +36,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
@ -47,9 +56,12 @@ public class NifiApiService {
private final NifiClient client;
private final ObjectMapper objectMapper;
@Autowired
public NifiApiService(NifiClient client) {
public NifiApiService(NifiClient client, ObjectMapper objectMapper) {
this.client = client;
this.objectMapper = objectMapper;
}
/**
@ -119,6 +131,11 @@ public class NifiApiService {
* 创建流程组
*/
public ProcessgroupsResult createProcessGroups(String parentGroupId, ProcessGroupsJoin processGroupsJoin) throws Exception {
Component7 component = processGroupsJoin.getComponent();
if (component != null && component.getPosition() == null) {
double[] doubles = calculatePosition(parentGroupId);
component.setPosition(new Position7(String.valueOf(doubles[0]), String.valueOf(doubles[1])));
}
return client.post(StrUtil.format("/process-groups/{}/process-groups", parentGroupId), processGroupsJoin, ProcessgroupsResult.class);
}
@ -207,6 +224,11 @@ public class NifiApiService {
* 将代码片段实例化到指定位置
*/
public SnippetInstance15 createSnippetInstance(String groupId, SnippetInstanceJoin snippetInstanceJoin) throws Exception {
if (snippetInstanceJoin.getOriginX() == null || snippetInstanceJoin.getOriginY() == null) {
double[] doubles = calculatePosition(groupId);
snippetInstanceJoin.setOriginX(String.valueOf(doubles[0]));
snippetInstanceJoin.setOriginY(String.valueOf(doubles[1]));
}
return client.post(StrUtil.format("/process-groups/{}/snippet-instance", groupId), snippetInstanceJoin, SnippetInstance15.class);
}
@ -214,6 +236,11 @@ public class NifiApiService {
* 创建处理器nifi1.28.1版本有364个处理器并且每个入参都有差异所以创建之前可以先查询一遍处理器得到对应的参数进行修改调整
*/
public CreateProcess16 createProcessor(String groupId, CreateProcessorJoin15 createProcessorJoin15) throws Exception {
Component15 component = createProcessorJoin15.getComponent();
if (component != null && component.getPosition() == null) {
double[] doubles = calculatePosition(groupId);
component.setPosition(new Position15(String.valueOf(doubles[0]), String.valueOf(doubles[1])));
}
return client.post(StrUtil.format("/process-groups/{}/processors", groupId), createProcessorJoin15, CreateProcess16.class);
}
@ -251,4 +278,58 @@ public class NifiApiService {
public CreateConnection18 createProcessorConnections(String processGroupsId, CreateConnections18 createConnections17) throws Exception {
return client.post(StrUtil.format("/process-groups/{}/connections", processGroupsId), createConnections17, CreateConnection18.class);
}
/**
* 计算新处理器或流程组的xy坐标
*
* @param parentGroupId 父流程组ID
* @return 包含X和Y坐标的数组
* @throws Exception 如果获取流程组信息失败
*/
public double[] calculatePosition(String parentGroupId) throws Exception {
String jsonString = client.getRaw(StrUtil.format("/flow/process-groups/{}", parentGroupId));
JsonNode json = objectMapper.readTree(jsonString);
//查找processGroupFlow.flow
JsonNode flow = json.path("processGroupFlow").path("flow");
List<double[]> positions = new ArrayList<>();
String[] componentTypes = {"processors", "processGroups", "funnels", "remoteProcessGroups", "inputPorts", "outputPorts", "labels"};
// 遍历所有组件类型,收集坐标
for (String type : componentTypes) {
JsonNode array = flow.path(type);
if (array.isArray()) {
for (JsonNode entity : array) {
JsonNode position = entity.path("component").path("position");
if (position.isMissingNode()) {
position = entity.path("position"); // 某些组件如端口可能直接包含 position
}
if (position.has("x") && position.has("y")) {
double x = position.get("x").asDouble();
double y = position.get("y").asDouble();
positions.add(new double[]{x, y});
}
}
}
}
// 如果没有子组件代表流程组为null返回默认坐标
if (positions.isEmpty()) {
return new double[]{100.0, 100.0};
}
// 计算新坐标基于最下方的组件向下偏移
double maxY = Double.MIN_VALUE;
double xAtMaxY = 0;
for (double[] pos : positions) {
if (pos[1] > maxY) {
maxY = pos[1];
xAtMaxY = pos[0];
}
}
double newX = xAtMaxY; // 与最下方组件保持相同X坐标
double newY = maxY + 260; // 向下偏移260像素
return new double[]{newX, newY};
}
}