From d34165962c5d5fbfe00f74067576b7fad8a67f38 Mon Sep 17 00:00:00 2001 From: liuy <37787198+LiuyCodes@users.noreply.github.com> Date: Fri, 12 Sep 2025 13:17:46 +0800 Subject: [PATCH] =?UTF-8?q?fix(hzya-nifi-GetJackyunOpenData-processors):?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=95=B0=E6=8D=AE=E8=8E=B7=E5=8F=96=E5=88=A4?= =?UTF-8?q?=E6=96=AD=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit -补充判断吉客云接口返回数据的两种情况 -增加 checkIsNull 方法检查 JSON 对象是否为空 - 优化数据获取流程,确保准确判断是否有数据 --- .../frame/GetJackyunOpenDataProcessor.java | 35 +++++++++++++++---- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-processors/src/main/java/com/hzya/frame/GetJackyunOpenDataProcessor.java b/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-processors/src/main/java/com/hzya/frame/GetJackyunOpenDataProcessor.java index eb8f87e..0f48f34 100644 --- a/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-processors/src/main/java/com/hzya/frame/GetJackyunOpenDataProcessor.java +++ b/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-processors/src/main/java/com/hzya/frame/GetJackyunOpenDataProcessor.java @@ -1,6 +1,7 @@ package com.hzya.frame; import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.hzya.frame.dto.ResultDataDto; import com.hzya.frame.dto.ResultDto; @@ -243,8 +244,17 @@ public class GetJackyunOpenDataProcessor extends AbstractProcessor { } ResultDto result = resultDataDto.getResult(); - if (result != null && result.getData() != null && !result.getData().isEmpty()) { - hasFetchedDataInThisRun = true; + //2025年9月12日11:39:07 补充判断逻辑 + //吉客云返回接口的类型,可能出现两种情况: + //{"code":200,"msg":"","result":{"data":{}}} + //{"code":200,"msg":"","result":{"data":{"warehouseInfo":[]}}} + if (result != null && result.getData() != null && result.getData().size() > 0 && result.getData().get(0) instanceof JSONObject) { + JSONObject jsonObject = (JSONObject) result.getData().get(0); + if (!checkIsNull(jsonObject)) { + hasNextPageInWindow = false; + } + } + if (hasNextPageInWindow && result != null && result.getData() != null && result.getData().size() > 0 && !result.getData().isEmpty()) { getLogger().info("任务 '{}' 在时间窗 [{} -> {}] 的第 {} 页获取到 {} 条数据。", stateKey, windowStartTimeStr, windowEndTimeStr, currentPage, result.getData().size()); FlowFile ff = session.create(); @@ -268,11 +278,7 @@ public class GetJackyunOpenDataProcessor extends AbstractProcessor { // 成功处理完一个时间窗口(包括所有页),立即更新MongoDB中的状态 String newWatermarkStr = sdf.format(new Date(newWatermarkMillis)); - stateCollection.updateOne( - Filters.eq("_id", stateKey), - Updates.set("value", newWatermarkStr), - new UpdateOptions().upsert(true) - ); + stateCollection.updateOne(Filters.eq("_id", stateKey), Updates.set("value", newWatermarkStr), new UpdateOptions().upsert(true)); getLogger().info("任务 '{}' 的时间窗口 [{} -> {}] 数据同步完成,状态水位线更新至: {}", stateKey, windowStartTimeStr, windowEndTimeStr, newWatermarkStr); // 更新循环变量,为下一个时间窗口做准备 @@ -353,4 +359,19 @@ public class GetJackyunOpenDataProcessor extends AbstractProcessor { } return hexValue.toString(); } + + private boolean checkIsNull(JSONObject jsonObject) { + if (jsonObject != null) { + for (Map.Entry entry : jsonObject.entrySet()) { + String key = entry.getKey(); + Object value = entry.getValue(); + if (value instanceof JSONArray) { + if (((JSONArray) value).size() > 0) { + return true; + } + } + } + } + return false; + } } \ No newline at end of file