Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve raw input scheduler #201

Merged
merged 1 commit into from
Sep 2, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewHTTPClient(baseURL string, config *HTTPClientConfig) *HTTPClient {
}

if config.ResponseBufferSize == 0 {
config.ResponseBufferSize = 512 * 1024 // 500kb
config.ResponseBufferSize = 100 * 1024 // 100kb
}

client := new(HTTPClient)
Expand Down
4 changes: 2 additions & 2 deletions input_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ func (i *RAWInput) Read(data []byte) (int, error) {
var header []byte

if msg.IsIncoming {
header = payloadHeader(RequestPayload, msg.UUID(), msg.Start)
header = payloadHeader(RequestPayload, msg.UUID(), msg.Start.UnixNano())
} else {
header = payloadHeader(ResponsePayload, msg.UUID(), msg.End-msg.RequestStart)
header = payloadHeader(ResponsePayload, msg.UUID(), msg.End.UnixNano()-msg.RequestStart.UnixNano())
}

copy(data[0:len(header)], header)
Expand Down
4 changes: 4 additions & 0 deletions proto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ var httpMethods []string = []string{
}

func IsHTTPPayload(payload []byte) bool {
if len(payload) < 4 {
return false
}

method := string(payload[0:4])

for _, m := range httpMethods {
Expand Down
50 changes: 33 additions & 17 deletions raw_socket_listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ type Listener struct {
// Messages ready to be send to client
messagesChan chan *TCPMessage

// Used for notifications about completed or expired messages
messageDelChan chan *TCPMessage

addr string // IP to listen
port uint16 // Port to listen

Expand All @@ -57,7 +54,7 @@ type Listener struct {
}

type request struct {
start int64
start time.Time
ack uint32
}

Expand All @@ -67,7 +64,6 @@ func NewListener(addr string, port string, expire time.Duration, captureResponse

l.packetsChan = make(chan *TCPPacket, 10000)
l.messagesChan = make(chan *TCPMessage, 10000)
l.messageDelChan = make(chan *TCPMessage, 10000)
l.quit = make(chan bool)

l.messages = make(map[string]*TCPMessage)
Expand All @@ -92,28 +88,44 @@ func NewListener(addr string, port string, expire time.Duration, captureResponse
}

func (t *Listener) listen() {
gcTicker := time.Tick(t.messageExpire / 2)

for {
select {
case <-t.quit:
t.conn.Close()
return
// If message ready for deletion it means that its also complete or expired by timeout
case message := <-t.messageDelChan:
delete(t.ackAliases, message.Ack)
delete(t.messages, message.ID)
// We need to use channels to process each packet to avoid data races
case packet := <-t.packetsChan:
t.processTCPPacket(packet)

if !message.IsIncoming {
delete(t.respAliases, message.Ack)
case <- gcTicker:
now := time.Now()

for _, message := range t.messages {
if now.Sub(message.Start) >= t.messageExpire {
t.dispatchMessage(message)
}
}
}
}
}

t.messagesChan <- message
func (t *Listener) dispatchMessage(message *TCPMessage) {
delete(t.ackAliases, message.Ack)
delete(t.messages, message.ID)

// We need to use channels to process each packet to avoid data races
case packet := <-t.packetsChan:
t.processTCPPacket(packet)
if !message.IsIncoming {
delete(t.respAliases, message.Ack)

// Do not track responses which have no associated requests
if message.RequestAck == 0 {
return
}
}
t.messagesChan <- message
}

func (t *Listener) readRAWSocket() {
conn, e := net.ListenPacket("ip4:tcp", t.addr)
t.conn = conn
Expand Down Expand Up @@ -212,8 +224,7 @@ func (t *Listener) processTCPPacket(packet *TCPPacket) {
message, ok := t.messages[mID]

if !ok {
// We sending messageDelChan channel, so message object can communicate with Listener and notify it if message completed
message = NewTCPMessage(mID, t.messageDelChan, packet.Ack, &t.messageExpire, isIncoming)
message = NewTCPMessage(mID, packet.Ack, isIncoming)
t.messages[mID] = message

if !isIncoming && responseRequest != nil {
Expand Down Expand Up @@ -246,6 +257,11 @@ func (t *Listener) processTCPPacket(packet *TCPPacket) {

// Adding packet to message
message.AddPacket(packet)

// If message contains only single packet immediately dispatch it
if !message.IsMultipart() {
t.dispatchMessage(message)
}
}

// Receive TCP messages from the listener channel
Expand Down
68 changes: 15 additions & 53 deletions raw_socket_listener/tcp_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/buger/gor/proto"
"log"
"strconv"
"sync"
"time"
)

Expand All @@ -21,47 +20,25 @@ type TCPMessage struct {
ID string // Message ID
Ack uint32
ResponseAck uint32
RequestStart int64
RequestStart time.Time
RequestAck uint32
Start int64
End int64
Start time.Time
End time.Time
IsIncoming bool

packets []*TCPPacket

timer *time.Timer // Used for expire check

delChan chan *TCPMessage

expire *time.Duration

mu sync.Mutex
}

// NewTCPMessage pointer created from a Acknowledgment number and a channel of messages readuy to be deleted
func NewTCPMessage(ID string, delChan chan *TCPMessage, Ack uint32, expire *time.Duration, IsIncoming bool) (msg *TCPMessage) {
msg = &TCPMessage{ID: ID, Ack: Ack, expire: expire, IsIncoming: IsIncoming}
msg.Start = time.Now().UnixNano()
msg.delChan = delChan // used for notifying that message completed or expired
func NewTCPMessage(ID string, Ack uint32, IsIncoming bool) (msg *TCPMessage) {
msg = &TCPMessage{ID: ID, Ack: Ack, IsIncoming: IsIncoming}
msg.Start = time.Now()

return
}

// Timeout notifies message to stop listening, close channel and message ready to be sent
func (t *TCPMessage) Timeout() {
t.mu.Lock()
if t.timer != nil {
t.timer.Stop()
}
t.mu.Unlock()

// Notify RAWListener that message is ready to be send to replay server
// Responses without requests gets discarded
if t.IsIncoming || t.RequestStart != 0 {
t.delChan <- t
}
}

// Bytes return message content
func (t *TCPMessage) Bytes() (output []byte) {
for _, p := range t.packets {
Expand All @@ -73,7 +50,9 @@ func (t *TCPMessage) Bytes() (output []byte) {

// Size returns total size of message
func (t *TCPMessage) Size() (size int) {
for _, p := range t.packets {
size += len(proto.Body(t.packets[0].Data))

for _, p := range t.packets[1:] {
size += len(p.Data)
}

Expand Down Expand Up @@ -102,29 +81,12 @@ func (t *TCPMessage) AddPacket(packet *TCPPacket) {
t.packets = append([]*TCPPacket{packet}, t.packets...)
}

t.End = time.Now().UnixNano()
}

if !t.isMultipart() {
t.Timeout()
} else {
t.mu.Lock()
// If more then 1 packet, wait for more, and set expiration
if len(t.packets) == 1 {
// Every time we receive packet we reset this timer
t.timer = time.AfterFunc(*t.expire, t.Timeout)
} else {
// Reset message timeout timer
if t.timer != nil {
t.timer.Reset(*t.expire)
}
}
t.mu.Unlock()
t.End = time.Now()
}
}

// isMultipart returns true if message contains from multiple tcp packets
func (t *TCPMessage) isMultipart() bool {
func (t *TCPMessage) IsMultipart() bool {
if len(t.packets) > 1 {
return true
}
Expand All @@ -143,7 +105,7 @@ func (t *TCPMessage) isMultipart() bool {
l, _ := strconv.Atoi(string(length))

// If content-length equal current body length
if l > 0 && l == len(proto.Body(payload)) {
if l > 0 && l == t.Size() {
return false
}
}
Expand All @@ -158,7 +120,7 @@ func (t *TCPMessage) isMultipart() bool {
l, _ := strconv.Atoi(string(length))

// If content-length equal current body length
if l > 0 && l == len(proto.Body(payload)) {
if l > 0 && l == t.Size() {
return false
}
}
Expand All @@ -171,10 +133,10 @@ func (t *TCPMessage) UUID() []byte {
var key []byte

if t.IsIncoming {
key = strconv.AppendInt(key, t.Start, 10)
key = strconv.AppendInt(key, t.Start.UnixNano(), 10)
key = strconv.AppendUint(key, uint64(t.Ack), 10)
} else {
key = strconv.AppendInt(key, t.RequestStart, 10)
key = strconv.AppendInt(key, t.RequestStart.UnixNano(), 10)
key = strconv.AppendUint(key, uint64(t.RequestAck), 10)
}

Expand Down