Storm系列(三)Topology提交过程

系统 1588 0

提交示例代码:

1 public static void main(String[] args) throws Exception {

2 TopologyBuilder builder = new TopologyBuilder();

3 builder.setSpout("random", new RandomWordSpout(), 2);

4 builder.setBolt("transfer", new TransferBolt(), 4).shuffleGrouping("random");

5 builder.setBolt("writer", new WriterBolt(), 4).fieldsGrouping("transfer", new Fields("word"));

6 Config conf = new Config();

7 conf.setNumWorkers(4);// 设置启动4个Worker

8 conf.setNumAckers(1); // 设置一个ack线程

9 conf.setDebug(true); // 设置打印所有发送的消息及系统消息

10 StormSubmitter.submitTopology("test", conf, builder.createTopology());

11 }

1、构建 TopologyBuilder 对象 builder,主要用于对各个组件(bolt、spout)进行配置;

TopologyBuilder主要属性字段定义如下:

public class TopologyBuilder {

// 所提交Topolog中所有的bolt将放入到_bolts中

private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>();

// 所提交Topolog中所有的spout将放入到_spouts中

private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>();

// 所提交Topolog中所有的spout和bolt都将放入_commons中

private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>();

....................................

}

2、以上提交代码中第三行,配置了一个id值为random,IRichSpout对象为RandomWordSpout,而并行度为2(两个线程里面跑两个任务)的spout;

// setSpout函数实现源码

public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) {

validateUnusedId(id);

initCommon(id, spout, parallelism_hint);

_spouts.put(id, spout);

return new SpoutGetter(id);

}

validateUnusedId:检测输入的id是不是唯一,若已经存在将抛出异常;

initCommon:构建ComponentCommon对象并进行相应的初始化,最后放入到_commons(以上TopologyBuilder中定义的Map);

initCommon函数实现源码:

private void initCommon(String id, IComponent component, Number parallelism) {

ComponentCommon common = new ComponentCommon();

// 设置消息流的来源及分组方式

common.set_inputs(new HashMap<GlobalStreamId, Grouping>());

if(parallelism!=null)

// 设置并行度

common.set_parallelism_hint(parallelism.intValue());

Map conf = component.getComponentConfiguration();

if(conf!=null)

// 设置组件的配置参数

common.set_json_conf(JSONValue.toJSONString(conf));

_commons.put(id, common);

}

在ComponentCommon中主要对以下四个属性字段进行设置:

// GlobalStreamId:确定消息来源,其中componentId表示所属组件,streamId为消息流的标识符;

// Grouping:确定消息分组方式;

private Map<GlobalStreamId,Grouping> inputs;

// StreamInfo表示输出的字段列表及是否为直接流

private Map<String,StreamInfo> streams;

private int parallelism_hint; // 设置并行度

private String json_conf; // 其它配置参数设置(必须为JSON格式)

3、SpoutGetter

实现源码:

protected class SpoutGetter extends ConfigGetter<SpoutDeclarer> implements SpoutDeclarer {

public SpoutGetter(String id) {

super(id);

}

}

ConfigGetter、SpoutGetter的实现都是在TopologyBuilder中, ConfigGetter作用:设置程序中的配置项,覆盖默认的配置项,且配置项的格式为为JSON(本质上是改变对应ComponentCommon对象中json_conf的值);

4、提交示例代码中的第四行定义了一个id为transfer,IRichSpout对象为TransferBolt,并行度为4的bolt

setBolt实现源码:

public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) {

validateUnusedId(id);

initCommon(id, bolt, parallelism_hint);

_bolts.put(id, bolt);

return new BoltGetter(id);

}

设置Bolt的函数与设置Spout函数的实现唯一的区别在返回结果;

BoltGetter实现部分源码:

protected class BoltGetter extends ConfigGetter<BoltDeclarer> implements BoltDeclarer {

private String _boltId;

public BoltGetter(String boltId) {

super(boltId);

_boltId = boltId;

}

public BoltDeclarer shuffleGrouping(String componentId) {

return shuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID);

}

public BoltDeclarer fieldsGrouping(String componentId, Fields fields) {

return fieldsGrouping(componentId, Utils.DEFAULT_STREAM_ID, fields);

}

public BoltDeclarer fieldsGrouping(String componentId, String streamId, Fields fields) {

return grouping(componentId, streamId, Grouping.fields(fields.toList()));

}

public BoltDeclarer shuffleGrouping(String componentId, String streamId) {

return grouping(componentId, streamId, Grouping.shuffle(new NullStruct()));

}

private BoltDeclarer grouping(String componentId, String streamId, Grouping grouping) {

_commons.get(_boltId).put_to_inputs(new GlobalStreamId(componentId, streamId), grouping);

return this;

}

.........................................

}

BoltGetter继承至ConfigGetter并实现了BoltDeclarer接口,并重载了BoltDeclarer(InputDeclarer)中各种分组方式(如:fieldsGrouping、shuffleGrouping),分组方式的实现本质上是在_commons中通过对用的boltId找到对应的ComponentCommon对象,对inputs属性进行设置;

5、通过以上几步完成了bolt与spout的配置(对应提交示例代码中的2~5行),6~9行是对运行环境的配置,10行用于向集群提交执行任务,builder.createTopology用于构建StormTopology对象.

createTopology实现源码:

public StormTopology createTopology() {

Map<String, Bolt> boltSpecs = new HashMap<String, Bolt>();

Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();

for(String boltId: _bolts.keySet()) {

IRichBolt bolt = _bolts.get(boltId);

ComponentCommon common = getComponentCommon(boltId, bolt);

boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.serialize(bolt)), common));

}

for(String spoutId: _spouts.keySet()) {

IRichSpout spout = _spouts.get(spoutId);

ComponentCommon common = getComponentCommon(spoutId, spout);

spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.serialize(spout)), common));

}

return new StormTopology(spoutSpecs,

boltSpecs,

new HashMap<String, StateSpoutSpec>());

}

以上源码实现中主要做了两件事:

  • 通过boltId从_bolts中获取到对应的bolt对象,再通过getComponentCommon方法设置对应ComponentCommon对象的streams(输出的字段列表及是否为直接流)属性值,最后将bolt和common一起 放入到boltSpecs集合中。
  • 通过spoutId从_spouts中获取到对应的spout对象,再通过getComponentCommon方法设置对应ComponentCommon对象的streams(输出的字段列表及是否为直接流)属性值,最后将spout和common一起 放入到boltSpecs集合中。
  • 通过以上两步使所设置的所有组件都封装到StormTopology对象中,最后提交的到集群中运行。

Storm系列(三)Topology提交过程


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论