Skip to content

Commit

Permalink
Persist logs to scoped data path
Browse files Browse the repository at this point in the history
  • Loading branch information
streamer45 committed Dec 20, 2024
1 parent f19a56a commit 326d4bb
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 19 deletions.
2 changes: 1 addition & 1 deletion cmd/transcriber/call/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestReportJobFailure(t *testing.T) {
AuthToken: "qj75unbsef83ik9p7ueypb6iyw",
}
cfg.SetDefaults()
tr, err := NewTranscriber(cfg)
tr, err := NewTranscriber(cfg, GetDataDir(""))
require.NoError(t, err)
require.NotNil(t, tr)

Expand Down
4 changes: 2 additions & 2 deletions cmd/transcriber/call/tracks.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (t *Transcriber) processLiveTrack(track trackRemote, sessionID string) {
return
}
ctx.user = user
ctx.filename = filepath.Join(getDataDir(), fmt.Sprintf("%s_%s.ogg", user.Id, track.ID()))
ctx.filename = filepath.Join(t.dataPath, fmt.Sprintf("%s_%s.ogg", user.Id, track.ID()))

var prevArrivalTime time.Time
var prevRTPTimestamp uint32
Expand Down Expand Up @@ -535,7 +535,7 @@ func (t *Transcriber) newTrackTranscriber() (transcribe.Transcriber, error) {
return azure.NewSpeechRecognizer(azure.SpeechRecognizerConfig{
SpeechKey: speechKey,
SpeechRegion: speechRegion,
DataDir: getDataDir(),
DataDir: t.dataPath,
})
default:
return nil, fmt.Errorf("transcribe API %q not implemented", t.cfg.TranscribeAPI)
Expand Down
9 changes: 8 additions & 1 deletion cmd/transcriber/call/transcriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type APIClient interface {
type Transcriber struct {
cfg config.CallTranscriberConfig

dataPath string

client *client.Client
apiClient APIClient
apiURL string
Expand All @@ -48,18 +50,23 @@ type Transcriber struct {
captionsPoolDoneCh chan struct{}
}

func NewTranscriber(cfg config.CallTranscriberConfig) (t *Transcriber, retErr error) {
func NewTranscriber(cfg config.CallTranscriberConfig, dataPath string) (t *Transcriber, retErr error) {
if err := cfg.IsValidURL(); err != nil {
return nil, fmt.Errorf("failed to validate URL: %w", err)
}

if dataPath == "" {
return nil, fmt.Errorf("dataPath should not be empty")
}

apiClient := model.NewAPIv4Client(cfg.SiteURL)
apiClient.SetToken(cfg.AuthToken)

t = &Transcriber{
cfg: cfg,
apiClient: apiClient,
apiURL: apiClient.URL,
dataPath: dataPath,
}

defer func() {
Expand Down
43 changes: 39 additions & 4 deletions cmd/transcriber/call/transcriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func setupTranscriberForTest(t *testing.T) *Transcriber {
ModelSize: config.ModelSizeTiny,
}
cfg.SetDefaults()
tr, err := NewTranscriber(cfg)
tr, err := NewTranscriber(cfg, GetDataDir(""))
require.NoError(t, err)
require.NotNil(t, tr)

Expand All @@ -61,6 +61,41 @@ func setupTranscriberForTest(t *testing.T) *Transcriber {
return tr
}

// TestNewTranscriber verifies that NewTranscriber rejects bad input: each case
// expects a specific error message and a nil Transcriber.
func TestNewTranscriber(t *testing.T) {
	// buildCfg returns a config with defaults applied; the cases below differ
	// only in the site URL they use.
	buildCfg := func(siteURL string) config.CallTranscriberConfig {
		c := config.CallTranscriberConfig{
			SiteURL:         siteURL,
			CallID:          "8w8jorhr7j83uqr6y1st894hqe",
			PostID:          "udzdsg7dwidbzcidx5khrf8nee",
			TranscriptionID: "67t5u6cmtfbb7jug739d43xa9e",
			AuthToken:       "qj75unbsef83ik9p7ueypb6iyw",
			NumThreads:      1,
			ModelSize:       config.ModelSizeTiny,
		}
		c.SetDefaults()
		return c
	}

	testCases := []struct {
		name     string
		siteURL  string
		dataPath string
		wantErr  string
	}{
		{
			name:     "invalid siteURL",
			siteURL:  "invalid-url",
			dataPath: GetDataDir(""),
			wantErr:  "failed to validate URL: SiteURL parsing failed: invalid scheme \"\"",
		},
		{
			name:     "empty data path",
			siteURL:  "http://localhost:8065",
			dataPath: "",
			wantErr:  "dataPath should not be empty",
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			got, err := NewTranscriber(buildCfg(tc.siteURL), tc.dataPath)
			require.EqualError(t, err, tc.wantErr)
			require.Nil(t, got)
		})
	}
}

func TestTranscribeTrack(t *testing.T) {
tr := setupTranscriberForTest(t)

Expand Down Expand Up @@ -192,7 +227,7 @@ func TestProcessLiveTrack(t *testing.T) {
close(tr.trackCtxs)
require.Len(t, tr.trackCtxs, 1)

trackFile, err := os.Open(filepath.Join(getDataDir(), fmt.Sprintf("userID_%s.ogg", track.id)))
trackFile, err := os.Open(filepath.Join(tr.dataPath, fmt.Sprintf("userID_%s.ogg", track.id)))
defer trackFile.Close()
require.NoError(t, err)

Expand Down Expand Up @@ -293,7 +328,7 @@ func TestProcessLiveTrack(t *testing.T) {
close(tr.trackCtxs)
require.Len(t, tr.trackCtxs, 1)

trackFile, err := os.Open(filepath.Join(getDataDir(), fmt.Sprintf("userID_%s.ogg", track.id)))
trackFile, err := os.Open(filepath.Join(tr.dataPath, fmt.Sprintf("userID_%s.ogg", track.id)))
defer trackFile.Close()
require.NoError(t, err)

Expand Down Expand Up @@ -393,7 +428,7 @@ func TestProcessLiveTrack(t *testing.T) {
close(tr.trackCtxs)
require.Len(t, tr.trackCtxs, 1)

trackFile, err := os.Open(filepath.Join(getDataDir(), fmt.Sprintf("userID_%s.ogg", track.id)))
trackFile, err := os.Open(filepath.Join(tr.dataPath, fmt.Sprintf("userID_%s.ogg", track.id)))
defer trackFile.Close()
require.NoError(t, err)

Expand Down
10 changes: 5 additions & 5 deletions cmd/transcriber/call/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ func (t *Transcriber) getUserForSession(sessionID string) (*model.User, error) {
return nil, fmt.Errorf("failed to get user for call: max attempts reached")
}

func getDataDir() string {
func GetDataDir(jobID string) string {
if dir := os.Getenv("DATA_DIR"); dir != "" {
return dir
return filepath.Join(dir, jobID)
}
return dataDir
return filepath.Join(dataDir, jobID)
}

func getModelsDir() string {
Expand Down Expand Up @@ -102,12 +102,12 @@ func (t *Transcriber) publishTranscription(tr transcribe.Transcription) (err err
var vttFile *os.File
var textFile *os.File
openFiles := func() error {
vttFile, err = os.OpenFile(filepath.Join(getDataDir(), fname+".vtt"), os.O_RDWR|os.O_CREATE, 0600)
vttFile, err = os.OpenFile(filepath.Join(t.dataPath, fname+".vtt"), os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return fmt.Errorf("failed to open output file: %w", err)
}

textFile, err = os.OpenFile(filepath.Join(getDataDir(), fname+".txt"), os.O_RDWR|os.O_CREATE, 0600)
textFile, err = os.OpenFile(filepath.Join(t.dataPath, fname+".txt"), os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return fmt.Errorf("failed to open output file: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/transcriber/call/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ func TestPublishTranscriptions(t *testing.T) {
ModelSize: config.ModelSizeTiny,
}
cfg.SetDefaults()
tr, err := NewTranscriber(cfg)
tr, err := NewTranscriber(cfg, GetDataDir(""))
require.NoError(t, err)
require.NotNil(t, tr)

t.Run("failure to get filename", func(t *testing.T) {
t.Run("", func(t *testing.T) {
err := tr.publishTranscription(transcribe.Transcription{})
require.EqualError(t, err, "failed to get filename for call: failed to get filename: AppErrorFromJSON: model.utils.decode_json.app_error, body: 404 page not found\n, json: cannot unmarshal number into Go value of type model.AppError")
})
Expand All @@ -98,7 +98,7 @@ func TestPublishTranscriptions(t *testing.T) {
}

err := tr.publishTranscription(transcribe.Transcription{})
require.EqualError(t, err, fmt.Sprintf("failed to open output file: open %s: no such file or directory", filepath.Join(getDataDir(), "Call_Test.vtt")))
require.EqualError(t, err, fmt.Sprintf("failed to open output file: open %s: no such file or directory", filepath.Join(tr.dataPath, "Call_Test.vtt")))
})

vttFile, err := os.CreateTemp("", "Call_Test.vtt")
Expand Down
27 changes: 24 additions & 3 deletions cmd/transcriber/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"io"
"log/slog"
"os"
"os/signal"
Expand Down Expand Up @@ -39,11 +40,31 @@ func slogReplaceAttr(_ []string, a slog.Attr) slog.Attr {
}

func main() {
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
trID := os.Getenv("TRANSCRIPTION_ID")

// Create scoped (by jobID) data path
dataPath := call.GetDataDir(trID)
err := os.MkdirAll(dataPath, 0700)
if err != nil {
slog.Error("failed to create data path", slog.String("err", err.Error()))
os.Exit(1)
}

logFile, err := os.Create(filepath.Join(dataPath, "transcriber.log"))
if err != nil {
slog.Error("failed to create log file", slog.String("err", err.Error()))
os.Exit(1)
}
defer logFile.Close()

// This lets us write logs simultaneously to console and file.
logWriter := io.MultiWriter(os.Stdout, logFile)

logger := slog.New(slog.NewTextHandler(logWriter, &slog.HandlerOptions{
AddSource: true,
Level: slog.LevelDebug,
ReplaceAttr: slogReplaceAttr,
})).With("trID", os.Getenv("TRANSCRIPTION_ID"))
})).With("trID", trID)
slog.SetDefault(logger)

pid := os.Getpid()
Expand All @@ -59,7 +80,7 @@ func main() {
}
cfg.SetDefaults()

transcriber, err := call.NewTranscriber(cfg)
transcriber, err := call.NewTranscriber(cfg, dataPath)
if err != nil {
slog.Error("failed to create call transcriber", slog.String("err", err.Error()))
os.Exit(1)
Expand Down

0 comments on commit 326d4bb

Please sign in to comment.