debugging
Debugging is a little bit different. Let's analyze an actual user issue: https://github.com/chrislusf/glow/issues/36
The code is here:
package main
import (
_ "bufio"
"encoding/gob"
"flag"
"fmt"
_ "github.com/chrislusf/glow/driver"
"github.com/chrislusf/glow/flow"
_ "io"
"io/ioutil"
_ "os"
"strconv"
"strings"
"sync"
)
type WordSentence struct {
Word string
LineNumber int
}
type AccessByAgeGroup struct {
Addr string
Info MemInfo
}
type MemInfo struct {
Addr string
Size int
Count int
}
func init() {
gob.Register(MemInfo{})
}
func goStart(wg *sync.WaitGroup, fn func()) {
wg.Add(1)
go func() {
defer wg.Done()
fn()
}()
}
func testWordCount2() {
println("testWordCount2")
flowOut2 := make(chan AccessByAgeGroup)
chIn := make(chan string)
f2 := flow.New()
f2.Channel(chIn).Partition(1).Map(func(line string) MemInfo {
//println(line)
words := strings.Split(line, ":")
//println(words[0]+" "+words[1])
s, _ := strconv.ParseInt(words[1], 16, 0)
return MemInfo{words[0], int(s), 1}
}).Map(func(ws MemInfo) (string, MemInfo) {
return ws.Addr, ws
}).ReduceByKey(func(x MemInfo, y MemInfo) MemInfo {
println("reduce:")
return MemInfo{x.Addr, x.Size + y.Size, x.Count + y.Count}
}).AddOutput(flowOut2)
flow.Ready()
var wg sync.WaitGroup
goStart(&wg, func() {
f2.Run()
})
goStart(&wg, func() {
for t := range flowOut2 {
fmt.Printf("%s size:%-8d count:%-8d\n",
t.Info.Addr, t.Info.Size, t.Info.Count)
}
})
bytes, err := ioutil.ReadFile("passwd")
if err != nil {
println("Failed to read")
return
}
lines := strings.Split(string(bytes), "\r\n")
for _, line := range lines {
chIn <- line
}
wg.Wait()
}
func main() {
flag.Parse()
testWordCount2()
}
The user tried very hard to debug this, but had little to go on beyond printing out messages and seeing that the ReduceByKey() function was not working. Here is how I tracked down the issue.
I started the program in standalone mode. Do not start with distributed mode. Always make sure the code works in standalone mode first!
Well, it did hang. So I pressed Ctrl+C, and this is part of the output:
step:Input0
output : d0
shard:0 time:2.038281735s processed 2779
step:Map1
input : d0
shard:0 time:2.038321665s processed 2779
output : d1
shard:0 time:2.038380948s processed 2779
step:Map2
input : d1
shard:0 time:2.038388886s processed 2779
output : d2
shard:0 time:2.038408013s processed 2779
step:LocalSort3
input : d2
shard:0 time:2.03841651s processed 2779
output : d3
shard:0 time:2.038403271s processed 0
step:LocalReduceByKey4
input : d3
shard:0 time:2.038411775s processed 0
output : d4
shard:0 time:2.038419295s processed 0
step:MergeSorted5
input : d4
shard:0 time:2.038427534s processed 0
output : d5
shard:0 time:2.038420435s processed 0
step:LocalReduceByKey6
input : d5
shard:0 time:2.038428171s processed 0
output : d6
shard:0 time:2.038496247s processed 0
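This shows all the steps the flow has, along with the elapsed time and the bytes processed for each step and shard.
The input of "step:LocalSort3" has processed 2779, but its output, and every step after it, has processed 0. The sort never produced anything! Now what?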
Let's do a "kill -3" on the program. This sends SIGQUIT, which makes the Go runtime print the stack trace of every goroutine. It's a long output. In it, the map function looks suspicious. It is somehow still running! All it does is process one element and move on to the next, so why is it stuck?
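If sending a signal is inconvenient, a similar dump can be produced from inside the program. The following is a minimal sketch using only the standard library, not anything from Glow. The helper names (dumpGoroutines, hasBlockedReceiver, stuckReceiverDump) are made up for illustration. It starts a goroutine that ranges over a channel nobody closes, then looks for the same "chan receive" state that appears in the trace.

```go
package main

import (
	"fmt"
	"runtime"
	"strings"
	"time"
)

// dumpGoroutines returns the stack traces of all goroutines, similar in
// content to what the Go runtime prints on SIGQUIT ("kill -3").
func dumpGoroutines() string {
	buf := make([]byte, 1<<20)
	n := runtime.Stack(buf, true) // true: include every goroutine
	return string(buf[:n])
}

// hasBlockedReceiver reports whether any goroutine in the dump is parked
// on a channel receive.
func hasBlockedReceiver(dump string) bool {
	return strings.Contains(dump, "[chan receive")
}

// stuckReceiverDump starts a goroutine that ranges over a channel that is
// never closed, waits for it to park, and returns the goroutine dump.
func stuckReceiverDump() string {
	ch := make(chan int) // never written to, never closed
	go func() {
		for range ch { // blocks forever, like a step waiting for input
		}
	}()
	// Poll briefly so the goroutine has time to park on the receive.
	for i := 0; i < 100; i++ {
		if dump := dumpGoroutines(); hasBlockedReceiver(dump) {
			return dump
		}
		time.Sleep(10 * time.Millisecond)
	}
	return dumpGoroutines()
}

func main() {
	dump := stuckReceiverDump()
	fmt.Println("blocked receiver found:", hasBlockedReceiver(dump))
}
```

A goroutine listed as "chan receive" is not doing work. It is waiting for someone to send on the channel or close it, which is the question to ask next.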
goroutine 23 [chan receive]:
github.com/chrislusf/glow/flow.(*Dataset).Map.func1(0xc8200a23c0)
/Users/chrislu/dev/gopath/src/github.com/chrislusf/glow/flow/dataset_map.go:27 +0x9b
github.com/chrislusf/glow/flow.(*Task).RunTask(0xc8200a23c0)
/Users/chrislu/dev/gopath/src/github.com/chrislusf/glow/flow/step_task.go:33 +0x2d
github.com/chrislusf/glow/flow.(*Step).RunStep.func1(0xc820067540, 0x0, 0xc8200a23c0)
/Users/chrislu/dev/gopath/src/github.com/chrislusf/glow/flow/step.go:22 +0x48
created by github.com/chrislusf/glow/flow.(*Step).RunStep
/Users/chrislu/dev/gopath/src/github.com/chrislusf/glow/flow/step.go:23 +0xd3
OK. Let's go to glow/flow/dataset_map.go:27
#27 for input := range task.InputChan() {
#28 invokeMapFunc(input)
#29 }
It's waiting for the input channel to be closed! Hmm, wait, the flow's input is provided by the user. Let's check whether the input channel is ever closed.
That's right: the "chIn" channel is never closed.
Hanging is actually one of the most common problems. Always make sure the input channel is closed. Sorting is a blocking step: it cannot output anything until it has received all of its input. And the ReduceByKey() function needs to sort the elements first, before the reducing action.
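To see why a sort step depends on the close, here is a small standalone sketch. It is not Glow's implementation, and sortStage and feedAndCollect are hypothetical names used only for this illustration. The stage collects everything from its input channel, and only when that channel is closed can it sort and emit.

```go
package main

import (
	"fmt"
	"sort"
)

// sortStage mimics a blocking step: it has to drain its whole input before
// it can emit anything, so it only makes progress once `in` is closed.
func sortStage(in <-chan string, out chan<- string) {
	var all []string
	for s := range in { // this loop ends only when `in` is closed
		all = append(all, s)
	}
	sort.Strings(all)
	for _, s := range all {
		out <- s
	}
	close(out) // let the downstream reader finish too
}

// feedAndCollect sends the lines into the stage, closes the input, and
// returns whatever the stage emits.
func feedAndCollect(lines []string) []string {
	in := make(chan string)
	out := make(chan string)
	go sortStage(in, out)
	go func() {
		for _, l := range lines {
			in <- l
		}
		close(in) // without this, sortStage never leaves its range loop
	}()
	var got []string
	for s := range out {
		got = append(got, s)
	}
	return got
}

func main() {
	fmt.Println(feedAndCollect([]string{"c", "a", "b"})) // prints [a b c]
}
```

For the program in the issue, the corresponding change would be to call close(chIn) right after the loop that sends the lines, before wg.Wait().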