diff --git a/nifi-hzyadev-bundle/hzya-nifi-AdvancedJoltTransformer-processors/src/main/java/com/hzya/frame/AdvancedJoltTransformerProcessor.java b/nifi-hzyadev-bundle/hzya-nifi-AdvancedJoltTransformer-processors/src/main/java/com/hzya/frame/AdvancedJoltTransformerProcessor.java index 0a62458..2622890 100644 --- a/nifi-hzyadev-bundle/hzya-nifi-AdvancedJoltTransformer-processors/src/main/java/com/hzya/frame/AdvancedJoltTransformerProcessor.java +++ b/nifi-hzyadev-bundle/hzya-nifi-AdvancedJoltTransformer-processors/src/main/java/com/hzya/frame/AdvancedJoltTransformerProcessor.java @@ -1,8 +1,7 @@ package com.hzya.frame; -import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.processor.AbstractProcessor; import com.bazaarvoice.jolt.Chainr; +import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.nifi.annotation.behavior.ReadsAttribute; import org.apache.nifi.annotation.behavior.ReadsAttributes; @@ -13,6 +12,7 @@ import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.*; import org.apache.nifi.processor.exception.ProcessException; @@ -23,32 +23,19 @@ import org.apache.nifi.processor.util.StandardValidators; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.atomic.AtomicReference; -/** - * 自定义jolt转换器 - * - * @Author:liuyang - * @Package:com.hzya.frame - * @Project:nifi-hzyadev-bundle - * @name:AdvancedJoltTransformerProcessor - * @Date:2025/7/30 10:29 - * @Filename:AdvancedJoltTransformerProcessor - */ @Tags({"jolt", "json", "transform", "custom"}) @CapabilityDescription("自定义Jolt JSON转换处理器,使用Jolt 0.1.8版本执行JSON数据转换") @SeeAlso({}) @ReadsAttributes({@ReadsAttribute(attribute = "", description = "")}) @WritesAttributes({@WritesAttribute(attribute = "", description = "")}) public class AdvancedJoltTransformerProcessor extends AbstractProcessor { - // 属性:Jolt转换规范 - public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder().name("JOLT_SPEC").displayName("Jolt转换规范").description("Jolt转换规范,JSON格式。支持表达式语言。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build(); - // 关系:成功和失败 + public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder().name("JOLT_SPEC").displayName("Jolt转换规范").description("Jolt转换规范,JSON格式。支持表达式语言和注释。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("转换成功的FlowFile").build(); - public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("转换失败的FlowFile").build(); private List descriptors; @@ -57,8 +44,9 @@ public class AdvancedJoltTransformerProcessor extends AbstractProcessor { @Override protected void init(final ProcessorInitializationContext context) { - descriptors = Collections.singletonList(JOLT_SPEC); - descriptors = Collections.unmodifiableList(descriptors); + final List tempDescriptors = new ArrayList<>(); + tempDescriptors.add(JOLT_SPEC); + this.descriptors = Collections.unmodifiableList(tempDescriptors); final Set relationships = new HashSet<>(); relationships.add(REL_SUCCESS); @@ -66,6 +54,7 @@ public class AdvancedJoltTransformerProcessor extends AbstractProcessor { this.relationships = Collections.unmodifiableSet(relationships); this.objectMapper = new ObjectMapper(); + this.objectMapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true); } @Override @@ -80,7 +69,7 @@ public class AdvancedJoltTransformerProcessor extends AbstractProcessor { @OnScheduled public void onScheduled(final ProcessContext context) { - // 处理器启动时的初始化逻辑 + // 无需特定逻辑 } @Override @@ -92,8 +81,6 @@ public class AdvancedJoltTransformerProcessor extends AbstractProcessor { try { String joltSpecStr = context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue(); - - // 解析Jolt规范 Object joltSpec; try { joltSpec = objectMapper.readValue(joltSpecStr, Object.class); @@ -103,9 +90,9 @@ public class AdvancedJoltTransformerProcessor extends AbstractProcessor { return; } - // 创建Jolt Chainr Chainr chainr; try { + // 使用不带自定义函数的、最简单的 fromSpec 方法 chainr = Chainr.fromSpec(joltSpec); } catch (Exception e) { getLogger().error("无法创建Jolt Chainr: {}", new Object[]{e.getMessage()}, e); @@ -113,57 +100,49 @@ public class AdvancedJoltTransformerProcessor extends AbstractProcessor { return; } - // 读取FlowFile内容并进行转换 final AtomicReference transformedData = new AtomicReference<>(); final AtomicReference exception = new AtomicReference<>(); - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(InputStream in) throws IOException { - try { - // 读取输入JSON - 兼容JDK 1.8 - StringBuilder inputBuilder = new StringBuilder(); - byte[] buffer = new byte[8192]; - int bytesRead; - while ((bytesRead = in.read(buffer)) != -1) { - inputBuilder.append(new String(buffer, 0, bytesRead, StandardCharsets.UTF_8)); + session.read(flowFile, in -> { + try { + Object inputData = objectMapper.readValue(in, Object.class); + Object result = chainr.transform(inputData); + + // --- [最终解决方案:在Java中手动转换_id类型] --- + if (result instanceof List) { + try { + @SuppressWarnings("unchecked") List> resultList = (List>) result; + for (Map record : resultList) { + if (record.containsKey("_id")) { + Object idValue = record.get("_id"); + if (idValue != null && !(idValue instanceof String)) { + record.put("_id", String.valueOf(idValue)); + } + } + } + } catch (ClassCastException e) { + getLogger().warn("Jolt转换结果中包含非标准格式的元素,无法进行_id类型转换。"); } - String inputJson = inputBuilder.toString(); - - // 解析输入JSON - Object inputData = objectMapper.readValue(inputJson, Object.class); - - // 执行Jolt转换 - Object result = chainr.transform(inputData); - transformedData.set(result); - - } catch (Exception e) { - exception.set(e); } + // --- [代码修改结束] --- + + transformedData.set(result); + + } catch (Exception e) { + exception.set(e); } }); - // 检查是否有异常 if (exception.get() != null) { getLogger().error("JSON转换过程中发生错误: {}", new Object[]{exception.get().getMessage()}, exception.get()); session.transfer(flowFile, REL_FAILURE); return; } - // 将转换结果写入新的FlowFile - flowFile = session.write(flowFile, new OutputStreamCallback() { - @Override - public void process(OutputStream out) throws IOException { - String resultJson = objectMapper.writeValueAsString(transformedData.get()); - out.write(resultJson.getBytes(StandardCharsets.UTF_8)); - } - }); - - // 更新FlowFile属性 + flowFile = session.write(flowFile, out -> objectMapper.writeValue(out, transformedData.get())); flowFile = session.putAttribute(flowFile, "jolt.transformed", "true"); flowFile = session.putAttribute(flowFile, "mime.type", "application/json"); - // 转移到成功关系 session.transfer(flowFile, REL_SUCCESS); getLogger().info("成功转换FlowFile {}", new Object[]{flowFile}); diff --git a/nifi-hzyadev-bundle/hzya-nifi-AdvancedJoltTransformer-processors/src/main/java/com/hzya/frame/joltext/ToString.java b/nifi-hzyadev-bundle/hzya-nifi-AdvancedJoltTransformer-processors/src/main/java/com/hzya/frame/joltext/ToString.java new file mode 100644 index 0000000..278d1fc --- /dev/null +++ b/nifi-hzyadev-bundle/hzya-nifi-AdvancedJoltTransformer-processors/src/main/java/com/hzya/frame/joltext/ToString.java @@ -0,0 +1,38 @@ +//package com.hzya.frame.joltext; // 请确保包名与您的项目一致 +// +//// 必须导入 Jolt 自己的 Optional 类 +//import com.bazaarvoice.jolt.common.Optional; +//import com.bazaarvoice.jolt.modifier.function.Function; +// +///** +// * Jolt 0.1.8 的自定义函数,用于将任何输入对象转换为其字符串表示形式。 +// * 实现了 Jolt 的 Function 接口,使其可以被 Chainr 加载并在表达式中调用。 +// * 使用 @SuppressWarnings("deprecation") 来抑制关于 Function 接口已过时的警告。 +// */ +//@SuppressWarnings("deprecation") +//public class ToString implements Function { +// +// /** +// * Jolt 在执行转换时会调用此方法。 +// * @param args 从JOLT表达式中传递过来的可变参数列表 +// * @return 参数的字符串表示形式,如果参数为null或不存在,则返回Optional.empty() +// */ +// @Override +// public Optional apply(Object... args) { +// // 检查是否有参数传入 +// if (args == null || args.length == 0) { +// return Optional.empty(); +// } +// +// // 我们只关心第一个参数 +// Object firstArg = args[0]; +// +// // 使用 String.valueOf() 可以安全地处理 null 和各种数据类型 +// if (firstArg == null) { +// return Optional.empty(); // 或者 return Optional.of("") 返回空字符串 +// } +// +// // 返回 Jolt 自己的 Optional 包装的字符串结果 +// return Optional.of(String.valueOf(firstArg)); +// } +//} \ No newline at end of file diff --git a/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-nar/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-nar/pom.xml new file mode 100644 index 0000000..eab69dc --- /dev/null +++ b/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-nar/pom.xml @@ -0,0 +1,34 @@ + + + + nifi-hzyadev-bundle + com.hzya + 1.0 + + 4.0.0 + + hzya-nifi-GetJackyunOpenData-nar + nar + + + 8 + 8 + + + + + com.hzya + hzya-nifi-GetJackyunOpenData-processors + 1.0 + + + org.apache.nifi + nifi-standard-services-api-nar + ${nifi-revision} + nar + + + + \ No newline at end of file diff --git a/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-processors/pom.xml b/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-processors/pom.xml new file mode 100644 index 0000000..92bd009 --- /dev/null +++ b/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-processors/pom.xml @@ -0,0 +1,106 @@ + + + + nifi-hzyadev-bundle + com.hzya + 1.0 + + 4.0.0 + + hzya-nifi-GetJackyunOpenData-processors + + + 8 + 8 + + + + + + org.apache.nifi + nifi-api + ${nifi-revision} + + + org.apache.nifi + nifi-dbcp-service-api + ${nifi-revision} + + + org.apache.nifi + nifi-processor-utils + 1.15.3 + + + org.apache.nifi + nifi-mock + ${nifi-revision} + test + + + junit + junit + 4.13 + test + + + com.alibaba + fastjson + 1.2.83 + + + com.squareup.okhttp3 + okhttp + 4.10.0 + + + org.apache.nifi + nifi-distributed-cache-client-service-api + 1.28.1 + provided + + + org.apache.nifi + nifi-utils + 1.28.1 + provided + + + org.mongodb + mongodb-driver-sync + 4.11.1 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-processors/src/main/java/com/hzya/frame/GetJackyunOpenDataProcessor.java b/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-processors/src/main/java/com/hzya/frame/GetJackyunOpenDataProcessor.java new file mode 100644 index 0000000..eb8f87e --- /dev/null +++ b/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-processors/src/main/java/com/hzya/frame/GetJackyunOpenDataProcessor.java @@ -0,0 +1,356 @@ +package com.hzya.frame; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.hzya.frame.dto.ResultDataDto; +import com.hzya.frame.dto.ResultDto; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.Filters; +import com.mongodb.client.model.UpdateOptions; +import com.mongodb.client.model.Updates; +import okhttp3.*; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.*; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.bson.Document; + +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.text.SimpleDateFormat; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.concurrent.TimeUnit; + + +@Tags({"吉客云", "电商", "ERP", "http", "api", "hzya", "mongodb"}) +@CapabilityDescription("从吉客云开放平台分页拉取数据。处理器使用外部MongoDB管理时间窗口和分页状态,实现自动化的全量及增量同步,并内置了签名逻辑。") +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@WritesAttributes({@WritesAttribute(attribute = "jackyun.api.method", description = "本次调用的API方法名"), @WritesAttribute(attribute = "jackyun.api.startTime", description = "本次API调用的查询开始时间"), @WritesAttribute(attribute = "jackyun.api.endTime", description = "本次API调用的查询结束时间"), @WritesAttribute(attribute = "jackyun.api.pageIndex", description = "本次API调用的数据页码")}) +public class GetJackyunOpenDataProcessor extends AbstractProcessor { + + // --- 吉客云相关属性 --- + public static final PropertyDescriptor PROP_GATEWAY_URL = new PropertyDescriptor.Builder().name("吉客云网关URL").description("吉客云开放平台的网关地址。").defaultValue("https://open.jackyun.com/open/openapi/do").required(true).addValidator(StandardValidators.URL_VALIDATOR).build(); + public static final PropertyDescriptor PROP_APP_KEY = new PropertyDescriptor.Builder().name("App Key").description("在吉客云开放平台上申请的AppKey。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + public static final PropertyDescriptor PROP_APP_SECRET = new PropertyDescriptor.Builder().name("App Secret").description("在吉客云开放平台上申请的AppSecret。").required(true).sensitive(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + public static final PropertyDescriptor PROP_API_METHOD = new PropertyDescriptor.Builder().name("API方法名").description("要调用的吉客云API方法名,例如:erp.storage.goodsdocout.v2").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + public static final PropertyDescriptor PROP_API_VERSION = new PropertyDescriptor.Builder().name("API版本号").description("要调用的吉客云API版本号,例如:v1.0").defaultValue("v1.0").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + // --- 数据拉取逻辑相关属性 --- + public static final PropertyDescriptor PROP_INITIAL_START_TIME = new PropertyDescriptor.Builder().name("初始同步开始时间").description("首次运行时使用的查询开始时间。格式必须为 'yyyy-MM-dd HH:mm:ss'。这是定义全量同步起点的关键。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + public static final PropertyDescriptor PROP_TIME_WINDOW_INCREMENT = new PropertyDescriptor.Builder().name("时间窗口增量").description("处理器内部循环调用API时,每个时间窗口的最大时长,用于遵守API的查询范围限制。").defaultValue("59 minutes").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build(); + public static final PropertyDescriptor PROP_END_TIME_SAFETY_BUFFER = new PropertyDescriptor.Builder().name("结束时间安全缓冲").description("从当前时间减去的缓冲量,以防止因服务器时钟不一致而查询未来时间。").defaultValue("5 minutes").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build(); + public static final PropertyDescriptor PROP_PAGE_SIZE = new PropertyDescriptor.Builder().name("每页数据条数").description("每次API请求期望返回的数据条数。").defaultValue("100").required(true).addValidator(StandardValidators.INTEGER_VALIDATOR).build(); + public static final PropertyDescriptor PROP_REQUEST_BIZ_CONTENT = new PropertyDescriptor.Builder().name("业务请求参数 (BizContent)").description("API的业务请求参数(JSON格式)。处理器会自动注入分页(pageIndex, pageSize)和时间(gmtModifiedStart, gmtModifiedEnd)参数,无需在此填写。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + public static final PropertyDescriptor PROP_START_TIME_PARAM_NAME = new PropertyDescriptor.Builder().name("开始时间参数名").description("在BizContent中用于传递查询开始时间的字段名。").defaultValue("gmtModifiedStart").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + public static final PropertyDescriptor PROP_END_TIME_PARAM_NAME = new PropertyDescriptor.Builder().name("结束时间参数名").description("在BizContent中用于传递查询结束时间的字段名。").defaultValue("gmtModifiedEnd").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + // --- MongoDB 相关属性 --- + public static final PropertyDescriptor PROP_MONGO_URI = new PropertyDescriptor.Builder().name("MongoDB Connection String").description("MongoDB的连接URI,例如:mongodb://user:password@host1:27017,host2:27017").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + public static final PropertyDescriptor PROP_MONGO_DATABASE = new PropertyDescriptor.Builder().name("MongoDB Database Name").description("用于存储状态的数据库名称。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + public static final PropertyDescriptor PROP_MONGO_COLLECTION = new PropertyDescriptor.Builder().name("MongoDB Collection Name").description("用于存储状态的集合名称。").defaultValue("nifi_processor_states").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + public static final PropertyDescriptor PROP_STATE_KEY_NAME = new PropertyDescriptor.Builder().name("状态存储键名 (State Key)").description("在MongoDB中用于存储此任务状态的唯一键名。例如 '销售结算单同步'。这将作为文档的 _id。").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + // --- 关系定义 (Relationships) --- + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("成功").description("成功从API获取并解析的数据页将路由到此关系。").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("失败").description("API调用失败或发生不可恢复的错误时,将触发FlowFile路由到此关系。").build(); + public static final Relationship REL_NO_DATA = new Relationship.Builder().name("无数据").description("在检查的时间范围内,API没有返回任何新数据时,将触发FlowFile路由到此关系。").build(); + + private List descriptors; + private Set relationships; + private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + private volatile OkHttpClient httpClient; + private volatile MongoClient mongoClient; + + @Override + protected void init(final ProcessorInitializationContext context) { + final List descriptors = new ArrayList<>(); + descriptors.add(PROP_GATEWAY_URL); + descriptors.add(PROP_APP_KEY); + descriptors.add(PROP_APP_SECRET); + descriptors.add(PROP_API_METHOD); + descriptors.add(PROP_API_VERSION); + descriptors.add(PROP_INITIAL_START_TIME); + descriptors.add(PROP_TIME_WINDOW_INCREMENT); + descriptors.add(PROP_END_TIME_SAFETY_BUFFER); + descriptors.add(PROP_PAGE_SIZE); + descriptors.add(PROP_REQUEST_BIZ_CONTENT); + descriptors.add(PROP_START_TIME_PARAM_NAME); + descriptors.add(PROP_END_TIME_PARAM_NAME); + // MongoDB属性 + descriptors.add(PROP_MONGO_URI); + descriptors.add(PROP_MONGO_DATABASE); + descriptors.add(PROP_MONGO_COLLECTION); + descriptors.add(PROP_STATE_KEY_NAME); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + relationships.add(REL_NO_DATA); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set getRelationships() { + return this.relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + // 在处理器启动时,创建可复用的HTTP和MongoDB客户端,以利用连接池提高性能 + this.httpClient = new OkHttpClient.Builder().connectTimeout(30, TimeUnit.SECONDS).readTimeout(60, TimeUnit.SECONDS).build(); + this.mongoClient = MongoClients.create(context.getProperty(PROP_MONGO_URI).getValue()); + } + + @OnStopped + public void onStopped() { + // 在处理器停止时,安全地关闭客户端连接,释放资源 + if (this.mongoClient != null) { + this.mongoClient.close(); + } + } + + protected OkHttpClient getHttpClient() { + return this.httpClient; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + // 从属性中动态获取用户为当前任务指定的状态键 + final String stateKey = context.getProperty(PROP_STATE_KEY_NAME).evaluateAttributeExpressions().getValue(); + if (stateKey == null || stateKey.trim().isEmpty()) { + getLogger().error("状态存储键名(State Key)未配置或为空,无法进行状态管理。"); + context.yield(); + return; + } + + // 获取MongoDB集合对象 + final MongoDatabase database = mongoClient.getDatabase(context.getProperty(PROP_MONGO_DATABASE).getValue()); + final MongoCollection stateCollection = database.getCollection(context.getProperty(PROP_MONGO_COLLECTION).getValue()); + + try { + // 从属性中获取用户配置的开始和结束时间参数名 + String startTimeParamName = context.getProperty(PROP_START_TIME_PARAM_NAME).getValue(); + String endTimeParamName = context.getProperty(PROP_END_TIME_PARAM_NAME).getValue(); + + // 1. 确定需要追赶的最终目标时间 + long safetyBufferMillis = context.getProperty(PROP_END_TIME_SAFETY_BUFFER).asTimePeriod(TimeUnit.MILLISECONDS); + long catchUpTargetTimeMillis = System.currentTimeMillis() - safetyBufferMillis; + + // 2. 从MongoDB获取上次成功同步的时间点 + Document stateDoc = stateCollection.find(Filters.eq("_id", stateKey)).first(); + String lastSyncTimeStr = (stateDoc != null) ? stateDoc.getString("value") : null; + + long loopStartTimeMillis; + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + if (lastSyncTimeStr == null || lastSyncTimeStr.isEmpty()) { + // 首次运行,使用用户配置的初始时间 + loopStartTimeMillis = sdf.parse(context.getProperty(PROP_INITIAL_START_TIME).getValue()).getTime(); + getLogger().info("未在MongoDB中找到状态键 '{}' 的记录,将使用初始同步时间: {}", stateKey, sdf.format(new Date(loopStartTimeMillis))); + } else { + // 后续运行,使用上次保存的时间 + loopStartTimeMillis = sdf.parse(lastSyncTimeStr).getTime(); + } + + // 如果水位线已经超过或等于目标时间,说明已追上,无需执行 + if (loopStartTimeMillis >= catchUpTargetTimeMillis) { + getLogger().info("任务 '{}' 已同步至最新进度(水位线: {}), 无需执行本次任务。", stateKey, sdf.format(new Date(loopStartTimeMillis))); + context.yield(); // 放弃本次执行,提高效率 + return; + } + + // 3. 开始“追赶”循环,将大时间段切分为小窗口 + long timeWindowMillis = context.getProperty(PROP_TIME_WINDOW_INCREMENT).asTimePeriod(TimeUnit.MILLISECONDS); + boolean hasFetchedDataInThisRun = false; + + while (loopStartTimeMillis < catchUpTargetTimeMillis) { + // ============================ 边界值处理核心修改 START ============================ + + // 1. 定义下一个时间窗口的理论开始时间 (即当前窗口的“开”边界) + long nextWindowStartTimeMillis = loopStartTimeMillis + timeWindowMillis; + + // 2. 确定本次API调用的实际结束时间 (apiQueryEndTimeMillis) 和下次循环的开始时间 (newWatermarkMillis) + long apiQueryEndTimeMillis; + long newWatermarkMillis; + + if (nextWindowStartTimeMillis >= catchUpTargetTimeMillis) { + // 这是最后一个窗口。 + // API查询的结束时间就是最终目标时间。 + apiQueryEndTimeMillis = catchUpTargetTimeMillis; + // 新的水位线(下次循环的起点)也更新为最终目标时间。 + newWatermarkMillis = catchUpTargetTimeMillis; + } else { + // 非最后一个窗口。 + // API查询的结束时间是下一个窗口开始时间的前一秒,构成一个无重叠的闭合区间。 + // 例如,对于 [01:00:00 ... 02:00:00) 的范围,实际查询 [01:00:00, 01:59:59]。 + apiQueryEndTimeMillis = nextWindowStartTimeMillis - 1000; // 减去1000毫秒 (1秒) + // 但新的水位线必须是下一个窗口的准确开始时间,以确保连续性。 + newWatermarkMillis = nextWindowStartTimeMillis; + } + + // 3. 格式化API所需的时间字符串 + String windowStartTimeStr = sdf.format(new Date(loopStartTimeMillis)); + String windowEndTimeStr = sdf.format(new Date(apiQueryEndTimeMillis)); + + // ============================ 边界值处理核心修改 END ============================== + + getLogger().debug("任务 '{}' 准备处理时间窗口: [{} -> {}]", stateKey, windowStartTimeStr, windowEndTimeStr); + + // --- 内部分页循环 --- + int currentPage = 0; + boolean hasNextPageInWindow = true; + while (hasNextPageInWindow) { + JSONObject bizData = JSON.parseObject(context.getProperty(PROP_REQUEST_BIZ_CONTENT).getValue()); + bizData.put("pageIndex", String.valueOf(currentPage)); + bizData.put("pageSize", context.getProperty(PROP_PAGE_SIZE).getValue()); + bizData.put(startTimeParamName, windowStartTimeStr); + bizData.put(endTimeParamName, windowEndTimeStr); + + ResultDataDto resultDataDto = callJackyunApi(context, bizData); + + if (resultDataDto == null || !"200".equals(resultDataDto.getCode())) { + String errorMsg = (resultDataDto != null) ? resultDataDto.toString() : "API返回为空"; + getLogger().error("吉客云API返回业务错误: {}", errorMsg); + FlowFile failureFlowFile = session.create(); + failureFlowFile = session.write(failureFlowFile, out -> out.write(errorMsg.getBytes(StandardCharsets.UTF_8))); + session.transfer(failureFlowFile, REL_FAILURE); + return; // 发生业务错误,终止本次执行 + } + + ResultDto result = resultDataDto.getResult(); + if (result != null && result.getData() != null && !result.getData().isEmpty()) { + hasFetchedDataInThisRun = true; + getLogger().info("任务 '{}' 在时间窗 [{} -> {}] 的第 {} 页获取到 {} 条数据。", stateKey, windowStartTimeStr, windowEndTimeStr, currentPage, result.getData().size()); + + FlowFile ff = session.create(); + ff = session.putAttribute(ff, "jackyun.api.method", context.getProperty(PROP_API_METHOD).getValue()); + ff = session.putAttribute(ff, "jackyun.api.startTime", windowStartTimeStr); + ff = session.putAttribute(ff, "jackyun.api.endTime", windowEndTimeStr); + ff = session.putAttribute(ff, "jackyun.api.pageIndex", String.valueOf(currentPage)); + + ff = session.write(ff, out -> out.write(JSON.toJSONString(result.getData()).getBytes(StandardCharsets.UTF_8))); + session.transfer(ff, REL_SUCCESS); + session.commitAsync(); + + currentPage++; + // 如果返回的数据量小于每页大小,可以推断这是最后一页,但为了保险起见,继续查询直到返回空数据 + // hasNextPageInWindow = result.getData().size() == Integer.parseInt(context.getProperty(PROP_PAGE_SIZE).getValue()); + } else { + // API返回的数据为空,说明当前时间窗口已无更多数据 + hasNextPageInWindow = false; + } + } // --- 分页循环结束 --- + + // 成功处理完一个时间窗口(包括所有页),立即更新MongoDB中的状态 + String newWatermarkStr = sdf.format(new Date(newWatermarkMillis)); + stateCollection.updateOne( + Filters.eq("_id", stateKey), + Updates.set("value", newWatermarkStr), + new UpdateOptions().upsert(true) + ); + getLogger().info("任务 '{}' 的时间窗口 [{} -> {}] 数据同步完成,状态水位线更新至: {}", stateKey, windowStartTimeStr, windowEndTimeStr, newWatermarkStr); + + // 更新循环变量,为下一个时间窗口做准备 + loopStartTimeMillis = newWatermarkMillis; + } // --- 追赶循环结束 --- + + if (!hasFetchedDataInThisRun) { + getLogger().info("任务 '{}' 在本次执行中未发现新数据。", stateKey); + FlowFile noDataFlowFile = session.create(); + noDataFlowFile = session.putAttribute(noDataFlowFile, "message", "在指定的时间范围内没有查询到新数据。"); + session.transfer(noDataFlowFile, REL_NO_DATA); + } + + } catch (Exception e) { + getLogger().error("处理吉客云数据时发生未知错误: {}", e.getMessage(), e); + StringWriter sw = new StringWriter(); + e.printStackTrace(new PrintWriter(sw)); + String exceptionAsString = sw.toString(); + + FlowFile failureFlowFile = session.create(); + failureFlowFile = session.write(failureFlowFile, out -> out.write(exceptionAsString.getBytes(StandardCharsets.UTF_8))); + session.transfer(failureFlowFile, REL_FAILURE); + } + } + + private ResultDataDto callJackyunApi(ProcessContext context, JSONObject bizData) throws IOException, NoSuchAlgorithmException { + String appKey = context.getProperty(PROP_APP_KEY).getValue(); + String appSecret = context.getProperty(PROP_APP_SECRET).getValue(); + String method = context.getProperty(PROP_API_METHOD).getValue(); + String version = context.getProperty(PROP_API_VERSION).getValue(); + + SortedMap sortedMap = new TreeMap<>(); + sortedMap.put("method", method); + sortedMap.put("appkey", appKey); + sortedMap.put("version", version); + sortedMap.put("contenttype", "json"); + sortedMap.put("timestamp", DATETIME_FORMATTER.format(LocalDateTime.now())); + sortedMap.put("bizcontent", bizData.toJSONString()); + + StringBuilder sbSignData = new StringBuilder(appSecret); + for (Map.Entry entry : sortedMap.entrySet()) { + sbSignData.append(entry.getKey()).append(entry.getValue()); + } + sbSignData.append(appSecret); + + sortedMap.put("sign", md5Encrypt(sbSignData.toString().toLowerCase())); + + FormBody.Builder formBodyBuilder = new FormBody.Builder(); + for (Map.Entry entry : sortedMap.entrySet()) { + formBodyBuilder.add(entry.getKey(), entry.getValue()); + } + RequestBody formBody = formBodyBuilder.build(); + + Request request = new Request.Builder().url(context.getProperty(PROP_GATEWAY_URL).getValue()).post(formBody).build(); + + try (Response response = getHttpClient().newCall(request).execute()) { + if (!response.isSuccessful()) { + throw new IOException("非预期的HTTP响应码: " + response.code() + ", 响应体: " + response.body().string()); + } + if (response.body() != null) { + return JSON.parseObject(response.body().string(), ResultDataDto.class); + } + return null; + } + } + + private String md5Encrypt(String text) throws NoSuchAlgorithmException { + MessageDigest md5 = MessageDigest.getInstance("MD5"); + byte[] resultByte = text.getBytes(StandardCharsets.UTF_8); + byte[] md5Bytes = md5.digest(resultByte); + StringBuilder hexValue = new StringBuilder(); + for (byte md5Byte : md5Bytes) { + int val = (md5Byte) & 0xff; + if (val < 16) { + hexValue.append("0"); + } + hexValue.append(Integer.toHexString(val)); + } + return hexValue.toString(); + } +} \ No newline at end of file diff --git a/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-processors/src/main/java/com/hzya/frame/dto/ResultDataDto.java b/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-processors/src/main/java/com/hzya/frame/dto/ResultDataDto.java new file mode 100644 index 0000000..e5b56e8 --- /dev/null +++ b/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-processors/src/main/java/com/hzya/frame/dto/ResultDataDto.java @@ -0,0 +1,53 @@ +package com.hzya.frame.dto; + +/** + * @Author:liuyang + * @Package:com.hzya.frame.dto + * @Project:nifi-hzyadev-bundle + * @name:ResultDataDto + * @Date:2025/8/27 16:15 + * @Filename:ResultDataDto + */ +public class ResultDataDto { + private String code; + private String developerInfo; + private String msg; + private ResultDto result; + + public String getCode() { + return code; + } + + public void setCode(String code) { + this.code = code; + } + + public String getDeveloperInfo() { + return developerInfo; + } + + public void setDeveloperInfo(String developerInfo) { + this.developerInfo = developerInfo; + } + + public String getMsg() { + return msg; + } + + public void setMsg(String msg) { + this.msg = msg; + } + + public ResultDto getResult() { + return result; + } + + public void setResult(ResultDto result) { + this.result = result; + } + + @Override + public String toString() { + return "ResultDataDto{" + "code='" + code + '\'' + ", developerInfo='" + developerInfo + '\'' + ", msg='" + msg + '\'' + ", result=" + result + '}'; + } +} \ No newline at end of file diff --git a/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-processors/src/main/java/com/hzya/frame/dto/ResultDto.java b/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-processors/src/main/java/com/hzya/frame/dto/ResultDto.java new file mode 100644 index 0000000..da902cf --- /dev/null +++ b/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-processors/src/main/java/com/hzya/frame/dto/ResultDto.java @@ -0,0 +1,32 @@ +package com.hzya.frame.dto; + +import java.util.List; + +/** + * @Author:liuyang + * @Package:com.hzya.frame.dto + * @Project:nifi-hzyadev-bundle + * @name:ResultDataDto + * @Date:2025/8/27 16:15 + * @Filename:ResultDataDto + */ +public class ResultDto { + private String contextId; + private List data; + + public String getContextId() { + return contextId; + } + + public void setContextId(String contextId) { + this.contextId = contextId; + } + + public List getData() { + return data; + } + + public void setData(List data) { + this.data = data; + } +} diff --git a/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000..2a47073 --- /dev/null +++ b/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +com.hzya.frame.GetJackyunOpenDataProcessor \ No newline at end of file diff --git a/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-processors/src/test/java/com/hzya/frame/GetJackyunOpenDataProcessorTest.java b/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-processors/src/test/java/com/hzya/frame/GetJackyunOpenDataProcessorTest.java new file mode 100644 index 0000000..31c3cf4 --- /dev/null +++ b/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-processors/src/test/java/com/hzya/frame/GetJackyunOpenDataProcessorTest.java @@ -0,0 +1,138 @@ +package com.hzya.frame; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.hzya.frame.dto.ResultDataDto; +import com.hzya.frame.dto.ResultDto; +import okhttp3.*; +import org.apache.nifi.processor.ProcessContext; +import org.junit.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +/** + * @Author:liuyang + * @Package:com.hzya.frame + * @Project:nifi-hzyadev-bundle + * @name:GetJackyunOpenDataProcessorTest + * @Date:2025/8/27 16:04 + * @Filename:GetJackyunOpenDataProcessorTest + */ +public class GetJackyunOpenDataProcessorTest { + + private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + @Test + public void test1() { + try { + Long total = 0L; + Integer pageIndex = 0; + Set ids = new HashSet<>(); + while (true) { + ResultDataDto resultDataDto = callJackyunApi(pageIndex++); + if (resultDataDto != null && resultDataDto.getResult() != null) { + ResultDto result = resultDataDto.getResult(); + if (result.getData() != null && result.getData().size() > 0) { + total += result.getData().size(); + System.out.println("查询得到数量:" + result.getData().size() + " 总数量:" + total); + List data = result.getData(); + for (int i = 0; i < data.size(); i++) { + JSONObject o = (JSONObject) data.get(i); + JSONObject tradeSettleOrder = (JSONObject) o.get("tradeSettleOrder"); + String id1 = String.valueOf(tradeSettleOrder.get("id")); + // 关键修改:检查重复并报错 + if (!ids.add(id1)) { // add() 返回 false 说明重复 + throw new IllegalStateException("发现重复 ID: " + id1); + } + } + } else { + System.out.println("查询完毕"); + break; + } + } + } + System.out.println("总数量:" + total + " set集合:" + ids.size()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private ResultDataDto callJackyunApi(Integer pageIndex) throws IOException, NoSuchAlgorithmException { + OkHttpClient httpClient = new OkHttpClient.Builder().connectTimeout(30, TimeUnit.SECONDS).readTimeout(60, TimeUnit.SECONDS).build(); + + String bizData = "{\n" + + " \"tradeNo\": \"\",\n" + + " \"settleStatus\": \"\",\n" + + " \"gmtModifiedEnd\": \"\",\n" + + " \"auditTimeBegin\": \"2025-08-27 00:00:00\",\n" + + " \"pageSize\": \"200\",\n" + + " \"gmtCreatedBegin\": \"\",\n" + + " \"billDateBegin\": \"\",\n" + + " \"settleNo\": \"\",\n" + + " \"billDateEnd\": \"\",\n" + + " \"gmtCreatedEnd\": \"\",\n" + + " \"pageIndex\": \""+pageIndex+"\",\n" + + " \"auditTimeEnd\": \"2025-08-27 18:16:00\",\n" + + " \"detailCols\": \"tradeSettleOrder.settleId, tradeSettleOrder.shopId, tradeSettleOrder.auditor, tradeSettleOrder.flagIds, tradeSettleOrder.payment, tradeSettleOrder.departId, tradeSettleOrder.settleNo, tradeSettleOrder.shopName, tradeSettleOrder.auditTime, tradeSettleOrder.auditorId, tradeSettleOrder.companyId, tradeSettleOrder.settleFee, tradeSettleOrder.settlerId, tradeSettleOrder.chargeType, tradeSettleOrder.departName, tradeSettleOrder.settleMemo, tradeSettleOrder.settleTime, tradeSettleOrder.sysFlagIds, tradeSettleOrder.accountName, tradeSettleOrder.companyName, tradeSettleOrder.settleStatus, tradeSettleOrder.chargeAccount, tradeSettleOrder.onlinePayTime, tradeSettleOrder.chargeCurrency, tradeSettleOrder.chargeCurrencyCode, tradeSettleOrder.chargeExchangeRate, tradeSettleOrder.settler, tradeSettleOrder.couponFee, tradeSettleOrder.differenceFee, tradeSettleOrder.settleTotalFee, tradeSettleOrder.settleDiscountFee, tradeSettleOrderDetailArr.subTradeId31177, tradeSettleOrderDetailArr.tradeFrom, tradeSettleOrderDetailArr.cost, tradeSettleOrderDetailArr.unit, tradeSettleOrderDetailArr.payNo, tradeSettleOrderDetailArr.cateId, tradeSettleOrderDetailArr.remark, tradeSettleOrderDetailArr.seller, tradeSettleOrderDetailArr.specId, tradeSettleOrderDetailArr.taxFee, tradeSettleOrderDetailArr.brandId, tradeSettleOrderDetailArr.goodsId, tradeSettleOrderDetailArr.goodsNo, tradeSettleOrderDetailArr.orderNo, tradeSettleOrderDetailArr.payTime, tradeSettleOrderDetailArr.payType, tradeSettleOrderDetailArr.taxRate, tradeSettleOrderDetailArr.cateName, tradeSettleOrderDetailArr.sellerId, tradeSettleOrderDetailArr.settleId, tradeSettleOrderDetailArr.specName, tradeSettleOrderDetailArr.auditTime, tradeSettleOrderDetailArr.brandName, tradeSettleOrderDetailArr.goodsName, tradeSettleOrderDetailArr.goodsTags, tradeSettleOrderDetailArr.sellCount, tradeSettleOrderDetailArr.sellPrice, tradeSettleOrderDetailArr.sellTotal, tradeSettleOrderDetailArr.tradeTime, tradeSettleOrderDetailArr.tradeType, tradeSettleOrderDetailArr.arriveTime, tradeSettleOrderDetailArr.chargeType, tradeSettleOrderDetailArr.goodsAlias, tradeSettleOrderDetailArr.logisticId, tradeSettleOrderDetailArr.mainPostid, tradeSettleOrderDetailArr.payAccount, tradeSettleOrderDetailArr.payDueDate, tradeSettleOrderDetailArr.stockoutNo, tradeSettleOrderDetailArr.accountName, tradeSettleOrderDetailArr.consignTime, tradeSettleOrderDetailArr.discountFee, tradeSettleOrderDetailArr.tradeStatus, tradeSettleOrderDetailArr.warehouseId, tradeSettleOrderDetailArr.lastShipTime, tradeSettleOrderDetailArr.logisticName, tradeSettleOrderDetailArr.logisticType, tradeSettleOrderDetailArr.chargeAccount, tradeSettleOrderDetailArr.sourceTradeNo, tradeSettleOrderDetailArr.warehouseName, tradeSettleOrderDetailArr.goodsAttribute, tradeSettleOrderDetailArr.shareFavourableAfterFee, tradeSettleOrderDetailArr.tradeNo, tradeSettleOrderDetailArr.innerSettleCost, tradeSettleOrderDetailArr.shareDiscountFee, tradeSettleOrderDetailArr.shareOtherPayment, tradeSettleOrderDetailArr.goodsReceiptAmount, tradeSettleOrderDetailArr.shareFairSellTotal, tradeSettleOrderDetailArr.shareFavourableFee, tradeSettleOrderDetailArr.otherShareFavourableFee, tradeSettleOrderDetailArr.customizeGoodsColumn1, tradeSettleOrderDetailArr.customizeGoodsColumn2, tradeSettleOrderDetailArr.customizeGoodsColumn3, tradeSettleOrderDetailArr.customizeGoodsColumn4, tradeSettleOrderDetailArr.customizeGoodsColumn5, tradeSettleOrderDetailArr.customizeGoodsColumn6, tradeSettleOrderDetailArr.customizeGoodsColumn7, tradeSettleOrderDetailArr.customizeGoodsColumn8, tradeSettleOrderDetailArr.customizeGoodsColumn9, tradeSettleOrderDetailArr.customizeGoodsColumn10, tradeSettleOrderCustomer.settleUnitId, tradeSettleOrderCustomer.qq, tradeSettleOrderCustomer.email, tradeSettleOrderCustomer.weiXin, tradeSettleOrderCustomer.wangWang, tradeSettleOrderCustomer.customerId, tradeSettleOrderCustomer.settleUnit, tradeSettleOrderCustomer.customerName, tradeSettleOrderCustomer.customerTags, tradeSettleOrderCustomer.customerType, tradeSettleOrderCustomer.customerGrade, tradeSettleOrderCustomer.customerAccount, tradeSettleOrderCustomer.customerTypeName, tradeSettleOrderCustomer.customerGradeName, tradeSettleOrderCustomer.endCustomerAccount, tradeSettleOrderColumnExt.customizeTradeColumn1, tradeSettleOrderColumnExt.customizeTradeColumn2, tradeSettleOrderColumnExt.customizeTradeColumn3, tradeSettleOrderColumnExt.customizeTradeColumn4, tradeSettleOrderColumnExt.customizeTradeColumn5, tradeSettleOrderColumnExt.customizeTradeColumn6, tradeSettleOrderColumnExt.customizeTradeColumn7, tradeSettleOrderColumnExt.customizeTradeColumn8, tradeSettleOrderColumnExt.customizeTradeColumn9, tradeSettleOrderColumnExt.customizeTradeColumn10\",\n" + + " \"gmtModifiedBegin\": \"\",\n" + + " \"hasQueryHistory\": \"\",\n" + + " \"shopId\": \"\",\n" + + " \"cols\": \"settleId,settleNo,tradeNo,sourceTradeNo,settleInvoiceStatus\"\n" + + "}"; + + SortedMap sortedMap = new TreeMap<>(); + sortedMap.put("method", "oms.open.trade.settle.allinfo"); + sortedMap.put("appkey", "12346738"); + sortedMap.put("version", "v1.0"); + sortedMap.put("contenttype", "json"); + sortedMap.put("timestamp", DATETIME_FORMATTER.format(LocalDateTime.now())); + sortedMap.put("bizcontent", bizData); + + StringBuilder sbSignData = new StringBuilder("94d715fc68214ce1ba48803b3bf19a9f"); + for (Map.Entry entry : sortedMap.entrySet()) { + sbSignData.append(entry.getKey()).append(entry.getValue()); + } + sbSignData.append("94d715fc68214ce1ba48803b3bf19a9f"); + + sortedMap.put("sign", md5Encrypt(sbSignData.toString().toLowerCase())); + + FormBody.Builder formBodyBuilder = new FormBody.Builder(); + for (Map.Entry entry : sortedMap.entrySet()) { + formBodyBuilder.add(entry.getKey(), entry.getValue()); + } + RequestBody formBody = formBodyBuilder.build(); + + Request request = new Request.Builder().url("https://open.jackyun.com/open/openapi/do").post(formBody).build(); + + try (Response response = httpClient.newCall(request).execute()) { + if (!response.isSuccessful()) { + throw new IOException("非预期的HTTP响应码: " + response.code() + ", 响应体: " + response.body().string()); + } + return JSON.parseObject(response.body().string(), ResultDataDto.class); + } + } + + private String md5Encrypt(String text) throws NoSuchAlgorithmException { + MessageDigest md5 = MessageDigest.getInstance("MD5"); + byte[] resultByte = text.getBytes(StandardCharsets.UTF_8); + byte[] md5Bytes = md5.digest(resultByte); + StringBuilder hexValue = new StringBuilder(); + for (byte md5Byte : md5Bytes) { + int val = (md5Byte) & 0xff; + if (val < 16) { + hexValue.append("0"); + } + hexValue.append(Integer.toHexString(val)); + } + return hexValue.toString(); + } +} \ No newline at end of file diff --git a/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-processors/src/test/java/com/hzya/frame/GetJackyunOpenDataProcessorTest2.java b/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-processors/src/test/java/com/hzya/frame/GetJackyunOpenDataProcessorTest2.java new file mode 100644 index 0000000..3057814 --- /dev/null +++ b/nifi-hzyadev-bundle/hzya-nifi-GetJackyunOpenData-processors/src/test/java/com/hzya/frame/GetJackyunOpenDataProcessorTest2.java @@ -0,0 +1,256 @@ +package com.hzya.frame; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.hzya.frame.dto.ResultDataDto; +import com.hzya.frame.dto.ResultDto; +import okhttp3.*; +import org.junit.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.concurrent.TimeUnit; + +/** + * @Author:liuyang + * @Package:com.hzya.frame + * @Project:nifi-hzyadev-bundle + * @name:GetJackyunOpenDataProcessorTest + * @Date:2025/8/27 16:04 + * @Filename:GetJackyunOpenDataProcessorTest + */ +public class GetJackyunOpenDataProcessorTest2 { + + // 定义统一的时间格式化器 + private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + /** + * 新增的方法:按小时调度查询任务 + * 这个方法模拟在2025年8月27日,从 00:00:00 开始,每小时查询一次数据, + * 直到最后一个时间段覆盖到 18:16:00。 + */ + /** + * 新增的方法:按小时调度查询任务 + * 这个方法模拟在2025年8月27日,从 00:00:00 开始,每小时查询一次数据, + * 直到最后一个时间段覆盖到 18:16:00,并最后输出总计。 + */ + /** + * 修正后的方法:按小时调度查询任务 + * 通过将每个时间段的结束时间减去1秒,来避免边界数据的重复计算, + * 确保查询区间为 [start, end),实现无缝且不重叠的查询。 + */ + @Test + public void queryDataHourly() { + // 设置任务的起始和最终截止时间 + LocalDateTime taskStartTime = LocalDateTime.of(2025, 8, 27, 0, 0, 0); + LocalDateTime taskFinalEndTime = LocalDateTime.of(2025, 8, 27, 18, 16, 0); + + LocalDateTime currentQueryStartTime = taskStartTime; + + long grandTotal = 0L; + + System.out.println("开始执行按小时查询任务(已修正边界问题)..."); + + while (currentQueryStartTime.isBefore(taskFinalEndTime)) { + // 定义下一个小时的起点,作为当前区间的“开”边界 + LocalDateTime nextHourStartTime = currentQueryStartTime.plusHours(1); + + // 确定本次查询的实际结束时间 + LocalDateTime actualQueryEndTime; + + // 如果下一个小时的起点已经超过了最终截止时间,或者就是最终截止时间 + // 那么本次查询的结束时间就应该是最终截止时间,这是最后一个区间 + if (nextHourStartTime.isAfter(taskFinalEndTime) || nextHourStartTime.isEqual(taskFinalEndTime)) { + actualQueryEndTime = taskFinalEndTime; + } else { + // 否则,本次查询的结束时间是下一个小时的起点减去1秒 + // 例如,对于 [00:00:00, 01:00:00) 的区间,实际查询的是 [00:00:00, 00:59:59] + actualQueryEndTime = nextHourStartTime.minusSeconds(1); + } + + // 将时间对象格式化为字符串 + String startTimeStr = DATETIME_FORMATTER.format(currentQueryStartTime); + String endTimeStr = DATETIME_FORMATTER.format(actualQueryEndTime); + + try { + long totalCountInHour = getTotalCountForTimeRange(startTimeStr, endTimeStr); + System.out.println("时间范围: " + startTimeStr + " - " + endTimeStr + " 查询数量: " + totalCountInHour); + grandTotal += totalCountInHour; + } catch (Exception e) { + System.err.println("查询时间范围 " + startTimeStr + " - " + endTimeStr + " 时发生错误: " + e.getMessage()); + e.printStackTrace(); + } + + // 下一次循环的开始时间,直接就是下一个小时的整点 + currentQueryStartTime = nextHourStartTime; + } + + System.out.println("所有时间段查询完毕。"); + System.out.println("查询合计总数量: " + grandTotal); + } + + /** + * 辅助方法:获取指定时间范围内的总数据量(包含分页逻辑) + * @param startTimeStr 查询开始时间 (格式: "yyyy-MM-dd HH:mm:ss") + * @param endTimeStr 查询结束时间 (格式: "yyyy-MM-dd HH:mm:ss") + * @return 该时间范围内的总记录数 + * @throws IOException API请求异常 + * @throws NoSuchAlgorithmException MD5加密异常 + */ + private long getTotalCountForTimeRange(String startTimeStr, String endTimeStr) throws IOException, NoSuchAlgorithmException { + long total = 0L; + int pageIndex = 0; + // 循环调用API,直到返回的数据为空,实现分页查询 + while (true) { + // 调用API,传入分页索引和时间范围 + ResultDataDto resultDataDto = callJackyunApi(pageIndex++, startTimeStr, endTimeStr); + if (resultDataDto != null && resultDataDto.getResult() != null) { + ResultDto result = resultDataDto.getResult(); + // 如果返回的数据列表不为空且有数据 + if (result.getData() != null && !result.getData().isEmpty()) { + total += result.getData().size(); + } else { + // 如果没有数据返回,说明已经查询完所有分页,跳出循环 + break; + } + } else { + // 如果返回结果为空,也跳出循环 + break; + } + } + return total; + } + + /** + * 修改后的API调用方法 + * @param pageIndex 页码 + * @param auditTimeBegin 审核开始时间 + * @param auditTimeEnd 审核结束时间 + * @return API返回结果 + * @throws IOException API请求异常 + * @throws NoSuchAlgorithmException MD5加密异常 + */ + private ResultDataDto callJackyunApi(Integer pageIndex, String auditTimeBegin, String auditTimeEnd) throws IOException, NoSuchAlgorithmException { + OkHttpClient httpClient = new OkHttpClient.Builder() + .connectTimeout(30, TimeUnit.SECONDS) + .readTimeout(60, TimeUnit.SECONDS) + .build(); + + // 将 auditTimeBegin 和 auditTimeEnd 参数动态拼接到 bizData 中 + String bizData = "{\n" + + " \"tradeNo\": \"\",\n" + + " \"settleStatus\": \"\",\n" + + " \"gmtModifiedEnd\": \"\",\n" + + " \"auditTimeBegin\": \"" + auditTimeBegin + "\",\n" + + " \"pageSize\": \"200\",\n" + + " \"gmtCreatedBegin\": \"\",\n" + + " \"billDateBegin\": \"\",\n" + + " \"settleNo\": \"\",\n" + + " \"billDateEnd\": \"\",\n" + + " \"gmtCreatedEnd\": \"\",\n" + + " \"pageIndex\": \"" + pageIndex + "\",\n" + + " \"auditTimeEnd\": \"" + auditTimeEnd + "\",\n" + + " \"detailCols\": \"tradeSettleOrder.settleId, tradeSettleOrder.shopId, tradeSettleOrder.auditor, tradeSettleOrder.flagIds, tradeSettleOrder.payment, tradeSettleOrder.departId, tradeSettleOrder.settleNo, tradeSettleOrder.shopName, tradeSettleOrder.auditTime, tradeSettleOrder.auditorId, tradeSettleOrder.companyId, tradeSettleOrder.settleFee, tradeSettleOrder.settlerId, tradeSettleOrder.chargeType, tradeSettleOrder.departName, tradeSettleOrder.settleMemo, tradeSettleOrder.settleTime, tradeSettleOrder.sysFlagIds, tradeSettleOrder.accountName, tradeSettleOrder.companyName, tradeSettleOrder.settleStatus, tradeSettleOrder.chargeAccount, tradeSettleOrder.onlinePayTime, tradeSettleOrder.chargeCurrency, tradeSettleOrder.chargeCurrencyCode, tradeSettleOrder.chargeExchangeRate, tradeSettleOrder.settler, tradeSettleOrder.couponFee, tradeSettleOrder.differenceFee, tradeSettleOrder.settleTotalFee, tradeSettleOrder.settleDiscountFee, tradeSettleOrderDetailArr.subTradeId31177, tradeSettleOrderDetailArr.tradeFrom, tradeSettleOrderDetailArr.cost, tradeSettleOrderDetailArr.unit, tradeSettleOrderDetailArr.payNo, tradeSettleOrderDetailArr.cateId, tradeSettleOrderDetailArr.remark, tradeSettleOrderDetailArr.seller, tradeSettleOrderDetailArr.specId, tradeSettleOrderDetailArr.taxFee, tradeSettleOrderDetailArr.brandId, tradeSettleOrderDetailArr.goodsId, tradeSettleOrderDetailArr.goodsNo, tradeSettleOrderDetailArr.orderNo, tradeSettleOrderDetailArr.payTime, tradeSettleOrderDetailArr.payType, tradeSettleOrderDetailArr.taxRate, tradeSettleOrderDetailArr.cateName, tradeSettleOrderDetailArr.sellerId, tradeSettleOrderDetailArr.settleId, tradeSettleOrderDetailArr.specName, tradeSettleOrderDetailArr.auditTime, tradeSettleOrderDetailArr.brandName, tradeSettleOrderDetailArr.goodsName, tradeSettleOrderDetailArr.goodsTags, tradeSettleOrderDetailArr.sellCount, tradeSettleOrderDetailArr.sellPrice, tradeSettleOrderDetailArr.sellTotal, tradeSettleOrderDetailArr.tradeTime, tradeSettleOrderDetailArr.tradeType, tradeSettleOrderDetailArr.arriveTime, tradeSettleOrderDetailArr.chargeType, tradeSettleOrderDetailArr.goodsAlias, tradeSettleOrderDetailArr.logisticId, tradeSettleOrderDetailArr.mainPostid, tradeSettleOrderDetailArr.payAccount, tradeSettleOrderDetailArr.payDueDate, tradeSettleOrderDetailArr.stockoutNo, tradeSettleOrderDetailArr.accountName, tradeSettleOrderDetailArr.consignTime, tradeSettleOrderDetailArr.discountFee, tradeSettleOrderDetailArr.tradeStatus, tradeSettleOrderDetailArr.warehouseId, tradeSettleOrderDetailArr.lastShipTime, tradeSettleOrderDetailArr.logisticName, tradeSettleOrderDetailArr.logisticType, tradeSettleOrderDetailArr.chargeAccount, tradeSettleOrderDetailArr.sourceTradeNo, tradeSettleOrderDetailArr.warehouseName, tradeSettleOrderDetailArr.goodsAttribute, tradeSettleOrderDetailArr.shareFavourableAfterFee, tradeSettleOrderDetailArr.tradeNo, tradeSettleOrderDetailArr.innerSettleCost, tradeSettleOrderDetailArr.shareDiscountFee, tradeSettleOrderDetailArr.shareOtherPayment, tradeSettleOrderDetailArr.goodsReceiptAmount, tradeSettleOrderDetailArr.shareFairSellTotal, tradeSettleOrderDetailArr.shareFavourableFee, tradeSettleOrderDetailArr.otherShareFavourableFee, tradeSettleOrderDetailArr.customizeGoodsColumn1, tradeSettleOrderDetailArr.customizeGoodsColumn2, tradeSettleOrderDetailArr.customizeGoodsColumn3, tradeSettleOrderDetailArr.customizeGoodsColumn4, tradeSettleOrderDetailArr.customizeGoodsColumn5, tradeSettleOrderDetailArr.customizeGoodsColumn6, tradeSettleOrderDetailArr.customizeGoodsColumn7, tradeSettleOrderDetailArr.customizeGoodsColumn8, tradeSettleOrderDetailArr.customizeGoodsColumn9, tradeSettleOrderDetailArr.customizeGoodsColumn10, tradeSettleOrderCustomer.settleUnitId, tradeSettleOrderCustomer.qq, tradeSettleOrderCustomer.email, tradeSettleOrderCustomer.weiXin, tradeSettleOrderCustomer.wangWang, tradeSettleOrderCustomer.customerId, tradeSettleOrderCustomer.settleUnit, tradeSettleOrderCustomer.customerName, tradeSettleOrderCustomer.customerTags, tradeSettleOrderCustomer.customerType, tradeSettleOrderCustomer.customerGrade, tradeSettleOrderCustomer.customerAccount, tradeSettleOrderCustomer.customerTypeName, tradeSettleOrderCustomer.customerGradeName, tradeSettleOrderCustomer.endCustomerAccount, tradeSettleOrderColumnExt.customizeTradeColumn1, tradeSettleOrderColumnExt.customizeTradeColumn2, tradeSettleOrderColumnExt.customizeTradeColumn3, tradeSettleOrderColumnExt.customizeTradeColumn4, tradeSettleOrderColumnExt.customizeTradeColumn5, tradeSettleOrderColumnExt.customizeTradeColumn6, tradeSettleOrderColumnExt.customizeTradeColumn7, tradeSettleOrderColumnExt.customizeTradeColumn8, tradeSettleOrderColumnExt.customizeTradeColumn9, tradeSettleOrderColumnExt.customizeTradeColumn10\",\n" + + " \"gmtModifiedBegin\": \"\",\n" + + " \"hasQueryHistory\": \"\",\n" + + " \"shopId\": \"\",\n" + + " \"cols\": \"settleId,settleNo,tradeNo,sourceTradeNo,settleInvoiceStatus\"\n" + + "}"; + + SortedMap sortedMap = new TreeMap<>(); + sortedMap.put("method", "oms.open.trade.settle.allinfo"); + sortedMap.put("appkey", "12346738"); + sortedMap.put("version", "v1.0"); + sortedMap.put("contenttype", "json"); + sortedMap.put("timestamp", DATETIME_FORMATTER.format(LocalDateTime.now())); + sortedMap.put("bizcontent", bizData); + + StringBuilder sbSignData = new StringBuilder("94d715fc68214ce1ba48803b3bf19a9f"); + for (Map.Entry entry : sortedMap.entrySet()) { + sbSignData.append(entry.getKey()).append(entry.getValue()); + } + sbSignData.append("94d715fc68214ce1ba48803b3bf19a9f"); + + sortedMap.put("sign", md5Encrypt(sbSignData.toString().toLowerCase())); + + FormBody.Builder formBodyBuilder = new FormBody.Builder(); + for (Map.Entry entry : sortedMap.entrySet()) { + formBodyBuilder.add(entry.getKey(), entry.getValue()); + } + RequestBody formBody = formBodyBuilder.build(); + + Request request = new Request.Builder().url("https://open.jackyun.com/open/openapi/do").post(formBody).build(); + + try (Response response = httpClient.newCall(request).execute()) { + if (!response.isSuccessful()) { + throw new IOException("非预期的HTTP响应码: " + response.code() + ", 响应体: " + response.body().string()); + } + return JSON.parseObject(response.body().string(), ResultDataDto.class); + } + } + + /** + * MD5加密工具方法 + * @param text 待加密的字符串 + * @return 加密后的32位小写MD5字符串 + * @throws NoSuchAlgorithmException + */ + private String md5Encrypt(String text) throws NoSuchAlgorithmException { + MessageDigest md5 = MessageDigest.getInstance("MD5"); + byte[] resultByte = text.getBytes(StandardCharsets.UTF_8); + byte[] md5Bytes = md5.digest(resultByte); + StringBuilder hexValue = new StringBuilder(); + for (byte md5Byte : md5Bytes) { + int val = (md5Byte) & 0xff; + if (val < 16) { + hexValue.append("0"); + } + hexValue.append(Integer.toHexString(val)); + } + return hexValue.toString(); + } + + /* + * 保留您原有的 test1 方法,不做修改 + */ + @Test + public void test1() { + try { + Long total = 0L; + Integer pageIndex = 0; + Set ids = new HashSet<>(); + while (true) { + // 为了让这个旧方法能运行,我们让它调用修改后的API方法,但传入固定的时间 + ResultDataDto resultDataDto = callJackyunApi(pageIndex++, "2025-08-27 00:00:00", "2025-08-27 18:16:00"); + if (resultDataDto != null && resultDataDto.getResult() != null) { + ResultDto result = resultDataDto.getResult(); + if (result.getData() != null && result.getData().size() > 0) { + total += result.getData().size(); + System.out.println("查询得到数量:" + result.getData().size() + " 总数量:" + total); + List data = result.getData(); + for (int i = 0; i < data.size(); i++) { + JSONObject o = (JSONObject) data.get(i); + JSONObject tradeSettleOrder = (JSONObject) o.get("tradeSettleOrder"); + String id1 = String.valueOf(tradeSettleOrder.get("id")); + // 关键修改:检查重复并报错 + if (!ids.add(id1)) { // add() 返回 false 说明重复 + throw new IllegalStateException("发现重复 ID: " + id1); + } + } + } else { + System.out.println("查询完毕"); + break; + } + } + } + System.out.println("总数量:" + total + " set集合:" + ids.size()); + } catch (Exception e) { + e.printStackTrace(); + } + } +} \ No newline at end of file diff --git a/nifi-hzyadev-bundle/pom.xml b/nifi-hzyadev-bundle/pom.xml index a061eb5..27de139 100644 --- a/nifi-hzyadev-bundle/pom.xml +++ b/nifi-hzyadev-bundle/pom.xml @@ -25,6 +25,8 @@ hzya-nifi-AdvancedJoltTransformer-processors hzya-nifi-AutoAddOracleDatafile-nar hzya-nifi-AutoAddOracleDatafile-processors + hzya-nifi-GetJackyunOpenData-nar + hzya-nifi-GetJackyunOpenData-processors