test(nifi): 添加 Nifi 数据同步相关测试用例
- 新增 TestINifiBaseOperation 类,包含控制器服务和连接关系测试- 新增 TestNifiDbSync 类,涵盖参数上下文、控制器服务、流程组创建等测试- 移除 TestNifiLog 中的未使用引用
This commit is contained in:
parent
346dcdf75d
commit
d504b44601
|
@ -0,0 +1,66 @@
|
|||
package com.hzya.frame;
|
||||
|
||||
import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.Component19;
|
||||
import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.ControllerServices19;
|
||||
import com.hzya.frame.nifiapi.model.resultNeedModifyController.PortFilterResult;
|
||||
import com.hzya.frame.nifiapi.operation.INifiBaseOperation;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @Author:liuyang
|
||||
* @Package:com.hzya.frame
|
||||
* @Project:fw-nifi
|
||||
* @name:TestINifiBaseOperation
|
||||
* @Date:2025/5/22 17:02
|
||||
* @Filename:TestINifiBaseOperation
|
||||
*/
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(classes = {WebappApplication.class})
|
||||
public class TestINifiBaseOperation {
|
||||
|
||||
@Autowired
|
||||
private INifiBaseOperation iNifiBaseOperation;
|
||||
|
||||
@Test
|
||||
public void testGetNeedModifyControllerServicesByProcessGroup() throws Exception {
|
||||
try {
|
||||
List<ControllerServices19> needModifyControllerServicesByProcessGroup = iNifiBaseOperation.getNeedModifyControllerServicesByProcessGroup("4143d803-0196-1000-968f-b06f55b43a3f");
|
||||
System.out.println(needModifyControllerServicesByProcessGroup.size());
|
||||
ControllerServices19 controllerServices19 = needModifyControllerServicesByProcessGroup.get(0);
|
||||
Component19 component = controllerServices19.getComponent();
|
||||
System.out.println(component.getName());
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetNeedModifyControllerServicesByProcessGroup2() throws Exception {
|
||||
try {
|
||||
ControllerServices19 czfJwnb9sb = iNifiBaseOperation.getNeedModifyControllerServicesByProcessGroup("4143d803-0196-1000-968f-b06f55b43a3f", "CZFJwnb9sb");
|
||||
if (czfJwnb9sb != null && czfJwnb9sb.getComponent() != null) {
|
||||
Component19 component = czfJwnb9sb.getComponent();
|
||||
String name = component.getName();
|
||||
System.out.println(name);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetNeedModifyControllerRelationshipByProcessGroup() throws Exception {
|
||||
try {
|
||||
PortFilterResult needModifyControllerRelationshipByProcessGroup = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup("41437b87-0196-1000-6951-50e41e75c0da", "f5228bc8-0360-41eb-a640-9f4df953937c");
|
||||
System.out.println(needModifyControllerRelationshipByProcessGroup);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,688 @@
|
|||
package com.hzya.frame;
|
||||
|
||||
import com.hzya.frame.nifiapi.model.joincreateconnections.Component18;
|
||||
import com.hzya.frame.nifiapi.model.joincreateconnections.CreateConnections18;
|
||||
import com.hzya.frame.nifiapi.model.joincreateconnections.Revision18;
|
||||
import com.hzya.frame.nifiapi.model.joincreateconnections.SourceOrDestination18;
|
||||
import com.hzya.frame.nifiapi.model.joincreateoracleapp.CreateAppInstanceJoin;
|
||||
import com.hzya.frame.nifiapi.model.joincreateprocessconnection.CreateProcessorConnections;
|
||||
import com.hzya.frame.nifiapi.model.joincreparamcontext.CreateParamContextJoin;
|
||||
import com.hzya.frame.nifiapi.model.joinfindneedmodifycontroller.FindNeedModifyController;
|
||||
import com.hzya.frame.nifiapi.model.joingetcontroller.Component12;
|
||||
import com.hzya.frame.nifiapi.model.joingetcontroller.ControllerService12;
|
||||
import com.hzya.frame.nifiapi.model.joinparametercontexts.*;
|
||||
import com.hzya.frame.nifiapi.model.joinprocessgroups.Component7;
|
||||
import com.hzya.frame.nifiapi.model.joinprocessgroups.ProcessGroupsJoin;
|
||||
import com.hzya.frame.nifiapi.model.joinprocessgroups.Revision5;
|
||||
import com.hzya.frame.nifiapi.model.joinsnippetinstance.SnippetInstanceJoin;
|
||||
import com.hzya.frame.nifiapi.model.joinsnippets.Snippet;
|
||||
import com.hzya.frame.nifiapi.model.joinsnippets.SnippetsJoin;
|
||||
import com.hzya.frame.nifiapi.model.joinstartorstopprocessgroup.Revision10;
|
||||
import com.hzya.frame.nifiapi.model.joinstartorstopprocessgroup.StartOrStopProcessGroupsInfoJoin10;
|
||||
import com.hzya.frame.nifiapi.model.resultNeedModifyController.PortFilterResult;
|
||||
import com.hzya.frame.nifiapi.model.resultcreateoracleapp.CreateAppInstanceResult;
|
||||
import com.hzya.frame.nifiapi.model.resultcreateprocessor.*;
|
||||
import com.hzya.frame.nifiapi.model.resultcreateprocessors.CreateProcess16;
|
||||
import com.hzya.frame.nifiapi.model.resultcreatesnippet.Flow15;
|
||||
import com.hzya.frame.nifiapi.model.resultcreatesnippet.ProcessGroups15;
|
||||
import com.hzya.frame.nifiapi.model.resultcreatesnippet.SnippetInstance15;
|
||||
import com.hzya.frame.nifiapi.model.resultprocessgroups.ProcessgroupsResult;
|
||||
import com.hzya.frame.nifiapi.model.resultprocessgroupsinfo.ProcessGroupsInfoResult9;
|
||||
import com.hzya.frame.nifiapi.model.resultprocessgroupsinfo.Revision9;
|
||||
import com.hzya.frame.nifiapi.model.resultsnippets.Snippet13;
|
||||
import com.hzya.frame.nifiapi.model.resultsnippets.SnippetResult13;
|
||||
import com.hzya.frame.nifiapi.operation.INifiAppOperation;
|
||||
import com.hzya.frame.nifiapi.operation.INifiBaseOperation;
|
||||
import com.hzya.frame.nifiapi.service.NifiApiService;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @Author:liuyang
|
||||
* @Package:com.hzya.frame
|
||||
* @Project:fw-nifi
|
||||
* @name:TestNifiDbSync
|
||||
* @Date:2025/5/20 17:49
|
||||
* @Filename:TestNifiDbSync
|
||||
*/
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(classes = {WebappApplication.class})
|
||||
public class TestNifiDbSync {
|
||||
|
||||
@Autowired
|
||||
private NifiApiService nifiApiService;
|
||||
|
||||
@Autowired
|
||||
private INifiAppOperation nifiOperation;
|
||||
|
||||
@Autowired
|
||||
private INifiBaseOperation iNifiBaseOperation;
|
||||
|
||||
/**
|
||||
* 测试nifi数据同步场景调用
|
||||
*/
|
||||
@Test
|
||||
public void testDbSync2() throws Exception {
|
||||
//创建业务数据流对应的流程组
|
||||
String parentProcessGroupId = "76e23115-ad84-1aab-7630-21c76aa3973d";
|
||||
String processGroupsName = "Oracle19c->Mysql跨数据源同步";
|
||||
String newProcessGroupsId = testCreateProcessGroups(parentProcessGroupId, processGroupsName);
|
||||
|
||||
//新建GenerateFlowFile处理器(开始的处理器)
|
||||
String generateFlowFileId = testCreateProcessor(newProcessGroupsId);
|
||||
|
||||
//创建Oracle应用
|
||||
//初始化实例
|
||||
CreateAppInstanceJoin createOracleApp = new CreateAppInstanceJoin();
|
||||
createOracleApp.setAppProcessGroupId("41437b87-0196-1000-6951-50e41e75c0da");
|
||||
createOracleApp.setParentProcessGroupId("76e23100-ad84-1aab-4447-b9ee936ee7d5");
|
||||
createOracleApp.setCopyTargetProcessGroupId(newProcessGroupsId);
|
||||
//上下文
|
||||
CreateParamContextJoin createParamContextJoin = setOracleParamContextJoin(newProcessGroupsId);
|
||||
createOracleApp.setCreateParamContextJoin(createParamContextJoin);
|
||||
//控制器
|
||||
FindNeedModifyController findNeedModifyController = setOracleFindNeedModifyController(newProcessGroupsId);
|
||||
createOracleApp.setFindNeedModifyController(findNeedModifyController);
|
||||
CreateAppInstanceResult oracleAppContext = nifiOperation.createAppContext(createOracleApp);
|
||||
|
||||
//创建Mysql应用
|
||||
//初始化实例
|
||||
CreateAppInstanceJoin createOracleApp2 = new CreateAppInstanceJoin();
|
||||
createOracleApp2.setAppProcessGroupId("4143d803-0196-1000-968f-b06f55b43a3f");
|
||||
createOracleApp2.setParentProcessGroupId("76e23100-ad84-1aab-4447-b9ee936ee7d5");
|
||||
createOracleApp2.setCopyTargetProcessGroupId(newProcessGroupsId);
|
||||
//上下文
|
||||
CreateParamContextJoin createParamContextJoin2 = setMysqlParamContextJoin(newProcessGroupsId);
|
||||
createOracleApp2.setCreateParamContextJoin(createParamContextJoin2);
|
||||
//控制器
|
||||
FindNeedModifyController findNeedModifyController2 = setMysqlFindNeedModifyController(newProcessGroupsId);
|
||||
createOracleApp2.setFindNeedModifyController(findNeedModifyController2);
|
||||
CreateAppInstanceResult mysqlAppContext = nifiOperation.createAppContext(createOracleApp2);
|
||||
|
||||
//新建LogAttribute处理器(结束处理器)
|
||||
String logAttributeId = testCreateProcessor2(newProcessGroupsId);
|
||||
|
||||
//查找对应应用的连接标记
|
||||
//TYPE=INPUT_PORT
|
||||
PortFilterResult oracleMark = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(oracleAppContext.getNewProcessGroupId(), null);
|
||||
//TYPE=OUTPUT_PORT
|
||||
PortFilterResult mysqlMark = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(mysqlAppContext.getNewProcessGroupId(), null);
|
||||
|
||||
//批量创建连接关系
|
||||
//连接关系一:开始处理器->Oracle应用
|
||||
SourceOrDestination18 source = new SourceOrDestination18();
|
||||
source.setId(generateFlowFileId);
|
||||
source.setGroupId(newProcessGroupsId);
|
||||
source.setType("PROCESSOR");
|
||||
|
||||
SourceOrDestination18 destination = new SourceOrDestination18();
|
||||
destination.setId(oracleMark.getInputPorts().get(0).getId());
|
||||
destination.setGroupId(oracleAppContext.getNewProcessGroupId());
|
||||
destination.setType("INPUT_PORT");
|
||||
|
||||
List<String> selectedRelationships = new ArrayList<>();
|
||||
selectedRelationships.add("success");
|
||||
|
||||
Component18 component18 = new Component18();
|
||||
component18.setSource(source);
|
||||
component18.setDestination(destination);
|
||||
component18.setSelectedRelationships(selectedRelationships);
|
||||
|
||||
Revision18 revision18 = new Revision18();
|
||||
revision18.setVersion("0");//默认传递0
|
||||
|
||||
CreateConnections18 createConnections17 = new CreateConnections18();
|
||||
createConnections17.setComponent(component18);
|
||||
createConnections17.setRevision(revision18);
|
||||
|
||||
List<CreateConnections18> createConnections18List = new ArrayList<>();
|
||||
createConnections18List.add(createConnections17);
|
||||
|
||||
//连接关系二:Oracle应用->Mysql应用
|
||||
SourceOrDestination18 source2 = new SourceOrDestination18();
|
||||
source2.setId(oracleMark.getOutputPorts().get(0).getId());
|
||||
source2.setGroupId(oracleAppContext.getNewProcessGroupId());
|
||||
source2.setType("OUTPUT_PORT");
|
||||
|
||||
SourceOrDestination18 destination2 = new SourceOrDestination18();
|
||||
destination2.setId(mysqlMark.getInputPorts().get(0).getId());
|
||||
destination2.setGroupId(mysqlAppContext.getNewProcessGroupId());
|
||||
destination2.setType("INPUT_PORT");
|
||||
|
||||
// List<String> selectedRelationships2 = new ArrayList<>();
|
||||
// selectedRelationships2.add("success");
|
||||
|
||||
Component18 component182 = new Component18();
|
||||
component182.setSource(source2);
|
||||
component182.setDestination(destination2);
|
||||
// component182.setSelectedRelationships(selectedRelationships2);
|
||||
|
||||
Revision18 revision182 = new Revision18();
|
||||
revision182.setVersion("0");//默认传递0
|
||||
|
||||
CreateConnections18 createConnections172 = new CreateConnections18();
|
||||
createConnections172.setComponent(component182);
|
||||
createConnections172.setRevision(revision182);
|
||||
createConnections18List.add(createConnections172);
|
||||
|
||||
//连接关系三:Mysql应用->结束处理器
|
||||
SourceOrDestination18 source3 = new SourceOrDestination18();
|
||||
source3.setId(mysqlMark.getOutputPorts().get(0).getId());
|
||||
source3.setGroupId(mysqlAppContext.getNewProcessGroupId());
|
||||
source3.setType("OUTPUT_PORT");
|
||||
|
||||
SourceOrDestination18 destination3 = new SourceOrDestination18();
|
||||
destination3.setId(logAttributeId);
|
||||
destination3.setGroupId(newProcessGroupsId);
|
||||
destination3.setType("PROCESSOR");
|
||||
|
||||
// List<String> selectedRelationships3 = new ArrayList<>();
|
||||
// selectedRelationships3.add("success");
|
||||
|
||||
Component18 component183 = new Component18();
|
||||
component183.setSource(source3);
|
||||
component183.setDestination(destination3);
|
||||
// component183.setSelectedRelationships(selectedRelationships3);
|
||||
|
||||
Revision18 revision183 = new Revision18();
|
||||
revision183.setVersion("0");//默认传递0
|
||||
|
||||
CreateConnections18 createConnections173 = new CreateConnections18();
|
||||
createConnections173.setComponent(component183);
|
||||
createConnections173.setRevision(revision183);
|
||||
createConnections18List.add(createConnections173);
|
||||
|
||||
CreateProcessorConnections createProcessorConnections = new CreateProcessorConnections();
|
||||
createProcessorConnections.setProcessGroupsId(newProcessGroupsId);
|
||||
createProcessorConnections.setCreateConnections18(createConnections18List);
|
||||
iNifiBaseOperation.batchCreateProcessorConnections(createProcessorConnections);
|
||||
|
||||
//启动业务处理器
|
||||
ProcessGroupsInfoResult9 processGroups = nifiApiService.getProcessGroups(newProcessGroupsId);
|
||||
Revision9 revision = processGroups.getRevision();
|
||||
|
||||
Revision10 revision10 = new Revision10();
|
||||
revision10.setVersion(revision.getVersion());
|
||||
|
||||
StartOrStopProcessGroupsInfoJoin10 startOrStopProcessGroupsInfoJoin10 = new StartOrStopProcessGroupsInfoJoin10();
|
||||
startOrStopProcessGroupsInfoJoin10.setId(processGroups.getId());
|
||||
startOrStopProcessGroupsInfoJoin10.setState("RUNNING");
|
||||
startOrStopProcessGroupsInfoJoin10.setRevision(revision10);
|
||||
ProcessGroupsInfoResult9 processGroupsInfoResult9 = nifiApiService.startOrStopProcessGroups(startOrStopProcessGroupsInfoJoin10);
|
||||
System.out.println("流程组id:" + processGroupsInfoResult9.getId());
|
||||
}
|
||||
|
||||
|
||||
public CreateParamContextJoin setOracleParamContextJoin(String processGroupsId) throws Exception {
|
||||
Revision revision = new Revision();
|
||||
revision.setVersion(0);
|
||||
|
||||
Parameter2 parameter = new Parameter2();
|
||||
parameter.setSensitive(false);
|
||||
parameter.setName("additionalWhereClause");
|
||||
parameter.setValue("1=1");
|
||||
parameter.setDescription("参数描述");
|
||||
|
||||
Parameter2 parameter2 = new Parameter2();
|
||||
parameter2.setSensitive(false);
|
||||
parameter2.setName("columnsToReturn");
|
||||
parameter2.setValue("CSALEID,VRECEIPTCODE,NHEADSUMMNY,TS");
|
||||
parameter2.setDescription("参数描述");
|
||||
|
||||
Parameter2 parameter3 = new Parameter2();
|
||||
parameter3.setSensitive(false);
|
||||
parameter3.setName("customOrderByColumn");
|
||||
parameter3.setValue("ts asc");
|
||||
parameter3.setDescription("参数描述");
|
||||
|
||||
Parameter2 parameter4 = new Parameter2();
|
||||
parameter4.setSensitive(false);
|
||||
parameter4.setName("maximumValueColumns");
|
||||
parameter4.setValue("ts");
|
||||
parameter4.setDescription("参数描述");
|
||||
|
||||
Parameter2 parameter5 = new Parameter2();
|
||||
parameter5.setSensitive(false);
|
||||
parameter5.setName("sourceTbName");
|
||||
parameter5.setValue("U8C241231.SO_SALE");
|
||||
parameter5.setDescription("参数描述");
|
||||
|
||||
Parameters2 parameters = new Parameters2();
|
||||
parameters.setParameter(parameter);
|
||||
|
||||
Parameters2 parameters2 = new Parameters2();
|
||||
parameters2.setParameter(parameter2);
|
||||
|
||||
Parameters2 parameters3 = new Parameters2();
|
||||
parameters3.setParameter(parameter3);
|
||||
|
||||
Parameters2 parameters4 = new Parameters2();
|
||||
parameters4.setParameter(parameter4);
|
||||
|
||||
Parameters2 parameters5 = new Parameters2();
|
||||
parameters5.setParameter(parameter5);
|
||||
|
||||
List<Parameters2> parametersList = new ArrayList<>();
|
||||
parametersList.add(parameters);
|
||||
parametersList.add(parameters2);
|
||||
parametersList.add(parameters3);
|
||||
parametersList.add(parameters4);
|
||||
parametersList.add(parameters5);
|
||||
|
||||
Component3 component = new Component3();
|
||||
component.setName("oracle参数环境2505221548");
|
||||
component.setParameters(parametersList);
|
||||
|
||||
ParameterContextsJoin parameterContextsJoin = new ParameterContextsJoin();
|
||||
parameterContextsJoin.setComponent(component);
|
||||
parameterContextsJoin.setRevision(revision);
|
||||
|
||||
CreateParamContextJoin createParamContextJoin = new CreateParamContextJoin();
|
||||
createParamContextJoin.setParameterContextsJoin(parameterContextsJoin);
|
||||
createParamContextJoin.setProcessGroupsId(processGroupsId);
|
||||
return createParamContextJoin;
|
||||
}
|
||||
|
||||
public CreateParamContextJoin setMysqlParamContextJoin(String processGroupsId) throws Exception {
|
||||
Revision revision = new Revision();
|
||||
revision.setVersion(0);
|
||||
|
||||
Parameter2 parameter = new Parameter2();
|
||||
parameter.setSensitive(false);
|
||||
parameter.setName("Release Signal Identifier");
|
||||
parameter.setValue("table_creation");
|
||||
parameter.setDescription("参数描述");
|
||||
|
||||
Parameter2 parameter2 = new Parameter2();
|
||||
parameter2.setSensitive(false);
|
||||
parameter2.setName("autoTableCreation");
|
||||
parameter2.setValue("Y");
|
||||
parameter2.setDescription("参数描述");
|
||||
|
||||
Parameter2 parameter3 = new Parameter2();
|
||||
parameter3.setSensitive(false);
|
||||
parameter3.setName("mappingRelationship");
|
||||
parameter3.setValue("[\n" + " {\n" + " \"targetTbName\": \"iep_so_sale\",\n" + " \"targetDBType\": \"mysql8.0.44\",\n" + " \"writeType\": \"1\",\n" + " \"fieldRelationship\": [\n" + " {\n" + " \"sourceFieldName\": \"CSALEID\",\n" + " \"sourceFieldType\": \"CHAR(20)\",\n" + " \"targetFieldName\": \"csaleid\",\n" + " \"targetFieldType\": \"varchar(20)\",\n" + " \"targetFieldNameNotes\": \"销售主表id\",\n" + " \"primaryKey\": \"Y\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"VRECEIPTCODE\",\n" + " \"sourceFieldType\": \"VARCHAR(30)\",\n" + " \"targetFieldName\": \"vreceiptcode\",\n" + " \"targetFieldType\": \"varchar(20)\",\n" + " \"targetFieldNameNotes\": \"单据号\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"NHEADSUMMNY\",\n" + " \"sourceFieldType\": \"DECIMAL(13,8)\",\n" + " \"targetFieldName\": \"nheadsummny\",\n" + " \"targetFieldType\": \"decimal(20,10)\",\n" + " \"targetFieldNameNotes\": \"金额合计\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"TS\",\n" + " \"sourceFieldType\": \"CHAR(19)\",\n" + " \"targetFieldName\": \"ts\",\n" + " \"targetFieldType\": \"varchar(20)\",\n" + " \"targetFieldNameNotes\": \"时间戳\"\n" + " }\n" + " ]\n" + " }\n" + " ]");
|
||||
parameter3.setDescription("参数描述");
|
||||
|
||||
// Parameter2 parameter4 = new Parameter2();
|
||||
// parameter4.setSensitive(false);
|
||||
// parameter4.setName("maximumValueColumns");
|
||||
// parameter4.setValue("ts");
|
||||
// parameter4.setDescription("参数描述");
|
||||
|
||||
// Parameter2 parameter5 = new Parameter2();
|
||||
// parameter5.setSensitive(false);
|
||||
// parameter5.setName("sourceTbName");
|
||||
// parameter5.setValue("U8C241231.SO_SALE");
|
||||
// parameter5.setDescription("参数描述");
|
||||
|
||||
Parameters2 parameters = new Parameters2();
|
||||
parameters.setParameter(parameter);
|
||||
|
||||
Parameters2 parameters2 = new Parameters2();
|
||||
parameters2.setParameter(parameter2);
|
||||
|
||||
Parameters2 parameters3 = new Parameters2();
|
||||
parameters3.setParameter(parameter3);
|
||||
|
||||
// Parameters2 parameters4 = new Parameters2();
|
||||
// parameters4.setParameter(parameter4);
|
||||
|
||||
// Parameters2 parameters5 = new Parameters2();
|
||||
// parameters5.setParameter(parameter5);
|
||||
|
||||
List<Parameters2> parametersList = new ArrayList<>();
|
||||
parametersList.add(parameters);
|
||||
parametersList.add(parameters2);
|
||||
parametersList.add(parameters3);
|
||||
// parametersList.add(parameters4);
|
||||
// parametersList.add(parameters5);
|
||||
|
||||
Component3 component = new Component3();
|
||||
component.setName("mysql参数环境2505230915");
|
||||
component.setParameters(parametersList);
|
||||
|
||||
ParameterContextsJoin parameterContextsJoin = new ParameterContextsJoin();
|
||||
parameterContextsJoin.setComponent(component);
|
||||
parameterContextsJoin.setRevision(revision);
|
||||
|
||||
CreateParamContextJoin createParamContextJoin = new CreateParamContextJoin();
|
||||
createParamContextJoin.setParameterContextsJoin(parameterContextsJoin);
|
||||
createParamContextJoin.setProcessGroupsId(processGroupsId);
|
||||
return createParamContextJoin;
|
||||
}
|
||||
|
||||
/**
|
||||
* 控制器参数-oracle
|
||||
*/
|
||||
private FindNeedModifyController setOracleFindNeedModifyController(String processGroupId) {
|
||||
//创建Mysql数据源
|
||||
Map<String, String> properties = new HashMap<>();
|
||||
properties.put("Database Connection URL", "jdbc:oracle:thin:@39.170.109.90:1521:orcl");
|
||||
properties.put("Database Driver Class Name", "oracle.jdbc.OracleDriver");
|
||||
properties.put("database-driver-locations", "/data/nifi-1.28.1/lib/ojdbc8.jar");
|
||||
properties.put("Database User", "U8C241231");
|
||||
properties.put("Password", "U8C241231");
|
||||
properties.put("Max Wait Time", "600001 millis");
|
||||
properties.put("Max Total Connections", "20");
|
||||
properties.put("Validation-query", "select 1 from dual");
|
||||
properties.put("dbcp-min-idle-conns", "5");
|
||||
properties.put("dbcp-max-idle-conns", "10");
|
||||
properties.put("dbcp-max-conn-lifetime", "-1");
|
||||
properties.put("dbcp-time-between-eviction-runs", "-1");
|
||||
properties.put("dbcp-min-evictable-idle-time", "30 mins");
|
||||
properties.put("dbcp-soft-min-evictable-idle-time", "-1");
|
||||
|
||||
Component12 component12 = new Component12();
|
||||
component12.setName("Oracle数据源");
|
||||
component12.setType("org.apache.nifi.dbcp.DBCPConnectionPool");
|
||||
//传这个值并没有用
|
||||
// component12.setState("ENABLED");
|
||||
component12.setProperties(properties);
|
||||
component12.setComments("Oracle数据源备注");
|
||||
component12.setBulletinLevel("DEBUG");
|
||||
// component12.setId();//需要修改的控制器id
|
||||
|
||||
ControllerService12 controllerService11 = new ControllerService12();
|
||||
controllerService11.setComponent(component12);
|
||||
controllerService11.setModifyMarkId("vuBXdbEu7v");
|
||||
// controllerService11.setId();//需要修改的控制器id
|
||||
|
||||
List<ControllerService12> controllerService12List = new ArrayList<>();
|
||||
controllerService12List.add(controllerService11);
|
||||
|
||||
FindNeedModifyController findNeedModifyController = new FindNeedModifyController();
|
||||
findNeedModifyController.setNeedModifyController(controllerService12List);
|
||||
findNeedModifyController.setProcessGroupId(processGroupId);
|
||||
return findNeedModifyController;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 控制器参数-oracle
|
||||
*/
|
||||
private FindNeedModifyController setMysqlFindNeedModifyController(String processGroupId) {
|
||||
//创建Mysql数据源
|
||||
Map<String, String> properties = new HashMap<>();
|
||||
properties.put("Database Connection URL", "jdbc:mysql://ufidahz.com.cn:9015/apache_nifi_sync?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8&useLegacyDatetimeCode=false");
|
||||
properties.put("Database Driver Class Name", "com.mysql.cj.jdbc.Driver");
|
||||
properties.put("database-driver-locations", "/data/nifi-1.28.1/lib/mysql-connector-java-8.0.30.jar");
|
||||
properties.put("Database User", "root");
|
||||
properties.put("Password", "hzya1314");
|
||||
properties.put("Max Wait Time", "600001 millis");
|
||||
properties.put("Max Total Connections", "15");
|
||||
properties.put("Validation-query", "select 1");
|
||||
properties.put("dbcp-min-idle-conns", "5");
|
||||
properties.put("dbcp-max-idle-conns", "10");
|
||||
properties.put("dbcp-max-conn-lifetime", "-1");
|
||||
properties.put("dbcp-time-between-eviction-runs", "-1");
|
||||
properties.put("dbcp-min-evictable-idle-time", "30 mins");
|
||||
properties.put("dbcp-soft-min-evictable-idle-time", "-1");
|
||||
|
||||
Component12 component12 = new Component12();
|
||||
component12.setName("Mysql数据源");
|
||||
component12.setType("org.apache.nifi.dbcp.DBCPConnectionPool");
|
||||
//传这个值并没有用
|
||||
// component12.setState("ENABLED");
|
||||
component12.setProperties(properties);
|
||||
component12.setComments("Mysql数据源备注");
|
||||
component12.setBulletinLevel("DEBUG");
|
||||
// component12.setId();//需要修改的控制器id
|
||||
|
||||
ControllerService12 controllerService11 = new ControllerService12();
|
||||
controllerService11.setComponent(component12);
|
||||
controllerService11.setModifyMarkId("CZFJwnb9sb");
|
||||
// controllerService11.setId();//需要修改的控制器id
|
||||
|
||||
List<ControllerService12> controllerService12List = new ArrayList<>();
|
||||
controllerService12List.add(controllerService11);
|
||||
|
||||
FindNeedModifyController findNeedModifyController = new FindNeedModifyController();
|
||||
findNeedModifyController.setNeedModifyController(controllerService12List);
|
||||
findNeedModifyController.setProcessGroupId(processGroupId);
|
||||
return findNeedModifyController;
|
||||
}
|
||||
|
||||
/**
|
||||
* 测试nifi数据同步场景调用
|
||||
*/
|
||||
@Test
|
||||
public void testDbSync() throws Exception {
|
||||
//新建「开始」处理器
|
||||
String parentProcessGroupId = "76e23115-ad84-1aab-7630-21c76aa3973d";
|
||||
String processGroupsName = "跨数据源同步";
|
||||
String processGroupsId = testCreateProcessGroups(parentProcessGroupId, processGroupsName);
|
||||
|
||||
//新建GenerateFlowFile处理器(最开始的处理器)
|
||||
String generateFlowFileId = testCreateProcessor(processGroupsId);
|
||||
|
||||
//拷贝Oracle应用,并实例化到新建的流程组
|
||||
// String processGroupId = "41437b87-0196-1000-6951-50e41e75c0da";
|
||||
// String parentProcessGroupId = "76e23100-ad84-1aab-4447-b9ee936ee7d5";
|
||||
String oracleSnippetId = testCreateSnippets("41437b87-0196-1000-6951-50e41e75c0da", "76e23100-ad84-1aab-4447-b9ee936ee7d5");
|
||||
String oracleProcessGroupId = testCreateSnippetInstance(processGroupsId, parentProcessGroupId, oracleSnippetId);
|
||||
|
||||
//拷贝Mysql应用,并实例化到新建的流程组
|
||||
// String processGroupId2 = "4143d803-0196-1000-968f-b06f55b43a3f";
|
||||
// String parentProcessGroupId2 = "76e23100-ad84-1aab-4447-b9ee936ee7d5";
|
||||
String mysqlSnippetId = testCreateSnippets("4143d803-0196-1000-968f-b06f55b43a3f", "76e23100-ad84-1aab-4447-b9ee936ee7d5");
|
||||
String mysqlProcessGroupId = testCreateSnippetInstance(processGroupsId, parentProcessGroupId, mysqlSnippetId);
|
||||
|
||||
//新建LogAttribute处理器(结束处理器)
|
||||
String logAttributeId = testCreateProcessor2(processGroupsId);
|
||||
|
||||
//修改Oracle应用数据源
|
||||
//修改Oracle应用上下文环境
|
||||
//修改mysql应用数据源
|
||||
//修改mysql应用上下文环境
|
||||
//启动Mysql应用对应的控制器服务
|
||||
//启动Oracle应用对应的控制器服务
|
||||
//创建连接关系
|
||||
//启动处理器
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建空的流程组
|
||||
*
|
||||
* @param parentProcessGroupId 需要新建的目标流程组的父流程组id
|
||||
* @param name 目标流程组名称
|
||||
*/
|
||||
public String testCreateProcessGroups(String parentProcessGroupId, String name) throws Exception {
|
||||
String processGroupsId = "";
|
||||
try {
|
||||
Revision5 revision5 = new Revision5();
|
||||
revision5.setVersion(0);
|
||||
|
||||
Component7 component7 = new Component7();
|
||||
component7.setName(name);
|
||||
|
||||
ProcessGroupsJoin processGroupsJoin = new ProcessGroupsJoin();
|
||||
processGroupsJoin.setRevision(revision5);
|
||||
processGroupsJoin.setComponent(component7);
|
||||
|
||||
//父处理器组
|
||||
// String parentGroupId = "76e23115-ad84-1aab-7630-21c76aa3973d";
|
||||
ProcessgroupsResult processGroups = nifiApiService.createProcessGroups(parentProcessGroupId, processGroupsJoin);
|
||||
processGroupsId = processGroups.getId();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return processGroupsId;
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建GenerateFlowFile处理器
|
||||
*/
|
||||
public String testCreateProcessor(String groupId) {
|
||||
String processId = null;
|
||||
try {
|
||||
Revision15 revision15 = new Revision15();
|
||||
revision15.setVersion("0");
|
||||
|
||||
Component15 component15 = new Component15();
|
||||
component15.setType("org.apache.nifi.processors.standard.GenerateFlowFile");
|
||||
component15.setName("定时调度");
|
||||
|
||||
// Position15 position15 = new Position15();
|
||||
// position15.setX("0");
|
||||
// position15.setY("0");
|
||||
|
||||
Config15 config15 = new Config15();
|
||||
config15.setSchedulingPeriod("1 min");
|
||||
|
||||
Properties15 properties15 = new Properties15();
|
||||
properties15.setFileSize("1KB");
|
||||
|
||||
config15.setProperties(properties15);
|
||||
|
||||
component15.setConfig(config15);
|
||||
|
||||
CreateProcessorJoin15 createProcessorJoin15 = new CreateProcessorJoin15();
|
||||
createProcessorJoin15.setComponent(component15);
|
||||
createProcessorJoin15.setRevision(revision15);
|
||||
|
||||
// String groupId = "019610d1-f933-1f70-16ea-3cf005012c4b";
|
||||
CreateProcess16 processor = nifiApiService.createProcessor(groupId, createProcessorJoin15);
|
||||
processId = processor.getId();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return processId;
|
||||
}
|
||||
|
||||
/**
|
||||
* 为指定流程组创建片段
|
||||
*
|
||||
* @param targetCopyProcessGroupId 需要复制的目标流程组id
|
||||
* @param parentProcessGroupId 目标流程组的父流程组id
|
||||
*/
|
||||
public String testCreateSnippets(String targetCopyProcessGroupId, String parentProcessGroupId) {
|
||||
String snippetId = null;
|
||||
try {
|
||||
//查询目标流程组版本
|
||||
ProcessGroupsInfoResult9 processGroups = nifiApiService.getProcessGroups(targetCopyProcessGroupId);
|
||||
Revision9 revision = processGroups.getRevision();
|
||||
|
||||
Map<String, String> processGroupsMap2 = new HashMap<>();
|
||||
processGroupsMap2.put("version", revision.getVersion());//目标流程组版本号
|
||||
|
||||
Map<String, Map<String, String>> processGroupsMap = new HashMap<>();
|
||||
processGroupsMap.put(targetCopyProcessGroupId, processGroupsMap2);//目标流程组id
|
||||
|
||||
Snippet snippet = new Snippet();
|
||||
snippet.setParentGroupId(parentProcessGroupId);//目标流程组id的父流程组id
|
||||
snippet.setProcessGroups(processGroupsMap);
|
||||
|
||||
SnippetsJoin snippetsJoin = new SnippetsJoin();
|
||||
snippetsJoin.setSnippet(snippet);
|
||||
SnippetResult13 snippets = nifiApiService.createSnippets(snippetsJoin);
|
||||
Snippet13 snippet1 = snippets.getSnippet();
|
||||
snippetId = snippet1.getId();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return snippetId;
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建LogAttribute处理器
|
||||
*/
|
||||
public String testCreateProcessor2(String groupId) throws Exception {
|
||||
String processorId = null;
|
||||
try {
|
||||
Revision15 revision15 = new Revision15();
|
||||
revision15.setVersion("0");
|
||||
|
||||
Component15 component15 = new Component15();
|
||||
component15.setType("org.apache.nifi.processors.standard.LogAttribute");
|
||||
component15.setName("结束节点");
|
||||
|
||||
// Position15 position15 = new Position15();
|
||||
// position15.setX("0");
|
||||
// position15.setY("0");
|
||||
|
||||
Properties15 properties15 = new Properties15();
|
||||
properties15.setLogLevel("info");
|
||||
properties15.setLogPayload("false");
|
||||
properties15.setAttributesToLogRegex(".*");
|
||||
properties15.setLogFlowFileProperties("true");
|
||||
properties15.setOutputFormat("Line per Attribute");
|
||||
properties15.setCharacterSet("UTF-8");
|
||||
|
||||
Config15 config15 = new Config15();
|
||||
config15.setProperties(properties15);
|
||||
config15.setBulletinLevel("DEBUG");
|
||||
|
||||
component15.setConfig(config15);
|
||||
// component15.setPosition(position15);
|
||||
|
||||
CreateProcessorJoin15 createProcessorJoin15 = new CreateProcessorJoin15();
|
||||
createProcessorJoin15.setComponent(component15);
|
||||
createProcessorJoin15.setRevision(revision15);
|
||||
|
||||
// String groupId = "019610d1-f933-1f70-16ea-3cf005012c4b";
|
||||
CreateProcess16 processor = nifiApiService.createProcessor(groupId, createProcessorJoin15);
|
||||
// System.out.println("处理器id:" + processor.getId());
|
||||
processorId = processor.getId();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return processorId;
|
||||
}
|
||||
|
||||
/**
|
||||
* 将片段实例到指定位置
|
||||
*
|
||||
* @param groupId 目标流程组id(需要拷贝到哪个流程组)
|
||||
* @param parentGroupId 目标流程组的父流程组id()
|
||||
* @param snippetId 片段id
|
||||
*/
|
||||
public String testCreateSnippetInstance(String groupId, String parentGroupId, String snippetId) throws Exception {
|
||||
String processGroupId = null;
|
||||
try {
|
||||
//先请求得到代码片段id
|
||||
ProcessGroupsInfoResult9 processGroups = nifiApiService.getProcessGroups(groupId);
|
||||
Revision9 revision = processGroups.getRevision();
|
||||
|
||||
Map<String, String> processGroupsMap2 = new HashMap<>();
|
||||
processGroupsMap2.put("version", revision.getVersion());//目标流程组版本号
|
||||
|
||||
Map<String, Map<String, String>> processGroupsMap = new HashMap<>();
|
||||
processGroupsMap.put(groupId, processGroupsMap2);//目标流程组id
|
||||
|
||||
// Snippet snippet = new Snippet();
|
||||
// snippet.setParentGroupId(parentGroupId);//目标流程组id的父流程组id
|
||||
// snippet.setProcessGroups(processGroupsMap);
|
||||
|
||||
// SnippetsJoin snippetsJoin = new SnippetsJoin();
|
||||
// snippetsJoin.setSnippet(snippet);
|
||||
// SnippetResult13 snippets = nifiApiService.createSnippets(snippetsJoin);
|
||||
// Snippet13 snippet1 = snippets.getSnippet();
|
||||
// System.out.println(snippet1.getId());
|
||||
|
||||
//将片段实例化到指定位置
|
||||
// String groupId = "019610d1-f933-1f70-16ea-3cf005012c4b";//流程组id
|
||||
SnippetInstanceJoin snippetInstanceJoin = new SnippetInstanceJoin();
|
||||
snippetInstanceJoin.setSnippetId(snippetId);
|
||||
// snippetInstanceJoin.setOriginY("0");
|
||||
// snippetInstanceJoin.setOriginX("0");
|
||||
SnippetInstance15 snippetInstance = nifiApiService.createSnippetInstance(groupId, snippetInstanceJoin);
|
||||
Flow15 flow = snippetInstance.getFlow();
|
||||
ProcessGroups15 processGroups15 = flow.getProcessGroups().get(0);
|
||||
// System.out.println(processGroups15.getId());
|
||||
processGroupId = processGroups15.getId();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return processGroupId;
|
||||
}
|
||||
}
|
|
@ -3,7 +3,6 @@ package com.hzya.frame;
|
|||
import com.github.pagehelper.PageInfo;
|
||||
import com.hzya.frame.nifilog.dao.ILoggingEvent20250430Dao;
|
||||
import com.hzya.frame.nifilog.entity.LoggingEvent20250430Entity;
|
||||
import com.hzya.frame.page.PageAttribute;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
|
|
@ -12,6 +12,8 @@ import com.hzya.frame.nifiapi.model.joincreateconnections.CreateConnections18;
|
|||
import com.hzya.frame.nifiapi.model.joincreateconnections.Revision18;
|
||||
import com.hzya.frame.nifiapi.model.joincreateconnections.SourceOrDestination18;
|
||||
import com.hzya.frame.nifiapi.model.joincreatetemp.CreateTemplateJoin;
|
||||
import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.ControllerServices19;
|
||||
import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.GetAllController19;
|
||||
import com.hzya.frame.nifiapi.model.joingetcontroller.Component12;
|
||||
import com.hzya.frame.nifiapi.model.joingetcontroller.ControllerService12;
|
||||
import com.hzya.frame.nifiapi.model.joingetcontroller.Revision12;
|
||||
|
@ -27,6 +29,7 @@ import com.hzya.frame.nifiapi.model.joinstartorstopprocessgroup.StartOrStopProce
|
|||
import com.hzya.frame.nifiapi.model.joinupdateprocessor.Revision17;
|
||||
import com.hzya.frame.nifiapi.model.joinupdateprocessor.RunStatusOrStop17;
|
||||
import com.hzya.frame.nifiapi.model.nifitemplates.NifiTemplates;
|
||||
import com.hzya.frame.nifiapi.model.processgroupid.ProcessGroupsId;
|
||||
import com.hzya.frame.nifiapi.model.resultcreateprocessor.*;
|
||||
import com.hzya.frame.nifiapi.model.resultcreateprocessors.CreateProcess16;
|
||||
import com.hzya.frame.nifiapi.model.resultcreatesnippet.Flow15;
|
||||
|
@ -94,8 +97,8 @@ public class TestNifiServiceApi {
|
|||
public void testQueryFlowProcessGroupsRoot() {
|
||||
try {
|
||||
// nifiApiService.queryFlowProcessGroupsRoot(null);
|
||||
Object o = nifiApiService.queryFlowProcessGroupsRoot("41437b87-0196-1000-6951-50e41e75c0da");
|
||||
System.out.println(o);
|
||||
ProcessGroupsId processGroupsId = nifiApiService.queryFlowProcessGroupsRoot("41437b87-0196-1000-6951-50e41e75c0da");
|
||||
System.out.println(processGroupsId);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
@ -231,12 +234,12 @@ public class TestNifiServiceApi {
|
|||
try {
|
||||
// ProcessGroupStatus processGroupStatus = new ProcessGroupStatus();
|
||||
// processGroupStatus.setName("测试流程组");
|
||||
|
||||
// for (int i = 0; i < 100; i++) {
|
||||
Revision5 revision5 = new Revision5();
|
||||
revision5.setVersion(0);
|
||||
|
||||
Component7 component7 = new Component7();
|
||||
component7.setName("测试流程组");
|
||||
component7.setName("测试位置排列");
|
||||
|
||||
ProcessGroupsJoin processGroupsJoin = new ProcessGroupsJoin();
|
||||
// processGroupsJoin.setStatus(processGroupStatus);
|
||||
|
@ -244,9 +247,10 @@ public class TestNifiServiceApi {
|
|||
processGroupsJoin.setComponent(component7);
|
||||
|
||||
//父处理器组
|
||||
String parentGroupId = "76e23115-ad84-1aab-7630-21c76aa3973d";
|
||||
String parentGroupId = "019611c2-f933-1f70-3bde-71084293c3e4";
|
||||
ProcessgroupsResult processGroups = nifiApiService.createProcessGroups(parentGroupId, processGroupsJoin);
|
||||
System.out.println("流程组id:" + processGroups.getId());
|
||||
// }
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
@ -348,6 +352,56 @@ public class TestNifiServiceApi {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 修改某个控制器服务
|
||||
*/
|
||||
@Test
|
||||
public void updateCreateControllerServices() {
|
||||
try {
|
||||
//查询控制器详情,带版本号
|
||||
ControllerService12 controllerServices = nifiApiService.getControllerServices("019612d0-f933-1f70-424e-014d36a06b10");
|
||||
Revision12 revision = controllerServices.getRevision();
|
||||
|
||||
Revision12 revision12 = new Revision12();
|
||||
revision12.setVersion(revision.getVersion());
|
||||
|
||||
//创建Mysql数据源
|
||||
Map<String, String> properties = new HashMap<>();
|
||||
properties.put("Database Connection URL", "jdbc:mysql://ufidahz.com.cn:9015/apache_nifi_sync?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8&useLegacyDatetimeCode=false");
|
||||
properties.put("Database Driver Class Name", "com.mysql.cj.jdbc.Driver");
|
||||
properties.put("database-driver-locations", "/data/nifi-1.28.1/lib/mysql-connector-java-8.0.30.jar");
|
||||
properties.put("Database User", "root");
|
||||
properties.put("Password", "hzya1314");
|
||||
properties.put("Max Wait Time", "600001 millis");
|
||||
properties.put("Max Total Connections", "15");
|
||||
properties.put("Validation-query", "select 1");
|
||||
properties.put("dbcp-min-idle-conns", "5");
|
||||
properties.put("dbcp-max-idle-conns", "10");
|
||||
properties.put("dbcp-max-conn-lifetime", "-1");
|
||||
properties.put("dbcp-time-between-eviction-runs", "-1");
|
||||
properties.put("dbcp-min-evictable-idle-time", "30 mins");
|
||||
properties.put("dbcp-soft-min-evictable-idle-time", "-1");
|
||||
|
||||
Component12 component12 = new Component12();
|
||||
component12.setName("Mysql数据源");
|
||||
component12.setType("org.apache.nifi.dbcp.DBCPConnectionPool");
|
||||
component12.setState("ENABLED");
|
||||
component12.setProperties(properties);
|
||||
component12.setComments("一个备注");
|
||||
component12.setBulletinLevel("DEBUG");
|
||||
component12.setId(controllerServices.getId());
|
||||
|
||||
ControllerService12 controllerService11 = new ControllerService12();
|
||||
controllerService11.setComponent(component12);
|
||||
controllerService11.setRevision(revision12);
|
||||
controllerService11.setId(controllerServices.getId());
|
||||
ControllerService12 controllerService12 = nifiApiService.updateControllerServices("019612d0-f933-1f70-424e-014d36a06b10", controllerService11);
|
||||
System.out.println(controllerService12.getId());
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 控制器服务-Mysql数据源
|
||||
*/
|
||||
|
@ -386,7 +440,7 @@ public class TestNifiServiceApi {
|
|||
ControllerService12 controllerService11 = new ControllerService12();
|
||||
controllerService11.setComponent(component12);
|
||||
controllerService11.setRevision(revision12);
|
||||
ControllerService12 controllerServices = nifiApiService.createControllerServices("d7f1b14d-0196-1000-ce43-7499a84f6096", controllerService11);
|
||||
ControllerService12 controllerServices = nifiApiService.createControllerServices("76e23115-ad84-1aab-7630-21c76aa3973d", controllerService11);
|
||||
System.out.println(controllerServices.getId());
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
|
@ -785,6 +839,56 @@ public class TestNifiServiceApi {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建LogAttribute处理器
|
||||
*/
|
||||
@Test
|
||||
public void testCreateProcessor2() throws Exception {
|
||||
try {
|
||||
// for (int i = 0; i < 100; i++) {
|
||||
String parentGroupId = "019610d1-f933-1f70-16ea-3cf005012c4b";
|
||||
double[] doubles = nifiApiService.calculatePosition(parentGroupId);
|
||||
|
||||
Revision15 revision15 = new Revision15();
|
||||
revision15.setVersion("0");
|
||||
|
||||
Component15 component15 = new Component15();
|
||||
component15.setType("org.apache.nifi.processors.standard.LogAttribute");
|
||||
component15.setName("结束节点");
|
||||
|
||||
Position15 position15 = new Position15();
|
||||
position15.setX(String.valueOf(doubles[0]));
|
||||
position15.setY(String.valueOf(doubles[1]));
|
||||
|
||||
Properties15 properties15 = new Properties15();
|
||||
properties15.setLogLevel("info");
|
||||
properties15.setLogPayload("false");
|
||||
properties15.setAttributesToLogRegex(".*");
|
||||
properties15.setLogFlowFileProperties("true");
|
||||
properties15.setOutputFormat("Line per Attribute");
|
||||
properties15.setCharacterSet("UTF-8");
|
||||
|
||||
Config15 config15 = new Config15();
|
||||
config15.setProperties(properties15);
|
||||
config15.setBulletinLevel("DEBUG");
|
||||
|
||||
component15.setConfig(config15);
|
||||
component15.setPosition(position15);
|
||||
|
||||
CreateProcessorJoin15 createProcessorJoin15 = new CreateProcessorJoin15();
|
||||
createProcessorJoin15.setComponent(component15);
|
||||
createProcessorJoin15.setRevision(revision15);
|
||||
|
||||
String groupId = "019610d1-f933-1f70-16ea-3cf005012c4b";
|
||||
CreateProcess16 processor = nifiApiService.createProcessor(groupId, createProcessorJoin15);
|
||||
// String id = processor.getId();
|
||||
System.out.println("处理器id:" + processor.getId());
|
||||
// }
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询处理器详情
|
||||
*/
|
||||
|
@ -792,7 +896,7 @@ public class TestNifiServiceApi {
|
|||
public void testGetProcessor() throws Exception {
|
||||
try {
|
||||
// String processId = "01961137-f933-1f70-4f08-6fc7041f99bd";
|
||||
String processId2 = "ad841aed-30f7-16e2-4fa0-595ab0ab9fdc";
|
||||
String processId2 = "7f70897a-0196-1000-241a-3a7cdf5bf521";
|
||||
ProcessorsInfo16 processor = nifiApiService.getProcessor(processId2);
|
||||
String id = processor.getId();
|
||||
System.out.println(id);
|
||||
|
@ -927,4 +1031,60 @@ public class TestNifiServiceApi {
|
|||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 测试组件坐标计算
|
||||
*/
|
||||
@Test
|
||||
public void testCalculatePosition() throws Exception {
|
||||
try {
|
||||
String parentGroupId = "76e23100-ad84-1aab-4447-b9ee936ee7d5";
|
||||
double[] doubles = nifiApiService.calculatePosition(parentGroupId);
|
||||
System.out.println(doubles);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 测试查询流程组内全部的控制器
|
||||
*/
|
||||
@Test
|
||||
public void testGetAllControllerServices() throws Exception {
|
||||
try {
|
||||
String processGroupId = "4143d803-0196-1000-968f-b06f55b43a3f";
|
||||
GetAllController19 allControllerServices = nifiApiService.getAllControllerServices(processGroupId);
|
||||
List<ControllerServices19> controllerServices = allControllerServices.getControllerServices();
|
||||
System.out.println(controllerServices.size());
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询流程组详情
|
||||
*/
|
||||
@Test
|
||||
public void testGetProcessGroups() throws Exception {
|
||||
try {
|
||||
ProcessGroupsInfoResult9 processGroups = nifiApiService.getProcessGroups("41437b87-0196-1000-6951-50e41e75c0da");
|
||||
System.out.println(processGroups.getId());
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询某个流程组内的子流程组
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
// @Test
|
||||
// public void testGetSubProcessGroups() throws Exception {
|
||||
// try {
|
||||
// nifiApiService.getSubProcessGroups("41437b87-0196-1000-6951-50e41e75c0da");
|
||||
// } catch (Exception e) {
|
||||
// e.printStackTrace();
|
||||
// }
|
||||
// }
|
||||
}
|
Loading…
Reference in New Issue