Skip to content

Commit

Permalink
Merge pull request #75 from yowcow/dev-handler-multiplexer
Browse files Browse the repository at this point in the history
multiplex storage
  • Loading branch information
yowcow authored May 28, 2018
2 parents 88e6f0d + a3e1797 commit 77e12f1
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 142 deletions.
50 changes: 50 additions & 0 deletions handler/multiplexer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package handler

import (
"fmt"
)

type handlerMap map[string]Handler
type nsHandlerMap map[string]NSHandler

type Multiplexer struct {
handlers handlerMap
nshandlers nsHandlerMap
}

func NewMultiplexer() *Multiplexer {
return &Multiplexer{
handlerMap{},
nsHandlerMap{},
}
}

func (m *Multiplexer) RegisterHandler(name string, hdr Handler) error {
if _, ok := m.handlers[name]; ok {
return fmt.Errorf("handler with name '%s' already registered", name)
}
m.handlers[name] = hdr
return nil
}

func (m *Multiplexer) RegisterNSHandler(name string, hdr NSHandler) error {
if _, ok := m.nshandlers[name]; ok {
return fmt.Errorf("nshandler with name '%s' already registered", name)
}
m.nshandlers[name] = hdr
return nil
}

func (m Multiplexer) GetHandler(name string) (Handler, error) {
if hdr, ok := m.handlers[name]; ok {
return hdr, nil
}
return nil, fmt.Errorf("handler with name '%s' not registered", name)
}

func (m Multiplexer) GetNSHandler(name string) (NSHandler, error) {
if hdr, ok := m.nshandlers[name]; ok {
return hdr, nil
}
return nil, fmt.Errorf("nshandler with name '%s' not registered", name)
}
90 changes: 90 additions & 0 deletions handler/multiplexer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package handler

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/yowcow/goromdb/loader"
)

var (
_ Handler = (*testHandler)(nil)
_ NSHandler = (*testNSHandler)(nil)
)

type testHandler struct {
name string
}

func (h *testHandler) Get(k []byte) ([]byte, error) {
return []byte(fmt.Sprintf("get %s from %s", string(k), h.name)), nil
}

func (h *testHandler) Load(file string) error {
return nil
}

func (h *testHandler) Start(filein <-chan string, ldr *loader.Loader) <-chan bool {
return nil
}

type testNSHandler struct {
testHandler
}

func (h *testNSHandler) GetNS(ns, k []byte) ([]byte, error) {
return []byte(fmt.Sprintf("get %s in ns %s from %s", string(k), string(ns), h.name)), nil
}

func TestRegisterHandler(t *testing.T) {
m := NewMultiplexer()

err := m.RegisterHandler("h1", new(testHandler))

assert.Nil(t, err)

err = m.RegisterHandler("h1", new(testHandler))

assert.NotNil(t, err)
}

func TestRegisterNSHandler(t *testing.T) {
m := NewMultiplexer()

err := m.RegisterNSHandler("h1", new(testNSHandler))

assert.Nil(t, err)

err = m.RegisterNSHandler("h1", new(testNSHandler))

assert.NotNil(t, err)
}

func TestGetHandler(t *testing.T) {
m := NewMultiplexer()
_ = m.RegisterHandler("h1", &testHandler{"h1"})
hdr, err := m.GetHandler("h1")

assert.NotNil(t, hdr)
assert.Nil(t, err)

hdr, err = m.GetHandler("h2")

assert.Nil(t, hdr)
assert.NotNil(t, err)
}

func TestGetNSHandler(t *testing.T) {
m := NewMultiplexer()
_ = m.RegisterNSHandler("h1", &testNSHandler{testHandler{"h1"}})
hdr, err := m.GetNSHandler("h1")

assert.NotNil(t, hdr)
assert.Nil(t, err)

hdr, err = m.GetNSHandler("h2")

assert.Nil(t, hdr)
assert.NotNil(t, err)
}
16 changes: 14 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"log"
"net"
"os"

"github.com/yowcow/goromdb/handler"
Expand Down Expand Up @@ -90,8 +91,19 @@ func main() {
os.Getpid(), addr, protoBackend, handlerBackend, storageBackend, file,
)

svr := server.New("tcp", addr, proto, h, logger)
err = svr.Start()
svr := server.New("tcp", addr, logger)
err = svr.Start(server.OnReadCallbackFunc(func(conn net.Conn, line []byte, logger *log.Logger) {
if keys, err := proto.Parse(line); err != nil {
logger.Printf("server failed parsing a line: %s", err)
} else {
for _, k := range keys {
if v, _ := h.Get(k); v != nil {
proto.Reply(conn, k, v)
}
}
}
proto.Finish(conn)
}))
if err != nil {
logger.Printf("failed booting goromdb: %s", err.Error())
os.Exit(1)
Expand Down
35 changes: 12 additions & 23 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,24 @@ import (
"io"
"log"
"net"

"github.com/yowcow/goromdb/handler"
"github.com/yowcow/goromdb/protocol"
)

type OnReadCallbackFunc func(net.Conn, []byte, *log.Logger)

// Server represents a server
type Server struct {
network string
addr string
protocol protocol.Protocol
handler handler.Handler
logger *log.Logger
network string
addr string
logger *log.Logger
}

// New creates a new server
func New(network, addr string, p protocol.Protocol, h handler.Handler, logger *log.Logger) *Server {
return &Server{network, addr, p, h, logger}
func New(network, addr string, logger *log.Logger) *Server {
return &Server{network, addr, logger}
}

// Start starts a server and spawns a goroutine when a new connection is accepted
func (s Server) Start() error {
func (s Server) Start(callback OnReadCallbackFunc) error {
ln, err := net.Listen(s.network, s.addr)
if err != nil {
return err
Expand All @@ -35,13 +32,13 @@ func (s Server) Start() error {
if err != nil {
s.logger.Printf("server failed accepting a conn: %s", err.Error())
} else {
go s.HandleConn(conn)
go s.HandleConn(conn, callback)
}
}
}

// HandleConn handles a net.Conn
func (s Server) HandleConn(conn net.Conn) {
func (s Server) HandleConn(conn net.Conn, callback OnReadCallbackFunc) {
defer conn.Close()
r := bufio.NewReader(conn)
for {
Expand All @@ -53,15 +50,7 @@ func (s Server) HandleConn(conn net.Conn) {
s.logger.Printf("server failed reading a line: %s", err)
return
}
if keys, err := s.protocol.Parse(line); err != nil {
s.logger.Printf("server failed parsing a line: %s", err)
} else {
for _, k := range keys {
if v, _ := s.handler.Get(k); v != nil {
s.protocol.Reply(conn, k, v)
}
}
}
s.protocol.Finish(conn)

callback(conn, line, s.logger)
}
}
Loading

0 comments on commit 77e12f1

Please sign in to comment.