diff --git a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/config/NifiServiceConfig.java b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/config/NifiServiceConfig.java index 6111ea4a..73321c27 100644 --- a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/config/NifiServiceConfig.java +++ b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/config/NifiServiceConfig.java @@ -29,4 +29,7 @@ public class NifiServiceConfig { @Value("${nifi.api.controllerModifymark:接口修改标记}") private String controllerModifyMark; + + @Value("${nifi.api.relationshipMark:接口关系标记}") + private String relationshipMark; } diff --git a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/processgroupid/Component.java b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/processgroupid/Component.java index 89c9685a..d599c404 100644 --- a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/processgroupid/Component.java +++ b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/processgroupid/Component.java @@ -6,4 +6,8 @@ import lombok.Data; public class Component { private String id; private String name; + private String comments; + private String state; + private String type; + private String concurrentlySchedulableTaskCount; } \ No newline at end of file diff --git a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/resultNeedModifyController/PortFilterResult.java b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/resultNeedModifyController/PortFilterResult.java new file mode 100644 index 00000000..22810486 --- /dev/null +++ b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/model/resultNeedModifyController/PortFilterResult.java @@ -0,0 +1,29 @@ +package com.hzya.frame.nifiapi.model.resultNeedModifyController; + +import com.hzya.frame.nifiapi.model.processgroupid.InputPorts; +import com.hzya.frame.nifiapi.model.processgroupid.OutputPorts; +import lombok.Data; + +import java.util.List; + +/** + * @Author:liuyang + * @Package:com.hzya.frame.nifiapi.model.resultNeedModifyController + * @Project:fw-nifi + * @name:PortFilterResult + * @Date:2025/5/23 14:51 + * @Filename:PortFilterResult + */ +@Data +public class PortFilterResult { + private List inputPorts; + private List outputPorts; + + public PortFilterResult() { + } + + public PortFilterResult(List inputPorts, List outputPorts) { + this.inputPorts = inputPorts; + this.outputPorts = outputPorts; + } +} diff --git a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/INifiBaseOperation.java b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/INifiBaseOperation.java index da89185f..47c7b5ed 100644 --- a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/INifiBaseOperation.java +++ b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/INifiBaseOperation.java @@ -4,6 +4,7 @@ import com.hzya.frame.nifiapi.model.joincreateprocessconnection.CreateProcessorC import com.hzya.frame.nifiapi.model.joincreparamcontext.CreateParamContextJoin; import com.hzya.frame.nifiapi.model.joinfindneedmodifycontroller.FindNeedModifyController; import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.ControllerServices19; +import com.hzya.frame.nifiapi.model.resultNeedModifyController.PortFilterResult; import java.util.List; @@ -45,9 +46,20 @@ public interface INifiBaseOperation { /** * 查找带有接口修改标记的控制器服务,eg:接口修改标记:CZFJwnb9sb + * + * @param processGroupId 流程组id + * @param mark 接口修改标记字符串 */ ControllerServices19 getNeedModifyControllerServicesByProcessGroup(String processGroupId, String mark) throws Exception; + /** + * 查找带有接口关系标记的输入和输出,eg:接口关系标记:f5228bc8-0360-41eb-a640-9f4df953937c + * + * @param processGroupId 流程组id + * @param mark 接口修改标记字符串 + */ + PortFilterResult getNeedModifyControllerRelationshipByProcessGroup(String processGroupId, String mark) throws Exception; + /** * 查找某个流程组(应用app)内,带有"修改标记"的控制器服务,并更新控制器,从而达到实例化要求 * diff --git a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/impl/NifiBaseOperationImpl.java b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/impl/NifiBaseOperationImpl.java index ec90b6d8..8b1540b8 100644 --- a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/impl/NifiBaseOperationImpl.java +++ b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/impl/NifiBaseOperationImpl.java @@ -21,6 +21,8 @@ import com.hzya.frame.nifiapi.model.joinparametercontexts.*; import com.hzya.frame.nifiapi.model.joinsnippetinstance.SnippetInstanceJoin; import com.hzya.frame.nifiapi.model.joinsnippets.Snippet; import com.hzya.frame.nifiapi.model.joinsnippets.SnippetsJoin; +import com.hzya.frame.nifiapi.model.processgroupid.*; +import com.hzya.frame.nifiapi.model.resultNeedModifyController.PortFilterResult; import com.hzya.frame.nifiapi.model.resultcreatesnippet.Flow15; import com.hzya.frame.nifiapi.model.resultcreatesnippet.ProcessGroups15; import com.hzya.frame.nifiapi.model.resultcreatesnippet.SnippetInstance15; @@ -31,6 +33,7 @@ import com.hzya.frame.nifiapi.model.resultsnippets.Snippet13; import com.hzya.frame.nifiapi.model.resultsnippets.SnippetResult13; import com.hzya.frame.nifiapi.operation.INifiBaseOperation; import com.hzya.frame.nifiapi.service.NifiApiService; +import com.hzya.frame.nifiapi.util.NifiFilterUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -61,6 +64,9 @@ public class NifiBaseOperationImpl implements INifiBaseOperation { @Autowired private NifiServiceConfig nifiServiceConfig; + @Autowired + private NifiFilterUtil nifiFilterUtil; + @Override public String instantiateApp(String appProcessGroupId, String parentProcessGroupId, String copyTargetProcessGroupId) throws Exception { String processGroups15Id = null; @@ -154,11 +160,7 @@ public class NifiBaseOperationImpl implements INifiBaseOperation { List modifiedControllers = new ArrayList<>(); GetAllController19 allControllerServices = nifiApiService.getAllControllerServices(processGroupId); if (allControllerServices != null && allControllerServices.getControllerServices() != null) { - modifiedControllers = allControllerServices.getControllerServices().stream().filter(service -> - service.getComponent() != null - && service.getComponent().getComments() != null - && service.getComponent().getComments().contains(controllerModifyMark) - ).collect(Collectors.toList()); + modifiedControllers = allControllerServices.getControllerServices().stream().filter(service -> service.getComponent() != null && service.getComponent().getComments() != null && service.getComponent().getComments().contains(controllerModifyMark)).collect(Collectors.toList()); } return modifiedControllers; } @@ -169,15 +171,31 @@ public class NifiBaseOperationImpl implements INifiBaseOperation { List modifiedControllers = new ArrayList<>(); GetAllController19 allControllerServices = nifiApiService.getAllControllerServices(processGroupId); if (allControllerServices != null && allControllerServices.getControllerServices() != null) { - modifiedControllers = allControllerServices.getControllerServices().stream().filter(service -> - service.getComponent() != null - && service.getComponent().getComments() != null - && service.getComponent().getComments().contains(mark) - ).collect(Collectors.toList()); + modifiedControllers = allControllerServices.getControllerServices().stream().filter(service -> service.getComponent() != null && service.getComponent().getComments() != null && service.getComponent().getComments().contains(mark)).collect(Collectors.toList()); } return modifiedControllers.get(0); } + @Override + public PortFilterResult getNeedModifyControllerRelationshipByProcessGroup(String processGroupId, String mark) throws Exception { + try { + //查询指定流程组 + ProcessGroupsId processGroupsId = nifiApiService.queryFlowProcessGroupsRoot(processGroupId); + if (processGroupsId != null && processGroupsId.getProcessGroupFlow() != null) { + ProcessGroupFlow processGroupFlow = processGroupsId.getProcessGroupFlow(); + if (processGroupFlow.getFlow() != null) { + Flow flow = processGroupFlow.getFlow(); + List inputPorts = nifiFilterUtil.filterPortsByComments(flow.getInputPorts(), mark, "inputPorts"); + List outputPorts = nifiFilterUtil.filterPortsByComments(flow.getOutputPorts(), mark, "outputPorts"); + return new PortFilterResult(inputPorts, outputPorts); + } + } + } catch (Exception e) { + throw new Exception(e); + } + return new PortFilterResult(); + } + @Override public void findNeedModifyControllerAndChangesOccur(FindNeedModifyController findNeedModifyController, boolean isThrowError, boolean enabledAll) throws Exception { try { diff --git a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/service/NifiApiService.java b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/service/NifiApiService.java index 31596829..e9165ab8 100644 --- a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/service/NifiApiService.java +++ b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/service/NifiApiService.java @@ -38,7 +38,6 @@ import org.springframework.stereotype.Service; import java.io.InputStream; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.Map; @@ -80,7 +79,7 @@ public class NifiApiService { } /** - * 查询指定流程 + * 查询指定流程组,获取指定流程组的完整结构 */ public ProcessGroupsId queryFlowProcessGroupsRoot(String flowGroupId) throws Exception { return client.get(StrUtil.format("/flow/process-groups/{}", flowGroupId), ProcessGroupsId.class); @@ -128,6 +127,13 @@ public class NifiApiService { return client.get(StrUtil.format("/process-groups/{}", processGroups), ProcessGroupsInfoResult9.class); } + /** + * 查询某个流程组的子流程组列表 + */ +// public void getSubProcessGroups(String processGroupsId) throws Exception { +// client.get(StrUtil.format("/process-groups/{}/process-groups", processGroupsId), null); +// } + /** * 创建流程组 */ diff --git a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/util/NifiFilterUtil.java b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/util/NifiFilterUtil.java new file mode 100644 index 00000000..f73a8e22 --- /dev/null +++ b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/util/NifiFilterUtil.java @@ -0,0 +1,65 @@ +package com.hzya.frame.nifiapi.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * @Author:liuyang + * @Package:com.hzya.frame.nifiapi.util + * @Project:fw-nifi + * @name:NifiFilterUtil + * @Date:2025/5/23 14:45 + * @Filename:NifiFilterUtil + */ +@Component +public class NifiFilterUtil { + + Logger logger = LoggerFactory.getLogger(NifiFilterUtil.class); + + /** + * 过滤端口列表,保留 comments 包含指定 mark 的端口 + * + * @param ports 端口列表(InputPorts 或 OutputPorts) + * @param mark 过滤关键字 + * @param portType 端口类型(用于日志) + * @param 端口类型(InputPorts 或 OutputPorts) + * @return 过滤后的端口列表 + */ + public List filterPortsByComments(List ports, String mark, String portType) { + if (ports == null || mark == null) { + logger.debug("未提供{}或标记,返回空列表", portType); + return Collections.emptyList(); + } + + return ports.stream() + .filter(port -> hasMatchingComments(port, mark)) + .collect(Collectors.toList()); + } + + /** + * 检查端口的 comments 是否包含指定 mark + * + * @param port 端口对象 + * @param mark 过滤关键字 + * @return 是否匹配 + */ + private boolean hasMatchingComments(Object port, String mark) { + try { + // 假设 InputPorts 和 OutputPorts 有 getComponent 方法 + Object component = port.getClass().getMethod("getComponent").invoke(port); + if (component == null) { + return false; + } + String comments = (String) component.getClass().getMethod("getComments").invoke(component); + return comments != null && comments.contains(mark); + } catch (Exception e) { + logger.error("访问comments出错: {}", port, e); + return false; + } + } +}