Skip to content

Commit

Permalink
[PLAT-118505] fix receive write metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Yi Jin <yi.jin@databricks.com>
  • Loading branch information
jnyi committed Sep 17, 2024
1 parent 30e2550 commit 4616f29
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 9 deletions.
12 changes: 6 additions & 6 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,11 +711,11 @@ type remoteWriteParams struct {
alreadyReplicated bool
}

func (h *Handler) gatherWriteStats(localWrites map[endpointReplica]map[string]trackedSeries) tenantRequestStats {
func (h *Handler) gatherWriteStats(remoteWrites map[endpointReplica]map[string]trackedSeries) tenantRequestStats {
var stats tenantRequestStats = make(tenantRequestStats)

for er := range localWrites {
for tenant, series := range localWrites[er] {
for er := range remoteWrites {
for tenant, series := range remoteWrites[er] {
samples := 0

for _, ts := range series.timeSeries {
Expand Down Expand Up @@ -743,7 +743,6 @@ func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) (
ctx, cancel := context.WithTimeout(tracing.CopyTraceContext(context.Background(), ctx), h.options.ForwardTimeout)

var writeErrors writeErrors
var stats tenantRequestStats = make(tenantRequestStats)

defer func() {
if writeErrors.ErrOrNil() != nil {
Expand All @@ -763,10 +762,11 @@ func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) (
localWrites, remoteWrites, err := h.distributeTimeseriesToReplicas(params.tenant, params.replicas, params.writeRequest.Timeseries)
if err != nil {
level.Error(requestLogger).Log("msg", "failed to distribute timeseries to replicas", "err", err)
return stats, err
return tenantRequestStats{}, err
}

stats = h.gatherWriteStats(localWrites)
// Specific to Databricks setup, we only measure remote writes
stats := h.gatherWriteStats(remoteWrites)

// Prepare a buffered channel to receive the responses from the local and remote writes. Remote writes will all go
// asynchronously and with this capacity we will never block on writing to the channel.
Expand Down
7 changes: 4 additions & 3 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1841,8 +1841,6 @@ func TestHandlerFlippingHashrings(t *testing.T) {
}

func TestIngestorRestart(t *testing.T) {
// TODO: fix this test. It has a data race.
t.Skip("Skipping this test case temporarily due to a data race")
var err error
logger := log.NewLogfmtLogger(os.Stderr)
addr1, addr2, addr3 := "localhost:14090", "localhost:14091", "localhost:14092"
Expand Down Expand Up @@ -1884,8 +1882,11 @@ func TestIngestorRestart(t *testing.T) {
},
}

_, err = client.handleRequest(ctx, 0, "test", data)
stats, err := client.handleRequest(ctx, 0, "test", data)
require.NoError(t, err)
require.Equal(t, tenantRequestStats{
"test": requestStats{timeseries: 2, totalSamples: 2},
}, stats)

// close srv2 to simulate ingestor down
ing2.Shutdown(err)
Expand Down

0 comments on commit 4616f29

Please sign in to comment.