Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,12 @@ public void updateResponseState(RpcHeaderProtos.RpcResponseHeaderProto.Builder h
*/
@Override
public void receiveResponseState(RpcHeaderProtos.RpcResponseHeaderProto header) {
sharedGlobalStateId.accumulate(header.getStateId());
if (header.getStateId() == 0 && sharedGlobalStateId.get() > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stateId 0 means no state id right?
This is different than msync being 0?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume stateId is an integer and protobuf will return 0 for an integer if it is not set?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, stateID 0 means no stateId is set. Msync doesn't have a return value.

There is another bug in the router in that it accepts 0 as a value to advance it's cachedStateID to.
Ideally sharedGlobalStateId.get() > 0 should not be necessary here. For now it captures namenodes that actually had STATE_ID_CONTEXT enabled to begin with. But stale reads could happen with a namenode that has never had STATE_ID_CONTEXT enabled.

Fixing this will touch other tests so I'm debating whether to try fix that in this PR or separately. I'm leaning towards a separate PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove the sharedGlobalStateId.get() > 0, right?

If sharedGlobalStateId.get() < 0, routers already fallback to active and no need to reset. If it is > 0 and then we see a request without StateID, we will reset this counter and routers will fallback to active.

Adding sharedGlobalStateId.get() > 0 doesn't seem to make a difference.

Copy link
Member Author

@simbadzina simbadzina May 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests in TestNoNamenodesAvailableLongTime rely on the router allowing a stateId of 0. So having sharedGlobalStateId.get() > 0 allows this behavior while guarding against when the sharedGlobalStateId has advances beyond zero.

The tests in TestNoNamenodesAvailableLongTime do need to be fixed but I would like to limit the scope of this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to fixing the tests and associated check in follow on PR.

sharedGlobalStateId.reset();
poolLocalStateId.reset();
} else {
sharedGlobalStateId.accumulate(header.getStateId());
Copy link
Contributor

@ctrezzo ctrezzo May 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@simbadzina I have a naive question: What protects us here from the state where header.getStateId() > 0 && header.getStateId() < sharedGlobalStateId?

It seems like if this case were to occur then sharedGlobalStateId would go backwards.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sharedGlobalStateID is created as new LongAccumulator(Math::max, Long.MIN_VALUE)
So accumulate either keeps the current value or moves it forward.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah makes sense. Thanks.

}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,7 @@ public void teardown() throws IOException {
public void startUpCluster(int numberOfObserver, Configuration confOverrides) throws Exception {
int numberOfNamenode = 2 + numberOfObserver;
Configuration conf = new Configuration(false);
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms");
conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true);
setConfDefaults(conf);
if (confOverrides != null) {
confOverrides
.iterator()
Expand Down Expand Up @@ -153,6 +150,13 @@ public void startUpCluster(int numberOfObserver, Configuration confOverrides) th
routerContext = cluster.getRandomRouter();
}

private void setConfDefaults(Configuration conf) {
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms");
conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true);
}

public enum ConfigSetting {
USE_NAMENODE_PROXY_FLAG,
USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER,
Expand Down Expand Up @@ -972,4 +976,55 @@ public void testMsyncWithNoNamespacesEligibleForCRS(ConfigSetting configSetting)
// There should no calls to any namespace.
assertEquals("No calls to any namespace", 0, rpcCountForActive);
}

@EnumSource(ConfigSetting.class)
@ParameterizedTest
public void testRestartingNamenodeWithStateIDContextDisabled(ConfigSetting configSetting)
throws Exception {
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
Path path = new Path("/testFile1");
// Send Create call to active
fileSystem.create(path).close();

// Send read request
fileSystem.open(path).close();

long observerCount1 = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getObserverProxyOps();

// Restart active namenodes and disable sending state id.
restartActiveWithStateIDContextDisabled();

Configuration conf = getConfToEnableObserverReads(configSetting);
conf.setBoolean("fs.hdfs.impl.disable.cache", true);
FileSystem fileSystem2 = routerContext.getFileSystem(conf);
fileSystem2.msync();
fileSystem2.open(path).close();

long observerCount2 = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getObserverProxyOps();
assertEquals("There should no extra calls to the observer", observerCount1, observerCount2);

fileSystem.open(path).close();
long observerCount3 = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getObserverProxyOps();
assertTrue("Old filesystem will send calls to observer", observerCount3 > observerCount2);
}

void restartActiveWithStateIDContextDisabled() throws Exception {
for (int nnIndex = 0; nnIndex < cluster.getNamenodes().size(); nnIndex++) {
NameNode nameNode = cluster.getCluster().getNameNode(nnIndex);
if (nameNode != null && nameNode.isActiveState()) {
Configuration conf = new Configuration();
setConfDefaults(conf);
cluster.getCluster().getConfiguration(nnIndex)
.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, false);
cluster.getCluster().restartNameNode(nnIndex, true);
cluster.getCluster().getNameNode(nnIndex).isActiveState();
}
}
for (String ns : cluster.getNameservices()) {
cluster.switchToActive(ns, NAMENODES[0]);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.federation.router;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -50,4 +51,35 @@ private void assertRequestHeaderStateId(PoolAlignmentContext poolAlignmentContex
poolAlignmentContext.updateRequestState(builder);
Assertions.assertEquals(expectedValue, builder.getStateId());
}

@Test
public void testWhenNamenodeStopsSendingStateId() {
RouterStateIdContext routerStateIdContext = new RouterStateIdContext(new Configuration());
String namespaceId = "namespace1";
PoolAlignmentContext poolContext = new PoolAlignmentContext(routerStateIdContext, namespaceId);

poolContext.receiveResponseState(getRpcResponseHeader(10L));
// Last seen value is the one from namenode,
// but request header is the max seen by clients so far.
Assertions.assertEquals(10L, poolContext.getLastSeenStateId());
assertRequestHeaderStateId(poolContext, Long.MIN_VALUE);

poolContext.advanceClientStateId(10L);
assertRequestHeaderStateId(poolContext, 10L);

// When namenode state context is disabled, it returns a stateId of zero
poolContext.receiveResponseState(getRpcResponseHeader(0));
// Routers should reset the cached state Id to not send a stale value to the observer.
Assertions.assertEquals(Long.MIN_VALUE, poolContext.getLastSeenStateId());
assertRequestHeaderStateId(poolContext, Long.MIN_VALUE);
}

private RpcResponseHeaderProto getRpcResponseHeader(long stateID) {
return RpcResponseHeaderProto
.newBuilder()
.setCallId(1)
.setStatus(RpcResponseHeaderProto.RpcStatusProto.SUCCESS)
.setStateId(stateID)
.build();
}
}