Kettle系列教程-第十二章:使用Java执行Kettle作业
本系列教程基于Kettle 8.1(pdi-ce-8.1.0.0-365)。大部分内容同样适用于Kettle 7.x版本。
章节目录:
- 一、运行环境配置
- JDK
- JVM参数
- KETTLE_HOME
- 依赖包导入
- 二、转换与作业
- 转换流程
- 作业流程
- 三、数据库连接配置
- 创建数据库连接
- 共享数据库连接
- 数据库连接参数
- 四、资源库(数据库存储方式)
- 创建资源库
- 保存流程到资源库
- 从资源库打开流程
- 五、变量/参数
- 参数的配置与使用
- 变量的配置与使用
- 六、转换流程-输入组件
- Excel输入
- 表输入
- 七、转换流程-输出组件
- Excel输出
- 文本文件输出
- 表输出
- 八、转换流程-转换组件
- 九、脚本组件
- 转换-Java代码组件
- 作业-SQL组件
- 作业Shell组件
- 十、对接大数据平台
- 基础文件配置
- 上传文件到HDFS
- 连接Hive
- 十一、使用Windows计划任务定时执行Kettle作业
- 命令说明
- 编写批处理脚本执行Kettle作业
- 创建计划任务定时执行Kettle作业
- 十二、使用Java执行Kettle作业
- 搭建Kettle运行环境
- 代码示例(作业、转换、资源库)
本章说明
Kettle是提供了一些api的,我们可以通过这些api去执行Kettle作业、转换。除了执行作业Kettle还有其他很多api可供使用,本章只介绍作业的执行,其他api有兴趣的可以去探索探索。
搭建Kettle运行环境
首先需要搭建一个Kettle运行环境,很简单,就是从data-integration\lib\目录下复制部分核心jar包出来,导入到java项目(jdk1.8)中,本章节所需jar包如下(不要忘了数据库连接驱动):
代码示例(作业、转换、资源库)
这里就直接放代码了,包含作业文件、转换文件、资源库作业的执行示例。
package com.staroon.kettle.exec;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.repository.RepositoryDirectoryInterface;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import java.util.HashMap;
import java.util.Map;
public class RunKettleJob {
public static void main(String[] args) {
// 参数列表
Map<String, String> params = new HashMap<>();
params.put("filename", "runjob");
params.put("extend", "txt");
String kjbPath = "D:/kettle/jobs/Kettledoc/常规作业示例.kjb";
String ktrPath = "D:/kettle/jobs/Kettledoc/转换任务.ktr";
// runJob(params, kjbPath);
// runTrans(params, ktrPath);
runRepoJob(params);
}
/**
* 运行资源库中的作业
*
* @param params 作业参数
*/
public static void runRepoJob(Map<String, String> params) {
try {
KettleEnvironment.init();
KettleDatabaseRepository repository = new KettleDatabaseRepository();
// 配置资源库数据库连接信息
DatabaseMeta databaseMeta = new DatabaseMeta(
"kettle",
"mysql",
"jdbc",
"127.0.0.1",
"kettle",
"3308",
"root",
"lwsjfwq"
);
// 配置连接参数,指定连接编码为UTF8,若不指定则不能读取中文目录或者中文名作业
databaseMeta.getAttributes().put("EXTRA_OPTION_MYSQL.characterEncoding", "utf8");
// 连接测试
if (databaseMeta.testConnection().startsWith("正确")) {
System.out.println("数据库连接成功");
} else {
System.out.println("数据库连接失败");
return;
}
// 配置资源库
KettleDatabaseRepositoryMeta repositoryMeta = new KettleDatabaseRepositoryMeta(
"kettle",
"kettle",
"Kettle Repository",
databaseMeta
);
repository.init(repositoryMeta);
// 连接资源库
repository.connect("admin", "admin");
// 指定job或者trans所在的目录
RepositoryDirectoryInterface dir = repository.findDirectory("/批处理/");
// 选择资源库中的作业
JobMeta jobMeta = repository.loadJob("资源库作业示例", dir, null, null);
// 配置作业参数
for (String param : params.keySet()) {
jobMeta.setParameterValue(param, params.get(param));
}
Job job = new Job(repository, jobMeta);
job.setLogLevel(LogLevel.DEBUG);
//执行作业
job.start();
//等待作业执行结束
job.waitUntilFinished();
if (job.getErrors() > 0) {
throw new Exception("作业执行出错");
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 运行转换文件
*
* @param params 转换参数
* @param ktrPath 转换文件的路径,后缀ktr
*/
public static void runTrans(Map<String, String> params, String ktrPath) {
try {
// 初始化
KettleEnvironment.init();
EnvUtil.environmentInit();
TransMeta transMeta = new TransMeta(ktrPath);
// 配置参数
for (String param : params.keySet()) {
transMeta.setParameterValue(param, params.get(param));
}
Trans trans = new Trans(transMeta);
// 设置日志级别
trans.setLogLevel(LogLevel.DEBUG);
// 执行转换
trans.execute(null);
// 等待转换执行结束
trans.waitUntilFinished();
// 抛出异常
if (trans.getErrors() > 0) {
throw new Exception("转换执行出错");
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 运行作业文件
*
* @param params 作业参数
* @param kjbPath 作业文件路径,后缀kjb
*/
public static void runJob(Map<String, String> params, String kjbPath) {
try {
KettleEnvironment.init();
JobMeta jobMeta = new JobMeta(kjbPath, null);
// 配置作业参数
for (String param : params.keySet()) {
jobMeta.setParameterValue(param, params.get(param));
}
// 配置变量
// jobMeta.setVariable("name","value");
Job job = new Job(null, jobMeta);
// 设置日志级别
job.setLogLevel(LogLevel.DEBUG);
// 启动作业
job.start();
// 等待作业执行完毕
job.waitUntilFinished();
if (job.getErrors() > 0) {
throw new Exception("作业执行出错");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
本章完!