Skip to content

Commit 4f93700

Browse files
committed
add Set ADT and TCP Server
Reinvented some wheels. These should be useful later on.
1 parent 3f8b160 commit 4f93700

File tree

3 files changed

+348
-4
lines changed

3 files changed

+348
-4
lines changed

LICENSE.md LICENSE

+3-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1+
MIT License
12

2-
The MIT License (MIT)
3-
4-
Copyright (c) 2017 Tab/jy
3+
Copyright (c) 2017 Tabjy
54

65
Permission is hereby granted, free of charge, to any person obtaining a copy
76
of this software and associated documentation files (the "Software"), to deal
@@ -19,4 +18,4 @@ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
1918
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
2019
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
2120
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22-
SOFTWARE.
21+
SOFTWARE.

common/set.go

+109
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package common
2+
3+
import "sync"
4+
5+
// Set is a ADT that can store certain values, without any particular order,
6+
// and no repeated values. It can be understood as a unordered list with all
7+
// elements being distinct.
8+
type Set interface {
9+
Add(element interface{}) // adds the specified element to this set if it is not already present.
10+
Remove(element interface{}) bool // removes the specified element from this set if it is present
11+
Contains(element interface{}) bool // returns true if this set contains the specified element.
12+
Clear() // removes all of the elements from this set
13+
Len() int // returns the number of elements in this set (its cardinality).
14+
Empty() bool // returns true if this set contains no elements.
15+
16+
ForEach(fn func(interface{})) // apply callback function to each of the elements within the set.
17+
Filter(fn func(interface{}) bool) // apply callback function to each of the elements within the set, a element is kept if and only if callback returns true.
18+
}
19+
20+
// HashSet is a general implement of Set ADT with a underlying golang built-in
21+
// map. By natures of golang maps (where as order of iteration is undefined),
22+
// iterators are not possible to implement.
23+
type HashSet struct {
24+
items map[interface{}]bool
25+
mu sync.Mutex
26+
}
27+
28+
func NewHashSet() Set {
29+
return &HashSet{items: make(map[interface{}]bool)}
30+
}
31+
32+
// Add implements Add in Set ADT.
33+
func (s *HashSet) Add(element interface{}) {
34+
s.mu.Lock()
35+
defer s.mu.Unlock()
36+
37+
s.items[element] = true
38+
}
39+
40+
// Remove implements Remove in Set ADT.
41+
func (s *HashSet) Remove(element interface{}) bool {
42+
s.mu.Lock()
43+
defer s.mu.Unlock()
44+
45+
_, contains := s.items[element]
46+
47+
if contains {
48+
delete(s.items, element)
49+
}
50+
return contains
51+
}
52+
53+
// Contains implements Contains in Set ADT.
54+
func (s *HashSet) Contains(element interface{}) bool {
55+
// not sure is map lookup is thread safe, but just better be safe than sorry
56+
s.mu.Lock()
57+
defer s.mu.Unlock()
58+
59+
_, contains := s.items[element]
60+
return contains
61+
}
62+
63+
// Clear implements Clear in Set ADT.
64+
func (s *HashSet) Clear() {
65+
// the old map should be recycled by GC, hopefully...
66+
s.mu.Lock()
67+
defer s.mu.Unlock()
68+
69+
s.items = make(map[interface{}]bool)
70+
}
71+
72+
// Len implements Len in Set ADT.
73+
func (s *HashSet) Len() int {
74+
// this is atomic already
75+
return len(s.items)
76+
}
77+
78+
// Empty implements Empty in Set ADT.
79+
func (s *HashSet) Empty() bool {
80+
return s.Len() == 0
81+
}
82+
83+
// ForEach implements ForEach in Set ADT.
84+
//
85+
// IMPORTANT: ForEach causes the set to be locked until finish iterating all
86+
// elements. Therefore, calling any of Add, Remove, Contains, and Clear will
87+
// result in a dead lock. Do these ops in a separate goroutine!
88+
func (s *HashSet) ForEach(fn func(interface{})) {
89+
s.mu.Lock()
90+
defer s.mu.Unlock()
91+
for element := range s.items {
92+
fn(element)
93+
}
94+
}
95+
96+
// Filter implements Filter in Set ADT.
97+
//
98+
// IMPORTANT: Filter causes the set to be locked until finish iterating all
99+
// elements. Therefore, calling any of Add, Remove, Contains, and Clear will
100+
// result in a dead lock. Do these ops in a separate goroutine!
101+
func (s *HashSet) Filter(fn func(interface{}) bool) {
102+
s.mu.Lock()
103+
defer s.mu.Unlock()
104+
for element := range s.items {
105+
if keep := fn(element); !keep {
106+
delete(s.items, element)
107+
}
108+
}
109+
}

common/tcp.go

+236
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
// Package common provides a few generalized interface/struct utilities might
2+
// be handy for other packages. The goal is to reduce code redundancy.
3+
package common
4+
5+
import (
6+
"context"
7+
"errors"
8+
"io"
9+
"net"
10+
"strconv"
11+
"strings"
12+
"sync"
13+
14+
"github.com/tabjy/yagl"
15+
)
16+
17+
// ErrServerClosed is returned by the TCPServer's Serve, and ListenAndServe,
18+
// methods after a call to Shutdown or Close.
19+
var ErrServerClosed = errors.New("common: TCPServer closed")
20+
21+
// ErrServerNotListening is returned by TCPServer's Serve, and ListenAndServe,
22+
// methods if being called before calling Listen.
23+
var ErrServerNotListening = errors.New("common: TCPServer not listening")
24+
25+
type TCPHandler interface {
26+
ServeTCP(ctx context.Context, conn net.Conn)
27+
}
28+
29+
type echoHandler struct{}
30+
31+
func (h *echoHandler) ServeTCP(ctx context.Context, conn net.Conn) {
32+
go func() {
33+
<-ctx.Done() // this doesn't block forever, TCPServer call cancel after ServeTCP returns
34+
// conn.Close() // could fail with error, but it's okay
35+
yagl.Trace("echoHandler goroutine unblocks and exists")
36+
}()
37+
if _, err := io.Copy(conn, conn); err != nil {
38+
if ctx.Err() != nil {
39+
yagl.Infof("context canceled: %v", ctx.Err())
40+
} else {
41+
yagl.Errorf("fail to echo request: %v", err)
42+
}
43+
}
44+
}
45+
46+
var EchoHandler = &echoHandler{}
47+
48+
// A TCPServer defines parameters for running a TCP server. The zero value for
49+
// TCPServer is a valid configuration.
50+
type TCPServer struct {
51+
Host string // IP address or hostname to listen on. Leave empty for an unspecified address.
52+
Port uint16 // Port to listen on. A port number is automatically chosen if left empty or 0.
53+
54+
Handler TCPHandler
55+
56+
// ErrorLog specifies an optional logger
57+
// If nil, logging goes to os.Stderr via a yagl standard logger
58+
Logger yagl.Logger
59+
60+
ln net.Listener
61+
conns Set
62+
63+
ctx context.Context
64+
cancel context.CancelFunc
65+
wg sync.WaitGroup
66+
}
67+
68+
func (srv *TCPServer) logger() yagl.Logger {
69+
if srv.Logger == nil {
70+
return yagl.StdLogger()
71+
}
72+
return srv.Logger
73+
}
74+
75+
// Listen listens on srv.Host:srv.Port. If a Listener is ready created, the old
76+
// one will be closed and replaced.
77+
func (srv *TCPServer) Listen() error {
78+
addr := net.JoinHostPort(srv.Host, strconv.Itoa(int(srv.Port)))
79+
80+
ln, err := net.Listen("tcp", addr)
81+
if err != nil {
82+
srv.logger().Errorf("failed to listen on %s: %v", addr, err)
83+
return err
84+
}
85+
srv.ln = ln
86+
srv.logger().Infof("TCPServer listening on %v", srv.ln.Addr())
87+
88+
return nil
89+
}
90+
91+
// Serve accepts incoming connections on the Listener ln, creating a new
92+
// service goroutine for each. The service goroutines read requests and then
93+
// call srv.Handler to handle to them. Make sure Listen is called before
94+
// calling this function.
95+
//
96+
// Serve always returns a non-nil error. After Shutdown or Close, the returned
97+
// error is ErrServerClosed.
98+
func (srv *TCPServer) Serve() error {
99+
if srv.ln == nil {
100+
return ErrServerNotListening
101+
}
102+
103+
srv.conns = NewHashSet()
104+
105+
if srv.Handler == nil {
106+
srv.Handler = EchoHandler
107+
}
108+
109+
srv.ctx, srv.cancel = context.WithCancel(context.Background())
110+
111+
for true {
112+
conn, err := srv.ln.Accept()
113+
if err != nil {
114+
// server level error, causing server to stop
115+
// ln.Accept unblocks and returns error when ln.Close called, by design
116+
if strings.Contains(err.Error(), "use of closed network connection") {
117+
118+
return ErrServerClosed
119+
}
120+
srv.logger().Errorf("TCPServer stopping for error: %v", err)
121+
return err
122+
}
123+
124+
srv.wg.Add(1)
125+
go func() {
126+
defer func() {
127+
srv.wg.Add(-1)
128+
srv.conns.Remove(conn)
129+
}()
130+
srv.conns.Add(conn)
131+
srv.logger().Tracef("new connection from %v", conn.RemoteAddr())
132+
133+
srv.logger().Tracef("connection to be handled by %T", srv.Handler)
134+
ctx, cancel := context.WithCancel(srv.ctx)
135+
srv.Handler.ServeTCP(ctx, conn)
136+
srv.logger().Tracef("handler %T returned, connection closing...", srv.Handler)
137+
cancel()
138+
139+
// try to closed connection, even if it's closed by handler already
140+
if err := conn.Close(); err != nil {
141+
// test if because conn already closed
142+
if !strings.Contains(err.Error(), "use of closed network connection") {
143+
srv.logger().Panicf("failed to close connection from %v, %v", conn.RemoteAddr(), err)
144+
// goroutine stops here
145+
}
146+
srv.logger().Warnf("connection from %v is already closed", conn.RemoteAddr())
147+
}
148+
srv.logger().Tracef("connection from %v is now closed", conn.RemoteAddr())
149+
150+
151+
}()
152+
}
153+
154+
// this never happens due to the infinite loop
155+
return nil
156+
}
157+
158+
// ListenAndServe first call Listen, then calls Server to handle incoming
159+
// connections. If srv.Addr is blank, ":tcp" is used.
160+
//
161+
// ListenAndServe always returns a non-nil error. After Shutdown or Close, the
162+
// returned error is ErrServerClosed.
163+
func (srv *TCPServer) ListenAndServe() error {
164+
if err := srv.Listen(); err != nil {
165+
return err
166+
}
167+
168+
return srv.Serve()
169+
}
170+
171+
func (srv *TCPServer) forceCloseConns() {
172+
srv.logger().Tracef("forcing to close all connections, %d remaining", srv.conns.Len())
173+
srv.conns.ForEach(func(element interface{}) {
174+
conn, _ := element.(net.Conn)
175+
srv.logger().Tracef("forcing to close %v", conn.RemoteAddr())
176+
if err := conn.Close(); err != nil {
177+
// connection level errors, no need to deal with them, just log
178+
srv.logger().Errorf("failed to close connection: %v", err)
179+
}
180+
})
181+
srv.conns.Clear()
182+
srv.wg.Done()
183+
}
184+
185+
// Close immediately closes active net.Listener and closes all active
186+
// connections. This could be unsafe as there might be ongoing goroutines
187+
// handling connections. Close return after Serve returns.
188+
//
189+
// Close returns any error returned from closing the Server's underlying
190+
// Listener(s).
191+
func (srv *TCPServer) Close() error {
192+
srv.logger().Infof("closing server listening on %v", srv.ln.Addr())
193+
194+
// first close listener, so no more incoming connections
195+
if err := srv.ln.Close(); err != nil {
196+
srv.logger().Errorf("failed to close listener: %v", err)
197+
return err
198+
}
199+
200+
srv.cancel() // notify all handlers to finish whatever is left
201+
srv.forceCloseConns()
202+
203+
return nil
204+
}
205+
206+
// Shutdown immediately closes active net.Listener and sends close signals to
207+
// all actives connection through context; still, it's up to connections'
208+
// handlers to decide what to do with close signals. Shutdown wait for all
209+
// connection to be closed before returning.
210+
//
211+
// Best practice: wait for a certain amount of time, then just call Close to
212+
// force close all connections.
213+
//
214+
// Shutdown returns any error returned from closing the Server's underlying
215+
// Listener(s).
216+
func (srv *TCPServer) Shutdown() error {
217+
srv.logger().Infof("shutting down server listening on %v", srv.ln.Addr())
218+
219+
// first close listener, so no more incoming connections
220+
if err := srv.ln.Close(); err != nil {
221+
srv.logger().Errorf("failed to close listener: %v", err)
222+
return err
223+
}
224+
225+
srv.cancel() // notify all handler to finish whatever is left
226+
srv.logger().Infof("shutdown signal sent, waiting for all connection to be closed, %d remaining", srv.conns.Len())
227+
srv.wg.Wait()
228+
// by the time wg.Wait unblocks, all connection SHOULD be closed
229+
// but let's just check for sure, for debug purpose
230+
if srv.conns.Len() != 0 {
231+
srv.Logger.Warn("not all connection are closed upon Shutdown!")
232+
srv.forceCloseConns()
233+
}
234+
235+
return nil
236+
}

0 commit comments

Comments
 (0)