Skip to content

Commit

Permalink
Merge pull request #77 from ymatsuda/new_lock_mode
Browse files Browse the repository at this point in the history
a new lock type
  • Loading branch information
ymatsuda authored Jan 23, 2020
2 parents 7affc2c + c617663 commit 32f1b23
Show file tree
Hide file tree
Showing 17 changed files with 798 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,11 @@ public interface TransactionBuilder {
*/
void setReadLocks(List<PartitionLocalLock> partitionLocalLocks);

/**
* Sets optimistic append locks.
*
* @param partitionLocalLocks a list of {@link PartitionLocalLock}
*/
void setAppendLocks(List<PartitionLocalLock> partitionLocalLocks);

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class TransactionBuilderImpl implements TransactionBuilder {
private byte[] data = null;
private List<PartitionLocalLock> writeLocks;
private List<PartitionLocalLock> readLocks;
private List<PartitionLocalLock> appendLocks;

/**
* Class Constructor.
Expand Down Expand Up @@ -54,6 +55,11 @@ public void setReadLocks(List<PartitionLocalLock> locks) {
this.readLocks = locks;
}

@Override
public void setAppendLocks(List<PartitionLocalLock> locks) {
this.appendLocks = locks;
}

/**
* @return a new instance of {@link AppendRequest} to send to Waltz cluster.
*/
Expand All @@ -63,6 +69,7 @@ public AppendRequest buildRequest() {
clientHighWaterMark,
compileLockRequest(writeLocks),
compileLockRequest(readLocks),
compileLockRequest(appendLocks),
header,
data,
Utils.checksum(data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.wepay.waltz.common.message.LockFailure;
import com.wepay.waltz.common.message.MessageCodecV0;
import com.wepay.waltz.common.message.MessageCodecV1;
import com.wepay.waltz.common.message.MessageCodecV2;
import com.wepay.waltz.common.message.MessageType;
import com.wepay.waltz.common.message.MountRequest;
import com.wepay.waltz.common.message.MountResponse;
Expand All @@ -35,6 +36,7 @@ public class WaltzClientHandler extends MessageHandler {
static {
CODECS.put(MessageCodecV0.VERSION, MessageCodecV0.INSTANCE);
CODECS.put(MessageCodecV1.VERSION, MessageCodecV1.INSTANCE);
CODECS.put(MessageCodecV2.VERSION, MessageCodecV2.INSTANCE);
}

private static final String HELLO_MESSAGE = "Waltz Client";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public void test() {
.data("good transaction")
.writeLocks(1, 2, 3)
.readLocks(4, 5, 6)
.appendLocks(7, 8, 9)
.build();

TransactionBuilderImpl builder = new TransactionBuilderImpl(new ReqId(111, 222, 333, 444), 999);
Expand All @@ -43,8 +44,15 @@ public void test() {
MockContext.makeLock(6).hashCode()
};

int[] expectedAppendLockRequest = new int[]{
MockContext.makeLock(7).hashCode(),
MockContext.makeLock(8).hashCode(),
MockContext.makeLock(9).hashCode()
};

assertArrayEquals(expectedWriteLockRequest, request.writeLockRequest);
assertArrayEquals(expectedReadLockRequest, request.readLockRequest);
assertArrayEquals(expectedAppendLockRequest, request.appendLockRequest);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,28 @@ public class AppendRequest extends AbstractMessage {
public final long clientHighWaterMark;
public final int[] writeLockRequest;
public final int[] readLockRequest;
public final int[] appendLockRequest;
public final int header;
public final byte[] data;
public final int checksum;

@SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "internal class")
public AppendRequest(ReqId reqId, long clientHighWaterMark, int[] writeLockRequest, int[] readLockRequest, int header, byte[] data, int checksum) {
public AppendRequest(
ReqId reqId,
long clientHighWaterMark,
int[] writeLockRequest,
int[] readLockRequest,
int[] appendLockRequest,
int header,
byte[] data,
int checksum
) {
super(reqId);

this.clientHighWaterMark = clientHighWaterMark;
this.writeLockRequest = writeLockRequest;
this.readLockRequest = readLockRequest;
this.appendLockRequest = appendLockRequest;
this.header = header;
this.data = data;
this.checksum = checksum;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class MessageCodecV0 implements MessageCodec {
public static final MessageCodecV0 INSTANCE = new MessageCodecV0();

private static final byte MAGIC_BYTE = 'L';
private static final int[] NO_READ_LOCKS = new int[0];
private static final int[] NO_LOCKS = new int[0];

@Override
public byte magicByte() {
Expand Down Expand Up @@ -52,7 +52,7 @@ public Message decode(MessageAttributeReader reader) {
data = reader.readByteArray();
checksum = reader.readInt();
Utils.verifyChecksum(messageType, data, checksum);
return new AppendRequest(reqId, transactionId, writeLockRequest, NO_READ_LOCKS, header, data, checksum);
return new AppendRequest(reqId, transactionId, writeLockRequest, NO_LOCKS, NO_LOCKS, header, data, checksum);

case MessageType.FEED_REQUEST:
transactionId = reader.readLong(); // client High-water mark
Expand Down Expand Up @@ -131,6 +131,11 @@ public void encode(Message msg, MessageAttributeWriter writer) {
"read locks not supported, upgrade servers"
);
}
if (appendRequest.appendLockRequest.length > 0) {
throw new UnsupportedOperationException(
"append locks not supported, upgrade servers"
);
}

writer.writeLong(appendRequest.clientHighWaterMark);
writer.writeIntArray(appendRequest.writeLockRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class MessageCodecV1 implements MessageCodec {
public static final MessageCodecV1 INSTANCE = new MessageCodecV1();

private static final byte MAGIC_BYTE = 'L';
private static final int[] NO_LOCKS = new int[0];

@Override
public byte magicByte() {
Expand Down Expand Up @@ -55,7 +56,7 @@ public Message decode(MessageAttributeReader reader) {
data = reader.readByteArray();
checksum = reader.readInt();
Utils.verifyChecksum(messageType, data, checksum);
return new AppendRequest(reqId, transactionId, writeLockRequest, readLockRequest, header, data, checksum);
return new AppendRequest(reqId, transactionId, writeLockRequest, readLockRequest, NO_LOCKS, header, data, checksum);

case MessageType.FEED_REQUEST:
transactionId = reader.readLong(); // client High-water mark
Expand Down Expand Up @@ -139,6 +140,13 @@ public void encode(Message msg, MessageAttributeWriter writer) {

case MessageType.APPEND_REQUEST:
AppendRequest appendRequest = (AppendRequest) msg;

if (appendRequest.appendLockRequest.length > 0) {
throw new UnsupportedOperationException(
"append locks not supported, upgrade servers"
);
}

writer.writeLong(appendRequest.clientHighWaterMark);
writer.writeIntArray(appendRequest.writeLockRequest);
writer.writeIntArray(appendRequest.readLockRequest);
Expand Down
Loading

0 comments on commit 32f1b23

Please sign in to comment.