终于把配置部分写完了。其实如果你觉得浪费时间,不想熟悉linux,那么其实你可以根本就不看,直接用本地模式跑一个简单的程序也行。但是事实上,我们学习storm,就是为了让其处理实时的大数据,配置集群有利于我们更熟悉整个框架。好了不多说,进入正题,这篇文章,我将用简单的实例代码来初步接触storm。
在storm官网下载的压缩包里有官方给出的例子,建议大家根据这个来学习一下。由于里面涉及到maven,需要配置一些东西,所以这里暂时不直接跑它的例子。取而代之的,我们用一个比较直观的例子来熟悉storm。
需求如下:
从一个数据源中获取随机的单词,然后将其转换为大写,再加个后缀,写入本地文件。
在storm的框架下,我们可以这样来解决这个问题。首先因为我们没有数据源,所以我们用RandomSpout来模拟产生源源不断的数据,然后用UpperBolt来接收数据,将字母变为大写,再发送给SuffixBolt,给它加后缀,然后写入文件。
接下来就用代码来说话。
eclipse新建工程,并在build path中把storm的jar包引进来(通常在lib文件夹下)。
写第一个类RandomSpout,用于产生数据
import java.util.Map; import java.util.Random; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; public class RandomSpout extends BaseRichSpout{ SpoutOutputCollector collector=null; String[] words={"storm","apache","spark","hadoop"}; @Override public void nextTuple() { Random random = new Random(); String word =words[random.nextInt(words.length)]; //发送消息 collector.emit(new Values(word)); } @Override public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) { //初始化collector this.collector=collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("src_word")); } }
要点:
- open()方法是运行的最早的一个方法,我们这里初始化了collector,以方便给别的方法使用。
- 最开始创建了一个String数组,作为单词源。里面有四个单词:storm,apache,spark,hadoop。
- nextTuple()方法会被循环调用,里面的collector.emit(new Values(word))用于发送数据给接收者。
- declareOutputFields()方法用来定义输出的字段。
然后新建UpperBolt类,用于接收RandomSpout发来的单词,并将单词转化为大写。
import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class UpperBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String src_word = tuple.getString(0); String upper_word=src_word.toUpperCase(); //发送消息出去 collector.emit(new Values(upper_word)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("upper_word")); } }
要点:
- execute()方法不断接收tuple,并在里面写逻辑进行处理,并通过collector将封装成tuple的数据emit出去
- tuple是storm传输数据的单位,我们需要将数据处理好之后封装成tuple,再发送出去,new Values(upper_word)就是封装成tuple的一种形式
再新建SuffixBolt类,用于接受经过大写处理的单词,同时给单词加上后缀“-handled”。最后再写入本地文件。
import java.io.FileWriter; import java.io.IOException; import java.util.Map; import java.util.UUID; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; public class SuffixBolt extends BaseBasicBolt{ FileWriter fileWriter = null; @Override public void prepare(Map stormConf, TopologyContext context) { try { fileWriter = new FileWriter("/home/"+UUID.randomUUID());//初始化fileWriter,写到本地文件,在/home/目录下,文件用随机的UUID命名 } catch (IOException e) { e.printStackTrace(); } } @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String upper_word = tuple.getString(0);//接收数据 String suffix_word = upper_word+"-handled";//加后缀 try { fileWriter.append(suffix_word+"\n");//写入文件 } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("suffix_word") ); } }
要点:
- execute()方法不断接收tuple,但和之前的Bolt不一样的是,这次我们不用发送数据了,而是将处理后的数据直接写入本地文件中。
最后,写个主程序类TopMain,用来串联所有的组件。
import backtype.storm.Config; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.generated.StormTopology; import backtype.storm.topology.TopologyBuilder; public class TopMain { public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { //新建对象 TopologyBuilder topologyBuilder = new TopologyBuilder(); //设置spout,并行度为4,第一个参数为自定义的名字 topologyBuilder.setSpout("randomspout", new RandomSpout(),4); //设置bolt,数据来源是上面的spout topologyBuilder.setBolt("upperbolt", new UpperBolt(),4).shuffleGrouping("randomspout"); topologyBuilder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt"); //创建拓扑 StormTopology top = topologyBuilder.createTopology(); //创建config,配置storm Config config = new Config(); config.setNumWorkers(3); //进程数 config.setNumAckers(2); //应答器数目 //提交拓扑给集群 StormSubmitter.submitTopology("example_top", config, top); } }
要点:
- 拓扑是storm里面一个很重要的概念,说白了,就是工程的整合,在这里,你可以设置worker数,Acker数,串联Spout和Bolt等等,关于它的配置,其实有很多内容,这里先不多说,注释把基本的功能写出来了,其实每个地方都可以深入探讨的。
好,代码写完了,怎么跑呢,且听下回分解。