流式计算之Storm简介

系统 1652 0

Storm是一个分布式的、容错的实时计算系统,遵循Eclipse Public License 1.0,Storm可以方便地在一个计算机集群中编写与扩展复杂的实时计算,Storm之于实时处理,就好比Hadoop之于批处理。Storm保证每个消息都会得到处理,而且它很快——在一个小集群中,每秒可以处理数以百万计的消息。可以使用任意编程语言来做开发。
主要商业应用及案例:Twitter
Storm的优点
1. 简单的编程模型。类似于MapReduce降低了并行批处理复杂性,Storm降低了进行实时处理的复杂性。
2. 服务化,一个服务框架,支持热部署,即时上线或下线App.
3. 可以使用各种编程语言。你可以在Storm之上使用各种编程语言。默认支持Clojure、Java、Ruby和Python。要增加对其他语言的支持,只需实现一个简单的Storm通信协议即可。
4. 容错性。Storm会管理工作进程和节点的故障。
5. 水平扩展。计算是在多个线程、进程和服务器之间并行进行的。
6. 可靠的消息处理。Storm保证每个消息至少能得到一次完整处理。任务失败时,它会负责从消息源重试消息。
7. 快速。系统的设计保证了消息能得到快速的处理,使用ZeroMQ作为其底层消息队列。
8. 本地模式。Storm有一个“本地模式”,可以在处理过程中完全模拟Storm集群。这让你可以快速进行开发和单元测试。


Storm目前存在的问题

1.   目前的开源版本中只是单节点 Nimbus ,挂掉只能自动重启,可以考虑实现一个双 nimbus 的布局。
2. Clojure
是一个在 JVM 平台运行的动态函数式编程语言 , 优势在于流程计算,   Storm 的部分核心内容由 Clojure 编写,虽然性能上提高不少但同时也提升了维护成本。

Storm 架构

Storm 集群由一个主节点和多个工作节点组成。主节点运行了一个名为 “Nimbus” 的守护进程,用于分配代码、布置任务及故障检测。每个工作节点都运行了一个名为 “Supervisor” 的守护进程,用于监听工作,开始并终止工作进程。 Nimbus Supervisor 都能快速失败,而且是无状态的,这样一来它们就变得十分健壮,两者的协调工作是由 Zookeeper 来完成的。 ZooKeeper 用于管理集群中的不同组件, ZeroMQ 是内部消息系统, JZMQ ZeroMQMQ Java Binding 。有个名为 storm-deploy 的子项目,可以在 AWS 上一键部署 Storm 集群 .

流式计算之Storm简介

Storm 术语解释

Storm 的术语包括 Stream Spout Bolt Task Worker Stream Grouping Topology Stream 是被处理的数据。 Sprout 是数据源。 Bolt 处理数据。 Task 是运行于 Spout Bolt 中的   线程。 Worker 是运行这些线程的进程。 Stream Grouping 规定了 Bolt 接收什么东西作为输入数据。数据可以随机分配(术语为 Shuffle ),或者根据字段值分配(术语为 Fields ),或者   广播(术语为 All ),或者总是发给一个 Task (术语为 Global ),也可以不关心该数据(术语为 None ),或者由自定义逻辑来决定(术语为 Direct )。 Topology 是由 Stream Grouping 连接起来的 Spout Bolt 节点网络 . 下面进行详细介绍:

  • Topologies   用于封装一个实时计算应用程序的逻辑,类似于 Hadoop MapReduce Job

流式计算之Storm简介

  • Stream   消息流,是一个没有边界的 tuple 序列,这些 tuples 会被以一种分布式的方式并行地创建和处理
  • Spouts   消息源,是消息生产者,他会从一个外部源读取数据并向 topology 里面面发出消息: tuple
  • Bolts   消息处理者,所有的消息处理逻辑被封装在 bolts 里面,处理输入的数据流并产生输出的新数据流 , 可执行过滤,聚合,查询数据库等操作

流式计算之Storm简介

  • Task   每一个 Spout Bolt 会被当作很多 task 在整个集群里面执行 , 每一个 task 对应到一个线程 .

流式计算之Storm简介

  • Stream groupings   消息分发策略 , 定义一个 Topology 的其中一步是定义每个 tuple 接受什么样的流作为输入 ,stream grouping 就是用来定义一个 stream 应该如果分配给 Bolts .

stream grouping 分类

1. Shuffle Grouping:   随机分组,   随机派发 stream 里面的 tuple   保证每个 bolt 接收到的 tuple 数目相同 .
2. Fields Grouping
:按字段分组,   比如按 userid 来分组,   具有同样 userid tuple 会被分到相同的 Bolts   而不同的 userid 则会被分配到不同的 Bolts.
3. All Grouping
  广播发送,   对于每一个 tuple   所有的 Bolts 都会收到 .
4. Global Grouping:
  全局分组,这个 tuple 被分配到 storm 中的一个 bolt 的其中一个 task. 再具体一点就是分配给 id 值最低的那个 task.
5. Non Grouping:
  不分组,意思是说 stream 不关心到底谁会收到它的 tuple. 目前他和 Shuffle grouping 是一样的效果 , 有点不同的是 storm 会把这个 bolt 放到这个 bolt 的订阅者同一个线程去执行 .
6. Direct Grouping:
  直接分组 , 这是一种比较特别的分组方法,用这种分组意味着消息的发送者举鼎由消息接收者的哪个 task 处理这个消息 . 只有被声明为 Direct Stream 的消息流可以声明这种分组方法 . 而且这种消息 tuple 必须使用 emitDirect 方法来发射 . 消息处理者可以通过 TopologyContext 来或者处理它的消息的 taskid (OutputCollector.emit 方法也会返回 taskid)

Storm 如何保证消息被处理

storm 保证每个 tuple 会被 topology 完整的执行。 storm 会追踪由每个 spout tuple 所产生的 tuple ( 一个 bolt 处理一个 tuple 之后可能会发射别的 tuple 从而可以形成树状结构 ),   并且跟踪这棵 tuple 树什么时候成功处理完。每个 topology 都有一个消息超时的设置,   如果 storm 在这个超时的时间内检测不到某个 tuple 树到底有没有执行成功,   那么 topology 会把这个 tuple 标记为执行失败,并且过一会会重新发射这个 tuple

一个 tuple 能根据新获取到的 spout 而触发创建基于此的上千个 tuple

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(1, new KestrelSpout("kestrel.backtype.com",

                                     22133,

                                     "sentence_queue",

                                     new StringScheme()));

builder.setBolt(2, new SplitSentence(), 10)

        .shuffleGrouping(1);

builder.setBolt(3, new WordCount(), 20)

        .fieldsGrouping(2, new Fields("word"));

这个 topology kestrel queue 读取句子 , 并把句子划分成单词 , 然后汇总每个单词出现的次数 , 一个 tuple 负责读取句子 , 每一个 tuple 分别对应计算每一个单词出现的次数 , 大概样子如下所示 :

流式计算之Storm简介

一个 tuple 的生命周期 :

public interface ISpout extends Serializable {

    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

    void close();

    void nextTuple();

    void ack(Object msgId);

    void fail(Object msgId);

}

首先 storm 通过调用 spout nextTuple 方法来获取下一个 tuple, Spout 通过 open 方法参数里面提供的 SpoutOutputCollector 来发射新 tuple 到它的其中一个输出消息流 ,   发射 tuple 的时候 spout 会提供一个 message-id,   后面我们通过这个 tuple-id 来追踪这个 tuple 。举例来说,   KestrelSpout kestrel 队列里面读取一个消息,并且把 kestrel 提供的消息 id 作为 message-id,   看例子:

collector.emit(new Values("field1", "field2", 3) , msgId);

 

接下来,   这个发射的 tuple 被传送到消息处理者 bolt 那里,   storm 会跟踪这个消息的树形结构是否创建 , 根据 messageid 调用 Spout 里面的 ack 函数以确认 tuple 是否被完全处理。如果 tuple 超时就会调用 spout fail 方法。由此看出同一个 tuple 不管是 acked 还是 fail 都是由创建他的那个 spout 发出的 , 所以即使 spout 在集群环境中执行了很多的 task, 这个 tule 也不会被不同的 task acked failed.
kestrelspout kestrel 队列中得到一个消息后会打开这个他 , 这意味着他并不会把此消息拿走 , 消息的状态会显示为 pending, 直到等待确认此消息已经处理完成 , 处于 pending 状态直到 ack 或者 fail 被调用 , 处于 "Pending" 的消息不会再被其他队列消费者使用 . 如果在这过程中 spout 中处理此消息的 task 断开连接或失去响应则此 pending 的消息会回到 " 等待处理 " 状态 .

Storm 的一些常用应用场景

1. 流聚合
流聚合把两个或者多个数据流聚合成一个数据流     基于一些共同的 tuple 字段。

builder.setBolt(5, new MyJoiner(), parallelism)

  .fieldsGrouping(1, new Fields("joinfield1", "joinfield2"))

  .fieldsGrouping(2, new Fields("joinfield1", "joinfield2"))

  .fieldsGrouping(3, new Fields("joinfield1", "joinfield2"))

 

2. 批处理
有时候为了性能或者一些别的原因,   你可能想把一组 tuple 一起处理,   而不是一个个单独处理。

3.BasicBolt
1.
  读一个输入 tuple
2.
  根据这个输入 tuple 发射一个或者多个 tuple
3.
  execute 的方法的最后 ack 那个输入 tuple
遵循这类模式的 bolt 一般是函数或者是过滤器 ,   这种模式太常见, storm 为这类模式单独封装了一个接口 : IbasicBolt

4. 内存内缓存 +Fields grouping 组合
bolt 的内存里面缓存一些东西非常常见。缓存在和 fields grouping 结合起来之后就更有用了。比如,你有一个 bolt 把短链接变成长链接 (bit.ly, t.co 之类的 ) 。你可以把短链接到长链接的对应关系利用 LRU 算法缓存在内存里面以避免重复计算。比如组件一发射短链接,组件二把短链接转化成长链接并缓存在内存里面。看一下下面两段代码有什么不一样:

builder.setBolt(2, new ExpandUrl(), parallelism)

  .shuffleGrouping(1);

builder.setBolt(2, new ExpandUrl(), parallelism)

  .fieldsGrouping(1, new Fields("url"));

5. 计算 top N
比如你有一个 bolt 发射这样的 tuple: "value", "count" 并且你想一个 bolt 基于这些信息算出 top N tuple 。最简单的办法是有一个 bolt 可以做一个全局的 grouping 的动作并且在内存里面保持这 top N 的值。
这个方式对于大数据量的流显然是没有扩展性的,   因为所有的数据会被发到同一台机器。一个更好的方法是在多台机器上面并行的计算这个流每一部分的 top N,   然后再有一个 bolt 合并这些机器上面所算出来的 top N 以算出最后的 top N,   代码大概是这样的 :

builder.setBolt(2, new RankObjects(), parallellism)

  .fieldsGrouping(1, new Fields("value"));

builder.setBolt(3, new MergeObjects())

  .globalGrouping(2);

这个模式之所以可以成功是因为第一个 bolt fields grouping 使得这种并行算法在语义上是正确的。
TimeCacheMap 来高效地保存一个最近被更新的对象的缓存

6. TimeCacheMap 来高效地保存一个最近被更新的对象的缓存
有时候你想在内存里面保存一些最近活跃的对象,以及那些不再活跃的对象。   TimeCacheMap   是一个非常高效的数据结构,它提供了一些 callback 函数使得我们在对象不再活跃的时候我们可以做一些事情 .

7. 分布式 RPC:CoordinatedBolt KeyedFairBolt
storm 做分布式 RPC 应用的时候有两种比较常见的模式 : 它们被封装在 CoordinatedBolt KeyedFairBolt 里面 . CoordinatedBolt 包装你的 bolt, 并且确定什么时候你的 bolt 已经接收到所有的 tuple, 它主要使用 Direct Stream 来做这个 .
KeyedFairBolt
同样包装你的 bolt 并且保证你的 topology 同时处理多个 DRPC 调用,而不是串行地一次只执行一个。

流式计算之Storm简介


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论