From 9e05f88ce7564b3862aa8537479e0302e8f12eda Mon Sep 17 00:00:00 2001
From: liuy <37787198+LiuyCodes@users.noreply.github.com>
Date: Thu, 10 Jul 2025 17:05:41 +0800
Subject: [PATCH] =?UTF-8?q?feat(DevGeneratePaginatedSqlProcessor):=20?=
=?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=AF=B9=20Microsoft=20SQL=20Server=20?=
=?UTF-8?q?=E7=9A=84=E6=94=AF=E6=8C=81?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
- 新增针对 SQL Server 的分页查询和最大值查询逻辑
- 优化全量同步和增量同步的 SQL生成逻辑
- 增加 SQL 解析和条件插入功能,以支持复杂查询
- 更新依赖,增加 jsqlparser 用于 SQL 解析
---
.../pom.xml | 8 +-
.../DevGeneratePaginatedSqlProcessor.java | 106 ++++++++++++++++--
2 files changed, 101 insertions(+), 13 deletions(-)
diff --git a/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/pom.xml
index 8ff9160..a2e7581 100644
--- a/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/pom.xml
+++ b/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/pom.xml
@@ -30,7 +30,7 @@
org.apache.nifi
nifi-dbcp-service-api
${nifi-revision}
- provided
+
org.apache.nifi
@@ -49,5 +49,11 @@
4.13
test
+
+
+ com.github.jsqlparser
+ jsqlparser
+ 4.6
+
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/java/com/hzya/frame/DevGeneratePaginatedSqlProcessor.java b/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/java/com/hzya/frame/DevGeneratePaginatedSqlProcessor.java
index 6fef77c..8c410fe 100644
--- a/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/java/com/hzya/frame/DevGeneratePaginatedSqlProcessor.java
+++ b/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/java/com/hzya/frame/DevGeneratePaginatedSqlProcessor.java
@@ -1,5 +1,9 @@
package com.hzya.frame;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.statement.select.PlainSelect;
+import net.sf.jsqlparser.statement.select.Select;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Stateful;
@@ -109,14 +113,45 @@ public class DevGeneratePaginatedSqlProcessor extends AbstractProcessor {
long rowNumber = 0L;
try (Connection conn = dbcpService.getConnection(); Statement stmt = conn.createStatement()) {
- // 构建 WHERE 条件
+ String dbType = conn.getMetaData().getDatabaseProductName().toLowerCase();
+
+ //增量同步
String whereClause = null;
if (oldMaximumValue != null && !oldMaximumValue.isEmpty()) {
whereClause = " WHERE " + maxValColumn + " > '" + oldMaximumValue + "'";
}
// 执行 COUNT 和 MAX 查询
- String countMaxSql = "SELECT COUNT(*) AS rowNumber, MAX(" + maxValColumn + ") AS maximumValue FROM (" + baseSql + ") t" + whereClause;
+ String countMaxSql = null;
+ if ("oracle".equals(dbType) || "mysql".equals(dbType)) {
+ countMaxSql = "SELECT COUNT(*) AS rowNumber, MAX(" + maxValColumn + ") AS maximumValue FROM (" + baseSql + ") t " + whereClause;
+ } else if ("microsoft sql server".equals(dbType)) {
+ StringBuffer montageSql = new StringBuffer();
+ montageSql.append("WITH SortedData AS (");
+ montageSql.append(baseSql);
+ if (whereClause != null) {
+ if (baseSql.toUpperCase().contains("WHERE")) {
+ //替换为and
+ whereClause = whereClause.replace("WHERE", "AND");
+ }
+ montageSql.append(whereClause);
+ }
+ if (baseSql.toUpperCase().contains("ORDER BY")) {
+ //确保排序生效
+ montageSql.append(" OFFSET 0 ROWS");
+ }
+ montageSql.append(")");
+ montageSql.append("\n");
+ montageSql.append("SELECT");
+ montageSql.append(" COUNT(*) AS rowNumber,");
+ montageSql.append(" MAX(ts) AS maximumValue");
+ montageSql.append(" FROM SortedData");
+
+ countMaxSql = montageSql.toString();
+ } else {
+ throw new ProcessException("不支持的数据库类型: " + dbType);
+ }
+
getLogger().info("正在执行计数/最大值查询: {}", countMaxSql);
ResultSet rs = stmt.executeQuery(countMaxSql);
@@ -137,29 +172,37 @@ public class DevGeneratePaginatedSqlProcessor extends AbstractProcessor {
// 生成分页 SQL
long totalPages = (rowNumber + pageSize - 1) / pageSize;
- String dbType = conn.getMetaData().getDatabaseProductName().toLowerCase();
for (int page = 0; page < totalPages; page++) {
long startRow = (long) page * pageSize + 1;
long endRow = Math.min((long) (page + 1) * pageSize, rowNumber);
- //全量同步阶段,确定全量数据范围
+ //全量同步
if (whereClause == null) {
whereClause = " WHERE " + maxValColumn + " <= '" + queriedMaximumValue + "'";
}
String paginatedSql;
if ("oracle".equals(dbType)) {
- paginatedSql = "SELECT * FROM (\n"
- + " SELECT inner_query.*, ROWNUM rn FROM (\n"
- + " "
- + "select * from (" + baseSql + ")" + whereClause + "\n"
- + " ) inner_query WHERE ROWNUM <= " + endRow + "\n" + ") WHERE rn >= " + startRow;
+ paginatedSql = "SELECT * FROM (\n" + " SELECT inner_query.*, ROWNUM rn FROM (\n" + " " + "select * from (" + baseSql + ")" + whereClause + "\n" + " ) inner_query WHERE ROWNUM <= " + endRow + "\n" + ") WHERE rn >= " + startRow;
} else if ("mysql".equals(dbType)) {
long offset = startRow - 1;
paginatedSql = "select * from (" + baseSql + ")" + whereClause + " LIMIT " + pageSize + " OFFSET " + offset;
- } else if ("sqlserver".equals(dbType)) {
- paginatedSql = "select * from (" + baseSql + ")" + whereClause + " OFFSET " + (startRow - 1) + " ROWS FETCH NEXT " + pageSize + " ROWS ONLY";
+ } else if ("microsoft sql server".equals(dbType)) {
+
+ whereClause = whereClause.replace("WHERE", "");
+ whereClause = whereClause.replace("AND", "");
+
+ StringBuffer splicingSql = new StringBuffer();
+ splicingSql.append(baseSql);
+
+ splicingSql.append(" OFFSET ");
+ splicingSql.append((startRow - 1));
+ splicingSql.append(" ROWS FETCH NEXT ");
+ splicingSql.append(pageSize);
+ splicingSql.append(" ROWS ONLY");
+
+ paginatedSql = insertWhereClause(splicingSql.toString(), whereClause);
} else {
throw new ProcessException("不支持的数据库类型: " + dbType);
}
@@ -183,7 +226,6 @@ public class DevGeneratePaginatedSqlProcessor extends AbstractProcessor {
// 删除原始 FlowFile
session.remove(flowFile);
-
} catch (SQLException e) {
getLogger().error("由于{},无法执行SQL查询;故障路由", e.getMessage(), e);
session.transfer(flowFile, REL_FAILURE);
@@ -193,4 +235,44 @@ public class DevGeneratePaginatedSqlProcessor extends AbstractProcessor {
session.transfer(flowFile, REL_FAILURE);
}
}
+
+ public static String insertWhereClause(String originalSql, String additionalCondition) throws Exception {
+ // 解析SQL
+ net.sf.jsqlparser.statement.Statement statement = CCJSqlParserUtil.parse(originalSql);
+ if (!(statement instanceof Select)) {
+ throw new IllegalArgumentException("仅支持SELECT语句");
+ }
+
+ Select select = (Select) statement;
+ PlainSelect plainSelect = (PlainSelect) select.getSelectBody();
+
+ // 构建新WHERE条件
+ Expression newWhere = buildNewWhere(plainSelect.getWhere(), additionalCondition);
+
+ // 更新WHERE条件
+ plainSelect.setWhere(newWhere);
+
+ // 重新生成SQL字符串
+ return select.toString();
+ }
+
+ private static Expression buildNewWhere(Expression existingWhere, String additionalCondition) throws Exception {
+ if (existingWhere == null) {
+ // 无原始WHERE,直接使用新条件
+ return CCJSqlParserUtil.parseCondExpression(additionalCondition);
+ }
+
+ // 合并新旧条件 (AND连接)
+ Expression newCondition = CCJSqlParserUtil.parseCondExpression(additionalCondition);
+ return CCJSqlParserUtil.parseCondExpression(existingWhere + " AND " + newCondition);
+ }
+
+// public static void main(String[] args) throws Exception {
+// String originalSql = "SELECT * FROM so_sale ORDER BY ts, csaleid ASC OFFSET 0 ROWS FETCH NEXT 10 ROWS ONLY";
+// String additionalCondition = "status = 'ACTIVE' AND amount > 100";
+//
+// String modifiedSql = insertWhereClause(originalSql, additionalCondition);
+// System.out.println("Modified SQL:\n" + modifiedSql);
+// }
+
}
\ No newline at end of file