Compare commits

..

3 Commits

Author SHA1 Message Date
liuy d34165962c fix(hzya-nifi-GetJackyunOpenData-processors):优化数据获取判断逻辑
-补充判断吉客云接口返回数据的两种情况
-增加 checkIsNull 方法检查 JSON 对象是否为空
- 优化数据获取流程,确保准确判断是否有数据
2025-09-12 13:17:46 +08:00
liuy 28867f0109 Merge remote-tracking branch 'origin/master'
# Conflicts:
#	nifi-hzyadev-bundle/hzya-nifi-AdvancedJoltTransformer-processors/src/main/java/com/hzya/frame/AdvancedJoltTransformerProcessor.java
2025-09-12 10:22:45 +08:00
liuy f3cd0dc5dc refactor: 更新自定义jolt转换器类的注释
- 在类AdvancedJoltTransformerProcessor的注释中增加了对JSONata的支持
- 表达了jolt的使用难度,为后续可能的替换做铺垫
2025-09-12 10:20:46 +08:00
1 changed files with 28 additions and 7 deletions

View File

@ -1,6 +1,7 @@
package com.hzya.frame;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.hzya.frame.dto.ResultDataDto;
import com.hzya.frame.dto.ResultDto;
@ -243,8 +244,17 @@ public class GetJackyunOpenDataProcessor extends AbstractProcessor {
}
ResultDto result = resultDataDto.getResult();
if (result != null && result.getData() != null && !result.getData().isEmpty()) {
hasFetchedDataInThisRun = true;
//2025年9月12日11:39:07 补充判断逻辑
//吉客云返回接口的类型可能出现两种情况
//{"code":200,"msg":"","result":{"data":{}}}
//{"code":200,"msg":"","result":{"data":{"warehouseInfo":[]}}}
if (result != null && result.getData() != null && result.getData().size() > 0 && result.getData().get(0) instanceof JSONObject) {
JSONObject jsonObject = (JSONObject) result.getData().get(0);
if (!checkIsNull(jsonObject)) {
hasNextPageInWindow = false;
}
}
if (hasNextPageInWindow && result != null && result.getData() != null && result.getData().size() > 0 && !result.getData().isEmpty()) {
getLogger().info("任务 '{}' 在时间窗 [{} -> {}] 的第 {} 页获取到 {} 条数据。", stateKey, windowStartTimeStr, windowEndTimeStr, currentPage, result.getData().size());
FlowFile ff = session.create();
@ -268,11 +278,7 @@ public class GetJackyunOpenDataProcessor extends AbstractProcessor {
// 成功处理完一个时间窗口包括所有页立即更新MongoDB中的状态
String newWatermarkStr = sdf.format(new Date(newWatermarkMillis));
stateCollection.updateOne(
Filters.eq("_id", stateKey),
Updates.set("value", newWatermarkStr),
new UpdateOptions().upsert(true)
);
stateCollection.updateOne(Filters.eq("_id", stateKey), Updates.set("value", newWatermarkStr), new UpdateOptions().upsert(true));
getLogger().info("任务 '{}' 的时间窗口 [{} -> {}] 数据同步完成,状态水位线更新至: {}", stateKey, windowStartTimeStr, windowEndTimeStr, newWatermarkStr);
// 更新循环变量为下一个时间窗口做准备
@ -353,4 +359,19 @@ public class GetJackyunOpenDataProcessor extends AbstractProcessor {
}
return hexValue.toString();
}
private boolean checkIsNull(JSONObject jsonObject) {
if (jsonObject != null) {
for (Map.Entry<String, Object> entry : jsonObject.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
if (value instanceof JSONArray) {
if (((JSONArray) value).size() > 0) {
return true;
}
}
}
}
return false;
}
}