Skip to content

Commit

Permalink
feat: lazy init pollers to avoid create any pollers if netpoll is not…
Browse files Browse the repository at this point in the history
… used
  • Loading branch information
joway committed Feb 6, 2024
1 parent c3792e8 commit c7d7926
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 42 deletions.
2 changes: 1 addition & 1 deletion netpoll_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func SetNumLoops(numLoops int) error {

// SetLoadBalance sets the load balancing method. Load balancing is always a best effort to attempt
// to distribute the incoming connections between multiple polls.
// This option only works when NumLoops is set.
// This option only works when numLoops is set.
func SetLoadBalance(lb LoadBalance) error {
return setLoadBalance(lb)
}
Expand Down
99 changes: 60 additions & 39 deletions poll_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"log"
"os"
"runtime"
"sync/atomic"
)

func setNumLoops(numLoops int) error {
Expand All @@ -37,53 +38,41 @@ func setLoggerOutput(w io.Writer) {
logger = log.New(w, "", log.LstdFlags)
}

// manage all pollers
// pollmanager manage all pollers
var pollmanager *manager
var logger *log.Logger

func init() {
var loops = runtime.GOMAXPROCS(0)/20 + 1
pollmanager = &manager{}
pollmanager.SetLoadBalance(RoundRobin)
pollmanager.SetNumLoops(loops)

pollmanager.SetNumLoops(runtime.GOMAXPROCS(0)/20 + 1)
setLoggerOutput(os.Stderr)
}

const (
managerUninitialized = iota
managerInitializing = iota
managerInitialized = iota
)

// LoadBalance is used to do load balancing among multiple pollers.
// a single poller may not be optimal if the number of cores is large (40C+).
type manager struct {
NumLoops int
numLoops int32
status int32 // 0: uninitialized, 1: initializing, 2: initialized
balance loadbalance // load balancing method
polls []Poll // all the polls
}

// SetNumLoops will return error when set numLoops < 1
func (m *manager) SetNumLoops(numLoops int) error {
func (m *manager) SetNumLoops(numLoops int) (err error) {
if numLoops < 1 {
return fmt.Errorf("set invalid numLoops[%d]", numLoops)
}

if numLoops < m.NumLoops {
// if less than, close the redundant pollers
var polls = make([]Poll, numLoops)
for idx := 0; idx < m.NumLoops; idx++ {
if idx < numLoops {
polls[idx] = m.polls[idx]
} else {
if err := m.polls[idx].Close(); err != nil {
logger.Printf("NETPOLL: poller close failed: %v\n", err)
}
}
}
m.NumLoops = numLoops
m.polls = polls
m.balance.Rebalance(m.polls)
return nil
}

m.NumLoops = numLoops
return m.Run()
// note: set numLoops first and then change the status
atomic.StoreInt32(&m.numLoops, int32(numLoops))
atomic.StoreInt32(&m.status, managerUninitialized)
return nil
}

// SetLoadBalance set load balance.
Expand All @@ -96,14 +85,14 @@ func (m *manager) SetLoadBalance(lb LoadBalance) error {
}

// Close release all resources.
func (m *manager) Close() error {
func (m *manager) Close() (err error) {
for _, poll := range m.polls {
poll.Close()
err = poll.Close()
}
m.NumLoops = 0
m.numLoops = 0
m.balance = nil
m.polls = nil
return nil
return err
}

// Run all pollers.
Expand All @@ -114,16 +103,34 @@ func (m *manager) Run() (err error) {
}
}()

// new poll to fill delta.
for idx := len(m.polls); idx < m.NumLoops; idx++ {
var poll Poll
poll, err = openPoll()
if err != nil {
return
numLoops := int(atomic.LoadInt32(&m.numLoops))
if numLoops == len(m.polls) {
return nil
}
var polls = make([]Poll, numLoops)
if numLoops < len(m.polls) {
// shrink polls
copy(polls, m.polls[:numLoops])
for idx := numLoops; idx < len(m.polls); idx++ {
// close redundant polls
if err = m.polls[idx].Close(); err != nil {
logger.Printf("NETPOLL: poller close failed: %v\n", err)
}
}
} else {
// growth polls
copy(polls, m.polls)
for idx := len(m.polls); idx < numLoops; idx++ {
var poll Poll
poll, err = openPoll()
if err != nil {
return err
}
polls[idx] = poll
go poll.Wait()
}
m.polls = append(m.polls, poll)
go poll.Wait()
}
m.polls = polls

// LoadBalance must be set before calling Run, otherwise it will panic.
m.balance.Rebalance(m.polls)
Expand All @@ -141,5 +148,19 @@ func (m *manager) Reset() error {

// Pick will select the poller for use each time based on the LoadBalance.
func (m *manager) Pick() Poll {
START:
// fast path
if atomic.LoadInt32(&m.status) == managerInitialized {
return m.balance.Pick()
}
// slow path
// try to get initializing lock failed, wait others finished the init work, and try again
if !atomic.CompareAndSwapInt32(&m.status, managerUninitialized, managerInitializing) {
runtime.Gosched()
goto START
}
// adjust polls
_ = pollmanager.Run()
atomic.StoreInt32(&m.status, managerInitialized) // initialized
return m.balance.Pick()
}
43 changes: 41 additions & 2 deletions poll_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package netpoll

import (
"runtime"
"sync"
"testing"
"time"
)
Expand Down Expand Up @@ -45,9 +47,46 @@ func TestPollManager(t *testing.T) {
}

func TestPollManagerReset(t *testing.T) {
n := pollmanager.NumLoops
n := pollmanager.numLoops
err := pollmanager.Reset()
MustNil(t, err)
Equal(t, len(pollmanager.polls), n)
Equal(t, pollmanager.NumLoops, n)
Equal(t, pollmanager.numLoops, n)
}

func TestPollManagerSetNumLoops(t *testing.T) {
startGs := runtime.NumGoroutine()
poll := pollmanager.Pick()
newGs := runtime.NumGoroutine()
Assert(t, poll != nil)
Assert(t, newGs > startGs)
t.Logf("old=%d, new=%d", startGs, newGs)

// change pollers
oldGs := newGs
err := SetNumLoops(100)
MustNil(t, err)
newGs = runtime.NumGoroutine()
t.Logf("old=%d, new=%d", oldGs, newGs)
Assert(t, newGs == oldGs)

// trigger polls adjustment
var wg sync.WaitGroup
finish := make(chan struct{})
oldGs = startGs + 32 // 32 self goroutines
for i := 0; i < 32; i++ {
wg.Add(1)
go func() {
poll = pollmanager.Pick()
newGs = runtime.NumGoroutine()
t.Logf("old=%d, new=%d", oldGs, newGs)
Assert(t, poll != nil)
Assert(t, newGs-oldGs == 100)
Assert(t, len(pollmanager.polls) == 100)
wg.Done()
<-finish // hold goroutines
}()
}
wg.Wait()
close(finish)
}

0 comments on commit c7d7926

Please sign in to comment.