From a2292b52b8f5a747e7e5ed4de8359614f1986c4e Mon Sep 17 00:00:00 2001 From: streamer45 Date: Thu, 21 Dec 2023 15:35:13 -0600 Subject: [PATCH 1/3] Refactor --- service/config.go | 69 ++++++++++++++--------------------- service/config_test.go | 2 +- service/docker/service.go | 14 ++++++- service/helper_test.go | 4 ++ service/jobs_service.go | 17 +++++---- service/kubernetes/service.go | 14 ++++++- 6 files changed, 68 insertions(+), 52 deletions(-) diff --git a/service/config.go b/service/config.go index 5d8f491..2dd5d65 100644 --- a/service/config.go +++ b/service/config.go @@ -13,6 +13,8 @@ import ( "github.com/mattermost/calls-offloader/logger" "github.com/mattermost/calls-offloader/service/api" "github.com/mattermost/calls-offloader/service/auth" + "github.com/mattermost/calls-offloader/service/docker" + "github.com/mattermost/calls-offloader/service/kubernetes" "github.com/kelseyhightower/envconfig" ) @@ -78,10 +80,24 @@ const ( JobAPITypeKubernetes = "kubernetes" ) +// Alias is needed to implement custom unmarshaler. +type RetentionTime time.Duration + +func (rt *RetentionTime) UnmarshalText(data []byte) error { + d, err := parseRetentionTime(string(data)) + if err != nil { + return err + } + *rt = RetentionTime(d) + return nil +} + type JobsConfig struct { - APIType JobAPIType `toml:"api_type"` - MaxConcurrentJobs int `toml:"max_concurrent_jobs"` - FailedJobsRetentionTime time.Duration `toml:"failed_jobs_retention_time" ignored:"true"` + APIType JobAPIType `toml:"api_type"` + MaxConcurrentJobs int `toml:"max_concurrent_jobs"` + FailedJobsRetentionTime RetentionTime `toml:"failed_jobs_retention_time" ignored:"true"` + Kubernetes kubernetes.JobServiceConfig `toml:"kubernetes"` + Docker docker.JobServiceConfig `toml:"docker"` } // We need some custom parsing since duration doesn't support days. @@ -110,42 +126,6 @@ func parseRetentionTime(val string) (time.Duration, error) { return d, nil } -func (c *JobsConfig) UnmarshalTOML(data interface{}) error { - if c == nil { - return fmt.Errorf("invalid nil pointer") - } - - m, ok := data.(map[string]any) - if !ok { - return fmt.Errorf("invalid data type") - } - - apiType, ok := m["api_type"].(string) - if !ok { - return fmt.Errorf("invalid api_type type") - } - c.APIType = JobAPIType(apiType) - - maxConcurrentJobs, ok := m["max_concurrent_jobs"].(int64) - if !ok { - return fmt.Errorf("invalid max_concurrent_jobs type") - } - c.MaxConcurrentJobs = int(maxConcurrentJobs) - - if val, ok := m["failed_jobs_retention_time"]; ok { - retentionTime, ok := val.(string) - if !ok { - return fmt.Errorf("invalid failed_jobs_retention_time type") - } - - var err error - c.FailedJobsRetentionTime, err = parseRetentionTime(retentionTime) - return err - } - - return nil -} - func (c JobsConfig) IsValid() error { if c.APIType != JobAPITypeDocker && c.APIType != JobAPITypeKubernetes { return fmt.Errorf("invalid APIType value: %s", c.APIType) @@ -159,10 +139,17 @@ func (c JobsConfig) IsValid() error { return fmt.Errorf("invalid FailedJobsRetentionTime value: should be a positive duration") } - if c.FailedJobsRetentionTime > 0 && c.FailedJobsRetentionTime < time.Minute { + if c.FailedJobsRetentionTime > 0 && time.Duration(c.FailedJobsRetentionTime) < time.Minute { return fmt.Errorf("invalid FailedJobsRetentionTime value: should be at least one minute") } + switch c.APIType { + case JobAPITypeDocker: + return c.Docker.IsValid() + case JobAPITypeKubernetes: + return c.Kubernetes.IsValid() + } + return nil } @@ -179,7 +166,7 @@ func (c *Config) ParseFromEnv() error { if err != nil { return fmt.Errorf("failed to parse FailedJobsRetentionTime: %w", err) } - c.Jobs.FailedJobsRetentionTime = d + c.Jobs.FailedJobsRetentionTime = RetentionTime(d) } return envconfig.Process("", c) diff --git a/service/config_test.go b/service/config_test.go index b507971..95b11ea 100644 --- a/service/config_test.go +++ b/service/config_test.go @@ -84,7 +84,7 @@ func TestParseFromEnv(t *testing.T) { var cfg Config err := cfg.ParseFromEnv() require.NoError(t, err) - require.Equal(t, time.Hour*24, cfg.Jobs.FailedJobsRetentionTime) + require.Equal(t, RetentionTime(time.Hour*24), cfg.Jobs.FailedJobsRetentionTime) }) t.Run("override", func(t *testing.T) { diff --git a/service/docker/service.go b/service/docker/service.go index 7e15432..0e57e8f 100644 --- a/service/docker/service.go +++ b/service/docker/service.go @@ -49,6 +49,18 @@ type JobServiceConfig struct { FailedJobsRetentionTime time.Duration } +func (c JobServiceConfig) IsValid() error { + if c.MaxConcurrentJobs < 0 { + return fmt.Errorf("invalid MaxConcurrentJobs value: should be positive") + } + + if c.FailedJobsRetentionTime > 0 && c.FailedJobsRetentionTime < time.Minute { + return fmt.Errorf("invalid FailedJobsRetentionTime value: should be at least one minute") + } + + return nil +} + type JobService struct { cfg JobServiceConfig log mlog.LoggerIFace @@ -261,7 +273,7 @@ func (s *JobService) CreateJob(cfg job.Config, onStopCb job.StopCb) (job.Job, er if err != nil { return job.Job{}, fmt.Errorf("failed to list containers: %w", err) } - if len(containers) >= s.cfg.MaxConcurrentJobs { + if s.cfg.MaxConcurrentJobs > 0 && len(containers) >= s.cfg.MaxConcurrentJobs { if !devMode { return job.Job{}, fmt.Errorf("max concurrent jobs reached") } diff --git a/service/helper_test.go b/service/helper_test.go index 74e1135..1f5a735 100644 --- a/service/helper_test.go +++ b/service/helper_test.go @@ -12,6 +12,7 @@ import ( "github.com/mattermost/calls-offloader/public" "github.com/mattermost/calls-offloader/service/api" "github.com/mattermost/calls-offloader/service/auth" + "github.com/mattermost/calls-offloader/service/docker" "github.com/stretchr/testify/require" ) @@ -86,6 +87,9 @@ func MakeDefaultCfg(tb testing.TB) *Config { Jobs: JobsConfig{ APIType: JobAPITypeDocker, MaxConcurrentJobs: 2, + Docker: docker.JobServiceConfig{ + MaxConcurrentJobs: 2, + }, }, Logger: logger.Config{ EnableConsole: true, diff --git a/service/jobs_service.go b/service/jobs_service.go index c98882f..36f7cd8 100644 --- a/service/jobs_service.go +++ b/service/jobs_service.go @@ -6,6 +6,7 @@ package service import ( "fmt" "io" + "time" "github.com/mattermost/calls-offloader/public/job" "github.com/mattermost/calls-offloader/service/docker" @@ -29,15 +30,15 @@ func NewJobService(cfg JobsConfig, log mlog.LoggerIFace) (JobService, error) { switch cfg.APIType { case JobAPITypeDocker: - return docker.NewJobService(log, docker.JobServiceConfig{ - MaxConcurrentJobs: cfg.MaxConcurrentJobs, - FailedJobsRetentionTime: cfg.FailedJobsRetentionTime, - }) + cfg.Docker.MaxConcurrentJobs = cfg.MaxConcurrentJobs + cfg.Docker.FailedJobsRetentionTime = time.Duration(cfg.FailedJobsRetentionTime) + log.Info("creating new job service", mlog.Any("apiType", cfg.APIType), mlog.String("config", fmt.Sprintf("%+v", cfg.Docker))) + return docker.NewJobService(log, cfg.Docker) case JobAPITypeKubernetes: - return kubernetes.NewJobService(log, kubernetes.JobServiceConfig{ - MaxConcurrentJobs: cfg.MaxConcurrentJobs, - FailedJobsRetentionTime: cfg.FailedJobsRetentionTime, - }) + cfg.Kubernetes.MaxConcurrentJobs = cfg.MaxConcurrentJobs + cfg.Kubernetes.FailedJobsRetentionTime = time.Duration(cfg.FailedJobsRetentionTime) + log.Info("creating new job service", mlog.Any("apiType", cfg.APIType), mlog.String("config", fmt.Sprintf("%+v", cfg.Kubernetes))) + return kubernetes.NewJobService(log, cfg.Kubernetes) default: return nil, fmt.Errorf("%s API is not implemeneted", cfg.APIType) } diff --git a/service/kubernetes/service.go b/service/kubernetes/service.go index e503dcf..57e016e 100644 --- a/service/kubernetes/service.go +++ b/service/kubernetes/service.go @@ -44,6 +44,18 @@ type JobServiceConfig struct { FailedJobsRetentionTime time.Duration } +func (c JobServiceConfig) IsValid() error { + if c.MaxConcurrentJobs < 0 { + return fmt.Errorf("invalid MaxConcurrentJobs value: should be positive") + } + + if c.FailedJobsRetentionTime > 0 && c.FailedJobsRetentionTime < time.Minute { + return fmt.Errorf("invalid FailedJobsRetentionTime value: should be at least one minute") + } + + return nil +} + type JobService struct { cfg JobServiceConfig log mlog.LoggerIFace @@ -116,7 +128,7 @@ func (s *JobService) CreateJob(cfg job.Config, onStopCb job.StopCb) (job.Job, er if err != nil { return job.Job{}, fmt.Errorf("failed to list jobs: %w", err) } - if activeJobs := getActiveJobs(jobList.Items); activeJobs >= s.cfg.MaxConcurrentJobs { + if activeJobs := getActiveJobs(jobList.Items); s.cfg.MaxConcurrentJobs > 0 && activeJobs >= s.cfg.MaxConcurrentJobs { if !devMode { return job.Job{}, fmt.Errorf("max concurrent jobs reached") } From d6080a0a8f36c7cc1f48249f686d042190370601 Mon Sep 17 00:00:00 2001 From: streamer45 Date: Thu, 21 Dec 2023 16:49:11 -0600 Subject: [PATCH 2/3] Pass Kubernetes resource requirements --- config/config.sample.toml | 5 +++++ service/kubernetes/service.go | 27 ++++++++++++++++++++++++--- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/config/config.sample.toml b/config/config.sample.toml index 39b2851..87d0814 100644 --- a/config/config.sample.toml +++ b/config/config.sample.toml @@ -37,6 +37,11 @@ max_concurrent_jobs = 2 # The supported units of time are "m" (minutes), "h" (hours) and "d" (days). failed_jobs_retention_time = "30d" +# Kubernetes API optionally supports definining resource limits and requests on +# a per job type basis. Example: +#[jobs.kubernetes] +#jobs_resource_requirements = '{"transcribing": {"limits": {"cpu": "2000m"}}, "recording": {"limits": {"cpu": "500"}}}' + [logger] # A boolean controlling whether to log to the console. enable_console = true diff --git a/service/kubernetes/service.go b/service/kubernetes/service.go index 57e016e..1dd3e5a 100644 --- a/service/kubernetes/service.go +++ b/service/kubernetes/service.go @@ -4,6 +4,7 @@ package kubernetes import ( + "bytes" "context" "fmt" "io" @@ -21,6 +22,7 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/apimachinery/pkg/watch" k8s "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -39,9 +41,27 @@ const ( transcribingJobPrefix = "calls-transcriber" ) +type JobsResourceRequirements map[job.Type]corev1.ResourceRequirements + +// Custom decoders to support passing JSON from both TOML config and env +// variable. + +func (r *JobsResourceRequirements) Decode(data string) error { + return yaml.NewYAMLOrJSONDecoder(bytes.NewBuffer([]byte(data)), 0).Decode(r) +} + +func (r *JobsResourceRequirements) UnmarshalTOML(data interface{}) error { + js, ok := data.(string) + if !ok { + return fmt.Errorf("invalid data found") + } + return yaml.NewYAMLOrJSONDecoder(bytes.NewBuffer([]byte(js)), 0).Decode(r) +} + type JobServiceConfig struct { - MaxConcurrentJobs int - FailedJobsRetentionTime time.Duration + MaxConcurrentJobs int + FailedJobsRetentionTime time.Duration + JobsResourceRequirements JobsResourceRequirements `toml:"jobs_resource_requirements"` } func (c JobServiceConfig) IsValid() error { @@ -243,7 +263,8 @@ func (s *JobService) CreateJob(cfg job.Config, onStopCb job.StopCb) (job.Job, er MountPath: k8sVolumePath, }, }, - Env: env, + Env: env, + Resources: s.cfg.JobsResourceRequirements[cfg.Type], }, }, Volumes: []corev1.Volume{ From 83178e742c503461592e938b33ab8611374bd0f3 Mon Sep 17 00:00:00 2001 From: streamer45 Date: Fri, 22 Dec 2023 09:18:36 -0600 Subject: [PATCH 3/3] Allow passing Kubernetes resource requirements to jobs --- config/config.sample.toml | 2 +- public/job/job.go | 2 ++ service/config_test.go | 54 +++++++++++++++++++++++++++++++++++ service/docker/service.go | 9 ++---- service/kubernetes/service.go | 13 +++------ 5 files changed, 63 insertions(+), 17 deletions(-) diff --git a/config/config.sample.toml b/config/config.sample.toml index 87d0814..1ded883 100644 --- a/config/config.sample.toml +++ b/config/config.sample.toml @@ -40,7 +40,7 @@ failed_jobs_retention_time = "30d" # Kubernetes API optionally supports definining resource limits and requests on # a per job type basis. Example: #[jobs.kubernetes] -#jobs_resource_requirements = '{"transcribing": {"limits": {"cpu": "2000m"}}, "recording": {"limits": {"cpu": "500"}}}' +#jobs_resource_requirements = '{"transcribing":{"limits":{"cpu":"4000m"},"requests":{"cpu":"2000m"}},"recording":{"limits":{"cpu":"2000m"},"requests":{"cpu":"1000m"}}}' [logger] # A boolean controlling whether to log to the console. diff --git a/public/job/job.go b/public/job/job.go index 26657d0..dd3379d 100644 --- a/public/job/job.go +++ b/public/job/job.go @@ -22,6 +22,8 @@ const ( const ( MinSupportedRecorderVersion = "0.6.0" MinSupportedTranscriberVersion = "0.1.0" + RecordingJobPrefix = "calls-recorder" + TranscribingJobPrefix = "calls-transcriber" ) var ( diff --git a/service/config_test.go b/service/config_test.go index 95b11ea..2d1cae5 100644 --- a/service/config_test.go +++ b/service/config_test.go @@ -4,10 +4,17 @@ package service import ( + "encoding/json" "os" "testing" "time" + "github.com/mattermost/calls-offloader/public/job" + "github.com/mattermost/calls-offloader/service/kubernetes" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "github.com/stretchr/testify/require" ) @@ -97,4 +104,51 @@ func TestParseFromEnv(t *testing.T) { require.NoError(t, err) require.Equal(t, JobAPITypeDocker, cfg.Jobs.APIType) }) + + t.Run("kubernetes.JobsResourceRequirements", func(t *testing.T) { + requirements := make(kubernetes.JobsResourceRequirements) + + t.Run("empty", func(t *testing.T) { + js, err := json.Marshal(requirements) + require.NoError(t, err) + + os.Setenv("JOBS_KUBERNETES_JOBSRESOURCEREQUIREMENTS", string(js)) + defer os.Unsetenv("JOBS_KUBERNETES_JOBSRESOURCEREQUIREMENTS") + + var cfg Config + err = cfg.ParseFromEnv() + require.NoError(t, err) + require.Equal(t, requirements, cfg.Jobs.Kubernetes.JobsResourceRequirements) + }) + + t.Run("defined", func(t *testing.T) { + requirements[job.TypeRecording] = corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + "cpu": resource.MustParse("1"), + }, + Requests: corev1.ResourceList{ + "cpu": resource.MustParse("1"), + }, + } + requirements[job.TypeTranscribing] = corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + "cpu": resource.MustParse("1"), + }, + Requests: corev1.ResourceList{ + "cpu": resource.MustParse("1"), + }, + } + + js, err := json.Marshal(requirements) + require.NoError(t, err) + + os.Setenv("JOBS_KUBERNETES_JOBSRESOURCEREQUIREMENTS", string(js)) + defer os.Unsetenv("JOBS_KUBERNETES_JOBSRESOURCEREQUIREMENTS") + + var cfg Config + err = cfg.ParseFromEnv() + require.NoError(t, err) + require.Equal(t, requirements, cfg.Jobs.Kubernetes.JobsResourceRequirements) + }) + }) } diff --git a/service/docker/service.go b/service/docker/service.go index 0e57e8f..0a76f2e 100644 --- a/service/docker/service.go +++ b/service/docker/service.go @@ -34,11 +34,6 @@ const ( dockerVolumePath = "/data" ) -const ( - recordingJobPrefix = "calls-recorder" - transcribingJobPrefix = "calls-transcriber" -) - var ( dockerStopTimeout = 5 * time.Minute dockerRetentionJobInterval = time.Minute @@ -292,13 +287,13 @@ func (s *JobService) CreateJob(cfg job.Config, onStopCb job.StopCb) (job.Job, er var jobData recorder.RecorderConfig jobData.FromMap(cfg.InputData) jobData.SiteURL = getSiteURLForJob(jobData.SiteURL) - jobPrefix = recordingJobPrefix + jobPrefix = job.RecordingJobPrefix env = append(env, jobData.ToEnv()...) case job.TypeTranscribing: var jobData transcriber.CallTranscriberConfig jobData.FromMap(cfg.InputData) jobData.SiteURL = getSiteURLForJob(jobData.SiteURL) - jobPrefix = transcribingJobPrefix + jobPrefix = job.TranscribingJobPrefix env = append(env, jobData.ToEnv()...) } diff --git a/service/kubernetes/service.go b/service/kubernetes/service.go index 1dd3e5a..fccd7df 100644 --- a/service/kubernetes/service.go +++ b/service/kubernetes/service.go @@ -36,16 +36,11 @@ const ( k8sVolumePath = "/data" ) -const ( - recordingJobPrefix = "calls-recorder" - transcribingJobPrefix = "calls-transcriber" -) +// Type alias and custom decoders to support passing JSON from both TOML config and env +// variable. type JobsResourceRequirements map[job.Type]corev1.ResourceRequirements -// Custom decoders to support passing JSON from both TOML config and env -// variable. - func (r *JobsResourceRequirements) Decode(data string) error { return yaml.NewYAMLOrJSONDecoder(bytes.NewBuffer([]byte(data)), 0).Decode(r) } @@ -166,7 +161,7 @@ func (s *JobService) CreateJob(cfg job.Config, onStopCb job.StopCb) (job.Job, er jobCfg.FromMap(cfg.InputData) jobCfg.SetDefaults() jobCfg.SiteURL = getSiteURLForJob(jobCfg.SiteURL) - jobPrefix = recordingJobPrefix + jobPrefix = job.RecordingJobPrefix jobID = jobPrefix + "-job-" + random.NewID() env = append(env, getEnvFromJobConfig(jobCfg)...) initContainers = []corev1.Container{ @@ -191,7 +186,7 @@ func (s *JobService) CreateJob(cfg job.Config, onStopCb job.StopCb) (job.Job, er jobCfg.FromMap(cfg.InputData) jobCfg.SetDefaults() jobCfg.SiteURL = getSiteURLForJob(jobCfg.SiteURL) - jobPrefix = transcribingJobPrefix + jobPrefix = job.TranscribingJobPrefix jobID = jobPrefix + "-job-" + random.NewID() env = append(env, getEnvFromJobConfig(jobCfg)...) }