-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathinitiator.go
146 lines (117 loc) · 2.58 KB
/
initiator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package simplefixgo
import (
"context"
"fmt"
"net"
"sync"
"time"
"golang.org/x/sync/errgroup"
)
// InitiatorHandler is an interface implementing basic methods required for handling the Initiator object.
type InitiatorHandler interface {
ServeIncoming(msg []byte)
Outgoing() <-chan []byte
Run() error
StopWithError(err error)
CloseErrorChan()
Send(message SendingMessage) error
Context() context.Context
Stop()
}
// Initiator provides the client-side service functionality.
type Initiator struct {
conn *Conn
handler InitiatorHandler
ctx context.Context
cancel context.CancelFunc
}
// NewInitiator creates a new Initiator instance.
func NewInitiator(conn net.Conn, handler InitiatorHandler, bufSize int, writeDeadline time.Duration) *Initiator {
c := &Initiator{handler: handler}
c.ctx, c.cancel = context.WithCancel(context.Background())
c.conn = NewConn(c.ctx, conn, bufSize, writeDeadline)
return c
}
// Close is used to cancel the specified Initiator context.
func (c *Initiator) Close() {
c.conn.Close()
c.cancel()
}
// Send is used to send a FIX message.
func (c *Initiator) Send(message SendingMessage) error {
return c.handler.Send(message)
}
// Serve is used to initiate the procedure of delivering messages.
func (c *Initiator) Serve() error {
eg := errgroup.Group{}
defer c.Close()
defer c.handler.CloseErrorChan()
stopHandler := sync.Once{}
eg.Go(func() error {
defer c.Close()
err := c.conn.serve()
if err != nil {
err = fmt.Errorf("%s: %w", err, ErrConnClosed)
defer stopHandler.Do(func() {
c.handler.StopWithError(err)
})
}
return err
})
eg.Go(func() error {
defer c.Close()
return c.handler.Run()
})
eg.Go(func() error {
defer c.Close()
for {
select {
case <-c.ctx.Done():
return nil
case msg, ok := <-c.handler.Outgoing():
if !ok {
return fmt.Errorf("outgoing chan is closed")
}
err := c.conn.Write(msg)
if err != nil {
c.handler.Stop()
return ErrConnClosed
}
}
}
})
eg.Go(func() error {
defer c.Close()
select {
case <-c.handler.Context().Done():
stopHandler.Do(func() {})
case <-c.ctx.Done():
stopHandler.Do(func() {
c.handler.StopWithError(nil)
})
}
return nil
})
eg.Go(func() error {
defer c.Close()
for {
select {
case <-c.ctx.Done():
return nil
case msg, ok := <-c.conn.Reader():
if !ok {
continue
}
c.handler.ServeIncoming(msg)
}
}
})
err := eg.Wait()
if err != nil {
stopHandler.Do(func() {
c.handler.StopWithError(err)
})
return fmt.Errorf("stop handler: %w", err)
}
return nil
}