feat(base-buildpackage): 优化数据同步参数化功能

- 新增自动建表功能,增加相关连接和参数配置
- 更新 Oracle 和 MySQL 参数配置,适应新的数据同步需求
- 优化参数环境名称生成逻辑,使用当前时间作为后缀
- 调整字段映射关系配置,提高数据同步准确性
- 注释掉不必要的参数配置,简化配置结构
This commit is contained in:
liuy 2025-05-28 16:07:44 +08:00
parent 460722dde6
commit f9eb0bdea3
1 changed files with 62 additions and 24 deletions

View File

@ -41,6 +41,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.context.junit4.SpringRunner;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*; import java.util.*;
/** /**
@ -126,6 +128,8 @@ public class TestNifiDbSync {
//TYPE=OUTPUT_PORT //TYPE=OUTPUT_PORT
PortFilterResult mysqlMarkInput = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(mysqlAppContext.getNewProcessGroupId(), "be05a6ad-a440-43ed-b4c0-d1edcf03334f"); PortFilterResult mysqlMarkInput = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(mysqlAppContext.getNewProcessGroupId(), "be05a6ad-a440-43ed-b4c0-d1edcf03334f");
PortFilterResult mysqlMarkOutput = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(mysqlAppContext.getNewProcessGroupId(), "7ac85368-f2c9-48df-ab5b-ea610449083d"); PortFilterResult mysqlMarkOutput = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(mysqlAppContext.getNewProcessGroupId(), "7ac85368-f2c9-48df-ab5b-ea610449083d");
//查找Mysql应用的输入端口
PortFilterResult automaticTableCreation = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(mysqlAppContext.getNewProcessGroupId(), "9480f217-9d94-4dcd-ba80-f718cc6e59db");
//批量创建连接关系 //批量创建连接关系
//连接关系一:开始处理器->Oracle应用 //连接关系一:开始处理器->Oracle应用
@ -211,6 +215,33 @@ public class TestNifiDbSync {
createConnections173.setRevision(revision183); createConnections173.setRevision(revision183);
createConnections18List.add(createConnections173); createConnections18List.add(createConnections173);
//连接关系四如果存在自动建表则需要从开始节点-连接到Mysql应用的"自动建表"的输入端口
List<String> selectedRelationships2 = new ArrayList<>();
selectedRelationships2.add("success");
SourceOrDestination18 source4 = new SourceOrDestination18();
source4.setId(generateFlowFileId);
source4.setGroupId(newProcessGroupsId);
source4.setType("PROCESSOR");
SourceOrDestination18 destination4 = new SourceOrDestination18();
destination4.setId(automaticTableCreation.getInputPorts().get(0).getId());
destination4.setGroupId(mysqlAppContext.getNewProcessGroupId());
destination4.setType("INPUT_PORT");
Component18 component184 = new Component18();
component184.setSource(source4);
component184.setDestination(destination4);
component184.setSelectedRelationships(selectedRelationships2);
Revision18 revision184 = new Revision18();
revision184.setVersion("0");//默认传递0
CreateConnections18 createConnections174 = new CreateConnections18();
createConnections174.setComponent(component184);
createConnections174.setRevision(revision184);
createConnections18List.add(createConnections174);
CreateProcessorConnections createProcessorConnections = new CreateProcessorConnections(); CreateProcessorConnections createProcessorConnections = new CreateProcessorConnections();
createProcessorConnections.setProcessGroupsId(newProcessGroupsId); createProcessorConnections.setProcessGroupsId(newProcessGroupsId);
createProcessorConnections.setCreateConnections18(createConnections18List); createProcessorConnections.setCreateConnections18(createConnections18List);
@ -251,14 +282,14 @@ public class TestNifiDbSync {
Parameter2 parameter2 = new Parameter2(); Parameter2 parameter2 = new Parameter2();
parameter2.setSensitive(false); parameter2.setSensitive(false);
parameter2.setName("columnsToReturn"); parameter2.setName("columnsToReturn");
parameter2.setValue("pk_invbasdoc,invcode,ts"); parameter2.setValue("csaleid,vreceiptcode,ts");
parameter2.setDescription("需要查询的表字段"); parameter2.setDescription("需要查询的表字段");
Parameter2 parameter3 = new Parameter2(); Parameter2 parameter3 = new Parameter2();
parameter3.setSensitive(false); parameter3.setSensitive(false);
parameter3.setName("customOrderByColumn"); parameter3.setName("customOrderByColumn");
parameter3.setValue("ts asc"); parameter3.setValue("ts,csaleid asc");
parameter3.setDescription("排序规则"); parameter3.setDescription("排序规则,如果有增量同步需求,一定要通过时间+主键");
Parameter2 parameter4 = new Parameter2(); Parameter2 parameter4 = new Parameter2();
parameter4.setSensitive(false); parameter4.setSensitive(false);
@ -269,7 +300,7 @@ public class TestNifiDbSync {
Parameter2 parameter5 = new Parameter2(); Parameter2 parameter5 = new Parameter2();
parameter5.setSensitive(false); parameter5.setSensitive(false);
parameter5.setName("sourceTbName"); parameter5.setName("sourceTbName");
parameter5.setValue("U8C241231.BD_INVBASDOC"); parameter5.setValue("U8C241231.SO_SALE");
parameter5.setDescription("查询表名称"); parameter5.setDescription("查询表名称");
Parameters2 parameters = new Parameters2(); Parameters2 parameters = new Parameters2();
@ -295,7 +326,7 @@ public class TestNifiDbSync {
parametersList.add(parameters5); parametersList.add(parameters5);
Component3 component = new Component3(); Component3 component = new Component3();
component.setName("oracle参数环境2505241627"); component.setName("oracle参数环境" + createParamTime());
component.setParameters(parametersList); component.setParameters(parametersList);
ParameterContextsJoin parameterContextsJoin = new ParameterContextsJoin(); ParameterContextsJoin parameterContextsJoin = new ParameterContextsJoin();
@ -308,6 +339,13 @@ public class TestNifiDbSync {
return createParamContextJoin; return createParamContextJoin;
} }
public static String createParamTime() {
LocalDateTime now = LocalDateTime.now();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyMMddHHmm");
String formattedTime = now.format(formatter);
return formattedTime;
}
public CreateParamContextJoin setMysqlParamContextJoin(String processGroupsId) throws Exception { public CreateParamContextJoin setMysqlParamContextJoin(String processGroupsId) throws Exception {
Revision revision = new Revision(); Revision revision = new Revision();
revision.setVersion(0); revision.setVersion(0);
@ -318,23 +356,23 @@ public class TestNifiDbSync {
// parameter.setValue("table_creation"); // parameter.setValue("table_creation");
// parameter.setDescription("参数描述"); // parameter.setDescription("参数描述");
Parameter2 parameter2 = new Parameter2(); // Parameter2 parameter2 = new Parameter2();
parameter2.setSensitive(false); // parameter2.setSensitive(false);
parameter2.setName("autoTableCreation"); // parameter2.setName("autoTableCreation");
parameter2.setValue("Y"); // parameter2.setValue("Y");
parameter2.setDescription("是否自动建表"); // parameter2.setDescription("是否自动建表");
Parameter2 parameter3 = new Parameter2(); Parameter2 parameter3 = new Parameter2();
parameter3.setSensitive(false); parameter3.setSensitive(false);
parameter3.setName("mappingRelationship"); parameter3.setName("mappingRelationship");
parameter3.setValue("[\n" + " {\n" + " \"targetTbName\": \"iep_bd_invbasdoc\",\n" + " \"targetDBType\": \"mysql8.0.44\",\n" + " \"writeType\": \"1\",\n" + " \"fieldRelationship\": [\n" + " {\n" + " \"sourceFieldName\": \"PK_INVBASDOC\",\n" + " \"sourceFieldType\": \"CHAR(20)\",\n" + " \"targetFieldName\": \"pk_invbasdoc\",\n" + " \"targetFieldType\": \"varchar(20)\",\n" + " \"targetFieldNameNotes\": \"存货档案id\",\n" + " \"primaryKey\": \"Y\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"INVCODE\",\n" + " \"sourceFieldType\": \"VARCHAR(30)\",\n" + " \"targetFieldName\": \"invcode\",\n" + " \"targetFieldType\": \"varchar(20)\",\n" + " \"targetFieldNameNotes\": \"单据号\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"TS\",\n" + " \"sourceFieldType\": \"CHAR(19)\",\n" + " \"targetFieldName\": \"ts\",\n" + " \"targetFieldType\": \"varchar(20)\",\n" + " \"targetFieldNameNotes\": \"时间\"\n" + " }\n" + " ]\n" + " }\n" + "]"); parameter3.setValue("[\n" + " {\n" + " \"targetTbName\": \"iep_so_sale\",\n" + " \"targetDBType\": \"mysql8.0.44\",\n" + " \"writeType\": \"1\",\n" + " \"fieldRelationship\": [\n" + " {\n" + " \"sourceFieldName\": \"csaleid\",\n" + " \"sourceFieldType\": \"varchar2(30)\",\n" + " \"targetFieldName\": \"csaleid\",\n" + " \"targetFieldType\": \"varchar(100)\",\n" + " \"targetFieldNameNotes\": \"主键\",\n" + " \"primaryKey\": \"Y\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"vreceiptcode\",\n" + " \"sourceFieldType\": \"varchar2(30)\",\n" + " \"targetFieldName\": \"vreceiptcode\",\n" + " \"targetFieldType\": \"varchar(100)\",\n" + " \"targetFieldNameNotes\": \"编码\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"ts\",\n" + " \"sourceFieldType\": \"varchar2(30)\",\n" + " \"targetFieldName\": \"ts\",\n" + " \"targetFieldType\": \"varchar(100)\",\n" + " \"targetFieldNameNotes\": \"更新时间\"\n" + " }\n" + " ]\n" + " }\n" + "]");
parameter3.setDescription("字段对照关系"); parameter3.setDescription("字段映射关系");
Parameter2 parameter4 = new Parameter2(); // Parameter2 parameter4 = new Parameter2();
parameter4.setSensitive(false); // parameter4.setSensitive(false);
parameter4.setName("releaseSignalIdentifier"); // parameter4.setName("releaseSignalIdentifier");
parameter4.setValue(UUID.randomUUID().toString()); // parameter4.setValue(UUID.randomUUID().toString());
parameter4.setDescription("建表信号,传随机值!"); // parameter4.setDescription("建表信号,传随机值!");
// Parameter2 parameter4 = new Parameter2(); // Parameter2 parameter4 = new Parameter2();
// parameter4.setSensitive(false); // parameter4.setSensitive(false);
@ -348,11 +386,11 @@ public class TestNifiDbSync {
// parameter5.setValue("U8C241231.SO_SALE"); // parameter5.setValue("U8C241231.SO_SALE");
// parameter5.setDescription("参数描述"); // parameter5.setDescription("参数描述");
Parameters2 parameters = new Parameters2(); // Parameters2 parameters = new Parameters2();
parameters.setParameter(parameter4); // parameters.setParameter(parameter4);
Parameters2 parameters2 = new Parameters2(); // Parameters2 parameters2 = new Parameters2();
parameters2.setParameter(parameter2); // parameters2.setParameter(parameter2);
Parameters2 parameters3 = new Parameters2(); Parameters2 parameters3 = new Parameters2();
parameters3.setParameter(parameter3); parameters3.setParameter(parameter3);
@ -364,14 +402,14 @@ public class TestNifiDbSync {
// parameters5.setParameter(parameter5); // parameters5.setParameter(parameter5);
List<Parameters2> parametersList = new ArrayList<>(); List<Parameters2> parametersList = new ArrayList<>();
parametersList.add(parameters); // parametersList.add(parameters);
parametersList.add(parameters2); // parametersList.add(parameters2);
parametersList.add(parameters3); parametersList.add(parameters3);
// parametersList.add(parameters4); // parametersList.add(parameters4);
// parametersList.add(parameters5); // parametersList.add(parameters5);
Component3 component = new Component3(); Component3 component = new Component3();
component.setName("mysql参数环境2505241627"); component.setName("mysql参数环境" + createParamTime());
component.setParameters(parametersList); component.setParameters(parametersList);
ParameterContextsJoin parameterContextsJoin = new ParameterContextsJoin(); ParameterContextsJoin parameterContextsJoin = new ParameterContextsJoin();