From 053ca7802a98f98773f260d1e80cd29559d3a49a Mon Sep 17 00:00:00 2001
From: liuy <37787198+LiuyCodes@users.noreply.github.com>
Date: Wed, 30 Jul 2025 09:38:12 +0800
Subject: [PATCH] =?UTF-8?q?feat(nifi):=20=E6=B7=BB=E5=8A=A0=E5=87=AD?=
=?UTF-8?q?=E8=AF=81=E8=BD=AC=E6=8D=A2=E5=A4=84=E7=90=86=E5=99=A8?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
- 新增 VoucherConversion处理器,用于处理凭证转换逻辑
- 支持 MySQL 数据库查询和更新操作
- 添加数据库连接池服务、查询 SQL 和更新 SQL 等配置项- 实现基本的凭证转换和处理逻辑
---
.../hzya-nifi-VoucherConversion-nar/pom.xml | 32 +++
.../pom.xml | 55 ++++
.../com/hzya/frame/VoucherConversion.java | 271 ++++++++++++++++++
nifi-hzyadev-bundle/pom.xml | 2 +
4 files changed, 360 insertions(+)
create mode 100644 nifi-hzyadev-bundle/hzya-nifi-VoucherConversion-nar/pom.xml
create mode 100644 nifi-hzyadev-bundle/hzya-nifi-VoucherConversion-processors/pom.xml
create mode 100644 nifi-hzyadev-bundle/hzya-nifi-VoucherConversion-processors/src/main/java/com/hzya/frame/VoucherConversion.java
diff --git a/nifi-hzyadev-bundle/hzya-nifi-VoucherConversion-nar/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-VoucherConversion-nar/pom.xml
new file mode 100644
index 0000000..c311ea3
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-VoucherConversion-nar/pom.xml
@@ -0,0 +1,32 @@
+
+
+
+ nifi-hzyadev-bundle
+ com.hzya
+ 1.0
+
+ 4.0.0
+
+ hzya-nifi-VoucherConversion-nar
+
+
+ 8
+ 8
+
+
+
+
+ com.hzya
+ hzya-nifi-VoucherConversion-processors
+ 1.0
+
+
+ org.apache.nifi
+ nifi-standard-services-api-nar
+ ${nifi-revision}
+ nar
+
+
+
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/hzya-nifi-VoucherConversion-processors/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-VoucherConversion-processors/pom.xml
new file mode 100644
index 0000000..75979ee
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-VoucherConversion-processors/pom.xml
@@ -0,0 +1,55 @@
+
+
+
+ nifi-hzyadev-bundle
+ com.hzya
+ 1.0
+
+ 4.0.0
+
+ hzya-nifi-VoucherConversion-processors
+
+
+ 8
+ 8
+
+
+
+
+ org.apache.nifi
+ nifi-api
+ ${nifi-revision}
+
+
+ org.apache.nifi
+ nifi-dbcp-service-api
+ ${nifi-revision}
+
+
+
+ org.apache.nifi
+ nifi-processor-utils
+ 1.15.3
+
+
+ org.apache.nifi
+ nifi-mock
+ ${nifi-revision}
+ test
+
+
+ junit
+ junit
+ 4.13
+ test
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/hzya-nifi-VoucherConversion-processors/src/main/java/com/hzya/frame/VoucherConversion.java b/nifi-hzyadev-bundle/hzya-nifi-VoucherConversion-processors/src/main/java/com/hzya/frame/VoucherConversion.java
new file mode 100644
index 0000000..7f5bdf5
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-VoucherConversion-processors/src/main/java/com/hzya/frame/VoucherConversion.java
@@ -0,0 +1,271 @@
+package com.hzya.frame;
+
+import org.apache.nifi.processor.AbstractProcessor;
+
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * @Author:liuyang
+ * @Package:com.hzya.frame
+ * @Project:nifi-hzyadev-bundle
+ * @name:VoucherConversion
+ * @Date:2025/7/30 09:30
+ * @Filename:VoucherConversion
+ */
+@Tags({"credential", "transform", "custom"})
+@CapabilityDescription("自定义处理器用于处理凭证转换逻辑,支持MySQL数据库查询")
+@SeeAlso({})
+@ReadsAttributes({@ReadsAttribute(attribute = "", description = "")})
+@WritesAttributes({@WritesAttribute(attribute = "", description = "")})
+public class VoucherConversion extends AbstractProcessor {
+ // 数据库连接池服务属性
+ public static final PropertyDescriptor DATABASE_CONNECTION_POOLING_SERVICE = new PropertyDescriptor.Builder().name("database-connection-pooling-service").displayName("Database Connection Pooling Service").description("数据库连接池服务").required(true).identifiesControllerService(DBCPService.class).build();
+
+ // 查询SQL属性
+ public static final PropertyDescriptor QUERY_SQL = new PropertyDescriptor.Builder().name("query-sql").displayName("Query SQL").description("查询凭证信息的SQL语句").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).defaultValue("SELECT username, password, token FROM credentials WHERE user_id = ?").build();
+
+ // 更新SQL属性
+ public static final PropertyDescriptor UPDATE_SQL = new PropertyDescriptor.Builder().name("update-sql").displayName("Update SQL").description("更新凭证信息的SQL语句").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).defaultValue("UPDATE credentials SET last_access_time = NOW() WHERE user_id = ?").build();
+
+ // 关系定义
+ public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("成功处理的FlowFile").build();
+
+ public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("处理失败的FlowFile").build();
+
+ private List descriptors;
+ private Set relationships;
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List descriptors = new ArrayList();
+ descriptors.add(DATABASE_CONNECTION_POOLING_SERVICE);
+ descriptors.add(QUERY_SQL);
+ descriptors.add(UPDATE_SQL);
+ this.descriptors = Collections.unmodifiableList(descriptors);
+
+ final Set relationships = new HashSet();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ public Set getRelationships() {
+ return this.relationships;
+ }
+
+ @Override
+ public final List getSupportedPropertyDescriptors() {
+ return descriptors;
+ }
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ // 处理器调度时的初始化逻辑
+ getLogger().info("CredentialTransformProcessor scheduled");
+ }
+
+ @OnStopped
+ public void onStopped() {
+ // 处理器停止时的清理逻辑
+ getLogger().info("CredentialTransformProcessor stopped");
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ try {
+ // 获取数据库连接池服务
+ final DBCPService dbcpService = context.getProperty(DATABASE_CONNECTION_POOLING_SERVICE).asControllerService(DBCPService.class);
+
+ // 获取SQL语句
+ final String querySQL = context.getProperty(QUERY_SQL).getValue();
+ final String updateSQL = context.getProperty(UPDATE_SQL).getValue();
+
+ // 执行数据库操作示例
+ executeCredentialTransform(dbcpService, querySQL, updateSQL, flowFile);
+
+ // 这里可以添加你的凭证转换业务逻辑
+ // processCredentialLogic(flowFile, session);
+
+ // 转移到成功关系
+ session.transfer(flowFile, REL_SUCCESS);
+ getLogger().info("Successfully processed FlowFile {}", new Object[]{flowFile});
+
+ } catch (Exception e) {
+ getLogger().error("Failed to process FlowFile {}", new Object[]{flowFile, e});
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ }
+
+ /**
+ * 执行凭证转换相关的数据库操作示例
+ */
+ private void executeCredentialTransform(DBCPService dbcpService, String querySQL, String updateSQL, FlowFile flowFile) throws SQLException {
+
+ Connection connection = null;
+ PreparedStatement queryStmt = null;
+ PreparedStatement updateStmt = null;
+ ResultSet resultSet = null;
+
+ try {
+ // 获取数据库连接
+ connection = dbcpService.getConnection();
+
+ // 示例:从FlowFile属性中获取用户ID(实际场景中可能从FlowFile内容中解析)
+ String userId = flowFile.getAttribute("user_id");
+ if (userId == null) {
+ userId = "default_user"; // 默认值或从其他地方获取
+ }
+
+ // 执行查询操作
+ queryStmt = connection.prepareStatement(querySQL);
+ queryStmt.setString(1, userId);
+ resultSet = queryStmt.executeQuery();
+
+ // 处理查询结果
+ while (resultSet.next()) {
+ String username = resultSet.getString("username");
+ String password = resultSet.getString("password");
+ String token = resultSet.getString("token");
+
+ getLogger().info("Retrieved credential for user: {}, username: {}", new Object[]{userId, username});
+
+ // 这里可以添加凭证转换逻辑
+ // String transformedCredential = transformCredential(username, password, token);
+ }
+
+ // 执行更新操作(如果提供了更新SQL)
+ if (updateSQL != null && !updateSQL.trim().isEmpty()) {
+ updateStmt = connection.prepareStatement(updateSQL);
+ updateStmt.setString(1, userId);
+ int updatedRows = updateStmt.executeUpdate();
+ getLogger().info("Updated {} rows for user {}", new Object[]{updatedRows, userId});
+ }
+
+ // 示例:执行其他SQL操作
+ executeSampleQueries(connection, userId);
+
+ } finally {
+ // 关闭资源
+ if (resultSet != null) {
+ try {
+ resultSet.close();
+ } catch (SQLException e) { /* ignore */ }
+ }
+ if (queryStmt != null) {
+ try {
+ queryStmt.close();
+ } catch (SQLException e) { /* ignore */ }
+ }
+ if (updateStmt != null) {
+ try {
+ updateStmt.close();
+ } catch (SQLException e) { /* ignore */ }
+ }
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (SQLException e) { /* ignore */ }
+ }
+ }
+ }
+
+ /**
+ * 执行示例查询的方法
+ */
+ private void executeSampleQueries(Connection connection, String userId) throws SQLException {
+
+ // 示例1:查询用户权限
+ String permissionQuery = "SELECT permission_name FROM user_permissions WHERE user_id = ?";
+ try (PreparedStatement stmt = connection.prepareStatement(permissionQuery)) {
+ stmt.setString(1, userId);
+ try (ResultSet rs = stmt.executeQuery()) {
+ while (rs.next()) {
+ String permission = rs.getString("permission_name");
+ getLogger().debug("User {} has permission: {}", new Object[]{userId, permission});
+ }
+ }
+ }
+
+ // 示例2:插入审计日志
+ String auditInsert = "INSERT INTO audit_log (user_id, operation, operation_time) VALUES (?, ?, NOW())";
+ try (PreparedStatement stmt = connection.prepareStatement(auditInsert)) {
+ stmt.setString(1, userId);
+ stmt.setString(2, "credential_transform");
+ int inserted = stmt.executeUpdate();
+ getLogger().debug("Inserted {} audit log entry for user {}", new Object[]{inserted, userId});
+ }
+
+ // 示例3:批量操作
+ String batchUpdate = "UPDATE user_session SET last_activity = NOW() WHERE user_id = ?";
+ try (PreparedStatement stmt = connection.prepareStatement(batchUpdate)) {
+ // 可以添加到批次中
+ stmt.setString(1, userId);
+ stmt.addBatch();
+
+ // 执行批次
+ int[] results = stmt.executeBatch();
+ getLogger().debug("Batch update results: {} for user {}", new Object[]{results.length, userId});
+ }
+ }
+
+ /**
+ * 示例凭证转换逻辑(占位方法)
+ */
+ private String transformCredential(String username, String password, String token) {
+ // 这里添加你的具体凭证转换逻辑
+ // 例如:加密、解密、格式转换等
+
+ getLogger().debug("Transforming credential for username: {}", username);
+
+ // 示例转换逻辑
+ return "transformed_" + token;
+ }
+
+ /**
+ * 处理凭证相关业务逻辑的示例方法
+ */
+ private void processCredentialLogic(FlowFile flowFile, ProcessSession session) {
+ // 这里可以添加更多的凭证处理逻辑
+ // 例如:
+ // 1. 解析FlowFile内容
+ // 2. 验证凭证格式
+ // 3. 执行转换操作
+ // 4. 更新FlowFile属性
+
+ getLogger().info("Processing credential logic for FlowFile: {}", flowFile);
+ }
+}
diff --git a/nifi-hzyadev-bundle/pom.xml b/nifi-hzyadev-bundle/pom.xml
index cc38ff8..2da3e48 100644
--- a/nifi-hzyadev-bundle/pom.xml
+++ b/nifi-hzyadev-bundle/pom.xml
@@ -19,6 +19,8 @@
hzya-nifi-AutoJsonTableCreate-nar
hzya-nifi-U8CInterface-nar
hzya-nifi-U8CInterface-processors
+ hzya-nifi-VoucherConversion-nar
+ hzya-nifi-VoucherConversion-processors