Jamal的博客

jstorm本地开发环境搭建

maven依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<dependency>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-core</artifactId>
<version>2.1.1</version>
<!--<scope>provided</scope>-->
</dependency>
<!--<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.2-incubating</version>
</dependency>-->
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
<version>2.22</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill-java</artifactId>
<version>0.8.1</version>
</dependency>

实例

storm是spout获取数据源,bolt处理数据的框架,因此我们只要实现spout数据源读入,bolt处理数据的功能即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
spout:
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.Map;
import java.util.Random;
// 随机发送一条内置消息,该spout继承BaseRichSpout/IRichSpout类
@SuppressWarnings("serial")
public class RandomSentenceSpout extends BaseRichSpout {
SpoutOutputCollector spoutOutputCollector;
Random random;
// 进行spout的一些初始化工作,包括参数传递
@SuppressWarnings("rawtypes")
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
spoutOutputCollector = collector;
random = new Random();
}
// 进行Tuple处理的主要方法
public void nextTuple() {
Utils.sleep(2000);
String[] sentences = new String[]{
"jikexueyuan is a good school",
"And if the golden sun",
"four score and seven years ago",
"storm hadoop spark hbase",
"blogchong is a good man",
"Would make my whole world bright",
"blogchong is a good website",
"storm would have to be with you",
"Pipe to subprocess seems to be broken No output read",
" You make me feel so happy",
"For the moon never beams without bringing me dreams Of the beautiful Annalbel Lee",
"Who love jikexueyuan and blogchong",
"blogchong.com is Magic sites",
"Ko blogchong swayed my leaves and flowers in the sun",
"You love blogchong.com", "Now I may wither into the truth",
"That the wind came out of the cloud",
"at backtype storm utils ShellProcess",
"Of those who were older than we"};
// 从sentences数组中,随机获取一条语句,作为这次spout发送的消息
String sentence = sentences[random.nextInt(sentences.length)];
System.out.println("选择的数据源是: " + sentence);
// 使用emit方法进行Tuple发布,参数用Values申明
spoutOutputCollector.emit(new Values(sentence.trim().toLowerCase()));
}
// 消息保证机制中的ack确认方法
public void ack(Object id) {
}
// 消息保证机制中的fail确认方法
public void fail(Object id) {
}
// 声明字段
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
bolt:
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
@SuppressWarnings("serial")
public class PrintBolt extends BaseBasicBolt {
public void execute(Tuple input, BasicOutputCollector collector) {
try {
String mesg = input.getString(0);
if (mesg != null)
System.out.println(mesg);
} catch (Exception e) {
e.printStackTrace();
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
topology:
public class Topology1 {
private static TopologyBuilder builder = new TopologyBuilder();
public static void main(String[] args) {
Config config = new Config();
builder.setSpout("RandomSentence", new RandomSentenceSpout(), 2);
builder.setBolt("WordNormalizer", new PrintBolt(), 2).shuffleGrouping("RandomSentence");
config.setDebug(false);
config.setMaxTaskParallelism(1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("wordcount", config, builder.createTopology());
}
}

接下来只需要启动topology即可。

问题

  1. 报错1
    1
    java.lang.ClassNotFoundException: com.twitter.chill.java.RegexSerializer, compiling:(carbonite/serializer.clj:1:1)

添加依赖:

1
2
3
4
5
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill-java</artifactId>
<version>0.8.1</version>
</dependency>