Compare commits
3 Commits
e512bddad2
...
d34165962c
Author | SHA1 | Date |
---|---|---|
|
d34165962c | |
|
28867f0109 | |
|
f3cd0dc5dc |
|
@ -1,6 +1,7 @@
|
||||||
package com.hzya.frame;
|
package com.hzya.frame;
|
||||||
|
|
||||||
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSON;
|
||||||
|
import com.alibaba.fastjson.JSONArray;
|
||||||
import com.alibaba.fastjson.JSONObject;
|
import com.alibaba.fastjson.JSONObject;
|
||||||
import com.hzya.frame.dto.ResultDataDto;
|
import com.hzya.frame.dto.ResultDataDto;
|
||||||
import com.hzya.frame.dto.ResultDto;
|
import com.hzya.frame.dto.ResultDto;
|
||||||
|
@ -243,8 +244,17 @@ public class GetJackyunOpenDataProcessor extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
ResultDto result = resultDataDto.getResult();
|
ResultDto result = resultDataDto.getResult();
|
||||||
if (result != null && result.getData() != null && !result.getData().isEmpty()) {
|
//2025年9月12日11:39:07 补充判断逻辑
|
||||||
hasFetchedDataInThisRun = true;
|
//吉客云返回接口的类型,可能出现两种情况:
|
||||||
|
//{"code":200,"msg":"","result":{"data":{}}}
|
||||||
|
//{"code":200,"msg":"","result":{"data":{"warehouseInfo":[]}}}
|
||||||
|
if (result != null && result.getData() != null && result.getData().size() > 0 && result.getData().get(0) instanceof JSONObject) {
|
||||||
|
JSONObject jsonObject = (JSONObject) result.getData().get(0);
|
||||||
|
if (!checkIsNull(jsonObject)) {
|
||||||
|
hasNextPageInWindow = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (hasNextPageInWindow && result != null && result.getData() != null && result.getData().size() > 0 && !result.getData().isEmpty()) {
|
||||||
getLogger().info("任务 '{}' 在时间窗 [{} -> {}] 的第 {} 页获取到 {} 条数据。", stateKey, windowStartTimeStr, windowEndTimeStr, currentPage, result.getData().size());
|
getLogger().info("任务 '{}' 在时间窗 [{} -> {}] 的第 {} 页获取到 {} 条数据。", stateKey, windowStartTimeStr, windowEndTimeStr, currentPage, result.getData().size());
|
||||||
|
|
||||||
FlowFile ff = session.create();
|
FlowFile ff = session.create();
|
||||||
|
@ -268,11 +278,7 @@ public class GetJackyunOpenDataProcessor extends AbstractProcessor {
|
||||||
|
|
||||||
// 成功处理完一个时间窗口(包括所有页),立即更新MongoDB中的状态
|
// 成功处理完一个时间窗口(包括所有页),立即更新MongoDB中的状态
|
||||||
String newWatermarkStr = sdf.format(new Date(newWatermarkMillis));
|
String newWatermarkStr = sdf.format(new Date(newWatermarkMillis));
|
||||||
stateCollection.updateOne(
|
stateCollection.updateOne(Filters.eq("_id", stateKey), Updates.set("value", newWatermarkStr), new UpdateOptions().upsert(true));
|
||||||
Filters.eq("_id", stateKey),
|
|
||||||
Updates.set("value", newWatermarkStr),
|
|
||||||
new UpdateOptions().upsert(true)
|
|
||||||
);
|
|
||||||
getLogger().info("任务 '{}' 的时间窗口 [{} -> {}] 数据同步完成,状态水位线更新至: {}", stateKey, windowStartTimeStr, windowEndTimeStr, newWatermarkStr);
|
getLogger().info("任务 '{}' 的时间窗口 [{} -> {}] 数据同步完成,状态水位线更新至: {}", stateKey, windowStartTimeStr, windowEndTimeStr, newWatermarkStr);
|
||||||
|
|
||||||
// 更新循环变量,为下一个时间窗口做准备
|
// 更新循环变量,为下一个时间窗口做准备
|
||||||
|
@ -353,4 +359,19 @@ public class GetJackyunOpenDataProcessor extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
return hexValue.toString();
|
return hexValue.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean checkIsNull(JSONObject jsonObject) {
|
||||||
|
if (jsonObject != null) {
|
||||||
|
for (Map.Entry<String, Object> entry : jsonObject.entrySet()) {
|
||||||
|
String key = entry.getKey();
|
||||||
|
Object value = entry.getValue();
|
||||||
|
if (value instanceof JSONArray) {
|
||||||
|
if (((JSONArray) value).size() > 0) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue