test(jolt): 添加新的 Jolt 测试用例和功能

- 在 TestJoltCardinality 类中添加了新的测试方法 testDemo3
- 在 TestJoltShift 类中添加了新的测试方法 testDemo16
- 新增了 UserVo 类用于测试
- 更新了 TestNifiDbSync 类中的参数配置
- 添加了新的 JSON 输入文件和规范文件用于测试
- 引入了 QLExpress依赖并添加了相关测试类 TestQl
This commit is contained in:
liuy 2025-07-23 14:14:46 +08:00
parent f01f5cd00a
commit 0b7a644ed0
10 changed files with 300 additions and 19 deletions

View File

@ -34,6 +34,12 @@
<artifactId>json-utils</artifactId> <artifactId>json-utils</artifactId>
<version>${latest.jolt.version}</version> <version>${latest.jolt.version}</version>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/QLExpress -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>QLExpress</artifactId>
<version>3.3.4</version>
</dependency>
</dependencies> </dependencies>
<profiles> <profiles>

View File

@ -128,8 +128,8 @@ public class TestNifiDbSync {
//TYPE=OUTPUT_PORT //TYPE=OUTPUT_PORT
PortFilterResult mysqlMarkInput = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(mysqlAppContext.getNewProcessGroupId(), "be05a6ad-a440-43ed-b4c0-d1edcf03334f"); PortFilterResult mysqlMarkInput = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(mysqlAppContext.getNewProcessGroupId(), "be05a6ad-a440-43ed-b4c0-d1edcf03334f");
PortFilterResult mysqlMarkOutput = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(mysqlAppContext.getNewProcessGroupId(), "7ac85368-f2c9-48df-ab5b-ea610449083d"); PortFilterResult mysqlMarkOutput = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(mysqlAppContext.getNewProcessGroupId(), "7ac85368-f2c9-48df-ab5b-ea610449083d");
//查找Mysql应用的输入端口 //
PortFilterResult automaticTableCreation = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(mysqlAppContext.getNewProcessGroupId(), "9480f217-9d94-4dcd-ba80-f718cc6e59db"); // PortFilterResult automaticTableCreation = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(mysqlAppContext.getNewProcessGroupId(), "9480f217-9d94-4dcd-ba80-f718cc6e59db");
//批量创建连接关系 //批量创建连接关系
//连接关系一:开始处理器->Oracle应用 //连接关系一:开始处理器->Oracle应用
@ -224,23 +224,23 @@ public class TestNifiDbSync {
source4.setGroupId(newProcessGroupsId); source4.setGroupId(newProcessGroupsId);
source4.setType("PROCESSOR"); source4.setType("PROCESSOR");
SourceOrDestination18 destination4 = new SourceOrDestination18(); // SourceOrDestination18 destination4 = new SourceOrDestination18();
destination4.setId(automaticTableCreation.getInputPorts().get(0).getId()); // destination4.setId(automaticTableCreation.getInputPorts().get(0).getId());
destination4.setGroupId(mysqlAppContext.getNewProcessGroupId()); // destination4.setGroupId(mysqlAppContext.getNewProcessGroupId());
destination4.setType("INPUT_PORT"); // destination4.setType("INPUT_PORT");
Component18 component184 = new Component18(); // Component18 component184 = new Component18();
component184.setSource(source4); // component184.setSource(source4);
component184.setDestination(destination4); // component184.setDestination(destination4);
component184.setSelectedRelationships(selectedRelationships2); // component184.setSelectedRelationships(selectedRelationships2);
Revision18 revision184 = new Revision18(); // Revision18 revision184 = new Revision18();
revision184.setVersion("0");//默认传递0 // revision184.setVersion("0");//默认传递0
CreateConnections18 createConnections174 = new CreateConnections18(); // CreateConnections18 createConnections174 = new CreateConnections18();
createConnections174.setComponent(component184); // createConnections174.setComponent(component184);
createConnections174.setRevision(revision184); // createConnections174.setRevision(revision184);
createConnections18List.add(createConnections174); // createConnections18List.add(createConnections174);
CreateProcessorConnections createProcessorConnections = new CreateProcessorConnections(); CreateProcessorConnections createProcessorConnections = new CreateProcessorConnections();
createProcessorConnections.setProcessGroupsId(newProcessGroupsId); createProcessorConnections.setProcessGroupsId(newProcessGroupsId);
@ -285,6 +285,18 @@ public class TestNifiDbSync {
parameter2.setValue("csaleid,vreceiptcode,ts"); parameter2.setValue("csaleid,vreceiptcode,ts");
parameter2.setDescription("需要查询的表字段"); parameter2.setDescription("需要查询的表字段");
/**
* 参数描述
* 注意如果传递了排序参数不能仅按ts时间排序需要加上主键ts时间存在重复排序时存在随机性会导致数据重复
*
* 下面两个sql经过验证会导致部分数据重复
* sql1SELECT PK_SYSINIT, TS FROM U8C241231.PUB_SYSINIT WHERE ( 1 = 1 ) AND ts <= '2025-05-16 15:56:25' ORDER BY ts FETCH NEXT 10000 ROWS ONLY
* sql2SELECT PK_SYSINIT, TS FROM U8C241231.PUB_SYSINIT WHERE ( 1 = 1 ) AND ts <= '2025-05-16 15:56:25' ORDER BY ts OFFSET 10000 ROWS FETCH NEXT 10000 ROWS ONLY
*
* ORDER BY ts 仅按 ts 排序 ts 有重复值如多行 ts = '2024-08-12 11:52:23'数据库无法确定这些行的相对顺序可能在不同子查询中产生不同排列导致 0001A110... 被重复选中OFFSET FETCH 依赖排序如果排序不稳定分页范围可能错误地包含同一行
*
* 解决办法是ts+主键排序这样比较稳定
*/
Parameter2 parameter3 = new Parameter2(); Parameter2 parameter3 = new Parameter2();
parameter3.setSensitive(false); parameter3.setSensitive(false);
parameter3.setName("customOrderByColumn"); parameter3.setName("customOrderByColumn");
@ -362,11 +374,43 @@ public class TestNifiDbSync {
// parameter2.setValue("Y"); // parameter2.setValue("Y");
// parameter2.setDescription("是否自动建表"); // parameter2.setDescription("是否自动建表");
// Parameter2 parameter3 = new Parameter2();
// parameter3.setSensitive(false);
// parameter3.setName("mappingRelationship");
// parameter3.setValue("[\n" + " {\n" + " \"targetTbName\": \"iep_so_sale\",\n" + " \"targetDBType\": \"mysql8.0.44\",\n" + " \"writeType\": \"1\",\n" + " \"fieldRelationship\": [\n" + " {\n" + " \"sourceFieldName\": \"csaleid\",\n" + " \"sourceFieldType\": \"varchar2(30)\",\n" + " \"targetFieldName\": \"csaleid\",\n" + " \"targetFieldType\": \"varchar(100)\",\n" + " \"targetFieldNameNotes\": \"主键\",\n" + " \"primaryKey\": \"Y\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"vreceiptcode\",\n" + " \"sourceFieldType\": \"varchar2(30)\",\n" + " \"targetFieldName\": \"vreceiptcode\",\n" + " \"targetFieldType\": \"varchar(100)\",\n" + " \"targetFieldNameNotes\": \"编码\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"ts\",\n" + " \"sourceFieldType\": \"varchar2(30)\",\n" + " \"targetFieldName\": \"ts\",\n" + " \"targetFieldType\": \"varchar(100)\",\n" + " \"targetFieldNameNotes\": \"更新时间\"\n" + " }\n" + " ]\n" + " }\n" + "]");
// parameter3.setDescription("字段映射关系");
Parameter2 parameter3 = new Parameter2(); Parameter2 parameter3 = new Parameter2();
parameter3.setSensitive(false); parameter3.setSensitive(false);
parameter3.setName("mappingRelationship"); parameter3.setName("autoTableCreation");
parameter3.setValue("[\n" + " {\n" + " \"targetTbName\": \"iep_so_sale\",\n" + " \"targetDBType\": \"mysql8.0.44\",\n" + " \"writeType\": \"1\",\n" + " \"fieldRelationship\": [\n" + " {\n" + " \"sourceFieldName\": \"csaleid\",\n" + " \"sourceFieldType\": \"varchar2(30)\",\n" + " \"targetFieldName\": \"csaleid\",\n" + " \"targetFieldType\": \"varchar(100)\",\n" + " \"targetFieldNameNotes\": \"主键\",\n" + " \"primaryKey\": \"Y\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"vreceiptcode\",\n" + " \"sourceFieldType\": \"varchar2(30)\",\n" + " \"targetFieldName\": \"vreceiptcode\",\n" + " \"targetFieldType\": \"varchar(100)\",\n" + " \"targetFieldNameNotes\": \"编码\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"ts\",\n" + " \"sourceFieldType\": \"varchar2(30)\",\n" + " \"targetFieldName\": \"ts\",\n" + " \"targetFieldType\": \"varchar(100)\",\n" + " \"targetFieldNameNotes\": \"更新时间\"\n" + " }\n" + " ]\n" + " }\n" + "]"); parameter3.setValue("Y");
parameter3.setDescription("字段映射关系"); parameter3.setDescription("");
Parameter2 parameter4 = new Parameter2();
parameter4.setSensitive(false);
parameter4.setName("joltSpec");
parameter4.setValue("[\n" +
" {\n" +
" \"operation\": \"shift\",\n" +
" \"spec\": {\n" +
" \"A\": \"B\",\n" +
" \"*\": \"&\"\n" +
" }\n" +
" }\n" +
"]");
parameter4.setDescription("");
Parameter2 parameter5 = new Parameter2();
parameter5.setSensitive(false);
parameter5.setName("jsonPrimaryKeyTag");
parameter5.setValue("[\"details_id\",\"header_id\",\"CSALEID\"]");
parameter5.setDescription("");
Parameter2 parameter6 = new Parameter2();
parameter6.setSensitive(false);
parameter6.setName("targetInsertTableName");
parameter6.setValue("iep_so_sale");
parameter6.setDescription("");
// Parameter2 parameter4 = new Parameter2(); // Parameter2 parameter4 = new Parameter2();
// parameter4.setSensitive(false); // parameter4.setSensitive(false);
@ -395,6 +439,15 @@ public class TestNifiDbSync {
Parameters2 parameters3 = new Parameters2(); Parameters2 parameters3 = new Parameters2();
parameters3.setParameter(parameter3); parameters3.setParameter(parameter3);
Parameters2 parameters4 = new Parameters2();
parameters4.setParameter(parameter4);
Parameters2 parameters5 = new Parameters2();
parameters5.setParameter(parameter5);
Parameters2 parameters6 = new Parameters2();
parameters6.setParameter(parameter6);
// Parameters2 parameters4 = new Parameters2(); // Parameters2 parameters4 = new Parameters2();
// parameters4.setParameter(parameter4); // parameters4.setParameter(parameter4);
@ -405,6 +458,9 @@ public class TestNifiDbSync {
// parametersList.add(parameters); // parametersList.add(parameters);
// parametersList.add(parameters2); // parametersList.add(parameters2);
parametersList.add(parameters3); parametersList.add(parameters3);
parametersList.add(parameters4);
parametersList.add(parameters5);
parametersList.add(parameters6);
// parametersList.add(parameters4); // parametersList.add(parameters4);
// parametersList.add(parameters5); // parametersList.add(parameters5);

View File

@ -0,0 +1,26 @@
package com.hzya.frame;
import com.ql.util.express.ExpressRunner;
import org.junit.Assert;
import org.junit.Test;
/**
* @Authorliuyang
* @Packagecom.hzya.frame
* @Projectfw-nifi
* @nameTestQl
* @Date2025/7/18 15:44
* @FilenameTestQl
*/
public class TestQl {
@Test
public void test1() throws Exception {
String exp = "import com.hzya.frame.vo.UserVo;" +
"UserVo cust = new UserVo();" +
"cust.setName(\"小强\");" +
"return cust.getName();";
ExpressRunner runner = new ExpressRunner();
String r = (String) runner.execute(exp, null, null, false, false);
Assert.assertEquals("操作符执行错误", "小强", r);
}
}

View File

@ -48,4 +48,16 @@ public class TestJoltCardinality {
Object transform = chainr.transform(input); Object transform = chainr.transform(input);
System.out.println(JsonUtils.toJsonString(transform)); System.out.println(JsonUtils.toJsonString(transform));
} }
@Test
public void testDemo3() {
//输入json
Object input = JsonUtils.filepathToObject(jsonFileRootRoute + "/cardinality/input3.json");
//spec
Object spec = JsonUtils.filepathToObject(jsonFileRootRoute + "/cardinality/spec3.json");
Chainr chainr = Chainr.fromSpec(spec);
Object transform = chainr.transform(input);
System.out.println(JsonUtils.toJsonString(transform));
}
} }

View File

@ -215,4 +215,17 @@ public class TestJoltShift {
Object transform = chainr.transform(input); Object transform = chainr.transform(input);
System.out.println(JsonUtils.toJsonString(transform)); System.out.println(JsonUtils.toJsonString(transform));
} }
/**
* 字段名替换
*/
@Test
public void testDemo16() {
Object input = JsonUtils.filepathToObject(jsonFileRootRoute + "/shift/input16.json");
Object spec = JsonUtils.filepathToObject(jsonFileRootRoute + "/shift/spec16.json");
Chainr chainr = Chainr.fromSpec(spec);
Object transform = chainr.transform(input);
System.out.println(JsonUtils.toJsonString(transform));
}
} }

View File

@ -0,0 +1,16 @@
package com.hzya.frame.vo;
import lombok.Data;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.vo
* @Projectfw-nifi
* @nameUser
* @Date2025/7/18 15:46
* @FilenameUser
*/
@Data
public class UserVo {
private String name;
}

View File

@ -0,0 +1,57 @@
{
"voucher": [
{
"pk_corp": "1",
"pk_corp_name":"广脉科技股份有限公司",
"pk_glorgbook": "1-0003",
"pk_glorgbook_id": "0001A1100000000001M9",
"pk_prepared": "tbadmin",
"pk_prepared_id": "TB_NEW100000000004OP",
"pk_vouchertype": "记账",
"prepareddate": "2025-07-04",
"details": [
{
"explanation": "转资申报单-1017252021000007-ZJ-ZY-YD-201808-018-横塘村西",
"pk_accsubj": "160105",
"pk_accsubj_id":"0001A1100000000008TZ",
"pk_currtype": "CNY",
"pk_currtype_id": "00010000000000000001",
"debitamount": "1.00",
"localdebitamount": "1.00"
},
{
"explanation": "转资申报单-1017252021000007-ZJ-ZY-YD-201808-018-横塘村西",
"pk_accsubj": "160106",
"pk_accsubj_id":"0001A1100000000008U0",
"pk_currtype": "CNY",
"pk_currtype_id": "00010000000000000001",
"debitamount": "1.00",
"localdebitamount": "1.00"
},
{
"explanation": "转资申报单-1017252021000007-ZJ-ZY-YD-201808-018-横塘村西",
"pk_accsubj": "16040501",
"pk_accsubj_id":"0001A2100000000CCIRN",
"pk_currtype": "CNY",
"pk_currtype_id": "00010000000000000001",
"creditamount": "2.00",
"localcreditamount": "2.00",
"ass": [
{
"checktypecode": "D0002",
"checkvaluecode": "25"
},
{
"checktypecode": "jobass",
"checkvaluecode": "02010100858"
},
{
"checktypecode": "73",
"checkvaluecode": "1100032021000083"
}
]
}
]
}
]
}

View File

@ -0,0 +1,33 @@
[
{
"operation": "shift",
"spec": {
"voucher": {
"*": {
"pk_corp": "voucher[&1].orgcorp",
"pk_glorgbook": "voucher[&1].pk_glorgbook",
"pk_prepared": "voucher[&1].pk_prepared",
"pk_vouchertype": "voucher[&1].pk_vouchertype",
"prepareddate": "voucher[&1].prepareddate",
"details": {
"*": {
"explanation": "voucher[&3].details[&1].explanation",
"pk_accsubj": "voucher[&3].details[&1].pk_accsubj",
"pk_currtype": "voucher[&3].details[&1].pk_currtype",
"debitamount": "voucher[&3].details[&1].debitamount",
"localdebitamount": "voucher[&3].details[&1].localdebitamount",
"creditamount": "voucher[&3].details[&1].creditamount",
"localcreditamount": "voucher[&3].details[&1].localcreditamount",
"ass": {
"*": {
"checktypecode": "voucher[&5].details[&3].ass[&1].checktypecode",
"checkvaluecode": "voucher[&5].details[&3].ass[&1].checkvaluecode"
}
}
}
}
}
}
}
}
]

View File

@ -0,0 +1,44 @@
{
"ALRCONFINCOMNY": "0",
"BCOOPTOPO": "N",
"BFREECUSTFLAG": "N",
"BINITFLAG": "N",
"BINVOICENDFLAG": "N",
"BOUTENDFLAG": "N",
"BOVERDATE": "N",
"BPOCOOPTOME": "N",
"BRECEIPTENDFLAG": "Y",
"BRETINVFLAG": "Y",
"BTRANSENDFLAG": "N",
"CBIZTYPE": "0001A110000000000E7S",
"CCALBODYID": "1001A11000000000006C",
"CCUSTOMERID": "0001A110000000001SXQ",
"CDEPTID": "1001A11000000000006L",
"COPERATORID": "0001A110000000000X6H",
"CRECEIPTCORPID": "0001A110000000001SXQ",
"CRECEIPTCUSTOMERID": "0001A110000000001SXQ",
"CRECEIPTTYPE": "30",
"CSALECORPID": "0001A110000000000DXH",
"CSALEID": "1001A1100000000005KT",
"DBILLDATE": "2024-09-11",
"DBILLTIME": "2024-09-11 15:37:49",
"DMAKEDATE": "2024-09-11",
"DMODITIME": "2024-09-11 15:38:01",
"DR": "1",
"EDITIONNUM": "1.0",
"FSTATUS": "5",
"MODIFIER": "0001A110000000000X6H",
"NDISCOUNTRATE": "1",
"NEVALUATECARRIAGE": "0",
"NHEADSUMMNY": "-11.3",
"NTOTALNUM": "-1",
"PK_CORP": "1001",
"PK_DEFDOC1": "0001A110000000000Z3L",
"PK_DEFDOC3": "0001A110000000000E75",
"TRANSACTIONPRICE": "-10",
"TS": "2024-10-15 15:30:02",
"VDEF1": "速卖通-海外",
"VDEF3": "销售出库",
"VRECEIPTCODE": "SO2409110001",
"RN": "1"
}

View File

@ -0,0 +1,18 @@
[
{
"operation": "shift",
"spec": {
"*": {
"clientCode": "[&1].bbbbbb",
"internalInstructionType": "[&1].aaa",
"test01": {
"*": {
"test03": "[&3].test01[&1].test04",
"*": "[&3].test01[&1].&"
}
},
"*": "[&1].&"
}
}
}
]