Merge remote-tracking branch 'origin/zanhuo'

# Conflicts:
#	nifi-hzyadev-bundle/pom.xml
This commit is contained in:
liuy 2025-09-12 10:16:35 +08:00
commit e512bddad2
11 changed files with 1074 additions and 61 deletions

View File

@ -1,8 +1,7 @@
package com.hzya.frame; package com.hzya.frame;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractProcessor;
import com.bazaarvoice.jolt.Chainr; import com.bazaarvoice.jolt.Chainr;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.annotation.behavior.ReadsAttribute; import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes; import org.apache.nifi.annotation.behavior.ReadsAttributes;
@ -13,42 +12,26 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*; import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
/**
* 自定义jolt转换器
*
* @Authorliuyang
* @Packagecom.hzya.frame
* @Projectnifi-hzyadev-bundle
* @nameAdvancedJoltTransformerProcessor
* @Date2025/7/30 10:29
* @FilenameAdvancedJoltTransformerProcessor
*/
@Tags({"jolt", "json", "transform", "custom"}) @Tags({"jolt", "json", "transform", "custom"})
@CapabilityDescription("自定义Jolt JSON转换处理器使用Jolt 0.1.8版本执行JSON数据转换") @CapabilityDescription("自定义Jolt JSON转换处理器使用Jolt 0.1.8版本执行JSON数据转换")
@SeeAlso({}) @SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute = "", description = "")}) @ReadsAttributes({@ReadsAttribute(attribute = "", description = "")})
@WritesAttributes({@WritesAttribute(attribute = "", description = "")}) @WritesAttributes({@WritesAttribute(attribute = "", description = "")})
public class AdvancedJoltTransformerProcessor extends AbstractProcessor { public class AdvancedJoltTransformerProcessor extends AbstractProcessor {
// 属性Jolt转换规范
public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder().name("JOLT_SPEC").displayName("Jolt转换规范").description("Jolt转换规范JSON格式。支持表达式语言。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
// 关系成功和失败 public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder().name("JOLT_SPEC").displayName("Jolt转换规范").description("Jolt转换规范JSON格式。支持表达式语言和注释。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("转换成功的FlowFile").build(); public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("转换成功的FlowFile").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("转换失败的FlowFile").build(); public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("转换失败的FlowFile").build();
private List<PropertyDescriptor> descriptors; private List<PropertyDescriptor> descriptors;
@ -57,8 +40,9 @@ public class AdvancedJoltTransformerProcessor extends AbstractProcessor {
@Override @Override
protected void init(final ProcessorInitializationContext context) { protected void init(final ProcessorInitializationContext context) {
descriptors = Collections.singletonList(JOLT_SPEC); final List<PropertyDescriptor> tempDescriptors = new ArrayList<>();
descriptors = Collections.unmodifiableList(descriptors); tempDescriptors.add(JOLT_SPEC);
this.descriptors = Collections.unmodifiableList(tempDescriptors);
final Set<Relationship> relationships = new HashSet<>(); final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS); relationships.add(REL_SUCCESS);
@ -66,6 +50,7 @@ public class AdvancedJoltTransformerProcessor extends AbstractProcessor {
this.relationships = Collections.unmodifiableSet(relationships); this.relationships = Collections.unmodifiableSet(relationships);
this.objectMapper = new ObjectMapper(); this.objectMapper = new ObjectMapper();
this.objectMapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
} }
@Override @Override
@ -80,7 +65,7 @@ public class AdvancedJoltTransformerProcessor extends AbstractProcessor {
@OnScheduled @OnScheduled
public void onScheduled(final ProcessContext context) { public void onScheduled(final ProcessContext context) {
// 处理器启动时的初始化逻辑 // 无需特定逻辑
} }
@Override @Override
@ -92,8 +77,6 @@ public class AdvancedJoltTransformerProcessor extends AbstractProcessor {
try { try {
String joltSpecStr = context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue(); String joltSpecStr = context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue();
// 解析Jolt规范
Object joltSpec; Object joltSpec;
try { try {
joltSpec = objectMapper.readValue(joltSpecStr, Object.class); joltSpec = objectMapper.readValue(joltSpecStr, Object.class);
@ -103,9 +86,9 @@ public class AdvancedJoltTransformerProcessor extends AbstractProcessor {
return; return;
} }
// 创建Jolt Chainr
Chainr chainr; Chainr chainr;
try { try {
// 使用不带自定义函数的最简单的 fromSpec 方法
chainr = Chainr.fromSpec(joltSpec); chainr = Chainr.fromSpec(joltSpec);
} catch (Exception e) { } catch (Exception e) {
getLogger().error("无法创建Jolt Chainr: {}", new Object[]{e.getMessage()}, e); getLogger().error("无法创建Jolt Chainr: {}", new Object[]{e.getMessage()}, e);
@ -113,57 +96,57 @@ public class AdvancedJoltTransformerProcessor extends AbstractProcessor {
return; return;
} }
// 读取FlowFile内容并进行转换
final AtomicReference<Object> transformedData = new AtomicReference<>(); final AtomicReference<Object> transformedData = new AtomicReference<>();
final AtomicReference<Exception> exception = new AtomicReference<>(); final AtomicReference<Exception> exception = new AtomicReference<>();
session.read(flowFile, new InputStreamCallback() { session.read(flowFile, in -> {
@Override try {
public void process(InputStream in) throws IOException { Object inputData = objectMapper.readValue(in, Object.class);
try { Object result = chainr.transform(inputData);
// 读取输入JSON - 兼容JDK 1.8
StringBuilder inputBuilder = new StringBuilder(); // --- [最终解决方案在Java中手动转换_id类型] ---
byte[] buffer = new byte[8192]; if (result instanceof LinkedHashMap) {
int bytesRead; try {
while ((bytesRead = in.read(buffer)) != -1) { @SuppressWarnings("unchecked") LinkedHashMap<String, String> stringLinkedHashMap = (LinkedHashMap<String, String>) result;
inputBuilder.append(new String(buffer, 0, bytesRead, StandardCharsets.UTF_8)); // for (LinkedHashMap<String, Object> record : stringLinkedHashMap) {
// if (record.containsKey("_id")) {
// Object idValue = record.get("_id");
// if (idValue != null && !(idValue instanceof String)) {
// record.put("_id", String.valueOf(idValue));
// }
// }
// }
for (Map.Entry<String, String> entry : stringLinkedHashMap.entrySet()) {
if (entry.getKey().equals("_id")) {
Object idValue = entry.getValue();
if (idValue != null) {
stringLinkedHashMap.put("_id", String.valueOf(idValue));
}
}
}
} catch (ClassCastException e) {
getLogger().warn("Jolt转换结果中包含非标准格式的元素无法进行_id类型转换。");
} }
String inputJson = inputBuilder.toString();
// 解析输入JSON
Object inputData = objectMapper.readValue(inputJson, Object.class);
// 执行Jolt转换
Object result = chainr.transform(inputData);
transformedData.set(result);
} catch (Exception e) {
exception.set(e);
} }
// --- [代码修改结束] ---
transformedData.set(result);
} catch (Exception e) {
exception.set(e);
} }
}); });
// 检查是否有异常
if (exception.get() != null) { if (exception.get() != null) {
getLogger().error("JSON转换过程中发生错误: {}", new Object[]{exception.get().getMessage()}, exception.get()); getLogger().error("JSON转换过程中发生错误: {}", new Object[]{exception.get().getMessage()}, exception.get());
session.transfer(flowFile, REL_FAILURE); session.transfer(flowFile, REL_FAILURE);
return; return;
} }
// 将转换结果写入新的FlowFile flowFile = session.write(flowFile, out -> objectMapper.writeValue(out, transformedData.get()));
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
String resultJson = objectMapper.writeValueAsString(transformedData.get());
out.write(resultJson.getBytes(StandardCharsets.UTF_8));
}
});
// 更新FlowFile属性
flowFile = session.putAttribute(flowFile, "jolt.transformed", "true"); flowFile = session.putAttribute(flowFile, "jolt.transformed", "true");
flowFile = session.putAttribute(flowFile, "mime.type", "application/json"); flowFile = session.putAttribute(flowFile, "mime.type", "application/json");
// 转移到成功关系
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
getLogger().info("成功转换FlowFile {}", new Object[]{flowFile}); getLogger().info("成功转换FlowFile {}", new Object[]{flowFile});

View File

@ -0,0 +1,38 @@
//package com.hzya.frame.joltext; // 请确保包名与您的项目一致
//
//// 必须导入 Jolt 自己的 Optional
//import com.bazaarvoice.jolt.common.Optional;
//import com.bazaarvoice.jolt.modifier.function.Function;
//
///**
// * Jolt 0.1.8 的自定义函数用于将任何输入对象转换为其字符串表示形式
// * 实现了 Jolt Function 接口使其可以被 Chainr 加载并在表达式中调用
// * 使用 @SuppressWarnings("deprecation") 来抑制关于 Function 接口已过时的警告
// */
//@SuppressWarnings("deprecation")
//public class ToString implements Function {
//
// /**
// * Jolt 在执行转换时会调用此方法
// * @param args 从JOLT表达式中传递过来的可变参数列表
// * @return 参数的字符串表示形式如果参数为null或不存在则返回Optional.empty()
// */
// @Override
// public Optional<Object> apply(Object... args) {
// // 检查是否有参数传入
// if (args == null || args.length == 0) {
// return Optional.empty();
// }
//
// // 我们只关心第一个参数
// Object firstArg = args[0];
//
// // 使用 String.valueOf() 可以安全地处理 null 和各种数据类型
// if (firstArg == null) {
// return Optional.empty(); // 或者 return Optional.of("") 返回空字符串
// }
//
// // 返回 Jolt 自己的 Optional 包装的字符串结果
// return Optional.of(String.valueOf(firstArg));
// }
//}

View File

@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-hzyadev-bundle</artifactId>
<groupId>com.hzya</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hzya-nifi-GetJackyunOpenData-nar</artifactId>
<packaging>nar</packaging>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.hzya</groupId>
<artifactId>hzya-nifi-GetJackyunOpenData-processors</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>${nifi-revision}</version>
<type>nar</type>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,106 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-hzyadev-bundle</artifactId>
<groupId>com.hzya</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hzya-nifi-GetJackyunOpenData-processors</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<!-- <junit.jupiter.version>5.9.1</junit.jupiter.version>-->
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>${nifi-revision}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
<version>${nifi-revision}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
<version>1.15.3</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>${nifi-revision}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-distributed-cache-client-service-api</artifactId>
<version>1.28.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.28.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>4.11.1</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.junit.jupiter</groupId>-->
<!-- <artifactId>junit-jupiter-api</artifactId>-->
<!-- <version>5.9.1</version>-->
<!-- <scope>test</scope>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.mockito</groupId>-->
<!-- <artifactId>mockito-core</artifactId>-->
<!-- <version>4.8.1</version>-->
<!-- <scope>test</scope>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.mockito</groupId>-->
<!-- <artifactId>mockito-junit-jupiter</artifactId>-->
<!-- <version>4.8.1</version>-->
<!-- <scope>test</scope>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.junit.jupiter</groupId>-->
<!-- <artifactId>junit-jupiter-api</artifactId>-->
<!-- <version>${junit.jupiter.version}</version>-->
<!-- <scope>test</scope>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.junit.jupiter</groupId>-->
<!-- <artifactId>junit-jupiter-engine</artifactId>-->
<!-- <version>${junit.jupiter.version}</version>-->
<!-- <scope>test</scope>-->
<!-- </dependency>-->
</dependencies>
</project>

View File

@ -0,0 +1,356 @@
package com.hzya.frame;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.hzya.frame.dto.ResultDataDto;
import com.hzya.frame.dto.ResultDto;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.Updates;
import okhttp3.*;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.bson.Document;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.TimeUnit;
@Tags({"吉客云", "电商", "ERP", "http", "api", "hzya", "mongodb"})
@CapabilityDescription("从吉客云开放平台分页拉取数据。处理器使用外部MongoDB管理时间窗口和分页状态实现自动化的全量及增量同步并内置了签名逻辑。")
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@WritesAttributes({@WritesAttribute(attribute = "jackyun.api.method", description = "本次调用的API方法名"), @WritesAttribute(attribute = "jackyun.api.startTime", description = "本次API调用的查询开始时间"), @WritesAttribute(attribute = "jackyun.api.endTime", description = "本次API调用的查询结束时间"), @WritesAttribute(attribute = "jackyun.api.pageIndex", description = "本次API调用的数据页码")})
public class GetJackyunOpenDataProcessor extends AbstractProcessor {
// --- 吉客云相关属性 ---
public static final PropertyDescriptor PROP_GATEWAY_URL = new PropertyDescriptor.Builder().name("吉客云网关URL").description("吉客云开放平台的网关地址。").defaultValue("https://open.jackyun.com/open/openapi/do").required(true).addValidator(StandardValidators.URL_VALIDATOR).build();
public static final PropertyDescriptor PROP_APP_KEY = new PropertyDescriptor.Builder().name("App Key").description("在吉客云开放平台上申请的AppKey。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
public static final PropertyDescriptor PROP_APP_SECRET = new PropertyDescriptor.Builder().name("App Secret").description("在吉客云开放平台上申请的AppSecret。").required(true).sensitive(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
public static final PropertyDescriptor PROP_API_METHOD = new PropertyDescriptor.Builder().name("API方法名").description("要调用的吉客云API方法名例如erp.storage.goodsdocout.v2").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
public static final PropertyDescriptor PROP_API_VERSION = new PropertyDescriptor.Builder().name("API版本号").description("要调用的吉客云API版本号例如v1.0").defaultValue("v1.0").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
// --- 数据拉取逻辑相关属性 ---
public static final PropertyDescriptor PROP_INITIAL_START_TIME = new PropertyDescriptor.Builder().name("初始同步开始时间").description("首次运行时使用的查询开始时间。格式必须为 'yyyy-MM-dd HH:mm:ss'。这是定义全量同步起点的关键。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
public static final PropertyDescriptor PROP_TIME_WINDOW_INCREMENT = new PropertyDescriptor.Builder().name("时间窗口增量").description("处理器内部循环调用API时每个时间窗口的最大时长用于遵守API的查询范围限制。").defaultValue("59 minutes").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
public static final PropertyDescriptor PROP_END_TIME_SAFETY_BUFFER = new PropertyDescriptor.Builder().name("结束时间安全缓冲").description("从当前时间减去的缓冲量,以防止因服务器时钟不一致而查询未来时间。").defaultValue("5 minutes").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
public static final PropertyDescriptor PROP_PAGE_SIZE = new PropertyDescriptor.Builder().name("每页数据条数").description("每次API请求期望返回的数据条数。").defaultValue("100").required(true).addValidator(StandardValidators.INTEGER_VALIDATOR).build();
public static final PropertyDescriptor PROP_REQUEST_BIZ_CONTENT = new PropertyDescriptor.Builder().name("业务请求参数 (BizContent)").description("API的业务请求参数(JSON格式)。处理器会自动注入分页(pageIndex, pageSize)和时间(gmtModifiedStart, gmtModifiedEnd)参数,无需在此填写。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
public static final PropertyDescriptor PROP_START_TIME_PARAM_NAME = new PropertyDescriptor.Builder().name("开始时间参数名").description("在BizContent中用于传递查询开始时间的字段名。").defaultValue("gmtModifiedStart").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
public static final PropertyDescriptor PROP_END_TIME_PARAM_NAME = new PropertyDescriptor.Builder().name("结束时间参数名").description("在BizContent中用于传递查询结束时间的字段名。").defaultValue("gmtModifiedEnd").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
// --- MongoDB 相关属性 ---
public static final PropertyDescriptor PROP_MONGO_URI = new PropertyDescriptor.Builder().name("MongoDB Connection String").description("MongoDB的连接URI例如mongodb://user:password@host1:27017,host2:27017").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
public static final PropertyDescriptor PROP_MONGO_DATABASE = new PropertyDescriptor.Builder().name("MongoDB Database Name").description("用于存储状态的数据库名称。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
public static final PropertyDescriptor PROP_MONGO_COLLECTION = new PropertyDescriptor.Builder().name("MongoDB Collection Name").description("用于存储状态的集合名称。").defaultValue("nifi_processor_states").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
public static final PropertyDescriptor PROP_STATE_KEY_NAME = new PropertyDescriptor.Builder().name("状态存储键名 (State Key)").description("在MongoDB中用于存储此任务状态的唯一键名。例如 '销售结算单同步'。这将作为文档的 _id。").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
// --- 关系定义 (Relationships) ---
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("成功").description("成功从API获取并解析的数据页将路由到此关系。").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("失败").description("API调用失败或发生不可恢复的错误时将触发FlowFile路由到此关系。").build();
public static final Relationship REL_NO_DATA = new Relationship.Builder().name("无数据").description("在检查的时间范围内API没有返回任何新数据时将触发FlowFile路由到此关系。").build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private volatile OkHttpClient httpClient;
private volatile MongoClient mongoClient;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(PROP_GATEWAY_URL);
descriptors.add(PROP_APP_KEY);
descriptors.add(PROP_APP_SECRET);
descriptors.add(PROP_API_METHOD);
descriptors.add(PROP_API_VERSION);
descriptors.add(PROP_INITIAL_START_TIME);
descriptors.add(PROP_TIME_WINDOW_INCREMENT);
descriptors.add(PROP_END_TIME_SAFETY_BUFFER);
descriptors.add(PROP_PAGE_SIZE);
descriptors.add(PROP_REQUEST_BIZ_CONTENT);
descriptors.add(PROP_START_TIME_PARAM_NAME);
descriptors.add(PROP_END_TIME_PARAM_NAME);
// MongoDB属性
descriptors.add(PROP_MONGO_URI);
descriptors.add(PROP_MONGO_DATABASE);
descriptors.add(PROP_MONGO_COLLECTION);
descriptors.add(PROP_STATE_KEY_NAME);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
relationships.add(REL_NO_DATA);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
// 在处理器启动时创建可复用的HTTP和MongoDB客户端以利用连接池提高性能
this.httpClient = new OkHttpClient.Builder().connectTimeout(30, TimeUnit.SECONDS).readTimeout(60, TimeUnit.SECONDS).build();
this.mongoClient = MongoClients.create(context.getProperty(PROP_MONGO_URI).getValue());
}
@OnStopped
public void onStopped() {
// 在处理器停止时安全地关闭客户端连接释放资源
if (this.mongoClient != null) {
this.mongoClient.close();
}
}
protected OkHttpClient getHttpClient() {
return this.httpClient;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
// 从属性中动态获取用户为当前任务指定的状态键
final String stateKey = context.getProperty(PROP_STATE_KEY_NAME).evaluateAttributeExpressions().getValue();
if (stateKey == null || stateKey.trim().isEmpty()) {
getLogger().error("状态存储键名(State Key)未配置或为空,无法进行状态管理。");
context.yield();
return;
}
// 获取MongoDB集合对象
final MongoDatabase database = mongoClient.getDatabase(context.getProperty(PROP_MONGO_DATABASE).getValue());
final MongoCollection<Document> stateCollection = database.getCollection(context.getProperty(PROP_MONGO_COLLECTION).getValue());
try {
// 从属性中获取用户配置的开始和结束时间参数名
String startTimeParamName = context.getProperty(PROP_START_TIME_PARAM_NAME).getValue();
String endTimeParamName = context.getProperty(PROP_END_TIME_PARAM_NAME).getValue();
// 1. 确定需要追赶的最终目标时间
long safetyBufferMillis = context.getProperty(PROP_END_TIME_SAFETY_BUFFER).asTimePeriod(TimeUnit.MILLISECONDS);
long catchUpTargetTimeMillis = System.currentTimeMillis() - safetyBufferMillis;
// 2. 从MongoDB获取上次成功同步的时间点
Document stateDoc = stateCollection.find(Filters.eq("_id", stateKey)).first();
String lastSyncTimeStr = (stateDoc != null) ? stateDoc.getString("value") : null;
long loopStartTimeMillis;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
if (lastSyncTimeStr == null || lastSyncTimeStr.isEmpty()) {
// 首次运行使用用户配置的初始时间
loopStartTimeMillis = sdf.parse(context.getProperty(PROP_INITIAL_START_TIME).getValue()).getTime();
getLogger().info("未在MongoDB中找到状态键 '{}' 的记录,将使用初始同步时间: {}", stateKey, sdf.format(new Date(loopStartTimeMillis)));
} else {
// 后续运行使用上次保存的时间
loopStartTimeMillis = sdf.parse(lastSyncTimeStr).getTime();
}
// 如果水位线已经超过或等于目标时间说明已追上无需执行
if (loopStartTimeMillis >= catchUpTargetTimeMillis) {
getLogger().info("任务 '{}' 已同步至最新进度(水位线: {}), 无需执行本次任务。", stateKey, sdf.format(new Date(loopStartTimeMillis)));
context.yield(); // 放弃本次执行提高效率
return;
}
// 3. 开始追赶循环将大时间段切分为小窗口
long timeWindowMillis = context.getProperty(PROP_TIME_WINDOW_INCREMENT).asTimePeriod(TimeUnit.MILLISECONDS);
boolean hasFetchedDataInThisRun = false;
while (loopStartTimeMillis < catchUpTargetTimeMillis) {
// ============================ 边界值处理核心修改 START ============================
// 1. 定义下一个时间窗口的理论开始时间 (即当前窗口的边界)
long nextWindowStartTimeMillis = loopStartTimeMillis + timeWindowMillis;
// 2. 确定本次API调用的实际结束时间 (apiQueryEndTimeMillis) 和下次循环的开始时间 (newWatermarkMillis)
long apiQueryEndTimeMillis;
long newWatermarkMillis;
if (nextWindowStartTimeMillis >= catchUpTargetTimeMillis) {
// 这是最后一个窗口
// API查询的结束时间就是最终目标时间
apiQueryEndTimeMillis = catchUpTargetTimeMillis;
// 新的水位线(下次循环的起点)也更新为最终目标时间
newWatermarkMillis = catchUpTargetTimeMillis;
} else {
// 非最后一个窗口
// API查询的结束时间是下一个窗口开始时间的前一秒构成一个无重叠的闭合区间
// 例如对于 [01:00:00 ... 02:00:00) 的范围实际查询 [01:00:00, 01:59:59]
apiQueryEndTimeMillis = nextWindowStartTimeMillis - 1000; // 减去1000毫秒 (1秒)
// 但新的水位线必须是下一个窗口的准确开始时间以确保连续性
newWatermarkMillis = nextWindowStartTimeMillis;
}
// 3. 格式化API所需的时间字符串
String windowStartTimeStr = sdf.format(new Date(loopStartTimeMillis));
String windowEndTimeStr = sdf.format(new Date(apiQueryEndTimeMillis));
// ============================ 边界值处理核心修改 END ==============================
getLogger().debug("任务 '{}' 准备处理时间窗口: [{} -> {}]", stateKey, windowStartTimeStr, windowEndTimeStr);
// --- 内部分页循环 ---
int currentPage = 0;
boolean hasNextPageInWindow = true;
while (hasNextPageInWindow) {
JSONObject bizData = JSON.parseObject(context.getProperty(PROP_REQUEST_BIZ_CONTENT).getValue());
bizData.put("pageIndex", String.valueOf(currentPage));
bizData.put("pageSize", context.getProperty(PROP_PAGE_SIZE).getValue());
bizData.put(startTimeParamName, windowStartTimeStr);
bizData.put(endTimeParamName, windowEndTimeStr);
ResultDataDto resultDataDto = callJackyunApi(context, bizData);
if (resultDataDto == null || !"200".equals(resultDataDto.getCode())) {
String errorMsg = (resultDataDto != null) ? resultDataDto.toString() : "API返回为空";
getLogger().error("吉客云API返回业务错误: {}", errorMsg);
FlowFile failureFlowFile = session.create();
failureFlowFile = session.write(failureFlowFile, out -> out.write(errorMsg.getBytes(StandardCharsets.UTF_8)));
session.transfer(failureFlowFile, REL_FAILURE);
return; // 发生业务错误终止本次执行
}
ResultDto result = resultDataDto.getResult();
if (result != null && result.getData() != null && !result.getData().isEmpty()) {
hasFetchedDataInThisRun = true;
getLogger().info("任务 '{}' 在时间窗 [{} -> {}] 的第 {} 页获取到 {} 条数据。", stateKey, windowStartTimeStr, windowEndTimeStr, currentPage, result.getData().size());
FlowFile ff = session.create();
ff = session.putAttribute(ff, "jackyun.api.method", context.getProperty(PROP_API_METHOD).getValue());
ff = session.putAttribute(ff, "jackyun.api.startTime", windowStartTimeStr);
ff = session.putAttribute(ff, "jackyun.api.endTime", windowEndTimeStr);
ff = session.putAttribute(ff, "jackyun.api.pageIndex", String.valueOf(currentPage));
ff = session.write(ff, out -> out.write(JSON.toJSONString(result.getData()).getBytes(StandardCharsets.UTF_8)));
session.transfer(ff, REL_SUCCESS);
session.commitAsync();
currentPage++;
// 如果返回的数据量小于每页大小可以推断这是最后一页但为了保险起见继续查询直到返回空数据
// hasNextPageInWindow = result.getData().size() == Integer.parseInt(context.getProperty(PROP_PAGE_SIZE).getValue());
} else {
// API返回的数据为空说明当前时间窗口已无更多数据
hasNextPageInWindow = false;
}
} // --- 分页循环结束 ---
// 成功处理完一个时间窗口包括所有页立即更新MongoDB中的状态
String newWatermarkStr = sdf.format(new Date(newWatermarkMillis));
stateCollection.updateOne(
Filters.eq("_id", stateKey),
Updates.set("value", newWatermarkStr),
new UpdateOptions().upsert(true)
);
getLogger().info("任务 '{}' 的时间窗口 [{} -> {}] 数据同步完成,状态水位线更新至: {}", stateKey, windowStartTimeStr, windowEndTimeStr, newWatermarkStr);
// 更新循环变量为下一个时间窗口做准备
loopStartTimeMillis = newWatermarkMillis;
} // --- 追赶循环结束 ---
if (!hasFetchedDataInThisRun) {
getLogger().info("任务 '{}' 在本次执行中未发现新数据。", stateKey);
FlowFile noDataFlowFile = session.create();
noDataFlowFile = session.putAttribute(noDataFlowFile, "message", "在指定的时间范围内没有查询到新数据。");
session.transfer(noDataFlowFile, REL_NO_DATA);
}
} catch (Exception e) {
getLogger().error("处理吉客云数据时发生未知错误: {}", e.getMessage(), e);
StringWriter sw = new StringWriter();
e.printStackTrace(new PrintWriter(sw));
String exceptionAsString = sw.toString();
FlowFile failureFlowFile = session.create();
failureFlowFile = session.write(failureFlowFile, out -> out.write(exceptionAsString.getBytes(StandardCharsets.UTF_8)));
session.transfer(failureFlowFile, REL_FAILURE);
}
}
private ResultDataDto callJackyunApi(ProcessContext context, JSONObject bizData) throws IOException, NoSuchAlgorithmException {
String appKey = context.getProperty(PROP_APP_KEY).getValue();
String appSecret = context.getProperty(PROP_APP_SECRET).getValue();
String method = context.getProperty(PROP_API_METHOD).getValue();
String version = context.getProperty(PROP_API_VERSION).getValue();
SortedMap<String, String> sortedMap = new TreeMap<>();
sortedMap.put("method", method);
sortedMap.put("appkey", appKey);
sortedMap.put("version", version);
sortedMap.put("contenttype", "json");
sortedMap.put("timestamp", DATETIME_FORMATTER.format(LocalDateTime.now()));
sortedMap.put("bizcontent", bizData.toJSONString());
StringBuilder sbSignData = new StringBuilder(appSecret);
for (Map.Entry<String, String> entry : sortedMap.entrySet()) {
sbSignData.append(entry.getKey()).append(entry.getValue());
}
sbSignData.append(appSecret);
sortedMap.put("sign", md5Encrypt(sbSignData.toString().toLowerCase()));
FormBody.Builder formBodyBuilder = new FormBody.Builder();
for (Map.Entry<String, String> entry : sortedMap.entrySet()) {
formBodyBuilder.add(entry.getKey(), entry.getValue());
}
RequestBody formBody = formBodyBuilder.build();
Request request = new Request.Builder().url(context.getProperty(PROP_GATEWAY_URL).getValue()).post(formBody).build();
try (Response response = getHttpClient().newCall(request).execute()) {
if (!response.isSuccessful()) {
throw new IOException("非预期的HTTP响应码: " + response.code() + ", 响应体: " + response.body().string());
}
if (response.body() != null) {
return JSON.parseObject(response.body().string(), ResultDataDto.class);
}
return null;
}
}
private String md5Encrypt(String text) throws NoSuchAlgorithmException {
MessageDigest md5 = MessageDigest.getInstance("MD5");
byte[] resultByte = text.getBytes(StandardCharsets.UTF_8);
byte[] md5Bytes = md5.digest(resultByte);
StringBuilder hexValue = new StringBuilder();
for (byte md5Byte : md5Bytes) {
int val = (md5Byte) & 0xff;
if (val < 16) {
hexValue.append("0");
}
hexValue.append(Integer.toHexString(val));
}
return hexValue.toString();
}
}

View File

@ -0,0 +1,53 @@
package com.hzya.frame.dto;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.dto
* @Projectnifi-hzyadev-bundle
* @nameResultDataDto
* @Date2025/8/27 16:15
* @FilenameResultDataDto
*/
public class ResultDataDto {
private String code;
private String developerInfo;
private String msg;
private ResultDto result;
public String getCode() {
return code;
}
public void setCode(String code) {
this.code = code;
}
public String getDeveloperInfo() {
return developerInfo;
}
public void setDeveloperInfo(String developerInfo) {
this.developerInfo = developerInfo;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public ResultDto getResult() {
return result;
}
public void setResult(ResultDto result) {
this.result = result;
}
@Override
public String toString() {
return "ResultDataDto{" + "code='" + code + '\'' + ", developerInfo='" + developerInfo + '\'' + ", msg='" + msg + '\'' + ", result=" + result + '}';
}
}

View File

@ -0,0 +1,32 @@
package com.hzya.frame.dto;
import java.util.List;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.dto
* @Projectnifi-hzyadev-bundle
* @nameResultDataDto
* @Date2025/8/27 16:15
* @FilenameResultDataDto
*/
public class ResultDto {
private String contextId;
private List<Object> data;
public String getContextId() {
return contextId;
}
public void setContextId(String contextId) {
this.contextId = contextId;
}
public List<Object> getData() {
return data;
}
public void setData(List<Object> data) {
this.data = data;
}
}

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
com.hzya.frame.GetJackyunOpenDataProcessor

View File

@ -0,0 +1,138 @@
package com.hzya.frame;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.hzya.frame.dto.ResultDataDto;
import com.hzya.frame.dto.ResultDto;
import okhttp3.*;
import org.apache.nifi.processor.ProcessContext;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
/**
* @Authorliuyang
* @Packagecom.hzya.frame
* @Projectnifi-hzyadev-bundle
* @nameGetJackyunOpenDataProcessorTest
* @Date2025/8/27 16:04
* @FilenameGetJackyunOpenDataProcessorTest
*/
public class GetJackyunOpenDataProcessorTest {
private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@Test
public void test1() {
try {
Long total = 0L;
Integer pageIndex = 0;
Set<String> ids = new HashSet<>();
while (true) {
ResultDataDto resultDataDto = callJackyunApi(pageIndex++);
if (resultDataDto != null && resultDataDto.getResult() != null) {
ResultDto result = resultDataDto.getResult();
if (result.getData() != null && result.getData().size() > 0) {
total += result.getData().size();
System.out.println("查询得到数量:" + result.getData().size() + " 总数量:" + total);
List<Object> data = result.getData();
for (int i = 0; i < data.size(); i++) {
JSONObject o = (JSONObject) data.get(i);
JSONObject tradeSettleOrder = (JSONObject) o.get("tradeSettleOrder");
String id1 = String.valueOf(tradeSettleOrder.get("id"));
// 关键修改检查重复并报错
if (!ids.add(id1)) { // add() 返回 false 说明重复
throw new IllegalStateException("发现重复 ID: " + id1);
}
}
} else {
System.out.println("查询完毕");
break;
}
}
}
System.out.println("总数量:" + total + " set集合" + ids.size());
} catch (Exception e) {
e.printStackTrace();
}
}
private ResultDataDto callJackyunApi(Integer pageIndex) throws IOException, NoSuchAlgorithmException {
OkHttpClient httpClient = new OkHttpClient.Builder().connectTimeout(30, TimeUnit.SECONDS).readTimeout(60, TimeUnit.SECONDS).build();
String bizData = "{\n" +
" \"tradeNo\": \"\",\n" +
" \"settleStatus\": \"\",\n" +
" \"gmtModifiedEnd\": \"\",\n" +
" \"auditTimeBegin\": \"2025-08-27 00:00:00\",\n" +
" \"pageSize\": \"200\",\n" +
" \"gmtCreatedBegin\": \"\",\n" +
" \"billDateBegin\": \"\",\n" +
" \"settleNo\": \"\",\n" +
" \"billDateEnd\": \"\",\n" +
" \"gmtCreatedEnd\": \"\",\n" +
" \"pageIndex\": \""+pageIndex+"\",\n" +
" \"auditTimeEnd\": \"2025-08-27 19:21:44\",\n" +
" \"detailCols\": \"tradeSettleOrder.settleId, tradeSettleOrder.shopId, tradeSettleOrder.auditor, tradeSettleOrder.flagIds, tradeSettleOrder.payment, tradeSettleOrder.departId, tradeSettleOrder.settleNo, tradeSettleOrder.shopName, tradeSettleOrder.auditTime, tradeSettleOrder.auditorId, tradeSettleOrder.companyId, tradeSettleOrder.settleFee, tradeSettleOrder.settlerId, tradeSettleOrder.chargeType, tradeSettleOrder.departName, tradeSettleOrder.settleMemo, tradeSettleOrder.settleTime, tradeSettleOrder.sysFlagIds, tradeSettleOrder.accountName, tradeSettleOrder.companyName, tradeSettleOrder.settleStatus, tradeSettleOrder.chargeAccount, tradeSettleOrder.onlinePayTime, tradeSettleOrder.chargeCurrency, tradeSettleOrder.chargeCurrencyCode, tradeSettleOrder.chargeExchangeRate, tradeSettleOrder.settler, tradeSettleOrder.couponFee, tradeSettleOrder.differenceFee, tradeSettleOrder.settleTotalFee, tradeSettleOrder.settleDiscountFee, tradeSettleOrderDetailArr.subTradeId31177, tradeSettleOrderDetailArr.tradeFrom, tradeSettleOrderDetailArr.cost, tradeSettleOrderDetailArr.unit, tradeSettleOrderDetailArr.payNo, tradeSettleOrderDetailArr.cateId, tradeSettleOrderDetailArr.remark, tradeSettleOrderDetailArr.seller, tradeSettleOrderDetailArr.specId, tradeSettleOrderDetailArr.taxFee, tradeSettleOrderDetailArr.brandId, tradeSettleOrderDetailArr.goodsId, tradeSettleOrderDetailArr.goodsNo, tradeSettleOrderDetailArr.orderNo, tradeSettleOrderDetailArr.payTime, tradeSettleOrderDetailArr.payType, tradeSettleOrderDetailArr.taxRate, tradeSettleOrderDetailArr.cateName, tradeSettleOrderDetailArr.sellerId, tradeSettleOrderDetailArr.settleId, tradeSettleOrderDetailArr.specName, tradeSettleOrderDetailArr.auditTime, tradeSettleOrderDetailArr.brandName, tradeSettleOrderDetailArr.goodsName, tradeSettleOrderDetailArr.goodsTags, tradeSettleOrderDetailArr.sellCount, tradeSettleOrderDetailArr.sellPrice, tradeSettleOrderDetailArr.sellTotal, tradeSettleOrderDetailArr.tradeTime, tradeSettleOrderDetailArr.tradeType, tradeSettleOrderDetailArr.arriveTime, tradeSettleOrderDetailArr.chargeType, tradeSettleOrderDetailArr.goodsAlias, tradeSettleOrderDetailArr.logisticId, tradeSettleOrderDetailArr.mainPostid, tradeSettleOrderDetailArr.payAccount, tradeSettleOrderDetailArr.payDueDate, tradeSettleOrderDetailArr.stockoutNo, tradeSettleOrderDetailArr.accountName, tradeSettleOrderDetailArr.consignTime, tradeSettleOrderDetailArr.discountFee, tradeSettleOrderDetailArr.tradeStatus, tradeSettleOrderDetailArr.warehouseId, tradeSettleOrderDetailArr.lastShipTime, tradeSettleOrderDetailArr.logisticName, tradeSettleOrderDetailArr.logisticType, tradeSettleOrderDetailArr.chargeAccount, tradeSettleOrderDetailArr.sourceTradeNo, tradeSettleOrderDetailArr.warehouseName, tradeSettleOrderDetailArr.goodsAttribute, tradeSettleOrderDetailArr.shareFavourableAfterFee, tradeSettleOrderDetailArr.tradeNo, tradeSettleOrderDetailArr.innerSettleCost, tradeSettleOrderDetailArr.shareDiscountFee, tradeSettleOrderDetailArr.shareOtherPayment, tradeSettleOrderDetailArr.goodsReceiptAmount, tradeSettleOrderDetailArr.shareFairSellTotal, tradeSettleOrderDetailArr.shareFavourableFee, tradeSettleOrderDetailArr.otherShareFavourableFee, tradeSettleOrderDetailArr.customizeGoodsColumn1, tradeSettleOrderDetailArr.customizeGoodsColumn2, tradeSettleOrderDetailArr.customizeGoodsColumn3, tradeSettleOrderDetailArr.customizeGoodsColumn4, tradeSettleOrderDetailArr.customizeGoodsColumn5, tradeSettleOrderDetailArr.customizeGoodsColumn6, tradeSettleOrderDetailArr.customizeGoodsColumn7, tradeSettleOrderDetailArr.customizeGoodsColumn8, tradeSettleOrderDetailArr.customizeGoodsColumn9, tradeSettleOrderDetailArr.customizeGoodsColumn10, tradeSettleOrderCustomer.settleUnitId, tradeSettleOrderCustomer.qq, tradeSettleOrderCustomer.email, tradeSettleOrderCustomer.weiXin, tradeSettleOrderCustomer.wangWang, tradeSettleOrderCustomer.customerId, tradeSettleOrderCustomer.settleUnit, tradeSettleOrderCustomer.customerName, tradeSettleOrderCustomer.customerTags, tradeSettleOrderCustomer.customerType, tradeSettleOrderCustomer.customerGrade, tradeSettleOrderCustomer.customerAccount, tradeSettleOrderCustomer.customerTypeName, tradeSettleOrderCustomer.customerGradeName, tradeSettleOrderCustomer.endCustomerAccount, tradeSettleOrderColumnExt.customizeTradeColumn1, tradeSettleOrderColumnExt.customizeTradeColumn2, tradeSettleOrderColumnExt.customizeTradeColumn3, tradeSettleOrderColumnExt.customizeTradeColumn4, tradeSettleOrderColumnExt.customizeTradeColumn5, tradeSettleOrderColumnExt.customizeTradeColumn6, tradeSettleOrderColumnExt.customizeTradeColumn7, tradeSettleOrderColumnExt.customizeTradeColumn8, tradeSettleOrderColumnExt.customizeTradeColumn9, tradeSettleOrderColumnExt.customizeTradeColumn10\",\n" +
" \"gmtModifiedBegin\": \"\",\n" +
" \"hasQueryHistory\": \"\",\n" +
" \"shopId\": \"\",\n" +
" \"cols\": \"settleId,settleNo,tradeNo,sourceTradeNo,settleInvoiceStatus\"\n" +
"}";
SortedMap<String, String> sortedMap = new TreeMap<>();
sortedMap.put("method", "oms.open.trade.settle.allinfo");
sortedMap.put("appkey", "12346738");
sortedMap.put("version", "v1.0");
sortedMap.put("contenttype", "json");
sortedMap.put("timestamp", DATETIME_FORMATTER.format(LocalDateTime.now()));
sortedMap.put("bizcontent", bizData);
StringBuilder sbSignData = new StringBuilder("94d715fc68214ce1ba48803b3bf19a9f");
for (Map.Entry<String, String> entry : sortedMap.entrySet()) {
sbSignData.append(entry.getKey()).append(entry.getValue());
}
sbSignData.append("94d715fc68214ce1ba48803b3bf19a9f");
sortedMap.put("sign", md5Encrypt(sbSignData.toString().toLowerCase()));
FormBody.Builder formBodyBuilder = new FormBody.Builder();
for (Map.Entry<String, String> entry : sortedMap.entrySet()) {
formBodyBuilder.add(entry.getKey(), entry.getValue());
}
RequestBody formBody = formBodyBuilder.build();
Request request = new Request.Builder().url("https://open.jackyun.com/open/openapi/do").post(formBody).build();
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful()) {
throw new IOException("非预期的HTTP响应码: " + response.code() + ", 响应体: " + response.body().string());
}
return JSON.parseObject(response.body().string(), ResultDataDto.class);
}
}
private String md5Encrypt(String text) throws NoSuchAlgorithmException {
MessageDigest md5 = MessageDigest.getInstance("MD5");
byte[] resultByte = text.getBytes(StandardCharsets.UTF_8);
byte[] md5Bytes = md5.digest(resultByte);
StringBuilder hexValue = new StringBuilder();
for (byte md5Byte : md5Bytes) {
int val = (md5Byte) & 0xff;
if (val < 16) {
hexValue.append("0");
}
hexValue.append(Integer.toHexString(val));
}
return hexValue.toString();
}
}

View File

@ -0,0 +1,256 @@
package com.hzya.frame;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.hzya.frame.dto.ResultDataDto;
import com.hzya.frame.dto.ResultDto;
import okhttp3.*;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* @Authorliuyang
* @Packagecom.hzya.frame
* @Projectnifi-hzyadev-bundle
* @nameGetJackyunOpenDataProcessorTest
* @Date2025/8/27 16:04
* @FilenameGetJackyunOpenDataProcessorTest
*/
public class GetJackyunOpenDataProcessorTest2 {
// 定义统一的时间格式化器
private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
/**
* 新增的方法按小时调度查询任务
* 这个方法模拟在2025年8月27日 00:00:00 开始每小时查询一次数据
* 直到最后一个时间段覆盖到 18:16:00
*/
/**
* 新增的方法按小时调度查询任务
* 这个方法模拟在2025年8月27日 00:00:00 开始每小时查询一次数据
* 直到最后一个时间段覆盖到 18:16:00并最后输出总计
*/
/**
* 修正后的方法按小时调度查询任务
* 通过将每个时间段的结束时间减去1秒来避免边界数据的重复计算
* 确保查询区间为 [start, end)实现无缝且不重叠的查询
*/
@Test
public void queryDataHourly() {
// 设置任务的起始和最终截止时间
LocalDateTime taskStartTime = LocalDateTime.of(2025, 8, 27, 0, 0, 0);
LocalDateTime taskFinalEndTime = LocalDateTime.of(2025, 8, 27, 18, 16, 0);
LocalDateTime currentQueryStartTime = taskStartTime;
long grandTotal = 0L;
System.out.println("开始执行按小时查询任务(已修正边界问题)...");
while (currentQueryStartTime.isBefore(taskFinalEndTime)) {
// 定义下一个小时的起点作为当前区间的边界
LocalDateTime nextHourStartTime = currentQueryStartTime.plusHours(1);
// 确定本次查询的实际结束时间
LocalDateTime actualQueryEndTime;
// 如果下一个小时的起点已经超过了最终截止时间或者就是最终截止时间
// 那么本次查询的结束时间就应该是最终截止时间这是最后一个区间
if (nextHourStartTime.isAfter(taskFinalEndTime) || nextHourStartTime.isEqual(taskFinalEndTime)) {
actualQueryEndTime = taskFinalEndTime;
} else {
// 否则本次查询的结束时间是下一个小时的起点减去1秒
// 例如对于 [00:00:00, 01:00:00) 的区间实际查询的是 [00:00:00, 00:59:59]
actualQueryEndTime = nextHourStartTime.minusSeconds(1);
}
// 将时间对象格式化为字符串
String startTimeStr = DATETIME_FORMATTER.format(currentQueryStartTime);
String endTimeStr = DATETIME_FORMATTER.format(actualQueryEndTime);
try {
long totalCountInHour = getTotalCountForTimeRange(startTimeStr, endTimeStr);
System.out.println("时间范围: " + startTimeStr + " - " + endTimeStr + " 查询数量: " + totalCountInHour);
grandTotal += totalCountInHour;
} catch (Exception e) {
System.err.println("查询时间范围 " + startTimeStr + " - " + endTimeStr + " 时发生错误: " + e.getMessage());
e.printStackTrace();
}
// 下一次循环的开始时间直接就是下一个小时的整点
currentQueryStartTime = nextHourStartTime;
}
System.out.println("所有时间段查询完毕。");
System.out.println("查询合计总数量: " + grandTotal);
}
/**
* 辅助方法获取指定时间范围内的总数据量包含分页逻辑
* @param startTimeStr 查询开始时间 (格式: "yyyy-MM-dd HH:mm:ss")
* @param endTimeStr 查询结束时间 (格式: "yyyy-MM-dd HH:mm:ss")
* @return 该时间范围内的总记录数
* @throws IOException API请求异常
* @throws NoSuchAlgorithmException MD5加密异常
*/
private long getTotalCountForTimeRange(String startTimeStr, String endTimeStr) throws IOException, NoSuchAlgorithmException {
long total = 0L;
int pageIndex = 0;
// 循环调用API直到返回的数据为空实现分页查询
while (true) {
// 调用API传入分页索引和时间范围
ResultDataDto resultDataDto = callJackyunApi(pageIndex++, startTimeStr, endTimeStr);
if (resultDataDto != null && resultDataDto.getResult() != null) {
ResultDto result = resultDataDto.getResult();
// 如果返回的数据列表不为空且有数据
if (result.getData() != null && !result.getData().isEmpty()) {
total += result.getData().size();
} else {
// 如果没有数据返回说明已经查询完所有分页跳出循环
break;
}
} else {
// 如果返回结果为空也跳出循环
break;
}
}
return total;
}
/**
* 修改后的API调用方法
* @param pageIndex 页码
* @param auditTimeBegin 审核开始时间
* @param auditTimeEnd 审核结束时间
* @return API返回结果
* @throws IOException API请求异常
* @throws NoSuchAlgorithmException MD5加密异常
*/
private ResultDataDto callJackyunApi(Integer pageIndex, String auditTimeBegin, String auditTimeEnd) throws IOException, NoSuchAlgorithmException {
OkHttpClient httpClient = new OkHttpClient.Builder()
.connectTimeout(30, TimeUnit.SECONDS)
.readTimeout(60, TimeUnit.SECONDS)
.build();
// auditTimeBegin auditTimeEnd 参数动态拼接到 bizData
String bizData = "{\n" +
" \"tradeNo\": \"\",\n" +
" \"settleStatus\": \"\",\n" +
" \"gmtModifiedEnd\": \"\",\n" +
" \"auditTimeBegin\": \"" + auditTimeBegin + "\",\n" +
" \"pageSize\": \"200\",\n" +
" \"gmtCreatedBegin\": \"\",\n" +
" \"billDateBegin\": \"\",\n" +
" \"settleNo\": \"\",\n" +
" \"billDateEnd\": \"\",\n" +
" \"gmtCreatedEnd\": \"\",\n" +
" \"pageIndex\": \"" + pageIndex + "\",\n" +
" \"auditTimeEnd\": \"" + auditTimeEnd + "\",\n" +
" \"detailCols\": \"tradeSettleOrder.settleId, tradeSettleOrder.shopId, tradeSettleOrder.auditor, tradeSettleOrder.flagIds, tradeSettleOrder.payment, tradeSettleOrder.departId, tradeSettleOrder.settleNo, tradeSettleOrder.shopName, tradeSettleOrder.auditTime, tradeSettleOrder.auditorId, tradeSettleOrder.companyId, tradeSettleOrder.settleFee, tradeSettleOrder.settlerId, tradeSettleOrder.chargeType, tradeSettleOrder.departName, tradeSettleOrder.settleMemo, tradeSettleOrder.settleTime, tradeSettleOrder.sysFlagIds, tradeSettleOrder.accountName, tradeSettleOrder.companyName, tradeSettleOrder.settleStatus, tradeSettleOrder.chargeAccount, tradeSettleOrder.onlinePayTime, tradeSettleOrder.chargeCurrency, tradeSettleOrder.chargeCurrencyCode, tradeSettleOrder.chargeExchangeRate, tradeSettleOrder.settler, tradeSettleOrder.couponFee, tradeSettleOrder.differenceFee, tradeSettleOrder.settleTotalFee, tradeSettleOrder.settleDiscountFee, tradeSettleOrderDetailArr.subTradeId31177, tradeSettleOrderDetailArr.tradeFrom, tradeSettleOrderDetailArr.cost, tradeSettleOrderDetailArr.unit, tradeSettleOrderDetailArr.payNo, tradeSettleOrderDetailArr.cateId, tradeSettleOrderDetailArr.remark, tradeSettleOrderDetailArr.seller, tradeSettleOrderDetailArr.specId, tradeSettleOrderDetailArr.taxFee, tradeSettleOrderDetailArr.brandId, tradeSettleOrderDetailArr.goodsId, tradeSettleOrderDetailArr.goodsNo, tradeSettleOrderDetailArr.orderNo, tradeSettleOrderDetailArr.payTime, tradeSettleOrderDetailArr.payType, tradeSettleOrderDetailArr.taxRate, tradeSettleOrderDetailArr.cateName, tradeSettleOrderDetailArr.sellerId, tradeSettleOrderDetailArr.settleId, tradeSettleOrderDetailArr.specName, tradeSettleOrderDetailArr.auditTime, tradeSettleOrderDetailArr.brandName, tradeSettleOrderDetailArr.goodsName, tradeSettleOrderDetailArr.goodsTags, tradeSettleOrderDetailArr.sellCount, tradeSettleOrderDetailArr.sellPrice, tradeSettleOrderDetailArr.sellTotal, tradeSettleOrderDetailArr.tradeTime, tradeSettleOrderDetailArr.tradeType, tradeSettleOrderDetailArr.arriveTime, tradeSettleOrderDetailArr.chargeType, tradeSettleOrderDetailArr.goodsAlias, tradeSettleOrderDetailArr.logisticId, tradeSettleOrderDetailArr.mainPostid, tradeSettleOrderDetailArr.payAccount, tradeSettleOrderDetailArr.payDueDate, tradeSettleOrderDetailArr.stockoutNo, tradeSettleOrderDetailArr.accountName, tradeSettleOrderDetailArr.consignTime, tradeSettleOrderDetailArr.discountFee, tradeSettleOrderDetailArr.tradeStatus, tradeSettleOrderDetailArr.warehouseId, tradeSettleOrderDetailArr.lastShipTime, tradeSettleOrderDetailArr.logisticName, tradeSettleOrderDetailArr.logisticType, tradeSettleOrderDetailArr.chargeAccount, tradeSettleOrderDetailArr.sourceTradeNo, tradeSettleOrderDetailArr.warehouseName, tradeSettleOrderDetailArr.goodsAttribute, tradeSettleOrderDetailArr.shareFavourableAfterFee, tradeSettleOrderDetailArr.tradeNo, tradeSettleOrderDetailArr.innerSettleCost, tradeSettleOrderDetailArr.shareDiscountFee, tradeSettleOrderDetailArr.shareOtherPayment, tradeSettleOrderDetailArr.goodsReceiptAmount, tradeSettleOrderDetailArr.shareFairSellTotal, tradeSettleOrderDetailArr.shareFavourableFee, tradeSettleOrderDetailArr.otherShareFavourableFee, tradeSettleOrderDetailArr.customizeGoodsColumn1, tradeSettleOrderDetailArr.customizeGoodsColumn2, tradeSettleOrderDetailArr.customizeGoodsColumn3, tradeSettleOrderDetailArr.customizeGoodsColumn4, tradeSettleOrderDetailArr.customizeGoodsColumn5, tradeSettleOrderDetailArr.customizeGoodsColumn6, tradeSettleOrderDetailArr.customizeGoodsColumn7, tradeSettleOrderDetailArr.customizeGoodsColumn8, tradeSettleOrderDetailArr.customizeGoodsColumn9, tradeSettleOrderDetailArr.customizeGoodsColumn10, tradeSettleOrderCustomer.settleUnitId, tradeSettleOrderCustomer.qq, tradeSettleOrderCustomer.email, tradeSettleOrderCustomer.weiXin, tradeSettleOrderCustomer.wangWang, tradeSettleOrderCustomer.customerId, tradeSettleOrderCustomer.settleUnit, tradeSettleOrderCustomer.customerName, tradeSettleOrderCustomer.customerTags, tradeSettleOrderCustomer.customerType, tradeSettleOrderCustomer.customerGrade, tradeSettleOrderCustomer.customerAccount, tradeSettleOrderCustomer.customerTypeName, tradeSettleOrderCustomer.customerGradeName, tradeSettleOrderCustomer.endCustomerAccount, tradeSettleOrderColumnExt.customizeTradeColumn1, tradeSettleOrderColumnExt.customizeTradeColumn2, tradeSettleOrderColumnExt.customizeTradeColumn3, tradeSettleOrderColumnExt.customizeTradeColumn4, tradeSettleOrderColumnExt.customizeTradeColumn5, tradeSettleOrderColumnExt.customizeTradeColumn6, tradeSettleOrderColumnExt.customizeTradeColumn7, tradeSettleOrderColumnExt.customizeTradeColumn8, tradeSettleOrderColumnExt.customizeTradeColumn9, tradeSettleOrderColumnExt.customizeTradeColumn10\",\n" +
" \"gmtModifiedBegin\": \"\",\n" +
" \"hasQueryHistory\": \"\",\n" +
" \"shopId\": \"\",\n" +
" \"cols\": \"settleId,settleNo,tradeNo,sourceTradeNo,settleInvoiceStatus\"\n" +
"}";
SortedMap<String, String> sortedMap = new TreeMap<>();
sortedMap.put("method", "oms.open.trade.settle.allinfo");
sortedMap.put("appkey", "12346738");
sortedMap.put("version", "v1.0");
sortedMap.put("contenttype", "json");
sortedMap.put("timestamp", DATETIME_FORMATTER.format(LocalDateTime.now()));
sortedMap.put("bizcontent", bizData);
StringBuilder sbSignData = new StringBuilder("94d715fc68214ce1ba48803b3bf19a9f");
for (Map.Entry<String, String> entry : sortedMap.entrySet()) {
sbSignData.append(entry.getKey()).append(entry.getValue());
}
sbSignData.append("94d715fc68214ce1ba48803b3bf19a9f");
sortedMap.put("sign", md5Encrypt(sbSignData.toString().toLowerCase()));
FormBody.Builder formBodyBuilder = new FormBody.Builder();
for (Map.Entry<String, String> entry : sortedMap.entrySet()) {
formBodyBuilder.add(entry.getKey(), entry.getValue());
}
RequestBody formBody = formBodyBuilder.build();
Request request = new Request.Builder().url("https://open.jackyun.com/open/openapi/do").post(formBody).build();
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful()) {
throw new IOException("非预期的HTTP响应码: " + response.code() + ", 响应体: " + response.body().string());
}
return JSON.parseObject(response.body().string(), ResultDataDto.class);
}
}
/**
* MD5加密工具方法
* @param text 待加密的字符串
* @return 加密后的32位小写MD5字符串
* @throws NoSuchAlgorithmException
*/
private String md5Encrypt(String text) throws NoSuchAlgorithmException {
MessageDigest md5 = MessageDigest.getInstance("MD5");
byte[] resultByte = text.getBytes(StandardCharsets.UTF_8);
byte[] md5Bytes = md5.digest(resultByte);
StringBuilder hexValue = new StringBuilder();
for (byte md5Byte : md5Bytes) {
int val = (md5Byte) & 0xff;
if (val < 16) {
hexValue.append("0");
}
hexValue.append(Integer.toHexString(val));
}
return hexValue.toString();
}
/*
* 保留您原有的 test1 方法不做修改
*/
@Test
public void test1() {
try {
Long total = 0L;
Integer pageIndex = 0;
Set<String> ids = new HashSet<>();
while (true) {
// 为了让这个旧方法能运行我们让它调用修改后的API方法但传入固定的时间
ResultDataDto resultDataDto = callJackyunApi(pageIndex++, "2025-08-27 00:00:00", "2025-08-27 18:16:00");
if (resultDataDto != null && resultDataDto.getResult() != null) {
ResultDto result = resultDataDto.getResult();
if (result.getData() != null && result.getData().size() > 0) {
total += result.getData().size();
System.out.println("查询得到数量:" + result.getData().size() + " 总数量:" + total);
List<Object> data = result.getData();
for (int i = 0; i < data.size(); i++) {
JSONObject o = (JSONObject) data.get(i);
JSONObject tradeSettleOrder = (JSONObject) o.get("tradeSettleOrder");
String id1 = String.valueOf(tradeSettleOrder.get("id"));
// 关键修改检查重复并报错
if (!ids.add(id1)) { // add() 返回 false 说明重复
throw new IllegalStateException("发现重复 ID: " + id1);
}
}
} else {
System.out.println("查询完毕");
break;
}
}
}
System.out.println("总数量:" + total + " set集合" + ids.size());
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -25,6 +25,8 @@
<module>hzya-nifi-AdvancedJoltTransformer-processors</module> <module>hzya-nifi-AdvancedJoltTransformer-processors</module>
<module>hzya-nifi-AutoAddOracleDatafile-nar</module> <module>hzya-nifi-AutoAddOracleDatafile-nar</module>
<module>hzya-nifi-AutoAddOracleDatafile-processors</module> <module>hzya-nifi-AutoAddOracleDatafile-processors</module>
<module>hzya-nifi-GetJackyunOpenData-nar</module>
<module>hzya-nifi-GetJackyunOpenData-processors</module>
<module>hzya-nifi-Zjnx-czb-processors</module> <module>hzya-nifi-Zjnx-czb-processors</module>
<module>hzya-nifi-Zjnx-czb-nar</module> <module>hzya-nifi-Zjnx-czb-nar</module>
<module>hzya-nifi-Zsyh-cbs-nar</module> <module>hzya-nifi-Zsyh-cbs-nar</module>