[feat] - Introduce Fatal/Non-Fatal File Handling Errors #3521

Merged · 12 commits · Nov 16, 2024
10 changes: 10 additions & 0 deletions pkg/handlers/apk.go
@@ -66,6 +66,16 @@ func newAPKHandler() *apkHandler {
}

// HandleFile processes apk formatted files.
// Fatal errors that will stop processing:
// - Unable to create ZIP reader from input
// - Unable to parse resources.arsc file
// - Panics during processing (recovered but returned as errors)
//
// Non-fatal errors that will be logged and continue processing:
// - Failed to process individual files within the APK
// - Failed to process resources.arsc contents
// - Failed to process individual dex classes
// - Failed to decode specific XML files
func (h *apkHandler) HandleFile(ctx logContext.Context, input fileReader) chan DataOrErr {
apkChan := make(chan DataOrErr, defaultBufferSize)

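For orientation, here is a hypothetical sketch of what a consumer of this channel does with the two error classes. The real consumer, handleChunksWithError, appears later in this diff (pkg/handlers/handlers.go); drainHandlerChannel below is illustrative only, not code from this PR.

// drainHandlerChannel is a hypothetical consumer of the channel returned by
// HandleFile, using the ErrProcessingFatal/ErrProcessingWarning sentinels
// introduced in pkg/handlers/handlers.go below.
func drainHandlerChannel(ctx logContext.Context, ch <-chan DataOrErr) error {
	for dataOrErr := range ch {
		if dataOrErr.Err != nil {
			if errors.Is(dataOrErr.Err, ErrProcessingFatal) {
				return dataOrErr.Err // fatal: stop processing this file
			}
			// Warnings (and any other error) are logged; processing continues.
			ctx.Logger().Error(dataOrErr.Err, "non-critical error processing chunk")
			continue
		}
		_ = dataOrErr.Data // hand the bytes off to chunking/reporting
	}
	return nil
}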
31 changes: 25 additions & 6 deletions pkg/handlers/ar.go
@@ -20,8 +20,16 @@ func newARHandler() *arHandler {
return &arHandler{defaultHandler: newDefaultHandler(arHandlerType)}
}

// HandleFile processes AR formatted files. This function needs to be implemented to extract or
// manage data from AR files according to specific requirements.
// HandleFile processes AR formatted files and returns a channel of DataOrErr.
// Fatal errors that will terminate processing include:
// - Context cancellation
// - Context deadline exceeded
// - Errors loading the AR file
// - Panics during processing (recovered and returned as fatal errors)
//
// Non-fatal errors that will be logged but allow processing to continue include:
// - Errors creating mime-type readers for individual AR entries
// - Errors handling content within AR entries
func (h *arHandler) HandleFile(ctx logContext.Context, input fileReader) chan DataOrErr {
dataOrErrChan := make(chan DataOrErr, defaultBufferSize)

@@ -42,14 +50,18 @@ func (h *arHandler) HandleFile(ctx logContext.Context, input fileReader) chan DataOrErr {
} else {
panicErr = fmt.Errorf("panic occurred: %v", r)
}
ctx.Logger().Error(panicErr, "Panic occurred when attempting to open ar archive")
dataOrErrChan <- DataOrErr{
Err: fmt.Errorf("%w: panic error: %v", ErrProcessingFatal, panicErr),
}
}
}()

start := time.Now()
arReader, err := deb.LoadAr(input)
if err != nil {
ctx.Logger().Error(err, "Error loading AR file")
dataOrErrChan <- DataOrErr{
Err: fmt.Errorf("%w: loading AR error: %v", ErrProcessingFatal, err),
Collaborator: Do we care that we "lose" the original err since we're wrapping the sentinel error?

Author: We would still maintain the error string though, correct? Do you think we should use errors.Join here instead?

Collaborator:
> We would still maintain the error string though correct?

Yeah.

> Do you think we should use errors.Join here instead?

I think it would have the same effect. It's probably better to wrap the sentinel, right?

Author: I believe so. We need to be able to use errors.Is to determine if it's a fatal error.

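To ground the thread above, here is a standalone sketch (not part of this PR) comparing the two approaches: a single %w wrap keeps errors.Is working against the sentinel while the original cause survives only as message text, whereas errors.Join would keep both errors matchable in the chain.

package main

import (
	"errors"
	"fmt"
)

var ErrProcessingFatal = errors.New("fatal error processing file")

func main() {
	cause := errors.New("unexpected EOF") // hypothetical underlying error

	// The PR's approach: wrap the sentinel with %w, flatten the cause with %v.
	wrapped := fmt.Errorf("%w: loading AR error: %v", ErrProcessingFatal, cause)
	fmt.Println(errors.Is(wrapped, ErrProcessingFatal)) // true
	fmt.Println(errors.Is(wrapped, cause))              // false: cause survives only as text
	fmt.Println(wrapped)                                 // "fatal error processing file: loading AR error: unexpected EOF"

	// errors.Join keeps both errors in the chain.
	joined := errors.Join(ErrProcessingFatal, cause)
	fmt.Println(errors.Is(joined, ErrProcessingFatal)) // true
	fmt.Println(errors.Is(joined, cause))              // true
}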
}
return
}

@@ -85,12 +97,19 @@ func (h *arHandler) processARFiles(ctx logContext.Context, reader *deb.Ar, dataO

rdr, err := newMimeTypeReader(arEntry.Data)
if err != nil {
return fmt.Errorf("error creating mime-type reader: %w", err)
dataOrErrChan <- DataOrErr{
Err: fmt.Errorf("%w: error creating AR mime-type reader: %v", ErrProcessingWarning, err),
}
h.metrics.incErrors()
continue
}

if err := h.handleNonArchiveContent(fileCtx, rdr, dataOrErrChan); err != nil {
fileCtx.Logger().Error(err, "error handling archive content in AR")
dataOrErrChan <- DataOrErr{
Err: fmt.Errorf("%w: error handling archive content in AR: %v", ErrProcessingWarning, err),
}
h.metrics.incErrors()
continue
}

h.metrics.incFilesProcessed()
25 changes: 20 additions & 5 deletions pkg/handlers/archive.go
@@ -40,10 +40,23 @@ func newArchiveHandler() *archiveHandler {
return &archiveHandler{defaultHandler: newDefaultHandler(archiveHandlerType)}
}

// HandleFile processes the input as either an archive or non-archive based on its content,
// utilizing a single output channel. It first tries to identify the input as an archive. If it is an archive,
// it processes it accordingly; otherwise, it handles the input as non-archive content.
// The function returns a channel that will receive the extracted data bytes and an error if the initial setup fails.
// HandleFile processes archive files and returns a channel of DataOrErr.
//
// Fatal errors that will terminate processing include:
// - Context cancellation
// - Context deadline exceeded
// - Panics during archive processing (recovered and returned as fatal errors)
// - Maximum archive depth exceeded
// - Unknown archive formats
// - Errors opening decompressors
// - Errors creating readers for decompressed content
// - Errors during archive extraction
//
// Non-fatal errors that will be logged but allow processing to continue include:
// - Empty readers encountered during nested archive processing
// - Files exceeding maximum size limits
// - Files with ignored extensions or binary content
// - Errors opening individual files within archives
func (h *archiveHandler) HandleFile(ctx logContext.Context, input fileReader) chan DataOrErr {
dataOrErrChan := make(chan DataOrErr, defaultBufferSize)

@@ -66,7 +79,9 @@ func (h *archiveHandler) HandleFile(ctx logContext.Context, input fileReader) chan DataOrErr {
} else {
panicErr = fmt.Errorf("panic occurred: %v", r)
}
ctx.Logger().Error(panicErr, "Panic occurred when attempting to open archive")
dataOrErrChan <- DataOrErr{
Err: fmt.Errorf("%w: panic error: %v", ErrProcessingFatal, panicErr),
}
}
}()

17 changes: 11 additions & 6 deletions pkg/handlers/default.go
@@ -27,10 +27,15 @@ func newDefaultHandler(handlerType handlerType) *defaultHandler {
return &defaultHandler{metrics: newHandlerMetrics(handlerType)}
}

// HandleFile processes the input as either an archive or non-archive based on its content,
// utilizing a single output channel. It first tries to identify the input as an archive. If it is an archive,
// it processes it accordingly; otherwise, it handles the input as non-archive content.
// The function returns a channel that will receive the extracted data bytes and an error if the initial setup fails.
// HandleFile processes non-archive files.
//
// Fatal errors that will terminate processing include:
// - Context cancellation
// - Context deadline exceeded
// - Errors writing to the data channel
//
// Non-fatal errors that will be logged but allow processing to continue include:
// - Errors reading individual chunks from the input (wrapped as ErrProcessingWarning)
func (h *defaultHandler) HandleFile(ctx logContext.Context, input fileReader) chan DataOrErr {
// Shared channel for both archive and non-archive content.
dataOrErrChan := make(chan DataOrErr, defaultBufferSize)
@@ -106,9 +111,9 @@ func (h *defaultHandler) handleNonArchiveContent(
dataOrErr := DataOrErr{}
if err := data.Error(); err != nil {
h.metrics.incErrors()
dataOrErr.Err = fmt.Errorf("%w: error reading chunk", err)
dataOrErr.Err = fmt.Errorf("%w: error reading chunk: %v", ErrProcessingWarning, err)
if writeErr := common.CancellableWrite(ctx, dataOrErrChan, dataOrErr); writeErr != nil {
return fmt.Errorf("%w: error writing to data channel", writeErr)
return fmt.Errorf("%w: error writing to data channel: %v", ErrProcessingFatal, writeErr)
}
continue
}
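A note on why the write error above is fatal while the read error is only a warning: a cancellable write can, by construction, fail only when the context ends. A minimal sketch of the behavior assumed of common.CancellableWrite (an assumption for illustration, not its actual source):

// cancellableWrite sketches the assumed contract of common.CancellableWrite:
// the only failure mode is the context ending, so a non-nil error here always
// warrants the ErrProcessingFatal wrap used above.
func cancellableWrite[T any](ctx context.Context, ch chan<- T, item T) error {
	select {
	case <-ctx.Done():
		return ctx.Err() // context.Canceled or context.DeadlineExceeded
	case ch <- item:
		return nil
	}
}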
45 changes: 39 additions & 6 deletions pkg/handlers/handlers.go
@@ -3,6 +3,7 @@ package handlers
import (
"archive/zip"
"bufio"
"context"
"errors"
"fmt"
"io"
@@ -42,6 +43,17 @@ type fileReader struct {
*iobuf.BufferedReadSeeker
}

var (
ErrEmptyReader = errors.New("reader is empty")

// ErrProcessingFatal indicates a severe error that requires stopping the file processing.
ErrProcessingFatal = errors.New("fatal error processing file")

// ErrProcessingWarning indicates a recoverable error that can be logged,
// allowing processing to continue.
ErrProcessingWarning = errors.New("error processing file")
)

type readerConfig struct{ fileExtension string }

type readerOption func(*readerConfig)
@@ -50,8 +62,6 @@ func withFileExtension(ext string) readerOption {
return func(c *readerConfig) { c.fileExtension = ext }
}

var ErrEmptyReader = errors.New("reader is empty")

// mimeTypeReader wraps an io.Reader with MIME type information.
// This type is used to pass content through the processing pipeline
// while carrying its detected MIME type, avoiding redundant type detection.
@@ -389,12 +399,14 @@ func HandleFile(
// handleChunksWithError processes data and errors received from the dataErrChan channel.
// For each DataOrErr received:
// - If it contains data, the function creates a chunk based on chunkSkel and reports it through the reporter.
// - If it contains an error, the function logs the error.
// - If it contains an error, the function handles it based on severity:
// - Fatal errors (context cancellation, deadline exceeded, ErrProcessingFatal) cause immediate termination
// - Non-fatal errors (ErrProcessingWarning and others) are logged and processing continues
// The function also listens for context cancellation to gracefully terminate processing if the context is done.
// It returns nil upon successful processing of all data, or the first encountered error.
// It returns nil upon successful processing of all data, or the first encountered fatal error.
func handleChunksWithError(
ctx logContext.Context,
dataErrChan chan DataOrErr,
dataErrChan <-chan DataOrErr,
chunkSkel *sources.Chunk,
reporter sources.ChunkReporter,
) error {
@@ -407,7 +419,10 @@
return nil
}
if dataOrErr.Err != nil {
ctx.Logger().Error(dataOrErr.Err, "error processing chunk")
if isFatal(dataOrErr.Err) {
return dataOrErr.Err
}
ctx.Logger().Error(dataOrErr.Err, "non-critical error processing chunk")
continue
}
if len(dataOrErr.Data) > 0 {
@@ -423,6 +438,24 @@
}
}

// isFatal determines whether the given error is a fatal error that should
// terminate processing the current file, or a non-critical error that can be logged and ignored.
// "Fatal" errors include context cancellation, deadline exceeded, and the
// ErrProcessingFatal error. Non-fatal errors include the ErrProcessingWarning
// error as well as any other error that is not one of the fatal errors.
func isFatal(err error) bool {
switch {
case errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded) ||
errors.Is(err, ErrProcessingFatal):
return true
case errors.Is(err, ErrProcessingWarning):
return false
default:
return false
}
}

// getFileExtension extracts the file extension from the chunk's SourceMetadata.
// It considers all sources defined in the MetaData message.
// Note: Probably should add this as a method to the source_metadatapb object.
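A few hypothetical classifications to sanity-check isFatal above against the sentinels and wrapping style used throughout this PR (cause stands in for any underlying handler error):

cause := errors.New("disk read failed") // hypothetical underlying error

isFatal(fmt.Errorf("%w: loading AR error: %v", ErrProcessingFatal, cause))      // true
isFatal(fmt.Errorf("%w: error reading chunk: %v", ErrProcessingWarning, cause)) // false
isFatal(context.Canceled)         // true
isFatal(context.DeadlineExceeded) // true
isFatal(io.EOF)                   // false: unrecognized errors default to non-fatal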
77 changes: 77 additions & 0 deletions pkg/handlers/handlers_test.go
@@ -3,6 +3,7 @@ package handlers
import (
"archive/zip"
"bytes"
stdctx "context"
"errors"
"fmt"
"io"
@@ -781,3 +782,79 @@ func getGitCommitHash(t *testing.T, gitDir string) string {
commitHash := strings.TrimSpace(string(hashBytes))
return commitHash
}

type mockReporter struct{ reportedChunks int }

func (m *mockReporter) ChunkOk(context.Context, sources.Chunk) error {
m.reportedChunks++
return nil
}

func (m *mockReporter) ChunkErr(context.Context, error) error { return nil }

func TestHandleChunksWithError(t *testing.T) {
tests := []struct {
name string
input []DataOrErr
expectedErr error
expectedReportedChunks int
}{
{
name: "Non-Critical Error",
input: []DataOrErr{{Err: ErrProcessingWarning}},
},
{
name: "Critical Error",
input: []DataOrErr{{Err: ErrProcessingFatal}},
expectedErr: ErrProcessingFatal,
},
{
name: "No Error",
input: []DataOrErr{
{Data: []byte("test data")},
{Data: []byte("more data")},
},
expectedReportedChunks: 2,
},
{
name: "Context Canceled",
input: []DataOrErr{{Err: stdctx.Canceled}},
expectedErr: stdctx.Canceled,
},
{
name: "Context Deadline Exceeded",
input: []DataOrErr{{Err: stdctx.DeadlineExceeded}},
expectedErr: stdctx.DeadlineExceeded,
},
{
name: "EOF Error",
input: []DataOrErr{{Err: io.EOF}},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

ctx := context.Background()
chunkSkel := &sources.Chunk{}
reporter := new(mockReporter)

dataErrChan := make(chan DataOrErr, len(tc.input))
for _, de := range tc.input {
dataErrChan <- de
}
close(dataErrChan)

err := handleChunksWithError(ctx, dataErrChan, chunkSkel, reporter)

if tc.expectedErr != nil {
assert.ErrorIs(t, err, tc.expectedErr, "handleChunksWithError should return the expected error")
} else {
assert.NoError(t, err, "handleChunksWithError should not return an error for non-critical errors")
}

assert.Equal(t, tc.expectedReportedChunks, reporter.reportedChunks, "should have reported the expected number of chunks")
})
}
}
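The table above exercises bare sentinels; hypothetical additional cases (not in this PR) could confirm that the %w wrapping used by the handlers still classifies correctly through errors.Is. These would slot into the same test table:

{
	name:  "Wrapped Non-Critical Error",
	input: []DataOrErr{{Err: fmt.Errorf("%w: read failed: %v", ErrProcessingWarning, io.ErrUnexpectedEOF)}},
},
{
	name:        "Wrapped Critical Error",
	input:       []DataOrErr{{Err: fmt.Errorf("%w: loading AR error: %v", ErrProcessingFatal, io.ErrUnexpectedEOF)}},
	expectedErr: ErrProcessingFatal,
},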
31 changes: 25 additions & 6 deletions pkg/handlers/rpm.go
@@ -20,8 +20,19 @@ func newRPMHandler() *rpmHandler {
return &rpmHandler{defaultHandler: newDefaultHandler(rpmHandlerType)}
}

// HandleFile processes RPM formatted files. Further implementation is required to appropriately
// handle RPM specific archive operations.
// HandleFile processes RPM formatted files.
// It returns a channel of DataOrErr that will receive either file data
// or errors encountered during processing.
//
// Fatal errors that will terminate processing include:
// - Context cancellation or deadline exceeded
// - Errors reading or uncompressing the RPM file
// - Panics during processing (wrapped as ErrProcessingFatal)
//
// Non-fatal errors that will be reported but allow processing to continue include:
// - Errors processing individual files within the RPM archive (wrapped as ErrProcessingWarning)
//
// The handler will skip processing entirely if ForceSkipArchives is enabled.
func (h *rpmHandler) HandleFile(ctx logContext.Context, input fileReader) chan DataOrErr {
dataOrErrChan := make(chan DataOrErr, defaultBufferSize)

@@ -42,20 +53,26 @@ func (h *rpmHandler) HandleFile(ctx logContext.Context, input fileReader) chan DataOrErr {
} else {
panicErr = fmt.Errorf("panic occurred: %v", r)
}
ctx.Logger().Error(panicErr, "Panic occurred when attempting to open rpm archive")
dataOrErrChan <- DataOrErr{
Err: fmt.Errorf("%w: panic error: %v", ErrProcessingFatal, panicErr),
}
}
}()

start := time.Now()
rpm, err := rpmutils.ReadRpm(input)
if err != nil {
ctx.Logger().Error(err, "error reading rpm file")
dataOrErrChan <- DataOrErr{
Err: fmt.Errorf("%w: reading rpm error: %v", ErrProcessingFatal, err),
}
return
}

reader, err := rpm.PayloadReaderExtended()
if err != nil {
ctx.Logger().Error(err, "error reading rpm payload")
dataOrErrChan <- DataOrErr{
Err: fmt.Errorf("%w: uncompressing rpm error: %v", ErrProcessingFatal, err),
}
return
}

@@ -99,7 +116,9 @@ func (h *rpmHandler) processRPMFiles(
}

if err := h.handleNonArchiveContent(fileCtx, rdr, dataOrErrChan); err != nil {
fileCtx.Logger().Error(err, "error handling archive content in RPM")
dataOrErrChan <- DataOrErr{
Err: fmt.Errorf("%w: error processing RPM archive: %v", ErrProcessingWarning, err),
}
h.metrics.incErrors()
}
