feat(nifi): 新增处理器复制和状态清理功能

- 新增 CreateProcessorAndUpdateParamJoin 类用于创建处理器和更新参数
- 在 NifiAppOperationImpl 中实现 createProcessorAndUpdateParam 方法
- 在 NifiBaseOperationImpl 中实现 copyProcessor 和 clearProcessGroupState 方法
- 更新 INifiAppOperation 和 INifiBaseOperation 接口
- 新增 Relationships15、Processors16 等相关模型类
This commit is contained in:
liuy 2025-05-26 19:15:03 +08:00
parent d504b44601
commit b9bca5795d
13 changed files with 236 additions and 2 deletions

View File

@ -32,4 +32,7 @@ public class NifiServiceConfig {
@Value("${nifi.api.relationshipMark:接口关系标记}")
private String relationshipMark;
@Value("${nifi.api.relationshipMark:接口状态清理标记}")
private String stateClearMark;
}

View File

@ -0,0 +1,20 @@
package com.hzya.frame.nifiapi.model.joincreateprocessorandupdateparam;
import com.hzya.frame.nifiapi.model.resultprocessorsinfo.Component16;
import lombok.Data;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifiapi.model.joincerateprocessorandupdateparam
* @Projectfw-nifi
* @nameCreateProcessorAndUpdateParam
* @Date2025/5/26 15:59
* @FilenameCreateProcessorAndUpdateParam
*/
@Data
public class CreateProcessorAndUpdateParamJoin {
private String targetProcessorId;
private String parentProcessGroupId;
private String copyTargetProcessGroupId;
private Component16 component;
}

View File

@ -16,4 +16,5 @@ import java.util.Map;
public class Snippet {
private String parentGroupId;
private Map<String, Map<String, String>> processGroups;
private Map<String, Map<String, String>> processors;
}

View File

@ -2,6 +2,8 @@ package com.hzya.frame.nifiapi.model.resultcreateprocessor;
import lombok.Data;
import java.util.List;
/**
* Auto-generated: 2025-05-17 14:50:7
*
@ -14,4 +16,5 @@ public class Component15 {
private String name;
private Position15 position;
private Config15 config;
private List<Relationships15> relationships;
}

View File

@ -0,0 +1,22 @@
package com.hzya.frame.nifiapi.model.resultcreateprocessor;
import lombok.Data;
/**
* 连接关系
*
* @Authorliuyang
* @Packagecom.hzya.frame.nifiapi.model.resultcreateprocessor
* @Projectfw-nifi
* @nameRelationships15
* @Date2025/5/26 10:36
* @FilenameRelationships15
*/
@Data
public class Relationships15 {
private String name;
private String description;
private Boolean autoTerminate;
// private boolean terminate;
private Boolean retry;
}

View File

@ -15,4 +15,5 @@ import java.util.List;
@Data
public class Flow15 {
private List<ProcessGroups15> processGroups;
private List<Processors16> processors;
}

View File

@ -0,0 +1,18 @@
package com.hzya.frame.nifiapi.model.resultcreatesnippet;
import lombok.Data;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifiapi.model.resultcreatesnippet
* @Projectfw-nifi
* @nameProcessors16
* @Date2025/5/26 15:42
* @FilenameProcessors16
*/
@Data
public class Processors16 {
private String id;
private String uri;
private Component15 component;
}

View File

@ -13,4 +13,5 @@ import lombok.Data;
@Data
public class SnippetInstance15 {
private Flow15 flow;
// private Component152 component;
}

View File

@ -1,6 +1,7 @@
package com.hzya.frame.nifiapi.operation;
import com.hzya.frame.nifiapi.model.joincreateoracleapp.CreateAppInstanceJoin;
import com.hzya.frame.nifiapi.model.joincreateprocessorandupdateparam.CreateProcessorAndUpdateParamJoin;
import com.hzya.frame.nifiapi.model.resultcreateoracleapp.CreateAppInstanceResult;
/**
@ -34,4 +35,11 @@ public interface INifiAppOperation {
* @param createMysqlApp 封装创建Mysql应用所需要的入参
*/
// void createMysqlApp(CreateMysqlApp createMysqlApp) throws Exception;
/**
* 新建处理器实例先拷贝再接收修改参数
*
* @param createProcessorAndUpdateParamJoin 封转创建处理器所需要的参数
*/
String createProcessorAndUpdateParam(CreateProcessorAndUpdateParamJoin createProcessorAndUpdateParamJoin) throws Exception;
}

View File

@ -4,6 +4,7 @@ import com.hzya.frame.nifiapi.model.joincreateprocessconnection.CreateProcessorC
import com.hzya.frame.nifiapi.model.joincreparamcontext.CreateParamContextJoin;
import com.hzya.frame.nifiapi.model.joinfindneedmodifycontroller.FindNeedModifyController;
import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.ControllerServices19;
import com.hzya.frame.nifiapi.model.processgroupid.Processors;
import com.hzya.frame.nifiapi.model.resultNeedModifyController.PortFilterResult;
import java.util.List;
@ -29,6 +30,15 @@ public interface INifiBaseOperation {
*/
String instantiateApp(String appProcessGroupId, String parentProcessGroupId, String copyTargetProcessGroupId) throws Exception;
/**
* 拷贝业务处理器
*
* @param targetProcessorId 目标处理器id
* @param parentProcessGroupId 目标处理器id的父处理器id
* @param copyTargetProcessGroupId 需要复制到指定目标流程组的id
*/
String copyProcessor(String targetProcessorId, String parentProcessGroupId, String copyTargetProcessGroupId) throws Exception;
/**
* 新增上下文参数环境并绑定到指定的流程组
*
@ -60,6 +70,14 @@ public interface INifiBaseOperation {
*/
PortFilterResult getNeedModifyControllerRelationshipByProcessGroup(String processGroupId, String mark) throws Exception;
/**
* 查找带"接口状态清理标记"的处理器
*
* @param processGroupId 流程组id
* @param mark 接口修改标记字符串如果为null则不进行过滤
*/
List<Processors> getFindStateClearMarkProcessor(String processGroupId, String mark) throws Exception;
/**
* 查找某个流程组(应用app),带有"修改标记"的控制器服务并更新控制器从而达到实例化要求
*
@ -83,4 +101,11 @@ public interface INifiBaseOperation {
* @param status ENABLED 启用DISABLED停用
*/
void batchEnabledControllerServices(String processGroupId, String status) throws Exception;
}
/**
* 清理掉拷贝后新的流程组中的老鼠屎
*
* @param processGroupId 流程组id
*/
void clearProcessGroupState(List<String> processGroupId) throws Exception;
}

View File

@ -1,12 +1,16 @@
package com.hzya.frame.nifiapi.operation.impl;
import com.hzya.frame.nifiapi.model.joincreateoracleapp.CreateAppInstanceJoin;
import com.hzya.frame.nifiapi.model.joincreateprocessorandupdateparam.CreateProcessorAndUpdateParamJoin;
import com.hzya.frame.nifiapi.model.joincreparamcontext.CreateParamContextJoin;
import com.hzya.frame.nifiapi.model.joinfindneedmodifycontroller.FindNeedModifyController;
import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.ControllerServices19;
import com.hzya.frame.nifiapi.model.joingetcontroller.Component12;
import com.hzya.frame.nifiapi.model.joingetcontroller.ControllerService12;
import com.hzya.frame.nifiapi.model.resultcreateoracleapp.CreateAppInstanceResult;
import com.hzya.frame.nifiapi.model.resultprocessorsinfo.Component16;
import com.hzya.frame.nifiapi.model.resultprocessorsinfo.ProcessorsInfo16;
import com.hzya.frame.nifiapi.model.resultprocessorsinfo.Revision16;
import com.hzya.frame.nifiapi.operation.INifiBaseOperation;
import com.hzya.frame.nifiapi.operation.INifiAppOperation;
import com.hzya.frame.nifiapi.service.NifiApiService;
@ -54,6 +58,7 @@ public class NifiAppOperationImpl implements INifiAppOperation {
//2.新增应用对应的上下文参数
CreateParamContextJoin createParamContextJoin = createOracleApp.getCreateParamContextJoin();
if (createParamContextJoin != null) {
createParamContextJoin.setProcessGroupsId(newProcessGroupId);
baseOperation.createParameterContextsBindingProcessGroup(createParamContextJoin);
}
//3.修改控制器默认激活当前流程组下的所有控制器
@ -84,6 +89,42 @@ public class NifiAppOperationImpl implements INifiAppOperation {
return createAppInstanceResult;
}
@Override
public String createProcessorAndUpdateParam(CreateProcessorAndUpdateParamJoin createProcessorAndUpdateParamJoin) throws Exception {
String newProcessId = null;
try {
//拷贝处理器
String targetProcessorId = createProcessorAndUpdateParamJoin.getTargetProcessorId();
String parentProcessGroupId = createProcessorAndUpdateParamJoin.getParentProcessGroupId();
String copyTargetProcessGroupId = createProcessorAndUpdateParamJoin.getCopyTargetProcessGroupId();
//允许不修改
Component16 component = createProcessorAndUpdateParamJoin.getComponent();
newProcessId = baseOperation.copyProcessor(targetProcessorId, parentProcessGroupId, copyTargetProcessGroupId);
if (newProcessId != null && component != null) {
//更新处理器
ProcessorsInfo16 processor = nifiApiService.getProcessor(newProcessId);
Revision16 revision16 = processor.getRevision();
String version = revision16.getVersion();
Revision16 revision161 = new Revision16();
revision161.setVersion(version);
component.setId(newProcessId);
ProcessorsInfo16 processorsInfo16 = new ProcessorsInfo16();
processorsInfo16.setRevision(revision161);
processorsInfo16.setComponent(component);
ProcessorsInfo16 processor2 = nifiApiService.updateProcessor(newProcessId, processorsInfo16);
String id = processor2.getId();
logger.info("新处理器id:{}" + id);
}
} catch (Exception e) {
throw new Exception(e);
}
return newProcessId;
}
// @Override
// public void createMysqlApp(CreateMysqlApp createMysqlApp) throws Exception {
// try {
@ -97,4 +138,4 @@ public class NifiAppOperationImpl implements INifiAppOperation {
// throw new Exception(e);
// }
// }
}
}

View File

@ -23,12 +23,16 @@ import com.hzya.frame.nifiapi.model.joinsnippets.Snippet;
import com.hzya.frame.nifiapi.model.joinsnippets.SnippetsJoin;
import com.hzya.frame.nifiapi.model.processgroupid.*;
import com.hzya.frame.nifiapi.model.resultNeedModifyController.PortFilterResult;
//import com.hzya.frame.nifiapi.model.resultcreatesnippet.Component152;
import com.hzya.frame.nifiapi.model.resultcreatesnippet.Flow15;
import com.hzya.frame.nifiapi.model.resultcreatesnippet.ProcessGroups15;
import com.hzya.frame.nifiapi.model.resultcreatesnippet.Processors16;
import com.hzya.frame.nifiapi.model.resultcreatesnippet.SnippetInstance15;
import com.hzya.frame.nifiapi.model.resultparametercontexts.ParameterContextsResult;
import com.hzya.frame.nifiapi.model.resultprocessgroupsinfo.ProcessGroupsInfoResult9;
import com.hzya.frame.nifiapi.model.resultprocessgroupsinfo.Revision9;
import com.hzya.frame.nifiapi.model.resultprocessorsinfo.ProcessorsInfo16;
import com.hzya.frame.nifiapi.model.resultprocessorsinfo.Revision16;
import com.hzya.frame.nifiapi.model.resultsnippets.Snippet13;
import com.hzya.frame.nifiapi.model.resultsnippets.SnippetResult13;
import com.hzya.frame.nifiapi.operation.INifiBaseOperation;
@ -85,6 +89,7 @@ public class NifiBaseOperationImpl implements INifiBaseOperation {
Snippet snippet = new Snippet();
snippet.setParentGroupId(parentProcessGroupId);//目标流程组id的父流程组id
snippet.setProcessGroups(processGroupsMap);
snippet.setProcessors(new HashMap<>());
SnippetsJoin snippetsJoin = new SnippetsJoin();
snippetsJoin.setSnippet(snippet);
@ -113,6 +118,44 @@ public class NifiBaseOperationImpl implements INifiBaseOperation {
return processGroups15Id;
}
@Override
public String copyProcessor(String targetProcessorId, String parentProcessGroupId, String copyTargetProcessGroupId) throws Exception {
String processId = null;
try {
//1.处理处理器片段
//查询目标处理器版本
ProcessorsInfo16 processor = nifiApiService.getProcessor(targetProcessorId);
Revision16 revision1 = processor.getRevision();
//目标处理器版本号
Map<String, String> processGroupsMap2 = new HashMap<>();
processGroupsMap2.put("version", revision1.getVersion());
Map<String, Map<String, String>> processGroupsMap = new HashMap<>();
processGroupsMap.put(targetProcessorId, processGroupsMap2);//目标流程组id
Snippet snippet = new Snippet();
snippet.setParentGroupId(parentProcessGroupId);//目标处理器id的父流程组id
snippet.setProcessors(processGroupsMap);
snippet.setProcessGroups(new HashMap<>());//这里真是一个坑不传抛出空指针异常
SnippetsJoin snippetsJoin = new SnippetsJoin();
snippetsJoin.setSnippet(snippet);
SnippetResult13 snippets = nifiApiService.createSnippets(snippetsJoin);
Snippet13 snippet1 = snippets.getSnippet();
//2.实例化片段
SnippetInstanceJoin snippetInstanceJoin = new SnippetInstanceJoin();
snippetInstanceJoin.setSnippetId(snippet1.getId());
SnippetInstance15 snippetInstance = nifiApiService.createSnippetInstance(copyTargetProcessGroupId, snippetInstanceJoin);
Processors16 processors16 = snippetInstance.getFlow().getProcessors().get(0);
processId = processors16.getId();
} catch (Exception e) {
throw new Exception(e);
}
return processId;
}
@Override
public String createParameterContextsBindingProcessGroup(CreateParamContextJoin createParamContextJoin) throws Exception {
String paramGroupsId = null;
@ -196,6 +239,30 @@ public class NifiBaseOperationImpl implements INifiBaseOperation {
return new PortFilterResult();
}
@Override
public List<Processors> getFindStateClearMarkProcessor(String processGroupId, String mark) throws Exception {
List<Processors> filteredProcessors = new ArrayList<>();
try {
// 查询指定流程组内的详情
ProcessGroupsId processGroupsId = nifiApiService.queryFlowProcessGroupsRoot(processGroupId);
if (processGroupsId != null && processGroupsId.getProcessGroupFlow() != null) {
ProcessGroupFlow processGroupFlow = processGroupsId.getProcessGroupFlow();
Flow flow = processGroupFlow.getFlow();
List<Processors> processors = flow.getProcessors();
// 过滤 comments 包含 mark Processors
filteredProcessors = processors.stream().filter(processor ->
processor.getComponent() != null
&& processor.getComponent().getConfig() != null
&& processor.getComponent().getConfig().getComments() != null
&& processor.getComponent().getConfig().getComments().contains(mark)).collect(Collectors.toList());
}
} catch (Exception e) {
throw new Exception("未能筛选带有标记的处理器:" + mark, e);
}
return filteredProcessors;
}
@Override
public void findNeedModifyControllerAndChangesOccur(FindNeedModifyController findNeedModifyController, boolean isThrowError, boolean enabledAll) throws Exception {
try {
@ -276,4 +343,21 @@ public class NifiBaseOperationImpl implements INifiBaseOperation {
throw new Exception(e);
}
}
@Override
public void clearProcessGroupState(List<String> processGroupId) throws Exception {
if (processGroupId != null && processGroupId.size() > 0) {
for (String processGroupIdIndex : processGroupId) {
//根据流程组id+标识名称查找需要清理的state处理器
String relationshipMark = nifiServiceConfig.getRelationshipMark();
List<Processors> findStateClearMarkProcessor = getFindStateClearMarkProcessor(processGroupIdIndex, relationshipMark);
for (Processors processors : findStateClearMarkProcessor) {
Component2 component = processors.getComponent();
if (component != null) {
nifiApiService.clearProcessorRequests(component.getId());
}
}
}
}
}
}

View File

@ -305,6 +305,13 @@ public class NifiApiService {
return client.post(StrUtil.format("/process-groups/{}/connections", processGroupsId), createConnections17, CreateConnection18.class);
}
/**
* 清除处理器的状态,此接口返回{}空对象
*/
public void clearProcessorRequests(String processorId) throws Exception {
client.post(StrUtil.format("/processors/{}/state/clear-requests", processorId), null, Object.class);
}
/**
* 计算新处理器或流程组的xy坐标
*