From c9672ec0859164c4370efbcf92eada798c4b5ff1 Mon Sep 17 00:00:00 2001
From: liuy <37787198+LiuyCodes@users.noreply.github.com>
Date: Fri, 11 Jul 2025 10:27:06 +0800
Subject: [PATCH] feat(nifi): add JSON splitter processor and update project
 structure
 - add DevJsonSplitterProcessor to split and flatten multi-level JSON data
 - remove the unused AutoJsonTableCreate and DevGeneratePaginatedSqlProcessor modules
 - add the hzya-nifi-package-nar module for packaging the JSON splitter processor
 - update the project structure and dependencies
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../pom.xml                                   |  34 --
 .../hzya-nifi-JsonSplitter-processors/pom.xml |  50 +++
 .../hzya/frame/DevJsonSplitterProcessor.java  | 330 ++++++++++++++++++
 .../org.apache.nifi.processor.Processor       |  15 +
 .../pom.xml                                   |  18 +-
 nifi-hzyadev-bundle/pom.xml                   |   6 +-
 6 files changed, 415 insertions(+), 38 deletions(-)
 delete mode 100644 nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-nar/pom.xml
 create mode 100644 nifi-hzyadev-bundle/hzya-nifi-JsonSplitter-processors/pom.xml
 create mode 100644 nifi-hzyadev-bundle/hzya-nifi-JsonSplitter-processors/src/main/java/com/hzya/frame/DevJsonSplitterProcessor.java
 create mode 100644 nifi-hzyadev-bundle/hzya-nifi-JsonSplitter-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 rename nifi-hzyadev-bundle/{hzya-nifi-AutoJsonTableCreate-nar => hzya-nifi-package-nar}/pom.xml (58%)

diff --git a/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-nar/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-nar/pom.xml
deleted file mode 100644
index c4cda01..0000000
--- a/nifi-hzyadev-bundle/hzya-nifi-DevGeneratePaginatedSqlProcessor-nar/pom.xml
+++ /dev/null
@@ -1,34 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>nifi-hzyadev-bundle</artifactId>
-        <groupId>com.hzya</groupId>
-        <version>1.0</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>hzya-nifi-DevGeneratePaginatedSqlProcessor-nar</artifactId>
-    <packaging>nar</packaging>
-
-    <properties>
-        <maven.compiler.source>8</maven.compiler.source>
-        <maven.compiler.target>8</maven.compiler.target>
-        <maven.javadoc.skip>true</maven.javadoc.skip>
-        <source.skip>true</source.skip>
-    </properties>
-
-    <dependencies>
-        <dependency>
-            <groupId>com.hzya</groupId>
-            <artifactId>hzya-nifi-DevGeneratePaginatedSqlProcessor-processors</artifactId>
-            <version>1.0</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-standard-services-api-nar</artifactId>
-            <version>${nifi-revision}</version>
-            <type>nar</type>
-        </dependency>
-    </dependencies>
-</project>
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/hzya-nifi-JsonSplitter-processors/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-JsonSplitter-processors/pom.xml
new file mode 100644
index 0000000..a45972f
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-JsonSplitter-processors/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-hzyadev-bundle</artifactId>
+        <groupId>com.hzya</groupId>
+        <version>1.0</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hzya-nifi-JsonSplitter-processors</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>${nifi-revision}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-dbcp-service-api</artifactId>
+            <version>${nifi-revision}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+            <version>1.15.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>${nifi-revision}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.13</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/hzya-nifi-JsonSplitter-processors/src/main/java/com/hzya/frame/DevJsonSplitterProcessor.java b/nifi-hzyadev-bundle/hzya-nifi-JsonSplitter-processors/src/main/java/com/hzya/frame/DevJsonSplitterProcessor.java
new file mode 100644
index 0000000..b04dbb4
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-JsonSplitter-processors/src/main/java/com/hzya/frame/DevJsonSplitterProcessor.java
@@ -0,0 +1,330 @@
+package com.hzya.frame;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * @Author:liuyang
+ * @Package:com.hzya.frame
+ * @Project:nifi-hzyadev-bundle
+ * @name:DevJsonSplitterProcessor
+ * @Date:2025/7/11 09:27
+ * @Filename:DevJsonSplitterProcessor
+ */
+@Tags({"json", "split", "flatten", "transform", "custom"})
+@CapabilityDescription("Splits multi-level JSON into multiple FlowFiles, flattening parent field names into prefixes. If the JSON is single-level, the original FlowFile is passed through unchanged.")
+@ReadsAttributes({@ReadsAttribute(attribute = "storageprefix", description = "Prefix used for the 'table_name' attribute.")})
+@WritesAttributes({@WritesAttribute(attribute = "table_name", description = "Table name generated for the flattened JSON data.")})
+public class DevJsonSplitterProcessor extends AbstractProcessor {
+
+    // Success relationship for FlowFiles that were split and flattened
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that were successfully split and flattened.").build();
+
+    // Original relationship for single-level JSON FlowFiles that were not split
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("Original FlowFiles that were not split (single-level JSON).").build();
+
+    private List<PropertyDescriptor> properties;
+    private Set<Relationship> relationships;
+    private ObjectMapper objectMapper;
+
+    // This is the method to override for one-time processor setup
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        // Initialize the property descriptor list
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        // Example: to make storageprefix user-configurable, a property could be defined like this:
+        // properties.add(new PropertyDescriptor.Builder()
+        //         .name("Storage Prefix")
+        //         .description("The prefix to use for the 'table_name' attribute.")
+        //         .required(true)
+        //         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        //         .build());
+        this.properties = Collections.unmodifiableList(properties);
+
+        // Initialize the relationship set
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_ORIGINAL);
+        this.relationships = Collections.unmodifiableSet(relationships);
+
+        // Initialize the Jackson ObjectMapper
+        this.objectMapper = new ObjectMapper();
+
+        // The logger could be obtained here, but calling getLogger() in onTrigger is usually preferable
+        // this.getLogger().info("SplitAndFlattenJson processor initialized.");
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return this.properties;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        // Get the incoming FlowFile
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        // Get the storageprefix attribute
+        final String storagePrefix = flowFile.getAttribute("storageprefix");
+        if (storagePrefix == null) {
+            getLogger().warn("FlowFile {} has no 'storageprefix' attribute. Routing to REL_ORIGINAL.", flowFile);
+            session.transfer(flowFile, REL_ORIGINAL); // without storageprefix, pass straight to the original relationship
+            return;
+        }
+
+        final AtomicReference<String> jsonContent = new AtomicReference<>();
+        session.read(flowFile, new InputStreamCallback() {
+            @Override
+            public void process(final InputStream in) throws IOException {
+                // Read all bytes with IOUtils
+                byte[] bytes = IOUtils.toByteArray(in);
+                jsonContent.set(new String(bytes, StandardCharsets.UTF_8));
+            }
+        });
+
+        JsonNode rootNode;
+        try {
+            // Parse the JSON string
+            rootNode = objectMapper.readTree(jsonContent.get());
+        } catch (IOException e) {
+            getLogger().error("Failed to parse JSON for FlowFile {}: {}", flowFile, e.getMessage(), e);
+            session.transfer(flowFile, REL_ORIGINAL); // parsing failed, route to the original relationship
+            return;
+        }
+
+        // Check whether the JSON is single-level
+        if (isSingleLevel(rootNode)) {
+            getLogger().debug("FlowFile {} contains single-level JSON. Routing to REL_ORIGINAL.", flowFile);
+            session.transfer(flowFile, REL_ORIGINAL);
+            return;
+        }
+
+        // Recursively process the JSON nodes
+        processNode(rootNode, "", "", flowFile, storagePrefix, session);
+
+        // Remove the original FlowFile
+        session.remove(flowFile);
+        getLogger().debug("Processed and removed original FlowFile {}.", flowFile);
+    }
+
+    /**
+     * Checks whether a JSON node is single-level.
+     * Single-level means:
+     * 1. For an object, none of its values are objects or arrays.
+     * 2. For an array, every element is an object whose values are all neither objects nor arrays.
+     *
+     * @param node the JSON node to check
+     * @return true if the JSON is single-level, false otherwise
+     */
+    private boolean isSingleLevel(JsonNode node) {
+        if (node.isObject()) {
+            // For an object, check that every field value is neither an object nor an array
+            for (JsonNode child : node) {
+                if (child.isObject() || child.isArray()) {
+                    return false; // contains a nested object or array, so not single-level
+                }
+            }
+            return true; // all values are scalars, so single-level
+        } else if (node.isArray()) {
+            // For an array, check that every element is a single-level object
+            for (JsonNode element : node) {
+                if (!element.isObject() || !isSingleLevel(element)) {
+                    return false; // contains a non-object element or a nested element, so not single-level
+                }
+            }
+            return true; // every element is a single-level object
+        }
+        return true; // other node types (e.g. scalars) are treated as single-level
+    }
+
+    /**
+     * Recursively processes a JSON node and produces FlowFiles.
+     *
+     * @param node           the JSON node currently being processed
+     * @param parentName     the parent field name (used for prefixing)
+     * @param path           the path of the node within the JSON structure (used to build table_name)
+     * @param parentFlowFile the original FlowFile
+     * @param storagePrefix  the value of the storageprefix attribute
+     * @param session        the NiFi process session
+     */
+    private void processNode(JsonNode node, String parentName, String path, FlowFile parentFlowFile, String storagePrefix, ProcessSession session) {
+
+        if (node.isObject()) {
+            // JSON object
+            ObjectNode objectNode = (ObjectNode) node;
+
+            // Check whether this is a leaf object (no value is an object or array)
+            boolean isLeafObject = true;
+            Iterator<Map.Entry<String, JsonNode>> fields = objectNode.fields();
+            while (fields.hasNext()) {
+                Map.Entry<String, JsonNode> field = fields.next();
+                if (field.getValue().isObject() || field.getValue().isArray()) {
+                    isLeafObject = false;
+                    break;
+                }
+            }
+
+            if (isLeafObject) {
+                // Leaf object: build a new ObjectNode with flattened field names
+                ObjectNode leafObj = objectMapper.createObjectNode();
+                objectNode.fields().forEachRemaining(entry -> {
+                    // Concatenate the field name and strip spaces
+                    String fieldName = parentName.isEmpty() ? entry.getKey().replace(" ", "") : (parentName + "_" + entry.getKey()).replace(" ", "");
+                    leafObj.set(fieldName, entry.getValue());
+                });
+                publishFlowFile(leafObj, path, parentFlowFile, storagePrefix, session);
+            } else {
+                // Not a leaf object: recurse into its children
+                objectNode.fields().forEachRemaining(entry -> {
+                    JsonNode childNode = entry.getValue();
+                    String currentKey = entry.getKey().replace(" ", ""); // strip spaces from the current key
+                    String newPath = path.isEmpty() ? currentKey : (path + "_" + currentKey); // build the path
+
+                    // Recurse only into child objects and arrays
+                    if (childNode.isObject() || childNode.isArray()) {
+                        processNode(childNode, currentKey, newPath, parentFlowFile, storagePrefix, session);
+                    }
+                });
+            }
+        } else if (node.isArray()) {
+            // JSON array
+            ArrayNode arrayNode = (ArrayNode) node;
+
+            // Check whether this is a leaf array (every element is a leaf object)
+            boolean isLeafArray = true;
+            for (JsonNode element : arrayNode) {
+                if (!element.isObject() || !isLeafObject(element)) { // isLeafObject is the helper check below
+                    isLeafArray = false;
+                    break;
+                }
+            }
+
+            if (isLeafArray) {
+                // Leaf array: prefix the field names of every object in the array
+                ArrayNode leafArray = objectMapper.createArrayNode();
+                arrayNode.forEach(item -> {
+                    if (item.isObject()) {
+                        ObjectNode newItem = objectMapper.createObjectNode();
+                        item.fields().forEachRemaining(entry -> {
+                            // Concatenate the field name and strip spaces
+                            String fieldName = parentName.isEmpty() ? entry.getKey().replace(" ", "") : (parentName + "_" + entry.getKey()).replace(" ", "");
+                            newItem.set(fieldName, entry.getValue());
+                        });
+                        leafArray.add(newItem);
+                    }
+                });
+                publishFlowFile(leafArray, path, parentFlowFile, storagePrefix, session);
+            } else {
+                // Not a leaf array: recurse into each element
+                arrayNode.forEach(item -> {
+                    processNode(item, parentName, path, parentFlowFile, storagePrefix, session);
+                });
+            }
+        }
+        // Scalar nodes are not processed here because they are not levels that can be split
+    }
+
+    /**
+     * Helper: checks whether a JsonNode is a leaf object.
+     *
+     * @param node the JsonNode to check
+     * @return true if it is a leaf object, false otherwise
+     */
+    private boolean isLeafObject(JsonNode node) {
+        if (!node.isObject()) {
+            return false;
+        }
+        Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
+        while (fields.hasNext()) {
+            Map.Entry<String, JsonNode> field = fields.next();
+            if (field.getValue().isObject() || field.getValue().isArray()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Writes the processed JSON content into a new FlowFile and transfers it.
+     *
+     * @param contentNode    the JSON content to write into the FlowFile
+     * @param path           the path of the node within the JSON structure
+     * @param parentFlowFile the original FlowFile
+     * @param storagePrefix  the value of the storageprefix attribute
+     * @param session        the NiFi process session
+     */
+    private void publishFlowFile(JsonNode contentNode, String path, FlowFile parentFlowFile, String storagePrefix, ProcessSession session) {
+        FlowFile newFlowFile = session.create(parentFlowFile);
+        newFlowFile = session.write(newFlowFile, new OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream out) throws IOException {
+                // Write the JSON content to the output stream
+                objectMapper.writeValue(out, contentNode);
+            }
+        });
+
+        // Set the table_name attribute
+        String tableNamePath = path.isEmpty() ? "data_details" : path;
+        newFlowFile = session.putAttribute(newFlowFile, "table_name", storagePrefix + "_" + tableNamePath);
+
+        // Transfer the newly created FlowFile to the success relationship
+        session.transfer(newFlowFile, REL_SUCCESS);
+        getLogger().debug("Created new FlowFile {} with table name {}", newFlowFile, storagePrefix + "_" + tableNamePath);
+    }
+}
diff --git a/nifi-hzyadev-bundle/hzya-nifi-JsonSplitter-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-hzyadev-bundle/hzya-nifi-JsonSplitter-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..09fd489
--- /dev/null
+++ b/nifi-hzyadev-bundle/hzya-nifi-JsonSplitter-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+com.hzya.frame.DevJsonSplitterProcessor
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-nar/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-package-nar/pom.xml
similarity index 58%
rename from nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-nar/pom.xml
rename to nifi-hzyadev-bundle/hzya-nifi-package-nar/pom.xml
index e82856f..0602135 100644
--- a/nifi-hzyadev-bundle/hzya-nifi-AutoJsonTableCreate-nar/pom.xml
+++ b/nifi-hzyadev-bundle/hzya-nifi-package-nar/pom.xml
@@ -9,7 +9,7 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>hzya-nifi-AutoJsonTableCreate-nar</artifactId>
+    <artifactId>hzya-nifi-package-nar</artifactId>
     <packaging>nar</packaging>
 
     <properties>
@@ -19,11 +19,24 @@
     <dependencies>
+
        <dependency>
            <groupId>com.hzya</groupId>
-            <artifactId>hzya-nifi-AutoJsonTableCreate-processors</artifactId>
+            <artifactId>hzya-nifi-JsonSplitter-processors</artifactId>
            <version>1.0</version>
        </dependency>
+
+
+
+
+
+
+
+
+
+
+
+
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-standard-services-api-nar</artifactId>
            <version>${nifi-revision}</version>
            <type>nar</type>
        </dependency>
+
\ No newline at end of file
diff --git a/nifi-hzyadev-bundle/pom.xml b/nifi-hzyadev-bundle/pom.xml
index f94fdda..eb51c81 100644
--- a/nifi-hzyadev-bundle/pom.xml
+++ b/nifi-hzyadev-bundle/pom.xml
@@ -9,10 +9,12 @@
     <packaging>pom</packaging>
     <version>1.0</version>
     <modules>
-        <module>hzya-nifi-DevGeneratePaginatedSqlProcessor-nar</module>
        <module>hzya-nifi-DevGeneratePaginatedSqlProcessor-processors</module>
-        <module>hzya-nifi-AutoJsonTableCreate-nar</module>
        <module>hzya-nifi-AutoJsonTableCreate-processors</module>
+        <module>hzya-nifi-package-nar</module>
+        <module>hzya-nifi-JsonSplitter-processors</module>
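
Not part of the patch itself: a minimal sketch of how DevJsonSplitterProcessor could be exercised with the nifi-mock TestRunner, assuming the test dependencies (nifi-mock, junit) already declared in hzya-nifi-JsonSplitter-processors/pom.xml. The test class name, the sample JSON, and the "erp" prefix are illustrative assumptions, not part of the change set.

package com.hzya.frame;

import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

public class DevJsonSplitterProcessorTest {

    @Test
    public void splitsTwoLevelJsonIntoOneFlowFilePerLeafNode() {
        final TestRunner runner = TestRunners.newTestRunner(DevJsonSplitterProcessor.class);

        // Two-level JSON: "order" is a leaf object and "items" is a leaf array,
        // so each should become its own flattened FlowFile.
        final String json = "{\"order\":{\"id\":1,\"status\":\"NEW\"},"
                + "\"items\":[{\"sku\":\"A\",\"qty\":2},{\"sku\":\"B\",\"qty\":1}]}";

        // "storageprefix" drives the generated table_name attribute; without it the
        // processor routes the FlowFile straight to the "original" relationship.
        runner.enqueue(json.getBytes(StandardCharsets.UTF_8),
                Collections.singletonMap("storageprefix", "erp"));
        runner.run();

        // One FlowFile per leaf node; the original FlowFile is removed after splitting.
        runner.assertTransferCount(DevJsonSplitterProcessor.REL_SUCCESS, 2);
        runner.assertTransferCount(DevJsonSplitterProcessor.REL_ORIGINAL, 0);

        final List<MockFlowFile> results =
                runner.getFlowFilesForRelationship(DevJsonSplitterProcessor.REL_SUCCESS);
        for (MockFlowFile ff : results) {
            // Expected table_name values here: "erp_order" and "erp_items";
            // content keys are flattened as "order_id", "items_sku", etc.
            System.out.println(ff.getAttribute("table_name") + " -> "
                    + new String(ff.toByteArray(), StandardCharsets.UTF_8));
        }
    }
}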