diff --git a/nifi-hzyadev-bundle/hzya-nifi-JsonSplitter-processors/src/main/java/com/hzya/frame/DevJsonSplitterProcessor.java b/nifi-hzyadev-bundle/hzya-nifi-JsonSplitter-processors/src/main/java/com/hzya/frame/DevJsonSplitterProcessor.java index b04dbb4..75b6eb4 100644 --- a/nifi-hzyadev-bundle/hzya-nifi-JsonSplitter-processors/src/main/java/com/hzya/frame/DevJsonSplitterProcessor.java +++ b/nifi-hzyadev-bundle/hzya-nifi-JsonSplitter-processors/src/main/java/com/hzya/frame/DevJsonSplitterProcessor.java @@ -70,6 +70,9 @@ public class DevJsonSplitterProcessor extends AbstractProcessor { // 定义原始关系,用于未拆分的单层级 JSON FlowFile public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("未拆分的原始FlowFiles(单层JSON)。").build(); + // 定义失败关系,用于未拆分的单层级 JSON FlowFile + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("如果在处理过程中发生错误,则将原始FlowFile路由到此关系。").build(); + private List properties; private Set relationships; private ObjectMapper objectMapper; @@ -92,6 +95,7 @@ public class DevJsonSplitterProcessor extends AbstractProcessor { final Set relationships = new HashSet<>(); relationships.add(REL_SUCCESS); relationships.add(REL_ORIGINAL); + relationships.add(REL_FAILURE); this.relationships = Collections.unmodifiableSet(relationships); // 初始化 Jackson ObjectMapper @@ -122,8 +126,8 @@ public class DevJsonSplitterProcessor extends AbstractProcessor { // 获取 storageprefix 属性 final String storagePrefix = flowFile.getAttribute("storageprefix"); if (storagePrefix == null) { - getLogger().warn("FlowFile{}没有“storageprefix”属性。转为REL_ORIGINAL。", flowFile); - session.transfer(flowFile, REL_ORIGINAL); // 如果没有 storageprefix,则直接传递给原始关系 + getLogger().warn("FlowFile{}没有“storageprefix”属性。转为REL_FAILURE。", flowFile); + session.transfer(flowFile, REL_FAILURE); // 如果没有 storageprefix,则直接传递给失败 return; } @@ -143,7 +147,7 @@ public class DevJsonSplitterProcessor extends AbstractProcessor { rootNode = objectMapper.readTree(jsonContent.get()); } catch (IOException e) { getLogger().error("未能解析FlowFile{}的JSON:{}", flowFile, e.getMessage(), e); - session.transfer(flowFile, REL_ORIGINAL); // 解析失败,传递给原始关系 + session.transfer(flowFile, REL_FAILURE); // 解析失败,传递给失败 return; }