Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(p2p): allocate tunnels only when needed #3259

Merged
merged 1 commit into from
Aug 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {

os.Setenv("LLAMACPP_GRPC_SERVERS", tunnelEnvVar)
log.Debug().Msgf("setting LLAMACPP_GRPC_SERVERS to %s", tunnelEnvVar)
}); err != nil {
}, true); err != nil {
return err
}
}
Expand All @@ -153,7 +153,7 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
return err
}

if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.FederatedID), nil); err != nil {
if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.FederatedID), nil, false); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/p2p/federated_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (f *FederatedServer) Start(ctx context.Context) error {

if err := ServiceDiscoverer(ctx, n, f.p2ptoken, f.service, func(servicesID string, tunnel NodeData) {
log.Debug().Msgf("Discovered node: %s", tunnel.ID)
}); err != nil {
}, true); err != nil {
return err
}

Expand Down
33 changes: 18 additions & 15 deletions core/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,11 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv

// This is the main of the server (which keeps the env variable updated)
// This starts a goroutine that keeps LLAMACPP_GRPC_SERVERS updated with the discovered services
func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func(serviceID string, node NodeData)) error {
func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func(serviceID string, node NodeData), allocate bool) error {
if servicesID == "" {
servicesID = defaultServicesID
}
tunnels, err := discoveryTunnels(ctx, n, token, servicesID)
tunnels, err := discoveryTunnels(ctx, n, token, servicesID, allocate)
if err != nil {
return err
}
Expand All @@ -170,7 +170,7 @@ func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID stri
return nil
}

func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID string) (chan NodeData, error) {
func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID string, allocate bool) (chan NodeData, error) {
tunnels := make(chan NodeData)

err := n.Start(ctx)
Expand Down Expand Up @@ -209,7 +209,7 @@ func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID strin
zlog.Error().Msg("cannot unmarshal node data")
continue
}
ensureService(ctx, n, nd, k)
ensureService(ctx, n, nd, k, allocate)
muservice.Lock()
if _, ok := service[nd.Name]; ok {
tunnels <- service[nd.Name].NodeData
Expand All @@ -231,7 +231,7 @@ type nodeServiceData struct {
var service = map[string]nodeServiceData{}
var muservice sync.Mutex

func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string) {
func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string, allocate bool) {
muservice.Lock()
defer muservice.Unlock()
if ndService, found := service[nd.Name]; !found {
Expand All @@ -240,22 +240,25 @@ func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string
zlog.Debug().Msgf("Node %s is offline", nd.ID)
return
}

newCtxm, cancel := context.WithCancel(ctx)
// Start the service
port, err := freeport.GetFreePort()
if err != nil {
zlog.Error().Err(err).Msgf("Could not allocate a free port for %s", nd.ID)
return
}
if allocate {
// Start the service
port, err := freeport.GetFreePort()
if err != nil {
zlog.Error().Err(err).Msgf("Could not allocate a free port for %s", nd.ID)
return
}

tunnelAddress := fmt.Sprintf("127.0.0.1:%d", port)
nd.TunnelAddress = tunnelAddress
tunnelAddress := fmt.Sprintf("127.0.0.1:%d", port)
nd.TunnelAddress = tunnelAddress
go allocateLocalService(newCtxm, n, tunnelAddress, sserv)
zlog.Debug().Msgf("Starting service %s on %s", sserv, tunnelAddress)
}
service[nd.Name] = nodeServiceData{
NodeData: *nd,
CancelFunc: cancel,
}
go allocateLocalService(newCtxm, n, tunnelAddress, sserv)
zlog.Debug().Msgf("Starting service %s on %s", sserv, tunnelAddress)
} else {
// Check if the service is still alive
// if not cancel the context
Expand Down
2 changes: 1 addition & 1 deletion core/p2p/p2p_disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (f *FederatedServer) Start(ctx context.Context) error {
return fmt.Errorf("not implemented")
}

func ServiceDiscoverer(ctx context.Context, node *node.Node, token, servicesID string, fn func(string, NodeData)) error {
func ServiceDiscoverer(ctx context.Context, node *node.Node, token, servicesID string, fn func(string, NodeData), allocate bool) error {
return fmt.Errorf("not implemented")
}

Expand Down