Optimizing Kettle imports from Oracle: [Experience Sharing] Importing Data into DWS Efficiently with Kettle
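Summary: Kettle is an efficient, easy-to-use open-source ETL tool. DWS supports integration with Kettle and offers several ways to load data.
Preparation: Before connecting Kettle to DWS, install Kettle and make sure the machine running it can reach DWS over the network.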
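I. Configuring the DWS database connection in Kettle:
Optional: you can use either the native PG (PostgreSQL) driver or the driver provided by DWS in its place. The JDBC drivers are located in the Kettle\data-integration\libext\JDBC directory; to use the DWS driver, replace the original pg JDBC driver there with the mppdb driver, keeping the file name of the original pg driver.
Select the PostgreSQL connection type, fill in connection name, host name, database name, port number, username, password and so on, then click OK.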
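Before relying on the Kettle dialog, it can help to check the same connection settings from a small standalone program. The following is a minimal sketch, assuming the installed driver accepts the standard jdbc:postgresql:// URL form; the class name DwsConnectionCheck and its methods are hypothetical, and the host, port, database and credentials are passed on the command line.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

// Hypothetical helper, not part of Kettle or DWS.
final class DwsConnectionCheck {

    // Builds a PostgreSQL-style JDBC URL from the same fields as the Kettle dialog.
    static String buildUrl(String host, int port, String database) {
        return "jdbc:postgresql://" + host + ":" + port + "/" + database;
    }

    // Opens a connection, validates it, and closes it again.
    // Returns false (and prints the reason) when the driver is missing or the login fails.
    static boolean canConnect(String url, String user, String password) {
        Properties props = new Properties();
        props.setProperty("user", user);
        props.setProperty("password", password);
        try (Connection con = DriverManager.getConnection(url, props)) {
            return con.isValid(5); // wait at most 5 seconds for the validation round trip
        } catch (SQLException e) {
            System.err.println("Connection failed: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length != 5) {
            System.out.println("usage: DwsConnectionCheck <host> <port> <database> <user> <password>");
            return;
        }
        String url = buildUrl(args[0], Integer.parseInt(args[1]), args[2]);
        boolean ok = canConnect(url, args[3], args[4]);
        System.out.println(url + " -> " + (ok ? "OK" : "FAILED"));
    }
}
```

Run it with the driver jar on the classpath. If the check fails here, the Kettle connection with the same values will most likely fail too, which separates driver and network problems from Kettle configuration problems.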
II. Loading data into DWS with Kettle
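1) Export the data from the source database to files on OBS or on a local server, then import them with bulk load (OBS data loading or local data loading): create a foreign table in DWS, add an Execute SQL script step to the transformation, and run the bulk load statement in that step. (Recommended; the transfer rate can exceed 100 MB/s per compute node of the cluster.)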
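2) Stream the data into the database with custom Java code that uses Copy (low latency; load throughput on the order of a million rows per second).
Sample code: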
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;
import java.io.StringReader;
private Connection con;
private StringBuffer tuples;
private int count;
private String sql = "copy dwi_alarm_info(alarm_definition,alarm_level,ext_similarity,last_update_date,occurring_time,source_channel,source_device,space,status) from STDIN WITH(format 'CSV', QUOTE '|')";
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
Object[] r = getRow();
if (r == null) {
if (count > 0) {
try {
//logError("2***2019***" + String.valueOf(count));
//logError("3----------------------------");
//logError(tuples.toString());
//logError("4----------------------------");
// Flush the rows still buffered when the input stream ends.
CopyManager cm = new CopyManager((BaseConnection) con);
StringReader sr = new StringReader(tuples.toString());
long starttime = System.currentTimeMillis();
long rows = cm.copyIn(sql, sr);
// Auto-commit is disabled in init(), so commit the copied rows explicitly.
con.commit();
long endtime = System.currentTimeMillis();
long costs = endtime - starttime;
//logError(String.valueOf(rows) + " rows, costs " + String.valueOf(costs) + " ms");
count = 0;
tuples.delete(0, tuples.length());
} catch (Exception e) {
//logError("5----------------------------" + e.getMessage());
e.printStackTrace();
return false;
}
}
setOutputDone();
return false;
}
if (first) {
try {
Class.forName("org.postgresql.Driver");
} catch (ClassNotFoundException e) {
// logError("1----------------------------" + e.getMessage());
System.out.println("can not find Driver");
e.printStackTrace();
return false;
}
first = false;
count = 0;
tuples = new StringBuffer();
}
String alarm_definition = get(Fields.In, "alarm_definition").getString(r);
String alarm_level = get(Fields.In, "alarm_level").getString(r);
String ext_blacklistpicurl = get(Fields.In, "ext_blacklistpicurl").getString(r);
String ext_similarity = get(Fields.In, "ext_similarity").getString(r);
//String ext_suspectid = get(Fields.In, "ext_suspectid").getString(r);
String last_update_date = get(Fields.In, "last_update_date").getString(r);
String occurring_time = get(Fields.In, "occurring_time").getString(r);
String source_channel = get(Fields.In, "source_channel").getString(r);
String source_device = get(Fields.In, "source_device").getString(r);
String space = get(Fields.In, "space").getString(r);
String status = get(Fields.In, "status").getString(r);
String alarm_number = get(Fields.In, "alarm_number").getString(r);
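// Field order must match the column list in the COPY statement; each row ends with a newline.
tuples.append(alarm_definition+",").append(alarm_level+",").append(ext_similarity+",").append(last_update_date+",").append(occurring_time+",").append(source_channel+",").append(source_device+",").append(space+",").append(status+"\n");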
long incrementLinesOutput = incrementLinesOutput();
count++;
//logError("6***2019***" + tuples.toString());
//logError("7***2019***" + String.valueOf(count));
if (count >= 100) {
try {
//logError("8***2019***");
// Flush the buffer to DWS once 100 rows have accumulated.
CopyManager cm = new CopyManager((BaseConnection) con);
StringReader sr = new StringReader(tuples.toString());
long starttime = System.currentTimeMillis();
long rows = cm.copyIn(sql, sr);
// Auto-commit is disabled in init(), so commit the copied rows explicitly.
con.commit();
long endtime = System.currentTimeMillis();
long costs = endtime - starttime;
//logError(String.valueOf(rows) + " rows, costs " + String.valueOf(costs) + " ms");
count = 0;
tuples.delete(0, tuples.length());
} catch (Exception e) {
e.printStackTrace();
//logError("9******" + e.getMessage());
return false;
}
}
return true;
}
public boolean init(StepMetaInterface stepMetaInterface, StepDataInterface stepDataInterface) {
if (!parent.initImpl(stepMetaInterface, stepDataInterface)) {
return false;
}
String url = getParameter("URL");
String userName = getParameter("USERNAME");
String passWd = getParameter("PASSWORD");
try {
Properties props = new Properties();
props.setProperty("user", userName);
props.setProperty("password", passWd);
con = DriverManager.getConnection(url, props);
con.setAutoCommit(false);
} catch (Exception e) {
//logError("Connecting to database " + url + " failed.", e);
setErrors(1);
return false;
}
return true;
}
public void dispose(StepMetaInterface smi, StepDataInterface sdi) {
try {
if (con != null) {
con.close();
}
} catch (Exception e) {
//logError("close conn failed.", e);
setErrors(1);
return;
}
parent.disposeImpl(smi, sdi);
}
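3) Write rows one at a time through Table output (inefficient, not recommended).
Configure a Table input step and a Table output step to export data from Oracle into DWS.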
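A note on the Copy sample in method 2): it joins raw field values with commas, so a value that contains a comma, a line break or the quote character '|' would shift columns or break the row, and a Java null would be loaded as the text "null". The following is a minimal sketch of a row formatter for the same format 'CSV', QUOTE '|' settings. It assumes DWS follows the PostgreSQL COPY CSV rules (an embedded quote character is doubled, and an unquoted empty field is read as NULL); the class name CopyCsvRow and the method appendRow are hypothetical.

```java
// Hypothetical helper for building COPY input in CSV format with '|' as the quote character.
final class CopyCsvRow {
    private static final char QUOTE = '|';
    private static final char DELIMITER = ',';

    // Appends one CSV row, terminated by '\n', for the given field values.
    static void appendRow(StringBuilder out, String... fields) {
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                out.append(DELIMITER);
            }
            String field = fields[i];
            if (field == null) {
                continue; // leave the field empty and unquoted so that it loads as NULL
            }
            out.append(QUOTE);
            for (int j = 0; j < field.length(); j++) {
                char c = field.charAt(j);
                if (c == QUOTE) {
                    out.append(QUOTE); // double an embedded quote character
                }
                out.append(c);
            }
            out.append(QUOTE);
        }
        out.append('\n');
    }

    public static void main(String[] args) {
        StringBuilder tuples = new StringBuilder();
        // Sample values only; the real step would pass the fields read from the input row.
        appendRow(tuples, "door open, zone 3", "2", null, "a|b");
        System.out.print(tuples);
    }
}
```

In the step code, a call such as appendRow(tuples, alarm_definition, alarm_level, ...) would take the place of the chained append calls. Note that tuples is declared there as a StringBuffer, so either that declaration or the parameter type of this sketch would need to change.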