Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework face API and fix transports #67

Merged
merged 18 commits into from
Dec 16, 2024
75 changes: 75 additions & 0 deletions executor/profiler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package executor

import (
"os"
"runtime"
"runtime/pprof"

"github.com/named-data/YaNFD/core"
)

type Profiler struct {
config *YaNFDConfig
cpuFile *os.File
memFile *os.File
block *pprof.Profile
}

func NewProfiler(config *YaNFDConfig) *Profiler {
return &Profiler{config: config}
}

func (p *Profiler) Start() (err error) {
if p.config.CpuProfile != "" {
p.cpuFile, err = os.Create(p.config.CpuProfile)
if err != nil {
core.LogFatal("Main", "Unable to open output file for CPU profile: ", err)
}

core.LogInfo("Main", "Profiling CPU - outputting to ", p.config.CpuProfile)
pprof.StartCPUProfile(p.cpuFile)
}

if p.config.MemProfile != "" {
memProfileFile, err := os.Create(p.config.MemProfile)
if err != nil {
core.LogFatal("Main", "Unable to open output file for memory profile: ", err)
}

core.LogInfo("Main", "Profiling memory - outputting to ", p.config.MemProfile)
runtime.GC()
if err := pprof.WriteHeapProfile(memProfileFile); err != nil {
core.LogFatal("Main", "Unable to write memory profile: ", err)
}
}

if p.config.BlockProfile != "" {
core.LogInfo("Main", "Profiling blocking operations - outputting to ", p.config.BlockProfile)
runtime.SetBlockProfileRate(1)
p.block = pprof.Lookup("block")
}

return
}

func (p *Profiler) Stop() {
if p.block != nil {
blockProfileFile, err := os.Create(p.config.BlockProfile)
if err != nil {
core.LogFatal("Main", "Unable to open output file for block profile: ", err)
}
if err := p.block.WriteTo(blockProfileFile, 0); err != nil {
core.LogFatal("Main", "Unable to write block profile: ", err)
}
blockProfileFile.Close()
}

if p.memFile != nil {
p.memFile.Close()
}

if p.cpuFile != nil {
pprof.StopCPUProfile()
p.cpuFile.Close()
}
}
105 changes: 25 additions & 80 deletions executor/yanfd.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ package executor
import (
"net"
"os"
"runtime"
"runtime/pprof"
"time"

"github.com/named-data/YaNFD/core"
Expand All @@ -38,15 +36,13 @@ type YaNFDConfig struct {
// YaNFD is the wrapper class for the NDN Forwarding Daemon.
// Note: only one instance of this class should be created.
type YaNFD struct {
config *YaNFDConfig

cpuProfileFile *os.File
memProfileFile *os.File
blockProfiler *pprof.Profile
config *YaNFDConfig
profiler *Profiler

unixListener *face.UnixStreamListener
wsListener *face.WebSocketListener
tcpListeners []*face.TCPListener
udpListener *face.UDPListener
}

// NewYaNFD creates a YaNFD. Don't call this function twice.
Expand All @@ -68,33 +64,9 @@ func NewYaNFD(config *YaNFDConfig) *YaNFD {
table.Configure()
mgmt.Configure()

// Initialize profiling
var cpuProfileFile *os.File
var memProfileFile *os.File
var blockProfiler *pprof.Profile
var err error
if config.CpuProfile != "" {
cpuProfileFile, err = os.Create(config.CpuProfile)
if err != nil {
core.LogFatal("Main", "Unable to open output file for CPU profile: ", err)
}

core.LogInfo("Main", "Profiling CPU - outputting to ", config.CpuProfile)
pprof.StartCPUProfile(cpuProfileFile)
}

if config.BlockProfile != "" {
core.LogInfo("Main", "Profiling blocking operations - outputting to ", config.BlockProfile)
runtime.SetBlockProfileRate(1)
blockProfiler = pprof.Lookup("block")
// Output at end of runtime
}

return &YaNFD{
config: config,
cpuProfileFile: cpuProfileFile,
memProfileFile: memProfileFile,
blockProfiler: blockProfiler,
config: config,
profiler: NewProfiler(config),
}
}

Expand All @@ -103,18 +75,18 @@ func NewYaNFD(config *YaNFDConfig) *YaNFD {
func (y *YaNFD) Start() {
core.LogInfo("Main", "Starting YaNFD")

// Start profiler
y.profiler.Start()

// Initialize FIB table
fibTableAlgorithm := core.GetConfigStringDefault("tables.fib.algorithm", "nametree")
table.CreateFIBTable(fibTableAlgorithm)

// Create null face
nullFace := face.MakeNullLinkService(face.MakeNullTransport())
face.FaceTable.Add(nullFace)
go nullFace.Run(nil)
face.MakeNullLinkService(face.MakeNullTransport()).Run(nil)

// Start management thread
management := mgmt.MakeMgmtThread()
go management.Run()
go mgmt.MakeMgmtThread().Run()

// Create forwarding threads
if fw.NumFwThreads < 1 || fw.NumFwThreads > fw.MaxFwThreads {
Expand Down Expand Up @@ -170,10 +142,13 @@ func (y *YaNFD) Start() {
core.LogError("Main", "Unable to create MulticastUDPTransport for ", path, " on ", iface.Name, ": ", err)
continue
}
multicastUDPFace := face.MakeNDNLPLinkService(multicastUDPTransport, face.MakeNDNLPLinkServiceOptions())
face.FaceTable.Add(multicastUDPFace)

face.MakeNDNLPLinkService(
multicastUDPTransport,
face.MakeNDNLPLinkServiceOptions(),
).Run(nil)

faceCnt += 1
go multicastUDPFace.Run(nil)
core.LogInfo("Main", "Created multicast UDP face for ", path, " on ", iface.Name)
}

Expand All @@ -184,6 +159,7 @@ func (y *YaNFD) Start() {
}
faceCnt += 1
go udpListener.Run()
y.udpListener = udpListener
core.LogInfo("Main", "Created UDP listener for ", path, " on ", iface.Name)

if tcpEnabled {
Expand All @@ -194,8 +170,8 @@ func (y *YaNFD) Start() {
}
faceCnt += 1
go tcpListener.Run()
core.LogInfo("Main", "Created TCP listener for ", path, " on ", iface.Name)
y.tcpListeners = append(y.tcpListeners, tcpListener)
core.LogInfo("Main", "Created TCP listener for ", path, " on ", iface.Name)
}
}
}
Expand Down Expand Up @@ -240,28 +216,22 @@ func (y *YaNFD) Stop() {
core.LogInfo("Main", "Forwarder shutting down ...")
core.ShouldQuit = true

if y.config.MemProfile != "" {
memProfileFile, err := os.Create(y.config.MemProfile)
if err != nil {
core.LogFatal("Main", "Unable to open output file for memory profile: ", err)
}

core.LogInfo("Main", "Profiling memory - outputting to ", y.config.MemProfile)
runtime.GC()
if err := pprof.WriteHeapProfile(memProfileFile); err != nil {
core.LogFatal("Main", "Unable to write memory profile: ", err)
}
}
// Stop profiler
y.profiler.Stop()

// Wait for unix socket listener to quit
if y.unixListener != nil {
y.unixListener.Close()
<-y.unixListener.HasQuit
}
if y.wsListener != nil {
y.wsListener.Close()
}

// Wait for UDP listener to quit
if y.udpListener != nil {
y.udpListener.Close()
}

// Wait for TCP listeners to quit
for _, tcpListener := range y.tcpListeners {
tcpListener.Close()
Expand All @@ -272,12 +242,6 @@ func (y *YaNFD) Stop() {
face.Close()
}

// Wait for all faces to quit
for _, face := range face.FaceTable.GetAll() {
core.LogTrace("Main", "Waiting for face ", face, " to quit")
<-face.GetHasQuit()
}

// Tell all forwarding threads to quit
for _, fw := range fw.Threads {
fw.TellToQuit()
Expand All @@ -287,23 +251,4 @@ func (y *YaNFD) Stop() {
for _, fw := range fw.Threads {
<-fw.HasQuit
}

// Shutdown Profilers
if y.config.BlockProfile != "" {
blockProfileFile, err := os.Create(y.config.BlockProfile)
if err != nil {
core.LogFatal("Main", "Unable to open output file for block profile: ", err)
}
if err := y.blockProfiler.WriteTo(blockProfileFile, 0); err != nil {
core.LogFatal("Main", "Unable to write block profile: ", err)
}
blockProfileFile.Close()
}
if y.config.MemProfile != "" {
y.memProfileFile.Close()
}
if y.config.CpuProfile != "" {
pprof.StopCPUProfile()
y.cpuProfileFile.Close()
}
}
Loading
Loading