feat(nifi): 新增参数上下文创建和控制器更新功能

- 新增创建参数上下文并绑定到指定流程组的功能
- 新增查询需要修改的控制器详情功能
- 新增更新控制器服务的功能
- 重构了基础操作接口,增加了新的操作方法
-优化了代码结构,提高了可维护性和可扩展性
This commit is contained in:
liuy 2025-05-22 11:51:11 +08:00
parent 58ff225c47
commit 83f99be452
12 changed files with 299 additions and 67 deletions

View File

@ -1,5 +1,6 @@
package com.hzya.frame.nifiapi.config;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@ -14,6 +15,7 @@ import org.springframework.context.annotation.Configuration;
* @FilenameNifiConfig
*/
@Configuration
@Data
public class NifiServiceConfig {
@Value("${nifi.api.url:https://192.168.2.233:8443/nifi-api}")
@ -25,27 +27,6 @@ public class NifiServiceConfig {
@Value("${nifi.api.password:hzya1314*nifi}")
private String password;
public String getApiUrl() {
return apiUrl;
}
public void setApiUrl(String apiUrl) {
this.apiUrl = apiUrl;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
@Value("${nifi.api.controllerModifymark:接口修改标记}")
private String controllerModifyMark;
}

View File

@ -0,0 +1,18 @@
package com.hzya.frame.nifiapi.model.joincreparamcontext;
import com.hzya.frame.nifiapi.model.joinparametercontexts.ParameterContextsJoin;
import lombok.Data;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifiapi.model.joincreparamcontext
* @Projectfw-nifi
* @nameCreateParamContextJoin
* @Date2025/5/22 08:58
* @FilenameCreateParamContextJoin
*/
@Data
public class CreateParamContextJoin {
private ParameterContextsJoin parameterContextsJoin;
private String processGroupsId;
}

View File

@ -0,0 +1,20 @@
package com.hzya.frame.nifiapi.model.joinfindneedmodifycontroller;
import com.hzya.frame.nifiapi.model.joingetcontroller.ControllerService12;
import lombok.Data;
import java.util.List;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifiapi.model.joinfindneedmodifycontroller
* @Projectfw-nifi
* @nameFindNeedModifyController
* @Date2025/5/22 11:03
* @FilenameFindNeedModifyController
*/
@Data
public class FindNeedModifyController {
private String processGroupId;
private List<ControllerService12> needModifyController;
}

View File

@ -0,0 +1,31 @@
package com.hzya.frame.nifiapi.model.joingetallcontrollerservice;
import lombok.Data;
import java.util.Map;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifiapi.model.joingetallcontrollerservice
* @Projectfw-nifi
* @nameComponent19
* @Date2025/5/22 10:28
* @FilenameComponent19
*/
@Data
public class Component19 {
private String id;
private String name;
private String type;
private String state;
private String validationStatus;
private String bulletinLevel;
private String extensionMissing;
private String comments;
private Map<String, String> properties;
private String persistsState;
private String restricted;
private String deprecated;
private String multipleVersionsAvailable;
private String supportsSensitiveDynamicProperties;
}

View File

@ -0,0 +1,18 @@
package com.hzya.frame.nifiapi.model.joingetallcontrollerservice;
import lombok.Data;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifiapi.model.joingetallcontrollerservice
* @Projectfw-nifi
* @nameControllerServices19
* @Date2025/5/22 10:15
* @FilenameControllerServices19
*/
@Data
public class ControllerServices19 {
private String id;
private String uri;
private Component19 component;
}

View File

@ -0,0 +1,18 @@
package com.hzya.frame.nifiapi.model.joingetallcontrollerservice;
import lombok.Data;
import java.util.List;
/**
* @Author liuyang
* @Package com.hzya.frame.nifiapi.model.joingetallcontrollerservice
* @Project fw-nifi
* @name GetAllController
* @Date 2025/5/22 10:13
* @Filename GetAllController
*/
@Data
public class GetAllController19 {
private List<ControllerServices19> controllerServices;
}

View File

@ -1,21 +0,0 @@
package com.hzya.frame.nifiapi.operation;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifiapi.operation
* @Projectfw-nifi
* @nameBaseNifiOperation
* @Date2025/5/21 17:50
* @FilenameBaseNifiOperation
*/
public interface BaseOperation {
/**
* 实例化某个应用对应nifi流程组id
*
* @param appProcessGroupId 需要复制的流程组id(应用id)比如Oracle应用idMysql应用id
* @param parentProcessGroupId 流程组id(应用id)的父流程组id
* @param copyTargetProcessGroupId 指定创建的流程组id
* @return 返回流程组实例化id
*/
String instantiateApp(String appProcessGroupId, String parentProcessGroupId, String copyTargetProcessGroupId) throws Exception;
}

View File

@ -0,0 +1,53 @@
package com.hzya.frame.nifiapi.operation;
import com.hzya.frame.nifiapi.model.joincreparamcontext.CreateParamContextJoin;
import com.hzya.frame.nifiapi.model.joinfindneedmodifycontroller.FindNeedModifyController;
import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.ControllerServices19;
import java.util.List;
/**
* Nifi基础操作封装
*
* @Authorliuyang
* @Packagecom.hzya.frame.nifiapi.operation
* @Projectfw-nifi
* @nameBaseNifiOperation
* @Date2025/5/21 17:50
* @FilenameBaseNifiOperation
*/
public interface IBaseOperation {
/**
* 实例化某个应用对应nifi流程组id
*
* @param appProcessGroupId 需要复制的流程组id(应用id)比如Oracle应用idMysql应用id
* @param parentProcessGroupId 流程组id(应用id)的父流程组id
* @param copyTargetProcessGroupId 指定创建的流程组id
* @return 返回流程组实例化id
*/
String instantiateApp(String appProcessGroupId, String parentProcessGroupId, String copyTargetProcessGroupId) throws Exception;
/**
* 新增上下文参数环境并绑定到指定的流程组
*
* @param createParamContextJoin 绑定上下文参数封装对象
* @return 返回被绑定参数上下文的流程组id
*/
String createParameterContextsBindingProcessGroup(CreateParamContextJoin createParamContextJoin) throws Exception;
/**
* 查询需要修改的控制器详情
*/
List<ControllerServices19> getNeedModifyControllerServicesByProcessGroup(String processGroupId) throws Exception;
/**
* 查找某个流程组(应用app),带有"修改标记"的控制器服务并更新控制器从而达到实例化要求
*
* @param isThrowError true抛出异常false不抛出异常假设某个流程下有3个控制器服务需要修改但入参只传了2个在不满足业务需求且isError=true的情况下就会抛出异常主要是提醒业务调用代码
*/
void findNeedModifyControllerAndChangesOccur(FindNeedModifyController findNeedModifyController, boolean isThrowError) throws Exception;
/**
* 批量创建连接关系
*/
}

View File

@ -13,7 +13,7 @@ import com.hzya.frame.nifiapi.model.joincreateoracleapp.CreateOracleApp;
* @Date2025/5/21 17:05
* @FilenameNifiOperation
*/
public interface NifiOperation {
public interface INifiOperation {
/**
* Oracle应用创建并实例化
* 主要包含一系列nifi接口调用操作其他应用开发封转类以此类推

View File

@ -1,22 +1,41 @@
package com.hzya.frame.nifiapi.operation.impl;
import cn.hutool.core.util.StrUtil;
import com.hzya.frame.nifiapi.config.NifiServiceConfig;
import com.hzya.frame.nifiapi.model.joinbindparametercontexts.BindParameterContextsJoin11;
import com.hzya.frame.nifiapi.model.joinbindparametercontexts.Component11;
import com.hzya.frame.nifiapi.model.joinbindparametercontexts.ParameterContext11;
import com.hzya.frame.nifiapi.model.joinbindparametercontexts.Revision11;
import com.hzya.frame.nifiapi.model.joincreparamcontext.CreateParamContextJoin;
import com.hzya.frame.nifiapi.model.joinfindneedmodifycontroller.FindNeedModifyController;
import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.ControllerServices19;
import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.GetAllController19;
import com.hzya.frame.nifiapi.model.joingetcontroller.ControllerService12;
import com.hzya.frame.nifiapi.model.joingetcontroller.Revision12;
import com.hzya.frame.nifiapi.model.joinparametercontexts.*;
import com.hzya.frame.nifiapi.model.joinsnippetinstance.SnippetInstanceJoin;
import com.hzya.frame.nifiapi.model.joinsnippets.Snippet;
import com.hzya.frame.nifiapi.model.joinsnippets.SnippetsJoin;
import com.hzya.frame.nifiapi.model.resultcreatesnippet.Flow15;
import com.hzya.frame.nifiapi.model.resultcreatesnippet.ProcessGroups15;
import com.hzya.frame.nifiapi.model.resultcreatesnippet.SnippetInstance15;
import com.hzya.frame.nifiapi.model.resultparametercontexts.ParameterContextsResult;
import com.hzya.frame.nifiapi.model.resultprocessgroupsinfo.ProcessGroupsInfoResult9;
import com.hzya.frame.nifiapi.model.resultprocessgroupsinfo.Revision9;
import com.hzya.frame.nifiapi.model.resultsnippets.Snippet13;
import com.hzya.frame.nifiapi.model.resultsnippets.SnippetResult13;
import com.hzya.frame.nifiapi.operation.BaseOperation;
import com.hzya.frame.nifiapi.operation.IBaseOperation;
import com.hzya.frame.nifiapi.service.NifiApiService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @Authorliuyang
@ -27,16 +46,21 @@ import java.util.Map;
* @FilenameBaseOperationImpl
*/
@Repository(value = "BaseOperationImpl")
public class BaseOperationImpl implements BaseOperation {
public class BaseOperationImpl implements IBaseOperation {
Logger logger = LoggerFactory.getLogger(BaseOperationImpl.class);
@Autowired
private NifiApiService nifiApiService;
@Autowired
private NifiServiceConfig nifiServiceConfig;
@Override
public String instantiateApp(String appProcessGroupId, String parentProcessGroupId, String copyTargetProcessGroupId) throws Exception {
String processGroups15Id = null;
try {
//1.创建流程组对应的片段得到片段id
//创建流程组片段
//查询目标流程组版本
ProcessGroupsInfoResult9 processGroups = nifiApiService.getProcessGroups(appProcessGroupId);
Revision9 revision = processGroups.getRevision();
@ -77,4 +101,88 @@ public class BaseOperationImpl implements BaseOperation {
}
return processGroups15Id;
}
@Override
public String createParameterContextsBindingProcessGroup(CreateParamContextJoin createParamContextJoin) throws Exception {
String paramGroupsId = null;
try {
//1.创建参数上下文
ParameterContextsJoin parameterContextsJoin1 = createParamContextJoin.getParameterContextsJoin();
ParameterContextsResult parameterContexts = nifiApiService.createParameterContexts(parameterContextsJoin1);
System.out.println(parameterContexts);
//2.绑定到指定流程组
//查询指定流程组的详情
String processGroupsId = createParamContextJoin.getProcessGroupsId();
ProcessGroupsInfoResult9 processGroups = nifiApiService.getProcessGroups(processGroupsId);
Revision9 revision = processGroups.getRevision();
//流程组的版本号
Revision11 revision11 = new Revision11();
revision11.setVersion(revision.getVersion());
//参数上下文对象id
ParameterContext11 parameterContext11 = new ParameterContext11();
parameterContext11.setId(parameterContexts.getId());
//指定的组件对象
Component11 component11 = new Component11();
component11.setId(processGroups.getId());
component11.setParameterContext(parameterContext11);
BindParameterContextsJoin11 bindParameterContextsJoin11 = new BindParameterContextsJoin11();
bindParameterContextsJoin11.setComponent(component11);
bindParameterContextsJoin11.setRevision(revision11);
ProcessGroupsInfoResult9 processGroupsInfoResult9 = nifiApiService.bindParameterContexts(bindParameterContextsJoin11);
logger.info("流程组id:{}", processGroupsInfoResult9.getId());
paramGroupsId = parameterContexts.getId();
} catch (Exception e) {
throw new Exception(e);
}
return paramGroupsId;
}
@Override
public List<ControllerServices19> getNeedModifyControllerServicesByProcessGroup(String processGroupId) throws Exception {
String controllerModifyMark = nifiServiceConfig.getControllerModifyMark();
//1.查找某个流程组内的所有控制器筛选出带"接口修改标记"的控制器片段
List<ControllerServices19> modifiedControllers = new ArrayList<>();
GetAllController19 allControllerServices = nifiApiService.getAllControllerServices(processGroupId);
if (allControllerServices != null && allControllerServices.getControllerServices() != null) {
modifiedControllers = allControllerServices.getControllerServices().stream().filter(service -> service.getComponent() != null && controllerModifyMark.equals(service.getComponent().getComments())).collect(Collectors.toList());
}
return modifiedControllers;
}
@Override
public void findNeedModifyControllerAndChangesOccur(FindNeedModifyController findNeedModifyController, boolean isThrowError) throws Exception {
try {
//1.得到"接口修改标记"的控制器并验证入参数量是否匹配
List<ControllerServices19> needModifyControllerServicesByProcessGroup = getNeedModifyControllerServicesByProcessGroup(findNeedModifyController.getProcessGroupId());
List<ControllerService12> needModifyController = findNeedModifyController.getNeedModifyController();
if (needModifyControllerServicesByProcessGroup != null) {
if ((needModifyControllerServicesByProcessGroup.size() != needModifyController.size()) && isThrowError) {
throw new Exception(StrUtil.format("需要接口修改{}个控制器,但接口传了{}个", needModifyControllerServicesByProcessGroup.size(), needModifyController.size()));
}
}
//2.修改指定控制器
for (ControllerService12 controllerService11 : needModifyController) {
//查询控制器版本
ControllerService12 controllerServices = nifiApiService.getControllerServices(controllerService11.getId());
Revision12 revision = controllerServices.getRevision();
Revision12 revision12 = new Revision12();
revision12.setVersion(revision.getVersion());
controllerService11.setRevision(revision12);
ControllerService12 controllerService12 = nifiApiService.updateControllerServices(controllerService11.getId(), controllerService11);
//在传参没有问题的情况下有控制器id返回即代表修改成功
logger.info("控制器id:{}", controllerService12.getId());
}
} catch (Exception e) {
throw new Exception(e);
}
}
}

View File

@ -1,29 +1,15 @@
package com.hzya.frame.nifiapi.operation.impl;
import com.hzya.frame.nifiapi.client.NifiClient;
import com.hzya.frame.nifiapi.model.joincreatemysqlapp.CreateMysqlApp;
import com.hzya.frame.nifiapi.model.joincreateoracleapp.CreateOracleApp;
import com.hzya.frame.nifiapi.model.joinsnippetinstance.SnippetInstanceJoin;
import com.hzya.frame.nifiapi.model.joinsnippets.Snippet;
import com.hzya.frame.nifiapi.model.joinsnippets.SnippetsJoin;
import com.hzya.frame.nifiapi.model.resultcreatesnippet.Flow15;
import com.hzya.frame.nifiapi.model.resultcreatesnippet.ProcessGroups15;
import com.hzya.frame.nifiapi.model.resultcreatesnippet.SnippetInstance15;
import com.hzya.frame.nifiapi.model.resultprocessgroupsinfo.ProcessGroupsInfoResult9;
import com.hzya.frame.nifiapi.model.resultprocessgroupsinfo.Revision9;
import com.hzya.frame.nifiapi.model.resultsnippets.Snippet13;
import com.hzya.frame.nifiapi.model.resultsnippets.SnippetResult13;
import com.hzya.frame.nifiapi.operation.BaseOperation;
import com.hzya.frame.nifiapi.operation.NifiOperation;
import com.hzya.frame.nifiapi.operation.IBaseOperation;
import com.hzya.frame.nifiapi.operation.INifiOperation;
import com.hzya.frame.nifiapi.service.NifiApiService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import java.util.HashMap;
import java.util.Map;
/**
* NiFi 操作实现类封装通用操作
*
@ -35,7 +21,7 @@ import java.util.Map;
* @FilenameNifiOperationImpl
*/
@Repository(value = "NifiOperationImpl")
public class NifiOperationImpl implements NifiOperation {
public class NifiOperationImpl implements INifiOperation {
Logger logger = LoggerFactory.getLogger(NifiOperationImpl.class);
@ -43,7 +29,7 @@ public class NifiOperationImpl implements NifiOperation {
private NifiApiService nifiApiService;
@Autowired
private BaseOperation baseOperation;
private IBaseOperation baseOperation;
@Override
public void createOracleApp(CreateOracleApp createOracleApp) throws Exception {

View File

@ -9,6 +9,7 @@ import com.hzya.frame.nifiapi.model.joincontrollerenabled.EnOrDiControllerServic
import com.hzya.frame.nifiapi.model.joincreateconnection.CreateConnection18;
import com.hzya.frame.nifiapi.model.joincreateconnections.CreateConnections18;
import com.hzya.frame.nifiapi.model.joincreatetemp.CreateTemplateJoin;
import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.GetAllController19;
import com.hzya.frame.nifiapi.model.joingetcontroller.ControllerService12;
import com.hzya.frame.nifiapi.model.joinparametercontexts.ParameterContextsJoin;
import com.hzya.frame.nifiapi.model.joinprocessgroups.Component7;
@ -169,7 +170,16 @@ public class NifiApiService {
}
/**
* 创建流程组组级别的控制器服务,控制器服务分为流程组级别和全局级别其中流程组级别只能在流程组内的处理器共享
* 查询某个流程组内的所有控制器
*/
public GetAllController19 getAllControllerServices(String processGroupId) throws Exception {
return client.get(StrUtil.format("/flow/process-groups/{}/controller-services", processGroupId), GetAllController19.class);
}
/**
* 创建流程组组级别的控制器服务
* 控制器服务分为流程组级别和全局级别其中流程组级别只能在流程组内的处理器共享子流程组继承父流程组控制器
* 全局控制器无法给普通流程组使用
*
* @param id 对应的流程组id
* @param controllerService11 提交参数
@ -178,6 +188,16 @@ public class NifiApiService {
return client.post(StrUtil.format("/process-groups/{}/controller-services", id), controllerService11, ControllerService12.class);
}
/**
* 更新控制器服务
*
* @param id 对应的控制器id主键
* @param controllerService11 修改提交参数注意传递主键版本
*/
public ControllerService12 updateControllerServices(String id, ControllerService12 controllerService11) throws Exception {
return client.put(StrUtil.format("/controller-services/{}", id), controllerService11, ControllerService12.class);
}
/**
* 启用或禁用控制器服务
*/