-
Notifications
You must be signed in to change notification settings - Fork 1
/
MyJob.java
165 lines (141 loc) · 5.47 KB
/
MyJob.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package cisc.mapreduce;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MyJob extends Configured implements Tool {

    // Sentiment lexicons, loaded by run() before the job is submitted.
    // NOTE(review): these static sets are populated in the driver JVM only.
    // In a fully distributed cluster the mapper/reducer JVMs start fresh and
    // would see empty sets — this works as written only in local/standalone
    // mode. Confirm the deployment target (DistributedCache or job config
    // would be needed for a real cluster).
    public static Set<String> goodWords = new HashSet<String>();
    public static Set<String> badWords = new HashSet<String>();

    /**
     * Mapper: emits (review body, product id) for each well-formed input line.
     * Lines are tab-separated; field 1 is the review body and field 7 the
     * product id. Malformed lines (fewer than 8 fields, or any failure while
     * extracting the fields) are skipped entirely.
     */
    public static class MapClass extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {

        private Text product_id = new Text();
        private Text body = new Text();

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, Text> output,
                        Reporter reporter) throws IOException {
            String[] line = value.toString().split("\t");
            // Some lines in the larger data file do not have 8 entries;
            // check the length before indexing into the array.
            if (line.length >= 8) {
                try {
                    body.set(line[1]);
                    product_id.set(line[7]);
                    // BUG FIX: collect() was previously OUTSIDE the try, so a
                    // parse failure still emitted the stale contents of the
                    // reused body/product_id Text objects from the previous
                    // record. Emit only after both fields parsed cleanly.
                    output.collect(body, product_id);
                } catch (Exception e) {
                    // Include the cause instead of a fixed message so bad
                    // records can be diagnosed from the task logs.
                    System.err.println("Skipping inconsistent input line: " + e);
                }
            }
        }
    }

    /**
     * Reducer: scores each key's reviews against the good/bad word lexicons
     * (+1 per positive word, -1 per negative word) and emits the key with
     * "Positive Sentiment", "Negative Sentiment", or "Neutral Sentiment".
     */
    public static class Reduce extends MapReduceBase
        implements Reducer<Text, Text, Text, Text> {

        private Text result = new Text();

        public void reduce(Text key, Iterator<Text> values,
                           OutputCollector<Text, Text> output,
                           Reporter reporter) throws IOException {
            // Net sentiment score across every review for this key.
            int count = 0;
            String[] csv;
            // Iterate over each review -- there may be more than one.
            while (values.hasNext()) {
                // Strip everything that is not a letter or a space from the
                // review body, then split into individual words.
                csv = values.next().toString().replaceAll("[^\\p{L}\\p{Z}]", "").split(" ");
                for (String word : csv) {
                    // HashSet lookup is O(1) -- a list scan would be O(n).
                    if (goodWords.contains(word)) {
                        count++;
                    }
                    if (badWords.contains(word)) {
                        count--;
                    }
                }
            }
            if (count > 0) {
                result.set("Positive Sentiment");
            } else if (count < 0) {
                result.set("Negative Sentiment");
            } else {
                result.set("Neutral Sentiment");
            }
            // Emit a descriptive label rather than the raw numeric score.
            output.collect(key, result);
        }
    }

    /**
     * Configures and submits the job.
     *
     * @param args [0] negative-words file, [1] positive-words file,
     *             [2] input path, [3] output path
     * @return 0 on completion
     */
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf, MyJob.class);

        positiveList(args[1]); // Path to positive words file
        negativeList(args[0]); // Path to negative words file

        Path in = new Path(args[2]);
        Path out = new Path(args[3]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("MyJob");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        JobClient.runJob(job);
        return 0;
    }

    /** Loads the positive-sentiment lexicon from the given file path. */
    private void positiveList(String p) {
        loadWordList(p, goodWords);
    }

    /** Loads the negative-sentiment lexicon from the given file path. */
    private void negativeList(String p) {
        loadWordList(p, badWords);
    }

    /**
     * Reads one word per line from the file at {@code path} into
     * {@code target}. Errors are reported to stderr and the set is left with
     * whatever was read before the failure (best-effort, matching the
     * original behavior of not aborting the job).
     */
    private void loadWordList(String path, Set<String> target) {
        BufferedReader reader = null;
        try {
            reader = new BufferedReader(new FileReader(new File(path)));
            String word;
            while ((word = reader.readLine()) != null) {
                target.add(word);
            }
        } catch (IOException ioe) {
            // BUG FIX: the original message claimed "File not found" for any
            // IOException and dropped the cause; report the real exception.
            System.err.println("Failed to read word list '" + path + "': " + ioe);
        } finally {
            // BUG FIX: the original closed the reader only on the success
            // path, leaking the file handle if readLine() threw mid-stream.
            if (reader != null) {
                try {
                    reader.close();
                } catch (IOException ignored) {
                    // best-effort close; nothing useful to do here
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MyJob(), args);
        System.exit(res);
    }
}