feat(nifi): 添加自动扩展 Oracle 表空间的功能

- 新增 DevAutoAddOracleDatafileProcessor处理器,用于监控 Oracle表空间使用情况
- 当表空间使用率超过阈值时,自动添加新的数据文件
-检查磁盘空间使用情况,如果超过阈值则生成告警
- 支持自定义表空间名称、使用率阈值、数据文件路径等参数
- 生成并输出告警信息,包括磁盘空间不足、表空间使用率过高、数据文件自动扩展问题等
This commit is contained in:
liuy 2025-08-14 19:02:37 +08:00
parent a07f92afa0
commit 66b6dbbf4a
5 changed files with 363 additions and 0 deletions

View File

@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-hzyadev-bundle</artifactId>
<groupId>com.hzya</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hzya-nifi-AutoAddOracleDatafile-nar</artifactId>
<packaging>nar</packaging>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.hzya</groupId>
<artifactId>hzya-nifi-AutoAddOracleDatafile-processors</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>${nifi-revision}</version>
<type>nar</type>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,56 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-hzyadev-bundle</artifactId>
<groupId>com.hzya</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hzya-nifi-AutoAddOracleDatafile-processors</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>${nifi-revision}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
<version>${nifi-revision}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
<version>1.15.3</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>${nifi-revision}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.squareup.okhttp3</groupId>-->
<!-- <artifactId>mockwebserver</artifactId> &lt;!&ndash; 注意是mockwebserver而非okhttp &ndash;&gt;-->
<!-- <version>5.1.0</version> &lt;!&ndash; 版本与主库一致 &ndash;&gt;-->
<!-- <scope>test</scope> &lt;!&ndash; 测试专用 &ndash;&gt;-->
<!-- </dependency>-->
</dependencies>
</project>

View File

@ -0,0 +1,255 @@
package com.hzya.frame;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@Tags({"oracle", "database", "dba", "tablespace", "auto", "datafile"})
@CapabilityDescription("监控Oracle表空间的使用情况并在使用率超过指定阈值时自动添加新的数据文件。" + "同时它会检查磁盘空间和数据文件的AUTOEXTENSIBLE属性并收集潜在问题的告警信息。")
@SeeAlso({})
@ReadsAttributes({})
@WritesAttributes({@WritesAttribute(attribute = "autoadd.oracle.datafile.alerts", description = "包含在检查过程中生成的告警信息。多条信息将以换行符分隔。")})
@InputRequirement(Requirement.INPUT_ALLOWED)
public class DevAutoAddOracleDatafileProcessor extends AbstractProcessor {
// 处理器属性定义
public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder().name("dbcp-service").displayName("数据库连接池服务").description("提供Oracle数据库连接的控制器服务。").required(true).identifiesControllerService(DBCPService.class).build();
public static final PropertyDescriptor TABLESPACE_NAMES = new PropertyDescriptor.Builder().name("tablespace-names").displayName("表空间名称").description("需要监控的Oracle表空间名称列表使用逗号分隔 (例如USERS,DATA01)。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
public static final PropertyDescriptor USAGE_THRESHOLD = new PropertyDescriptor.Builder().name("usage-threshold").displayName("使用率阈值 (%)").description("触发添加新数据文件的表空间使用率百分比 (1-99)。例如,填入 80。").required(true).defaultValue("80").addValidator(StandardValidators.createLongValidator(1, 99, true)).build();
public static final PropertyDescriptor DATAFILE_PATH = new PropertyDescriptor.Builder().name("datafile-path").displayName("新数据文件路径").description("用于创建新数据文件的目录的绝对路径。请确保运行Oracle的用户对此目录有写入权限。" + "同时支持Linux (例如 /u01/app/oracle/oradata/ORCL/) 和 Windows (例如 C:\\oracle\\oradata\\ORCL\\) 路径。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
// public static final PropertyDescriptor NEW_DATAFILE_SIZE = new PropertyDescriptor.Builder().name("new-datafile-size").displayName("新数据文件大小").description("要创建的新数据文件的大小 (例如, 1G, 512M, 2048K)。").required(true).defaultValue("100M").addValidator(StandardValidators.DATA_SIZE_VALIDATOR).build();
public static final PropertyDescriptor NEW_DATAFILE_SIZE = new PropertyDescriptor.Builder().name("new-datafile-size").displayName("新数据文件大小").description("要创建的新数据文件的大小 (例如, 1G, 512M, 2048K)。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).defaultValue("100M").build();
public static final PropertyDescriptor DISK_USAGE_THRESHOLD = new PropertyDescriptor.Builder().name("disk-usage-threshold").displayName("磁盘使用率阈值 (%)").description("如果数据文件路径所在磁盘的使用率超过此百分比(1-99),将生成告警并且不会添加数据文件。").required(true).defaultValue("90").addValidator(StandardValidators.createLongValidator(1, 99, true)).build();
// 关系定义
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("检查成功完成后无论是否添加了数据文件FlowFile将路由到此关系。").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("当发生不可恢复的错误时如数据库连接失败FlowFile将路由到此关系。").build();
public static final Relationship REL_ALERT = new Relationship.Builder().name("alert").description("当检测到潜在问题时如磁盘使用率过高、AUTOEXTEND关闭等包含告警信息的新FlowFile将被路由到此关系。").build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(DBCP_SERVICE);
descriptors.add(TABLESPACE_NAMES);
descriptors.add(USAGE_THRESHOLD);
descriptors.add(DATAFILE_PATH);
descriptors.add(NEW_DATAFILE_SIZE);
descriptors.add(DISK_USAGE_THRESHOLD);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
relationships.add(REL_ALERT);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
// 如果没有输入FlowFile则创建一个新的来触发执行逻辑
FlowFile flowFile = session.get();
if (flowFile == null) {
flowFile = session.create();
}
final long usageThreshold = context.getProperty(USAGE_THRESHOLD).asLong();
final long diskUsageThreshold = context.getProperty(DISK_USAGE_THRESHOLD).asLong();
final String datafileBasePath = context.getProperty(DATAFILE_PATH).getValue();
final String newDatafileSize = context.getProperty(NEW_DATAFILE_SIZE).getValue();
final List<String> alertMessages = new ArrayList<>();
// 从属性中获取逗号分隔的表空间名称字符串并转换为列表
final String rawTablespaceNames = context.getProperty(TABLESPACE_NAMES).getValue();
final List<String> tablespaceNames = Arrays.stream(rawTablespaceNames.split(",")).map(String::trim) // 去除每个表空间名称前后的空格
.filter(name -> !name.isEmpty()) // 过滤掉可能因 "a,,b" 这种格式产生的空字符串
.collect(Collectors.toList());
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
// 步骤 1: 检查数据文件目录的磁盘空间
// 此检查方法兼容 Windows Linux 操作系统
try {
Path path = Paths.get(datafileBasePath);
if (!Files.exists(path)) {
throw new IOException("数据文件路径不存在: " + datafileBasePath);
}
FileStore store = Files.getFileStore(path);
long totalSpace = store.getTotalSpace();
long usableSpace = store.getUsableSpace();
long usedSpace = totalSpace - usableSpace;
// 处理 totalSpace 0 的情况避免除零异常
double currentDiskUsage = totalSpace > 0 ? (double) usedSpace / totalSpace * 100 : 0;
String diskUsageMsg = String.format("磁盘路径 '%s' 空间耗用: %.2f%% (已用: %d GB, 总计: %d GB)", datafileBasePath, currentDiskUsage, usedSpace / 1024 / 1024 / 1024, totalSpace / 1024 / 1024 / 1024);
getLogger().info(diskUsageMsg); // 关键信息记录磁盘耗用日志
if (currentDiskUsage >= diskUsageThreshold) {
String alert = String.format("严重告警: 磁盘路径 '%s' 的空间使用率 %.2f%%, 已超过设定的阈值 %d%%。", datafileBasePath, currentDiskUsage, diskUsageThreshold);
alertMessages.add(alert);
getLogger().error(alert); // 关键信息记录到日志
}
} catch (IOException e) {
getLogger().error("检查磁盘空间失败,路径: '{}'。请检查路径是否正确以及NiFi运行用户是否有权访问。", new Object[]{datafileBasePath}, e);
session.transfer(session.penalize(flowFile), REL_FAILURE);
return;
}
// 步骤 2: 连接数据库并对每个表空间进行检查
try (final Connection conn = dbcpService.getConnection(flowFile.getAttributes())) {
// 循环处理每一个配置的表空间
for (String tablespaceName : tablespaceNames) {
tablespaceName = tablespaceName.trim().toUpperCase();
List<String> currentTbsAlerts = new ArrayList<>(); // 用于收集当前循环中表空间的告警信息
// 步骤 2a: 验证表空间数据文件是否可以自动扩容 (AUTOEXTENSIBLE)
String autoextendSql = "SELECT file_name, autoextensible FROM dba_data_files WHERE tablespace_name = ?";
try (PreparedStatement ps = conn.prepareStatement(autoextendSql)) {
ps.setString(1, tablespaceName);
ResultSet rs = ps.executeQuery();
boolean hasNonAutoensibleFile = false;
while (rs.next()) {
if ("NO".equalsIgnoreCase(rs.getString("autoextensible"))) {
hasNonAutoensibleFile = true;
getLogger().warn("表空间 '{}' 的数据文件 '{}' 被设置为 AUTOEXTENSIBLE=NO。", new Object[]{tablespaceName, rs.getString("file_name")});
}
}
if (hasNonAutoensibleFile) {
currentTbsAlerts.add(String.format("告警: 表空间 '%s' 中存在一个或多个数据文件的 AUTOEXTENSIBLE 属性为 'NO'。", tablespaceName));
}
}
// 步骤 2b: 使用您提供的SQL1逻辑来查询表空间使用率 (兼容Oracle 11g, 12c, 19c)
String usageSql = "SELECT used_gb, current_gb, used_pct_max FROM (" + "SELECT df.tablespace_name, " + "ROUND( ( SUM( df.bytes ) - SUM( NVL(fs.free_bytes, 0) ) ) / 1024 / 1024 / 1024, 2 ) AS used_gb, " + "ROUND( SUM( df.bytes ) / 1024 / 1024 / 1024, 2 ) AS current_gb, " + "CASE WHEN SUM(df.maxbytes) = 0 THEN 100 ELSE ROUND( ( SUM( df.bytes ) - SUM( NVL(fs.free_bytes, 0) ) ) / SUM( df.maxbytes ) * 100, 2 ) END AS used_pct_max " + "FROM dba_data_files df LEFT JOIN ( SELECT tablespace_name, file_id, SUM( bytes ) AS free_bytes " + "FROM dba_free_space GROUP BY tablespace_name, file_id ) fs ON df.file_id = fs.file_id " + "GROUP BY df.tablespace_name) " + "WHERE tablespace_name = ?";
try (PreparedStatement ps = conn.prepareStatement(usageSql)) {
ps.setString(1, tablespaceName);
ResultSet rs = ps.executeQuery();
if (rs.next()) {
// 读取大数字时使用 getBigDecimal 保证精度然后转换为 double
double usedPct = rs.getBigDecimal("used_pct_max").doubleValue();
double usedGb = rs.getBigDecimal("used_gb").doubleValue();
double currentGb = rs.getBigDecimal("current_gb").doubleValue();
String usageLogMsg = String.format("表空间 '%s' 空间耗用: %.2f%% (已用: %.2f GB, 当前总大小: %.2f GB)", tablespaceName, usedPct, usedGb, currentGb);
getLogger().info(usageLogMsg); // 关键信息记录表空间耗用日志
if (usedPct > usageThreshold) {
// 表空间使用率已超过阈值
String thresholdAlert = String.format("告警: 表空间 '%s' 使用率 %.2f%%, 已超过设定的阈值 %d%%。", tablespaceName, usedPct, usageThreshold);
currentTbsAlerts.add(thresholdAlert);
getLogger().warn(thresholdAlert);
// 再次确认磁盘空间是否超限如果已超限则不执行添加操作
if (alertMessages.stream().anyMatch(m -> m.startsWith("严重告警: 磁盘路径"))) {
getLogger().error("由于磁盘空间使用率已超阈值,跳过为表空间 '{}' 添加数据文件的操作。", new Object[]{tablespaceName});
} else {
// 执行添加数据文件的逻辑
String timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
String newFileName = String.format("%s_%s.dbf", tablespaceName, timestamp); // 数据文件命名规则
String fullPath = new File(datafileBasePath, newFileName).getPath();
// 为Windows系统处理路径分隔符
if (System.getProperty("os.name").toLowerCase().contains("win")) {
fullPath = fullPath.replace('\\', '/');
}
String addDatafileSql = String.format("ALTER TABLESPACE %s ADD DATAFILE '%s' SIZE %s AUTOEXTEND ON NEXT 50M", tablespaceName, fullPath, newDatafileSize);
getLogger().info("准备执行SQL语句以添加数据文件: {}", new Object[]{addDatafileSql}); // 关键信息记录创建数据文件的SQL
try (Statement stmt = conn.createStatement()) {
stmt.execute(addDatafileSql);
getLogger().info("成功为表空间 '{}' 添加数据文件 '{}'。", new Object[]{tablespaceName, fullPath});
} catch (Exception e) {
String addFileError = String.format("严重告警: 为表空间 '%s' 添加数据文件失败。SQL: %s", tablespaceName, addDatafileSql);
getLogger().error(addFileError, e);
currentTbsAlerts.add(addFileError + " - 错误详情: " + e.getMessage());
}
}
}
} else {
String notFoundAlert = String.format("警告: 未找到表空间 '%s',或相关的空间使用率查询没有返回数据。", tablespaceName);
getLogger().warn(notFoundAlert);
currentTbsAlerts.add(notFoundAlert);
}
}
// 将当前表空间的所有告警信息统一添加到主告警列表中
alertMessages.addAll(currentTbsAlerts);
}
} catch (Exception e) {
getLogger().error("处理Oracle表空间检查时发生错误: {}", e.getMessage(), e);
session.transfer(session.penalize(flowFile), REL_FAILURE);
return;
}
// 步骤 3: 如果收集到了任何告警信息则创建一个新的FlowFile并路由到 'alert' 关系
if (!alertMessages.isEmpty()) {
FlowFile alertFlowFile = session.create(flowFile);
// 将所有告警信息用换行符连接成一个字符串
String alertContent = String.join("\n", alertMessages);
alertFlowFile = session.write(alertFlowFile, out -> out.write(alertContent.getBytes(StandardCharsets.UTF_8)));
// 同时将告警信息写入FlowFile的属性中方便下游处理器直接使用
alertFlowFile = session.putAttribute(alertFlowFile, "autoadd.oracle.datafile.alerts", alertContent);
session.transfer(alertFlowFile, REL_ALERT);
}
// 步骤 4: 将原始的(或新创建的)FlowFile路由到 'success' 关系
session.transfer(flowFile, REL_SUCCESS);
}
}

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
com.hzya.frame.DevAutoAddOracleDatafileProcessor

View File

@ -23,6 +23,8 @@
<module>hzya-nifi-VoucherConversion-processors</module>
<module>hzya-nifi-AdvancedJoltTransformer-nar</module>
<module>hzya-nifi-AdvancedJoltTransformer-processors</module>
<module>hzya-nifi-AutoAddOracleDatafile-nar</module>
<module>hzya-nifi-AutoAddOracleDatafile-processors</module>
</modules>
<parent>