#!/usr/bin/env python

import mr_tools

from hashlib import md5
import random
import tempfile
import sys

def obsc(d, secret):
    # salted md5 of a single field, used to anonymize the vote dumps
    m = md5(str(d))
    m.update(secret)
    return m.hexdigest()

def obscure(secret=str(random.random())):
    """Turn identifiable components of vote dumps into salted hashes"""
    # md5.update() needs a string, so the salt is the string form of a
    # random float generated once per run
    def o(d):
        return obsc(d, secret)

    @mr_tools.dataspec_m('account_id',
                         'link_id',
                         'sr_id',
                         'dir')
    def process(aff):
        yield o(aff.account_id), o(aff.link_id), o(aff.sr_id), aff.dir

    mr_tools.mr_map(process)

def affinities_m():
    """Take the vote dump generated by srrecs.pig on stdin and prepare
       for reducing the votes into affinities by keying them on
       (account_id, sr_id)"""
    @mr_tools.dataspec_m('account_id',
                         'link_id',
                         'sr_id',
                         'dir')
    def process(aff):
        yield ('%s_%s' % (aff.account_id, aff.sr_id),
               aff.account_id, aff.link_id, aff.sr_id, aff.dir)

    mr_tools.mr_map(process)
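
# Hypothetical mapper record, to illustrate the keying above (field values are
# made up, not from the repo): an input vote ('a1', 'l9', 's7', '1') is
# re-emitted as
#   ('a1_s7', 'a1', 'l9', 's7', '1')
# so the intermediate sort groups all of one account's votes within one sr_id.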

def affinities_r():
    """The reduction step of turning lists of votes on sr_ids into
       affinities"""
    @mr_tools.dataspec_r('account_id',
                         'link_id',
                         'sr_id',
                         'dir')
    def process(account_srid, affs):
        # we can assume that all of the account_ids and sr_ids are
        # equal
        count = 0.0
        ups = 0.0
        account_id = sr_id = None

        for aff in affs:
            if account_id is None:
                account_id = aff.account_id
                sr_id = aff.sr_id

            count += 1
            if aff.dir == '1':
                ups += 1

        # you must vote at least three times to ride
        if count >= 3:
            yield account_id, sr_id, ups / count

    mr_tools.mr_reduce(process)
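
# Worked example (hypothetical numbers): an account that cast four votes in
# one subreddit, three of them upvotes, yields
#   account_id, sr_id, 0.75
# while an account with fewer than three votes in a subreddit yields nothing.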

def write_matrix(out_cm, out_clabel, out_rlabel):
    """Reformat the affinities coming out of the functions above to
       the format wanted by skmeans (which is the format used by
       CLUTO, documented at
       <http://glaros.dtc.umn.edu/gkhome/fetch/sw/cluto/manual.pdf> in
       section 3.3.1)"""
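    # Sketch of the sparse-matrix layout written below (numbers made up):
    #   <num_rows> <num_cols> <num_entries>
    #   3 0.5 7 -1.0
    #   1 0.2
    # i.e. one line per account of "column value" pairs, with 1-based sr
    # columns; out_clabel gets one account_id per matrix row and out_rlabel
    # gets one sr_id per matrix column.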

    class Stats(object):
        __slots__ = ['num_srs', 'num_rows', 'sr_map', 'total_entries']

        def __init__(self):
            self.num_srs = self.num_rows = self.total_entries = 0
            # we can safely keep the whole sr_map around in memory
            # like this because we have fewer than 100k of them
            self.sr_map = {}

    stats = Stats()

    f_cm = tempfile.TemporaryFile()
    f_cl = tempfile.TemporaryFile()

    @mr_tools.dataspec_r('sr_id',
                         ('affinity', float))
    def _reduce(account_id, affs):
        affs = list(affs)

        for aff in affs:
            # the affinities we get are from 0..1, but skmeans wants
            # -1..1
            aff.affinity = aff.affinity * 2 - 1

        # skmeans really doesn't like rows consisting entirely of
        # zeroes
        affs = filter(lambda aff: not (-0.001 < aff.affinity < 0.001),
                      affs)
        if not affs:
            return []

        stats.num_rows += 1
        for aff in affs:
            if aff.sr_id not in stats.sr_map:
                stats.num_srs += 1
                # CLUTO's matrices are 1-based
                stats.sr_map[aff.sr_id] = stats.num_srs
        stats.total_entries += len(affs)

        f_cl.write('%s\n' % (account_id,))
        f_cm.write(' '.join('%s %s' % (stats.sr_map[aff.sr_id], aff.affinity)
                            for aff in affs))
        f_cm.write('\n')

        return []

    mr_tools.mr_reduce(_reduce)

    def cp_fds(infd, outfd, buffsize=1024 * 1024):
        # copy a fully-written temp file back out to a real output file
        infd.flush()
        infd.seek(0)
        while True:
            chunk = infd.read(buffsize)
            if chunk:
                outfd.write(chunk)
            else:
                break

    with open(out_cm, 'w') as outfd:
        outfd.write('%d %d %d\n'
                    % (stats.num_rows, len(stats.sr_map), stats.total_entries))
        cp_fds(f_cm, outfd)

    with open(out_clabel, 'w') as outfd:
        cp_fds(f_cl, outfd)

    with open(out_rlabel, 'w') as outfd:
        for sr_id, sr_mapped in sorted(stats.sr_map.items(),
                                       key=lambda x: x[1]):
            outfd.write('%s\n' % (sr_id,))
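
# The script dispatches by eval()'ing its first argument, so each phase is
# invoked by name from the shell. A hypothetical streaming-style pipeline
# (file names and the sort steps are illustrative, not defined in this file)
# might look like:
#   cat vote_dump | ./srrecs.py "affinities_m()" | sort | \
#       ./srrecs.py "affinities_r()" > affinities
#   sort affinities | ./srrecs.py "write_matrix('out.cm', 'out.clabel', 'out.rlabel')"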

if __name__ == '__main__':
    eval(sys.argv[1])