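Storm's Window Mechanism

Storm introduced a windowing mechanism in version 1.0, which makes it easy for developers to run statistical computations over groups of tuples.

1. Common window functions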

withWindow(Count windowLength, Count slidingInterval)
Tuple count based sliding window that slides after slidingInterval number of tuples.
// Sliding window whose length is measured in tuples; fires each time slidingInterval tuples have arrived

withWindow(Count windowLength)
Tuple count based window that slides with every incoming tuple.
// Sliding window whose length is measured in tuples; fires on every incoming tuple

withWindow(Count windowLength, Duration slidingInterval)
Tuple count based sliding window that slides after slidingInterval time duration.
// Sliding window whose length is measured in tuples; fires each time a slidingInterval period elapses

withWindow(Duration windowLength, Duration slidingInterval)
Time duration based sliding window that slides after slidingInterval time duration.
// Sliding window whose length is measured in time; fires each time a slidingInterval period elapses

withWindow(Duration windowLength)
Time duration based window that slides with every incoming tuple.
// Sliding window whose length is measured in time; fires on every incoming tuple

withWindow(Duration windowLength, Count slidingInterval)
Time duration based sliding window that slides after slidingInterval number of tuples.
// Sliding window whose length is measured in time; fires each time slidingInterval tuples have arrived

withTumblingWindow(BaseWindowedBolt.Count count)
Count based tumbling window that tumbles after the specified count of tuples.
// Tumbling window whose length is measured in tuples; fires each time count tuples have arrived

withTumblingWindow(BaseWindowedBolt.Duration duration)
Time duration based tumbling window that tumbles after the specified time duration.
// Tumbling window whose length is measured in time; fires each time a duration period elapses
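
To make the difference between a sliding and a tumbling window concrete, here is a minimal sketch in plain Java that does not depend on Storm. The class CountWindowSketch and its windows method are hypothetical names introduced only for illustration; the code mimics count-based window semantics (evict the oldest tuple beyond windowLength, fire every slidingInterval tuples) and is not Storm's actual implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical helper, not part of Storm: simulates count-based windows over a list of tuples.
class CountWindowSketch {

    /**
     * Returns the contents of every window that fires when the window keeps at most
     * windowLength tuples and fires after every slidingInterval tuples.
     * A tumbling window is the special case slidingInterval == windowLength.
     */
    static <T> List<List<T>> windows(List<T> tuples, int windowLength, int slidingInterval) {
        List<List<T>> fired = new ArrayList<>();
        Deque<T> window = new ArrayDeque<>();
        int sinceLastTrigger = 0;
        for (T tuple : tuples) {
            window.addLast(tuple);
            if (window.size() > windowLength) {
                window.removeFirst(); // evict the oldest tuple
            }
            sinceLastTrigger++;
            if (sinceLastTrigger == slidingInterval) {
                fired.add(new ArrayList<>(window)); // trigger: snapshot the current window
                sinceLastTrigger = 0;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Integer> tuples = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(windows(tuples, 3, 2)); // sliding:  [[1, 2], [2, 3, 4], [4, 5, 6]]
        System.out.println(windows(tuples, 3, 3)); // tumbling: [[1, 2, 3], [4, 5, 6]]
    }
}
```

In the sliding case consecutive windows overlap (tuple 2 and tuple 4 each appear twice), while in the tumbling case every tuple belongs to exactly one window.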

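2. How it works (overview)

Each Tuple is abstracted as an Event. An Event adds a timestamp, which defaults to the system timestamp but can also be taken from a field in the Tuple.

WindowedBoltExecutor: responsible for initializing the WindowManager, TimestampExtractor, TriggerPolicy, EvictionPolicy, WaterMarkEventGenerator and other configuration parameters, and for starting the WaterMarkEventGenerator. It also wraps the user's IWindowedBolt implementation and invokes that bolt's prepare, execute and cleanup methods.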

private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map stormConf, TopologyContext context) {
    WindowManager<Tuple> manager = new WindowManager(lifecycleListener);
    Duration windowLengthDuration = null;
    Count windowLengthCount = null;
    Duration slidingIntervalDuration = null;
    Count slidingIntervalCount = null;
    // The window length is configured either as a tuple count or as a duration in milliseconds
    if (stormConf.containsKey("topology.bolts.window.length.count")) {
        windowLengthCount = new Count(((Number)stormConf.get("topology.bolts.window.length.count")).intValue());
    } else if (stormConf.containsKey("topology.bolts.window.length.duration.ms")) {
        windowLengthDuration = new Duration(((Number)stormConf.get("topology.bolts.window.length.duration.ms")).intValue(), TimeUnit.MILLISECONDS);
    }

    ...

    this.validate(stormConf, windowLengthCount, windowLengthDuration, slidingIntervalCount, slidingIntervalDuration);
    // The eviction policy follows the window length; the trigger policy follows the sliding interval
    this.evictionPolicy = this.getEvictionPolicy(windowLengthCount, windowLengthDuration, manager);
    this.triggerPolicy = this.getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration, manager, this.evictionPolicy);
    manager.setEvictionPolicy(this.evictionPolicy);
    manager.setTriggerPolicy(this.triggerPolicy);
    return manager;
}

protected void start() {
    if (this.waterMarkEventGenerator != null) {
        LOG.debug("Starting waterMarkEventGenerator");
        this.waterMarkEventGenerator.start();
    }

    LOG.debug("Starting trigger policy");
    this.triggerPolicy.start();
}

public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.windowedOutputCollector = new WindowedBoltExecutor.WindowedOutputCollector(collector);
    this.bolt.prepare(stormConf, context, this.windowedOutputCollector);
    this.listener = this.newWindowLifecycleListener();
    this.windowManager = this.initWindowManager(this.listener, stormConf, context);
    this.start();
    LOG.debug("Initialized window manager {} ", this.windowManager);
}

public void execute(Tuple input) {
    if (this.isTupleTs()) { // check whether the timestamp carried in the tuple is used
        long ts = this.timestampExtractor.extractTimestamp(input);
        if (this.waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts)) {
            this.windowManager.add(input, ts); // create the Event with the tuple's own timestamp
        } else {
            // late tuple: forward it to the late-tuple stream if one is configured, otherwise just log it
            if (this.lateTupleStream != null) {
                this.windowedOutputCollector.emit(this.lateTupleStream, input, new Values(new Object[]{input}));
            } else {
                LOG.info("Received a late tuple {} with ts {}. This will not be processed.", input, ts);
            }

            this.windowedOutputCollector.ack(input);
        }
    } else {
        this.windowManager.add(input); // use the system timestamp
    }

}

WindowManager: responsible for storing and retrieving Tuples. When WindowedBoltExecutor runs its execute method, each Tuple is converted into an Event and the Event is placed in a ConcurrentLinkedQueue.

public void add(T event) {
    this.add(event, System.currentTimeMillis());
}

public void add(T event, long ts) {
    this.add((Event)(new EventImpl(event, ts)));
}

public void add(Event<T> windowEvent) {
    if (!windowEvent.isWatermark()) {
        this.queue.add(windowEvent);
    } else {
        LOG.debug("Got watermark event with ts {}", windowEvent.getTimestamp());
    }

    this.track(windowEvent);
    this.compactWindow();
}

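TimestampExtractor: the default implementation is TupleFieldTimestampExtractor, which extracts the timestamp from the Tuple. By default the system processing time is used; with withTimestampField, the timestamp is read from the specified fieldName instead.

TriggerPolicy: the window's trigger policy. It determines the point at which a window is evaluated, based on either time or tuple count.

WaterMarkEventGenerator: a scheduled task that computes the latest watermark every watermarkInterval (the minimum of the latest tuple timestamps across the input streams, minus the lag). If the result is greater than lastWaterMark, lastWaterMark is updated.

The WaterMarkEventGenerator.track method decides whether a tuple should be processed: if the tuple's timestamp is less than lastWaterMarkTs, it returns false. In that case the tuple is sent to the stream configured by Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM if one is set; otherwise only a log message is printed.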

/**
 * Computes the end time of the next aligned window.
 *
 * startTs (excluding)
 * endTs (including)
 * Returns the aligned window end ts of the next window, or Long.MAX_VALUE if there are no more events to process.
 **/
private long getNextAlignedWindowTs(long startTs, long endTs) {
    long nextTs = this.windowManager.getEarliestEventTs(startTs, endTs);
    // round nextTs up to the next multiple of slidingIntervalMs unless it is already aligned
    return nextTs != Long.MAX_VALUE && nextTs % this.slidingIntervalMs != 0L
        ? nextTs + (this.slidingIntervalMs - nextTs % this.slidingIntervalMs)
        : nextTs;
}

For example:

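# Data
e1(2018-09-19 18:15:50), e2(2018-09-19 18:15:51), e3(2018-09-19 18:15:58), e4(2018-09-19 18:16:00), e5(2018-09-19 18:16:03)

# Window sliding parameters
Window length = 10s, sliding interval = 10s, watermark emit frequency = 1s, max lag = 5s

2018-09-19 18:15:50
Window end: 1537352150000 (2018-09-19 18:15:50); the timestamp is already a multiple of 10000 ms, so it is used as is

2018-09-19 18:15:51
Window end: 1537352151000 + (10000 - 1537352151000 % 10000) = 1537352160000 (2018-09-19 18:16:00)
...

The following windows are produced:

1. 2018-09-19 18:15:41 ~ 2018-09-19 18:15:50: e1
2. 2018-09-19 18:15:51 ~ 2018-09-19 18:16:00: e2, e3, e4
3. 2018-09-19 18:16:01 ~ 2018-09-19 18:16:10: e5

Note:
Because the times above are written to the second rather than the millisecond, and a window's start is exclusive while its end is inclusive, each window appears to start at second 1 rather than second 0.
If withTimestampField is not specified, the start of the timeline gets no special handling: windows start from the earliest Event time, one per sliding interval.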
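
The alignment arithmetic can be checked with a short, Storm-independent sketch. AlignedWindowSketch and alignedWindowEnd are hypothetical names, not Storm classes; the method reproduces only the rounding expression from getNextAlignedWindowTs and leaves out the Long.MAX_VALUE guard. The 10-second window length is hard-coded in main to match the example data.

```java
// Hypothetical helper, not Storm code: rounds an event timestamp up to the next
// multiple of the sliding interval, as the expression in getNextAlignedWindowTs does.
class AlignedWindowSketch {

    static long alignedWindowEnd(long eventTs, long slidingIntervalMs) {
        long remainder = eventTs % slidingIntervalMs;
        // already on a boundary: the event's own timestamp is the window end
        return remainder == 0 ? eventTs : eventTs + (slidingIntervalMs - remainder);
    }

    public static void main(String[] args) {
        long slidingIntervalMs = 10_000L;
        long windowLengthMs = 10_000L;
        // e1..e5 from the example, as epoch milliseconds
        long[] eventTs = {1537352150000L, 1537352151000L, 1537352158000L, 1537352160000L, 1537352163000L};
        for (long ts : eventTs) {
            long end = alignedWindowEnd(ts, slidingIntervalMs);
            // the window covers (end - windowLength, end]: start exclusive, end inclusive
            System.out.println(ts + " -> (" + (end - windowLengthMs) + ", " + end + "]");
        }
    }
}
```

Running it shows e1 mapped to the window ending at 1537352150000, e2 through e4 to the window ending at 1537352160000, and e5 to the window ending at 1537352170000, which matches the three windows listed above.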

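3. Watermark and Lag

While tracking the input streams, Storm records the maximum timestamp seen on each stream and then takes the minimum of those maxima. That time minus the lag is the watermark time.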

public boolean track(GlobalStreamId stream, long ts) {
    Long currentVal = (Long)this.streamToTs.get(stream);
    if (currentVal == null || ts > currentVal.longValue()) {
        this.streamToTs.put(stream, ts);
    }

    this.checkFailures();
    return ts >= this.lastWaterMarkTs;
}

private long computeWaterMarkTs() {
    long ts = 0L;
    if (this.streamToTs.size() >= this.inputStreams.size()) {
        ts = Long.MAX_VALUE;
        for (Map.Entry<GlobalStreamId, Long> entry : this.streamToTs.entrySet()) {
            ts = Math.min(ts, entry.getValue());
        }
    }

    return ts - (long)this.eventTsLag;
}
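
The interplay between track and the watermark computation can be sketched in plain Java without Storm. WatermarkSketch, its fields and its advance method are hypothetical names chosen for illustration, and streams are identified by plain strings instead of GlobalStreamId; this is a simplified model under those assumptions, not the WaterMarkEventGenerator class itself.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the watermark bookkeeping; not Storm's class.
class WatermarkSketch {
    private final Map<String, Long> streamToTs = new HashMap<>(); // max timestamp seen per stream
    private final int inputStreamCount;
    private final long lagMs;
    private long lastWatermarkTs = 0L;

    WatermarkSketch(int inputStreamCount, long lagMs) {
        this.inputStreamCount = inputStreamCount;
        this.lagMs = lagMs;
    }

    /** Records the timestamp for the stream and reports whether the tuple is on time. */
    boolean track(String stream, long ts) {
        streamToTs.merge(stream, ts, Math::max);
        return ts >= lastWatermarkTs;
    }

    /** Recomputes the watermark (it only moves forward) and returns the current value. */
    long advance() {
        // wait until every input stream has reported at least one timestamp
        if (streamToTs.size() >= inputStreamCount) {
            long minOfMax = Long.MAX_VALUE;
            for (long ts : streamToTs.values()) {
                minOfMax = Math.min(minOfMax, ts);
            }
            long candidate = minOfMax - lagMs;
            if (candidate > lastWatermarkTs) {
                lastWatermarkTs = candidate;
            }
        }
        return lastWatermarkTs;
    }

    public static void main(String[] args) {
        WatermarkSketch w = new WatermarkSketch(2, 5_000L);
        w.track("s1", 1537352163000L);
        w.track("s2", 1537352158000L);
        System.out.println(w.advance());                     // 1537352153000
        System.out.println(w.track("s1", 1537352151000L));   // false: older than the watermark
        System.out.println(w.track("s2", 1537352160000L));   // true
    }
}
```

With two streams and a 5-second lag, the slower stream (s2) determines the watermark, so a tuple stamped 18:15:51 that arrives after the watermark has reached 18:15:53 is reported as late.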

The watermark may be based on the Event's processing time, or on the field specified with withTimestampField.

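Watermark and lag exist to solve the problem of delayed tuples. Because messages often arrive out of order, a window cannot stay open indefinitely waiting for late tuples. For sliding windows triggered by a time-based sliding interval, and for tumbling windows whose length is a time duration, the steps are:

1. Generate the first window
2. Generate the watermark
3. Generate a new window
Mark whether each Event is a watermark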

Reference:

Author: OneRain
Link: https://kiswo.com/2017/08/24/stream-processing/storm/storm-window/
Copyright: Unless otherwise stated, all articles on this blog are licensed under CC BY-NC-SA 4.0. Please credit OneRain's Blog when reposting.