Skip to content

Commit

Permalink
Fix/deployment (#72)
Browse files Browse the repository at this point in the history
* fix: for deploment on VM

* feat: added gpac package in consumer container

* feat: added consumer changes
  • Loading branch information
abskrj authored Sep 18, 2023
1 parent d1ef778 commit bf27349
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 26 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
.vscode

downloads
output
output
*credentials*
9 changes: 4 additions & 5 deletions service/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,13 @@ func VideoProcessConsumer(ch *rmq.Channel, q *rmq.Queue) {

var video types.Video

log.Println("Request Consumed: ", video)

err := json.Unmarshal(d.Body, &video)
if err != nil {
log.Println(err)
continue
}

log.Println("Request Consumed: ", video)
go processVideo(&video, guard)
}
}()
Expand Down Expand Up @@ -90,10 +89,10 @@ func processVideo(video *types.Video, guard <-chan int) {
outputDir, err := utils.GetOutputFilePathName(videoFileName, "")
utils.LogErr(err)

err = os.RemoveAll(outputDir)
utils.LogErr(err)

utils.UploadToCloudStorage(uploader, outputDir)

utils.LogErr(err)
err = os.RemoveAll(outputDir)

<-guard
}
12 changes: 6 additions & 6 deletions service/dash.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ func generateDash(fileName string, watermark types.WaterMark) {

wg.Add(len(constants.AudioFileTypeMap) + len(constants.VideoFileTypeMap) + 1)

generateAudioFiles(targetFile, outputPath, &wg)
go generateAudioFiles(targetFile, outputPath, &wg)

generateVideoFiles(targetFile, outputPath, watermark, &wg)
go generateVideoFiles(targetFile, outputPath, watermark, &wg)

generateThumbnailFiles(targetFile, outputPath, &wg)
go generateThumbnailFiles(targetFile, outputPath, &wg)

wg.Wait()

Expand Down Expand Up @@ -92,9 +92,9 @@ func generateMultiBitrateVideo(targetFile string, outputFile string, fileType co

err := getInput(targetFile, watermark).
Output(outputFile, ffmpeg.KwArgs{
constants.VideoKwargs[constants.Preset]: constants.FFmpegConfig[constants.Preset],
constants.VideoKwargs[constants.Tune]: constants.FFmpegConfig[constants.Tune],
constants.VideoKwargs[constants.FpsMode]: constants.FFmpegConfig[constants.FpsMode],
constants.VideoKwargs[constants.Preset]: constants.FFmpegConfig[constants.Preset],
constants.VideoKwargs[constants.Tune]: constants.FFmpegConfig[constants.Tune],
// constants.VideoKwargs[constants.FpsMode]: constants.FFmpegConfig[constants.FpsMode],
constants.VideoKwargs[constants.AudioExclusion]: constants.FFmpegConfig[constants.AudioExclusion],
constants.VideoKwargs[constants.VideoCodec]: constants.FFmpegConfig[constants.VideoCodec],
constants.VideoKwargs[constants.MaxRate]: constants.VideoBitrateMap[fileType],
Expand Down
29 changes: 15 additions & 14 deletions utils/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log"
"net/url"
"os"
"path"
"path/filepath"
"time"
"zestream-server/configs"
Expand Down Expand Up @@ -103,26 +104,26 @@ func (a AwsUploader) Upload(walker fileWalk) {

uploader := s3manager.NewUploader(a.Session)

for path := range walker {
filename := filepath.Base(path)
for pathName := range walker {
filename := filepath.Base(pathName)

file, err := os.Open(path)
file, err := os.Open(pathName)
if err != nil {
log.Println("Failed opening file", path, err)
log.Println("Failed opening file", pathName, err)
continue
}

result, err := uploader.Upload(&s3manager.UploadInput{
Bucket: &bucket,
Key: aws.String(filepath.Join(a.ContainerName, a.VideoId, filename)),
Key: aws.String(path.Join(a.ContainerName, a.VideoId, filename)),
Body: file,
})

if err != nil {
file.Close()
log.Println("Failed to upload", path, err)
log.Println("Failed to upload", pathName, err)
}
log.Println("Uploaded", path, result.Location)
log.Println("Uploaded", pathName, result.Location)

if err := file.Close(); err != nil {
log.Println("Unable to close the file")
Expand All @@ -146,29 +147,29 @@ func (g *GcpUploader) Upload(walker fileWalk) {

bucket := g.Client.Bucket(bucketName)

for path := range walker {
filename := filepath.Base(path)
for pathName := range walker {
filename := filepath.Base(pathName)
fmt.Printf("Creating file /%v/%v\n", bucketName, filename)

wc := bucket.Object(filepath.Join(g.ContainerName, g.VideoId, filename)).NewWriter(ctx)
wc := bucket.Object(path.Join(g.ContainerName, g.VideoId, filename)).NewWriter(ctx)

blob, err := os.Open(path)
blob, err := os.Open(pathName)
if err != nil {
log.Println("Failed opening file", path, err)
log.Println("Failed opening file", pathName, err)
}

if _, err := io.Copy(wc, blob); err != nil {
//close the blob before error log
blob.Close()
log.Println("Failed to upload", path, err)
log.Println("Failed to upload", pathName, err)
}

if err := wc.Close(); err != nil {
//close the file before error log
blob.Close()
log.Println("unable to close the bucket", err)
} else {
log.Println("successfully uploaded ", path)
log.Println("successfully uploaded ", pathName)
}

if err := blob.Close(); err != nil {
Expand Down

0 comments on commit bf27349

Please sign in to comment.