Skip to content

Commit

Permalink
HDDS-2048: State check during container state transition in datanode …
Browse files Browse the repository at this point in the history
…should be lock protected (#1375)
  • Loading branch information
lokeshj1703 authored and nandakumar131 committed Sep 10, 2019
1 parent bc2d3a7 commit c3beeb7
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@
import org.slf4j.LoggerFactory;

/**
* Class to perform KeyValue Container operations.
* Class to perform KeyValue Container operations. Any modifications to
* KeyValueContainer object should ideally be done via api exposed in
* KeyValueHandler class.
*/
public class KeyValueContainer implements Container<KeyValueContainerData> {

Expand Down Expand Up @@ -554,6 +556,8 @@ public boolean hasReadLock() {
* Acquire write lock.
*/
public void writeLock() {
// TODO: The lock for KeyValueContainer object should not be exposed
// publicly.
this.lock.writeLock().lock();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -881,75 +881,97 @@ public void exportContainer(final Container container,
@Override
public void markContainerForClose(Container container)
throws IOException {
// Move the container to CLOSING state only if it's OPEN
if (container.getContainerState() == State.OPEN) {
container.markContainerForClose();
sendICR(container);
container.writeLock();
try {
// Move the container to CLOSING state only if it's OPEN
if (container.getContainerState() == State.OPEN) {
container.markContainerForClose();
sendICR(container);
}
} finally {
container.writeUnlock();
}
}

@Override
public void markContainerUnhealthy(Container container)
throws IOException {
if (container.getContainerState() != State.UNHEALTHY) {
try {
container.markContainerUnhealthy();
} catch (IOException ex) {
// explicitly catch IOException here since the this operation
// will fail if the Rocksdb metadata is corrupted.
long id = container.getContainerData().getContainerID();
LOG.warn("Unexpected error while marking container "
+id+ " as unhealthy", ex);
} finally {
sendICR(container);
container.writeLock();
try {
if (container.getContainerState() != State.UNHEALTHY) {
try {
container.markContainerUnhealthy();
} catch (IOException ex) {
// explicitly catch IOException here since the this operation
// will fail if the Rocksdb metadata is corrupted.
long id = container.getContainerData().getContainerID();
LOG.warn("Unexpected error while marking container " + id
+ " as unhealthy", ex);
} finally {
sendICR(container);
}
}
} finally {
container.writeUnlock();
}
}

@Override
public void quasiCloseContainer(Container container)
throws IOException {
final State state = container.getContainerState();
// Quasi close call is idempotent.
if (state == State.QUASI_CLOSED) {
return;
}
// The container has to be in CLOSING state.
if (state != State.CLOSING) {
ContainerProtos.Result error = state == State.INVALID ?
INVALID_CONTAINER_STATE : CONTAINER_INTERNAL_ERROR;
throw new StorageContainerException("Cannot quasi close container #" +
container.getContainerData().getContainerID() + " while in " +
state + " state.", error);
container.writeLock();
try {
final State state = container.getContainerState();
// Quasi close call is idempotent.
if (state == State.QUASI_CLOSED) {
return;
}
// The container has to be in CLOSING state.
if (state != State.CLOSING) {
ContainerProtos.Result error =
state == State.INVALID ? INVALID_CONTAINER_STATE :
CONTAINER_INTERNAL_ERROR;
throw new StorageContainerException(
"Cannot quasi close container #" + container.getContainerData()
.getContainerID() + " while in " + state + " state.", error);
}
container.quasiClose();
sendICR(container);
} finally {
container.writeUnlock();
}
container.quasiClose();
sendICR(container);
}

@Override
public void closeContainer(Container container)
throws IOException {
final State state = container.getContainerState();
// Close call is idempotent.
if (state == State.CLOSED) {
return;
container.writeLock();
try {
final State state = container.getContainerState();
// Close call is idempotent.
if (state == State.CLOSED) {
return;
}
if (state == State.UNHEALTHY) {
throw new StorageContainerException(
"Cannot close container #" + container.getContainerData()
.getContainerID() + " while in " + state + " state.",
ContainerProtos.Result.CONTAINER_UNHEALTHY);
}
// The container has to be either in CLOSING or in QUASI_CLOSED state.
if (state != State.CLOSING && state != State.QUASI_CLOSED) {
ContainerProtos.Result error =
state == State.INVALID ? INVALID_CONTAINER_STATE :
CONTAINER_INTERNAL_ERROR;
throw new StorageContainerException(
"Cannot close container #" + container.getContainerData()
.getContainerID() + " while in " + state + " state.", error);
}
container.close();
sendICR(container);
} finally {
container.writeUnlock();
}
if (state == State.UNHEALTHY) {
throw new StorageContainerException(
"Cannot close container #" + container.getContainerData()
.getContainerID() + " while in " + state + " state.",
ContainerProtos.Result.CONTAINER_UNHEALTHY);
}
// The container has to be either in CLOSING or in QUASI_CLOSED state.
if (state != State.CLOSING && state != State.QUASI_CLOSED) {
ContainerProtos.Result error = state == State.INVALID ?
INVALID_CONTAINER_STATE : CONTAINER_INTERNAL_ERROR;
throw new StorageContainerException("Cannot close container #" +
container.getContainerData().getContainerID() + " while in " +
state + " state.", error);
}
container.close();
sendICR(container);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,21 +258,25 @@ public List<BlockData> listBlock(Container container, long startLocalID, int
Preconditions.checkArgument(count > 0,
"Count must be a positive number.");
container.readLock();
List<BlockData> result = null;
KeyValueContainerData cData = (KeyValueContainerData) container
.getContainerData();
try(ReferenceCountedDB db = BlockUtils.getDB(cData, config)) {
result = new ArrayList<>();
byte[] startKeyInBytes = Longs.toByteArray(startLocalID);
List<Map.Entry<byte[], byte[]>> range =
db.getStore().getSequentialRangeKVs(startKeyInBytes, count,
MetadataKeyFilters.getNormalKeyFilter());
for (Map.Entry<byte[], byte[]> entry : range) {
BlockData value = BlockUtils.getBlockData(entry.getValue());
BlockData data = new BlockData(value.getBlockID());
result.add(data);
try {
List<BlockData> result = null;
KeyValueContainerData cData =
(KeyValueContainerData) container.getContainerData();
try (ReferenceCountedDB db = BlockUtils.getDB(cData, config)) {
result = new ArrayList<>();
byte[] startKeyInBytes = Longs.toByteArray(startLocalID);
List<Map.Entry<byte[], byte[]>> range = db.getStore()
.getSequentialRangeKVs(startKeyInBytes, count,
MetadataKeyFilters.getNormalKeyFilter());
for (Map.Entry<byte[], byte[]> entry : range) {
BlockData value = BlockUtils.getBlockData(entry.getValue());
BlockData data = new BlockData(value.getBlockID());
result.add(data);
}
return result;
}
return result;
} finally {
container.readUnlock();
}
}

Expand Down

0 comments on commit c3beeb7

Please sign in to comment.