PIP-363: Add callback parameters to the method: org.apache.pulsar.client.impl.SendCallback.sendComplete.

Background knowledge

As introduced in PIP-264, Pulsar has been fully integrated into the OpenTelemetry system, which defines some metric specifications for messaging systems.

The current Pulsar client code does not expose the number of messages sent in a batch (or several other per-send values), so the messaging.publish.messages metric cannot be implemented.

In the opentelemetry-java-instrumentation code, the org.apache.pulsar.client.impl.SendCallback interface is used to instrument data points. For specific implementation details, we can refer to this.

Motivation

Currently, org.apache.pulsar.client.impl.ProducerImpl provides no public method for obtaining numMessagesInBatch.

We can therefore pass some of the key data from org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg to the org.apache.pulsar.client.impl.SendCallback.sendComplete method.

Detailed Design

Add an OpSendMsgStats parameter to the method org.apache.pulsar.client.impl.SendCallback.sendComplete:

public interface SendCallback {

    /**
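     * Invoked when the send operation completes.
     *
     * @param e     the failure cause, or null if the send succeeded
     * @param stats statistics of the completed send operation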
     */
    void sendComplete(Throwable e, OpSendMsgStats stats);
}

public interface OpSendMsgStats {
    long getUncompressedSize();

    long getSequenceId();

    int getRetryCount();

    long getBatchSizeByte();

    int getNumMessagesInBatch();

    long getHighestSequenceId();

    int getTotalChunks();

    int getChunkId();
}

import lombok.Builder;

// Lombok generates the builder used to populate these fields.
@Builder
public class OpSendMsgStatsImpl implements OpSendMsgStats {
    private long uncompressedSize;
    private long sequenceId;
    private int retryCount;
    private long batchSizeByte;
    private int numMessagesInBatch;
    private long highestSequenceId;
    private int totalChunks;
    private int chunkId;

    @Override
    public long getUncompressedSize() {
        return uncompressedSize;
    }

    @Override
    public long getSequenceId() {
        return sequenceId;
    }

    @Override
    public int getRetryCount() {
        return retryCount;
    }

    @Override
    public long getBatchSizeByte() {
        return batchSizeByte;
    }

    @Override
    public int getNumMessagesInBatch() {
        return numMessagesInBatch;
    }

    @Override
    public long getHighestSequenceId() {
        return highestSequenceId;
    }

    @Override
    public int getTotalChunks() {
        return totalChunks;
    }

    @Override
    public int getChunkId() {
        return chunkId;
    }
}
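
To illustrate how an instrumentation layer might consume the new parameter, here is a minimal sketch of a callback that accumulates a published-message count from getNumMessagesInBatch. It is not the actual opentelemetry-java-instrumentation code. The interfaces are simplified local copies declaring only the members used here, and the names PublishMetricsSketch, CountingSendCallback, and stats(...) are hypothetical. Treating a null stats object as a single message is also an assumption made for this sketch.

```java
import java.util.concurrent.atomic.AtomicLong;

public class PublishMetricsSketch {

    // Simplified local copy of the proposed interface (only the getters used here).
    interface OpSendMsgStats {
        int getNumMessagesInBatch();
        long getBatchSizeByte();
    }

    // Simplified local copy of the callback with the proposed two-argument method.
    interface SendCallback {
        void sendComplete(Throwable e, OpSendMsgStats stats);
    }

    // Hypothetical callback that counts messages per outcome.
    static final class CountingSendCallback implements SendCallback {
        private final AtomicLong published = new AtomicLong();
        private final AtomicLong failed = new AtomicLong();

        @Override
        public void sendComplete(Throwable e, OpSendMsgStats stats) {
            // Assumption: fall back to 1 message when no stats are supplied.
            long count = (stats == null) ? 1 : stats.getNumMessagesInBatch();
            if (e == null) {
                published.addAndGet(count);
            } else {
                failed.addAndGet(count);
            }
        }

        long published() {
            return published.get();
        }

        long failed() {
            return failed.get();
        }
    }

    // Hypothetical helper that builds a stats object for the demo.
    static OpSendMsgStats stats(int numMessages) {
        return new OpSendMsgStats() {
            @Override
            public int getNumMessagesInBatch() {
                return numMessages;
            }

            @Override
            public long getBatchSizeByte() {
                return 0L;
            }
        };
    }

    public static void main(String[] args) {
        CountingSendCallback callback = new CountingSendCallback();
        callback.sendComplete(null, stats(5));                          // successful batch
        callback.sendComplete(new RuntimeException("boom"), stats(3));  // failed batch
        System.out.println("published=" + callback.published()
                + " failed=" + callback.failed());
    }
}
```

A real integration would feed these counts into a metrics API rather than AtomicLong fields; the point is only that the batch size becomes available at the moment the send completes.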

Links