commit bdb8766c26151b4ef44acf3adee879f59783ecd1
Author: liuy <37787198+LiuyCodes@users.noreply.github.com>
Date:   Wed Jul 9 15:47:21 2025 +0800

    feat(nifi): add a custom paginated SQL processor

    - Add the DevGeneratePaginatedSqlProcessor processor
    - Implement paginated queries for Oracle, MySQL, and SQL Server
    - Add state management to track each table's maximum-value column
    - Include error handling and logging
    - Create the Maven project structure and required configuration files
    - Write unit tests for the processor
    - Add project documentation and a README file

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..57deb9b
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,60 @@
+# ---> macOS
+# General
+.DS_Store
+.AppleDouble
+.LSOverride
+
+# Icon must end with two \r
+Icon
+
+
+# Thumbnails
+._*
+
+# Files that might appear in the root of a volume
+.DocumentRevisions-V100
+.fseventsd
+.Spotlight-V100
+.TemporaryItems
+.Trashes
+.VolumeIcon.icns
+.com.apple.timemachine.donotpresent
+
+# Directories potentially created on remote AFP share
+.AppleDB
+.AppleDesktop
+Network Trash Folder
+Temporary Items
+.apdisk
+
+# ---> Windows
+# Windows thumbnail cache files
+Thumbs.db
+Thumbs.db:encryptable
+ehthumbs.db
+ehthumbs_vista.db
+
+# Dump file
+*.stackdump
+
+# Folder config file
+[Dd]esktop.ini
+
+# Recycle Bin used on file shares
+$RECYCLE.BIN/
+
+# Windows Installer files
+*.cab
+*.msi
+*.msix
+*.msm
+*.msp
+*.iml
+
+# Windows shortcuts
+*.lnk
+/.idea/
+/log/
+/file/
+/buildpackage/src/main/resources/banner.txt
+/hzya-nifi-*/target/
diff --git a/nifi-hzyadev-bundle/README.md b/nifi-hzyadev-bundle/README.md
new file mode 100644
index 0000000..78b7924
--- /dev/null
+++ b/nifi-hzyadev-bundle/README.md
@@ -0,0 +1,93 @@
+# nifi-hzyadev-bundle Project
+
+## Overview
+
+This is a custom processor development project for Apache NiFi 1.28.1, named `nifi-hzyadev-bundle`. It provides a flexible framework for building and extending NiFi data-flow processing with custom processors. The project is built with Maven, written in Java, and follows NiFi's NAR packaging conventions.
+
+## Project Structure
+
+```
+hzya-nifi-DevGeneratePaginatedSqlProcessor/
+├── .idea/                                     # IDE configuration files
+├── hzya-nifi-DevGeneratePaginatedSqlProcessor-nar/
+│   ├── target/                                # NAR build output
+│   └── pom.xml                                # Maven configuration for the NAR module
+├── hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/
+│   ├── src/
+│   │   ├── main/
+│   │   │   ├── java/
+│   │   │   │   └── com/hzya/frame/DevGeneratePaginatedSqlProcessor.java   # custom processor main class
+│   │   │   └── resources/
+│   │   │       └── services/
+│   │   │           └── org.apache.nifi.processor.Processor                # processor service registration
+│   │   └── test/                              # test sources
+│   ├── pom.xml                                # Maven configuration for the processors module
+│   └── README.md                              # project documentation
+├── target/                                    # build output
+├── External Libraries                         # external dependencies (IDE view)
+└── Scratches and Consoles                     # scratch files and consoles (IDE view)
+```
+
+## Naming Conventions
+
+The project follows a uniform naming convention. To develop a new custom processor, create two Maven sub-modules:
+- `hzya-nifi-<ProcessorName>-nar`: produces the NiFi NAR package and contains the processor's build output.
+- `hzya-nifi-<ProcessorName>-processors`: contains the processor's core code, resources, and tests.
+
+For example, to develop a processor named `MyCustomProcessor`, create:
+- `hzya-nifi-MyCustomProcessor-nar`
+- `hzya-nifi-MyCustomProcessor-processors`
+
+The two projects are managed as Maven modules; the `nar` module depends on the `processors` module in order to package the final NAR file.
+
+In addition:
+- The Java package of the `processors` module is always `com.hzya.frame`.
+- Processor class names follow the pattern `Dev<ProcessorName>Processor`, e.g. `DevGeneratePaginatedSqlProcessor` or `DevMyCustomProcessor`.
+
+## Dependencies
+
+Dependencies are managed with Maven. The main ones are:
+
+- **Apache NiFi**: 1.28.1 (custom processor development framework).
+- **Java**: 8 or later.
+- **JUnit**: for unit tests (optional).
+
+See the `pom.xml` files for the full list.
+
+## Development Environment Setup
+
+### Prerequisites
+
+- **JDK**: 8 or later.
+- **Maven**: 3.6.0 or later.
+- **Apache NiFi**: 1.28.1 (installed and configured).
+- **IDE**: IntelliJ IDEA is recommended.
+
+### Setup Steps
+
+1. **Clone the project**:
+   ```bash
+   git clone
+   cd hzya-nifi-DevGeneratePaginatedSqlProcessor
+   ```
+
+2. **Install dependencies**:
+   ```bash
+   mvn clean install
+   ```
+
+3. **Configure NiFi**:
+   - Set the `NIFI_HOME` environment variable to the NiFi installation directory.
+   - Make sure the NiFi service is running.
+
+4. **Import into the IDE**:
+   - Open the project in IntelliJ IDEA.
+   - Configure the Maven project and sync the dependencies.
+
+## Testing
+
+Write unit tests under `src/test/` to verify the processor logic. The tests are run with the JUnit framework.
+
+## Contributing
+
+Issues and pull requests to improve the project are welcome.
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-nar/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-nar/pom.xml
new file mode 100644
index 0000000..c4cda01
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-nar/pom.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-hzyadev-bundle</artifactId>
+        <groupId>com.hzya</groupId>
+        <version>1.0</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hzya-nifi-DevGeneratePaginatedSqlProcessor-nar</artifactId>
+    <packaging>nar</packaging>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <maven.javadoc.skip>true</maven.javadoc.skip>
+        <source.skip>true</source.skip>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.hzya</groupId>
+            <artifactId>hzya-nifi-DevGeneratePaginatedSqlProcessor-processors</artifactId>
+            <version>1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <version>${nifi-revision}</version>
+            <type>nar</type>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/pom.xml
new file mode 100644
index 0000000..8ff9160
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/pom.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-hzyadev-bundle</artifactId>
+        <groupId>com.hzya</groupId>
+        <version>1.0</version>
+    </parent>
+
+    <version>1.0</version>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hzya-nifi-DevGeneratePaginatedSqlProcessor-processors</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>${nifi-revision}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-dbcp-service-api</artifactId>
+            <version>${nifi-revision}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+            <version>1.15.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>${nifi-revision}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.13</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/java/com/hzya/frame/DevGeneratePaginatedSqlProcessor.java b/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/java/com/hzya/frame/DevGeneratePaginatedSqlProcessor.java
new file mode 100644
index 0000000..a5ce030
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/java/com/hzya/frame/DevGeneratePaginatedSqlProcessor.java
@@ -0,0 +1,187 @@
+package com.hzya.frame;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.*;
+
+@Tags({"sql", "database", "pagination", "custom", "jdbc", "oracle", "mysql", "sqlserver"})
+@CapabilityDescription("Generates paginated SQL queries from a base query and statefully tracks the maximum value of a column so that only new data is fetched. The content of each output FlowFile is the paginated SQL query for one specific page. Supports Oracle, MySQL, and SQL Server.")
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Stateful(scopes = {Scope.CLUSTER}, description = "Stores the maximum value of the specified column for each table name. This allows the processor to query only new rows on subsequent runs.")
+@WritesAttributes({@WritesAttribute(attribute = "page", description = "The page number of the generated SQL query."), @WritesAttribute(attribute = "startRow", description = "The starting row number of this page."), @WritesAttribute(attribute = "endRow", description = "The ending row number of this page.")})
+public class DevGeneratePaginatedSqlProcessor extends AbstractProcessor {
+    // Property definitions
+    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder().name("Database Connection Pooling Service").description("The Controller Service that provides connections to the database.").required(true).identifiesControllerService(DBCPService.class).build();
+
+    public static final PropertyDescriptor BASE_SQL_QUERY = new PropertyDescriptor.Builder().name("Base SQL Query").description("The base SQL query to paginate. It should be a SELECT statement. The value is read from the incoming FlowFile's 'executesql' attribute.").required(true).expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder().name("Table Name for State").description("A unique name for the table being queried. It is used as the key under which the state (maximum value) is stored. The value is read from the 'queryTableName' attribute.").required(true).expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+    public static final PropertyDescriptor MAX_VALUE_COLUMN = new PropertyDescriptor.Builder().name("Maximum-Value Column").description("The name of the column whose maximum value is tracked to fetch incremental data (for example, a timestamp or an auto-incrementing ID). The value is read from the 'maximumValueColumns' attribute.").required(true).expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+    public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder().name("Page Size").description("The number of rows to include in each page. The value is read from the 'pagerow' attribute and defaults to 1000.").defaultValue("1000").required(true).expressionLanguageSupported(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
+
+    // Relationship definitions
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Successfully generated paginated SQL queries are sent to this relationship.").build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("The original FlowFile is routed to this relationship if an error occurs during processing.").build();
+
+    private final List<PropertyDescriptor> descriptors;
+    private final Set<Relationship> relationships;
+
+    public DevGeneratePaginatedSqlProcessor() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(DBCP_SERVICE);
+        descriptors.add(BASE_SQL_QUERY);
+        descriptors.add(TABLE_NAME);
+        descriptors.add(MAX_VALUE_COLUMN);
+        descriptors.add(PAGE_SIZE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return this.descriptors;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        // Read property values
+        final String baseSql = context.getProperty(BASE_SQL_QUERY).evaluateAttributeExpressions(flowFile).getValue();
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String maxValColumn = context.getProperty(MAX_VALUE_COLUMN).evaluateAttributeExpressions(flowFile).getValue();
+        final Integer pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
+
+        // Validate properties
+        if (baseSql == null || tableName == null || maxValColumn == null || pageSize == null) {
+            getLogger().error("Required properties are missing; routing to failure");
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+        final StateManager stateManager = context.getStateManager();
+        final Scope scope = Scope.CLUSTER;
+
+        try {
+            final StateMap stateMap = stateManager.getState(scope);
+            final String oldMaximumValue = stateMap.get(tableName);
+
+            String queriedMaximumValue = null;
+            long rowNumber = 0L;
+
+            try (Connection conn = dbcpService.getConnection(); Statement stmt = conn.createStatement()) {
+                // Build the WHERE clause
+                String whereClause = "";
+                if (oldMaximumValue != null && !oldMaximumValue.isEmpty()) {
+                    whereClause = " WHERE " + maxValColumn + " > '" + oldMaximumValue + "'";
+                }
+
+                // Run the COUNT and MAX query
+                String countMaxSql = "SELECT COUNT(*) AS rowNumber, MAX(" + maxValColumn + ") AS maximumValue FROM (" + baseSql + ") t" + whereClause;
+                getLogger().info("Executing count/max query: {}", countMaxSql);
+                ResultSet rs = stmt.executeQuery(countMaxSql);
+
+                if (rs.next()) {
+                    rowNumber = rs.getLong("rowNumber");
+                    queriedMaximumValue = rs.getString("maximumValue");
+                }
+                rs.close();
+
+                getLogger().info("Query result for table '{}': total new rows={}, new maximum value={}", tableName, rowNumber, queriedMaximumValue);
+
+                // No new data: remove the FlowFile and return
+                if (rowNumber == 0 || queriedMaximumValue == null) {
+                    getLogger().info("No new data found for table '{}'. Removing the original FlowFile.", tableName);
+                    session.remove(flowFile);
+                    return;
+                }
+
+                // Generate the paginated SQL
+                long totalPages = (rowNumber + pageSize - 1) / pageSize;
+                String dbType = conn.getMetaData().getDatabaseProductName().toLowerCase();
+
+                for (int page = 0; page < totalPages; page++) {
+                    long startRow = (long) page * pageSize + 1;
+                    long endRow = Math.min((long) (page + 1) * pageSize, rowNumber);
+
+                    String paginatedSql;
+                    if ("oracle".equals(dbType)) {
+                        paginatedSql = "SELECT * FROM (\n" + "    SELECT inner_query.*, ROWNUM rn FROM (\n" + "        " + baseSql + whereClause + "\n" + "    ) inner_query WHERE ROWNUM <= " + endRow + "\n" + ") WHERE rn >= " + startRow;
+                    } else if ("mysql".equals(dbType)) {
+                        long offset = startRow - 1;
+                        paginatedSql = baseSql + whereClause + " LIMIT " + pageSize + " OFFSET " + offset;
+                    } else if (dbType.contains("sql server")) {
+                        // getDatabaseProductName() reports "Microsoft SQL Server"; OFFSET/FETCH also requires an ORDER BY
+                        paginatedSql = baseSql + whereClause + " ORDER BY " + maxValColumn + " OFFSET " + (startRow - 1) + " ROWS FETCH NEXT " + pageSize + " ROWS ONLY";
+                    } else {
+                        throw new ProcessException("Unsupported database type: " + dbType);
+                    }
+
+                    FlowFile newFlowFile = session.create(flowFile);
+                    newFlowFile = session.write(newFlowFile, out -> out.write(paginatedSql.getBytes(StandardCharsets.UTF_8)));
+
+                    Map<String, String> attributes = new HashMap<>();
+                    attributes.put("page", String.valueOf(page + 1));
+                    attributes.put("startRow", String.valueOf(startRow));
+                    attributes.put("endRow", String.valueOf(endRow));
+                    newFlowFile = session.putAllAttributes(newFlowFile, attributes);
+
+                    session.transfer(newFlowFile, REL_SUCCESS);
+                }
+
+                // Update state
+                final Map<String, String> newState = new HashMap<>(stateMap.toMap());
+                newState.put(tableName, queriedMaximumValue);
+                stateManager.setState(newState, scope);
+
+                // Remove the original FlowFile
+                session.remove(flowFile);
+
+            } catch (SQLException e) {
+                getLogger().error("Unable to execute the SQL query due to {}; routing to failure", e.getMessage(), e);
+                session.transfer(flowFile, REL_FAILURE);
+            }
+        } catch (Exception e) {
+            getLogger().error("Error during processing; routing to failure", e);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+}
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/resources/services/org.apache.nifi.processor.Processor b/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/resources/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..8512b67
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-processors/src/main/resources/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+com.hzya.frame.DevGeneratePaginatedSqlProcessor
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/pom.xml b/nifi-hzyadev-bundle/pom.xml
new file mode 100644
index 0000000..7fc529e
--- /dev/null
+++ b/nifi-hzyadev-bundle/pom.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.hzya</groupId>
+    <artifactId>nifi-hzyadev-bundle</artifactId>
+    <packaging>pom</packaging>
+    <version>1.0</version>
+    <modules>
+        <module>hzya-nifi-DevGeneratePaginatedSqlProcessor-nar</module>
+        <module>hzya-nifi-DevGeneratePaginatedSqlProcessor-processors</module>
+    </modules>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-external</artifactId>
+            <version>1.28.1</version>
+        </dependency>
+    </dependencies>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <nifi-revision>1.28.1</nifi-revision>
+    </properties>
+
+</project>
\ No newline at end of file
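
For reference, the page windows and the query text that each output FlowFile carries can be sanity-checked outside NiFi. The following standalone sketch is not part of the commit; it mirrors the processor's pagination arithmetic and prints the MySQL-flavoured statement for each page. The base query, WHERE clause, row count, and page size are illustrative values only.

```java
// Standalone sketch (not part of the commit): mirrors the processor's pagination arithmetic.
// The base query, WHERE clause, row count, and page size below are illustrative values.
public class PaginationSketch {
    public static void main(String[] args) {
        String baseSql = "SELECT id, name FROM demo_table";
        String whereClause = " WHERE id > '100'"; // only present once a maximum value is stored in state
        long rowNumber = 25;                      // rows reported by the COUNT/MAX query
        int pageSize = 10;

        long totalPages = (rowNumber + pageSize - 1) / pageSize; // 3 pages for 25 rows

        for (int page = 0; page < totalPages; page++) {
            long startRow = (long) page * pageSize + 1;
            long endRow = Math.min((long) (page + 1) * pageSize, rowNumber);
            long offset = startRow - 1;

            // MySQL flavour, exactly as the processor concatenates it; Oracle wraps the query in
            // ROWNUM filters and SQL Server uses ORDER BY ... OFFSET ... FETCH NEXT instead.
            String paginatedSql = baseSql + whereClause + " LIMIT " + pageSize + " OFFSET " + offset;
            System.out.println("page=" + (page + 1) + " rows " + startRow + "-" + endRow + ": " + paginatedSql);
        }
    }
}
```

Running it prints three statements with OFFSET 0, 10, and 20, matching the page, startRow, and endRow attributes the processor writes on each generated FlowFile.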
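
The commit message mentions unit tests, and `nifi-mock` plus JUnit are already declared as test dependencies in the processors pom. A minimal test sketch along the following lines could live under `src/test/`; it assumes an in-memory H2 database on the test classpath (not declared in the poms above) and a hypothetical `MockDBCPService` stub. Because H2 reports a product name the processor does not recognize, this particular test exercises the error-handling path; against a MySQL or Oracle connection pool the same setup would instead route one FlowFile per page to `success`.

```java
// Hypothetical test sketch (not part of the commit). Assumes H2 on the test classpath
// and a minimal DBCPService stub; names such as MockDBCPService are illustrative only.
package com.hzya.frame;

import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

public class DevGeneratePaginatedSqlProcessorTest {

    // Minimal DBCPService stub handing out plain JDBC connections to an in-memory H2 database.
    static class MockDBCPService extends AbstractControllerService implements DBCPService {
        private final String url;

        MockDBCPService(String url) {
            this.url = url;
        }

        @Override
        public Connection getConnection() throws ProcessException {
            try {
                return DriverManager.getConnection(url);
            } catch (SQLException e) {
                throw new ProcessException(e);
            }
        }
    }

    @Test
    public void unsupportedDatabaseRoutesToFailure() throws Exception {
        final MockDBCPService dbcp = new MockDBCPService("jdbc:h2:mem:paging;DB_CLOSE_DELAY=-1");

        // Seed a small table so the COUNT/MAX query succeeds before the dialect check runs.
        try (Connection conn = dbcp.getConnection(); Statement stmt = conn.createStatement()) {
            stmt.execute("CREATE TABLE demo_table (id INT PRIMARY KEY, name VARCHAR(50))");
            for (int i = 1; i <= 25; i++) {
                stmt.execute("INSERT INTO demo_table VALUES (" + i + ", 'row" + i + "')");
            }
        }

        final TestRunner runner = TestRunners.newTestRunner(DevGeneratePaginatedSqlProcessor.class);
        runner.addControllerService("dbcp", dbcp);
        runner.enableControllerService(dbcp);

        // Property values are expressions resolved from the incoming FlowFile's attributes,
        // mirroring the attribute names documented in the property descriptions.
        runner.setProperty(DevGeneratePaginatedSqlProcessor.DBCP_SERVICE, "dbcp");
        runner.setProperty(DevGeneratePaginatedSqlProcessor.BASE_SQL_QUERY, "${executesql}");
        runner.setProperty(DevGeneratePaginatedSqlProcessor.TABLE_NAME, "${queryTableName}");
        runner.setProperty(DevGeneratePaginatedSqlProcessor.MAX_VALUE_COLUMN, "${maximumValueColumns}");
        runner.setProperty(DevGeneratePaginatedSqlProcessor.PAGE_SIZE, "${pagerow}");

        final Map<String, String> attrs = new HashMap<>();
        attrs.put("executesql", "SELECT id, name FROM demo_table");
        attrs.put("queryTableName", "demo_table");
        attrs.put("maximumValueColumns", "id");
        attrs.put("pagerow", "10");
        runner.enqueue(new byte[0], attrs);

        runner.run();

        // H2 is not Oracle/MySQL/SQL Server, so the original FlowFile is routed to failure.
        runner.assertTransferCount(DevGeneratePaginatedSqlProcessor.REL_FAILURE, 1);
        runner.assertTransferCount(DevGeneratePaginatedSqlProcessor.REL_SUCCESS, 0);
    }
}
```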