diff --git a/base-buildpackage/src/test/java/com/hzya/frame/TestINifiBaseOperation.java b/base-buildpackage/src/test/java/com/hzya/frame/TestINifiBaseOperation.java new file mode 100644 index 00000000..57cdab46 --- /dev/null +++ b/base-buildpackage/src/test/java/com/hzya/frame/TestINifiBaseOperation.java @@ -0,0 +1,66 @@ +package com.hzya.frame; + +import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.Component19; +import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.ControllerServices19; +import com.hzya.frame.nifiapi.model.resultNeedModifyController.PortFilterResult; +import com.hzya.frame.nifiapi.operation.INifiBaseOperation; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +import java.util.List; + +/** + * @Author:liuyang + * @Package:com.hzya.frame + * @Project:fw-nifi + * @name:TestINifiBaseOperation + * @Date:2025/5/22 17:02 + * @Filename:TestINifiBaseOperation + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = {WebappApplication.class}) +public class TestINifiBaseOperation { + + @Autowired + private INifiBaseOperation iNifiBaseOperation; + + @Test + public void testGetNeedModifyControllerServicesByProcessGroup() throws Exception { + try { + List needModifyControllerServicesByProcessGroup = iNifiBaseOperation.getNeedModifyControllerServicesByProcessGroup("4143d803-0196-1000-968f-b06f55b43a3f"); + System.out.println(needModifyControllerServicesByProcessGroup.size()); + ControllerServices19 controllerServices19 = needModifyControllerServicesByProcessGroup.get(0); + Component19 component = controllerServices19.getComponent(); + System.out.println(component.getName()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Test + public void testGetNeedModifyControllerServicesByProcessGroup2() throws Exception { + try { + ControllerServices19 czfJwnb9sb = iNifiBaseOperation.getNeedModifyControllerServicesByProcessGroup("4143d803-0196-1000-968f-b06f55b43a3f", "CZFJwnb9sb"); + if (czfJwnb9sb != null && czfJwnb9sb.getComponent() != null) { + Component19 component = czfJwnb9sb.getComponent(); + String name = component.getName(); + System.out.println(name); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Test + public void testGetNeedModifyControllerRelationshipByProcessGroup() throws Exception { + try { + PortFilterResult needModifyControllerRelationshipByProcessGroup = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup("41437b87-0196-1000-6951-50e41e75c0da", "f5228bc8-0360-41eb-a640-9f4df953937c"); + System.out.println(needModifyControllerRelationshipByProcessGroup); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/base-buildpackage/src/test/java/com/hzya/frame/TestNifiDbSync.java b/base-buildpackage/src/test/java/com/hzya/frame/TestNifiDbSync.java new file mode 100644 index 00000000..4fa25848 --- /dev/null +++ b/base-buildpackage/src/test/java/com/hzya/frame/TestNifiDbSync.java @@ -0,0 +1,688 @@ +package com.hzya.frame; + +import com.hzya.frame.nifiapi.model.joincreateconnections.Component18; +import com.hzya.frame.nifiapi.model.joincreateconnections.CreateConnections18; +import com.hzya.frame.nifiapi.model.joincreateconnections.Revision18; +import com.hzya.frame.nifiapi.model.joincreateconnections.SourceOrDestination18; +import com.hzya.frame.nifiapi.model.joincreateoracleapp.CreateAppInstanceJoin; +import com.hzya.frame.nifiapi.model.joincreateprocessconnection.CreateProcessorConnections; +import com.hzya.frame.nifiapi.model.joincreparamcontext.CreateParamContextJoin; +import com.hzya.frame.nifiapi.model.joinfindneedmodifycontroller.FindNeedModifyController; +import com.hzya.frame.nifiapi.model.joingetcontroller.Component12; +import com.hzya.frame.nifiapi.model.joingetcontroller.ControllerService12; +import com.hzya.frame.nifiapi.model.joinparametercontexts.*; +import com.hzya.frame.nifiapi.model.joinprocessgroups.Component7; +import com.hzya.frame.nifiapi.model.joinprocessgroups.ProcessGroupsJoin; +import com.hzya.frame.nifiapi.model.joinprocessgroups.Revision5; +import com.hzya.frame.nifiapi.model.joinsnippetinstance.SnippetInstanceJoin; +import com.hzya.frame.nifiapi.model.joinsnippets.Snippet; +import com.hzya.frame.nifiapi.model.joinsnippets.SnippetsJoin; +import com.hzya.frame.nifiapi.model.joinstartorstopprocessgroup.Revision10; +import com.hzya.frame.nifiapi.model.joinstartorstopprocessgroup.StartOrStopProcessGroupsInfoJoin10; +import com.hzya.frame.nifiapi.model.resultNeedModifyController.PortFilterResult; +import com.hzya.frame.nifiapi.model.resultcreateoracleapp.CreateAppInstanceResult; +import com.hzya.frame.nifiapi.model.resultcreateprocessor.*; +import com.hzya.frame.nifiapi.model.resultcreateprocessors.CreateProcess16; +import com.hzya.frame.nifiapi.model.resultcreatesnippet.Flow15; +import com.hzya.frame.nifiapi.model.resultcreatesnippet.ProcessGroups15; +import com.hzya.frame.nifiapi.model.resultcreatesnippet.SnippetInstance15; +import com.hzya.frame.nifiapi.model.resultprocessgroups.ProcessgroupsResult; +import com.hzya.frame.nifiapi.model.resultprocessgroupsinfo.ProcessGroupsInfoResult9; +import com.hzya.frame.nifiapi.model.resultprocessgroupsinfo.Revision9; +import com.hzya.frame.nifiapi.model.resultsnippets.Snippet13; +import com.hzya.frame.nifiapi.model.resultsnippets.SnippetResult13; +import com.hzya.frame.nifiapi.operation.INifiAppOperation; +import com.hzya.frame.nifiapi.operation.INifiBaseOperation; +import com.hzya.frame.nifiapi.service.NifiApiService; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @Author:liuyang + * @Package:com.hzya.frame + * @Project:fw-nifi + * @name:TestNifiDbSync + * @Date:2025/5/20 17:49 + * @Filename:TestNifiDbSync + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = {WebappApplication.class}) +public class TestNifiDbSync { + + @Autowired + private NifiApiService nifiApiService; + + @Autowired + private INifiAppOperation nifiOperation; + + @Autowired + private INifiBaseOperation iNifiBaseOperation; + + /** + * 测试nifi数据同步场景调用 + */ + @Test + public void testDbSync2() throws Exception { + //创建业务数据流对应的流程组 + String parentProcessGroupId = "76e23115-ad84-1aab-7630-21c76aa3973d"; + String processGroupsName = "Oracle19c->Mysql跨数据源同步"; + String newProcessGroupsId = testCreateProcessGroups(parentProcessGroupId, processGroupsName); + + //新建GenerateFlowFile处理器(开始的处理器) + String generateFlowFileId = testCreateProcessor(newProcessGroupsId); + + //创建Oracle应用 + //初始化实例 + CreateAppInstanceJoin createOracleApp = new CreateAppInstanceJoin(); + createOracleApp.setAppProcessGroupId("41437b87-0196-1000-6951-50e41e75c0da"); + createOracleApp.setParentProcessGroupId("76e23100-ad84-1aab-4447-b9ee936ee7d5"); + createOracleApp.setCopyTargetProcessGroupId(newProcessGroupsId); + //上下文 + CreateParamContextJoin createParamContextJoin = setOracleParamContextJoin(newProcessGroupsId); + createOracleApp.setCreateParamContextJoin(createParamContextJoin); + //控制器 + FindNeedModifyController findNeedModifyController = setOracleFindNeedModifyController(newProcessGroupsId); + createOracleApp.setFindNeedModifyController(findNeedModifyController); + CreateAppInstanceResult oracleAppContext = nifiOperation.createAppContext(createOracleApp); + + //创建Mysql应用 + //初始化实例 + CreateAppInstanceJoin createOracleApp2 = new CreateAppInstanceJoin(); + createOracleApp2.setAppProcessGroupId("4143d803-0196-1000-968f-b06f55b43a3f"); + createOracleApp2.setParentProcessGroupId("76e23100-ad84-1aab-4447-b9ee936ee7d5"); + createOracleApp2.setCopyTargetProcessGroupId(newProcessGroupsId); + //上下文 + CreateParamContextJoin createParamContextJoin2 = setMysqlParamContextJoin(newProcessGroupsId); + createOracleApp2.setCreateParamContextJoin(createParamContextJoin2); + //控制器 + FindNeedModifyController findNeedModifyController2 = setMysqlFindNeedModifyController(newProcessGroupsId); + createOracleApp2.setFindNeedModifyController(findNeedModifyController2); + CreateAppInstanceResult mysqlAppContext = nifiOperation.createAppContext(createOracleApp2); + + //新建LogAttribute处理器(结束处理器) + String logAttributeId = testCreateProcessor2(newProcessGroupsId); + + //查找对应应用的连接标记 + //TYPE=INPUT_PORT + PortFilterResult oracleMark = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(oracleAppContext.getNewProcessGroupId(), null); + //TYPE=OUTPUT_PORT + PortFilterResult mysqlMark = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(mysqlAppContext.getNewProcessGroupId(), null); + + //批量创建连接关系 + //连接关系一:开始处理器->Oracle应用 + SourceOrDestination18 source = new SourceOrDestination18(); + source.setId(generateFlowFileId); + source.setGroupId(newProcessGroupsId); + source.setType("PROCESSOR"); + + SourceOrDestination18 destination = new SourceOrDestination18(); + destination.setId(oracleMark.getInputPorts().get(0).getId()); + destination.setGroupId(oracleAppContext.getNewProcessGroupId()); + destination.setType("INPUT_PORT"); + + List selectedRelationships = new ArrayList<>(); + selectedRelationships.add("success"); + + Component18 component18 = new Component18(); + component18.setSource(source); + component18.setDestination(destination); + component18.setSelectedRelationships(selectedRelationships); + + Revision18 revision18 = new Revision18(); + revision18.setVersion("0");//默认传递0 + + CreateConnections18 createConnections17 = new CreateConnections18(); + createConnections17.setComponent(component18); + createConnections17.setRevision(revision18); + + List createConnections18List = new ArrayList<>(); + createConnections18List.add(createConnections17); + + //连接关系二:Oracle应用->Mysql应用 + SourceOrDestination18 source2 = new SourceOrDestination18(); + source2.setId(oracleMark.getOutputPorts().get(0).getId()); + source2.setGroupId(oracleAppContext.getNewProcessGroupId()); + source2.setType("OUTPUT_PORT"); + + SourceOrDestination18 destination2 = new SourceOrDestination18(); + destination2.setId(mysqlMark.getInputPorts().get(0).getId()); + destination2.setGroupId(mysqlAppContext.getNewProcessGroupId()); + destination2.setType("INPUT_PORT"); + +// List selectedRelationships2 = new ArrayList<>(); +// selectedRelationships2.add("success"); + + Component18 component182 = new Component18(); + component182.setSource(source2); + component182.setDestination(destination2); +// component182.setSelectedRelationships(selectedRelationships2); + + Revision18 revision182 = new Revision18(); + revision182.setVersion("0");//默认传递0 + + CreateConnections18 createConnections172 = new CreateConnections18(); + createConnections172.setComponent(component182); + createConnections172.setRevision(revision182); + createConnections18List.add(createConnections172); + + //连接关系三:Mysql应用->结束处理器 + SourceOrDestination18 source3 = new SourceOrDestination18(); + source3.setId(mysqlMark.getOutputPorts().get(0).getId()); + source3.setGroupId(mysqlAppContext.getNewProcessGroupId()); + source3.setType("OUTPUT_PORT"); + + SourceOrDestination18 destination3 = new SourceOrDestination18(); + destination3.setId(logAttributeId); + destination3.setGroupId(newProcessGroupsId); + destination3.setType("PROCESSOR"); + +// List selectedRelationships3 = new ArrayList<>(); +// selectedRelationships3.add("success"); + + Component18 component183 = new Component18(); + component183.setSource(source3); + component183.setDestination(destination3); +// component183.setSelectedRelationships(selectedRelationships3); + + Revision18 revision183 = new Revision18(); + revision183.setVersion("0");//默认传递0 + + CreateConnections18 createConnections173 = new CreateConnections18(); + createConnections173.setComponent(component183); + createConnections173.setRevision(revision183); + createConnections18List.add(createConnections173); + + CreateProcessorConnections createProcessorConnections = new CreateProcessorConnections(); + createProcessorConnections.setProcessGroupsId(newProcessGroupsId); + createProcessorConnections.setCreateConnections18(createConnections18List); + iNifiBaseOperation.batchCreateProcessorConnections(createProcessorConnections); + + //启动业务处理器 + ProcessGroupsInfoResult9 processGroups = nifiApiService.getProcessGroups(newProcessGroupsId); + Revision9 revision = processGroups.getRevision(); + + Revision10 revision10 = new Revision10(); + revision10.setVersion(revision.getVersion()); + + StartOrStopProcessGroupsInfoJoin10 startOrStopProcessGroupsInfoJoin10 = new StartOrStopProcessGroupsInfoJoin10(); + startOrStopProcessGroupsInfoJoin10.setId(processGroups.getId()); + startOrStopProcessGroupsInfoJoin10.setState("RUNNING"); + startOrStopProcessGroupsInfoJoin10.setRevision(revision10); + ProcessGroupsInfoResult9 processGroupsInfoResult9 = nifiApiService.startOrStopProcessGroups(startOrStopProcessGroupsInfoJoin10); + System.out.println("流程组id:" + processGroupsInfoResult9.getId()); + } + + + public CreateParamContextJoin setOracleParamContextJoin(String processGroupsId) throws Exception { + Revision revision = new Revision(); + revision.setVersion(0); + + Parameter2 parameter = new Parameter2(); + parameter.setSensitive(false); + parameter.setName("additionalWhereClause"); + parameter.setValue("1=1"); + parameter.setDescription("参数描述"); + + Parameter2 parameter2 = new Parameter2(); + parameter2.setSensitive(false); + parameter2.setName("columnsToReturn"); + parameter2.setValue("CSALEID,VRECEIPTCODE,NHEADSUMMNY,TS"); + parameter2.setDescription("参数描述"); + + Parameter2 parameter3 = new Parameter2(); + parameter3.setSensitive(false); + parameter3.setName("customOrderByColumn"); + parameter3.setValue("ts asc"); + parameter3.setDescription("参数描述"); + + Parameter2 parameter4 = new Parameter2(); + parameter4.setSensitive(false); + parameter4.setName("maximumValueColumns"); + parameter4.setValue("ts"); + parameter4.setDescription("参数描述"); + + Parameter2 parameter5 = new Parameter2(); + parameter5.setSensitive(false); + parameter5.setName("sourceTbName"); + parameter5.setValue("U8C241231.SO_SALE"); + parameter5.setDescription("参数描述"); + + Parameters2 parameters = new Parameters2(); + parameters.setParameter(parameter); + + Parameters2 parameters2 = new Parameters2(); + parameters2.setParameter(parameter2); + + Parameters2 parameters3 = new Parameters2(); + parameters3.setParameter(parameter3); + + Parameters2 parameters4 = new Parameters2(); + parameters4.setParameter(parameter4); + + Parameters2 parameters5 = new Parameters2(); + parameters5.setParameter(parameter5); + + List parametersList = new ArrayList<>(); + parametersList.add(parameters); + parametersList.add(parameters2); + parametersList.add(parameters3); + parametersList.add(parameters4); + parametersList.add(parameters5); + + Component3 component = new Component3(); + component.setName("oracle参数环境2505221548"); + component.setParameters(parametersList); + + ParameterContextsJoin parameterContextsJoin = new ParameterContextsJoin(); + parameterContextsJoin.setComponent(component); + parameterContextsJoin.setRevision(revision); + + CreateParamContextJoin createParamContextJoin = new CreateParamContextJoin(); + createParamContextJoin.setParameterContextsJoin(parameterContextsJoin); + createParamContextJoin.setProcessGroupsId(processGroupsId); + return createParamContextJoin; + } + + public CreateParamContextJoin setMysqlParamContextJoin(String processGroupsId) throws Exception { + Revision revision = new Revision(); + revision.setVersion(0); + + Parameter2 parameter = new Parameter2(); + parameter.setSensitive(false); + parameter.setName("Release Signal Identifier"); + parameter.setValue("table_creation"); + parameter.setDescription("参数描述"); + + Parameter2 parameter2 = new Parameter2(); + parameter2.setSensitive(false); + parameter2.setName("autoTableCreation"); + parameter2.setValue("Y"); + parameter2.setDescription("参数描述"); + + Parameter2 parameter3 = new Parameter2(); + parameter3.setSensitive(false); + parameter3.setName("mappingRelationship"); + parameter3.setValue("[\n" + " {\n" + " \"targetTbName\": \"iep_so_sale\",\n" + " \"targetDBType\": \"mysql8.0.44\",\n" + " \"writeType\": \"1\",\n" + " \"fieldRelationship\": [\n" + " {\n" + " \"sourceFieldName\": \"CSALEID\",\n" + " \"sourceFieldType\": \"CHAR(20)\",\n" + " \"targetFieldName\": \"csaleid\",\n" + " \"targetFieldType\": \"varchar(20)\",\n" + " \"targetFieldNameNotes\": \"销售主表id\",\n" + " \"primaryKey\": \"Y\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"VRECEIPTCODE\",\n" + " \"sourceFieldType\": \"VARCHAR(30)\",\n" + " \"targetFieldName\": \"vreceiptcode\",\n" + " \"targetFieldType\": \"varchar(20)\",\n" + " \"targetFieldNameNotes\": \"单据号\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"NHEADSUMMNY\",\n" + " \"sourceFieldType\": \"DECIMAL(13,8)\",\n" + " \"targetFieldName\": \"nheadsummny\",\n" + " \"targetFieldType\": \"decimal(20,10)\",\n" + " \"targetFieldNameNotes\": \"金额合计\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"TS\",\n" + " \"sourceFieldType\": \"CHAR(19)\",\n" + " \"targetFieldName\": \"ts\",\n" + " \"targetFieldType\": \"varchar(20)\",\n" + " \"targetFieldNameNotes\": \"时间戳\"\n" + " }\n" + " ]\n" + " }\n" + " ]"); + parameter3.setDescription("参数描述"); + +// Parameter2 parameter4 = new Parameter2(); +// parameter4.setSensitive(false); +// parameter4.setName("maximumValueColumns"); +// parameter4.setValue("ts"); +// parameter4.setDescription("参数描述"); + +// Parameter2 parameter5 = new Parameter2(); +// parameter5.setSensitive(false); +// parameter5.setName("sourceTbName"); +// parameter5.setValue("U8C241231.SO_SALE"); +// parameter5.setDescription("参数描述"); + + Parameters2 parameters = new Parameters2(); + parameters.setParameter(parameter); + + Parameters2 parameters2 = new Parameters2(); + parameters2.setParameter(parameter2); + + Parameters2 parameters3 = new Parameters2(); + parameters3.setParameter(parameter3); + +// Parameters2 parameters4 = new Parameters2(); +// parameters4.setParameter(parameter4); + +// Parameters2 parameters5 = new Parameters2(); +// parameters5.setParameter(parameter5); + + List parametersList = new ArrayList<>(); + parametersList.add(parameters); + parametersList.add(parameters2); + parametersList.add(parameters3); +// parametersList.add(parameters4); +// parametersList.add(parameters5); + + Component3 component = new Component3(); + component.setName("mysql参数环境2505230915"); + component.setParameters(parametersList); + + ParameterContextsJoin parameterContextsJoin = new ParameterContextsJoin(); + parameterContextsJoin.setComponent(component); + parameterContextsJoin.setRevision(revision); + + CreateParamContextJoin createParamContextJoin = new CreateParamContextJoin(); + createParamContextJoin.setParameterContextsJoin(parameterContextsJoin); + createParamContextJoin.setProcessGroupsId(processGroupsId); + return createParamContextJoin; + } + + /** + * 控制器参数-oracle + */ + private FindNeedModifyController setOracleFindNeedModifyController(String processGroupId) { + //创建Mysql数据源 + Map properties = new HashMap<>(); + properties.put("Database Connection URL", "jdbc:oracle:thin:@39.170.109.90:1521:orcl"); + properties.put("Database Driver Class Name", "oracle.jdbc.OracleDriver"); + properties.put("database-driver-locations", "/data/nifi-1.28.1/lib/ojdbc8.jar"); + properties.put("Database User", "U8C241231"); + properties.put("Password", "U8C241231"); + properties.put("Max Wait Time", "600001 millis"); + properties.put("Max Total Connections", "20"); + properties.put("Validation-query", "select 1 from dual"); + properties.put("dbcp-min-idle-conns", "5"); + properties.put("dbcp-max-idle-conns", "10"); + properties.put("dbcp-max-conn-lifetime", "-1"); + properties.put("dbcp-time-between-eviction-runs", "-1"); + properties.put("dbcp-min-evictable-idle-time", "30 mins"); + properties.put("dbcp-soft-min-evictable-idle-time", "-1"); + + Component12 component12 = new Component12(); + component12.setName("Oracle数据源"); + component12.setType("org.apache.nifi.dbcp.DBCPConnectionPool"); + //传这个值并没有用 +// component12.setState("ENABLED"); + component12.setProperties(properties); + component12.setComments("Oracle数据源备注"); + component12.setBulletinLevel("DEBUG"); +// component12.setId();//需要修改的控制器id + + ControllerService12 controllerService11 = new ControllerService12(); + controllerService11.setComponent(component12); + controllerService11.setModifyMarkId("vuBXdbEu7v"); +// controllerService11.setId();//需要修改的控制器id + + List controllerService12List = new ArrayList<>(); + controllerService12List.add(controllerService11); + + FindNeedModifyController findNeedModifyController = new FindNeedModifyController(); + findNeedModifyController.setNeedModifyController(controllerService12List); + findNeedModifyController.setProcessGroupId(processGroupId); + return findNeedModifyController; + } + + + /** + * 控制器参数-oracle + */ + private FindNeedModifyController setMysqlFindNeedModifyController(String processGroupId) { + //创建Mysql数据源 + Map properties = new HashMap<>(); + properties.put("Database Connection URL", "jdbc:mysql://ufidahz.com.cn:9015/apache_nifi_sync?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8&useLegacyDatetimeCode=false"); + properties.put("Database Driver Class Name", "com.mysql.cj.jdbc.Driver"); + properties.put("database-driver-locations", "/data/nifi-1.28.1/lib/mysql-connector-java-8.0.30.jar"); + properties.put("Database User", "root"); + properties.put("Password", "hzya1314"); + properties.put("Max Wait Time", "600001 millis"); + properties.put("Max Total Connections", "15"); + properties.put("Validation-query", "select 1"); + properties.put("dbcp-min-idle-conns", "5"); + properties.put("dbcp-max-idle-conns", "10"); + properties.put("dbcp-max-conn-lifetime", "-1"); + properties.put("dbcp-time-between-eviction-runs", "-1"); + properties.put("dbcp-min-evictable-idle-time", "30 mins"); + properties.put("dbcp-soft-min-evictable-idle-time", "-1"); + + Component12 component12 = new Component12(); + component12.setName("Mysql数据源"); + component12.setType("org.apache.nifi.dbcp.DBCPConnectionPool"); + //传这个值并没有用 +// component12.setState("ENABLED"); + component12.setProperties(properties); + component12.setComments("Mysql数据源备注"); + component12.setBulletinLevel("DEBUG"); +// component12.setId();//需要修改的控制器id + + ControllerService12 controllerService11 = new ControllerService12(); + controllerService11.setComponent(component12); + controllerService11.setModifyMarkId("CZFJwnb9sb"); +// controllerService11.setId();//需要修改的控制器id + + List controllerService12List = new ArrayList<>(); + controllerService12List.add(controllerService11); + + FindNeedModifyController findNeedModifyController = new FindNeedModifyController(); + findNeedModifyController.setNeedModifyController(controllerService12List); + findNeedModifyController.setProcessGroupId(processGroupId); + return findNeedModifyController; + } + + /** + * 测试nifi数据同步场景调用 + */ + @Test + public void testDbSync() throws Exception { + //新建「开始」处理器 + String parentProcessGroupId = "76e23115-ad84-1aab-7630-21c76aa3973d"; + String processGroupsName = "跨数据源同步"; + String processGroupsId = testCreateProcessGroups(parentProcessGroupId, processGroupsName); + + //新建GenerateFlowFile处理器(最开始的处理器) + String generateFlowFileId = testCreateProcessor(processGroupsId); + + //拷贝Oracle应用,并实例化到新建的流程组 +// String processGroupId = "41437b87-0196-1000-6951-50e41e75c0da"; +// String parentProcessGroupId = "76e23100-ad84-1aab-4447-b9ee936ee7d5"; + String oracleSnippetId = testCreateSnippets("41437b87-0196-1000-6951-50e41e75c0da", "76e23100-ad84-1aab-4447-b9ee936ee7d5"); + String oracleProcessGroupId = testCreateSnippetInstance(processGroupsId, parentProcessGroupId, oracleSnippetId); + + //拷贝Mysql应用,并实例化到新建的流程组 +// String processGroupId2 = "4143d803-0196-1000-968f-b06f55b43a3f"; +// String parentProcessGroupId2 = "76e23100-ad84-1aab-4447-b9ee936ee7d5"; + String mysqlSnippetId = testCreateSnippets("4143d803-0196-1000-968f-b06f55b43a3f", "76e23100-ad84-1aab-4447-b9ee936ee7d5"); + String mysqlProcessGroupId = testCreateSnippetInstance(processGroupsId, parentProcessGroupId, mysqlSnippetId); + + //新建LogAttribute处理器(结束处理器) + String logAttributeId = testCreateProcessor2(processGroupsId); + + //修改Oracle应用数据源 + //修改Oracle应用上下文环境 + //修改mysql应用数据源 + //修改mysql应用上下文环境 + //启动Mysql应用对应的控制器服务 + //启动Oracle应用对应的控制器服务 + //创建连接关系 + //启动处理器 + } + + /** + * 创建空的流程组 + * + * @param parentProcessGroupId 需要新建的目标流程组的父流程组id + * @param name 目标流程组名称 + */ + public String testCreateProcessGroups(String parentProcessGroupId, String name) throws Exception { + String processGroupsId = ""; + try { + Revision5 revision5 = new Revision5(); + revision5.setVersion(0); + + Component7 component7 = new Component7(); + component7.setName(name); + + ProcessGroupsJoin processGroupsJoin = new ProcessGroupsJoin(); + processGroupsJoin.setRevision(revision5); + processGroupsJoin.setComponent(component7); + + //父处理器组 +// String parentGroupId = "76e23115-ad84-1aab-7630-21c76aa3973d"; + ProcessgroupsResult processGroups = nifiApiService.createProcessGroups(parentProcessGroupId, processGroupsJoin); + processGroupsId = processGroups.getId(); + } catch (Exception e) { + e.printStackTrace(); + } + return processGroupsId; + } + + /** + * 创建GenerateFlowFile处理器 + */ + public String testCreateProcessor(String groupId) { + String processId = null; + try { + Revision15 revision15 = new Revision15(); + revision15.setVersion("0"); + + Component15 component15 = new Component15(); + component15.setType("org.apache.nifi.processors.standard.GenerateFlowFile"); + component15.setName("定时调度"); + +// Position15 position15 = new Position15(); +// position15.setX("0"); +// position15.setY("0"); + + Config15 config15 = new Config15(); + config15.setSchedulingPeriod("1 min"); + + Properties15 properties15 = new Properties15(); + properties15.setFileSize("1KB"); + + config15.setProperties(properties15); + + component15.setConfig(config15); + + CreateProcessorJoin15 createProcessorJoin15 = new CreateProcessorJoin15(); + createProcessorJoin15.setComponent(component15); + createProcessorJoin15.setRevision(revision15); + +// String groupId = "019610d1-f933-1f70-16ea-3cf005012c4b"; + CreateProcess16 processor = nifiApiService.createProcessor(groupId, createProcessorJoin15); + processId = processor.getId(); + } catch (Exception e) { + e.printStackTrace(); + } + return processId; + } + + /** + * 为指定流程组创建片段 + * + * @param targetCopyProcessGroupId 需要复制的目标流程组id + * @param parentProcessGroupId 目标流程组的父流程组id + */ + public String testCreateSnippets(String targetCopyProcessGroupId, String parentProcessGroupId) { + String snippetId = null; + try { + //查询目标流程组版本 + ProcessGroupsInfoResult9 processGroups = nifiApiService.getProcessGroups(targetCopyProcessGroupId); + Revision9 revision = processGroups.getRevision(); + + Map processGroupsMap2 = new HashMap<>(); + processGroupsMap2.put("version", revision.getVersion());//目标流程组版本号 + + Map> processGroupsMap = new HashMap<>(); + processGroupsMap.put(targetCopyProcessGroupId, processGroupsMap2);//目标流程组id + + Snippet snippet = new Snippet(); + snippet.setParentGroupId(parentProcessGroupId);//目标流程组id的父流程组id + snippet.setProcessGroups(processGroupsMap); + + SnippetsJoin snippetsJoin = new SnippetsJoin(); + snippetsJoin.setSnippet(snippet); + SnippetResult13 snippets = nifiApiService.createSnippets(snippetsJoin); + Snippet13 snippet1 = snippets.getSnippet(); + snippetId = snippet1.getId(); + } catch (Exception e) { + e.printStackTrace(); + } + return snippetId; + } + + /** + * 创建LogAttribute处理器 + */ + public String testCreateProcessor2(String groupId) throws Exception { + String processorId = null; + try { + Revision15 revision15 = new Revision15(); + revision15.setVersion("0"); + + Component15 component15 = new Component15(); + component15.setType("org.apache.nifi.processors.standard.LogAttribute"); + component15.setName("结束节点"); + +// Position15 position15 = new Position15(); +// position15.setX("0"); +// position15.setY("0"); + + Properties15 properties15 = new Properties15(); + properties15.setLogLevel("info"); + properties15.setLogPayload("false"); + properties15.setAttributesToLogRegex(".*"); + properties15.setLogFlowFileProperties("true"); + properties15.setOutputFormat("Line per Attribute"); + properties15.setCharacterSet("UTF-8"); + + Config15 config15 = new Config15(); + config15.setProperties(properties15); + config15.setBulletinLevel("DEBUG"); + + component15.setConfig(config15); +// component15.setPosition(position15); + + CreateProcessorJoin15 createProcessorJoin15 = new CreateProcessorJoin15(); + createProcessorJoin15.setComponent(component15); + createProcessorJoin15.setRevision(revision15); + +// String groupId = "019610d1-f933-1f70-16ea-3cf005012c4b"; + CreateProcess16 processor = nifiApiService.createProcessor(groupId, createProcessorJoin15); +// System.out.println("处理器id:" + processor.getId()); + processorId = processor.getId(); + } catch (Exception e) { + e.printStackTrace(); + } + return processorId; + } + + /** + * 将片段实例到指定位置 + * + * @param groupId 目标流程组id(需要拷贝到哪个流程组) + * @param parentGroupId 目标流程组的父流程组id() + * @param snippetId 片段id + */ + public String testCreateSnippetInstance(String groupId, String parentGroupId, String snippetId) throws Exception { + String processGroupId = null; + try { + //先请求得到代码片段id + ProcessGroupsInfoResult9 processGroups = nifiApiService.getProcessGroups(groupId); + Revision9 revision = processGroups.getRevision(); + + Map processGroupsMap2 = new HashMap<>(); + processGroupsMap2.put("version", revision.getVersion());//目标流程组版本号 + + Map> processGroupsMap = new HashMap<>(); + processGroupsMap.put(groupId, processGroupsMap2);//目标流程组id + +// Snippet snippet = new Snippet(); +// snippet.setParentGroupId(parentGroupId);//目标流程组id的父流程组id +// snippet.setProcessGroups(processGroupsMap); + +// SnippetsJoin snippetsJoin = new SnippetsJoin(); +// snippetsJoin.setSnippet(snippet); +// SnippetResult13 snippets = nifiApiService.createSnippets(snippetsJoin); +// Snippet13 snippet1 = snippets.getSnippet(); +// System.out.println(snippet1.getId()); + + //将片段实例化到指定位置 +// String groupId = "019610d1-f933-1f70-16ea-3cf005012c4b";//流程组id + SnippetInstanceJoin snippetInstanceJoin = new SnippetInstanceJoin(); + snippetInstanceJoin.setSnippetId(snippetId); +// snippetInstanceJoin.setOriginY("0"); +// snippetInstanceJoin.setOriginX("0"); + SnippetInstance15 snippetInstance = nifiApiService.createSnippetInstance(groupId, snippetInstanceJoin); + Flow15 flow = snippetInstance.getFlow(); + ProcessGroups15 processGroups15 = flow.getProcessGroups().get(0); +// System.out.println(processGroups15.getId()); + processGroupId = processGroups15.getId(); + } catch (Exception e) { + e.printStackTrace(); + } + return processGroupId; + } +} \ No newline at end of file diff --git a/base-buildpackage/src/test/java/com/hzya/frame/TestNifiLog.java b/base-buildpackage/src/test/java/com/hzya/frame/TestNifiLog.java index eaa19d60..57ba5fc4 100644 --- a/base-buildpackage/src/test/java/com/hzya/frame/TestNifiLog.java +++ b/base-buildpackage/src/test/java/com/hzya/frame/TestNifiLog.java @@ -3,7 +3,6 @@ package com.hzya.frame; import com.github.pagehelper.PageInfo; import com.hzya.frame.nifilog.dao.ILoggingEvent20250430Dao; import com.hzya.frame.nifilog.entity.LoggingEvent20250430Entity; -import com.hzya.frame.page.PageAttribute; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; diff --git a/base-buildpackage/src/test/java/com/hzya/frame/TestNifiServiceApi.java b/base-buildpackage/src/test/java/com/hzya/frame/TestNifiServiceApi.java index 95f6299c..d578dae1 100644 --- a/base-buildpackage/src/test/java/com/hzya/frame/TestNifiServiceApi.java +++ b/base-buildpackage/src/test/java/com/hzya/frame/TestNifiServiceApi.java @@ -12,6 +12,8 @@ import com.hzya.frame.nifiapi.model.joincreateconnections.CreateConnections18; import com.hzya.frame.nifiapi.model.joincreateconnections.Revision18; import com.hzya.frame.nifiapi.model.joincreateconnections.SourceOrDestination18; import com.hzya.frame.nifiapi.model.joincreatetemp.CreateTemplateJoin; +import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.ControllerServices19; +import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.GetAllController19; import com.hzya.frame.nifiapi.model.joingetcontroller.Component12; import com.hzya.frame.nifiapi.model.joingetcontroller.ControllerService12; import com.hzya.frame.nifiapi.model.joingetcontroller.Revision12; @@ -27,6 +29,7 @@ import com.hzya.frame.nifiapi.model.joinstartorstopprocessgroup.StartOrStopProce import com.hzya.frame.nifiapi.model.joinupdateprocessor.Revision17; import com.hzya.frame.nifiapi.model.joinupdateprocessor.RunStatusOrStop17; import com.hzya.frame.nifiapi.model.nifitemplates.NifiTemplates; +import com.hzya.frame.nifiapi.model.processgroupid.ProcessGroupsId; import com.hzya.frame.nifiapi.model.resultcreateprocessor.*; import com.hzya.frame.nifiapi.model.resultcreateprocessors.CreateProcess16; import com.hzya.frame.nifiapi.model.resultcreatesnippet.Flow15; @@ -94,8 +97,8 @@ public class TestNifiServiceApi { public void testQueryFlowProcessGroupsRoot() { try { // nifiApiService.queryFlowProcessGroupsRoot(null); - Object o = nifiApiService.queryFlowProcessGroupsRoot("41437b87-0196-1000-6951-50e41e75c0da"); - System.out.println(o); + ProcessGroupsId processGroupsId = nifiApiService.queryFlowProcessGroupsRoot("41437b87-0196-1000-6951-50e41e75c0da"); + System.out.println(processGroupsId); } catch (Exception e) { e.printStackTrace(); } @@ -231,12 +234,12 @@ public class TestNifiServiceApi { try { // ProcessGroupStatus processGroupStatus = new ProcessGroupStatus(); // processGroupStatus.setName("测试流程组"); - +// for (int i = 0; i < 100; i++) { Revision5 revision5 = new Revision5(); revision5.setVersion(0); Component7 component7 = new Component7(); - component7.setName("测试流程组"); + component7.setName("测试位置排列"); ProcessGroupsJoin processGroupsJoin = new ProcessGroupsJoin(); // processGroupsJoin.setStatus(processGroupStatus); @@ -244,9 +247,10 @@ public class TestNifiServiceApi { processGroupsJoin.setComponent(component7); //父处理器组 - String parentGroupId = "76e23115-ad84-1aab-7630-21c76aa3973d"; + String parentGroupId = "019611c2-f933-1f70-3bde-71084293c3e4"; ProcessgroupsResult processGroups = nifiApiService.createProcessGroups(parentGroupId, processGroupsJoin); System.out.println("流程组id:" + processGroups.getId()); +// } } catch (Exception e) { e.printStackTrace(); } @@ -348,6 +352,56 @@ public class TestNifiServiceApi { } } + /** + * 修改某个控制器服务 + */ + @Test + public void updateCreateControllerServices() { + try { + //查询控制器详情,带版本号 + ControllerService12 controllerServices = nifiApiService.getControllerServices("019612d0-f933-1f70-424e-014d36a06b10"); + Revision12 revision = controllerServices.getRevision(); + + Revision12 revision12 = new Revision12(); + revision12.setVersion(revision.getVersion()); + + //创建Mysql数据源 + Map properties = new HashMap<>(); + properties.put("Database Connection URL", "jdbc:mysql://ufidahz.com.cn:9015/apache_nifi_sync?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8&useLegacyDatetimeCode=false"); + properties.put("Database Driver Class Name", "com.mysql.cj.jdbc.Driver"); + properties.put("database-driver-locations", "/data/nifi-1.28.1/lib/mysql-connector-java-8.0.30.jar"); + properties.put("Database User", "root"); + properties.put("Password", "hzya1314"); + properties.put("Max Wait Time", "600001 millis"); + properties.put("Max Total Connections", "15"); + properties.put("Validation-query", "select 1"); + properties.put("dbcp-min-idle-conns", "5"); + properties.put("dbcp-max-idle-conns", "10"); + properties.put("dbcp-max-conn-lifetime", "-1"); + properties.put("dbcp-time-between-eviction-runs", "-1"); + properties.put("dbcp-min-evictable-idle-time", "30 mins"); + properties.put("dbcp-soft-min-evictable-idle-time", "-1"); + + Component12 component12 = new Component12(); + component12.setName("Mysql数据源"); + component12.setType("org.apache.nifi.dbcp.DBCPConnectionPool"); + component12.setState("ENABLED"); + component12.setProperties(properties); + component12.setComments("一个备注"); + component12.setBulletinLevel("DEBUG"); + component12.setId(controllerServices.getId()); + + ControllerService12 controllerService11 = new ControllerService12(); + controllerService11.setComponent(component12); + controllerService11.setRevision(revision12); + controllerService11.setId(controllerServices.getId()); + ControllerService12 controllerService12 = nifiApiService.updateControllerServices("019612d0-f933-1f70-424e-014d36a06b10", controllerService11); + System.out.println(controllerService12.getId()); + } catch (Exception e) { + e.printStackTrace(); + } + } + /** * 控制器服务-Mysql数据源 */ @@ -386,7 +440,7 @@ public class TestNifiServiceApi { ControllerService12 controllerService11 = new ControllerService12(); controllerService11.setComponent(component12); controllerService11.setRevision(revision12); - ControllerService12 controllerServices = nifiApiService.createControllerServices("d7f1b14d-0196-1000-ce43-7499a84f6096", controllerService11); + ControllerService12 controllerServices = nifiApiService.createControllerServices("76e23115-ad84-1aab-7630-21c76aa3973d", controllerService11); System.out.println(controllerServices.getId()); } catch (Exception e) { e.printStackTrace(); @@ -785,6 +839,56 @@ public class TestNifiServiceApi { } } + /** + * 创建LogAttribute处理器 + */ + @Test + public void testCreateProcessor2() throws Exception { + try { +// for (int i = 0; i < 100; i++) { + String parentGroupId = "019610d1-f933-1f70-16ea-3cf005012c4b"; + double[] doubles = nifiApiService.calculatePosition(parentGroupId); + + Revision15 revision15 = new Revision15(); + revision15.setVersion("0"); + + Component15 component15 = new Component15(); + component15.setType("org.apache.nifi.processors.standard.LogAttribute"); + component15.setName("结束节点"); + + Position15 position15 = new Position15(); + position15.setX(String.valueOf(doubles[0])); + position15.setY(String.valueOf(doubles[1])); + + Properties15 properties15 = new Properties15(); + properties15.setLogLevel("info"); + properties15.setLogPayload("false"); + properties15.setAttributesToLogRegex(".*"); + properties15.setLogFlowFileProperties("true"); + properties15.setOutputFormat("Line per Attribute"); + properties15.setCharacterSet("UTF-8"); + + Config15 config15 = new Config15(); + config15.setProperties(properties15); + config15.setBulletinLevel("DEBUG"); + + component15.setConfig(config15); + component15.setPosition(position15); + + CreateProcessorJoin15 createProcessorJoin15 = new CreateProcessorJoin15(); + createProcessorJoin15.setComponent(component15); + createProcessorJoin15.setRevision(revision15); + + String groupId = "019610d1-f933-1f70-16ea-3cf005012c4b"; + CreateProcess16 processor = nifiApiService.createProcessor(groupId, createProcessorJoin15); +// String id = processor.getId(); + System.out.println("处理器id:" + processor.getId()); +// } + } catch (Exception e) { + e.printStackTrace(); + } + } + /** * 查询处理器详情 */ @@ -792,7 +896,7 @@ public class TestNifiServiceApi { public void testGetProcessor() throws Exception { try { // String processId = "01961137-f933-1f70-4f08-6fc7041f99bd"; - String processId2 = "ad841aed-30f7-16e2-4fa0-595ab0ab9fdc"; + String processId2 = "7f70897a-0196-1000-241a-3a7cdf5bf521"; ProcessorsInfo16 processor = nifiApiService.getProcessor(processId2); String id = processor.getId(); System.out.println(id); @@ -927,4 +1031,60 @@ public class TestNifiServiceApi { e.printStackTrace(); } } + + /** + * 测试组件坐标计算 + */ + @Test + public void testCalculatePosition() throws Exception { + try { + String parentGroupId = "76e23100-ad84-1aab-4447-b9ee936ee7d5"; + double[] doubles = nifiApiService.calculatePosition(parentGroupId); + System.out.println(doubles); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 测试查询流程组内全部的控制器 + */ + @Test + public void testGetAllControllerServices() throws Exception { + try { + String processGroupId = "4143d803-0196-1000-968f-b06f55b43a3f"; + GetAllController19 allControllerServices = nifiApiService.getAllControllerServices(processGroupId); + List controllerServices = allControllerServices.getControllerServices(); + System.out.println(controllerServices.size()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 查询流程组详情 + */ + @Test + public void testGetProcessGroups() throws Exception { + try { + ProcessGroupsInfoResult9 processGroups = nifiApiService.getProcessGroups("41437b87-0196-1000-6951-50e41e75c0da"); + System.out.println(processGroups.getId()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 查询某个流程组内的子流程组 + * + * @throws Exception + */ +// @Test +// public void testGetSubProcessGroups() throws Exception { +// try { +// nifiApiService.getSubProcessGroups("41437b87-0196-1000-6951-50e41e75c0da"); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// } } \ No newline at end of file