
[Done] Sync master client between passes and fix recordio split #2948

Merged: 22 commits into PaddlePaddle:develop, Jul 27, 2017

Conversation

@typhoonzero (Contributor) commented on Jul 18, 2017:

Related #2626
Fix #3039

  • fix recordio split when actual record size < lines_per_file
  • master manages num_passes

@helinwang (Contributor) left a comment:

Great that the pass concept is introduced; it feels necessary.

I don't think the master needs to know how many passes the trainer will train. Instead, we need a way for the trainer to learn that the current pass has ended, so the trainer itself can control when to stop training.

The reason: looking at local training, the data reader doesn't care how many passes the trainer runs. Each call to reader() produces "one pass's generator"; when that generator is exhausted, the trainer calls reader() again to read the next pass: https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/v2/trainer.py#L142

The master here is effectively a remote reader and should behave like the local one. So when the master finds that a pass has ended, GetTask should return EOF, and the corresponding Python code needs a small change to check for EOF (e.g., define EOF as -1 or 1) and return when it sees it: https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/v2/reader/creator.py#L113 . So the master itself doesn't need to know how many passes the trainer will train.

@@ -106,6 +109,8 @@ func NewService(store Store, chunksPerTask int, timeoutDur time.Duration, failur
s.taskQueues.Pending = make(map[int]taskEntry)
s.ready = make(chan struct{})
s.store = store
s.numPasses = 0
Contributor:

I could be wrong, but from the code I've read, people don't usually explicitly initialize zero values (Go already does it). Initializing is still correct, but it adds a few more lines of code.
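For reference, a minimal sketch of the zero-value behavior described above (hypothetical struct, not the PR's code):

package main

import "fmt"

type service struct {
	numPasses int
	ready     chan struct{}
}

func main() {
	s := &service{}
	fmt.Println(s.numPasses) // prints 0 without explicit initialization
	fmt.Println(s.ready)     // nil; a channel still needs make() before use
}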

@typhoonzero (author):

Should it be initialized to 1? Training always needs at least one pass.

@@ -315,6 +328,25 @@ func (s *Service) logFields() log.Fields {
}
}

// updateQueuePasses check if tasks are done, move to todo to add another pass
// IMPORTANT: must run under lock
func (s *Service) updateQueuePasses() error {
Contributor:

Looking at the name from an English-language perspective, "update pass" only means "change pass"; it carries no explicit meaning of "moving to the next pass". Maybe nextPass would describe the intention better?

if len(s.taskQueues.Failed) > 0 {
s.taskQueues.Todo = append(s.taskQueues.Todo, s.taskQueues.Failed...)
}
s.currPass++
Contributor:

We have already entered the next pass at this point, so why still return errors.New("no more available task")?

Also, since the precondition for entering the next pass is len(s.taskQueues.Pending) == 0, which is caused by pending tasks completing or failing, it feels like this shouldn't be tied to GetTask (currently GetTask calls updateQueuePasses) but only to TaskFinished / TaskFailed.

return nil
// update queues if pass finishes
errPass := s.updateQueuePasses()
if errPass.Error() == "no more available task" {
@helinwang (Contributor), Jul 19, 2017:

This is not the RPC client side; comparing err directly is better:

var errNoMoreTask = errors.New("no more available task")

func updateQueuePasses() {
  // ...
  return errNoMoreTask
}

if errPass == errNoMoreTask {
  // ...
}

if i % max_lines_to_shuffle == 0 and i >= max_lines_to_shuffle:
write_data(w, lines)
if i % line_count == 0 and i >= line_count:
write_data(indx_f, lines)
Contributor:

After this change, a recordIO file is written round-robin every time line_count records accumulate. One problem: with line_count=100 and num_shards=10, if there are only 50 records, only one file is produced instead of 10.

Also, why was the num_shards parameter removed?

Contributor:

This reverts back to my previous version :)

@typhoonzero (author):

num_shards only yields exactly num_shards output files when the total input length is known, which makes the behavior confusing. And since the input is a reader(), determining the total record count would require a complete read first just to get the count, which is expensive. So we only specify line_count per file, and the number of generated files depends on how much data the reader yields.

@gongweibao (Contributor), Jul 20, 2017:

> To determine the total record count, a complete read is needed first to get the count, which is expensive. So we only specify line_count per file.

There is no such step. See the docs: https://github.com/PaddlePaddle/Paddle/blob/develop/doc/design/cluster_train/data_dispatch.md#文件预处理

@helinwang (Contributor), Jul 20, 2017:

@typhoonzero The previous implementation does not scan the total record count. When the record count is less than num_shards, the number of generated files is the record count rather than num_shards (is that the confusing case you mean?); in all other cases num_shards files are generated.
My view may be wrong: as a user I would rather specify how many files to generate than how many records go into each file.

Contributor:

> When the record count is less than num_shards, the number of generated files is not num_shards

Sorry for not reviewing carefully earlier; I found some small issues with the num_shards design:

  • When the record count is less than num_shards, empty files are generated.
  • Writing opens num_shards files simultaneously, which doesn't seem necessary.

Also, Linux split supports splitting both by total file count and by individual file size. Shall we pick the simpler implementation first and add the other mode later?

BTW, line_count feels simpler...

@helinwang (Contributor), Jul 21, 2017:

@Yancey1989 OK, that makes sense 👍. My earlier idea was flawed and even caused @gongweibao to switch from this PR's approach to the num_shards approach. Sorry!


@@ -40,9 +40,9 @@ func TestGetFinishTask(t *testing.T) {
panic(err)
}
go func(l net.Listener) {
s, err := NewService(&InMemStore{}, chunkPerTask, time.Second, 1)
s, e := NewService(&InMemStore{}, chunkPerTask, time.Second, 1)
if err != nil {
Contributor:

e? err? (e is assigned, but err is checked)

@helinwang (Contributor), Jul 20, 2017:

@gongweibao go vet's shadow check is probably enabled here; it detected that this err shadows the outer err, so it was renamed to e. Not every case go vet shadow reports is a bug, and silencing it requires renaming variables, but enabling it prevents shadowing bugs (which are hard to notice once they occur), so I think enabling go vet shadow is a good choice.
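A minimal sketch of the shadowing pattern go vet's shadow check flags (hypothetical helpers, not the PR's code):

package main

import "errors"

func step1() error { return nil }
func step2() error { return errors.New("step2 failed") }

func do() error {
	err := step1()
	if err == nil {
		// ":=" declares a NEW err that shadows the outer one, so the
		// outer err stays nil even though step2 failed.
		err := step2()
		_ = err
	}
	return err // silently drops step2's failure
}

func main() {
	println(do() == nil) // prints true, which is the subtle bug
}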

@typhoonzero (author) commented on Jul 20, 2017:

@helinwang I compared the two, and handling it on the trainer side is indeed better:

  • Master-side pass handling: the benefit is that when some trainers are slow, tasks for the next pass can be dispatched immediately, without waiting for all trainers to finish the previous pass before set_dataset generates a new one. The problem is that the event_handler cannot run at the end of a pass.
  • Trainer-side pass handling: trainer logs show which pass training is currently on, the per-pass event_handler works, and behavior stays consistent with local training.

This implementation still needs more changes.

@helinwang (Contributor) commented on Jul 20, 2017:

@typhoonzero Nice. I also originally wanted "tasks for the next pass to be dispatched immediately, without waiting for all trainers to finish the previous pass before set_dataset generates a new one", which is why I didn't add the pass concept.
Indeed, having an explicit pass boundary (everything processed) is clearer.

}
s.currPass++
}
return errors.New("no more available task")
Contributor:

Maybe we can add a const error string:

const NoMoreAvailableTask = "no more available task"
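For comparison, a hedged sketch of the conventional Go alternative: a sentinel error variable rather than a string constant, so callers can compare errors with == instead of matching message text (the package name is an assumption):

package master

import "errors"

// ErrNoMoreAvailable is a sentinel error value; callers test
// err == ErrNoMoreAvailable rather than comparing strings.
var ErrNoMoreAvailable = errors.New("no more available task")

This matches the earlier suggestion of comparing err directly, and the ErrNoMoreAvailable / ErrPassFinished names sketched later in this thread follow the same pattern.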

@typhoonzero (author) commented on Jul 21, 2017:

This version of the master client is able to sync before starting the next pass, though a bit tricky to use:

for pass := 0; pass < 5; pass++ {
  c.PassStart()
  c.SetDataset([]string{path})
  for {
    r, e := c.NextRecord()
    // parentheses matter here: without them, && binds tighter than ||
    // and e.Error() would be called on a nil error
    if e != nil && (e.Error() == master.AllTaskFinishError.Error() || e.Error() == master.NoMoreAvailableError.Error()) {
      break
    }
    _ = r // consume the record
  }
}

@typhoonzero (author):
Still need to update the Python side and the demos.

But the Go part is ready for review. @helinwang @Yancey1989

@helinwang (Contributor) left a comment:

Sorry, I haven't finished the review yet; will continue soon.

hooks:
- id: clang-formater
- repo: https://github.com/PaddlePaddle/pre-commit-golang
sha: 16398aeccf263adaf53b2495eed0406347d76281
sha: 762c7c84af6ddce7cbcbe030217856fd0b5aec46
Contributor:

If they can pass, shall we add golint and gotype back (gotype checks whether compilation succeeds)?

} else if err.Error() == NoMoreAvailableError.Error() {
log.Errorf("Got NoMoreAvailableError, sleep 3 seconds and continue, %s", err)
c.ch <- record{nil, NoMoreAvailableError}
// time.Sleep(3 * time.Second)
Contributor:

No commented out code please :)

@typhoonzero (author):

Done.

if err.Error() == AllTaskFinishError.Error() {
log.Infof("Got AllTaskFinishError, stopping getRecords routine.")
c.ch <- record{nil, AllTaskFinishError}
break
Contributor:

This probably needs a return here.

@typhoonzero (author):

Done.

c.ch <- record{nil, AllTaskFinishError}
break
} else if err.Error() == NoMoreAvailableError.Error() {
log.Errorf("Got NoMoreAvailableError, sleep 3 seconds and continue, %s", err)
Contributor:

"Got NoMoreAvailableError"和"%s"重复了,考虑只保留一个?

@typhoonzero (author):

Merged into a single if.

@@ -71,6 +88,11 @@ func (c *Client) getRecords() {
// correct, but a reasonable approximation.
err = c.taskFinished(t.Meta.ID)
if err != nil {
if err.Error() == AllTaskFinishError.Error() {
Contributor:

It's a bit odd for taskFinished to return AllTaskFinishError; logically, returning it from getTask alone seems sufficient (and keeps the code simpler).

@typhoonzero (author):

Done. Likewise, TaskFailed no longer returns AllTaskFinishError.

return c.conn.Call("Service.SetDataset", globPaths, nil)
err := c.conn.Call("Service.SetDataset", globPaths, nil)
// start to getRecords go-routine before each pass
go c.getRecords()
@helinwang (Contributor), Jul 21, 2017:

I suggest using

c.gotRecordsOnce.Do(func() {
  go c.getRecords()
})

reference: https://golang.org/pkg/sync/#Once.Do

@typhoonzero (author):

If c.gotRecordsOnce is a Client member, a new Once object has to be created again when the pass advances; otherwise c.getRecords() can never be started a second time. I'll refine this in the next PR and track it in an issue.
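A hedged sketch of the renewal problem described above (field and method names are assumptions, not the PR's API):

package main

import "sync"

type client struct {
	mu   sync.Mutex
	once *sync.Once // must be replaced each pass: a fired Once never fires again
}

// startPass renews the Once so getRecords can be launched once per pass.
func (c *client) startPass() {
	c.mu.Lock()
	c.once = new(sync.Once)
	c.mu.Unlock()
}

func (c *client) ensureGetRecords() {
	c.mu.Lock()
	once := c.once
	c.mu.Unlock()
	once.Do(func() { go c.getRecords() })
}

func (c *client) getRecords() { /* fetch records for the current pass */ }

func main() {
	c := &client{}
	for pass := 0; pass < 3; pass++ {
		c.startPass()
		c.ensureGetRecords() // fires exactly once per pass
		c.ensureGetRecords() // no-op within the same pass
	}
}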

@@ -127,11 +154,44 @@ func (c *Client) taskFailed(meta TaskMeta) error {
return c.conn.Call("Service.TaskFailed", meta, nil)
}

// tool function for testing output goroutine ids
func goid() int {
Contributor:

client_test.go already has this function; does it need to exist here too?
Also, I don't see goid being called anywhere.

@typhoonzero (author):

Done. Removed.

@@ -19,6 +21,12 @@ const (
dialTimeout = 5 * time.Second
)

// AllTaskFinishError occur when tasks are in done or failed state.
var AllTaskFinishError = errors.New("AllTaskFinishError")
Contributor:

I suggest formatting the error messages as short English phrases:

var AllTaskFinishError = errors.New("all task finished")
var NoMoreAvailableError = errors.New("no more available task")

@typhoonzero (author):

Done.

failureMax int
ready *sync.Cond
store Store
mu sync.Mutex
Contributor:

A mutex field usually has a blank line above it, indicating that the mutex protects the fields below it:

type foo struct {
  a int

  mu sync.Mutex
  protectedB int
  protectedC int

  d int
}

For example here and here

@typhoonzero (author):

Done. That's a nice technique!

@@ -43,24 +43,31 @@ func NewClient(addrCh <-chan string, bufSize int) *Client {
c.conn = connection.New()
c.ch = make(chan record, bufSize)
go c.monitorMaster(addrCh)
go c.getRecords()
// FIXME: async connection creation
time.Sleep(time.Second)
Contributor:

Just curious, why do we need to sleep one second here?

randStart := rand.Int()
counter := 0
timestamp := time.Now().Nanosecond()
id := timestamp + randStart + counter
Contributor:

Just curious, why is a random id necessary (rather than starting from 0)?

@typhoonzero (author):

With multiple datasets, task ids may duplicate, since partition will be called multiple times.

Contributor:

Once taskid becomes a unique id, there are some subtle differences: #2752 (comment)

@@ -118,7 +136,9 @@ func NewService(store Store, chunksPerTask int, timeoutDur time.Duration, failur
s.failureMax = failureMax
s.taskQueues = taskQueues{}
s.taskQueues.Pending = make(map[int]taskEntry)
s.ready = make(chan struct{})
// s.ready = make(chan struct{})
Contributor:

no commented out code please :)

log.WithFields(s.logFields()).Warningln("All tasks failed.")
return err
log.WithFields(s.logFields()).Warningln("All tasks failed, may start next pass")
// s.initDone = false
Contributor:

No commented out code please :)


// AddClient need to be called when a new master client connected.
// We need to keep a internal client count to sync all clients when pass ends.
func (s *Service) AddClient(dummyIn int, dummyOut *int) error {
Contributor:

Sorry for the bad example I created; I later realized this can be changed to:

func (s *Service) AddClient(_ int, _ *int) error {
  // ...
}
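Side note (a standard-library fact, not specific to this PR): Go's net/rpc dispatches on the method signature alone — an exported method with two arguments of exported or builtin types, the second a pointer, returning error — so parameter names, including blank identifiers, don't matter.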


// AddClient need to be called when a new master client connected.
// We need to keep a internal client count to sync all clients when pass ends.
func (s *Service) AddClient(dummyIn int, dummyOut *int) error {
@helinwang (Contributor), Jul 24, 2017:

I may be wrong: pass scheduling is indeed complex, but I think AddClient / PassStart / PassFinish make the master error-prone. If a client calls AddClient and then crashes, can a new pass ever start? There are probably many similar edge cases.

I think the root of the fragility is that we let unreliable trainers control whether the next pass starts (PassStart); the master should do this instead:

When a trainer calls set_dataset on the master, could it return the current pass value to be used as a getTask argument? Then the master decides by itself when to start a new pass; when a trainer requests a task, the master compares passes and returns "pass finished" directly if they differ, and continues the old logic if they match. That way we shouldn't need AddClient / PassStart / PassFinish to let trainers control whether a pass starts.

def recordio(paths, buf_size=100):
    # ...
    def reader():
        c = cloud(addr, buf_size)
        cur_pass, error_code = c.set_dataset(paths)
        # TODO: handle error_code
        while True:
            r, err = c.next_record(cur_pass)
            if err == error_code_pass_finished:
                break
            elif err == error_code_no_more_tasks:
                # wait for next pass to start
                time.sleep(3)
                break
            elif err != 0:
                pass  # TODO: handle error
            yield r

        c.close()

    return reader
func (s *Service) SetDataset(globPaths []string, pass *int) error {
  if len(globPaths) == 0 {
    return errors.New("no dataset specified")
  }

  s.mu.Lock()
  defer s.mu.Unlock()

  *pass = s.curPass
  if s.initDone {
    // Already initialized. All trainer will call
    // SetDataset, but we only handle the first one. Treat
    // other calls as successful but do nothing.
    return nil
  }

  // init...
}

func (s *Service) GetTask(pass int, task *Task) error {
  // Another benefit of not letting the trainer control when the next
  // pass starts: we no longer need to support initializing the dataset
  // multiple times, so no condition variable is needed and we can go
  // back to the shorter `close(s.ready)` & `<-s.ready`.
  <-s.ready

  s.mu.Lock()
  defer s.mu.Unlock()

  if pass != s.curPass {
    return ErrPassFinished
  }

  if len(s.taskQueues.Todo) == 0 {
    return ErrNoMoreAvailable
  }

  // assign task ...
}

// move to next pass in func `TaskFailed` and `TaskDone`.

@typhoonzero (author):

  1. Indeed, AddClient would break trainer fault tolerance.
  2. The trainer-side reader must break when a pass ends, so the evaluator and test run and the model for that pass is saved. Only then can inference pick the best result among the passes; otherwise results would be saved within a single pass, which may not be the best.
  3. With the master keeping curr_pass, we can do curr_pass++ when len(s.taskQueues.Todo)==0 && len(s.taskQueues.Pending)==0. I'll try implementing the reader this way (see the sketch below).
  4. The PR originally used master-managed passes; we need a suitable and simple way to do the synchronization.
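A minimal sketch (an assumed helper, not the PR's code) of the pass transition described in point 3: when Todo and Pending both drain, recycle Done into Todo and advance the pass counter.

// maybeNextPass must run while holding s.mu.
func (s *Service) maybeNextPass() {
	if len(s.taskQueues.Todo) == 0 && len(s.taskQueues.Pending) == 0 {
		s.taskQueues.Todo = append(s.taskQueues.Todo, s.taskQueues.Done...)
		s.taskQueues.Done = nil
		s.currPass++
	}
}

Calling this from TaskFinished and TaskFailed (rather than GetTask) matches the earlier review comment that the transition belongs where pending tasks are removed.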

@typhoonzero (author):

Thinking it over, this design still has two places that require synchronizing all trainers; the synchronization problem never fully goes away:

  1. When set_dataset is called each pass, we must make sure it takes effect only once across multiple clients. If a channel close is used to simulate a broadcast, the channel cannot be reopened for a new pass, so we can't guarantee once-per-pass behavior for the second pass. (Using Once has the same problem: the Once variable must be renewed, and the renewal itself must happen once per pass.) Currently only Cond can achieve this (see the sketch below).
  2. curr_pass++ could also be executed by multiple clients concurrently; it likewise must be executed only once across all clients.
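A hedged sketch of the sync.Cond approach from point 1 (field and method names are assumptions): unlike close(ch), Cond.Broadcast can fire once per pass.

type Service struct {
	mu       sync.Mutex
	ready    *sync.Cond // created with sync.NewCond(&s.mu)
	initDone bool
}

// waitPassReady blocks a trainer until the current pass's dataset is
// initialized; Wait atomically releases and reacquires s.mu.
func (s *Service) waitPassReady() {
	s.mu.Lock()
	for !s.initDone {
		s.ready.Wait()
	}
	s.mu.Unlock()
}

// passInitialized is called once per pass after SetDataset succeeds.
func (s *Service) passInitialized() {
	s.mu.Lock()
	s.initDone = true
	s.mu.Unlock()
	s.ready.Broadcast()
}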

@typhoonzero (author), Jul 25, 2017:

To avoid AddClient's client fault-tolerance problem, the master could watch the trainers' etcd nodes to update clientCount. Adding: #3051

Related: #3053

Contributor:

Just discussed with @typhoonzero @dzhwinter: does async SGD need the concept of pass synchronization? The long-tail problem in synchronized jobs hurts efficiency badly.

@helinwang (Contributor), Jul 25, 2017:

@gongweibao Each trainer needs to know that a pass has ended, because the trainer callback has an event for it.
One question is how to define "a new pass has started".

There are two ways:

  1. The simple one: when both todo and pending are empty, move done back to todo; that counts as starting a new pass. The problem is the one you mentioned: todo is empty but pending isn't finished, so the other trainers sit idle.
  2. The more complex one: start a new pass as soon as todo is empty. When a task is removed from pending, check whether it was scheduled by the currently running pass or by an earlier one; if by an earlier pass, put it in todo, otherwise in done. The benefit is that trainers never idle (see the sketch below).

This PR mainly solves how to deliver the information to the trainers once a new pass starts. I think that's separable from the (important) question you raise of defining "a new pass has started". I suggest implementing the simple first way, merging this PR, and optimizing later.

What do you think, Weibao?
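A hedged sketch of the second option (the scheduledPass field and helper are assumptions, not the PR's code): tag each pending task with the pass that dispatched it, then route it on removal.

type taskEntry struct {
	Task          Task
	scheduledPass int // pass that dispatched this task
}

// onPendingRemoved must run while holding s.mu. A task scheduled by an
// earlier pass is retried in the current pass; a task from the current
// pass counts as done.
func (s *Service) onPendingRemoved(e taskEntry) {
	if e.scheduledPass < s.currPass {
		s.taskQueues.Todo = append(s.taskQueues.Todo, e)
	} else {
		s.taskQueues.Done = append(s.taskQueues.Done, e)
	}
}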

@helinwang (Contributor), Jul 25, 2017:

> curr_pass++ could also be executed by multiple clients concurrently; it likewise must be executed only once across all clients.

cur_pass++ happens when the master receives the last pending task's success or failure; it is not executed by multiple clients concurrently.

Since it's the master that maintains the task queues, I think the master should decide when a new pass starts. Or shall we set up a meeting today to discuss?

@helinwang (Contributor):

Sorry, last time I didn't review all the code due to time constraints. This comment comes after reviewing all of it.

@helinwang (Contributor) commented on Jul 24, 2017:

@typhoonzero This PR keeps getting stuck in CI (last time it was stuck for 5 hours; I restarted it, but it got stuck again). Could you rebase onto / pull the latest develop branch?


@typhoonzero typhoonzero changed the title [WIP]Fix recordio split and task passes [WIP] Sync master client between passes and fix recordio split Jul 25, 2017
@typhoonzero typhoonzero changed the title [WIP] Sync master client between passes and fix recordio split [Done] Sync master client between passes and fix recordio split Jul 26, 2017
c.ch = make(chan record, c.bufSize)
// FIXME: connection is created asyncrosly in monitorMaster go routine,
// ensure the connection is ready for use before calling c.addClient.
time.Sleep(time.Second)
Contributor:

Are the comment and the Sleep still necessary?

@typhoonzero (author):

Yep. The connection is created in the monitorMaster goroutine; using the conn immediately may result in an error.

return c, nil
}

// StartGetRecords must be called at beginning of each pass
func (c *Client) StartGetRecords(passID int) {
Contributor:

Nice. When I worked on this code before, go c.getRecords() never had a good place to live; having a function like this is much better.

return c.conn.Call("Service.SetDataset", globPaths, nil)
err := c.conn.Call("Service.SetDataset", globPaths, nil)
// start to getRecords go-routine before each pass
//go c.getRecords()
Contributor:

No commented out code please :)

@typhoonzero (author):

Done.

func (c *Client) SetDataset(globPaths []string) error {
return c.conn.Call("Service.SetDataset", globPaths, nil)
err := c.conn.Call("Service.SetDataset", globPaths, nil)
// start to getRecords go-routine before each pass
Contributor:

Is this comment still necessary?

@typhoonzero (author):

No, Done.

"testing"
"time"

"github.com/PaddlePaddle/Paddle/go/master"
"github.com/PaddlePaddle/recordio"
)

// tool function for testing output goroutine ids
func goid() int {
Contributor:

Curious: is goid a special function that Go calls automatically? I only ever see goid defined, never where it's used.

@typhoonzero (author):

The function is kept to make debugging multi-goroutine races easier.

Contributor:

Got it.

// generate uniq id across job using nanosecond + randint + counter
// FIXME(typhoonzero): this is a workaround, use uuid
randStart := rand.Int()
counter := 0
Contributor:

Maybe just use id++, so we don't need this extra variable?
Anyway, since partition is now only called once, do we still need to randomize?

close(s.ready)
s.initDone = true
return nil
}

// checkAllTaskDone check if all task are in "Done" or "Failed" state,
// no more tasks are "Todo" or "Pending", so that next pass may start
// must under s.ready.L.Lock()
Contributor:

The comment probably needs revising, since s.ready.L.Lock() is no longer there.

@typhoonzero (author):

Done already

s.taskQueues.Done = nil
log.WithFields(s.logFields()).Infoln("No more todo task, but trainer is requesting task to do. Move all done task to todo.")
// FIXME: this may not exist
allFinish := s.checkAllTaskDone()
Contributor:

I think the state len(s.taskQueues.Todo) == 0 && len(s.taskQueues.Done) == 0 && len(s.taskQueues.Pending) == 0 will never happen, because it's already handled at line 442, which moves us away from that state. Lines 388-402 can probably be changed to:

if len(s.taskQueues.Todo) == 0 {
  log.WithFields(s.logFields()).Warningln("No more available task.")
  return ErrNoMoreAvailable
}

Maybe checkAllTaskDone and ErrAllTaskFinish could be removed.

@typhoonzero (author), Jul 27, 2017:

This condition can still happen when all tasks time out or when TaskFailed is called manually.

ErrAllTaskFinish is not needed; changed it to ErrAllTaskFailed.

go c.getRecords()
})

func (c *Client) NextRecord(passID int) ([]byte, error) {
Contributor:

passID is not used by this function; maybe it's unnecessary.

@typhoonzero (author):

Done.

@@ -101,6 +100,12 @@ func paddle_release_master_client(client C.paddle_master_client) {
remove(client)
}

//export paddle_start_get_records
func paddle_start_get_records(client C.paddle_master_client, pass C.int) {
@helinwang (Contributor), Jul 26, 2017:

How does the trainer know which pass training is currently in? We need some way to get this information; maybe paddle_set_dataset could return it, or creating a new function seems fine as well.

Btw, this function is not called from anywhere in this PR.

@typhoonzero (author):

The trainer doesn't need to know the exact master-side pass count; a local pass count will do. The trainer just needs to know when to break and when to wait.

@helinwang (Contributor), Jul 27, 2017:

Say the job is training on pass 100 and one trainer gets killed and restarted. Should it set its local pass to 0 and start from there (call get record, get an error, increment the pass number by 1, and call again until reaching pass 100)?

@typhoonzero (author):

Yep, that's right; it just works. I'll refine this in another PR later.
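A hedged sketch of the catch-up loop just described (process and the exact error handling are assumptions; the real client may surface the pass mismatch differently):

// A restarted trainer begins at local pass 0 and advances its counter
// each time the master rejects the stale pass, until it catches up.
pass := 0
c.StartGetRecords(pass)
for {
	r, err := c.NextRecord()
	if err != nil {
		pass++ // the master reports this pass as finished; try the next
		c.StartGetRecords(pass)
		continue
	}
	process(r)
}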

Contributor:

I see, sure.

return c, nil
}

// StartGetRecords must be called at beginning of each pass
func (c *Client) StartGetRecords(passID int) {
c.passID = passID
@helinwang (Contributor), Jul 26, 2017:

CI failed because

[18:32:04] :	 [Step 1/1] WARNING: DATA RACE
[18:32:04] :	 [Step 1/1] Read at 0x00c4200782c0 by goroutine 72:
[18:32:04] :	 [Step 1/1]   github.com/PaddlePaddle/Paddle/go/master.(*Client).getRecords()
[18:32:04] :	 [Step 1/1]       /paddle/build/go/src/github.com/PaddlePaddle/Paddle/go/master/client.go:123 +0x75
[18:32:04] :	 [Step 1/1] 
[18:32:04] :	 [Step 1/1] Previous write at 0x00c4200782c0 by goroutine 7:
[18:32:04] :	 [Step 1/1]   [failed to restore the stack]
[18:32:04] :	 [Step 1/1] 
[18:32:04] :	 [Step 1/1] Goroutine 72 (running) created at:
[18:32:04] :	 [Step 1/1]   github.com/PaddlePaddle/Paddle/go/master.(*Client).StartGetRecords()
[18:32:04] :	 [Step 1/1]       /paddle/build/go/src/github.com/PaddlePaddle/Paddle/go/master/client.go:118 +0x67
[18:32:04] :	 [Step 1/1]   github.com/PaddlePaddle/Paddle/go/master.TestGetFinishTask()
[18:32:04] :	 [Step 1/1]       /paddle/build/go/src/github.com/PaddlePaddle/Paddle/go/master/client_internal_test.go:155 +0x879
[18:32:04] :	 [Step 1/1]   testing.tRunner()
[18:32:04] :	 [Step 1/1]       /usr/local/go/src/testing/testing.go:657 +0x107

Line 117 c.passID = passID and line 123 t, err := c.getTask(c.passID) form a data race (not obvious).

It's triggered by

for i := 0; i < 10; i++ {
  // init pass data                                                                                                                                                     
  c.StartGetRecords(i)
  checkOnePass(i)
}

It's not a data race if c.StartGetRecords is called only once, because the go keyword in "line 118 go c.getRecords()" is a barrier, so "line 123 t, err := c.getTask(c.passID)" happens after "line 117 c.passID = passID". But there is no such guarantee if c.StartGetRecords is called multiple times; for example, the second call to line 117 races with the first call to line 123.

Go race detector is good! (nice test btw)

I think the easiest solution is to remove passID from the Client struct and make it a function parameter:

func (c *Client) StartGetRecords(passID int) {
  go c.getRecords(passID)
}
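Passing passID by value gives each getRecords goroutine its own copy, so no Client field is written by one StartGetRecords call while an earlier goroutine is still reading it.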

@typhoonzero (author):

This doesn't actually cause a data race in practice, because c.StartGetRecords is only called after getRecords exits. Anyway, simply passing passID is simpler!

@helinwang helinwang mentioned this pull request Jul 26, 2017
}
return id
}

func TestNextRecord(t *testing.T) {
const (
Contributor:

Maybe we can use a temp file instead of a hard-coded file:

path, err := ioutil.TempFile("", "master_client_Testfull")
if err != nil {
    log.Fatal(err)
}
defer os.Remove(path.Name())

@helinwang (Contributor) left a comment:

LGTM++ except one comment, and after taking a look at #2948 (comment) and #2948 (comment).

// TODO(typhoonzero): deal with failed tasks
s.taskQueues.Failed = []taskEntry{}
log.WithFields(s.logFields()).Warningf("all task finished, add new pass data, newpass: %d.", s.currPass)
return nil
Contributor:

need to snapshot: return s.snapshot()

@gongweibao (Contributor) left a comment:

LGTM

@typhoonzero typhoonzero merged commit c10121e into PaddlePaddle:develop Jul 27, 2017
@typhoonzero typhoonzero deleted the fix_recordio_split branch August 11, 2017 06:53