From ac1f4a460278777f26687b6e096850add738c0cf Mon Sep 17 00:00:00 2001 From: liuy <37787198+LiuyCodes@users.noreply.github.com> Date: Fri, 11 Jul 2025 11:14:52 +0800 Subject: [PATCH] =?UTF-8?q?feat(processor):=20=E6=B7=BB=E5=8A=A0=E5=A4=84?= =?UTF-8?q?=E7=90=86=E5=A4=B1=E8=B4=A5=E7=9A=84=20REL=5FFAILURE=20?= =?UTF-8?q?=E5=85=B3=E7=B3=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 REL_FAILURE 关系,用于处理过程中发生错误的 FlowFile - 修改代码,将原本传递给 REL_ORIGINAL 的错误情况改为传递给 REL_FAILURE - 优化错误日志输出,提高可读性和准确性 --- .../java/com/hzya/frame/DevJsonSplitterProcessor.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/nifi-hzyadev-bundle/hzya-nifi-JsonSplitter-processors/src/main/java/com/hzya/frame/DevJsonSplitterProcessor.java b/nifi-hzyadev-bundle/hzya-nifi-JsonSplitter-processors/src/main/java/com/hzya/frame/DevJsonSplitterProcessor.java index b04dbb4..75b6eb4 100644 --- a/nifi-hzyadev-bundle/hzya-nifi-JsonSplitter-processors/src/main/java/com/hzya/frame/DevJsonSplitterProcessor.java +++ b/nifi-hzyadev-bundle/hzya-nifi-JsonSplitter-processors/src/main/java/com/hzya/frame/DevJsonSplitterProcessor.java @@ -70,6 +70,9 @@ public class DevJsonSplitterProcessor extends AbstractProcessor { // 定义原始关系,用于未拆分的单层级 JSON FlowFile public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("未拆分的原始FlowFiles(单层JSON)。").build(); + // 定义失败关系,用于未拆分的单层级 JSON FlowFile + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("如果在处理过程中发生错误,则将原始FlowFile路由到此关系。").build(); + private List properties; private Set relationships; private ObjectMapper objectMapper; @@ -92,6 +95,7 @@ public class DevJsonSplitterProcessor extends AbstractProcessor { final Set relationships = new HashSet<>(); relationships.add(REL_SUCCESS); relationships.add(REL_ORIGINAL); + relationships.add(REL_FAILURE); this.relationships = Collections.unmodifiableSet(relationships); // 初始化 Jackson ObjectMapper @@ -122,8 +126,8 @@ public class DevJsonSplitterProcessor extends AbstractProcessor { // 获取 storageprefix 属性 final String storagePrefix = flowFile.getAttribute("storageprefix"); if (storagePrefix == null) { - getLogger().warn("FlowFile{}没有“storageprefix”属性。转为REL_ORIGINAL。", flowFile); - session.transfer(flowFile, REL_ORIGINAL); // 如果没有 storageprefix,则直接传递给原始关系 + getLogger().warn("FlowFile{}没有“storageprefix”属性。转为REL_FAILURE。", flowFile); + session.transfer(flowFile, REL_FAILURE); // 如果没有 storageprefix,则直接传递给失败 return; } @@ -143,7 +147,7 @@ public class DevJsonSplitterProcessor extends AbstractProcessor { rootNode = objectMapper.readTree(jsonContent.get()); } catch (IOException e) { getLogger().error("未能解析FlowFile{}的JSON:{}", flowFile, e.getMessage(), e); - session.transfer(flowFile, REL_ORIGINAL); // 解析失败,传递给原始关系 + session.transfer(flowFile, REL_FAILURE); // 解析失败,传递给失败 return; }