Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework face, link and transport #66

Closed
wants to merge 15 commits into from
32 changes: 16 additions & 16 deletions executor/yanfd.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type YaNFD struct {
unixListener *face.UnixStreamListener
wsListener *face.WebSocketListener
tcpListeners []*face.TCPListener
udpListener *face.UDPListener
}

// NewYaNFD creates a YaNFD. Don't call this function twice.
Expand Down Expand Up @@ -108,13 +109,10 @@ func (y *YaNFD) Start() {
table.CreateFIBTable(fibTableAlgorithm)

// Create null face
nullFace := face.MakeNullLinkService(face.MakeNullTransport())
face.FaceTable.Add(nullFace)
go nullFace.Run(nil)
face.MakeNullLinkService(face.MakeNullTransport()).Run(nil)

// Start management thread
management := mgmt.MakeMgmtThread()
go management.Run()
go mgmt.MakeMgmtThread().Run()

// Create forwarding threads
if fw.NumFwThreads < 1 || fw.NumFwThreads > fw.MaxFwThreads {
Expand Down Expand Up @@ -170,10 +168,13 @@ func (y *YaNFD) Start() {
core.LogError("Main", "Unable to create MulticastUDPTransport for ", path, " on ", iface.Name, ": ", err)
continue
}
multicastUDPFace := face.MakeNDNLPLinkService(multicastUDPTransport, face.MakeNDNLPLinkServiceOptions())
face.FaceTable.Add(multicastUDPFace)

face.MakeNDNLPLinkService(
multicastUDPTransport,
face.MakeNDNLPLinkServiceOptions(),
).Run(nil)

faceCnt += 1
go multicastUDPFace.Run(nil)
core.LogInfo("Main", "Created multicast UDP face for ", path, " on ", iface.Name)
}

Expand All @@ -184,6 +185,7 @@ func (y *YaNFD) Start() {
}
faceCnt += 1
go udpListener.Run()
y.udpListener = udpListener
core.LogInfo("Main", "Created UDP listener for ", path, " on ", iface.Name)

if tcpEnabled {
Expand All @@ -194,8 +196,8 @@ func (y *YaNFD) Start() {
}
faceCnt += 1
go tcpListener.Run()
core.LogInfo("Main", "Created TCP listener for ", path, " on ", iface.Name)
y.tcpListeners = append(y.tcpListeners, tcpListener)
core.LogInfo("Main", "Created TCP listener for ", path, " on ", iface.Name)
}
}
}
Expand Down Expand Up @@ -256,12 +258,16 @@ func (y *YaNFD) Stop() {
// Wait for unix socket listener to quit
if y.unixListener != nil {
y.unixListener.Close()
<-y.unixListener.HasQuit
}
if y.wsListener != nil {
y.wsListener.Close()
}

// Wait for UDP listener to quit
if y.udpListener != nil {
y.udpListener.Close()
}

// Wait for TCP listeners to quit
for _, tcpListener := range y.tcpListeners {
tcpListener.Close()
Expand All @@ -272,12 +278,6 @@ func (y *YaNFD) Stop() {
face.Close()
}

// Wait for all faces to quit
for _, face := range face.FaceTable.GetAll() {
core.LogTrace("Main", "Waiting for face ", face, " to quit")
<-face.GetHasQuit()
}

// Tell all forwarding threads to quit
for _, fw := range fw.Threads {
fw.TellToQuit()
Expand Down
105 changes: 36 additions & 69 deletions face/internal-transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package face

import (
"runtime"
"strconv"

"github.com/named-data/YaNFD/core"
Expand Down Expand Up @@ -37,20 +36,21 @@ func MakeInternalTransport() *InternalTransport {
defn.MaxNDNPacketSize)
t.recvQueue = make(chan []byte, faceQueueSize)
t.sendQueue = make(chan []byte, faceQueueSize)
t.changeState(defn.Up)
t.running.Store(true)
return t
}

// RegisterInternalTransport creates, registers, and starts an InternalTransport.
func RegisterInternalTransport() (LinkService, *InternalTransport) {
t := MakeInternalTransport()
l := MakeNDNLPLinkService(t, NDNLPLinkServiceOptions{
IsIncomingFaceIndicationEnabled: true,
IsConsumerControlledForwardingEnabled: true,
})
FaceTable.Add(l)
go l.Run(nil)
return l, t
transport := MakeInternalTransport()

options := MakeNDNLPLinkServiceOptions()
options.IsIncomingFaceIndicationEnabled = true
options.IsConsumerControlledForwardingEnabled = true
link := MakeNDNLPLinkService(transport, options)
link.Run(nil)

return link, transport
}

func (t *InternalTransport) String() string {
Expand Down Expand Up @@ -83,7 +83,7 @@ func (t *InternalTransport) Send(netWire enc.Wire, pitToken []byte, nextHopFaceI
Fragment: netWire,
}
if len(pitToken) > 0 {
lpPkt.PitToken = pitToken
lpPkt.PitToken = append([]byte{}, pitToken...)
}
if nextHopFaceID != nil {
lpPkt.NextHopFaceId = utils.IdPtr(*nextHopFaceID)
Expand All @@ -103,27 +103,22 @@ func (t *InternalTransport) Send(netWire enc.Wire, pitToken []byte, nextHopFaceI

// Receive receives a packet from the perspective of the internal component.
func (t *InternalTransport) Receive() (enc.Wire, []byte, uint64) {
shouldContinue := true
// We need to use a for loop to silently ignore invalid packets
for shouldContinue {
select {
case frame := <-t.recvQueue:
pkt, _, err := spec.ReadPacket(enc.NewBufferReader(frame))
if err != nil {
core.LogWarn(t, "Unable to decode received block - DROP: ", err)
continue
}
lpPkt := pkt.LpPacket
if lpPkt.Fragment.Length() == 0 {
core.LogWarn(t, "Received empty fragment - DROP")
continue
}

return lpPkt.Fragment, lpPkt.PitToken, *lpPkt.IncomingFaceId
case <-t.hasQuit:
shouldContinue = false
for frame := range t.recvQueue {
packet, _, err := spec.ReadPacket(enc.NewBufferReader(frame))
if err != nil {
core.LogWarn(t, "Unable to decode received block - DROP: ", err)
continue
}

lpPkt := packet.LpPacket
if lpPkt.Fragment.Length() == 0 {
core.LogWarn(t, "Received empty fragment - DROP")
continue
}

return lpPkt.Fragment, lpPkt.PitToken, *lpPkt.IncomingFaceId
}

return nil, []byte{}, 0
}

Expand All @@ -135,54 +130,26 @@ func (t *InternalTransport) sendFrame(frame []byte) {

t.nOutBytes += uint64(len(frame))

core.LogDebug(t, "Sending frame of size ", len(frame))

frameCopy := make([]byte, len(frame))
copy(frameCopy, frame)
t.recvQueue <- frameCopy
}

func (t *InternalTransport) runReceive() {
core.LogTrace(t, "Starting receive thread")

if lockThreadsToCores {
runtime.LockOSThread()
}

for {
core.LogTrace(t, "Waiting for frame from component")
select {
case <-t.hasQuit:
return
case frame := <-t.sendQueue:
core.LogTrace(t, "Component send of size ", len(frame))

if len(frame) > defn.MaxNDNPacketSize {
core.LogWarn(t, "Component trying to send too much data - DROP")
continue
}

t.nInBytes += uint64(len(frame))

t.linkService.handleIncomingFrame(frame)
for frame := range t.sendQueue {
if len(frame) > defn.MaxNDNPacketSize {
core.LogWarn(t, "Component trying to send too much data - DROP")
continue
}
}
}

func (t *InternalTransport) changeState(new defn.State) {
if t.state == new {
return
t.nInBytes += uint64(len(frame))
t.linkService.handleIncomingFrame(frame)
}
}

core.LogInfo(t, "state: ", t.state, " -> ", new)
t.state = new

if t.state != defn.Up {
// Stop link service
t.hasQuit <- true
t.hasQuit <- true // Send again to stop any pending receives
t.linkService.tellTransportQuit()

FaceTable.Remove(t.faceID)
func (t *InternalTransport) Close() {
if t.running.Swap(false) {
close(t.recvQueue)
close(t.sendQueue)
}
}
47 changes: 19 additions & 28 deletions face/link-service.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ type LinkService interface {
State() defn.State

// Run is the main entry point for running face thread
// optNewFrame is optional new incoming frame
Run(optNewFrame []byte)
// initial is optional new incoming frame
Run(initial []byte)

// SendPacket Add a packet to the send queue for this link service
// Add a packet to the send queue for this link service
SendPacket(packet *defn.Pkt)
// Synchronously handle an incoming frame and dispatch to fw
handleIncomingFrame(frame []byte)

// Close the face
Close()
tellTransportQuit()
GetHasQuit() chan bool

// Counters
NInInterests() uint64
Expand All @@ -60,12 +60,10 @@ type LinkService interface {

// linkServiceBase is the type upon which all link service implementations should be built
type linkServiceBase struct {
faceID uint64
transport transport
HasQuit chan bool
hasImplQuit chan bool
hasTransportQuit chan bool
sendQueue chan *defn.Pkt
faceID uint64
transport transport
stopped chan bool
sendQueue chan *defn.Pkt

// Counters
nInInterests uint64
Expand All @@ -89,23 +87,12 @@ func (l *linkServiceBase) SetFaceID(faceID uint64) {
}
}

func (l *linkServiceBase) tellTransportQuit() {
l.hasTransportQuit <- true
}

// GetHasQuit returns the channel that indicates when the face has quit.
func (l *linkServiceBase) GetHasQuit() chan bool {
return l.HasQuit
}

//
// "Constructors" and threading
//

func (l *linkServiceBase) makeLinkServiceBase() {
l.HasQuit = make(chan bool)
l.hasImplQuit = make(chan bool)
l.hasTransportQuit = make(chan bool)
l.stopped = make(chan bool)
l.sendQueue = make(chan *defn.Pkt, faceQueueSize)
}

Expand Down Expand Up @@ -170,7 +157,10 @@ func (l *linkServiceBase) ExpirationPeriod() time.Duration {

// State returns the state of the underlying transport.
func (l *linkServiceBase) State() defn.State {
return l.transport.State()
if l.transport.IsRunning() {
return defn.Up
}
return defn.Down
}

//
Expand Down Expand Up @@ -207,6 +197,11 @@ func (l *linkServiceBase) NOutBytes() uint64 {
return l.transport.NOutBytes()
}

// Close the underlying transport
func (l *linkServiceBase) Close() {
l.transport.Close()
}

//
// Forwarding pipeline
//
Expand Down Expand Up @@ -277,7 +272,3 @@ func (l *linkServiceBase) dispatchData(pkt *defn.Pkt) {
core.LogTrace(l, "Dispatched Data to thread ", thread)
dispatch.GetFWThread(thread).QueueData(pkt)
}

func (l *linkServiceBase) Close() {
l.transport.changeState(defn.Down)
}
Loading
Loading