Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
sylwiaszunejko committed Oct 25, 2024
1 parent f62fc6e commit 45191a1
Show file tree
Hide file tree
Showing 11 changed files with 1,812 additions and 9 deletions.
54 changes: 54 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1921,3 +1921,57 @@ type ErrSchemaMismatch struct {
func (e *ErrSchemaMismatch) Error() string {
return fmt.Sprintf("gocql: cluster schema versions not consistent: %+v", e.schemas)
}

func NewMockNetConn() net.Conn {
return &mockNetConn{}
}

type mockNetConn struct{}

func (m *mockNetConn) Read(p []byte) (n int, err error) {
// data := []byte("test receive")
// n = copy(p, data)
return n, nil
}

func (m *mockNetConn) Write(p []byte) (n int, err error) {
return len(p), nil
}

func (m *mockNetConn) Close() error {
return nil
}

func (m *mockNetConn) LocalAddr() net.Addr {
return nil
}

func (m *mockNetConn) RemoteAddr() net.Addr {
return nil
}

func (m *mockNetConn) SetDeadline(t time.Time) error {
return nil
}

func (m *mockNetConn) SetReadDeadline(t time.Time) error {
return nil
}

func (m *mockNetConn) SetWriteDeadline(t time.Time) error {
return nil
}

func NewMockConn() *Conn {
return &Conn{
conn: NewMockNetConn(),
w: &deadlineContextWriter{
w: NewMockNetConn(),
timeout: 0,
semaphore: make(chan struct{}, 1),
quit: make(chan struct{}),
},
timeout: 0,
r: bufio.NewReader(NewMockNetConn()),
}
}
212 changes: 212 additions & 0 deletions dialer/recorder/recorder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
package recorder

import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"os"
"path"
"time"

"github.com/gocql/gocql"
"github.com/gocql/gocql/internal/murmur"
)

func NewRecordDialer(dir string) *RecordDialer {
return &RecordDialer{
dir: dir,
}
}

type RecordDialer struct {
dir string
net.Dialer
}

func (d *RecordDialer) DialContext(ctx context.Context, network, addr string) (conn net.Conn, err error) {
fmt.Println("Dial Context Record Dialer")
sourcePort := gocql.ScyllaGetSourcePort(ctx)
fmt.Println("Source port: ", sourcePort)
// if sourcePort == 0 {
// return d.Dialer.DialContext(ctx, network, addr)
// }
dialerWithLocalAddr := d.Dialer
dialerWithLocalAddr.LocalAddr, err = net.ResolveTCPAddr(network, fmt.Sprintf(":%d", sourcePort))
if err != nil {
fmt.Println(err)
return nil, err
}

conn, err = dialerWithLocalAddr.DialContext(ctx, network, addr)
if err != nil {
return nil, err
}

return NewConnectionRecorder(path.Join(d.dir, fmt.Sprintf("%s-%d", addr, sourcePort)), conn)
}

func NewConnectionRecorder(fname string, conn net.Conn) (net.Conn, error) {
fd_writes, err := os.OpenFile(fname+"Writes", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
return nil, err
}
fd_reads, err2 := os.OpenFile(fname+"Reads", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err2 != nil {
return nil, err2
}
return &ConnectionRecorder{fd_writes: fd_writes, fd_reads: fd_reads, orig: conn, new_write: true, new_read: true}, nil
}

type ConnectionRecorder struct {
fd_writes *os.File
fd_reads *os.File
orig net.Conn
new_write bool
new_read bool
to_write int
to_read int
read_record map[string]interface{}
write_record map[string]interface{}
}

func (c *ConnectionRecorder) Read(b []byte) (n int, err error) {
n, err = c.orig.Read(b)
if err != nil && err != io.EOF {
return n, err
}

// Only record if RESULT frame
if b[4] != 0x08 {
return n, err
}

if c.new_read {
stream_id := int(int16(b[2])<<8 | int16(b[3]))

c.read_record = make(map[string]interface{})
c.read_record["stream_id"] = stream_id
c.read_record["data"] = b[:n]

c.to_read = int(b[8])
if n < c.to_read {
c.new_read = false
c.to_read = c.to_read - n
} else {
// Write JSON record to file
jsonData, marshalErr := json.Marshal(c.read_record)
if marshalErr != nil {
return n, fmt.Errorf("failed to encode JSON record: %w", marshalErr)
}
_, writeErr := c.fd_reads.Write(append(jsonData, '\n'))
if writeErr != nil {
return n, fmt.Errorf("failed to record read: %w", writeErr)
}
}

return n, err
}

c.read_record["data"] = append(c.read_record["data"].([]byte), b[:n]...)

c.to_read = c.to_read - n
if c.to_read <= 0 {
c.new_read = true
// Write JSON record to file
jsonData, marshalErr := json.Marshal(c.read_record)
if marshalErr != nil {
return n, fmt.Errorf("failed to encode JSON record: %w", marshalErr)
}
_, writeErr := c.fd_reads.Write(append(jsonData, '\n'))
if writeErr != nil {
return n, fmt.Errorf("failed to record read: %w", writeErr)
}
}

return n, err
}

func (c *ConnectionRecorder) Write(b []byte) (n int, err error) {
n, err = c.orig.Write(b)

// Only record if EXECUTE frame
if b[4] != 0x0A {
return n, err
}

if c.new_write {
stream_id := int(b[2])<<8 | int(b[3])
writeHash := murmur.Murmur3H1(b[:9])

c.write_record = make(map[string]interface{})
c.write_record["stream_id"] = stream_id
c.write_record["hash"] = writeHash
c.write_record["data"] = b[:n]

c.to_write = int(b[8])
if n < c.to_write {
c.new_write = false
c.to_write = c.to_write - n
} else {
// Write JSON record to file
jsonData, marshalErr := json.Marshal(c.write_record)
if marshalErr != nil {
return n, fmt.Errorf("failed to encode JSON record: %w", marshalErr)
}
_, writeErr := c.fd_writes.Write(append(jsonData, '\n'))
if writeErr != nil {
return n, fmt.Errorf("failed to record write: %w", writeErr)
}
}

return n, err
}

c.write_record["data"] = append(c.write_record["data"].([]byte), b[:n]...)

c.to_write = c.to_write - n
if c.to_write <= 0 {
c.new_write = true
// Write JSON record to file
jsonData, marshalErr := json.Marshal(c.write_record)
if marshalErr != nil {
return n, fmt.Errorf("failed to encode JSON record: %w", marshalErr)
}
_, writeErr := c.fd_writes.Write(append(jsonData, '\n'))
if writeErr != nil {
return n, fmt.Errorf("failed to record write: %w", writeErr)
}
}
return n, err
}

func (c ConnectionRecorder) Close() error {
if err := c.fd_writes.Close(); err != nil {
return fmt.Errorf("failed to close the file: %w", err)
}
if err := c.fd_reads.Close(); err != nil {
return fmt.Errorf("failed to close the file: %w", err)
}
return c.orig.Close()
}

func (c ConnectionRecorder) LocalAddr() net.Addr {
return c.orig.LocalAddr()
}

func (c ConnectionRecorder) RemoteAddr() net.Addr {
return c.orig.RemoteAddr()
}

func (c ConnectionRecorder) SetDeadline(t time.Time) error {
return c.orig.SetDeadline(t)
}

func (c ConnectionRecorder) SetReadDeadline(t time.Time) error {
return c.orig.SetReadDeadline(t)
}

func (c ConnectionRecorder) SetWriteDeadline(t time.Time) error {
return c.orig.SetWriteDeadline(t)
}
Loading

0 comments on commit 45191a1

Please sign in to comment.