From f9eb0bdea3abb4ebc1bcf785f026a64d33bf2f6b Mon Sep 17 00:00:00 2001 From: liuy <37787198+LiuyCodes@users.noreply.github.com> Date: Wed, 28 May 2025 16:07:44 +0800 Subject: [PATCH] =?UTF-8?q?feat(base-buildpackage):=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=90=8C=E6=AD=A5=E5=8F=82=E6=95=B0=E5=8C=96?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增自动建表功能,增加相关连接和参数配置 - 更新 Oracle 和 MySQL 参数配置,适应新的数据同步需求 - 优化参数环境名称生成逻辑,使用当前时间作为后缀 - 调整字段映射关系配置,提高数据同步准确性 - 注释掉不必要的参数配置,简化配置结构 --- .../java/com/hzya/frame/TestNifiDbSync.java | 86 +++++++++++++------ 1 file changed, 62 insertions(+), 24 deletions(-) diff --git a/base-buildpackage/src/test/java/com/hzya/frame/TestNifiDbSync.java b/base-buildpackage/src/test/java/com/hzya/frame/TestNifiDbSync.java index 82ca91a1..6ee5a721 100644 --- a/base-buildpackage/src/test/java/com/hzya/frame/TestNifiDbSync.java +++ b/base-buildpackage/src/test/java/com/hzya/frame/TestNifiDbSync.java @@ -41,6 +41,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; import java.util.*; /** @@ -126,6 +128,8 @@ public class TestNifiDbSync { //TYPE=OUTPUT_PORT PortFilterResult mysqlMarkInput = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(mysqlAppContext.getNewProcessGroupId(), "be05a6ad-a440-43ed-b4c0-d1edcf03334f"); PortFilterResult mysqlMarkOutput = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(mysqlAppContext.getNewProcessGroupId(), "7ac85368-f2c9-48df-ab5b-ea610449083d"); + //查找Mysql应用的输入端口 + PortFilterResult automaticTableCreation = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(mysqlAppContext.getNewProcessGroupId(), "9480f217-9d94-4dcd-ba80-f718cc6e59db"); //批量创建连接关系 //连接关系一:开始处理器->Oracle应用 @@ -211,6 +215,33 @@ public class TestNifiDbSync { createConnections173.setRevision(revision183); createConnections18List.add(createConnections173); + //连接关系四:如果存在自动建表,则需要从开始节点-连接到Mysql应用的"自动建表"的输入端口 + List selectedRelationships2 = new ArrayList<>(); + selectedRelationships2.add("success"); + + SourceOrDestination18 source4 = new SourceOrDestination18(); + source4.setId(generateFlowFileId); + source4.setGroupId(newProcessGroupsId); + source4.setType("PROCESSOR"); + + SourceOrDestination18 destination4 = new SourceOrDestination18(); + destination4.setId(automaticTableCreation.getInputPorts().get(0).getId()); + destination4.setGroupId(mysqlAppContext.getNewProcessGroupId()); + destination4.setType("INPUT_PORT"); + + Component18 component184 = new Component18(); + component184.setSource(source4); + component184.setDestination(destination4); + component184.setSelectedRelationships(selectedRelationships2); + + Revision18 revision184 = new Revision18(); + revision184.setVersion("0");//默认传递0 + + CreateConnections18 createConnections174 = new CreateConnections18(); + createConnections174.setComponent(component184); + createConnections174.setRevision(revision184); + createConnections18List.add(createConnections174); + CreateProcessorConnections createProcessorConnections = new CreateProcessorConnections(); createProcessorConnections.setProcessGroupsId(newProcessGroupsId); createProcessorConnections.setCreateConnections18(createConnections18List); @@ -251,14 +282,14 @@ public class TestNifiDbSync { Parameter2 parameter2 = new Parameter2(); parameter2.setSensitive(false); parameter2.setName("columnsToReturn"); - parameter2.setValue("pk_invbasdoc,invcode,ts"); + parameter2.setValue("csaleid,vreceiptcode,ts"); parameter2.setDescription("需要查询的表字段"); Parameter2 parameter3 = new Parameter2(); parameter3.setSensitive(false); parameter3.setName("customOrderByColumn"); - parameter3.setValue("ts asc"); - parameter3.setDescription("排序规则"); + parameter3.setValue("ts,csaleid asc"); + parameter3.setDescription("排序规则,如果有增量同步需求,一定要通过时间+主键"); Parameter2 parameter4 = new Parameter2(); parameter4.setSensitive(false); @@ -269,7 +300,7 @@ public class TestNifiDbSync { Parameter2 parameter5 = new Parameter2(); parameter5.setSensitive(false); parameter5.setName("sourceTbName"); - parameter5.setValue("U8C241231.BD_INVBASDOC"); + parameter5.setValue("U8C241231.SO_SALE"); parameter5.setDescription("查询表名称"); Parameters2 parameters = new Parameters2(); @@ -295,7 +326,7 @@ public class TestNifiDbSync { parametersList.add(parameters5); Component3 component = new Component3(); - component.setName("oracle参数环境2505241627"); + component.setName("oracle参数环境" + createParamTime()); component.setParameters(parametersList); ParameterContextsJoin parameterContextsJoin = new ParameterContextsJoin(); @@ -308,6 +339,13 @@ public class TestNifiDbSync { return createParamContextJoin; } + public static String createParamTime() { + LocalDateTime now = LocalDateTime.now(); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyMMddHHmm"); + String formattedTime = now.format(formatter); + return formattedTime; + } + public CreateParamContextJoin setMysqlParamContextJoin(String processGroupsId) throws Exception { Revision revision = new Revision(); revision.setVersion(0); @@ -318,23 +356,23 @@ public class TestNifiDbSync { // parameter.setValue("table_creation"); // parameter.setDescription("参数描述"); - Parameter2 parameter2 = new Parameter2(); - parameter2.setSensitive(false); - parameter2.setName("autoTableCreation"); - parameter2.setValue("Y"); - parameter2.setDescription("是否自动建表"); +// Parameter2 parameter2 = new Parameter2(); +// parameter2.setSensitive(false); +// parameter2.setName("autoTableCreation"); +// parameter2.setValue("Y"); +// parameter2.setDescription("是否自动建表"); Parameter2 parameter3 = new Parameter2(); parameter3.setSensitive(false); parameter3.setName("mappingRelationship"); - parameter3.setValue("[\n" + " {\n" + " \"targetTbName\": \"iep_bd_invbasdoc\",\n" + " \"targetDBType\": \"mysql8.0.44\",\n" + " \"writeType\": \"1\",\n" + " \"fieldRelationship\": [\n" + " {\n" + " \"sourceFieldName\": \"PK_INVBASDOC\",\n" + " \"sourceFieldType\": \"CHAR(20)\",\n" + " \"targetFieldName\": \"pk_invbasdoc\",\n" + " \"targetFieldType\": \"varchar(20)\",\n" + " \"targetFieldNameNotes\": \"存货档案id\",\n" + " \"primaryKey\": \"Y\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"INVCODE\",\n" + " \"sourceFieldType\": \"VARCHAR(30)\",\n" + " \"targetFieldName\": \"invcode\",\n" + " \"targetFieldType\": \"varchar(20)\",\n" + " \"targetFieldNameNotes\": \"单据号\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"TS\",\n" + " \"sourceFieldType\": \"CHAR(19)\",\n" + " \"targetFieldName\": \"ts\",\n" + " \"targetFieldType\": \"varchar(20)\",\n" + " \"targetFieldNameNotes\": \"时间戳\"\n" + " }\n" + " ]\n" + " }\n" + "]"); - parameter3.setDescription("字段对照关系"); + parameter3.setValue("[\n" + " {\n" + " \"targetTbName\": \"iep_so_sale\",\n" + " \"targetDBType\": \"mysql8.0.44\",\n" + " \"writeType\": \"1\",\n" + " \"fieldRelationship\": [\n" + " {\n" + " \"sourceFieldName\": \"csaleid\",\n" + " \"sourceFieldType\": \"varchar2(30)\",\n" + " \"targetFieldName\": \"csaleid\",\n" + " \"targetFieldType\": \"varchar(100)\",\n" + " \"targetFieldNameNotes\": \"主键\",\n" + " \"primaryKey\": \"Y\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"vreceiptcode\",\n" + " \"sourceFieldType\": \"varchar2(30)\",\n" + " \"targetFieldName\": \"vreceiptcode\",\n" + " \"targetFieldType\": \"varchar(100)\",\n" + " \"targetFieldNameNotes\": \"编码\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"ts\",\n" + " \"sourceFieldType\": \"varchar2(30)\",\n" + " \"targetFieldName\": \"ts\",\n" + " \"targetFieldType\": \"varchar(100)\",\n" + " \"targetFieldNameNotes\": \"更新时间\"\n" + " }\n" + " ]\n" + " }\n" + "]"); + parameter3.setDescription("字段映射关系"); - Parameter2 parameter4 = new Parameter2(); - parameter4.setSensitive(false); - parameter4.setName("releaseSignalIdentifier"); - parameter4.setValue(UUID.randomUUID().toString()); - parameter4.setDescription("建表信号,传随机值!"); +// Parameter2 parameter4 = new Parameter2(); +// parameter4.setSensitive(false); +// parameter4.setName("releaseSignalIdentifier"); +// parameter4.setValue(UUID.randomUUID().toString()); +// parameter4.setDescription("建表信号,传随机值!"); // Parameter2 parameter4 = new Parameter2(); // parameter4.setSensitive(false); @@ -348,11 +386,11 @@ public class TestNifiDbSync { // parameter5.setValue("U8C241231.SO_SALE"); // parameter5.setDescription("参数描述"); - Parameters2 parameters = new Parameters2(); - parameters.setParameter(parameter4); +// Parameters2 parameters = new Parameters2(); +// parameters.setParameter(parameter4); - Parameters2 parameters2 = new Parameters2(); - parameters2.setParameter(parameter2); +// Parameters2 parameters2 = new Parameters2(); +// parameters2.setParameter(parameter2); Parameters2 parameters3 = new Parameters2(); parameters3.setParameter(parameter3); @@ -364,14 +402,14 @@ public class TestNifiDbSync { // parameters5.setParameter(parameter5); List parametersList = new ArrayList<>(); - parametersList.add(parameters); - parametersList.add(parameters2); +// parametersList.add(parameters); +// parametersList.add(parameters2); parametersList.add(parameters3); // parametersList.add(parameters4); // parametersList.add(parameters5); Component3 component = new Component3(); - component.setName("mysql参数环境2505241627"); + component.setName("mysql参数环境" + createParamTime()); component.setParameters(parametersList); ParameterContextsJoin parameterContextsJoin = new ParameterContextsJoin();