From 58ff225c47576476e1696f61fefea995b0870e66 Mon Sep 17 00:00:00 2001 From: liuy <37787198+LiuyCodes@users.noreply.github.com> Date: Wed, 21 May 2025 18:20:50 +0800 Subject: [PATCH] =?UTF-8?q?feat(nifi):=20=E6=96=B0=E5=A2=9E=E5=BA=94?= =?UTF-8?q?=E7=94=A8=E5=AE=9E=E4=BE=8B=E5=8C=96=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 Oracle 和 MySQL 应用实例化功能- 实现创建流程组、处理器和连接的功能 - 添加计算新组件坐标的逻辑 - 封装基础操作和 NiFi 操作接口 --- .../hzya/frame/nifiapi/client/NifiClient.java | 15 ++++ .../model/basemodel/JoinBashModel.java | 22 +++++ .../joincreatemysqlapp/CreateMysqlApp.java | 16 ++++ .../joincreateoracleapp/CreateOracleApp.java | 16 ++++ .../model/joinprocessgroups/Component7.java | 1 + .../model/joinprocessgroups/Position7.java | 25 ++++++ .../model/resultcreateprocessor/Config15.java | 2 + .../resultcreateprocessor/Position15.java | 9 ++ .../resultcreateprocessor/Properties15.java | 18 ++++ .../ProcessGroupsInfoResult9.java | 2 +- .../nifiapi/operation/BaseOperation.java | 21 +++++ .../nifiapi/operation/NifiOperation.java | 37 +++++++++ .../operation/impl/BaseOperationImpl.java | 80 ++++++++++++++++++ .../operation/impl/NifiOperationImpl.java | 77 +++++++++++++++++ .../frame/nifiapi/service/NifiApiService.java | 83 ++++++++++++++++++- 15 files changed, 422 insertions(+), 2 deletions(-) create mode 100644 fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/basemodel/JoinBashModel.java create mode 100644 fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/joincreatemysqlapp/CreateMysqlApp.java create mode 100644 fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/joincreateoracleapp/CreateOracleApp.java create mode 100644 fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/joinprocessgroups/Position7.java create mode 100644 fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/BaseOperation.java create mode 100644 fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/NifiOperation.java create mode 100644 fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/impl/BaseOperationImpl.java create mode 100644 fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/impl/NifiOperationImpl.java diff --git a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/client/NifiClient.java b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/client/NifiClient.java index 9058cc80..6fd6355d 100644 --- a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/client/NifiClient.java +++ b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/client/NifiClient.java @@ -62,6 +62,21 @@ public class NifiClient { }); } + /** + * 获取原始JSON响应字符串,不进行反序列化 + */ + public String getRaw(String path) throws IOException { + return executeRequestWithRetry(() -> { + Request request = new Request.Builder().url(config.getApiUrl() + path).get().header("Authorization", "Bearer " + accessToken.get()).build(); + try (Response response = httpClient.newCall(request).execute()) { + if (!response.isSuccessful()) { + throw new IOException("意外的响应码: " + response.code()); + } + return response.body().string(); + } + }); + } + /** * 执行GET请求并返回文件流,供调用者处理(如保存到文件或浏览器下载) */ diff --git a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/basemodel/JoinBashModel.java b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/basemodel/JoinBashModel.java new file mode 100644 index 00000000..208ded92 --- /dev/null +++ b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/basemodel/JoinBashModel.java @@ -0,0 +1,22 @@ +package com.hzya.frame.nifiapi.model.basemodel; + +import lombok.Data; + +/** + * 基类封转创建应用操作公共的字段 + * + * @Author:liuyang + * @Package:com.hzya.frame.nifiapi.model.joincreateoracleapp + * @Project:fw-nifi + * @name:CreateOracleApp + * @Date:2025/5/21 17:44 + * @Filename:CreateOracleApp + */ +@Data +public class JoinBashModel { + private String appProcessGroupId; + private String parentProcessGroupId; + + private String copyTargetProcessGroupId; +// private String copyTargetParentProcessGroupId; +} diff --git a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/joincreatemysqlapp/CreateMysqlApp.java b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/joincreatemysqlapp/CreateMysqlApp.java new file mode 100644 index 00000000..75dd5068 --- /dev/null +++ b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/joincreatemysqlapp/CreateMysqlApp.java @@ -0,0 +1,16 @@ +package com.hzya.frame.nifiapi.model.joincreatemysqlapp; + +import com.hzya.frame.nifiapi.model.basemodel.JoinBashModel; +import lombok.Data; + +/** + * @Author:liuyang + * @Package:com.hzya.frame.nifiapi.model.joincreateoracleapp + * @Project:fw-nifi + * @name:CreateOracleApp + * @Date:2025/5/21 17:44 + * @Filename:CreateOracleApp + */ +@Data +public class CreateMysqlApp extends JoinBashModel { +} \ No newline at end of file diff --git a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/joincreateoracleapp/CreateOracleApp.java b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/joincreateoracleapp/CreateOracleApp.java new file mode 100644 index 00000000..273d368a --- /dev/null +++ b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/joincreateoracleapp/CreateOracleApp.java @@ -0,0 +1,16 @@ +package com.hzya.frame.nifiapi.model.joincreateoracleapp; + +import com.hzya.frame.nifiapi.model.basemodel.JoinBashModel; +import lombok.Data; + +/** + * @Author:liuyang + * @Package:com.hzya.frame.nifiapi.model.joincreateoracleapp + * @Project:fw-nifi + * @name:CreateOracleApp + * @Date:2025/5/21 17:44 + * @Filename:CreateOracleApp + */ +@Data +public class CreateOracleApp extends JoinBashModel { +} \ No newline at end of file diff --git a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/joinprocessgroups/Component7.java b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/joinprocessgroups/Component7.java index 5206fc61..c9dbb36a 100644 --- a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/joinprocessgroups/Component7.java +++ b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/joinprocessgroups/Component7.java @@ -5,4 +5,5 @@ import lombok.Data; @Data public class Component7 { private String name; + private Position7 position; } \ No newline at end of file diff --git a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/joinprocessgroups/Position7.java b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/joinprocessgroups/Position7.java new file mode 100644 index 00000000..42f99121 --- /dev/null +++ b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/joinprocessgroups/Position7.java @@ -0,0 +1,25 @@ +package com.hzya.frame.nifiapi.model.joinprocessgroups; + +import lombok.Data; + +/** + * @Author:liuyang + * @Package:com.hzya.frame.nifiapi.model.joinprocessgroups + * @Project:fw-nifi + * @name:Position7 + * @Date:2025/5/21 14:57 + * @Filename:Position7 + */ +@Data +public class Position7 { + private String x; + private String y; + + public Position7() { + } + + public Position7(String x, String y) { + this.x = x; + this.y = y; + } +} diff --git a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/resultcreateprocessor/Config15.java b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/resultcreateprocessor/Config15.java index 92282e8e..e619ca6f 100644 --- a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/resultcreateprocessor/Config15.java +++ b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/resultcreateprocessor/Config15.java @@ -1,5 +1,6 @@ package com.hzya.frame.nifiapi.model.resultcreateprocessor; +import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Data; /** @@ -12,4 +13,5 @@ import lombok.Data; public class Config15 { private String schedulingPeriod; private Properties15 properties; + private String bulletinLevel; } \ No newline at end of file diff --git a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/resultcreateprocessor/Position15.java b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/resultcreateprocessor/Position15.java index c4bc5c90..775e5b8e 100644 --- a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/resultcreateprocessor/Position15.java +++ b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/resultcreateprocessor/Position15.java @@ -12,4 +12,13 @@ import lombok.Data; public class Position15 { private String x; private String y; + + public Position15(String x, String y) { + this.x = x; + this.y = y; + } + + public Position15() { + + } } \ No newline at end of file diff --git a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/resultcreateprocessor/Properties15.java b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/resultcreateprocessor/Properties15.java index b8a714ce..bc86400c 100644 --- a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/resultcreateprocessor/Properties15.java +++ b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/resultcreateprocessor/Properties15.java @@ -13,4 +13,22 @@ import lombok.Data; public class Properties15 { @JsonProperty("File Size") private String fileSize; + + @JsonProperty("Log Level") + private String logLevel; + + @JsonProperty("Log Payload") + private String logPayload; + + @JsonProperty("attributes-to-log-regex") + private String attributesToLogRegex; + + @JsonProperty("Log FlowFile Properties") + private String LogFlowFileProperties; + + @JsonProperty("Output Format") + private String outputFormat; + + @JsonProperty("character-set") + private String characterSet; } \ No newline at end of file diff --git a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/resultprocessgroupsinfo/ProcessGroupsInfoResult9.java b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/resultprocessgroupsinfo/ProcessGroupsInfoResult9.java index c300378a..73a38710 100644 --- a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/resultprocessgroupsinfo/ProcessGroupsInfoResult9.java +++ b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/resultprocessgroupsinfo/ProcessGroupsInfoResult9.java @@ -17,7 +17,7 @@ public class ProcessGroupsInfoResult9 { private String uri; private Position9 position; private Permissions9 permissions; - private List bulletins; + // private List bulletins; private Component9 component; private Status9 status; private String runningCount; diff --git a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/BaseOperation.java b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/BaseOperation.java new file mode 100644 index 00000000..ad9f50ac --- /dev/null +++ b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/BaseOperation.java @@ -0,0 +1,21 @@ +package com.hzya.frame.nifiapi.operation; + +/** + * @Author:liuyang + * @Package:com.hzya.frame.nifiapi.operation + * @Project:fw-nifi + * @name:BaseNifiOperation + * @Date:2025/5/21 17:50 + * @Filename:BaseNifiOperation + */ +public interface BaseOperation { + /** + * 实例化某个应用(对应nifi流程组id) + * + * @param appProcessGroupId 需要复制的流程组id(应用id),比如Oracle应用id、Mysql应用id + * @param parentProcessGroupId 流程组id(应用id)的父流程组id + * @param copyTargetProcessGroupId 指定创建的流程组id + * @return 返回流程组实例化id + */ + String instantiateApp(String appProcessGroupId, String parentProcessGroupId, String copyTargetProcessGroupId) throws Exception; +} diff --git a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/NifiOperation.java b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/NifiOperation.java new file mode 100644 index 00000000..7c753206 --- /dev/null +++ b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/NifiOperation.java @@ -0,0 +1,37 @@ +package com.hzya.frame.nifiapi.operation; + +import com.hzya.frame.nifiapi.model.joincreatemysqlapp.CreateMysqlApp; +import com.hzya.frame.nifiapi.model.joincreateoracleapp.CreateOracleApp; + +/** + * NiFi应用实例化业务操作接口 + * + * @Author:liuyang + * @Package:com.hzya.frame.nifiapi.operation + * @Project:fw-nifi + * @name:NifiOperation + * @Date:2025/5/21 17:05 + * @Filename:NifiOperation + */ +public interface NifiOperation { + /** + * Oracle应用创建并实例化 + * 主要包含一系列nifi接口调用操作,其他应用开发封转类以此类推 + * 1.创建nifi片段 + * 2.把片段实例化 + * 3.修改nifi控制器 + * 4.新建上下文环境参数 + * 5.绑定上下文环境参数到流程组 + * 6.激活当前流程组内的所有控制器服务 + * + * @param createOracleApp 封装创建Oracle应用所需要的入参 + */ + void createOracleApp(CreateOracleApp createOracleApp) throws Exception; + + /** + * Mysql应用创建并实例化 + * + * @param createMysqlApp 封装创建Mysql应用所需要的入参 + */ + void createMysqlApp(CreateMysqlApp createMysqlApp) throws Exception; +} diff --git a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/impl/BaseOperationImpl.java b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/impl/BaseOperationImpl.java new file mode 100644 index 00000000..6b2296ee --- /dev/null +++ b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/impl/BaseOperationImpl.java @@ -0,0 +1,80 @@ +package com.hzya.frame.nifiapi.operation.impl; + +import com.hzya.frame.nifiapi.model.joinsnippetinstance.SnippetInstanceJoin; +import com.hzya.frame.nifiapi.model.joinsnippets.Snippet; +import com.hzya.frame.nifiapi.model.joinsnippets.SnippetsJoin; +import com.hzya.frame.nifiapi.model.resultcreatesnippet.Flow15; +import com.hzya.frame.nifiapi.model.resultcreatesnippet.ProcessGroups15; +import com.hzya.frame.nifiapi.model.resultcreatesnippet.SnippetInstance15; +import com.hzya.frame.nifiapi.model.resultprocessgroupsinfo.ProcessGroupsInfoResult9; +import com.hzya.frame.nifiapi.model.resultprocessgroupsinfo.Revision9; +import com.hzya.frame.nifiapi.model.resultsnippets.Snippet13; +import com.hzya.frame.nifiapi.model.resultsnippets.SnippetResult13; +import com.hzya.frame.nifiapi.operation.BaseOperation; +import com.hzya.frame.nifiapi.service.NifiApiService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Repository; + +import java.util.HashMap; +import java.util.Map; + +/** + * @Author:liuyang + * @Package:com.hzya.frame.nifiapi.operation.impl + * @Project:fw-nifi + * @name:BaseOperationImpl + * @Date:2025/5/21 18:07 + * @Filename:BaseOperationImpl + */ +@Repository(value = "BaseOperationImpl") +public class BaseOperationImpl implements BaseOperation { + + @Autowired + private NifiApiService nifiApiService; + + @Override + public String instantiateApp(String appProcessGroupId, String parentProcessGroupId, String copyTargetProcessGroupId) throws Exception { + String processGroups15Id = null; + try { + //1.创建流程组对应的片段,得到片段id + //查询目标流程组版本 + ProcessGroupsInfoResult9 processGroups = nifiApiService.getProcessGroups(appProcessGroupId); + Revision9 revision = processGroups.getRevision(); + + Map processGroupsMap2 = new HashMap<>(); + processGroupsMap2.put("version", revision.getVersion());//目标流程组版本号 + + Map> processGroupsMap = new HashMap<>(); + processGroupsMap.put(appProcessGroupId, processGroupsMap2);//目标流程组id + + Snippet snippet = new Snippet(); + snippet.setParentGroupId(parentProcessGroupId);//目标流程组id的父流程组id + snippet.setProcessGroups(processGroupsMap); + + SnippetsJoin snippetsJoin = new SnippetsJoin(); + snippetsJoin.setSnippet(snippet); + SnippetResult13 snippets = nifiApiService.createSnippets(snippetsJoin); + Snippet13 snippet1 = snippets.getSnippet(); + + //2.将片段实例化到指定位置,得到实例化后的应用id + ProcessGroupsInfoResult9 processGroups2 = nifiApiService.getProcessGroups(copyTargetProcessGroupId); + Revision9 revision2 = processGroups2.getRevision(); + + Map processGroupsMap3 = new HashMap<>(); + processGroupsMap3.put("version", revision2.getVersion());//目标流程组版本号 + + Map> processGroupId = new HashMap<>(); + processGroupId.put(copyTargetProcessGroupId, processGroupsMap3);//目标流程组id + + SnippetInstanceJoin snippetInstanceJoin = new SnippetInstanceJoin(); + snippetInstanceJoin.setSnippetId(snippet1.getId()); + SnippetInstance15 snippetInstance = nifiApiService.createSnippetInstance(copyTargetProcessGroupId, snippetInstanceJoin); + Flow15 flow = snippetInstance.getFlow(); + ProcessGroups15 processGroups15 = flow.getProcessGroups().get(0); + processGroups15Id = processGroups15.getId(); + } catch (Exception e) { + throw new Exception(e); + } + return processGroups15Id; + } +} diff --git a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/impl/NifiOperationImpl.java b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/impl/NifiOperationImpl.java new file mode 100644 index 00000000..31a06389 --- /dev/null +++ b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/impl/NifiOperationImpl.java @@ -0,0 +1,77 @@ +package com.hzya.frame.nifiapi.operation.impl; + +import com.hzya.frame.nifiapi.client.NifiClient; +import com.hzya.frame.nifiapi.model.joincreatemysqlapp.CreateMysqlApp; +import com.hzya.frame.nifiapi.model.joincreateoracleapp.CreateOracleApp; +import com.hzya.frame.nifiapi.model.joinsnippetinstance.SnippetInstanceJoin; +import com.hzya.frame.nifiapi.model.joinsnippets.Snippet; +import com.hzya.frame.nifiapi.model.joinsnippets.SnippetsJoin; +import com.hzya.frame.nifiapi.model.resultcreatesnippet.Flow15; +import com.hzya.frame.nifiapi.model.resultcreatesnippet.ProcessGroups15; +import com.hzya.frame.nifiapi.model.resultcreatesnippet.SnippetInstance15; +import com.hzya.frame.nifiapi.model.resultprocessgroupsinfo.ProcessGroupsInfoResult9; +import com.hzya.frame.nifiapi.model.resultprocessgroupsinfo.Revision9; +import com.hzya.frame.nifiapi.model.resultsnippets.Snippet13; +import com.hzya.frame.nifiapi.model.resultsnippets.SnippetResult13; +import com.hzya.frame.nifiapi.operation.BaseOperation; +import com.hzya.frame.nifiapi.operation.NifiOperation; +import com.hzya.frame.nifiapi.service.NifiApiService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Repository; + +import java.util.HashMap; +import java.util.Map; + +/** + * NiFi 操作实现类,封装通用操作 + * + * @Author:liuyang + * @Package:com.hzya.frame.nifiapi.operation.impl + * @Project:fw-nifi + * @name:NifiOperationImpl + * @Date:2025/5/21 17:06 + * @Filename:NifiOperationImpl + */ +@Repository(value = "NifiOperationImpl") +public class NifiOperationImpl implements NifiOperation { + + Logger logger = LoggerFactory.getLogger(NifiOperationImpl.class); + + @Autowired + private NifiApiService nifiApiService; + + @Autowired + private BaseOperation baseOperation; + + @Override + public void createOracleApp(CreateOracleApp createOracleApp) throws Exception { + try { + //1.实例化Oracle应用 + String appProcessGroupId = createOracleApp.getAppProcessGroupId(); + String appProcessGroupId1 = createOracleApp.getParentProcessGroupId(); + String copyTargetProcessGroupId = createOracleApp.getCopyTargetProcessGroupId(); + String processId = baseOperation.instantiateApp(appProcessGroupId, appProcessGroupId1, copyTargetProcessGroupId); + logger.info("Oracle应用实例化后的流程组id:{}", processId); + //2.新增应用对应的上下文参数 + + } catch (Exception e) { + throw new Exception(e); + } + } + + @Override + public void createMysqlApp(CreateMysqlApp createMysqlApp) throws Exception { + try { + //1.实例化Mysql应用 + String appProcessGroupId = createMysqlApp.getAppProcessGroupId(); + String appProcessGroupId1 = createMysqlApp.getParentProcessGroupId(); + String copyTargetProcessGroupId = createMysqlApp.getCopyTargetProcessGroupId(); + String processId = baseOperation.instantiateApp(appProcessGroupId, appProcessGroupId1, copyTargetProcessGroupId); + logger.info("Mysql应用实例化后的流程组id:{}", processId); + } catch (Exception e) { + throw new Exception(e); + } + } +} diff --git a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/service/NifiApiService.java b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/service/NifiApiService.java index b76e1f90..957dcc2e 100644 --- a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/service/NifiApiService.java +++ b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/service/NifiApiService.java @@ -1,6 +1,8 @@ package com.hzya.frame.nifiapi.service; import cn.hutool.core.util.StrUtil; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.hzya.frame.nifiapi.client.NifiClient; import com.hzya.frame.nifiapi.model.joinbindparametercontexts.BindParameterContextsJoin11; import com.hzya.frame.nifiapi.model.joincontrollerenabled.EnOrDiControllerServices12; @@ -9,6 +11,8 @@ import com.hzya.frame.nifiapi.model.joincreateconnections.CreateConnections18; import com.hzya.frame.nifiapi.model.joincreatetemp.CreateTemplateJoin; import com.hzya.frame.nifiapi.model.joingetcontroller.ControllerService12; import com.hzya.frame.nifiapi.model.joinparametercontexts.ParameterContextsJoin; +import com.hzya.frame.nifiapi.model.joinprocessgroups.Component7; +import com.hzya.frame.nifiapi.model.joinprocessgroups.Position7; import com.hzya.frame.nifiapi.model.joinprocessgroups.ProcessGroupsJoin; import com.hzya.frame.nifiapi.model.joinsnippetinstance.SnippetInstanceJoin; import com.hzya.frame.nifiapi.model.joinsnippets.SnippetsJoin; @@ -17,7 +21,9 @@ import com.hzya.frame.nifiapi.model.joinupdateprocessor.RunStatusOrStop17; import com.hzya.frame.nifiapi.model.nifitemplates.NifiTemplates; import com.hzya.frame.nifiapi.model.processgroupid.ProcessGroupsId; import com.hzya.frame.nifiapi.model.processgrouproot.ProcessGroupsRoot; +import com.hzya.frame.nifiapi.model.resultcreateprocessor.Component15; import com.hzya.frame.nifiapi.model.resultcreateprocessor.CreateProcessorJoin15; +import com.hzya.frame.nifiapi.model.resultcreateprocessor.Position15; import com.hzya.frame.nifiapi.model.resultcreateprocessors.CreateProcess16; import com.hzya.frame.nifiapi.model.resultcreatesnippet.SnippetInstance15; import com.hzya.frame.nifiapi.model.resultcreatetemplate.NewNifiTemplatete14; @@ -30,6 +36,9 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.InputStream; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; import java.util.Map; /** @@ -47,9 +56,12 @@ public class NifiApiService { private final NifiClient client; + private final ObjectMapper objectMapper; + @Autowired - public NifiApiService(NifiClient client) { + public NifiApiService(NifiClient client, ObjectMapper objectMapper) { this.client = client; + this.objectMapper = objectMapper; } /** @@ -119,6 +131,11 @@ public class NifiApiService { * 创建流程组 */ public ProcessgroupsResult createProcessGroups(String parentGroupId, ProcessGroupsJoin processGroupsJoin) throws Exception { + Component7 component = processGroupsJoin.getComponent(); + if (component != null && component.getPosition() == null) { + double[] doubles = calculatePosition(parentGroupId); + component.setPosition(new Position7(String.valueOf(doubles[0]), String.valueOf(doubles[1]))); + } return client.post(StrUtil.format("/process-groups/{}/process-groups", parentGroupId), processGroupsJoin, ProcessgroupsResult.class); } @@ -207,6 +224,11 @@ public class NifiApiService { * 将代码片段实例化到指定位置 */ public SnippetInstance15 createSnippetInstance(String groupId, SnippetInstanceJoin snippetInstanceJoin) throws Exception { + if (snippetInstanceJoin.getOriginX() == null || snippetInstanceJoin.getOriginY() == null) { + double[] doubles = calculatePosition(groupId); + snippetInstanceJoin.setOriginX(String.valueOf(doubles[0])); + snippetInstanceJoin.setOriginY(String.valueOf(doubles[1])); + } return client.post(StrUtil.format("/process-groups/{}/snippet-instance", groupId), snippetInstanceJoin, SnippetInstance15.class); } @@ -214,6 +236,11 @@ public class NifiApiService { * 创建处理器,nifi1.28.1版本有364个处理器,并且每个入参都有差异,所以创建之前,可以先查询一遍处理器得到对应的参数,进行修改调整 */ public CreateProcess16 createProcessor(String groupId, CreateProcessorJoin15 createProcessorJoin15) throws Exception { + Component15 component = createProcessorJoin15.getComponent(); + if (component != null && component.getPosition() == null) { + double[] doubles = calculatePosition(groupId); + component.setPosition(new Position15(String.valueOf(doubles[0]), String.valueOf(doubles[1]))); + } return client.post(StrUtil.format("/process-groups/{}/processors", groupId), createProcessorJoin15, CreateProcess16.class); } @@ -251,4 +278,58 @@ public class NifiApiService { public CreateConnection18 createProcessorConnections(String processGroupsId, CreateConnections18 createConnections17) throws Exception { return client.post(StrUtil.format("/process-groups/{}/connections", processGroupsId), createConnections17, CreateConnection18.class); } + + /** + * 计算新处理器或流程组的x、y坐标 + * + * @param parentGroupId 父流程组ID + * @return 包含X和Y坐标的数组 + * @throws Exception 如果获取流程组信息失败 + */ + public double[] calculatePosition(String parentGroupId) throws Exception { + String jsonString = client.getRaw(StrUtil.format("/flow/process-groups/{}", parentGroupId)); + JsonNode json = objectMapper.readTree(jsonString); + + //查找processGroupFlow.flow + JsonNode flow = json.path("processGroupFlow").path("flow"); + + List positions = new ArrayList<>(); + String[] componentTypes = {"processors", "processGroups", "funnels", "remoteProcessGroups", "inputPorts", "outputPorts", "labels"}; + + // 遍历所有组件类型,收集坐标 + for (String type : componentTypes) { + JsonNode array = flow.path(type); + if (array.isArray()) { + for (JsonNode entity : array) { + JsonNode position = entity.path("component").path("position"); + if (position.isMissingNode()) { + position = entity.path("position"); // 某些组件如端口可能直接包含 position + } + if (position.has("x") && position.has("y")) { + double x = position.get("x").asDouble(); + double y = position.get("y").asDouble(); + positions.add(new double[]{x, y}); + } + } + } + } + + // 如果没有子组件,代表流程组为null,返回默认坐标 + if (positions.isEmpty()) { + return new double[]{100.0, 100.0}; + } + + // 计算新坐标:基于最下方的组件向下偏移 + double maxY = Double.MIN_VALUE; + double xAtMaxY = 0; + for (double[] pos : positions) { + if (pos[1] > maxY) { + maxY = pos[1]; + xAtMaxY = pos[0]; + } + } + double newX = xAtMaxY; // 与最下方组件保持相同X坐标 + double newY = maxY + 260; // 向下偏移260像素 + return new double[]{newX, newY}; + } } \ No newline at end of file