Flinkmap函数操作数据库
刚接触flink没多久,做的⼀个flink流处理任务,状况百出,下⾯聊⼀聊关于数据库操作出的状况。
需求:需要从数据库取⼀些判断条件,流数据根据判断条件做⼀些变换(map),所以决定直接在map⾥操作数据库
1.最初版(调试前):第⼀反应,操作数据库,上连接池,所以在main⾥⾯直接建了⼀个连接池(druid),然后再map函数⾥⽤。结果直接⽆法运⾏,原因,⽆法序列化。看druid源码,druid的Connection实例是没有实现序列化接⼝的。。。
2.本着先实现再优化的原则:直接在map函数⾥建⽴jdbc连接,操作数据库(但是总感觉⼀定会有问题,频繁连接,断开,本来也影响性能)。调试OK,测试环境跑了⼀段时间,被告知数据库的连接数爆掉了。。。(可能跟代码不严谨有关系,漏了⼀个地⽅的连接关闭操作)⽆论如何,准备优化。
3.各种查资料,得出结论,⽤RichMapFunction来实现,在open()⽅法中建⽴连接,在clo()⽅法中关闭连接,map⽅法中应⽤。调试OK...⾼⾼兴兴回家,突然晚上觉得会有问题,连接长时间不⽤,会不会被mysql服务器主动断掉?第⼆天早上到公司(⼀个晚上没有数据处理,但是任务⼀直启动着),果然,数据库连接丢失,所有数据处理都出问题了。
4.既然猜到是数据库连接被服务器端主动关闭的问题,那么还是上连接池吧,druid⾛起。仍然是RichMapFunction,open⽅法中建⽴druid的datasource,clo⽅法中关闭druidDatasource。仍然搁⼀个晚上,第⼆天早上,新数据正常处理,代码⽚段如下:
public abstract class MapFuctionWithDataBa<I, O> extends RichMapFunction<I, O> {
private static final long rialVersionUID = 20000L;
private String driver;
private String url;
private String urname;
private String password;
private DruidDataSource dataSource;
public MapFuctionWithDataBa(String driver, String url, String urname, String password) {
this.driver = driver;
this.url = url;
this.urname = urname;
this.password = password;
}
@Override
public void open(Configuration parameters) throws Exception {
dataSource = new DruidDataSource();
dataSource.tDriverClassName(driver);
dataSource.tUrl(url);
dataSource.tUrname(urname);
dataSource.tPassword(password);
dataSource.tValidationQuery("lect 1");
}
@Override
public void clo() throws Exception {
dataSource.clo();
}
public DataSource getDataSource(){
return dataSource;
}
}
PS:map⽅法没有做实现,实际业务中根据业务需求去实现,map函数中调⽤getDataSource⽅法获取dataSource,获取
Connection(其实例实际上是DruidPooledConnection,可以放⼼的执⾏clo操作)