feat(nifi): 添加自定义分页 SQL 处理器
- 新增 DevGeneratePaginatedSqlProcessor 处理器 - 实现了 Oracle、MySQL 和 SQL Server 的分页查询 - 添加了状态管理,以跟踪每个表的最大值列 - 包含了错误处理和日志记录功能 - 新建了 Maven 项目结构和必要的配置文件 - 编写了处理器的单元测试 - 添加了项目文档和 README 文件
This commit is contained in:
commit
bdb8766c26
|
@ -0,0 +1,60 @@
|
||||||
|
# ---> macOS
|
||||||
|
# General
|
||||||
|
.DS_Store
|
||||||
|
.AppleDouble
|
||||||
|
.LSOverride
|
||||||
|
|
||||||
|
# Icon must end with two \r
|
||||||
|
Icon
|
||||||
|
|
||||||
|
|
||||||
|
# Thumbnails
|
||||||
|
._*
|
||||||
|
|
||||||
|
# Files that might appear in the root of a volume
|
||||||
|
.DocumentRevisions-V100
|
||||||
|
.fseventsd
|
||||||
|
.Spotlight-V100
|
||||||
|
.TemporaryItems
|
||||||
|
.Trashes
|
||||||
|
.VolumeIcon.icns
|
||||||
|
.com.apple.timemachine.donotpresent
|
||||||
|
|
||||||
|
# Directories potentially created on remote AFP share
|
||||||
|
.AppleDB
|
||||||
|
.AppleDesktop
|
||||||
|
Network Trash Folder
|
||||||
|
Temporary Items
|
||||||
|
.apdisk
|
||||||
|
|
||||||
|
# ---> Windows
|
||||||
|
# Windows thumbnail cache files
|
||||||
|
Thumbs.db
|
||||||
|
Thumbs.db:encryptable
|
||||||
|
ehthumbs.db
|
||||||
|
ehthumbs_vista.db
|
||||||
|
|
||||||
|
# Dump file
|
||||||
|
*.stackdump
|
||||||
|
|
||||||
|
# Folder config file
|
||||||
|
[Dd]esktop.ini
|
||||||
|
|
||||||
|
# Recycle Bin used on file shares
|
||||||
|
$RECYCLE.BIN/
|
||||||
|
|
||||||
|
# Windows Installer files
|
||||||
|
*.cab
|
||||||
|
*.msi
|
||||||
|
*.msix
|
||||||
|
*.msm
|
||||||
|
*.msp
|
||||||
|
*.iml
|
||||||
|
|
||||||
|
# Windows shortcuts
|
||||||
|
*.lnk
|
||||||
|
/.idea/
|
||||||
|
/log/
|
||||||
|
/file/
|
||||||
|
/buildpackage/src/main/resources/banner.txt
|
||||||
|
/hzya-nifi-*/target/
|
|
@ -0,0 +1,93 @@
|
||||||
|
# nifi-hzyadev-bundle 项目
|
||||||
|
|
||||||
|
## 项目概述
|
||||||
|
|
||||||
|
这是一个基于 Apache NiFi 1.28.1 的自定义处理器开发项目,名为 `nifi-hzyadev-bundle`。本项目是一个用于 NiFi 自定义处理器的开发工程,旨在提供灵活的框架来构建和扩展 NiFi 数据流处理功能。项目使用 Maven 构建,基于 Java 开发,并遵循 NiFi 的 NAR 打包规范。
|
||||||
|
|
||||||
|
## 项目结构
|
||||||
|
|
||||||
|
```
|
||||||
|
hzya-nifi-DevGeneratePaginatedSqlProcessor/
|
||||||
|
├── .idea/ # IDE 配置文件
|
||||||
|
├── hzya-nifi-DevGeneratePaginatedSqlProcessor-nar/
|
||||||
|
│ ├── target/ # NAR 包构建输出
|
||||||
|
│ └── m pom.xml # NAR 模块 Maven 配置文件
|
||||||
|
├── hzya-nifi-DevGeneratePaginatedSqlProcessor-processor/
|
||||||
|
│ ├── src/
|
||||||
|
│ │ ├── main/
|
||||||
|
│ │ │ ├── java/
|
||||||
|
│ │ │ │ └── com/hzya/frame/DevGeneratePaginatedSqlProcessor.java # 自定义处理器主类
|
||||||
|
│ │ │ ├── resources/ # 资源文件
|
||||||
|
│ │ │ └── services/ # 服务相关代码
|
||||||
|
│ │ │ └── org.apache.nifi.processor.Processor # 处理器接口或服务定义
|
||||||
|
│ │ └── test/ # 测试代码目录
|
||||||
|
│ ├── pom.xml # 处理器模块 Maven 配置文件
|
||||||
|
│ └── README.md # 项目说明文档
|
||||||
|
├── target/ # 构建输出目录
|
||||||
|
├── External Libraries # 外部依赖库
|
||||||
|
└── Scratches and Consoles # 临时脚本和控制台
|
||||||
|
```
|
||||||
|
|
||||||
|
## 命名规则
|
||||||
|
|
||||||
|
本项目遵循统一的命名约定。如果需要开发新的自定义处理器,需创建两个 Maven 子工程:
|
||||||
|
- `hzya-nifi-处理器名称-nar`:用于生成 NiFi NAR 包,包含处理器的构建输出。
|
||||||
|
- `hzya-nifi-处理器名称-processors`:包含处理器核心代码、资源和测试代码。
|
||||||
|
|
||||||
|
例如,开发一个名为 `MyCustomProcessor` 的处理器时,需创建:
|
||||||
|
- `hzya-nifi-MyCustomProcessor-nar`
|
||||||
|
- `hzya-nifi-MyCustomProcessor-processors`
|
||||||
|
|
||||||
|
两个工程通过 Maven 模块化管理,`nar` 工程依赖 `processors` 工程以打包最终的 NAR 文件。
|
||||||
|
|
||||||
|
此外,命名规则还包括:
|
||||||
|
- `processors` 包的包名称统一为 `com.hzya.frame`。
|
||||||
|
- 处理器的类名统一为 `Dev处理器名称Processor`,例如 `DevGeneratePaginatedSqlProcessor` 或 `DevMyCustomProcessor`。
|
||||||
|
|
||||||
|
## 依赖
|
||||||
|
|
||||||
|
本项目使用 Maven 管理依赖,主要依赖包括:
|
||||||
|
|
||||||
|
- **Apache NiFi**: 1.28.1(自定义处理器开发框架)。
|
||||||
|
- **Java**: 8 或更高版本。
|
||||||
|
- **JUnit**: 用于单元测试(可选)。
|
||||||
|
|
||||||
|
具体依赖请查看 `pom.xml` 文件。
|
||||||
|
|
||||||
|
## 开发环境设置
|
||||||
|
|
||||||
|
### 前置条件
|
||||||
|
|
||||||
|
- **JDK**: 8 或更高版本。
|
||||||
|
- **Maven**: 3.6.0 或更高版本。
|
||||||
|
- **Apache NiFi**: 1.28.1(需正确安装并配置)。
|
||||||
|
- **IDE**: 推荐使用 IntelliJ IDEA。
|
||||||
|
|
||||||
|
### 设置步骤
|
||||||
|
|
||||||
|
1. **克隆项目**:
|
||||||
|
```bash
|
||||||
|
git clone <repository-url>
|
||||||
|
cd hzya-nifi-DevGeneratePaginatedSqlProcessor
|
||||||
|
```
|
||||||
|
|
||||||
|
2. **安装依赖**:
|
||||||
|
```bash
|
||||||
|
mvn clean install
|
||||||
|
```
|
||||||
|
|
||||||
|
3. **配置 NiFi**:
|
||||||
|
- 将 NiFi 安装目录设置为环境变量 `NIFI_HOME`。
|
||||||
|
- 确保 NiFi 服务已启动。
|
||||||
|
|
||||||
|
4. **导入 IDE**:
|
||||||
|
- 使用 IntelliJ IDEA 打开项目。
|
||||||
|
- 配置 Maven 项目并同步依赖。
|
||||||
|
|
||||||
|
## 测试
|
||||||
|
|
||||||
|
在 `src/test/` 目录下编写单元测试,验证处理器逻辑。使用 JUnit 框架运行测试。
|
||||||
|
|
||||||
|
## 贡献
|
||||||
|
|
||||||
|
欢迎提交问题或拉取请求以改进项目。
|
|
@ -0,0 +1,34 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<parent>
|
||||||
|
<artifactId>nifi-hzyadev-bundle</artifactId>
|
||||||
|
<groupId>com.hzya</groupId>
|
||||||
|
<version>1.0</version>
|
||||||
|
</parent>
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
<artifactId>hzya-nifi-DevGeneratePaginatedSqlProcessor-nar</artifactId>
|
||||||
|
<packaging>nar</packaging>
|
||||||
|
<properties>
|
||||||
|
<maven.compiler.source>8</maven.compiler.source>
|
||||||
|
<maven.compiler.target>8</maven.compiler.target>
|
||||||
|
<maven.javadoc.skip>true</maven.javadoc.skip>
|
||||||
|
<source.skip>true</source.skip>
|
||||||
|
</properties>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.hzya</groupId>
|
||||||
|
<artifactId>hzya-nifi-DevGeneratePaginatedSqlProcessor-processors</artifactId>
|
||||||
|
<version>1.0</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-standard-services-api-nar</artifactId>
|
||||||
|
<version>${nifi-revision}</version>
|
||||||
|
<type>nar</type>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
</project>
|
|
@ -0,0 +1,53 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<parent>
|
||||||
|
<artifactId>nifi-hzyadev-bundle</artifactId>
|
||||||
|
<groupId>com.hzya</groupId>
|
||||||
|
<version>1.0</version>
|
||||||
|
</parent>
|
||||||
|
|
||||||
|
<version>1.0</version>
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
<artifactId>hzya-nifi-DevGeneratePaginatedSqlProcessor-processors</artifactId>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<maven.compiler.source>8</maven.compiler.source>
|
||||||
|
<maven.compiler.target>8</maven.compiler.target>
|
||||||
|
</properties>
|
||||||
|
|
||||||
|
<packaging>jar</packaging>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-api</artifactId>
|
||||||
|
<version>${nifi-revision}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-dbcp-service-api</artifactId>
|
||||||
|
<version>${nifi-revision}</version>
|
||||||
|
<scope>provided</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-processor-utils</artifactId>
|
||||||
|
<version>1.15.3</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-mock</artifactId>
|
||||||
|
<version>${nifi-revision}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>junit</groupId>
|
||||||
|
<artifactId>junit</artifactId>
|
||||||
|
<version>4.13</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
</project>
|
|
@ -0,0 +1,187 @@
|
||||||
|
package com.hzya.frame;
|
||||||
|
|
||||||
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
|
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||||
|
import org.apache.nifi.annotation.behavior.Stateful;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||||
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.components.state.Scope;
|
||||||
|
import org.apache.nifi.components.state.StateManager;
|
||||||
|
import org.apache.nifi.components.state.StateMap;
|
||||||
|
import org.apache.nifi.dbcp.DBCPService;
|
||||||
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
import org.apache.nifi.processor.AbstractProcessor;
|
||||||
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
import org.apache.nifi.processor.Relationship;
|
||||||
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.sql.Connection;
|
||||||
|
import java.sql.ResultSet;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import java.sql.Statement;
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
@Tags({"sql", "database", "pagination", "custom", "jdbc", "oracle", "mysql", "sqlserver"})
|
||||||
|
@CapabilityDescription("基于基本查询生成分页SQL查询,并有状态地跟踪列的最大值,以仅获取新数据。每个输出FlowFile的内容都是特定页面的分页SQL查询。支持Oracle、MySQL和SQL Server。")
|
||||||
|
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||||
|
@Stateful(scopes = {Scope.CLUSTER}, description = "为每个表名存储指定列的最大值。这允许处理器在后续运行中仅查询新行。")
|
||||||
|
@WritesAttributes({@WritesAttribute(attribute = "page", description = "生成的SQL查询的当前页码。"), @WritesAttribute(attribute = "startRow", description = "此页的起始行号。"), @WritesAttribute(attribute = "endRow", description = "此页的结束行号。")})
|
||||||
|
public class DevGeneratePaginatedSqlProcessor extends AbstractProcessor {
|
||||||
|
// 属性定义
|
||||||
|
public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder().name("Database Connection Pooling Service").description("提供与数据库连接的控制器服务。").required(true).identifiesControllerService(DBCPService.class).build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor BASE_SQL_QUERY = new PropertyDescriptor.Builder().name("Base SQL Query").description("要分页的基本SQL查询。应该是SELECT语句。该值将从传入FlowFile的“executesql”属性中读取。").required(true).expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder().name("Table Name for State").description("正在查询的表的唯一名称。这被用作存储状态(最大值)的键。该值将从“queryTableName”属性中读取。").required(true).expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor MAX_VALUE_COLUMN = new PropertyDescriptor.Builder().name("Maximum-Value Column").description("将跟踪其最大值以获取增量数据的列的名称(例如,时间戳或自动递增的ID)。将从“maximumValueColumns”属性读取该值。").required(true).expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder().name("Page Size").description("每页中要包含的行数。该值将从“pagerow”属性读取,默认为1000。").defaultValue("1000").required(true).expressionLanguageSupported(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
|
||||||
|
|
||||||
|
// 关系定义
|
||||||
|
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("分页SQL查询已成功生成并发送到此关系。").build();
|
||||||
|
|
||||||
|
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("如果在处理过程中发生错误,则将原始FlowFile路由到此关系。").build();
|
||||||
|
|
||||||
|
private final List<PropertyDescriptor> descriptors;
|
||||||
|
private final Set<Relationship> relationships;
|
||||||
|
|
||||||
|
public DevGeneratePaginatedSqlProcessor() {
|
||||||
|
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||||
|
descriptors.add(DBCP_SERVICE);
|
||||||
|
descriptors.add(BASE_SQL_QUERY);
|
||||||
|
descriptors.add(TABLE_NAME);
|
||||||
|
descriptors.add(MAX_VALUE_COLUMN);
|
||||||
|
descriptors.add(PAGE_SIZE);
|
||||||
|
this.descriptors = Collections.unmodifiableList(descriptors);
|
||||||
|
|
||||||
|
final Set<Relationship> relationships = new HashSet<>();
|
||||||
|
relationships.add(REL_SUCCESS);
|
||||||
|
relationships.add(REL_FAILURE);
|
||||||
|
this.relationships = Collections.unmodifiableSet(relationships);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<Relationship> getRelationships() {
|
||||||
|
return this.relationships;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
return this.descriptors;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
||||||
|
FlowFile flowFile = session.get();
|
||||||
|
if (flowFile == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 获取属性值
|
||||||
|
final String baseSql = context.getProperty(BASE_SQL_QUERY).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
final String maxValColumn = context.getProperty(MAX_VALUE_COLUMN).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
final Integer pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
|
||||||
|
|
||||||
|
// 验证属性
|
||||||
|
if (baseSql == null || tableName == null || maxValColumn == null || pageSize == null) {
|
||||||
|
getLogger().error("缺少必需的属性。路由失败");
|
||||||
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
|
||||||
|
final StateManager stateManager = context.getStateManager();
|
||||||
|
final Scope scope = Scope.CLUSTER;
|
||||||
|
|
||||||
|
try {
|
||||||
|
final StateMap stateMap = stateManager.getState(scope);
|
||||||
|
final String oldMaximumValue = stateMap.get(tableName);
|
||||||
|
|
||||||
|
String queriedMaximumValue = null;
|
||||||
|
long rowNumber = 0L;
|
||||||
|
|
||||||
|
try (Connection conn = dbcpService.getConnection(); Statement stmt = conn.createStatement()) {
|
||||||
|
// 构建 WHERE 条件
|
||||||
|
String whereClause = "";
|
||||||
|
if (oldMaximumValue != null && !oldMaximumValue.isEmpty()) {
|
||||||
|
whereClause = " WHERE " + maxValColumn + " > '" + oldMaximumValue + "'";
|
||||||
|
}
|
||||||
|
|
||||||
|
// 执行 COUNT 和 MAX 查询
|
||||||
|
String countMaxSql = "SELECT COUNT(*) AS rowNumber, MAX(" + maxValColumn + ") AS maximumValue FROM (" + baseSql + ") t" + whereClause;
|
||||||
|
getLogger().info("正在执行计数/最大值查询: {}", countMaxSql);
|
||||||
|
ResultSet rs = stmt.executeQuery(countMaxSql);
|
||||||
|
|
||||||
|
if (rs.next()) {
|
||||||
|
rowNumber = rs.getLong("rowNumber");
|
||||||
|
queriedMaximumValue = rs.getString("maximumValue");
|
||||||
|
}
|
||||||
|
rs.close();
|
||||||
|
|
||||||
|
getLogger().info("表“{}”的查询结果:新行总数={},新的最大值={}", tableName, rowNumber, queriedMaximumValue);
|
||||||
|
|
||||||
|
// 无新数据时删除 FlowFile 并返回
|
||||||
|
if (rowNumber == 0 || queriedMaximumValue == null) {
|
||||||
|
getLogger().info("找不到表“{}”的新数据。正在删除原始FlowFile。", tableName);
|
||||||
|
session.remove(flowFile);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 生成分页 SQL
|
||||||
|
long totalPages = (rowNumber + pageSize - 1) / pageSize;
|
||||||
|
String dbType = conn.getMetaData().getDatabaseProductName().toLowerCase();
|
||||||
|
|
||||||
|
for (int page = 0; page < totalPages; page++) {
|
||||||
|
long startRow = (long) page * pageSize + 1;
|
||||||
|
long endRow = Math.min((long) (page + 1) * pageSize, rowNumber);
|
||||||
|
|
||||||
|
String paginatedSql;
|
||||||
|
if ("oracle".equals(dbType)) {
|
||||||
|
paginatedSql = "SELECT * FROM (\n" + " SELECT inner_query.*, ROWNUM rn FROM (\n" + " " + baseSql + whereClause + "\n" + " ) inner_query WHERE ROWNUM <= " + endRow + "\n" + ") WHERE rn >= " + startRow;
|
||||||
|
} else if ("mysql".equals(dbType)) {
|
||||||
|
long offset = startRow - 1;
|
||||||
|
paginatedSql = baseSql + whereClause + " LIMIT " + pageSize + " OFFSET " + offset;
|
||||||
|
} else if ("sqlserver".equals(dbType)) {
|
||||||
|
paginatedSql = baseSql + whereClause + " OFFSET " + (startRow - 1) + " ROWS FETCH NEXT " + pageSize + " ROWS ONLY";
|
||||||
|
} else {
|
||||||
|
throw new ProcessException("不支持的数据库类型: " + dbType);
|
||||||
|
}
|
||||||
|
|
||||||
|
FlowFile newFlowFile = session.create(flowFile);
|
||||||
|
newFlowFile = session.write(newFlowFile, out -> out.write(paginatedSql.getBytes(StandardCharsets.UTF_8)));
|
||||||
|
|
||||||
|
Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put("page", String.valueOf(page + 1));
|
||||||
|
attributes.put("startRow", String.valueOf(startRow));
|
||||||
|
attributes.put("endRow", String.valueOf(endRow));
|
||||||
|
newFlowFile = session.putAllAttributes(newFlowFile, attributes);
|
||||||
|
|
||||||
|
session.transfer(newFlowFile, REL_SUCCESS);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 更新状态
|
||||||
|
final Map<String, String> newState = new HashMap<>(stateMap.toMap());
|
||||||
|
newState.put(tableName, queriedMaximumValue);
|
||||||
|
stateManager.setState(newState, scope);
|
||||||
|
|
||||||
|
// 删除原始 FlowFile
|
||||||
|
session.remove(flowFile);
|
||||||
|
|
||||||
|
} catch (SQLException e) {
|
||||||
|
getLogger().error("由于{},无法执行SQL查询;故障路由", e.getMessage(), e);
|
||||||
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
getLogger().error("处理过程中出错;故障路由", e);
|
||||||
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,15 @@
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
# contributor license agreements. See the NOTICE file distributed with
|
||||||
|
# this work for additional information regarding copyright ownership.
|
||||||
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
# (the "License"); you may not use this file except in compliance with
|
||||||
|
# the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
com.hzya.frame.DevGeneratePaginatedSqlProcessor
|
|
@ -0,0 +1,30 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
<groupId>com.hzya</groupId>
|
||||||
|
<artifactId>nifi-hzyadev-bundle</artifactId>
|
||||||
|
<packaging>pom</packaging>
|
||||||
|
<version>1.0</version>
|
||||||
|
<modules>
|
||||||
|
<module>hzya-nifi-DevGeneratePaginatedSqlProcessor-nar</module>
|
||||||
|
<module>hzya-nifi-DevGeneratePaginatedSqlProcessor-processors</module>
|
||||||
|
</modules>
|
||||||
|
|
||||||
|
<parent>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-external</artifactId>
|
||||||
|
<version>1.28.1</version>
|
||||||
|
</parent>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<maven.compiler.source>8</maven.compiler.source>
|
||||||
|
<maven.compiler.target>8</maven.compiler.target>
|
||||||
|
<nifi-revision>1.28.1</nifi-revision>
|
||||||
|
</properties>
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
</project>
|
Loading…
Reference in New Issue