diff --git a/waltz-client/src/main/java/com/wepay/waltz/client/TransactionBuilder.java b/waltz-client/src/main/java/com/wepay/waltz/client/TransactionBuilder.java index dcb6feca..420bf3c7 100644 --- a/waltz-client/src/main/java/com/wepay/waltz/client/TransactionBuilder.java +++ b/waltz-client/src/main/java/com/wepay/waltz/client/TransactionBuilder.java @@ -37,4 +37,11 @@ public interface TransactionBuilder { */ void setReadLocks(List partitionLocalLocks); + /** + * Sets optimistic append locks. + * + * @param partitionLocalLocks a list of {@link PartitionLocalLock} + */ + void setAppendLocks(List partitionLocalLocks); + } diff --git a/waltz-client/src/main/java/com/wepay/waltz/client/internal/TransactionBuilderImpl.java b/waltz-client/src/main/java/com/wepay/waltz/client/internal/TransactionBuilderImpl.java index 9defb3c3..94c36cc3 100644 --- a/waltz-client/src/main/java/com/wepay/waltz/client/internal/TransactionBuilderImpl.java +++ b/waltz-client/src/main/java/com/wepay/waltz/client/internal/TransactionBuilderImpl.java @@ -23,6 +23,7 @@ public class TransactionBuilderImpl implements TransactionBuilder { private byte[] data = null; private List writeLocks; private List readLocks; + private List appendLocks; /** * Class Constructor. @@ -54,6 +55,11 @@ public void setReadLocks(List locks) { this.readLocks = locks; } + @Override + public void setAppendLocks(List locks) { + this.appendLocks = locks; + } + /** * @return a new instance of {@link AppendRequest} to send to Waltz cluster. */ @@ -63,6 +69,7 @@ public AppendRequest buildRequest() { clientHighWaterMark, compileLockRequest(writeLocks), compileLockRequest(readLocks), + compileLockRequest(appendLocks), header, data, Utils.checksum(data) diff --git a/waltz-client/src/main/java/com/wepay/waltz/client/internal/network/WaltzClientHandler.java b/waltz-client/src/main/java/com/wepay/waltz/client/internal/network/WaltzClientHandler.java index 2669e819..5e121af7 100644 --- a/waltz-client/src/main/java/com/wepay/waltz/client/internal/network/WaltzClientHandler.java +++ b/waltz-client/src/main/java/com/wepay/waltz/client/internal/network/WaltzClientHandler.java @@ -13,6 +13,7 @@ import com.wepay.waltz.common.message.LockFailure; import com.wepay.waltz.common.message.MessageCodecV0; import com.wepay.waltz.common.message.MessageCodecV1; +import com.wepay.waltz.common.message.MessageCodecV2; import com.wepay.waltz.common.message.MessageType; import com.wepay.waltz.common.message.MountRequest; import com.wepay.waltz.common.message.MountResponse; @@ -35,6 +36,7 @@ public class WaltzClientHandler extends MessageHandler { static { CODECS.put(MessageCodecV0.VERSION, MessageCodecV0.INSTANCE); CODECS.put(MessageCodecV1.VERSION, MessageCodecV1.INSTANCE); + CODECS.put(MessageCodecV2.VERSION, MessageCodecV2.INSTANCE); } private static final String HELLO_MESSAGE = "Waltz Client"; diff --git a/waltz-client/src/test/java/com/wepay/waltz/client/internal/TransactionBuilderImplTest.java b/waltz-client/src/test/java/com/wepay/waltz/client/internal/TransactionBuilderImplTest.java index 942bc933..583944a2 100644 --- a/waltz-client/src/test/java/com/wepay/waltz/client/internal/TransactionBuilderImplTest.java +++ b/waltz-client/src/test/java/com/wepay/waltz/client/internal/TransactionBuilderImplTest.java @@ -18,6 +18,7 @@ public void test() { .data("good transaction") .writeLocks(1, 2, 3) .readLocks(4, 5, 6) + .appendLocks(7, 8, 9) .build(); TransactionBuilderImpl builder = new TransactionBuilderImpl(new ReqId(111, 222, 333, 444), 999); @@ -43,8 +44,15 @@ public void test() { MockContext.makeLock(6).hashCode() }; + int[] expectedAppendLockRequest = new int[]{ + MockContext.makeLock(7).hashCode(), + MockContext.makeLock(8).hashCode(), + MockContext.makeLock(9).hashCode() + }; + assertArrayEquals(expectedWriteLockRequest, request.writeLockRequest); assertArrayEquals(expectedReadLockRequest, request.readLockRequest); + assertArrayEquals(expectedAppendLockRequest, request.appendLockRequest); } } diff --git a/waltz-common/src/main/java/com/wepay/waltz/common/message/AppendRequest.java b/waltz-common/src/main/java/com/wepay/waltz/common/message/AppendRequest.java index ce13c105..c2bb363c 100644 --- a/waltz-common/src/main/java/com/wepay/waltz/common/message/AppendRequest.java +++ b/waltz-common/src/main/java/com/wepay/waltz/common/message/AppendRequest.java @@ -7,17 +7,28 @@ public class AppendRequest extends AbstractMessage { public final long clientHighWaterMark; public final int[] writeLockRequest; public final int[] readLockRequest; + public final int[] appendLockRequest; public final int header; public final byte[] data; public final int checksum; @SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "internal class") - public AppendRequest(ReqId reqId, long clientHighWaterMark, int[] writeLockRequest, int[] readLockRequest, int header, byte[] data, int checksum) { + public AppendRequest( + ReqId reqId, + long clientHighWaterMark, + int[] writeLockRequest, + int[] readLockRequest, + int[] appendLockRequest, + int header, + byte[] data, + int checksum + ) { super(reqId); this.clientHighWaterMark = clientHighWaterMark; this.writeLockRequest = writeLockRequest; this.readLockRequest = readLockRequest; + this.appendLockRequest = appendLockRequest; this.header = header; this.data = data; this.checksum = checksum; diff --git a/waltz-common/src/main/java/com/wepay/waltz/common/message/MessageCodecV0.java b/waltz-common/src/main/java/com/wepay/waltz/common/message/MessageCodecV0.java index 4fe26670..3fef61e6 100644 --- a/waltz-common/src/main/java/com/wepay/waltz/common/message/MessageCodecV0.java +++ b/waltz-common/src/main/java/com/wepay/waltz/common/message/MessageCodecV0.java @@ -13,7 +13,7 @@ public class MessageCodecV0 implements MessageCodec { public static final MessageCodecV0 INSTANCE = new MessageCodecV0(); private static final byte MAGIC_BYTE = 'L'; - private static final int[] NO_READ_LOCKS = new int[0]; + private static final int[] NO_LOCKS = new int[0]; @Override public byte magicByte() { @@ -52,7 +52,7 @@ public Message decode(MessageAttributeReader reader) { data = reader.readByteArray(); checksum = reader.readInt(); Utils.verifyChecksum(messageType, data, checksum); - return new AppendRequest(reqId, transactionId, writeLockRequest, NO_READ_LOCKS, header, data, checksum); + return new AppendRequest(reqId, transactionId, writeLockRequest, NO_LOCKS, NO_LOCKS, header, data, checksum); case MessageType.FEED_REQUEST: transactionId = reader.readLong(); // client High-water mark @@ -131,6 +131,11 @@ public void encode(Message msg, MessageAttributeWriter writer) { "read locks not supported, upgrade servers" ); } + if (appendRequest.appendLockRequest.length > 0) { + throw new UnsupportedOperationException( + "append locks not supported, upgrade servers" + ); + } writer.writeLong(appendRequest.clientHighWaterMark); writer.writeIntArray(appendRequest.writeLockRequest); diff --git a/waltz-common/src/main/java/com/wepay/waltz/common/message/MessageCodecV1.java b/waltz-common/src/main/java/com/wepay/waltz/common/message/MessageCodecV1.java index f9ee0df3..1cd2f5d2 100644 --- a/waltz-common/src/main/java/com/wepay/waltz/common/message/MessageCodecV1.java +++ b/waltz-common/src/main/java/com/wepay/waltz/common/message/MessageCodecV1.java @@ -16,6 +16,7 @@ public class MessageCodecV1 implements MessageCodec { public static final MessageCodecV1 INSTANCE = new MessageCodecV1(); private static final byte MAGIC_BYTE = 'L'; + private static final int[] NO_LOCKS = new int[0]; @Override public byte magicByte() { @@ -55,7 +56,7 @@ public Message decode(MessageAttributeReader reader) { data = reader.readByteArray(); checksum = reader.readInt(); Utils.verifyChecksum(messageType, data, checksum); - return new AppendRequest(reqId, transactionId, writeLockRequest, readLockRequest, header, data, checksum); + return new AppendRequest(reqId, transactionId, writeLockRequest, readLockRequest, NO_LOCKS, header, data, checksum); case MessageType.FEED_REQUEST: transactionId = reader.readLong(); // client High-water mark @@ -139,6 +140,13 @@ public void encode(Message msg, MessageAttributeWriter writer) { case MessageType.APPEND_REQUEST: AppendRequest appendRequest = (AppendRequest) msg; + + if (appendRequest.appendLockRequest.length > 0) { + throw new UnsupportedOperationException( + "append locks not supported, upgrade servers" + ); + } + writer.writeLong(appendRequest.clientHighWaterMark); writer.writeIntArray(appendRequest.writeLockRequest); writer.writeIntArray(appendRequest.readLockRequest); diff --git a/waltz-common/src/main/java/com/wepay/waltz/common/message/MessageCodecV2.java b/waltz-common/src/main/java/com/wepay/waltz/common/message/MessageCodecV2.java new file mode 100644 index 00000000..4a0fd456 --- /dev/null +++ b/waltz-common/src/main/java/com/wepay/waltz/common/message/MessageCodecV2.java @@ -0,0 +1,236 @@ +package com.wepay.waltz.common.message; + +import com.wepay.riff.network.Message; +import com.wepay.riff.network.MessageAttributeReader; +import com.wepay.riff.network.MessageAttributeWriter; +import com.wepay.riff.network.MessageCodec; +import com.wepay.waltz.common.util.Utils; +import com.wepay.waltz.exception.RpcException; + +import java.util.HashMap; +import java.util.Map; + +public class MessageCodecV2 implements MessageCodec { + + public static final short VERSION = 2; + public static final MessageCodecV2 INSTANCE = new MessageCodecV2(); + + private static final byte MAGIC_BYTE = 'L'; + + @Override + public byte magicByte() { + return MAGIC_BYTE; + } + + @Override + public short version() { + return VERSION; + } + + @Override + public Message decode(MessageAttributeReader reader) { + // Decode common attributes + byte messageType = reader.readByte(); + ReqId reqId = ReqId.readFrom(reader); + long transactionId; + int header; + byte[] data; + int checksum; + + switch (messageType) { + case MessageType.MOUNT_REQUEST: + long clientHighWaterMark = reader.readLong(); + long seqNum = reader.readLong(); + return new MountRequest(reqId, clientHighWaterMark, seqNum); + + case MessageType.MOUNT_RESPONSE: + boolean partitionReady = reader.readBoolean(); + return new MountResponse(reqId, partitionReady); + + case MessageType.APPEND_REQUEST: + transactionId = reader.readLong(); // client High-water mark + int[] writeLockRequest = reader.readIntArray(); + int[] readLockRequest = reader.readIntArray(); + int[] appendLockRequest = reader.readIntArray(); + header = reader.readInt(); + data = reader.readByteArray(); + checksum = reader.readInt(); + Utils.verifyChecksum(messageType, data, checksum); + return new AppendRequest( + reqId, + transactionId, + writeLockRequest, + readLockRequest, + appendLockRequest, + header, + data, + checksum + ); + + case MessageType.FEED_REQUEST: + transactionId = reader.readLong(); // client High-water mark + return new FeedRequest(reqId, transactionId); + + case MessageType.FEED_DATA: + transactionId = reader.readLong(); + header = reader.readInt(); + return new FeedData(reqId, transactionId, header); + + case MessageType.FEED_SUSPENDED: + return new FeedSuspended(reqId); + + case MessageType.TRANSACTION_DATA_REQUEST: + transactionId = reader.readLong(); + return new TransactionDataRequest(reqId, transactionId); + + case MessageType.TRANSACTION_DATA_RESPONSE: + transactionId = reader.readLong(); + if (reader.readBoolean()) { + data = reader.readByteArray(); + checksum = reader.readInt(); + Utils.verifyChecksum(messageType, data, checksum); + return new TransactionDataResponse(reqId, transactionId, data, checksum); + } else { + RpcException exception = new RpcException(reader.readString()); + return new TransactionDataResponse(reqId, transactionId, exception); + } + + case MessageType.FLUSH_REQUEST: + return new FlushRequest(reqId); + + case MessageType.FLUSH_RESPONSE: + transactionId = reader.readLong(); + return new FlushResponse(reqId, transactionId); + + case MessageType.HIGH_WATER_MARK_REQUEST: + return new HighWaterMarkRequest(reqId); + + case MessageType.HIGH_WATER_MARK_RESPONSE: + transactionId = reader.readLong(); + return new HighWaterMarkResponse(reqId, transactionId); + + case MessageType.LOCK_FAILURE: + transactionId = reader.readLong(); + return new LockFailure(reqId, transactionId); + + case MessageType.CHECK_STORAGE_CONNECTIVITY_REQUEST: + return new CheckStorageConnectivityRequest(reqId); + + case MessageType.CHECK_STORAGE_CONNECTIVITY_RESPONSE: + int size = reader.readInt(); + Map storageConnectivityMap = new HashMap<>(); + for (int i = 0; i < size; i++) { + storageConnectivityMap.put(reader.readString(), reader.readBoolean()); + } + return new CheckStorageConnectivityResponse(reqId, storageConnectivityMap); + + default: + throw new IllegalStateException("unknown message type: " + messageType); + } + } + + @Override + public void encode(Message msg, MessageAttributeWriter writer) { + // Encode common attributes + writer.writeByte(msg.type()); + ((AbstractMessage) msg).reqId.writeTo(writer); + + switch (msg.type()) { + case MessageType.MOUNT_REQUEST: + MountRequest mountRequest = (MountRequest) msg; + writer.writeLong(mountRequest.clientHighWaterMark); + writer.writeLong(mountRequest.seqNum); + break; + + case MessageType.MOUNT_RESPONSE: + MountResponse mountResponse = (MountResponse) msg; + writer.writeBoolean(mountResponse.partitionReady); + break; + + case MessageType.APPEND_REQUEST: + AppendRequest appendRequest = (AppendRequest) msg; + writer.writeLong(appendRequest.clientHighWaterMark); + writer.writeIntArray(appendRequest.writeLockRequest); + writer.writeIntArray(appendRequest.readLockRequest); + writer.writeIntArray(appendRequest.appendLockRequest); + writer.writeInt(appendRequest.header); + writer.writeByteArray(appendRequest.data); + writer.writeInt(appendRequest.checksum); + break; + + case MessageType.FEED_REQUEST: + FeedRequest feedRequest = (FeedRequest) msg; + writer.writeLong(feedRequest.clientHighWaterMark); + break; + + case MessageType.FEED_DATA: + FeedData feedData = (FeedData) msg; + writer.writeLong(feedData.transactionId); + writer.writeInt(feedData.header); + break; + + case MessageType.FEED_SUSPENDED: + break; + + case MessageType.TRANSACTION_DATA_REQUEST: + TransactionDataRequest dataRequest = (TransactionDataRequest) msg; + writer.writeLong(dataRequest.transactionId); + break; + + case MessageType.TRANSACTION_DATA_RESPONSE: + TransactionDataResponse dataResponse = (TransactionDataResponse) msg; + writer.writeLong(dataResponse.transactionId); + if (dataResponse.data != null) { + writer.writeBoolean(true); + writer.writeByteArray(dataResponse.data); + writer.writeInt(dataResponse.checksum); + } else if (dataResponse.exception != null) { + writer.writeBoolean(false); + writer.writeString(dataResponse.exception.getMessage()); + } else { + throw new IllegalStateException("corrupted message: " + msg.type()); + } + break; + + case MessageType.FLUSH_REQUEST: + break; + + case MessageType.FLUSH_RESPONSE: + FlushResponse flushResponse = (FlushResponse) msg; + writer.writeLong(flushResponse.transactionId); + break; + + case MessageType.HIGH_WATER_MARK_REQUEST: + break; + + case MessageType.HIGH_WATER_MARK_RESPONSE: + HighWaterMarkResponse highWaterMarkResponse = (HighWaterMarkResponse) msg; + writer.writeLong(highWaterMarkResponse.transactionId); + break; + + case MessageType.LOCK_FAILURE: + LockFailure lockFailure = (LockFailure) msg; + writer.writeLong(lockFailure.transactionId); + break; + + case MessageType.CHECK_STORAGE_CONNECTIVITY_REQUEST: + break; + + case MessageType.CHECK_STORAGE_CONNECTIVITY_RESPONSE: + CheckStorageConnectivityResponse checkStorageConnectivityResponse = + (CheckStorageConnectivityResponse) msg; + int size = checkStorageConnectivityResponse.storageConnectivityMap.size(); + writer.writeInt(size); + Map storageConnectivityMap = checkStorageConnectivityResponse.storageConnectivityMap; + for (Map.Entry storageConnectionEntry : storageConnectivityMap.entrySet()) { + writer.writeString(storageConnectionEntry.getKey()); + writer.writeBoolean(storageConnectionEntry.getValue()); + } + break; + + default: + throw new IllegalStateException("unknown message type: " + msg.type()); + } + } + +} diff --git a/waltz-common/src/test/java/com/wepay/waltz/common/message/MessageCodecV0Test.java b/waltz-common/src/test/java/com/wepay/waltz/common/message/MessageCodecV0Test.java index 39cbb567..116e3838 100644 --- a/waltz-common/src/test/java/com/wepay/waltz/common/message/MessageCodecV0Test.java +++ b/waltz-common/src/test/java/com/wepay/waltz/common/message/MessageCodecV0Test.java @@ -30,7 +30,7 @@ public void test() { byte[] data; data = data(); - AppendRequest appendRequest1 = new AppendRequest(reqId(), rand.nextLong(), lockRequest, new int[0], header, data, Utils.checksum(data)); + AppendRequest appendRequest1 = new AppendRequest(reqId(), rand.nextLong(), lockRequest, new int[0], new int[0], header, data, Utils.checksum(data)); AppendRequest appendRequest2 = encodeThenDecode(appendRequest1); assertEquals(MessageType.APPEND_REQUEST, appendRequest1.type()); assertEquals(appendRequest1.type(), appendRequest2.type()); @@ -138,7 +138,7 @@ public void test() { } @Test(expected = UnsupportedOperationException.class) - public void testUnsupported() { + public void testUnsupported1() { assertEquals(0, codec.version()); int[] readLocks = {1}; @@ -146,7 +146,21 @@ public void testUnsupported() { byte[] data; data = data(); - AppendRequest appendRequest = new AppendRequest(reqId(), rand.nextLong(), new int[0], readLocks, header, data, Utils.checksum(data)); + AppendRequest appendRequest = new AppendRequest(reqId(), rand.nextLong(), new int[0], readLocks, new int[0], header, data, Utils.checksum(data)); + ByteArrayMessageAttributeWriter writer = new ByteArrayMessageAttributeWriter(); + codec.encode(appendRequest, writer); + } + + @Test(expected = UnsupportedOperationException.class) + public void testUnsupported2() { + assertEquals(0, codec.version()); + + int[] appendLocks = {1}; + int header = rand.nextInt(); + byte[] data; + + data = data(); + AppendRequest appendRequest = new AppendRequest(reqId(), rand.nextLong(), new int[0], new int[0], appendLocks, header, data, Utils.checksum(data)); ByteArrayMessageAttributeWriter writer = new ByteArrayMessageAttributeWriter(); codec.encode(appendRequest, writer); } diff --git a/waltz-common/src/test/java/com/wepay/waltz/common/message/MessageCodecV1Test.java b/waltz-common/src/test/java/com/wepay/waltz/common/message/MessageCodecV1Test.java new file mode 100644 index 00000000..ed234f05 --- /dev/null +++ b/waltz-common/src/test/java/com/wepay/waltz/common/message/MessageCodecV1Test.java @@ -0,0 +1,182 @@ +package com.wepay.waltz.common.message; + +import com.wepay.riff.message.ByteArrayMessageAttributeReader; +import com.wepay.riff.message.ByteArrayMessageAttributeWriter; +import com.wepay.riff.network.Message; +import com.wepay.waltz.common.util.Utils; +import com.wepay.waltz.exception.RpcException; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class MessageCodecV1Test { + + private final MessageCodecV1 codec = new MessageCodecV1(); + private final Random rand = new Random(); + + @Test + public void test() { + assertEquals(1, codec.version()); + + int[] writeLockRequest = lock(); + int[] readLockRequest = lock(); + int header = rand.nextInt(); + byte[] data; + + data = data(); + AppendRequest appendRequest1 = new AppendRequest(reqId(), rand.nextLong(), writeLockRequest, readLockRequest, new int[0], header, data, Utils.checksum(data)); + AppendRequest appendRequest2 = encodeThenDecode(appendRequest1); + assertEquals(MessageType.APPEND_REQUEST, appendRequest1.type()); + assertEquals(appendRequest1.type(), appendRequest2.type()); + assertEquals(appendRequest1.reqId, appendRequest2.reqId); + assertEquals(appendRequest1.clientHighWaterMark, appendRequest2.clientHighWaterMark); + assertTrue(Arrays.equals(appendRequest1.data, appendRequest2.data)); + + MountRequest mountRequest1 = new MountRequest(reqId(), rand.nextLong(), rand.nextLong()); + MountRequest mountRequest2 = encodeThenDecode(mountRequest1); + assertEquals(MessageType.MOUNT_REQUEST, mountRequest1.type()); + assertEquals(mountRequest1.type(), mountRequest2.type()); + assertEquals(mountRequest1.reqId, mountRequest2.reqId); + assertEquals(mountRequest1.clientHighWaterMark, mountRequest2.clientHighWaterMark); + assertEquals(mountRequest1.seqNum, mountRequest2.seqNum); + + MountResponse mountResponse1 = new MountResponse(reqId(), rand.nextBoolean()); + MountResponse mountResponse2 = encodeThenDecode(mountResponse1); + assertEquals(MessageType.MOUNT_RESPONSE, mountResponse1.type()); + assertEquals(mountResponse1.type(), mountResponse2.type()); + assertEquals(mountResponse1.reqId, mountResponse2.reqId); + assertEquals(mountResponse1.partitionReady, mountResponse2.partitionReady); + + FeedData feedData1 = new FeedData(reqId(), rand.nextLong(), header); + FeedData feedData2 = encodeThenDecode(feedData1); + assertEquals(MessageType.FEED_DATA, feedData1.type()); + assertEquals(feedData1.type(), feedData2.type()); + assertEquals(feedData1.reqId, feedData2.reqId); + assertEquals(feedData1.transactionId, feedData2.transactionId); + assertEquals(feedData1.header, feedData2.header); + + FeedRequest feedRequest1 = new FeedRequest(reqId(), rand.nextLong()); + FeedRequest feedRequest2 = encodeThenDecode(feedRequest1); + assertEquals(MessageType.FEED_REQUEST, feedRequest1.type()); + assertEquals(feedRequest1.type(), feedRequest2.type()); + assertEquals(feedRequest1.reqId, feedRequest2.reqId); + assertEquals(feedRequest1.clientHighWaterMark, feedRequest2.clientHighWaterMark); + + FeedSuspended feedSuspended1 = new FeedSuspended(reqId()); + FeedSuspended feedSuspended2 = encodeThenDecode(feedSuspended1); + assertEquals(MessageType.FEED_SUSPENDED, feedSuspended1.type()); + assertEquals(feedSuspended1.type(), feedSuspended2.type()); + assertEquals(feedSuspended1.reqId, feedSuspended2.reqId); + + FlushRequest flushRequest1 = new FlushRequest(reqId()); + FlushRequest flushRequest2 = encodeThenDecode(flushRequest1); + assertEquals(MessageType.FLUSH_REQUEST, flushRequest1.type()); + assertEquals(flushRequest1.type(), flushRequest2.type()); + assertEquals(flushRequest1.reqId, flushRequest2.reqId); + + FlushResponse flushResponse1 = new FlushResponse(reqId(), rand.nextLong()); + FlushResponse flushResponse2 = encodeThenDecode(flushResponse1); + assertEquals(MessageType.FLUSH_RESPONSE, flushResponse1.type()); + assertEquals(flushResponse1.type(), flushResponse2.type()); + assertEquals(flushResponse1.reqId, flushResponse2.reqId); + assertEquals(flushResponse1.transactionId, flushResponse2.transactionId); + + TransactionDataRequest transactionDataRequest1 = new TransactionDataRequest(reqId(), rand.nextLong()); + TransactionDataRequest transactionDataRequest2 = encodeThenDecode(transactionDataRequest1); + assertEquals(MessageType.TRANSACTION_DATA_REQUEST, transactionDataRequest1.type()); + assertEquals(transactionDataRequest1.type(), transactionDataRequest2.type()); + assertEquals(transactionDataRequest1.reqId, transactionDataRequest2.reqId); + assertEquals(transactionDataRequest1.transactionId, transactionDataRequest2.transactionId); + + TransactionDataResponse transactionDataResponse1 = + new TransactionDataResponse(reqId(), rand.nextLong(), data, Utils.checksum(data)); + TransactionDataResponse transactionDataResponse2 = encodeThenDecode(transactionDataResponse1); + assertEquals(MessageType.TRANSACTION_DATA_RESPONSE, transactionDataResponse1.type()); + assertEquals(transactionDataResponse1.type(), transactionDataResponse2.type()); + assertEquals(transactionDataResponse1.reqId, transactionDataResponse2.reqId); + assertEquals(transactionDataResponse1.transactionId, transactionDataResponse2.transactionId); + assertNotNull(transactionDataResponse1.data); + assertNotNull(transactionDataResponse2.data); + assertNull(transactionDataResponse1.exception); + assertNull(transactionDataResponse2.exception); + assertTrue(Arrays.equals(transactionDataResponse1.data, transactionDataResponse2.data)); + assertEquals(transactionDataResponse1.checksum, transactionDataResponse2.checksum); + + TransactionDataResponse transactionDataResponse3 = + new TransactionDataResponse(reqId(), rand.nextLong(), new RpcException(Integer.toString(rand.nextInt()))); + TransactionDataResponse transactionDataResponse4 = encodeThenDecode(transactionDataResponse3); + assertEquals(MessageType.TRANSACTION_DATA_RESPONSE, transactionDataResponse1.type()); + assertEquals(transactionDataResponse3.type(), transactionDataResponse4.type()); + assertEquals(transactionDataResponse3.reqId, transactionDataResponse4.reqId); + assertEquals(transactionDataResponse3.transactionId, transactionDataResponse4.transactionId); + assertNull(transactionDataResponse3.data); + assertNull(transactionDataResponse4.data); + assertEquals(0, transactionDataResponse3.checksum); + assertEquals(0, transactionDataResponse4.checksum); + assertNotNull(transactionDataResponse3.exception); + assertNotNull(transactionDataResponse4.exception); + assertEquals(transactionDataResponse3.exception.toString(), transactionDataResponse4.exception.toString()); + + HighWaterMarkRequest highWaterMarkRequest1 = new HighWaterMarkRequest(reqId()); + HighWaterMarkRequest highWaterMarkRequest2 = encodeThenDecode(highWaterMarkRequest1); + assertEquals(MessageType.HIGH_WATER_MARK_REQUEST, highWaterMarkRequest1.type()); + assertEquals(highWaterMarkRequest1.type(), highWaterMarkRequest2.type()); + assertEquals(highWaterMarkRequest1.reqId, highWaterMarkRequest2.reqId); + + HighWaterMarkResponse highWaterMarkResponse1 = new HighWaterMarkResponse(reqId(), rand.nextLong()); + HighWaterMarkResponse highWaterMarkResponse2 = encodeThenDecode(highWaterMarkResponse1); + assertEquals(MessageType.HIGH_WATER_MARK_RESPONSE, highWaterMarkResponse1.type()); + assertEquals(highWaterMarkResponse1.type(), highWaterMarkResponse2.type()); + assertEquals(highWaterMarkResponse1.reqId, highWaterMarkResponse2.reqId); + assertEquals(highWaterMarkResponse1.transactionId, highWaterMarkResponse2.transactionId); + } + + @Test(expected = UnsupportedOperationException.class) + public void testUnsupported() { + assertEquals(1, codec.version()); + + int[] appendLocks = {1}; + int header = rand.nextInt(); + byte[] data; + + data = data(); + AppendRequest appendRequest = new AppendRequest(reqId(), rand.nextLong(), new int[0], new int[0], appendLocks, header, data, Utils.checksum(data)); + ByteArrayMessageAttributeWriter writer = new ByteArrayMessageAttributeWriter(); + codec.encode(appendRequest, writer); + } + + @SuppressWarnings("unchecked") + private T encodeThenDecode(T message) { + ByteArrayMessageAttributeWriter writer = new ByteArrayMessageAttributeWriter(); + codec.encode(message, writer); + ByteArrayMessageAttributeReader reader = new ByteArrayMessageAttributeReader(writer.toByteArray()); + return (T) codec.decode(reader); + } + + private ReqId reqId() { + return new ReqId(rand.nextLong(), rand.nextLong()); + } + + private int[] lock() { + int n = rand.nextInt(3); + int[] lock = new int[n]; + + for (int i = 0; i < n; i++) { + lock[i] = rand.nextInt(); + } + + return lock; + } + + private byte[] data() { + return Long.toOctalString(rand.nextLong()).getBytes(StandardCharsets.UTF_8); + } + +} diff --git a/waltz-common/src/test/java/com/wepay/waltz/common/message/MessageCodecV2Test.java b/waltz-common/src/test/java/com/wepay/waltz/common/message/MessageCodecV2Test.java new file mode 100644 index 00000000..a064454c --- /dev/null +++ b/waltz-common/src/test/java/com/wepay/waltz/common/message/MessageCodecV2Test.java @@ -0,0 +1,169 @@ +package com.wepay.waltz.common.message; + +import com.wepay.riff.message.ByteArrayMessageAttributeReader; +import com.wepay.riff.message.ByteArrayMessageAttributeWriter; +import com.wepay.riff.network.Message; +import com.wepay.waltz.common.util.Utils; +import com.wepay.waltz.exception.RpcException; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class MessageCodecV2Test { + + private final MessageCodecV2 codec = new MessageCodecV2(); + private final Random rand = new Random(); + + @Test + public void test() { + assertEquals(2, codec.version()); + + int[] writeLockRequest = lock(); + int[] readLockRequest = lock(); + int[] appendLockRequest = lock(); + int header = rand.nextInt(); + byte[] data; + + data = data(); + AppendRequest appendRequest1 = new AppendRequest(reqId(), rand.nextLong(), writeLockRequest, readLockRequest, appendLockRequest, header, data, Utils.checksum(data)); + AppendRequest appendRequest2 = encodeThenDecode(appendRequest1); + assertEquals(MessageType.APPEND_REQUEST, appendRequest1.type()); + assertEquals(appendRequest1.type(), appendRequest2.type()); + assertEquals(appendRequest1.reqId, appendRequest2.reqId); + assertEquals(appendRequest1.clientHighWaterMark, appendRequest2.clientHighWaterMark); + assertTrue(Arrays.equals(appendRequest1.data, appendRequest2.data)); + + MountRequest mountRequest1 = new MountRequest(reqId(), rand.nextLong(), rand.nextLong()); + MountRequest mountRequest2 = encodeThenDecode(mountRequest1); + assertEquals(MessageType.MOUNT_REQUEST, mountRequest1.type()); + assertEquals(mountRequest1.type(), mountRequest2.type()); + assertEquals(mountRequest1.reqId, mountRequest2.reqId); + assertEquals(mountRequest1.clientHighWaterMark, mountRequest2.clientHighWaterMark); + assertEquals(mountRequest1.seqNum, mountRequest2.seqNum); + + MountResponse mountResponse1 = new MountResponse(reqId(), rand.nextBoolean()); + MountResponse mountResponse2 = encodeThenDecode(mountResponse1); + assertEquals(MessageType.MOUNT_RESPONSE, mountResponse1.type()); + assertEquals(mountResponse1.type(), mountResponse2.type()); + assertEquals(mountResponse1.reqId, mountResponse2.reqId); + assertEquals(mountResponse1.partitionReady, mountResponse2.partitionReady); + + FeedData feedData1 = new FeedData(reqId(), rand.nextLong(), header); + FeedData feedData2 = encodeThenDecode(feedData1); + assertEquals(MessageType.FEED_DATA, feedData1.type()); + assertEquals(feedData1.type(), feedData2.type()); + assertEquals(feedData1.reqId, feedData2.reqId); + assertEquals(feedData1.transactionId, feedData2.transactionId); + assertEquals(feedData1.header, feedData2.header); + + FeedRequest feedRequest1 = new FeedRequest(reqId(), rand.nextLong()); + FeedRequest feedRequest2 = encodeThenDecode(feedRequest1); + assertEquals(MessageType.FEED_REQUEST, feedRequest1.type()); + assertEquals(feedRequest1.type(), feedRequest2.type()); + assertEquals(feedRequest1.reqId, feedRequest2.reqId); + assertEquals(feedRequest1.clientHighWaterMark, feedRequest2.clientHighWaterMark); + + FeedSuspended feedSuspended1 = new FeedSuspended(reqId()); + FeedSuspended feedSuspended2 = encodeThenDecode(feedSuspended1); + assertEquals(MessageType.FEED_SUSPENDED, feedSuspended1.type()); + assertEquals(feedSuspended1.type(), feedSuspended2.type()); + assertEquals(feedSuspended1.reqId, feedSuspended2.reqId); + + FlushRequest flushRequest1 = new FlushRequest(reqId()); + FlushRequest flushRequest2 = encodeThenDecode(flushRequest1); + assertEquals(MessageType.FLUSH_REQUEST, flushRequest1.type()); + assertEquals(flushRequest1.type(), flushRequest2.type()); + assertEquals(flushRequest1.reqId, flushRequest2.reqId); + + FlushResponse flushResponse1 = new FlushResponse(reqId(), rand.nextLong()); + FlushResponse flushResponse2 = encodeThenDecode(flushResponse1); + assertEquals(MessageType.FLUSH_RESPONSE, flushResponse1.type()); + assertEquals(flushResponse1.type(), flushResponse2.type()); + assertEquals(flushResponse1.reqId, flushResponse2.reqId); + assertEquals(flushResponse1.transactionId, flushResponse2.transactionId); + + TransactionDataRequest transactionDataRequest1 = new TransactionDataRequest(reqId(), rand.nextLong()); + TransactionDataRequest transactionDataRequest2 = encodeThenDecode(transactionDataRequest1); + assertEquals(MessageType.TRANSACTION_DATA_REQUEST, transactionDataRequest1.type()); + assertEquals(transactionDataRequest1.type(), transactionDataRequest2.type()); + assertEquals(transactionDataRequest1.reqId, transactionDataRequest2.reqId); + assertEquals(transactionDataRequest1.transactionId, transactionDataRequest2.transactionId); + + TransactionDataResponse transactionDataResponse1 = + new TransactionDataResponse(reqId(), rand.nextLong(), data, Utils.checksum(data)); + TransactionDataResponse transactionDataResponse2 = encodeThenDecode(transactionDataResponse1); + assertEquals(MessageType.TRANSACTION_DATA_RESPONSE, transactionDataResponse1.type()); + assertEquals(transactionDataResponse1.type(), transactionDataResponse2.type()); + assertEquals(transactionDataResponse1.reqId, transactionDataResponse2.reqId); + assertEquals(transactionDataResponse1.transactionId, transactionDataResponse2.transactionId); + assertNotNull(transactionDataResponse1.data); + assertNotNull(transactionDataResponse2.data); + assertNull(transactionDataResponse1.exception); + assertNull(transactionDataResponse2.exception); + assertTrue(Arrays.equals(transactionDataResponse1.data, transactionDataResponse2.data)); + assertEquals(transactionDataResponse1.checksum, transactionDataResponse2.checksum); + + TransactionDataResponse transactionDataResponse3 = + new TransactionDataResponse(reqId(), rand.nextLong(), new RpcException(Integer.toString(rand.nextInt()))); + TransactionDataResponse transactionDataResponse4 = encodeThenDecode(transactionDataResponse3); + assertEquals(MessageType.TRANSACTION_DATA_RESPONSE, transactionDataResponse1.type()); + assertEquals(transactionDataResponse3.type(), transactionDataResponse4.type()); + assertEquals(transactionDataResponse3.reqId, transactionDataResponse4.reqId); + assertEquals(transactionDataResponse3.transactionId, transactionDataResponse4.transactionId); + assertNull(transactionDataResponse3.data); + assertNull(transactionDataResponse4.data); + assertEquals(0, transactionDataResponse3.checksum); + assertEquals(0, transactionDataResponse4.checksum); + assertNotNull(transactionDataResponse3.exception); + assertNotNull(transactionDataResponse4.exception); + assertEquals(transactionDataResponse3.exception.toString(), transactionDataResponse4.exception.toString()); + + HighWaterMarkRequest highWaterMarkRequest1 = new HighWaterMarkRequest(reqId()); + HighWaterMarkRequest highWaterMarkRequest2 = encodeThenDecode(highWaterMarkRequest1); + assertEquals(MessageType.HIGH_WATER_MARK_REQUEST, highWaterMarkRequest1.type()); + assertEquals(highWaterMarkRequest1.type(), highWaterMarkRequest2.type()); + assertEquals(highWaterMarkRequest1.reqId, highWaterMarkRequest2.reqId); + + HighWaterMarkResponse highWaterMarkResponse1 = new HighWaterMarkResponse(reqId(), rand.nextLong()); + HighWaterMarkResponse highWaterMarkResponse2 = encodeThenDecode(highWaterMarkResponse1); + assertEquals(MessageType.HIGH_WATER_MARK_RESPONSE, highWaterMarkResponse1.type()); + assertEquals(highWaterMarkResponse1.type(), highWaterMarkResponse2.type()); + assertEquals(highWaterMarkResponse1.reqId, highWaterMarkResponse2.reqId); + assertEquals(highWaterMarkResponse1.transactionId, highWaterMarkResponse2.transactionId); + } + + @SuppressWarnings("unchecked") + private T encodeThenDecode(T message) { + ByteArrayMessageAttributeWriter writer = new ByteArrayMessageAttributeWriter(); + codec.encode(message, writer); + ByteArrayMessageAttributeReader reader = new ByteArrayMessageAttributeReader(writer.toByteArray()); + return (T) codec.decode(reader); + } + + private ReqId reqId() { + return new ReqId(rand.nextLong(), rand.nextLong()); + } + + private int[] lock() { + int n = rand.nextInt(3); + int[] lock = new int[n]; + + for (int i = 0; i < n; i++) { + lock[i] = rand.nextInt(); + } + + return lock; + } + + private byte[] data() { + return Long.toOctalString(rand.nextLong()).getBytes(StandardCharsets.UTF_8); + } + +} diff --git a/waltz-server/src/main/java/com/wepay/waltz/server/internal/Locks.java b/waltz-server/src/main/java/com/wepay/waltz/server/internal/Locks.java index 7104499b..b0e15a2e 100644 --- a/waltz-server/src/main/java/com/wepay/waltz/server/internal/Locks.java +++ b/waltz-server/src/main/java/com/wepay/waltz/server/internal/Locks.java @@ -19,10 +19,10 @@ public class Locks { private final BitSet locks; private final int numHashFuncs; - private int numActiveLocks = 0; + private int numActiveLockRequests = 0; /** - * Class contsructor. + * Class constructor. * @param size The size of the optimistic lock table. * @param numHashFuncs The number of hash functions to use inorder to reduce the false positives. * @param defaultHighWaterMark The default high-water mark to set initially. @@ -37,15 +37,17 @@ public Locks(int size, int numHashFuncs, long defaultHighWaterMark) { /** * Returns True if the locks doesn't overlap with the locks that are currently in use, otherwise returns False. * Also acquires the write locks while returning True. - * @param request List of write locks and read locks to use. + * @param request Locks to use. * @return True if the locks doesn't overlap with the locks that are currently in use, otherwise returns False. */ public boolean begin(LockRequest request) { synchronized (locks) { - if (begin(request.writeLocks) && begin(request.readLocks)) { - // Mark entries for write locks + if (begin(request.writeLocks) && begin(request.readLocks) && begin(request.appendLocks)) { + // Mark entries for write locks and append locks. + // Entries for Read locks are not marked since they won't change the high-water marks. mark(request.writeLocks); - numActiveLocks++; + mark(request.appendLocks); + numActiveLockRequests++; return true; } else { return false; @@ -74,7 +76,8 @@ private boolean begin(int[] lockRequest) { public void end(LockRequest request) { synchronized (locks) { unmark(request.writeLocks); - numActiveLocks--; + unmark(request.appendLocks); + numActiveLockRequests--; } } @@ -100,7 +103,7 @@ private void unmark(int[] lockRequest) { /** * Returns the estimated transaction ID of the last successful transaction for the given lock. - * @param request Lock for which last successful transaction ID is to be determined. + * @param request Locks for which last successful transaction ID is to be determined. * @return the estimated transaction ID of the last successful transaction for the given lock. */ public long getLockHighWaterMark(LockRequest request) { @@ -129,11 +132,16 @@ private long getLockHighWaterMark(int[] lockRequest) { /** * Updates the transaction ID in the locks table. - * @param request The write locks for which the transaction ID has to be updated. + * @param request Locks for which the transaction ID has to be updated. * @param transactionId The new transaction ID of the lock. */ public void commit(LockRequest request, long transactionId) { - for (int hash : request.writeLocks) { + commit(request.writeLocks, transactionId); + commit(request.appendLocks, transactionId); + } + + private void commit(int[] locks, long transactionId) { + for (int hash : locks) { for (int i = 0; i < numHashFuncs; i++) { // linear congruential generator hash = nextHash(hash); @@ -150,7 +158,7 @@ public void reset(long defaultHighWaterMark) { synchronized (locks) { Arrays.fill(highWaterMarks, defaultHighWaterMark); locks.clear(); - numActiveLocks = 0; + numActiveLockRequests = 0; } } @@ -160,7 +168,7 @@ public void reset(long defaultHighWaterMark) { */ public int numActiveLocks() { synchronized (locks) { - return numActiveLocks; + return numActiveLockRequests; } } @@ -177,10 +185,11 @@ private int index(int hash) { * Returns the lock request which contains an array of write locks and read locks. * @param writeLocks Array of write locks to use. * @param readLocks Array of read locks to use. + * @param appendLocks Array of read locks to use. * @return the lock request which contains an array of write locks and read locks. */ - public static LockRequest createRequest(int[] writeLocks, int[] readLocks) { - return new LockRequest(writeLocks, readLocks); + public static LockRequest createRequest(int[] writeLocks, int[] readLocks, int[] appendLocks) { + return new LockRequest(writeLocks, readLocks, appendLocks); } /** @@ -189,10 +198,12 @@ public static LockRequest createRequest(int[] writeLocks, int[] readLocks) { public static class LockRequest { private final int[] writeLocks; private final int[] readLocks; + private final int[] appendLocks; - LockRequest(int[] writeLocks, int[] readLocks) { + LockRequest(int[] writeLocks, int[] readLocks, int[] appendLocks) { this.writeLocks = writeLocks; this.readLocks = readLocks; + this.appendLocks = appendLocks; } } diff --git a/waltz-server/src/main/java/com/wepay/waltz/server/internal/Partition.java b/waltz-server/src/main/java/com/wepay/waltz/server/internal/Partition.java index 6e85fe68..da37f8c9 100644 --- a/waltz-server/src/main/java/com/wepay/waltz/server/internal/Partition.java +++ b/waltz-server/src/main/java/com/wepay/waltz/server/internal/Partition.java @@ -532,7 +532,7 @@ protected void process(AppendContext context) throws Exception { } else { AppendRequest request = context.request; - Locks.LockRequest lockRequest = Locks.createRequest(request.writeLockRequest, request.readLockRequest); + Locks.LockRequest lockRequest = Locks.createRequest(request.writeLockRequest, request.readLockRequest, request.appendLockRequest); // Begin locking while (!locks.begin(lockRequest)) { if (storePartition.numPendingAppends() == 0) { diff --git a/waltz-server/src/main/java/com/wepay/waltz/server/internal/WaltzServerHandler.java b/waltz-server/src/main/java/com/wepay/waltz/server/internal/WaltzServerHandler.java index f15688ea..85c58e78 100644 --- a/waltz-server/src/main/java/com/wepay/waltz/server/internal/WaltzServerHandler.java +++ b/waltz-server/src/main/java/com/wepay/waltz/server/internal/WaltzServerHandler.java @@ -10,6 +10,7 @@ import com.wepay.waltz.common.message.CheckStorageConnectivityResponse; import com.wepay.waltz.common.message.MessageCodecV0; import com.wepay.waltz.common.message.MessageCodecV1; +import com.wepay.waltz.common.message.MessageCodecV2; import com.wepay.waltz.common.message.MessageType; import com.wepay.waltz.common.message.MountRequest; import com.wepay.waltz.common.metadata.ReplicaId; @@ -38,6 +39,7 @@ public class WaltzServerHandler extends MessageHandler implements PartitionClien static { CODECS.put(MessageCodecV0.VERSION, MessageCodecV0.INSTANCE); CODECS.put(MessageCodecV1.VERSION, MessageCodecV1.INSTANCE); + CODECS.put(MessageCodecV2.VERSION, MessageCodecV2.INSTANCE); } private static final String HELLO_MESSAGE = "Waltz Server"; diff --git a/waltz-server/src/test/java/com/wepay/waltz/server/internal/LocksTest.java b/waltz-server/src/test/java/com/wepay/waltz/server/internal/LocksTest.java index 30c475b2..bc1c57d0 100644 --- a/waltz-server/src/test/java/com/wepay/waltz/server/internal/LocksTest.java +++ b/waltz-server/src/test/java/com/wepay/waltz/server/internal/LocksTest.java @@ -20,7 +20,7 @@ public void testSingleLock() { // A write lock for (int i = 0; i < 10; i++) { - lockRequest = Locks.createRequest(array(rand.nextInt(Integer.MAX_VALUE)), noLocks); + lockRequest = Locks.createRequest(array(rand.nextInt(Integer.MAX_VALUE)), noLocks, noLocks); assertTrue(locks.begin(lockRequest)); assertTrue(locks.getLockHighWaterMark(lockRequest) <= transactionId); @@ -38,28 +38,52 @@ public void testSingleLock() { transactionId++; } + // A read lock for (int i = 0; i < 10; i++) { // Get the lock high water mark using a write lock. (Do not commit) int lock = rand.nextInt(Integer.MAX_VALUE); - lockRequest = Locks.createRequest(array(lock), noLocks); + lockRequest = Locks.createRequest(array(lock), noLocks, noLocks); assertTrue(locks.begin(lockRequest)); long lockHighWaterMark = locks.getLockHighWaterMark(lockRequest); locks.end(lockRequest); // Get the lock high-water make using a read lock, and compare. Do commit. - lockRequest = Locks.createRequest(noLocks, array(lock)); + lockRequest = Locks.createRequest(noLocks, array(lock), noLocks); assertTrue(locks.begin(lockRequest)); assertEquals(lockHighWaterMark, locks.getLockHighWaterMark(lockRequest)); locks.commit(lockRequest, transactionId); locks.end(lockRequest); // Repeat. Commit should not have change the lock high-water mark. - lockRequest = Locks.createRequest(noLocks, array(lock)); + lockRequest = Locks.createRequest(noLocks, array(lock), noLocks); assertTrue(locks.begin(lockRequest)); assertEquals(lockHighWaterMark, locks.getLockHighWaterMark(lockRequest)); locks.commit(lockRequest, transactionId); locks.end(lockRequest); } + + // An append lock + for (int i = 0; i < 10; i++) { + // Get the lock high water mark using a write lock. (Do not commit) + int lock = rand.nextInt(Integer.MAX_VALUE); + lockRequest = Locks.createRequest(array(lock), noLocks, noLocks); + assertTrue(locks.begin(lockRequest)); + long lockHighWaterMark = locks.getLockHighWaterMark(lockRequest); + locks.end(lockRequest); + + // Get the lock high-water make using an append lock, and compare. Do commit. + lockRequest = Locks.createRequest(noLocks, noLocks, array(lock)); + assertTrue(locks.begin(lockRequest)); + assertEquals(-1L, locks.getLockHighWaterMark(lockRequest)); + locks.commit(lockRequest, transactionId); + locks.end(lockRequest); + + // Get the lock high-water make using a read lock, and compare. + lockRequest = Locks.createRequest(noLocks, array(lock), noLocks); + assertTrue(locks.begin(lockRequest)); + assertEquals(transactionId, locks.getLockHighWaterMark(lockRequest)); + locks.end(lockRequest); + } } @Test @@ -75,76 +99,76 @@ public void testMultipleWriteLocks() { Locks.LockRequest lockRequest; // Commit accountId & paymentId at transactionId1 - lockRequest = Locks.createRequest(array(accountId, paymentId), noLocks); + lockRequest = Locks.createRequest(array(accountId, paymentId), noLocks, noLocks); assertTrue(locks.begin(lockRequest)); assertTrue(locks.getLockHighWaterMark(lockRequest) <= transactionId1); locks.commit(lockRequest, transactionId1); locks.end(lockRequest); // The lock high-water mark for accountId is at transactionId1 - lockRequest = Locks.createRequest(array(accountId), noLocks); + lockRequest = Locks.createRequest(array(accountId), noLocks, noLocks); assertTrue(locks.begin(lockRequest)); assertEquals(transactionId1, locks.getLockHighWaterMark(lockRequest)); locks.end(lockRequest); // The lock high-water mark for paymentId is at transactionId1 - lockRequest = Locks.createRequest(array(paymentId), noLocks); + lockRequest = Locks.createRequest(array(paymentId), noLocks, noLocks); assertTrue(locks.begin(lockRequest)); assertEquals(transactionId1, locks.getLockHighWaterMark(lockRequest)); locks.end(lockRequest); // The lock high-water mark for accountId & paymentId is at transactionId1 - lockRequest = Locks.createRequest(array(accountId, paymentId), noLocks); + lockRequest = Locks.createRequest(array(accountId, paymentId), noLocks, noLocks); assertTrue(locks.begin(lockRequest)); assertEquals(transactionId1, locks.getLockHighWaterMark(lockRequest)); locks.end(lockRequest); // Commit just accountId at transactionId2 - lockRequest = Locks.createRequest(array(accountId), noLocks); + lockRequest = Locks.createRequest(array(accountId), noLocks, noLocks); assertTrue(locks.begin(lockRequest)); assertTrue(locks.getLockHighWaterMark(lockRequest) < transactionId2); locks.commit(lockRequest, transactionId2); locks.end(lockRequest); // The lock high-water mark for accountId is advanced to transactionId2 - lockRequest = Locks.createRequest(array(accountId), noLocks); + lockRequest = Locks.createRequest(array(accountId), noLocks, noLocks); assertTrue(locks.begin(lockRequest)); assertEquals(transactionId2, locks.getLockHighWaterMark(lockRequest)); locks.end(lockRequest); // The lock high-water mark for paymentId is still at transactionId1 - lockRequest = Locks.createRequest(array(paymentId), noLocks); + lockRequest = Locks.createRequest(array(paymentId), noLocks, noLocks); assertTrue(locks.begin(lockRequest)); assertEquals(transactionId1, locks.getLockHighWaterMark(lockRequest)); locks.end(lockRequest); // The lock high-water mark for accountId & paymentId is advanced to transactionId2 - lockRequest = Locks.createRequest(array(accountId, paymentId), noLocks); + lockRequest = Locks.createRequest(array(accountId, paymentId), noLocks, noLocks); assertTrue(locks.begin(lockRequest)); assertEquals(transactionId2, locks.getLockHighWaterMark(lockRequest)); locks.end(lockRequest); // Commit just paymentId at transactionId3 - lockRequest = Locks.createRequest(array(paymentId), noLocks); + lockRequest = Locks.createRequest(array(paymentId), noLocks, noLocks); assertTrue(locks.begin(lockRequest)); assertTrue(locks.getLockHighWaterMark(lockRequest) < transactionId3); locks.commit(lockRequest, transactionId3); locks.end(lockRequest); // The lock high-water mark for accountId is still at transactionId2 - lockRequest = Locks.createRequest(array(accountId), noLocks); + lockRequest = Locks.createRequest(array(accountId), noLocks, noLocks); assertTrue(locks.begin(lockRequest)); assertEquals(transactionId2, locks.getLockHighWaterMark(lockRequest)); locks.end(lockRequest); // The lock high-water mark for paymentId is is advanced to transactionId3 - lockRequest = Locks.createRequest(array(paymentId), noLocks); + lockRequest = Locks.createRequest(array(paymentId), noLocks, noLocks); assertTrue(locks.begin(lockRequest)); assertEquals(transactionId3, locks.getLockHighWaterMark(lockRequest)); locks.end(lockRequest); // The lock high-water mark for accountId & paymentId is advanced to transactionId3 - lockRequest = Locks.createRequest(array(accountId, paymentId), noLocks); + lockRequest = Locks.createRequest(array(accountId, paymentId), noLocks, noLocks); assertTrue(locks.begin(lockRequest)); assertEquals(transactionId3, locks.getLockHighWaterMark(lockRequest)); locks.end(lockRequest); @@ -168,85 +192,85 @@ public void testMultipleReadLocks() { Locks.LockRequest lockRequest; // Commit accountId & paymentId at transactionId1 - lockRequest = Locks.createRequest(array(accountId, paymentId), noLocks); + lockRequest = Locks.createRequest(array(accountId, paymentId), noLocks, noLocks); assertTrue(locks.begin(lockRequest)); assertTrue(locks.getLockHighWaterMark(lockRequest) <= transactionId1); locks.commit(lockRequest, transactionId1); locks.end(lockRequest); // The lock high-water mark for accountId is at transactionId1 - lockRequest = Locks.createRequest(noLocks, array(accountId)); + lockRequest = Locks.createRequest(noLocks, array(accountId), noLocks); assertTrue(locks.begin(lockRequest)); assertEquals(transactionId1, locks.getLockHighWaterMark(lockRequest)); locks.end(lockRequest); // The lock high-water mark for paymentId is at transactionId1 - lockRequest = Locks.createRequest(noLocks, array(paymentId)); + lockRequest = Locks.createRequest(noLocks, array(paymentId), noLocks); assertTrue(locks.begin(lockRequest)); assertEquals(transactionId1, locks.getLockHighWaterMark(lockRequest)); locks.end(lockRequest); // The lock high-water mark for accountId & paymentId is at transactionId1 - lockRequest = Locks.createRequest(noLocks, array(accountId, paymentId)); + lockRequest = Locks.createRequest(noLocks, array(accountId, paymentId), noLocks); assertTrue(locks.begin(lockRequest)); assertEquals(transactionId1, locks.getLockHighWaterMark(lockRequest)); locks.end(lockRequest); - lockRequest = Locks.createRequest(array(accountId), array(paymentId)); + lockRequest = Locks.createRequest(array(accountId), array(paymentId), noLocks); assertEquals(transactionId1, locks.getLockHighWaterMark(lockRequest)); locks.end(lockRequest); // Commit just accountId at transactionId2 - lockRequest = Locks.createRequest(array(accountId), noLocks); + lockRequest = Locks.createRequest(array(accountId), noLocks, noLocks); assertTrue(locks.begin(lockRequest)); assertTrue(locks.getLockHighWaterMark(lockRequest) < transactionId2); locks.commit(lockRequest, transactionId2); locks.end(lockRequest); // The lock high-water mark for accountId is advanced to transactionId2 - lockRequest = Locks.createRequest(noLocks, array(accountId)); + lockRequest = Locks.createRequest(noLocks, array(accountId), noLocks); assertTrue(locks.begin(lockRequest)); assertEquals(transactionId2, locks.getLockHighWaterMark(lockRequest)); locks.end(lockRequest); // The lock high-water mark for paymentId is still at transactionId1 - lockRequest = Locks.createRequest(noLocks, array(paymentId)); + lockRequest = Locks.createRequest(noLocks, array(paymentId), noLocks); assertTrue(locks.begin(lockRequest)); assertEquals(transactionId1, locks.getLockHighWaterMark(lockRequest)); locks.end(lockRequest); // The lock high-water mark for accountId & paymentId is advanced to transactionId2 - lockRequest = Locks.createRequest(noLocks, array(accountId, paymentId)); + lockRequest = Locks.createRequest(noLocks, array(accountId, paymentId), noLocks); assertTrue(locks.begin(lockRequest)); assertEquals(transactionId2, locks.getLockHighWaterMark(lockRequest)); locks.end(lockRequest); // Commit just paymentId at transactionId3 - lockRequest = Locks.createRequest(array(paymentId), noLocks); + lockRequest = Locks.createRequest(array(paymentId), noLocks, noLocks); assertTrue(locks.begin(lockRequest)); assertTrue(locks.getLockHighWaterMark(lockRequest) < transactionId3); locks.commit(lockRequest, transactionId3); locks.end(lockRequest); // The lock high-water mark for accountId is still at transactionId2 - lockRequest = Locks.createRequest(noLocks, array(accountId)); + lockRequest = Locks.createRequest(noLocks, array(accountId), noLocks); assertTrue(locks.begin(lockRequest)); assertEquals(transactionId2, locks.getLockHighWaterMark(lockRequest)); locks.end(lockRequest); // The lock high-water mark for paymentId is is advanced to transactionId3 - lockRequest = Locks.createRequest(noLocks, array(paymentId)); + lockRequest = Locks.createRequest(noLocks, array(paymentId), noLocks); assertTrue(locks.begin(lockRequest)); assertEquals(transactionId3, locks.getLockHighWaterMark(lockRequest)); locks.end(lockRequest); // The lock high-water mark for accountId & paymentId is advanced to transactionId3 - lockRequest = Locks.createRequest(noLocks, array(accountId, paymentId)); + lockRequest = Locks.createRequest(noLocks, array(accountId, paymentId), noLocks); assertTrue(locks.begin(lockRequest)); assertEquals(transactionId3, locks.getLockHighWaterMark(lockRequest)); locks.end(lockRequest); - lockRequest = Locks.createRequest(array(accountId), array(paymentId)); + lockRequest = Locks.createRequest(array(accountId), array(paymentId), noLocks); assertTrue(locks.begin(lockRequest)); assertEquals(transactionId3, locks.getLockHighWaterMark(lockRequest)); locks.end(lockRequest); @@ -257,6 +281,50 @@ public void testMultipleReadLocks() { } } + @Test + public void testHierarchicalResourceScenario() { + Locks locks = new Locks(100, 3, -1L); + long getResourceFirstTime = rand.nextInt(Integer.MAX_VALUE); + long modifyChildResourceFirstTime = getResourceFirstTime + 1; + long modifyChildResourceSecondTime = getResourceFirstTime + 2; + long getParentResourceSecondTime = getResourceFirstTime + 3; + Locks.LockRequest lockRequest; + + final int parent = 12345; // a parent resource + final int child = 6789; // a child resource + + // get the resource + lockRequest = Locks.createRequest(array(parent, child), noLocks, noLocks); + assertTrue(locks.begin(lockRequest)); + assertEquals(-1L, locks.getLockHighWaterMark(lockRequest)); + locks.commit(lockRequest, getResourceFirstTime); + locks.end(lockRequest); + + // first modification of the child resource + lockRequest = Locks.createRequest(noLocks, array(parent), array(child)); + assertTrue(locks.begin(lockRequest)); + // Lock high water-mark be getResourceFirstTime + assertEquals(getResourceFirstTime, locks.getLockHighWaterMark(lockRequest)); + locks.commit(lockRequest, modifyChildResourceFirstTime); + locks.end(lockRequest); + + // second modification of the child resource + lockRequest = Locks.createRequest(noLocks, array(parent), array(child)); + assertTrue(locks.begin(lockRequest)); + // Lock high water-mark should be getResourceFirstTime + assertEquals(getResourceFirstTime, locks.getLockHighWaterMark(lockRequest)); + locks.commit(lockRequest, modifyChildResourceSecondTime); + locks.end(lockRequest); + + // get the resource + lockRequest = Locks.createRequest(array(parent, child), noLocks, noLocks); + assertTrue(locks.begin(lockRequest)); + // Lock high water-mark should be modifyChildResourceSecondTime. + assertEquals(modifyChildResourceSecondTime, locks.getLockHighWaterMark(lockRequest)); + locks.commit(lockRequest, getParentResourceSecondTime); + locks.end(lockRequest); + } + @Test public void testCollisions() { int size = 300; @@ -270,7 +338,7 @@ public void testCollisions() { int numLockFailures = 0; while (numLockFailures == 0) { int accountId = rand.nextInt(Integer.MAX_VALUE); - Locks.LockRequest lockRequest = Locks.createRequest(array(accountId), noLocks); + Locks.LockRequest lockRequest = Locks.createRequest(array(accountId), noLocks, noLocks); assertTrue(locks.begin(lockRequest)); if (locks.getLockHighWaterMark(lockRequest) <= clientHighWaterMark) { diff --git a/waltz-server/src/test/java/com/wepay/waltz/server/internal/PartitionTest.java b/waltz-server/src/test/java/com/wepay/waltz/server/internal/PartitionTest.java index 5f6d817d..61df4f1f 100644 --- a/waltz-server/src/test/java/com/wepay/waltz/server/internal/PartitionTest.java +++ b/waltz-server/src/test/java/com/wepay/waltz/server/internal/PartitionTest.java @@ -593,7 +593,7 @@ private void append( for (int i = 0; i < numTransactions; i++) { byte[] data = data(); partition.receiveMessage( - new AppendRequest(reqId(partitionClient.clientId()), -1L, NO_LOCK, NO_LOCK, HEADER, data, Utils.checksum(data)), + new AppendRequest(reqId(partitionClient.clientId()), -1L, NO_LOCK, NO_LOCK, NO_LOCK, HEADER, data, Utils.checksum(data)), partitionClient ); } diff --git a/waltz-test/src/main/java/com/wepay/waltz/test/mock/MockContext.java b/waltz-test/src/main/java/com/wepay/waltz/test/mock/MockContext.java index 9c90d0ac..25dde619 100644 --- a/waltz-test/src/main/java/com/wepay/waltz/test/mock/MockContext.java +++ b/waltz-test/src/main/java/com/wepay/waltz/test/mock/MockContext.java @@ -22,19 +22,21 @@ public class MockContext extends TransactionContext { private final String data; private final List writeLocks; private final List readLocks; + private final List appendLocks; private final boolean retry; public static Builder builder() { return new Builder(); } - MockContext(int partitionId, int header, String data, List writeLocks, List readLocks, boolean retry) { + MockContext(int partitionId, int header, String data, List writeLocks, List readLocks, List appendLocks, boolean retry) { super(); this.partitionId = partitionId; this.header = header; this.data = data; this.writeLocks = writeLocks; this.readLocks = readLocks; + this.appendLocks = appendLocks; this.retry = retry; } @@ -53,6 +55,7 @@ public boolean execute(TransactionBuilder builder) { builder.setTransactionData(data, StringSerializer.INSTANCE); builder.setWriteLocks(writeLocks); builder.setReadLocks(readLocks); + builder.setAppendLocks(appendLocks); return true; } else { @@ -86,6 +89,7 @@ public static class Builder { private String data = null; private List writeLocks = new ArrayList<>(); private List readLocks = new ArrayList<>(); + private List appendLocks = new ArrayList<>(); private boolean retry = true; public Builder partitionId(int partitionId) { @@ -117,13 +121,20 @@ public Builder readLocks(int... locks) { return this; } + public Builder appendLocks(int... locks) { + for (int lock : locks) { + this.appendLocks.add(makeLock(lock)); + } + return this; + } + public Builder retry(boolean retry) { this.retry = retry; return this; } public MockContext build() { - return new MockContext(partitionId, header, data, writeLocks, readLocks, retry); + return new MockContext(partitionId, header, data, writeLocks, readLocks, appendLocks, retry); } }