feat: lazy init pollers to avoid creating any poller goroutines if netpoll is not used #306

Merged · 3 commits · Feb 20, 2024
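In effect, importing netpoll no longer spawns any poller goroutines; they are created on the first Pick(), or eagerly via the new Initialize(). A minimal caller-side sketch (assuming the github.com/cloudwego/netpoll import path; the poller count is illustrative):

```go
package main

import "github.com/cloudwego/netpoll"

func main() {
	// With lazy init, this only records the desired poller count;
	// no poller goroutines are created yet.
	if err := netpoll.SetNumLoops(4); err != nil {
		panic(err)
	}

	// Optional: spawn the poller goroutines eagerly instead of on first use.
	// Safe to call multiple times.
	netpoll.Initialize()
}
```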
8 changes: 7 additions & 1 deletion netpoll_options.go
@@ -40,11 +40,17 @@ func SetNumLoops(numLoops int) error {
 
 // SetLoadBalance sets the load balancing method. Load balancing is always a best effort to attempt
 // to distribute the incoming connections between multiple polls.
-// This option only works when NumLoops is set.
+// This option only works when numLoops is set.
 func SetLoadBalance(lb LoadBalance) error {
 	return setLoadBalance(lb)
 }
 
+// Initialize the pollers actively. By default, they are initialized lazily.
+// It is safe to call it multiple times.
+func Initialize() {
+	initialize()
+}
+
 func SetLoggerOutput(w io.Writer) {
 	setLoggerOutput(w)
 }
118 changes: 77 additions & 41 deletions poll_manager.go
@@ -23,6 +23,7 @@ import (
 	"log"
 	"os"
 	"runtime"
+	"sync/atomic"
 )
 
 func setNumLoops(numLoops int) error {
@@ -33,57 +34,55 @@ func setLoadBalance(lb LoadBalance) error {
 	return pollmanager.SetLoadBalance(lb)
 }
 
+func initialize() {
+	// The first call of Pick() will init the pollers.
+	_ = pollmanager.Pick()
+}
+
 func setLoggerOutput(w io.Writer) {
 	logger = log.New(w, "", log.LstdFlags)
 }
 
-// manage all pollers
+// pollmanager manages all pollers
 var pollmanager *manager
 var logger *log.Logger
 
 func init() {
-	var loops = runtime.GOMAXPROCS(0)/20 + 1
-	pollmanager = &manager{}
-	pollmanager.SetLoadBalance(RoundRobin)
-	pollmanager.SetNumLoops(loops)
-
+	pollmanager = newManager(runtime.GOMAXPROCS(0)/20 + 1)
 	setLoggerOutput(os.Stderr)
 }
 
+const (
+	managerUninitialized = iota
+	managerInitializing
+	managerInitialized
+)
+
+func newManager(numLoops int) *manager {
+	m := new(manager)
+	m.SetLoadBalance(RoundRobin)
+	m.SetNumLoops(numLoops)
+	return m
+}
+
 // LoadBalance is used to do load balancing among multiple pollers.
 // a single poller may not be optimal if the number of cores is large (40C+).
 type manager struct {
-	NumLoops int
+	numLoops int32
+	status   int32       // 0: uninitialized, 1: initializing, 2: initialized
 	balance  loadbalance // load balancing method
 	polls    []Poll      // all the polls
 }
 
 // SetNumLoops will return error when set numLoops < 1
-func (m *manager) SetNumLoops(numLoops int) error {
+func (m *manager) SetNumLoops(numLoops int) (err error) {
 	if numLoops < 1 {
 		return fmt.Errorf("set invalid numLoops[%d]", numLoops)
 	}
-
-	if numLoops < m.NumLoops {
-		// if less than, close the redundant pollers
-		var polls = make([]Poll, numLoops)
-		for idx := 0; idx < m.NumLoops; idx++ {
-			if idx < numLoops {
-				polls[idx] = m.polls[idx]
-			} else {
-				if err := m.polls[idx].Close(); err != nil {
-					logger.Printf("NETPOLL: poller close failed: %v\n", err)
-				}
-			}
-		}
-		m.NumLoops = numLoops
-		m.polls = polls
-		m.balance.Rebalance(m.polls)
-		return nil
-	}
-
-	m.NumLoops = numLoops
-	return m.Run()
+	// note: store the new numLoops first, then change the status
+	atomic.StoreInt32(&m.numLoops, int32(numLoops))
+	atomic.StoreInt32(&m.status, managerUninitialized)
+	return nil
 }
 
 // SetLoadBalance set load balance.
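Note the semantic change to SetNumLoops: it no longer opens or closes pollers inline (the old path even called Run() eagerly). It now only stores the new count and resets status to managerUninitialized, deferring the actual poll adjustment to the next Pick().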
@@ -96,14 +95,14 @@ func (m *manager) SetLoadBalance(lb LoadBalance) error {
 }
 
 // Close release all resources.
-func (m *manager) Close() error {
+func (m *manager) Close() (err error) {
 	for _, poll := range m.polls {
-		poll.Close()
+		err = poll.Close()
 	}
-	m.NumLoops = 0
+	m.numLoops = 0
 	m.balance = nil
 	m.polls = nil
-	return nil
+	return err
 }
 
 // Run all pollers.
@@ -114,16 +113,34 @@ func (m *manager) Run() (err error) {
 		}
 	}()
 
-	// new poll to fill delta.
-	for idx := len(m.polls); idx < m.NumLoops; idx++ {
-		var poll Poll
-		poll, err = openPoll()
-		if err != nil {
-			return
-		}
-		m.polls = append(m.polls, poll)
-		go poll.Wait()
-	}
+	numLoops := int(atomic.LoadInt32(&m.numLoops))
+	if numLoops == len(m.polls) {
+		return nil
+	}
+	var polls = make([]Poll, numLoops)
+	if numLoops < len(m.polls) {
+		// shrink the polls
+		copy(polls, m.polls[:numLoops])
+		for idx := numLoops; idx < len(m.polls); idx++ {
+			// close the redundant polls
+			if err = m.polls[idx].Close(); err != nil {
+				logger.Printf("NETPOLL: poller close failed: %v\n", err)
+			}
+		}
+	} else {
+		// grow the polls
+		copy(polls, m.polls)
+		for idx := len(m.polls); idx < numLoops; idx++ {
+			var poll Poll
+			poll, err = openPoll()
+			if err != nil {
+				return err
+			}
+			polls[idx] = poll
+			go poll.Wait()
+		}
+	}
+	m.polls = polls
 
 	// LoadBalance must be set before calling Run, otherwise it will panic.
 	m.balance.Rebalance(m.polls)
@@ -141,5 +158,24 @@ func (m *manager) Reset() error {
 
 // Pick will select the poller for use each time based on the LoadBalance.
 func (m *manager) Pick() Poll {
+START:
+	// fast path
+	if atomic.LoadInt32(&m.status) == managerInitialized {
+		return m.balance.Pick()
+	}
+	// slow path
+	// if we fail to grab the initializing lock, wait for the other goroutine to finish the init work and try again
+	if !atomic.CompareAndSwapInt32(&m.status, managerUninitialized, managerInitializing) {
+		runtime.Gosched()
+		goto START
+	}
+	// adjust the polls
+	// m.Run() finishes very quickly, so not many goroutines will block on Pick.
+	_ = m.Run()
+
+	if !atomic.CompareAndSwapInt32(&m.status, managerInitializing, managerInitialized) {
+		// SetNumLoops was called during m.Run(), which made the CAS fail;
+		// the polls will be adjusted on the next Pick.
+	}
 	return m.balance.Pick()
 }
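The Pick() slow path above is a spin-on-CAS lazy-init guard: the first caller wins the transition to managerInitializing and runs m.Run(), while concurrent callers yield with runtime.Gosched() and retry; a later SetNumLoops makes the second CAS fail, so a later Pick() reconciles again. A self-contained sketch of the same pattern (all names here are illustrative, not from the diff):

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

const (
	stateUninitialized int32 = iota
	stateInitializing
	stateInitialized
)

type lazyPool struct {
	status int32
	value  int
}

func (p *lazyPool) get() int {
RETRY:
	// fast path: already initialized, lock-free read
	if atomic.LoadInt32(&p.status) == stateInitialized {
		return p.value
	}
	// slow path: only one goroutine wins the CAS and initializes
	if !atomic.CompareAndSwapInt32(&p.status, stateUninitialized, stateInitializing) {
		runtime.Gosched() // someone else is initializing; yield and retry
		goto RETRY
	}
	p.value = 42 // stand-in for the expensive init (e.g. spawning pollers)
	atomic.StoreInt32(&p.status, stateInitialized)
	return p.value
}

func main() {
	var p lazyPool
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(p.get()) // every caller sees 42; init ran exactly once
		}()
	}
	wg.Wait()
}
```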
46 changes: 43 additions & 3 deletions poll_manager_test.go
@@ -18,6 +18,8 @@
 package netpoll
 
 import (
+	"runtime"
+	"sync"
 	"testing"
 	"time"
 )
@@ -45,9 +47,47 @@ func TestPollManager(t *testing.T) {
 }
 
 func TestPollManagerReset(t *testing.T) {
-	n := pollmanager.NumLoops
+	n := pollmanager.numLoops
 	err := pollmanager.Reset()
 	MustNil(t, err)
-	Equal(t, len(pollmanager.polls), n)
-	Equal(t, pollmanager.NumLoops, n)
+	Equal(t, len(pollmanager.polls), int(n))
 }
+
+func TestPollManagerSetNumLoops(t *testing.T) {
+	pm := newManager(1)
+
+	startGs := runtime.NumGoroutine()
+	poll := pm.Pick()
+	newGs := runtime.NumGoroutine()
+	Assert(t, poll != nil)
+	Assert(t, newGs-startGs == 1, newGs, startGs)
+	t.Logf("old=%d, new=%d", startGs, newGs)
+
+	// change the number of pollers
+	oldGs := newGs
+	err := pm.SetNumLoops(100)
+	MustNil(t, err)
+	newGs = runtime.NumGoroutine()
+	t.Logf("old=%d, new=%d", oldGs, newGs)
+	Assert(t, newGs == oldGs)
+
+	// trigger the polls adjustment
+	var wg sync.WaitGroup
+	finish := make(chan struct{})
+	oldGs = startGs + 32 // our own 32 goroutines
+	for i := 0; i < 32; i++ {
+		wg.Add(1)
+		go func() {
+			poll := pm.Pick()
+			newGs := runtime.NumGoroutine()
+			t.Logf("old=%d, new=%d", oldGs, newGs)
+			Assert(t, poll != nil)
+			Assert(t, newGs-oldGs == 100)
+			Assert(t, len(pm.polls) == 100)
+			wg.Done()
+			<-finish // hold the goroutines
+		}()
+	}
+	wg.Wait()
+	close(finish)
+}
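The test counts goroutines to show that pollers appear only on the first Pick(). The same effect can be observed from user code; a rough sketch (again assuming the github.com/cloudwego/netpoll import path; absolute counts depend on the runtime):

```go
package main

import (
	"fmt"
	"runtime"

	"github.com/cloudwego/netpoll"
)

func main() {
	// After this PR, merely importing netpoll starts no poller goroutines.
	fmt.Println("goroutines before:", runtime.NumGoroutine())

	netpoll.Initialize() // first use: the pollers are spawned here
	fmt.Println("goroutines after:", runtime.NumGoroutine())
}
```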