-
Notifications
You must be signed in to change notification settings - Fork 1
/
client.go
147 lines (133 loc) · 2.92 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package gna
import (
"encoding/gob"
"fmt"
"net"
"sync"
"time"
)
/*Dial opens a TCP connection to addr and wraps it in a Client that has not
been started yet. When the connection cannot be established the returned
*Client is nil and the error from net.Dial is returned unchanged.*/
func Dial(addr string) (*Client, error) {
	conn, dialErr := net.Dial("tcp", addr)
	if dialErr != nil {
		return nil, dialErr
	}

	cli := new(Client)

	// The accumulator starts with 64 pre-allocated slots.
	cli.acu = &cliBucket{dt: make([]interface{}, 64)}

	// The gob encoder and decoder share the same connection.
	cli.dispatcher.conn = conn
	cli.dispatcher.enc = gob.NewEncoder(conn)
	cli.dispatcher.dec = gob.NewDecoder(conn)
	cli.dispatcher.rTimeout = stdReadTimeout
	cli.dispatcher.wTimeout = stdWriteTimeout
	cli.dispatcher.cDisp = make(chan interface{})
	cli.dispatcher.shouldStart = true

	return cli, nil
}
/*Client abstracts the connection handling and communication with the server.
It embeds a dispatcher, so the dispatcher's fields and methods are promoted
onto Client. Instances are created with Dial.*/
type Client struct {
// acu accumulates the values read by the receiver goroutine until they are
// drained through RecvBatch.
acu *cliBucket
// err holds the error recorded by the receiver goroutine when a receive
// fails; it is reported through Error.
err error
// started is set by Start. Dispatch and RecvBatch require it to be true,
// while Recv panics once it is.
started bool
// dispatcher is the embedded connection state (conn, enc, dec, timeouts,
// cDisp, ...) as initialised by Dial.
dispatcher
}
/*Send hands dt to the embedded dispatcher's Send and reports its outcome.
On failure the dispatcher's error is wrapped together with the value that
could not be sent; on success nil is returned.*/
func (c *Client) Send(dt interface{}) error {
	sendErr := c.dispatcher.Send(dt)
	if sendErr == nil {
		return nil
	}
	return fmt.Errorf("%w while encoding: %v", sendErr, dt)
}
/*Dispatch queues data on the dispatcher's cDisp channel instead of sending
it directly, so unlike Send it reports no error and gives no delivery
guarantee. It panics when the Client has not been started.*/
func (c *Client) Dispatch(data interface{}) {
	if !c.started {
		panic("cannot dispatch, client not started")
	}
	// NOTE(review): presumably drained by the dispatcher worker launched in
	// Start; confirm in dispatcher.work.
	c.cDisp <- data
}
/*Recv receives a single value through the embedded dispatcher's Recv and
returns it together with the dispatcher's error. It panics when called
after the Client has started, because the receiver goroutine is then reading
from the same dispatcher.*/
func (c *Client) Recv() (interface{}, error) {
	if c.started {
		panic("recv cannot be used safely after Client has started")
	}

	value, recvErr := c.dispatcher.Recv()
	if value != nil {
		return value, recvErr
	}
	// An absent value is always reported as an untyped nil.
	return nil, recvErr
}
/*RecvBatch drains the accumulator and returns everything received since the
previous call. It returns nil when the Client has not been started.*/
func (c *Client) RecvBatch() []interface{} {
	if !c.started {
		return nil
	}
	return c.acu.consume()
}
/*Start marks the Client as started and launches two goroutines: the embedded
dispatcher's work loop and the Client's receiver loop. Once started, Dispatch
and RecvBatch become usable and Recv panics.

NOTE(review): nothing here guards against repeated calls; calling Start twice
launches a second pair of goroutines.*/
func (c *Client) Start() {
// The flag is set before the goroutines are spawned.
c.started = true
go c.dispatcher.work()
go c.receiver()
}
/*SetTimeout applies t as both the read timeout and the write timeout of the
embedded dispatcher.*/
func (c *Client) SetTimeout(t time.Duration) {
	// NOTE: these writes are unsynchronized and therefore racy against any
	// goroutine that reads the timeouts concurrently.
	c.rTimeout, c.wTimeout = t, t
}
/*Error reports the failures recorded so far. When both the Client's own
error and the dispatcher's error are set, the Client's error is wrapped and
the dispatcher's is appended to the message; when only one is set, that one
is returned as is; when neither is set, the result is nil.*/
func (c *Client) Error() error {
	switch {
	case c.err != nil && c.dispatcher.err != nil:
		return fmt.Errorf("%w, alongside: %v", c.err, c.dispatcher.err)
	case c.err != nil:
		return c.err
	case c.dispatcher.err != nil:
		return c.dispatcher.err
	default:
		return nil
	}
}
/*receiver is the receive loop launched by Start. It repeatedly reads values
through the embedded dispatcher's Recv and appends every non-nil value to the
accumulator, where RecvBatch can pick it up.

The loop ends on the first receive error of any kind, timeouts included. The
error is stored in c.err, wrapped with a "recv: " prefix, and the Client is
closed on the way out.*/
func (c *Client) receiver() {
	defer c.Close()
	for {
		dt, err := c.dispatcher.Recv()
		if err != nil {
			// Timeouts used to be detected separately here, but both paths
			// simply returned, so a single exit is equivalent.
			c.err = fmt.Errorf("recv: %w", err)
			return
		}
		if dt != nil {
			c.acu.add(dt)
		}
	}
}
/*cliBucket is a mutex-guarded accumulator of received values. Values are
stored in dt[0:i]; the slots from i onwards are spare capacity. The zero
value is ready to use.*/
type cliBucket struct {
	dt []interface{} // backing storage; only dt[:i] holds live values
	i  int           // number of live values in dt
	mu sync.Mutex    // guards dt and i
}

/*add appends dt to the bucket, growing the backing storage by 64 slots
whenever it is full.*/
func (is *cliBucket) add(dt interface{}) {
	is.mu.Lock()
	defer is.mu.Unlock()

	if is.i >= len(is.dt) {
		is.dt = append(is.dt, make([]interface{}, 64)...)
	}
	is.dt[is.i] = dt
	is.i++
}

/*consume returns a copy of every value accumulated so far and empties the
bucket. The result is never nil; it has length zero when nothing was
accumulated. The returned slice does not share storage with the bucket.*/
func (is *cliBucket) consume() []interface{} {
	is.mu.Lock()
	defer is.mu.Unlock()

	out := make([]interface{}, is.i)
	copy(out, is.dt[:is.i])

	// Drop the bucket's references to the consumed values so they can be
	// garbage collected once the caller is done with them, instead of staying
	// reachable until a later add happens to overwrite the slot.
	for j := 0; j < is.i; j++ {
		is.dt[j] = nil
	}
	is.i = 0
	return out
}