From 10bafb2b5088c86d23e79ebb26708f633719a7c0 Mon Sep 17 00:00:00 2001
From: liuy <37787198+LiuyCodes@users.noreply.github.com>
Date: Thu, 10 Jul 2025 19:14:40 +0800
Subject: [PATCH] feat(nifi): add automatic table creation processor
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
- Add a DevAutoJsonTableCreateProcessor processor that creates database tables automatically
- Implement connecting to the database, checking whether the table exists, and generating and executing the CREATE TABLE SQL (see the illustrative example below)
- Support MySQL, SQL Server, and Oracle databases
- Add a custom JSON array parsing method
- Improve table and column name handling to meet database-specific requirements
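
Illustrative example (hypothetical attribute values): given the attributes
targetInsertTableName=t_user and jsonPrimaryKeyTag=["id"], and FlowFile content
{"id": "1001", "name": "alice"}, the processor would generate MySQL DDL along
the lines of:

    CREATE TABLE `t_user` (
        `id` VARCHAR(40) NOT NULL,
        `name` VARCHAR(50),
        CONSTRAINT `PK_t_user` PRIMARY KEY (`id`)
    )

Column widths follow the processor's heuristic of ten times the sample value
length (minimum 10, capped at 4000 bytes on Oracle).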
---
.../hzya-nifi-AutoJsonTableCreate-nar/pom.xml | 34 ++
.../pom.xml | 50 +++
.../DevAutoJsonTableCreateProcessor.java | 342 ++++++++++++++++++
.../org.apache.nifi.processor.Processor | 15 +
.../DevGeneratePaginatedSqlProcessor.java | 10 +-
nifi-hzyadev-bundle/pom.xml | 2 +
6 files changed, 448 insertions(+), 5 deletions(-)
create mode 100644 nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-nar/pom.xml
create mode 100644 nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-processors/pom.xml
create mode 100644 nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-processors/src/main/java/com/hzya/frame/DevAutoJsonTableCreateProcessor.java
create mode 100644 nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
diff --git a/nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-nar/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-nar/pom.xml
new file mode 100644
index 0000000..e82856f
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-nar/pom.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-hzyadev-bundle</artifactId>
+        <groupId>com.hzya</groupId>
+        <version>1.0</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hzya-nifi-AutoJsonTableCreate-nar</artifactId>
+    <packaging>nar</packaging>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <source.skip>true</source.skip>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.hzya</groupId>
+            <artifactId>hzya-nifi-AutoJsonTableCreate-processors</artifactId>
+            <version>1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <version>${nifi-revision}</version>
+            <type>nar</type>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-processors/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-processors/pom.xml
new file mode 100644
index 0000000..34601fc
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-processors/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-hzyadev-bundle</artifactId>
+        <groupId>com.hzya</groupId>
+        <version>1.0</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hzya-nifi-AutoJsonTableCreate-processors</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>${nifi-revision}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-dbcp-service-api</artifactId>
+            <version>${nifi-revision}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+            <version>1.15.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>${nifi-revision}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.13</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-processors/src/main/java/com/hzya/frame/DevAutoJsonTableCreateProcessor.java b/nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-processors/src/main/java/com/hzya/frame/DevAutoJsonTableCreateProcessor.java
new file mode 100644
index 0000000..2c71e5a
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-processors/src/main/java/com/hzya/frame/DevAutoJsonTableCreateProcessor.java
@@ -0,0 +1,342 @@
+package com.hzya.frame;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.*;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.logging.ComponentLog;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * @Author:liuyang
+ * @Package:com.hzya.frame
+ * @Project:nifi-hzyadev-bundle
+ * @name:DevAutoJsonTableCreateProcessor
+ * @Date:2025/7/10 17:24
+ * @Filename:DevAutoJsonTableCreateProcessor
+ */
+@Tags({"sql", "ddl", "json", "database", "schema", "ensure", "create", "mysql", "oracle", "sqlserver"})
+@CapabilityDescription("Connects to the configured database and ensures that the target table exists. If the table does not exist, "
+        + "a CREATE TABLE statement is generated from the FlowFile's JSON content and executed. If the table already exists, "
+        + "nothing is done. On success or skip, the original FlowFile is routed to the success relationship.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "targetInsertTableName", description = "Name of the target table to check or create."),
+        @ReadsAttribute(attribute = "autoTableCreation", description = "When set to 'N', table creation is skipped and the FlowFile is routed straight to success."),
+        @ReadsAttribute(attribute = "jsonPrimaryKeyTag", description = "A JSON array string naming the primary key fields, e.g. '[\"id\", \"user_name\"]'.")})
+@WritesAttributes({
+        @WritesAttribute(attribute = "ddl.sql.executed", description = "If a CREATE TABLE statement was executed, it is written to this attribute for auditing."),
+        @WritesAttribute(attribute = "ddl.database.type", description = "The detected database type (mysql, sqlserver, or oracle).")})
+public class DevAutoJsonTableCreateProcessor extends AbstractProcessor {
+    // --- Properties ---
+    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
+            .name("Database Connection Pooling Service")
+            .displayName("Database Connection Pooling Service")
+            .description("The DBCPConnectionPool controller service used to connect to the database and to check for and create the target table.")
+            .identifiesControllerService(DBCPService.class)
+            .required(true)
+            .build();
+
+    // --- Relationships ---
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("The original FlowFile is routed here when the table already exists or was created successfully.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("The FlowFile is routed here when any error occurs during processing, such as a database connection or SQL execution failure.")
+            .build();
+
+    private Set<Relationship> relationships;
+    private List<PropertyDescriptor> descriptors;
+    // The ObjectMapper is still used to parse the FlowFile's JSON content
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(DBCP_SERVICE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();
+
+        // --- 1. Fetch and validate FlowFile attributes ---
+        final String tableName = flowFile.getAttribute("targetInsertTableName");
+        if (tableName == null || tableName.trim().isEmpty()) {
+            logger.error("Attribute 'targetInsertTableName' is missing or empty; routing FlowFile {} to failure.", flowFile);
+            session.transfer(session.penalize(flowFile), REL_FAILURE);
+            return;
+        }
+
+        final String autoTableCreation = flowFile.getAttribute("autoTableCreation");
+        if (autoTableCreation == null || autoTableCreation.trim().isEmpty()) {
+            logger.error("Attribute 'autoTableCreation' is missing or empty; routing FlowFile {} to failure.", flowFile);
+            session.transfer(session.penalize(flowFile), REL_FAILURE);
+            return;
+        }
+        if ("N".equals(autoTableCreation)) {
+            logger.info("autoTableCreation=N (automatic table creation disabled); skipping creation and routing FlowFile {} to success.", flowFile);
+            session.transfer(flowFile, REL_SUCCESS);
+            return;
+        }
+
+        final String jsonPrimaryKeyTag = flowFile.getAttribute("jsonPrimaryKeyTag");
+        final List<String> primaryKeys;
+        try {
+            // Parsed with the custom method below; no external library is needed for this attribute
+            primaryKeys = parseJsonArrayString(jsonPrimaryKeyTag);
+            if (primaryKeys.isEmpty()) {
+                logger.warn("Attribute 'jsonPrimaryKeyTag' is empty or yielded no entries; the table will be created without a primary key. FlowFile: {}", flowFile);
+            }
+        } catch (Exception e) {
+            logger.error("Failed to parse 'jsonPrimaryKeyTag': '{}'. Error: {}. Routing FlowFile {} to failure.", jsonPrimaryKeyTag, e.getMessage(), flowFile);
+            session.transfer(session.penalize(flowFile), REL_FAILURE);
+            return;
+        }
+
+        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+
+        // --- 2. Connect to the database, check the table, and execute DDL if needed ---
+        try (final Connection conn = dbcpService.getConnection(flowFile.getAttributes())) {
+            final String dbType = getDbType(conn);
+
+            if (tableExists(conn, tableName, dbType)) {
+                logger.info("Table '{}' already exists; skipping creation and routing FlowFile {} to success.", tableName, flowFile);
+                session.transfer(flowFile, REL_SUCCESS);
+                return;
+            }
+
+            logger.info("Table '{}' does not exist; generating and executing DDL from the content of FlowFile {}.", tableName, flowFile);
+
+            final Holder<String> sqlHolder = new Holder<>();
+            session.read(flowFile, in -> {
+                JsonNode rootNode = objectMapper.readTree(in);
+                JsonNode targetNode = rootNode.isArray() && rootNode.size() > 0 ? rootNode.get(0) : rootNode;
+
+                if (!targetNode.isObject()) throw new IOException("The JSON content is not a valid object structure.");
+
+                Map<String, String> columnDefinitions = new LinkedHashMap<>();
+                List<String> foundPrimaryKeys = new ArrayList<>();
+                Iterator<Map.Entry<String, JsonNode>> fields = targetNode.fields();
+                while (fields.hasNext()) {
+                    Map.Entry<String, JsonNode> field = fields.next();
+                    String fieldName = field.getKey().replaceAll("\\s+", "");
+                    JsonNode valueNode = field.getValue();
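+                    // Column width heuristic: ten times the sample value's length, with a floor of 10
+                    // (and, for Oracle, a cap of 4000 to stay within the VARCHAR2 byte limit)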
+                    int length = (valueNode == null || valueNode.isNull() || valueNode.asText().isEmpty()) ? 10 : Math.max(10, valueNode.asText().length() * 10);
+                    if ("oracle".equals(dbType)) length = Math.min(length, 4000);
+
+                    columnDefinitions.put(fieldName, getColumnTypeForDB(dbType, length));
+                    // Check whether the JSON field name appears in the primary key list
+                    if (primaryKeys.contains(fieldName.toLowerCase())) {
+                        foundPrimaryKeys.add(fieldName);
+                    }
+                }
+
+                if (columnDefinitions.isEmpty()) throw new IOException("No fields could be parsed from the JSON.");
+
+                sqlHolder.set(generateCreateTableSql(dbType, tableName, columnDefinitions, foundPrimaryKeys));
+            });
+
+            String generatedSql = sqlHolder.get();
+            if (generatedSql == null || generatedSql.isEmpty()) {
+                logger.error("The generated DDL is empty and cannot be executed; routing FlowFile {} to failure.", flowFile);
+                session.transfer(session.penalize(flowFile), REL_FAILURE);
+                return;
+            }
+
+            try (final Statement statement = conn.createStatement()) {
+                statement.execute(generatedSql);
+                logger.info("Successfully executed DDL to create table '{}'. SQL: {}", tableName, generatedSql);
+                flowFile = session.putAttribute(flowFile, "ddl.sql.executed", generatedSql);
+                flowFile = session.putAttribute(flowFile, "ddl.database.type", dbType);
+                session.transfer(flowFile, REL_SUCCESS);
+            } catch (final SQLException e) {
+                logger.error("Failed to execute DDL. SQL: [{}]. Routing FlowFile {} to failure.", generatedSql, flowFile, e);
+                session.transfer(session.penalize(flowFile), REL_FAILURE);
+            }
+
+        } catch (Exception e) {
+            logger.error("An unexpected error occurred while ensuring table '{}' exists; routing FlowFile {} to failure.", tableName, flowFile, e);
+            session.transfer(session.penalize(flowFile), REL_FAILURE);
+        }
+    }
+
+    /**
+     * Manually parses a JSON-array-like string without relying on any external library.
+     * For example, '["key1", "key2"]' is parsed into a List<String>.
+     *
+     * @param jsonArrayStr the string containing the primary key names
+     * @return the parsed list of primary keys, lower-cased for later comparison
+     */
+    private List<String> parseJsonArrayString(String jsonArrayStr) {
+        if (jsonArrayStr == null || jsonArrayStr.trim().isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        String trimmed = jsonArrayStr.trim();
+        if (!trimmed.startsWith("[") || !trimmed.endsWith("]")) {
+            throw new IllegalArgumentException("Invalid format: the string must start with '[' and end with ']'.");
+        }
+
+        // Handle the empty array "[]"
+        if (trimmed.length() <= 2) {
+            return Collections.emptyList();
+        }
+
+        // Strip the enclosing brackets
+        String content = trimmed.substring(1, trimmed.length() - 1).trim();
+        if (content.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        List<String> keys = new ArrayList<>();
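+        // Simple comma split: assumes key names contain no embedded commas or escaped quotes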
+        String[] parts = content.split(",");
+
+        for (String part : parts) {
+            String cleanedPart = part.trim();
+            // Strip surrounding quotes if present
+            if (cleanedPart.startsWith("\"") && cleanedPart.endsWith("\"")) {
+                if (cleanedPart.length() > 1) {
+                    cleanedPart = cleanedPart.substring(1, cleanedPart.length() - 1);
+                } else {
+                    cleanedPart = ""; // handles a lone quote character
+                }
+            }
+            if (!cleanedPart.isEmpty()) {
+                // Convert to lower case for later comparison
+                keys.add(cleanedPart.toLowerCase(Locale.ROOT));
+            }
+        }
+        return keys;
+    }
+
+
+    private String getDbType(final Connection conn) throws SQLException {
+        String dbProductName = conn.getMetaData().getDatabaseProductName().toLowerCase();
+        if (dbProductName.contains("mysql")) return "mysql";
+        if (dbProductName.contains("microsoft sql server")) return "sqlserver";
+        if (dbProductName.contains("oracle")) return "oracle";
+        throw new ProcessException("Unsupported database type: " + dbProductName);
+    }
+
+    private boolean tableExists(final Connection conn, final String tableName, final String dbType) throws SQLException {
+        final DatabaseMetaData metaData = conn.getMetaData();
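+        // Oracle folds unquoted identifiers to upper case, so match against the upper-cased name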
+        String namePattern = "oracle".equals(dbType) ? tableName.toUpperCase() : tableName;
+        try (ResultSet rs = metaData.getTables(conn.getCatalog(), conn.getSchema(), namePattern, new String[]{"TABLE"})) {
+            return rs.next();
+        }
+    }
+
+    private String getColumnTypeForDB(String dbType, int length) {
+        switch (dbType) {
+            case "oracle":
+                return "VARCHAR2(" + length + " BYTE)";
+            case "mysql":
+            case "sqlserver":
+            default:
+                return "VARCHAR(" + length + ")";
+        }
+    }
+
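+    /**
+     * Builds the CREATE TABLE statement with database-specific identifier quoting.
+     * Illustrative output for MySQL with primary key "id" (table name hypothetical):
+     * <pre>
+     * CREATE TABLE `t_user` (
+     *     `id` VARCHAR(40) NOT NULL,
+     *     CONSTRAINT `PK_t_user` PRIMARY KEY (`id`)
+     * )
+     * </pre>
+     */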
+    private String generateCreateTableSql(String dbType, String tableName, Map<String, String> columnDefinitions, List<String> primaryKeys) {
+        String quotedTableName;
+        String quoteChar;
+        String pkQuoteChar;
+        String identifierCase = "none";
+
+        switch (dbType) {
+            case "mysql":
+                quotedTableName = getQuotedIdentifier(tableName, "`");
+                quoteChar = "`";
+                pkQuoteChar = "`";
+                break;
+            case "sqlserver":
+                quotedTableName = getQuotedIdentifier(tableName, "[");
+                quoteChar = "[";
+                pkQuoteChar = "[";
+                break;
+            case "oracle":
+                quotedTableName = getQuotedIdentifier(tableName.toUpperCase(), "\"");
+                quoteChar = "\"";
+                pkQuoteChar = "\"";
+                identifierCase = "upper";
+                break;
+            default:
+                throw new ProcessException("Unsupported database type: " + dbType);
+        }
+
+        final String finalIdentifierCase = identifierCase;
+        String columnsSql = columnDefinitions.entrySet().stream().map(entry -> {
+            String fieldName = "upper".equals(finalIdentifierCase) ? entry.getKey().toUpperCase() : entry.getKey();
+            // primaryKeys holds field names in their original case (as collected in onTrigger), so compare against the original key
+            String originalFieldName = entry.getKey();
+            String notNull = (primaryKeys != null && primaryKeys.contains(originalFieldName)) ? " NOT NULL" : "";
+            return "    " + getQuotedIdentifier(fieldName, quoteChar) + " " + entry.getValue() + notNull;
+        }).collect(Collectors.joining(",\n"));
+
+        StringBuilder sql = new StringBuilder("CREATE TABLE ").append(quotedTableName).append(" (\n").append(columnsSql);
+
+        if (primaryKeys != null && !primaryKeys.isEmpty()) {
+            sql.append(",\n");
+            String pkColumns = primaryKeys.stream()
+                    .map(pk -> "upper".equals(finalIdentifierCase) ? pk.toUpperCase() : pk)
+                    .map(pk -> getQuotedIdentifier(pk, pkQuoteChar))
+                    .collect(Collectors.joining(", "));
+
+            // Keep constraint names within Oracle's traditional 30-character identifier limit
+            String constraintName = "PK_" + tableName.replaceAll("[^a-zA-Z0-9_]", "_");
+            if (constraintName.length() > 30) {
+                constraintName = constraintName.substring(0, 30);
+            }
+            if ("oracle".equals(dbType)) {
+                constraintName = constraintName.toUpperCase();
+            }
+            sql.append("    CONSTRAINT ").append(getQuotedIdentifier(constraintName, pkQuoteChar)).append(" PRIMARY KEY (").append(pkColumns).append(")");
+        }
+
+        sql.append("\n)");
+        return sql.toString();
+    }
+
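+    /**
+     * Wraps an identifier in the given quote character, doubling any embedded closing
+     * quote character, e.g. getQuotedIdentifier("my]name", "[") yields [my]]name].
+     */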
+    private String getQuotedIdentifier(String identifier, String quoteChar) {
+        String endQuoteChar = quoteChar;
+        if ("[".equals(quoteChar)) endQuoteChar = "]";
+        return quoteChar + identifier.replace(endQuoteChar, endQuoteChar + endQuoteChar) + endQuoteChar;
+    }
+
+    private static class Holder<T> {
+        private T value;
+
+        void set(T value) {
+            this.value = value;
+        }
+
+        T get() {
+            return value;
+        }
+    }
+}
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..e9f3ad9
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+com.hzya.frame.DevAutoJsonTableCreateProcessor
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/java/com/hzya/frame/DevGeneratePaginatedSqlProcessor.java b/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/java/com/hzya/frame/DevGeneratePaginatedSqlProcessor.java
index 8c410fe..33b0a91 100644
--- a/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/java/com/hzya/frame/DevGeneratePaginatedSqlProcessor.java
+++ b/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/java/com/hzya/frame/DevGeneratePaginatedSqlProcessor.java
@@ -116,20 +116,20 @@ public class DevGeneratePaginatedSqlProcessor extends AbstractProcessor {
String dbType = conn.getMetaData().getDatabaseProductName().toLowerCase();
// Incremental sync
- String whereClause = null;
+ String whereClause = "";
if (oldMaximumValue != null && !oldMaximumValue.isEmpty()) {
whereClause = " WHERE " + maxValColumn + " > '" + oldMaximumValue + "'";
}
// 执行 COUNT 和 MAX 查询
- String countMaxSql = null;
+ String countMaxSql = "";
if ("oracle".equals(dbType) || "mysql".equals(dbType)) {
countMaxSql = "SELECT COUNT(*) AS rowNumber, MAX(" + maxValColumn + ") AS maximumValue FROM (" + baseSql + ") t " + whereClause;
} else if ("microsoft sql server".equals(dbType)) {
StringBuffer montageSql = new StringBuffer();
montageSql.append("WITH SortedData AS (");
montageSql.append(baseSql);
- if (whereClause != null) {
+ if (!"".equals(whereClause)) {
if (baseSql.toUpperCase().contains("WHERE")) {
// Replace WHERE with AND
whereClause = whereClause.replace("WHERE", "AND");
@@ -178,7 +178,7 @@ public class DevGeneratePaginatedSqlProcessor extends AbstractProcessor {
long endRow = Math.min((long) (page + 1) * pageSize, rowNumber);
// Full sync
- if (whereClause == null) {
+ if ("".equals(whereClause)) {
whereClause = " WHERE " + maxValColumn + " <= '" + queriedMaximumValue + "'";
}
@@ -189,7 +189,7 @@ public class DevGeneratePaginatedSqlProcessor extends AbstractProcessor {
long offset = startRow - 1;
paginatedSql = "select * from (" + baseSql + ")" + whereClause + " LIMIT " + pageSize + " OFFSET " + offset;
} else if ("microsoft sql server".equals(dbType)) {
-
+
whereClause = whereClause.replace("WHERE", "");
whereClause = whereClause.replace("AND", "");
diff --git a/nifi-hzyadev-bundle/pom.xml b/nifi-hzyadev-bundle/pom.xml
index 7fc529e..f94fdda 100644
--- a/nifi-hzyadev-bundle/pom.xml
+++ b/nifi-hzyadev-bundle/pom.xml
@@ -11,6 +11,8 @@
         <module>hzya-nifi-DevGeneratePaginatedSqlProcessor-nar</module>
         <module>hzya-nifi-DevGeneratePaginatedSqlProcessor-processors</module>
+        <module>hzya-nifi-AutoJsonTableCreate-nar</module>
+        <module>hzya-nifi-AutoJsonTableCreate-processors</module>