diff --git a/base-buildpackage/src/test/java/com/hzya/frame/TestINifiBaseOperation.java b/base-buildpackage/src/test/java/com/hzya/frame/TestINifiBaseOperation.java index 57cdab46..2c7c1e04 100644 --- a/base-buildpackage/src/test/java/com/hzya/frame/TestINifiBaseOperation.java +++ b/base-buildpackage/src/test/java/com/hzya/frame/TestINifiBaseOperation.java @@ -1,8 +1,12 @@ package com.hzya.frame; +import com.hzya.frame.nifiapi.model.joincreateprocessorandupdateparam.CreateProcessorAndUpdateParamJoin; import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.Component19; import com.hzya.frame.nifiapi.model.joingetallcontrollerservice.ControllerServices19; import com.hzya.frame.nifiapi.model.resultNeedModifyController.PortFilterResult; +import com.hzya.frame.nifiapi.model.resultprocessorsinfo.Component16; +import com.hzya.frame.nifiapi.model.resultprocessorsinfo.Config16; +import com.hzya.frame.nifiapi.operation.INifiAppOperation; import com.hzya.frame.nifiapi.operation.INifiBaseOperation; import org.junit.Test; import org.junit.runner.RunWith; @@ -10,7 +14,9 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * @Author:liuyang @@ -27,6 +33,9 @@ public class TestINifiBaseOperation { @Autowired private INifiBaseOperation iNifiBaseOperation; + @Autowired + private INifiAppOperation iNifiAppOperation; + @Test public void testGetNeedModifyControllerServicesByProcessGroup() throws Exception { try { @@ -63,4 +72,65 @@ public class TestINifiBaseOperation { e.printStackTrace(); } } + + @Test + public void testCopyProcessor() throws Exception { + try { + String targetProcessorId = "7f70897a-0196-1000-241a-3a7cdf5bf521"; + String parentProcessGroupId = "76e23100-ad84-1aab-4447-b9ee936ee7d5"; + String copyTargetProcessGroupId = "76e23115-ad84-1aab-7630-21c76aa3973d"; + String s = iNifiBaseOperation.copyProcessor(targetProcessorId, parentProcessGroupId, copyTargetProcessGroupId); + System.out.println("新处理器id:" + s); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Test + public void testCreateProcessorAndUpdateParam() { + try { + Map properties = new HashMap<>(); + properties.put("File Size", "1B"); + properties.put("Batch Size", "2"); + properties.put("Data Format", "Binary"); + properties.put("Unique FlowFiles", "true"); + properties.put("character-set", "GBK"); + + Config16 config16 = new Config16(); + config16.setProperties(properties); + + Component16 component16 = new Component16(); + component16.setName("修改处理器名称2"); + component16.setConfig(config16); + + CreateProcessorAndUpdateParamJoin createProcessorAndUpdateParamJoin = new CreateProcessorAndUpdateParamJoin(); + createProcessorAndUpdateParamJoin.setComponent(component16); + createProcessorAndUpdateParamJoin.setTargetProcessorId("7f70897a-0196-1000-241a-3a7cdf5bf521"); + createProcessorAndUpdateParamJoin.setCopyTargetProcessGroupId("76e23115-ad84-1aab-7630-21c76aa3973d"); + createProcessorAndUpdateParamJoin.setParentProcessGroupId("76e23100-ad84-1aab-4447-b9ee936ee7d5"); + + String processorAndUpdateParam = iNifiAppOperation.createProcessorAndUpdateParam(createProcessorAndUpdateParamJoin); + System.out.println("新处理器id:" + processorAndUpdateParam); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Test + public void testgGetFindStateClearMarkProcessor() throws Exception { + try { + iNifiBaseOperation.getFindStateClearMarkProcessor("41437b87-0196-1000-6951-50e41e75c0da", "接口状态清理标记"); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Test + public void testBatchEnabledControllerServices() throws Exception { + try { + iNifiBaseOperation.batchEnabledControllerServices("4143d803-0196-1000-968f-b06f55b43a3f", "xxxx"); + } catch (Exception e) { + e.printStackTrace(); + } + } } diff --git a/base-buildpackage/src/test/java/com/hzya/frame/TestNifiDbSync.java b/base-buildpackage/src/test/java/com/hzya/frame/TestNifiDbSync.java index 4fa25848..82ca91a1 100644 --- a/base-buildpackage/src/test/java/com/hzya/frame/TestNifiDbSync.java +++ b/base-buildpackage/src/test/java/com/hzya/frame/TestNifiDbSync.java @@ -6,6 +6,7 @@ import com.hzya.frame.nifiapi.model.joincreateconnections.Revision18; import com.hzya.frame.nifiapi.model.joincreateconnections.SourceOrDestination18; import com.hzya.frame.nifiapi.model.joincreateoracleapp.CreateAppInstanceJoin; import com.hzya.frame.nifiapi.model.joincreateprocessconnection.CreateProcessorConnections; +import com.hzya.frame.nifiapi.model.joincreateprocessorandupdateparam.CreateProcessorAndUpdateParamJoin; import com.hzya.frame.nifiapi.model.joincreparamcontext.CreateParamContextJoin; import com.hzya.frame.nifiapi.model.joinfindneedmodifycontroller.FindNeedModifyController; import com.hzya.frame.nifiapi.model.joingetcontroller.Component12; @@ -40,10 +41,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; /** * @Author:liuyang @@ -77,7 +75,12 @@ public class TestNifiDbSync { String newProcessGroupsId = testCreateProcessGroups(parentProcessGroupId, processGroupsName); //新建GenerateFlowFile处理器(开始的处理器) - String generateFlowFileId = testCreateProcessor(newProcessGroupsId); +// String generateFlowFileId = testCreateProcessor(newProcessGroupsId); + CreateProcessorAndUpdateParamJoin createProcessorAndUpdateParamJoin = new CreateProcessorAndUpdateParamJoin(); + createProcessorAndUpdateParamJoin.setTargetProcessorId("02d23f3c-7003-1a1b-1cdc-e9f37874bd78"); + createProcessorAndUpdateParamJoin.setParentProcessGroupId("76e23100-ad84-1aab-4447-b9ee936ee7d5"); + createProcessorAndUpdateParamJoin.setCopyTargetProcessGroupId(newProcessGroupsId); + String generateFlowFileId = nifiOperation.createProcessorAndUpdateParam(createProcessorAndUpdateParamJoin); //创建Oracle应用 //初始化实例 @@ -86,11 +89,11 @@ public class TestNifiDbSync { createOracleApp.setParentProcessGroupId("76e23100-ad84-1aab-4447-b9ee936ee7d5"); createOracleApp.setCopyTargetProcessGroupId(newProcessGroupsId); //上下文 - CreateParamContextJoin createParamContextJoin = setOracleParamContextJoin(newProcessGroupsId); + CreateParamContextJoin createParamContextJoin = setOracleParamContextJoin(); createOracleApp.setCreateParamContextJoin(createParamContextJoin); //控制器 - FindNeedModifyController findNeedModifyController = setOracleFindNeedModifyController(newProcessGroupsId); - createOracleApp.setFindNeedModifyController(findNeedModifyController); +// FindNeedModifyController findNeedModifyController = setOracleFindNeedModifyController(newProcessGroupsId); +// createOracleApp.setFindNeedModifyController(findNeedModifyController); CreateAppInstanceResult oracleAppContext = nifiOperation.createAppContext(createOracleApp); //创建Mysql应用 @@ -103,18 +106,26 @@ public class TestNifiDbSync { CreateParamContextJoin createParamContextJoin2 = setMysqlParamContextJoin(newProcessGroupsId); createOracleApp2.setCreateParamContextJoin(createParamContextJoin2); //控制器 - FindNeedModifyController findNeedModifyController2 = setMysqlFindNeedModifyController(newProcessGroupsId); - createOracleApp2.setFindNeedModifyController(findNeedModifyController2); +// FindNeedModifyController findNeedModifyController2 = setMysqlFindNeedModifyController(newProcessGroupsId); +// createOracleApp2.setFindNeedModifyController(findNeedModifyController2); CreateAppInstanceResult mysqlAppContext = nifiOperation.createAppContext(createOracleApp2); //新建LogAttribute处理器(结束处理器) - String logAttributeId = testCreateProcessor2(newProcessGroupsId); +// String logAttributeId = testCreateProcessor2(); +// String logAttributeId = null; + CreateProcessorAndUpdateParamJoin createProcessorAndUpdateParamJoin2 = new CreateProcessorAndUpdateParamJoin(); + createProcessorAndUpdateParamJoin2.setTargetProcessorId("7f70897a-0196-1000-241a-3a7cdf5bf521"); + createProcessorAndUpdateParamJoin2.setParentProcessGroupId("76e23100-ad84-1aab-4447-b9ee936ee7d5"); + createProcessorAndUpdateParamJoin2.setCopyTargetProcessGroupId(newProcessGroupsId); + String logAttributeId = nifiOperation.createProcessorAndUpdateParam(createProcessorAndUpdateParamJoin2); //查找对应应用的连接标记 //TYPE=INPUT_PORT - PortFilterResult oracleMark = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(oracleAppContext.getNewProcessGroupId(), null); + PortFilterResult oracleMarkInput = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(oracleAppContext.getNewProcessGroupId(), "f5228bc8-0360-41eb-a640-9f4df953937c"); + PortFilterResult oracleMarkOutPut = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(oracleAppContext.getNewProcessGroupId(), "4929a9ac-291a-4ea2-a4b1-286ddf47164a"); //TYPE=OUTPUT_PORT - PortFilterResult mysqlMark = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(mysqlAppContext.getNewProcessGroupId(), null); + PortFilterResult mysqlMarkInput = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(mysqlAppContext.getNewProcessGroupId(), "be05a6ad-a440-43ed-b4c0-d1edcf03334f"); + PortFilterResult mysqlMarkOutput = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(mysqlAppContext.getNewProcessGroupId(), "7ac85368-f2c9-48df-ab5b-ea610449083d"); //批量创建连接关系 //连接关系一:开始处理器->Oracle应用 @@ -124,7 +135,7 @@ public class TestNifiDbSync { source.setType("PROCESSOR"); SourceOrDestination18 destination = new SourceOrDestination18(); - destination.setId(oracleMark.getInputPorts().get(0).getId()); + destination.setId(oracleMarkInput.getInputPorts().get(0).getId()); destination.setGroupId(oracleAppContext.getNewProcessGroupId()); destination.setType("INPUT_PORT"); @@ -148,12 +159,12 @@ public class TestNifiDbSync { //连接关系二:Oracle应用->Mysql应用 SourceOrDestination18 source2 = new SourceOrDestination18(); - source2.setId(oracleMark.getOutputPorts().get(0).getId()); + source2.setId(oracleMarkOutPut.getOutputPorts().get(0).getId()); source2.setGroupId(oracleAppContext.getNewProcessGroupId()); source2.setType("OUTPUT_PORT"); SourceOrDestination18 destination2 = new SourceOrDestination18(); - destination2.setId(mysqlMark.getInputPorts().get(0).getId()); + destination2.setId(mysqlMarkInput.getInputPorts().get(0).getId()); destination2.setGroupId(mysqlAppContext.getNewProcessGroupId()); destination2.setType("INPUT_PORT"); @@ -175,7 +186,7 @@ public class TestNifiDbSync { //连接关系三:Mysql应用->结束处理器 SourceOrDestination18 source3 = new SourceOrDestination18(); - source3.setId(mysqlMark.getOutputPorts().get(0).getId()); + source3.setId(mysqlMarkOutput.getOutputPorts().get(0).getId()); source3.setGroupId(mysqlAppContext.getNewProcessGroupId()); source3.setType("OUTPUT_PORT"); @@ -205,6 +216,12 @@ public class TestNifiDbSync { createProcessorConnections.setCreateConnections18(createConnections18List); iNifiBaseOperation.batchCreateProcessorConnections(createProcessorConnections); + //去掉流程组内对应的处理器旧状态 +// List stringList = new ArrayList<>(); +// stringList.add(oracleAppContext.getNewProcessGroupId()); +// stringList.add(mysqlAppContext.getNewProcessGroupId()); +// iNifiBaseOperation.clearProcessGroupState(stringList); + //启动业务处理器 ProcessGroupsInfoResult9 processGroups = nifiApiService.getProcessGroups(newProcessGroupsId); Revision9 revision = processGroups.getRevision(); @@ -221,7 +238,7 @@ public class TestNifiDbSync { } - public CreateParamContextJoin setOracleParamContextJoin(String processGroupsId) throws Exception { + public CreateParamContextJoin setOracleParamContextJoin() throws Exception { Revision revision = new Revision(); revision.setVersion(0); @@ -229,31 +246,31 @@ public class TestNifiDbSync { parameter.setSensitive(false); parameter.setName("additionalWhereClause"); parameter.setValue("1=1"); - parameter.setDescription("参数描述"); + parameter.setDescription("where条件"); Parameter2 parameter2 = new Parameter2(); parameter2.setSensitive(false); parameter2.setName("columnsToReturn"); - parameter2.setValue("CSALEID,VRECEIPTCODE,NHEADSUMMNY,TS"); - parameter2.setDescription("参数描述"); + parameter2.setValue("pk_invbasdoc,invcode,ts"); + parameter2.setDescription("需要查询的表字段"); Parameter2 parameter3 = new Parameter2(); parameter3.setSensitive(false); parameter3.setName("customOrderByColumn"); parameter3.setValue("ts asc"); - parameter3.setDescription("参数描述"); + parameter3.setDescription("排序规则"); Parameter2 parameter4 = new Parameter2(); parameter4.setSensitive(false); parameter4.setName("maximumValueColumns"); parameter4.setValue("ts"); - parameter4.setDescription("参数描述"); + parameter4.setDescription("分片字段"); Parameter2 parameter5 = new Parameter2(); parameter5.setSensitive(false); parameter5.setName("sourceTbName"); - parameter5.setValue("U8C241231.SO_SALE"); - parameter5.setDescription("参数描述"); + parameter5.setValue("U8C241231.BD_INVBASDOC"); + parameter5.setDescription("查询表名称"); Parameters2 parameters = new Parameters2(); parameters.setParameter(parameter); @@ -278,7 +295,7 @@ public class TestNifiDbSync { parametersList.add(parameters5); Component3 component = new Component3(); - component.setName("oracle参数环境2505221548"); + component.setName("oracle参数环境2505241627"); component.setParameters(parametersList); ParameterContextsJoin parameterContextsJoin = new ParameterContextsJoin(); @@ -287,7 +304,7 @@ public class TestNifiDbSync { CreateParamContextJoin createParamContextJoin = new CreateParamContextJoin(); createParamContextJoin.setParameterContextsJoin(parameterContextsJoin); - createParamContextJoin.setProcessGroupsId(processGroupsId); +// createParamContextJoin.setProcessGroupsId(processGroupsId); return createParamContextJoin; } @@ -295,23 +312,29 @@ public class TestNifiDbSync { Revision revision = new Revision(); revision.setVersion(0); - Parameter2 parameter = new Parameter2(); - parameter.setSensitive(false); - parameter.setName("Release Signal Identifier"); - parameter.setValue("table_creation"); - parameter.setDescription("参数描述"); +// Parameter2 parameter = new Parameter2(); +// parameter.setSensitive(false); +// parameter.setName("Release Signal Identifier"); +// parameter.setValue("table_creation"); +// parameter.setDescription("参数描述"); Parameter2 parameter2 = new Parameter2(); parameter2.setSensitive(false); parameter2.setName("autoTableCreation"); parameter2.setValue("Y"); - parameter2.setDescription("参数描述"); + parameter2.setDescription("是否自动建表"); Parameter2 parameter3 = new Parameter2(); parameter3.setSensitive(false); parameter3.setName("mappingRelationship"); - parameter3.setValue("[\n" + " {\n" + " \"targetTbName\": \"iep_so_sale\",\n" + " \"targetDBType\": \"mysql8.0.44\",\n" + " \"writeType\": \"1\",\n" + " \"fieldRelationship\": [\n" + " {\n" + " \"sourceFieldName\": \"CSALEID\",\n" + " \"sourceFieldType\": \"CHAR(20)\",\n" + " \"targetFieldName\": \"csaleid\",\n" + " \"targetFieldType\": \"varchar(20)\",\n" + " \"targetFieldNameNotes\": \"销售主表id\",\n" + " \"primaryKey\": \"Y\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"VRECEIPTCODE\",\n" + " \"sourceFieldType\": \"VARCHAR(30)\",\n" + " \"targetFieldName\": \"vreceiptcode\",\n" + " \"targetFieldType\": \"varchar(20)\",\n" + " \"targetFieldNameNotes\": \"单据号\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"NHEADSUMMNY\",\n" + " \"sourceFieldType\": \"DECIMAL(13,8)\",\n" + " \"targetFieldName\": \"nheadsummny\",\n" + " \"targetFieldType\": \"decimal(20,10)\",\n" + " \"targetFieldNameNotes\": \"金额合计\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"TS\",\n" + " \"sourceFieldType\": \"CHAR(19)\",\n" + " \"targetFieldName\": \"ts\",\n" + " \"targetFieldType\": \"varchar(20)\",\n" + " \"targetFieldNameNotes\": \"时间戳\"\n" + " }\n" + " ]\n" + " }\n" + " ]"); - parameter3.setDescription("参数描述"); + parameter3.setValue("[\n" + " {\n" + " \"targetTbName\": \"iep_bd_invbasdoc\",\n" + " \"targetDBType\": \"mysql8.0.44\",\n" + " \"writeType\": \"1\",\n" + " \"fieldRelationship\": [\n" + " {\n" + " \"sourceFieldName\": \"PK_INVBASDOC\",\n" + " \"sourceFieldType\": \"CHAR(20)\",\n" + " \"targetFieldName\": \"pk_invbasdoc\",\n" + " \"targetFieldType\": \"varchar(20)\",\n" + " \"targetFieldNameNotes\": \"存货档案id\",\n" + " \"primaryKey\": \"Y\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"INVCODE\",\n" + " \"sourceFieldType\": \"VARCHAR(30)\",\n" + " \"targetFieldName\": \"invcode\",\n" + " \"targetFieldType\": \"varchar(20)\",\n" + " \"targetFieldNameNotes\": \"单据号\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"TS\",\n" + " \"sourceFieldType\": \"CHAR(19)\",\n" + " \"targetFieldName\": \"ts\",\n" + " \"targetFieldType\": \"varchar(20)\",\n" + " \"targetFieldNameNotes\": \"时间戳\"\n" + " }\n" + " ]\n" + " }\n" + "]"); + parameter3.setDescription("字段对照关系"); + + Parameter2 parameter4 = new Parameter2(); + parameter4.setSensitive(false); + parameter4.setName("releaseSignalIdentifier"); + parameter4.setValue(UUID.randomUUID().toString()); + parameter4.setDescription("建表信号,传随机值!"); // Parameter2 parameter4 = new Parameter2(); // parameter4.setSensitive(false); @@ -326,7 +349,7 @@ public class TestNifiDbSync { // parameter5.setDescription("参数描述"); Parameters2 parameters = new Parameters2(); - parameters.setParameter(parameter); + parameters.setParameter(parameter4); Parameters2 parameters2 = new Parameters2(); parameters2.setParameter(parameter2); @@ -348,7 +371,7 @@ public class TestNifiDbSync { // parametersList.add(parameters5); Component3 component = new Component3(); - component.setName("mysql参数环境2505230915"); + component.setName("mysql参数环境2505241627"); component.setParameters(parametersList); ParameterContextsJoin parameterContextsJoin = new ParameterContextsJoin(); @@ -478,7 +501,8 @@ public class TestNifiDbSync { String mysqlProcessGroupId = testCreateSnippetInstance(processGroupsId, parentProcessGroupId, mysqlSnippetId); //新建LogAttribute处理器(结束处理器) - String logAttributeId = testCreateProcessor2(processGroupsId); +// String logAttributeId = testCreateProcessor2(); + String logAttributeId = null; //修改Oracle应用数据源 //修改Oracle应用上下文环境 @@ -596,8 +620,9 @@ public class TestNifiDbSync { /** * 创建LogAttribute处理器 */ - public String testCreateProcessor2(String groupId) throws Exception { - String processorId = null; + @Test + public void testCreateProcessor2() throws Exception { +// String processorId = null; try { Revision15 revision15 = new Revision15(); revision15.setVersion("0"); @@ -625,18 +650,28 @@ public class TestNifiDbSync { component15.setConfig(config15); // component15.setPosition(position15); + //定义连接关系 + Relationships15 relationships15 = new Relationships15(); + relationships15.setName("success"); + relationships15.setAutoTerminate(true); +// relationships15.setRetry(true); +// relationships15.setDescription("123"); + List relationships15List = new ArrayList<>(); + relationships15List.add(relationships15); + component15.setRelationships(relationships15List); + CreateProcessorJoin15 createProcessorJoin15 = new CreateProcessorJoin15(); createProcessorJoin15.setComponent(component15); createProcessorJoin15.setRevision(revision15); -// String groupId = "019610d1-f933-1f70-16ea-3cf005012c4b"; + String groupId = "76e23115-ad84-1aab-7630-21c76aa3973d"; CreateProcess16 processor = nifiApiService.createProcessor(groupId, createProcessorJoin15); // System.out.println("处理器id:" + processor.getId()); - processorId = processor.getId(); +// processorId = processor.getId(); } catch (Exception e) { e.printStackTrace(); } - return processorId; +// return processorId; } /** diff --git a/base-buildpackage/src/test/java/com/hzya/frame/TestNifiServiceApi.java b/base-buildpackage/src/test/java/com/hzya/frame/TestNifiServiceApi.java index d578dae1..837db505 100644 --- a/base-buildpackage/src/test/java/com/hzya/frame/TestNifiServiceApi.java +++ b/base-buildpackage/src/test/java/com/hzya/frame/TestNifiServiceApi.java @@ -896,7 +896,7 @@ public class TestNifiServiceApi { public void testGetProcessor() throws Exception { try { // String processId = "01961137-f933-1f70-4f08-6fc7041f99bd"; - String processId2 = "7f70897a-0196-1000-241a-3a7cdf5bf521"; + String processId2 = "02d23f3c-7003-1a1b-1cdc-e9f37874bd78"; ProcessorsInfo16 processor = nifiApiService.getProcessor(processId2); String id = processor.getId(); System.out.println(id); @@ -1087,4 +1087,12 @@ public class TestNifiServiceApi { // e.printStackTrace(); // } // } + @Test + public void testClearProcessorRequests() throws Exception { + try { + nifiApiService.clearProcessorRequests("019617a7-f933-1f70-9747-dac20033270f"); + } catch (Exception e) { + e.printStackTrace(); + } + } } \ No newline at end of file