Skip to content

Commit

Permalink
router: small refactoring to remove the need for a batchconn.WriteTo(…
Browse files Browse the repository at this point in the history
…) API.

WriteTo needs to be given the destination address explicitly. WriteBatch, on
the other end, can either find it in each packet structure, or rely on the
connection's destination. WriteTo is only used to send BFD packets. It turns
out that BFD packets can also very easily be sent via the regular forwarders
that use WriteBtach.

The motivation to do that is to simplify the interface between the dataplane
and the forwarders, in view of supporting multiple underlays with possibly
very different interfaces. So the channels around the processor tasks seems
like a good universal interface.
  • Loading branch information
jiceatscion committed Nov 7, 2024
1 parent fec5ebe commit 3c07286
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 50 deletions.
13 changes: 7 additions & 6 deletions router/control/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,14 @@ func confExternalInterfaces(dp Dataplane, cfg *Config) error {

_, owned := cfg.BR.IFs[ifID]
if !owned {
// XXX The current implementation effectively uses IP/UDP tunnels to create
// the SCION network as an overlay, with forwarding to local hosts being a special case.
// When setting up external interfaces that belong to other routers in the AS, they
// are basically IP/UDP tunnels between the two border routers, and as such is
// configured in the data plane.
// The current implementation effectively uses IP/UDP tunnels to create the SCION
// network as an overlay, with forwarding to local hosts being a special case. When
// setting up external interfaces that belong to other routers in the AS, they are
// basically IP/UDP tunnels between the two border routers. Those are described as a
// link from the internal address of the local router to the internal address of the
// sibling router; and not to the router in the remote AS.
linkInfo.Local.Addr = cfg.BR.InternalAddr
linkInfo.Remote.Addr = iface.InternalAddr
linkInfo.Remote.Addr = iface.InternalAddr // i.e. via sibling router.
// For internal BFD always use the default configuration.
linkInfo.BFD = BFD{}
}
Expand Down
59 changes: 48 additions & 11 deletions router/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ type bfdSession interface {
// BatchConn is a connection that supports batch reads and writes.
type BatchConn interface {
ReadBatch(underlayconn.Messages) (int, error)
WriteTo([]byte, netip.AddrPort) (int, error)
WriteBatch(msgs underlayconn.Messages, flags int) (int, error)
Close() error
}
Expand Down Expand Up @@ -193,6 +192,10 @@ type DataPlane struct {

ExperimentalSCMPAuthentication bool

// The forwarding queues. Each is consumed by a forwarder process and fed by
// one bfd sender and the packet processors.
fwQs map[uint16]chan *packet

// The pool that stores all the packet buffers as described in the design document. See
// https://github.com/scionproto/scion/blob/master/doc/dev/design/BorderRouter.rst
// To avoid garbage collection, most the meta-data that is produced during the processing of a
Expand Down Expand Up @@ -228,6 +231,7 @@ var (
errBFDSessionDown = errors.New("bfd session down")
expiredHop = errors.New("expired hop")
ingressInterfaceInvalid = errors.New("ingress interface invalid")
egressInterfaceInvalid = errors.New("egress interface invalid")
macVerificationFailed = errors.New("MAC verification failed")
badPacketSize = errors.New("bad packet size")

Expand Down Expand Up @@ -361,7 +365,7 @@ func (d *DataPlane) AddExternalInterface(ifID uint16, conn BatchConn,
if conn == nil || !src.Addr.IsValid() || !dst.Addr.IsValid() {
return emptyValue
}
err := d.addExternalInterfaceBFD(ifID, conn, src, dst, cfg)
err := d.addExternalInterfaceBFD(ifID, src, dst, cfg)
if err != nil {
return serrors.Wrap("adding external BFD", err, "if_id", ifID)
}
Expand Down Expand Up @@ -439,7 +443,7 @@ func (d *DataPlane) AddRemotePeer(local, remote uint16) error {
}

// AddExternalInterfaceBFD adds the inter AS connection BFD session.
func (d *DataPlane) addExternalInterfaceBFD(ifID uint16, conn BatchConn,
func (d *DataPlane) addExternalInterfaceBFD(ifID uint16,
src, dst control.LinkEnd, cfg control.BFD) error {

if *cfg.Disable {
Expand All @@ -459,7 +463,7 @@ func (d *DataPlane) addExternalInterfaceBFD(ifID uint16, conn BatchConn,
PacketsReceived: d.Metrics.BFDPacketsReceived.With(labels),
}
}
s, err := newBFDSend(conn, src.IA, dst.IA, src.Addr, dst.Addr, ifID, d.macFactory())
s, err := newBFDSend(d, src.IA, dst.IA, src.Addr, dst.Addr, ifID, d.macFactory())
if err != nil {
return err
}
Expand Down Expand Up @@ -599,7 +603,7 @@ func (d *DataPlane) addNextHopBFD(ifID uint16, src, dst netip.AddrPort, cfg cont
}
}

s, err := newBFDSend(d.internal, d.localIA, d.localIA, src, dst, 0, d.macFactory())
s, err := newBFDSend(d, d.localIA, d.localIA, src, dst, 0, d.macFactory())
if err != nil {
return err
}
Expand All @@ -621,7 +625,6 @@ type RunConfig struct {

func (d *DataPlane) Run(ctx context.Context, cfg *RunConfig) error {
d.mtx.Lock()
d.setRunning()
d.initMetrics()

processorQueueSize := max(
Expand All @@ -630,7 +633,9 @@ func (d *DataPlane) Run(ctx context.Context, cfg *RunConfig) error {

d.initPacketPool(cfg, processorQueueSize)
procQs, fwQs, slowQs := initQueues(cfg, d.interfaces, processorQueueSize)
d.fwQs = fwQs // Shared with BFD senders

d.setRunning()
for ifID, conn := range d.interfaces {
go func(ifID uint16, conn BatchConn) {
defer log.HandlePanic()
Expand Down Expand Up @@ -759,7 +764,7 @@ func (d *DataPlane) runReceiver(ifID uint16, conn BatchConn, cfg *RunConfig,

// Give a new buffer to the msgs elements that have been used in the previous loop.
for i := 0; i < cfg.BatchSize-numReusable; i++ {
p := <-d.packetPool
p := d.getPacketFromPool()
p.reset()
packets[i] = p
msgs[i].Buffers[0] = p.rawPacket
Expand Down Expand Up @@ -805,6 +810,10 @@ func computeProcID(data []byte, numProcRoutines int, hashSeed uint32) (uint32, e
return s % uint32(numProcRoutines), nil
}

func (d *DataPlane) getPacketFromPool() *packet {
return <-d.packetPool
}

func (d *DataPlane) returnPacketToPool(pkt *packet) {
d.packetPool <- pkt
}
Expand Down Expand Up @@ -2316,7 +2325,8 @@ func updateSCIONLayer(rawPkt []byte, s slayers.SCION, buffer gopacket.SerializeB
}

type bfdSend struct {
conn BatchConn
dataPlane *DataPlane
ifID uint16
srcAddr, dstAddr netip.AddrPort
scn *slayers.SCION
ohp *onehop.Path
Expand All @@ -2326,7 +2336,7 @@ type bfdSend struct {
}

// newBFDSend creates and initializes a BFD Sender
func newBFDSend(conn BatchConn, srcIA, dstIA addr.IA, srcAddr, dstAddr netip.AddrPort,
func newBFDSend(d *DataPlane, srcIA, dstIA addr.IA, srcAddr, dstAddr netip.AddrPort,
ifID uint16, mac hash.Hash) (*bfdSend, error) {

scn := &slayers.SCION{
Expand Down Expand Up @@ -2373,8 +2383,13 @@ func newBFDSend(conn BatchConn, srcIA, dstIA addr.IA, srcAddr, dstAddr netip.Add
scn.Path = ohp
}

// bfdSend includes a reference to the dataplane. In general this must not be used until the
// dataplane is running. This is ensured by the fact that bfdSend objects are owned by bfd
// sessions, which are started by dataplane.Run() itself.

return &bfdSend{
conn: conn,
dataPlane: d,
ifID: ifID,
srcAddr: srcAddr,
dstAddr: dstAddr,
scn: scn,
Expand Down Expand Up @@ -2405,7 +2420,29 @@ func (b *bfdSend) Send(bfd *layers.BFD) error {
if err != nil {
return err
}
_, err = b.conn.WriteTo(b.buffer.Bytes(), b.dstAddr)

fwChan, ok := b.dataPlane.fwQs[b.ifID]
if !ok {
// WTF? May be we should just treat that as a panic-able offense
return egressInterfaceInvalid
}

p := b.dataPlane.getPacketFromPool()
p.reset()

// FIXME: would rather build the pkt directly into the rawPacket's buffer.
sz := copy(p.rawPacket, b.buffer.Bytes())
p.rawPacket = p.rawPacket[:sz]
if b.ifID == 0 {
// Using the internal interface: must specify the destination address
updateNetAddrFromAddrPort(p.dstAddr, b.dstAddr)
}
// No need to specify pkt.egress. It isn't used downstream from here.
select {
case fwChan <- p:
default:
b.dataPlane.returnPacketToPool(p) // Do we care enough to have a metric?
}
return err
}

Expand Down
33 changes: 16 additions & 17 deletions router/dataplane_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func TestDataPlaneRun(t *testing.T) {
},
).Times(1)
mExternal.EXPECT().ReadBatch(gomock.Any()).Return(0, nil).AnyTimes()
mExternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()
mExternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()
l := control.LinkEnd{
IA: addr.MustParseIA("1-ff00:0:1"),
Addr: netip.MustParseAddrPort("10.0.0.100:0"),
Expand Down Expand Up @@ -373,10 +373,9 @@ func TestDataPlaneRun(t *testing.T) {
},
).Times(1)
mInternal.EXPECT().ReadBatch(gomock.Any()).Return(0, nil).AnyTimes()

mInternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).DoAndReturn(
func(data []byte, _ netip.AddrPort) (int, error) {
pkt := gopacket.NewPacket(data,
mInternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).DoAndReturn(
func(msgs underlayconn.Messages, _ int) (int, error) {
pkt := gopacket.NewPacket(msgs[0].Buffers[0],
slayers.LayerTypeSCION, gopacket.Default)
if b := pkt.Layer(layers.LayerTypeBFD); b != nil {
v := b.(*layers.BFD).YourDiscriminator
Expand All @@ -391,7 +390,7 @@ func TestDataPlaneRun(t *testing.T) {

return 0, fmt.Errorf("no valid BFD message")
}).MinTimes(1)
mInternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()
mInternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()

local := netip.MustParseAddrPort("10.0.200.100:0")
_ = ret.SetKey([]byte("randomkeyformacs"))
Expand All @@ -410,9 +409,9 @@ func TestDataPlaneRun(t *testing.T) {
localAddr := netip.MustParseAddrPort("10.0.200.100:0")
remoteAddr := netip.MustParseAddrPort("10.0.200.200:0")
mInternal := mock_router.NewMockBatchConn(ctrl)
mInternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).DoAndReturn(
func(data []byte, _ netip.AddrPort) (int, error) {
pkt := gopacket.NewPacket(data,
mInternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).DoAndReturn(
func(msgs underlayconn.Messages, _ int) (int, error) {
pkt := gopacket.NewPacket(msgs[0].Buffers[0],
slayers.LayerTypeSCION, gopacket.Default)

if b := pkt.Layer(layers.LayerTypeBFD); b == nil {
Expand Down Expand Up @@ -464,9 +463,9 @@ func TestDataPlaneRun(t *testing.T) {

mExternal := mock_router.NewMockBatchConn(ctrl)
mExternal.EXPECT().ReadBatch(gomock.Any()).Return(0, nil).AnyTimes()
mExternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).DoAndReturn(
func(data []byte, _ netip.AddrPort) (int, error) {
pkt := gopacket.NewPacket(data,
mExternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).DoAndReturn(
func(msgs underlayconn.Messages, _ int) (int, error) {
pkt := gopacket.NewPacket(msgs[0].Buffers[0],
slayers.LayerTypeSCION, gopacket.Default)

if b := pkt.Layer(layers.LayerTypeBFD); b == nil {
Expand All @@ -491,7 +490,7 @@ func TestDataPlaneRun(t *testing.T) {
done <- struct{}{}
return 1, nil
}).MinTimes(1)
mExternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()
mExternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()

local := control.LinkEnd{
IA: addr.MustParseIA("1-ff00:0:1"),
Expand Down Expand Up @@ -552,9 +551,9 @@ func TestDataPlaneRun(t *testing.T) {
).Times(1)
mExternal.EXPECT().ReadBatch(gomock.Any()).Return(0, nil).AnyTimes()

mExternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).DoAndReturn(
func(data []byte, _ netip.AddrPort) (int, error) {
pkt := gopacket.NewPacket(data,
mExternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).DoAndReturn(
func(msgs underlayconn.Messages, _ int) (int, error) {
pkt := gopacket.NewPacket(msgs[0].Buffers[0],
slayers.LayerTypeSCION, gopacket.Default)

if b := pkt.Layer(layers.LayerTypeBFD); b != nil {
Expand All @@ -569,7 +568,7 @@ func TestDataPlaneRun(t *testing.T) {
}
return 0, fmt.Errorf("no valid BFD message")
}).MinTimes(1)
mExternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()
mExternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()

local := control.LinkEnd{
IA: addr.MustParseIA("1-ff00:0:1"),
Expand Down
16 changes: 0 additions & 16 deletions router/mock_router/mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 3c07286

Please sign in to comment.