From 3f0919498cfcc55cd520252d02ee044673868a2a Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Thu, 13 Jul 2017 19:49:38 +0800 Subject: [PATCH 1/2] fetch record from master --- go/cmd/master/master.go | 14 +++++++++++--- go/master/client.go | 4 +++- python/paddle/v2/dataset/common.py | 3 +-- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/go/cmd/master/master.go b/go/cmd/master/master.go index 54fa254863156..9eaf8c04ae01f 100644 --- a/go/cmd/master/master.go +++ b/go/cmd/master/master.go @@ -11,6 +11,7 @@ import ( "github.com/namsral/flag" log "github.com/sirupsen/logrus" + "github.com/topicai/candy" "github.com/PaddlePaddle/Paddle/go/master" "github.com/PaddlePaddle/Paddle/go/utils/networkhelper" @@ -20,11 +21,18 @@ func main() { port := flag.Int("port", 8080, "port of the master server.") ttlSec := flag.Int("ttl", 60, "etcd lease TTL in seconds.") endpoints := flag.String("endpoints", "http://127.0.0.1:2379", "comma separated etcd endpoints. If empty, fault tolerance will not be enabled.") - taskTimeoutDur := flag.Duration("task_timout_dur", 20*time.Minute, "task timout duration.") - taskTimeoutMax := flag.Int("task_timeout_max", 3, "max timtout count for each task before it being declared failed task.") - chunkPerTask := flag.Int("chunk_per_task", 10, "chunk per task.") + taskTimeoutDur := flag.Duration("task-timout-dur", 20*time.Minute, "task timout duration.") + taskTimeoutMax := flag.Int("task-timeout-max", 3, "max timtout count for each task before it being declared failed task.") + chunkPerTask := flag.Int("chunk-per-task", 10, "chunk per task.") + logLevel := flag.String("log-level", "info", + "log level, possible values: debug, info, warning, error, fatal, panic") flag.Parse() + level, e := log.ParseLevel(*logLevel) + candy.Must(e) + + log.SetLevel(level) + if *endpoints == "" { log.Warningln("-endpoints not set, fault tolerance not be enabled.") } diff --git a/go/master/client.go b/go/master/client.go index a2ca3f3ef8ce3..8079fb56a6256 
100644 --- a/go/master/client.go +++ b/go/master/client.go @@ -2,6 +2,7 @@ package master import ( "os" + "time" "github.com/PaddlePaddle/Paddle/go/connection" "github.com/PaddlePaddle/recordio" @@ -38,7 +39,8 @@ func (c *Client) getRecords() { if err != nil { // TODO(helin): wait before move on with next // getTask call. - log.Errorln(err) + log.Errorf("Get task failed, sleep 3 seconds and continue, %s", err) + time.Sleep(3 * time.Second) continue } diff --git a/python/paddle/v2/dataset/common.py b/python/paddle/v2/dataset/common.py index 4a2eb59c340f5..19b25128cfb12 100644 --- a/python/paddle/v2/dataset/common.py +++ b/python/paddle/v2/dataset/common.py @@ -201,8 +201,7 @@ def close_writers(w): def write_data(w, lines): random.shuffle(lines) for i, d in enumerate(lines): - d = pickle.dumps(d, pickle.HIGHEST_PROTOCOL) - w[i % num_shards].write(d) + w[i % num_shards].write(str(d)) w = open_writers() lines = [] From d84e96a508ffef38266ebe36d56c0431f1b065a4 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Fri, 14 Jul 2017 16:24:16 +0800 Subject: [PATCH 2/2] dumps with protocol 0 --- go/master/client.go | 2 -- python/paddle/v2/dataset/common.py | 5 ++++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/go/master/client.go b/go/master/client.go index 8079fb56a6256..a41e78bf14589 100644 --- a/go/master/client.go +++ b/go/master/client.go @@ -37,8 +37,6 @@ func (c *Client) getRecords() { for { t, err := c.getTask() if err != nil { - // TODO(helin): wait before move on with next - // getTask call. 
log.Errorf("Get task failed, sleep 3 seconds and continue, %s", err) time.Sleep(3 * time.Second) continue diff --git a/python/paddle/v2/dataset/common.py b/python/paddle/v2/dataset/common.py index 19b25128cfb12..ff1000b54aeda 100644 --- a/python/paddle/v2/dataset/common.py +++ b/python/paddle/v2/dataset/common.py @@ -201,7 +201,10 @@ def close_writers(w): def write_data(w, lines): random.shuffle(lines) for i, d in enumerate(lines): - w[i % num_shards].write(str(d)) + # FIXME(Yancey1989): dumps with the default pickle protocol for now; + # should dump with pickle.HIGHEST_PROTOCOL instead. + o = pickle.dumps(d) + w[i % num_shards].write(o) w = open_writers() lines = []