feat(GetJackyunOpenDataProcessor): 增加单次全量同步模式

- 新增 syncByTime 变量判断是否按时间窗口同步
- 改造循环结构以支持单次全量同步
- 优化时间参数设置逻辑
- 调整状态更新和循环控制逻辑
This commit is contained in:
liuy 2025-09-12 14:44:40 +08:00
parent d34165962c
commit c25d5210ef
2 changed files with 263 additions and 83 deletions

View File

@ -165,15 +165,176 @@ public class AdvancedJoltTransformerProcessorTest {
/**
 * Verifies that a Jolt spec which is well-formed JSON but not a usable Chainr spec
 * causes the incoming FlowFile to be routed to the failure relationship.
 *
 * <p>The fixture is intentionally minimal: the outcome depends only on the spec being
 * rejected, not on the shape of the input document.
 *
 * @throws UnsupportedEncodingException declared because the input is encoded via
 *         {@code getBytes("UTF-8")}
 */
@Test
public void testLogicallyInvalidJoltSpec() throws UnsupportedEncodingException {
    // Valid JSON, but the single entry carries no "operation" key, so the call to
    // Chainr.fromSpec() is expected to fail.
    final String badLogicJoltSpec = "[{\"spec\": {\"a\": \"b\"}}]";
    final String inputJson = "{\"a\": 1}";

    runner.setProperty(AdvancedJoltTransformerProcessor.JOLT_SPEC, badLogicJoltSpec);
    runner.enqueue(inputJson.getBytes("UTF-8"));
    runner.run();

    // The FlowFile must end up on failure and must not appear on success.
    runner.assertTransferCount(AdvancedJoltTransformerProcessor.REL_SUCCESS, 0);
    runner.assertTransferCount(AdvancedJoltTransformerProcessor.REL_FAILURE, 1);
}
}

View File

@ -142,7 +142,6 @@ public class GetJackyunOpenDataProcessor extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
// 从属性中动态获取用户为当前任务指定的状态键
final String stateKey = context.getProperty(PROP_STATE_KEY_NAME).evaluateAttributeExpressions().getValue();
if (stateKey == null || stateKey.trim().isEmpty()) {
getLogger().error("状态存储键名(State Key)未配置或为空,无法进行状态管理。");
@ -150,87 +149,94 @@ public class GetJackyunOpenDataProcessor extends AbstractProcessor {
return;
}
// 获取MongoDB集合对象
final MongoDatabase database = mongoClient.getDatabase(context.getProperty(PROP_MONGO_DATABASE).getValue());
final MongoCollection<Document> stateCollection = database.getCollection(context.getProperty(PROP_MONGO_COLLECTION).getValue());
try {
// 从属性中获取用户配置的开始和结束时间参数名
String startTimeParamName = context.getProperty(PROP_START_TIME_PARAM_NAME).getValue();
String endTimeParamName = context.getProperty(PROP_END_TIME_PARAM_NAME).getValue();
// 1. 确定需要追赶的最终目标时间
long safetyBufferMillis = context.getProperty(PROP_END_TIME_SAFETY_BUFFER).asTimePeriod(TimeUnit.MILLISECONDS);
long catchUpTargetTimeMillis = System.currentTimeMillis() - safetyBufferMillis;
// ============================ 核心修改 1: 增加模式判断 ============================
// 判断本次执行是否需要按时间窗口进行同步
final boolean syncByTime = !"null".equalsIgnoreCase(context.getProperty(PROP_INITIAL_START_TIME).getValue());
// ==========================================================================
// 2. 从MongoDB获取上次成功同步的时间点
Document stateDoc = stateCollection.find(Filters.eq("_id", stateKey)).first();
String lastSyncTimeStr = (stateDoc != null) ? stateDoc.getString("value") : null;
long loopStartTimeMillis = 0;
long catchUpTargetTimeMillis = System.currentTimeMillis(); // 初始化一个值
long loopStartTimeMillis;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
if (lastSyncTimeStr == null || lastSyncTimeStr.isEmpty()) {
// 首次运行使用用户配置的初始时间
loopStartTimeMillis = sdf.parse(context.getProperty(PROP_INITIAL_START_TIME).getValue()).getTime();
getLogger().info("未在MongoDB中找到状态键 '{}' 的记录,将使用初始同步时间: {}", stateKey, sdf.format(new Date(loopStartTimeMillis)));
if (syncByTime) {
// --- 按时间同步模式的初始化逻辑 ---
long safetyBufferMillis = context.getProperty(PROP_END_TIME_SAFETY_BUFFER).asTimePeriod(TimeUnit.MILLISECONDS);
catchUpTargetTimeMillis = System.currentTimeMillis() - safetyBufferMillis;
Document stateDoc = stateCollection.find(Filters.eq("_id", stateKey)).first();
String lastSyncTimeStr = (stateDoc != null) ? stateDoc.getString("value") : null;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
if (lastSyncTimeStr == null || lastSyncTimeStr.isEmpty()) {
loopStartTimeMillis = sdf.parse(context.getProperty(PROP_INITIAL_START_TIME).getValue()).getTime();
getLogger().info("按时间同步模式: 未在MongoDB中找到状态键 '{}' 的记录,将使用初始同步时间: {}", stateKey, sdf.format(new Date(loopStartTimeMillis)));
} else {
loopStartTimeMillis = sdf.parse(lastSyncTimeStr).getTime();
}
if (loopStartTimeMillis >= catchUpTargetTimeMillis) {
getLogger().info("任务 '{}' 已同步至最新进度(水位线: {}), 无需执行本次任务。", stateKey, sdf.format(new Date(loopStartTimeMillis)));
context.yield();
return;
}
} else {
// 后续运行使用上次保存的时间
loopStartTimeMillis = sdf.parse(lastSyncTimeStr).getTime();
// --- 单次全量模式的提示信息 ---
getLogger().info("单次全量同步模式: 任务 '{}' 将执行一次完整的全量分页拉取。", stateKey);
}
// 如果水位线已经超过或等于目标时间说明已追上无需执行
if (loopStartTimeMillis >= catchUpTargetTimeMillis) {
getLogger().info("任务 '{}' 已同步至最新进度(水位线: {}), 无需执行本次任务。", stateKey, sdf.format(new Date(loopStartTimeMillis)));
context.yield(); // 放弃本次执行提高效率
return;
}
// 3. 开始追赶循环将大时间段切分为小窗口
long timeWindowMillis = context.getProperty(PROP_TIME_WINDOW_INCREMENT).asTimePeriod(TimeUnit.MILLISECONDS);
boolean hasFetchedDataInThisRun = false;
while (loopStartTimeMillis < catchUpTargetTimeMillis) {
// ============================ 边界值处理核心修改 START ============================
// ============================ 核心修改 2: 改造循环结构 ============================
boolean continueLoop;
do {
String windowStartTimeStr = null;
String windowEndTimeStr = null;
long newWatermarkMillis = 0;
// 1. 定义下一个时间窗口的理论开始时间 (即当前窗口的边界)
long nextWindowStartTimeMillis = loopStartTimeMillis + timeWindowMillis;
if (syncByTime) {
// --- 按时间同步模式: 计算时间窗口 ---
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long nextWindowStartTimeMillis = loopStartTimeMillis + timeWindowMillis;
long apiQueryEndTimeMillis;
// 2. 确定本次API调用的实际结束时间 (apiQueryEndTimeMillis) 和下次循环的开始时间 (newWatermarkMillis)
long apiQueryEndTimeMillis;
long newWatermarkMillis;
if (nextWindowStartTimeMillis >= catchUpTargetTimeMillis) {
// 这是最后一个窗口
// API查询的结束时间就是最终目标时间
apiQueryEndTimeMillis = catchUpTargetTimeMillis;
// 新的水位线(下次循环的起点)也更新为最终目标时间
newWatermarkMillis = catchUpTargetTimeMillis;
} else {
// 非最后一个窗口
// API查询的结束时间是下一个窗口开始时间的前一秒构成一个无重叠的闭合区间
// 例如对于 [01:00:00 ... 02:00:00) 的范围实际查询 [01:00:00, 01:59:59]
apiQueryEndTimeMillis = nextWindowStartTimeMillis - 1000; // 减去1000毫秒 (1秒)
// 但新的水位线必须是下一个窗口的准确开始时间以确保连续性
newWatermarkMillis = nextWindowStartTimeMillis;
if (nextWindowStartTimeMillis >= catchUpTargetTimeMillis) {
apiQueryEndTimeMillis = catchUpTargetTimeMillis;
newWatermarkMillis = catchUpTargetTimeMillis;
} else {
apiQueryEndTimeMillis = nextWindowStartTimeMillis - 1000;
newWatermarkMillis = nextWindowStartTimeMillis;
}
windowStartTimeStr = sdf.format(new Date(loopStartTimeMillis));
windowEndTimeStr = sdf.format(new Date(apiQueryEndTimeMillis));
getLogger().debug("任务 '{}' 准备处理时间窗口: [{} -> {}]", stateKey, windowStartTimeStr, windowEndTimeStr);
}
// 3. 格式化API所需的时间字符串
String windowStartTimeStr = sdf.format(new Date(loopStartTimeMillis));
String windowEndTimeStr = sdf.format(new Date(apiQueryEndTimeMillis));
// ============================ 边界值处理核心修改 END ==============================
getLogger().debug("任务 '{}' 准备处理时间窗口: [{} -> {}]", stateKey, windowStartTimeStr, windowEndTimeStr);
// --- 内部分页循环 ---
// --- 内部分页循环 (两种模式通用) ---
int currentPage = 0;
boolean hasNextPageInWindow = true;
while (hasNextPageInWindow) {
JSONObject bizData = JSON.parseObject(context.getProperty(PROP_REQUEST_BIZ_CONTENT).getValue());
bizData.put("pageIndex", String.valueOf(currentPage));
bizData.put("pageSize", context.getProperty(PROP_PAGE_SIZE).getValue());
bizData.put(startTimeParamName, windowStartTimeStr);
bizData.put(endTimeParamName, windowEndTimeStr);
// ============================ 核心修改 3: 条件化设置时间参数 ============================
if (syncByTime) {
if (startTimeParamName != null && !"".equals(startTimeParamName) && !"null".equalsIgnoreCase(startTimeParamName)) {
bizData.put(startTimeParamName, windowStartTimeStr);
}
if (endTimeParamName != null && !"".equals(endTimeParamName) && !"null".equalsIgnoreCase(endTimeParamName)) {
bizData.put(endTimeParamName, windowEndTimeStr);
}
}
// ==============================================================================
ResultDataDto resultDataDto = callJackyunApi(context, bizData);
@ -240,50 +246,61 @@ public class GetJackyunOpenDataProcessor extends AbstractProcessor {
FlowFile failureFlowFile = session.create();
failureFlowFile = session.write(failureFlowFile, out -> out.write(errorMsg.getBytes(StandardCharsets.UTF_8)));
session.transfer(failureFlowFile, REL_FAILURE);
return; // 发生业务错误终止本次执行
return;
}
ResultDto result = resultDataDto.getResult();
//2025年9月12日11:39:07 补充判断逻辑
//吉客云返回接口的类型可能出现两种情况
//{"code":200,"msg":"","result":{"data":{}}}
//{"code":200,"msg":"","result":{"data":{"warehouseInfo":[]}}}
if (result != null && result.getData() != null && result.getData().size() > 0 && result.getData().get(0) instanceof JSONObject) {
JSONObject jsonObject = (JSONObject) result.getData().get(0);
if (!checkIsNull(jsonObject)) {
hasNextPageInWindow = false;
boolean hasDataInPage = false;
if (result != null && result.getData() != null && !result.getData().isEmpty()) {
if (result.getData().get(0) instanceof JSONObject) {
JSONObject jsonObject = (JSONObject) result.getData().get(0);
if (checkIsNull(jsonObject)) {
hasDataInPage = true;
}
} else {
hasDataInPage = true;
}
}
if (hasNextPageInWindow && result != null && result.getData() != null && result.getData().size() > 0 && !result.getData().isEmpty()) {
getLogger().info("任务 '{}' 在时间窗 [{} -> {}] 的第 {} 页获取到 {} 条数据。", stateKey, windowStartTimeStr, windowEndTimeStr, currentPage, result.getData().size());
if (hasDataInPage) {
hasFetchedDataInThisRun = true; // 标记本次执行获取到了数据
getLogger().info("任务 '{}' 在第 {} 页获取到 {} 条数据。", stateKey, currentPage, result.getData().size());
FlowFile ff = session.create();
ff = session.putAttribute(ff, "jackyun.api.method", context.getProperty(PROP_API_METHOD).getValue());
ff = session.putAttribute(ff, "jackyun.api.startTime", windowStartTimeStr);
ff = session.putAttribute(ff, "jackyun.api.endTime", windowEndTimeStr);
ff = session.putAttribute(ff, "jackyun.api.pageIndex", String.valueOf(currentPage));
if (syncByTime) {
ff = session.putAttribute(ff, "jackyun.api.startTime", windowStartTimeStr);
ff = session.putAttribute(ff, "jackyun.api.endTime", windowEndTimeStr);
}
ff = session.write(ff, out -> out.write(JSON.toJSONString(result.getData()).getBytes(StandardCharsets.UTF_8)));
session.transfer(ff, REL_SUCCESS);
session.commitAsync();
currentPage++;
// 如果返回的数据量小于每页大小可以推断这是最后一页但为了保险起见继续查询直到返回空数据
// hasNextPageInWindow = result.getData().size() == Integer.parseInt(context.getProperty(PROP_PAGE_SIZE).getValue());
} else {
// API返回的数据为空说明当前时间窗口已无更多数据
hasNextPageInWindow = false;
}
} // --- 分页循环结束 ---
// 成功处理完一个时间窗口包括所有页立即更新MongoDB中的状态
String newWatermarkStr = sdf.format(new Date(newWatermarkMillis));
stateCollection.updateOne(Filters.eq("_id", stateKey), Updates.set("value", newWatermarkStr), new UpdateOptions().upsert(true));
getLogger().info("任务 '{}' 的时间窗口 [{} -> {}] 数据同步完成,状态水位线更新至: {}", stateKey, windowStartTimeStr, windowEndTimeStr, newWatermarkStr);
// ============================ 核心修改 4: 条件化更新状态和循环 ============================
if (syncByTime) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String newWatermarkStr = sdf.format(new Date(newWatermarkMillis));
stateCollection.updateOne(Filters.eq("_id", stateKey), Updates.set("value", newWatermarkStr), new UpdateOptions().upsert(true));
getLogger().info("任务 '{}' 的时间窗口 [{} -> {}] 数据同步完成,状态水位线更新至: {}", stateKey, windowStartTimeStr, windowEndTimeStr, newWatermarkStr);
loopStartTimeMillis = newWatermarkMillis;
// 判断是否继续外层的时间循环
continueLoop = loopStartTimeMillis < catchUpTargetTimeMillis;
} else {
// 如果不是按时间同步则执行一次后立即退出外层循环
continueLoop = false;
}
// =====================================================================================
// 更新循环变量为下一个时间窗口做准备
loopStartTimeMillis = newWatermarkMillis;
} // --- 追赶循环结束 ---
} while (continueLoop); // --- 追赶/单次循环结束 ---
if (!hasFetchedDataInThisRun) {
getLogger().info("任务 '{}' 在本次执行中未发现新数据。", stateKey);
@ -369,6 +386,8 @@ public class GetJackyunOpenDataProcessor extends AbstractProcessor {
if (((JSONArray) value).size() > 0) {
return true;
}
} else {
return true;
}
}
}