From 72174321b953e60c10d67f1e7554930819b68c39 Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Thu, 10 Oct 2024 15:31:57 -0700 Subject: [PATCH] Cleanup Consume() --- exporter/internal/queue/persistent_queue.go | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/exporter/internal/queue/persistent_queue.go b/exporter/internal/queue/persistent_queue.go index 61925875b3e..e378e41699c 100644 --- a/exporter/internal/queue/persistent_queue.go +++ b/exporter/internal/queue/persistent_queue.go @@ -189,20 +189,6 @@ func (pq *persistentQueue[T]) restoreQueueSizeFromStorage(ctx context.Context) ( return bytesToItemIndex(val) } -// Consume applies the provided function on the head of queue. -// The call blocks until there is an item available or the queue is stopped. -// The function returns true when an item is consumed or false if the queue is stopped. -func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool { - index, _, req, ok := pq.Read(context.Background()) - if !ok { - return false - } - consumeErr := consumeFunc(context.Background(), req) - pq.OnProcessingFinished(index, consumeErr) - return true - -} - func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error { // If the queue is not initialized, there is nothing to shut down. if pq.client == nil {