Storm Trident学习总结

1、Trident

Trident 可以理解为 Storm 批处理的高级抽象,提供了分组、分区、聚合、函数等操作。

这里涉及到几个概念,做一下说明:

  • batch: 出于效率原因,对tuple进行批量操作。若干的tuple组合就是batch,Trident的操作就是对batch的操作
  • partition: 将一个batch的数据分区,分成多个partition,或者可以理解为多个子batch,然后多个partition可以并发处理。 所以,分区后要分配并发度(parallelismHint),然后才能进行并发处理,否则分区无用。(也可以简单理解为taskNumber)
  • repartition:对partition的操作。aggregate()隐含有repartition操作

1.1、Partition-local operations

分区本地操作不涉及网络传输,并且独立地应用于每个批处理分区。

以下操作都属于分区本地操作。

1.1.1、Function

函数接受一组输入字段并发出零个或多个元组作为输出。

输出元组的字段将附加到流中的原始输入元组。

如果函数没有发出元组,则过滤掉原始输入元组。否则,为每个输出元组复制输入元组。

1.1.2、Filter

过滤器将元组作为输入,并决定是否保留该元组。

1.1.3、map/flatMap

map 返回一个流,该流经过指定函数操作后,输出的 tuple 将作为新的流。属于 one-one transformation。见 storm demo代码 中的 LowerCaseFunction

flatMap 与 map 类似,但属于 one-more transformation,同样,输出的 tuple 将作为新的流。

如果未指定输出的字段,则 map 和 flatMap 会将输入字段保留为输出字段,当流的内容已改变。

如果指定了输出字段,则 map 和 flatMap 的输出流将会采用新的字段。

1.1.4、peek

peek 可用于在每个 trident tuple 经过时对其执行额外操作。这可以用于调试,以便在元组流经管道中的某个点时查看元组。

1.1.5、min/minBy

min 和 minBy 返回在 trident 流中在每个分区上一批 tuple 的最小值。

同理,max and maxBy 相反。

1.1.6、partitionAggregate

partitionAggregate 对每个分区上的每批 tuple 进行聚合操作。与函数不同,partitionAggregate 输出的 tuple 会替换给它的输入 tuple

partitionAggregate 不做分区操作,只对现有分区内的每批 tuple 进行聚合操作。

partitionAggregate 只能使用 Aggregator 作为聚合器。Aggregator 是针对 batch 操作的。

1.1.7、stateQuery and partitionPersist

stateQuery 用于查询状态源,partitionPersist 用于更新状态源。此功能暂未测试。

1.1.8、projection

投影操作,只保留指定字段的数据。

1.2、Repartitioning operations

重新分区操作运行一个函数来更改元组在任务之间的分区方式。分区数也可能因重新分区而改变(例如,如果重新分配后并行性变得更大)。重新分区需要网络传输。

  • shuffle: 在所有目标分区中随机均匀重新分配tuple。
  • broadcast: 每个tuple都复制到所有目标分区。
  • partitionBy:接收一组字段,并根据该组字段进行语义分区。确保指定字段相同的tuple能够被发送到同一个partition。(但同一个partition里可能有字段不同的tuple)。
  • 保证同一组字段始终转到同一目标分区。
  • global: 所有tuple都被发送到同一个分区。流中的所有批次选择相同的分区。
  • batchGlobal: 所在同一批次的tuple都将发送到同一分区。不同批次可能会转到不同的分区。
  • partition: 此方法接受实现org.apache.storm.grouping.CustomStreamGrouping的自定义分区函数。

1.3、parallelismHint

parallelismHint 适用于所有操作的并发度,直到遇到某个 repartition 操作为止。

可以理解 parallelismHint 对操作的并发度有个影响范围,即 parallelismHint 的前一个 repartition 操作到 parallelismHint 的后一个 repartition 操作。

如果没有 repartition 操作,那 parallelismHint 对所有操作有效。

Spout 默认只有1个分区,按照 partition 的定义,要想指定多并行度,必须先由多个分区,如果只有一个分区,并行度指定值大于1也没有意义。

1、不指定parallelismHint

TridentTopology topology = new TridentTopology();
topology.newStream("spout-random", new RandomBatchSpout(new Fields("name", "event")))
        .each(new Fields("name"), new PrintFilter("view"))
        .each(new Fields("name"), new PrintFilter("print"));

没有给Spout指定并行度,也没有通过parallelismHint给流操作指定,默认partition等于1,并行度为1。

2、指定parallelismHint,不重新分区

TridentTopology topology = new TridentTopology();
topology.newStream("spout-random", new RandomBatchSpout(new Fields("name", "event")))
        .parallelismHint(2)
        .each(new Fields("name"), new PrintFilter("view"))
        .each(new Fields("name"), new PrintFilter("print"));

通过 parallelismHint 指定了并行度为2,过程中并没有进行重新分区操作,那么它的作用范围就是从 spout 到最后一个 each,范围内,所有操作的 partition 等于2。

注意:这种情况下,parallelismHint 位置不影响结果。

3、指定parallelismHint,重新分区

和第二步相比,它多了一步重新分区操作,重新分区,意味着之后的操作将采用新的并行度,由于没有指定并行度,所以默认为1。

注意:
如果 spout 的 getComponentConfiguration() 指定了 conf.setMaxTaskParallelism(maxTaskParallelism) 配置,且 parallelismHint 大于 maxTaskParallelism,并行度将为 maxTaskParallelism

Aggregator

聚合器有:AggregatorCombinerAggregatorReducerAggregator

CombinerAggregator 在每个输入 tuple 上运行 init(),然后通过 combine() 聚合结果值直到只剩下一个 tuple。如果分区中没有任何 tupleCombinerAggregator 将返回 zero() 中定义的 tuple

ReducerAggregator 使用 init() 生成初始值,然后针对每个输入tuple 调用 reduce() 迭代计算,最终作为单个 tuple 输出。

CombinerAggregator 与 ReducerAggregator 的区别在于,CombinerAggregator 的每个输入 tuple 都是在 init() 先进行初始化,而 ReducerAggregator 的每个输入 tuple 都是直接计算。

CombinerAggregator 与 ReducerAggregator,最终都返回 一个 tuple,且这个 tuple 只有一个 field

Aggregator 是使用聚合的最通用接口。它可以使用任意数量的字段发出任意数量的 tuple。可以在执行期间的任何时候发出 tuple。它以下列方式执行:

  • 1、在处理一批数据之前先调用 init 方法。 init 方法的返回值是一个代表着聚合状态的对象,这个对象接下来会被传入 aggregate 方法和 complete 方法中。
  • 2、在分区的 batch 中,每输入一个 tuple 都会调用 aggregate 方法。这个方法能够更新状态并且有选择地输出 tuple
  • 3、在分区的 batch 中,当所有的tuple都被聚合处理时,将调用 complete 方法。

1.6、Aggregation operations

Trident 有 aggregate() 和 persistentAggregate() 方法,作用在 Streams 上进行聚合操作。

与 partitionAggregate 的区别在于,partitionAggregate 是对本地分区内的每个 batch 的聚合操作。

aggregate() 方法在每个 batch 上独立执行的。

aggregate()persistentAggregate()partitionAggregate() 的聚合操作都要由具体的聚合器来实现。

当使用 ReduceAggregator 或者 Aggregator 聚合器时, 流先被重新划分成一个大分区(仅有一个 partition), 然后对这个 partition 做聚合操作;

当使用 CombinerAggregator 聚合器时, Trident 首先对每个 partition 局部聚合, 然后将所有这些 partition 重新划分到一个 partition 中, 完成全局聚合。

相比而言, CombinerAggregator 比 ReduceAggregator 更高效, 推荐使用。

persistentAggregate() 方法将聚合流中所有 batch 的 tuple,将每个 batch 的结果存储在状态源中作为中间态,最终会做一个总的聚合。

中间状态可以持久化,同时支持事务性。

persistentAggregate() 也可以运行在 groupBy() 的流中,在这种情况下,结果将保存在一个以分组字段作为 key的 MapState 中。

Operations on grouped streams

groupBy() 操作比较特殊,需要看后面跟什么聚合操作。

groupBy() + partitionAggregate()

groupBy 后跟 partitionAggregate(),不涉及分区操作,是在现有的分区基础上根据 field 进行分组,分完组后,再在每个分组上进行聚合。

由于相同的 field 可能分布在每个分区上,且 partitionAggregate() 是对分区上的每批 batch 操作,所以,相同的 field,会出现多个聚合结果。

以 “open” 事件为例,查看生产的数据。

以 “open” 事件为例,查看聚合的结果。

对比生产数据的结果和聚合汇总的结果。

groupBy() + aggregate()

如果 groupBy() 后面是 aggregate() 的话,先进行 partitionBy 分区,然后在每个 partition 上分组,最后,在每个分组上进行聚合。

groupBy() 后跟 aggregate() ,先根据并行度进行分区,在每个 partition 上根据 field 进行分组,分完组后,再在每个分组上进行聚合。

这里涉及到按照 field 重新分区的操作(相当于 partitionBy),在 partition 上再按照 field 分组,这样,一个 field 只存在一个 partition 上,且单个 batch 的 field 都是相同的,而aggregate() 是对分区上的 batch 进行聚合操作的。

所以,相比第一种方式,第二种方式更合适。

One Reply to “Storm Trident学习总结”

  1. Today, with all the fast life style that everyone is having, credit cards have a big demand throughout the economy. Persons coming from every discipline are using credit card and people who aren’t using the credit cards have made arrangements to apply for one in particular. Thanks for expressing your ideas about credit cards. https://hypertensionmedi.com buy hypertension drugs

发表评论

您的电子邮箱地址不会被公开。