diff --git a/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/pom.xml index 8ff9160..a2e7581 100644 --- a/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/pom.xml +++ b/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/pom.xml @@ -30,7 +30,7 @@ org.apache.nifi nifi-dbcp-service-api ${nifi-revision} - provided + org.apache.nifi @@ -49,5 +49,11 @@ 4.13 test + + + com.github.jsqlparser + jsqlparser + 4.6 + \ No newline at end of file diff --git a/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/java/com/hzya/frame/DevGeneratePaginatedSqlProcessor.java b/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/java/com/hzya/frame/DevGeneratePaginatedSqlProcessor.java index 6fef77c..8c410fe 100644 --- a/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/java/com/hzya/frame/DevGeneratePaginatedSqlProcessor.java +++ b/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/java/com/hzya/frame/DevGeneratePaginatedSqlProcessor.java @@ -1,5 +1,9 @@ package com.hzya.frame; +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.parser.CCJSqlParserUtil; +import net.sf.jsqlparser.statement.select.PlainSelect; +import net.sf.jsqlparser.statement.select.Select; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.Stateful; @@ -109,14 +113,45 @@ public class DevGeneratePaginatedSqlProcessor extends AbstractProcessor { long rowNumber = 0L; try (Connection conn = dbcpService.getConnection(); Statement stmt = conn.createStatement()) { - // 构建 WHERE 条件 + String dbType = conn.getMetaData().getDatabaseProductName().toLowerCase(); + + //增量同步 String whereClause = null; if (oldMaximumValue != null && !oldMaximumValue.isEmpty()) { whereClause = " WHERE " + maxValColumn + " > '" + oldMaximumValue + "'"; } // 执行 COUNT 和 MAX 查询 - String countMaxSql = "SELECT COUNT(*) AS rowNumber, MAX(" + maxValColumn + ") AS maximumValue FROM (" + baseSql + ") t" + whereClause; + String countMaxSql = null; + if ("oracle".equals(dbType) || "mysql".equals(dbType)) { + countMaxSql = "SELECT COUNT(*) AS rowNumber, MAX(" + maxValColumn + ") AS maximumValue FROM (" + baseSql + ") t " + whereClause; + } else if ("microsoft sql server".equals(dbType)) { + StringBuffer montageSql = new StringBuffer(); + montageSql.append("WITH SortedData AS ("); + montageSql.append(baseSql); + if (whereClause != null) { + if (baseSql.toUpperCase().contains("WHERE")) { + //替换为and + whereClause = whereClause.replace("WHERE", "AND"); + } + montageSql.append(whereClause); + } + if (baseSql.toUpperCase().contains("ORDER BY")) { + //确保排序生效 + montageSql.append(" OFFSET 0 ROWS"); + } + montageSql.append(")"); + montageSql.append("\n"); + montageSql.append("SELECT"); + montageSql.append(" COUNT(*) AS rowNumber,"); + montageSql.append(" MAX(ts) AS maximumValue"); + montageSql.append(" FROM SortedData"); + + countMaxSql = montageSql.toString(); + } else { + throw new ProcessException("不支持的数据库类型: " + dbType); + } + getLogger().info("正在执行计数/最大值查询: {}", countMaxSql); ResultSet rs = stmt.executeQuery(countMaxSql); @@ -137,29 +172,37 @@ public class DevGeneratePaginatedSqlProcessor extends AbstractProcessor { // 生成分页 SQL long totalPages = (rowNumber + pageSize - 1) / pageSize; - String dbType = conn.getMetaData().getDatabaseProductName().toLowerCase(); for (int page = 0; page < totalPages; page++) { long startRow = (long) page * pageSize + 1; long endRow = Math.min((long) (page + 1) * pageSize, rowNumber); - //全量同步阶段,确定全量数据范围 + //全量同步 if (whereClause == null) { whereClause = " WHERE " + maxValColumn + " <= '" + queriedMaximumValue + "'"; } String paginatedSql; if ("oracle".equals(dbType)) { - paginatedSql = "SELECT * FROM (\n" - + " SELECT inner_query.*, ROWNUM rn FROM (\n" - + " " - + "select * from (" + baseSql + ")" + whereClause + "\n" - + " ) inner_query WHERE ROWNUM <= " + endRow + "\n" + ") WHERE rn >= " + startRow; + paginatedSql = "SELECT * FROM (\n" + " SELECT inner_query.*, ROWNUM rn FROM (\n" + " " + "select * from (" + baseSql + ")" + whereClause + "\n" + " ) inner_query WHERE ROWNUM <= " + endRow + "\n" + ") WHERE rn >= " + startRow; } else if ("mysql".equals(dbType)) { long offset = startRow - 1; paginatedSql = "select * from (" + baseSql + ")" + whereClause + " LIMIT " + pageSize + " OFFSET " + offset; - } else if ("sqlserver".equals(dbType)) { - paginatedSql = "select * from (" + baseSql + ")" + whereClause + " OFFSET " + (startRow - 1) + " ROWS FETCH NEXT " + pageSize + " ROWS ONLY"; + } else if ("microsoft sql server".equals(dbType)) { + + whereClause = whereClause.replace("WHERE", ""); + whereClause = whereClause.replace("AND", ""); + + StringBuffer splicingSql = new StringBuffer(); + splicingSql.append(baseSql); + + splicingSql.append(" OFFSET "); + splicingSql.append((startRow - 1)); + splicingSql.append(" ROWS FETCH NEXT "); + splicingSql.append(pageSize); + splicingSql.append(" ROWS ONLY"); + + paginatedSql = insertWhereClause(splicingSql.toString(), whereClause); } else { throw new ProcessException("不支持的数据库类型: " + dbType); } @@ -183,7 +226,6 @@ public class DevGeneratePaginatedSqlProcessor extends AbstractProcessor { // 删除原始 FlowFile session.remove(flowFile); - } catch (SQLException e) { getLogger().error("由于{},无法执行SQL查询;故障路由", e.getMessage(), e); session.transfer(flowFile, REL_FAILURE); @@ -193,4 +235,44 @@ public class DevGeneratePaginatedSqlProcessor extends AbstractProcessor { session.transfer(flowFile, REL_FAILURE); } } + + public static String insertWhereClause(String originalSql, String additionalCondition) throws Exception { + // 解析SQL + net.sf.jsqlparser.statement.Statement statement = CCJSqlParserUtil.parse(originalSql); + if (!(statement instanceof Select)) { + throw new IllegalArgumentException("仅支持SELECT语句"); + } + + Select select = (Select) statement; + PlainSelect plainSelect = (PlainSelect) select.getSelectBody(); + + // 构建新WHERE条件 + Expression newWhere = buildNewWhere(plainSelect.getWhere(), additionalCondition); + + // 更新WHERE条件 + plainSelect.setWhere(newWhere); + + // 重新生成SQL字符串 + return select.toString(); + } + + private static Expression buildNewWhere(Expression existingWhere, String additionalCondition) throws Exception { + if (existingWhere == null) { + // 无原始WHERE,直接使用新条件 + return CCJSqlParserUtil.parseCondExpression(additionalCondition); + } + + // 合并新旧条件 (AND连接) + Expression newCondition = CCJSqlParserUtil.parseCondExpression(additionalCondition); + return CCJSqlParserUtil.parseCondExpression(existingWhere + " AND " + newCondition); + } + +// public static void main(String[] args) throws Exception { +// String originalSql = "SELECT * FROM so_sale ORDER BY ts, csaleid ASC OFFSET 0 ROWS FETCH NEXT 10 ROWS ONLY"; +// String additionalCondition = "status = 'ACTIVE' AND amount > 100"; +// +// String modifiedSql = insertWhereClause(originalSql, additionalCondition); +// System.out.println("Modified SQL:\n" + modifiedSql); +// } + } \ No newline at end of file