diff --git a/.circleci/config.yml b/.circleci/config.yml index 554dfabbbc..792840db5b 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -145,7 +145,7 @@ version: 2.1 orbs: npm-publisher: uraway/npm-publisher@0.2.0 - kurtosis-docs-checker: kurtosis-tech/docs-checker@0.2.7 + kurtosis-docs-checker: kurtosis-tech/docs-checker@0.2.9 slack: circleci/slack@4.10.1 executors: @@ -1475,17 +1475,6 @@ workflows: name: "Check if CLI builds for all os and arch pairs" <<: *filters_ignore_main - - test_enclave_manager_web_ui: - name: "Test Basic Web UI Functionality in Docker" - context: - - docker-user - requires: - - build_cli - - build_api_container_server - - build_engine_server - - build_files_artifacts_expander - <<: *filters_ignore_main - - test_basic_cli_functionality: name: "Test Basic CLI Functionality in Docker" cli-cluster-backend: "docker" diff --git a/README.md b/README.md index a2a01ce152..26857129dc 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,7 @@ Because of this additional layer of abstraction, we are able to introduce severa How do I get going? =================== -To see Kurtosis in action, first install it using the instructions [here](https://docs.kurtosis.com/install) or visit [Kurtosis Cloud](https://cloud.kurtosis.com/) to provision a remote host. +To see Kurtosis in action, first install it using the instructions [here](https://docs.kurtosis.com/install). Then, run the [Redis voting app Kurtosis package](https://github.com/kurtosis-tech/awesome-kurtosis/tree/main/redis-voting-app): diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client.go b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client.go index 46c00f00d2..6880e8db3e 100644 --- a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client.go +++ b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client.go @@ -63,7 +63,8 @@ func (client *persistentVolumeLogsDatabaseClient) StreamUserServiceLogs( streamErrChan := make(chan error) // this channel will return the user service log lines by service UUID - logsByKurtosisUserServiceUuidChan := make(chan map[service.ServiceUUID][]logline.LogLine) + logLineSender := logline.NewLogLineSender() + logsByKurtosisUserServiceUuidChan := logLineSender.GetLogsChannel() wgSenders := &sync.WaitGroup{} for serviceUuid := range userServiceUuids { @@ -71,7 +72,7 @@ func (client *persistentVolumeLogsDatabaseClient) StreamUserServiceLogs( go client.streamServiceLogLines( ctx, wgSenders, - logsByKurtosisUserServiceUuidChan, + logLineSender, streamErrChan, enclaveUuid, serviceUuid, @@ -87,7 +88,11 @@ func (client *persistentVolumeLogsDatabaseClient) StreamUserServiceLogs( //wait for stream go routine to end wgSenders.Wait() - close(logsByKurtosisUserServiceUuidChan) + // send all buffered log lines + logLineSender.Flush() + + // wait until the channel has been fully read/empty before closing it + closeChannelWhenEmpty(logsByKurtosisUserServiceUuidChan) close(streamErrChan) //then cancel the context @@ -130,7 +135,7 @@ func (client *persistentVolumeLogsDatabaseClient) FilterExistingServiceUuids( func (client *persistentVolumeLogsDatabaseClient) streamServiceLogLines( ctx context.Context, wgSenders *sync.WaitGroup, - logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine, + logLineSender *logline.LogLineSender, streamErrChan chan error, enclaveUuid enclave.EnclaveUUID, serviceUuid service.ServiceUUID, @@ -143,7 +148,7 @@ func (client *persistentVolumeLogsDatabaseClient) streamServiceLogLines( client.streamStrategy.StreamLogs( ctx, client.filesystem, - logsByKurtosisUserServiceUuidChan, + logLineSender, streamErrChan, enclaveUuid, serviceUuid, @@ -152,3 +157,12 @@ func (client *persistentVolumeLogsDatabaseClient) streamServiceLogLines( shouldReturnAllLogs, numLogLines) } + +func closeChannelWhenEmpty(logsChan chan map[service.ServiceUUID][]logline.LogLine) { + for { + if len(logsChan) == 0 { + close(logsChan) + return + } + } +} diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_file_stream_logs_strategy.go b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_file_stream_logs_strategy.go index 7fa08c1594..b322e8c214 100644 --- a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_file_stream_logs_strategy.go +++ b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_file_stream_logs_strategy.go @@ -30,7 +30,7 @@ type JsonLog map[string]string func (strategy *PerFileStreamLogsStrategy) StreamLogs( ctx context.Context, fs volume_filesystem.VolumeFilesystem, - logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine, + logLineSender *logline.LogLineSender, streamErrChan chan error, enclaveUuid enclave.EnclaveUUID, serviceUuid service.ServiceUUID, @@ -122,12 +122,7 @@ func (strategy *PerFileStreamLogsStrategy) StreamLogs( break } - // send the log line - logLines := []logline.LogLine{*logLine} - userServicesLogLinesMap := map[service.ServiceUUID][]logline.LogLine{ - serviceUuid: logLines, - } - logsByKurtosisUserServiceUuidChan <- userServicesLogLinesMap + logLineSender.Send(serviceUuid, *logLine) } } } diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go index 755df67123..1717721392 100644 --- a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go +++ b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go @@ -48,7 +48,7 @@ func NewPerWeekStreamLogsStrategy(time logs_clock.LogsClock, logRetentionPeriodI func (strategy *PerWeekStreamLogsStrategy) StreamLogs( ctx context.Context, fs volume_filesystem.VolumeFilesystem, - logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine, + logLineSender *logline.LogLineSender, streamErrChan chan error, enclaveUuid enclave.EnclaveUUID, serviceUuid service.ServiceUUID, @@ -89,24 +89,26 @@ func (strategy *PerWeekStreamLogsStrategy) StreamLogs( }() if shouldReturnAllLogs { - if err := strategy.streamAllLogs(ctx, logsReader, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil { + if err := strategy.streamAllLogs(ctx, logsReader, logLineSender, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil { streamErrChan <- stacktrace.Propagate(err, "An error occurred streaming all logs for service '%v' in enclave '%v'", serviceUuid, enclaveUuid) return } } else { - if err := strategy.streamTailLogs(ctx, logsReader, numLogLines, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil { + if err := strategy.streamTailLogs(ctx, logsReader, numLogLines, logLineSender, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil { streamErrChan <- stacktrace.Propagate(err, "An error occurred streaming '%v' logs for service '%v' in enclave '%v'", numLogLines, serviceUuid, enclaveUuid) return } } + // need to flush before following logs + logLineSender.Flush() if shouldFollowLogs { latestLogFile := paths[len(paths)-1] - if err := strategy.followLogs(ctx, latestLogFile, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil { + logrus.Debugf("Following logs...") + if err := strategy.followLogs(ctx, latestLogFile, logLineSender, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil { streamErrChan <- stacktrace.Propagate(err, "An error occurred creating following logs for service '%v' in enclave '%v'", serviceUuid, enclaveUuid) return } - logrus.Debugf("Following logs...") } } @@ -180,7 +182,7 @@ func getLogsReader(filesystem volume_filesystem.VolumeFilesystem, logFilePaths [ func (strategy *PerWeekStreamLogsStrategy) streamAllLogs( ctx context.Context, logsReader *bufio.Reader, - logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine, + logLineSender *logline.LogLineSender, serviceUuid service.ServiceUUID, conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) error { for { @@ -190,12 +192,14 @@ func (strategy *PerWeekStreamLogsStrategy) streamAllLogs( return nil default: jsonLogStr, err := getCompleteJsonLogString(logsReader) + if isValidJsonEnding(jsonLogStr) { jsonLog, err := convertStringToJson(jsonLogStr) if err != nil { return stacktrace.Propagate(err, "An error occurred converting the json log string '%v' into json.", jsonLogStr) } - if err = strategy.sendJsonLogLine(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil { + + if err = strategy.sendJsonLogLine(jsonLog, conjunctiveLogLinesFiltersWithRegex, logLineSender, serviceUuid); err != nil { return err } } @@ -217,7 +221,7 @@ func (strategy *PerWeekStreamLogsStrategy) streamTailLogs( ctx context.Context, logsReader *bufio.Reader, numLogLines uint32, - logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine, + logLineSender *logline.LogLineSender, serviceUuid service.ServiceUUID, conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) error { tailLogLines := make([]string, 0, numLogLines) @@ -255,7 +259,7 @@ func (strategy *PerWeekStreamLogsStrategy) streamTailLogs( if err != nil { return stacktrace.Propagate(err, "An error occurred converting the json log string '%v' into json.", jsonLogStr) } - if err := strategy.sendJsonLogLine(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil { + if err = strategy.sendJsonLogLine(jsonLog, conjunctiveLogLinesFiltersWithRegex, logLineSender, serviceUuid); err != nil { return err } } @@ -298,11 +302,7 @@ func isValidJsonEnding(line string) bool { return endOfLine == volume_consts.EndOfJsonLine } -func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLine( - jsonLog JsonLog, - logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine, - serviceUuid service.ServiceUUID, - conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) error { +func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLine(jsonLog JsonLog, conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex, logLineSender *logline.LogLineSender, serviceUuid service.ServiceUUID) error { // each logLineStr is of the following structure: {"enclave_uuid": "...", "service_uuid":"...", "log": "...",.. "timestamp":"..."} // eg. {"container_type":"api-container", "container_id":"8f8558ba", "container_name":"/kurtosis-api--ffd", // "log":"hi","timestamp":"2023-08-14T14:57:49Z"} @@ -338,12 +338,7 @@ func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLine( return nil } - // send the log line - logLines := []logline.LogLine{*logLine} - userServicesLogLinesMap := map[service.ServiceUUID][]logline.LogLine{ - serviceUuid: logLines, - } - logsByKurtosisUserServiceUuidChan <- userServicesLogLinesMap + logLineSender.Send(serviceUuid, *logLine) return nil } @@ -358,7 +353,7 @@ func (strategy *PerWeekStreamLogsStrategy) isWithinRetentionPeriod(logLine *logl func (strategy *PerWeekStreamLogsStrategy) followLogs( ctx context.Context, filepath string, - logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine, + logLineSender *logline.LogLineSender, serviceUuid service.ServiceUUID, conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex, ) error { @@ -399,8 +394,7 @@ func (strategy *PerWeekStreamLogsStrategy) followLogs( // if tail package fails to parse a valid new line, fail fast return stacktrace.NewError("hpcloud/tail returned the following line: '%v' that was not valid json.\nThis is potentially a bug in tailing package.", logLine.Text) } - err = strategy.sendJsonLogLine(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex) - if err != nil { + if err = strategy.sendJsonLogLine(jsonLog, conjunctiveLogLinesFiltersWithRegex, logLineSender, serviceUuid); err != nil { return stacktrace.Propagate(err, "An error occurred sending json log line '%v'.", logLine.Text) } } diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/stream_logs_strategy.go b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/stream_logs_strategy.go index c8fa215b30..af00ed5646 100644 --- a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/stream_logs_strategy.go +++ b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/stream_logs_strategy.go @@ -15,7 +15,7 @@ type StreamLogsStrategy interface { StreamLogs( ctx context.Context, fs volume_filesystem.VolumeFilesystem, - logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine, + logLineSender *logline.LogLineSender, streamErrChan chan error, enclaveUuid enclave.EnclaveUUID, serviceUuid service.ServiceUUID, diff --git a/engine/server/engine/centralized_logs/logline/logline_sender.go b/engine/server/engine/centralized_logs/logline/logline_sender.go new file mode 100644 index 0000000000..0e76510dac --- /dev/null +++ b/engine/server/engine/centralized_logs/logline/logline_sender.go @@ -0,0 +1,65 @@ +package logline + +import ( + "github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/service" + "sync" +) + +const ( + batchLogsAmount = 500 + logsChanBufferSize = 300 +) + +type LogLineSender struct { + logsChan chan map[service.ServiceUUID][]LogLine + + logLineBuffer map[service.ServiceUUID][]LogLine + + mu sync.Mutex +} + +func NewLogLineSender() *LogLineSender { + return &LogLineSender{ + logsChan: make(chan map[service.ServiceUUID][]LogLine, logsChanBufferSize), + logLineBuffer: map[service.ServiceUUID][]LogLine{}, + mu: sync.Mutex{}, + } +} + +func (sender *LogLineSender) Send(serviceUuid service.ServiceUUID, logLine LogLine) { + sender.mu.Lock() + defer sender.mu.Unlock() + + sender.logLineBuffer[serviceUuid] = append(sender.logLineBuffer[serviceUuid], logLine) + + if len(sender.logLineBuffer[serviceUuid])%batchLogsAmount == 0 { + userServicesLogLinesMap := map[service.ServiceUUID][]LogLine{ + serviceUuid: sender.logLineBuffer[serviceUuid], + } + sender.logsChan <- userServicesLogLinesMap + + // clear buffer after flushing it through the channel + sender.logLineBuffer[serviceUuid] = []LogLine{} + } +} + +func (sender *LogLineSender) GetLogsChannel() chan map[service.ServiceUUID][]LogLine { + return sender.logsChan +} + +// sends all logs remaining in the buffers through the channel +// this should be called at the end of processing to send the remainder of logs +func (sender *LogLineSender) Flush() { + sender.mu.Lock() + defer sender.mu.Unlock() + + for uuid, logLines := range sender.logLineBuffer { + serviceUuid := uuid + userServiceLogLinesMap := map[service.ServiceUUID][]LogLine{ + serviceUuid: logLines, + } + sender.logsChan <- userServiceLogLinesMap + + sender.logLineBuffer[serviceUuid] = []LogLine{} + } +} diff --git a/engine/server/go.mod b/engine/server/go.mod index f6548f984e..4660cedcc6 100644 --- a/engine/server/go.mod +++ b/engine/server/go.mod @@ -63,7 +63,7 @@ require ( github.com/kurtosis-tech/kurtosis/grpc-file-transfer/golang v0.0.0-20230803130419-099ee7a4e3dc github.com/kurtosis-tech/kurtosis/metrics-library/golang v0.0.0-20231206095907-9bdf0d02cb90 github.com/labstack/echo/v4 v4.11.3 - github.com/rs/cors v1.9.0 + github.com/rs/cors v1.11.0 github.com/spf13/afero v1.10.0 golang.org/x/exp v0.0.0-20230905200255-921286631fa9 k8s.io/apimachinery v0.27.2 diff --git a/engine/server/go.sum b/engine/server/go.sum index b7fd1e32d1..c027f4c074 100644 --- a/engine/server/go.sum +++ b/engine/server/go.sum @@ -312,8 +312,8 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1: github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= -github.com/rs/cors v1.9.0 h1:l9HGsTsHJcvW14Nk7J9KFz8bzeAWXn3CG6bgt7LsrAE= -github.com/rs/cors v1.9.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= +github.com/rs/cors v1.11.0 h1:0B9GE/r9Bc2UxRMMtymBkHTenPkHDv0CW4Y98GBY+po= +github.com/rs/cors v1.11.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/segmentio/backo-go v1.0.0 h1:kbOAtGJY2DqOR0jfRkYEorx/b18RgtepGtY3+Cpe6qA= github.com/segmentio/backo-go v1.0.0/go.mod h1:kJ9mm9YmoWSkk+oQ+5Cj8DEoRCX2JT6As4kEtIIOp1M= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=