过程,
Spout 依次发送 msgid 为 1-9 的 tuple(与下方运行结果中的 fail 1-9 对应)。
一级 Bolt:把 msgid 1 对应的 tuple 作为基础 tuple,其余 8 个 tuple 分别与它组合(同时锚定到两个 tuple)后发送给二级 Bolt,同时对每个 msgid 对应的 tuple 都 ack 一次。对于 msgid 1 对应的 tuple,acker 将会跟踪 8 个二级 Bolt 的处理情况。
二级 Bolt:对收到的每个 tuple 调用 fail(模拟处理失败)。
结果:在 Spout 的 fail 回调中,msgid 1-9 全部被报告为失败。
拓扑代码
package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.drpc.DRPCSpout;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.ShellBolt;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import storm.starter.spout.RandomSentenceSpout;

import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

/**
 * Test topology for observing ack/fail propagation when one tuple is anchored
 * to several input tuples.
 *
 * <p>Wiring built in {@link #main}: {@code spout} (RandomSentenceSpout) ->
 * {@code split} ({@link SplitSentence}) -> {@code count} ({@link CountCount1}).
 * {@code SplitSentence} remembers the tuple whose sentence is {@code "a"} and
 * anchors every later emit to both that tuple and the current one;
 * {@code CountCount1} fails every tuple it receives.
 *
 * <p>{@link WordCount} and {@link WordCount1} are not wired into the topology
 * built here. An earlier multilang variant of {@code SplitSentence} (a
 * ShellBolt running {@code splitsentence.py}) was replaced by the Java bolt
 * below.
 */
public class WordCountTopology {

    public static final Logger logger = Logger.getLogger(WordCountTopology.class);

    /**
     * Returns the name of the calling thread.
     *
     * @return {@code Thread.currentThread().getName()}
     */
    public static String GetThreadName() {
        return Thread.currentThread().getName();
    }

    /**
     * First-level bolt: combines the base tuple (sentence {@code "a"}) with
     * every other incoming tuple and forwards the combination downstream.
     */
    public static class SplitSentence implements IRichBolt {
        private OutputCollector _collector;

        /** Printed in the debug output; never incremented by this bolt. */
        int num = 0;

        /** Most recent tuple whose sentence was exactly "a"; null until one arrives. */
        private Tuple tuple1;

        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            _collector = collector;
        }

        /**
         * Stores the tuple as the base tuple when its sentence is {@code "a"};
         * otherwise emits {@code sentence + "a"} anchored to both the base
         * tuple and the current tuple, then acks both.
         *
         * @param tuple incoming tuple; field 0 is read as the sentence string
         */
        @Override
        public void execute(Tuple tuple) {
            String sentence = tuple.getString(0);
            if (sentence.equals("a")) {
                // Base tuple: kept for later combination, not acked here.
                tuple1 = tuple;
            } else if (tuple1 == null) {
                // No base tuple has been seen yet, so there is nothing to
                // combine with. Fail explicitly instead of passing a null
                // anchor to emit().
                _collector.fail(tuple);
            } else {
                List<Tuple> anchors = new ArrayList<Tuple>();
                anchors.add(tuple1);
                anchors.add(tuple);
                _collector.emit(anchors, new Values(sentence + "a"));
                _collector.ack(tuple);
                // NOTE(review): the base tuple is acked again for every
                // combined tuple. This looks intentional for the experiment;
                // confirm before reusing this pattern elsewhere.
                _collector.ack(tuple1);
            }

            System.out.println("Bolt Thread " + Thread.currentThread().getName()
                    + "recve : " + sentence);
            System.out.println(num + " bolt recev:"
                    + tuple.getMessageId().getAnchorsToIds());
        }

        @Override
        public void cleanup() {
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

    /**
     * Second-level bolt: counts each word it sees and then fails the tuple to
     * simulate a processing failure.
     */
    public static class CountCount1 implements IRichBolt {
        /** Occurrences seen so far, keyed by word. */
        Map<String, Integer> counts = new HashMap<String, Integer>();

        private OutputCollector _collector;

        /** Number of tuples this bolt instance has executed. */
        int num = 0;

        /**
         * Updates the per-word count and always fails the tuple; nothing is
         * emitted.
         *
         * @param tuple incoming tuple; field 0 is read as the word
         */
        @Override
        public void execute(Tuple tuple) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null) {
                count = 0;
            }
            count++;
            counts.put(word, count);
            num++;
            // Simulated failure: every tuple is failed rather than acked.
            _collector.fail(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("result", "count"));
        }

        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            _collector = collector;
        }

        @Override
        public void cleanup() {
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

    /**
     * Counting bolt that emits {@code (word, runningCount)} for every input
     * tuple. Not wired into the topology built by {@link #main}.
     */
    public static class WordCount extends BaseBasicBolt {
        /** Occurrences seen so far, keyed by word. */
        Map<String, Integer> counts = new HashMap<String, Integer>();

        /**
         * Increments the count for the word in field 0 and emits the word
         * together with its new count.
         */
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null) {
                count = 0;
            }
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("result", "count"));
        }
    }

    /**
     * Sink bolt that reads the {@code result} and {@code count} fields when
     * present and emits nothing. Not wired into the topology built by
     * {@link #main}.
     */
    public static class WordCount1 extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        /**
         * Reads the {@code result} field as a String and the {@code count}
         * field as an Integer when the tuple declares them; the values are
         * not used further.
         */
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            if (tuple.getFields().contains("result")) {
                // The cast is kept so a wrongly typed field still surfaces
                // as a ClassCastException.
                String result = (String) tuple.getValueByField("result");
            }
            if (tuple.getFields().contains("count")) {
                Integer count = (Integer) tuple.getValueByField("count");
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word1", "count1"));
        }
    }

    /**
     * Builds the spout -> split -> count topology and runs it.
     *
     * <p>With at least one argument the topology is submitted through
     * {@code StormSubmitter} under the name {@code args[0]}; otherwise it
     * runs in a {@code LocalCluster} until the sleep below elapses.
     *
     * @param args optional; {@code args[0]} is the topology name for
     *             cluster submission
     * @throws Exception propagated from topology submission or the sleep
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        PropertyConfigurator
                .configure("/home/hadoop/code1/Kafka/src/Log4j.properties");

        // The parallelism hint is the number of executors; setNumTasks sets
        // the number of tasks.
        builder.setSpout("spout", new RandomSentenceSpout(), 5).setNumTasks(2);
        builder.setBolt("split", new SplitSentence(), 8).setNumTasks(1)
                .shuffleGrouping("spout");
        builder.setBolt("count", new CountCount1(), 12)
                .fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);
        // Maximum number of tuples per spout task that have been emitted but
        // not yet acked or failed. Keeps the pending queue from growing too
        // large; only affects reliable (message-id carrying) emits.
        conf.setMaxSpoutPending(2);
        conf.setMessageTimeoutSecs(5); // message processing timeout
        conf.setNumAckers(2); // number of acker executors

        // Resolve the length once so a null array cannot trigger an NPE in
        // the print below.
        int argCount = (args == null) ? 0 : args.length;
        System.out.println("args.length = " + argCount);

        if (argCount > 0) {
            conf.setNumWorkers(5); // number of worker processes
            StormSubmitter.submitTopology(args[0], conf,
                    builder.createTopology());
        } else {
            // Cap on the number of executors per component.
            conf.setMaxTaskParallelism(1);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf,
                    builder.createTopology());
            Thread.sleep(1088000);
            cluster.shutdown();
        }
    }
}
spout代码
package storm.starter.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

import org.apache.log4j.Logger;

import storm.starter.WordCountTopology;

/**
 * Test spout that emits a fixed series of growing sentences
 * ({@code "a"}, {@code "aa"}, ... up to nine characters), each tagged with
 * an Integer message id from 1 to 9, and logs every ack/fail callback.
 */
public class RandomSentenceSpout extends BaseRichSpout {

    public static final Logger logger = Logger
            .getLogger(RandomSentenceSpout.class);

    /** Message id of the last tuple this spout emits. */
    private static final int LAST_MESSAGE_ID = 9;

    SpoutOutputCollector _collector;
    Random _rand;

    /** Number of sentences built so far; also the id of the latest emit. */
    private int num = 0;

    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        _collector = collector;
        _rand = new Random();
        WordCountTopology.logger.error(this.toString()
                + "RandomSentenceSpout is create");
    }

    /**
     * Builds the next sentence, {@code num + 1} repetitions of {@code "a"},
     * and then increments {@code num}.
     *
     * @return the sentence for the next emit
     */
    private String gettmstr() {
        StringBuilder tmp = new StringBuilder();
        for (int i = 0; i <= num; i++) {
            tmp.append("a");
        }
        num++;
        return tmp.toString();
    }

    /**
     * Sleeps 200 ms, then emits the next sentence with its message id until
     * {@code LAST_MESSAGE_ID} tuples have been sent; later calls only sleep.
     */
    @Override
    public void nextTuple() {
        Utils.sleep(200);
        // Stop before building another sentence once the last id has been
        // sent, so the counter and the string do not keep growing on every
        // call.
        if (num >= LAST_MESSAGE_ID) {
            return;
        }
        String sentence = gettmstr();
        _collector.emit(new Values(sentence), Integer.valueOf(num));
    }

    /**
     * Logs the id of a message reported as acked.
     *
     * @param id the message id passed to emit; cast to Integer
     */
    @Override
    public void ack(Object id) {
        logger.error(this.toString() + "spout ack =" + (Integer) id);
    }

    /**
     * Logs the id of a message reported as failed; the message is not
     * re-emitted.
     *
     * @param id the message id passed to emit; cast to Integer
     */
    @Override
    public void fail(Object id) {
        logger.error("spout fail =" + (Integer) id);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
运行结果
2014-10-03 21:17:31,149 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =1 2014-10-03 21:17:31,351 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =2 Bolt Thread Thread-22recve : aaa 0 bolt recev:{-3139141336114052337=7131499433188364504} 2014-10-03 21:17:31,552 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =3 Bolt Thread Thread-22recve : aaaa 0 bolt recev:{-4497680640148241887=-615828348570847097} 2014-10-03 21:17:31,754 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =4 Bolt Thread Thread-22recve : aaaaa 0 bolt recev:{-8878772617767839827=-7708082520013359311} 2014-10-03 21:17:31,957 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =5 Bolt Thread Thread-22recve : aaaaaa 0 bolt recev:{-3995020874692495577=-5070846720162762196} 2014-10-03 21:17:32,160 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =6 Bolt Thread Thread-22recve : aaaaaaa 0 bolt recev:{-5994700617723404155=-3738685841476816404} 2014-10-03 21:17:32,362 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =7 Bolt Thread Thread-22recve : aaaaaaaa 0 bolt recev:{-2308734827213127682=-5719708045753233056} 2014-10-03 21:17:32,563 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =8 Bolt Thread Thread-22recve : aaaaaaaaa 0 bolt recev:{-3718844156917119468=-6359724009048981605} 2014-10-03 21:17:32,766 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =9