Skip to content

Commit

Permalink
#163: Added health tracking for the chatStream workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
roma-glushko committed Mar 6, 2024
1 parent b73b086 commit 41d6b00
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 5 deletions.
29 changes: 24 additions & 5 deletions pkg/providers/lang.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func (m LanguageModel) ChatLatency() *latency.MovingAverage {
return m.chatLatency
}

// ChatStreamLatency returns the moving average held in the model's
// chatStreamLatency field.
func (m LanguageModel) ChatStreamLatency() *latency.MovingAverage {
	streamLatency := m.chatStreamLatency

	return streamLatency
}

// Healthy reports true only when rateLimit.Limited() is false and
// errBudget.HasTokens() is true. The error budget is not consulted while the
// model is rate limited.
func (m LanguageModel) Healthy() bool {
	if m.rateLimit.Limited() {
		return false
	}

	return m.errBudget.HasTokens()
}
Expand All @@ -88,10 +92,10 @@ func (m *LanguageModel) Chat(ctx context.Context, request *schemas.ChatRequest)
return resp, err
}

var rle *clients.RateLimitError
var rateLimitErr *clients.RateLimitError

if errors.As(err, &rle) {
m.rateLimit.SetLimited(rle.UntilReset())
if errors.As(err, &rateLimitErr) {
m.rateLimit.SetLimited(rateLimitErr.UntilReset())

return resp, err
}
Expand All @@ -102,8 +106,23 @@ func (m *LanguageModel) Chat(ctx context.Context, request *schemas.ChatRequest)
}

// ChatStream forwards the streaming chat request to the underlying client and
// records the outcome in the model's health state.
//
// A nil error leaves the health state untouched. When the error is (or wraps)
// a *clients.RateLimitError, its UntilReset() value is passed to
// rateLimit.SetLimited. Any other error takes one token from the error budget.
// The client's error is always returned to the caller unchanged.
//
// TODO: implement latency tracking for the streaming workflow.
func (m *LanguageModel) ChatStream(ctx context.Context, request *schemas.ChatRequest, responseC chan<- schemas.ChatResponse) error {
	err := m.client.ChatStream(ctx, request, responseC)
	if err == nil {
		return nil
	}

	var rateLimitErr *clients.RateLimitError

	if errors.As(err, &rateLimitErr) {
		m.rateLimit.SetLimited(rateLimitErr.UntilReset())

		return err
	}

	// The result of Take is deliberately ignored: the caller receives the
	// original client error either way.
	_ = m.errBudget.Take(1)

	return err
}

func (m *LanguageModel) SupportChatStream() bool {
Expand Down
1 change: 1 addition & 0 deletions pkg/routers/routing/least_latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (
LeastLatency Strategy = "least_latency"
)

// LatencyGetter is an alias for a function that, given a model, returns the
// latency moving average to use for a specific model action.
type LatencyGetter = func(model providers.Model) *latency.MovingAverage

// ModelSchedule defines the latency update schedule for models
Expand Down

0 comments on commit 41d6b00

Please sign in to comment.