feat(nifi): 增加流程组相关API接口并优化参数处理

- 新增流程组查询、创建、删除和启动/停止的API接口
- 添加流程组绑定参数上下文的接口
- 优化参数处理,支持直接传入对象作为请求体
-重构部分代码以提高可维护性
This commit is contained in:
liuy 2025-05-16 11:21:28 +08:00
parent 9acde1cb1e
commit 72710dc255
38 changed files with 441 additions and 18 deletions

View File

@ -86,6 +86,15 @@ public class NifiClient {
});
}
public <T> T put(String path, Object requestBody, Class<T> responseType) throws IOException {
String jsonBody = objectMapper.writeValueAsString(requestBody);
RequestBody body = RequestBody.create(jsonBody, MediaType.get("application/json; charset=utf-8"));
return executeRequestWithRetry(() -> {
Request request = new Request.Builder().url(config.getApiUrl() + path).put(body).header("Authorization", "Bearer " + accessToken.get()).build();
return executeRequest(request, responseType);
});
}
private <T> T executeRequestWithRetry(IOExceptionRunnable<T> runnable) throws IOException {
int maxRetries = 2;
for (int attempt = 0; attempt < maxRetries; attempt++) {

View File

@ -0,0 +1,15 @@
package com.hzya.frame.nifi.model.joinbindparametercontexts;
import lombok.Data;
/**
* Auto-generated: 2025-05-16 11:1:46
*
* @author bejson.com (i@bejson.com)
* @website http://www.bejson.com/java2pojo/
*/
@Data
public class BindParameterContextsJoin11 {
private Revision11 revision;
private Component11 component;
}

View File

@ -0,0 +1,15 @@
package com.hzya.frame.nifi.model.joinbindparametercontexts;
import lombok.Data;
/**
* Auto-generated: 2025-05-16 11:1:46
*
* @author bejson.com (i@bejson.com)
* @website http://www.bejson.com/java2pojo/
*/
@Data
public class Component11 {
private String id;
private ParameterContext11 parameterContext;
}

View File

@ -0,0 +1,8 @@
package com.hzya.frame.nifi.model.joinbindparametercontexts;
import lombok.Data;
@Data
public class ParameterContext11 {
private String id;
}

View File

@ -0,0 +1,17 @@
/**
* Copyright 2025 bejson.com
*/
package com.hzya.frame.nifi.model.joinbindparametercontexts;
import lombok.Data;
/**
* Auto-generated: 2025-05-16 11:1:46
*
* @author bejson.com (i@bejson.com)
* @website http://www.bejson.com/java2pojo/
*/
@Data
public class Revision11 {
private String version;
}

View File

@ -0,0 +1,11 @@
package com.hzya.frame.nifi.model.joinprocessgroups;
import com.hzya.frame.nifi.model.resultparametercontexts.Parameters;
import lombok.Data;
import java.util.List;
@Data
public class Component7 {
private String name;
}

View File

@ -0,0 +1,16 @@
package com.hzya.frame.nifi.model.joinprocessgroups;
import lombok.Data;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.model.joinprocessgroups
* @Projectfw-nifi
* @nameProcessGroupStatus
* @Date2025/5/15 10:43
* @FilenameProcessGroupStatus
*/
@Data
public class ProcessGroupStatus {
private String name;
}

View File

@ -0,0 +1,18 @@
package com.hzya.frame.nifi.model.joinprocessgroups;
import lombok.Data;
/**
* @Authorliuyang
* @Packagecom.hzya.frame.nifi.model.joinprocessgroups
* @Projectfw-nifi
* @nameProcessGroupsJoin
* @Date2025/5/15 10:43
* @FilenameProcessGroupsJoin
*/
@Data
public class ProcessGroupsJoin {
private Revision5 revision;
// private ProcessGroupStatus status;
private Component7 component;
}

View File

@ -0,0 +1,8 @@
package com.hzya.frame.nifi.model.joinprocessgroups;
import lombok.Data;
@Data
public class Revision5 {
private int version;
}

View File

@ -0,0 +1,17 @@
/**
* Copyright 2025 bejson.com
*/
package com.hzya.frame.nifi.model.joinstartorstopprocessgroup;
import lombok.Data;
/**
* Auto-generated: 2025-05-16 10:47:3
*
* @author bejson.com (i@bejson.com)
* @website http://www.bejson.com/java2pojo/
*/
@Data
public class Revision10 {
private String version;
}

View File

@ -0,0 +1,16 @@
package com.hzya.frame.nifi.model.joinstartorstopprocessgroup;
import lombok.Data;
/**
* Auto-generated: 2025-05-16 10:47:3
*
* @author bejson.com (i@bejson.com)
* @website http://www.bejson.com/java2pojo/
*/
@Data
public class StartOrStopProcessGroupsInfoJoin10 {
private String id;
private String state;
private Revision10 revision;
}

View File

@ -6,7 +6,7 @@ import java.util.List;
@Data
public class Connections {
private Revision revision;
private Revision2 revision;
private String id;
private String uri;
private Permissions permissions;

View File

@ -6,7 +6,7 @@ import java.util.List;
@Data
public class InputPorts {
private Revision revision;
private Revision2 revision;
private String id;
private String uri;
private Position position;

View File

@ -4,7 +4,7 @@ import lombok.Data;
@Data
public class Labels {
private Revision revision;
private Revision2 revision;
private String id;
private String uri;
private Position position;

View File

@ -6,7 +6,7 @@ import java.util.List;
@Data
public class OutputPorts {
private Revision revision;
private Revision2 revision;
private String id;
private String uri;
private Position position;

View File

@ -6,7 +6,7 @@ import java.util.List;
@Data
public class Processors {
private Revision revision;
private Revision2 revision;
private String id;
private String uri;
private Position position;

View File

@ -3,6 +3,6 @@ package com.hzya.frame.nifi.model.processgroupid;
import lombok.Data;
@Data
public class Revision {
public class Revision2 {
private int version;
}

View File

@ -6,7 +6,7 @@ import java.util.List;
@Data
public class ProcessGroups {
private Revision revision;
private Revision3 revision;
private String id;
private String uri;
private Position position;

View File

@ -3,6 +3,6 @@ package com.hzya.frame.nifi.model.processgrouproot;
import lombok.Data;
@Data
public class Revision {
public class Revision3 {
private String version;
}

View File

@ -4,7 +4,7 @@ import lombok.Data;
@Data
public class ParameterContextsResult {
private Revision revision;
private Revision4 revision;
private String id;
private String uri;
private Permissions permissions;

View File

@ -3,7 +3,7 @@ package com.hzya.frame.nifi.model.resultparametercontexts;
import lombok.Data;
@Data
public class Revision {
public class Revision4 {
private int version;
private String lastModifier;
}

View File

@ -0,0 +1,8 @@
package com.hzya.frame.nifi.model.resultprocessgroups;
import lombok.Data;
@Data
public class ProcessgroupsResult {
private String id;
}

View File

@ -0,0 +1,43 @@
package com.hzya.frame.nifi.model.resultprocessgroupsinfo;
import lombok.Data;
/**
* Auto-generated: 2025-05-16 9:47:26
*
* @author bejson.com (i@bejson.com)
* @website http://www.bejson.com/java2pojo/
*/
@Data
public class AggregateSnapshot9 {
private String id;
private String name;
private String flowFilesIn;
private String bytesIn;
private String input;
private String flowFilesQueued;
private String bytesQueued;
private String queued;
private String queuedCount;
private String queuedSize;
private String bytesRead;
private String read;
private String bytesWritten;
private String written;
private String flowFilesOut;
private String bytesOut;
private String output;
private String flowFilesTransferred;
private String bytesTransferred;
private String transferred;
private String bytesReceived;
private String flowFilesReceived;
private String received;
private String bytesSent;
private String flowFilesSent;
private String sent;
private String activeThreadCount;
private String terminatedThreadCount;
private String processingNanos;
private ProcessingPerformanceStatus9 processingPerformanceStatus;
}

View File

@ -0,0 +1,41 @@
package com.hzya.frame.nifi.model.resultprocessgroupsinfo;
import lombok.Data;
/**
* Auto-generated: 2025-05-16 9:47:26
*
* @author bejson.com (i@bejson.com)
* @website http://www.bejson.com/java2pojo/
*/
@Data
public class Component9 {
private String id;
private String parentGroupId;
private Position9 position;
private String name;
private String comments;
private Variables9 variables;
private String flowfileConcurrency;
private String flowfileOutboundPolicy;
private String defaultFlowFileExpiration;
private String defaultBackPressureObjectThreshold;
private String defaultBackPressureDataSizeThreshold;
private String runningCount;
private String stoppedCount;
private String invalidCount;
private String disabledCount;
private String activeRemotePortCount;
private String inactiveRemotePortCount;
private String upToDateCount;
private String locallyModifiedCount;
private String staleCount;
private String locallyModifiedAndStaleCount;
private String syncFailureCount;
private String localInputPortCount;
private String localOutputPortCount;
private String publicInputPortCount;
private String publicOutputPortCount;
private String inputPortCount;
private String outputPortCount;
}

View File

@ -0,0 +1,15 @@
package com.hzya.frame.nifi.model.resultprocessgroupsinfo;
import lombok.Data;
/**
* Auto-generated: 2025-05-16 9:47:26
*
* @author bejson.com (i@bejson.com)
* @website http://www.bejson.com/java2pojo/
*/
@Data
public class Permissions9 {
private String canRead;
private String canWrite;
}

View File

@ -0,0 +1,15 @@
package com.hzya.frame.nifi.model.resultprocessgroupsinfo;
import lombok.Data;
/**
* Auto-generated: 2025-05-16 9:47:26
*
* @author bejson.com (i@bejson.com)
* @website http://www.bejson.com/java2pojo/
*/
@Data
public class Position9 {
private String x;
private String y;
}

View File

@ -0,0 +1,40 @@
package com.hzya.frame.nifi.model.resultprocessgroupsinfo;
import lombok.Data;
import java.util.List;
/**
* Auto-generated: 2025-05-16 9:47:26
*
* @author bejson.com (i@bejson.com)
* @website http://www.bejson.com/java2pojo/
*/
@Data
public class ProcessGroupsInfoResult9 {
private Revision9 revision;
private String id;
private String uri;
private Position9 position;
private Permissions9 permissions;
private List<String> bulletins;
private Component9 component;
private Status9 status;
private String runningCount;
private String stoppedCount;
private String invalidCount;
private String disabledCount;
private String activeRemotePortCount;
private String inactiveRemotePortCount;
private String upToDateCount;
private String locallyModifiedCount;
private String staleCount;
private String locallyModifiedAndStaleCount;
private String syncFailureCount;
private String localInputPortCount;
private String localOutputPortCount;
private String publicInputPortCount;
private String publicOutputPortCount;
private String inputPortCount;
private String outputPortCount;
}

View File

@ -0,0 +1,19 @@
package com.hzya.frame.nifi.model.resultprocessgroupsinfo;
import lombok.Data;
/**
* Auto-generated: 2025-05-16 9:47:26
*
* @author bejson.com (i@bejson.com)
* @website http://www.bejson.com/java2pojo/
*/
@Data
public class ProcessingPerformanceStatus9 {
private String identifier;
private String cpuDuration;
private String contentReadDuration;
private String contentWriteDuration;
private String sessionCommitDuration;
private String garbageCollectionDuration;
}

View File

@ -0,0 +1,14 @@
package com.hzya.frame.nifi.model.resultprocessgroupsinfo;
import lombok.Data;
/**
* Auto-generated: 2025-05-16 9:47:26
*
* @author bejson.com (i@bejson.com)
* @website http://www.bejson.com/java2pojo/
*/
@Data
public class Revision9 {
private String version;
}

View File

@ -0,0 +1,20 @@
/**
* Copyright 2025 bejson.com
*/
package com.hzya.frame.nifi.model.resultprocessgroupsinfo;
import lombok.Data;
/**
* Auto-generated: 2025-05-16 9:47:26
*
* @author bejson.com (i@bejson.com)
* @website http://www.bejson.com/java2pojo/
*/
@Data
public class Status9 {
private String id;
private String name;
private String statsLastRefreshed;
private AggregateSnapshot9 aggregateSnapshot;
}

View File

@ -0,0 +1,14 @@
/**
* Copyright 2025 bejson.com
*/
package com.hzya.frame.nifi.model.resultprocessgroupsinfo;
/**
* Auto-generated: 2025-05-16 9:47:26
*
* @author bejson.com (i@bejson.com)
* @website http://www.bejson.com/java2pojo/
*/
public class Variables9 {
}

View File

@ -2,10 +2,15 @@ package com.hzya.frame.nifi.service;
import cn.hutool.core.util.StrUtil;
import com.hzya.frame.nifi.client.NifiClient;
import com.hzya.frame.nifi.model.joinbindparametercontexts.BindParameterContextsJoin11;
import com.hzya.frame.nifi.model.joinparametercontexts.ParameterContextsJoin;
import com.hzya.frame.nifi.model.joinprocessgroups.ProcessGroupsJoin;
import com.hzya.frame.nifi.model.joinstartorstopprocessgroup.StartOrStopProcessGroupsInfoJoin10;
import com.hzya.frame.nifi.model.processgroupid.ProcessGroupsId;
import com.hzya.frame.nifi.model.processgrouproot.ProcessGroupsRoot;
import com.hzya.frame.nifi.model.resultparametercontexts.ParameterContextsResult;
import com.hzya.frame.nifi.model.resultprocessgroups.ProcessgroupsResult;
import com.hzya.frame.nifi.model.resultprocessgroupsinfo.ProcessGroupsInfoResult9;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@ -39,14 +44,17 @@ public class NifiApiService {
}
/**
* 查询全部流程组或指定流程组
* 查询全部流程
*/
public Object queryFlowProcessGroupsRoot(String flowGroupId) throws Exception {
if (flowGroupId != null) {
return client.get(StrUtil.format("/flow/process-groups/{}", flowGroupId), ProcessGroupsId.class);
} else {
return client.get("/flow/process-groups/root", ProcessGroupsRoot.class);
}
public ProcessGroupsRoot queryFlowProcessGroupsRoot() throws Exception {
return client.get("/flow/process-groups/root", ProcessGroupsRoot.class);
}
/**
* 查询指定流程
*/
public ProcessGroupsId queryFlowProcessGroupsRoot(String flowGroupId) throws Exception {
return client.get(StrUtil.format("/flow/process-groups/{}", flowGroupId), ProcessGroupsId.class);
}
/**
@ -78,9 +86,45 @@ public class NifiApiService {
}
/**
* 删除参数上下文,待query参数
* 删除参数上下文
*/
public ParameterContextsResult deleteParameterContexts(String id, Map<String, String> queryParams) throws Exception {
return client.delete(StrUtil.format("/parameter-contexts/{}", id), queryParams, ParameterContextsResult.class);
}
/**
* 查询流程组详情
*/
public ProcessGroupsInfoResult9 getProcessGroups(String processGroups) throws Exception {
return client.get(StrUtil.format("/process-groups/{}", processGroups), ProcessGroupsInfoResult9.class);
}
/**
* 创建流程组
*/
public ProcessgroupsResult createProcessGroups(String parentGroupId, ProcessGroupsJoin processGroupsJoin) throws Exception {
return client.post(StrUtil.format("/process-groups/{}/process-groups", parentGroupId), processGroupsJoin, ProcessgroupsResult.class);
}
/**
* 删除流程组
*/
public ProcessGroupsInfoResult9 deleteProcessGroups(String parentGroupId, Map<String, String> queryParams) throws Exception {
return client.delete(StrUtil.format("/process-groups/{}", parentGroupId), queryParams, ProcessGroupsInfoResult9.class);
}
/**
* 启动或停止流程组
*/
public ProcessGroupsInfoResult9 startOrStopProcessGroups(StartOrStopProcessGroupsInfoJoin10 startOrStopProcessGroupsInfoJoin10) throws Exception {
return client.put(StrUtil.format("/flow/process-groups/{}", startOrStopProcessGroupsInfoJoin10.getId()), startOrStopProcessGroupsInfoJoin10, ProcessGroupsInfoResult9.class);
}
/**
* 给流程组绑定参数上下文环境或者给流程组更新上下文环境
*/
public ProcessGroupsInfoResult9 bindParameterContexts(BindParameterContextsJoin11 bindParameterContextsJoin) throws Exception {
String id = bindParameterContextsJoin.getComponent().getId();
return client.put(StrUtil.format("/process-groups/{}", id), bindParameterContextsJoin, ProcessGroupsInfoResult9.class);
}
}