build(base-buildpackage): 添加 dev环境配置文件

- 新增 application-liuy.yml 文件,包含 dev 环境的配置信息
- 配置内容包括日志设置、数据源配置、文件保存路径等
- 添加了与 CBS8、OA 等系统的集成配置
- 包含了 NiFi API 的访问信息
This commit is contained in:
liuy 2025-05-20 08:59:07 +08:00
parent 7d60bdeef4
commit d7413fb99f
3 changed files with 990 additions and 0 deletions

View File

@ -18,6 +18,12 @@
<artifactId>base-webapp</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.hzya.frame</groupId>
<artifactId>fw-nifi</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
@ -32,6 +38,12 @@
</profile>
<profile>
<id>liuy</id> <!--刘洋-->
<properties>
<profile.active>liuy</profile.active>
</properties>
</profile>
<profile>
<id>llg</id> <!--吕磊钢-->
<properties>

View File

@ -0,0 +1,48 @@
#######################dev环境#######################
logging:
#日志级别 指定目录级别
level:
root: warn
encodings: UTF-8
file:
# 日志保存路径
path: /Users/liuyang/workspaces/hzya/fw-nifi/zt-log
spring:
datasource:
dynamic:
datasource:
master:
url: jdbc:mysql://ufidahz.com.cn:9014/businesscenter?serverTimezone=UTC&useUnicode=true&characterEncoding=UTF8&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowLoadLocalInfile=false&autoReconnect=true&failOverReadOnly=false&connectTimeout=30000&socketTimeout=30000&autoReconnectForPools=true
username: root
password: bd993088e8a7c3dc5f44441617f9b4bf
driver-class-name: com.mysql.jdbc.Driver # 3.2.0开始支持SPI可省略此配置
savefile:
# 文件保存路径
path: /Users/liuyang/workspaces/hzya/fw-nifi/zt-file
# path: D:\webservice\file
# pluginpath: D:\webservice\plugin
pluginpath: /home/webservice/zt/plugin
# tomcatpath: D:\apache-tomcat-9.0.69\webapps\kangarooDataCenter\WEB-INF\classes\
tomcatpath: /home/webservice/zt/tomcatV3/webapps/kangarooDataCenterV3/WEB-INF/classes/
cbs8:
appId: 1P4AGrpz
appSecret: 2c2369ae5dc04382844bbe3a5abf39e1bea9cd3a
url: https://cbs8-openapi-reprd.csuat.cmburl.cn
# 测试用这个 这个是银行给的,和下面的公钥不是一对密钥
ya_private_key: 83BA7EC821D35F4CB31FF9A51C1EFA520FC52AF828C2337F88E91CF119B07F44
# 这个私钥到时候上传到cbs和下面到是同一对
#ya_private_key: e1eacfdee9b8d4184437d5a2071e17ce31befc3d93395f9f05709ed562e8dc46
ya_public_key: 044fa399d2223760f17b81b863cb482b009294c4516f8a605dea1475ec09e720eaa98468715e5ad509a592a0b426061551c5a3df236966c23253a7d894eac0dcde
cbs_public_key: 0469146F06BF3B01236E84632441E826
#电子回单下载临时存放位置
elec_path: /Users/xiangerlin/Downloads/
OA:
data_source_code: yc_oa
zt:
url: http://127.0.0.1:9082/kangarooDataCenterV3/entranceController/externalCallInterface
nifi:
api:
url: https://192.168.2.233:8443/nifi-api
username: hzya
password: hzya1314*nifi

View File

@ -0,0 +1,930 @@
package com.hzya.frame;
import com.hzya.frame.nifiapi.model.joinbindparametercontexts.BindParameterContextsJoin11;
import com.hzya.frame.nifiapi.model.joinbindparametercontexts.Component11;
import com.hzya.frame.nifiapi.model.joinbindparametercontexts.ParameterContext11;
import com.hzya.frame.nifiapi.model.joinbindparametercontexts.Revision11;
import com.hzya.frame.nifiapi.model.joincontrollerenabled.EnOrDiControllerServices12;
import com.hzya.frame.nifiapi.model.joincontrollerenabled.Revision13;
import com.hzya.frame.nifiapi.model.joincreateconnection.CreateConnection18;
import com.hzya.frame.nifiapi.model.joincreateconnections.Component18;
import com.hzya.frame.nifiapi.model.joincreateconnections.CreateConnections18;
import com.hzya.frame.nifiapi.model.joincreateconnections.Revision18;
import com.hzya.frame.nifiapi.model.joincreateconnections.SourceOrDestination18;
import com.hzya.frame.nifiapi.model.joincreatetemp.CreateTemplateJoin;
import com.hzya.frame.nifiapi.model.joingetcontroller.Component12;
import com.hzya.frame.nifiapi.model.joingetcontroller.ControllerService12;
import com.hzya.frame.nifiapi.model.joingetcontroller.Revision12;
import com.hzya.frame.nifiapi.model.joinparametercontexts.*;
import com.hzya.frame.nifiapi.model.joinprocessgroups.Component7;
import com.hzya.frame.nifiapi.model.joinprocessgroups.ProcessGroupsJoin;
import com.hzya.frame.nifiapi.model.joinprocessgroups.Revision5;
import com.hzya.frame.nifiapi.model.joinsnippetinstance.SnippetInstanceJoin;
import com.hzya.frame.nifiapi.model.joinsnippets.Snippet;
import com.hzya.frame.nifiapi.model.joinsnippets.SnippetsJoin;
import com.hzya.frame.nifiapi.model.joinstartorstopprocessgroup.Revision10;
import com.hzya.frame.nifiapi.model.joinstartorstopprocessgroup.StartOrStopProcessGroupsInfoJoin10;
import com.hzya.frame.nifiapi.model.joinupdateprocessor.Revision17;
import com.hzya.frame.nifiapi.model.joinupdateprocessor.RunStatusOrStop17;
import com.hzya.frame.nifiapi.model.nifitemplates.NifiTemplates;
import com.hzya.frame.nifiapi.model.resultcreateprocessor.*;
import com.hzya.frame.nifiapi.model.resultcreateprocessors.CreateProcess16;
import com.hzya.frame.nifiapi.model.resultcreatesnippet.Flow15;
import com.hzya.frame.nifiapi.model.resultcreatesnippet.ProcessGroups15;
import com.hzya.frame.nifiapi.model.resultcreatesnippet.SnippetInstance15;
import com.hzya.frame.nifiapi.model.resultcreatetemplate.NewNifiTemplatete14;
import com.hzya.frame.nifiapi.model.resultcreatetemplate.Template14;
import com.hzya.frame.nifiapi.model.resultparametercontexts.*;
import com.hzya.frame.nifiapi.model.resultprocessgroups.ProcessgroupsResult;
import com.hzya.frame.nifiapi.model.resultprocessgroupsinfo.ProcessGroupsInfoResult9;
import com.hzya.frame.nifiapi.model.resultprocessgroupsinfo.Revision9;
import com.hzya.frame.nifiapi.model.resultprocessorsinfo.Component16;
import com.hzya.frame.nifiapi.model.resultprocessorsinfo.ProcessorsInfo16;
import com.hzya.frame.nifiapi.model.resultprocessorsinfo.Revision16;
import com.hzya.frame.nifiapi.model.resultsnippets.Snippet13;
import com.hzya.frame.nifiapi.model.resultsnippets.SnippetResult13;
import com.hzya.frame.nifiapi.service.NifiApiService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* nifi单元测试以及使用案例
*
* @Authorliuyang
* @Packagecom.hzya.frame
* @Projectfw-nifi
* @nameTestNifiService
* @Date2025/5/14 11:18
* @FilenameTestNifiService
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {WebappApplication.class})
public class TestNifiServiceApi {
@Autowired
private NifiApiService nifiApiService;
/**
* 获取nifi token
*/
@Test
public void getAccessToken() {
try {
String accessToken = nifiApiService.getAccessToken();
System.out.println(accessToken);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 查询指定流程组或全部流程组
*/
@Test
public void testQueryFlowProcessGroupsRoot() {
try {
// nifiApiService.queryFlowProcessGroupsRoot(null);
Object o = nifiApiService.queryFlowProcessGroupsRoot("41437b87-0196-1000-6951-50e41e75c0da");
System.out.println(o);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 新增参数上下文环境
*/
@Test
public void testCreateParameterContexts() {
try {
Revision revision = new Revision();
revision.setVersion(0);
Parameter2 parameter = new Parameter2();
parameter.setSensitive(false);
parameter.setValue("4");
parameter.setDescription("请求类型");
parameter.setName("sceneIden");
Parameters2 parameters = new Parameters2();
parameters.setParameter(parameter);
List<Parameters2> parametersList = new ArrayList<>();
parametersList.add(parameters);
Component3 component = new Component3();
component.setName("测试创建参数环境");
component.setParameters(parametersList);
ParameterContextsJoin parameterContextsJoin = new ParameterContextsJoin();
parameterContextsJoin.setComponent(component);
parameterContextsJoin.setRevision(revision);
ParameterContextsResult parameterContexts = nifiApiService.createParameterContexts(parameterContextsJoin);
System.out.println(parameterContexts);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 查询参数上下文环境
*/
@Test
public void testGetParameterContexts() {
try {
ParameterContextsResult parameterContexts = nifiApiService.getParameterContexts("ce640660-0196-1000-b81a-efea19693a24");
System.out.println(parameterContexts);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 修改参数上下文环境
*/
@Test
public void testUpdateParameterContexts() {
try {
//先查询版本
ParameterContextsResult parameterContexts = nifiApiService.getParameterContexts("ce640660-0196-1000-b81a-efea19693a24");
Parameter parameter = new Parameter();
parameter.setSensitive("false");
parameter.setValue("9922129");
parameter.setDescription("212请求类型22");
parameter.setName("sceneIden2121");
Parameter parameter2 = new Parameter();
parameter2.setSensitive("false");
parameter2.setValue("999");
parameter2.setDescription("请求类型22");
parameter2.setName("sceneIden");
Parameters parameters = new Parameters();
parameters.setParameter(parameter);
// parameters.setParameter(parameter2);
Parameters parameters2 = new Parameters();
parameters2.setParameter(parameter2);
List<Parameters> parametersList = new ArrayList<>();
parametersList.add(parameters);
parametersList.add(parameters2);
Revision4 revision1 = parameterContexts.getRevision();
int version = revision1.getVersion();
Revision4 revision = new Revision4();
revision.setVersion(version);
Component6 component = new Component6();
component.setParameters(parametersList);
component.setName("测试创建参数环境2222");
component.setId(parameterContexts.getId());
ParameterContextsResult parameterContextsJoin = new ParameterContextsResult();
parameterContextsJoin.setRevision(revision);
parameterContextsJoin.setId(parameterContexts.getId());
parameterContextsJoin.setComponent(component);
ParameterContextsResult parameterContextsResult = nifiApiService.updateParameterContexts(parameterContextsJoin);
System.out.println(parameterContextsResult);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 删除上下文参数环境
*/
@Test
public void testDeleteParameterContexts() {
try {
//先查询版本
ParameterContextsResult parameterContexts = nifiApiService.getParameterContexts("d1bed8c7-0196-1000-2d90-5490f160703a");
Revision4 revision = parameterContexts.getRevision();
int version = revision.getVersion();
Map<String, String> queryParams = new HashMap<>();
queryParams.put("version", String.valueOf(version));
ParameterContextsResult parameterContextsResult = nifiApiService.deleteParameterContexts(parameterContexts.getId(), queryParams);
System.out.println(parameterContextsResult.getId());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 创建空的流程组
*/
@Test
public void testCreateProcessGroups() throws Exception {
try {
// ProcessGroupStatus processGroupStatus = new ProcessGroupStatus();
// processGroupStatus.setName("测试流程组");
Revision5 revision5 = new Revision5();
revision5.setVersion(0);
Component7 component7 = new Component7();
component7.setName("测试流程组");
ProcessGroupsJoin processGroupsJoin = new ProcessGroupsJoin();
// processGroupsJoin.setStatus(processGroupStatus);
processGroupsJoin.setRevision(revision5);
processGroupsJoin.setComponent(component7);
//父处理器组
String parentGroupId = "76e23115-ad84-1aab-7630-21c76aa3973d";
ProcessgroupsResult processGroups = nifiApiService.createProcessGroups(parentGroupId, processGroupsJoin);
System.out.println("流程组id" + processGroups.getId());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 删除流程组
*/
@Test
public void testDeleteProcessGroups() throws Exception {
try {
//查询指定流程组,获得版本
// ProcessGroupsId processGroupsRoot = (ProcessGroupsId) nifiApiService.queryFlowProcessGroupsRoot("d1f318c0-0196-1000-6525-813ea2d8a7c8");
//查询指定流程组的详情
ProcessGroupsInfoResult9 processGroups = nifiApiService.getProcessGroups("d6efaa35-0196-1000-d82c-348f4e1339ea");
Revision9 revision = processGroups.getRevision();
System.out.println(revision.getVersion());
//执行删除流程组
Map<String, String> queryParams = new HashMap<>();
queryParams.put("version", revision.getVersion());
ProcessGroupsInfoResult9 processGroupsInfoResult9 = nifiApiService.deleteProcessGroups(processGroups.getId(), queryParams);
System.out.println("被删除的流程组id" + processGroupsInfoResult9.getId());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 启动或停止流程组
*/
@Test
public void testStartOrStopProcessGroups() throws Exception {
try {
//查询指定流程组的详情
ProcessGroupsInfoResult9 processGroups = nifiApiService.getProcessGroups("d70315c5-0196-1000-ecce-245ae0e5b43b");
Revision9 revision = processGroups.getRevision();
// System.out.println(revision.getVersion());
Revision10 revision10 = new Revision10();
revision10.setVersion(revision.getVersion());
StartOrStopProcessGroupsInfoJoin10 startOrStopProcessGroupsInfoJoin10 = new StartOrStopProcessGroupsInfoJoin10();
startOrStopProcessGroupsInfoJoin10.setId(processGroups.getId());
// startOrStopProcessGroupsInfoJoin10.setState("RUNNING");
startOrStopProcessGroupsInfoJoin10.setState("STOPPED");
startOrStopProcessGroupsInfoJoin10.setRevision(revision10);
ProcessGroupsInfoResult9 processGroupsInfoResult9 = nifiApiService.startOrStopProcessGroups(startOrStopProcessGroupsInfoJoin10);
System.out.println("流程组id" + processGroupsInfoResult9.getId());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 给流程组绑定参数上下文环境或者给流程组更新上下文环境
*/
@Test
public void testBindParameterContexts() throws Exception {
try {
//查询指定流程组的详情
ProcessGroupsInfoResult9 processGroups = nifiApiService.getProcessGroups("d70315c5-0196-1000-ecce-245ae0e5b43b");
Revision9 revision = processGroups.getRevision();
//流程组的版本号
Revision11 revision11 = new Revision11();
revision11.setVersion(revision.getVersion());
//参数上下文对象id
ParameterContext11 parameterContext11 = new ParameterContext11();
parameterContext11.setId("d71285a9-0196-1000-80fc-41b9af5a8c6a");
//指定的组件对象
Component11 component11 = new Component11();
component11.setId(processGroups.getId());
component11.setParameterContext(parameterContext11);
BindParameterContextsJoin11 bindParameterContextsJoin11 = new BindParameterContextsJoin11();
bindParameterContextsJoin11.setComponent(component11);
bindParameterContextsJoin11.setRevision(revision11);
ProcessGroupsInfoResult9 processGroupsInfoResult9 = nifiApiService.bindParameterContexts(bindParameterContextsJoin11);
System.out.println("流程组id" + processGroupsInfoResult9.getId());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 获取控制器服务
*/
@Test
public void testGetControllerServices() {
try {
ControllerService12 controllerServices = nifiApiService.getControllerServices("0196107e-f933-1f70-d8db-53adf9ed7eda");
System.out.println(controllerServices.getId());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 控制器服务-Mysql数据源
*/
@Test
public void testCreateControllerServices() {
try {
//A revision of 0 must be specified when creating a new Controller service.
Revision12 revision12 = new Revision12();
revision12.setVersion("0");
//创建Mysql数据源
Map<String, String> properties = new HashMap<>();
properties.put("Database Connection URL", "jdbc:mysql://ufidahz.com.cn:9014/apache_nifi_sync?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8&useLegacyDatetimeCode=false");
properties.put("Database Driver Class Name", "com.mysql.cj.jdbc.Driver");
properties.put("database-driver-locations", "/data/nifi-1.28.1/lib/mysql-connector-java-8.0.30.jar");
properties.put("Database User", "root");
properties.put("Password", "hzya1314");
properties.put("Max Wait Time", "600000 millis");
properties.put("Max Total Connections", "15");
properties.put("Validation-query", "select 1");
properties.put("dbcp-min-idle-conns", "5");
properties.put("dbcp-max-idle-conns", "10");
properties.put("dbcp-max-conn-lifetime", "-1");
properties.put("dbcp-time-between-eviction-runs", "-1");
properties.put("dbcp-min-evictable-idle-time", "30 mins");
properties.put("dbcp-soft-min-evictable-idle-time", "-1");
Component12 component12 = new Component12();
component12.setName("新建Mysql数据源");
component12.setType("org.apache.nifi.dbcp.DBCPConnectionPool");
component12.setState("ENABLED");
component12.setProperties(properties);
component12.setComments("一个备注");
component12.setBulletinLevel("DEBUG");
ControllerService12 controllerService11 = new ControllerService12();
controllerService11.setComponent(component12);
controllerService11.setRevision(revision12);
ControllerService12 controllerServices = nifiApiService.createControllerServices("d7f1b14d-0196-1000-ce43-7499a84f6096", controllerService11);
System.out.println(controllerServices.getId());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 控制器服务-Oracle数据源
*/
@Test
public void testCreateControllerServices2() {
try {
//A revision of 0 must be specified when creating a new Controller service.
Revision12 revision12 = new Revision12();
revision12.setVersion("0");
//新建Oracle数据源
Map<String, String> properties2 = new HashMap<>();
properties2.put("Database Connection URL", "jdbc:oracle:thin:@39.170.109.90:1521:orcl");
properties2.put("Database Driver Class Name", "oracle.jdbc.OracleDriver");
properties2.put("database-driver-locations", "/data/nifi-1.28.1/lib/ojdbc8.jar");
properties2.put("Database User", "U8C241231");
properties2.put("Password", "U8C241231");
properties2.put("Max Wait Time", "600000 millis");
properties2.put("Max Total Connections", "20");
properties2.put("Validation-query", "select 1 from dual");
properties2.put("dbcp-min-idle-conns", "5");
properties2.put("dbcp-max-idle-conns", "10");
properties2.put("dbcp-max-conn-lifetime", "-1");
properties2.put("dbcp-time-between-eviction-runs", "-1");
properties2.put("dbcp-min-evictable-idle-time", "30 mins");
properties2.put("dbcp-soft-min-evictable-idle-time", "-1");
Component12 component123 = new Component12();
component123.setName("新建Oracle数据源");
component123.setType("org.apache.nifi.dbcp.DBCPConnectionPool");
component123.setState("ENABLED");
component123.setProperties(properties2);
component123.setComments("一个备注");
component123.setBulletinLevel("DEBUG");
ControllerService12 controllerService11 = new ControllerService12();
controllerService11.setComponent(component123);
controllerService11.setRevision(revision12);
ControllerService12 controllerServices = nifiApiService.createControllerServices("d7f1b14d-0196-1000-ce43-7499a84f6096", controllerService11);
System.out.println(controllerServices.getId());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 控制器服务-JsonTreeReaderJsonTreeReader 是一种 Controller Service主要用于解析 JSON 数据将其转换为结构化的记录Records
* Records NiFi 中的一种数据抽象表示一组结构化的数据记录类似于数据库中的行或 JSON 对象每个 Record 由多个字段Fields组成
* 每个字段有名称类型和值遵循用户定义的 Schema
* 一个 Record 可以看作一个键值对集合例如 { "name": "Alice", "age": 30 }
* 多个 Records 组成一个 Record Set表示一批数据 JSON 数组 [ { "name": "Alice" }, { "name": "Bob" } ]
*/
@Test
public void testCreateControllerServices3() {
try {
//A revision of 0 must be specified when creating a new Controller service.
Revision12 revision12 = new Revision12();
revision12.setVersion("0");
//新建Oracle数据源
Map<String, String> properties2 = new HashMap<>();
properties2.put("schema-access-strategy", "schema-text-property");
properties2.put("schema-name", "${schema.name}");
properties2.put("schema-text", "${fieldMapping}");
properties2.put("starting-field-strategy", "ROOT_NODE");
properties2.put("Max String Length", "500 MB");
properties2.put("Allow Comments", "false");
Component12 component123 = new Component12();
component123.setName("新建JsonTreeReader");
component123.setType("org.apache.nifi.json.JsonTreeReader");
component123.setState("ENABLED");
component123.setProperties(properties2);
component123.setComments("一个备注");
component123.setBulletinLevel("DEBUG");
ControllerService12 controllerService11 = new ControllerService12();
controllerService11.setComponent(component123);
controllerService11.setRevision(revision12);
ControllerService12 controllerServices = nifiApiService.createControllerServices("d7f1b14d-0196-1000-ce43-7499a84f6096", controllerService11);
System.out.println(controllerServices.getId());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 控制器服务-JsonRecordSetWriter
* 用于将 NiFi Record 数据集Record Set转换为 JSON 格式的输出
*/
@Test
public void testCreateControllerServices4() {
try {
//A revision of 0 must be specified when creating a new Controller service.
Revision12 revision12 = new Revision12();
revision12.setVersion("0");
//新建Oracle数据源
Map<String, String> properties2 = new HashMap<>();
properties2.put("Schema Write Strategy", "no-schema");
properties2.put("schema-protocol-version", "1");
properties2.put("schema-access-strategy", "schema-text-property");
properties2.put("schema-name", "${schema.name}");
properties2.put("schema-text", "${fieldMapping}");
properties2.put("Pretty Print JSON", "false");
properties2.put("suppress-nulls", "never-suppress");
properties2.put("Allow Scientific Notation", "true");
properties2.put("output-grouping", "output-array");
properties2.put("compression-format", "none");
properties2.put("compression-level", "1");
Component12 component123 = new Component12();
component123.setName("新建JsonRecordSetWriter");
component123.setType("org.apache.nifi.json.JsonRecordSetWriter");
component123.setState("ENABLED");
component123.setProperties(properties2);
component123.setComments("一个备注2");
component123.setBulletinLevel("DEBUG");
ControllerService12 controllerService11 = new ControllerService12();
controllerService11.setComponent(component123);
controllerService11.setRevision(revision12);
ControllerService12 controllerServices = nifiApiService.createControllerServices("0196107c-f933-1f70-b5a3-634484bed74a", controllerService11);
System.out.println(controllerServices.getId());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* AvroReader 是一个 Apache NiFi 1.28.1 中的 Controller Service用于读取 Avro 格式的结构化数据
* 并将其转换为 NiFi Record 数据集它主要用于数据输入场景帮助 Processor 处理 Avro 文件或流数据
*/
@Test
public void testCreateControllerServices5() {
try {
//A revision of 0 must be specified when creating a new Controller service.
Revision12 revision12 = new Revision12();
revision12.setVersion("0");
//新建Oracle数据源
Map<String, String> properties2 = new HashMap<>();
properties2.put("schema-access-strategy", "embedded-avro-schema");
properties2.put("cache-size", "1000");
Component12 component123 = new Component12();
component123.setName("新建AvroReader");
component123.setType("org.apache.nifi.avro.AvroReader");
component123.setState("ENABLED");
component123.setProperties(properties2);
component123.setComments("99999");
component123.setBulletinLevel("DEBUG");
ControllerService12 controllerService11 = new ControllerService12();
controllerService11.setComponent(component123);
controllerService11.setRevision(revision12);
ControllerService12 controllerServices = nifiApiService.createControllerServices("0196107c-f933-1f70-b5a3-634484bed74a", controllerService11);
System.out.println(controllerServices.getId());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 停用或启用控制器服务
*/
@Test
public void testEnabledOrDisabledControllerServices() {
try {
// Revision13 controllerServiceStatus = new Revision13();
// //启用
// controllerServiceStatus.setRunStatus("ENABLED");
// //停用
//// controllerServiceStatus.setRunStatus("DISABLED");
//
// EnOrDiControllerServices enOrDiControllerServices = new EnOrDiControllerServices();
// enOrDiControllerServices.setId("0196107e-f933-1f70-d8db-53adf9ed7eda");
// enOrDiControllerServices.setStatus(controllerServiceStatus);
//查询控制器详情得到最新的版本号
ControllerService12 controllerServices = nifiApiService.getControllerServices("0196107e-f933-1f70-d8db-53adf9ed7eda");
Revision12 revision = controllerServices.getRevision();
Revision13 revision13 = new Revision13();
revision13.setVersion(revision.getVersion());
EnOrDiControllerServices12 enOrDiControllerServices = new EnOrDiControllerServices12();
// enOrDiControllerServices.setState("ENABLED");
enOrDiControllerServices.setState("DISABLED");
enOrDiControllerServices.setRevision(revision13);
ControllerService12 controllerService12 = nifiApiService.enabledOrDisabledControllerServices(controllerServices.getId(), enOrDiControllerServices);
System.out.println(controllerService12.getId());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 删除控制器服务
*/
@Test
public void testDeleteControllerServices() {
try {
//查询最新的版本号
ControllerService12 controllerServices = nifiApiService.getControllerServices("0196107e-f933-1f70-d8db-53adf9ed7eda");
Revision12 revision = controllerServices.getRevision();
Map<String, String> stringStringMap = new HashMap<>();
stringStringMap.put("version", revision.getVersion());
ControllerService12 controllerService12 = nifiApiService.deleteControllerServices("0196107e-f933-1f70-d8db-53adf9ed7eda", stringStringMap);
System.out.println(controllerService12.getId());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 获取所有xml模板列表
*/
@Test
public void testGetAllXmlTemplates() throws Exception {
try {
NifiTemplates allXmlTemplates = nifiApiService.getAllXmlTemplates();
System.out.println(allXmlTemplates.getTemplates());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 获取指定模板内容并保存到xml文件
* getXmlTemplatesContent方法只提供InputStream需要调用者按业务逻辑自行处理
*/
@Test
public void testGetXmlTemplatesContent() throws Exception {
try {
String id = "d140afb6-c357-4136-8d0f-58c59f09ea6b";
String outputFilePath = "/Users/liuyang/workspaces/hzya/fw-nifi/aaa.xml";
// InputStream xmlTemplatesContent = nifiApiService.getXmlTemplatesContent(id);
// System.out.println(xmlTemplatesContent);
try (InputStream xmlTemplatesContent = nifiApiService.getXmlTemplatesContent(id); FileOutputStream outputStream = new FileOutputStream(outputFilePath)) {
byte[] buffer = new byte[8192];
int bytesRead;
while ((bytesRead = xmlTemplatesContent.read(buffer)) != -1) {
outputStream.write(buffer, 0, bytesRead);
}
System.out.println("XML文件已保存到: " + outputFilePath);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 把某个流程组转换为模板
*/
@Test
public void testCreateTemplate() throws Exception {
try {
CreateTemplateJoin createTemplateJoin = new CreateTemplateJoin();
createTemplateJoin.setName("新模板名称");//模版名称
createTemplateJoin.setSnippetId("019610ca-f933-1f70-1c03-4b2b24652c0f");//代码片段标识符
String id = "0196107c-f933-1f70-b5a3-634484bed74a";//目标流程组id
nifiApiService.createNifiTemplate(id, createTemplateJoin);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 把指定流程组创建为代码段并创建代码片段
* Snippets是NiFi内部用于标识流程组内容的临时快照
*/
@Test
public void testCreateSnippets() {
try {
ProcessGroupsInfoResult9 processGroups = nifiApiService.getProcessGroups("0196107c-f933-1f70-b5a3-634484bed74a");
Revision9 revision = processGroups.getRevision();
Map<String, String> processGroupsMap2 = new HashMap<>();
processGroupsMap2.put("version", revision.getVersion());//目标流程组版本号
Map<String, Map<String, String>> processGroupsMap = new HashMap<>();
processGroupsMap.put("0196107c-f933-1f70-b5a3-634484bed74a", processGroupsMap2);//目标流程组id
Snippet snippet = new Snippet();
snippet.setParentGroupId("76e23115-ad84-1aab-7630-21c76aa3973d");//目标流程组id的父流程组id
snippet.setProcessGroups(processGroupsMap);
SnippetsJoin snippetsJoin = new SnippetsJoin();
snippetsJoin.setSnippet(snippet);
SnippetResult13 snippets = nifiApiService.createSnippets(snippetsJoin);
Snippet13 snippet1 = snippets.getSnippet();
System.out.println(snippet1.getId());
//根据代码片段创建模板
CreateTemplateJoin createTemplateJoin = new CreateTemplateJoin();
createTemplateJoin.setName("新模板名称222");//模版名称
createTemplateJoin.setSnippetId(snippet1.getId());//代码片段标识符
String id = "0196107c-f933-1f70-b5a3-634484bed74a";//目标流程组id
NewNifiTemplatete14 nifiTemplate = nifiApiService.createNifiTemplate(id, createTemplateJoin);
Template14 template = nifiTemplate.getTemplate();
System.out.println("新模板主键:" + template.getId());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 将代码片段实例化到指定位置
*/
@Test
public void testCreateSnippetInstance() throws Exception {
try {
//现请求得到代码片段id
ProcessGroupsInfoResult9 processGroups = nifiApiService.getProcessGroups("60792725-0196-1000-cda3-3584147c4535");
Revision9 revision = processGroups.getRevision();
Map<String, String> processGroupsMap2 = new HashMap<>();
processGroupsMap2.put("version", revision.getVersion());//目标流程组版本号
Map<String, Map<String, String>> processGroupsMap = new HashMap<>();
processGroupsMap.put("60792725-0196-1000-cda3-3584147c4535", processGroupsMap2);//目标流程组id
Snippet snippet = new Snippet();
snippet.setParentGroupId("76e23100-ad84-1aab-4447-b9ee936ee7d5");//目标流程组id的父流程组id
snippet.setProcessGroups(processGroupsMap);
SnippetsJoin snippetsJoin = new SnippetsJoin();
snippetsJoin.setSnippet(snippet);
SnippetResult13 snippets = nifiApiService.createSnippets(snippetsJoin);
Snippet13 snippet1 = snippets.getSnippet();
// System.out.println(snippet1.getId());
//将片段实例化到指定位置
String groupId = "019610d1-f933-1f70-16ea-3cf005012c4b";//流程组id
SnippetInstanceJoin snippetInstanceJoin = new SnippetInstanceJoin();
snippetInstanceJoin.setSnippetId(snippet1.getId());
snippetInstanceJoin.setOriginY("0");
snippetInstanceJoin.setOriginX("0");
SnippetInstance15 snippetInstance = nifiApiService.createSnippetInstance(groupId, snippetInstanceJoin);
Flow15 flow = snippetInstance.getFlow();
ProcessGroups15 processGroups15 = flow.getProcessGroups().get(0);
System.out.println(processGroups15.getId());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 创建GenerateFlowFile处理器
*/
@Test
public void testCreateProcessor() throws Exception {
try {
Revision15 revision15 = new Revision15();
revision15.setVersion("0");
Component15 component15 = new Component15();
component15.setType("org.apache.nifi.processors.standard.GenerateFlowFile");
component15.setName("定时调度");
Position15 position15 = new Position15();
position15.setX("0");
position15.setY("0");
Config15 config15 = new Config15();
config15.setSchedulingPeriod("1 min");
Properties15 properties15 = new Properties15();
properties15.setFileSize("1KB");
config15.setProperties(properties15);
component15.setConfig(config15);
CreateProcessorJoin15 createProcessorJoin15 = new CreateProcessorJoin15();
createProcessorJoin15.setComponent(component15);
createProcessorJoin15.setRevision(revision15);
String groupId = "019610d1-f933-1f70-16ea-3cf005012c4b";
CreateProcess16 processor = nifiApiService.createProcessor(groupId, createProcessorJoin15);
// String id = processor.getId();
System.out.println("处理器id" + processor.getId());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 查询处理器详情
*/
@Test
public void testGetProcessor() throws Exception {
try {
// String processId = "01961137-f933-1f70-4f08-6fc7041f99bd";
String processId2 = "ad841aed-30f7-16e2-4fa0-595ab0ab9fdc";
ProcessorsInfo16 processor = nifiApiService.getProcessor(processId2);
String id = processor.getId();
System.out.println(id);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 更新处理器
*/
@Test
public void testUpdateProcessor() {
try {
//查询处理器版本号
String processId2 = "01961137-f933-1f70-4f08-6fc7041f99bd";
ProcessorsInfo16 processor = nifiApiService.getProcessor(processId2);
Revision16 revision16 = processor.getRevision();
String version = revision16.getVersion();
Revision16 revision161 = new Revision16();
revision161.setVersion(version);
Component16 component16 = new Component16();
component16.setId(processId2);
component16.setName("定时调度222");//修改名称
ProcessorsInfo16 processorsInfo16 = new ProcessorsInfo16();
processorsInfo16.setRevision(revision161);
processorsInfo16.setComponent(component16);
ProcessorsInfo16 processor2 = nifiApiService.updateProcessor(processId2, processorsInfo16);
String id = processor2.getId();
System.out.println(id);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 启动或停止处理器
*/
@Test
public void testRunStatusOrStop() throws Exception {
try {
//查询处理器版本号
String processId2 = "01961137-f933-1f70-4f08-6fc7041f99bd";
ProcessorsInfo16 processor = nifiApiService.getProcessor(processId2);
Revision16 revision16 = processor.getRevision();
String version = revision16.getVersion();
Revision17 revision17 = new Revision17();
revision17.setVersion(version);
//修改处理器
String processId = "01961137-f933-1f70-4f08-6fc7041f99bd";
RunStatusOrStop17 runStatusOrStop17 = new RunStatusOrStop17();
runStatusOrStop17.setState("RUNNING");//启动
// runStatusOrStop17.setState("STOPPED");//停止
runStatusOrStop17.setRevision(revision17);
ProcessorsInfo16 processorsInfo16 = nifiApiService.runStatusOrStop(processId, runStatusOrStop17);
String id = processorsInfo16.getId();
System.out.println(id);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 删除处理器
*/
@Test
public void testDeleteProcessor() throws Exception {
try {
//查询处理器版本号
String processId2 = "ad841aed-30f7-16e2-4fa0-595ab0ab9fdc";
ProcessorsInfo16 processor = nifiApiService.getProcessor(processId2);
Revision16 revision16 = processor.getRevision();
String version = revision16.getVersion();
Map<String, String> queryParams = new HashMap<>();
queryParams.put("version", version);
ProcessorsInfo16 processor2 = nifiApiService.deleteProcessor(processId2, queryParams);
String id = processor2.getId();
System.out.println(id);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 建立连接关系
*/
@Test
public void testCreateProcessorConnections() throws Exception {
try {
// ProcessGroupsInfoResult9 processGroups = nifiApiService.getProcessGroups("019610d1-f933-1f70-16ea-3cf005012c4b");
// Revision9 revision = processGroups.getRevision();
// System.out.println(revision.getVersion());
SourceOrDestination18 source = new SourceOrDestination18();
source.setId("01961137-f933-1f70-4f08-6fc7041f99bd");
source.setGroupId("019610d1-f933-1f70-16ea-3cf005012c4b");
source.setType("PROCESSOR");
SourceOrDestination18 destination = new SourceOrDestination18();
destination.setId("0196113c-f933-1f70-2318-883f9f6faffd");
destination.setGroupId("019610d1-f933-1f70-16ea-3cf005012c4b");
// destination.setType("INPUT_PORT");
destination.setType("PROCESSOR");
List<String> selectedRelationships = new ArrayList<>();
selectedRelationships.add("success");
Component18 component18 = new Component18();
component18.setSource(source);
component18.setDestination(destination);
component18.setSelectedRelationships(selectedRelationships);
Revision18 revision18 = new Revision18();
revision18.setVersion("0");//默认传递0
CreateConnections18 createConnections17 = new CreateConnections18();
createConnections17.setComponent(component18);
createConnections17.setRevision(revision18);
String processGroupsId = "019610d1-f933-1f70-16ea-3cf005012c4b";
CreateConnection18 processorConnections = nifiApiService.createProcessorConnections(processGroupsId, createConnections17);
System.out.println("连线id" + processorConnections.getId());
} catch (Exception e) {
e.printStackTrace();
}
}
}