Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

a new lock type #77

Merged
merged 2 commits into from
Jan 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,11 @@ public interface TransactionBuilder {
*/
void setReadLocks(List<PartitionLocalLock> partitionLocalLocks);

/**
* Sets optimistic append locks.
*
* @param partitionLocalLocks a list of {@link PartitionLocalLock}
*/
void setAppendLocks(List<PartitionLocalLock> partitionLocalLocks);

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class TransactionBuilderImpl implements TransactionBuilder {
private byte[] data = null;
private List<PartitionLocalLock> writeLocks;
private List<PartitionLocalLock> readLocks;
private List<PartitionLocalLock> appendLocks;

/**
* Class Constructor.
Expand Down Expand Up @@ -54,6 +55,11 @@ public void setReadLocks(List<PartitionLocalLock> locks) {
this.readLocks = locks;
}

@Override
public void setAppendLocks(List<PartitionLocalLock> locks) {
this.appendLocks = locks;
}

/**
* @return a new instance of {@link AppendRequest} to send to Waltz cluster.
*/
Expand All @@ -63,6 +69,7 @@ public AppendRequest buildRequest() {
clientHighWaterMark,
compileLockRequest(writeLocks),
compileLockRequest(readLocks),
compileLockRequest(appendLocks),
header,
data,
Utils.checksum(data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.wepay.waltz.common.message.LockFailure;
import com.wepay.waltz.common.message.MessageCodecV0;
import com.wepay.waltz.common.message.MessageCodecV1;
import com.wepay.waltz.common.message.MessageCodecV2;
import com.wepay.waltz.common.message.MessageType;
import com.wepay.waltz.common.message.MountRequest;
import com.wepay.waltz.common.message.MountResponse;
Expand All @@ -35,6 +36,7 @@ public class WaltzClientHandler extends MessageHandler {
static {
CODECS.put(MessageCodecV0.VERSION, MessageCodecV0.INSTANCE);
CODECS.put(MessageCodecV1.VERSION, MessageCodecV1.INSTANCE);
CODECS.put(MessageCodecV2.VERSION, MessageCodecV2.INSTANCE);
}

private static final String HELLO_MESSAGE = "Waltz Client";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public void test() {
.data("good transaction")
.writeLocks(1, 2, 3)
.readLocks(4, 5, 6)
.appendLocks(7, 8, 9)
.build();

TransactionBuilderImpl builder = new TransactionBuilderImpl(new ReqId(111, 222, 333, 444), 999);
Expand All @@ -43,8 +44,15 @@ public void test() {
MockContext.makeLock(6).hashCode()
};

int[] expectedAppendLockRequest = new int[]{
MockContext.makeLock(7).hashCode(),
MockContext.makeLock(8).hashCode(),
MockContext.makeLock(9).hashCode()
};

assertArrayEquals(expectedWriteLockRequest, request.writeLockRequest);
assertArrayEquals(expectedReadLockRequest, request.readLockRequest);
assertArrayEquals(expectedAppendLockRequest, request.appendLockRequest);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,28 @@ public class AppendRequest extends AbstractMessage {
public final long clientHighWaterMark;
public final int[] writeLockRequest;
public final int[] readLockRequest;
public final int[] appendLockRequest;
public final int header;
public final byte[] data;
public final int checksum;

@SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "internal class")
public AppendRequest(ReqId reqId, long clientHighWaterMark, int[] writeLockRequest, int[] readLockRequest, int header, byte[] data, int checksum) {
public AppendRequest(
ReqId reqId,
long clientHighWaterMark,
int[] writeLockRequest,
int[] readLockRequest,
int[] appendLockRequest,
int header,
byte[] data,
int checksum
) {
super(reqId);

this.clientHighWaterMark = clientHighWaterMark;
this.writeLockRequest = writeLockRequest;
this.readLockRequest = readLockRequest;
this.appendLockRequest = appendLockRequest;
this.header = header;
this.data = data;
this.checksum = checksum;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class MessageCodecV0 implements MessageCodec {
public static final MessageCodecV0 INSTANCE = new MessageCodecV0();

private static final byte MAGIC_BYTE = 'L';
private static final int[] NO_READ_LOCKS = new int[0];
private static final int[] NO_LOCKS = new int[0];

@Override
public byte magicByte() {
Expand Down Expand Up @@ -52,7 +52,7 @@ public Message decode(MessageAttributeReader reader) {
data = reader.readByteArray();
checksum = reader.readInt();
Utils.verifyChecksum(messageType, data, checksum);
return new AppendRequest(reqId, transactionId, writeLockRequest, NO_READ_LOCKS, header, data, checksum);
return new AppendRequest(reqId, transactionId, writeLockRequest, NO_LOCKS, NO_LOCKS, header, data, checksum);

case MessageType.FEED_REQUEST:
transactionId = reader.readLong(); // client High-water mark
Expand Down Expand Up @@ -131,6 +131,11 @@ public void encode(Message msg, MessageAttributeWriter writer) {
"read locks not supported, upgrade servers"
);
}
if (appendRequest.appendLockRequest.length > 0) {
throw new UnsupportedOperationException(
"append locks not supported, upgrade servers"
);
}

writer.writeLong(appendRequest.clientHighWaterMark);
writer.writeIntArray(appendRequest.writeLockRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class MessageCodecV1 implements MessageCodec {
public static final MessageCodecV1 INSTANCE = new MessageCodecV1();

private static final byte MAGIC_BYTE = 'L';
private static final int[] NO_LOCKS = new int[0];

@Override
public byte magicByte() {
Expand Down Expand Up @@ -55,7 +56,7 @@ public Message decode(MessageAttributeReader reader) {
data = reader.readByteArray();
checksum = reader.readInt();
Utils.verifyChecksum(messageType, data, checksum);
return new AppendRequest(reqId, transactionId, writeLockRequest, readLockRequest, header, data, checksum);
return new AppendRequest(reqId, transactionId, writeLockRequest, readLockRequest, NO_LOCKS, header, data, checksum);

case MessageType.FEED_REQUEST:
transactionId = reader.readLong(); // client High-water mark
Expand Down Expand Up @@ -139,6 +140,13 @@ public void encode(Message msg, MessageAttributeWriter writer) {

case MessageType.APPEND_REQUEST:
AppendRequest appendRequest = (AppendRequest) msg;

if (appendRequest.appendLockRequest.length > 0) {
throw new UnsupportedOperationException(
"append locks not supported, upgrade servers"
);
}

writer.writeLong(appendRequest.clientHighWaterMark);
writer.writeIntArray(appendRequest.writeLockRequest);
writer.writeIntArray(appendRequest.readLockRequest);
Expand Down
Loading