feat(processor): 添加处理失败的 REL_FAILURE 关系

- 新增 REL_FAILURE 关系,用于处理过程中发生错误的 FlowFile
- 修改代码,将原本传递给 REL_ORIGINAL 的错误情况改为传递给 REL_FAILURE
- 优化错误日志输出,提高可读性和准确性
This commit is contained in:
liuy 2025-07-11 11:14:52 +08:00
parent f946c18295
commit ac1f4a4602
1 changed files with 7 additions and 3 deletions

View File

@ -70,6 +70,9 @@ public class DevJsonSplitterProcessor extends AbstractProcessor {
// 定义原始关系用于未拆分的单层级 JSON FlowFile
public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("未拆分的原始FlowFiles单层JSON").build();
// 定义失败关系用于未拆分的单层级 JSON FlowFile
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("如果在处理过程中发生错误则将原始FlowFile路由到此关系。").build();
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
private ObjectMapper objectMapper;
@ -92,6 +95,7 @@ public class DevJsonSplitterProcessor extends AbstractProcessor {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_ORIGINAL);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
// 初始化 Jackson ObjectMapper
@ -122,8 +126,8 @@ public class DevJsonSplitterProcessor extends AbstractProcessor {
// 获取 storageprefix 属性
final String storagePrefix = flowFile.getAttribute("storageprefix");
if (storagePrefix == null) {
getLogger().warn("FlowFile没有“storageprefix”属性。转为REL_ORIGINAL", flowFile);
session.transfer(flowFile, REL_ORIGINAL); // 如果没有 storageprefix则直接传递给原始关系
getLogger().warn("FlowFile没有“storageprefix”属性。转为REL_FAILURE", flowFile);
session.transfer(flowFile, REL_FAILURE); // 如果没有 storageprefix则直接传递给失败
return;
}
@ -143,7 +147,7 @@ public class DevJsonSplitterProcessor extends AbstractProcessor {
rootNode = objectMapper.readTree(jsonContent.get());
} catch (IOException e) {
getLogger().error("未能解析FlowFile{}的JSON:{}", flowFile, e.getMessage(), e);
session.transfer(flowFile, REL_ORIGINAL); // 解析失败传递给原始关系
session.transfer(flowFile, REL_FAILURE); // 解析失败传递给失败
return;
}