Flinkmap函数操作数据库

更新时间:2023-05-12 02:27:56 阅读: 评论:0

Flinkmap函数操作数据库
刚接触flink没多久,做的⼀个flink流处理任务,状况百出,下⾯聊⼀聊关于数据库操作出的状况。
需求:需要从数据库取⼀些判断条件,流数据根据判断条件做⼀些变换(map),所以决定直接在map⾥操作数据库
1.最初版(调试前):第⼀反应,操作数据库,上连接池,所以在main⾥⾯直接建了⼀个连接池(druid),然后再map函数⾥⽤。结果直接⽆法运⾏,原因,⽆法序列化。看druid源码,druid的Connection实例是没有实现序列化接⼝的。。。
2.本着先实现再优化的原则:直接在map函数⾥建⽴jdbc连接,操作数据库(但是总感觉⼀定会有问题,频繁连接,断开,本来也影响性能)。调试OK,测试环境跑了⼀段时间,被告知数据库的连接数爆掉了。。。(可能跟代码不严谨有关系,漏了⼀个地⽅的连接关闭操作)⽆论如何,准备优化。
3.各种查资料,得出结论,⽤RichMapFunction来实现,在open()⽅法中建⽴连接,在clo()⽅法中关闭连接,map⽅法中应⽤。调试OK...⾼⾼兴兴回家,突然晚上觉得会有问题,连接长时间不⽤,会不会被mysql服务器主动断掉?第⼆天早上到公司(⼀个晚上没有数据处理,但是任务⼀直启动着),果然,数据库连接丢失,所有数据处理都出问题了。
4.既然猜到是数据库连接被服务器端主动关闭的问题,那么还是上连接池吧,druid⾛起。仍然是RichMapFunction,open⽅法中建⽴druid的datasource,clo⽅法中关闭druidDatasource。仍然搁⼀个晚上,第⼆天早上,新数据正常处理,代码⽚段如下:
public abstract class MapFuctionWithDataBa<I, O> extends RichMapFunction<I, O> {
private static final long rialVersionUID = 20000L;
private String driver;
private String url;
private String urname;
private String password;
private DruidDataSource dataSource;
public MapFuctionWithDataBa(String driver, String url, String urname, String password) {
this.driver = driver;
this.url = url;
this.urname = urname;
this.password = password;
}
@Override
public void open(Configuration parameters) throws Exception {
dataSource = new DruidDataSource();
dataSource.tDriverClassName(driver);
dataSource.tUrl(url);
dataSource.tUrname(urname);
dataSource.tPassword(password);
dataSource.tValidationQuery("lect 1");
}
@Override
public void clo() throws Exception {
dataSource.clo();
}
public DataSource getDataSource(){
return dataSource;
}
}
PS:map⽅法没有做实现,实际业务中根据业务需求去实现,map函数中调⽤getDataSource⽅法获取dataSource,获取
Connection(其实例实际上是DruidPooledConnection,可以放⼼的执⾏clo操作)

本文发布于:2023-05-12 02:27:56,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/82/593550.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:数据库   连接   实现   操作   问题
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图