feat(nifi): 添加凭证转换处理器

- 新增 VoucherConversion处理器,用于处理凭证转换逻辑
- 支持 MySQL 数据库查询和更新操作
- 添加数据库连接池服务、查询 SQL 和更新 SQL 等配置项- 实现基本的凭证转换和处理逻辑
This commit is contained in:
liuy 2025-07-30 09:38:12 +08:00
parent c016d949b6
commit 053ca7802a
4 changed files with 360 additions and 0 deletions

View File

@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-hzyadev-bundle</artifactId>
<groupId>com.hzya</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hzya-nifi-VoucherConversion-nar</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.hzya</groupId>
<artifactId>hzya-nifi-VoucherConversion-processors</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>${nifi-revision}</version>
<type>nar</type>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,55 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-hzyadev-bundle</artifactId>
<groupId>com.hzya</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hzya-nifi-VoucherConversion-processors</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>${nifi-revision}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
<version>${nifi-revision}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
<version>1.15.3</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>${nifi-revision}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.squareup.okhttp3</groupId>-->
<!-- <artifactId>mockwebserver</artifactId> &lt;!&ndash; 注意是mockwebserver而非okhttp &ndash;&gt;-->
<!-- <version>5.1.0</version> &lt;!&ndash; 版本与主库一致 &ndash;&gt;-->
<!-- <scope>test</scope> &lt;!&ndash; 测试专用 &ndash;&gt;-->
<!-- </dependency>-->
</dependencies>
</project>

View File

@ -0,0 +1,271 @@
package com.hzya.frame;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* @Authorliuyang
* @Packagecom.hzya.frame
* @Projectnifi-hzyadev-bundle
* @nameVoucherConversion
* @Date2025/7/30 09:30
* @FilenameVoucherConversion
*/
@Tags({"credential", "transform", "custom"})
@CapabilityDescription("自定义处理器用于处理凭证转换逻辑支持MySQL数据库查询")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute = "", description = "")})
@WritesAttributes({@WritesAttribute(attribute = "", description = "")})
public class VoucherConversion extends AbstractProcessor {
// 数据库连接池服务属性
public static final PropertyDescriptor DATABASE_CONNECTION_POOLING_SERVICE = new PropertyDescriptor.Builder().name("database-connection-pooling-service").displayName("Database Connection Pooling Service").description("数据库连接池服务").required(true).identifiesControllerService(DBCPService.class).build();
// 查询SQL属性
public static final PropertyDescriptor QUERY_SQL = new PropertyDescriptor.Builder().name("query-sql").displayName("Query SQL").description("查询凭证信息的SQL语句").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).defaultValue("SELECT username, password, token FROM credentials WHERE user_id = ?").build();
// 更新SQL属性
public static final PropertyDescriptor UPDATE_SQL = new PropertyDescriptor.Builder().name("update-sql").displayName("Update SQL").description("更新凭证信息的SQL语句").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).defaultValue("UPDATE credentials SET last_access_time = NOW() WHERE user_id = ?").build();
// 关系定义
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("成功处理的FlowFile").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("处理失败的FlowFile").build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(DATABASE_CONNECTION_POOLING_SERVICE);
descriptors.add(QUERY_SQL);
descriptors.add(UPDATE_SQL);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<Relationship>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
// 处理器调度时的初始化逻辑
getLogger().info("CredentialTransformProcessor scheduled");
}
@OnStopped
public void onStopped() {
// 处理器停止时的清理逻辑
getLogger().info("CredentialTransformProcessor stopped");
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
try {
// 获取数据库连接池服务
final DBCPService dbcpService = context.getProperty(DATABASE_CONNECTION_POOLING_SERVICE).asControllerService(DBCPService.class);
// 获取SQL语句
final String querySQL = context.getProperty(QUERY_SQL).getValue();
final String updateSQL = context.getProperty(UPDATE_SQL).getValue();
// 执行数据库操作示例
executeCredentialTransform(dbcpService, querySQL, updateSQL, flowFile);
// 这里可以添加你的凭证转换业务逻辑
// processCredentialLogic(flowFile, session);
// 转移到成功关系
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Successfully processed FlowFile {}", new Object[]{flowFile});
} catch (Exception e) {
getLogger().error("Failed to process FlowFile {}", new Object[]{flowFile, e});
session.transfer(flowFile, REL_FAILURE);
}
}
/**
* 执行凭证转换相关的数据库操作示例
*/
private void executeCredentialTransform(DBCPService dbcpService, String querySQL, String updateSQL, FlowFile flowFile) throws SQLException {
Connection connection = null;
PreparedStatement queryStmt = null;
PreparedStatement updateStmt = null;
ResultSet resultSet = null;
try {
// 获取数据库连接
connection = dbcpService.getConnection();
// 示例从FlowFile属性中获取用户ID实际场景中可能从FlowFile内容中解析
String userId = flowFile.getAttribute("user_id");
if (userId == null) {
userId = "default_user"; // 默认值或从其他地方获取
}
// 执行查询操作
queryStmt = connection.prepareStatement(querySQL);
queryStmt.setString(1, userId);
resultSet = queryStmt.executeQuery();
// 处理查询结果
while (resultSet.next()) {
String username = resultSet.getString("username");
String password = resultSet.getString("password");
String token = resultSet.getString("token");
getLogger().info("Retrieved credential for user: {}, username: {}", new Object[]{userId, username});
// 这里可以添加凭证转换逻辑
// String transformedCredential = transformCredential(username, password, token);
}
// 执行更新操作如果提供了更新SQL
if (updateSQL != null && !updateSQL.trim().isEmpty()) {
updateStmt = connection.prepareStatement(updateSQL);
updateStmt.setString(1, userId);
int updatedRows = updateStmt.executeUpdate();
getLogger().info("Updated {} rows for user {}", new Object[]{updatedRows, userId});
}
// 示例执行其他SQL操作
executeSampleQueries(connection, userId);
} finally {
// 关闭资源
if (resultSet != null) {
try {
resultSet.close();
} catch (SQLException e) { /* ignore */ }
}
if (queryStmt != null) {
try {
queryStmt.close();
} catch (SQLException e) { /* ignore */ }
}
if (updateStmt != null) {
try {
updateStmt.close();
} catch (SQLException e) { /* ignore */ }
}
if (connection != null) {
try {
connection.close();
} catch (SQLException e) { /* ignore */ }
}
}
}
/**
* 执行示例查询的方法
*/
private void executeSampleQueries(Connection connection, String userId) throws SQLException {
// 示例1查询用户权限
String permissionQuery = "SELECT permission_name FROM user_permissions WHERE user_id = ?";
try (PreparedStatement stmt = connection.prepareStatement(permissionQuery)) {
stmt.setString(1, userId);
try (ResultSet rs = stmt.executeQuery()) {
while (rs.next()) {
String permission = rs.getString("permission_name");
getLogger().debug("User {} has permission: {}", new Object[]{userId, permission});
}
}
}
// 示例2插入审计日志
String auditInsert = "INSERT INTO audit_log (user_id, operation, operation_time) VALUES (?, ?, NOW())";
try (PreparedStatement stmt = connection.prepareStatement(auditInsert)) {
stmt.setString(1, userId);
stmt.setString(2, "credential_transform");
int inserted = stmt.executeUpdate();
getLogger().debug("Inserted {} audit log entry for user {}", new Object[]{inserted, userId});
}
// 示例3批量操作
String batchUpdate = "UPDATE user_session SET last_activity = NOW() WHERE user_id = ?";
try (PreparedStatement stmt = connection.prepareStatement(batchUpdate)) {
// 可以添加到批次中
stmt.setString(1, userId);
stmt.addBatch();
// 执行批次
int[] results = stmt.executeBatch();
getLogger().debug("Batch update results: {} for user {}", new Object[]{results.length, userId});
}
}
/**
* 示例凭证转换逻辑占位方法
*/
private String transformCredential(String username, String password, String token) {
// 这里添加你的具体凭证转换逻辑
// 例如加密解密格式转换等
getLogger().debug("Transforming credential for username: {}", username);
// 示例转换逻辑
return "transformed_" + token;
}
/**
* 处理凭证相关业务逻辑的示例方法
*/
private void processCredentialLogic(FlowFile flowFile, ProcessSession session) {
// 这里可以添加更多的凭证处理逻辑
// 例如
// 1. 解析FlowFile内容
// 2. 验证凭证格式
// 3. 执行转换操作
// 4. 更新FlowFile属性
getLogger().info("Processing credential logic for FlowFile: {}", flowFile);
}
}

View File

@ -19,6 +19,8 @@
<module>hzya-nifi-AutoJsonTableCreate-nar</module>
<module>hzya-nifi-U8CInterface-nar</module>
<module>hzya-nifi-U8CInterface-processors</module>
<module>hzya-nifi-VoucherConversion-nar</module>
<module>hzya-nifi-VoucherConversion-processors</module>
</modules>
<parent>