feat(nifi): add automatic table-creation processor

- Add the DevAutoJsonTableCreateProcessor processor for creating database tables automatically
- Implement connecting to the database, checking whether the table exists, and generating and executing the CREATE TABLE SQL
- Support MySQL, SQL Server and Oracle databases
- Add a custom JSON array parsing method
- Improve table-name and column-name handling to accommodate database-specific quoting requirements
liuy 2025-07-10 19:14:40 +08:00
parent 9e05f88ce7
commit 10bafb2b50
6 changed files with 448 additions and 5 deletions
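
For context, the new processor is driven entirely by three FlowFile attributes: targetInsertTableName (required), autoTableCreation (set to N to skip creation) and jsonPrimaryKeyTag (a JSON array of primary key field names). A rough illustration with made-up attribute values and table name, run against a MySQL connection (column widths are derived in the code as max(10, value length * 10)):

Attributes: targetInsertTableName=user_info, autoTableCreation=Y, jsonPrimaryKeyTag=["id"]
Content:    [{"id":"1001","user_name":"alice"}]

Generated DDL (approximately):
CREATE TABLE `user_info` (
 `id` VARCHAR(40) NOT NULL,
 `user_name` VARCHAR(50),
  CONSTRAINT `PK_user_info` PRIMARY KEY (`id`)
)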

View File

@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-hzyadev-bundle</artifactId>
<groupId>com.hzya</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hzya-nifi-AutoJsonTableCreate-nar</artifactId>
<packaging>nar</packaging>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>com.hzya</groupId>
<artifactId>hzya-nifi-AutoJsonTableCreate-processors</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>${nifi-revision}</version>
<type>nar</type>
</dependency>
</dependencies>
</project>
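
Since the module uses nar packaging, deployment follows the usual NiFi workflow; a sketch assuming a standard install at $NIFI_HOME (the NAR file name follows Maven's default artifactId-version convention):

mvn clean install
cp hzya-nifi-AutoJsonTableCreate-nar/target/hzya-nifi-AutoJsonTableCreate-nar-1.0.nar $NIFI_HOME/extensions/
# restart NiFi (or let the extensions directory auto-load the NAR); the processor then appears on the canvas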

View File

@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-hzyadev-bundle</artifactId>
<groupId>com.hzya</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hzya-nifi-AutoJsonTableCreate-processors</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>${nifi-revision}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
<version>${nifi-revision}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
<version>1.15.3</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>${nifi-revision}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,342 @@
package com.hzya.frame;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.logging.ComponentLog;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.*;
import java.util.stream.Collectors;
/**
 * @Author liuyang
 * @Package com.hzya.frame
 * @Project nifi-hzyadev-bundle
 * @name DevAutoJsonTableCreateProcessor
 * @Date 2025/7/10 17:24
 * @Filename DevAutoJsonTableCreateProcessor
 */
@Tags({"sql", "ddl", "json", "database", "schema", "ensure", "create", "mysql", "oracle", "sqlserver"})
@CapabilityDescription("Connects to the configured database and ensures that the target table exists. If the table does not exist, a CREATE TABLE statement is generated from the FlowFile's JSON content and executed; if the table already exists, no action is taken. On success or skip, the original FlowFile is routed to the success relationship.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@ReadsAttributes({@ReadsAttribute(attribute = "targetInsertTableName", description = "Name of the target table to check or create."), @ReadsAttribute(attribute = "jsonPrimaryKeyTag", description = "A JSON array string naming the primary key fields, e.g. '[\"id\", \"user_name\"]'.")})
@WritesAttributes({@WritesAttribute(attribute = "ddl.sql.executed", description = "If a CREATE TABLE statement was executed, it is written to this attribute for auditing."), @WritesAttribute(attribute = "ddl.database.type", description = "The detected database type (mysql, sqlserver, or oracle).")})
public class DevAutoJsonTableCreateProcessor extends AbstractProcessor {
// --- Properties ---
public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder().name("Database Connection Pooling Service").displayName("Database Connection Pooling Service").description("The DBCPConnectionPool controller service used to connect to the database, check for the table and create it.").identifiesControllerService(DBCPService.class).required(true).build();
// --- Relationships ---
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("The original FlowFile is routed here when the table already exists or has been created successfully.").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("The FlowFile is routed here when any error occurs during processing, such as a failed database connection or SQL execution.").build();
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
// The ObjectMapper is still used to parse the FlowFile's JSON content
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(DBCP_SERVICE);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final ComponentLog logger = getLogger();
// --- 1. Read and validate FlowFile attributes ---
final String tableName = flowFile.getAttribute("targetInsertTableName");
if (tableName == null || tableName.trim().isEmpty()) {
logger.error("属性 'targetInsertTableName' 缺失或为空FlowFile {} 将被路由到 failure。", flowFile);
session.transfer(session.penalize(flowFile), REL_FAILURE);
return;
}
final String autoTableCreation = flowFile.getAttribute("autoTableCreation");
if (autoTableCreation == null || autoTableCreation.trim().isEmpty()) {
logger.error("属性 'autoTableCreation' 缺失或为空FlowFile {} 将被路由到 failure。", flowFile);
session.transfer(session.penalize(flowFile), REL_FAILURE);
return;
}
if ("N".equals(autoTableCreation)) {
logger.info("autoTableCreation=N取消自动建表。跳过创建步骤直接将 FlowFile {} 路由到 success。", tableName, flowFile);
session.transfer(flowFile, REL_SUCCESS);
return;
}
final String jsonPrimaryKeyTag = flowFile.getAttribute("jsonPrimaryKeyTag");
final List<String> primaryKeys;
try {
// *** Changed here: parse this attribute with the custom method below instead of an external library ***
primaryKeys = parseJsonArrayString(jsonPrimaryKeyTag);
if (primaryKeys.isEmpty()) {
logger.warn("属性 'jsonPrimaryKeyTag' 为空或解析后无内容将创建没有主键的表。FlowFile: {}", flowFile);
}
} catch (Exception e) {
logger.error("解析 'jsonPrimaryKeyTag' 失败: '{}'. 错误: {}. FlowFile {} 将被路由到 failure。", jsonPrimaryKeyTag, e.getMessage(), flowFile);
session.transfer(session.penalize(flowFile), REL_FAILURE);
return;
}
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
// --- 2. Connect to the database, check the table and execute the DDL ---
try (final Connection conn = dbcpService.getConnection(flowFile.getAttributes())) {
final String dbType = getDbType(conn);
if (tableExists(conn, tableName, dbType)) {
logger.info("表 '{}' 已存在。跳过创建步骤,直接将 FlowFile {} 路由到 success。", tableName, flowFile);
session.transfer(flowFile, REL_SUCCESS);
return;
}
logger.info("表 '{}' 不存在。准备根据 FlowFile {} 的内容生成并执行 DDL。", tableName, flowFile);
final Holder<String> sqlHolder = new Holder<>();
session.read(flowFile, in -> {
JsonNode rootNode = objectMapper.readTree(in);
JsonNode targetNode = rootNode.isArray() && rootNode.size() > 0 ? rootNode.get(0) : rootNode;
if (!targetNode.isObject()) throw new IOException("The JSON content is not a valid object structure.");
Map<String, String> columnDefinitions = new LinkedHashMap<>();
List<String> foundPrimaryKeys = new ArrayList<>();
Iterator<Map.Entry<String, JsonNode>> fields = targetNode.fields();
while (fields.hasNext()) {
Map.Entry<String, JsonNode> field = fields.next();
String fieldName = field.getKey().replaceAll("\\s+", "");
JsonNode valueNode = field.getValue();
int length = (valueNode == null || valueNode.isNull() || valueNode.asText().isEmpty()) ? 10 : Math.max(10, valueNode.asText().length() * 10);
if ("oracle".equals(dbType)) length = Math.min(length, 4000);
columnDefinitions.put(fieldName, getColumnTypeForDB(dbType, length));
// Check whether this JSON field name is in the primary key list
if (primaryKeys.contains(fieldName.toLowerCase())) {
foundPrimaryKeys.add(fieldName);
}
}
if (columnDefinitions.isEmpty()) throw new IOException("No fields could be parsed from the JSON.");
sqlHolder.set(generateCreateTableSql(dbType, tableName, columnDefinitions, foundPrimaryKeys));
});
String generatedSql = sqlHolder.get();
if (generatedSql == null || generatedSql.isEmpty()) {
logger.error("生成的 DDL 为空无法执行。FlowFile {} 将被路由到 failure。", flowFile);
session.transfer(session.penalize(flowFile), REL_FAILURE);
return;
}
try (final Statement statement = conn.createStatement()) {
statement.execute(generatedSql);
logger.info("成功执行 DDL 创建了表 '{}'。SQL: {}", tableName, generatedSql);
flowFile = session.putAttribute(flowFile, "ddl.sql.executed", generatedSql);
flowFile = session.putAttribute(flowFile, "ddl.database.type", dbType);
session.transfer(flowFile, REL_SUCCESS);
} catch (final SQLException e) {
logger.error("执行 DDL 失败。SQL: [{}]. FlowFile {} 将被路由到 failure。", generatedSql, flowFile, e);
session.transfer(session.penalize(flowFile), REL_FAILURE);
}
} catch (Exception e) {
logger.error("在确保表 '{}' 存在时发生未知错误。FlowFile {} 将被路由到 failure。", tableName, flowFile, e);
session.transfer(session.penalize(flowFile), REL_FAILURE);
}
}
/**
 * *** New method ***
 * Manually parses a JSON-array-like string without relying on any external library,
 * e.g. '["key1", "key2"]' is parsed into a List<String>.
 *
 * @param jsonArrayStr the string containing the primary key names
 * @return the parsed list of primary keys
 */
private List<String> parseJsonArrayString(String jsonArrayStr) {
if (jsonArrayStr == null || jsonArrayStr.trim().isEmpty()) {
return Collections.emptyList();
}
String trimmed = jsonArrayStr.trim();
if (!trimmed.startsWith("[") || !trimmed.endsWith("]")) {
throw new IllegalArgumentException("Invalid format: the string must start with '[' and end with ']'.");
}
// Handle the empty-array case "[]"
if (trimmed.length() <= 2) {
return Collections.emptyList();
}
// Strip the leading and trailing brackets
String content = trimmed.substring(1, trimmed.length() - 1).trim();
if (content.isEmpty()) {
return Collections.emptyList();
}
List<String> keys = new ArrayList<>();
String[] parts = content.split(",");
for (String part : parts) {
String cleanedPart = part.trim();
// Strip surrounding quotes if present
if (cleanedPart.startsWith("\"") && cleanedPart.endsWith("\"")) {
if (cleanedPart.length() > 1) {
cleanedPart = cleanedPart.substring(1, cleanedPart.length() - 1);
} else {
cleanedPart = ""; // 处理只有引号的情况 ""
}
}
if (!cleanedPart.isEmpty()) {
// convert to lower case for the later comparison
keys.add(cleanedPart.toLowerCase(Locale.ROOT));
}
}
return keys;
}
private String getDbType(final Connection conn) throws SQLException {
String dbProductName = conn.getMetaData().getDatabaseProductName().toLowerCase();
if (dbProductName.contains("mysql")) return "mysql";
if (dbProductName.contains("microsoft sql server")) return "sqlserver";
if (dbProductName.contains("oracle")) return "oracle";
throw new ProcessException("不支持的数据库类型: " + dbProductName);
}
private boolean tableExists(final Connection conn, final String tableName, final String dbType) throws SQLException {
final DatabaseMetaData metaData = conn.getMetaData();
String namePattern = "oracle".equals(dbType) ? tableName.toUpperCase() : tableName;
try (ResultSet rs = metaData.getTables(conn.getCatalog(), conn.getSchema(), namePattern, new String[]{"TABLE"})) {
return rs.next();
}
}
private String getColumnTypeForDB(String dbType, int length) {
switch (dbType) {
case "mysql":
return "VARCHAR(" + length + ")";
case "sqlserver":
return "VARCHAR(" + length + ")";
case "oracle":
return "VARCHAR2(" + length + " BYTE)";
default:
return "VARCHAR(" + length + ")";
}
}
private String generateCreateTableSql(String dbType, String tableName, Map<String, String> columnDefinitions, List<String> primaryKeys) {
String quotedTableName;
String quoteChar;
String pkQuoteChar;
String identifierCase = "none";
switch (dbType) {
case "mysql":
quotedTableName = getQuotedIdentifier(tableName, "`");
quoteChar = "`";
pkQuoteChar = "`";
break;
case "sqlserver":
quotedTableName = getQuotedIdentifier(tableName, "[");
quoteChar = "[";
pkQuoteChar = "[";
break;
case "oracle":
quotedTableName = getQuotedIdentifier(tableName.toUpperCase(), "\"");
quoteChar = "\"";
pkQuoteChar = "\"";
identifierCase = "upper";
break;
default:
throw new ProcessException("不支持的数据库类型: " + dbType);
}
final String finalIdentifierCase = identifierCase;
String columnsSql = columnDefinitions.entrySet().stream().map(entry -> {
String fieldName = "upper".equals(finalIdentifierCase) ? entry.getKey().toUpperCase() : entry.getKey();
// Note: the primaryKeys list passed in holds the original-case field names, so compare against the original key
String originalFieldName = entry.getKey();
String notNull = (primaryKeys != null && primaryKeys.contains(originalFieldName)) ? " NOT NULL" : "";
return " " + getQuotedIdentifier(fieldName, quoteChar) + " " + entry.getValue() + notNull;
}).collect(Collectors.joining(",\n"));
StringBuilder sql = new StringBuilder("CREATE TABLE ").append(quotedTableName).append(" (\n").append(columnsSql);
if (primaryKeys != null && !primaryKeys.isEmpty()) {
sql.append(",\n");
String pkColumns = primaryKeys.stream().map(pk -> "upper".equals(finalIdentifierCase) ? pk.toUpperCase() : pk).map(pk -> getQuotedIdentifier(pk, pkQuoteChar)).collect(Collectors.joining(", "));
String constraintName = "PK_" + tableName.replaceAll("[^a-zA-Z0-9_]", "_");
if (constraintName.length() > 30) {
constraintName = constraintName.substring(0, 30);
}
if ("oracle".equals(dbType)) {
constraintName = constraintName.toUpperCase();
}
sql.append(" CONSTRAINT ").append(getQuotedIdentifier(constraintName, pkQuoteChar)).append(" PRIMARY KEY (").append(pkColumns).append(")");
}
sql.append("\n)");
return sql.toString();
}
private String getQuotedIdentifier(String identifier, String quoteChar) {
String endQuoteChar = quoteChar;
if ("[".equals(quoteChar)) endQuoteChar = "]";
return quoteChar + identifier.replace(endQuoteChar, endQuoteChar + endQuoteChar) + endQuoteChar;
}
private static class Holder<T> {
private T value;
void set(T value) {
this.value = value;
}
T get() {
return value;
}
}
}
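
The processors POM above pulls in nifi-mock and JUnit, so a unit test could look roughly like the sketch below. The test class, the DBCPService stub and the JDBC URL/credentials are hypothetical; note that getDbType() only accepts MySQL, SQL Server and Oracle, so an embedded H2/Derby database would be routed to failure.

package com.hzya.frame;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
public class DevAutoJsonTableCreateProcessorTest {
    // Minimal DBCPService stub handing out JDBC connections to a disposable MySQL test database (hypothetical URL and credentials).
    static class TestDbcpService extends AbstractControllerService implements DBCPService {
        @Override
        public Connection getConnection() throws ProcessException {
            try {
                return DriverManager.getConnection("jdbc:mysql://localhost:3306/nifi_test", "root", "secret");
            } catch (SQLException e) {
                throw new ProcessException(e);
            }
        }
    }
    @Test
    public void createsTableWhenMissing() throws Exception {
        final TestRunner runner = TestRunners.newTestRunner(DevAutoJsonTableCreateProcessor.class);
        final DBCPService dbcp = new TestDbcpService();
        runner.addControllerService("dbcp", dbcp);
        runner.enableControllerService(dbcp);
        runner.setProperty(DevAutoJsonTableCreateProcessor.DBCP_SERVICE, "dbcp");
        final Map<String, String> attrs = new HashMap<>();
        attrs.put("targetInsertTableName", "user_info");
        attrs.put("autoTableCreation", "Y");
        attrs.put("jsonPrimaryKeyTag", "[\"id\"]");
        runner.enqueue("[{\"id\":\"1001\",\"user_name\":\"alice\"}]".getBytes(StandardCharsets.UTF_8), attrs);
        runner.run();
        // The FlowFile should land on success whether the table was just created or already existed.
        runner.assertAllFlowFilesTransferred(DevAutoJsonTableCreateProcessor.REL_SUCCESS, 1);
    }
}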

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
com.hzya.frame.DevAutoJsonTableCreateProcessor

View File

@ -116,20 +116,20 @@ public class DevGeneratePaginatedSqlProcessor extends AbstractProcessor {
String dbType = conn.getMetaData().getDatabaseProductName().toLowerCase();
// Incremental sync
-String whereClause = null;
+String whereClause = "";
if (oldMaximumValue != null && !oldMaximumValue.isEmpty()) {
whereClause = " WHERE " + maxValColumn + " > '" + oldMaximumValue + "'";
}
// Execute the COUNT / MAX query
-String countMaxSql = null;
+String countMaxSql = "";
if ("oracle".equals(dbType) || "mysql".equals(dbType)) {
countMaxSql = "SELECT COUNT(*) AS rowNumber, MAX(" + maxValColumn + ") AS maximumValue FROM (" + baseSql + ") t " + whereClause;
} else if ("microsoft sql server".equals(dbType)) {
StringBuffer montageSql = new StringBuffer();
montageSql.append("WITH SortedData AS (");
montageSql.append(baseSql);
-if (whereClause != null) {
+if (!"".equals(whereClause)) {
if (baseSql.toUpperCase().contains("WHERE")) {
// Replace WHERE with AND
whereClause = whereClause.replace("WHERE", "AND");
@ -178,7 +178,7 @@ public class DevGeneratePaginatedSqlProcessor extends AbstractProcessor {
long endRow = Math.min((long) (page + 1) * pageSize, rowNumber);
// Full sync
-if (whereClause == null) {
+if ("".equals(whereClause)) {
whereClause = " WHERE " + maxValColumn + " <= '" + queriedMaximumValue + "'";
}

View File

@ -11,6 +11,8 @@
<modules>
<module>hzya-nifi-DevGeneratePaginatedSqlProcessor-nar</module>
<module>hzya-nifi-DevGeneratePaginatedSqlProcessor-processors</module>
+<module>hzya-nifi-AutoJsonTableCreate-nar</module>
+<module>hzya-nifi-AutoJsonTableCreate-processors</module>
</modules>
<parent>