Skip to content

Commit

Permalink
Merge pull request #2428 from openziti/low-overhead-xg
Browse files Browse the repository at this point in the history
Add low overhead payload protocol. Fixes #2427
  • Loading branch information
plorenz committed Sep 20, 2024
2 parents be7103c + 00d761c commit 12c254b
Show file tree
Hide file tree
Showing 18 changed files with 5,436 additions and 63 deletions.
2 changes: 1 addition & 1 deletion common/inspect/circuit_inspections.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type XgressDetail struct {
XgressPointer string `json:"xgressPointer"`
LinkSendBufferPointer string `json:"linkSendBufferPointer"`
Goroutines []string `json:"goroutines"`
Sequence int32 `json:"sequence"`
Sequence uint64 `json:"sequence"`
Flags string `json:"flags"`
}

Expand Down
4 changes: 4 additions & 0 deletions router/handler_link/bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/openziti/ziti/router/env"
"github.com/openziti/ziti/router/forwarder"
metrics2 "github.com/openziti/ziti/router/metrics"
"github.com/openziti/ziti/router/xgress"
"github.com/openziti/ziti/router/xlink"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -79,6 +80,9 @@ func (self *bindHandler) BindChannel(binding channel.Binding) error {
binding.AddTypedReceiveHandler(newControlHandler(self.xlink, self.forwarder))
binding.AddPeekHandler(metrics2.NewChannelPeekHandler(self.xlink.Id(), self.forwarder.MetricsRegistry()))
binding.AddPeekHandler(trace.NewChannelPeekHandler(self.xlink.Id(), ch, self.forwarder.TraceController()))
if self.xlink.LinkProtocol() == "dtls" {
binding.AddTransformHandler(xgress.PayloadTransformer{})
}
if err := self.xlink.Init(self.forwarder.MetricsRegistry()); err != nil {
return err
}
Expand Down
38 changes: 38 additions & 0 deletions router/xgress/heartbeat_transformer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
Copyright NetFoundry Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package xgress

import (
"encoding/binary"
"github.com/openziti/channel/v3"
"time"
)

type PayloadTransformer struct {
}

func (self PayloadTransformer) Rx(*channel.Message, channel.Channel) {}

func (self PayloadTransformer) Tx(m *channel.Message, ch channel.Channel) {
if m.ContentType == channel.ContentTypeRaw && len(m.Body) > 1 {
if m.Body[0]&HeartbeatFlagMask != 0 && len(m.Body) > 12 {
now := time.Now().UnixNano()
m.PutUint64Header(channel.HeartbeatHeader, uint64(now))
binary.BigEndian.PutUint64(m.Body[len(m.Body)-8:], uint64(now))
}
}
}
68 changes: 45 additions & 23 deletions router/xgress/link_send_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"math"
"slices"
"sync/atomic"
"time"
)
Expand Down Expand Up @@ -161,7 +162,7 @@ func (buffer *LinkSendBuffer) Close() {
func (buffer *LinkSendBuffer) isBlocked() bool {
blocked := false

if buffer.windowsSize < buffer.linkRecvBufferSize {
if buffer.x.Options.TxPortalMaxSize < buffer.linkRecvBufferSize {
blocked = true
if !buffer.blockedByRemoteWindow {
buffer.blockedByRemoteWindow = true
Expand Down Expand Up @@ -202,17 +203,13 @@ func (buffer *LinkSendBuffer) run() {

for {
// bias acks, process all pending, since that should not block
processingAcks := true
for processingAcks {
select {
case ack := <-buffer.newlyReceivedAcks:
buffer.receiveAcknowledgement(ack)
case <-buffer.closeNotify:
buffer.close()
return
default:
processingAcks = false
}
select {
case ack := <-buffer.newlyReceivedAcks:
buffer.receiveAcknowledgement(ack)
case <-buffer.closeNotify:
buffer.close()
return
default:
}

// don't block when we're closing, since the only thing that should still be coming in is end-of-circuit
Expand All @@ -221,6 +218,21 @@ func (buffer *LinkSendBuffer) run() {
buffered = nil
} else {
buffered = buffer.newlyBuffered

select {
case txPayload := <-buffered:
buffer.buffer[txPayload.payload.GetSequence()] = txPayload
payloadSize := len(txPayload.payload.Data)
buffer.linkSendBufferSize += uint32(payloadSize)
atomic.AddInt64(&outstandingPayloads, 1)
atomic.AddInt64(&outstandingPayloadBytes, int64(payloadSize))
log.Tracef("buffering payload %v with size %v. payload buffer size: %v",
txPayload.payload.Sequence, len(txPayload.payload.Data), buffer.linkSendBufferSize)
case <-buffer.closeNotify:
buffer.close()
return
default:
}
}

select {
Expand Down Expand Up @@ -288,7 +300,7 @@ func (buffer *LinkSendBuffer) receiveAcknowledgement(ack *Acknowledgement) {
if buffer.windowsSize > buffer.x.Options.TxPortalMaxSize {
buffer.windowsSize = buffer.x.Options.TxPortalMaxSize
}
buffer.retxScale -= 0.02
buffer.retxScale -= 0.01
if buffer.retxScale < buffer.x.Options.RetxScale {
buffer.retxScale = buffer.x.Options.RetxScale
}
Expand Down Expand Up @@ -320,17 +332,27 @@ func (buffer *LinkSendBuffer) retransmit() {
log := pfxlog.ContextLogger(buffer.x.Label())

retransmitted := 0
var rtxList []*txPayload
for _, v := range buffer.buffer {
if v.isRetransmittable() && uint32(now-v.getAge()) >= buffer.retxThreshold {
v.markQueued()
retransmitter.queue(v)
retransmitted++
buffer.retransmits++
if buffer.retransmits >= buffer.x.Options.TxPortalRetxThresh {
buffer.accumulator = 0
buffer.retransmits = 0
buffer.scale(buffer.x.Options.TxPortalRetxScale)
}
age := v.getAge()
if age != math.MaxInt64 && v.isRetransmittable() && uint32(now-age) >= buffer.retxThreshold {
rtxList = append(rtxList, v)
}
}

slices.SortFunc(rtxList, func(a, b *txPayload) int {
return int(a.payload.Sequence - b.payload.Sequence)
})

for _, v := range rtxList {
v.markQueued()
retransmitter.queue(v)
retransmitted++
buffer.retransmits++
if buffer.retransmits >= buffer.x.Options.TxPortalRetxThresh {
buffer.accumulator = 0
buffer.retransmits = 0
buffer.scale(buffer.x.Options.TxPortalRetxScale)
}
}

Expand Down
17 changes: 17 additions & 0 deletions router/xgress/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
HeaderKeyFlags = 2258
HeaderKeyRecvBufferSize = 2259
HeaderKeyRTT = 2260
HeaderPayloadRaw = 2261

ContentTypePayloadType = 1100
ContentTypeAcknowledgementType = 1101
Expand Down Expand Up @@ -197,13 +198,25 @@ type Payload struct {
Sequence int32
Headers map[uint8][]byte
Data []byte
raw []byte
}

func (payload *Payload) GetSequence() int32 {
return payload.Sequence
}

func (payload *Payload) Marshall() *channel.Message {
if payload.raw != nil {
if payload.raw[0]&RttFlagMask != 0 {
rtt := uint16(info.NowInMilliseconds())
b0 := byte(rtt)
b1 := byte(rtt >> 8)
payload.raw[2] = b0
payload.raw[3] = b1
}
return channel.NewMessage(channel.ContentTypeRaw, payload.raw)
}

msg := channel.NewMessage(ContentTypePayloadType, payload.Data)
addPayloadHeadersToMsg(msg, payload.Headers)
msg.Headers[HeaderKeyCircuitId] = []byte(payload.CircuitId)
Expand Down Expand Up @@ -260,6 +273,10 @@ func UnmarshallPayload(msg *channel.Message) (*Payload, error) {
}
payload.Sequence = int32(sequence)

if raw, ok := msg.Headers[HeaderPayloadRaw]; ok {
payload.raw = raw
}

return payload, nil
}

Expand Down
Loading

0 comments on commit 12c254b

Please sign in to comment.