From 56841ca743a886bfb7c5389c5673d90da2c51ec0 Mon Sep 17 00:00:00 2001 From: liuy <37787198+LiuyCodes@users.noreply.github.com> Date: Tue, 27 May 2025 11:10:18 +0800 Subject: [PATCH] =?UTF-8?q?feat(nifi):=20=E4=BC=98=E5=8C=96=E6=B5=81?= =?UTF-8?q?=E7=A8=8B=E7=BB=84=E7=8A=B6=E6=80=81=E6=B8=85=E7=90=86=E5=92=8C?= =?UTF-8?q?=E6=8E=A7=E5=88=B6=E5=99=A8=E6=9C=8D=E5=8A=A1=E6=BF=80=E6=B4=BB?= =?UTF-8?q?=E5=8A=9F=E8=83=BD-=20=E4=BF=AE=E6=94=B9=E4=BA=86=20findNeedMod?= =?UTF-8?q?ifyControllerAndChangesOccur=20=E6=96=B9=E6=B3=95=E7=9A=84?= =?UTF-8?q?=E8=B0=83=E7=94=A8=E5=8F=82=E6=95=B0=EF=BC=8C=E4=BB=8E=20true,?= =?UTF-8?q?=20true=20=E6=94=B9=E4=B8=BA=20true,=20false=20-=20=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=E4=BA=86=E6=BF=80=E6=B4=BB=E6=B5=81=E7=A8=8B=E7=BB=84?= =?UTF-8?q?=E5=86=85=E6=89=80=E6=9C=89=E6=8E=A7=E5=88=B6=E5=99=A8=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E7=9A=84=E9=80=BB=E8=BE=91=20-=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E4=BA=86=E6=B8=85=E7=90=86=E6=B5=81=E7=A8=8B=E7=BB=84=E7=8A=B6?= =?UTF-8?q?=E6=80=81=E7=9A=84=E9=80=BB=E8=BE=91=EF=BC=8C=E4=BD=BF=E7=94=A8?= =?UTF-8?q?=E6=96=B0=E7=9A=84=E7=8A=B6=E6=80=81=E6=B8=85=E7=90=86=E6=A0=87?= =?UTF-8?q?=E8=AE=B0=20-=20=E4=BF=AE=E5=A4=8D=E4=BA=86=E6=89=B9=E9=87=8F?= =?UTF-8?q?=E5=90=AF=E7=94=A8=E6=8E=A7=E5=88=B6=E5=99=A8=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E6=97=B6=E7=9A=84=E7=89=88=E6=9C=AC=E6=8E=A7=E5=88=B6=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../nifiapi/operation/INifiBaseOperation.java | 6 +-- .../operation/impl/NifiAppOperationImpl.java | 10 ++++- .../operation/impl/NifiBaseOperationImpl.java | 39 +++++++++---------- 3 files changed, 31 insertions(+), 24 deletions(-) diff --git a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/INifiBaseOperation.java b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/INifiBaseOperation.java index 134a4ead..7fa242b7 100644 --- a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/INifiBaseOperation.java +++ b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/INifiBaseOperation.java @@ -76,7 +76,7 @@ public interface INifiBaseOperation { * @param processGroupId 流程组id * @param mark 接口修改标记字符串,如果为null,则不进行过滤 */ - List getFindStateClearMarkProcessor(String processGroupId, String mark) throws Exception; + List getFindStateClearMarkProcessor(String processGroupId, String mark) throws Exception; /** * 查找某个流程组(应用app)内,带有"修改标记"的控制器服务,并更新控制器,从而达到实例化要求 @@ -105,7 +105,7 @@ public interface INifiBaseOperation { /** * 清理掉拷贝后新的流程组中的老鼠屎 * - * @param processGroupId 流程组id + * @param processGroupIdList 流程组id集合 */ - void clearProcessGroupState(List processGroupId) throws Exception; + void clearProcessGroupState(List processGroupIdList) throws Exception; } \ No newline at end of file diff --git a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/impl/NifiAppOperationImpl.java b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/impl/NifiAppOperationImpl.java index 13824ba7..31505252 100644 --- a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/impl/NifiAppOperationImpl.java +++ b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/impl/NifiAppOperationImpl.java @@ -19,6 +19,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Repository; +import java.util.ArrayList; import java.util.List; /** @@ -79,8 +80,15 @@ public class NifiAppOperationImpl implements INifiAppOperation { } findNeedModifyController.setProcessGroupId(newProcessGroupId); - baseOperation.findNeedModifyControllerAndChangesOccur(findNeedModifyController, true, true); + baseOperation.findNeedModifyControllerAndChangesOccur(findNeedModifyController, true, false); } + //4.激活当前流程组内的所有控制器服务 + String controllerStatus = "ENABLED"; + baseOperation.batchEnabledControllerServices(newProcessGroupId, controllerStatus); + //5.清理state状态 + List processGroupIdList = new ArrayList<>(); + processGroupIdList.add(newProcessGroupId); + baseOperation.clearProcessGroupState(processGroupIdList); createAppInstanceResult.setNewProcessGroupId(newProcessGroupId); } catch (Exception e) { diff --git a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/impl/NifiBaseOperationImpl.java b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/impl/NifiBaseOperationImpl.java index 5eaaa80a..44365099 100644 --- a/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/impl/NifiBaseOperationImpl.java +++ b/fw-nifi/src/main/java/com/hzya/frame/nifiapi/operation/impl/NifiBaseOperationImpl.java @@ -13,6 +13,7 @@ import com.hzya.frame.nifiapi.model.joincreateconnections.CreateConnections18; import com.hzya.frame.nifiapi.model.joincreateprocessconnection.CreateProcessorConnections; import com.hzya.frame.nifiapi.model.joincreparamcontext.CreateParamContextJoin; import com.hzya.frame.nifiapi.model.joinfindneedmodifycontroller.FindNeedModifyController; +import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.Component19; import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.ControllerServices19; import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.GetAllController19; import com.hzya.frame.nifiapi.model.joingetcontroller.ControllerService12; @@ -251,11 +252,7 @@ public class NifiBaseOperationImpl implements INifiBaseOperation { List processors = flow.getProcessors(); // 过滤 comments 包含 mark 的 Processors - filteredProcessors = processors.stream().filter(processor -> - processor.getComponent() != null - && processor.getComponent().getConfig() != null - && processor.getComponent().getConfig().getComments() != null - && processor.getComponent().getConfig().getComments().contains(mark)).collect(Collectors.toList()); + filteredProcessors = processors.stream().filter(processor -> processor.getComponent() != null && processor.getComponent().getConfig() != null && processor.getComponent().getConfig().getComments() != null && processor.getComponent().getConfig().getComments().contains(mark)).collect(Collectors.toList()); } } catch (Exception e) { throw new Exception("未能筛选带有标记的处理器:" + mark, e); @@ -323,20 +320,22 @@ public class NifiBaseOperationImpl implements INifiBaseOperation { if (allControllerServices != null && allControllerServices.getControllerServices() != null) { List controllerServices = allControllerServices.getControllerServices(); for (ControllerServices19 controllerService : controllerServices) { + Component19 component = controllerService.getComponent(); String controllerServicesId = controllerService.getId(); + if (controllerServicesId != null && component != null && !status.equals(component.getState())) { + //得到控制器服务当前版本 + ControllerService12 controllerService12 = nifiApiService.getControllerServices(controllerServicesId); + Revision12 revision = controllerService12.getRevision(); - //得到控制器服务当前版本 - ControllerService12 controllerService12 = nifiApiService.getControllerServices(controllerServicesId); - Revision12 revision = controllerService12.getRevision(); + Revision13 revision13 = new Revision13(); + revision13.setVersion(revision.getVersion()); - Revision13 revision13 = new Revision13(); - revision13.setVersion(revision.getVersion()); - - EnOrDiControllerServices12 enOrDiControllerServices = new EnOrDiControllerServices12(); - enOrDiControllerServices.setState(status); - enOrDiControllerServices.setRevision(revision13); - ControllerService12 controllerService121 = nifiApiService.enabledOrDisabledControllerServices(controllerServicesId, enOrDiControllerServices); - logger.info("控制器服务id:{}", controllerService121.getId()); + EnOrDiControllerServices12 enOrDiControllerServices = new EnOrDiControllerServices12(); + enOrDiControllerServices.setState(status); + enOrDiControllerServices.setRevision(revision13); + ControllerService12 controllerService121 = nifiApiService.enabledOrDisabledControllerServices(controllerServicesId, enOrDiControllerServices); + logger.info("控制器服务id:{}", controllerService121.getId()); + } } } } catch (Exception e) { @@ -345,11 +344,11 @@ public class NifiBaseOperationImpl implements INifiBaseOperation { } @Override - public void clearProcessGroupState(List processGroupId) throws Exception { - if (processGroupId != null && processGroupId.size() > 0) { - for (String processGroupIdIndex : processGroupId) { + public void clearProcessGroupState(List processGroupIdList) throws Exception { + if (processGroupIdList != null && processGroupIdList.size() > 0) { + for (String processGroupIdIndex : processGroupIdList) { //根据流程组id+标识名称,查找需要清理的state处理器 - String relationshipMark = nifiServiceConfig.getRelationshipMark(); + String relationshipMark = nifiServiceConfig.getStateClearMark(); List findStateClearMarkProcessor = getFindStateClearMarkProcessor(processGroupIdIndex, relationshipMark); for (Processors processors : findStateClearMarkProcessor) { Component2 component = processors.getComponent();