-
Notifications
You must be signed in to change notification settings - Fork 0
/
profile_job.pyj
98 lines (76 loc) · 2.47 KB
/
profile_job.pyj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#import random
#import datetime
# Profiling rule: reducer input is sorted (duplicate keys arrive adjacent,
# which the reduce phase relies on to dedup) and each map task carries a
# record counter 'c' in its params for building per-record uids.
profile = rule(name='profile')
profile._sort, profile._params = True, {'c': 0}
@profile.declare_targets
def target(rule, nodes):
    """Yield the profile target Node for this rule's source table.

    Declares nothing when the target node already exists or when the source
    has no matching segments.
    """
    # FIX: the module-level `import random` / `import datetime` lines are
    # commented out, so referencing them here raised NameError. Import
    # locally, consistent with the file's other function-local imports.
    import random
    import datetime
    if rule.target.url not in nodes:
        segments = nodes.where('scheme == ? and table == ?',
                               rule.source.scheme, rule.source.table)
        if not segments:
            return
        # The original comment claims "a random third of the segments", but
        # the threshold is 1, so random.random() < 1 keeps every segment.
        # Behavior preserved; TODO(review): use `random.random() < 1/3.0`
        # if sampling a third was actually intended.
        segments = filter(lambda x: random.random() < 1, segments)
        yield Node(rule.target.url, 'fever', rule.target.table,
                   datetime.datetime.now(), rule, segments)
@profile.map
def map(record, params):
    """Emit ((attribute, value, type-name), 1) for every field of the record,
    plus one ('__total__', uid, None) marker per record so the reducer can
    compute coverage ratios against the total record count."""
    from fever.support import dumps
    from disco.worker.classic import worker
    # Unique per-record id: partition number plus a per-task running counter.
    params.c += 1
    uid = "{0}:{1}".format(worker.this_partition(), params.c)
    for attr, value in record[1].items():
        yield dumps((attr, value, value.__class__.__name__)), 1
    yield dumps(('__total__', uid, None)), 1
@profile.reduce
def reduce(iter, params):
    """Count distinct (attribute, value, type) keys and emit, per attribute,
    [distinct_count, distinct_count / total_records].

    Relies on profile._sort: input keys arrive sorted, so duplicates are
    adjacent and can be skipped with a single `last` marker.
    """
    import json  # FIX: was missing, so json.loads below raised NameError
    from collections import defaultdict
    counts = defaultdict(float)
    last = None
    for key, value in iter:
        # Skip exact duplicate keys (sorted input makes them adjacent).
        if key == last:
            continue
        last = key
        attribute, attr_value, attr_type = json.loads(key)
        counts[attribute] += 1
    # Each record contributed one unique ('__total__', uid, None) key in the
    # map phase, so this is the total record count. NOTE(review): divides by
    # zero if counts is non-empty but no '__total__' keys were present.
    total = counts['__total__']
    for key, val in counts.items():
        yield key, [val, val / total]
# Cube rule: roll records up by the timestamp components of 'created_at'.
def _date_part(attr):
    # Bind `attr` per dimension (avoids the late-binding-closure pitfall).
    return lambda record: getattr(record['created_at'], attr)

cube = rule(name='cube',
            year=_date_part('year'),
            month=_date_part('month'),
            day=_date_part('day'),
            hour=_date_part('hour'))
@cube.declare_targets
def target(rule, nodes):
    """Declare cube targets only after profiling has run: delegate to the
    rule class's default target declaration when a profile node exists for
    the source table, otherwise declare nothing."""
    profile_node = 'profile_{0}'.format(rule.source.table)
    if profile_node not in nodes:
        # No profile for this table yet; the cube cannot be built.
        return []
    return rule.__class__._target(rule, nodes)
@cube.load('attributes')
def load(rule, target_node, nodes):
    """Return the profile node for the target's source table so its output
    is loaded into params.attributes before the cube job runs."""
    source_table = target_node._segments[0].table
    return [nodes['profile_' + source_table]]
@cube.map
def map(record, params):
    """Key each record by the values of its low-coverage attributes
    (coverage < 0.4 per the profile job) and emit (dumps(key), 1) so the
    reducer can count records per attribute-value combination."""
    from fever.support import dumps
    # Sort the profiled attributes once per task; cache the result on params.
    if not hasattr(params, 'sorted'):
        params.sorted = sorted(params.attributes.items())
    vals = [(attr, record[1].get(attr))
            for attr, (count, coverage) in params.sorted
            if coverage < .4]
    yield dumps(vals), 1
@cube.reduce
def reduce(iter, params):
    """Sum the counts for each distinct attribute-value combination and emit
    one record per combination, dropping None-valued attributes and adding
    the total under 'count'."""
    from disco.util import kvgroup
    import json
    # FIX: removed dead local `counts = {}` (never read or written).
    for key, vs in kvgroup(sorted(iter)):
        # key is a JSON list of [attribute, value] pairs from cube.map.
        record = {attr: value
                  for attr, value in json.loads(key)
                  if value is not None}
        record['count'] = sum(vs)
        yield ' ', record