From d354cbcd736b7cfb62def425c56a0dcf60e44bf4 Mon Sep 17 00:00:00 2001 From: Raja Kantamaneni Date: Wed, 10 May 2017 08:50:59 -0700 Subject: [PATCH] Add support for Hybrik as a transcoding provider Initial implementation of the video-transcoding-api provider interface for Hybrik. Majority coding credit to Leonardo Soares (leonardogcsoares) --- config/config.go | 16 +- main.go | 2 + provider/hybrik/hybrik.go | 457 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 474 insertions(+), 1 deletion(-) create mode 100644 provider/hybrik/hybrik.go diff --git a/config/config.go b/config/config.go index a2fbf33a..f1be514c 100644 --- a/config/config.go +++ b/config/config.go @@ -16,6 +16,7 @@ type Config struct { EncodingCom *EncodingCom ElasticTranscoder *ElasticTranscoder ElementalConductor *ElementalConductor + Hybrik *Hybrik Zencoder *Zencoder Bitmovin *Bitmovin } @@ -72,6 +73,18 @@ type Bitmovin struct { EncodingVersion string `envconfig:"BITMOVIN_ENCODING_VERSION" default:"STABLE"` } +// Hybrik represents the set of configurations for the Hybrik +// provider. +type Hybrik struct { + URL string `envconfig:"HYBRIK_URL"` + ComplianceDate string `envconfig:"HYBRIK_COMPLIANCE_DATE"` + OAPIKey string `envconfig:"HYBRIK_OAPI_KEY"` + OAPISecret string `envconfig:"HYBRIK_OAPI_SECRET"` + AuthKey string `envconfig:"HYBRIK_AUTH_KEY"` + AuthSecret string `envconfig:"HYBRIK_AUTH_SECRET"` + Destination string `envconfig:"HYBRIK_DESTINATION"` +} + // LoadConfig loads the configuration of the API using environment variables. func LoadConfig() *Config { cfg := Config{ @@ -80,10 +93,11 @@ func LoadConfig() *Config { ElasticTranscoder: new(ElasticTranscoder), ElementalConductor: new(ElementalConductor), Bitmovin: new(Bitmovin), + Hybrik: new(Hybrik), Server: new(server.Config), } config.LoadEnvConfig(&cfg) - loadFromEnv(cfg.Redis, cfg.EncodingCom, cfg.ElasticTranscoder, cfg.ElementalConductor, cfg.Bitmovin, cfg.Server) + loadFromEnv(cfg.Redis, cfg.EncodingCom, cfg.ElasticTranscoder, cfg.ElementalConductor, cfg.Bitmovin, cfg.Hybrik, cfg.Server) return &cfg } diff --git a/main.go b/main.go index 8715c241..b221f3e7 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,7 @@ import ( _ "github.com/NYTimes/video-transcoding-api/provider/elastictranscoder" _ "github.com/NYTimes/video-transcoding-api/provider/elementalconductor" _ "github.com/NYTimes/video-transcoding-api/provider/encodingcom" + _ "github.com/NYTimes/video-transcoding-api/provider/hybrik" _ "github.com/NYTimes/video-transcoding-api/provider/zencoder" "github.com/NYTimes/video-transcoding-api/service" "github.com/google/gops/agent" @@ -25,6 +26,7 @@ func main() { server.Init("video-transcoding-api", cfg.Server) server.Log.Hooks.Add(logrus_stack.StandardHook()) server.Log.Hooks.Add(logrus_env.NewHook([]string{"ENVIRONMENT"})) + gcpLoggingHook, err := sdhook.New( sdhook.GoogleLoggingAgent(), sdhook.ErrorReportingService("video-transcoding-api"), diff --git a/provider/hybrik/hybrik.go b/provider/hybrik/hybrik.go new file mode 100644 index 00000000..bcd5cdaa --- /dev/null +++ b/provider/hybrik/hybrik.go @@ -0,0 +1,457 @@ +package hybrik + +import ( + "encoding/json" + "fmt" + "path" + "strconv" + "strings" + + hwrapper "github.com/NYTimes/encoding-wrapper/hybrik" + "github.com/NYTimes/video-transcoding-api/config" + "github.com/NYTimes/video-transcoding-api/db" + "github.com/NYTimes/video-transcoding-api/provider" +) + +const ( + // Name describes the name of the transcoder + Name = "hybrik" + queued = "queued" + active = "active" + completed = "completed" + failed = "failed" + activeRunning = "running" + activeWaiting = "waiting" +) + +var ( + // ErrBitrateNan is an error returned when the bitrate field of db.Preset is not a valid number + ErrBitrateNan = fmt.Errorf("bitrate not a number") + + // ErrPresetOutputMatch represents an error in the hybrik encoding-wrapper provider. + ErrPresetOutputMatch = fmt.Errorf("preset retrieved does not map to hybrik.Preset struct") + + // ErrVideoWidthNan is an error returned when the preset video width of db.Preset is not a valid number + ErrVideoWidthNan = fmt.Errorf("preset video width not a number") + // ErrVideoHeightNan is an error returned when the preset video height of db.Preset is not a valid number + ErrVideoHeightNan = fmt.Errorf("preset video height not a number") + + // ErrUnsupportedContainer is returned when the container format is not present in the provider's capabilities list + ErrUnsupportedContainer = fmt.Errorf("container format unsupported. Hybrik provider capabilities may need to be updated") +) + +func init() { + provider.Register(Name, hybrikTranscoderFactory) +} + +type hybrikProvider struct { + c hwrapper.ClientInterface + config *config.Hybrik +} + +func (hp hybrikProvider) String() string { + return "Hybrik" +} + +func hybrikTranscoderFactory(cfg *config.Config) (provider.TranscodingProvider, error) { + + api, err := hwrapper.NewClient(hwrapper.Config{ + URL: cfg.Hybrik.URL, + ComplianceDate: cfg.Hybrik.ComplianceDate, + OAPIKey: cfg.Hybrik.OAPIKey, + OAPISecret: cfg.Hybrik.OAPISecret, + AuthKey: cfg.Hybrik.AuthKey, + AuthSecret: cfg.Hybrik.AuthSecret, + }) + if err != nil { + return &hybrikProvider{}, err + } + + return &hybrikProvider{ + c: api, + config: cfg.Hybrik, + }, nil +} + +func (hp *hybrikProvider) Transcode(job *db.Job) (*provider.JobStatus, error) { + + cj, err := hp.presetsToTranscodeJob(job) + if err != nil { + return &provider.JobStatus{}, err + } + + id, err := hp.c.QueueJob(cj) + if err != nil { + return &provider.JobStatus{}, err + } + + return &provider.JobStatus{ + ProviderName: Name, + ProviderJobID: id, + Status: provider.StatusQueued, + }, nil +} + +func (hp *hybrikProvider) mountTranscodeElement(elementID, id, outputFilename, destination string, duration uint, preset hwrapper.Preset) (hwrapper.Element, error) { + var e hwrapper.Element + var subLocation *hwrapper.TranscodeLocation + + // outputFilename can be "test.mp4", or "subfolder1/subfodler2/test.mp4" + // Handling accordingly + subPath := path.Dir(outputFilename) + outputFilePattern := path.Base(outputFilename) + if subPath != "." && subPath != "/" { + subLocation = &hwrapper.TranscodeLocation{ + StorageProvider: "relative", + Path: subPath, + } + } else { + subLocation = nil + } + + // create the transcode element + e = hwrapper.Element{ + UID: "transcode_task" + elementID, + Kind: "transcode", + Task: &hwrapper.ElementTaskOptions{ + Name: "Transcode - " + preset.Name, + }, + Preset: &hwrapper.TranscodePreset{ + Key: preset.Name, + }, + Payload: hwrapper.LocationTargetPayload{ + Location: hwrapper.TranscodeLocation{ + StorageProvider: "s3", + Path: fmt.Sprintf("%s/j%s", destination, id), + }, + Targets: []hwrapper.TranscodeLocationTarget{ + hwrapper.TranscodeLocationTarget{ + Location: subLocation, + FilePattern: outputFilePattern, + Container: hwrapper.TranscodeTargetContainer{ + SegmentDuration: duration, + }, + }, + }, + }, + } + + return e, nil +} + +func (hp *hybrikProvider) presetsToTranscodeJob(job *db.Job) (string, error) { + elements := []hwrapper.Element{} + var hlsElementIds []int + + // create a source element + sourceElement := hwrapper.Element{ + UID: "source_file", + Kind: "source", + Payload: hwrapper.ElementPayload{ + Kind: "asset_url", + Payload: hwrapper.AssetPayload{ + StorageProvider: "s3", + URL: job.SourceMedia, + }, + }, + } + + elements = append(elements, sourceElement) + + // create transcode elements for each target + // TODO: This can be optimized further with regards to combining tasks so that they run in the same machine. Requires some discussion + elementID := 0 + for _, output := range job.Outputs { + presetID, ok := output.Preset.ProviderMapping[Name] + if !ok { + return "", provider.ErrPresetMapNotFound + } + presetOutput, err := hp.GetPreset(presetID) + if err != nil { + return "", fmt.Errorf("Error getting preset info: %s", err.Error()) + } + + preset, ok := presetOutput.(hwrapper.Preset) + if !ok { + return "", ErrPresetOutputMatch + } + + e, err := hp.mountTranscodeElement(strconv.Itoa(elementID), job.ID, output.FileName, hp.config.Destination, job.StreamingParams.SegmentDuration, preset) + if err != nil { + return "", err + } + + elements = append(elements, e) + + // track the hls outputs so we can later connect them to a manifest creator task + if len(preset.Payload.Targets) > 0 && preset.Payload.Targets[0].Container.Kind == "hls" { + hlsElementIds = append(hlsElementIds, elementID) + } + + elementID++ + } + + // connect the source element to each of the transcode elements + var transcodeSuccessConnections []hwrapper.ToSuccess + for i := 0; i < elementID; i++ { + transcodeSuccessConnections = append(transcodeSuccessConnections, hwrapper.ToSuccess{Element: "transcode_task" + strconv.Itoa(i)}) + } + + // create the full job structure + cj := hwrapper.CreateJob{ + Name: fmt.Sprintf("Job %s [%s]", job.ID, path.Base(job.SourceMedia)), + Payload: hwrapper.CreateJobPayload{ + Elements: elements, + Connections: []hwrapper.Connection{ + hwrapper.Connection{ + From: []hwrapper.ConnectionFrom{ + hwrapper.ConnectionFrom{ + Element: "source_file", + }, + }, + To: hwrapper.ConnectionTo{ + Success: transcodeSuccessConnections, + }, + }, + }, + }, + } + + // check if we need to add a master manifest task element + if job.StreamingParams.Protocol == "hls" { + manifestOutputDir := fmt.Sprintf("%s/j%s", hp.config.Destination, job.ID) + manifestSubDir := path.Dir(job.StreamingParams.PlaylistFileName) + manifestFilePattern := path.Base(job.StreamingParams.PlaylistFileName) + + if manifestSubDir != "." && manifestSubDir != "/" { + manifestOutputDir = path.Join(manifestOutputDir, manifestSubDir) + } + + manifestElement := hwrapper.Element{ + UID: "manifest_creator", + Kind: "manifest_creator", + Payload: hwrapper.ManifestCreatorPayload{ + Location: hwrapper.TranscodeLocation{ + StorageProvider: "s3", + Path: manifestOutputDir, + }, + FilePattern: manifestFilePattern, + Kind: "hls", + }, + } + + cj.Payload.Elements = append(cj.Payload.Elements, manifestElement) + + var manifestFromConnections []hwrapper.ConnectionFrom + for _, hlsElementID := range hlsElementIds { + manifestFromConnections = append(manifestFromConnections, hwrapper.ConnectionFrom{Element: "transcode_task" + strconv.Itoa(hlsElementID)}) + } + + cj.Payload.Connections = append(cj.Payload.Connections, + hwrapper.Connection{ + From: manifestFromConnections, + To: hwrapper.ConnectionTo{ + Success: []hwrapper.ToSuccess{ + hwrapper.ToSuccess{Element: "manifest_creator"}, + }, + }, + }, + ) + + } + + resp, err := json.Marshal(cj) + if err != nil { + return "", err + } + + return string(resp), nil +} + +func (hp *hybrikProvider) JobStatus(job *db.Job) (*provider.JobStatus, error) { + + ji, err := hp.c.GetJobInfo(job.ProviderJobID) + if err != nil { + return &provider.JobStatus{}, err + } + + var status provider.Status + switch ji.Status { + case active: + fallthrough + case activeRunning: + fallthrough + case activeWaiting: + status = provider.StatusStarted + case queued: + status = provider.StatusQueued + case completed: + status = provider.StatusFinished + case failed: + status = provider.StatusFailed + } + + return &provider.JobStatus{ + ProviderJobID: job.ProviderJobID, + ProviderName: hp.String(), + Progress: float64(ji.Progress), + Status: status, + }, nil +} + +func (hp *hybrikProvider) getOutputDestination(name string) string { + return "" +} + +func (hp *hybrikProvider) getOutputFiles() []provider.OutputFile { + return []provider.OutputFile{} +} + +func (hp *hybrikProvider) CancelJob(id string) error { + return hp.c.StopJob(id) +} + +func (hp *hybrikProvider) CreatePreset(preset db.Preset) (string, error) { + var gopSize int + var gopMode bool + if preset.Video.GopMode == "fixed" { + gopMode = true + gopSize = 90 + if preset.Video.GopSize != "" { + gopSize, _ = strconv.Atoi(preset.Video.GopSize) + } + } else { + gopMode = false + } + + container := "" + for _, c := range hp.Capabilities().OutputFormats { + if preset.Container == c || (preset.Container == "m3u8" && c == "hls") { + container = c + } + } + + if container == "" { + return "", ErrUnsupportedContainer + } + + bitrate, err := strconv.Atoi(preset.Video.Bitrate) + if err != nil { + return "", ErrBitrateNan + } + + audioBitrate, err := strconv.Atoi(preset.Audio.Bitrate) + if err != nil { + return "", ErrBitrateNan + } + + var videoWidth *int + var videoHeight *int + + if preset.Video.Width != "" { + presetWidth, err := strconv.Atoi(preset.Video.Width) + if err != nil { + return "", ErrVideoWidthNan + } + videoWidth = &presetWidth + } else { + videoWidth = nil + } + + if preset.Video.Height != "" { + presetHeight, err := strconv.Atoi(preset.Video.Height) + if err != nil { + return "", ErrVideoHeightNan + } + videoHeight = &presetHeight + } else { + videoHeight = nil + } + + videoProfile := strings.ToLower(preset.Video.Profile) + videoLevel := preset.Video.ProfileLevel + + // TODO: Understand video-transcoding-api profile + level settings in relation to vp8 + // For now, we will omit and leave to encoder defaults + if preset.Video.Codec == "vp8" { + videoProfile = "" + videoLevel = "" + } + + p := hwrapper.Preset{ + Key: preset.Name, + Name: preset.Name, + Description: preset.Description, + Kind: "transcode", + Path: "video-transcoding-api-presets", + Payload: hwrapper.PresetPayload{ + Targets: []hwrapper.PresetTarget{ + hwrapper.PresetTarget{ + FilePattern: "", + Container: hwrapper.TranscodeContainer{Kind: container}, + Video: hwrapper.VideoTarget{ + Width: videoWidth, + Height: videoHeight, + Codec: preset.Video.Codec, + BitrateKb: bitrate / 1000, + GopMode: gopMode, + GopSize: gopSize, + Profile: videoProfile, + Level: videoLevel, + InterlaceMode: preset.Video.InterlaceMode, + }, + Audio: []hwrapper.AudioTarget{ + hwrapper.AudioTarget{ + Codec: preset.Audio.Codec, + BitrateKb: audioBitrate / 1000, + }, + }, + ExistingFiles: "replace", + UID: "target", + }, + }, + }, + } + + resultPreset, err := hp.c.CreatePreset(p) + if err != nil { + return "", err + } + + return resultPreset.Name, nil +} + +func (hp *hybrikProvider) DeletePreset(presetID string) error { + return hp.c.DeletePreset(presetID) +} + +func (hp *hybrikProvider) GetPreset(presetID string) (interface{}, error) { + preset, err := hp.c.GetPreset(presetID) + if err != nil { + return nil, err + } + + return preset, nil +} + +// Healthcheck should return nil if the provider is currently available +// for transcoding videos, otherwise it should return an error +// explaining what's going on. +func (hp *hybrikProvider) Healthcheck() error { + err := hp.c.HealthCheck() + if err != nil { + return err + } + + return nil +} + +// Capabilities describes the capabilities of the provider. +func (hp *hybrikProvider) Capabilities() provider.Capabilities { + // we can support quite a bit more format wise, but unsure of schema so limiting to known supported video-transcoding-api formats for now... + return provider.Capabilities{ + InputFormats: []string{"prores", "h264"}, + OutputFormats: []string{"mp4", "hls", "webm", "mov"}, + Destinations: []string{"s3"}, + } +}