fix(DevGeneratePaginatedSqlProcessor): 修复全量同步阶段数据查询逻辑

- 修改 whereClause初始值为 null,优化逻辑判断
- 在全量同步阶段添加数据范围限制条件
-针对不同数据库类型调整分页 SQL 语句格式
This commit is contained in:
liuy 2025-07-10 10:27:51 +08:00
parent 496c69864a
commit bb4d2f4de3
2 changed files with 13 additions and 4 deletions

View File

@ -110,7 +110,7 @@ public class DevGeneratePaginatedSqlProcessor extends AbstractProcessor {
try (Connection conn = dbcpService.getConnection(); Statement stmt = conn.createStatement()) { try (Connection conn = dbcpService.getConnection(); Statement stmt = conn.createStatement()) {
// 构建 WHERE 条件 // 构建 WHERE 条件
String whereClause = ""; String whereClause = null;
if (oldMaximumValue != null && !oldMaximumValue.isEmpty()) { if (oldMaximumValue != null && !oldMaximumValue.isEmpty()) {
whereClause = " WHERE " + maxValColumn + " > '" + oldMaximumValue + "'"; whereClause = " WHERE " + maxValColumn + " > '" + oldMaximumValue + "'";
} }
@ -143,14 +143,23 @@ public class DevGeneratePaginatedSqlProcessor extends AbstractProcessor {
long startRow = (long) page * pageSize + 1; long startRow = (long) page * pageSize + 1;
long endRow = Math.min((long) (page + 1) * pageSize, rowNumber); long endRow = Math.min((long) (page + 1) * pageSize, rowNumber);
//全量同步阶段确定全量数据范围
if (whereClause == null) {
whereClause = " WHERE " + maxValColumn + " <= '" + queriedMaximumValue + "'";
}
String paginatedSql; String paginatedSql;
if ("oracle".equals(dbType)) { if ("oracle".equals(dbType)) {
paginatedSql = "SELECT * FROM (\n" + " SELECT inner_query.*, ROWNUM rn FROM (\n" + " " + baseSql + whereClause + "\n" + " ) inner_query WHERE ROWNUM <= " + endRow + "\n" + ") WHERE rn >= " + startRow; paginatedSql = "SELECT * FROM (\n"
+ " SELECT inner_query.*, ROWNUM rn FROM (\n"
+ " "
+ "select * from (" + baseSql + ")" + whereClause + "\n"
+ " ) inner_query WHERE ROWNUM <= " + endRow + "\n" + ") WHERE rn >= " + startRow;
} else if ("mysql".equals(dbType)) { } else if ("mysql".equals(dbType)) {
long offset = startRow - 1; long offset = startRow - 1;
paginatedSql = baseSql + whereClause + " LIMIT " + pageSize + " OFFSET " + offset; paginatedSql = "select * from (" + baseSql + ")" + whereClause + " LIMIT " + pageSize + " OFFSET " + offset;
} else if ("sqlserver".equals(dbType)) { } else if ("sqlserver".equals(dbType)) {
paginatedSql = baseSql + whereClause + " OFFSET " + (startRow - 1) + " ROWS FETCH NEXT " + pageSize + " ROWS ONLY"; paginatedSql = "select * from (" + baseSql + ")" + whereClause + " OFFSET " + (startRow - 1) + " ROWS FETCH NEXT " + pageSize + " ROWS ONLY";
} else { } else {
throw new ProcessException("不支持的数据库类型: " + dbType); throw new ProcessException("不支持的数据库类型: " + dbType);
} }