import cascading.pipe.Pipe
import com.twitter.scalding._
/**
Scalding Tutorial ported to use the Type-safe API (TDsl)
(rather than Cascading's Fields API). The examples here roughly correspond
to those in `tutorial/Tutorial{0..5}.scala`.
These tutorials are all run from this single file; which one is run can
be chosen with a command-line flag "--tutorial". For instance, to run the
first tutorial example:
> ./scripts/scald.rb --local tutorial/TypedTutorial.scala \
--tutorial 0 \
--input tutorial/data/hello.txt \
--output tutorial/data/output0.txt \
--words tutorial/data/word_scores.tsv
(Note: only tutorial 5 uses "word_scores.tsv")
**/
class TypedTutorial(args : Args) extends Job(args) {
args("tutorial") match {
/**
Tutorial {0,1}: Write out to a TSV file.
----------------------------------------
In this first version we will be as explicit as possible to show all
the steps required to go from a raw text file to a typed stream.
**/
case "0" | "1" => {
// The TextLine source splits the input by lines.
val textSource = TextLine(args("input"))
// Create a type-safe pipe from the TextLine.
val lines: TypedPipe[String] =
TypedPipe.from[String](textSource)
// Write the typed pipe out to a tab-delimited file.
lines.write(TypedTsv[String](args("output")))
}
/**
Tutorial 2: Simple map
----------------------
Reverse all the strings. Notice that we've now left off the [String] type.
Scala can generally infer these types for us, making the code cleaner.
**/
case "2" | "map" => {
// Create a typed pipe from the TextLine (of type TypedPipe[String] still)
TypedPipe.from(TextLine(args("input")))
// Transform each line, reversing it. Output is a new TypedPipe, still of String.
.map(_.reverse)
// Note: the type for the TypedTsv *can* be inferred by Scala here.
// However, it's best to specify it explicitly so that if the output
// type ever changes, the mismatch is caught here instead of breaking
// the next job that reads from the output file.
.write(TypedTsv[String](args("output")))
}
/**
Tutorial 3: Flat Map
---------------------
Dump all the words.
**/
case "3" | "flatmap" => {
TypedPipe.from(TextLine(args("input")))
// flatMap is like map, but instead of returning a single item
// from the function, we return a collection of items. Each of
// these items will create a new entry in the data stream; here,
// we'll end up with a new entry for each word.
.flatMap(_.split("\\s"))
// output of flatMap is still a collection of String
.write(TypedTsv[String](args("output")))
}
/**
Tutorial 4: Word Count
----------------------
Now that we have a stream of words, clearly we're ready for
that most exciting of MapReduce examples: the Word Count.
**/
case "4" | "wordcount" => {
// Get the words (just like above in case "3")
val words = TypedPipe.from(TextLine(args("input")))
.flatMap(_.split("\\s"))
// To count the words, we use TypedPipe's `groupBy` method.
// However, this no longer returns a `TypedPipe[T]`, but rather
// a `Grouped[K,T]` based on the type of the key used to group by.
//
// groupBy accepts a function to determine the key for grouping.
// For word count we want capitalization not to matter, so we
// normalize each word to lower case to form its key.
val groups : Grouped[String,String] = words.groupBy(_.toLowerCase)
// Next we specify what to do with each group. For word count we
// simply want the size of each group. This operation results in a
// new grouped pipe whose key is the lower-cased word (String) and
// whose value is the count for that group (Long).
//
// Note: To do more interesting aggregations, Scalding supports
// a variety of operations, such as `sum`, `reduce`, `foldLeft`,
// `mapGroup`, etc, that can all be applied efficiently on Monoids
// (primitives like Long, container types like `Map`, or custom
// monoids you define yourself). See the wiki for more details:
// https://github.com/twitter/scalding/wiki/Type-safe-api-reference
val counts = groups.size
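// As an aside (a sketch only, not executed by this tutorial), the
// same count can be expressed in one step with `sumByKey`, by mapping
// each word to a (key, 1L) pair and summing the ones per key; the
// name `countsAlt` below is purely illustrative:
//
//   val countsAlt = words.map(w => (w.toLowerCase, 1L)).sumByKey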
// And finally, we dump these results to a TypedTsv with the
// correct Tuple type.
counts.write(TypedTsv[(String,Long)](args("output")))
}
/**
Tutorial 5: Demonstrate joins
-----------------------------
Associate a score with each word and compute a score for each line.
Note: this example is a bit contrived, but serves to demonstrate
how to combine multiple input sources.
**/
case "5" | "join" => {
// Load the scores for each word from TSV file and group by word.
val scores: Grouped[String,Double] =
// For TypedTsv, Scalding coerces the fields to the specified types,
// throwing an exception if any line fails.
TypedPipe.from(TypedTsv[(String,Double)](args("words")))
// group by word so we can join it
.group
// get the lines, this time from an 'OffsetTextLine' which is a
// typed wrapper on 'TextLine' that contains the 'byte offset' and
// text of each line in the file.
val lines: TypedPipe[(Long,String)] = TypedPipe.from(OffsetTextLine(args("input")))
// Split lines into words, but keep their original line offset with them.
val wordsWithLine : Grouped[String,Long] =
lines
.flatMap{ case (offset, line) =>
// split into words
line.split("\\s")
// keep the line offset with them
.map(word => (word.toLowerCase, offset))
}
// make the 'word' field the key
.group
// Associate scores with each word; merges the two value types into
// a tuple: [String,Long] join [String,Double] -> [String,(Long,Double)]
val scoredWords = wordsWithLine.join(scores)
// get scores for each line (keyed by the line's offset)
val scoredLinesByNumber =
scoredWords
// select the line offset and score fields
.map{ case (word,(offset,score)) => (offset,score) }
// group by line offset (groups all the words for a line together)
.group
// compute total score per line
.sum
// Group and sum are often run together in this way; the `sumByKey`
// operation performs both in a single step.
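// For illustration only (a sketch, not executed here), the select,
// group, and sum above could be collapsed into:
//
//   scoredWords.map{ case (word, (offset, score)) => (offset, score) }.sumByKey
//
// which groups on the offset key and sums the scores in one step.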
// Associate the original line text with the computed score,
// discard the 'offset' field
val scoredLines: TypedPipe[(String,Double)] =
lines
// index lines by 'offset'
.group
// associate scores with lines (by offset)
.join(scoredLinesByNumber)
// take just the value fields (discard the 'line offset')
.values
// write out the final result
scoredLines.write(TypedTsv[(String,Double)](args("output")))
}
/**
Interoperability with Fields API
--------------------------------
Scalding also provides a thinner, un-type-safe wrapper over Cascading
which is known as the Fields API because each record has a number of
named "fields".
Most jobs can be done completely in the Typed API, but for compatibility,
there are ways to go back and forth between the two schemes, which the
next couple cases demonstrate.
**/
/**
Pipe vs. TypedPipe
------------------
TypedPipes can be easily converted to Pipes and vice-versa.
**/
case "pipes" => {
// calling 'read' on a source returns an un-typed Pipe
// TextLine, by default, contains two fields: 'offset and 'line.
val rawPipe: Pipe = TextLine(args("input")).read
// To convert to a typed pipe, we must specify the fields we want
// and their types:
val lines: TypedPipe[(Long,String)] =
TypedPipe.fromPipe[(Long,String)](rawPipe, ('offset,'line))
// We can operate on this typed pipe as above, and come up with a
// different set of fields
val lineSizes: TypedPipe[Long] = lines.map{ case (offset,line) => line.length }
// To convert back to a Fields Pipe, we must specify the names of the fields:
val lineSizesField: Pipe = lineSizes.toPipe('size)
// finally, we can write out this untyped pipe with an untyped sink:
lineSizesField.write(Tsv(args("output")))
}
/**
Bonus: Typed blocks
-------------------
An alternative to working completely in typed mode is to use
`typed` blocks, which create a TypedPipe within the scope, and then
map the output back into an untyped Pipe. You specify the fields to
map in and out using the `->` pair passed to `typed()`.
**/
case "block" => {
// Get the .typed enrichment
import TDsl._
TextLine(args("input")).read
.typed('line -> 'size) { tp: TypedPipe[String] =>
// now operate on the typed pipe
tp.map(_.length)
}
// the final output will have just the 'size field
// and can be dumped using the un-typed Tsv source.
.write(Tsv(args("output")))
}
}
}