#头条创作挑战赛#
编写写入DM层业务代码DM层主要是报表数据,针对实时业务将DM层设置在Clickhou中,在此业务中DM层主要存储的是通过Flink读取Kafka “KAFKA-DWS-BROWSE-LOG-WIDE-TOPIC” topic中的数据进行设置窗口分析,每隔10s设置滚动窗口统计该窗口内访问商品及商品一级、二级分类分析结果,实时写入到Clickhou中。
一、代码编写具体代码参照“ProcessBrowLogInfoToDM.scala”,大体代码逻辑如下:
object ProcessBrowLogInfoToDM { def main(args: Array[String]): Unit = { //1.准备环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val tblEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) env.enableCheckpointing(5000) import org.apache.flink.streaming.api.scala._ /** * 2.创建 Kafka Connector,连接消费Kafka dwd中数据 * */ tblEnv.executeSql( """ |create table kafka_dws_ur_login_wide_tbl ( | ur_id string, | product_name string, | first_category_name string, | cond_category_name string, | obtain_points string |) with ( | 'connector' = 'kafka', | 'topic' = 'KAFKA-DWS-BROWSE-LOG-WIDE-TOPIC', | 'properties.bootstrap.rvers'='node1:9092,node2:9092,node3:9092', | 'scan.startup.mode'='earliest-offt', --也可以指定 earliest-offt 、latest-offt | 'properties.group.id' = 'my-group-id', | 'format' = 'json' |) """.stripMargin) /** * 3.实时统计每个用户最近10s浏览的商品次数和商品一级、二级种类次数,存入到Clickhou */ val dwsTbl:Table = tblEnv.sqlQuery( """ | lect ur_id,product_name,first_category_name,cond_category_name from kafka_dws_ur_login_wide_tbl """.stripMargin) //4.将Row 类型数据转换成对象类型操作 val browDS: DataStream[BrowLogWideInfo] = tblEnv.toAppendStream[Row](dwsTbl) .map(row => { val ur_id: String = row.getField(0).toString val product_name: String = row.getField(1).toString val first_category_name: String = row.getField(2).toString val cond_category_name: String = row.getField(3).toString BrowLogWideInfo(null, ur_id, null, product_name, null, null, first_category_name, cond_category_name, null) }) val dwsDS: DataStream[ProductVisitInfo] = browDS.keyBy(info => { info.first_category_name + "-" + info.cond_category_name + "-" + info.product_name }) .timeWindow(Time.conds(10)) .process(new ProcessWindowFunction[BrowLogWideInfo, ProductVisitInfo, String, TimeWindow] { override def process(key: String, context: Context, elements: Iterable[BrowLogWideInfo], out: Collector[ProductVisitInfo]): Unit = { val currentDt: String = DateUtil.getDateYYYYMMDD(context.window.getStart.toString) val startTime: String = DateUtil.getDateYYYYMMDDHHMMSS(context.window.getStart.toString) val endTime: String = DateUtil.getDateYYYYMMDDHHMMSS(context.window.getEnd.toString) val arr: Array[String] = key.split("-") val firstCatName: String = arr(0) val condCatName: String = arr(1) val productName: String = arr(2) val cnt: Int = elements.toList.size out.collect(ProductVisitInfo(currentDt, startTime, endTime, firstCatName, condCatName, productName, cnt)) } }) /** * 5.将以上结果写入到Clickhou表 dm_product_visit_info 表中 * create table dm_product_visit_info( * current_dt String, * window_start String, * window_end String, * first_cat String, * cond_cat String, * product String, * product_cnt UInt32 * ) engine = MergeTree() order by current_dt * */ //准备向ClickHou中插入数据的sql val inrtIntoCkSql = "inrt into dm_product_visit_info (current_dt,window_start,window_end,first_cat,cond_cat,product,product_cnt) values (?,?,?,?,?,?,?)" val ckSink: SinkFunction[ProductVisitInfo] = MyClickHouUtil.clickhouSink[ProductVisitInfo](inrtIntoCkSql,new JdbcStatementBuilder[ProductVisitInfo] { override def accept(pst: PreparedStatement, productVisitInfo: ProductVisitInfo): Unit = { pst.tString(1,productVisitInfo.currentDt) pst.tString(2,productVisitInfo.windowStart) pst.tString(3,productVisitInfo.windowEnd) pst.tString(4,productVisitInfo.firstCat) pst.tString(5,productVisitInfo.condCat) pst.tString(6,productVisitInfo.product) pst.tLong(7,productVisitInfo.productCnt) } }) //针对数据加入sink dwsDS.addSink(ckSink) env.execute() }}
二、创建Clickhou-DM层表
代码在执行之前需要在Clickhou中创建对应的DM层商品浏览信息表dm_product_visit_info,clickhou建表语句如下:
#node1节点启动clickhou[root@node1 bin]# rvice clickhou-rver start#node1节点进入clickhou[root@node1 bin]# clickhou-client -m#node1节点创建clickhou-DM层表create table dm_product_visit_info( current_dt String, window_start String, window_end String, first_cat String, cond_cat String, product String, product_cnt UInt32) engine = MergeTree() order by current_dt;
三、代码测试
以上代码编写完成后,代码执行测试步骤如下:
1、将代码中消费Kafka数据改成从头开始消费代码中Kafka Connector中属性“scan.startup.mode”设置为“earliest-offt”,从头开始消费数据。
这里也可以不设置从头开始消费Kafka数据,而是直接启动向日志采集接口模拟生产日志代码“RTMockUrLogData.java”,需要启动日志采集接口及Flume。
2、执行代码,查看对应结果以上代码执行后在,在Clickhou-DM层中表“dm_product_visit_info”中查看对应数据结果如下:
四、架构图本文发布于:2023-02-28 20:04:00,感谢您对本站的认可!
本文链接:https://www.wtabcd.cn/zhishi/a/167765383675656.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文word下载地址:hhmm(汗汗吗漫画官方在线阅读页面免费漫画入口页面弹窗漫画).doc
本文 PDF 下载地址:hhmm(汗汗吗漫画官方在线阅读页面免费漫画入口页面弹窗漫画).pdf
留言与评论(共有 0 条评论) |