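Storm's main functions for operating on ZooKeeper are defined in the namespace backtype.storm.cluster (that is, in the file cluster.clj). backtype.storm.cluster defines two important protocols: ClusterState and StormClusterState. A Clojure protocol can be thought of as a Java interface: it declares a group of methods. The ClusterState protocol wraps a set of basic functions for interacting with ZooKeeper, such as listing a node's children and reading a node's data. It is defined as follows: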
ClusterState protocol
(defprotocol ClusterState
  (set-ephemeral-node [this path data])
  (delete-node [this path])
  (create-sequential [this path data])
  ;; if node does not exist, create persistent with this data
  (set-data [this path data])
  (get-data [this path watch?])
  (get-version [this path watch?])
  (get-data-with-version [this path watch?])
  (get-children [this path watch?])
  (mkdirs [this path])
  (close [this])
  (register [this callback])
  (unregister [this id]))
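A protocol only declares methods, and `reify` supplies an implementation. The sketch below is a minimal illustration and is not Storm code. It declares a hypothetical MiniClusterState protocol with three methods borrowed from ClusterState and implements it over an in-memory atom instead of a ZooKeeper client. The names `MiniClusterState`, `mk-memory-cluster-state` and `parent-of` are made up for this example. Unlike the real set-data, it does not create missing parent nodes.

```clojure
;; Hypothetical cut-down ClusterState (not Storm's code): three methods only.
(defprotocol MiniClusterState
  (set-data [this path data])
  (get-data [this path])
  (get-children [this path]))

(defn- parent-of
  "Parent of an absolute path, e.g. \"/a/b\" -> \"/a\"; the parent of \"/a\" is \"/\"."
  [path]
  (let [i (.lastIndexOf ^String path "/")]
    (if (pos? i) (subs path 0 i) "/")))

(defn mk-memory-cluster-state
  "Returns an object implementing MiniClusterState, backed by an atom holding
  a map of path -> data instead of a ZooKeeper ensemble."
  []
  (let [nodes (atom {})]
    (reify MiniClusterState
      ;; overwrite or create the node; this sketch does not create missing parents
      (set-data [this path data]
        (swap! nodes assoc path data)
        nil)
      (get-data [this path]
        (get @nodes path))
      ;; direct children only, returned as sorted child names (not full paths)
      (get-children [this path]
        (let [prefix-len (inc (count (if (= path "/") "" path)))]
          (->> (keys @nodes)
               (filter #(= path (parent-of %)))
               (map #(subs % prefix-len))
               sort
               vec))))))
```

Any object that implements the protocol can be used wherever the protocol's methods are called. The same decoupling lets mk-storm-cluster-state accept either an existing ClusterState instance or a conf map.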
The StormClusterState protocol wraps the higher-level functions Storm uses to interact with ZooKeeper. Each of its functions can be viewed as a "composition" of ClusterState functions. It is defined as follows:
StormClusterState protocol
(defprotocol StormClusterState
  (assignments [this callback])
  (assignment-info [this storm-id callback])
  (assignment-info-with-version [this storm-id callback])
  (assignment-version [this storm-id callback])
  (active-storms [this])
  (storm-base [this storm-id callback])
  (get-worker-heartbeat [this storm-id node port])
  (executor-beats [this storm-id executor->node+port])
  (supervisors [this callback])
  (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
  (setup-heartbeats! [this storm-id])
  (teardown-heartbeats! [this storm-id])
  (teardown-topology-errors! [this storm-id])
  (heartbeat-storms [this])
  (error-topologies [this])
  (worker-heartbeat! [this storm-id node port info])
  (remove-worker-heartbeat! [this storm-id node port])
  (supervisor-heartbeat! [this supervisor-id info])
  (activate-storm! [this storm-id storm-base])
  (update-storm! [this storm-id new-elems])
  (remove-storm-base! [this storm-id])
  (set-assignment! [this storm-id info])
  (remove-storm! [this storm-id])
  (report-error [this storm-id task-id node port error])
  (errors [this storm-id task-id])
  (disconnect [this]))
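The following small sketch shows what "composition" means in practice. It is not Storm's implementation. Hypothetical ClusterState-level primitives (`get-data`, `set-data`, `get-children`) work over an atom that stands in for the znode tree. `pr-str`/`read-string` stand in for `Utils/serialize` and `maybe-deserialize`. `assignment-info`, `active-storms` and `update-storm!` are then written purely in terms of those primitives. They use the same paths (/assignments/{storm-id}, /storms/{storm-id}) as the real functions.

```clojure
;; Hypothetical stand-in for the znode tree: path -> serialized data.
(def zk-tree (atom {}))

;; "ClusterState"-level primitives
(defn get-data [path] (get @zk-tree path))
(defn set-data [path data] (swap! zk-tree assoc path data) nil)
(defn get-children [path]
  (let [prefix (str path "/")]
    (->> (keys @zk-tree)
         (filter #(.startsWith ^String % prefix))
         (map #(subs % (count prefix)))
         (remove #(.contains ^String % "/"))   ; direct children only
         sort
         vec)))

;; pr-str / read-string stand in for Utils/serialize and maybe-deserialize
(defn serialize [x] (pr-str x))
(defn maybe-deserialize [s] (when s (read-string s)))

(defn assignment-path [storm-id] (str "/assignments/" storm-id))
(defn storm-path [storm-id] (str "/storms/" storm-id))

;; "StormClusterState"-level functions, each a composition of the primitives above
(defn assignment-info [storm-id]
  (maybe-deserialize (get-data (assignment-path storm-id))))

(defn active-storms []
  (get-children "/storms"))

(defn update-storm!
  "Read-modify-write: the new per-component parallelism is merged over the old map."
  [storm-id new-elems]
  (let [base (maybe-deserialize (get-data (storm-path storm-id)))
        executors (:component->executors base)
        new-elems (update new-elems :component->executors (partial merge executors))]
    (set-data (storm-path storm-id) (serialize (merge base new-elems)))))
```

Because `(partial merge executors)` puts the old map first, entries in new-elems win and components that are not mentioned keep their previous parallelism.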
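Besides the ClusterState and StormClusterState protocols, the backtype.storm.cluster namespace also defines two important functions: mk-distributed-cluster-state and mk-storm-cluster-state.
mk-distributed-cluster-state returns an object that implements the ClusterState protocol, and Storm interacts with ZooKeeper through this object. Its definition is as follows: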
mk-distributed-cluster-state function
(defn mk-distributed-cluster-state
  ;; conf holds the configuration from storm.yaml, as a map
  [conf]
  ;; zk is bound to a ZooKeeper client; Storm interacts with ZooKeeper through CuratorFramework
  (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)]
    ;; create the Storm cluster's root znode in ZooKeeper (default: /storm), then close this temporary client
    (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))
    (.close zk))
  ;; callbacks is bound to the collection of callback functions, a map of id -> callback
  (let [callbacks (atom {})
        ;; active marks whether this ClusterState is still open; close resets it to false,
        ;; after which the watcher ignores all events
        active (atom true)
        ;; zk is rebound to a new client that has a watcher. When a watched znode changes, the ZooKeeper server
        ;; sends the client an event, and the watcher calls the functions in callbacks to handle it.
        ;; When nimbus starts, callbacks is empty, so nimbus calls no callback on receiving an event; when a
        ;; supervisor starts, callbacks contains registered functions, which are called whenever the
        ;; supervisor receives an event from the ZooKeeper server.
        ;; mk-client is defined in zookeeper.clj; see its definition later in this article
        zk (zk/mk-client conf
                         (conf STORM-ZOOKEEPER-SERVERS)
                         (conf STORM-ZOOKEEPER-PORT)
                         :auth-conf conf
                         :root (conf STORM-ZOOKEEPER-ROOT)
                         ;; :watcher sets the client's default watcher function: state is the client's current state,
                         ;; type is the event type, and path is the znode on which the event occurred.
                         ;; Its only job is to run the functions in callbacks; those functions are added through
                         ;; ClusterState's register function, which mk-storm-cluster-state calls
                         :watcher (fn [state type path]
                                    (when @active
                                      (when-not (= :connected state)
                                        (log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))
                                      (when-not (= :none type)
                                        (doseq [callback (vals @callbacks)]
                                          (callback type path))))))]
    ;; reify is similar to implements in Java: here it creates an anonymous object implementing a protocol
    (reify
      ClusterState
      ;; register adds a callback to callbacks, keyed by a freshly generated UUID string, and returns that key
      (register
        [this callback]
        (let [id (uuid)]
          (swap! callbacks assoc id callback)
          id))
      ;; unregister removes the callback with the given key from callbacks
      (unregister
        [this id]
        (swap! callbacks dissoc id))
      ;; create an ephemeral node in ZooKeeper, or overwrite its data if it already exists
      (set-ephemeral-node
        [this path data]
        (zk/mkdirs zk (parent-path path))
        (if (zk/exists zk path false)
          (try-cause
            (zk/set-data zk path data) ; should verify that it's ephemeral
            (catch KeeperException$NoNodeException e
              (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
              (zk/create-node zk path data :ephemeral)))
          (zk/create-node zk path data :ephemeral)))
      ;; create a sequential node in ZooKeeper
      (create-sequential
        [this path data]
        (zk/create-node zk path data :sequential))
      ;; set a node's data; if the node does not exist, create it (and its parents) as a persistent node
      (set-data
        [this path data]
        ;; note: this does not turn off any existing watches
        (if (zk/exists zk path false)
          (zk/set-data zk path data)
          (do
            (zk/mkdirs zk (parent-path path))
            (zk/create-node zk path data :persistent))))
      ;; delete the given node, recursively
      (delete-node
        [this path]
        (zk/delete-recursive zk path))
      ;; get the data of the given node. path is the node's path; watch? is a boolean saying whether to "watch" the node.
      ;; If watch? is true, then after the node's data is modified (e.g. through set-data), the server sends the client
      ;; an event (once: a ZooKeeper watch fires only one time), and the client calls the default watcher given when
      ;; it was created (the function bound to :watcher)
      (get-data
        [this path watch?]
        (zk/get-data zk path watch?))
      ;; like get-data, but also returns the data's version, i.e. the number of times the node's data has been modified
      (get-data-with-version
        [this path watch?]
        (zk/get-data-with-version zk path watch?))
      ;; get the version of the given node; watch? has the same meaning as in get-data
      (get-version
        [this path watch?]
        (zk/get-version zk path watch?))
      ;; get the list of children of the given node; watch? has the same meaning as in get-data,
      ;; except that the watch fires when the node's children change
      (get-children
        [this path watch?]
        (zk/get-children zk path watch?))
      ;; create a node in ZooKeeper, including any missing parents
      (mkdirs
        [this path]
        (zk/mkdirs zk path))
      ;; close the ZooKeeper client
      (close
        [this]
        (reset! active false)
        (.close zk)))))
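The callback plumbing in mk-distributed-cluster-state does not depend on ZooKeeper itself, so it can be exercised in isolation. The sketch below keeps only that plumbing:

- a callbacks atom keyed by UUID strings;
- an active flag;
- a watcher that drops every event after close, and drops events whose type is :none.

`mk-callback-registry` and the returned map of functions are hypothetical. In Storm these pieces live inside mk-distributed-cluster-state and are exposed through register, unregister and close. The event-type keywords in the usage below are illustrative.

```clojure
(defn mk-callback-registry
  "Hypothetical stand-in for the callback part of mk-distributed-cluster-state."
  []
  (let [callbacks (atom {})
        active (atom true)
        ;; same shape as the :watcher function: state, event type, znode path
        watcher (fn [state type path]
                  (when @active
                    (when-not (= :connected state)
                      (println "Received event" state type path "with disconnected ZooKeeper"))
                    ;; connection-state events carry type :none and are not passed on
                    (when-not (= :none type)
                      (doseq [callback (vals @callbacks)]
                        (callback type path)))))]
    {:register   (fn [callback]
                   (let [id (str (java.util.UUID/randomUUID))]
                     (swap! callbacks assoc id callback)
                     id))
     :unregister (fn [id] (swap! callbacks dissoc id))
     :close      (fn [] (reset! active false))
     :watcher    watcher}))
```

Every registered callback sees every event. Per-path routing is therefore left to the callback itself, which is exactly what the function registered in mk-storm-cluster-state does.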
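mk-storm-cluster-state is a very important function. It returns an instance that implements the StormClusterState protocol, through which Storm can interact with ZooKeeper more conveniently. The functions that start nimbus and supervisor both call mk-storm-cluster-state; how nimbus and supervisor start up will be covered in later articles. Its definition is as follows: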
mk-storm-cluster-state function
(defn mk-storm-cluster-state
  [cluster-state-spec]
  ;; the satisfies? predicate is similar to Java's instanceof: it checks whether cluster-state-spec is already a ClusterState.
  ;; If it is not, cluster-state-spec is treated as conf and a new distributed ClusterState is created; solo? is then true,
  ;; meaning this instance owns that ClusterState and closes it on disconnect
  (let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
                                [false cluster-state-spec]
                                [true (mk-distributed-cluster-state cluster-state-spec)])
        ;; map of topology id -> callback; when the data of /assignments/{topology id} changes,
        ;; the client runs the callback stored in assignment-info-callback for that topology id
        assignment-info-callback (atom {})
        ;; assignment-info-with-version-callback is analogous to assignment-info-callback
        assignment-info-with-version-callback (atom {})
        ;; assignment-version-callback is also a topology id -> callback map, analogous to assignment-info-callback
        assignment-version-callback (atom {})
        ;; when the children of /supervisors change, the client runs the function held in supervisors-callback
        supervisors-callback (atom nil)
        ;; when the children of /assignments change, the client runs the function held in assignments-callback
        assignments-callback (atom nil)
        ;; when the data of /storms/{topology id} changes, the client runs the callback stored in storm-base-callback for that topology id
        storm-base-callback (atom {})
        ;; register adds the callback (fn ...) to cluster-state's callbacks map and returns the UUID that identifies it
        state-id (register
                  cluster-state
                  ;; the callback itself: type is the event type, path is the znode
                  (fn [type path]
                    ;; subtree is bound to the first path segment, such as "assignments", "storms" or "supervisors"; args holds the topology id
                    (let [[subtree & args] (tokenize-path path)]
                      ;; condp works like a switch statement in Java
                      (condp = subtree
                        ;; subtree = "assignments": if args is empty, the children of /assignments changed, so run
                        ;; assignments-callback; otherwise the data of /assignments/{topology id} changed, so run the
                        ;; matching callback in assignment-info-callback
                        ASSIGNMENTS-ROOT (if (empty? args)
                                           (issue-callback! assignments-callback)
                                           (issue-map-callback! assignment-info-callback (first args)))
                        ;; subtree = "supervisors": the children of /supervisors changed, so run supervisors-callback
                        SUPERVISORS-ROOT (issue-callback! supervisors-callback)
                        ;; subtree = "storms": the data of /storms/{topology id} changed, so run the matching callback in storm-base-callback
                        STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
                        ;; this should never happen
                        (exit-process! 30 "Unknown callback for subtree " subtree args)))))]
    ;; create the znodes Storm needs in order to run topologies
    (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
      (mkdirs cluster-state p))
    ;; return an instance that implements the StormClusterState protocol
    (reify
      StormClusterState
      ;; get the children of /assignments; if callback is non-nil, store it in assignments-callback and set a "child watch" on /assignments
      (assignments
        [this callback]
        (when callback
          (reset! assignments-callback callback))
        (get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))
      ;; get the data of /assignments/{storm-id}, i.e. the assignment of storm-id; if callback is non-nil, add it to
      ;; assignment-info-callback and set a "data watch" on /assignments/{storm-id}
      (assignment-info
        [this storm-id callback]
        (when callback
          (swap! assignment-info-callback assoc storm-id callback))
        (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback))))
      ;; get the data of /assignments/{storm-id} together with its version; if callback is non-nil, add it to
      ;; assignment-info-with-version-callback and set a "data watch" on /assignments/{storm-id}
      (assignment-info-with-version
        [this storm-id callback]
        (when callback
          (swap! assignment-info-with-version-callback assoc storm-id callback))
        (let [{data :data version :version}
              (get-data-with-version cluster-state (assignment-path storm-id) (not-nil? callback))]
          {:data (maybe-deserialize data)
           :version version}))
      ;; get the version of the data of /assignments/{storm-id}; if callback is non-nil, add it to
      ;; assignment-version-callback and set a "data watch" on /assignments/{storm-id}
      (assignment-version
        [this storm-id callback]
        (when callback
          (swap! assignment-version-callback assoc storm-id callback))
        (get-version cluster-state (assignment-path storm-id) (not-nil? callback)))
      ;; get the ids of the topologies running in the cluster, i.e. the children of /storms
      (active-storms
        [this]
        (get-children cluster-state STORMS-SUBTREE false))
      ;; get the ids of all topologies that have heartbeats, i.e. the children of /workerbeats
      (heartbeat-storms
        [this]
        (get-children cluster-state WORKERBEATS-SUBTREE false))
      ;; get the ids of all topologies that have errors, i.e. the children of /errors
      (error-topologies
        [this]
        (get-children cluster-state ERRORS-SUBTREE false))
      ;; get the heartbeat of one worker process (node, port) of storm-id, i.e. the data of /workerbeats/{storm-id}/{node-port}
      (get-worker-heartbeat
        [this storm-id node port]
        (-> cluster-state
            (get-data (workerbeat-path storm-id node port) false)
            maybe-deserialize))
      ;; get the heartbeats of the executors (threads) of storm-id, reading only the workers they are assigned to
      (executor-beats
        [this storm-id executor->node+port]
        ;; need to take executor->node+port in explicitly so that we don't run into a situation where a
        ;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
        ;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
        ;; we avoid situations like that
        (let [node+port->executors (reverse-map executor->node+port)
              all-heartbeats (for [[[node port] executors] node+port->executors]
                               (->> (get-worker-heartbeat this storm-id node port)
                                    (convert-executor-beats executors)))]
          (apply merge all-heartbeats)))
      ;; get the children of /supervisors; if callback is non-nil, store it in supervisors-callback and set a "child watch" on /supervisors
      (supervisors
        [this callback]
        (when callback
          (reset! supervisors-callback callback))
        (get-children cluster-state SUPERVISORS-SUBTREE (not-nil? callback)))
      ;; get the data of /supervisors/{supervisor-id}, i.e. the supervisor's heartbeat
      (supervisor-info
        [this supervisor-id]
        (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false)))
      ;; write a worker process's heartbeat
      (worker-heartbeat!
        [this storm-id node port info]
        (set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize info)))
      ;; delete a worker process's heartbeat
      (remove-worker-heartbeat!
        [this storm-id node port]
        (delete-node cluster-state (workerbeat-path storm-id node port)))
      ;; create the node that holds the heartbeats of the topology storm-id
      (setup-heartbeats!
        [this storm-id]
        (mkdirs cluster-state (workerbeat-storm-root storm-id)))
      ;; delete the heartbeat node of the topology storm-id
      (teardown-heartbeats!
        [this storm-id]
        (try-cause
          (delete-node cluster-state (workerbeat-storm-root storm-id))
          (catch KeeperException e
            (log-warn-error e "Could not teardown heartbeats for " storm-id))))
      ;; delete the error node of the topology storm-id
      (teardown-topology-errors!
        [this storm-id]
        (try-cause
          (delete-node cluster-state (error-storm-root storm-id))
          (catch KeeperException e
            (log-warn-error e "Could not teardown errors for " storm-id))))
      ;; create an ephemeral node holding the supervisor's heartbeat
      (supervisor-heartbeat!
        [this supervisor-id info]
        (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info)))
      ;; create the /storms/{storm-id} node
      (activate-storm!
        [this storm-id storm-base]
        (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base)))
      ;; update the topology's StormBase object, i.e. update the /storms/{storm-id} node
      (update-storm!
        [this storm-id new-elems]
        ;; base is bound to the StormBase object of storm-id stored in ZooKeeper
        (let [base (storm-base this storm-id nil)
              ;; executors is bound to the map of component name -> parallelism
              executors (:component->executors base)
              ;; new-elems is rebound with the merged parallelism map: update merges the new
              ;; per-component parallelism into the old map
              new-elems (update new-elems :component->executors (partial merge executors))]
          ;; merge new-elems into the StormBase object and write it to the /storms/{storm-id} node
          (set-data cluster-state (storm-path storm-id)
                    (-> base
                        (merge new-elems)
                        Utils/serialize))))
      ;; get the StormBase object of storm-id, i.e. read the data of /storms/{storm-id}; if callback is non-nil,
      ;; add it to storm-base-callback and set a "data watch" on /storms/{storm-id}
      (storm-base
        [this storm-id callback]
        (when callback
          (swap! storm-base-callback assoc storm-id callback))
        (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback))))
      ;; delete the StormBase object of storm-id, i.e. delete the /storms/{storm-id} node
      (remove-storm-base!
        [this storm-id]
        (delete-node cluster-state (storm-path storm-id)))
      ;; update the assignment of storm-id, i.e. update the data of /assignments/{storm-id}
      (set-assignment!
        [this storm-id info]
        (set-data cluster-state (assignment-path storm-id) (Utils/serialize info)))
      ;; delete the assignment of storm-id and its StormBase, i.e. delete /assignments/{storm-id} and /storms/{storm-id}
      (remove-storm!
        [this storm-id]
        (delete-node cluster-state (assignment-path storm-id))
        (remove-storm-base! this storm-id))
      ;; write a component's error to ZooKeeper
      (report-error
        [this storm-id component-id node port error]
        ;; path is bound to "/errors/{storm-id}/{component-id}"
        (let [path (error-path storm-id component-id)
              ;; data is bound to the error record: time, stack trace, host and port
              data {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port}
              ;; create the /errors/{storm-id}/{component-id} node
              _ (mkdirs cluster-state path)
              ;; create a sequential child of /errors/{storm-id}/{component-id} and write the error into it
              _ (create-sequential cluster-state (str path "/e") (Utils/serialize data))
              ;; to-kill is bound to every child except the 10 with the highest sequence numbers
              to-kill (->> (get-children cluster-state path false)
                           (sort-by parse-error-path)
                           reverse
                           (drop 10))]
          ;; delete the nodes in to-kill
          (doseq [k to-kill]
            (delete-node cluster-state (str path "/" k)))))
      ;; get the errors recorded for the given storm-id and component-id, newest first
      (errors
        [this storm-id component-id]
        (let [path (error-path storm-id component-id)
              _ (mkdirs cluster-state path)
              children (get-children cluster-state path false)
              errors (dofor [c children]
                            (let [data (-> (get-data cluster-state (str path "/" c) false)
                                           maybe-deserialize)]
                              (when data
                                (struct TaskError (:error data) (:time-secs data) (:host data) (:port data)))))]
          (->> (filter not-nil? errors)
               (sort-by (comp - :time-secs)))))
      ;; close the connection; before closing, remove this instance's callback from cluster-state's callbacks
      (disconnect
        [this]
        (unregister cluster-state state-id)
        (when solo?
          (close cluster-state))))))
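The retention logic in report-error can be checked on its own. It writes a new sequential child, then deletes everything except the 10 newest. The sketch below is an illustration, not Storm's code. ZooKeeper's sequential nodes are replaced by an atom-backed counter that produces names such as e0000000003. (ZooKeeper appends a 10-digit, zero-padded sequence number to the requested name.) `mk-error-store`, `create-sequential!` and `parse-seq` are hypothetical; `parse-seq` only stands in for parse-error-path, whose exact implementation is not shown in this article.

```clojure
(defn mk-error-store
  "Hypothetical in-memory stand-in for the children of /errors/{storm-id}/{component-id}."
  []
  {:children (atom {})   ; child name -> error record
   :counter  (atom -1)})

(defn create-sequential!
  "Mimics a ZooKeeper sequential create: appends a 10-digit, zero-padded counter to prefix."
  [{:keys [children counter]} prefix data]
  (let [child (format "%s%010d" prefix (swap! counter inc))]
    (swap! children assoc child data)
    child))

(defn parse-seq
  "Hypothetical stand-in for parse-error-path: the number after the leading \"e\"."
  [child]
  (Long/parseLong (subs child 1)))

(defn report-error!
  "Record an error, then keep only the 10 children with the highest sequence numbers."
  [store error now-secs]
  (create-sequential! store "e" {:time-secs now-secs :error error})
  (let [to-kill (->> (keys @(:children store))
                     (sort-by parse-seq)
                     reverse
                     (drop 10))]
    (doseq [k to-kill]
      (swap! (:children store) dissoc k))))
```

After 15 errors only e0000000005 through e0000000014 remain. This bounds the number of error znodes per component no matter how often a component fails.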
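mk-client, defined in zookeeper.clj, creates a CuratorFramework instance and registers a CuratorListener on it. The listener's eventReceived() is called whenever a background operation completes or a watch fires. eventReceived only forwards watch events (CuratorEventType/WATCHED), and the watcher function it calls is the function bound to :watcher in mk-distributed-cluster-state. mk-client is defined as follows: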
mk-client function
(defnk mk-client
  [conf servers port
   :root ""
   :watcher default-watcher
   :auth-conf nil]
  (let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)))]
    ;; register a listener that forwards watch-triggered events to watcher, translating
    ;; ZooKeeper's state and event-type enums into keywords
    (.. fk
        (getCuratorListenable)
        (addListener
          (reify CuratorListener
            (^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e]
              ;; background-operation events are ignored; only WATCHED events reach watcher
              (when (= (.getType e) CuratorEventType/WATCHED)
                (let [^WatchedEvent event (.getWatchedEvent e)]
                  (watcher (zk-keeper-states (.getState event))
                           (zk-event-types (.getType event))
                           (.getPath event))))))))
    (.start fk)
    fk))
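The listener's filtering step can be modeled with plain maps. In this hypothetical sketch, which uses neither Storm's nor Curator's API, a Curator event is a map whose :type is a keyword modeled on CuratorEventType values. The two lookup tables stand in for zk-keeper-states and zk-event-types, whose exact contents this article does not show. Only the enum names used as keys (SyncConnected, NodeDataChanged and so on) are real ZooKeeper names; the keywords are illustrative.

```clojure
;; Illustrative translation tables: ZooKeeper enum name -> keyword
(def keeper-states {"SyncConnected" :connected
                    "Disconnected"  :disconnected
                    "Expired"       :expired})

(def event-types {"None"                :none
                  "NodeCreated"         :node-created
                  "NodeDeleted"         :node-deleted
                  "NodeDataChanged"     :node-data-changed
                  "NodeChildrenChanged" :node-children-changed})

(defn make-listener
  "Returns a function playing the role of eventReceived: background-operation
  events are ignored, and watch-triggered events are translated and passed to watcher."
  [watcher]
  (fn [curator-event]
    (when (= :watched (:type curator-event))
      (let [event (:watched-event curator-event)]
        (watcher (keeper-states (:state event))
                 (event-types (:type event))
                 (:path event))))))
```

The translation keeps the rest of the code (the :watcher function and every callback) free of ZooKeeper classes. Those functions only ever see keywords and path strings.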
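That is the source analysis of how Storm interacts with ZooKeeper. In my view the most important part is how a "watcher" is attached to the ZooKeeper client, because many Storm features, such as "fetching assignment information", are built on ZooKeeper's watch mechanism. Attaching a "watcher" roughly involves the following steps:
- mk-distributed-cluster-state creates a ZooKeeper client and gives it a "watcher" function through :watcher. This "watcher" simply calls the functions in the ClusterState's callbacks map, so which functions it actually runs is decided by the ClusterState instance.
- The ClusterState instance provides register for updating callbacks. The instance is passed to mk-storm-cluster-state, which calls register to add a function (fn [type path] ...); that function implements the whole logic of the "watcher".
- What the function registered in mk-storm-cluster-state does is decided by the StormClusterState instance, and "watches" on ZooKeeper nodes are also set through the StormClusterState instance. So through a StormClusterState instance we can attach a "watch" and a "callback" to any node we care about. When the node or its data changes, the ZooKeeper server sends a "notification" to the client, the client's "watcher" function is called, and our registered callback is executed in turn.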
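The chain described in these three steps can be simulated end to end in plain Clojure. The sketch below is a simplified model, not Storm's code:

- The "server" is a hypothetical in-memory one whose data watches are one-shot, as in ZooKeeper.
- Its single default watcher plays the role of the :watcher function.
- `dispatch`, modeled on the function registered in mk-storm-cluster-state, routes /assignments/{id} events to a per-topology callback map.
- `issue-map-callback!` removes a callback once it has run. This is an assumption about Storm's helper of the same name, which this article does not show.

As a result, a caller that wants further notifications must read the node again with a callback, which re-arms the watch.

```clojure
(require '[clojure.string :as str])

;; Hypothetical in-memory "server": node data plus one-shot data watches
(def nodes (atom {}))
(def data-watches (atom #{}))      ; paths with a pending data watch
(def default-watcher (atom nil))   ; the client's :watcher function

(defn zk-get-data [path watch?]
  (when watch? (swap! data-watches conj path))
  (get @nodes path))

(defn zk-set-data [path data]
  (swap! nodes assoc path data)
  ;; a ZooKeeper watch fires at most once and must then be set again
  (when (contains? @data-watches path)
    (swap! data-watches disj path)
    (@default-watcher :connected :node-data-changed path)))

;; topology id -> callback, as in assignment-info-callback
(def assignment-info-callback (atom {}))

(defn issue-map-callback! [cb-atom id]
  (let [cb (get @cb-atom id)]
    (swap! cb-atom dissoc id)      ; consumed, like the one-shot watch
    (when cb (cb id))))

(defn tokenize-path [path]
  (vec (remove str/blank? (str/split path #"/"))))

;; the function mk-storm-cluster-state registers, reduced to one subtree
(defn dispatch [type path]
  (let [[subtree & args] (tokenize-path path)]
    (condp = subtree
      "assignments" (when (seq args)
                      (issue-map-callback! assignment-info-callback (first args)))
      nil)))

;; :watcher -> callbacks -> dispatch, with a single registered callback
(reset! default-watcher (fn [state type path]
                          (when-not (= :none type) (dispatch type path))))

(defn assignment-info [storm-id callback]
  (when callback
    (swap! assignment-info-callback assoc storm-id callback))
  (zk-get-data (str "/assignments/" storm-id) (some? callback)))
```

A second write to /assignments/t1 goes unreported until assignment-info is called again with a callback. Code that relies on these notifications therefore re-registers inside its callback.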
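This part of the source is tightly coupled to ZooKeeper and involves many ZooKeeper concepts and features, such as "data watches" and "child watches". For ZooKeeper's watch mechanism, see:
http://www.cnblogs.com/ggjucheng/p/3369946.html
http://www.cnblogs.com/zhangchaoyang/articles/3813217.html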
Storm does not use the ZooKeeper API directly. Instead it uses the Curator framework, which simplifies access to ZooKeeper. For the Curator framework, see:
http://f.dataguru.cn/thread-120125-1-1.html