From d7413fb99ff76f8d52011c1238cdf358cf73b87b Mon Sep 17 00:00:00 2001 From: liuy <37787198+LiuyCodes@users.noreply.github.com> Date: Tue, 20 May 2025 08:59:07 +0800 Subject: [PATCH] =?UTF-8?q?build(base-buildpackage):=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=20dev=E7=8E=AF=E5=A2=83=E9=85=8D=E7=BD=AE=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 application-liuy.yml 文件,包含 dev 环境的配置信息 - 配置内容包括日志设置、数据源配置、文件保存路径等 - 添加了与 CBS8、OA 等系统的集成配置 - 包含了 NiFi API 的访问信息 --- base-buildpackage/pom.xml | 12 + .../src/main/resources/application-liuy.yml | 48 + .../com/hzya/frame/TestNifiServiceApi.java | 930 ++++++++++++++++++ 3 files changed, 990 insertions(+) create mode 100644 base-buildpackage/src/main/resources/application-liuy.yml create mode 100644 base-buildpackage/src/test/java/com/hzya/frame/TestNifiServiceApi.java diff --git a/base-buildpackage/pom.xml b/base-buildpackage/pom.xml index 2467d45d..ed64e54a 100644 --- a/base-buildpackage/pom.xml +++ b/base-buildpackage/pom.xml @@ -18,6 +18,12 @@ base-webapp ${revision} + + com.hzya.frame + fw-nifi + 0.0.1-SNAPSHOT + test + @@ -32,6 +38,12 @@ + + liuy + + liuy + + llg diff --git a/base-buildpackage/src/main/resources/application-liuy.yml b/base-buildpackage/src/main/resources/application-liuy.yml new file mode 100644 index 00000000..aea31169 --- /dev/null +++ b/base-buildpackage/src/main/resources/application-liuy.yml @@ -0,0 +1,48 @@ +#######################dev环境####################### +logging: + #日志级别 指定目录级别 + level: + root: warn + encodings: UTF-8 + file: + # 日志保存路径 + path: /Users/liuyang/workspaces/hzya/fw-nifi/zt-log +spring: + datasource: + dynamic: + datasource: + master: + url: jdbc:mysql://ufidahz.com.cn:9014/businesscenter?serverTimezone=UTC&useUnicode=true&characterEncoding=UTF8&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowLoadLocalInfile=false&autoReconnect=true&failOverReadOnly=false&connectTimeout=30000&socketTimeout=30000&autoReconnectForPools=true + username: root + password: bd993088e8a7c3dc5f44441617f9b4bf + driver-class-name: com.mysql.jdbc.Driver # 3.2.0开始支持SPI可省略此配置 +savefile: + # 文件保存路径 + path: /Users/liuyang/workspaces/hzya/fw-nifi/zt-file +# path: D:\webservice\file +# pluginpath: D:\webservice\plugin + pluginpath: /home/webservice/zt/plugin +# tomcatpath: D:\apache-tomcat-9.0.69\webapps\kangarooDataCenter\WEB-INF\classes\ + tomcatpath: /home/webservice/zt/tomcatV3/webapps/kangarooDataCenterV3/WEB-INF/classes/ +cbs8: + appId: 1P4AGrpz + appSecret: 2c2369ae5dc04382844bbe3a5abf39e1bea9cd3a + url: https://cbs8-openapi-reprd.csuat.cmburl.cn + # 测试用这个 这个是银行给的,和下面的公钥不是一对密钥 + ya_private_key: 83BA7EC821D35F4CB31FF9A51C1EFA520FC52AF828C2337F88E91CF119B07F44 + # 这个私钥到时候上传到cbs,和下面到是同一对 + #ya_private_key: e1eacfdee9b8d4184437d5a2071e17ce31befc3d93395f9f05709ed562e8dc46 + ya_public_key: 044fa399d2223760f17b81b863cb482b009294c4516f8a605dea1475ec09e720eaa98468715e5ad509a592a0b426061551c5a3df236966c23253a7d894eac0dcde + cbs_public_key: 0469146F06BF3B01236E84632441E826 + #电子回单下载临时存放位置 + elec_path: /Users/xiangerlin/Downloads/ +OA: + data_source_code: yc_oa +zt: + url: http://127.0.0.1:9082/kangarooDataCenterV3/entranceController/externalCallInterface + +nifi: + api: + url: https://192.168.2.233:8443/nifi-api + username: hzya + password: hzya1314*nifi \ No newline at end of file diff --git a/base-buildpackage/src/test/java/com/hzya/frame/TestNifiServiceApi.java b/base-buildpackage/src/test/java/com/hzya/frame/TestNifiServiceApi.java new file mode 100644 index 00000000..95f6299c --- /dev/null +++ b/base-buildpackage/src/test/java/com/hzya/frame/TestNifiServiceApi.java @@ -0,0 +1,930 @@ +package com.hzya.frame; + +import com.hzya.frame.nifiapi.model.joinbindparametercontexts.BindParameterContextsJoin11; +import com.hzya.frame.nifiapi.model.joinbindparametercontexts.Component11; +import com.hzya.frame.nifiapi.model.joinbindparametercontexts.ParameterContext11; +import com.hzya.frame.nifiapi.model.joinbindparametercontexts.Revision11; +import com.hzya.frame.nifiapi.model.joincontrollerenabled.EnOrDiControllerServices12; +import com.hzya.frame.nifiapi.model.joincontrollerenabled.Revision13; +import com.hzya.frame.nifiapi.model.joincreateconnection.CreateConnection18; +import com.hzya.frame.nifiapi.model.joincreateconnections.Component18; +import com.hzya.frame.nifiapi.model.joincreateconnections.CreateConnections18; +import com.hzya.frame.nifiapi.model.joincreateconnections.Revision18; +import com.hzya.frame.nifiapi.model.joincreateconnections.SourceOrDestination18; +import com.hzya.frame.nifiapi.model.joincreatetemp.CreateTemplateJoin; +import com.hzya.frame.nifiapi.model.joingetcontroller.Component12; +import com.hzya.frame.nifiapi.model.joingetcontroller.ControllerService12; +import com.hzya.frame.nifiapi.model.joingetcontroller.Revision12; +import com.hzya.frame.nifiapi.model.joinparametercontexts.*; +import com.hzya.frame.nifiapi.model.joinprocessgroups.Component7; +import com.hzya.frame.nifiapi.model.joinprocessgroups.ProcessGroupsJoin; +import com.hzya.frame.nifiapi.model.joinprocessgroups.Revision5; +import com.hzya.frame.nifiapi.model.joinsnippetinstance.SnippetInstanceJoin; +import com.hzya.frame.nifiapi.model.joinsnippets.Snippet; +import com.hzya.frame.nifiapi.model.joinsnippets.SnippetsJoin; +import com.hzya.frame.nifiapi.model.joinstartorstopprocessgroup.Revision10; +import com.hzya.frame.nifiapi.model.joinstartorstopprocessgroup.StartOrStopProcessGroupsInfoJoin10; +import com.hzya.frame.nifiapi.model.joinupdateprocessor.Revision17; +import com.hzya.frame.nifiapi.model.joinupdateprocessor.RunStatusOrStop17; +import com.hzya.frame.nifiapi.model.nifitemplates.NifiTemplates; +import com.hzya.frame.nifiapi.model.resultcreateprocessor.*; +import com.hzya.frame.nifiapi.model.resultcreateprocessors.CreateProcess16; +import com.hzya.frame.nifiapi.model.resultcreatesnippet.Flow15; +import com.hzya.frame.nifiapi.model.resultcreatesnippet.ProcessGroups15; +import com.hzya.frame.nifiapi.model.resultcreatesnippet.SnippetInstance15; +import com.hzya.frame.nifiapi.model.resultcreatetemplate.NewNifiTemplatete14; +import com.hzya.frame.nifiapi.model.resultcreatetemplate.Template14; +import com.hzya.frame.nifiapi.model.resultparametercontexts.*; +import com.hzya.frame.nifiapi.model.resultprocessgroups.ProcessgroupsResult; +import com.hzya.frame.nifiapi.model.resultprocessgroupsinfo.ProcessGroupsInfoResult9; +import com.hzya.frame.nifiapi.model.resultprocessgroupsinfo.Revision9; +import com.hzya.frame.nifiapi.model.resultprocessorsinfo.Component16; +import com.hzya.frame.nifiapi.model.resultprocessorsinfo.ProcessorsInfo16; +import com.hzya.frame.nifiapi.model.resultprocessorsinfo.Revision16; +import com.hzya.frame.nifiapi.model.resultsnippets.Snippet13; +import com.hzya.frame.nifiapi.model.resultsnippets.SnippetResult13; +import com.hzya.frame.nifiapi.service.NifiApiService; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +import java.io.FileOutputStream; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * nifi单元测试以及使用案例 + * + * @Author:liuyang + * @Package:com.hzya.frame + * @Project:fw-nifi + * @name:TestNifiService + * @Date:2025/5/14 11:18 + * @Filename:TestNifiService + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = {WebappApplication.class}) +public class TestNifiServiceApi { + + @Autowired + private NifiApiService nifiApiService; + + /** + * 获取nifi token + */ + @Test + public void getAccessToken() { + try { + String accessToken = nifiApiService.getAccessToken(); + System.out.println(accessToken); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 查询指定流程组或全部流程组 + */ + @Test + public void testQueryFlowProcessGroupsRoot() { + try { +// nifiApiService.queryFlowProcessGroupsRoot(null); + Object o = nifiApiService.queryFlowProcessGroupsRoot("41437b87-0196-1000-6951-50e41e75c0da"); + System.out.println(o); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 新增参数上下文环境 + */ + @Test + public void testCreateParameterContexts() { + try { + Revision revision = new Revision(); + revision.setVersion(0); + + Parameter2 parameter = new Parameter2(); + parameter.setSensitive(false); + parameter.setValue("4"); + parameter.setDescription("请求类型"); + parameter.setName("sceneIden"); + + Parameters2 parameters = new Parameters2(); + parameters.setParameter(parameter); + + List parametersList = new ArrayList<>(); + parametersList.add(parameters); + + Component3 component = new Component3(); + component.setName("测试创建参数环境"); + component.setParameters(parametersList); + + ParameterContextsJoin parameterContextsJoin = new ParameterContextsJoin(); + parameterContextsJoin.setComponent(component); + parameterContextsJoin.setRevision(revision); + ParameterContextsResult parameterContexts = nifiApiService.createParameterContexts(parameterContextsJoin); + System.out.println(parameterContexts); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 查询参数上下文环境 + */ + @Test + public void testGetParameterContexts() { + try { + ParameterContextsResult parameterContexts = nifiApiService.getParameterContexts("ce640660-0196-1000-b81a-efea19693a24"); + System.out.println(parameterContexts); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 修改参数上下文环境 + */ + @Test + public void testUpdateParameterContexts() { + try { + //先查询版本 + ParameterContextsResult parameterContexts = nifiApiService.getParameterContexts("ce640660-0196-1000-b81a-efea19693a24"); + + Parameter parameter = new Parameter(); + parameter.setSensitive("false"); + parameter.setValue("9922129"); + parameter.setDescription("212请求类型22"); + parameter.setName("sceneIden2121"); + + Parameter parameter2 = new Parameter(); + parameter2.setSensitive("false"); + parameter2.setValue("999"); + parameter2.setDescription("请求类型22"); + parameter2.setName("sceneIden"); + + Parameters parameters = new Parameters(); + parameters.setParameter(parameter); +// parameters.setParameter(parameter2); + + Parameters parameters2 = new Parameters(); + parameters2.setParameter(parameter2); + + List parametersList = new ArrayList<>(); + parametersList.add(parameters); + parametersList.add(parameters2); + + Revision4 revision1 = parameterContexts.getRevision(); + int version = revision1.getVersion(); + Revision4 revision = new Revision4(); + revision.setVersion(version); + + Component6 component = new Component6(); + component.setParameters(parametersList); + component.setName("测试创建参数环境2222"); + component.setId(parameterContexts.getId()); + + ParameterContextsResult parameterContextsJoin = new ParameterContextsResult(); + parameterContextsJoin.setRevision(revision); + parameterContextsJoin.setId(parameterContexts.getId()); + parameterContextsJoin.setComponent(component); + + ParameterContextsResult parameterContextsResult = nifiApiService.updateParameterContexts(parameterContextsJoin); + System.out.println(parameterContextsResult); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 删除上下文参数环境 + */ + @Test + public void testDeleteParameterContexts() { + try { + //先查询版本 + ParameterContextsResult parameterContexts = nifiApiService.getParameterContexts("d1bed8c7-0196-1000-2d90-5490f160703a"); + Revision4 revision = parameterContexts.getRevision(); + int version = revision.getVersion(); + + Map queryParams = new HashMap<>(); + queryParams.put("version", String.valueOf(version)); + ParameterContextsResult parameterContextsResult = nifiApiService.deleteParameterContexts(parameterContexts.getId(), queryParams); + System.out.println(parameterContextsResult.getId()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 创建空的流程组 + */ + @Test + public void testCreateProcessGroups() throws Exception { + try { +// ProcessGroupStatus processGroupStatus = new ProcessGroupStatus(); +// processGroupStatus.setName("测试流程组"); + + Revision5 revision5 = new Revision5(); + revision5.setVersion(0); + + Component7 component7 = new Component7(); + component7.setName("测试流程组"); + + ProcessGroupsJoin processGroupsJoin = new ProcessGroupsJoin(); +// processGroupsJoin.setStatus(processGroupStatus); + processGroupsJoin.setRevision(revision5); + processGroupsJoin.setComponent(component7); + + //父处理器组 + String parentGroupId = "76e23115-ad84-1aab-7630-21c76aa3973d"; + ProcessgroupsResult processGroups = nifiApiService.createProcessGroups(parentGroupId, processGroupsJoin); + System.out.println("流程组id:" + processGroups.getId()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 删除流程组 + */ + @Test + public void testDeleteProcessGroups() throws Exception { + try { + //查询指定流程组,获得版本 +// ProcessGroupsId processGroupsRoot = (ProcessGroupsId) nifiApiService.queryFlowProcessGroupsRoot("d1f318c0-0196-1000-6525-813ea2d8a7c8"); + + //查询指定流程组的详情 + ProcessGroupsInfoResult9 processGroups = nifiApiService.getProcessGroups("d6efaa35-0196-1000-d82c-348f4e1339ea"); + Revision9 revision = processGroups.getRevision(); + System.out.println(revision.getVersion()); + + //执行删除流程组 + Map queryParams = new HashMap<>(); + queryParams.put("version", revision.getVersion()); + ProcessGroupsInfoResult9 processGroupsInfoResult9 = nifiApiService.deleteProcessGroups(processGroups.getId(), queryParams); + System.out.println("被删除的流程组id:" + processGroupsInfoResult9.getId()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 启动或停止流程组 + */ + @Test + public void testStartOrStopProcessGroups() throws Exception { + try { + //查询指定流程组的详情 + ProcessGroupsInfoResult9 processGroups = nifiApiService.getProcessGroups("d70315c5-0196-1000-ecce-245ae0e5b43b"); + Revision9 revision = processGroups.getRevision(); +// System.out.println(revision.getVersion()); + + Revision10 revision10 = new Revision10(); + revision10.setVersion(revision.getVersion()); + + StartOrStopProcessGroupsInfoJoin10 startOrStopProcessGroupsInfoJoin10 = new StartOrStopProcessGroupsInfoJoin10(); + startOrStopProcessGroupsInfoJoin10.setId(processGroups.getId()); +// startOrStopProcessGroupsInfoJoin10.setState("RUNNING"); + startOrStopProcessGroupsInfoJoin10.setState("STOPPED"); + startOrStopProcessGroupsInfoJoin10.setRevision(revision10); + ProcessGroupsInfoResult9 processGroupsInfoResult9 = nifiApiService.startOrStopProcessGroups(startOrStopProcessGroupsInfoJoin10); + System.out.println("流程组id:" + processGroupsInfoResult9.getId()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 给流程组绑定参数上下文环境、或者给流程组更新上下文环境 + */ + @Test + public void testBindParameterContexts() throws Exception { + try { + //查询指定流程组的详情 + ProcessGroupsInfoResult9 processGroups = nifiApiService.getProcessGroups("d70315c5-0196-1000-ecce-245ae0e5b43b"); + Revision9 revision = processGroups.getRevision(); + + //流程组的版本号 + Revision11 revision11 = new Revision11(); + revision11.setVersion(revision.getVersion()); + + //参数上下文对象id + ParameterContext11 parameterContext11 = new ParameterContext11(); + parameterContext11.setId("d71285a9-0196-1000-80fc-41b9af5a8c6a"); + + //指定的组件对象 + Component11 component11 = new Component11(); + component11.setId(processGroups.getId()); + component11.setParameterContext(parameterContext11); + + BindParameterContextsJoin11 bindParameterContextsJoin11 = new BindParameterContextsJoin11(); + bindParameterContextsJoin11.setComponent(component11); + bindParameterContextsJoin11.setRevision(revision11); + ProcessGroupsInfoResult9 processGroupsInfoResult9 = nifiApiService.bindParameterContexts(bindParameterContextsJoin11); + System.out.println("流程组id:" + processGroupsInfoResult9.getId()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 获取控制器服务 + */ + @Test + public void testGetControllerServices() { + try { + ControllerService12 controllerServices = nifiApiService.getControllerServices("0196107e-f933-1f70-d8db-53adf9ed7eda"); + System.out.println(controllerServices.getId()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 控制器服务-Mysql数据源 + */ + @Test + public void testCreateControllerServices() { + try { + //A revision of 0 must be specified when creating a new Controller service. + Revision12 revision12 = new Revision12(); + revision12.setVersion("0"); + + //创建Mysql数据源 + Map properties = new HashMap<>(); + properties.put("Database Connection URL", "jdbc:mysql://ufidahz.com.cn:9014/apache_nifi_sync?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8&useLegacyDatetimeCode=false"); + properties.put("Database Driver Class Name", "com.mysql.cj.jdbc.Driver"); + properties.put("database-driver-locations", "/data/nifi-1.28.1/lib/mysql-connector-java-8.0.30.jar"); + properties.put("Database User", "root"); + properties.put("Password", "hzya1314"); + properties.put("Max Wait Time", "600000 millis"); + properties.put("Max Total Connections", "15"); + properties.put("Validation-query", "select 1"); + properties.put("dbcp-min-idle-conns", "5"); + properties.put("dbcp-max-idle-conns", "10"); + properties.put("dbcp-max-conn-lifetime", "-1"); + properties.put("dbcp-time-between-eviction-runs", "-1"); + properties.put("dbcp-min-evictable-idle-time", "30 mins"); + properties.put("dbcp-soft-min-evictable-idle-time", "-1"); + + Component12 component12 = new Component12(); + component12.setName("新建Mysql数据源"); + component12.setType("org.apache.nifi.dbcp.DBCPConnectionPool"); + component12.setState("ENABLED"); + component12.setProperties(properties); + component12.setComments("一个备注"); + component12.setBulletinLevel("DEBUG"); + + ControllerService12 controllerService11 = new ControllerService12(); + controllerService11.setComponent(component12); + controllerService11.setRevision(revision12); + ControllerService12 controllerServices = nifiApiService.createControllerServices("d7f1b14d-0196-1000-ce43-7499a84f6096", controllerService11); + System.out.println(controllerServices.getId()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 控制器服务-Oracle数据源 + */ + @Test + public void testCreateControllerServices2() { + try { + //A revision of 0 must be specified when creating a new Controller service. + Revision12 revision12 = new Revision12(); + revision12.setVersion("0"); + + //新建Oracle数据源 + Map properties2 = new HashMap<>(); + properties2.put("Database Connection URL", "jdbc:oracle:thin:@39.170.109.90:1521:orcl"); + properties2.put("Database Driver Class Name", "oracle.jdbc.OracleDriver"); + properties2.put("database-driver-locations", "/data/nifi-1.28.1/lib/ojdbc8.jar"); + properties2.put("Database User", "U8C241231"); + properties2.put("Password", "U8C241231"); + properties2.put("Max Wait Time", "600000 millis"); + properties2.put("Max Total Connections", "20"); + properties2.put("Validation-query", "select 1 from dual"); + properties2.put("dbcp-min-idle-conns", "5"); + properties2.put("dbcp-max-idle-conns", "10"); + properties2.put("dbcp-max-conn-lifetime", "-1"); + properties2.put("dbcp-time-between-eviction-runs", "-1"); + properties2.put("dbcp-min-evictable-idle-time", "30 mins"); + properties2.put("dbcp-soft-min-evictable-idle-time", "-1"); + + Component12 component123 = new Component12(); + component123.setName("新建Oracle数据源"); + component123.setType("org.apache.nifi.dbcp.DBCPConnectionPool"); + component123.setState("ENABLED"); + component123.setProperties(properties2); + component123.setComments("一个备注"); + component123.setBulletinLevel("DEBUG"); + + ControllerService12 controllerService11 = new ControllerService12(); + controllerService11.setComponent(component123); + controllerService11.setRevision(revision12); + ControllerService12 controllerServices = nifiApiService.createControllerServices("d7f1b14d-0196-1000-ce43-7499a84f6096", controllerService11); + System.out.println(controllerServices.getId()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 控制器服务-JsonTreeReader:JsonTreeReader 是一种 Controller Service,主要用于解析 JSON 数据,将其转换为结构化的记录(Records) + * Records 是 NiFi 中的一种数据抽象,表示一组结构化的数据记录(类似于数据库中的行或 JSON 对象)。每个 Record 由多个字段(Fields)组成, + * 每个字段有名称、类型和值,遵循用户定义的 Schema + * 一个 Record 可以看作一个键值对集合,例如 { "name": "Alice", "age": 30 } + * 多个 Records 组成一个 Record Set,表示一批数据(如 JSON 数组 [ { "name": "Alice" }, { "name": "Bob" } ]) + */ + @Test + public void testCreateControllerServices3() { + try { + //A revision of 0 must be specified when creating a new Controller service. + Revision12 revision12 = new Revision12(); + revision12.setVersion("0"); + + //新建Oracle数据源 + Map properties2 = new HashMap<>(); + properties2.put("schema-access-strategy", "schema-text-property"); + properties2.put("schema-name", "${schema.name}"); + properties2.put("schema-text", "${fieldMapping}"); + properties2.put("starting-field-strategy", "ROOT_NODE"); + properties2.put("Max String Length", "500 MB"); + properties2.put("Allow Comments", "false"); + + Component12 component123 = new Component12(); + component123.setName("新建JsonTreeReader"); + component123.setType("org.apache.nifi.json.JsonTreeReader"); + component123.setState("ENABLED"); + component123.setProperties(properties2); + component123.setComments("一个备注"); + component123.setBulletinLevel("DEBUG"); + + ControllerService12 controllerService11 = new ControllerService12(); + controllerService11.setComponent(component123); + controllerService11.setRevision(revision12); + ControllerService12 controllerServices = nifiApiService.createControllerServices("d7f1b14d-0196-1000-ce43-7499a84f6096", controllerService11); + System.out.println(controllerServices.getId()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 控制器服务-JsonRecordSetWriter + * 用于将 NiFi 的 Record 数据集(Record Set)转换为 JSON 格式的输出 + */ + @Test + public void testCreateControllerServices4() { + try { + //A revision of 0 must be specified when creating a new Controller service. + Revision12 revision12 = new Revision12(); + revision12.setVersion("0"); + + //新建Oracle数据源 + Map properties2 = new HashMap<>(); + properties2.put("Schema Write Strategy", "no-schema"); + properties2.put("schema-protocol-version", "1"); + properties2.put("schema-access-strategy", "schema-text-property"); + properties2.put("schema-name", "${schema.name}"); + properties2.put("schema-text", "${fieldMapping}"); + properties2.put("Pretty Print JSON", "false"); + properties2.put("suppress-nulls", "never-suppress"); + properties2.put("Allow Scientific Notation", "true"); + properties2.put("output-grouping", "output-array"); + properties2.put("compression-format", "none"); + properties2.put("compression-level", "1"); + + Component12 component123 = new Component12(); + component123.setName("新建JsonRecordSetWriter"); + component123.setType("org.apache.nifi.json.JsonRecordSetWriter"); + component123.setState("ENABLED"); + component123.setProperties(properties2); + component123.setComments("一个备注2"); + component123.setBulletinLevel("DEBUG"); + + ControllerService12 controllerService11 = new ControllerService12(); + controllerService11.setComponent(component123); + controllerService11.setRevision(revision12); + ControllerService12 controllerServices = nifiApiService.createControllerServices("0196107c-f933-1f70-b5a3-634484bed74a", controllerService11); + System.out.println(controllerServices.getId()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * AvroReader 是一个 Apache NiFi 1.28.1 中的 Controller Service,用于读取 Avro 格式的结构化数据, + * 并将其转换为 NiFi 的 Record 数据集。它主要用于数据输入场景,帮助 Processor 处理 Avro 文件或流数据。 + */ + @Test + public void testCreateControllerServices5() { + try { + //A revision of 0 must be specified when creating a new Controller service. + Revision12 revision12 = new Revision12(); + revision12.setVersion("0"); + + //新建Oracle数据源 + Map properties2 = new HashMap<>(); + properties2.put("schema-access-strategy", "embedded-avro-schema"); + properties2.put("cache-size", "1000"); + + Component12 component123 = new Component12(); + component123.setName("新建AvroReader"); + component123.setType("org.apache.nifi.avro.AvroReader"); + component123.setState("ENABLED"); + component123.setProperties(properties2); + component123.setComments("99999"); + component123.setBulletinLevel("DEBUG"); + + ControllerService12 controllerService11 = new ControllerService12(); + controllerService11.setComponent(component123); + controllerService11.setRevision(revision12); + ControllerService12 controllerServices = nifiApiService.createControllerServices("0196107c-f933-1f70-b5a3-634484bed74a", controllerService11); + System.out.println(controllerServices.getId()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 停用或启用控制器服务 + */ + @Test + public void testEnabledOrDisabledControllerServices() { + try { +// Revision13 controllerServiceStatus = new Revision13(); +// //启用 +// controllerServiceStatus.setRunStatus("ENABLED"); +// //停用 +//// controllerServiceStatus.setRunStatus("DISABLED"); +// +// EnOrDiControllerServices enOrDiControllerServices = new EnOrDiControllerServices(); +// enOrDiControllerServices.setId("0196107e-f933-1f70-d8db-53adf9ed7eda"); +// enOrDiControllerServices.setStatus(controllerServiceStatus); + + //查询控制器详情,得到最新的版本号 + ControllerService12 controllerServices = nifiApiService.getControllerServices("0196107e-f933-1f70-d8db-53adf9ed7eda"); + Revision12 revision = controllerServices.getRevision(); + + Revision13 revision13 = new Revision13(); + revision13.setVersion(revision.getVersion()); + + EnOrDiControllerServices12 enOrDiControllerServices = new EnOrDiControllerServices12(); +// enOrDiControllerServices.setState("ENABLED"); + enOrDiControllerServices.setState("DISABLED"); + enOrDiControllerServices.setRevision(revision13); + ControllerService12 controllerService12 = nifiApiService.enabledOrDisabledControllerServices(controllerServices.getId(), enOrDiControllerServices); + System.out.println(controllerService12.getId()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 删除控制器服务 + */ + @Test + public void testDeleteControllerServices() { + try { + //查询最新的版本号 + ControllerService12 controllerServices = nifiApiService.getControllerServices("0196107e-f933-1f70-d8db-53adf9ed7eda"); + Revision12 revision = controllerServices.getRevision(); + + Map stringStringMap = new HashMap<>(); + stringStringMap.put("version", revision.getVersion()); + + ControllerService12 controllerService12 = nifiApiService.deleteControllerServices("0196107e-f933-1f70-d8db-53adf9ed7eda", stringStringMap); + System.out.println(controllerService12.getId()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 获取所有xml模板列表 + */ + @Test + public void testGetAllXmlTemplates() throws Exception { + try { + NifiTemplates allXmlTemplates = nifiApiService.getAllXmlTemplates(); + System.out.println(allXmlTemplates.getTemplates()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 获取指定模板内容,并保存到xml文件 + * getXmlTemplatesContent方法只提供InputStream,需要调用者按业务逻辑自行处理 + */ + @Test + public void testGetXmlTemplatesContent() throws Exception { + try { + String id = "d140afb6-c357-4136-8d0f-58c59f09ea6b"; + String outputFilePath = "/Users/liuyang/workspaces/hzya/fw-nifi/aaa.xml"; +// InputStream xmlTemplatesContent = nifiApiService.getXmlTemplatesContent(id); +// System.out.println(xmlTemplatesContent); + try (InputStream xmlTemplatesContent = nifiApiService.getXmlTemplatesContent(id); FileOutputStream outputStream = new FileOutputStream(outputFilePath)) { + byte[] buffer = new byte[8192]; + int bytesRead; + while ((bytesRead = xmlTemplatesContent.read(buffer)) != -1) { + outputStream.write(buffer, 0, bytesRead); + } + System.out.println("XML文件已保存到: " + outputFilePath); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 把某个流程组转换为模板 + */ + @Test + public void testCreateTemplate() throws Exception { + try { + CreateTemplateJoin createTemplateJoin = new CreateTemplateJoin(); + createTemplateJoin.setName("新模板名称");//模版名称 + createTemplateJoin.setSnippetId("019610ca-f933-1f70-1c03-4b2b24652c0f");//代码片段标识符 + + String id = "0196107c-f933-1f70-b5a3-634484bed74a";//目标流程组id + nifiApiService.createNifiTemplate(id, createTemplateJoin); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 把指定流程组创建为代码段,并创建代码片段 + * Snippets是NiFi内部用于标识流程组内容的临时快照 + */ + @Test + public void testCreateSnippets() { + try { + ProcessGroupsInfoResult9 processGroups = nifiApiService.getProcessGroups("0196107c-f933-1f70-b5a3-634484bed74a"); + Revision9 revision = processGroups.getRevision(); + + Map processGroupsMap2 = new HashMap<>(); + processGroupsMap2.put("version", revision.getVersion());//目标流程组版本号 + + Map> processGroupsMap = new HashMap<>(); + processGroupsMap.put("0196107c-f933-1f70-b5a3-634484bed74a", processGroupsMap2);//目标流程组id + + Snippet snippet = new Snippet(); + snippet.setParentGroupId("76e23115-ad84-1aab-7630-21c76aa3973d");//目标流程组id的父流程组id + snippet.setProcessGroups(processGroupsMap); + + SnippetsJoin snippetsJoin = new SnippetsJoin(); + snippetsJoin.setSnippet(snippet); + SnippetResult13 snippets = nifiApiService.createSnippets(snippetsJoin); + Snippet13 snippet1 = snippets.getSnippet(); + System.out.println(snippet1.getId()); + + //根据代码片段创建模板 + CreateTemplateJoin createTemplateJoin = new CreateTemplateJoin(); + createTemplateJoin.setName("新模板名称222");//模版名称 + createTemplateJoin.setSnippetId(snippet1.getId());//代码片段标识符 + + String id = "0196107c-f933-1f70-b5a3-634484bed74a";//目标流程组id + NewNifiTemplatete14 nifiTemplate = nifiApiService.createNifiTemplate(id, createTemplateJoin); + Template14 template = nifiTemplate.getTemplate(); + System.out.println("新模板主键:" + template.getId()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 将代码片段实例化到指定位置 + */ + @Test + public void testCreateSnippetInstance() throws Exception { + try { + //现请求得到代码片段id + ProcessGroupsInfoResult9 processGroups = nifiApiService.getProcessGroups("60792725-0196-1000-cda3-3584147c4535"); + Revision9 revision = processGroups.getRevision(); + + Map processGroupsMap2 = new HashMap<>(); + processGroupsMap2.put("version", revision.getVersion());//目标流程组版本号 + + Map> processGroupsMap = new HashMap<>(); + processGroupsMap.put("60792725-0196-1000-cda3-3584147c4535", processGroupsMap2);//目标流程组id + + Snippet snippet = new Snippet(); + snippet.setParentGroupId("76e23100-ad84-1aab-4447-b9ee936ee7d5");//目标流程组id的父流程组id + snippet.setProcessGroups(processGroupsMap); + + SnippetsJoin snippetsJoin = new SnippetsJoin(); + snippetsJoin.setSnippet(snippet); + SnippetResult13 snippets = nifiApiService.createSnippets(snippetsJoin); + Snippet13 snippet1 = snippets.getSnippet(); +// System.out.println(snippet1.getId()); + + //将片段实例化到指定位置 + String groupId = "019610d1-f933-1f70-16ea-3cf005012c4b";//流程组id + SnippetInstanceJoin snippetInstanceJoin = new SnippetInstanceJoin(); + snippetInstanceJoin.setSnippetId(snippet1.getId()); + snippetInstanceJoin.setOriginY("0"); + snippetInstanceJoin.setOriginX("0"); + SnippetInstance15 snippetInstance = nifiApiService.createSnippetInstance(groupId, snippetInstanceJoin); + Flow15 flow = snippetInstance.getFlow(); + ProcessGroups15 processGroups15 = flow.getProcessGroups().get(0); + System.out.println(processGroups15.getId()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 创建GenerateFlowFile处理器 + */ + @Test + public void testCreateProcessor() throws Exception { + try { + Revision15 revision15 = new Revision15(); + revision15.setVersion("0"); + + Component15 component15 = new Component15(); + component15.setType("org.apache.nifi.processors.standard.GenerateFlowFile"); + component15.setName("定时调度"); + + Position15 position15 = new Position15(); + position15.setX("0"); + position15.setY("0"); + + Config15 config15 = new Config15(); + config15.setSchedulingPeriod("1 min"); + + Properties15 properties15 = new Properties15(); + properties15.setFileSize("1KB"); + + config15.setProperties(properties15); + + component15.setConfig(config15); + + CreateProcessorJoin15 createProcessorJoin15 = new CreateProcessorJoin15(); + createProcessorJoin15.setComponent(component15); + createProcessorJoin15.setRevision(revision15); + + String groupId = "019610d1-f933-1f70-16ea-3cf005012c4b"; + CreateProcess16 processor = nifiApiService.createProcessor(groupId, createProcessorJoin15); +// String id = processor.getId(); + System.out.println("处理器id:" + processor.getId()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 查询处理器详情 + */ + @Test + public void testGetProcessor() throws Exception { + try { +// String processId = "01961137-f933-1f70-4f08-6fc7041f99bd"; + String processId2 = "ad841aed-30f7-16e2-4fa0-595ab0ab9fdc"; + ProcessorsInfo16 processor = nifiApiService.getProcessor(processId2); + String id = processor.getId(); + System.out.println(id); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 更新处理器 + */ + @Test + public void testUpdateProcessor() { + try { + //查询处理器版本号 + String processId2 = "01961137-f933-1f70-4f08-6fc7041f99bd"; + ProcessorsInfo16 processor = nifiApiService.getProcessor(processId2); + Revision16 revision16 = processor.getRevision(); + String version = revision16.getVersion(); + + Revision16 revision161 = new Revision16(); + revision161.setVersion(version); + + Component16 component16 = new Component16(); + component16.setId(processId2); + component16.setName("定时调度222");//修改名称 + + ProcessorsInfo16 processorsInfo16 = new ProcessorsInfo16(); + processorsInfo16.setRevision(revision161); + processorsInfo16.setComponent(component16); + + ProcessorsInfo16 processor2 = nifiApiService.updateProcessor(processId2, processorsInfo16); + String id = processor2.getId(); + System.out.println(id); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 启动或停止处理器 + */ + @Test + public void testRunStatusOrStop() throws Exception { + try { + //查询处理器版本号 + String processId2 = "01961137-f933-1f70-4f08-6fc7041f99bd"; + ProcessorsInfo16 processor = nifiApiService.getProcessor(processId2); + Revision16 revision16 = processor.getRevision(); + String version = revision16.getVersion(); + + Revision17 revision17 = new Revision17(); + revision17.setVersion(version); + + //修改处理器 + String processId = "01961137-f933-1f70-4f08-6fc7041f99bd"; + RunStatusOrStop17 runStatusOrStop17 = new RunStatusOrStop17(); + runStatusOrStop17.setState("RUNNING");//启动 +// runStatusOrStop17.setState("STOPPED");//停止 + runStatusOrStop17.setRevision(revision17); + ProcessorsInfo16 processorsInfo16 = nifiApiService.runStatusOrStop(processId, runStatusOrStop17); + String id = processorsInfo16.getId(); + System.out.println(id); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 删除处理器 + */ + @Test + public void testDeleteProcessor() throws Exception { + try { + //查询处理器版本号 + String processId2 = "ad841aed-30f7-16e2-4fa0-595ab0ab9fdc"; + ProcessorsInfo16 processor = nifiApiService.getProcessor(processId2); + Revision16 revision16 = processor.getRevision(); + String version = revision16.getVersion(); + + Map queryParams = new HashMap<>(); + queryParams.put("version", version); + + ProcessorsInfo16 processor2 = nifiApiService.deleteProcessor(processId2, queryParams); + String id = processor2.getId(); + System.out.println(id); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 建立连接关系 + */ + @Test + public void testCreateProcessorConnections() throws Exception { + try { +// ProcessGroupsInfoResult9 processGroups = nifiApiService.getProcessGroups("019610d1-f933-1f70-16ea-3cf005012c4b"); +// Revision9 revision = processGroups.getRevision(); +// System.out.println(revision.getVersion()); + + SourceOrDestination18 source = new SourceOrDestination18(); + source.setId("01961137-f933-1f70-4f08-6fc7041f99bd"); + source.setGroupId("019610d1-f933-1f70-16ea-3cf005012c4b"); + source.setType("PROCESSOR"); + + SourceOrDestination18 destination = new SourceOrDestination18(); + destination.setId("0196113c-f933-1f70-2318-883f9f6faffd"); + destination.setGroupId("019610d1-f933-1f70-16ea-3cf005012c4b"); +// destination.setType("INPUT_PORT"); + destination.setType("PROCESSOR"); + + List selectedRelationships = new ArrayList<>(); + selectedRelationships.add("success"); + + Component18 component18 = new Component18(); + component18.setSource(source); + component18.setDestination(destination); + component18.setSelectedRelationships(selectedRelationships); + + Revision18 revision18 = new Revision18(); + revision18.setVersion("0");//默认传递0 + + CreateConnections18 createConnections17 = new CreateConnections18(); + createConnections17.setComponent(component18); + createConnections17.setRevision(revision18); + + String processGroupsId = "019610d1-f933-1f70-16ea-3cf005012c4b"; + CreateConnection18 processorConnections = nifiApiService.createProcessorConnections(processGroupsId, createConnections17); + System.out.println("连线id:" + processorConnections.getId()); + } catch (Exception e) { + e.printStackTrace(); + } + } +} \ No newline at end of file