feat(nifi): 完善应用创建并实例化的功能

-重构了应用创建流程,支持 Oracle 和 MySQL 应用的创建和实例化
- 新增了控制器服务的修改和启用功能
- 添加了批量创建连接关系的方法- 优化了代码结构,提高了可维护性和可扩展性
This commit is contained in:
liuy 2025-05-23 10:18:07 +08:00
parent 83f99be452
commit 095e58d812
11 changed files with 244 additions and 88 deletions

View File

@ -14,6 +14,7 @@ import java.util.concurrent.TimeUnit;
/**
* OkHttpClient配置类
*
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.config
* @Projectfw-nifi
@ -26,9 +27,8 @@ public class HttpClientConfig {
@Bean
public OkHttpClient okHttpClient() throws Exception {
return new OkHttpClient.Builder().sslSocketFactory(createTrustAllSslSocketFactory(), createTrustAllTrustManager())
.hostnameVerifier((hostname, session) -> true)
.connectTimeout(60, TimeUnit.SECONDS) // 连接超时时间
//调用nifi接口正常情况不会超过60秒的目前遇到的接口1秒以内调用完毕
return new OkHttpClient.Builder().sslSocketFactory(createTrustAllSslSocketFactory(), createTrustAllTrustManager()).hostnameVerifier((hostname, session) -> true).connectTimeout(60, TimeUnit.SECONDS) // 连接超时时间
.readTimeout(60, TimeUnit.SECONDS) // 读取超时时间
.writeTimeout(60, TimeUnit.SECONDS) // 写入超时时间
.build();

View File

@ -1,5 +1,7 @@
package com.hzya.frame.nifiapi.model.basemodel;
import com.hzya.frame.nifiapi.model.joincreparamcontext.CreateParamContextJoin;
import com.hzya.frame.nifiapi.model.joinfindneedmodifycontroller.FindNeedModifyController;
import lombok.Data;
/**
@ -14,9 +16,15 @@ import lombok.Data;
*/
@Data
public class JoinBashModel {
//实例化
private String appProcessGroupId;
private String parentProcessGroupId;
private String copyTargetProcessGroupId;
// private String copyTargetParentProcessGroupId;
//上下文
private CreateParamContextJoin createParamContextJoin;
//控制器
private FindNeedModifyController findNeedModifyController;
}

View File

@ -12,5 +12,5 @@ import lombok.Data;
* @FilenameCreateOracleApp
*/
@Data
public class CreateOracleApp extends JoinBashModel {
public class CreateAppInstance extends JoinBashModel {
}

View File

@ -0,0 +1,20 @@
package com.hzya.frame.nifiapi.model.joincreateprocessconnection;
import com.hzya.frame.nifiapi.model.joincreateconnections.CreateConnections18;
import lombok.Data;
import java.util.List;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifiapi.model.joincreateprocessconnection
* @Projectfw-nifi
* @nameCreateProcessorConnections
* @Date2025/5/22 14:06
* @FilenameCreateProcessorConnections
*/
@Data
public class CreateProcessorConnections {
private String processGroupsId;
private List<CreateConnections18> createConnections18;
}

View File

@ -14,7 +14,7 @@ import java.util.Map;
*/
@Data
public class Component12 {
private String id;
private String id;//随机字符串
private String name;
private String type;
private String state;

View File

@ -12,7 +12,10 @@ import lombok.Data;
*/
@Data
public class ControllerService12 {
//控制器服务id
private String id;
private Revision12 revision;
private Component12 component;
}
//修改标记id
private String modifyMarkId;
}

View File

@ -1,7 +1,6 @@
package com.hzya.frame.nifiapi.operation;
import com.hzya.frame.nifiapi.model.joincreatemysqlapp.CreateMysqlApp;
import com.hzya.frame.nifiapi.model.joincreateoracleapp.CreateOracleApp;
import com.hzya.frame.nifiapi.model.joincreateoracleapp.CreateAppInstance;
/**
* NiFi应用实例化业务操作接口
@ -13,9 +12,9 @@ import com.hzya.frame.nifiapi.model.joincreateoracleapp.CreateOracleApp;
* @Date2025/5/21 17:05
* @FilenameNifiOperation
*/
public interface INifiOperation {
public interface INifiAppOperation {
/**
* Oracle应用创建并实例化
* 完成应用创建并实例化
* 主要包含一系列nifi接口调用操作其他应用开发封转类以此类推
* 1.创建nifi片段
* 2.把片段实例化
@ -26,12 +25,12 @@ public interface INifiOperation {
*
* @param createOracleApp 封装创建Oracle应用所需要的入参
*/
void createOracleApp(CreateOracleApp createOracleApp) throws Exception;
void createAppContext(CreateAppInstance createOracleApp) throws Exception;
/**
* Mysql应用创建并实例化
*
* @param createMysqlApp 封装创建Mysql应用所需要的入参
*/
void createMysqlApp(CreateMysqlApp createMysqlApp) throws Exception;
// void createMysqlApp(CreateMysqlApp createMysqlApp) throws Exception;
}

View File

@ -1,5 +1,6 @@
package com.hzya.frame.nifiapi.operation;
import com.hzya.frame.nifiapi.model.joincreateprocessconnection.CreateProcessorConnections;
import com.hzya.frame.nifiapi.model.joincreparamcontext.CreateParamContextJoin;
import com.hzya.frame.nifiapi.model.joinfindneedmodifycontroller.FindNeedModifyController;
import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.ControllerServices19;
@ -16,7 +17,7 @@ import java.util.List;
* @Date2025/5/21 17:50
* @FilenameBaseNifiOperation
*/
public interface IBaseOperation {
public interface INifiBaseOperation {
/**
* 实例化某个应用对应nifi流程组id
*
@ -36,18 +37,38 @@ public interface IBaseOperation {
String createParameterContextsBindingProcessGroup(CreateParamContextJoin createParamContextJoin) throws Exception;
/**
* 查询需要修改的控制器详情
* 查找带有接口修改标识的控制器连接池或其他的控制器详情
*
* @param processGroupId 流程组id
*/
List<ControllerServices19> getNeedModifyControllerServicesByProcessGroup(String processGroupId) throws Exception;
/**
* 查找某个流程组(应用app),带有"修改标记"的控制器服务并更新控制器从而达到实例化要求
*
* @param isThrowError true抛出异常false不抛出异常假设某个流程下有3个控制器服务需要修改但入参只传了2个在不满足业务需求且isError=true的情况下就会抛出异常主要是提醒业务调用代码
* 查找带有接口修改标记的控制器服务eg接口修改标记:CZFJwnb9sb
*/
void findNeedModifyControllerAndChangesOccur(FindNeedModifyController findNeedModifyController, boolean isThrowError) throws Exception;
ControllerServices19 getNeedModifyControllerServicesByProcessGroup(String processGroupId, String mark) throws Exception;
/**
* 批量创建连接关系
* 查找某个流程组(应用app),带有"修改标记"的控制器服务并更新控制器从而达到实例化要求
*
* @param findNeedModifyController 控制器更新封装
* @param isThrowError true抛出异常false不抛出异常假设某个流程下有3个控制器服务需要修改但入参只传了2个在不满足业务需求且isError=true的情况下就会抛出异常主要是提醒业务调用代码
* @param enabledAll 是否激活当前流程组下的所有控制器服务 true是 false否
*/
void findNeedModifyControllerAndChangesOccur(FindNeedModifyController findNeedModifyController, boolean isThrowError, boolean enabledAll) throws Exception;
/**
* 批量创建某个流程下的连接关系
*
* @param createProcessorConnections 连接关系封装
*/
void batchCreateProcessorConnections(CreateProcessorConnections createProcessorConnections) throws Exception;
/**
* 批量激活/停用某个流程组下的所有控制器服务
*
* @param processGroupId 流程组id
* @param status ENABLED 启用DISABLED停用
*/
void batchEnabledControllerServices(String processGroupId, String status) throws Exception;
}

View File

@ -0,0 +1,95 @@
package com.hzya.frame.nifiapi.operation.impl;
import com.hzya.frame.nifiapi.model.joincreateoracleapp.CreateAppInstance;
import com.hzya.frame.nifiapi.model.joincreparamcontext.CreateParamContextJoin;
import com.hzya.frame.nifiapi.model.joinfindneedmodifycontroller.FindNeedModifyController;
import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.ControllerServices19;
import com.hzya.frame.nifiapi.model.joingetcontroller.Component12;
import com.hzya.frame.nifiapi.model.joingetcontroller.ControllerService12;
import com.hzya.frame.nifiapi.operation.INifiBaseOperation;
import com.hzya.frame.nifiapi.operation.INifiAppOperation;
import com.hzya.frame.nifiapi.service.NifiApiService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import java.util.List;
/**
* NiFi 操作实现类封装通用操作
*
* @Authorliuyang
* @Packagecom.hzya.frame.nifiapi.operation.impl
* @Projectfw-nifi
* @nameNifiOperationImpl
* @Date2025/5/21 17:06
* @FilenameNifiOperationImpl
*/
@Repository(value = "NifiOperationImpl")
public class NifiAppOperationImpl implements INifiAppOperation {
Logger logger = LoggerFactory.getLogger(NifiAppOperationImpl.class);
@Autowired
private NifiApiService nifiApiService;
@Autowired
private INifiBaseOperation baseOperation;
@Override
public void createAppContext(CreateAppInstance createOracleApp) throws Exception {
try {
//1.实例化应用
String newProcessGroupId = null;
String appProcessGroupId = createOracleApp.getAppProcessGroupId();
String parentProcessGroupId = createOracleApp.getParentProcessGroupId();
String copyTargetProcessGroupId = createOracleApp.getCopyTargetProcessGroupId();
if (appProcessGroupId != null && parentProcessGroupId != null && copyTargetProcessGroupId != null) {
newProcessGroupId = baseOperation.instantiateApp(appProcessGroupId, parentProcessGroupId, copyTargetProcessGroupId);
logger.info("实例化后的流程组id:{}", newProcessGroupId);
}
//2.新增应用对应的上下文参数
CreateParamContextJoin createParamContextJoin = createOracleApp.getCreateParamContextJoin();
if (createParamContextJoin != null) {
baseOperation.createParameterContextsBindingProcessGroup(createParamContextJoin);
}
//3.修改控制器默认激活当前流程组下的所有控制器
FindNeedModifyController findNeedModifyController = createOracleApp.getFindNeedModifyController();
if (findNeedModifyController != null) {
List<ControllerService12> needModifyController = findNeedModifyController.getNeedModifyController();
for (ControllerService12 controllerService12 : needModifyController) {
// controllerService12.setId();
//根据修改标识查找控制器
String modifyMarkId = controllerService12.getModifyMarkId();
ControllerServices19 needModifyControllerServicesByProcessGroup = baseOperation.getNeedModifyControllerServicesByProcessGroup(newProcessGroupId, modifyMarkId);
controllerService12.setId(needModifyControllerServicesByProcessGroup.getId());
Component12 component = controllerService12.getComponent();
component.setId(needModifyControllerServicesByProcessGroup.getId());
}
findNeedModifyController.setProcessGroupId(newProcessGroupId);
baseOperation.findNeedModifyControllerAndChangesOccur(findNeedModifyController, true, true);
}
} catch (Exception e) {
throw new Exception(e);
}
}
// @Override
// public void createMysqlApp(CreateMysqlApp createMysqlApp) throws Exception {
// try {
// //1.实例化Mysql应用
// String appProcessGroupId = createMysqlApp.getAppProcessGroupId();
// String appProcessGroupId1 = createMysqlApp.getParentProcessGroupId();
// String copyTargetProcessGroupId = createMysqlApp.getCopyTargetProcessGroupId();
// String processId = baseOperation.instantiateApp(appProcessGroupId, appProcessGroupId1, copyTargetProcessGroupId);
// logger.info("Mysql应用实例化后的流程组id:{}", processId);
// } catch (Exception e) {
// throw new Exception(e);
// }
// }
}

View File

@ -6,6 +6,11 @@ import com.hzya.frame.nifiapi.model.joinbindparametercontexts.BindParameterConte
import com.hzya.frame.nifiapi.model.joinbindparametercontexts.Component11;
import com.hzya.frame.nifiapi.model.joinbindparametercontexts.ParameterContext11;
import com.hzya.frame.nifiapi.model.joinbindparametercontexts.Revision11;
import com.hzya.frame.nifiapi.model.joincontrollerenabled.EnOrDiControllerServices12;
import com.hzya.frame.nifiapi.model.joincontrollerenabled.Revision13;
import com.hzya.frame.nifiapi.model.joincreateconnection.CreateConnection18;
import com.hzya.frame.nifiapi.model.joincreateconnections.CreateConnections18;
import com.hzya.frame.nifiapi.model.joincreateprocessconnection.CreateProcessorConnections;
import com.hzya.frame.nifiapi.model.joincreparamcontext.CreateParamContextJoin;
import com.hzya.frame.nifiapi.model.joinfindneedmodifycontroller.FindNeedModifyController;
import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.ControllerServices19;
@ -24,7 +29,7 @@ import com.hzya.frame.nifiapi.model.resultprocessgroupsinfo.ProcessGroupsInfoRes
import com.hzya.frame.nifiapi.model.resultprocessgroupsinfo.Revision9;
import com.hzya.frame.nifiapi.model.resultsnippets.Snippet13;
import com.hzya.frame.nifiapi.model.resultsnippets.SnippetResult13;
import com.hzya.frame.nifiapi.operation.IBaseOperation;
import com.hzya.frame.nifiapi.operation.INifiBaseOperation;
import com.hzya.frame.nifiapi.service.NifiApiService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -46,9 +51,9 @@ import java.util.stream.Collectors;
* @FilenameBaseOperationImpl
*/
@Repository(value = "BaseOperationImpl")
public class BaseOperationImpl implements IBaseOperation {
public class NifiBaseOperationImpl implements INifiBaseOperation {
Logger logger = LoggerFactory.getLogger(BaseOperationImpl.class);
Logger logger = LoggerFactory.getLogger(NifiBaseOperationImpl.class);
@Autowired
private NifiApiService nifiApiService;
@ -149,13 +154,32 @@ public class BaseOperationImpl implements IBaseOperation {
List<ControllerServices19> modifiedControllers = new ArrayList<>();
GetAllController19 allControllerServices = nifiApiService.getAllControllerServices(processGroupId);
if (allControllerServices != null && allControllerServices.getControllerServices() != null) {
modifiedControllers = allControllerServices.getControllerServices().stream().filter(service -> service.getComponent() != null && controllerModifyMark.equals(service.getComponent().getComments())).collect(Collectors.toList());
modifiedControllers = allControllerServices.getControllerServices().stream().filter(service ->
service.getComponent() != null
&& service.getComponent().getComments() != null
&& service.getComponent().getComments().contains(controllerModifyMark)
).collect(Collectors.toList());
}
return modifiedControllers;
}
@Override
public void findNeedModifyControllerAndChangesOccur(FindNeedModifyController findNeedModifyController, boolean isThrowError) throws Exception {
public ControllerServices19 getNeedModifyControllerServicesByProcessGroup(String processGroupId, String mark) throws Exception {
//1.查找某个流程组内的所有控制器筛选出带"接口修改标记"的控制器片段
List<ControllerServices19> modifiedControllers = new ArrayList<>();
GetAllController19 allControllerServices = nifiApiService.getAllControllerServices(processGroupId);
if (allControllerServices != null && allControllerServices.getControllerServices() != null) {
modifiedControllers = allControllerServices.getControllerServices().stream().filter(service ->
service.getComponent() != null
&& service.getComponent().getComments() != null
&& service.getComponent().getComments().contains(mark)
).collect(Collectors.toList());
}
return modifiedControllers.get(0);
}
@Override
public void findNeedModifyControllerAndChangesOccur(FindNeedModifyController findNeedModifyController, boolean isThrowError, boolean enabledAll) throws Exception {
try {
//1.得到"接口修改标记"的控制器并验证入参数量是否匹配
List<ControllerServices19> needModifyControllerServicesByProcessGroup = getNeedModifyControllerServicesByProcessGroup(findNeedModifyController.getProcessGroupId());
@ -181,6 +205,55 @@ public class BaseOperationImpl implements IBaseOperation {
//在传参没有问题的情况下有控制器id返回即代表修改成功
logger.info("控制器id:{}", controllerService12.getId());
}
//激活当前流程组下的所有控制器
if (enabledAll) {
String controllerStatus = "ENABLED";
batchEnabledControllerServices(findNeedModifyController.getProcessGroupId(), controllerStatus);
}
} catch (Exception e) {
throw new Exception(e);
}
}
@Override
public void batchCreateProcessorConnections(CreateProcessorConnections createProcessorConnections) throws Exception {
try {
String processGroupsId = createProcessorConnections.getProcessGroupsId();
List<CreateConnections18> createConnections18 = createProcessorConnections.getCreateConnections18();
for (CreateConnections18 indexCreateConnections : createConnections18) {
CreateConnection18 processorConnections = nifiApiService.createProcessorConnections(processGroupsId, indexCreateConnections);
logger.info("连线id{}", processorConnections.getId());
}
} catch (Exception e) {
throw new Exception(e);
}
}
@Override
public void batchEnabledControllerServices(String processGroupId, String status) throws Exception {
try {
GetAllController19 allControllerServices = nifiApiService.getAllControllerServices(processGroupId);
if (allControllerServices != null && allControllerServices.getControllerServices() != null) {
List<ControllerServices19> controllerServices = allControllerServices.getControllerServices();
for (ControllerServices19 controllerService : controllerServices) {
String controllerServicesId = controllerService.getId();
//得到控制器服务当前版本
ControllerService12 controllerService12 = nifiApiService.getControllerServices(controllerServicesId);
Revision12 revision = controllerService12.getRevision();
Revision13 revision13 = new Revision13();
revision13.setVersion(revision.getVersion());
EnOrDiControllerServices12 enOrDiControllerServices = new EnOrDiControllerServices12();
enOrDiControllerServices.setState(status);
enOrDiControllerServices.setRevision(revision13);
ControllerService12 controllerService121 = nifiApiService.enabledOrDisabledControllerServices(controllerServicesId, enOrDiControllerServices);
logger.info("控制器服务id{}", controllerService121.getId());
}
}
} catch (Exception e) {
throw new Exception(e);
}

View File

@ -1,63 +0,0 @@
package com.hzya.frame.nifiapi.operation.impl;
import com.hzya.frame.nifiapi.model.joincreatemysqlapp.CreateMysqlApp;
import com.hzya.frame.nifiapi.model.joincreateoracleapp.CreateOracleApp;
import com.hzya.frame.nifiapi.operation.IBaseOperation;
import com.hzya.frame.nifiapi.operation.INifiOperation;
import com.hzya.frame.nifiapi.service.NifiApiService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
/**
* NiFi 操作实现类封装通用操作
*
* @Authorliuyang
* @Packagecom.hzya.frame.nifiapi.operation.impl
* @Projectfw-nifi
* @nameNifiOperationImpl
* @Date2025/5/21 17:06
* @FilenameNifiOperationImpl
*/
@Repository(value = "NifiOperationImpl")
public class NifiOperationImpl implements INifiOperation {
Logger logger = LoggerFactory.getLogger(NifiOperationImpl.class);
@Autowired
private NifiApiService nifiApiService;
@Autowired
private IBaseOperation baseOperation;
@Override
public void createOracleApp(CreateOracleApp createOracleApp) throws Exception {
try {
//1.实例化Oracle应用
String appProcessGroupId = createOracleApp.getAppProcessGroupId();
String appProcessGroupId1 = createOracleApp.getParentProcessGroupId();
String copyTargetProcessGroupId = createOracleApp.getCopyTargetProcessGroupId();
String processId = baseOperation.instantiateApp(appProcessGroupId, appProcessGroupId1, copyTargetProcessGroupId);
logger.info("Oracle应用实例化后的流程组id:{}", processId);
//2.新增应用对应的上下文参数
} catch (Exception e) {
throw new Exception(e);
}
}
@Override
public void createMysqlApp(CreateMysqlApp createMysqlApp) throws Exception {
try {
//1.实例化Mysql应用
String appProcessGroupId = createMysqlApp.getAppProcessGroupId();
String appProcessGroupId1 = createMysqlApp.getParentProcessGroupId();
String copyTargetProcessGroupId = createMysqlApp.getCopyTargetProcessGroupId();
String processId = baseOperation.instantiateApp(appProcessGroupId, appProcessGroupId1, copyTargetProcessGroupId);
logger.info("Mysql应用实例化后的流程组id:{}", processId);
} catch (Exception e) {
throw new Exception(e);
}
}
}