Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize memory allocation for mqtt-kafka offset tracking #694

Merged
merged 3 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ public class MqttKafkaSessionFactory implements MqttKafkaStreamFactory
public static final int MQTT_NOT_AUTHORIZED = 0x87;
public static final int MQTT_IMPLEMENTATION_SPECIFIC_ERROR = 0x83;
public static final String MQTT_INVALID_SESSION_TIMEOUT_REASON = "Invalid session expiry interval";
private static final String16FW EMPTY_STRING = new String16FW("");

static
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttBeginExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttDataExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttFlushExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttOffsetMetadataFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttOffsetStateFlags;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttSubscribeBeginExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttSubscribeFlushExFW;
Expand All @@ -108,6 +107,7 @@ public class MqttKafkaSubscribeFactory implements MqttKafkaStreamFactory
private static final int DATA_FLAG_INIT = 0x02;
private static final int DATA_FLAG_FIN = 0x01;
private static final OctetsFW EMPTY_OCTETS = new OctetsFW().wrap(new UnsafeBuffer(new byte[0]), 0, 0);
private static final String16FW EMPTY_STRING = new String16FW("");

private final OctetsFW emptyRO = new OctetsFW().wrap(new UnsafeBuffer(0L, 0), 0, 0);
private final BeginFW beginRO = new BeginFW();
Expand All @@ -128,7 +128,6 @@ public class MqttKafkaSubscribeFactory implements MqttKafkaStreamFactory
private final WindowFW.Builder windowRW = new WindowFW.Builder();
private final ResetFW.Builder resetRW = new ResetFW.Builder();
private final MqttSubscribeMessageFW.Builder mqttSubscribeMessageRW = new MqttSubscribeMessageFW.Builder();
private final MqttOffsetMetadataFW.Builder mqttOffsetMetadataRW = new MqttOffsetMetadataFW.Builder();

private final ExtensionFW extensionRO = new ExtensionFW();
private final MqttBeginExFW mqttBeginExRO = new MqttBeginExFW();
Expand All @@ -138,7 +137,6 @@ public class MqttKafkaSubscribeFactory implements MqttKafkaStreamFactory
private final KafkaFlushExFW kafkaFlushExRO = new KafkaFlushExFW();
private final KafkaHeaderFW kafkaHeaderRO = new KafkaHeaderFW();
private final MqttSubscribeMessageFW mqttSubscribeMessageRO = new MqttSubscribeMessageFW();
private final MqttOffsetMetadataFW mqttOffsetMetadataRO = new MqttOffsetMetadataFW();

private final MqttDataExFW.Builder mqttDataExRW = new MqttDataExFW.Builder();
private final MqttFlushExFW.Builder mqttFlushExRW = new MqttFlushExFW.Builder();
Expand Down Expand Up @@ -1200,8 +1198,8 @@ else if (state == MqttOffsetStateFlags.INCOMPLETE)
{
p.partitionId(offset.partitionId).partitionOffset(offset.offset + 1);
final IntArrayList incomplete = incompletePacketIds.get(offset.partitionId);
final String partitionMetadata =
incomplete == null || incomplete.isEmpty() ? "" : offSetMetadataListToString(incomplete);
final String16FW partitionMetadata = incomplete == null || incomplete.isEmpty() ?
EMPTY_STRING : offsetMetadataListToString(incomplete);
p.metadata(partitionMetadata);
});
f.correlationId(correlationId);
Expand Down Expand Up @@ -1826,26 +1824,39 @@ public void flushDataIfNecessary(
}
}

//TODO: how to make these more efficient while keeping the internal object easily modifiable (not using FW)?
// Decodes the packet ids stored in Kafka offset metadata, i.e. the inverse of
// offsetMetadataListToString: the String16FW value is a one-byte version
// header followed by a packed sequence of uint16 packet ids.
// NOTE(review): shorts are read in the buffer's default byte order; confirm
// whether BIG_ENDIAN is required since this metadata round-trips through an
// external system (see reviewer comment on this PR).
private IntArrayList stringToOffsetMetadataList(
    String16FW metadata)
{
    final IntArrayList packetIds = new IntArrayList();
    final DirectBuffer value = metadata.value();

    // Skip the leading version byte; its value is currently not validated.
    int index = BitUtil.SIZE_OF_BYTE;
    while (index < value.capacity())
    {
        packetIds.add((int) value.getShort(index));
        index += BitUtil.SIZE_OF_SHORT;
    }

    return packetIds;
}

private String offSetMetadataListToString(
private String16FW offsetMetadataListToString(
IntArrayList metadataList)
{
mqttOffsetMetadataRW.wrap(offsetBuffer, 0, offsetBuffer.capacity());
metadataList.forEach(m -> mqttOffsetMetadataRW.metadataItem(mi -> mi.packetId(m)));
final MqttOffsetMetadataFW offsetMetadata = mqttOffsetMetadataRW.build();
final byte[] array = new byte[offsetMetadata.sizeof()];
offsetMetadata.buffer().getBytes(offsetMetadata.offset(), array);
return BitUtil.toHex(array);
final int length = metadataList.size() * BitUtil.SIZE_OF_SHORT + 1;
final int capacity = BitUtil.SIZE_OF_SHORT + length;
int offset = 0;

offsetBuffer.putShort(offset, (short) length);
offset += BitUtil.SIZE_OF_SHORT;
offsetBuffer.putByte(offset++, (byte) 1);

for (int value : metadataList)
{
offsetBuffer.putShort(offset, (short) value);
offset += BitUtil.SIZE_OF_SHORT;
}

return new String16FW().wrap(offsetBuffer, 0, capacity);
}

final class KafkaRetainedProxy extends KafkaProxy
Expand Down Expand Up @@ -1972,8 +1983,8 @@ protected void doKafkaConsumerFlush(
{
p.partitionId(offset.partitionId).partitionOffset(offset.offset + 1);
final IntArrayList incomplete = incompletePacketIds.get(offset.partitionId);
final String partitionMetadata =
incomplete == null || incomplete.isEmpty() ? "" : offSetMetadataListToString(incomplete);
final String16FW partitionMetadata = incomplete == null || incomplete.isEmpty() ?
EMPTY_STRING : offsetMetadataListToString(incomplete);
p.metadata(partitionMetadata);
});
f.correlationId(correlationId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.agrona.BitUtil;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.IntArrayList;
import org.agrona.concurrent.UnsafeBuffer;
import org.kaazing.k3po.lang.el.BytesMatcher;
import org.kaazing.k3po.lang.el.Function;
Expand Down Expand Up @@ -53,7 +54,6 @@
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttDataExFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttExtensionKind;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttFlushExFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttOffsetMetadataFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttOffsetStateFlags;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttPublishBeginExFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttPublishDataExFW;
Expand Down Expand Up @@ -848,27 +848,42 @@ public byte[] build()

/**
 * Builds the hex-free offset-metadata string used by mqtt-kafka offset
 * tracking: a uint16 String16FW length prefix, a one-byte format version,
 * then each packet id as a uint16.
 */
public static final class MqttOffsetMetadataBuilder
{
    // Scratch buffer for encoding; 8 KiB comfortably holds the prefix,
    // the version byte, and several thousand uint16 packet ids.
    private final MutableDirectBuffer writeBuffer;
    private final IntArrayList packetIds;

    // Format version written into the metadata header.
    private byte version = 1;

    private MqttOffsetMetadataBuilder()
    {
        writeBuffer = new UnsafeBuffer(new byte[1024 * 8]);
        packetIds = new IntArrayList();
    }

    /**
     * Appends a packet id to the metadata being built.
     *
     * @param packetId the MQTT packet id to record
     * @return this builder, for chaining
     */
    public MqttOffsetMetadataBuilder metadata(
        int packetId)
    {
        packetIds.add(packetId);
        return this;
    }

    /**
     * Encodes the accumulated packet ids and returns the metadata string.
     *
     * @return the encoded offset metadata
     */
    public String build()
    {
        final int length = packetIds.size() * BitUtil.SIZE_OF_SHORT + 1;
        final int capacity = BitUtil.SIZE_OF_SHORT + length;
        int offset = 0;

        writeBuffer.putShort(offset, (short) length);
        offset += BitUtil.SIZE_OF_SHORT;
        // Fix: write the declared version field instead of a hardcoded 1,
        // so the field and the encoded header cannot drift apart.
        writeBuffer.putByte(offset++, version);

        for (int value : packetIds)
        {
            writeBuffer.putShort(offset, (short) value);
            offset += BitUtil.SIZE_OF_SHORT;
        }

        return new String16FW().wrap(writeBuffer, 0, capacity).asString();
    }
}

Expand Down
11 changes: 0 additions & 11 deletions specs/binding-mqtt.spec/src/main/resources/META-INF/zilla/mqtt.idl
Original file line number Diff line number Diff line change
Expand Up @@ -248,16 +248,5 @@ scope mqtt
COMPLETE(0),
INCOMPLETE(1)
}

struct MqttOffsetState
{
uint16 packetId;
}

struct MqttOffsetMetadata
{
uint8 version = 1;
MqttOffsetState[] metadata;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import org.agrona.BitUtil;
import org.agrona.DirectBuffer;
import org.agrona.collections.IntArrayList;
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.Test;
import org.kaazing.k3po.lang.el.BytesMatcher;
Expand All @@ -35,10 +36,10 @@
import io.aklivity.zilla.specs.binding.mqtt.internal.types.MqttSessionSignalType;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.MqttSessionStateFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.MqttWillMessageFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.String16FW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttBeginExFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttDataExFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttFlushExFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttOffsetMetadataFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttResetExFW;

public class MqttFunctionsTest
Expand Down Expand Up @@ -1272,16 +1273,18 @@ public void shouldEncodeMqttOffsetMetadata()
.metadata(2)
.build();

DirectBuffer buffer = new UnsafeBuffer(BitUtil.fromHex(state));
MqttOffsetMetadataFW offsetMetadata = new MqttOffsetMetadataFW().wrap(buffer, 0, buffer.capacity());
final IntArrayList metadataList = new IntArrayList();
int offset = 0;
final DirectBuffer buffer = new String16FW(state).value();
byte version = buffer.getByte(offset++);
for (; offset < buffer.capacity(); offset += BitUtil.SIZE_OF_SHORT)
{
metadataList.add((int) buffer.getShort(offset));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs a version prefix in case we choose to evolve the structure over time.
Needs BIG_ENDIAN as we are putting into an external system so could be read by any endian environment, not just local native endianness.

Also, let's use int16[length] support in .idl to generate a flyweight instead.
See examples in flyweight test resources at https://github.com/aklivity/zilla/blob/develop/build/flyweight-maven-plugin/src/test/resources/test-project/test.idl#L123.


assertNotNull(offsetMetadata.metadata()
.matchFirst(m ->
1 == m.packetId()));

assertNotNull(offsetMetadata.metadata()
.matchFirst(m ->
2 == m.packetId()));
assertEquals(1, version);
assertEquals(1, (int) metadataList.get(0));
assertEquals(2, (int) metadataList.get(1));
}

@Test
Expand Down