feat(nifi): 增加接口关系标记功能

- 在 NifiServiceConfig 中添加 relationshipMark配置项
- 在 Component 类中增加 comments、state、type等字段
- 新增 PortFilterResult 类用于存储过滤后的端口结果
- 在 NifiBaseOperationImpl 中实现 getNeedModifyControllerRelationshipByProcessGroup 方法
- 在 INifiBaseOperation 接口中添加相应的方法声明
- 新增 NifiFilterUtil 工具类用于过滤端口
This commit is contained in:
liuy 2025-05-23 15:12:12 +08:00
parent 095e58d812
commit a5ee210a22
7 changed files with 149 additions and 12 deletions

View File

@ -29,4 +29,7 @@ public class NifiServiceConfig {
@Value("${nifi.api.controllerModifymark:接口修改标记}")
private String controllerModifyMark;
@Value("${nifi.api.relationshipMark:接口关系标记}")
private String relationshipMark;
}

View File

@ -6,4 +6,8 @@ import lombok.Data;
public class Component {
private String id;
private String name;
private String comments;
private String state;
private String type;
private String concurrentlySchedulableTaskCount;
}

View File

@ -0,0 +1,29 @@
package com.hzya.frame.nifiapi.model.resultNeedModifyController;
import com.hzya.frame.nifiapi.model.processgroupid.InputPorts;
import com.hzya.frame.nifiapi.model.processgroupid.OutputPorts;
import lombok.Data;
import java.util.List;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifiapi.model.resultNeedModifyController
* @Projectfw-nifi
* @namePortFilterResult
* @Date2025/5/23 14:51
* @FilenamePortFilterResult
*/
@Data
public class PortFilterResult {
private List<InputPorts> inputPorts;
private List<OutputPorts> outputPorts;
public PortFilterResult() {
}
public PortFilterResult(List<InputPorts> inputPorts, List<OutputPorts> outputPorts) {
this.inputPorts = inputPorts;
this.outputPorts = outputPorts;
}
}

View File

@ -4,6 +4,7 @@ import com.hzya.frame.nifiapi.model.joincreateprocessconnection.CreateProcessorC
import com.hzya.frame.nifiapi.model.joincreparamcontext.CreateParamContextJoin;
import com.hzya.frame.nifiapi.model.joinfindneedmodifycontroller.FindNeedModifyController;
import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.ControllerServices19;
import com.hzya.frame.nifiapi.model.resultNeedModifyController.PortFilterResult;
import java.util.List;
@ -45,9 +46,20 @@ public interface INifiBaseOperation {
/**
* 查找带有接口修改标记的控制器服务eg接口修改标记:CZFJwnb9sb
*
* @param processGroupId 流程组id
* @param mark 接口修改标记字符串
*/
ControllerServices19 getNeedModifyControllerServicesByProcessGroup(String processGroupId, String mark) throws Exception;
/**
* 查找带有接口关系标记的输入和输出eg接口关系标记:f5228bc8-0360-41eb-a640-9f4df953937c
*
* @param processGroupId 流程组id
* @param mark 接口修改标记字符串
*/
PortFilterResult getNeedModifyControllerRelationshipByProcessGroup(String processGroupId, String mark) throws Exception;
/**
* 查找某个流程组(应用app),带有"修改标记"的控制器服务并更新控制器从而达到实例化要求
*

View File

@ -21,6 +21,8 @@ import com.hzya.frame.nifiapi.model.joinparametercontexts.*;
import com.hzya.frame.nifiapi.model.joinsnippetinstance.SnippetInstanceJoin;
import com.hzya.frame.nifiapi.model.joinsnippets.Snippet;
import com.hzya.frame.nifiapi.model.joinsnippets.SnippetsJoin;
import com.hzya.frame.nifiapi.model.processgroupid.*;
import com.hzya.frame.nifiapi.model.resultNeedModifyController.PortFilterResult;
import com.hzya.frame.nifiapi.model.resultcreatesnippet.Flow15;
import com.hzya.frame.nifiapi.model.resultcreatesnippet.ProcessGroups15;
import com.hzya.frame.nifiapi.model.resultcreatesnippet.SnippetInstance15;
@ -31,6 +33,7 @@ import com.hzya.frame.nifiapi.model.resultsnippets.Snippet13;
import com.hzya.frame.nifiapi.model.resultsnippets.SnippetResult13;
import com.hzya.frame.nifiapi.operation.INifiBaseOperation;
import com.hzya.frame.nifiapi.service.NifiApiService;
import com.hzya.frame.nifiapi.util.NifiFilterUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -61,6 +64,9 @@ public class NifiBaseOperationImpl implements INifiBaseOperation {
@Autowired
private NifiServiceConfig nifiServiceConfig;
@Autowired
private NifiFilterUtil nifiFilterUtil;
@Override
public String instantiateApp(String appProcessGroupId, String parentProcessGroupId, String copyTargetProcessGroupId) throws Exception {
String processGroups15Id = null;
@ -154,11 +160,7 @@ public class NifiBaseOperationImpl implements INifiBaseOperation {
List<ControllerServices19> modifiedControllers = new ArrayList<>();
GetAllController19 allControllerServices = nifiApiService.getAllControllerServices(processGroupId);
if (allControllerServices != null && allControllerServices.getControllerServices() != null) {
modifiedControllers = allControllerServices.getControllerServices().stream().filter(service ->
service.getComponent() != null
&& service.getComponent().getComments() != null
&& service.getComponent().getComments().contains(controllerModifyMark)
).collect(Collectors.toList());
modifiedControllers = allControllerServices.getControllerServices().stream().filter(service -> service.getComponent() != null && service.getComponent().getComments() != null && service.getComponent().getComments().contains(controllerModifyMark)).collect(Collectors.toList());
}
return modifiedControllers;
}
@ -169,15 +171,31 @@ public class NifiBaseOperationImpl implements INifiBaseOperation {
List<ControllerServices19> modifiedControllers = new ArrayList<>();
GetAllController19 allControllerServices = nifiApiService.getAllControllerServices(processGroupId);
if (allControllerServices != null && allControllerServices.getControllerServices() != null) {
modifiedControllers = allControllerServices.getControllerServices().stream().filter(service ->
service.getComponent() != null
&& service.getComponent().getComments() != null
&& service.getComponent().getComments().contains(mark)
).collect(Collectors.toList());
modifiedControllers = allControllerServices.getControllerServices().stream().filter(service -> service.getComponent() != null && service.getComponent().getComments() != null && service.getComponent().getComments().contains(mark)).collect(Collectors.toList());
}
return modifiedControllers.get(0);
}
@Override
public PortFilterResult getNeedModifyControllerRelationshipByProcessGroup(String processGroupId, String mark) throws Exception {
try {
//查询指定流程组
ProcessGroupsId processGroupsId = nifiApiService.queryFlowProcessGroupsRoot(processGroupId);
if (processGroupsId != null && processGroupsId.getProcessGroupFlow() != null) {
ProcessGroupFlow processGroupFlow = processGroupsId.getProcessGroupFlow();
if (processGroupFlow.getFlow() != null) {
Flow flow = processGroupFlow.getFlow();
List<InputPorts> inputPorts = nifiFilterUtil.filterPortsByComments(flow.getInputPorts(), mark, "inputPorts");
List<OutputPorts> outputPorts = nifiFilterUtil.filterPortsByComments(flow.getOutputPorts(), mark, "outputPorts");
return new PortFilterResult(inputPorts, outputPorts);
}
}
} catch (Exception e) {
throw new Exception(e);
}
return new PortFilterResult();
}
@Override
public void findNeedModifyControllerAndChangesOccur(FindNeedModifyController findNeedModifyController, boolean isThrowError, boolean enabledAll) throws Exception {
try {

View File

@ -38,7 +38,6 @@ import org.springframework.stereotype.Service;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -80,7 +79,7 @@ public class NifiApiService {
}
/**
* 查询指定流程
* 查询指定流程获取指定流程组的完整结构
*/
public ProcessGroupsId queryFlowProcessGroupsRoot(String flowGroupId) throws Exception {
return client.get(StrUtil.format("/flow/process-groups/{}", flowGroupId), ProcessGroupsId.class);
@ -128,6 +127,13 @@ public class NifiApiService {
return client.get(StrUtil.format("/process-groups/{}", processGroups), ProcessGroupsInfoResult9.class);
}
/**
* 查询某个流程组的子流程组列表
*/
// public void getSubProcessGroups(String processGroupsId) throws Exception {
// client.get(StrUtil.format("/process-groups/{}/process-groups", processGroupsId), null);
// }
/**
* 创建流程组
*/

View File

@ -0,0 +1,65 @@
package com.hzya.frame.nifiapi.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifiapi.util
* @Projectfw-nifi
* @nameNifiFilterUtil
* @Date2025/5/23 14:45
* @FilenameNifiFilterUtil
*/
@Component
public class NifiFilterUtil {
Logger logger = LoggerFactory.getLogger(NifiFilterUtil.class);
/**
* 过滤端口列表保留 comments 包含指定 mark 的端口
*
* @param ports 端口列表InputPorts OutputPorts
* @param mark 过滤关键字
* @param portType 端口类型用于日志
* @param <T> 端口类型InputPorts OutputPorts
* @return 过滤后的端口列表
*/
public <T> List<T> filterPortsByComments(List<T> ports, String mark, String portType) {
if (ports == null || mark == null) {
logger.debug("未提供{}或标记,返回空列表", portType);
return Collections.emptyList();
}
return ports.stream()
.filter(port -> hasMatchingComments(port, mark))
.collect(Collectors.toList());
}
/**
* 检查端口的 comments 是否包含指定 mark
*
* @param port 端口对象
* @param mark 过滤关键字
* @return 是否匹配
*/
private boolean hasMatchingComments(Object port, String mark) {
try {
// 假设 InputPorts OutputPorts getComponent 方法
Object component = port.getClass().getMethod("getComponent").invoke(port);
if (component == null) {
return false;
}
String comments = (String) component.getClass().getMethod("getComments").invoke(component);
return comments != null && comments.contains(mark);
} catch (Exception e) {
logger.error("访问comments出错: {}", port, e);
return false;
}
}
}