From 66b6dbbf4ada13b4f2481dc6304b1b310e53d2c1 Mon Sep 17 00:00:00 2001
From: liuy <37787198+LiuyCodes@users.noreply.github.com>
Date: Thu, 14 Aug 2025 19:02:37 +0800
Subject: [PATCH] =?UTF-8?q?feat(nifi):=20=E6=B7=BB=E5=8A=A0=E8=87=AA?=
=?UTF-8?q?=E5=8A=A8=E6=89=A9=E5=B1=95=20Oracle=20=E8=A1=A8=E7=A9=BA?=
=?UTF-8?q?=E9=97=B4=E7=9A=84=E5=8A=9F=E8=83=BD?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
- 新增 DevAutoAddOracleDatafileProcessor 处理器,用于监控 Oracle 表空间使用情况
- 当表空间使用率超过阈值时,自动添加新的数据文件
- 检查磁盘空间使用情况,如果超过阈值则生成告警
- 支持自定义表空间名称、使用率阈值、数据文件路径等参数
- 生成并输出告警信息,包括磁盘空间不足、表空间使用率过高、数据文件自动扩展问题等
---
.../pom.xml | 35 +++
.../pom.xml | 56 ++++
.../DevAutoAddOracleDatafileProcessor.java | 255 ++++++++++++++++++
.../org.apache.nifi.processor.Processor | 15 ++
nifi-hzyadev-bundle/pom.xml | 2 +
5 files changed, 363 insertions(+)
create mode 100644 nifi-hzyadev-bundle/hzya-nifi-AutoAddOracleDatafile-nar/pom.xml
create mode 100644 nifi-hzyadev-bundle/hzya-nifi-AutoAddOracleDatafile-processors/pom.xml
create mode 100644 nifi-hzyadev-bundle/hzya-nifi-AutoAddOracleDatafile-processors/src/main/java/com/hzya/frame/DevAutoAddOracleDatafileProcessor.java
create mode 100644 nifi-hzyadev-bundle/hzya-nifi-AutoAddOracleDatafile-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
diff --git a/nifi-hzyadev-bundle/hzya-nifi-AutoAddOracleDatafile-nar/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-AutoAddOracleDatafile-nar/pom.xml
new file mode 100644
index 0000000..f2f5403
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-AutoAddOracleDatafile-nar/pom.xml
@@ -0,0 +1,35 @@
+
+
+
+ nifi-hzyadev-bundle
+ com.hzya
+ 1.0
+
+ 4.0.0
+
+ hzya-nifi-AutoAddOracleDatafile-nar
+
+ nar
+
+
+ 8
+ 8
+
+
+
+
+ com.hzya
+ hzya-nifi-AutoAddOracleDatafile-processors
+ 1.0
+
+
+ org.apache.nifi
+ nifi-standard-services-api-nar
+ ${nifi-revision}
+ nar
+
+
+
+
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/hzya-nifi-AutoAddOracleDatafile-processors/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-AutoAddOracleDatafile-processors/pom.xml
new file mode 100644
index 0000000..ec642b5
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-AutoAddOracleDatafile-processors/pom.xml
@@ -0,0 +1,56 @@
+
+
+
+ nifi-hzyadev-bundle
+ com.hzya
+ 1.0
+
+ 4.0.0
+
+ hzya-nifi-AutoAddOracleDatafile-processors
+
+
+ 8
+ 8
+
+
+
+
+ org.apache.nifi
+ nifi-api
+ ${nifi-revision}
+
+
+ org.apache.nifi
+ nifi-dbcp-service-api
+ ${nifi-revision}
+
+
+
+ org.apache.nifi
+ nifi-processor-utils
+ 1.15.3
+
+
+ org.apache.nifi
+ nifi-mock
+ ${nifi-revision}
+ test
+
+
+ junit
+ junit
+ 4.13
+ test
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/hzya-nifi-AutoAddOracleDatafile-processors/src/main/java/com/hzya/frame/DevAutoAddOracleDatafileProcessor.java b/nifi-hzyadev-bundle/hzya-nifi-AutoAddOracleDatafile-processors/src/main/java/com/hzya/frame/DevAutoAddOracleDatafileProcessor.java
new file mode 100644
index 0000000..cb4eff8
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-AutoAddOracleDatafile-processors/src/main/java/com/hzya/frame/DevAutoAddOracleDatafileProcessor.java
@@ -0,0 +1,255 @@
+package com.hzya.frame;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Tags({"oracle", "database", "dba", "tablespace", "auto", "datafile"})
+@CapabilityDescription("监控Oracle表空间的使用情况,并在使用率超过指定阈值时自动添加新的数据文件。" + "同时,它会检查磁盘空间和数据文件的AUTOEXTENSIBLE属性,并收集潜在问题的告警信息。")
+@SeeAlso({})
+@ReadsAttributes({})
+@WritesAttributes({@WritesAttribute(attribute = "autoadd.oracle.datafile.alerts", description = "包含在检查过程中生成的告警信息。多条信息将以换行符分隔。")})
+@InputRequirement(Requirement.INPUT_ALLOWED)
+public class DevAutoAddOracleDatafileProcessor extends AbstractProcessor {
+
+    // Processor property definitions
+    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder().name("dbcp-service").displayName("数据库连接池服务").description("提供Oracle数据库连接的控制器服务。").required(true).identifiesControllerService(DBCPService.class).build();
+
+    public static final PropertyDescriptor TABLESPACE_NAMES = new PropertyDescriptor.Builder().name("tablespace-names").displayName("表空间名称").description("需要监控的Oracle表空间名称列表,使用逗号分隔 (例如:USERS,DATA01)。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+    public static final PropertyDescriptor USAGE_THRESHOLD = new PropertyDescriptor.Builder().name("usage-threshold").displayName("使用率阈值 (%)").description("触发添加新数据文件的表空间使用率百分比 (1-99)。例如,填入 80。").required(true).defaultValue("80").addValidator(StandardValidators.createLongValidator(1, 99, true)).build();
+
+    public static final PropertyDescriptor DATAFILE_PATH = new PropertyDescriptor.Builder().name("datafile-path").displayName("新数据文件路径").description("用于创建新数据文件的目录的绝对路径。请确保运行Oracle的用户对此目录有写入权限。" + "同时支持Linux (例如 /u01/app/oracle/oradata/ORCL/) 和 Windows (例如 C:\\oracle\\oradata\\ORCL\\) 路径。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+    // Validated here only as non-empty; the value ends up in the SIZE clause of the ALTER TABLESPACE statement built in onTrigger.
+    public static final PropertyDescriptor NEW_DATAFILE_SIZE = new PropertyDescriptor.Builder().name("new-datafile-size").displayName("新数据文件大小").description("要创建的新数据文件的大小 (例如, 1G, 512M, 2048K)。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).defaultValue("100M").build();
+
+    public static final PropertyDescriptor DISK_USAGE_THRESHOLD = new PropertyDescriptor.Builder().name("disk-usage-threshold").displayName("磁盘使用率阈值 (%)").description("如果数据文件路径所在磁盘的使用率超过此百分比(1-99),将生成告警并且不会添加数据文件。").required(true).defaultValue("90").addValidator(StandardValidators.createLongValidator(1, 99, true)).build();
+
+    // Relationship definitions
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("检查成功完成后(无论是否添加了数据文件),FlowFile将路由到此关系。").build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("当发生不可恢复的错误时(如数据库连接失败),FlowFile将路由到此关系。").build();
+
+    public static final Relationship REL_ALERT = new Relationship.Builder().name("alert").description("当检测到潜在问题时(如磁盘使用率过高、AUTOEXTEND关闭等),包含告警信息的新FlowFile将被路由到此关系。").build();
+
+    private List<PropertyDescriptor> descriptors;
+    private Set<Relationship> relationships;
+
+    /** Builds the unmodifiable property-descriptor list and relationship set that the getters return. */
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(DBCP_SERVICE);
+        descriptors.add(TABLESPACE_NAMES);
+        descriptors.add(USAGE_THRESHOLD);
+        descriptors.add(DATAFILE_PATH);
+        descriptors.add(NEW_DATAFILE_SIZE);
+        descriptors.add(DISK_USAGE_THRESHOLD);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_ALERT);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships; // unmodifiable set assembled in init()
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors; // unmodifiable list assembled in init()
+    }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ // 如果没有输入FlowFile,则创建一个新的来触发执行逻辑
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ flowFile = session.create();
+ }
+
+ final long usageThreshold = context.getProperty(USAGE_THRESHOLD).asLong();
+ final long diskUsageThreshold = context.getProperty(DISK_USAGE_THRESHOLD).asLong();
+ final String datafileBasePath = context.getProperty(DATAFILE_PATH).getValue();
+ final String newDatafileSize = context.getProperty(NEW_DATAFILE_SIZE).getValue();
+ final List alertMessages = new ArrayList<>();
+
+ // 从属性中获取逗号分隔的表空间名称字符串,并转换为列表
+ final String rawTablespaceNames = context.getProperty(TABLESPACE_NAMES).getValue();
+ final List tablespaceNames = Arrays.stream(rawTablespaceNames.split(",")).map(String::trim) // 去除每个表空间名称前后的空格
+ .filter(name -> !name.isEmpty()) // 过滤掉可能因 "a,,b" 这种格式产生的空字符串
+ .collect(Collectors.toList());
+
+ final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+
+ // 步骤 1: 检查数据文件目录的磁盘空间
+ // 此检查方法兼容 Windows 和 Linux 操作系统
+ try {
+ Path path = Paths.get(datafileBasePath);
+ if (!Files.exists(path)) {
+ throw new IOException("数据文件路径不存在: " + datafileBasePath);
+ }
+ FileStore store = Files.getFileStore(path);
+ long totalSpace = store.getTotalSpace();
+ long usableSpace = store.getUsableSpace();
+ long usedSpace = totalSpace - usableSpace;
+ // 处理 totalSpace 为 0 的情况,避免除零异常
+ double currentDiskUsage = totalSpace > 0 ? (double) usedSpace / totalSpace * 100 : 0;
+
+ String diskUsageMsg = String.format("磁盘路径 '%s' 空间耗用: %.2f%% (已用: %d GB, 总计: %d GB)", datafileBasePath, currentDiskUsage, usedSpace / 1024 / 1024 / 1024, totalSpace / 1024 / 1024 / 1024);
+ getLogger().info(diskUsageMsg); // 关键信息:记录磁盘耗用日志
+
+ if (currentDiskUsage >= diskUsageThreshold) {
+ String alert = String.format("严重告警: 磁盘路径 '%s' 的空间使用率 %.2f%%, 已超过设定的阈值 %d%%。", datafileBasePath, currentDiskUsage, diskUsageThreshold);
+ alertMessages.add(alert);
+ getLogger().error(alert); // 关键信息:记录到日志
+ }
+ } catch (IOException e) {
+ getLogger().error("检查磁盘空间失败,路径: '{}'。请检查路径是否正确以及NiFi运行用户是否有权访问。", new Object[]{datafileBasePath}, e);
+ session.transfer(session.penalize(flowFile), REL_FAILURE);
+ return;
+ }
+
+ // 步骤 2: 连接数据库并对每个表空间进行检查
+ try (final Connection conn = dbcpService.getConnection(flowFile.getAttributes())) {
+ // 循环处理每一个配置的表空间
+ for (String tablespaceName : tablespaceNames) {
+ tablespaceName = tablespaceName.trim().toUpperCase();
+ List currentTbsAlerts = new ArrayList<>(); // 用于收集当前循环中表空间的告警信息
+
+ // 步骤 2a: 验证表空间数据文件是否可以自动扩容 (AUTOEXTENSIBLE)
+ String autoextendSql = "SELECT file_name, autoextensible FROM dba_data_files WHERE tablespace_name = ?";
+ try (PreparedStatement ps = conn.prepareStatement(autoextendSql)) {
+ ps.setString(1, tablespaceName);
+ ResultSet rs = ps.executeQuery();
+ boolean hasNonAutoensibleFile = false;
+ while (rs.next()) {
+ if ("NO".equalsIgnoreCase(rs.getString("autoextensible"))) {
+ hasNonAutoensibleFile = true;
+ getLogger().warn("表空间 '{}' 的数据文件 '{}' 被设置为 AUTOEXTENSIBLE=NO。", new Object[]{tablespaceName, rs.getString("file_name")});
+ }
+ }
+ if (hasNonAutoensibleFile) {
+ currentTbsAlerts.add(String.format("告警: 表空间 '%s' 中存在一个或多个数据文件的 AUTOEXTENSIBLE 属性为 'NO'。", tablespaceName));
+ }
+ }
+
+ // 步骤 2b: 使用您提供的SQL1逻辑来查询表空间使用率 (兼容Oracle 11g, 12c, 19c)
+ String usageSql = "SELECT used_gb, current_gb, used_pct_max FROM (" + "SELECT df.tablespace_name, " + "ROUND( ( SUM( df.bytes ) - SUM( NVL(fs.free_bytes, 0) ) ) / 1024 / 1024 / 1024, 2 ) AS used_gb, " + "ROUND( SUM( df.bytes ) / 1024 / 1024 / 1024, 2 ) AS current_gb, " + "CASE WHEN SUM(df.maxbytes) = 0 THEN 100 ELSE ROUND( ( SUM( df.bytes ) - SUM( NVL(fs.free_bytes, 0) ) ) / SUM( df.maxbytes ) * 100, 2 ) END AS used_pct_max " + "FROM dba_data_files df LEFT JOIN ( SELECT tablespace_name, file_id, SUM( bytes ) AS free_bytes " + "FROM dba_free_space GROUP BY tablespace_name, file_id ) fs ON df.file_id = fs.file_id " + "GROUP BY df.tablespace_name) " + "WHERE tablespace_name = ?";
+
+ try (PreparedStatement ps = conn.prepareStatement(usageSql)) {
+ ps.setString(1, tablespaceName);
+ ResultSet rs = ps.executeQuery();
+ if (rs.next()) {
+ // 读取大数字时使用 getBigDecimal 保证精度,然后转换为 double
+ double usedPct = rs.getBigDecimal("used_pct_max").doubleValue();
+ double usedGb = rs.getBigDecimal("used_gb").doubleValue();
+ double currentGb = rs.getBigDecimal("current_gb").doubleValue();
+
+ String usageLogMsg = String.format("表空间 '%s' 空间耗用: %.2f%% (已用: %.2f GB, 当前总大小: %.2f GB)", tablespaceName, usedPct, usedGb, currentGb);
+ getLogger().info(usageLogMsg); // 关键信息:记录表空间耗用日志
+
+ if (usedPct > usageThreshold) {
+ // 表空间使用率已超过阈值
+ String thresholdAlert = String.format("告警: 表空间 '%s' 使用率 %.2f%%, 已超过设定的阈值 %d%%。", tablespaceName, usedPct, usageThreshold);
+ currentTbsAlerts.add(thresholdAlert);
+ getLogger().warn(thresholdAlert);
+
+ // 再次确认磁盘空间是否超限,如果已超限,则不执行添加操作
+ if (alertMessages.stream().anyMatch(m -> m.startsWith("严重告警: 磁盘路径"))) {
+ getLogger().error("由于磁盘空间使用率已超阈值,跳过为表空间 '{}' 添加数据文件的操作。", new Object[]{tablespaceName});
+ } else {
+ // 执行添加数据文件的逻辑
+ String timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
+ String newFileName = String.format("%s_%s.dbf", tablespaceName, timestamp); // 数据文件命名规则
+ String fullPath = new File(datafileBasePath, newFileName).getPath();
+ // 为Windows系统处理路径分隔符
+ if (System.getProperty("os.name").toLowerCase().contains("win")) {
+ fullPath = fullPath.replace('\\', '/');
+ }
+
+ String addDatafileSql = String.format("ALTER TABLESPACE %s ADD DATAFILE '%s' SIZE %s AUTOEXTEND ON NEXT 50M", tablespaceName, fullPath, newDatafileSize);
+
+ getLogger().info("准备执行SQL语句以添加数据文件: {}", new Object[]{addDatafileSql}); // 关键信息:记录创建数据文件的SQL
+
+ try (Statement stmt = conn.createStatement()) {
+ stmt.execute(addDatafileSql);
+ getLogger().info("成功为表空间 '{}' 添加数据文件 '{}'。", new Object[]{tablespaceName, fullPath});
+ } catch (Exception e) {
+ String addFileError = String.format("严重告警: 为表空间 '%s' 添加数据文件失败。SQL: %s", tablespaceName, addDatafileSql);
+ getLogger().error(addFileError, e);
+ currentTbsAlerts.add(addFileError + " - 错误详情: " + e.getMessage());
+ }
+ }
+ }
+ } else {
+ String notFoundAlert = String.format("警告: 未找到表空间 '%s',或相关的空间使用率查询没有返回数据。", tablespaceName);
+ getLogger().warn(notFoundAlert);
+ currentTbsAlerts.add(notFoundAlert);
+ }
+ }
+ // 将当前表空间的所有告警信息统一添加到主告警列表中
+ alertMessages.addAll(currentTbsAlerts);
+ }
+
+ } catch (Exception e) {
+ getLogger().error("处理Oracle表空间检查时发生错误: {}", e.getMessage(), e);
+ session.transfer(session.penalize(flowFile), REL_FAILURE);
+ return;
+ }
+
+ // 步骤 3: 如果收集到了任何告警信息,则创建一个新的FlowFile并路由到 'alert' 关系
+ if (!alertMessages.isEmpty()) {
+ FlowFile alertFlowFile = session.create(flowFile);
+ // 将所有告警信息用换行符连接成一个字符串
+ String alertContent = String.join("\n", alertMessages);
+ alertFlowFile = session.write(alertFlowFile, out -> out.write(alertContent.getBytes(StandardCharsets.UTF_8)));
+ // 同时将告警信息写入FlowFile的属性中,方便下游处理器直接使用
+ alertFlowFile = session.putAttribute(alertFlowFile, "autoadd.oracle.datafile.alerts", alertContent);
+ session.transfer(alertFlowFile, REL_ALERT);
+ }
+
+ // 步骤 4: 将原始的(或新创建的)FlowFile路由到 'success' 关系
+ session.transfer(flowFile, REL_SUCCESS);
+ }
+}
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/hzya-nifi-AutoAddOracleDatafile-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-hzyadev-bundle/hzya-nifi-AutoAddOracleDatafile-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..eab10c4
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-AutoAddOracleDatafile-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+com.hzya.frame.DevAutoAddOracleDatafileProcessor
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/pom.xml b/nifi-hzyadev-bundle/pom.xml
index 9aa8cbb..a061eb5 100644
--- a/nifi-hzyadev-bundle/pom.xml
+++ b/nifi-hzyadev-bundle/pom.xml
@@ -23,6 +23,8 @@
hzya-nifi-VoucherConversion-processors
hzya-nifi-AdvancedJoltTransformer-nar
hzya-nifi-AdvancedJoltTransformer-processors
+ hzya-nifi-AutoAddOracleDatafile-nar
+ hzya-nifi-AutoAddOracleDatafile-processors