一、介绍
Storm的开发语言主要是Java和Clojure,其中Java定义骨架,而Clojure编写核心逻辑。源码统计结果:
Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/
–>
180
textfiles.
177
uniquefiles.
7
filesignored.
http:
//
cloc.sourceforge.netv1.55T=1.0s(171.0files/s,46869.0lines/s)
——————————————————————————-
Languagefilesblankcommentcode
——————————————————————————-
Java
125
5010
2414
25661
Lisp
33
732
283
4871
Python
7
742
433
4675
CSS
1
12
45
1837
ruby
2
22
0
104
BourneShell
1
0
0
6
Javascript
2
1
15
6
——————————————————————————-
SUM:
171
6519
3190
37160
——————————————————————————-
Java代码25000多行,而Clojure(Lisp)只有4871行,说语言不重要再次证明是扯淡。
二、Topology和Nimbus
Topology是storm的核心理念,将spout和bolt组织成一个topology,运行在storm集群里,完成实时分析和计算的任务。这里我主要想介绍下topology部署到storm集群的大概过程。提交一个topology任务到Storm集群是通过StormSubmitter.submitTopology方法提交:
Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/
–> StormSubmitter.submitTopology(name,conf,builder.createTopology());
我们将topology打成jar包后,利用bin/storm这个python脚本,执行如下命令:
Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/
–> bin / stormjarxxxx.jarcom.taobao.MyTopologyargs
将jar包提交给storm集群。storm脚本会启动JVM执行Topology的main方法,执行submitTopology的过程。而submitTopology会将jar 文件上传 到nimbus,上传是通过socket传输。在storm这个python脚本的jar方法里可以看到:
Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/
–>
def
jar(jarfile,klass,
*
args):
exec_storm_class(
klass,
jvmtype
=
“
-client
“
,
extrajars
=
[jarfile,CONF_DIR,STORM_DIR
+
"
/bin
"
],
args
=
args,
prefix
=
“
exportSTORM_JAR=
“
+
jarfile
+
“
;
“
)
将jar文件的地址设置为环境变量STORM_JAR,这个环境变量在执行submitTopology的时候用到:
Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/
–>
//
StormSubmitter.java
private
static
void
submitJar(Mapconf){
if
(submittedJar
==
null
){
LOG.info(
“
Jarnotuploadedtomasteryet.Submittingjar
“
);
StringlocalJar
=
System.getenv(
“
STORM_JAR
“
)
;
submittedJar
=
submitJar(conf,localJar);
}
else
{
LOG.info(
“
Jaralreadyuploadedtomaster.Notsubmittingjar.
“
);
}
}
通过环境变量找到jar包的地址,然后上传。利用环境变量传参是个小技巧。
其次,nimbus在接收到jar文件后,存放到数据目录的inbox目录, nimbus数据目录的结构 :
Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/
–>
-
nimbus
-
inbox
-
stormjar
-
57f1d694
-
2865
-
4b3b
-
8a7c
-
99104fc0aea3.jar
-
stormjar
-
76b4e316
-
b430
-
4215
-
9e26
-
4f33ba4ee520.jar
-
stormdist
-
storm
-
id
-
stormjar.jar
-
stormconf.ser
-
stormcode.ser
其中inbox用于存放提交的jar文件,每个jar文件都重命名为stormjar加上一个32位的UUID。而stormdist存放的是启动topology后生成的文件,每个topology都分配一个唯一的id,ID的规则是“name-计数-时间戳”。启动后的topology的jar文件名命名为storm.jar ,而它的配置经过java序列化后存放在stormconf.ser文件,而stormcode.ser是将topology本身序列化后存放的文件。
这些文件在部署的时候,supervisor会从这个目录下载这些文件,然后在supervisor本地执行这些代码。
进入重点,topology任务的分配过程(zookeeper路径说明忽略root):
1.在zookeeper上创建/taskheartbeats/{storm id} 路径,用于任务的心跳检测。storm对zookeeper的一个重要应用就是利用zk的临时节点做存活检测。task将定时刷新节点的时间戳,然后nimbus会检测这个时间戳是否超过timeout设置。
2.从topology中获取bolts,spouts设置的并行数目以及全局配置的最大并行数,然后产生task id列表,如[1 2 3 4]
3.在zookeeper上创建/tasks/{strom id}/{task id}路径,并存储task信息
4.开始分配任务(内部称为assignment), 具体步骤:
(1)从zk上获得已有的assignment(新的toplogy当然没有了)
(2)查找所有可用的slot,所谓slot就是可用的worker,在所有supervisor上配置的多个worker的端口。
(3)将任务均匀地分配给可用的worker,这里有两种情况:
(a)task数目比worker多,例如task是[1 2 3 4],可用的slot只有[host1:port1 host2:port1],那么最终是这样分配
Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/
–>
{
1
:[host1:port1]
2
:[host2:port1]
3
:[host1:port1]
4
:[host2:port1]}
,可以看到任务平均地分配在两个worker上。
(b)如果task数目比worker少,例如task是[1 2],而worker有[host1:port1 host1:port2 host2:port1 host2:port2],那么首先会将woker排序,
将不同host间隔排列
,保证task不会全部分配到同一个worker上,也就是将worker排列成
Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/
–> [host1:port1host2:port1host1:port2host2:port2]
,然后分配任务为
Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/
–> { 1 :host1:port1, 2 :host2:port2}
(4)记录启动时间
(5)判断现有的assignment是否跟重新分配的assignment相同,如果相同,不需要变更,否则更新assignment到zookeeper的/assignments/{storm id}上。
5.启动topology,所谓启动,只是将zookeeper上/storms/{storm id}对应的数据里的active设置为true。
6.nimbus会检查task的心跳,如果发现task心跳超过超时时间,那么会重新跳到第4步做re-assignment。