Storm 中drpc调用

系统 1600 0
      package storm.starter;



import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.LocalDRPC;

import backtype.storm.StormSubmitter;

import backtype.storm.drpc.DRPCSpout;

import backtype.storm.task.ShellBolt;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.IRichBolt;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

import storm.starter.spout.RandomSentenceSpout;



import java.lang.management.ManagementFactory;

import java.util.HashMap;

import java.util.Map;



import org.apache.log4j.Logger;

import org.apache.log4j.PropertyConfigurator;



/**

 * This topology demonstrates Storm's stream groupings and multilang

 * capabilities.

 */

public class Drpctest {

	public static final Logger logger = Logger.getLogger(Drpctest.class);

	public static class WordCount extends BaseBasicBolt {

		Map<String, Integer> counts = new HashMap<String, Integer>();



		@Override

		public void execute(Tuple tuple, BasicOutputCollector collector) {

			String word = tuple.getString(0);

			logger.error(this.toString() + "word = " + word);

			Integer count = counts.get(word);

			if (count == null)

				count = 0;

			count++;

			counts.put(word, count);

			logger.error(this.toString() + "count = " + count);

			collector.emit(new Values(word, count));

		}



		String str = Thread.currentThread().getName();



		@Override

		public void declareOutputFields(OutputFieldsDeclarer declarer) {

			logger.error("declareOutputFields :");

			declarer.declare(new Fields("result", "count"));

		}

	}



	public static class DrpcBolt extends BaseBasicBolt {

		Map<String, Integer> counts = new HashMap<String, Integer>();



		@Override

		public void execute(Tuple tuple, BasicOutputCollector collector) {

			String logString = tuple.getString(0);

			logger.error("DrpcBolt recve :" + logString);

		}



		@Override

		public void declareOutputFields(OutputFieldsDeclarer declarer) {

			// 暂时没用

			declarer.declare(new Fields("word1", "count1"));

		}

	}



	public static void main(String[] args) throws Exception {

		TopologyBuilder builder = new TopologyBuilder();



		// drpc

		LocalDRPC drpc = new LocalDRPC();

		DRPCSpout drpc_spout = new DRPCSpout("testdrpc", drpc);

		builder.setSpout("drpcspout", drpc_spout, 3);



		PropertyConfigurator

				.configure("/home/hadoop/code1/Kafka/src/Log4j.properties");



		// 接入drpc

		builder.setBolt("DrpcBolt", new DrpcBolt(), 1).shuffleGrouping(

				"drpcspout");



		Config conf = new Config();

		conf.setDebug(true);



		if (args != null && args.length > 0) {

			conf.setNumWorkers(3);



			StormSubmitter.submitTopology(args[0], conf,

					builder.createTopology());

		} else {

			conf.setMaxTaskParallelism(3);

			conf.setDebug(true);



			LocalCluster cluster = new LocalCluster();

			cluster.submitTopology("word-count", conf, builder.createTopology());



			String str = "send test drpc"; // 和 DRPCSpout 名字对应

			drpc.execute("testdrpc", str);



			Thread.sleep(10000);



			cluster.shutdown();

		}

	}

}


    

 

Storm 中drpc调用


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论