Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support raising consumer memory limit; return error when something's wrong #30

Merged
merged 9 commits into from
Aug 31, 2023
133 changes: 74 additions & 59 deletions api/experimental/arrow/v1/arrow_service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ func statusOKFor(id int64) *arrowpb.BatchStatus {
}
}

func statusStreamShutdownFor(id int64) *arrowpb.BatchStatus {
func statusCanceledFor(id int64) *arrowpb.BatchStatus {
return &arrowpb.BatchStatus{
BatchId: id,
StatusCode: arrowpb.StatusCode_STREAM_SHUTDOWN,
StatusCode: arrowpb.StatusCode_CANCELED,
}
}

Expand Down
36 changes: 24 additions & 12 deletions collector/exporter/otelarrowexporter/internal/arrow/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,9 @@ func (s *Stream) run(bgctx context.Context, streamClient StreamClientFunc, grpcO
// production); in both cases "NO_ERROR" is the key
// signifier.
if strings.Contains(status.Message(), "NO_ERROR") {
s.telemetry.Logger.Debug("arrow stream shutdown")
s.telemetry.Logger.Error("arrow stream reset (consider lowering max_stream_lifetime)",
zap.String("message", status.Message()),
)
} else {
s.telemetry.Logger.Error("arrow stream unavailable",
zap.String("message", status.Message()),
Expand Down Expand Up @@ -362,7 +364,7 @@ func (s *Stream) read(_ context.Context) error {
// This indicates the server received EOF from client shutdown.
// This is not an error because this is an expected shutdown
// initiated by the client by setting max_stream_lifetime.
if resp.StatusCode == arrowpb.StatusCode_STREAM_SHUTDOWN {
if resp.StatusCode == arrowpb.StatusCode_CANCELED {
return nil
}

Expand Down Expand Up @@ -392,32 +394,42 @@ func (s *Stream) getSenderChannels(status *arrowpb.BatchStatus) (chan error, err

// processBatchStatus processes a single response from the server and unblocks the
// associated sender.
func (s *Stream) processBatchStatus(status *arrowpb.BatchStatus) error {
ch, ret := s.getSenderChannels(status)
func (s *Stream) processBatchStatus(ss *arrowpb.BatchStatus) error {
ch, ret := s.getSenderChannels(ss)

if ch == nil {
// In case getSenderChannels encounters a problem, the
// channel is nil.
return ret
}

if status.StatusCode == arrowpb.StatusCode_OK {
if ss.StatusCode == arrowpb.StatusCode_OK {
ch <- nil
return nil
}
// See ../../otlp.go's `shouldRetry()` method, the retry
// behavior described here is achieved there by setting these
// recognized codes.
var err error
switch status.StatusCode {
switch ss.StatusCode {
case arrowpb.StatusCode_UNAVAILABLE:
err = fmt.Errorf("destination unavailable: %d: %s", status.BatchId, status.StatusMessage)
// Retryable
err = status.Errorf(codes.Unavailable, "destination unavailable: %d: %s", ss.BatchId, ss.StatusMessage)
case arrowpb.StatusCode_INVALID_ARGUMENT:
err = consumererror.NewPermanent(
fmt.Errorf("invalid argument: %d: %s", status.BatchId, status.StatusMessage))
// Not retryable
err = status.Errorf(codes.InvalidArgument, "invalid argument: %d: %s", ss.BatchId, ss.StatusMessage)
case arrowpb.StatusCode_RESOURCE_EXHAUSTED:
// Retry behavior is configurable
err = status.Errorf(codes.ResourceExhausted, "resource exhausted: %d: %s", ss.BatchId, ss.StatusMessage)
default:
base := fmt.Errorf("unexpected stream response: %d: %s", status.BatchId, status.StatusMessage)
err = consumererror.NewPermanent(base)
// Note: case arrowpb.StatusCode_CANCELED (a.k.a. codes.Canceled)
// is handled before calling processBatchStatus().

// Unrecognized status code.
err = status.Errorf(codes.Internal, "unexpected stream response: %d: %s", ss.BatchId, ss.StatusMessage)

// Will break the stream.
ret = multierr.Append(ret, base)
ret = multierr.Append(ret, err)
}
ch <- err
return ret
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestStreamGracefulShutdown(t *testing.T) {
// mimick the server which will send a batchID
// of 0 after max_stream_lifetime elapses.
time.Sleep(maxStreamLifetime)
channel.recv <- statusStreamShutdownFor(0)
channel.recv <- statusCanceledFor(0)
}()

err := tc.get().SendAndWait(tc.bgctx, twoTraces)
Expand Down
6 changes: 5 additions & 1 deletion collector/exporter/otelarrowexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,11 @@ func (e *baseExporter) arrowSendAndWait(ctx context.Context, data interface{}) (
if e.arrow == nil {
return false, nil
}
return e.arrow.SendAndWait(ctx, data)
sent, err := e.arrow.SendAndWait(ctx, data)
if err != nil {
return sent, processError(err)
}
return sent, nil
}

func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {
Expand Down
6 changes: 5 additions & 1 deletion collector/receiver/otelarrowreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ type Protocols struct {
Arrow *ArrowSettings `mapstructure:"arrow"`
}

// ArrowSettings support disabling the Arrow receiver.
// ArrowSettings support configuring the Arrow receiver.
type ArrowSettings struct {
// MemoryLimit is the size of a shared memory region used by
// all Arrow streams. When too much load is passing through, they
// will see ResourceExhausted errors.
MemoryLimit uint64
}

// Config defines configuration for OTel Arrow receiver.
Expand Down
8 changes: 6 additions & 2 deletions collector/receiver/otelarrowreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ func TestUnmarshalConfig(t *testing.T) {
MetricsURLPath: "/v2/metrics",
LogsURLPath: "/log/ingest",
},
Arrow: &ArrowSettings{},
Arrow: &ArrowSettings{
MemoryLimit: defaultMemoryLimit,
},
},
}, cfg)

Expand Down Expand Up @@ -161,7 +163,9 @@ func TestUnmarshalConfigUnix(t *testing.T) {
MetricsURLPath: defaultMetricsURLPath,
LogsURLPath: defaultLogsURLPath,
},
Arrow: &ArrowSettings{},
Arrow: &ArrowSettings{
MemoryLimit: defaultMemoryLimit,
},
},
}, cfg)
}
Expand Down
6 changes: 5 additions & 1 deletion collector/receiver/otelarrowreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ const (
defaultTracesURLPath = "/v1/traces"
defaultMetricsURLPath = "/v1/metrics"
defaultLogsURLPath = "/v1/logs"

defaultMemoryLimit = 128 << 20 // 128MB
)

// NewFactory creates a new OTLP receiver factory.
Expand Down Expand Up @@ -56,7 +58,9 @@ func createDefaultConfig() component.Config {
MetricsURLPath: defaultMetricsURLPath,
LogsURLPath: defaultLogsURLPath,
},
Arrow: &ArrowSettings{},
Arrow: &ArrowSettings{
MemoryLimit: defaultMemoryLimit,
},
},
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (r *Receiver) anyStream(serverStream anyStreamServer) (retErr error) {
// client called CloseSend()
if err == io.EOF {
status := &arrowpb.BatchStatus{}
status.StatusCode = arrowpb.StatusCode_STREAM_SHUTDOWN
status.StatusCode = arrowpb.StatusCode_CANCELED
err = serverStream.Send(status)
if err != nil {
r.logStreamError(err)
Expand Down Expand Up @@ -349,8 +349,10 @@ func (r *Receiver) anyStream(serverStream anyStreamServer) (retErr error) {
status.StatusCode = arrowpb.StatusCode_OK
} else {
status.StatusMessage = err.Error()

if consumererror.IsPermanent(err) {
if errors.Is(err, arrowRecord.ErrConsumerMemoryLimit) {
r.telemetry.Logger.Error("arrow resource exhausted", zap.Error(err))
status.StatusCode = arrowpb.StatusCode_RESOURCE_EXHAUSTED
} else if consumererror.IsPermanent(err) {
r.telemetry.Logger.Error("arrow data error", zap.Error(err))
status.StatusCode = arrowpb.StatusCode_INVALID_ARGUMENT
} else {
Expand Down
7 changes: 6 additions & 1 deletion collector/receiver/otelarrowreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,12 @@ func (r *otlpReceiver) startProtocolServers(host component.Host) error {
}

r.arrowReceiver = arrow.New(arrow.Consumers(r), r.settings, r.obsrepGRPC, r.cfg.GRPC, authServer, func() arrowRecord.ConsumerAPI {
return arrowRecord.NewConsumer()
var opts []arrowRecord.Option
if r.cfg.Arrow.MemoryLimit != 0 {
// in which case the default is selected in the arrowRecord package.
opts = append(opts, arrowRecord.WithMemoryLimit(r.cfg.Arrow.MemoryLimit))
}
return arrowRecord.NewConsumer(opts...)
})

arrowpb.RegisterArrowStreamServiceServer(r.serverGRPC, r.arrowReceiver)
Expand Down
Loading