Skip to content

Commit

Permalink
Code refactoring. replace channel m.allDone with WaitGroup and channe…
Browse files Browse the repository at this point in the history
…ls (stop|send|ping)Routine with stopRoutines. I think it should help to fix issue #8
  • Loading branch information
shelomentsevd committed Aug 15, 2017
1 parent cc96d8b commit 7a7c9f6
Showing 1 changed file with 21 additions and 32 deletions.
53 changes: 21 additions & 32 deletions mtproto.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@ import (
)

type MTProto struct {
addr string
conn *net.TCPConn
f *os.File
queueSend chan packetToSend
stopSend chan struct{}
stopRead chan struct{}
stopPing chan struct{}
allDone chan struct{}
addr string
conn *net.TCPConn
f *os.File
queueSend chan packetToSend
stopRoutines chan struct{}
allDone sync.WaitGroup

authKey []byte
authKeyHash []byte
Expand Down Expand Up @@ -188,10 +186,8 @@ func (m *MTProto) Connect() error {

// start goroutines
m.queueSend = make(chan packetToSend, 64)
m.stopSend = make(chan struct{}, 1)
m.stopRead = make(chan struct{}, 1)
m.stopPing = make(chan struct{}, 1)
m.allDone = make(chan struct{}, 3)
m.stopRoutines = make(chan struct{})
m.allDone = sync.WaitGroup{}
m.msgsIdToAck = make(map[int64]packetToSend)
m.msgsIdToResp = make(map[int64]chan response)
m.mutex = &sync.Mutex{}
Expand Down Expand Up @@ -237,21 +233,11 @@ func (m *MTProto) Connect() error {
}

func (m *MTProto) Disconnect() error {
// stop ping routine
m.stopPing <- struct{}{}
close(m.stopPing)
// stop ping, send and read routine by closing channel stopRoutines
close(m.stopRoutines)

// stop send routine
m.stopSend <- struct{}{}
close(m.stopSend)

// stop read routine
m.stopRead <- struct{}{}
close(m.stopRead)

<-m.allDone
<-m.allDone
<-m.allDone
// Wait until all goroutines stopped
m.allDone.Wait()

// close send queue
close(m.queueSend)
Expand Down Expand Up @@ -283,10 +269,11 @@ func (m *MTProto) reconnect(newaddr string) error {
}

func (m *MTProto) pingRoutine() {
defer func() { m.allDone <- struct{}{} }()
m.allDone.Add(1)
defer func() { m.allDone.Done() }()
for {
select {
case <-m.stopPing:
case <-m.stopRoutines:
return
case <-time.After(60 * time.Second):
m.queueSend <- packetToSend{TL_ping{0xCADACADA}, nil}
Expand All @@ -295,10 +282,11 @@ func (m *MTProto) pingRoutine() {
}

func (m *MTProto) sendRoutine() {
defer func() { m.allDone <- struct{}{} }()
m.allDone.Add(1)
defer func() { m.allDone.Done() }()
for {
select {
case <-m.stopSend:
case <-m.stopRoutines:
return
case x := <-m.queueSend:
err := m.sendPacket(x.msg, x.resp)
Expand All @@ -310,7 +298,8 @@ func (m *MTProto) sendRoutine() {
}

func (m *MTProto) readRoutine() {
defer func() { m.allDone <- struct{}{} }()
m.allDone.Add(1)
defer func() { m.allDone.Done() }()
for {
// Run async wait for data from server
ch := make(chan interface{}, 1)
Expand All @@ -330,7 +319,7 @@ func (m *MTProto) readRoutine() {
}(ch)

select {
case <-m.stopRead:
case <-m.stopRoutines:
return
case data := <-ch:
if data == nil {
Expand Down

0 comments on commit 7a7c9f6

Please sign in to comment.