feat(nifi): 添加吉客云开放数据同步处理器
- 新增 GetJackyunOpenDataProcessor 处理器,用于从吉客云开放平台分页拉取数据- 实现了自动化的全量及增量同步功能,支持自定义时间窗口和分页参数 - 集成了 MongoDB 用于存储和管理时间窗口状态 - 添加了签名验证和错误处理机制
This commit is contained in:
parent
66b6dbbf4a
commit
a214c7ed3a
|
@ -1,8 +1,7 @@
|
|||
package com.hzya.frame;
|
||||
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import com.bazaarvoice.jolt.Chainr;
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.nifi.annotation.behavior.ReadsAttribute;
|
||||
import org.apache.nifi.annotation.behavior.ReadsAttributes;
|
||||
|
@ -13,6 +12,7 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
|
|||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.*;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
|
@ -23,32 +23,19 @@ import org.apache.nifi.processor.util.StandardValidators;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
* 自定义jolt转换器
|
||||
*
|
||||
* @Author:liuyang
|
||||
* @Package:com.hzya.frame
|
||||
* @Project:nifi-hzyadev-bundle
|
||||
* @name:AdvancedJoltTransformerProcessor
|
||||
* @Date:2025/7/30 10:29
|
||||
* @Filename:AdvancedJoltTransformerProcessor
|
||||
*/
|
||||
@Tags({"jolt", "json", "transform", "custom"})
|
||||
@CapabilityDescription("自定义Jolt JSON转换处理器,使用Jolt 0.1.8版本执行JSON数据转换")
|
||||
@SeeAlso({})
|
||||
@ReadsAttributes({@ReadsAttribute(attribute = "", description = "")})
|
||||
@WritesAttributes({@WritesAttribute(attribute = "", description = "")})
|
||||
public class AdvancedJoltTransformerProcessor extends AbstractProcessor {
|
||||
// 属性:Jolt转换规范
|
||||
public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder().name("JOLT_SPEC").displayName("Jolt转换规范").description("Jolt转换规范,JSON格式。支持表达式语言。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
|
||||
|
||||
// 关系:成功和失败
|
||||
public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder().name("JOLT_SPEC").displayName("Jolt转换规范").description("Jolt转换规范,JSON格式。支持表达式语言和注释。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
|
||||
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("转换成功的FlowFile").build();
|
||||
|
||||
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("转换失败的FlowFile").build();
|
||||
|
||||
private List<PropertyDescriptor> descriptors;
|
||||
|
@ -57,8 +44,9 @@ public class AdvancedJoltTransformerProcessor extends AbstractProcessor {
|
|||
|
||||
@Override
|
||||
protected void init(final ProcessorInitializationContext context) {
|
||||
descriptors = Collections.singletonList(JOLT_SPEC);
|
||||
descriptors = Collections.unmodifiableList(descriptors);
|
||||
final List<PropertyDescriptor> tempDescriptors = new ArrayList<>();
|
||||
tempDescriptors.add(JOLT_SPEC);
|
||||
this.descriptors = Collections.unmodifiableList(tempDescriptors);
|
||||
|
||||
final Set<Relationship> relationships = new HashSet<>();
|
||||
relationships.add(REL_SUCCESS);
|
||||
|
@ -66,6 +54,7 @@ public class AdvancedJoltTransformerProcessor extends AbstractProcessor {
|
|||
this.relationships = Collections.unmodifiableSet(relationships);
|
||||
|
||||
this.objectMapper = new ObjectMapper();
|
||||
this.objectMapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -80,7 +69,7 @@ public class AdvancedJoltTransformerProcessor extends AbstractProcessor {
|
|||
|
||||
@OnScheduled
|
||||
public void onScheduled(final ProcessContext context) {
|
||||
// 处理器启动时的初始化逻辑
|
||||
// 无需特定逻辑
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -92,8 +81,6 @@ public class AdvancedJoltTransformerProcessor extends AbstractProcessor {
|
|||
|
||||
try {
|
||||
String joltSpecStr = context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue();
|
||||
|
||||
// 解析Jolt规范
|
||||
Object joltSpec;
|
||||
try {
|
||||
joltSpec = objectMapper.readValue(joltSpecStr, Object.class);
|
||||
|
@ -103,9 +90,9 @@ public class AdvancedJoltTransformerProcessor extends AbstractProcessor {
|
|||
return;
|
||||
}
|
||||
|
||||
// 创建Jolt Chainr
|
||||
Chainr chainr;
|
||||
try {
|
||||
// 使用不带自定义函数的、最简单的 fromSpec 方法
|
||||
chainr = Chainr.fromSpec(joltSpec);
|
||||
} catch (Exception e) {
|
||||
getLogger().error("无法创建Jolt Chainr: {}", new Object[]{e.getMessage()}, e);
|
||||
|
@ -113,57 +100,49 @@ public class AdvancedJoltTransformerProcessor extends AbstractProcessor {
|
|||
return;
|
||||
}
|
||||
|
||||
// 读取FlowFile内容并进行转换
|
||||
final AtomicReference<Object> transformedData = new AtomicReference<>();
|
||||
final AtomicReference<Exception> exception = new AtomicReference<>();
|
||||
|
||||
session.read(flowFile, new InputStreamCallback() {
|
||||
@Override
|
||||
public void process(InputStream in) throws IOException {
|
||||
session.read(flowFile, in -> {
|
||||
try {
|
||||
// 读取输入JSON - 兼容JDK 1.8
|
||||
StringBuilder inputBuilder = new StringBuilder();
|
||||
byte[] buffer = new byte[8192];
|
||||
int bytesRead;
|
||||
while ((bytesRead = in.read(buffer)) != -1) {
|
||||
inputBuilder.append(new String(buffer, 0, bytesRead, StandardCharsets.UTF_8));
|
||||
}
|
||||
String inputJson = inputBuilder.toString();
|
||||
|
||||
// 解析输入JSON
|
||||
Object inputData = objectMapper.readValue(inputJson, Object.class);
|
||||
|
||||
// 执行Jolt转换
|
||||
Object inputData = objectMapper.readValue(in, Object.class);
|
||||
Object result = chainr.transform(inputData);
|
||||
|
||||
// --- [最终解决方案:在Java中手动转换_id类型] ---
|
||||
if (result instanceof List) {
|
||||
try {
|
||||
@SuppressWarnings("unchecked") List<Map<String, Object>> resultList = (List<Map<String, Object>>) result;
|
||||
for (Map<String, Object> record : resultList) {
|
||||
if (record.containsKey("_id")) {
|
||||
Object idValue = record.get("_id");
|
||||
if (idValue != null && !(idValue instanceof String)) {
|
||||
record.put("_id", String.valueOf(idValue));
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (ClassCastException e) {
|
||||
getLogger().warn("Jolt转换结果中包含非标准格式的元素,无法进行_id类型转换。");
|
||||
}
|
||||
}
|
||||
// --- [代码修改结束] ---
|
||||
|
||||
transformedData.set(result);
|
||||
|
||||
} catch (Exception e) {
|
||||
exception.set(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// 检查是否有异常
|
||||
if (exception.get() != null) {
|
||||
getLogger().error("JSON转换过程中发生错误: {}", new Object[]{exception.get().getMessage()}, exception.get());
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
||||
// 将转换结果写入新的FlowFile
|
||||
flowFile = session.write(flowFile, new OutputStreamCallback() {
|
||||
@Override
|
||||
public void process(OutputStream out) throws IOException {
|
||||
String resultJson = objectMapper.writeValueAsString(transformedData.get());
|
||||
out.write(resultJson.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
});
|
||||
|
||||
// 更新FlowFile属性
|
||||
flowFile = session.write(flowFile, out -> objectMapper.writeValue(out, transformedData.get()));
|
||||
flowFile = session.putAttribute(flowFile, "jolt.transformed", "true");
|
||||
flowFile = session.putAttribute(flowFile, "mime.type", "application/json");
|
||||
|
||||
// 转移到成功关系
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
getLogger().info("成功转换FlowFile {}", new Object[]{flowFile});
|
||||
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
//package com.hzya.frame.joltext; // 请确保包名与您的项目一致
|
||||
//
|
||||
//// 必须导入 Jolt 自己的 Optional 类
|
||||
//import com.bazaarvoice.jolt.common.Optional;
|
||||
//import com.bazaarvoice.jolt.modifier.function.Function;
|
||||
//
|
||||
///**
|
||||
// * Jolt 0.1.8 的自定义函数,用于将任何输入对象转换为其字符串表示形式。
|
||||
// * 实现了 Jolt 的 Function 接口,使其可以被 Chainr 加载并在表达式中调用。
|
||||
// * 使用 @SuppressWarnings("deprecation") 来抑制关于 Function 接口已过时的警告。
|
||||
// */
|
||||
//@SuppressWarnings("deprecation")
|
||||
//public class ToString implements Function {
|
||||
//
|
||||
// /**
|
||||
// * Jolt 在执行转换时会调用此方法。
|
||||
// * @param args 从JOLT表达式中传递过来的可变参数列表
|
||||
// * @return 参数的字符串表示形式,如果参数为null或不存在,则返回Optional.empty()
|
||||
// */
|
||||
// @Override
|
||||
// public Optional<Object> apply(Object... args) {
|
||||
// // 检查是否有参数传入
|
||||
// if (args == null || args.length == 0) {
|
||||
// return Optional.empty();
|
||||
// }
|
||||
//
|
||||
// // 我们只关心第一个参数
|
||||
// Object firstArg = args[0];
|
||||
//
|
||||
// // 使用 String.valueOf() 可以安全地处理 null 和各种数据类型
|
||||
// if (firstArg == null) {
|
||||
// return Optional.empty(); // 或者 return Optional.of("") 返回空字符串
|
||||
// }
|
||||
//
|
||||
// // 返回 Jolt 自己的 Optional 包装的字符串结果
|
||||
// return Optional.of(String.valueOf(firstArg));
|
||||
// }
|
||||
//}
|
|
@ -0,0 +1,34 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>nifi-hzyadev-bundle</artifactId>
|
||||
<groupId>com.hzya</groupId>
|
||||
<version>1.0</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>hzya-nifi-GetJackyunOpenData-nar</artifactId>
|
||||
<packaging>nar</packaging>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>8</maven.compiler.source>
|
||||
<maven.compiler.target>8</maven.compiler.target>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.hzya</groupId>
|
||||
<artifactId>hzya-nifi-GetJackyunOpenData-processors</artifactId>
|
||||
<version>1.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-standard-services-api-nar</artifactId>
|
||||
<version>${nifi-revision}</version>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,106 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>nifi-hzyadev-bundle</artifactId>
|
||||
<groupId>com.hzya</groupId>
|
||||
<version>1.0</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>hzya-nifi-GetJackyunOpenData-processors</artifactId>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>8</maven.compiler.source>
|
||||
<maven.compiler.target>8</maven.compiler.target>
|
||||
<!-- <junit.jupiter.version>5.9.1</junit.jupiter.version>-->
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-api</artifactId>
|
||||
<version>${nifi-revision}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-dbcp-service-api</artifactId>
|
||||
<version>${nifi-revision}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-processor-utils</artifactId>
|
||||
<version>1.15.3</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mock</artifactId>
|
||||
<version>${nifi-revision}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>4.13</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.alibaba</groupId>
|
||||
<artifactId>fastjson</artifactId>
|
||||
<version>1.2.83</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.squareup.okhttp3</groupId>
|
||||
<artifactId>okhttp</artifactId>
|
||||
<version>4.10.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-distributed-cache-client-service-api</artifactId>
|
||||
<version>1.28.1</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-utils</artifactId>
|
||||
<version>1.28.1</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mongodb</groupId>
|
||||
<artifactId>mongodb-driver-sync</artifactId>
|
||||
<version>4.11.1</version>
|
||||
</dependency>
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>org.junit.jupiter</groupId>-->
|
||||
<!-- <artifactId>junit-jupiter-api</artifactId>-->
|
||||
<!-- <version>5.9.1</version>-->
|
||||
<!-- <scope>test</scope>-->
|
||||
<!-- </dependency>-->
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>org.mockito</groupId>-->
|
||||
<!-- <artifactId>mockito-core</artifactId>-->
|
||||
<!-- <version>4.8.1</version>-->
|
||||
<!-- <scope>test</scope>-->
|
||||
<!-- </dependency>-->
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>org.mockito</groupId>-->
|
||||
<!-- <artifactId>mockito-junit-jupiter</artifactId>-->
|
||||
<!-- <version>4.8.1</version>-->
|
||||
<!-- <scope>test</scope>-->
|
||||
<!-- </dependency>-->
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>org.junit.jupiter</groupId>-->
|
||||
<!-- <artifactId>junit-jupiter-api</artifactId>-->
|
||||
<!-- <version>${junit.jupiter.version}</version>-->
|
||||
<!-- <scope>test</scope>-->
|
||||
<!-- </dependency>-->
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>org.junit.jupiter</groupId>-->
|
||||
<!-- <artifactId>junit-jupiter-engine</artifactId>-->
|
||||
<!-- <version>${junit.jupiter.version}</version>-->
|
||||
<!-- <scope>test</scope>-->
|
||||
<!-- </dependency>-->
|
||||
</dependencies>
|
||||
</project>
|
|
@ -0,0 +1,356 @@
|
|||
package com.hzya.frame;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.hzya.frame.dto.ResultDataDto;
|
||||
import com.hzya.frame.dto.ResultDto;
|
||||
import com.mongodb.client.MongoClient;
|
||||
import com.mongodb.client.MongoClients;
|
||||
import com.mongodb.client.MongoCollection;
|
||||
import com.mongodb.client.MongoDatabase;
|
||||
import com.mongodb.client.model.Filters;
|
||||
import com.mongodb.client.model.UpdateOptions;
|
||||
import com.mongodb.client.model.Updates;
|
||||
import okhttp3.*;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.*;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.bson.Document;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
||||
@Tags({"吉客云", "电商", "ERP", "http", "api", "hzya", "mongodb"})
|
||||
@CapabilityDescription("从吉客云开放平台分页拉取数据。处理器使用外部MongoDB管理时间窗口和分页状态,实现自动化的全量及增量同步,并内置了签名逻辑。")
|
||||
@InputRequirement(Requirement.INPUT_FORBIDDEN)
|
||||
@WritesAttributes({@WritesAttribute(attribute = "jackyun.api.method", description = "本次调用的API方法名"), @WritesAttribute(attribute = "jackyun.api.startTime", description = "本次API调用的查询开始时间"), @WritesAttribute(attribute = "jackyun.api.endTime", description = "本次API调用的查询结束时间"), @WritesAttribute(attribute = "jackyun.api.pageIndex", description = "本次API调用的数据页码")})
|
||||
public class GetJackyunOpenDataProcessor extends AbstractProcessor {
|
||||
|
||||
// --- 吉客云相关属性 ---
|
||||
public static final PropertyDescriptor PROP_GATEWAY_URL = new PropertyDescriptor.Builder().name("吉客云网关URL").description("吉客云开放平台的网关地址。").defaultValue("https://open.jackyun.com/open/openapi/do").required(true).addValidator(StandardValidators.URL_VALIDATOR).build();
|
||||
public static final PropertyDescriptor PROP_APP_KEY = new PropertyDescriptor.Builder().name("App Key").description("在吉客云开放平台上申请的AppKey。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
|
||||
public static final PropertyDescriptor PROP_APP_SECRET = new PropertyDescriptor.Builder().name("App Secret").description("在吉客云开放平台上申请的AppSecret。").required(true).sensitive(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
|
||||
public static final PropertyDescriptor PROP_API_METHOD = new PropertyDescriptor.Builder().name("API方法名").description("要调用的吉客云API方法名,例如:erp.storage.goodsdocout.v2").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
|
||||
public static final PropertyDescriptor PROP_API_VERSION = new PropertyDescriptor.Builder().name("API版本号").description("要调用的吉客云API版本号,例如:v1.0").defaultValue("v1.0").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
|
||||
|
||||
// --- 数据拉取逻辑相关属性 ---
|
||||
public static final PropertyDescriptor PROP_INITIAL_START_TIME = new PropertyDescriptor.Builder().name("初始同步开始时间").description("首次运行时使用的查询开始时间。格式必须为 'yyyy-MM-dd HH:mm:ss'。这是定义全量同步起点的关键。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
|
||||
public static final PropertyDescriptor PROP_TIME_WINDOW_INCREMENT = new PropertyDescriptor.Builder().name("时间窗口增量").description("处理器内部循环调用API时,每个时间窗口的最大时长,用于遵守API的查询范围限制。").defaultValue("59 minutes").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
|
||||
public static final PropertyDescriptor PROP_END_TIME_SAFETY_BUFFER = new PropertyDescriptor.Builder().name("结束时间安全缓冲").description("从当前时间减去的缓冲量,以防止因服务器时钟不一致而查询未来时间。").defaultValue("5 minutes").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
|
||||
public static final PropertyDescriptor PROP_PAGE_SIZE = new PropertyDescriptor.Builder().name("每页数据条数").description("每次API请求期望返回的数据条数。").defaultValue("100").required(true).addValidator(StandardValidators.INTEGER_VALIDATOR).build();
|
||||
public static final PropertyDescriptor PROP_REQUEST_BIZ_CONTENT = new PropertyDescriptor.Builder().name("业务请求参数 (BizContent)").description("API的业务请求参数(JSON格式)。处理器会自动注入分页(pageIndex, pageSize)和时间(gmtModifiedStart, gmtModifiedEnd)参数,无需在此填写。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
|
||||
public static final PropertyDescriptor PROP_START_TIME_PARAM_NAME = new PropertyDescriptor.Builder().name("开始时间参数名").description("在BizContent中用于传递查询开始时间的字段名。").defaultValue("gmtModifiedStart").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
|
||||
public static final PropertyDescriptor PROP_END_TIME_PARAM_NAME = new PropertyDescriptor.Builder().name("结束时间参数名").description("在BizContent中用于传递查询结束时间的字段名。").defaultValue("gmtModifiedEnd").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
|
||||
|
||||
// --- MongoDB 相关属性 ---
|
||||
public static final PropertyDescriptor PROP_MONGO_URI = new PropertyDescriptor.Builder().name("MongoDB Connection String").description("MongoDB的连接URI,例如:mongodb://user:password@host1:27017,host2:27017").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
|
||||
public static final PropertyDescriptor PROP_MONGO_DATABASE = new PropertyDescriptor.Builder().name("MongoDB Database Name").description("用于存储状态的数据库名称。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
|
||||
public static final PropertyDescriptor PROP_MONGO_COLLECTION = new PropertyDescriptor.Builder().name("MongoDB Collection Name").description("用于存储状态的集合名称。").defaultValue("nifi_processor_states").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
|
||||
public static final PropertyDescriptor PROP_STATE_KEY_NAME = new PropertyDescriptor.Builder().name("状态存储键名 (State Key)").description("在MongoDB中用于存储此任务状态的唯一键名。例如 '销售结算单同步'。这将作为文档的 _id。").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
|
||||
|
||||
// --- 关系定义 (Relationships) ---
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("成功").description("成功从API获取并解析的数据页将路由到此关系。").build();
|
||||
public static final Relationship REL_FAILURE = new Relationship.Builder().name("失败").description("API调用失败或发生不可恢复的错误时,将触发FlowFile路由到此关系。").build();
|
||||
public static final Relationship REL_NO_DATA = new Relationship.Builder().name("无数据").description("在检查的时间范围内,API没有返回任何新数据时,将触发FlowFile路由到此关系。").build();
|
||||
|
||||
private List<PropertyDescriptor> descriptors;
|
||||
private Set<Relationship> relationships;
|
||||
private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||
|
||||
private volatile OkHttpClient httpClient;
|
||||
private volatile MongoClient mongoClient;
|
||||
|
||||
@Override
|
||||
protected void init(final ProcessorInitializationContext context) {
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||
descriptors.add(PROP_GATEWAY_URL);
|
||||
descriptors.add(PROP_APP_KEY);
|
||||
descriptors.add(PROP_APP_SECRET);
|
||||
descriptors.add(PROP_API_METHOD);
|
||||
descriptors.add(PROP_API_VERSION);
|
||||
descriptors.add(PROP_INITIAL_START_TIME);
|
||||
descriptors.add(PROP_TIME_WINDOW_INCREMENT);
|
||||
descriptors.add(PROP_END_TIME_SAFETY_BUFFER);
|
||||
descriptors.add(PROP_PAGE_SIZE);
|
||||
descriptors.add(PROP_REQUEST_BIZ_CONTENT);
|
||||
descriptors.add(PROP_START_TIME_PARAM_NAME);
|
||||
descriptors.add(PROP_END_TIME_PARAM_NAME);
|
||||
// MongoDB属性
|
||||
descriptors.add(PROP_MONGO_URI);
|
||||
descriptors.add(PROP_MONGO_DATABASE);
|
||||
descriptors.add(PROP_MONGO_COLLECTION);
|
||||
descriptors.add(PROP_STATE_KEY_NAME);
|
||||
this.descriptors = Collections.unmodifiableList(descriptors);
|
||||
|
||||
final Set<Relationship> relationships = new HashSet<>();
|
||||
relationships.add(REL_SUCCESS);
|
||||
relationships.add(REL_FAILURE);
|
||||
relationships.add(REL_NO_DATA);
|
||||
this.relationships = Collections.unmodifiableSet(relationships);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return this.relationships;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return descriptors;
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
public void onScheduled(final ProcessContext context) {
|
||||
// 在处理器启动时,创建可复用的HTTP和MongoDB客户端,以利用连接池提高性能
|
||||
this.httpClient = new OkHttpClient.Builder().connectTimeout(30, TimeUnit.SECONDS).readTimeout(60, TimeUnit.SECONDS).build();
|
||||
this.mongoClient = MongoClients.create(context.getProperty(PROP_MONGO_URI).getValue());
|
||||
}
|
||||
|
||||
@OnStopped
|
||||
public void onStopped() {
|
||||
// 在处理器停止时,安全地关闭客户端连接,释放资源
|
||||
if (this.mongoClient != null) {
|
||||
this.mongoClient.close();
|
||||
}
|
||||
}
|
||||
|
||||
protected OkHttpClient getHttpClient() {
|
||||
return this.httpClient;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
// 从属性中动态获取用户为当前任务指定的状态键
|
||||
final String stateKey = context.getProperty(PROP_STATE_KEY_NAME).evaluateAttributeExpressions().getValue();
|
||||
if (stateKey == null || stateKey.trim().isEmpty()) {
|
||||
getLogger().error("状态存储键名(State Key)未配置或为空,无法进行状态管理。");
|
||||
context.yield();
|
||||
return;
|
||||
}
|
||||
|
||||
// 获取MongoDB集合对象
|
||||
final MongoDatabase database = mongoClient.getDatabase(context.getProperty(PROP_MONGO_DATABASE).getValue());
|
||||
final MongoCollection<Document> stateCollection = database.getCollection(context.getProperty(PROP_MONGO_COLLECTION).getValue());
|
||||
|
||||
try {
|
||||
// 从属性中获取用户配置的开始和结束时间参数名
|
||||
String startTimeParamName = context.getProperty(PROP_START_TIME_PARAM_NAME).getValue();
|
||||
String endTimeParamName = context.getProperty(PROP_END_TIME_PARAM_NAME).getValue();
|
||||
|
||||
// 1. 确定需要追赶的最终目标时间
|
||||
long safetyBufferMillis = context.getProperty(PROP_END_TIME_SAFETY_BUFFER).asTimePeriod(TimeUnit.MILLISECONDS);
|
||||
long catchUpTargetTimeMillis = System.currentTimeMillis() - safetyBufferMillis;
|
||||
|
||||
// 2. 从MongoDB获取上次成功同步的时间点
|
||||
Document stateDoc = stateCollection.find(Filters.eq("_id", stateKey)).first();
|
||||
String lastSyncTimeStr = (stateDoc != null) ? stateDoc.getString("value") : null;
|
||||
|
||||
long loopStartTimeMillis;
|
||||
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||
if (lastSyncTimeStr == null || lastSyncTimeStr.isEmpty()) {
|
||||
// 首次运行,使用用户配置的初始时间
|
||||
loopStartTimeMillis = sdf.parse(context.getProperty(PROP_INITIAL_START_TIME).getValue()).getTime();
|
||||
getLogger().info("未在MongoDB中找到状态键 '{}' 的记录,将使用初始同步时间: {}", stateKey, sdf.format(new Date(loopStartTimeMillis)));
|
||||
} else {
|
||||
// 后续运行,使用上次保存的时间
|
||||
loopStartTimeMillis = sdf.parse(lastSyncTimeStr).getTime();
|
||||
}
|
||||
|
||||
// 如果水位线已经超过或等于目标时间,说明已追上,无需执行
|
||||
if (loopStartTimeMillis >= catchUpTargetTimeMillis) {
|
||||
getLogger().info("任务 '{}' 已同步至最新进度(水位线: {}), 无需执行本次任务。", stateKey, sdf.format(new Date(loopStartTimeMillis)));
|
||||
context.yield(); // 放弃本次执行,提高效率
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 开始“追赶”循环,将大时间段切分为小窗口
|
||||
long timeWindowMillis = context.getProperty(PROP_TIME_WINDOW_INCREMENT).asTimePeriod(TimeUnit.MILLISECONDS);
|
||||
boolean hasFetchedDataInThisRun = false;
|
||||
|
||||
while (loopStartTimeMillis < catchUpTargetTimeMillis) {
|
||||
// ============================ 边界值处理核心修改 START ============================
|
||||
|
||||
// 1. 定义下一个时间窗口的理论开始时间 (即当前窗口的“开”边界)
|
||||
long nextWindowStartTimeMillis = loopStartTimeMillis + timeWindowMillis;
|
||||
|
||||
// 2. 确定本次API调用的实际结束时间 (apiQueryEndTimeMillis) 和下次循环的开始时间 (newWatermarkMillis)
|
||||
long apiQueryEndTimeMillis;
|
||||
long newWatermarkMillis;
|
||||
|
||||
if (nextWindowStartTimeMillis >= catchUpTargetTimeMillis) {
|
||||
// 这是最后一个窗口。
|
||||
// API查询的结束时间就是最终目标时间。
|
||||
apiQueryEndTimeMillis = catchUpTargetTimeMillis;
|
||||
// 新的水位线(下次循环的起点)也更新为最终目标时间。
|
||||
newWatermarkMillis = catchUpTargetTimeMillis;
|
||||
} else {
|
||||
// 非最后一个窗口。
|
||||
// API查询的结束时间是下一个窗口开始时间的前一秒,构成一个无重叠的闭合区间。
|
||||
// 例如,对于 [01:00:00 ... 02:00:00) 的范围,实际查询 [01:00:00, 01:59:59]。
|
||||
apiQueryEndTimeMillis = nextWindowStartTimeMillis - 1000; // 减去1000毫秒 (1秒)
|
||||
// 但新的水位线必须是下一个窗口的准确开始时间,以确保连续性。
|
||||
newWatermarkMillis = nextWindowStartTimeMillis;
|
||||
}
|
||||
|
||||
// 3. 格式化API所需的时间字符串
|
||||
String windowStartTimeStr = sdf.format(new Date(loopStartTimeMillis));
|
||||
String windowEndTimeStr = sdf.format(new Date(apiQueryEndTimeMillis));
|
||||
|
||||
// ============================ 边界值处理核心修改 END ==============================
|
||||
|
||||
getLogger().debug("任务 '{}' 准备处理时间窗口: [{} -> {}]", stateKey, windowStartTimeStr, windowEndTimeStr);
|
||||
|
||||
// --- 内部分页循环 ---
|
||||
int currentPage = 0;
|
||||
boolean hasNextPageInWindow = true;
|
||||
while (hasNextPageInWindow) {
|
||||
JSONObject bizData = JSON.parseObject(context.getProperty(PROP_REQUEST_BIZ_CONTENT).getValue());
|
||||
bizData.put("pageIndex", String.valueOf(currentPage));
|
||||
bizData.put("pageSize", context.getProperty(PROP_PAGE_SIZE).getValue());
|
||||
bizData.put(startTimeParamName, windowStartTimeStr);
|
||||
bizData.put(endTimeParamName, windowEndTimeStr);
|
||||
|
||||
ResultDataDto resultDataDto = callJackyunApi(context, bizData);
|
||||
|
||||
if (resultDataDto == null || !"200".equals(resultDataDto.getCode())) {
|
||||
String errorMsg = (resultDataDto != null) ? resultDataDto.toString() : "API返回为空";
|
||||
getLogger().error("吉客云API返回业务错误: {}", errorMsg);
|
||||
FlowFile failureFlowFile = session.create();
|
||||
failureFlowFile = session.write(failureFlowFile, out -> out.write(errorMsg.getBytes(StandardCharsets.UTF_8)));
|
||||
session.transfer(failureFlowFile, REL_FAILURE);
|
||||
return; // 发生业务错误,终止本次执行
|
||||
}
|
||||
|
||||
ResultDto result = resultDataDto.getResult();
|
||||
if (result != null && result.getData() != null && !result.getData().isEmpty()) {
|
||||
hasFetchedDataInThisRun = true;
|
||||
getLogger().info("任务 '{}' 在时间窗 [{} -> {}] 的第 {} 页获取到 {} 条数据。", stateKey, windowStartTimeStr, windowEndTimeStr, currentPage, result.getData().size());
|
||||
|
||||
FlowFile ff = session.create();
|
||||
ff = session.putAttribute(ff, "jackyun.api.method", context.getProperty(PROP_API_METHOD).getValue());
|
||||
ff = session.putAttribute(ff, "jackyun.api.startTime", windowStartTimeStr);
|
||||
ff = session.putAttribute(ff, "jackyun.api.endTime", windowEndTimeStr);
|
||||
ff = session.putAttribute(ff, "jackyun.api.pageIndex", String.valueOf(currentPage));
|
||||
|
||||
ff = session.write(ff, out -> out.write(JSON.toJSONString(result.getData()).getBytes(StandardCharsets.UTF_8)));
|
||||
session.transfer(ff, REL_SUCCESS);
|
||||
session.commitAsync();
|
||||
|
||||
currentPage++;
|
||||
// 如果返回的数据量小于每页大小,可以推断这是最后一页,但为了保险起见,继续查询直到返回空数据
|
||||
// hasNextPageInWindow = result.getData().size() == Integer.parseInt(context.getProperty(PROP_PAGE_SIZE).getValue());
|
||||
} else {
|
||||
// API返回的数据为空,说明当前时间窗口已无更多数据
|
||||
hasNextPageInWindow = false;
|
||||
}
|
||||
} // --- 分页循环结束 ---
|
||||
|
||||
// 成功处理完一个时间窗口(包括所有页),立即更新MongoDB中的状态
|
||||
String newWatermarkStr = sdf.format(new Date(newWatermarkMillis));
|
||||
stateCollection.updateOne(
|
||||
Filters.eq("_id", stateKey),
|
||||
Updates.set("value", newWatermarkStr),
|
||||
new UpdateOptions().upsert(true)
|
||||
);
|
||||
getLogger().info("任务 '{}' 的时间窗口 [{} -> {}] 数据同步完成,状态水位线更新至: {}", stateKey, windowStartTimeStr, windowEndTimeStr, newWatermarkStr);
|
||||
|
||||
// 更新循环变量,为下一个时间窗口做准备
|
||||
loopStartTimeMillis = newWatermarkMillis;
|
||||
} // --- 追赶循环结束 ---
|
||||
|
||||
if (!hasFetchedDataInThisRun) {
|
||||
getLogger().info("任务 '{}' 在本次执行中未发现新数据。", stateKey);
|
||||
FlowFile noDataFlowFile = session.create();
|
||||
noDataFlowFile = session.putAttribute(noDataFlowFile, "message", "在指定的时间范围内没有查询到新数据。");
|
||||
session.transfer(noDataFlowFile, REL_NO_DATA);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
getLogger().error("处理吉客云数据时发生未知错误: {}", e.getMessage(), e);
|
||||
StringWriter sw = new StringWriter();
|
||||
e.printStackTrace(new PrintWriter(sw));
|
||||
String exceptionAsString = sw.toString();
|
||||
|
||||
FlowFile failureFlowFile = session.create();
|
||||
failureFlowFile = session.write(failureFlowFile, out -> out.write(exceptionAsString.getBytes(StandardCharsets.UTF_8)));
|
||||
session.transfer(failureFlowFile, REL_FAILURE);
|
||||
}
|
||||
}
|
||||
|
||||
private ResultDataDto callJackyunApi(ProcessContext context, JSONObject bizData) throws IOException, NoSuchAlgorithmException {
|
||||
String appKey = context.getProperty(PROP_APP_KEY).getValue();
|
||||
String appSecret = context.getProperty(PROP_APP_SECRET).getValue();
|
||||
String method = context.getProperty(PROP_API_METHOD).getValue();
|
||||
String version = context.getProperty(PROP_API_VERSION).getValue();
|
||||
|
||||
SortedMap<String, String> sortedMap = new TreeMap<>();
|
||||
sortedMap.put("method", method);
|
||||
sortedMap.put("appkey", appKey);
|
||||
sortedMap.put("version", version);
|
||||
sortedMap.put("contenttype", "json");
|
||||
sortedMap.put("timestamp", DATETIME_FORMATTER.format(LocalDateTime.now()));
|
||||
sortedMap.put("bizcontent", bizData.toJSONString());
|
||||
|
||||
StringBuilder sbSignData = new StringBuilder(appSecret);
|
||||
for (Map.Entry<String, String> entry : sortedMap.entrySet()) {
|
||||
sbSignData.append(entry.getKey()).append(entry.getValue());
|
||||
}
|
||||
sbSignData.append(appSecret);
|
||||
|
||||
sortedMap.put("sign", md5Encrypt(sbSignData.toString().toLowerCase()));
|
||||
|
||||
FormBody.Builder formBodyBuilder = new FormBody.Builder();
|
||||
for (Map.Entry<String, String> entry : sortedMap.entrySet()) {
|
||||
formBodyBuilder.add(entry.getKey(), entry.getValue());
|
||||
}
|
||||
RequestBody formBody = formBodyBuilder.build();
|
||||
|
||||
Request request = new Request.Builder().url(context.getProperty(PROP_GATEWAY_URL).getValue()).post(formBody).build();
|
||||
|
||||
try (Response response = getHttpClient().newCall(request).execute()) {
|
||||
if (!response.isSuccessful()) {
|
||||
throw new IOException("非预期的HTTP响应码: " + response.code() + ", 响应体: " + response.body().string());
|
||||
}
|
||||
if (response.body() != null) {
|
||||
return JSON.parseObject(response.body().string(), ResultDataDto.class);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private String md5Encrypt(String text) throws NoSuchAlgorithmException {
|
||||
MessageDigest md5 = MessageDigest.getInstance("MD5");
|
||||
byte[] resultByte = text.getBytes(StandardCharsets.UTF_8);
|
||||
byte[] md5Bytes = md5.digest(resultByte);
|
||||
StringBuilder hexValue = new StringBuilder();
|
||||
for (byte md5Byte : md5Bytes) {
|
||||
int val = (md5Byte) & 0xff;
|
||||
if (val < 16) {
|
||||
hexValue.append("0");
|
||||
}
|
||||
hexValue.append(Integer.toHexString(val));
|
||||
}
|
||||
return hexValue.toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
package com.hzya.frame.dto;
|
||||
|
||||
/**
|
||||
* @Author:liuyang
|
||||
* @Package:com.hzya.frame.dto
|
||||
* @Project:nifi-hzyadev-bundle
|
||||
* @name:ResultDataDto
|
||||
* @Date:2025/8/27 16:15
|
||||
* @Filename:ResultDataDto
|
||||
*/
|
||||
public class ResultDataDto {
|
||||
private String code;
|
||||
private String developerInfo;
|
||||
private String msg;
|
||||
private ResultDto result;
|
||||
|
||||
public String getCode() {
|
||||
return code;
|
||||
}
|
||||
|
||||
public void setCode(String code) {
|
||||
this.code = code;
|
||||
}
|
||||
|
||||
public String getDeveloperInfo() {
|
||||
return developerInfo;
|
||||
}
|
||||
|
||||
public void setDeveloperInfo(String developerInfo) {
|
||||
this.developerInfo = developerInfo;
|
||||
}
|
||||
|
||||
public String getMsg() {
|
||||
return msg;
|
||||
}
|
||||
|
||||
public void setMsg(String msg) {
|
||||
this.msg = msg;
|
||||
}
|
||||
|
||||
public ResultDto getResult() {
|
||||
return result;
|
||||
}
|
||||
|
||||
public void setResult(ResultDto result) {
|
||||
this.result = result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ResultDataDto{" + "code='" + code + '\'' + ", developerInfo='" + developerInfo + '\'' + ", msg='" + msg + '\'' + ", result=" + result + '}';
|
||||
}
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
package com.hzya.frame.dto;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @Author:liuyang
|
||||
* @Package:com.hzya.frame.dto
|
||||
* @Project:nifi-hzyadev-bundle
|
||||
* @name:ResultDataDto
|
||||
* @Date:2025/8/27 16:15
|
||||
* @Filename:ResultDataDto
|
||||
*/
|
||||
public class ResultDto {
|
||||
private String contextId;
|
||||
private List<Object> data;
|
||||
|
||||
public String getContextId() {
|
||||
return contextId;
|
||||
}
|
||||
|
||||
public void setContextId(String contextId) {
|
||||
this.contextId = contextId;
|
||||
}
|
||||
|
||||
public List<Object> getData() {
|
||||
return data;
|
||||
}
|
||||
|
||||
public void setData(List<Object> data) {
|
||||
this.data = data;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
com.hzya.frame.GetJackyunOpenDataProcessor
|
|
@ -0,0 +1,138 @@
|
|||
package com.hzya.frame;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.hzya.frame.dto.ResultDataDto;
|
||||
import com.hzya.frame.dto.ResultDto;
|
||||
import okhttp3.*;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
* @Author:liuyang
|
||||
* @Package:com.hzya.frame
|
||||
* @Project:nifi-hzyadev-bundle
|
||||
* @name:GetJackyunOpenDataProcessorTest
|
||||
* @Date:2025/8/27 16:04
|
||||
* @Filename:GetJackyunOpenDataProcessorTest
|
||||
*/
|
||||
public class GetJackyunOpenDataProcessorTest {
|
||||
|
||||
private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||
|
||||
@Test
|
||||
public void test1() {
|
||||
try {
|
||||
Long total = 0L;
|
||||
Integer pageIndex = 0;
|
||||
Set<String> ids = new HashSet<>();
|
||||
while (true) {
|
||||
ResultDataDto resultDataDto = callJackyunApi(pageIndex++);
|
||||
if (resultDataDto != null && resultDataDto.getResult() != null) {
|
||||
ResultDto result = resultDataDto.getResult();
|
||||
if (result.getData() != null && result.getData().size() > 0) {
|
||||
total += result.getData().size();
|
||||
System.out.println("查询得到数量:" + result.getData().size() + " 总数量:" + total);
|
||||
List<Object> data = result.getData();
|
||||
for (int i = 0; i < data.size(); i++) {
|
||||
JSONObject o = (JSONObject) data.get(i);
|
||||
JSONObject tradeSettleOrder = (JSONObject) o.get("tradeSettleOrder");
|
||||
String id1 = String.valueOf(tradeSettleOrder.get("id"));
|
||||
// 关键修改:检查重复并报错
|
||||
if (!ids.add(id1)) { // add() 返回 false 说明重复
|
||||
throw new IllegalStateException("发现重复 ID: " + id1);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
System.out.println("查询完毕");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
System.out.println("总数量:" + total + " set集合:" + ids.size());
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
private ResultDataDto callJackyunApi(Integer pageIndex) throws IOException, NoSuchAlgorithmException {
|
||||
OkHttpClient httpClient = new OkHttpClient.Builder().connectTimeout(30, TimeUnit.SECONDS).readTimeout(60, TimeUnit.SECONDS).build();
|
||||
|
||||
String bizData = "{\n" +
|
||||
" \"tradeNo\": \"\",\n" +
|
||||
" \"settleStatus\": \"\",\n" +
|
||||
" \"gmtModifiedEnd\": \"\",\n" +
|
||||
" \"auditTimeBegin\": \"2025-08-27 00:00:00\",\n" +
|
||||
" \"pageSize\": \"200\",\n" +
|
||||
" \"gmtCreatedBegin\": \"\",\n" +
|
||||
" \"billDateBegin\": \"\",\n" +
|
||||
" \"settleNo\": \"\",\n" +
|
||||
" \"billDateEnd\": \"\",\n" +
|
||||
" \"gmtCreatedEnd\": \"\",\n" +
|
||||
" \"pageIndex\": \""+pageIndex+"\",\n" +
|
||||
" \"auditTimeEnd\": \"2025-08-27 18:16:00\",\n" +
|
||||
" \"detailCols\": \"tradeSettleOrder.settleId, tradeSettleOrder.shopId, tradeSettleOrder.auditor, tradeSettleOrder.flagIds, tradeSettleOrder.payment, tradeSettleOrder.departId, tradeSettleOrder.settleNo, tradeSettleOrder.shopName, tradeSettleOrder.auditTime, tradeSettleOrder.auditorId, tradeSettleOrder.companyId, tradeSettleOrder.settleFee, tradeSettleOrder.settlerId, tradeSettleOrder.chargeType, tradeSettleOrder.departName, tradeSettleOrder.settleMemo, tradeSettleOrder.settleTime, tradeSettleOrder.sysFlagIds, tradeSettleOrder.accountName, tradeSettleOrder.companyName, tradeSettleOrder.settleStatus, tradeSettleOrder.chargeAccount, tradeSettleOrder.onlinePayTime, tradeSettleOrder.chargeCurrency, tradeSettleOrder.chargeCurrencyCode, tradeSettleOrder.chargeExchangeRate, tradeSettleOrder.settler, tradeSettleOrder.couponFee, tradeSettleOrder.differenceFee, tradeSettleOrder.settleTotalFee, tradeSettleOrder.settleDiscountFee, tradeSettleOrderDetailArr.subTradeId31177, tradeSettleOrderDetailArr.tradeFrom, tradeSettleOrderDetailArr.cost, tradeSettleOrderDetailArr.unit, tradeSettleOrderDetailArr.payNo, tradeSettleOrderDetailArr.cateId, tradeSettleOrderDetailArr.remark, tradeSettleOrderDetailArr.seller, tradeSettleOrderDetailArr.specId, tradeSettleOrderDetailArr.taxFee, tradeSettleOrderDetailArr.brandId, tradeSettleOrderDetailArr.goodsId, tradeSettleOrderDetailArr.goodsNo, tradeSettleOrderDetailArr.orderNo, tradeSettleOrderDetailArr.payTime, tradeSettleOrderDetailArr.payType, tradeSettleOrderDetailArr.taxRate, tradeSettleOrderDetailArr.cateName, tradeSettleOrderDetailArr.sellerId, tradeSettleOrderDetailArr.settleId, tradeSettleOrderDetailArr.specName, tradeSettleOrderDetailArr.auditTime, tradeSettleOrderDetailArr.brandName, tradeSettleOrderDetailArr.goodsName, tradeSettleOrderDetailArr.goodsTags, tradeSettleOrderDetailArr.sellCount, tradeSettleOrderDetailArr.sellPrice, tradeSettleOrderDetailArr.sellTotal, tradeSettleOrderDetailArr.tradeTime, tradeSettleOrderDetailArr.tradeType, tradeSettleOrderDetailArr.arriveTime, tradeSettleOrderDetailArr.chargeType, tradeSettleOrderDetailArr.goodsAlias, tradeSettleOrderDetailArr.logisticId, tradeSettleOrderDetailArr.mainPostid, tradeSettleOrderDetailArr.payAccount, tradeSettleOrderDetailArr.payDueDate, tradeSettleOrderDetailArr.stockoutNo, tradeSettleOrderDetailArr.accountName, tradeSettleOrderDetailArr.consignTime, tradeSettleOrderDetailArr.discountFee, tradeSettleOrderDetailArr.tradeStatus, tradeSettleOrderDetailArr.warehouseId, tradeSettleOrderDetailArr.lastShipTime, tradeSettleOrderDetailArr.logisticName, tradeSettleOrderDetailArr.logisticType, tradeSettleOrderDetailArr.chargeAccount, tradeSettleOrderDetailArr.sourceTradeNo, tradeSettleOrderDetailArr.warehouseName, tradeSettleOrderDetailArr.goodsAttribute, tradeSettleOrderDetailArr.shareFavourableAfterFee, tradeSettleOrderDetailArr.tradeNo, tradeSettleOrderDetailArr.innerSettleCost, tradeSettleOrderDetailArr.shareDiscountFee, tradeSettleOrderDetailArr.shareOtherPayment, tradeSettleOrderDetailArr.goodsReceiptAmount, tradeSettleOrderDetailArr.shareFairSellTotal, tradeSettleOrderDetailArr.shareFavourableFee, tradeSettleOrderDetailArr.otherShareFavourableFee, tradeSettleOrderDetailArr.customizeGoodsColumn1, tradeSettleOrderDetailArr.customizeGoodsColumn2, tradeSettleOrderDetailArr.customizeGoodsColumn3, tradeSettleOrderDetailArr.customizeGoodsColumn4, tradeSettleOrderDetailArr.customizeGoodsColumn5, tradeSettleOrderDetailArr.customizeGoodsColumn6, tradeSettleOrderDetailArr.customizeGoodsColumn7, tradeSettleOrderDetailArr.customizeGoodsColumn8, tradeSettleOrderDetailArr.customizeGoodsColumn9, tradeSettleOrderDetailArr.customizeGoodsColumn10, tradeSettleOrderCustomer.settleUnitId, tradeSettleOrderCustomer.qq, tradeSettleOrderCustomer.email, tradeSettleOrderCustomer.weiXin, tradeSettleOrderCustomer.wangWang, tradeSettleOrderCustomer.customerId, tradeSettleOrderCustomer.settleUnit, tradeSettleOrderCustomer.customerName, tradeSettleOrderCustomer.customerTags, tradeSettleOrderCustomer.customerType, tradeSettleOrderCustomer.customerGrade, tradeSettleOrderCustomer.customerAccount, tradeSettleOrderCustomer.customerTypeName, tradeSettleOrderCustomer.customerGradeName, tradeSettleOrderCustomer.endCustomerAccount, tradeSettleOrderColumnExt.customizeTradeColumn1, tradeSettleOrderColumnExt.customizeTradeColumn2, tradeSettleOrderColumnExt.customizeTradeColumn3, tradeSettleOrderColumnExt.customizeTradeColumn4, tradeSettleOrderColumnExt.customizeTradeColumn5, tradeSettleOrderColumnExt.customizeTradeColumn6, tradeSettleOrderColumnExt.customizeTradeColumn7, tradeSettleOrderColumnExt.customizeTradeColumn8, tradeSettleOrderColumnExt.customizeTradeColumn9, tradeSettleOrderColumnExt.customizeTradeColumn10\",\n" +
|
||||
" \"gmtModifiedBegin\": \"\",\n" +
|
||||
" \"hasQueryHistory\": \"\",\n" +
|
||||
" \"shopId\": \"\",\n" +
|
||||
" \"cols\": \"settleId,settleNo,tradeNo,sourceTradeNo,settleInvoiceStatus\"\n" +
|
||||
"}";
|
||||
|
||||
SortedMap<String, String> sortedMap = new TreeMap<>();
|
||||
sortedMap.put("method", "oms.open.trade.settle.allinfo");
|
||||
sortedMap.put("appkey", "12346738");
|
||||
sortedMap.put("version", "v1.0");
|
||||
sortedMap.put("contenttype", "json");
|
||||
sortedMap.put("timestamp", DATETIME_FORMATTER.format(LocalDateTime.now()));
|
||||
sortedMap.put("bizcontent", bizData);
|
||||
|
||||
StringBuilder sbSignData = new StringBuilder("94d715fc68214ce1ba48803b3bf19a9f");
|
||||
for (Map.Entry<String, String> entry : sortedMap.entrySet()) {
|
||||
sbSignData.append(entry.getKey()).append(entry.getValue());
|
||||
}
|
||||
sbSignData.append("94d715fc68214ce1ba48803b3bf19a9f");
|
||||
|
||||
sortedMap.put("sign", md5Encrypt(sbSignData.toString().toLowerCase()));
|
||||
|
||||
FormBody.Builder formBodyBuilder = new FormBody.Builder();
|
||||
for (Map.Entry<String, String> entry : sortedMap.entrySet()) {
|
||||
formBodyBuilder.add(entry.getKey(), entry.getValue());
|
||||
}
|
||||
RequestBody formBody = formBodyBuilder.build();
|
||||
|
||||
Request request = new Request.Builder().url("https://open.jackyun.com/open/openapi/do").post(formBody).build();
|
||||
|
||||
try (Response response = httpClient.newCall(request).execute()) {
|
||||
if (!response.isSuccessful()) {
|
||||
throw new IOException("非预期的HTTP响应码: " + response.code() + ", 响应体: " + response.body().string());
|
||||
}
|
||||
return JSON.parseObject(response.body().string(), ResultDataDto.class);
|
||||
}
|
||||
}
|
||||
|
||||
private String md5Encrypt(String text) throws NoSuchAlgorithmException {
|
||||
MessageDigest md5 = MessageDigest.getInstance("MD5");
|
||||
byte[] resultByte = text.getBytes(StandardCharsets.UTF_8);
|
||||
byte[] md5Bytes = md5.digest(resultByte);
|
||||
StringBuilder hexValue = new StringBuilder();
|
||||
for (byte md5Byte : md5Bytes) {
|
||||
int val = (md5Byte) & 0xff;
|
||||
if (val < 16) {
|
||||
hexValue.append("0");
|
||||
}
|
||||
hexValue.append(Integer.toHexString(val));
|
||||
}
|
||||
return hexValue.toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,256 @@
|
|||
package com.hzya.frame;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.hzya.frame.dto.ResultDataDto;
|
||||
import com.hzya.frame.dto.ResultDto;
|
||||
import okhttp3.*;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* @Author:liuyang
|
||||
* @Package:com.hzya.frame
|
||||
* @Project:nifi-hzyadev-bundle
|
||||
* @name:GetJackyunOpenDataProcessorTest
|
||||
* @Date:2025/8/27 16:04
|
||||
* @Filename:GetJackyunOpenDataProcessorTest
|
||||
*/
|
||||
public class GetJackyunOpenDataProcessorTest2 {
|
||||
|
||||
// 定义统一的时间格式化器
|
||||
private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||
|
||||
/**
|
||||
* 新增的方法:按小时调度查询任务
|
||||
* 这个方法模拟在2025年8月27日,从 00:00:00 开始,每小时查询一次数据,
|
||||
* 直到最后一个时间段覆盖到 18:16:00。
|
||||
*/
|
||||
/**
|
||||
* 新增的方法:按小时调度查询任务
|
||||
* 这个方法模拟在2025年8月27日,从 00:00:00 开始,每小时查询一次数据,
|
||||
* 直到最后一个时间段覆盖到 18:16:00,并最后输出总计。
|
||||
*/
|
||||
/**
|
||||
* 修正后的方法:按小时调度查询任务
|
||||
* 通过将每个时间段的结束时间减去1秒,来避免边界数据的重复计算,
|
||||
* 确保查询区间为 [start, end),实现无缝且不重叠的查询。
|
||||
*/
|
||||
@Test
|
||||
public void queryDataHourly() {
|
||||
// 设置任务的起始和最终截止时间
|
||||
LocalDateTime taskStartTime = LocalDateTime.of(2025, 8, 27, 0, 0, 0);
|
||||
LocalDateTime taskFinalEndTime = LocalDateTime.of(2025, 8, 27, 18, 16, 0);
|
||||
|
||||
LocalDateTime currentQueryStartTime = taskStartTime;
|
||||
|
||||
long grandTotal = 0L;
|
||||
|
||||
System.out.println("开始执行按小时查询任务(已修正边界问题)...");
|
||||
|
||||
while (currentQueryStartTime.isBefore(taskFinalEndTime)) {
|
||||
// 定义下一个小时的起点,作为当前区间的“开”边界
|
||||
LocalDateTime nextHourStartTime = currentQueryStartTime.plusHours(1);
|
||||
|
||||
// 确定本次查询的实际结束时间
|
||||
LocalDateTime actualQueryEndTime;
|
||||
|
||||
// 如果下一个小时的起点已经超过了最终截止时间,或者就是最终截止时间
|
||||
// 那么本次查询的结束时间就应该是最终截止时间,这是最后一个区间
|
||||
if (nextHourStartTime.isAfter(taskFinalEndTime) || nextHourStartTime.isEqual(taskFinalEndTime)) {
|
||||
actualQueryEndTime = taskFinalEndTime;
|
||||
} else {
|
||||
// 否则,本次查询的结束时间是下一个小时的起点减去1秒
|
||||
// 例如,对于 [00:00:00, 01:00:00) 的区间,实际查询的是 [00:00:00, 00:59:59]
|
||||
actualQueryEndTime = nextHourStartTime.minusSeconds(1);
|
||||
}
|
||||
|
||||
// 将时间对象格式化为字符串
|
||||
String startTimeStr = DATETIME_FORMATTER.format(currentQueryStartTime);
|
||||
String endTimeStr = DATETIME_FORMATTER.format(actualQueryEndTime);
|
||||
|
||||
try {
|
||||
long totalCountInHour = getTotalCountForTimeRange(startTimeStr, endTimeStr);
|
||||
System.out.println("时间范围: " + startTimeStr + " - " + endTimeStr + " 查询数量: " + totalCountInHour);
|
||||
grandTotal += totalCountInHour;
|
||||
} catch (Exception e) {
|
||||
System.err.println("查询时间范围 " + startTimeStr + " - " + endTimeStr + " 时发生错误: " + e.getMessage());
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
// 下一次循环的开始时间,直接就是下一个小时的整点
|
||||
currentQueryStartTime = nextHourStartTime;
|
||||
}
|
||||
|
||||
System.out.println("所有时间段查询完毕。");
|
||||
System.out.println("查询合计总数量: " + grandTotal);
|
||||
}
|
||||
|
||||
/**
|
||||
* 辅助方法:获取指定时间范围内的总数据量(包含分页逻辑)
|
||||
* @param startTimeStr 查询开始时间 (格式: "yyyy-MM-dd HH:mm:ss")
|
||||
* @param endTimeStr 查询结束时间 (格式: "yyyy-MM-dd HH:mm:ss")
|
||||
* @return 该时间范围内的总记录数
|
||||
* @throws IOException API请求异常
|
||||
* @throws NoSuchAlgorithmException MD5加密异常
|
||||
*/
|
||||
private long getTotalCountForTimeRange(String startTimeStr, String endTimeStr) throws IOException, NoSuchAlgorithmException {
|
||||
long total = 0L;
|
||||
int pageIndex = 0;
|
||||
// 循环调用API,直到返回的数据为空,实现分页查询
|
||||
while (true) {
|
||||
// 调用API,传入分页索引和时间范围
|
||||
ResultDataDto resultDataDto = callJackyunApi(pageIndex++, startTimeStr, endTimeStr);
|
||||
if (resultDataDto != null && resultDataDto.getResult() != null) {
|
||||
ResultDto result = resultDataDto.getResult();
|
||||
// 如果返回的数据列表不为空且有数据
|
||||
if (result.getData() != null && !result.getData().isEmpty()) {
|
||||
total += result.getData().size();
|
||||
} else {
|
||||
// 如果没有数据返回,说明已经查询完所有分页,跳出循环
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
// 如果返回结果为空,也跳出循环
|
||||
break;
|
||||
}
|
||||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
/**
|
||||
* 修改后的API调用方法
|
||||
* @param pageIndex 页码
|
||||
* @param auditTimeBegin 审核开始时间
|
||||
* @param auditTimeEnd 审核结束时间
|
||||
* @return API返回结果
|
||||
* @throws IOException API请求异常
|
||||
* @throws NoSuchAlgorithmException MD5加密异常
|
||||
*/
|
||||
private ResultDataDto callJackyunApi(Integer pageIndex, String auditTimeBegin, String auditTimeEnd) throws IOException, NoSuchAlgorithmException {
|
||||
OkHttpClient httpClient = new OkHttpClient.Builder()
|
||||
.connectTimeout(30, TimeUnit.SECONDS)
|
||||
.readTimeout(60, TimeUnit.SECONDS)
|
||||
.build();
|
||||
|
||||
// 将 auditTimeBegin 和 auditTimeEnd 参数动态拼接到 bizData 中
|
||||
String bizData = "{\n" +
|
||||
" \"tradeNo\": \"\",\n" +
|
||||
" \"settleStatus\": \"\",\n" +
|
||||
" \"gmtModifiedEnd\": \"\",\n" +
|
||||
" \"auditTimeBegin\": \"" + auditTimeBegin + "\",\n" +
|
||||
" \"pageSize\": \"200\",\n" +
|
||||
" \"gmtCreatedBegin\": \"\",\n" +
|
||||
" \"billDateBegin\": \"\",\n" +
|
||||
" \"settleNo\": \"\",\n" +
|
||||
" \"billDateEnd\": \"\",\n" +
|
||||
" \"gmtCreatedEnd\": \"\",\n" +
|
||||
" \"pageIndex\": \"" + pageIndex + "\",\n" +
|
||||
" \"auditTimeEnd\": \"" + auditTimeEnd + "\",\n" +
|
||||
" \"detailCols\": \"tradeSettleOrder.settleId, tradeSettleOrder.shopId, tradeSettleOrder.auditor, tradeSettleOrder.flagIds, tradeSettleOrder.payment, tradeSettleOrder.departId, tradeSettleOrder.settleNo, tradeSettleOrder.shopName, tradeSettleOrder.auditTime, tradeSettleOrder.auditorId, tradeSettleOrder.companyId, tradeSettleOrder.settleFee, tradeSettleOrder.settlerId, tradeSettleOrder.chargeType, tradeSettleOrder.departName, tradeSettleOrder.settleMemo, tradeSettleOrder.settleTime, tradeSettleOrder.sysFlagIds, tradeSettleOrder.accountName, tradeSettleOrder.companyName, tradeSettleOrder.settleStatus, tradeSettleOrder.chargeAccount, tradeSettleOrder.onlinePayTime, tradeSettleOrder.chargeCurrency, tradeSettleOrder.chargeCurrencyCode, tradeSettleOrder.chargeExchangeRate, tradeSettleOrder.settler, tradeSettleOrder.couponFee, tradeSettleOrder.differenceFee, tradeSettleOrder.settleTotalFee, tradeSettleOrder.settleDiscountFee, tradeSettleOrderDetailArr.subTradeId31177, tradeSettleOrderDetailArr.tradeFrom, tradeSettleOrderDetailArr.cost, tradeSettleOrderDetailArr.unit, tradeSettleOrderDetailArr.payNo, tradeSettleOrderDetailArr.cateId, tradeSettleOrderDetailArr.remark, tradeSettleOrderDetailArr.seller, tradeSettleOrderDetailArr.specId, tradeSettleOrderDetailArr.taxFee, tradeSettleOrderDetailArr.brandId, tradeSettleOrderDetailArr.goodsId, tradeSettleOrderDetailArr.goodsNo, tradeSettleOrderDetailArr.orderNo, tradeSettleOrderDetailArr.payTime, tradeSettleOrderDetailArr.payType, tradeSettleOrderDetailArr.taxRate, tradeSettleOrderDetailArr.cateName, tradeSettleOrderDetailArr.sellerId, tradeSettleOrderDetailArr.settleId, tradeSettleOrderDetailArr.specName, tradeSettleOrderDetailArr.auditTime, tradeSettleOrderDetailArr.brandName, tradeSettleOrderDetailArr.goodsName, tradeSettleOrderDetailArr.goodsTags, tradeSettleOrderDetailArr.sellCount, tradeSettleOrderDetailArr.sellPrice, tradeSettleOrderDetailArr.sellTotal, tradeSettleOrderDetailArr.tradeTime, tradeSettleOrderDetailArr.tradeType, tradeSettleOrderDetailArr.arriveTime, tradeSettleOrderDetailArr.chargeType, tradeSettleOrderDetailArr.goodsAlias, tradeSettleOrderDetailArr.logisticId, tradeSettleOrderDetailArr.mainPostid, tradeSettleOrderDetailArr.payAccount, tradeSettleOrderDetailArr.payDueDate, tradeSettleOrderDetailArr.stockoutNo, tradeSettleOrderDetailArr.accountName, tradeSettleOrderDetailArr.consignTime, tradeSettleOrderDetailArr.discountFee, tradeSettleOrderDetailArr.tradeStatus, tradeSettleOrderDetailArr.warehouseId, tradeSettleOrderDetailArr.lastShipTime, tradeSettleOrderDetailArr.logisticName, tradeSettleOrderDetailArr.logisticType, tradeSettleOrderDetailArr.chargeAccount, tradeSettleOrderDetailArr.sourceTradeNo, tradeSettleOrderDetailArr.warehouseName, tradeSettleOrderDetailArr.goodsAttribute, tradeSettleOrderDetailArr.shareFavourableAfterFee, tradeSettleOrderDetailArr.tradeNo, tradeSettleOrderDetailArr.innerSettleCost, tradeSettleOrderDetailArr.shareDiscountFee, tradeSettleOrderDetailArr.shareOtherPayment, tradeSettleOrderDetailArr.goodsReceiptAmount, tradeSettleOrderDetailArr.shareFairSellTotal, tradeSettleOrderDetailArr.shareFavourableFee, tradeSettleOrderDetailArr.otherShareFavourableFee, tradeSettleOrderDetailArr.customizeGoodsColumn1, tradeSettleOrderDetailArr.customizeGoodsColumn2, tradeSettleOrderDetailArr.customizeGoodsColumn3, tradeSettleOrderDetailArr.customizeGoodsColumn4, tradeSettleOrderDetailArr.customizeGoodsColumn5, tradeSettleOrderDetailArr.customizeGoodsColumn6, tradeSettleOrderDetailArr.customizeGoodsColumn7, tradeSettleOrderDetailArr.customizeGoodsColumn8, tradeSettleOrderDetailArr.customizeGoodsColumn9, tradeSettleOrderDetailArr.customizeGoodsColumn10, tradeSettleOrderCustomer.settleUnitId, tradeSettleOrderCustomer.qq, tradeSettleOrderCustomer.email, tradeSettleOrderCustomer.weiXin, tradeSettleOrderCustomer.wangWang, tradeSettleOrderCustomer.customerId, tradeSettleOrderCustomer.settleUnit, tradeSettleOrderCustomer.customerName, tradeSettleOrderCustomer.customerTags, tradeSettleOrderCustomer.customerType, tradeSettleOrderCustomer.customerGrade, tradeSettleOrderCustomer.customerAccount, tradeSettleOrderCustomer.customerTypeName, tradeSettleOrderCustomer.customerGradeName, tradeSettleOrderCustomer.endCustomerAccount, tradeSettleOrderColumnExt.customizeTradeColumn1, tradeSettleOrderColumnExt.customizeTradeColumn2, tradeSettleOrderColumnExt.customizeTradeColumn3, tradeSettleOrderColumnExt.customizeTradeColumn4, tradeSettleOrderColumnExt.customizeTradeColumn5, tradeSettleOrderColumnExt.customizeTradeColumn6, tradeSettleOrderColumnExt.customizeTradeColumn7, tradeSettleOrderColumnExt.customizeTradeColumn8, tradeSettleOrderColumnExt.customizeTradeColumn9, tradeSettleOrderColumnExt.customizeTradeColumn10\",\n" +
|
||||
" \"gmtModifiedBegin\": \"\",\n" +
|
||||
" \"hasQueryHistory\": \"\",\n" +
|
||||
" \"shopId\": \"\",\n" +
|
||||
" \"cols\": \"settleId,settleNo,tradeNo,sourceTradeNo,settleInvoiceStatus\"\n" +
|
||||
"}";
|
||||
|
||||
SortedMap<String, String> sortedMap = new TreeMap<>();
|
||||
sortedMap.put("method", "oms.open.trade.settle.allinfo");
|
||||
sortedMap.put("appkey", "12346738");
|
||||
sortedMap.put("version", "v1.0");
|
||||
sortedMap.put("contenttype", "json");
|
||||
sortedMap.put("timestamp", DATETIME_FORMATTER.format(LocalDateTime.now()));
|
||||
sortedMap.put("bizcontent", bizData);
|
||||
|
||||
StringBuilder sbSignData = new StringBuilder("94d715fc68214ce1ba48803b3bf19a9f");
|
||||
for (Map.Entry<String, String> entry : sortedMap.entrySet()) {
|
||||
sbSignData.append(entry.getKey()).append(entry.getValue());
|
||||
}
|
||||
sbSignData.append("94d715fc68214ce1ba48803b3bf19a9f");
|
||||
|
||||
sortedMap.put("sign", md5Encrypt(sbSignData.toString().toLowerCase()));
|
||||
|
||||
FormBody.Builder formBodyBuilder = new FormBody.Builder();
|
||||
for (Map.Entry<String, String> entry : sortedMap.entrySet()) {
|
||||
formBodyBuilder.add(entry.getKey(), entry.getValue());
|
||||
}
|
||||
RequestBody formBody = formBodyBuilder.build();
|
||||
|
||||
Request request = new Request.Builder().url("https://open.jackyun.com/open/openapi/do").post(formBody).build();
|
||||
|
||||
try (Response response = httpClient.newCall(request).execute()) {
|
||||
if (!response.isSuccessful()) {
|
||||
throw new IOException("非预期的HTTP响应码: " + response.code() + ", 响应体: " + response.body().string());
|
||||
}
|
||||
return JSON.parseObject(response.body().string(), ResultDataDto.class);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* MD5加密工具方法
|
||||
* @param text 待加密的字符串
|
||||
* @return 加密后的32位小写MD5字符串
|
||||
* @throws NoSuchAlgorithmException
|
||||
*/
|
||||
private String md5Encrypt(String text) throws NoSuchAlgorithmException {
|
||||
MessageDigest md5 = MessageDigest.getInstance("MD5");
|
||||
byte[] resultByte = text.getBytes(StandardCharsets.UTF_8);
|
||||
byte[] md5Bytes = md5.digest(resultByte);
|
||||
StringBuilder hexValue = new StringBuilder();
|
||||
for (byte md5Byte : md5Bytes) {
|
||||
int val = (md5Byte) & 0xff;
|
||||
if (val < 16) {
|
||||
hexValue.append("0");
|
||||
}
|
||||
hexValue.append(Integer.toHexString(val));
|
||||
}
|
||||
return hexValue.toString();
|
||||
}
|
||||
|
||||
/*
|
||||
* 保留您原有的 test1 方法,不做修改
|
||||
*/
|
||||
@Test
|
||||
public void test1() {
|
||||
try {
|
||||
Long total = 0L;
|
||||
Integer pageIndex = 0;
|
||||
Set<String> ids = new HashSet<>();
|
||||
while (true) {
|
||||
// 为了让这个旧方法能运行,我们让它调用修改后的API方法,但传入固定的时间
|
||||
ResultDataDto resultDataDto = callJackyunApi(pageIndex++, "2025-08-27 00:00:00", "2025-08-27 18:16:00");
|
||||
if (resultDataDto != null && resultDataDto.getResult() != null) {
|
||||
ResultDto result = resultDataDto.getResult();
|
||||
if (result.getData() != null && result.getData().size() > 0) {
|
||||
total += result.getData().size();
|
||||
System.out.println("查询得到数量:" + result.getData().size() + " 总数量:" + total);
|
||||
List<Object> data = result.getData();
|
||||
for (int i = 0; i < data.size(); i++) {
|
||||
JSONObject o = (JSONObject) data.get(i);
|
||||
JSONObject tradeSettleOrder = (JSONObject) o.get("tradeSettleOrder");
|
||||
String id1 = String.valueOf(tradeSettleOrder.get("id"));
|
||||
// 关键修改:检查重复并报错
|
||||
if (!ids.add(id1)) { // add() 返回 false 说明重复
|
||||
throw new IllegalStateException("发现重复 ID: " + id1);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
System.out.println("查询完毕");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
System.out.println("总数量:" + total + " set集合:" + ids.size());
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -25,6 +25,8 @@
|
|||
<module>hzya-nifi-AdvancedJoltTransformer-processors</module>
|
||||
<module>hzya-nifi-AutoAddOracleDatafile-nar</module>
|
||||
<module>hzya-nifi-AutoAddOracleDatafile-processors</module>
|
||||
<module>hzya-nifi-GetJackyunOpenData-nar</module>
|
||||
<module>hzya-nifi-GetJackyunOpenData-processors</module>
|
||||
</modules>
|
||||
|
||||
<parent>
|
||||
|
|
Loading…
Reference in New Issue