From bb4d2f4de3a4c20754d84ed455e791c869ea5795 Mon Sep 17 00:00:00 2001 From: liuy <37787198+LiuyCodes@users.noreply.github.com> Date: Thu, 10 Jul 2025 10:27:51 +0800 Subject: [PATCH] =?UTF-8?q?fix(DevGeneratePaginatedSqlProcessor):=20?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=85=A8=E9=87=8F=E5=90=8C=E6=AD=A5=E9=98=B6?= =?UTF-8?q?=E6=AE=B5=E6=95=B0=E6=8D=AE=E6=9F=A5=E8=AF=A2=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 修改 whereClause初始值为 null,优化逻辑判断 - 在全量同步阶段添加数据范围限制条件 -针对不同数据库类型调整分页 SQL 语句格式 --- .../frame/DevGeneratePaginatedSqlProcessor.java | 17 +++++++++++++---- .../org.apache.nifi.processor.Processor | 0 2 files changed, 13 insertions(+), 4 deletions(-) rename nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/resources/{ => META-INF}/services/org.apache.nifi.processor.Processor (100%) diff --git a/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/java/com/hzya/frame/DevGeneratePaginatedSqlProcessor.java b/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/java/com/hzya/frame/DevGeneratePaginatedSqlProcessor.java index a5ce030..6fef77c 100644 --- a/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/java/com/hzya/frame/DevGeneratePaginatedSqlProcessor.java +++ b/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/java/com/hzya/frame/DevGeneratePaginatedSqlProcessor.java @@ -110,7 +110,7 @@ public class DevGeneratePaginatedSqlProcessor extends AbstractProcessor { try (Connection conn = dbcpService.getConnection(); Statement stmt = conn.createStatement()) { // 构建 WHERE 条件 - String whereClause = ""; + String whereClause = null; if (oldMaximumValue != null && !oldMaximumValue.isEmpty()) { whereClause = " WHERE " + maxValColumn + " > '" + oldMaximumValue + "'"; } @@ -143,14 +143,23 @@ public class DevGeneratePaginatedSqlProcessor extends AbstractProcessor { long startRow = (long) page * pageSize + 1; long endRow = Math.min((long) (page + 1) * pageSize, rowNumber); + //全量同步阶段,确定全量数据范围 + if (whereClause == null) { + whereClause = " WHERE " + maxValColumn + " <= '" + queriedMaximumValue + "'"; + } + String paginatedSql; if ("oracle".equals(dbType)) { - paginatedSql = "SELECT * FROM (\n" + " SELECT inner_query.*, ROWNUM rn FROM (\n" + " " + baseSql + whereClause + "\n" + " ) inner_query WHERE ROWNUM <= " + endRow + "\n" + ") WHERE rn >= " + startRow; + paginatedSql = "SELECT * FROM (\n" + + " SELECT inner_query.*, ROWNUM rn FROM (\n" + + " " + + "select * from (" + baseSql + ")" + whereClause + "\n" + + " ) inner_query WHERE ROWNUM <= " + endRow + "\n" + ") WHERE rn >= " + startRow; } else if ("mysql".equals(dbType)) { long offset = startRow - 1; - paginatedSql = baseSql + whereClause + " LIMIT " + pageSize + " OFFSET " + offset; + paginatedSql = "select * from (" + baseSql + ")" + whereClause + " LIMIT " + pageSize + " OFFSET " + offset; } else if ("sqlserver".equals(dbType)) { - paginatedSql = baseSql + whereClause + " OFFSET " + (startRow - 1) + " ROWS FETCH NEXT " + pageSize + " ROWS ONLY"; + paginatedSql = "select * from (" + baseSql + ")" + whereClause + " OFFSET " + (startRow - 1) + " ROWS FETCH NEXT " + pageSize + " ROWS ONLY"; } else { throw new ProcessException("不支持的数据库类型: " + dbType); } diff --git a/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/resources/services/org.apache.nifi.processor.Processor b/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor similarity index 100% rename from nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/resources/services/org.apache.nifi.processor.Processor rename to nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor