Storm是一个分布式的、容错的实时计算系统,遵循Eclipse Public License 1.0,Storm可以方便地在一个计算机集群中编写与扩展复杂的实时计算,Storm之于实时处理,就好比Hadoop之于批处理。Storm保证每个消息都会得到处理,而且它很快——在一个小集群中,每秒可以处理数以百万计的消息。可以使用任意编程语言来做开发。
主要商业应用及案例:Twitter
Storm的优点
1. 简单的编程模型。类似于MapReduce降低了并行批处理复杂性,Storm降低了进行实时处理的复杂性。
2. 服务化,一个服务框架,支持热部署,即时上线或下线App.
3. 可以使用各种编程语言。你可以在Storm之上使用各种编程语言。默认支持Clojure、Java、Ruby和Python。要增加对其他语言的支持,只需实现一个简单的Storm通信协议即可。
4. 容错性。Storm会管理工作进程和节点的故障。
5. 水平扩展。计算是在多个线程、进程和服务器之间并行进行的。
6. 可靠的消息处理。Storm保证每个消息至少能得到一次完整处理。任务失败时,它会负责从消息源重试消息。
7. 快速。系统的设计保证了消息能得到快速的处理,使用ZeroMQ作为其底层消息队列。
8. 本地模式。Storm有一个“本地模式”,可以在处理过程中完全模拟Storm集群。这让你可以快速进行开发和单元测试。
Storm目前存在的问题
1.
目前的开源版本中只是单节点
Nimbus
,挂掉只能自动重启,可以考虑实现一个双
nimbus
的布局。
2. Clojure
是一个在
JVM
平台运行的动态函数式编程语言
,
优势在于流程计算,
Storm
的部分核心内容由
Clojure
编写,虽然性能上提高不少但同时也提升了维护成本。
Storm 集群由一个主节点和多个工作节点组成。主节点运行了一个名为 “Nimbus” 的守护进程,用于分配代码、布置任务及故障检测。每个工作节点都运行了一个名为 “Supervisor” 的守护进程,用于监听工作,开始并终止工作进程。 Nimbus 和 Supervisor 都能快速失败,而且是无状态的,这样一来它们就变得十分健壮,两者的协调工作是由 Zookeeper 来完成的。 ZooKeeper 用于管理集群中的不同组件, ZeroMQ 是内部消息系统, JZMQ 是 ZeroMQMQ 的 Java Binding 。有个名为 storm-deploy 的子项目,可以在 AWS 上一键部署 Storm 集群 .
Storm 的术语包括 Stream 、 Spout 、 Bolt 、 Task 、 Worker 、 Stream Grouping 和 Topology 。 Stream 是被处理的数据。 Sprout 是数据源。 Bolt 处理数据。 Task 是运行于 Spout 或 Bolt 中的 线程。 Worker 是运行这些线程的进程。 Stream Grouping 规定了 Bolt 接收什么东西作为输入数据。数据可以随机分配(术语为 Shuffle ),或者根据字段值分配(术语为 Fields ),或者 广播(术语为 All ),或者总是发给一个 Task (术语为 Global ),也可以不关心该数据(术语为 None ),或者由自定义逻辑来决定(术语为 Direct )。 Topology 是由 Stream Grouping 连接起来的 Spout 和 Bolt 节点网络 . 下面进行详细介绍:
- Topologies 用于封装一个实时计算应用程序的逻辑,类似于 Hadoop 的 MapReduce Job
- Stream 消息流,是一个没有边界的 tuple 序列,这些 tuples 会被以一种分布式的方式并行地创建和处理
- Spouts 消息源,是消息生产者,他会从一个外部源读取数据并向 topology 里面面发出消息: tuple
- Bolts 消息处理者,所有的消息处理逻辑被封装在 bolts 里面,处理输入的数据流并产生输出的新数据流 , 可执行过滤,聚合,查询数据库等操作
- Task 每一个 Spout 和 Bolt 会被当作很多 task 在整个集群里面执行 , 每一个 task 对应到一个线程 .
- Stream groupings 消息分发策略 , 定义一个 Topology 的其中一步是定义每个 tuple 接受什么样的流作为输入 ,stream grouping 就是用来定义一个 stream 应该如果分配给 Bolts 们 .
stream grouping 分类
1. Shuffle Grouping:
随机分组,
随机派发
stream
里面的
tuple
,
保证每个
bolt
接收到的
tuple
数目相同
.
2. Fields Grouping
:按字段分组,
比如按
userid
来分组,
具有同样
userid
的
tuple
会被分到相同的
Bolts
,
而不同的
userid
则会被分配到不同的
Bolts.
3. All Grouping
:
广播发送,
对于每一个
tuple
,
所有的
Bolts
都会收到
.
4. Global Grouping:
全局分组,这个
tuple
被分配到
storm
中的一个
bolt
的其中一个
task.
再具体一点就是分配给
id
值最低的那个
task.
5. Non Grouping:
不分组,意思是说
stream
不关心到底谁会收到它的
tuple.
目前他和
Shuffle grouping
是一样的效果
,
有点不同的是
storm
会把这个
bolt
放到这个
bolt
的订阅者同一个线程去执行
.
6. Direct Grouping:
直接分组
,
这是一种比较特别的分组方法,用这种分组意味着消息的发送者举鼎由消息接收者的哪个
task
处理这个消息
.
只有被声明为
Direct Stream
的消息流可以声明这种分组方法
.
而且这种消息
tuple
必须使用
emitDirect
方法来发射
.
消息处理者可以通过
TopologyContext
来或者处理它的消息的
taskid (OutputCollector.emit
方法也会返回
taskid)
Storm 如何保证消息被处理
storm 保证每个 tuple 会被 topology 完整的执行。 storm 会追踪由每个 spout tuple 所产生的 tuple 树 ( 一个 bolt 处理一个 tuple 之后可能会发射别的 tuple 从而可以形成树状结构 ), 并且跟踪这棵 tuple 树什么时候成功处理完。每个 topology 都有一个消息超时的设置, 如果 storm 在这个超时的时间内检测不到某个 tuple 树到底有没有执行成功, 那么 topology 会把这个 tuple 标记为执行失败,并且过一会会重新发射这个 tuple 。
一个 tuple 能根据新获取到的 spout 而触发创建基于此的上千个 tuple
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(1, new KestrelSpout("kestrel.backtype.com",
22133,
"sentence_queue",
new StringScheme()));
builder.setBolt(2, new SplitSentence(), 10)
.shuffleGrouping(1);
builder.setBolt(3, new WordCount(), 20)
.fieldsGrouping(2, new Fields("word"));
这个 topology 从 kestrel queue 读取句子 , 并把句子划分成单词 , 然后汇总每个单词出现的次数 , 一个 tuple 负责读取句子 , 每一个 tuple 分别对应计算每一个单词出现的次数 , 大概样子如下所示 :
一个 tuple 的生命周期 :
public interface ISpout extends Serializable {
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
void close();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
}
首先 storm 通过调用 spout 的 nextTuple 方法来获取下一个 tuple, Spout 通过 open 方法参数里面提供的 SpoutOutputCollector 来发射新 tuple 到它的其中一个输出消息流 , 发射 tuple 的时候 spout 会提供一个 message-id, 后面我们通过这个 tuple-id 来追踪这个 tuple 。举例来说, KestrelSpout 从 kestrel 队列里面读取一个消息,并且把 kestrel 提供的消息 id 作为 message-id, 看例子:
collector.emit(new Values("field1", "field2", 3) , msgId);
接下来,
这个发射的
tuple
被传送到消息处理者
bolt
那里,
storm
会跟踪这个消息的树形结构是否创建
,
根据
messageid
调用
Spout
里面的
ack
函数以确认
tuple
是否被完全处理。如果
tuple
超时就会调用
spout
的
fail
方法。由此看出同一个
tuple
不管是
acked
还是
fail
都是由创建他的那个
spout
发出的
,
所以即使
spout
在集群环境中执行了很多的
task,
这个
tule
也不会被不同的
task
去
acked
或
failed.
当
kestrelspout
从
kestrel
队列中得到一个消息后会打开这个他
,
这意味着他并不会把此消息拿走
,
消息的状态会显示为
pending,
直到等待确认此消息已经处理完成
,
处于
pending
状态直到
ack
或者
fail
被调用
,
处于
"Pending"
的消息不会再被其他队列消费者使用
.
如果在这过程中
spout
中处理此消息的
task
断开连接或失去响应则此
pending
的消息会回到
"
等待处理
"
状态
.
1.
流聚合
流聚合把两个或者多个数据流聚合成一个数据流
—
基于一些共同的
tuple
字段。
builder.setBolt(5, new MyJoiner(), parallelism)
.fieldsGrouping(1, new Fields("joinfield1", "joinfield2"))
.fieldsGrouping(2, new Fields("joinfield1", "joinfield2"))
.fieldsGrouping(3, new Fields("joinfield1", "joinfield2"))
2.
批处理
有时候为了性能或者一些别的原因,
你可能想把一组
tuple
一起处理,
而不是一个个单独处理。
3.BasicBolt
1.
读一个输入
tuple
2.
根据这个输入
tuple
发射一个或者多个
tuple
3.
在
execute
的方法的最后
ack
那个输入
tuple
遵循这类模式的
bolt
一般是函数或者是过滤器
,
这种模式太常见,
storm
为这类模式单独封装了一个接口
: IbasicBolt
4.
内存内缓存
+Fields grouping
组合
在
bolt
的内存里面缓存一些东西非常常见。缓存在和
fields grouping
结合起来之后就更有用了。比如,你有一个
bolt
把短链接变成长链接
(bit.ly, t.co
之类的
)
。你可以把短链接到长链接的对应关系利用
LRU
算法缓存在内存里面以避免重复计算。比如组件一发射短链接,组件二把短链接转化成长链接并缓存在内存里面。看一下下面两段代码有什么不一样:
builder.setBolt(2, new ExpandUrl(), parallelism)
.shuffleGrouping(1);
builder.setBolt(2, new ExpandUrl(), parallelism)
.fieldsGrouping(1, new Fields("url"));
5.
计算
top N
比如你有一个
bolt
发射这样的
tuple: "value", "count"
并且你想一个
bolt
基于这些信息算出
top N
的
tuple
。最简单的办法是有一个
bolt
可以做一个全局的
grouping
的动作并且在内存里面保持这
top N
的值。
这个方式对于大数据量的流显然是没有扩展性的,
因为所有的数据会被发到同一台机器。一个更好的方法是在多台机器上面并行的计算这个流每一部分的
top N,
然后再有一个
bolt
合并这些机器上面所算出来的
top N
以算出最后的
top N,
代码大概是这样的
:
builder.setBolt(2, new RankObjects(), parallellism)
.fieldsGrouping(1, new Fields("value"));
builder.setBolt(3, new MergeObjects())
.globalGrouping(2);
这个模式之所以可以成功是因为第一个
bolt
的
fields grouping
使得这种并行算法在语义上是正确的。
用
TimeCacheMap
来高效地保存一个最近被更新的对象的缓存
6.
用
TimeCacheMap
来高效地保存一个最近被更新的对象的缓存
有时候你想在内存里面保存一些最近活跃的对象,以及那些不再活跃的对象。
TimeCacheMap
是一个非常高效的数据结构,它提供了一些
callback
函数使得我们在对象不再活跃的时候我们可以做一些事情
.
7.
分布式
RPC:CoordinatedBolt
和
KeyedFairBolt
用
storm
做分布式
RPC
应用的时候有两种比较常见的模式
:
它们被封装在
CoordinatedBolt
和
KeyedFairBolt
里面
. CoordinatedBolt
包装你的
bolt,
并且确定什么时候你的
bolt
已经接收到所有的
tuple,
它主要使用
Direct Stream
来做这个
.
KeyedFairBolt
同样包装你的
bolt
并且保证你的
topology
同时处理多个
DRPC
调用,而不是串行地一次只执行一个。