Skip to content

Commit

Permalink
feat: add AI Remote Worker (#3168)
Browse files Browse the repository at this point in the history
This commit adds a new AI remote worker node which can be used to split worker and orchestrator machines similar to how it is done on the transcoding side.

Co-authored-by: Rafał Leszko <rafal@livepeer.org>
Co-authored-by: Rick Staa <rick.staa@outlook.com>
  • Loading branch information
3 people committed Oct 21, 2024
1 parent 4390579 commit 2c50134
Show file tree
Hide file tree
Showing 30 changed files with 6,260 additions and 1,688 deletions.
296 changes: 126 additions & 170 deletions cmd/livepeer/starter/starter.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion common/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func IgnoreRoutines() []goleak.Option {
"github.com/livepeer/go-livepeer/server.(*LivepeerServer).StartMediaServer", "github.com/livepeer/go-livepeer/core.(*RemoteTranscoderManager).Manage.func1",
"github.com/livepeer/go-livepeer/server.(*LivepeerServer).HandlePush.func1", "github.com/rjeczalik/notify.(*nonrecursiveTree).dispatch",
"github.com/rjeczalik/notify.(*nonrecursiveTree).internal", "github.com/livepeer/lpms/stream.NewBasicRTMPVideoStream.func1", "github.com/patrickmn/go-cache.(*janitor).Run",
"github.com/golang/glog.(*fileSink).flushDaemon",
"github.com/golang/glog.(*fileSink).flushDaemon", "github.com/livepeer/go-livepeer/core.(*LivepeerNode).transcodeFrames.func2",
}

res := make([]goleak.Option, 0, len(funcs2ignore))
Expand Down
15 changes: 15 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ var (
ErrProfName = fmt.Errorf("unknown VideoProfile profile name")

ErrAudioDurationCalculation = fmt.Errorf("audio duration calculation failed")
ErrNoExtensionsForType = fmt.Errorf("no extensions exist for mime type")

ext2mime = map[string]string{
".ts": "video/mp2t",
Expand Down Expand Up @@ -571,3 +572,17 @@ func CalculateAudioDuration(audio types.File) (int64, error) {
func ValidateServiceURI(serviceURI *url.URL) bool {
return !strings.Contains(serviceURI.Host, "0.0.0.0")
}

func ExtensionByType(contentType string) (string, error) {
contentType = strings.ToLower(contentType)
switch contentType {
case "video/mp2t":
return ".ts", nil
case "video/mp4":
return ".mp4", nil
case "image/png":
return ".png", nil
}

return "", ErrNoExtensionsForType
}
18 changes: 18 additions & 0 deletions common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,3 +519,21 @@ func TestValidateServiceURI(t *testing.T) {
}
}
}
func TestExtensionByType(t *testing.T) {
assert := assert.New(t)

// Test valid content types
contentTypes := []string{"image/png", "video/mp4", "video/mp2t"}
expectedExtensions := []string{".png", ".mp4", ".ts"}

for i, contentType := range contentTypes {
ext, err := ExtensionByType(contentType)
assert.Nil(err)
assert.Equal(expectedExtensions[i], ext)
}

// Test invalid content type
invalidContentType := "invalid/type"
_, err := ExtensionByType(invalidContentType)
assert.Equal(ErrNoExtensionsForType, err)
}
116 changes: 110 additions & 6 deletions core/ai.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strconv"
"strings"

"github.com/golang/glog"
"github.com/livepeer/ai-worker/worker"
)

Expand Down Expand Up @@ -64,15 +65,19 @@ func PipelineToCapability(pipeline string) (Capability, error) {
}

type AIModelConfig struct {
Pipeline string `json:"pipeline"`
ModelID string `json:"model_id"`
Pipeline string `json:"pipeline"`
ModelID string `json:"model_id"`
// used by worker
URL string `json:"url,omitempty"`
Token string `json:"token,omitempty"`
Warm bool `json:"warm,omitempty"`
PricePerUnit JSONRat `json:"price_per_unit,omitempty"`
PixelsPerUnit JSONRat `json:"pixels_per_unit,omitempty"`
Currency string `json:"currency,omitempty"`
Capacity int `json:"capacity,omitempty"`
OptimizationFlags worker.OptimizationFlags `json:"optimization_flags,omitempty"`
// used by orchestrator
Gateway string `json:"gateway"`
PricePerUnit JSONRat `json:"price_per_unit,omitempty"`
PixelsPerUnit JSONRat `json:"pixels_per_unit,omitempty"`
Currency string `json:"currency,omitempty"`
}

func ParseAIModelConfigs(config string) ([]AIModelConfig, error) {
Expand Down Expand Up @@ -112,7 +117,7 @@ func ParseAIModelConfigs(config string) ([]AIModelConfig, error) {
return configs, nil
}

// parseStepsFromModelID parses the number of inference steps from the model ID suffix.
// ParseStepsFromModelID parses the number of inference steps from the model ID suffix.
func ParseStepsFromModelID(modelID *string, defaultSteps float64) float64 {
numInferenceSteps := defaultSteps

Expand All @@ -127,3 +132,102 @@ func ParseStepsFromModelID(modelID *string, defaultSteps float64) float64 {

return numInferenceSteps
}

// AddAICapabilities adds AI capabilities to the node.
func (n *LivepeerNode) AddAICapabilities(caps *Capabilities) {
aiConstraints := caps.PerCapability()
if aiConstraints == nil {
return
}

n.Capabilities.mutex.Lock()
defer n.Capabilities.mutex.Unlock()
for aiCapability, aiConstraint := range aiConstraints {
_, capExists := n.Capabilities.constraints.perCapability[aiCapability]
if !capExists {
n.Capabilities.constraints.perCapability[aiCapability] = &CapabilityConstraints{
Models: make(ModelConstraints),
}
}

for modelId, modelConstraint := range aiConstraint.Models {
_, modelExists := n.Capabilities.constraints.perCapability[aiCapability].Models[modelId]
if modelExists {
n.Capabilities.constraints.perCapability[aiCapability].Models[modelId].Capacity += modelConstraint.Capacity
} else {
n.Capabilities.constraints.perCapability[aiCapability].Models[modelId] = &ModelConstraint{Warm: modelConstraint.Warm, Capacity: modelConstraint.Capacity}
}
}
}
}

// RemoveAICapabilities removes AI capabilities from the node.
func (n *LivepeerNode) RemoveAICapabilities(caps *Capabilities) {
aiConstraints := caps.PerCapability()
if aiConstraints == nil {
return
}

n.Capabilities.mutex.Lock()
defer n.Capabilities.mutex.Unlock()
for capability, constraint := range aiConstraints {
_, ok := n.Capabilities.constraints.perCapability[capability]
if ok {
for modelId, modelConstraint := range constraint.Models {
_, modelExists := n.Capabilities.constraints.perCapability[capability].Models[modelId]
if modelExists {
n.Capabilities.constraints.perCapability[capability].Models[modelId].Capacity -= modelConstraint.Capacity
if n.Capabilities.constraints.perCapability[capability].Models[modelId].Capacity <= 0 {
delete(n.Capabilities.constraints.perCapability[capability].Models, modelId)
}
} else {
glog.Errorf("failed to remove AI capability capacity, model does not exist pipeline=%v modelID=%v", capability, modelId)
}
}
}
}
}

func (n *LivepeerNode) ReserveAICapability(pipeline string, modelID string) error {
cap, err := PipelineToCapability(pipeline)
if err != nil {
return err
}

_, hasCap := n.Capabilities.constraints.perCapability[cap]
if hasCap {
_, hasModel := n.Capabilities.constraints.perCapability[cap].Models[modelID]
if hasModel {
n.Capabilities.mutex.Lock()
defer n.Capabilities.mutex.Unlock()
if n.Capabilities.constraints.perCapability[cap].Models[modelID].Capacity > 0 {
n.Capabilities.constraints.perCapability[cap].Models[modelID].Capacity -= 1
} else {
return fmt.Errorf("failed to reserve AI capability capacity, model capacity is 0 pipeline=%v modelID=%v", pipeline, modelID)
}
return nil
}
return fmt.Errorf("failed to reserve AI capability capacity, model does not exist pipeline=%v modelID=%v", pipeline, modelID)
}
return fmt.Errorf("failed to reserve AI capability capacity, pipeline does not exist pipeline=%v modelID=%v", pipeline, modelID)
}

func (n *LivepeerNode) ReleaseAICapability(pipeline string, modelID string) error {
cap, err := PipelineToCapability(pipeline)
if err != nil {
return err
}
_, hasCap := n.Capabilities.constraints.perCapability[cap]
if hasCap {
_, hasModel := n.Capabilities.constraints.perCapability[cap].Models[modelID]
if hasModel {
n.Capabilities.mutex.Lock()
defer n.Capabilities.mutex.Unlock()
n.Capabilities.constraints.perCapability[cap].Models[modelID].Capacity += 1

return nil
}
return fmt.Errorf("failed to release AI capability capacity, model does not exist pipeline=%v modelID=%v", pipeline, modelID)
}
return fmt.Errorf("failed to release AI capability capacity, pipeline does not exist pipeline=%v modelID=%v", pipeline, modelID)
}
Loading

0 comments on commit 2c50134

Please sign in to comment.