Flink Batch Writes to HBase: Efficiency Issues
HBase batch writes: how much data should a single commit contain
Experience from writing to HBase in production:
If the efficiency bottleneck of writing to HBase is the time spent on HBase connections, keyBy(field).countWindow(50000).apply(new …) beats any elaborate custom time window. Learned that the hard way T T. (A sketch of the count-window variant follows; the actual write path of the job is shown right after it.)
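A minimal sketch of that count-window variant, assuming a hypothetical key field orderId and a hypothetical OrderListWindowFunction that maps the 50,000 buffered records to a List<Put>:

// Sketch only: the count window fires purely on record count, so HBase is hit
// once per 50,000 records per key rather than once per small time window.
DataStream<List<Put>> puts = dataStream
        .keyBy(order -> order.getOrderId())      // hypothetical key selector
        .countWindow(50000)
        .apply(new OrderListWindowFunction());   // hypothetical WindowFunction<OrderSourceYunDa, List<Put>, String, GlobalWindow>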
DataStream<List<Put>> sourceList = dataStream
        .timeWindowAll(Time.seconds(2))
        .apply(new OrderToHBaseFunctionOutPutFormat())
        .name("format");
sourceList.writeUsingOutputFormat(new BulkPutHBaseOutputFormat<Put>() {
    @Override
    public String getTableName() {
        return "ORDER_DOC_SN_TEST";
    }

    @Override
    public String getColumnFamily() {
        return "CFD";
    }

    @Override
    public List<Put> writeList(List<Put> list) {
        return list;
    }
}).name("write");
When the consumed data volume is large, the HBase write may block, e.g. waiting for 2001 actions to finish on table: ORDER_DOC_SN_TEST, which drags down the throughput of the real-time computation.
From the code's point of view, this is governed by the HBase setting hbase.client.write.buffer. The default is 2097152 bytes (2 MB); the parameter controls how much data is submitted to HBase per flush, and a value between 2 MB and 6 MB is generally recommended.
<property>
  <name>hbase.client.write.buffer</name>
  <value>2097152</value>
</property>
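The same setting can also be raised programmatically on the client-side Configuration before the HBase connection is created; a minimal sketch (4 MB is just an illustrative value inside the suggested 2 MB to 6 MB range):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

Configuration conf = HBaseConfiguration.create();
// Client-side write buffer: Puts are queued locally and flushed once roughly this many bytes accumulate.
conf.set("hbase.client.write.buffer", String.valueOf(4 * 1024 * 1024));
// Connections / BufferedMutators created from this conf will use the larger buffer.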
A custom Trigger can be used so that the downstream write fires either when the window holds a given number of records or when the window time is reached.
The Trigger interface has five methods:
1. onElement(): called for every element added to a window
2. onProcessingTime(): called when a registered processing-time timer fires
3. onEventTime(): called when a registered event-time timer fires
4. onMerge(): called when two windows are merged (e.g. session windows)
5. clear(): performs any cleanup required when the window is removed
Actions a TriggerResult can return:
1. CONTINUE: do nothing
2. FIRE: trigger the window computation
3. PURGE: clear the data held in the window
4. FIRE_AND_PURGE: trigger the computation and then clear the window's data
Requirements:
1. Within a window, write to HBase once for every 500 records
2. If the window holds fewer than 500 records, write to HBase when the window time fires
public class SinkOrderToHBaseTestJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties prop = new Properties();
        prop.load(SinkOrderToHBaseTestJob.class.getClassLoader().getResourceAsStream("config.properties"));

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", prop.getProperty("kafka.bootstrap.servers"));
        properties.setProperty("group.id", "SinkOrderToHBaseTestJob");

        // Use ingestion time as the time characteristic (watermarks assigned on ingestion)
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        // Flink consumes from Kafka
        FlinkKafkaConsumer<OrderSourceYunDa> consumer =
                new FlinkKafkaConsumer<>("order_correct", new CustomOrderSourceYundaDeSerializationSchema(), properties);
        consumer.setStartFromEarliest();
        DataStream<OrderSourceYunDa> dataStream = env.addSource(consumer);

        // Fire once for every 500 records inside the window
        DataStream<List<Put>> sourceList = dataStream
                .timeWindowAll(Time.seconds(5))
                .trigger(OrderTrigger.create(500))
                .apply(new OrderToHBaseFunctionOutPutFormat())
                .name("format");
        sourceList.writeUsingOutputFormat(new BulkPutHBaseOutputFormat<Put>() {
            @Override
            public String getTableName() {
                return "ORDER_DOC_SN_TEST";
            }

            @Override
            public String getColumnFamily() {
                return "CFD";
            }

            @Override
            public List<Put> writeList(List<Put> list) {
                // System.out.println("Number of records written to HBase: " + list.size());
                return list;
            }
        }).name("write");
        env.execute("SinkOrderToHBaseTestJob");
    }
}
class OrderTrigger extends Trigger<Object, TimeWindow> {

    private static final long serialVersionUID = 1L;

    public OrderTrigger() {}

    // Elements seen since the last fire; static, so shared by all trigger instances in the same JVM
    private static int flag = 0;
    // Count threshold set via create()
    public static int threshold = 0;
    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // Ensure a timer exists at the end of the window so onEventTime() can flush
        // the remaining records when fewer than `threshold` have arrived (requirement 2).
        ctx.registerEventTimeTimer(window.maxTimestamp());
        flag++;
        if (flag >= threshold) {
            flag = 0;
            ctx.deleteProcessingTimeTimer(window.maxTimestamp());
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        if (flag > 0) {
            System.out.println("Processing-time timer fired with " + flag + " pending records");
            flag = 0;
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        if (time >= window.maxTimestamp() && flag > 0) {
            System.out.println("Window end reached with pending records, firing!");
            flag = 0;
            return TriggerResult.FIRE_AND_PURGE;
        } else if (time >= window.maxTimestamp() && flag == 0) {
            // Purge the window without firing
            return TriggerResult.PURGE;
        }
        return TriggerResult.CONTINUE;
    }
    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
    public static OrderTrigger create(int value) {
        threshold = value;
        return new OrderTrigger();
    }
}