Storm Trident学习总结
1、Trident
Trident
可以理解为 Storm
批处理的高级抽象,提供了分组、分区、聚合、函数等操作。
这里涉及到几个概念,做一下说明:
- batch: 出于效率原因,对tuple进行批量操作。若干的tuple组合就是batch,Trident的操作就是对batch的操作
- partition: 将一个batch的数据分区,分成多个partition,或者可以理解为多个子batch,然后多个partition可以并发处理。
所以,分区后要分配并发度(parallelismHint),然后才能进行并发处理,否则分区无用。(也可以简单理解为taskNumber)
- repartition:对partition的操作。aggregate()隐含有repartition操作
1.1、Partition-local operations
分区本地操作不涉及网络传输,并且独立地应用于每个批处理分区。
以下操作都属于分区本地操作。
1.1.1、Function
函数接受一组输入字段并发出零个或多个元组作为输出。
输出元组的字段将附加到流中的原始输入元组。
如果函数没有发出元组,则过滤掉原始输入元组。否则,为每个输出元组复制输入元组。
1.1.2、Filter
过滤器将元组作为输入,并决定是否保留该元组。
1.1.3、map/flatMap
map
返回一个流,该流经过指定函数操作后,输出的 tuple
将作为新的流。属于 one-one transformation
。见 storm demo代码 中的 LowerCaseFunction
。
flatMap
与 map
类似,但属于 one-more transformation
,同样,输出的 tuple
将作为新的流。
如果未指定输出的字段,则 map
和 flatMap
会将输入字段保留为输出字段,当流的内容已改变。
如果指定了输出字段,则 map
和 flatMap
的输出流将会采用新的字段。
1.1.4、peek
peek
可用于在每个 trident tuple
经过时对其执行额外操作。这可以用于调试,以便在元组流经管道中的某个点时查看元组。
1.1.5、min/minBy
min
和 minBy
返回在 trident
流中在每个分区上一批 tuple
的最小值。
同理,max and maxBy
相反。
1.1.6、partitionAggregate
partitionAggregate
对每个分区上的每批 tuple
进行聚合操作。与函数不同,partitionAggregate
输出的 tuple
会替换给它的输入 tuple
。
partitionAggregate
不做分区操作,只对现有分区内的每批 tuple
进行聚合操作。
partitionAggregate
只能使用 Aggregator
作为聚合器。Aggregator
是针对 batch
操作的。
1.1.7、stateQuery and partitionPersist
stateQuery
用于查询状态源,partitionPersist
用于更新状态源。此功能暂未测试。
1.1.8、projection
投影操作,只保留指定字段的数据。
1.2、Repartitioning operations
重新分区操作运行一个函数来更改元组在任务之间的分区方式。分区数也可能因重新分区而改变(例如,如果重新分配后并行性变得更大)。重新分区需要网络传输。
- shuffle: 在所有目标分区中随机均匀重新分配tuple。
- broadcast: 每个tuple都复制到所有目标分区。
- partitionBy:接收一组字段,并根据该组字段进行语义分区。确保指定字段相同的tuple能够被发送到同一个partition。(但同一个partition里可能有字段不同的tuple)。
- 保证同一组字段始终转到同一目标分区。
- global: 所有tuple都被发送到同一个分区。流中的所有批次选择相同的分区。
- batchGlobal: 所在同一批次的tuple都将发送到同一分区。不同批次可能会转到不同的分区。
- partition: 此方法接受实现org.apache.storm.grouping.CustomStreamGrouping的自定义分区函数。
1.3、parallelismHint
parallelismHint
适用于所有操作的并发度,直到遇到某个 repartition
操作为止。
可以理解 parallelismHint
对操作的并发度有个影响范围,即 parallelismHint
的前一个 repartition
操作到 parallelismHint
的后一个 repartition
操作。
如果没有 repartition
操作,那 parallelismHint
对所有操作有效。
Spout
默认只有1个分区,按照 partition
的定义,要想指定多并行度,必须先由多个分区,如果只有一个分区,并行度指定值大于1也没有意义。
1、不指定parallelismHint
TridentTopology topology = new TridentTopology(); topology.newStream("spout-random", new RandomBatchSpout(new Fields("name", "event"))) .each(new Fields("name"), new PrintFilter("view")) .each(new Fields("name"), new PrintFilter("print"));
没有给Spout指定并行度,也没有通过parallelismHint给流操作指定,默认partition等于1,并行度为1。
2、指定parallelismHint,不重新分区
TridentTopology topology = new TridentTopology(); topology.newStream("spout-random", new RandomBatchSpout(new Fields("name", "event"))) .parallelismHint(2) .each(new Fields("name"), new PrintFilter("view")) .each(new Fields("name"), new PrintFilter("print"));
通过 parallelismHint
指定了并行度为2,过程中并没有进行重新分区操作,那么它的作用范围就是从 spout
到最后一个 each
,范围内,所有操作的 partition
等于2。
注意:这种情况下,parallelismHint
位置不影响结果。
3、指定parallelismHint,重新分区

和第二步相比,它多了一步重新分区操作,重新分区,意味着之后的操作将采用新的并行度,由于没有指定并行度,所以默认为1。
注意:
如果spout
的getComponentConfiguration()
指定了conf.setMaxTaskParallelism
(maxTaskParallelism) 配置,且parallelismHint
大于maxTaskParallelism
,并行度将为maxTaskParallelism
。
Aggregator
聚合器有:Aggregator
、CombinerAggregator
、ReducerAggregator
。
CombinerAggregator
在每个输入 tuple
上运行 init()
,然后通过 combine()
聚合结果值直到只剩下一个 tuple
。如果分区中没有任何 tuple
,CombinerAggregator
将返回 zero()
中定义的 tuple
。
ReducerAggregator
使用 init()
生成初始值,然后针对每个输入tuple
调用 reduce()
迭代计算,最终作为单个 tuple
输出。
CombinerAggregator
与 ReducerAggregator
的区别在于,CombinerAggregator
的每个输入 tuple
都是在 init()
先进行初始化,而 ReducerAggregator
的每个输入 tuple
都是直接计算。
CombinerAggregator
与 ReducerAggregator
,最终都返回 一个 tuple
,且这个 tuple
只有一个 field
。
Aggregator
是使用聚合的最通用接口。它可以使用任意数量的字段发出任意数量的 tuple
。可以在执行期间的任何时候发出 tuple
。它以下列方式执行:
- 1、在处理一批数据之前先调用
init
方法。init
方法的返回值是一个代表着聚合状态的对象,这个对象接下来会被传入aggregate
方法和complete
方法中。 - 2、在分区的
batch
中,每输入一个tuple
都会调用aggregate
方法。这个方法能够更新状态并且有选择地输出tuple
。 - 3、在分区的
batch
中,当所有的tuple都被聚合处理时,将调用complete
方法。
1.6、Aggregation operations
Trident
有 aggregate()
和 persistentAggregate()
方法,作用在 Streams
上进行聚合操作。
与 partitionAggregate
的区别在于,partitionAggregate
是对本地分区内的每个 batch
的聚合操作。
aggregate()
方法在每个 batch
上独立执行的。
aggregate()
、persistentAggregate()
、partitionAggregate()
的聚合操作都要由具体的聚合器来实现。
当使用 ReduceAggregator
或者 Aggregator
聚合器时, 流先被重新划分成一个大分区(仅有一个 partition), 然后对这个 partition
做聚合操作;
当使用 CombinerAggregator
聚合器时, Trident
首先对每个 partition
局部聚合, 然后将所有这些 partition
重新划分到一个 partition
中, 完成全局聚合。
相比而言, CombinerAggregator
比 ReduceAggregator
更高效, 推荐使用。
persistentAggregate()
方法将聚合流中所有 batch
的 tuple
,将每个 batch
的结果存储在状态源中作为中间态,最终会做一个总的聚合。
中间状态可以持久化,同时支持事务性。
persistentAggregate()
也可以运行在 groupBy()
的流中,在这种情况下,结果将保存在一个以分组字段作为 key
的 MapState
中。
Operations on grouped streams
groupBy()
操作比较特殊,需要看后面跟什么聚合操作。
groupBy() + partitionAggregate()
groupBy
后跟 partitionAggregate()
,不涉及分区操作,是在现有的分区基础上根据 field
进行分组,分完组后,再在每个分组上进行聚合。
由于相同的 field
可能分布在每个分区上,且 partitionAggregate()
是对分区上的每批 batch
操作,所以,相同的 field
,会出现多个聚合结果。
以 “open” 事件为例,查看生产的数据。

以 “open” 事件为例,查看聚合的结果。

对比生产数据的结果和聚合汇总的结果。

groupBy() + aggregate()
如果 groupBy()
后面是 aggregate()
的话,先进行 partitionBy
分区,然后在每个 partition
上分组,最后,在每个分组上进行聚合。
groupBy()
后跟 aggregate()
,先根据并行度进行分区,在每个 partition
上根据 field
进行分组,分完组后,再在每个分组上进行聚合。
这里涉及到按照 field
重新分区的操作(相当于 partitionBy
),在 partition
上再按照 field
分组,这样,一个 field
只存在一个 partition
上,且单个 batch
的 field
都是相同的,而aggregate()
是对分区上的 batch
进行聚合操作的。
所以,相比第一种方式,第二种方式更合适。
Today, with all the fast life style that everyone is having, credit cards have a big demand throughout the economy. Persons coming from every discipline are using credit card and people who aren’t using the credit cards have made arrangements to apply for one in particular. Thanks for expressing your ideas about credit cards. https://hypertensionmedi.com buy hypertension drugs