feat(DevGeneratePaginatedSqlProcessor): 增加对 Microsoft SQL Server 的支持

- 新增针对 SQL Server 的分页查询和最大值查询逻辑
- 优化全量同步和增量同步的 SQL生成逻辑
- 增加 SQL 解析和条件插入功能,以支持复杂查询
- 更新依赖,增加 jsqlparser 用于 SQL 解析
This commit is contained in:
liuy 2025-07-10 17:05:41 +08:00
parent bb4d2f4de3
commit 9e05f88ce7
2 changed files with 101 additions and 13 deletions

View File

@ -30,7 +30,7 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
<version>${nifi-revision}</version>
<scope>provided</scope>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
@ -49,5 +49,11 @@
<version>4.13</version>
<scope>test</scope>
</dependency>
<!--sql结构解析器-->
<dependency>
<groupId>com.github.jsqlparser</groupId>
<artifactId>jsqlparser</artifactId>
<version>4.6</version>
</dependency>
</dependencies>
</project>

View File

@ -1,5 +1,9 @@
package com.hzya.frame;
import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.select.PlainSelect;
import net.sf.jsqlparser.statement.select.Select;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Stateful;
@ -109,14 +113,45 @@ public class DevGeneratePaginatedSqlProcessor extends AbstractProcessor {
long rowNumber = 0L;
try (Connection conn = dbcpService.getConnection(); Statement stmt = conn.createStatement()) {
// 构建 WHERE 条件
String dbType = conn.getMetaData().getDatabaseProductName().toLowerCase();
//增量同步
String whereClause = null;
if (oldMaximumValue != null && !oldMaximumValue.isEmpty()) {
whereClause = " WHERE " + maxValColumn + " > '" + oldMaximumValue + "'";
}
// 执行 COUNT MAX 查询
String countMaxSql = "SELECT COUNT(*) AS rowNumber, MAX(" + maxValColumn + ") AS maximumValue FROM (" + baseSql + ") t" + whereClause;
String countMaxSql = null;
if ("oracle".equals(dbType) || "mysql".equals(dbType)) {
countMaxSql = "SELECT COUNT(*) AS rowNumber, MAX(" + maxValColumn + ") AS maximumValue FROM (" + baseSql + ") t " + whereClause;
} else if ("microsoft sql server".equals(dbType)) {
StringBuffer montageSql = new StringBuffer();
montageSql.append("WITH SortedData AS (");
montageSql.append(baseSql);
if (whereClause != null) {
if (baseSql.toUpperCase().contains("WHERE")) {
//替换为and
whereClause = whereClause.replace("WHERE", "AND");
}
montageSql.append(whereClause);
}
if (baseSql.toUpperCase().contains("ORDER BY")) {
//确保排序生效
montageSql.append(" OFFSET 0 ROWS");
}
montageSql.append(")");
montageSql.append("\n");
montageSql.append("SELECT");
montageSql.append(" COUNT(*) AS rowNumber,");
montageSql.append(" MAX(ts) AS maximumValue");
montageSql.append(" FROM SortedData");
countMaxSql = montageSql.toString();
} else {
throw new ProcessException("不支持的数据库类型: " + dbType);
}
getLogger().info("正在执行计数/最大值查询: {}", countMaxSql);
ResultSet rs = stmt.executeQuery(countMaxSql);
@ -137,29 +172,37 @@ public class DevGeneratePaginatedSqlProcessor extends AbstractProcessor {
// 生成分页 SQL
long totalPages = (rowNumber + pageSize - 1) / pageSize;
String dbType = conn.getMetaData().getDatabaseProductName().toLowerCase();
for (int page = 0; page < totalPages; page++) {
long startRow = (long) page * pageSize + 1;
long endRow = Math.min((long) (page + 1) * pageSize, rowNumber);
//全量同步阶段确定全量数据范围
//全量同步
if (whereClause == null) {
whereClause = " WHERE " + maxValColumn + " <= '" + queriedMaximumValue + "'";
}
String paginatedSql;
if ("oracle".equals(dbType)) {
paginatedSql = "SELECT * FROM (\n"
+ " SELECT inner_query.*, ROWNUM rn FROM (\n"
+ " "
+ "select * from (" + baseSql + ")" + whereClause + "\n"
+ " ) inner_query WHERE ROWNUM <= " + endRow + "\n" + ") WHERE rn >= " + startRow;
paginatedSql = "SELECT * FROM (\n" + " SELECT inner_query.*, ROWNUM rn FROM (\n" + " " + "select * from (" + baseSql + ")" + whereClause + "\n" + " ) inner_query WHERE ROWNUM <= " + endRow + "\n" + ") WHERE rn >= " + startRow;
} else if ("mysql".equals(dbType)) {
long offset = startRow - 1;
paginatedSql = "select * from (" + baseSql + ")" + whereClause + " LIMIT " + pageSize + " OFFSET " + offset;
} else if ("sqlserver".equals(dbType)) {
paginatedSql = "select * from (" + baseSql + ")" + whereClause + " OFFSET " + (startRow - 1) + " ROWS FETCH NEXT " + pageSize + " ROWS ONLY";
} else if ("microsoft sql server".equals(dbType)) {
whereClause = whereClause.replace("WHERE", "");
whereClause = whereClause.replace("AND", "");
StringBuffer splicingSql = new StringBuffer();
splicingSql.append(baseSql);
splicingSql.append(" OFFSET ");
splicingSql.append((startRow - 1));
splicingSql.append(" ROWS FETCH NEXT ");
splicingSql.append(pageSize);
splicingSql.append(" ROWS ONLY");
paginatedSql = insertWhereClause(splicingSql.toString(), whereClause);
} else {
throw new ProcessException("不支持的数据库类型: " + dbType);
}
@ -183,7 +226,6 @@ public class DevGeneratePaginatedSqlProcessor extends AbstractProcessor {
// 删除原始 FlowFile
session.remove(flowFile);
} catch (SQLException e) {
getLogger().error("由于{}无法执行SQL查询故障路由", e.getMessage(), e);
session.transfer(flowFile, REL_FAILURE);
@ -193,4 +235,44 @@ public class DevGeneratePaginatedSqlProcessor extends AbstractProcessor {
session.transfer(flowFile, REL_FAILURE);
}
}
public static String insertWhereClause(String originalSql, String additionalCondition) throws Exception {
// 解析SQL
net.sf.jsqlparser.statement.Statement statement = CCJSqlParserUtil.parse(originalSql);
if (!(statement instanceof Select)) {
throw new IllegalArgumentException("仅支持SELECT语句");
}
Select select = (Select) statement;
PlainSelect plainSelect = (PlainSelect) select.getSelectBody();
// 构建新WHERE条件
Expression newWhere = buildNewWhere(plainSelect.getWhere(), additionalCondition);
// 更新WHERE条件
plainSelect.setWhere(newWhere);
// 重新生成SQL字符串
return select.toString();
}
private static Expression buildNewWhere(Expression existingWhere, String additionalCondition) throws Exception {
if (existingWhere == null) {
// 无原始WHERE直接使用新条件
return CCJSqlParserUtil.parseCondExpression(additionalCondition);
}
// 合并新旧条件 (AND连接)
Expression newCondition = CCJSqlParserUtil.parseCondExpression(additionalCondition);
return CCJSqlParserUtil.parseCondExpression(existingWhere + " AND " + newCondition);
}
// public static void main(String[] args) throws Exception {
// String originalSql = "SELECT * FROM so_sale ORDER BY ts, csaleid ASC OFFSET 0 ROWS FETCH NEXT 10 ROWS ONLY";
// String additionalCondition = "status = 'ACTIVE' AND amount > 100";
//
// String modifiedSql = insertWhereClause(originalSql, additionalCondition);
// System.out.println("Modified SQL:\n" + modifiedSql);
// }
}