Skip to content

PIP 94: Message converter at broker level

Yunze Xu edited this page Sep 9, 2021 · 2 revisions

Motivation

The initial motivation was from Kafka's protocol handler, i.e. KoP (https://github.com/streamnative/kop). KoP allows Kafka producer to configure entry format to kafka, which means the message from Kafka producer can be written to bookies directly without any conversion between Kafka's message format and Pulsar's message format. This improves the performance significantly. However, it also introduced the limit that Pulsar consumer cannot consume this topic because it cannot recognize the entry's format.

This proposal tries to introduce a message converter, which is responsible to convert the buffer of an entry to the format that Pulsar consumer can recognize. Once the converter was configured, before dispatching messages to Pulsar consumers, the converter would check if the buffer needs to be converted and then perform the conversion if necessary. We can configure multiple converters because we can configure multiple protocol handlers as well. Each protocol handler could write the entry with its own format.

The benefit is, after this change:

  • When other clients write an entry, no conversion is needed.
  • When other clients read an entry, no conversion is needed.
  • When a Pulsar consumer reads an entry, the conversion will be performed in broker.

Before this change, if we want to interact Pulsar consumer with other clients:

  • When other clients write an entry, we need to convert it to Pulsar format.
  • When other clients read an entry, we need to convert it from Pulsar format to the specific format.
  • When a Pulsar consumer reads an entry, no conversion is needed.

This proposal is mainly for protocol handlers because they can access PersistentTopic and write bytes to bookies directly. In a rare case, if users want to write something to the topic's ledger directly by BookKeeper client, the converter can also handle the case.

Goal

This proposal's goal is only adding message converter at broker level. Once the related broker configs were enabled, the converters would be applied to all topics. An overhead would be brought to the topics which are only created for Pulsar clients. Because we need to check if the buffer needs to be converted. See MessageConverter#accept method in the next section.

In future, we can configure the message converters at namespace level or topic level. Even we can also configure the message converter for Pulsar client so that the conversion only happens at client side and the CPU overload of broker can be reduced.

API changes

First an interface is added under package org.apache.pulsar.common.api.raw

public interface MessageConverter {

    /**
     * Initialize some resources if necessary.
     *
     * @apiNote it should only be called once
     */
    default void init() throws IOException {
        // No ops by default
    }

    /**
     * Release the resources if they were initialized in init().
     */
    default void close() throws IOException {
        // No ops by default
    }

    /**
     * Determine whether the buffer can be converted
     *
     * @param buffer the buffer that might need to be converted
     * @return whether the buffer can be converted
     * @implNote it should treat `buffer` as a read-only buffer, i.e. the buffer's writer index won't change.
     *   However, the buffer's reader index might change in this method, so the caller side should call
     *   {@link ByteBuf#markReaderIndex()} to mark the original reader index if needed.
     */
    boolean accept(ByteBuf buffer);

    /**
     * Convert the buffer to the format that Pulsar consumer can recognize.
     *
     * @param originalBuffer the original buffer
     * @return the converted buffer
     * @implNote it can return either `originalBuffer` itself or a new allocated buffer. When it returns
     *   `originalBuffer` itself, the reference count must increase by 1. Therefore, we must call
     *   {@link ByteBuf#release()} to release the returned value in any case.
     *   The reader index and writer index might change in this method, so the caller side should call
     *   {@link ByteBuf#markReaderIndex()} or {@link ByteBuf#markWriterIndex()} if needed.
     */
    ByteBuf convert(ByteBuf originalBuffer);
}

The a new configuration is added

    @FieldContext(
            category = CATEGORY_PLUGIN,
            doc = "List of message converters, which are responsible to convert entries before dispatching. If multiple"
                    + " converters are accepted for the same payload, the previous one in this list is preferred."
    )
    private List<String> messageConverters;

Implementation

For MessageConverter, add a class MessageConverterValidator to validate whether the implementation is valid.

The implementation is simple. When the broker starts, load all classes that implement MessageConverter interface from messageConverters config. Then we can pass the converters to ServerCnx. Each time a dispatcher dispatches messages to consumer, it will eventually call ServerCnx#newMessageAndIntercept method, in which we can perform the conversion.

NOTE

This implementation needs to be improved because it requires the message payload has the MessageMetadata as its header. Otherwise, it could fail at AbstractBaseDispatcher#filterEntriesForConsumer before sending to consumer. But we can still get some way to pass messageConverters to a dispatcher.

For unit tests, we can test following converters:

  1. RejectAllConverter: accept returns false so that no conversion is performed.
  2. EchoConverter: accept returns true and convert simply returns the original buffer.
  3. BytesConverter: It's an example of a real world converter. The message format has the MessageMetadata part that has the entry.format=bytes property. And the payload part is only the raw bytes without SingleMessageMetadata. The BytesConverter#converter will convert the raw bytes to the format that Pulsar consumer can recognize.
Clone this wiki locally