Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MM-56184] Allow passing Kubernetes resource requirements to jobs #49

Merged
merged 3 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions config/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ max_concurrent_jobs = 2
# The supported units of time are "m" (minutes), "h" (hours) and "d" (days).
failed_jobs_retention_time = "30d"

# Kubernetes API optionally supports definining resource limits and requests on
# a per job type basis. Example:
#[jobs.kubernetes]
#jobs_resource_requirements = '{"transcribing":{"limits":{"cpu":"4000m"},"requests":{"cpu":"2000m"}},"recording":{"limits":{"cpu":"2000m"},"requests":{"cpu":"1000m"}}}'

[logger]
# A boolean controlling whether to log to the console.
enable_console = true
Expand Down
2 changes: 2 additions & 0 deletions public/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ const (
const (
MinSupportedRecorderVersion = "0.6.0"
MinSupportedTranscriberVersion = "0.1.0"
RecordingJobPrefix = "calls-recorder"
TranscribingJobPrefix = "calls-transcriber"
)

var (
Expand Down
69 changes: 28 additions & 41 deletions service/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/mattermost/calls-offloader/logger"
"github.com/mattermost/calls-offloader/service/api"
"github.com/mattermost/calls-offloader/service/auth"
"github.com/mattermost/calls-offloader/service/docker"
"github.com/mattermost/calls-offloader/service/kubernetes"

"github.com/kelseyhightower/envconfig"
)
Expand Down Expand Up @@ -78,10 +80,24 @@ const (
JobAPITypeKubernetes = "kubernetes"
)

// Alias is needed to implement custom unmarshaler.
type RetentionTime time.Duration

func (rt *RetentionTime) UnmarshalText(data []byte) error {
d, err := parseRetentionTime(string(data))
if err != nil {
return err
}
*rt = RetentionTime(d)
return nil
}

type JobsConfig struct {
APIType JobAPIType `toml:"api_type"`
MaxConcurrentJobs int `toml:"max_concurrent_jobs"`
FailedJobsRetentionTime time.Duration `toml:"failed_jobs_retention_time" ignored:"true"`
APIType JobAPIType `toml:"api_type"`
MaxConcurrentJobs int `toml:"max_concurrent_jobs"`
FailedJobsRetentionTime RetentionTime `toml:"failed_jobs_retention_time" ignored:"true"`
Kubernetes kubernetes.JobServiceConfig `toml:"kubernetes"`
Docker docker.JobServiceConfig `toml:"docker"`
}

// We need some custom parsing since duration doesn't support days.
Expand Down Expand Up @@ -110,42 +126,6 @@ func parseRetentionTime(val string) (time.Duration, error) {
return d, nil
}

func (c *JobsConfig) UnmarshalTOML(data interface{}) error {
if c == nil {
return fmt.Errorf("invalid nil pointer")
}

m, ok := data.(map[string]any)
if !ok {
return fmt.Errorf("invalid data type")
}

apiType, ok := m["api_type"].(string)
if !ok {
return fmt.Errorf("invalid api_type type")
}
c.APIType = JobAPIType(apiType)

maxConcurrentJobs, ok := m["max_concurrent_jobs"].(int64)
if !ok {
return fmt.Errorf("invalid max_concurrent_jobs type")
}
c.MaxConcurrentJobs = int(maxConcurrentJobs)

if val, ok := m["failed_jobs_retention_time"]; ok {
retentionTime, ok := val.(string)
if !ok {
return fmt.Errorf("invalid failed_jobs_retention_time type")
}

var err error
c.FailedJobsRetentionTime, err = parseRetentionTime(retentionTime)
return err
}

return nil
}

func (c JobsConfig) IsValid() error {
if c.APIType != JobAPITypeDocker && c.APIType != JobAPITypeKubernetes {
return fmt.Errorf("invalid APIType value: %s", c.APIType)
Expand All @@ -159,10 +139,17 @@ func (c JobsConfig) IsValid() error {
return fmt.Errorf("invalid FailedJobsRetentionTime value: should be a positive duration")
}

if c.FailedJobsRetentionTime > 0 && c.FailedJobsRetentionTime < time.Minute {
if c.FailedJobsRetentionTime > 0 && time.Duration(c.FailedJobsRetentionTime) < time.Minute {
return fmt.Errorf("invalid FailedJobsRetentionTime value: should be at least one minute")
}

switch c.APIType {
case JobAPITypeDocker:
return c.Docker.IsValid()
case JobAPITypeKubernetes:
return c.Kubernetes.IsValid()
}

return nil
}

Expand All @@ -179,7 +166,7 @@ func (c *Config) ParseFromEnv() error {
if err != nil {
return fmt.Errorf("failed to parse FailedJobsRetentionTime: %w", err)
}
c.Jobs.FailedJobsRetentionTime = d
c.Jobs.FailedJobsRetentionTime = RetentionTime(d)
}

return envconfig.Process("", c)
Expand Down
56 changes: 55 additions & 1 deletion service/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@
package service

import (
"encoding/json"
"os"
"testing"
"time"

"github.com/mattermost/calls-offloader/public/job"
"github.com/mattermost/calls-offloader/service/kubernetes"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -84,7 +91,7 @@ func TestParseFromEnv(t *testing.T) {
var cfg Config
err := cfg.ParseFromEnv()
require.NoError(t, err)
require.Equal(t, time.Hour*24, cfg.Jobs.FailedJobsRetentionTime)
require.Equal(t, RetentionTime(time.Hour*24), cfg.Jobs.FailedJobsRetentionTime)
})

t.Run("override", func(t *testing.T) {
Expand All @@ -97,4 +104,51 @@ func TestParseFromEnv(t *testing.T) {
require.NoError(t, err)
require.Equal(t, JobAPITypeDocker, cfg.Jobs.APIType)
})

t.Run("kubernetes.JobsResourceRequirements", func(t *testing.T) {
requirements := make(kubernetes.JobsResourceRequirements)

t.Run("empty", func(t *testing.T) {
js, err := json.Marshal(requirements)
require.NoError(t, err)

os.Setenv("JOBS_KUBERNETES_JOBSRESOURCEREQUIREMENTS", string(js))
defer os.Unsetenv("JOBS_KUBERNETES_JOBSRESOURCEREQUIREMENTS")

var cfg Config
err = cfg.ParseFromEnv()
require.NoError(t, err)
require.Equal(t, requirements, cfg.Jobs.Kubernetes.JobsResourceRequirements)
})

t.Run("defined", func(t *testing.T) {
requirements[job.TypeRecording] = corev1.ResourceRequirements{
Limits: corev1.ResourceList{
"cpu": resource.MustParse("1"),
},
Requests: corev1.ResourceList{
"cpu": resource.MustParse("1"),
},
}
requirements[job.TypeTranscribing] = corev1.ResourceRequirements{
Limits: corev1.ResourceList{
"cpu": resource.MustParse("1"),
},
Requests: corev1.ResourceList{
"cpu": resource.MustParse("1"),
},
}

js, err := json.Marshal(requirements)
require.NoError(t, err)

os.Setenv("JOBS_KUBERNETES_JOBSRESOURCEREQUIREMENTS", string(js))
defer os.Unsetenv("JOBS_KUBERNETES_JOBSRESOURCEREQUIREMENTS")

var cfg Config
err = cfg.ParseFromEnv()
require.NoError(t, err)
require.Equal(t, requirements, cfg.Jobs.Kubernetes.JobsResourceRequirements)
})
})
}
23 changes: 15 additions & 8 deletions service/docker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ const (
dockerVolumePath = "/data"
)

const (
recordingJobPrefix = "calls-recorder"
transcribingJobPrefix = "calls-transcriber"
)

var (
dockerStopTimeout = 5 * time.Minute
dockerRetentionJobInterval = time.Minute
Expand All @@ -49,6 +44,18 @@ type JobServiceConfig struct {
FailedJobsRetentionTime time.Duration
}

func (c JobServiceConfig) IsValid() error {
if c.MaxConcurrentJobs < 0 {
return fmt.Errorf("invalid MaxConcurrentJobs value: should be positive")
}

if c.FailedJobsRetentionTime > 0 && c.FailedJobsRetentionTime < time.Minute {
return fmt.Errorf("invalid FailedJobsRetentionTime value: should be at least one minute")
}

return nil
}

type JobService struct {
cfg JobServiceConfig
log mlog.LoggerIFace
Expand Down Expand Up @@ -261,7 +268,7 @@ func (s *JobService) CreateJob(cfg job.Config, onStopCb job.StopCb) (job.Job, er
if err != nil {
return job.Job{}, fmt.Errorf("failed to list containers: %w", err)
}
if len(containers) >= s.cfg.MaxConcurrentJobs {
if s.cfg.MaxConcurrentJobs > 0 && len(containers) >= s.cfg.MaxConcurrentJobs {
if !devMode {
return job.Job{}, fmt.Errorf("max concurrent jobs reached")
}
Expand All @@ -280,13 +287,13 @@ func (s *JobService) CreateJob(cfg job.Config, onStopCb job.StopCb) (job.Job, er
var jobData recorder.RecorderConfig
jobData.FromMap(cfg.InputData)
jobData.SiteURL = getSiteURLForJob(jobData.SiteURL)
jobPrefix = recordingJobPrefix
jobPrefix = job.RecordingJobPrefix
env = append(env, jobData.ToEnv()...)
case job.TypeTranscribing:
var jobData transcriber.CallTranscriberConfig
jobData.FromMap(cfg.InputData)
jobData.SiteURL = getSiteURLForJob(jobData.SiteURL)
jobPrefix = transcribingJobPrefix
jobPrefix = job.TranscribingJobPrefix
env = append(env, jobData.ToEnv()...)
}

Expand Down
4 changes: 4 additions & 0 deletions service/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/mattermost/calls-offloader/public"
"github.com/mattermost/calls-offloader/service/api"
"github.com/mattermost/calls-offloader/service/auth"
"github.com/mattermost/calls-offloader/service/docker"

"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -86,6 +87,9 @@ func MakeDefaultCfg(tb testing.TB) *Config {
Jobs: JobsConfig{
APIType: JobAPITypeDocker,
MaxConcurrentJobs: 2,
Docker: docker.JobServiceConfig{
MaxConcurrentJobs: 2,
},
},
Logger: logger.Config{
EnableConsole: true,
Expand Down
17 changes: 9 additions & 8 deletions service/jobs_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package service
import (
"fmt"
"io"
"time"

"github.com/mattermost/calls-offloader/public/job"
"github.com/mattermost/calls-offloader/service/docker"
Expand All @@ -29,15 +30,15 @@ func NewJobService(cfg JobsConfig, log mlog.LoggerIFace) (JobService, error) {

switch cfg.APIType {
case JobAPITypeDocker:
return docker.NewJobService(log, docker.JobServiceConfig{
MaxConcurrentJobs: cfg.MaxConcurrentJobs,
FailedJobsRetentionTime: cfg.FailedJobsRetentionTime,
})
cfg.Docker.MaxConcurrentJobs = cfg.MaxConcurrentJobs
cfg.Docker.FailedJobsRetentionTime = time.Duration(cfg.FailedJobsRetentionTime)
log.Info("creating new job service", mlog.Any("apiType", cfg.APIType), mlog.String("config", fmt.Sprintf("%+v", cfg.Docker)))
return docker.NewJobService(log, cfg.Docker)
case JobAPITypeKubernetes:
return kubernetes.NewJobService(log, kubernetes.JobServiceConfig{
MaxConcurrentJobs: cfg.MaxConcurrentJobs,
FailedJobsRetentionTime: cfg.FailedJobsRetentionTime,
})
cfg.Kubernetes.MaxConcurrentJobs = cfg.MaxConcurrentJobs
cfg.Kubernetes.FailedJobsRetentionTime = time.Duration(cfg.FailedJobsRetentionTime)
log.Info("creating new job service", mlog.Any("apiType", cfg.APIType), mlog.String("config", fmt.Sprintf("%+v", cfg.Kubernetes)))
return kubernetes.NewJobService(log, cfg.Kubernetes)
default:
return nil, fmt.Errorf("%s API is not implemeneted", cfg.APIType)
}
Expand Down
Loading
Loading