Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
LOG.warn("Couldn't delete State Store record {}: {}", recordName,
record);
}
} else if (record.checkExpired(currentDriverTime)) {
} else if (!record.isExpired() && record.checkExpired(currentDriverTime)) {
String recordName = StateStoreUtils.getRecordName(record.getClass());
LOG.info("Override State Store record {}: {}", recordName, record);
commitRecords.add(record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,18 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand All @@ -49,16 +56,22 @@
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
import org.apache.hadoop.util.Time;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Test the basic {@link MembershipStore} membership functionality.
*/
public class TestStateStoreMembershipState extends TestStateStoreBase {

/** Logger for this test class; never reassigned, hence {@code final}. */
private static final Logger LOG = LoggerFactory.getLogger(
    TestStateStoreMembershipState.class);

private static MembershipStore membershipStore;

@BeforeClass
Expand Down Expand Up @@ -529,6 +542,94 @@ public void testRegistrationExpiredAndDeletion()
}, 100, 3000);
}

/**
 * Verify that a cache refresh which has already read an EXPIRED membership
 * record does not write that stale record back over an ACTIVE record that
 * was stored while the refresh was still in flight.
 *
 * <p>Sequence exercised by the test:
 * <ol>
 *   <li>Store a single registration (ns0:nn0) in the EXPIRED state.</li>
 *   <li>Start {@code loadCache(true)} on a spy of the membership store from
 *       a background thread; the spy's {@code overrideExpiredRecords} call
 *       is held back by a {@link DelayAnswer}.</li>
 *   <li>While the refresh is held, heartbeat an ACTIVE registration for the
 *       same namenode.</li>
 *   <li>Release the refresh and check that the record read back from the
 *       driver is still ACTIVE.</li>
 * </ol>
 *
 * @throws InterruptedException if waiting on the delayed call or on the
 *     refresh future is interrupted.
 * @throws IOException if a State Store operation fails.
 * @throws TimeoutException if the cache refresh does not complete within
 *     5 seconds after being released.
 * @throws ExecutionException if the background cache refresh fails.
 */
@Test
public void testRegistrationExpiredRaceCondition()
    throws InterruptedException, IOException, TimeoutException,
    ExecutionException {

  // Populate the state store with a single NN element
  // 1) ns0:nn0 - Expired
  // Create a thread to refresh the cached records, pulling the expired record
  // into the thread's memory
  // Then insert an active record, and confirm that the refresh thread does not
  // override the active record with the expired record it has in memory

  MembershipState.setDeletionMs(-1);

  MembershipState expiredReport = createRegistration(
      NAMESERVICES[0], NAMENODES[0], ROUTERS[0],
      FederationNamenodeServiceState.ACTIVE);
  expiredReport.setDateModified(Time.monotonicNow() - 5000);
  expiredReport.setState(FederationNamenodeServiceState.EXPIRED);
  assertTrue(namenodeHeartbeat(expiredReport));

  // Load cache through a spy whose overrideExpiredRecords() call is held
  // until the test explicitly lets it proceed
  MembershipStore memStoreSpy = spy(membershipStore);
  DelayAnswer delayer = new DelayAnswer(LOG);
  doAnswer(delayer).when(memStoreSpy).overrideExpiredRecords(any());

  ExecutorService pool = Executors.newFixedThreadPool(1);
  try {
    Future<Boolean> cacheRefreshFuture = pool.submit(() -> {
      try {
        return memStoreSpy.loadCache(true);
      } catch (IOException e) {
        LOG.error("Exception while loading cache:", e);
      }
      return false;
    });

    // Verify quorum and entry
    MembershipState quorumEntry = getNamenodeRegistration(
        expiredReport.getNameserviceId(), expiredReport.getNamenodeId());
    assertNull(quorumEntry);

    MembershipState record = membershipStore.getDriver()
        .get(MembershipState.class).getRecords().get(0);
    assertNotNull(record);
    assertEquals(ROUTERS[0], record.getRouterId());
    assertEquals(FederationNamenodeServiceState.EXPIRED,
        record.getState());

    // Insert active while the other thread is refreshing its cache
    MembershipState activeReport = createRegistration(
        NAMESERVICES[0], NAMENODES[0], ROUTERS[0],
        FederationNamenodeServiceState.ACTIVE);

    delayer.waitForCall();
    assertTrue(namenodeHeartbeat(activeReport));

    record = membershipStore.getDriver()
        .get(MembershipState.class).getRecords().get(0);
    assertNotNull(record);
    assertEquals(ROUTERS[0], record.getRouterId());
    assertEquals(FederationNamenodeServiceState.ACTIVE,
        record.getState());

    quorumEntry = getExpiredNamenodeRegistration(
        expiredReport.getNameserviceId(), expiredReport.getNamenodeId());
    assertNull(quorumEntry);

    // Allow the thread to finish refreshing the cache
    delayer.proceed();
    assertTrue(cacheRefreshFuture.get(5, TimeUnit.SECONDS));

    // The state store should still be the active report
    record = membershipStore.getDriver()
        .get(MembershipState.class).getRecords().get(0);
    assertNotNull(record);
    assertEquals(ROUTERS[0], record.getRouterId());
    assertEquals(FederationNamenodeServiceState.ACTIVE,
        record.getState());

    membershipStore.loadCache(true);

    quorumEntry = getExpiredNamenodeRegistration(
        expiredReport.getNameserviceId(),
        expiredReport.getNamenodeId());
    assertNull(quorumEntry);
  } finally {
    // Always stop the single worker thread. Without this the executor is
    // never shut down, and a failed assertion above would leave the worker
    // parked inside the delayed call for the rest of the test run.
    pool.shutdownNow();
  }
}

@Test
public void testNamespaceInfoWithUnavailableNameNodeRegistration()
throws IOException {
Expand Down