feat(nifi): add JSON splitter processor and update project structure

- Add DevJsonSplitterProcessor for splitting and flattening multi-level JSON data
- Remove the unused AutoJsonTableCreate and DevGeneratePaginatedSqlProcessor modules
- Add the hzya-nifi-package-nar module for packaging the JSON splitter processor
- Update project structure and dependencies
liuy 2025-07-11 10:27:06 +08:00
parent 10bafb2b50
commit c9672ec085
6 changed files with 415 additions and 38 deletions

View File

@@ -1,34 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-hzyadev-bundle</artifactId>
<groupId>com.hzya</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hzya-nifi-DevGeneratePaginatedSqlProcessor-nar</artifactId>
<packaging>nar</packaging>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>com.hzya</groupId>
<artifactId>hzya-nifi-DevGeneratePaginatedSqlProcessor-processors</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>${nifi-revision}</version>
<type>nar</type>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-hzyadev-bundle</artifactId>
<groupId>com.hzya</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hzya-nifi-JsonSplitter-processors</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>${nifi-revision}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
<version>${nifi-revision}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
<version>1.15.3</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>${nifi-revision}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,330 @@
package com.hzya.frame;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
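/*
 * Illustration (hypothetical input, not part of the commit): given a FlowFile with
 * attribute storageprefix=erp and content
 *   {"order": {"id": 1, "buyer": "A"}, "items": [{"sku": "x", "qty": 2}]}
 * the processor emits two FlowFiles on the success relationship:
 *   table_name=erp_order   content {"order_id":1,"order_buyer":"A"}
 *   table_name=erp_items   content [{"items_sku":"x","items_qty":2}]
 * A single-level input such as {"id": 1, "name": "A"} is routed to original unchanged.
 */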
/**
 * @Author liuyang
 * @Package com.hzya.frame
 * @Project nifi-hzyadev-bundle
 * @name DevJsonSplitterProcessor
 * @Date 2025/7/11 09:27
 * @Filename DevJsonSplitterProcessor
 */
@Tags({"json", "split", "flatten", "transform", "custom"})
@CapabilityDescription("Splits multi-level JSON into multiple FlowFiles, flattening parent field names into prefixes. If the JSON is single-level, the original FlowFile is passed through unchanged.")
@ReadsAttributes({@ReadsAttribute(attribute = "storageprefix", description = "Prefix used for the generated 'table_name' attribute.")})
@WritesAttributes({@WritesAttribute(attribute = "table_name", description = "Table name generated for the flattened JSON data.")})
public class DevJsonSplitterProcessor extends AbstractProcessor {
// Success relationship for FlowFiles that were split and flattened
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that were successfully split and flattened.").build();
// Original relationship for unsplit, single-level JSON FlowFiles
public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("Original FlowFiles that were not split (single-level JSON).").build();
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
private ObjectMapper objectMapper;
// This is the method to override for initialization
@Override
protected void init(final ProcessorInitializationContext context) { // note that this is protected void init
// Initialize the property descriptor list
final List<PropertyDescriptor> properties = new ArrayList<>();
// Example: to let users configure storageprefix, a property could be defined like this:
// properties.add(new PropertyDescriptor.Builder()
// .name("Storage Prefix")
// .description("The prefix to use for the 'table_name' attribute.")
// .required(true)
// .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
// .build());
this.properties = Collections.unmodifiableList(properties);
// Initialize the relationship set
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_ORIGINAL);
this.relationships = Collections.unmodifiableSet(relationships);
// Initialize the Jackson ObjectMapper
this.objectMapper = new ObjectMapper();
// A logger could be obtained here, but calling getLogger() from onTrigger is usually preferable
// this.getLogger().info("SplitAndFlattenJson processor initialized.");
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return this.properties;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
// Fetch the incoming FlowFile
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
// Read the storageprefix attribute
final String storagePrefix = flowFile.getAttribute("storageprefix");
if (storagePrefix == null) {
getLogger().warn("FlowFile {} has no 'storageprefix' attribute; routing to REL_ORIGINAL.", flowFile);
session.transfer(flowFile, REL_ORIGINAL); // without storageprefix, pass straight through to the original relationship
return;
}
final AtomicReference<String> jsonContent = new AtomicReference<>();
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
// Read all bytes with IOUtils
byte[] bytes = IOUtils.toByteArray(in);
jsonContent.set(new String(bytes, StandardCharsets.UTF_8));
}
});
JsonNode rootNode;
try {
// Parse the JSON string
rootNode = objectMapper.readTree(jsonContent.get());
} catch (IOException e) {
getLogger().error("Failed to parse JSON for FlowFile {}: {}", flowFile, e.getMessage(), e);
session.transfer(flowFile, REL_ORIGINAL); // parsing failed, route to the original relationship
return;
}
// Check whether the JSON is single-level
if (isSingleLevel(rootNode)) {
getLogger().debug("FlowFile {} contains single-level JSON; routing to REL_ORIGINAL.", flowFile);
session.transfer(flowFile, REL_ORIGINAL);
return;
}
// Recursively process the JSON tree
processNode(rootNode, "", "", flowFile, storagePrefix, session);
// Remove the original FlowFile
session.remove(flowFile);
getLogger().debug("Processed and removed original FlowFile {}.", flowFile);
}
/**
 * Checks whether a JSON node is single-level.
 * Single-level is defined as:
 * 1. For an object: none of its values is an object or an array.
 * 2. For an array: every element is an object, and none of those objects' values is an object or an array.
 *
 * @param node the JSON node to check
 * @return true if the JSON is single-level, false otherwise
 */
private boolean isSingleLevel(JsonNode node) {
if (node.isObject()) {
// For an object, check that no field value is an object or an array
for (JsonNode child : node) {
if (child.isObject() || child.isArray()) {
return false; // contains a nested object or array, so not single-level
}
}
return true; // all values are scalars, so single-level
} else if (node.isArray()) {
// For an array, check that every element is a single-level object
for (JsonNode element : node) {
if (!element.isObject() || !isSingleLevel(element)) {
return false; // contains a non-object element or a nested object, so not single-level
}
}
return true; // every element is a single-level object
}
return true; // other node types (e.g. scalars) are also treated as single-level
}
/**
 * Recursively processes a JSON node and emits FlowFiles.
 *
 * @param node the JSON node currently being processed
 * @param parentName the parent field name, used for prefixing
 * @param path the node's path within the JSON structure, used to build table_name
 * @param parentFlowFile the original FlowFile
 * @param storagePrefix the value of the storageprefix attribute
 * @param session the NiFi process session
 */
private void processNode(JsonNode node, String parentName, String path, FlowFile parentFlowFile, String storagePrefix, ProcessSession session) {
if (node.isObject()) {
// JSON object
ObjectNode objectNode = (ObjectNode) node;
// Check whether this is a leaf object (no value is an object or an array)
boolean isLeafObject = true;
Iterator<Map.Entry<String, JsonNode>> fields = objectNode.fields();
while (fields.hasNext()) {
Map.Entry<String, JsonNode> field = fields.next();
if (field.getValue().isObject() || field.getValue().isArray()) {
isLeafObject = false;
break;
}
}
if (isLeafObject) {
// Leaf object: build a new ObjectNode holding the flattened fields
ObjectNode leafObj = objectMapper.createObjectNode();
objectNode.fields().forEachRemaining(entry -> {
// Join field names and strip spaces
String fieldName = parentName.isEmpty() ? entry.getKey().replace(" ", "") : (parentName + "_" + entry.getKey()).replace(" ", "");
leafObj.set(fieldName, entry.getValue());
});
publishFlowFile(leafObj, path, parentFlowFile, storagePrefix, session);
} else {
// Non-leaf object: recurse into its children
objectNode.fields().forEachRemaining(entry -> {
JsonNode childNode = entry.getValue();
String currentKey = entry.getKey().replace(" ", ""); // strip spaces from the current key
String newPath = path.isEmpty() ? currentKey : (path + "_" + currentKey); // extend the path
// Recurse into object or array children
if (childNode.isObject() || childNode.isArray()) {
processNode(childNode, currentKey, newPath, parentFlowFile, storagePrefix, session);
}
});
}
} else if (node.isArray()) {
// JSON array
ArrayNode arrayNode = (ArrayNode) node;
// Check whether this is a leaf array (every element is a leaf object)
boolean isLeafArray = true;
for (JsonNode element : arrayNode) {
if (!element.isObject() || !isLeafObject(element)) { // checked via the isLeafObject helper
isLeafArray = false;
break;
}
}
if (isLeafArray) {
// Leaf array: prefix the field names of every object in the array
ArrayNode leafArray = objectMapper.createArrayNode();
arrayNode.forEach(item -> {
if (item.isObject()) {
ObjectNode newItem = objectMapper.createObjectNode();
item.fields().forEachRemaining(entry -> {
// Join field names and strip spaces
String fieldName = parentName.isEmpty() ? entry.getKey().replace(" ", "") : (parentName + "_" + entry.getKey()).replace(" ", "");
newItem.set(fieldName, entry.getValue());
});
leafArray.add(newItem);
}
});
publishFlowFile(leafArray, path, parentFlowFile, storagePrefix, session);
} else {
// Non-leaf array: recurse into each element
arrayNode.forEach(item -> {
processNode(item, parentName, path, parentFlowFile, storagePrefix, session);
});
}
}
// Scalar nodes are not processed; they are not levels that can be split
}
/**
 * Helper: checks whether a JsonNode is a leaf object.
 *
 * @param node the JsonNode to check
 * @return true if it is a leaf object, false otherwise
 */
private boolean isLeafObject(JsonNode node) {
if (!node.isObject()) {
return false;
}
Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
while (fields.hasNext()) {
Map.Entry<String, JsonNode> field = fields.next();
if (field.getValue().isObject() || field.getValue().isArray()) {
return false;
}
}
return true;
}
/**
 * Writes the processed JSON content to a new FlowFile and transfers it.
 *
 * @param contentNode the JSON content to write into the FlowFile
 * @param path the node's path within the JSON structure
 * @param parentFlowFile the original FlowFile
 * @param storagePrefix the value of the storageprefix attribute
 * @param session the NiFi process session
 */
private void publishFlowFile(JsonNode contentNode, String path, FlowFile parentFlowFile, String storagePrefix, ProcessSession session) {
FlowFile newFlowFile = session.create(parentFlowFile);
newFlowFile = session.write(newFlowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
// Write the JSON content to the output stream
objectMapper.writeValue(out, contentNode);
}
});
// Set the table_name attribute
String tableNamePath = path.isEmpty() ? "data_details" : path;
newFlowFile = session.putAttribute(newFlowFile, "table_name", storagePrefix + "_" + tableNamePath);
// Transfer the new FlowFile to the success relationship
session.transfer(newFlowFile, REL_SUCCESS);
getLogger().debug("Created new FlowFile {} with table name {}.", newFlowFile, storagePrefix + "_" + tableNamePath);
}
}
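
A minimal test sketch (not part of the commit), assuming the nifi-mock and junit test dependencies declared in the new processors POM; the test class name, input JSON, and expected table_name values are illustrative:

package com.hzya.frame;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
public class DevJsonSplitterProcessorTest {
@Test
public void splitsNestedJsonIntoFlattenedFlowFiles() {
final TestRunner runner = TestRunners.newTestRunner(new DevJsonSplitterProcessor());
// Hypothetical nested input; the storageprefix attribute drives the table_name prefix
final String json = "{\"order\": {\"id\": 1}, \"items\": [{\"sku\": \"x\"}]}";
runner.enqueue(json.getBytes(StandardCharsets.UTF_8), Collections.singletonMap("storageprefix", "erp"));
runner.run();
// One FlowFile per leaf object/array; the original FlowFile is removed
runner.assertTransferCount(DevJsonSplitterProcessor.REL_SUCCESS, 2);
runner.assertTransferCount(DevJsonSplitterProcessor.REL_ORIGINAL, 0);
final List<MockFlowFile> out = runner.getFlowFilesForRelationship(DevJsonSplitterProcessor.REL_SUCCESS);
// Jackson preserves field order, so "order" is emitted before "items"
out.get(0).assertAttributeEquals("table_name", "erp_order");
out.get(0).assertContentEquals("{\"order_id\":1}");
out.get(1).assertAttributeEquals("table_name", "erp_items");
out.get(1).assertContentEquals("[{\"items_sku\":\"x\"}]");
}
}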

View File

@@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
com.hzya.frame.DevJsonSplitterProcessor

View File

@@ -9,7 +9,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>hzya-nifi-AutoJsonTableCreate-nar</artifactId>
+ <artifactId>hzya-nifi-package-nar</artifactId>
<packaging>nar</packaging>
<properties>
@@ -19,11 +19,24 @@
</properties>
<dependencies>
+ <!-- JSON splitter -->
<dependency>
<groupId>com.hzya</groupId>
- <artifactId>hzya-nifi-AutoJsonTableCreate-processors</artifactId>
+ <artifactId>hzya-nifi-JsonSplitter-processors</artifactId>
<version>1.0</version>
</dependency>
+ <!-- Generate table-creation DDL from JSON -->
+ <!-- <dependency>-->
+ <!-- <groupId>com.hzya</groupId>-->
+ <!-- <artifactId>hzya-nifi-AutoJsonTableCreate-processors</artifactId>-->
+ <!-- <version>1.0</version>-->
+ <!-- </dependency>-->
+ <!-- Generate paginated SQL statements -->
+ <!-- <dependency>-->
+ <!-- <groupId>com.hzya</groupId>-->
+ <!-- <artifactId>hzya-nifi-DevGeneratePaginatedSqlProcessor-processors</artifactId>-->
+ <!-- <version>1.0</version>-->
+ <!-- </dependency>-->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
@@ -31,4 +44,5 @@
<type>nar</type>
</dependency>
</dependencies>
</project>

View File

@@ -9,10 +9,12 @@
<packaging>pom</packaging>
<version>1.0</version>
<modules>
- <module>hzya-nifi-DevGeneratePaginatedSqlProcessor-nar</module>
+ <!-- <module>hzya-nifi-DevGeneratePaginatedSqlProcessor-nar</module>-->
<module>hzya-nifi-DevGeneratePaginatedSqlProcessor-processors</module>
- <module>hzya-nifi-AutoJsonTableCreate-nar</module>
+ <!-- <module>hzya-nifi-AutoJsonTableCreate-nar</module>-->
<module>hzya-nifi-AutoJsonTableCreate-processors</module>
+ <module>hzya-nifi-package-nar</module>
+ <module>hzya-nifi-JsonSplitter-processors</module>
</modules>
<parent>