diff --git a/nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-nar/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-nar/pom.xml
new file mode 100644
index 0000000..e82856f
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-nar/pom.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-hzyadev-bundle</artifactId>
+        <groupId>com.hzya</groupId>
+        <version>1.0</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hzya-nifi-AutoJsonTableCreate-nar</artifactId>
+    <packaging>nar</packaging>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <enforcer.skip>true</enforcer.skip>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.hzya</groupId>
+            <artifactId>hzya-nifi-AutoJsonTableCreate-processors</artifactId>
+            <version>1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <version>${nifi-revision}</version>
+            <type>nar</type>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-processors/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-processors/pom.xml
new file mode 100644
index 0000000..34601fc
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-processors/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-hzyadev-bundle</artifactId>
+        <groupId>com.hzya</groupId>
+        <version>1.0</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hzya-nifi-AutoJsonTableCreate-processors</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>${nifi-revision}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-dbcp-service-api</artifactId>
+            <version>${nifi-revision}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+            <version>1.15.3</version>
+        </dependency>
+        <!-- Assumed addition: the processor uses Jackson directly; version chosen to match the NiFi 1.15.x line -->
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.12.6</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>${nifi-revision}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.13</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-processors/src/main/java/com/hzya/frame/DevAutoJsonTableCreateProcessor.java b/nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-processors/src/main/java/com/hzya/frame/DevAutoJsonTableCreateProcessor.java
new file mode 100644
index 0000000..2c71e5a
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-processors/src/main/java/com/hzya/frame/DevAutoJsonTableCreateProcessor.java
@@ -0,0 +1,342 @@
+package com.hzya.frame;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.*;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.logging.ComponentLog;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * @Author:liuyang
+ * @Package:com.hzya.frame
+ * @Project:nifi-hzyadev-bundle
+ * @name:DevAutoJsonTableCreateProcessor
+ * @Date:2025/7/10 17:24
+ * @Filename:DevAutoJsonTableCreateProcessor
+ */
+@Tags({"sql", "ddl", "json", "database", "schema", "ensure", "create", "mysql", "oracle", "sqlserver"})
+@CapabilityDescription("Connects to the configured database and ensures that the target table exists. If the table does not exist, "
+        + "a CREATE TABLE statement is generated from the FlowFile's JSON content and executed. If the table already exists, "
+        + "nothing is done. On success or skip, the original FlowFile is routed to the success relationship.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "targetInsertTableName", description = "Name of the target table to check or create."),
+        @ReadsAttribute(attribute = "autoTableCreation", description = "Set to 'N' to skip table creation entirely; any other non-empty value enables it."),
+        @ReadsAttribute(attribute = "jsonPrimaryKeyTag", description = "A JSON array string naming the primary key fields, e.g. '[\"id\", \"user_name\"]'.")})
+@WritesAttributes({
+        @WritesAttribute(attribute = "ddl.sql.executed", description = "If a CREATE TABLE statement was executed, it is written to this attribute for auditing."),
+        @WritesAttribute(attribute = "ddl.database.type", description = "Detected database type (mysql, sqlserver, or oracle).")})
+public class DevAutoJsonTableCreateProcessor extends AbstractProcessor {
+    // --- Properties ---
+    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
+            .name("Database Connection Pooling Service")
+            .displayName("Database Connection Pool Service")
+            .description("The DBCPConnectionPool controller service used to connect to the database and to check for / create the table.")
+            .identifiesControllerService(DBCPService.class)
+            .required(true)
+            .build();
+
+    // --- Relationships ---
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("The original FlowFile is routed here when the table already exists or was created successfully.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles are routed here when any error occurs during processing (e.g. database connection or SQL execution failure).")
+            .build();
+
+    private Set<Relationship> relationships;
+    private List<PropertyDescriptor> descriptors;
+    // The ObjectMapper is still used to parse the FlowFile's JSON content
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
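+    /*
+     * Illustrative example (assumed input, not taken from this repository):
+     * a FlowFile with targetInsertTableName=person, jsonPrimaryKeyTag=["id"]
+     * and content [{"id":"1","name":"Tom"}] on a MySQL connection would yield
+     * roughly:
+     *   CREATE TABLE `person` (
+     *       `id` VARCHAR(10) NOT NULL,
+     *       `name` VARCHAR(30),
+     *       CONSTRAINT `PK_person` PRIMARY KEY (`id`)
+     *   )
+     */
+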
@ReadsAttribute(attribute = "jsonPrimaryKeyTag", description = "一个JSON数组字符串,指定主键字段,例如 '[\"id\", \"user_name\"]'。")}) +@WritesAttributes({@WritesAttribute(attribute = "ddl.sql.executed", description = "如果执行了CREATE TABLE语句,该语句将被写入此属性用于审计。"), @WritesAttribute(attribute = "ddl.database.type", description = "检测到的数据库类型 (mysql, sqlserver, or oracle)。")}) +public class DevAutoJsonTableCreateProcessor extends AbstractProcessor { + // --- 属性定义 (Properties) --- + public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder().name("Database Connection Pooling Service").displayName("数据库连接池服务").description("用于连接数据库、检查和创建表的DBCPConnectionPool控制器服务。").identifiesControllerService(DBCPService.class).required(true).build(); + + // --- 关系定义 (Relationships) --- + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("当表已存在或成功创建时,原始FlowFile将被路由到此关系。").build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("在处理过程中发生任何错误(如数据库连接失败、SQL执行失败)时,FlowFile将被路由到此关系。").build(); + + private Set relationships; + private List descriptors; + // ObjectMapper 仍然用于解析FlowFile的JSON内容 + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Override + protected void init(final ProcessorInitializationContext context) { + final List descriptors = new ArrayList<>(); + descriptors.add(DBCP_SERVICE); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set getRelationships() { + return this.relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return descriptors; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final ComponentLog logger = getLogger(); + + // --- 1. 获取并验证 FlowFile 属性 --- + final String tableName = flowFile.getAttribute("targetInsertTableName"); + if (tableName == null || tableName.trim().isEmpty()) { + logger.error("属性 'targetInsertTableName' 缺失或为空,FlowFile {} 将被路由到 failure。", flowFile); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + + final String autoTableCreation = flowFile.getAttribute("autoTableCreation"); + if (autoTableCreation == null || autoTableCreation.trim().isEmpty()) { + logger.error("属性 'autoTableCreation' 缺失或为空,FlowFile {} 将被路由到 failure。", flowFile); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + if ("N".equals(autoTableCreation)) { + logger.info("autoTableCreation=N(取消自动建表)。跳过创建步骤,直接将 FlowFile {} 路由到 success。", tableName, flowFile); + session.transfer(flowFile, REL_SUCCESS); + return; + } + + final String jsonPrimaryKeyTag = flowFile.getAttribute("jsonPrimaryKeyTag"); + final List primaryKeys; + try { + // *** 代码改造点: 使用自定义的解析方法,不再依赖外部库解析此属性 *** + primaryKeys = parseJsonArrayString(jsonPrimaryKeyTag); + if (primaryKeys.isEmpty()) { + logger.warn("属性 'jsonPrimaryKeyTag' 为空或解析后无内容,将创建没有主键的表。FlowFile: {}", flowFile); + } + } catch (Exception e) { + logger.error("解析 'jsonPrimaryKeyTag' 失败: '{}'. 错误: {}. 
FlowFile {} 将被路由到 failure。", jsonPrimaryKeyTag, e.getMessage(), flowFile); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + + final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); + + // --- 2. 连接数据库、检查并执行 --- + try (final Connection conn = dbcpService.getConnection(flowFile.getAttributes())) { + final String dbType = getDbType(conn); + + if (tableExists(conn, tableName, dbType)) { + logger.info("表 '{}' 已存在。跳过创建步骤,直接将 FlowFile {} 路由到 success。", tableName, flowFile); + session.transfer(flowFile, REL_SUCCESS); + return; + } + + logger.info("表 '{}' 不存在。准备根据 FlowFile {} 的内容生成并执行 DDL。", tableName, flowFile); + + final Holder sqlHolder = new Holder<>(); + session.read(flowFile, in -> { + JsonNode rootNode = objectMapper.readTree(in); + JsonNode targetNode = rootNode.isArray() && rootNode.size() > 0 ? rootNode.get(0) : rootNode; + + if (!targetNode.isObject()) throw new IOException("JSON 内容不是一个有效的对象结构。"); + + Map columnDefinitions = new LinkedHashMap<>(); + List foundPrimaryKeys = new ArrayList<>(); + Iterator> fields = targetNode.fields(); + while (fields.hasNext()) { + Map.Entry field = fields.next(); + String fieldName = field.getKey().replaceAll("\\s+", ""); + JsonNode valueNode = field.getValue(); + int length = (valueNode == null || valueNode.isNull() || valueNode.asText().isEmpty()) ? 10 : Math.max(10, valueNode.asText().length() * 10); + if ("oracle".equals(dbType)) length = Math.min(length, 4000); + + columnDefinitions.put(fieldName, getColumnTypeForDB(dbType, length)); + // 检查JSON字段名是否在主键列表中 + if (primaryKeys.contains(fieldName.toLowerCase())) { + foundPrimaryKeys.add(fieldName); + } + } + + if (columnDefinitions.isEmpty()) throw new IOException("从JSON中未能解析出任何字段。"); + + sqlHolder.set(generateCreateTableSql(dbType, tableName, columnDefinitions, foundPrimaryKeys)); + }); + + String generatedSql = sqlHolder.get(); + if (generatedSql == null || generatedSql.isEmpty()) { + logger.error("生成的 DDL 为空,无法执行。FlowFile {} 将被路由到 failure。", flowFile); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + + try (final Statement statement = conn.createStatement()) { + statement.execute(generatedSql); + logger.info("成功执行 DDL 创建了表 '{}'。SQL: {}", tableName, generatedSql); + flowFile = session.putAttribute(flowFile, "ddl.sql.executed", generatedSql); + flowFile = session.putAttribute(flowFile, "ddl.database.type", dbType); + session.transfer(flowFile, REL_SUCCESS); + } catch (final SQLException e) { + logger.error("执行 DDL 失败。SQL: [{}]. 
FlowFile {} 将被路由到 failure。", generatedSql, flowFile, e); + session.transfer(session.penalize(flowFile), REL_FAILURE); + } + + } catch (Exception e) { + logger.error("在确保表 '{}' 存在时发生未知错误。FlowFile {} 将被路由到 failure。", tableName, flowFile, e); + session.transfer(session.penalize(flowFile), REL_FAILURE); + } + } + + /** + * *** 新增方法 *** + * 手动解析类JSON数组格式的字符串,不依赖任何外部库。 + * 例如,将 '["key1", "key2"]' 解析为 List。 + * + * @param jsonArrayStr 包含主键的字符串 + * @return 解析后的主键列表 + */ + private List parseJsonArrayString(String jsonArrayStr) { + if (jsonArrayStr == null || jsonArrayStr.trim().isEmpty()) { + return Collections.emptyList(); + } + + String trimmed = jsonArrayStr.trim(); + if (!trimmed.startsWith("[") || !trimmed.endsWith("]")) { + throw new IllegalArgumentException("无效的格式:字符串必须以'['开头并以']'结尾。"); + } + + // 处理空数组 "[]" 的情况 + if (trimmed.length() <= 2) { + return Collections.emptyList(); + } + + // 移除首尾的方括号 + String content = trimmed.substring(1, trimmed.length() - 1).trim(); + if (content.isEmpty()) { + return Collections.emptyList(); + } + + List keys = new ArrayList<>(); + String[] parts = content.split(","); + + for (String part : parts) { + String cleanedPart = part.trim(); + // 移除首尾可能存在的引号 + if (cleanedPart.startsWith("\"") && cleanedPart.endsWith("\"")) { + if (cleanedPart.length() > 1) { + cleanedPart = cleanedPart.substring(1, cleanedPart.length() - 1); + } else { + cleanedPart = ""; // 处理只有引号的情况 "" + } + } + if (!cleanedPart.isEmpty()) { + //转换为大小,用于后续对比 + keys.add(cleanedPart.toLowerCase(Locale.ROOT)); + } + } + return keys; + } + + + private String getDbType(final Connection conn) throws SQLException { + String dbProductName = conn.getMetaData().getDatabaseProductName().toLowerCase(); + if (dbProductName.contains("mysql")) return "mysql"; + if (dbProductName.contains("microsoft sql server")) return "sqlserver"; + if (dbProductName.contains("oracle")) return "oracle"; + throw new ProcessException("不支持的数据库类型: " + dbProductName); + } + + private boolean tableExists(final Connection conn, final String tableName, final String dbType) throws SQLException { + final DatabaseMetaData metaData = conn.getMetaData(); + String namePattern = "oracle".equals(dbType) ? 
+    /**
+     * *** New method ***
+     * Manually parses a JSON-array-like string without relying on any external library.
+     * For example, '["key1", "key2"]' is parsed into a List<String>.
+     *
+     * @param jsonArrayStr the string containing the primary keys
+     * @return the parsed list of primary keys, lower-cased
+     */
+    private List<String> parseJsonArrayString(String jsonArrayStr) {
+        if (jsonArrayStr == null || jsonArrayStr.trim().isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        String trimmed = jsonArrayStr.trim();
+        if (!trimmed.startsWith("[") || !trimmed.endsWith("]")) {
+            throw new IllegalArgumentException("Invalid format: the string must start with '[' and end with ']'.");
+        }
+
+        // Handle the empty array "[]"
+        if (trimmed.length() <= 2) {
+            return Collections.emptyList();
+        }
+
+        // Strip the surrounding brackets
+        String content = trimmed.substring(1, trimmed.length() - 1).trim();
+        if (content.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        List<String> keys = new ArrayList<>();
+        // A plain split on ',' suffices because key names are assumed not to contain commas
+        String[] parts = content.split(",");
+
+        for (String part : parts) {
+            String cleanedPart = part.trim();
+            // Strip surrounding quotes if present
+            if (cleanedPart.startsWith("\"") && cleanedPart.endsWith("\"")) {
+                if (cleanedPart.length() > 1) {
+                    cleanedPart = cleanedPart.substring(1, cleanedPart.length() - 1);
+                } else {
+                    cleanedPart = ""; // handle a bare quote character
+                }
+            }
+            if (!cleanedPart.isEmpty()) {
+                // Normalize to lowercase for later comparison
+                keys.add(cleanedPart.toLowerCase(Locale.ROOT));
+            }
+        }
+        return keys;
+    }
+
+
+    private String getDbType(final Connection conn) throws SQLException {
+        String dbProductName = conn.getMetaData().getDatabaseProductName().toLowerCase();
+        if (dbProductName.contains("mysql")) return "mysql";
+        if (dbProductName.contains("microsoft sql server")) return "sqlserver";
+        if (dbProductName.contains("oracle")) return "oracle";
+        throw new ProcessException("Unsupported database type: " + dbProductName);
+    }
+
+    private boolean tableExists(final Connection conn, final String tableName, final String dbType) throws SQLException {
+        final DatabaseMetaData metaData = conn.getMetaData();
+        // Oracle stores unquoted identifiers in upper case
+        String namePattern = "oracle".equals(dbType) ? tableName.toUpperCase() : tableName;
+        try (ResultSet rs = metaData.getTables(conn.getCatalog(), conn.getSchema(), namePattern, new String[]{"TABLE"})) {
+            return rs.next();
+        }
+    }
+
+    private String getColumnTypeForDB(String dbType, int length) {
+        switch (dbType) {
+            case "mysql":
+            case "sqlserver":
+                return "VARCHAR(" + length + ")";
+            case "oracle":
+                return "VARCHAR2(" + length + " BYTE)";
+            default:
+                return "VARCHAR(" + length + ")";
+        }
+    }
+
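+    /*
+     * Identifier handling in generateCreateTableSql (summary):
+     *   mysql:     `name`  back-tick quoting, original case
+     *   sqlserver: [name]  bracket quoting, original case
+     *   oracle:    "NAME"  double-quote quoting, identifiers upper-cased
+     * The PK constraint name is always truncated to 30 chars for Oracle compatibility.
+     */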
+    private String generateCreateTableSql(String dbType, String tableName, Map<String, String> columnDefinitions, List<String> primaryKeys) {
+        String quotedTableName;
+        String quoteChar;
+        String pkQuoteChar;
+        String identifierCase = "none";
+
+        switch (dbType) {
+            case "mysql":
+                quotedTableName = getQuotedIdentifier(tableName, "`");
+                quoteChar = "`";
+                pkQuoteChar = "`";
+                break;
+            case "sqlserver":
+                quotedTableName = getQuotedIdentifier(tableName, "[");
+                quoteChar = "[";
+                pkQuoteChar = "[";
+                break;
+            case "oracle":
+                quotedTableName = getQuotedIdentifier(tableName.toUpperCase(), "\"");
+                quoteChar = "\"";
+                pkQuoteChar = "\"";
+                identifierCase = "upper";
+                break;
+            default:
+                throw new ProcessException("Unsupported database type: " + dbType);
+        }
+
+        final String finalIdentifierCase = identifierCase;
+        String columnsSql = columnDefinitions.entrySet().stream().map(entry -> {
+            String fieldName = "upper".equals(finalIdentifierCase) ? entry.getKey().toUpperCase() : entry.getKey();
+            // Note: compare against the original field name here, because the primaryKeys
+            // list passed in holds the original-case names found in the JSON
+            String originalFieldName = entry.getKey();
+            String notNull = (primaryKeys != null && primaryKeys.contains(originalFieldName)) ? " NOT NULL" : "";
+            return "    " + getQuotedIdentifier(fieldName, quoteChar) + " " + entry.getValue() + notNull;
+        }).collect(Collectors.joining(",\n"));
+
+        StringBuilder sql = new StringBuilder("CREATE TABLE ").append(quotedTableName).append(" (\n").append(columnsSql);
+
+        if (primaryKeys != null && !primaryKeys.isEmpty()) {
+            sql.append(",\n");
+            String pkColumns = primaryKeys.stream()
+                    .map(pk -> "upper".equals(finalIdentifierCase) ? pk.toUpperCase() : pk)
+                    .map(pk -> getQuotedIdentifier(pk, pkQuoteChar))
+                    .collect(Collectors.joining(", "));
+
+            String constraintName = "PK_" + tableName.replaceAll("[^a-zA-Z0-9_]", "_");
+            if (constraintName.length() > 30) {
+                constraintName = constraintName.substring(0, 30);
+            }
+            if ("oracle".equals(dbType)) {
+                constraintName = constraintName.toUpperCase();
+            }
+            sql.append("    CONSTRAINT ").append(getQuotedIdentifier(constraintName, pkQuoteChar)).append(" PRIMARY KEY (").append(pkColumns).append(")");
+        }
+
+        sql.append("\n)");
+        return sql.toString();
+    }
+
+    private String getQuotedIdentifier(String identifier, String quoteChar) {
+        // SQL Server uses asymmetric [brackets]; escape the closing character by doubling it
+        String endQuoteChar = quoteChar;
+        if ("[".equals(quoteChar)) endQuoteChar = "]";
+        return quoteChar + identifier.replace(endQuoteChar, endQuoteChar + endQuoteChar) + endQuoteChar;
+    }
+
+    // Minimal mutable holder used to pass the generated SQL out of the session.read callback
+    private static class Holder<T> {
+        private T value;
+
+        void set(T value) {
+            this.value = value;
+        }
+
+        T get() {
+            return value;
+        }
+    }
+}
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..e9f3ad9
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
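+# Registers the processor with NiFi's ServiceLoader so it appears in the processor list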
+com.hzya.frame.DevAutoJsonTableCreateProcessor
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/java/com/hzya/frame/DevGeneratePaginatedSqlProcessor.java b/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/java/com/hzya/frame/DevGeneratePaginatedSqlProcessor.java
index 8c410fe..33b0a91 100644
--- a/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/java/com/hzya/frame/DevGeneratePaginatedSqlProcessor.java
+++ b/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/java/com/hzya/frame/DevGeneratePaginatedSqlProcessor.java
@@ -116,20 +116,20 @@ public class DevGeneratePaginatedSqlProcessor extends AbstractProcessor {
             String dbType = conn.getMetaData().getDatabaseProductName().toLowerCase();
 
             // Incremental sync
-            String whereClause = null;
+            String whereClause = "";
             if (oldMaximumValue != null && !oldMaximumValue.isEmpty()) {
                 whereClause = " WHERE " + maxValColumn + " > '" + oldMaximumValue + "'";
             }
 
             // Run the COUNT and MAX query
-            String countMaxSql = null;
+            String countMaxSql = "";
             if ("oracle".equals(dbType) || "mysql".equals(dbType)) {
                 countMaxSql = "SELECT COUNT(*) AS rowNumber, MAX(" + maxValColumn + ") AS maximumValue FROM (" + baseSql + ") t " + whereClause;
             } else if ("microsoft sql server".equals(dbType)) {
                 StringBuffer montageSql = new StringBuffer();
                 montageSql.append("WITH SortedData AS (");
                 montageSql.append(baseSql);
-                if (whereClause != null) {
+                if (!"".equals(whereClause)) {
                     if (baseSql.toUpperCase().contains("WHERE")) {
                         // Replace WHERE with AND
                         whereClause = whereClause.replace("WHERE", "AND");
@@ -178,7 +178,7 @@
                 long endRow = Math.min((long) (page + 1) * pageSize, rowNumber);
 
                 // Full sync
-                if (whereClause == null) {
+                if ("".equals(whereClause)) {
                     whereClause = " WHERE " + maxValColumn + " <= '" + queriedMaximumValue + "'";
                 }
 
@@ -189,7 +189,7 @@
                 long offset = startRow - 1;
                 paginatedSql = "select * from (" + baseSql + ")" + whereClause + " LIMIT " + pageSize + " OFFSET " + offset;
             } else if ("microsoft sql server".equals(dbType)) {
-
+                whereClause = whereClause.replace("WHERE", "");
                 whereClause = whereClause.replace("AND", "");
diff --git a/nifi-hzyadev-bundle/pom.xml b/nifi-hzyadev-bundle/pom.xml
index 7fc529e..f94fdda 100644
--- a/nifi-hzyadev-bundle/pom.xml
+++ b/nifi-hzyadev-bundle/pom.xml
@@ -11,6 +11,8 @@
         <module>hzya-nifi-DevGeneratePaginatedSqlProcessor-nar</module>
         <module>hzya-nifi-DevGeneratePaginatedSqlProcessor-processors</module>
+        <module>hzya-nifi-AutoJsonTableCreate-nar</module>
+        <module>hzya-nifi-AutoJsonTableCreate-processors</module>