Skip to content

Commit

Permalink
Reorganizing where quic and rest of transports will be
Browse files Browse the repository at this point in the history
  • Loading branch information
0x19 committed Sep 15, 2024
1 parent b2c6d36 commit 9bdb018
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 35 deletions.
13 changes: 7 additions & 6 deletions benchmark/quic.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import (
"fmt"
"github.com/quic-go/quic-go"
"github.com/unpackdev/fdb"
transport_quic "github.com/unpackdev/fdb/transports/quic"
"github.com/unpackdev/fdb/types"
"io"
)

// QuicSuite represents the QUIC-specific benchmark suite.
type QuicSuite struct {
fdbInstance *fdb.FDB
quicServer *fdb.QuicServer
quicServer *transport_quic.Server
}

// NewQuicSuite creates a new QuicSuite for benchmarking.
Expand All @@ -31,7 +32,7 @@ func (qs *QuicSuite) Start() error {
return fmt.Errorf("failed to retrieve QUIC transport: %w", err)
}

quicServer, ok := quicTransport.(*fdb.QuicServer)
quicServer, ok := quicTransport.(*transport_quic.Server)
if !ok {
return fmt.Errorf("failed to cast transport to QuicServer")
}
Expand All @@ -41,11 +42,11 @@ func (qs *QuicSuite) Start() error {
return fmt.Errorf("failed to retrieve benchmark database: %w", err)
}

wHandler := fdb.NewQuicWriteHandler(db)
quicServer.RegisterHandler(fdb.WriteHandlerType, wHandler.HandleMessage)
wHandler := transport_quic.NewQuicWriteHandler(db)
quicServer.RegisterHandler(types.WriteHandlerType, wHandler.HandleMessage)

rHandler := fdb.NewQuicReadHandler(db)
quicServer.RegisterHandler(fdb.ReadHandlerType, rHandler.HandleMessage)
rHandler := transport_quic.NewQuicReadHandler(db)
quicServer.RegisterHandler(types.ReadHandlerType, rHandler.HandleMessage)

if err := quicServer.Start(); err != nil {
return fmt.Errorf("failed to start QUIC server: %w", err)
Expand Down
3 changes: 2 additions & 1 deletion fdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/pkg/errors"
"github.com/unpackdev/fdb/config"
"github.com/unpackdev/fdb/db"
transport_quic "github.com/unpackdev/fdb/transports/quic"
"github.com/unpackdev/fdb/types"
)

Expand Down Expand Up @@ -38,7 +39,7 @@ func New(ctx context.Context, cnf config.Config) (*FDB, error) {
for _, transport := range cnf.Transports {
switch t := transport.Config.(type) {
case config.QuicTransport:
quicServer, err := NewQuicServer(ctx, t)
quicServer, err := transport_quic.NewServer(ctx, t)
if err != nil {
return nil, errors.Wrap(err, "failed to create QUIC server")
}
Expand Down
62 changes: 62 additions & 0 deletions messages/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package messages

import (
"encoding/binary"
"fmt"
"github.com/unpackdev/fdb/types"
)

// Message struct represents a UDP message
type Message struct {
Handler types.HandlerType // The handler type (1 byte)
Key [32]byte // Fixed 32-byte key (e.g., Ethereum hash)
Data []byte // The remaining data after the key
}

// Encode encodes the Message struct into a byte slice
func (m *Message) Encode() ([]byte, error) {
// Add 4 bytes for the length of the data
msgLen := 1 + 32 + 4 + len(m.Data)
buf := make([]byte, msgLen)

// Set handler type
buf[0] = byte(m.Handler)

// Copy the 32-byte key
copy(buf[1:33], m.Key[:])

// Set the length of the data (4 bytes)
binary.BigEndian.PutUint32(buf[33:37], uint32(len(m.Data)))

// Copy the data
copy(buf[37:], m.Data)

return buf, nil
}

// Decode decodes a byte slice into a Message struct
func Decode(data []byte) (*Message, error) {
if len(data) < 37 { // 1 byte for handler + 32 bytes for key + 4 bytes for data length
return nil, fmt.Errorf("data too short, must be at least 37 bytes")
}

msg := &Message{
Handler: types.HandlerType(data[0]),
}

// Copy the 32-byte key
copy(msg.Key[:], data[1:33])

// Read the 4-byte data length
dataLen := binary.BigEndian.Uint32(data[33:37])

// Ensure the length of the remaining data matches the declared length
if len(data[37:]) < int(dataLen) {
return nil, fmt.Errorf("data length mismatch, expected %d bytes but got %d bytes", dataLen, len(data[37:]))
}

// Copy the data
msg.Data = data[37 : 37+dataLen]

return msg, nil
}
5 changes: 3 additions & 2 deletions quic_handler_read.go → transports/quic/handler_read.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package fdb
package transport_quic

import (
"encoding/binary"
"github.com/quic-go/quic-go"
"github.com/unpackdev/fdb/db"
"github.com/unpackdev/fdb/messages"
"log"
)

Expand All @@ -20,7 +21,7 @@ func NewQuicReadHandler(db db.Provider) *QuicReadHandler {
}

// HandleMessage processes the incoming message using the QuicReadHandler
func (rh *QuicReadHandler) HandleMessage(conn quic.Connection, stream quic.Stream, message *Message) {
func (rh *QuicReadHandler) HandleMessage(conn quic.Connection, stream quic.Stream, message *messages.Message) {
//log.Printf("Processing read request: Handler=%d, Key=%x", message.Handler, message.Key)

// Query the database using the key from the Message struct
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package fdb
package transport_quic

import (
"context"
Expand Down
5 changes: 3 additions & 2 deletions quic_handler_write.go → transports/quic/handler_write.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package fdb
package transport_quic

import (
"github.com/quic-go/quic-go"
"github.com/unpackdev/fdb/db"
"github.com/unpackdev/fdb/messages"
"log"
)

Expand All @@ -19,7 +20,7 @@ func NewQuicWriteHandler(db db.Provider) *QuicWriteHandler {
}

// HandleMessage processes the incoming message using the QuicWriteHandler
func (wh *QuicWriteHandler) HandleMessage(conn quic.Connection, stream quic.Stream, message *Message) {
func (wh *QuicWriteHandler) HandleMessage(conn quic.Connection, stream quic.Stream, message *messages.Message) {
// Log the message for debugging purposes
//log.Printf("Processing write request: Handler=%d, Key=%x, Data=%s", message.Handler, message.Key, string(message.Data))

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package fdb
package transport_quic

import (
"context"
Expand Down
44 changes: 23 additions & 21 deletions quic_server.go → transports/quic/server.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package fdb
package transport_quic

import (
"context"
Expand All @@ -7,19 +7,21 @@ import (
"github.com/pkg/errors"
"github.com/quic-go/quic-go"
"github.com/unpackdev/fdb/config"
"github.com/unpackdev/fdb/messages"
"github.com/unpackdev/fdb/types"
"io"
"log"
"strings"
"sync"
)

// QuicHandler function type for QUIC
type QuicHandler func(sess quic.Connection, stream quic.Stream, message *Message)
type QuicHandler func(sess quic.Connection, stream quic.Stream, message *messages.Message)

// QuicServer struct represents the QUIC server
type QuicServer struct {
// Server struct represents the QUIC server
type Server struct {
ctx context.Context
handlerRegistry map[HandlerType]QuicHandler
handlerRegistry map[types.HandlerType]QuicHandler
cnf config.QuicTransport
tlsConfig *tls.Config
stopChan chan struct{}
Expand All @@ -28,16 +30,16 @@ type QuicServer struct {
listener *quic.Listener
}

// NewQuicServer creates a new QuicServer instance
func NewQuicServer(ctx context.Context, cnf config.QuicTransport) (*QuicServer, error) {
// NewServer creates a new QuicServer instance
func NewServer(ctx context.Context, cnf config.QuicTransport) (*Server, error) {
tlsConfig, tcErr := cnf.GetTLSConfig()
if tcErr != nil {
return nil, errors.Wrapf(tcErr, "could not get TLS config for quic transport")
}

server := &QuicServer{
server := &Server{
ctx: ctx,
handlerRegistry: make(map[HandlerType]QuicHandler),
handlerRegistry: make(map[types.HandlerType]QuicHandler),
cnf: cnf,
tlsConfig: tlsConfig,
stopChan: make(chan struct{}),
Expand All @@ -48,12 +50,12 @@ func NewQuicServer(ctx context.Context, cnf config.QuicTransport) (*QuicServer,
}

// Addr returns the server address as a string.
func (s *QuicServer) Addr() string {
func (s *Server) Addr() string {
return s.cnf.Addr()
}

// Start starts the QUIC server
func (s *QuicServer) Start() error {
func (s *Server) Start() error {
var err error
s.listener, err = quic.ListenAddr(s.cnf.Addr(), s.tlsConfig, nil)
if err != nil {
Expand All @@ -71,7 +73,7 @@ func (s *QuicServer) Start() error {
}

// acceptConnections continuously accepts new QUIC connections
func (s *QuicServer) acceptConnections() {
func (s *Server) acceptConnections() {
defer s.wg.Done()

for {
Expand All @@ -91,7 +93,7 @@ func (s *QuicServer) acceptConnections() {
}
}

func (s *QuicServer) handleConnection(conn quic.Connection) {
func (s *Server) handleConnection(conn quic.Connection) {
defer s.wg.Done()

// Continuously accept and handle new streams on the connection
Expand Down Expand Up @@ -131,7 +133,7 @@ func isClosedNetworkConnectionError(err error) bool {
}

// handleStream handles incoming QUIC streams
func (s *QuicServer) handleStream(conn quic.Connection, stream quic.Stream) {
func (s *Server) handleStream(conn quic.Connection, stream quic.Stream) {
defer s.wg.Done()

// Continuously read from the stream until it's closed
Expand Down Expand Up @@ -164,7 +166,7 @@ func (s *QuicServer) handleStream(conn quic.Connection, stream quic.Stream) {
}

// Step 2: Decode the message from the buffer
message, err := Decode(buffer[:n])
message, err := messages.Decode(buffer[:n])
if err != nil {
log.Printf("Error decoding message: %v", err)
return
Expand All @@ -186,24 +188,24 @@ func (s *QuicServer) handleStream(conn quic.Connection, stream quic.Stream) {
}

// Stop stops the QUIC server
func (s *QuicServer) Stop() {
func (s *Server) Stop() {
close(s.stopChan)
s.listener.Close()
s.wg.Wait()
}

// WaitStarted returns a channel that is closed when the server has started
func (s *QuicServer) WaitStarted() <-chan struct{} {
func (s *Server) WaitStarted() <-chan struct{} {
return s.started
}

// parseActionType parses the action type from the first byte of the stream
func (s *QuicServer) parseActionType(frame []byte) (HandlerType, error) {
func (s *Server) parseActionType(frame []byte) (types.HandlerType, error) {
if len(frame) < 1 {
return 0, fmt.Errorf("invalid action: frame too short")
}

var actionType HandlerType
var actionType types.HandlerType
err := actionType.FromByte(frame[0])
if err != nil {
return 0, err
Expand All @@ -213,11 +215,11 @@ func (s *QuicServer) parseActionType(frame []byte) (HandlerType, error) {
}

// RegisterHandler registers a handler for a specific action
func (s *QuicServer) RegisterHandler(actionType HandlerType, handler QuicHandler) {
func (s *Server) RegisterHandler(actionType types.HandlerType, handler QuicHandler) {
s.handlerRegistry[actionType] = handler
}

// DeregisterHandler deregisters a handler for a specific action
func (s *QuicServer) DeregisterHandler(actionType HandlerType) {
func (s *Server) DeregisterHandler(actionType types.HandlerType) {
delete(s.handlerRegistry, actionType)
}
2 changes: 1 addition & 1 deletion quic_server_test.go → transports/quic/server_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package fdb
package transport_quic

import (
"context"
Expand Down

0 comments on commit 9bdb018

Please sign in to comment.