diff --git a/nifi-hzyadev-bundle/hzya-nifi-VoucherConversion-nar/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-VoucherConversion-nar/pom.xml new file mode 100644 index 0000000..c311ea3 --- /dev/null +++ b/nifi-hzyadev-bundle/hzya-nifi-VoucherConversion-nar/pom.xml @@ -0,0 +1,32 @@ + + + + nifi-hzyadev-bundle + com.hzya + 1.0 + + 4.0.0 + + hzya-nifi-VoucherConversion-nar + + + 8 + 8 + + + + + com.hzya + hzya-nifi-VoucherConversion-processors + 1.0 + + + org.apache.nifi + nifi-standard-services-api-nar + ${nifi-revision} + nar + + + \ No newline at end of file diff --git a/nifi-hzyadev-bundle/hzya-nifi-VoucherConversion-processors/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-VoucherConversion-processors/pom.xml new file mode 100644 index 0000000..75979ee --- /dev/null +++ b/nifi-hzyadev-bundle/hzya-nifi-VoucherConversion-processors/pom.xml @@ -0,0 +1,55 @@ + + + + nifi-hzyadev-bundle + com.hzya + 1.0 + + 4.0.0 + + hzya-nifi-VoucherConversion-processors + + + 8 + 8 + + + + + org.apache.nifi + nifi-api + ${nifi-revision} + + + org.apache.nifi + nifi-dbcp-service-api + ${nifi-revision} + + + + org.apache.nifi + nifi-processor-utils + 1.15.3 + + + org.apache.nifi + nifi-mock + ${nifi-revision} + test + + + junit + junit + 4.13 + test + + + + + + + + + \ No newline at end of file diff --git a/nifi-hzyadev-bundle/hzya-nifi-VoucherConversion-processors/src/main/java/com/hzya/frame/VoucherConversion.java b/nifi-hzyadev-bundle/hzya-nifi-VoucherConversion-processors/src/main/java/com/hzya/frame/VoucherConversion.java new file mode 100644 index 0000000..7f5bdf5 --- /dev/null +++ b/nifi-hzyadev-bundle/hzya-nifi-VoucherConversion-processors/src/main/java/com/hzya/frame/VoucherConversion.java @@ -0,0 +1,271 @@ +package com.hzya.frame; + +import org.apache.nifi.processor.AbstractProcessor; + +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * @Author:liuyang + * @Package:com.hzya.frame + * @Project:nifi-hzyadev-bundle + * @name:VoucherConversion + * @Date:2025/7/30 09:30 + * @Filename:VoucherConversion + */ +@Tags({"credential", "transform", "custom"}) +@CapabilityDescription("自定义处理器用于处理凭证转换逻辑,支持MySQL数据库查询") +@SeeAlso({}) +@ReadsAttributes({@ReadsAttribute(attribute = "", description = "")}) +@WritesAttributes({@WritesAttribute(attribute = "", description = "")}) +public class VoucherConversion extends AbstractProcessor { + // 数据库连接池服务属性 + public static final PropertyDescriptor DATABASE_CONNECTION_POOLING_SERVICE = new PropertyDescriptor.Builder().name("database-connection-pooling-service").displayName("Database Connection Pooling Service").description("数据库连接池服务").required(true).identifiesControllerService(DBCPService.class).build(); + + // 查询SQL属性 + public static final PropertyDescriptor QUERY_SQL = new PropertyDescriptor.Builder().name("query-sql").displayName("Query SQL").description("查询凭证信息的SQL语句").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).defaultValue("SELECT username, password, token FROM credentials WHERE user_id = ?").build(); + + // 更新SQL属性 + public static final PropertyDescriptor UPDATE_SQL = new PropertyDescriptor.Builder().name("update-sql").displayName("Update SQL").description("更新凭证信息的SQL语句").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).defaultValue("UPDATE credentials SET last_access_time = NOW() WHERE user_id = ?").build(); + + // 关系定义 + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("成功处理的FlowFile").build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("处理失败的FlowFile").build(); + + private List descriptors; + private Set relationships; + + @Override + protected void init(final ProcessorInitializationContext context) { + final List descriptors = new ArrayList(); + descriptors.add(DATABASE_CONNECTION_POOLING_SERVICE); + descriptors.add(QUERY_SQL); + descriptors.add(UPDATE_SQL); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set relationships = new HashSet(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set getRelationships() { + return this.relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + // 处理器调度时的初始化逻辑 + getLogger().info("CredentialTransformProcessor scheduled"); + } + + @OnStopped + public void onStopped() { + // 处理器停止时的清理逻辑 + getLogger().info("CredentialTransformProcessor stopped"); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + try { + // 获取数据库连接池服务 + final DBCPService dbcpService = context.getProperty(DATABASE_CONNECTION_POOLING_SERVICE).asControllerService(DBCPService.class); + + // 获取SQL语句 + final String querySQL = context.getProperty(QUERY_SQL).getValue(); + final String updateSQL = context.getProperty(UPDATE_SQL).getValue(); + + // 执行数据库操作示例 + executeCredentialTransform(dbcpService, querySQL, updateSQL, flowFile); + + // 这里可以添加你的凭证转换业务逻辑 + // processCredentialLogic(flowFile, session); + + // 转移到成功关系 + session.transfer(flowFile, REL_SUCCESS); + getLogger().info("Successfully processed FlowFile {}", new Object[]{flowFile}); + + } catch (Exception e) { + getLogger().error("Failed to process FlowFile {}", new Object[]{flowFile, e}); + session.transfer(flowFile, REL_FAILURE); + } + } + + /** + * 执行凭证转换相关的数据库操作示例 + */ + private void executeCredentialTransform(DBCPService dbcpService, String querySQL, String updateSQL, FlowFile flowFile) throws SQLException { + + Connection connection = null; + PreparedStatement queryStmt = null; + PreparedStatement updateStmt = null; + ResultSet resultSet = null; + + try { + // 获取数据库连接 + connection = dbcpService.getConnection(); + + // 示例:从FlowFile属性中获取用户ID(实际场景中可能从FlowFile内容中解析) + String userId = flowFile.getAttribute("user_id"); + if (userId == null) { + userId = "default_user"; // 默认值或从其他地方获取 + } + + // 执行查询操作 + queryStmt = connection.prepareStatement(querySQL); + queryStmt.setString(1, userId); + resultSet = queryStmt.executeQuery(); + + // 处理查询结果 + while (resultSet.next()) { + String username = resultSet.getString("username"); + String password = resultSet.getString("password"); + String token = resultSet.getString("token"); + + getLogger().info("Retrieved credential for user: {}, username: {}", new Object[]{userId, username}); + + // 这里可以添加凭证转换逻辑 + // String transformedCredential = transformCredential(username, password, token); + } + + // 执行更新操作(如果提供了更新SQL) + if (updateSQL != null && !updateSQL.trim().isEmpty()) { + updateStmt = connection.prepareStatement(updateSQL); + updateStmt.setString(1, userId); + int updatedRows = updateStmt.executeUpdate(); + getLogger().info("Updated {} rows for user {}", new Object[]{updatedRows, userId}); + } + + // 示例:执行其他SQL操作 + executeSampleQueries(connection, userId); + + } finally { + // 关闭资源 + if (resultSet != null) { + try { + resultSet.close(); + } catch (SQLException e) { /* ignore */ } + } + if (queryStmt != null) { + try { + queryStmt.close(); + } catch (SQLException e) { /* ignore */ } + } + if (updateStmt != null) { + try { + updateStmt.close(); + } catch (SQLException e) { /* ignore */ } + } + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { /* ignore */ } + } + } + } + + /** + * 执行示例查询的方法 + */ + private void executeSampleQueries(Connection connection, String userId) throws SQLException { + + // 示例1:查询用户权限 + String permissionQuery = "SELECT permission_name FROM user_permissions WHERE user_id = ?"; + try (PreparedStatement stmt = connection.prepareStatement(permissionQuery)) { + stmt.setString(1, userId); + try (ResultSet rs = stmt.executeQuery()) { + while (rs.next()) { + String permission = rs.getString("permission_name"); + getLogger().debug("User {} has permission: {}", new Object[]{userId, permission}); + } + } + } + + // 示例2:插入审计日志 + String auditInsert = "INSERT INTO audit_log (user_id, operation, operation_time) VALUES (?, ?, NOW())"; + try (PreparedStatement stmt = connection.prepareStatement(auditInsert)) { + stmt.setString(1, userId); + stmt.setString(2, "credential_transform"); + int inserted = stmt.executeUpdate(); + getLogger().debug("Inserted {} audit log entry for user {}", new Object[]{inserted, userId}); + } + + // 示例3:批量操作 + String batchUpdate = "UPDATE user_session SET last_activity = NOW() WHERE user_id = ?"; + try (PreparedStatement stmt = connection.prepareStatement(batchUpdate)) { + // 可以添加到批次中 + stmt.setString(1, userId); + stmt.addBatch(); + + // 执行批次 + int[] results = stmt.executeBatch(); + getLogger().debug("Batch update results: {} for user {}", new Object[]{results.length, userId}); + } + } + + /** + * 示例凭证转换逻辑(占位方法) + */ + private String transformCredential(String username, String password, String token) { + // 这里添加你的具体凭证转换逻辑 + // 例如:加密、解密、格式转换等 + + getLogger().debug("Transforming credential for username: {}", username); + + // 示例转换逻辑 + return "transformed_" + token; + } + + /** + * 处理凭证相关业务逻辑的示例方法 + */ + private void processCredentialLogic(FlowFile flowFile, ProcessSession session) { + // 这里可以添加更多的凭证处理逻辑 + // 例如: + // 1. 解析FlowFile内容 + // 2. 验证凭证格式 + // 3. 执行转换操作 + // 4. 更新FlowFile属性 + + getLogger().info("Processing credential logic for FlowFile: {}", flowFile); + } +} diff --git a/nifi-hzyadev-bundle/pom.xml b/nifi-hzyadev-bundle/pom.xml index cc38ff8..2da3e48 100644 --- a/nifi-hzyadev-bundle/pom.xml +++ b/nifi-hzyadev-bundle/pom.xml @@ -19,6 +19,8 @@ hzya-nifi-AutoJsonTableCreate-nar hzya-nifi-U8CInterface-nar hzya-nifi-U8CInterface-processors + hzya-nifi-VoucherConversion-nar + hzya-nifi-VoucherConversion-processors