diff --git a/benchmark/quic.go b/benchmark/quic.go index 820ab0a..c87eb24 100644 --- a/benchmark/quic.go +++ b/benchmark/quic.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/quic-go/quic-go" "github.com/unpackdev/fdb" + transport_quic "github.com/unpackdev/fdb/transports/quic" "github.com/unpackdev/fdb/types" "io" ) @@ -14,7 +15,7 @@ import ( // QuicSuite represents the QUIC-specific benchmark suite. type QuicSuite struct { fdbInstance *fdb.FDB - quicServer *fdb.QuicServer + quicServer *transport_quic.Server } // NewQuicSuite creates a new QuicSuite for benchmarking. @@ -31,7 +32,7 @@ func (qs *QuicSuite) Start() error { return fmt.Errorf("failed to retrieve QUIC transport: %w", err) } - quicServer, ok := quicTransport.(*fdb.QuicServer) + quicServer, ok := quicTransport.(*transport_quic.Server) if !ok { return fmt.Errorf("failed to cast transport to QuicServer") } @@ -41,11 +42,11 @@ func (qs *QuicSuite) Start() error { return fmt.Errorf("failed to retrieve benchmark database: %w", err) } - wHandler := fdb.NewQuicWriteHandler(db) - quicServer.RegisterHandler(fdb.WriteHandlerType, wHandler.HandleMessage) + wHandler := transport_quic.NewQuicWriteHandler(db) + quicServer.RegisterHandler(types.WriteHandlerType, wHandler.HandleMessage) - rHandler := fdb.NewQuicReadHandler(db) - quicServer.RegisterHandler(fdb.ReadHandlerType, rHandler.HandleMessage) + rHandler := transport_quic.NewQuicReadHandler(db) + quicServer.RegisterHandler(types.ReadHandlerType, rHandler.HandleMessage) if err := quicServer.Start(); err != nil { return fmt.Errorf("failed to start QUIC server: %w", err) diff --git a/fdb.go b/fdb.go index 4f64ef4..bce14b7 100644 --- a/fdb.go +++ b/fdb.go @@ -5,6 +5,7 @@ import ( "github.com/pkg/errors" "github.com/unpackdev/fdb/config" "github.com/unpackdev/fdb/db" + transport_quic "github.com/unpackdev/fdb/transports/quic" "github.com/unpackdev/fdb/types" ) @@ -38,7 +39,7 @@ func New(ctx context.Context, cnf config.Config) (*FDB, error) { for _, transport := range cnf.Transports { switch t := transport.Config.(type) { case config.QuicTransport: - quicServer, err := NewQuicServer(ctx, t) + quicServer, err := transport_quic.NewServer(ctx, t) if err != nil { return nil, errors.Wrap(err, "failed to create QUIC server") } diff --git a/messages/message.go b/messages/message.go new file mode 100644 index 0000000..e3bbb6d --- /dev/null +++ b/messages/message.go @@ -0,0 +1,62 @@ +package messages + +import ( + "encoding/binary" + "fmt" + "github.com/unpackdev/fdb/types" +) + +// Message struct represents a UDP message +type Message struct { + Handler types.HandlerType // The handler type (1 byte) + Key [32]byte // Fixed 32-byte key (e.g., Ethereum hash) + Data []byte // The remaining data after the key +} + +// Encode encodes the Message struct into a byte slice +func (m *Message) Encode() ([]byte, error) { + // Add 4 bytes for the length of the data + msgLen := 1 + 32 + 4 + len(m.Data) + buf := make([]byte, msgLen) + + // Set handler type + buf[0] = byte(m.Handler) + + // Copy the 32-byte key + copy(buf[1:33], m.Key[:]) + + // Set the length of the data (4 bytes) + binary.BigEndian.PutUint32(buf[33:37], uint32(len(m.Data))) + + // Copy the data + copy(buf[37:], m.Data) + + return buf, nil +} + +// Decode decodes a byte slice into a Message struct +func Decode(data []byte) (*Message, error) { + if len(data) < 37 { // 1 byte for handler + 32 bytes for key + 4 bytes for data length + return nil, fmt.Errorf("data too short, must be at least 37 bytes") + } + + msg := &Message{ + Handler: types.HandlerType(data[0]), + } + + // Copy the 32-byte key + copy(msg.Key[:], data[1:33]) + + // Read the 4-byte data length + dataLen := binary.BigEndian.Uint32(data[33:37]) + + // Ensure the length of the remaining data matches the declared length + if len(data[37:]) < int(dataLen) { + return nil, fmt.Errorf("data length mismatch, expected %d bytes but got %d bytes", dataLen, len(data[37:])) + } + + // Copy the data + msg.Data = data[37 : 37+dataLen] + + return msg, nil +} diff --git a/quic_handler_read.go b/transports/quic/handler_read.go similarity index 93% rename from quic_handler_read.go rename to transports/quic/handler_read.go index d54cbba..b028af2 100644 --- a/quic_handler_read.go +++ b/transports/quic/handler_read.go @@ -1,9 +1,10 @@ -package fdb +package transport_quic import ( "encoding/binary" "github.com/quic-go/quic-go" "github.com/unpackdev/fdb/db" + "github.com/unpackdev/fdb/messages" "log" ) @@ -20,7 +21,7 @@ func NewQuicReadHandler(db db.Provider) *QuicReadHandler { } // HandleMessage processes the incoming message using the QuicReadHandler -func (rh *QuicReadHandler) HandleMessage(conn quic.Connection, stream quic.Stream, message *Message) { +func (rh *QuicReadHandler) HandleMessage(conn quic.Connection, stream quic.Stream, message *messages.Message) { //log.Printf("Processing read request: Handler=%d, Key=%x", message.Handler, message.Key) // Query the database using the key from the Message struct diff --git a/quic_handler_read_test.go b/transports/quic/handler_read_test.go similarity index 99% rename from quic_handler_read_test.go rename to transports/quic/handler_read_test.go index 132b239..18130bf 100644 --- a/quic_handler_read_test.go +++ b/transports/quic/handler_read_test.go @@ -1,4 +1,4 @@ -package fdb +package transport_quic import ( "context" diff --git a/quic_handler_write.go b/transports/quic/handler_write.go similarity index 91% rename from quic_handler_write.go rename to transports/quic/handler_write.go index 82d922f..5dac3fc 100644 --- a/quic_handler_write.go +++ b/transports/quic/handler_write.go @@ -1,8 +1,9 @@ -package fdb +package transport_quic import ( "github.com/quic-go/quic-go" "github.com/unpackdev/fdb/db" + "github.com/unpackdev/fdb/messages" "log" ) @@ -19,7 +20,7 @@ func NewQuicWriteHandler(db db.Provider) *QuicWriteHandler { } // HandleMessage processes the incoming message using the QuicWriteHandler -func (wh *QuicWriteHandler) HandleMessage(conn quic.Connection, stream quic.Stream, message *Message) { +func (wh *QuicWriteHandler) HandleMessage(conn quic.Connection, stream quic.Stream, message *messages.Message) { // Log the message for debugging purposes //log.Printf("Processing write request: Handler=%d, Key=%x, Data=%s", message.Handler, message.Key, string(message.Data)) diff --git a/quic_handler_write_test.go b/transports/quic/handler_write_test.go similarity index 98% rename from quic_handler_write_test.go rename to transports/quic/handler_write_test.go index 4a6b1b7..8b6624e 100644 --- a/quic_handler_write_test.go +++ b/transports/quic/handler_write_test.go @@ -1,4 +1,4 @@ -package fdb +package transport_quic import ( "context" diff --git a/quic_server.go b/transports/quic/server.go similarity index 82% rename from quic_server.go rename to transports/quic/server.go index 86a7f8d..b91609f 100644 --- a/quic_server.go +++ b/transports/quic/server.go @@ -1,4 +1,4 @@ -package fdb +package transport_quic import ( "context" @@ -7,6 +7,8 @@ import ( "github.com/pkg/errors" "github.com/quic-go/quic-go" "github.com/unpackdev/fdb/config" + "github.com/unpackdev/fdb/messages" + "github.com/unpackdev/fdb/types" "io" "log" "strings" @@ -14,12 +16,12 @@ import ( ) // QuicHandler function type for QUIC -type QuicHandler func(sess quic.Connection, stream quic.Stream, message *Message) +type QuicHandler func(sess quic.Connection, stream quic.Stream, message *messages.Message) -// QuicServer struct represents the QUIC server -type QuicServer struct { +// Server struct represents the QUIC server +type Server struct { ctx context.Context - handlerRegistry map[HandlerType]QuicHandler + handlerRegistry map[types.HandlerType]QuicHandler cnf config.QuicTransport tlsConfig *tls.Config stopChan chan struct{} @@ -28,16 +30,16 @@ type QuicServer struct { listener *quic.Listener } -// NewQuicServer creates a new QuicServer instance -func NewQuicServer(ctx context.Context, cnf config.QuicTransport) (*QuicServer, error) { +// NewServer creates a new QuicServer instance +func NewServer(ctx context.Context, cnf config.QuicTransport) (*Server, error) { tlsConfig, tcErr := cnf.GetTLSConfig() if tcErr != nil { return nil, errors.Wrapf(tcErr, "could not get TLS config for quic transport") } - server := &QuicServer{ + server := &Server{ ctx: ctx, - handlerRegistry: make(map[HandlerType]QuicHandler), + handlerRegistry: make(map[types.HandlerType]QuicHandler), cnf: cnf, tlsConfig: tlsConfig, stopChan: make(chan struct{}), @@ -48,12 +50,12 @@ func NewQuicServer(ctx context.Context, cnf config.QuicTransport) (*QuicServer, } // Addr returns the server address as a string. -func (s *QuicServer) Addr() string { +func (s *Server) Addr() string { return s.cnf.Addr() } // Start starts the QUIC server -func (s *QuicServer) Start() error { +func (s *Server) Start() error { var err error s.listener, err = quic.ListenAddr(s.cnf.Addr(), s.tlsConfig, nil) if err != nil { @@ -71,7 +73,7 @@ func (s *QuicServer) Start() error { } // acceptConnections continuously accepts new QUIC connections -func (s *QuicServer) acceptConnections() { +func (s *Server) acceptConnections() { defer s.wg.Done() for { @@ -91,7 +93,7 @@ func (s *QuicServer) acceptConnections() { } } -func (s *QuicServer) handleConnection(conn quic.Connection) { +func (s *Server) handleConnection(conn quic.Connection) { defer s.wg.Done() // Continuously accept and handle new streams on the connection @@ -131,7 +133,7 @@ func isClosedNetworkConnectionError(err error) bool { } // handleStream handles incoming QUIC streams -func (s *QuicServer) handleStream(conn quic.Connection, stream quic.Stream) { +func (s *Server) handleStream(conn quic.Connection, stream quic.Stream) { defer s.wg.Done() // Continuously read from the stream until it's closed @@ -164,7 +166,7 @@ func (s *QuicServer) handleStream(conn quic.Connection, stream quic.Stream) { } // Step 2: Decode the message from the buffer - message, err := Decode(buffer[:n]) + message, err := messages.Decode(buffer[:n]) if err != nil { log.Printf("Error decoding message: %v", err) return @@ -186,24 +188,24 @@ func (s *QuicServer) handleStream(conn quic.Connection, stream quic.Stream) { } // Stop stops the QUIC server -func (s *QuicServer) Stop() { +func (s *Server) Stop() { close(s.stopChan) s.listener.Close() s.wg.Wait() } // WaitStarted returns a channel that is closed when the server has started -func (s *QuicServer) WaitStarted() <-chan struct{} { +func (s *Server) WaitStarted() <-chan struct{} { return s.started } // parseActionType parses the action type from the first byte of the stream -func (s *QuicServer) parseActionType(frame []byte) (HandlerType, error) { +func (s *Server) parseActionType(frame []byte) (types.HandlerType, error) { if len(frame) < 1 { return 0, fmt.Errorf("invalid action: frame too short") } - var actionType HandlerType + var actionType types.HandlerType err := actionType.FromByte(frame[0]) if err != nil { return 0, err @@ -213,11 +215,11 @@ func (s *QuicServer) parseActionType(frame []byte) (HandlerType, error) { } // RegisterHandler registers a handler for a specific action -func (s *QuicServer) RegisterHandler(actionType HandlerType, handler QuicHandler) { +func (s *Server) RegisterHandler(actionType types.HandlerType, handler QuicHandler) { s.handlerRegistry[actionType] = handler } // DeregisterHandler deregisters a handler for a specific action -func (s *QuicServer) DeregisterHandler(actionType HandlerType) { +func (s *Server) DeregisterHandler(actionType types.HandlerType) { delete(s.handlerRegistry, actionType) } diff --git a/quic_server_test.go b/transports/quic/server_test.go similarity index 99% rename from quic_server_test.go rename to transports/quic/server_test.go index 44c298b..e0dabe9 100644 --- a/quic_server_test.go +++ b/transports/quic/server_test.go @@ -1,4 +1,4 @@ -package fdb +package transport_quic import ( "context"