feat(nifi): 优化流程组状态清理和控制器服务激活功能- 修改了 findNeedModifyControllerAndChangesOccur 方法的调用参数,从 true, true 改为 true, false

- 新增了激活流程组内所有控制器服务的逻辑
- 优化了清理流程组状态的逻辑,使用新的状态清理标记
- 修复了批量启用控制器服务时的版本控制问题
This commit is contained in:
liuy 2025-05-27 11:10:18 +08:00
parent b9bca5795d
commit 56841ca743
3 changed files with 31 additions and 24 deletions

View File

@ -76,7 +76,7 @@ public interface INifiBaseOperation {
* @param processGroupId 流程组id
* @param mark 接口修改标记字符串如果为null则不进行过滤
*/
List<Processors> getFindStateClearMarkProcessor(String processGroupId, String mark) throws Exception;
List<Processors> getFindStateClearMarkProcessor(String processGroupId, String mark) throws Exception;
/**
* 查找某个流程组(应用app),带有"修改标记"的控制器服务并更新控制器从而达到实例化要求
@ -105,7 +105,7 @@ public interface INifiBaseOperation {
/**
* 清理掉拷贝后新的流程组中的老鼠屎
*
* @param processGroupId 流程组id
* @param processGroupIdList 流程组id集合
*/
void clearProcessGroupState(List<String> processGroupId) throws Exception;
void clearProcessGroupState(List<String> processGroupIdList) throws Exception;
}

View File

@ -19,6 +19,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import java.util.ArrayList;
import java.util.List;
/**
@ -79,8 +80,15 @@ public class NifiAppOperationImpl implements INifiAppOperation {
}
findNeedModifyController.setProcessGroupId(newProcessGroupId);
baseOperation.findNeedModifyControllerAndChangesOccur(findNeedModifyController, true, true);
baseOperation.findNeedModifyControllerAndChangesOccur(findNeedModifyController, true, false);
}
//4.激活当前流程组内的所有控制器服务
String controllerStatus = "ENABLED";
baseOperation.batchEnabledControllerServices(newProcessGroupId, controllerStatus);
//5.清理state状态
List<String> processGroupIdList = new ArrayList<>();
processGroupIdList.add(newProcessGroupId);
baseOperation.clearProcessGroupState(processGroupIdList);
createAppInstanceResult.setNewProcessGroupId(newProcessGroupId);
} catch (Exception e) {

View File

@ -13,6 +13,7 @@ import com.hzya.frame.nifiapi.model.joincreateconnections.CreateConnections18;
import com.hzya.frame.nifiapi.model.joincreateprocessconnection.CreateProcessorConnections;
import com.hzya.frame.nifiapi.model.joincreparamcontext.CreateParamContextJoin;
import com.hzya.frame.nifiapi.model.joinfindneedmodifycontroller.FindNeedModifyController;
import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.Component19;
import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.ControllerServices19;
import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.GetAllController19;
import com.hzya.frame.nifiapi.model.joingetcontroller.ControllerService12;
@ -251,11 +252,7 @@ public class NifiBaseOperationImpl implements INifiBaseOperation {
List<Processors> processors = flow.getProcessors();
// 过滤 comments 包含 mark Processors
filteredProcessors = processors.stream().filter(processor ->
processor.getComponent() != null
&& processor.getComponent().getConfig() != null
&& processor.getComponent().getConfig().getComments() != null
&& processor.getComponent().getConfig().getComments().contains(mark)).collect(Collectors.toList());
filteredProcessors = processors.stream().filter(processor -> processor.getComponent() != null && processor.getComponent().getConfig() != null && processor.getComponent().getConfig().getComments() != null && processor.getComponent().getConfig().getComments().contains(mark)).collect(Collectors.toList());
}
} catch (Exception e) {
throw new Exception("未能筛选带有标记的处理器:" + mark, e);
@ -323,20 +320,22 @@ public class NifiBaseOperationImpl implements INifiBaseOperation {
if (allControllerServices != null && allControllerServices.getControllerServices() != null) {
List<ControllerServices19> controllerServices = allControllerServices.getControllerServices();
for (ControllerServices19 controllerService : controllerServices) {
Component19 component = controllerService.getComponent();
String controllerServicesId = controllerService.getId();
if (controllerServicesId != null && component != null && !status.equals(component.getState())) {
//得到控制器服务当前版本
ControllerService12 controllerService12 = nifiApiService.getControllerServices(controllerServicesId);
Revision12 revision = controllerService12.getRevision();
//得到控制器服务当前版本
ControllerService12 controllerService12 = nifiApiService.getControllerServices(controllerServicesId);
Revision12 revision = controllerService12.getRevision();
Revision13 revision13 = new Revision13();
revision13.setVersion(revision.getVersion());
Revision13 revision13 = new Revision13();
revision13.setVersion(revision.getVersion());
EnOrDiControllerServices12 enOrDiControllerServices = new EnOrDiControllerServices12();
enOrDiControllerServices.setState(status);
enOrDiControllerServices.setRevision(revision13);
ControllerService12 controllerService121 = nifiApiService.enabledOrDisabledControllerServices(controllerServicesId, enOrDiControllerServices);
logger.info("控制器服务id{}", controllerService121.getId());
EnOrDiControllerServices12 enOrDiControllerServices = new EnOrDiControllerServices12();
enOrDiControllerServices.setState(status);
enOrDiControllerServices.setRevision(revision13);
ControllerService12 controllerService121 = nifiApiService.enabledOrDisabledControllerServices(controllerServicesId, enOrDiControllerServices);
logger.info("控制器服务id{}", controllerService121.getId());
}
}
}
} catch (Exception e) {
@ -345,11 +344,11 @@ public class NifiBaseOperationImpl implements INifiBaseOperation {
}
@Override
public void clearProcessGroupState(List<String> processGroupId) throws Exception {
if (processGroupId != null && processGroupId.size() > 0) {
for (String processGroupIdIndex : processGroupId) {
public void clearProcessGroupState(List<String> processGroupIdList) throws Exception {
if (processGroupIdList != null && processGroupIdList.size() > 0) {
for (String processGroupIdIndex : processGroupIdList) {
//根据流程组id+标识名称查找需要清理的state处理器
String relationshipMark = nifiServiceConfig.getRelationshipMark();
String relationshipMark = nifiServiceConfig.getStateClearMark();
List<Processors> findStateClearMarkProcessor = getFindStateClearMarkProcessor(processGroupIdIndex, relationshipMark);
for (Processors processors : findStateClearMarkProcessor) {
Component2 component = processors.getComponent();