Skip to content

Commit

Permalink
CLI command implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanzlenko committed Sep 9, 2024
1 parent 93c4b83 commit 071c487
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.shell.fsck;

import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneKey;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.om.helpers.KeyInfoWithVolumeContext;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.shell.OzoneAddress;

import java.io.IOException;
import java.io.Writer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.STAND_ALONE;

public class OzoneFsckHandler implements AutoCloseable {
private final OzoneAddress address;

private final OzoneFsckVerboseSettings verboseSettings;

private final Writer writer;

private final boolean deleteCorruptedKeys;

private final OzoneClient client;

private final OzoneManagerProtocol omClient;

private final ContainerOperationClient containerOperationClient;

private final XceiverClientManager xceiverClientManager;

public OzoneFsckHandler(OzoneAddress address, OzoneFsckVerboseSettings verboseSettings, Writer writer,
boolean deleteCorruptedKeys, OzoneClient client, OzoneConfiguration ozoneConfiguration) throws IOException {
this.address = address;
this.verboseSettings = verboseSettings;
this.writer = writer;
this.deleteCorruptedKeys = deleteCorruptedKeys;
this.client = client;
this.omClient = client.getObjectStore().getClientProxy().getOzoneManagerClient();
this.containerOperationClient = new ContainerOperationClient(ozoneConfiguration);
this.xceiverClientManager = containerOperationClient.getXceiverClientManager();
}

public void scan() throws IOException, InterruptedException {
scanVolumes();
}

private void scanVolumes() throws IOException, InterruptedException {
Iterator<? extends OzoneVolume> volumes = client.getObjectStore().listVolumes(address.getVolumeName());

while (volumes.hasNext()) {
scanBuckets(volumes.next());
}
}

private void scanBuckets(OzoneVolume volume) throws IOException, InterruptedException {
Iterator<? extends OzoneBucket> buckets = volume.listBuckets(address.getBucketName());

while (buckets.hasNext()) {
scanKeys(buckets.next());
}
}

private void scanKeys(OzoneBucket bucket) throws IOException, InterruptedException {
Iterator<? extends OzoneKey> keys = bucket.listKeys(address.getKeyName());

while (keys.hasNext()) {
scanKey(keys.next());
}
}

private void scanKey(OzoneKey key) throws IOException, InterruptedException {
OmKeyArgs keyArgs = createKeyArgs(key);

KeyInfoWithVolumeContext keyInfoWithContext = omClient.getKeyInfo(keyArgs, false);

OmKeyInfo keyInfo = keyInfoWithContext.getKeyInfo();

List<OmKeyLocationInfo> locations = keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly();

for (OmKeyLocationInfo location : locations) {
Pipeline keyPipeline = location.getPipeline();
boolean isECKey = keyPipeline.getReplicationConfig().getReplicationType() == HddsProtos.ReplicationType.EC;
Pipeline pipeline;
if (!isECKey && keyPipeline.getType() != STAND_ALONE) {
pipeline = Pipeline.newBuilder(keyPipeline)
.setReplicationConfig(StandaloneReplicationConfig.getInstance(ONE))
.build();
} else {
pipeline = keyPipeline;
}

XceiverClientSpi xceiverClient = xceiverClientManager.acquireClientForReadData(pipeline);

Map<DatanodeDetails, GetBlockResponseProto> responses = ContainerProtocolCalls.getBlockFromAllNodes(
xceiverClient,
location.getBlockID().getDatanodeBlockIDProtobuf(),
location.getToken()
);
}
}

private OmKeyArgs createKeyArgs(OzoneKey key) {
return new OmKeyArgs.Builder()
.setVolumeName(key.getVolumeName())
.setBucketName(key.getBucketName())
.setKeyName(key.getName())
.build();
}

@Override
public void close() throws Exception {
this.xceiverClientManager.close();
this.containerOperationClient.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.apache.hadoop.ozone.shell.fsck;

public enum OzoneFsckVerboseSettings {

}

0 comments on commit 071c487

Please sign in to comment.