年初领导让做一个检索热词的干预,也就是对用户搜索热词的统计结果,人工地指定其在排行榜中的位置。当然这任务比较恶心,咱只是个出来混饭碗的民工,不出格的事儿也可以忍了。
说技术。工作流程是收集用户的搜索日志,统计每个keyword在一天之中被搜索的次数,根据每个keyword的统计历史,使用数学方差得出它近期热度的评分,然后降序排序给出结果列表。(如果做的更细致可以在计算前加入语义分析的部分,这样能更好的分析出刚刚流行的网络用语,我没有做那么深,这里暂时不表)
现在加入人工干预的部分,排行本来就是个topN的问题,干预的也是排行的前几个。编辑向来喜欢简单直接粗暴的方法,直接给某个关键词指定它的位置,也就是位置(priority)与得分(score)的混合排序。priority实际上就可以认为是排名的优先级,所以组合排序的策略是先按priority降序,priority相同时再按score降序。
在map/reduce框架下,排序没啥子技术含量,只需要简单调用方法告知job需要排序的key的类型。但多字段排序,需要实现WritableComparable接口的自定义Writable类型来作为排序的key,也很简单。网上hadoop的中文资料比较少,我爱好装B但缺少hadoop编程的硬货,写出这个难免让您贱笑了。。
不说废话,直接上代码
1、KeyWritable.java
1
public
static
class
KeyWritable
implements
WritableComparable
<
KeyWritable
>
{
2
3
private
IntWritable priority;
4
private
FloatWritable score;
5
6
public
KeyWritable(){
7
priority
=
new
IntWritable(
0
);
8
score
=
new
FloatWritable(
0
);
9
}
10
11
public
KeyWritable(IntWritable priority,FloatWritable score) {
12
set(priority,score);
13
}
14
15
public
KeyWritable(
int
priority,
long
score) {
16
set(
new
IntWritable(priority),
new
FloatWritable(score));
17
}
18
19
public
void
set(IntWritable priority,FloatWritable score){
20
this
.priority
=
priority;
21
this
.score
=
score;
22
}
23
24
public
IntWritable getPriority(){
25
return
this
.priority;
26
}
27
28
public
FloatWritable getScore(){
29
return
this
.score;
30
}
31
32
@Override
33
public
void
readFields(DataInput in)
throws
IOException {
34
this
.priority.readFields(in);
35
this
.score.readFields(in);
36
37
}
38
39
@Override
40
public
void
write(DataOutput out)
throws
IOException {
41
this
.priority.write(out);
42
this
.score.write(out);
43
}
44
45
@Override
46
public
int
compareTo(KeyWritable obj) {
47
int
cmp
=
this
.priority.compareTo(obj.priority);
48
if
(cmp
!=
0
){
49
return
cmp;
50
}
51
return
this
.score.compareTo(obj.score);
52
}
53
54
@Override
55
public
boolean
equals(Object obj) {
56
if
(obj
instanceof
KeyWritable){
57
int
result
=
this
.compareTo((KeyWritable)obj);
58
if
(result
==
0
){
59
return
true
;
60
}
61
}
62
return
false
;
63
}
64
65
@Override
66
public
int
hashCode() {
67
return
score.hashCode();
68
}
69
70
@Override
71
public
String toString() {
72
return
super
.toString();
73
}
74
75
76
/**
77
* Comparator
78
*
@author
zhangmiao
79
*
80
*/
81
public
static
class
Comparator
extends
WritableComparator {
82
public
Comparator() {
83
super
(KeyWritable.
class
);
84
}
85
86
@Override
87
public
int
compare(
byte
[] b1,
int
s1,
int
l1,
byte
[] b2,
88
int
s2,
int
l2) {
89
KeyWritable key1
=
new
KeyWritable();
90
KeyWritable key2
=
new
KeyWritable();
91
DataInputBuffer buffer
=
new
DataInputBuffer();
92
93
try
{
94
95
buffer.reset(b1, s1, l1);
96
key1.readFields(buffer);
97
buffer.reset(b2, s2, l2);
98
key2.readFields(buffer);
99
}
catch
(IOException e) {
100
throw
new
RuntimeException(e);
101
}
102
return
compare(key1, key2);
103
}
104
105
@Override
106
public
int
compare(WritableComparable a,WritableComparable b){
107
if
(a
instanceof
KeyWritable
&&
b
instanceof
KeyWritable) {
108
return
((KeyWritable) a).compareTo(((KeyWritable) b));
109
}
110
return
super
.compare(a, b);
111
}
112
113
}
114
115
public
static
class
DecreasingComparator
extends
Comparator {
116
117
@Override
118
public
int
compare(
byte
[] b1,
int
s1,
int
l1,
byte
[] b2,
int
s2,
int
l2){
119
return
-
super
.compare(b1,s1,l1,b2,s2,l2);
120
}
121
}
122
}
2、在提交job时设置KeyWritable的比较器
// Sort keys with the descending comparator: higher priority first, then higher score.
job.setOutputKeyComparatorClass(KeyWritable.DecreasingComparator.
class
);
(未完待续)

