From 0b7a644ed0aa97e8754370eecef0b10acad3a436 Mon Sep 17 00:00:00 2001
From: liuy <37787198+LiuyCodes@users.noreply.github.com>
Date: Wed, 23 Jul 2025 14:14:46 +0800
Subject: [PATCH] =?UTF-8?q?test(jolt):=20=E6=B7=BB=E5=8A=A0=E6=96=B0?=
=?UTF-8?q?=E7=9A=84=20Jolt=20=E6=B5=8B=E8=AF=95=E7=94=A8=E4=BE=8B?=
=?UTF-8?q?=E5=92=8C=E5=8A=9F=E8=83=BD?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
- 在 TestJoltCardinality 类中添加了新的测试方法 testDemo3
- 在 TestJoltShift 类中添加了新的测试方法 testDemo16
- 新增了 UserVo 类用于测试
- 更新了 TestNifiDbSync 类中的参数配置
- 添加了新的 JSON 输入文件和规范文件用于测试
- 引入了 QLExpress依赖并添加了相关测试类 TestQl
---
base-buildpackage/pom.xml | 6 ++
.../java/com/hzya/frame/TestNifiDbSync.java | 94 +++++++++++++++----
.../src/test/java/com/hzya/frame/TestQl.java | 26 +++++
.../hzya/frame/jolt/TestJoltCardinality.java | 12 +++
.../com/hzya/frame/jolt/TestJoltShift.java | 13 +++
.../test/java/com/hzya/frame/vo/UserVo.java | 16 ++++
jolt-demo/cardinality/input3.json | 57 +++++++++++
jolt-demo/cardinality/spec3.json | 33 +++++++
jolt-demo/shift/input16.json | 44 +++++++++
jolt-demo/shift/spec16.json | 18 ++++
10 files changed, 300 insertions(+), 19 deletions(-)
create mode 100644 base-buildpackage/src/test/java/com/hzya/frame/TestQl.java
create mode 100644 base-buildpackage/src/test/java/com/hzya/frame/vo/UserVo.java
create mode 100644 jolt-demo/cardinality/input3.json
create mode 100644 jolt-demo/cardinality/spec3.json
create mode 100644 jolt-demo/shift/input16.json
create mode 100644 jolt-demo/shift/spec16.json
diff --git a/base-buildpackage/pom.xml b/base-buildpackage/pom.xml
index 60e0e147..45345b42 100644
--- a/base-buildpackage/pom.xml
+++ b/base-buildpackage/pom.xml
@@ -34,6 +34,12 @@
json-utils
${latest.jolt.version}
+
+
+ com.alibaba
+ QLExpress
+ 3.3.4
+
diff --git a/base-buildpackage/src/test/java/com/hzya/frame/TestNifiDbSync.java b/base-buildpackage/src/test/java/com/hzya/frame/TestNifiDbSync.java
index 6ee5a721..ddfe8c6f 100644
--- a/base-buildpackage/src/test/java/com/hzya/frame/TestNifiDbSync.java
+++ b/base-buildpackage/src/test/java/com/hzya/frame/TestNifiDbSync.java
@@ -128,8 +128,8 @@ public class TestNifiDbSync {
//TYPE=OUTPUT_PORT
PortFilterResult mysqlMarkInput = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(mysqlAppContext.getNewProcessGroupId(), "be05a6ad-a440-43ed-b4c0-d1edcf03334f");
PortFilterResult mysqlMarkOutput = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(mysqlAppContext.getNewProcessGroupId(), "7ac85368-f2c9-48df-ab5b-ea610449083d");
- //查找Mysql应用的输入端口
- PortFilterResult automaticTableCreation = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(mysqlAppContext.getNewProcessGroupId(), "9480f217-9d94-4dcd-ba80-f718cc6e59db");
+ //
+// PortFilterResult automaticTableCreation = iNifiBaseOperation.getNeedModifyControllerRelationshipByProcessGroup(mysqlAppContext.getNewProcessGroupId(), "9480f217-9d94-4dcd-ba80-f718cc6e59db");
//批量创建连接关系
//连接关系一:开始处理器->Oracle应用
@@ -224,23 +224,23 @@ public class TestNifiDbSync {
source4.setGroupId(newProcessGroupsId);
source4.setType("PROCESSOR");
- SourceOrDestination18 destination4 = new SourceOrDestination18();
- destination4.setId(automaticTableCreation.getInputPorts().get(0).getId());
- destination4.setGroupId(mysqlAppContext.getNewProcessGroupId());
- destination4.setType("INPUT_PORT");
+// SourceOrDestination18 destination4 = new SourceOrDestination18();
+// destination4.setId(automaticTableCreation.getInputPorts().get(0).getId());
+// destination4.setGroupId(mysqlAppContext.getNewProcessGroupId());
+// destination4.setType("INPUT_PORT");
- Component18 component184 = new Component18();
- component184.setSource(source4);
- component184.setDestination(destination4);
- component184.setSelectedRelationships(selectedRelationships2);
+// Component18 component184 = new Component18();
+// component184.setSource(source4);
+// component184.setDestination(destination4);
+// component184.setSelectedRelationships(selectedRelationships2);
- Revision18 revision184 = new Revision18();
- revision184.setVersion("0");//默认传递0
+// Revision18 revision184 = new Revision18();
+// revision184.setVersion("0");//默认传递0
- CreateConnections18 createConnections174 = new CreateConnections18();
- createConnections174.setComponent(component184);
- createConnections174.setRevision(revision184);
- createConnections18List.add(createConnections174);
+// CreateConnections18 createConnections174 = new CreateConnections18();
+// createConnections174.setComponent(component184);
+// createConnections174.setRevision(revision184);
+// createConnections18List.add(createConnections174);
CreateProcessorConnections createProcessorConnections = new CreateProcessorConnections();
createProcessorConnections.setProcessGroupsId(newProcessGroupsId);
@@ -285,6 +285,18 @@ public class TestNifiDbSync {
parameter2.setValue("csaleid,vreceiptcode,ts");
parameter2.setDescription("需要查询的表字段");
+ /**
+ * 参数描述
+ * 注意,如果传递了排序参数,不能仅按ts时间排序,需要加上主键,ts时间存在重复,排序时存在随机性,会导致数据重复!
+ *
+ * 下面两个sql经过验证会导致部分数据重复
+ * sql1:SELECT PK_SYSINIT, TS FROM U8C241231.PUB_SYSINIT WHERE ( 1 = 1 ) AND ts <= '2025-05-16 15:56:25' ORDER BY ts FETCH NEXT 10000 ROWS ONLY
+ * sql2:SELECT PK_SYSINIT, TS FROM U8C241231.PUB_SYSINIT WHERE ( 1 = 1 ) AND ts <= '2025-05-16 15:56:25' ORDER BY ts OFFSET 10000 ROWS FETCH NEXT 10000 ROWS ONLY
+ *
+ * ORDER BY ts 仅按 ts 排序,但 ts 有重复值(如多行 ts = '2024-08-12 11:52:23')。数据库无法确定这些行的相对顺序,可能在不同子查询中产生不同排列,导致 0001A110... 被重复选中,OFFSET 和 FETCH 依赖排序,如果排序不稳定,分页范围可能错误地包含同一行
+ *
+ * 解决办法是,ts+主键排序,这样比较稳定
+ */
Parameter2 parameter3 = new Parameter2();
parameter3.setSensitive(false);
parameter3.setName("customOrderByColumn");
@@ -362,11 +374,43 @@ public class TestNifiDbSync {
// parameter2.setValue("Y");
// parameter2.setDescription("是否自动建表");
+// Parameter2 parameter3 = new Parameter2();
+// parameter3.setSensitive(false);
+// parameter3.setName("mappingRelationship");
+// parameter3.setValue("[\n" + " {\n" + " \"targetTbName\": \"iep_so_sale\",\n" + " \"targetDBType\": \"mysql8.0.44\",\n" + " \"writeType\": \"1\",\n" + " \"fieldRelationship\": [\n" + " {\n" + " \"sourceFieldName\": \"csaleid\",\n" + " \"sourceFieldType\": \"varchar2(30)\",\n" + " \"targetFieldName\": \"csaleid\",\n" + " \"targetFieldType\": \"varchar(100)\",\n" + " \"targetFieldNameNotes\": \"主键\",\n" + " \"primaryKey\": \"Y\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"vreceiptcode\",\n" + " \"sourceFieldType\": \"varchar2(30)\",\n" + " \"targetFieldName\": \"vreceiptcode\",\n" + " \"targetFieldType\": \"varchar(100)\",\n" + " \"targetFieldNameNotes\": \"编码\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"ts\",\n" + " \"sourceFieldType\": \"varchar2(30)\",\n" + " \"targetFieldName\": \"ts\",\n" + " \"targetFieldType\": \"varchar(100)\",\n" + " \"targetFieldNameNotes\": \"更新时间\"\n" + " }\n" + " ]\n" + " }\n" + "]");
+// parameter3.setDescription("字段映射关系");
+
Parameter2 parameter3 = new Parameter2();
parameter3.setSensitive(false);
- parameter3.setName("mappingRelationship");
- parameter3.setValue("[\n" + " {\n" + " \"targetTbName\": \"iep_so_sale\",\n" + " \"targetDBType\": \"mysql8.0.44\",\n" + " \"writeType\": \"1\",\n" + " \"fieldRelationship\": [\n" + " {\n" + " \"sourceFieldName\": \"csaleid\",\n" + " \"sourceFieldType\": \"varchar2(30)\",\n" + " \"targetFieldName\": \"csaleid\",\n" + " \"targetFieldType\": \"varchar(100)\",\n" + " \"targetFieldNameNotes\": \"主键\",\n" + " \"primaryKey\": \"Y\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"vreceiptcode\",\n" + " \"sourceFieldType\": \"varchar2(30)\",\n" + " \"targetFieldName\": \"vreceiptcode\",\n" + " \"targetFieldType\": \"varchar(100)\",\n" + " \"targetFieldNameNotes\": \"编码\"\n" + " },\n" + " {\n" + " \"sourceFieldName\": \"ts\",\n" + " \"sourceFieldType\": \"varchar2(30)\",\n" + " \"targetFieldName\": \"ts\",\n" + " \"targetFieldType\": \"varchar(100)\",\n" + " \"targetFieldNameNotes\": \"更新时间\"\n" + " }\n" + " ]\n" + " }\n" + "]");
- parameter3.setDescription("字段映射关系");
+ parameter3.setName("autoTableCreation");
+ parameter3.setValue("Y");
+ parameter3.setDescription("");
+
+ Parameter2 parameter4 = new Parameter2();
+ parameter4.setSensitive(false);
+ parameter4.setName("joltSpec");
+ parameter4.setValue("[\n" +
+ " {\n" +
+ " \"operation\": \"shift\",\n" +
+ " \"spec\": {\n" +
+ " \"A\": \"B\",\n" +
+ " \"*\": \"&\"\n" +
+ " }\n" +
+ " }\n" +
+ "]");
+ parameter4.setDescription("");
+
+ Parameter2 parameter5 = new Parameter2();
+ parameter5.setSensitive(false);
+ parameter5.setName("jsonPrimaryKeyTag");
+ parameter5.setValue("[\"details_id\",\"header_id\",\"CSALEID\"]");
+ parameter5.setDescription("");
+
+ Parameter2 parameter6 = new Parameter2();
+ parameter6.setSensitive(false);
+ parameter6.setName("targetInsertTableName");
+ parameter6.setValue("iep_so_sale");
+ parameter6.setDescription("");
// Parameter2 parameter4 = new Parameter2();
// parameter4.setSensitive(false);
@@ -395,6 +439,15 @@ public class TestNifiDbSync {
Parameters2 parameters3 = new Parameters2();
parameters3.setParameter(parameter3);
+ Parameters2 parameters4 = new Parameters2();
+ parameters4.setParameter(parameter4);
+
+ Parameters2 parameters5 = new Parameters2();
+ parameters5.setParameter(parameter5);
+
+ Parameters2 parameters6 = new Parameters2();
+ parameters6.setParameter(parameter6);
+
// Parameters2 parameters4 = new Parameters2();
// parameters4.setParameter(parameter4);
@@ -405,6 +458,9 @@ public class TestNifiDbSync {
// parametersList.add(parameters);
// parametersList.add(parameters2);
parametersList.add(parameters3);
+ parametersList.add(parameters4);
+ parametersList.add(parameters5);
+ parametersList.add(parameters6);
// parametersList.add(parameters4);
// parametersList.add(parameters5);
diff --git a/base-buildpackage/src/test/java/com/hzya/frame/TestQl.java b/base-buildpackage/src/test/java/com/hzya/frame/TestQl.java
new file mode 100644
index 00000000..b71e6a68
--- /dev/null
+++ b/base-buildpackage/src/test/java/com/hzya/frame/TestQl.java
@@ -0,0 +1,26 @@
+package com.hzya.frame;
+
+import com.ql.util.express.ExpressRunner;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * @Author:liuyang
+ * @Package:com.hzya.frame
+ * @Project:fw-nifi
+ * @name:TestQl
+ * @Date:2025/7/18 15:44
+ * @Filename:TestQl
+ */
+public class TestQl {
+ @Test
+ public void test1() throws Exception {
+ String exp = "import com.hzya.frame.vo.UserVo;" +
+ "UserVo cust = new UserVo();" +
+ "cust.setName(\"小强\");" +
+ "return cust.getName();";
+ ExpressRunner runner = new ExpressRunner();
+ String r = (String) runner.execute(exp, null, null, false, false);
+ Assert.assertEquals("操作符执行错误", "小强", r);
+ }
+}
diff --git a/base-buildpackage/src/test/java/com/hzya/frame/jolt/TestJoltCardinality.java b/base-buildpackage/src/test/java/com/hzya/frame/jolt/TestJoltCardinality.java
index e10924a7..c6ec58cf 100644
--- a/base-buildpackage/src/test/java/com/hzya/frame/jolt/TestJoltCardinality.java
+++ b/base-buildpackage/src/test/java/com/hzya/frame/jolt/TestJoltCardinality.java
@@ -48,4 +48,16 @@ public class TestJoltCardinality {
Object transform = chainr.transform(input);
System.out.println(JsonUtils.toJsonString(transform));
}
+
+ @Test
+ public void testDemo3() {
+ //输入json
+ Object input = JsonUtils.filepathToObject(jsonFileRootRoute + "/cardinality/input3.json");
+ //spec
+ Object spec = JsonUtils.filepathToObject(jsonFileRootRoute + "/cardinality/spec3.json");
+
+ Chainr chainr = Chainr.fromSpec(spec);
+ Object transform = chainr.transform(input);
+ System.out.println(JsonUtils.toJsonString(transform));
+ }
}
\ No newline at end of file
diff --git a/base-buildpackage/src/test/java/com/hzya/frame/jolt/TestJoltShift.java b/base-buildpackage/src/test/java/com/hzya/frame/jolt/TestJoltShift.java
index 2f20e331..e40e10de 100644
--- a/base-buildpackage/src/test/java/com/hzya/frame/jolt/TestJoltShift.java
+++ b/base-buildpackage/src/test/java/com/hzya/frame/jolt/TestJoltShift.java
@@ -215,4 +215,17 @@ public class TestJoltShift {
Object transform = chainr.transform(input);
System.out.println(JsonUtils.toJsonString(transform));
}
+
+ /**
+ * 字段名替换
+ */
+ @Test
+ public void testDemo16() {
+ Object input = JsonUtils.filepathToObject(jsonFileRootRoute + "/shift/input16.json");
+ Object spec = JsonUtils.filepathToObject(jsonFileRootRoute + "/shift/spec16.json");
+
+ Chainr chainr = Chainr.fromSpec(spec);
+ Object transform = chainr.transform(input);
+ System.out.println(JsonUtils.toJsonString(transform));
+ }
}
\ No newline at end of file
diff --git a/base-buildpackage/src/test/java/com/hzya/frame/vo/UserVo.java b/base-buildpackage/src/test/java/com/hzya/frame/vo/UserVo.java
new file mode 100644
index 00000000..0e24a0d3
--- /dev/null
+++ b/base-buildpackage/src/test/java/com/hzya/frame/vo/UserVo.java
@@ -0,0 +1,16 @@
+package com.hzya.frame.vo;
+
+import lombok.Data;
+
+/**
+ * @Author:liuyang
+ * @Package:com.hzya.frame.vo
+ * @Project:fw-nifi
+ * @name:User
+ * @Date:2025/7/18 15:46
+ * @Filename:User
+ */
+@Data
+public class UserVo {
+ private String name;
+}
diff --git a/jolt-demo/cardinality/input3.json b/jolt-demo/cardinality/input3.json
new file mode 100644
index 00000000..f8dfb03c
--- /dev/null
+++ b/jolt-demo/cardinality/input3.json
@@ -0,0 +1,57 @@
+{
+ "voucher": [
+ {
+ "pk_corp": "1",
+ "pk_corp_name":"广脉科技股份有限公司",
+ "pk_glorgbook": "1-0003",
+ "pk_glorgbook_id": "0001A1100000000001M9",
+ "pk_prepared": "tbadmin",
+ "pk_prepared_id": "TB_NEW100000000004OP",
+ "pk_vouchertype": "记账",
+ "prepareddate": "2025-07-04",
+ "details": [
+ {
+ "explanation": "转资申报单-1017252021000007-ZJ-ZY-YD-201808-018-横塘村西",
+ "pk_accsubj": "160105",
+ "pk_accsubj_id":"0001A1100000000008TZ",
+ "pk_currtype": "CNY",
+ "pk_currtype_id": "00010000000000000001",
+ "debitamount": "1.00",
+ "localdebitamount": "1.00"
+ },
+ {
+ "explanation": "转资申报单-1017252021000007-ZJ-ZY-YD-201808-018-横塘村西",
+ "pk_accsubj": "160106",
+ "pk_accsubj_id":"0001A1100000000008U0",
+ "pk_currtype": "CNY",
+ "pk_currtype_id": "00010000000000000001",
+ "debitamount": "1.00",
+ "localdebitamount": "1.00"
+ },
+ {
+ "explanation": "转资申报单-1017252021000007-ZJ-ZY-YD-201808-018-横塘村西",
+ "pk_accsubj": "16040501",
+ "pk_accsubj_id":"0001A2100000000CCIRN",
+ "pk_currtype": "CNY",
+ "pk_currtype_id": "00010000000000000001",
+ "creditamount": "2.00",
+ "localcreditamount": "2.00",
+ "ass": [
+ {
+ "checktypecode": "D0002",
+ "checkvaluecode": "25"
+ },
+ {
+ "checktypecode": "jobass",
+ "checkvaluecode": "02010100858"
+ },
+ {
+ "checktypecode": "73",
+ "checkvaluecode": "1100032021000083"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+}
\ No newline at end of file
diff --git a/jolt-demo/cardinality/spec3.json b/jolt-demo/cardinality/spec3.json
new file mode 100644
index 00000000..75fa7c50
--- /dev/null
+++ b/jolt-demo/cardinality/spec3.json
@@ -0,0 +1,33 @@
+[
+ {
+ "operation": "shift",
+ "spec": {
+ "voucher": {
+ "*": {
+ "pk_corp": "voucher[&1].orgcorp",
+ "pk_glorgbook": "voucher[&1].pk_glorgbook",
+ "pk_prepared": "voucher[&1].pk_prepared",
+ "pk_vouchertype": "voucher[&1].pk_vouchertype",
+ "prepareddate": "voucher[&1].prepareddate",
+ "details": {
+ "*": {
+ "explanation": "voucher[&3].details[&1].explanation",
+ "pk_accsubj": "voucher[&3].details[&1].pk_accsubj",
+ "pk_currtype": "voucher[&3].details[&1].pk_currtype",
+ "debitamount": "voucher[&3].details[&1].debitamount",
+ "localdebitamount": "voucher[&3].details[&1].localdebitamount",
+ "creditamount": "voucher[&3].details[&1].creditamount",
+ "localcreditamount": "voucher[&3].details[&1].localcreditamount",
+ "ass": {
+ "*": {
+ "checktypecode": "voucher[&5].details[&3].ass[&1].checktypecode",
+ "checkvaluecode": "voucher[&5].details[&3].ass[&1].checkvaluecode"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+]
\ No newline at end of file
diff --git a/jolt-demo/shift/input16.json b/jolt-demo/shift/input16.json
new file mode 100644
index 00000000..6066f0fa
--- /dev/null
+++ b/jolt-demo/shift/input16.json
@@ -0,0 +1,44 @@
+{
+ "ALRCONFINCOMNY": "0",
+ "BCOOPTOPO": "N",
+ "BFREECUSTFLAG": "N",
+ "BINITFLAG": "N",
+ "BINVOICENDFLAG": "N",
+ "BOUTENDFLAG": "N",
+ "BOVERDATE": "N",
+ "BPOCOOPTOME": "N",
+ "BRECEIPTENDFLAG": "Y",
+ "BRETINVFLAG": "Y",
+ "BTRANSENDFLAG": "N",
+ "CBIZTYPE": "0001A110000000000E7S",
+ "CCALBODYID": "1001A11000000000006C",
+ "CCUSTOMERID": "0001A110000000001SXQ",
+ "CDEPTID": "1001A11000000000006L",
+ "COPERATORID": "0001A110000000000X6H",
+ "CRECEIPTCORPID": "0001A110000000001SXQ",
+ "CRECEIPTCUSTOMERID": "0001A110000000001SXQ",
+ "CRECEIPTTYPE": "30",
+ "CSALECORPID": "0001A110000000000DXH",
+ "CSALEID": "1001A1100000000005KT",
+ "DBILLDATE": "2024-09-11",
+ "DBILLTIME": "2024-09-11 15:37:49",
+ "DMAKEDATE": "2024-09-11",
+ "DMODITIME": "2024-09-11 15:38:01",
+ "DR": "1",
+ "EDITIONNUM": "1.0",
+ "FSTATUS": "5",
+ "MODIFIER": "0001A110000000000X6H",
+ "NDISCOUNTRATE": "1",
+ "NEVALUATECARRIAGE": "0",
+ "NHEADSUMMNY": "-11.3",
+ "NTOTALNUM": "-1",
+ "PK_CORP": "1001",
+ "PK_DEFDOC1": "0001A110000000000Z3L",
+ "PK_DEFDOC3": "0001A110000000000E75",
+ "TRANSACTIONPRICE": "-10",
+ "TS": "2024-10-15 15:30:02",
+ "VDEF1": "速卖通-海外",
+ "VDEF3": "销售出库",
+ "VRECEIPTCODE": "SO2409110001",
+ "RN": "1"
+}
\ No newline at end of file
diff --git a/jolt-demo/shift/spec16.json b/jolt-demo/shift/spec16.json
new file mode 100644
index 00000000..79f3fbe2
--- /dev/null
+++ b/jolt-demo/shift/spec16.json
@@ -0,0 +1,18 @@
+[
+ {
+ "operation": "shift",
+ "spec": {
+ "*": {
+ "clientCode": "[&1].bbbbbb",
+ "internalInstructionType": "[&1].aaa",
+ "test01": {
+ "*": {
+ "test03": "[&3].test01[&1].test04",
+ "*": "[&3].test01[&1].&"
+ }
+ },
+ "*": "[&1].&"
+ }
+ }
+ }
+]
\ No newline at end of file