Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into HDDS-3698-upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
Aravindan Vijayan committed Nov 24, 2020
2 parents 4384789 + 6cc4a43 commit bf7624b
Show file tree
Hide file tree
Showing 37 changed files with 472 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigTag;
import org.apache.hadoop.hdds.conf.ConfigType;
import org.apache.hadoop.hdds.conf.PostConstruct;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.ozone.OzoneConfigKeys;

Expand Down Expand Up @@ -111,9 +112,7 @@ public class OzoneClientConfig {
tags = ConfigTag.CLIENT)
private boolean checksumVerify = true;

public OzoneClientConfig() {
}

@PostConstruct
private void validate() {
Preconditions.checkState(streamBufferSize > 0);
Preconditions.checkState(streamBufferFlushSize > 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,12 @@ private CompletableFuture<RaftClientReply> sendRequestAsync(
if (LOG.isDebugEnabled()) {
LOG.debug("sendCommandAsync ReadOnly {}", message);
}
return getClient().sendReadOnlyAsync(message);
return getClient().async().sendReadOnly(message);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("sendCommandAsync {}", message);
}
return getClient().sendAsync(message);
return getClient().async().send(message);
}

}
Expand Down Expand Up @@ -258,17 +258,17 @@ public XceiverClientReply watchForCommit(long index)
}
RaftClientReply reply;
try {
CompletableFuture<RaftClientReply> replyFuture = getClient()
.sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
CompletableFuture<RaftClientReply> replyFuture = getClient().async()
.watch(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
replyFuture.get();
} catch (Exception e) {
Throwable t = HddsClientUtils.checkForException(e);
LOG.warn("3 way commit failed on pipeline {}", pipeline, e);
if (t instanceof GroupMismatchException) {
throw e;
}
reply = getClient()
.sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
reply = getClient().async()
.watch(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
.get();
List<RaftProtos.CommitInfoProto> commitInfoProtoList =
reply.getCommitInfos().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,18 @@ public static RaftPeerId toRaftPeerId(DatanodeDetails id) {
}

public static RaftPeer toRaftPeer(DatanodeDetails id) {
return new RaftPeer(toRaftPeerId(id), toRaftPeerAddressString(id));
return RaftPeer.newBuilder()
.setId(toRaftPeerId(id))
.setAddress(toRaftPeerAddressString(id))
.build();
}

public static RaftPeer toRaftPeer(DatanodeDetails id, int priority) {
return new RaftPeer(
toRaftPeerId(id), toRaftPeerAddressString(id), priority);
return RaftPeer.newBuilder()
.setId(toRaftPeerId(id))
.setAddress(toRaftPeerAddressString(id))
.setPriority(priority)
.build();
}

private static List<RaftPeer> toRaftPeers(Pipeline pipeline) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void handle(SCMCommand command, OzoneContainer ozoneContainer,
final RaftPeer peer = RatisHelper.toRaftPeer(d);
try (RaftClient client = RatisHelper.newRaftClient(peer, conf,
ozoneContainer.getTlsClientConfig())) {
client.groupAdd(group, peer.getId());
client.getGroupManagementApi(peer.getId()).add(group);
} catch (AlreadyExistsException ae) {
// do not log
} catch (IOException ioe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ private synchronized void updateLastApplied() {
* @param index index of the log entry
*/
@Override
public void notifyIndexUpdate(long term, long index) {
public void notifyTermIndexUpdated(long term, long index) {
applyTransactionCompletionMap.put(index, term);
// We need to call updateLastApplied here because now in ratis when a
// node becomes leader, it is checking stateMachineIndex >=
Expand Down Expand Up @@ -844,7 +844,7 @@ public void evictStateMachineCache() {
}

@Override
public void notifySlowness(RoleInfoProto roleInfoProto) {
public void notifyFollowerSlowness(RoleInfoProto roleInfoProto) {
ratisServer.handleNodeSlowness(gid, roleInfoProto);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public ReferenceCountedDB getDB(long containerID, String containerDBType,
try {
long start = Time.monotonicNow();
DatanodeStore store = BlockUtils.getUncachedDatanodeStore(containerID,
containerDBPath, schemaVersion, conf);
containerDBPath, schemaVersion, conf, false);
db = new ReferenceCountedDB(store, containerDBPath);
metrics.incDbOpenLatency(Time.monotonicNow() - start);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ private BlockUtils() {
*/
public static DatanodeStore getUncachedDatanodeStore(long containerID,
String containerDBPath, String schemaVersion,
ConfigurationSource conf) throws IOException {
ConfigurationSource conf, boolean readOnly) throws IOException {

DatanodeStore store;
if (schemaVersion.equals(OzoneConsts.SCHEMA_V1)) {
store = new DatanodeStoreSchemaOneImpl(conf,
containerID, containerDBPath);
containerID, containerDBPath, readOnly);
} else if (schemaVersion.equals(OzoneConsts.SCHEMA_V2)) {
store = new DatanodeStoreSchemaTwoImpl(conf,
containerID, containerDBPath);
containerID, containerDBPath, readOnly);
} else {
throw new IllegalArgumentException(
"Unrecognized database schema version: " + schemaVersion);
Expand All @@ -88,11 +88,11 @@ public static DatanodeStore getUncachedDatanodeStore(long containerID,
* @throws IOException
*/
public static DatanodeStore getUncachedDatanodeStore(
KeyValueContainerData containerData, ConfigurationSource conf)
throws IOException {
KeyValueContainerData containerData, ConfigurationSource conf,
boolean readOnly) throws IOException {
return getUncachedDatanodeStore(containerData.getContainerID(),
containerData.getDbFile().getAbsolutePath(),
containerData.getSchemaVersion(), conf);
containerData.getSchemaVersion(), conf, readOnly);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ public static void createContainerMetaData(long containerID,
DatanodeStore store;
if (schemaVersion.equals(OzoneConsts.SCHEMA_V1)) {
store = new DatanodeStoreSchemaOneImpl(conf,
containerID, dbFile.getAbsolutePath());
containerID, dbFile.getAbsolutePath(), false);
} else if (schemaVersion.equals(OzoneConsts.SCHEMA_V2)) {
store = new DatanodeStoreSchemaTwoImpl(conf,
containerID, dbFile.getAbsolutePath());
containerID, dbFile.getAbsolutePath(), false);
} else {
throw new IllegalArgumentException(
"Unrecognized schema version for container: " + schemaVersion);
Expand Down Expand Up @@ -192,7 +192,8 @@ public static void parseKVContainerData(KeyValueContainerData kvContainerData,
DatanodeStore store = null;
try {
try {
store = BlockUtils.getUncachedDatanodeStore(kvContainerData, config);
store = BlockUtils.getUncachedDatanodeStore(
kvContainerData, config, true);
} catch (IOException e) {
// If an exception is thrown, then it may indicate the RocksDB is
// already open in the container cache. As this code is only executed at
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public abstract class AbstractDatanodeStore implements DatanodeStore {
private static final DBProfile DEFAULT_PROFILE = DBProfile.DISK;
private static final Map<ConfigurationSource, ColumnFamilyOptions>
OPTIONS_CACHE = new ConcurrentHashMap<>();
private final boolean openReadOnly;

/**
* Constructs the metadata store and starts the DB services.
Expand All @@ -85,7 +86,8 @@ public abstract class AbstractDatanodeStore implements DatanodeStore {
* @throws IOException - on Failure.
*/
protected AbstractDatanodeStore(ConfigurationSource config, long containerID,
AbstractDatanodeDBDefinition dbDef) throws IOException {
AbstractDatanodeDBDefinition dbDef, boolean openReadOnly)
throws IOException {

// The same config instance is used on each datanode, so we can share the
// corresponding column family options, providing a single shared cache
Expand All @@ -97,6 +99,7 @@ protected AbstractDatanodeStore(ConfigurationSource config, long containerID,

this.dbDef = dbDef;
this.containerID = containerID;
this.openReadOnly = openReadOnly;
start(config);
}

Expand All @@ -121,6 +124,7 @@ public void start(ConfigurationSource config)
this.store = DBStoreBuilder.newBuilder(config, dbDef)
.setDBOptions(options)
.setDefaultCFOptions(cfOptions)
.setOpenReadOnly(openReadOnly)
.build();

// Use the DatanodeTable wrapper to disable the table iterator on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ public class DatanodeStoreSchemaOneImpl extends AbstractDatanodeStore {
* @throws IOException - on Failure.
*/
public DatanodeStoreSchemaOneImpl(ConfigurationSource config,
long containerID, String dbPath)
throws IOException {
super(config, containerID, new DatanodeSchemaOneDBDefinition(dbPath));
long containerID, String dbPath, boolean openReadOnly)
throws IOException {
super(config, containerID, new DatanodeSchemaOneDBDefinition(dbPath),
openReadOnly);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ public class DatanodeStoreSchemaTwoImpl extends AbstractDatanodeStore {
* @throws IOException - on Failure.
*/
public DatanodeStoreSchemaTwoImpl(ConfigurationSource config,
long containerID, String dbPath)
throws IOException {
super(config, containerID, new DatanodeSchemaTwoDBDefinition(dbPath));
long containerID, String dbPath, boolean openReadOnly)
throws IOException {
super(config, containerID, new DatanodeSchemaTwoDBDefinition(dbPath),
openReadOnly);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class TestContainerCache {
private void createContainerDB(OzoneConfiguration conf, File dbFile)
throws Exception {
DatanodeStore store = new DatanodeStoreSchemaTwoImpl(
conf, 1, dbFile.getAbsolutePath());
conf, 1, dbFile.getAbsolutePath(), false);

// we close since the SCM pre-creates containers.
// we will open and put Db handle into a cache when keys are being created
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.api.GroupManagementApi;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftGroup;
Expand Down Expand Up @@ -64,15 +65,19 @@ public class TestCreatePipelineCommandHandler {
private StateContext stateContext;
private SCMConnectionManager connectionManager;
private RaftClient raftClient;
private GroupManagementApi raftClientGroupManager;

@Before
public void setup() throws Exception {
ozoneContainer = Mockito.mock(OzoneContainer.class);
stateContext = Mockito.mock(StateContext.class);
connectionManager = Mockito.mock(SCMConnectionManager.class);
raftClient = Mockito.mock(RaftClient.class);
raftClientGroupManager = Mockito.mock(GroupManagementApi.class);
final RaftClient.Builder builder = mockRaftClientBuilder();
Mockito.when(builder.build()).thenReturn(raftClient);
Mockito.when(raftClient.getGroupManagementApi(
Mockito.any(RaftPeerId.class))).thenReturn(raftClientGroupManager);
PowerMockito.mockStatic(RaftClient.class);
PowerMockito.when(RaftClient.newBuilder()).thenReturn(builder);
}
Expand Down Expand Up @@ -121,8 +126,8 @@ public void testPipelineCreation() throws IOException {
Mockito.verify(writeChanel, Mockito.times(1))
.addGroup(pipelineID.getProtobuf(), datanodes, priorityList);

Mockito.verify(raftClient, Mockito.times(2))
.groupAdd(Mockito.any(RaftGroup.class), Mockito.any(RaftPeerId.class));
Mockito.verify(raftClientGroupManager, Mockito.times(2))
.add(Mockito.any(RaftGroup.class));
}

@Test
Expand Down Expand Up @@ -150,8 +155,8 @@ public void testCommandIdempotency() throws IOException {
Mockito.verify(writeChanel, Mockito.times(0))
.addGroup(pipelineID.getProtobuf(), datanodes);

Mockito.verify(raftClient, Mockito.times(0))
.groupAdd(Mockito.any(RaftGroup.class), Mockito.any(RaftPeerId.class));
Mockito.verify(raftClientGroupManager, Mockito.times(0))
.add(Mockito.any(RaftGroup.class));
}

private List<DatanodeDetails> getDatanodes() {
Expand Down
6 changes: 3 additions & 3 deletions hadoop-hdds/docs/content/feature/HA.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ title: "高可用"
weight: 1
menu:
main:
parent: 特性
parent: 特点
summary: Ozone 用于避免单点故障的高可用设置
---

Expand Down Expand Up @@ -87,7 +87,7 @@ Ozone 有两个leader节点(用于键管理的 *Ozone Manager* 和用于块空
</property>
```

基于 [客户端接口]({{< ref path="interface/_index.md" lang="en">}}) ,定义好的 `serviceId` 就可用于替代单个 OM 主机。
基于 [客户端接口]({{< ref path="interface/_index.zh.md" lang="zh">}}) ,定义好的 `serviceId` 就可用于替代单个 OM 主机。

例如,使用 `o3fs://`

Expand All @@ -114,4 +114,4 @@ RocksDB 由后台的批处理事务线程负责更新(这也就是所谓的"
## 参考文档

* 查看 [该页面]({{< ref path="design/omha.md" lang="en">}}) 以获取详细设计文档;
* Ozone 的分发包中的 compose/ozone-om-ha 目录下提供了一个配置 OM 高可用的示例,可以借助 [docker-compose]({{< ref path="start/RunningViaDocker.md" lang="en">}}) 进行测试。
* Ozone 的分发包中的 `compose/ozone-om-ha` 目录下提供了一个配置 OM 高可用的示例,可以借助 [docker-compose]({{< ref path="start/RunningViaDocker.md" lang="en">}}) 进行测试。
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void setNoLeaderTimeout(Duration duration) {
this.noLeaderTimeout = duration.toMillis();
}

@Config(key = "rpcslowness.timeout",
@Config(key = "rpc.slowness.timeout",
defaultValue = "300s",
type = ConfigType.TIME,
tags = {OZONE, DATANODE, RATIS},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public final class DBStoreBuilder {
private CodecRegistry registry;
private String rocksDbStat;
private RocksDBConfiguration rocksDBConfiguration;
// Flag to indicate if the RocksDB should be opened readonly.
private boolean openReadOnly = false;

/**
* Create DBStoreBuilder from a generic DBDefinition.
Expand Down Expand Up @@ -187,7 +189,7 @@ public DBStore build() throws IOException {
}

return new RDBStore(dbFile, rocksDBOption, writeOptions, tableConfigs,
registry);
registry, openReadOnly);
}

public DBStoreBuilder setName(String name) {
Expand Down Expand Up @@ -227,6 +229,11 @@ public DBStoreBuilder setPath(Path path) {
return this;
}

public DBStoreBuilder setOpenReadOnly(boolean readOnly) {
this.openReadOnly = readOnly;
return this;
}

/**
* Set the {@link DBOptions} and default {@link ColumnFamilyOptions} based
* on {@code prof}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ public class RDBStore implements DBStore {
@VisibleForTesting
public RDBStore(File dbFile, DBOptions options,
Set<TableConfig> families) throws IOException {
this(dbFile, options, new WriteOptions(), families, new CodecRegistry());
this(dbFile, options, new WriteOptions(), families, new CodecRegistry(),
false);
}

public RDBStore(File dbFile, DBOptions options,
WriteOptions writeOptions, Set<TableConfig> families,
CodecRegistry registry)
CodecRegistry registry, boolean readOnly)
throws IOException {
Preconditions.checkNotNull(dbFile, "DB file location cannot be null");
Preconditions.checkNotNull(families);
Expand Down Expand Up @@ -108,8 +109,13 @@ public RDBStore(File dbFile, DBOptions options,
extraCf.forEach(cf -> columnFamilyDescriptors.add(cf.getDescriptor()));
}

db = RocksDB.open(dbOptions, dbLocation.getAbsolutePath(),
columnFamilyDescriptors, columnFamilyHandles);
if (readOnly) {
db = RocksDB.openReadOnly(dbOptions, dbLocation.getAbsolutePath(),
columnFamilyDescriptors, columnFamilyHandles);
} else {
db = RocksDB.open(dbOptions, dbLocation.getAbsolutePath(),
columnFamilyDescriptors, columnFamilyHandles);
}

for (int x = 0; x < columnFamilyHandles.size(); x++) {
handleTable.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
try(RaftClient client = RatisHelper
.newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
retryPolicy, grpcTlsConfig, ozoneConf)) {
client.groupRemove(RaftGroupId.valueOf(pipelineID.getId()),
true, false, p.getId());
client.getGroupManagementApi(p.getId())
.remove(RaftGroupId.valueOf(pipelineID.getId()), true, false);
}
}

Expand Down
Loading

0 comments on commit bf7624b

Please sign in to comment.