// Flink-6: Flink grouped sum (分组求和)
u.flink.project;
ity.BehaviorChannelCount;
ity.MarketingUrBehavior;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
public class Flink_Sql_Marketing {
public static void main(String[] args) throws Exception {
long start = System.currentTimeMillis();
StreamExecutionEnvironment env = ExecutionEnvironment();
env.tParallelism(1);
DataStream<MarketingUrBehavior> dataStream = env.addSource(new SimulatedMarketingUrBehaviorSource()) .assignTimestampsAndWatermarks(WatermarkStrategy
遗憾近义词.<MarketingUrBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner((event, timestamp) -> Timestamp()));
dataStream.filter(data -> !"UNINSTALL".Behavior()))
.keyBy(new KeySelector<MarketingUrBehavior, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> getKey(MarketingUrBehavior urBehavior) throws Exception {
return new Tuple2<>(Channel(), Behavior());
}
})
// .keyBy(MarketingUrBehavior::getBehavior)
.window(SlidingEventTimeWindows.of(Time.hours(1), ds(1)))
.aggregate(new AverageAggregate(), new MyProcessWindowFunction())
.print("分组求和:");
dataStream.filter(data -> !"UNINSTALL".Behavior()))
.
map(new MyMapFunction())
.keyBy(data -> data.f0)
.window(SlidingEventTimeWindows.of(Time.hours(1), ds(1)))
nfc功能怎么打开.aggregate(new AverageAggregate1(), new MyWindowFunction())
.print("total:");
System.out.println("耗时: " + (System.currentTimeMillis() - start) / 1000);
}
private static class SimulatedMarketingUrBehaviorSource implements SourceFunction<MarketingUrBehavior> {
boolean running = true;
List<String> behaviorList = Arrays.asList("CLICK", "DOWNLOAD", "INSTALL", "UNINSTALL");
List<String> channelList = Arrays.asList("app store", "wechat", "tencent", "ali");
Random rand = new Random();
@Override
public void run(SourceContext<MarketingUrBehavior> sourceContext) throws Exception {
while (running) {房门颜色
long urId = Long();
String behavior = (Int(behaviorList.size()));
String channel = (Int(channelList.size()));
long timestamp = System.currentTimeMillis();
MarketingUrBehavior urBehavior = new MarketingUrBehavior(urId, behavior, channel, timestamp); System.out.println(urBehavior);迷恋
Thread.sleep(100);
}
}
@Override
public void cancel() {
running = fal;
}
}
/**
 * Maps every incoming behavior record to the constant pair {@code ("total", 1L)}, ignoring
 * the record's contents.
 */
private static class MyMapFunction extends RichMapFunction<MarketingUrBehavior, Tuple2<String, Long>> {
    /**
     * @param urBehavior the incoming record (not inspected)
     * @return a tuple whose first field is {@code "total"} and whose second field is {@code 1L}
     */
    @Override
    public Tuple2<String, Long> map(MarketingUrBehavior urBehavior) throws Exception {
        final String key = "total";
        final Long one = 1L;
        return Tuple2.of(key, one);
    }
}
private static class AverageAggregate
implements AggregateFunction<MarketingUrBehavior, Long, Long> {
随笔800字
@Override
public Long createAccumulator() {
return 0L;
}中学生英文
@Override
public Long add(MarketingUrBehavior urBehavior, Long aLong) {
return aLong + 1;
}
@Override
public Long getResult(Long aLong) {
return aLong;
}
@Override
public Long merge(Long a, Long b) {
return a + b;
}
}
private static class AverageAggregate1
implements AggregateFunction<Tuple2<String, Long>, Long, Long> {
/** @return the initial count, {@code 0} */
@Override
public Long createAccumulator() {
    final Long initialCount = 0L;
    return initialCount;
}
/**
 * @param tuple the element being added (its fields are not inspected)
 * @param aLong the count so far
 * @return the count incremented by one
 */
@Override
public Long add(Tuple2<String, Long> tuple, Long aLong) {
    final long incremented = aLong + 1;
    return incremented;
}
/**
 * @param aLong the accumulated count
 * @return the accumulated count, unchanged
 */
@Override
public Long getResult(Long aLong) {
    return aLong;
}
/**
 * @param a first partial count
 * @param b second partial count
 * @return the sum of the two partial counts
 */
@Override
public Long merge(Long a, Long b) {
    return Long.sum(a, b);
}