From 58d144369cd8955cd3214293c43d55411fbbace7 Mon Sep 17 00:00:00 2001 From: Scolley <74013924+scolley31@users.noreply.github.com> Date: Mon, 4 Nov 2024 02:06:32 +0800 Subject: [PATCH] HDDS-10240. Cleanup zero-copy EC (#7340) --- .../apache/hadoop/ozone/OzoneConfigKeys.java | 3 - .../src/main/resources/ozone-default.xml | 8 - .../transport/server/GrpcXceiverService.java | 19 +- .../transport/server/XceiverServerGrpc.java | 9 +- .../replication/GrpcReplicationService.java | 35 +- .../replication/ReplicationServer.java | 29 +- .../SendContainerRequestHandler.java | 6 +- .../TestGrpcReplicationService.java | 8 +- ...estGrpcReplicationServiceWithZeroCopy.java | 31 -- .../rpc/AbstractTestECKeyOutputStream.java | 492 ------------------ .../client/rpc/TestECKeyOutputStream.java | 466 ++++++++++++++++- .../TestECKeyOutputStreamWithZeroCopy.java | 31 -- 12 files changed, 479 insertions(+), 658 deletions(-) delete mode 100644 hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationServiceWithZeroCopy.java delete mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/AbstractTestECKeyOutputStream.java delete mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStreamWithZeroCopy.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index df0fdc59a4a..5719803b94c 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -201,9 +201,6 @@ public final class OzoneConfigKeys { "ozone.client.ec.grpc.write.timeout"; public static final String OZONE_CLIENT_EC_GRPC_WRITE_TIMEOUT_DEFAULT = "30s"; - public static final String OZONE_EC_GRPC_ZERO_COPY_ENABLED = - "ozone.ec.grpc.zerocopy.enabled"; - public static final boolean OZONE_EC_GRPC_ZERO_COPY_ENABLED_DEFAULT = true; /** * Ozone administrator users delimited by comma. diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index e2231a5c38e..bc90a87b11e 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -4536,14 +4536,6 @@ to existing buckets till this operation is completed. - - ozone.ec.grpc.zerocopy.enabled - true - OZONE, DATANODE - - Specify if zero-copy should be enabled for EC GRPC protocol. 
- - ozone.om.max.buckets 100000 diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java index 9c3f29d0f0c..5f1914402d0 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java @@ -20,7 +20,6 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc; -import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller; import org.apache.ratis.thirdparty.com.google.protobuf.MessageLite; @@ -31,7 +30,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.InputStream; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.getSendMethod; @@ -45,28 +43,20 @@ public class GrpcXceiverService extends LOG = LoggerFactory.getLogger(GrpcXceiverService.class); private final ContainerDispatcher dispatcher; - private final boolean zeroCopyEnabled; private final ZeroCopyMessageMarshaller zeroCopyMessageMarshaller = new ZeroCopyMessageMarshaller<>( ContainerCommandRequestProto.getDefaultInstance()); - public GrpcXceiverService(ContainerDispatcher dispatcher, - boolean zeroCopyEnabled) { + public GrpcXceiverService(ContainerDispatcher dispatcher) { this.dispatcher = dispatcher; - this.zeroCopyEnabled = zeroCopyEnabled; } /** - * Bind service with zerocopy marshaller equipped for the `send` API if - * zerocopy is enabled. + * Bind service with zerocopy marshaller equipped for the `send` API. * @return service definition. 
*/ public ServerServiceDefinition bindServiceWithZeroCopy() { ServerServiceDefinition orig = super.bindService(); - if (!zeroCopyEnabled) { - LOG.info("Zerocopy is not enabled."); - return orig; - } ServerServiceDefinition.Builder builder = ServerServiceDefinition.builder(orig.getServiceDescriptor().getName()); @@ -117,10 +107,7 @@ public void onNext(ContainerCommandRequestProto request) { isClosed.set(true); responseObserver.onError(e); } finally { - InputStream popStream = zeroCopyMessageMarshaller.popStream(request); - if (popStream != null) { - IOUtils.close(LOG, popStream); - } + zeroCopyMessageMarshaller.release(request); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java index 42daaa94be3..624f153e876 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java @@ -67,9 +67,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_EC_GRPC_ZERO_COPY_ENABLED; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_EC_GRPC_ZERO_COPY_ENABLED_DEFAULT; - /** * Creates a Grpc server endpoint that acts as the communication layer for * Ozone containers. @@ -135,13 +132,9 @@ public XceiverServerGrpc(DatanodeDetails datanodeDetails, eventLoopGroup = new NioEventLoopGroup(poolSize / 10, factory); channelType = NioServerSocketChannel.class; } - final boolean zeroCopyEnabled = conf.getBoolean( - OZONE_EC_GRPC_ZERO_COPY_ENABLED, - OZONE_EC_GRPC_ZERO_COPY_ENABLED_DEFAULT); LOG.info("GrpcServer channel type {}", channelType.getSimpleName()); - GrpcXceiverService xceiverService = new GrpcXceiverService(dispatcher, - zeroCopyEnabled); + GrpcXceiverService xceiverService = new GrpcXceiverService(dispatcher); NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port) .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE) .bossEventLoopGroup(eventLoopGroup) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java index 6bc237207b3..26cd0d82a99 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java @@ -19,7 +19,6 @@ package org.apache.hadoop.ozone.container.replication; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.util.HashSet; import java.util.Set; @@ -59,37 +58,24 @@ public class GrpcReplicationService extends private final ContainerReplicationSource source; private final ContainerImporter importer; - private final boolean zeroCopyEnabled; - private final ZeroCopyMessageMarshaller sendContainerZeroCopyMessageMarshaller; private final ZeroCopyMessageMarshaller copyContainerZeroCopyMessageMarshaller; - public GrpcReplicationService(ContainerReplicationSource source, - ContainerImporter importer, boolean zeroCopyEnabled) { + public 
GrpcReplicationService(ContainerReplicationSource source, ContainerImporter importer) { this.source = source; this.importer = importer; - this.zeroCopyEnabled = zeroCopyEnabled; - - if (zeroCopyEnabled) { - sendContainerZeroCopyMessageMarshaller = new ZeroCopyMessageMarshaller<>( - SendContainerRequest.getDefaultInstance()); - copyContainerZeroCopyMessageMarshaller = new ZeroCopyMessageMarshaller<>( - CopyContainerRequestProto.getDefaultInstance()); - } else { - sendContainerZeroCopyMessageMarshaller = null; - copyContainerZeroCopyMessageMarshaller = null; - } + + sendContainerZeroCopyMessageMarshaller = new ZeroCopyMessageMarshaller<>( + SendContainerRequest.getDefaultInstance()); + copyContainerZeroCopyMessageMarshaller = new ZeroCopyMessageMarshaller<>( + CopyContainerRequestProto.getDefaultInstance()); } public ServerServiceDefinition bindServiceWithZeroCopy() { ServerServiceDefinition orig = super.bindService(); - if (!zeroCopyEnabled) { - LOG.info("Zerocopy is not enabled."); - return orig; - } Set methodNames = new HashSet<>(); ServerServiceDefinition.Builder builder = @@ -155,14 +141,7 @@ public void download(CopyContainerRequestProto request, } finally { // output may have already been closed, ignore such errors IOUtils.cleanupWithLogger(LOG, outputStream); - - if (copyContainerZeroCopyMessageMarshaller != null) { - InputStream popStream = - copyContainerZeroCopyMessageMarshaller.popStream(request); - if (popStream != null) { - IOUtils.cleanupWithLogger(LOG, popStream); - } - } + copyContainerZeroCopyMessageMarshaller.release(request); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java index b4e92a4a60a..6ca474bdd8a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java @@ -99,13 +99,12 @@ public ReplicationServer(ContainerController controller, new LinkedBlockingQueue<>(replicationQueueLimit), threadFactory); - init(replicationConfig.isZeroCopyEnable()); + init(); } - public void init(boolean enableZeroCopy) { + public void init() { GrpcReplicationService grpcReplicationService = new GrpcReplicationService( - new OnDemandContainerReplicationSource(controller), importer, - enableZeroCopy); + new OnDemandContainerReplicationSource(controller), importer); NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port) .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE) .addService(ServerInterceptors.intercept( @@ -203,11 +202,6 @@ public static final class ReplicationConfig { static final String REPLICATION_OUTOFSERVICE_FACTOR_KEY = PREFIX + "." + OUTOFSERVICE_FACTOR_KEY; - public static final String ZEROCOPY_ENABLE_KEY = "zerocopy.enabled"; - private static final boolean ZEROCOPY_ENABLE_DEFAULT = true; - private static final String ZEROCOPY_ENABLE_DEFAULT_VALUE = - "true"; - /** * The maximum number of replication commands a single datanode can execute * simultaneously. 
@@ -249,15 +243,6 @@ public static final class ReplicationConfig { ) private double outOfServiceFactor = OUTOFSERVICE_FACTOR_DEFAULT; - @Config(key = ZEROCOPY_ENABLE_KEY, - type = ConfigType.BOOLEAN, - defaultValue = ZEROCOPY_ENABLE_DEFAULT_VALUE, - tags = {DATANODE, SCM}, - description = "Specify if zero-copy should be enabled for " + - "replication protocol." - ) - private boolean zeroCopyEnable = ZEROCOPY_ENABLE_DEFAULT; - public double getOutOfServiceFactor() { return outOfServiceFactor; } @@ -291,14 +276,6 @@ public void setReplicationQueueLimit(int limit) { this.replicationQueueLimit = limit; } - public boolean isZeroCopyEnable() { - return zeroCopyEnable; - } - - public void setZeroCopyEnable(boolean zeroCopyEnable) { - this.zeroCopyEnable = zeroCopyEnable; - } - @PostConstruct public void validate() { if (replicationMaxStreams < 1) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java index 506a96fe051..40b4dec3493 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java @@ -30,7 +30,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Path; @@ -105,10 +104,7 @@ public void onNext(SendContainerRequest req) { onError(t); } finally { if (marshaller != null) { - InputStream popStream = marshaller.popStream(req); - if (popStream != null) { - IOUtils.cleanupWithLogger(LOG, popStream); - } + marshaller.release(req); } } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java index 03901b99be3..bca98d1e7dc 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java @@ -83,17 +83,15 @@ class TestGrpcReplicationService { @BeforeEach public void setUp() throws Exception { - init(false); + init(); } - public void init(boolean isZeroCopy) throws Exception { + public void init() throws Exception { conf = new OzoneConfiguration(); ReplicationServer.ReplicationConfig replicationConfig = conf.getObject(ReplicationServer.ReplicationConfig.class); - replicationConfig.setZeroCopyEnable(isZeroCopy); - SecurityConfig secConf = new SecurityConfig(conf); ContainerSet containerSet = new ContainerSet(1000); @@ -230,7 +228,7 @@ public void copyData(long containerId, OutputStream destination, }; ContainerImporter importer = mock(ContainerImporter.class); GrpcReplicationService subject = - new GrpcReplicationService(source, importer, false); + new GrpcReplicationService(source, importer); CopyContainerRequestProto request = CopyContainerRequestProto.newBuilder() .setContainerID(1) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationServiceWithZeroCopy.java 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationServiceWithZeroCopy.java deleted file mode 100644 index 00891cf3e24..00000000000 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationServiceWithZeroCopy.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *
- * http://www.apache.org/licenses/LICENSE-2.0 - *
- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.replication; - -import org.junit.jupiter.api.BeforeEach; - -/** - * Tests {@link GrpcReplicationService}. - */ -class TestGrpcReplicationServiceWithZeroCopy - extends TestGrpcReplicationService { - @BeforeEach - public void setUp() throws Exception { - init(true); - } -} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/AbstractTestECKeyOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/AbstractTestECKeyOutputStream.java deleted file mode 100644 index 3063e2587e4..00000000000 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/AbstractTestECKeyOutputStream.java +++ /dev/null @@ -1,492 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *
- * http://www.apache.org/licenses/LICENSE-2.0 - *
- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.client.rpc; - -import org.apache.commons.lang3.NotImplementedException; -import org.apache.hadoop.hdds.HddsConfigKeys; -import org.apache.hadoop.hdds.client.DefaultReplicationConfig; -import org.apache.hadoop.hdds.client.ECReplicationConfig; -import org.apache.hadoop.hdds.client.RatisReplicationConfig; -import org.apache.hadoop.hdds.client.ReplicationConfig; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.conf.StorageUnit; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.OzoneClientConfig; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; -import org.apache.hadoop.hdds.utils.IOUtils; -import org.apache.hadoop.ozone.ClientConfigForTesting; -import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.ozone.client.BucketArgs; -import org.apache.hadoop.ozone.client.ObjectStore; -import org.apache.hadoop.ozone.client.OzoneBucket; -import org.apache.hadoop.ozone.client.OzoneClient; -import org.apache.hadoop.ozone.client.OzoneClientFactory; -import org.apache.hadoop.ozone.client.OzoneKey; -import org.apache.hadoop.ozone.client.OzoneKeyDetails; -import org.apache.hadoop.ozone.client.OzoneVolume; -import org.apache.hadoop.ozone.client.io.ECKeyOutputStream; -import org.apache.hadoop.ozone.client.io.KeyOutputStream; -import org.apache.hadoop.ozone.client.io.OzoneInputStream; -import org.apache.hadoop.ozone.client.io.OzoneOutputStream; -import org.apache.hadoop.ozone.container.TestHelper; -import org.apache.ozone.test.GenericTestUtils; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; - -import java.io.IOException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertInstanceOf; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.fail; - -/** - * Tests key output stream. 
- */ -abstract class AbstractTestECKeyOutputStream { - private static MiniOzoneCluster cluster; - private static OzoneConfiguration conf = new OzoneConfiguration(); - private static OzoneClient client; - private static ObjectStore objectStore; - private static int chunkSize; - private static int flushSize; - private static int maxFlushSize; - private static int blockSize; - private static String volumeName; - private static String bucketName; - private static String keyString; - private static int dataBlocks = 3; - private static int inputSize = dataBlocks * chunkSize; - private static byte[][] inputChunks = new byte[dataBlocks][chunkSize]; - - /** - * Create a MiniDFSCluster for testing. - */ - protected static void init(boolean zeroCopyEnabled) throws Exception { - chunkSize = 1024 * 1024; - flushSize = 2 * chunkSize; - maxFlushSize = 2 * flushSize; - blockSize = 2 * maxFlushSize; - - OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); - clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE); - clientConfig.setStreamBufferFlushDelay(false); - conf.setFromObject(clientConfig); - - // If SCM detects dead node too quickly, then container would be moved to - // closed state and all in progress writes will get exception. To avoid - // that, we are just keeping higher timeout and none of the tests depending - // on deadnode detection timeout currently. - conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS); - conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 60, TimeUnit.SECONDS); - conf.setTimeDuration("hdds.ratis.raft.server.rpc.slowness.timeout", 300, - TimeUnit.SECONDS); - conf.setTimeDuration( - "hdds.ratis.raft.server.notification.no-leader.timeout", 300, - TimeUnit.SECONDS); - conf.setQuietMode(false); - conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4, - StorageUnit.MB); - conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 500, - TimeUnit.MILLISECONDS); - conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1, - TimeUnit.SECONDS); - conf.setBoolean(OzoneConfigKeys.OZONE_EC_GRPC_ZERO_COPY_ENABLED, - zeroCopyEnabled); - conf.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 10); - // "Enable" hsync to verify that hsync would be blocked by ECKeyOutputStream - conf.setBoolean(OzoneConfigKeys.OZONE_HBASE_ENHANCEMENTS_ALLOWED, true); - conf.setBoolean("ozone.client.hbase.enhancements.allowed", true); - conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true); - - ClientConfigForTesting.newBuilder(StorageUnit.BYTES) - .setBlockSize(blockSize) - .setChunkSize(chunkSize) - .setStreamBufferFlushSize(flushSize) - .setStreamBufferMaxSize(maxFlushSize) - .applyTo(conf); - - cluster = MiniOzoneCluster.newBuilder(conf) - .setNumDatanodes(10) - .build(); - cluster.waitForClusterToBeReady(); - client = OzoneClientFactory.getRpcClient(conf); - objectStore = client.getObjectStore(); - keyString = UUID.randomUUID().toString(); - volumeName = "testeckeyoutputstream"; - bucketName = volumeName; - objectStore.createVolume(volumeName); - objectStore.getVolume(volumeName).createBucket(bucketName); - initInputChunks(); - } - - @BeforeAll - public static void init() throws Exception { - init(false); - } - - /** - * Shutdown MiniDFSCluster. 
- */ - @AfterAll - public static void shutdown() { - IOUtils.closeQuietly(client); - if (cluster != null) { - cluster.shutdown(); - } - } - - @Test - public void testCreateKeyWithECReplicationConfig() throws Exception { - try (OzoneOutputStream key = TestHelper - .createKey(keyString, new ECReplicationConfig(3, 2, - ECReplicationConfig.EcCodec.RS, chunkSize), inputSize, - objectStore, volumeName, bucketName)) { - assertInstanceOf(ECKeyOutputStream.class, key.getOutputStream()); - } - } - - @Test - public void testCreateKeyWithOutBucketDefaults() throws Exception { - OzoneVolume volume = objectStore.getVolume(volumeName); - OzoneBucket bucket = volume.getBucket(bucketName); - try (OzoneOutputStream out = bucket.createKey("myKey", inputSize)) { - assertInstanceOf(KeyOutputStream.class, out.getOutputStream()); - for (byte[] inputChunk : inputChunks) { - out.write(inputChunk); - } - } - } - - @Test - public void testCreateKeyWithBucketDefaults() throws Exception { - String myBucket = UUID.randomUUID().toString(); - OzoneVolume volume = objectStore.getVolume(volumeName); - final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder(); - bucketArgs.setDefaultReplicationConfig( - new DefaultReplicationConfig( - new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS, - chunkSize))); - - volume.createBucket(myBucket, bucketArgs.build()); - OzoneBucket bucket = volume.getBucket(myBucket); - - try (OzoneOutputStream out = bucket.createKey(keyString, inputSize)) { - assertInstanceOf(ECKeyOutputStream.class, out.getOutputStream()); - for (byte[] inputChunk : inputChunks) { - out.write(inputChunk); - } - } - byte[] buf = new byte[chunkSize]; - try (OzoneInputStream in = bucket.readKey(keyString)) { - for (byte[] inputChunk : inputChunks) { - int read = in.read(buf, 0, chunkSize); - assertEquals(chunkSize, read); - assertArrayEquals(buf, inputChunk); - } - } - } - - @Test - public void testOverwriteECKeyWithRatisKey() throws Exception { - String myBucket = UUID.randomUUID().toString(); - OzoneVolume volume = objectStore.getVolume(volumeName); - final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder(); - volume.createBucket(myBucket, bucketArgs.build()); - OzoneBucket bucket = volume.getBucket(myBucket); - createKeyAndCheckReplicationConfig(keyString, bucket, - new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS, - chunkSize)); - - //Overwrite with RATIS/THREE - createKeyAndCheckReplicationConfig(keyString, bucket, - RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE)); - - //Overwrite with RATIS/ONE - createKeyAndCheckReplicationConfig(keyString, bucket, - RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE)); - } - - @Test - public void testOverwriteRatisKeyWithECKey() throws Exception { - String myBucket = UUID.randomUUID().toString(); - OzoneVolume volume = objectStore.getVolume(volumeName); - final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder(); - volume.createBucket(myBucket, bucketArgs.build()); - OzoneBucket bucket = volume.getBucket(myBucket); - - createKeyAndCheckReplicationConfig(keyString, bucket, - RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE)); - // Overwrite with EC key - createKeyAndCheckReplicationConfig(keyString, bucket, - new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS, - chunkSize)); - } - - private void createKeyAndCheckReplicationConfig(String keyName, - OzoneBucket bucket, ReplicationConfig replicationConfig) - throws IOException { - try (OzoneOutputStream out = bucket - 
.createKey(keyName, inputSize, replicationConfig, new HashMap<>())) { - for (byte[] inputChunk : inputChunks) { - out.write(inputChunk); - } - } - OzoneKeyDetails key = bucket.getKey(keyName); - assertEquals(replicationConfig, key.getReplicationConfig()); - } - - @Test - public void testCreateRatisKeyAndWithECBucketDefaults() throws Exception { - OzoneBucket bucket = getOzoneBucket(); - try (OzoneOutputStream out = bucket.createKey( - "testCreateRatisKeyAndWithECBucketDefaults", 2000, - RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE), - new HashMap<>())) { - assertInstanceOf(KeyOutputStream.class, out.getOutputStream()); - for (byte[] inputChunk : inputChunks) { - out.write(inputChunk); - } - } - } - - @Test - public void test13ChunksInSingleWriteOp() throws IOException { - testMultipleChunksInSingleWriteOp(13); - } - - @Test - public void testChunksInSingleWriteOpWithOffset() throws IOException { - testMultipleChunksInSingleWriteOp(11, 25, 19); - } - - @Test - public void test15ChunksInSingleWriteOp() throws IOException { - testMultipleChunksInSingleWriteOp(15); - } - - @Test - public void test20ChunksInSingleWriteOp() throws IOException { - testMultipleChunksInSingleWriteOp(20); - } - - @Test - public void test21ChunksInSingleWriteOp() throws IOException { - testMultipleChunksInSingleWriteOp(21); - } - - private void testMultipleChunksInSingleWriteOp(int offset, - int bufferChunks, int numChunks) - throws IOException { - byte[] inputData = getInputBytes(offset, bufferChunks, numChunks); - final OzoneBucket bucket = getOzoneBucket(); - String keyName = - String.format("testMultipleChunksInSingleWriteOpOffset" + - "%dBufferChunks%dNumChunks", offset, bufferChunks, - numChunks); - try (OzoneOutputStream out = bucket.createKey(keyName, 4096, - new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS, - chunkSize), new HashMap<>())) { - out.write(inputData, offset, numChunks * chunkSize); - } - - validateContent(offset, numChunks * chunkSize, inputData, bucket, - bucket.getKey(keyName)); - } - - private void testMultipleChunksInSingleWriteOp(int numChunks) - throws IOException { - testMultipleChunksInSingleWriteOp(0, numChunks, numChunks); - } - - @Test - public void testECContainerKeysCountAndNumContainerReplicas() - throws IOException, InterruptedException, TimeoutException { - byte[] inputData = getInputBytes(1); - final OzoneBucket bucket = getOzoneBucket(); - ContainerOperationClient containerOperationClient = - new ContainerOperationClient(conf); - - ECReplicationConfig repConfig = new ECReplicationConfig( - 3, 2, ECReplicationConfig.EcCodec.RS, chunkSize); - // Close all EC pipelines so we must get a fresh pipeline and hence - // container for this test. 
- PipelineManager pm = - cluster.getStorageContainerManager().getPipelineManager(); - for (Pipeline p : pm.getPipelines(repConfig)) { - pm.closePipeline(p, true); - } - - String keyName = UUID.randomUUID().toString(); - try (OzoneOutputStream out = bucket.createKey(keyName, 4096, - repConfig, new HashMap<>())) { - out.write(inputData); - } - OzoneKeyDetails key = bucket.getKey(keyName); - long currentKeyContainerID = - key.getOzoneKeyLocations().get(0).getContainerID(); - - GenericTestUtils.waitFor(() -> { - try { - return (containerOperationClient.getContainer(currentKeyContainerID) - .getNumberOfKeys() == 1) && (containerOperationClient - .getContainerReplicas(currentKeyContainerID).size() == 5); - } catch (IOException exception) { - fail("Unexpected exception " + exception); - return false; - } - }, 100, 10000); - validateContent(inputData, bucket, key); - } - - private void validateContent(byte[] inputData, OzoneBucket bucket, - OzoneKey key) throws IOException { - validateContent(0, inputData.length, inputData, bucket, key); - } - - private void validateContent(int offset, int length, byte[] inputData, - OzoneBucket bucket, - OzoneKey key) throws IOException { - try (OzoneInputStream is = bucket.readKey(key.getName())) { - byte[] fileContent = new byte[length]; - assertEquals(length, is.read(fileContent)); - assertEquals(new String(Arrays.copyOfRange(inputData, offset, - offset + length), UTF_8), - new String(fileContent, UTF_8)); - } - } - - private OzoneBucket getOzoneBucket() throws IOException { - String myBucket = UUID.randomUUID().toString(); - OzoneVolume volume = objectStore.getVolume(volumeName); - final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder(); - bucketArgs.setDefaultReplicationConfig( - new DefaultReplicationConfig( - new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS, - chunkSize))); - - volume.createBucket(myBucket, bucketArgs.build()); - return volume.getBucket(myBucket); - } - - private static void initInputChunks() { - for (int i = 0; i < dataBlocks; i++) { - inputChunks[i] = getBytesWith(i + 1, chunkSize); - } - } - - private static byte[] getBytesWith(int singleDigitNumber, int total) { - StringBuilder builder = new StringBuilder(singleDigitNumber); - for (int i = 1; i <= total; i++) { - builder.append(singleDigitNumber); - } - return builder.toString().getBytes(UTF_8); - } - - @Test - public void testWriteShouldSucceedWhenDNKilled() throws Exception { - int numChunks = 3; - byte[] inputData = getInputBytes(numChunks); - final OzoneBucket bucket = getOzoneBucket(); - String keyName = "testWriteShouldSucceedWhenDNKilled" + numChunks; - DatanodeDetails nodeToKill = null; - try { - try (OzoneOutputStream out = bucket.createKey(keyName, 1024, - new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS, - chunkSize), new HashMap<>())) { - ECKeyOutputStream ecOut = (ECKeyOutputStream) out.getOutputStream(); - out.write(inputData); - // Kill a node from first pipeline - nodeToKill = ecOut.getStreamEntries() - .get(0).getPipeline().getFirstNode(); - cluster.shutdownHddsDatanode(nodeToKill); - - out.write(inputData); - - // Wait for flushing thread to finish its work. - final long checkpoint = System.currentTimeMillis(); - ecOut.insertFlushCheckpoint(checkpoint); - GenericTestUtils.waitFor(() -> ecOut.getFlushCheckpoint() == checkpoint, - 100, 10000); - - // Check the second blockGroup pipeline to make sure that the failed - // node is not selected. 
- assertThat(ecOut.getStreamEntries().get(1).getPipeline().getNodes()) - .doesNotContain(nodeToKill); - } - - try (OzoneInputStream is = bucket.readKey(keyName)) { - // We wrote "inputData" twice, so do two reads and ensure the correct - // data comes back. - for (int i = 0; i < 2; i++) { - byte[] fileContent = new byte[inputData.length]; - assertEquals(inputData.length, is.read(fileContent)); - assertEquals(new String(inputData, UTF_8), - new String(fileContent, UTF_8)); - } - } - } finally { - cluster.restartHddsDatanode(nodeToKill, true); - } - } - - private byte[] getInputBytes(int numChunks) { - return getInputBytes(0, numChunks, numChunks); - } - - private byte[] getInputBytes(int offset, int bufferChunks, int numChunks) { - byte[] inputData = new byte[offset + bufferChunks * chunkSize]; - for (int i = 0; i < numChunks; i++) { - int start = offset + (i * chunkSize); - Arrays.fill(inputData, start, start + chunkSize - 1, - String.valueOf(i % 9).getBytes(UTF_8)[0]); - } - return inputData; - } - - @Test - public void testBlockedHflushAndHsync() throws Exception { - // Expect ECKeyOutputStream hflush and hsync calls to throw exception - try (OzoneOutputStream oOut = TestHelper.createKey( - keyString, new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS, chunkSize), - inputSize, objectStore, volumeName, bucketName)) { - assertInstanceOf(ECKeyOutputStream.class, oOut.getOutputStream()); - KeyOutputStream kOut = (KeyOutputStream) oOut.getOutputStream(); - - assertThrows(NotImplementedException.class, () -> kOut.hflush()); - assertThrows(NotImplementedException.class, () -> kOut.hsync()); - } - } - -} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java index c5147ecfb01..5743866f2d2 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java @@ -17,15 +17,471 @@ package org.apache.hadoop.ozone.client.rpc; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.client.DefaultReplicationConfig; +import org.apache.hadoop.hdds.client.ECReplicationConfig; +import org.apache.hadoop.hdds.client.RatisReplicationConfig; +import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.hdds.utils.IOUtils; +import org.apache.hadoop.ozone.ClientConfigForTesting; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.client.BucketArgs; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClient; +import 
org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.OzoneKey; +import org.apache.hadoop.ozone.client.OzoneKeyDetails; +import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.client.io.ECKeyOutputStream; +import org.apache.hadoop.ozone.client.io.KeyOutputStream; +import org.apache.hadoop.ozone.client.io.OzoneInputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.container.TestHelper; +import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; /** - * Tests key output stream without zero-copy enabled. + * Tests key output stream. */ -public class TestECKeyOutputStream extends - AbstractTestECKeyOutputStream { +public class TestECKeyOutputStream { + private static MiniOzoneCluster cluster; + private static OzoneConfiguration conf = new OzoneConfiguration(); + private static OzoneClient client; + private static ObjectStore objectStore; + private static int chunkSize; + private static int flushSize; + private static int maxFlushSize; + private static int blockSize; + private static String volumeName; + private static String bucketName; + private static String keyString; + private static int dataBlocks = 3; + private static int inputSize = dataBlocks * chunkSize; + private static byte[][] inputChunks = new byte[dataBlocks][chunkSize]; + + /** + * Create a MiniDFSCluster for testing. + */ @BeforeAll - public static void init() throws Exception { - init(false); + protected static void init() throws Exception { + chunkSize = 1024 * 1024; + flushSize = 2 * chunkSize; + maxFlushSize = 2 * flushSize; + blockSize = 2 * maxFlushSize; + + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE); + clientConfig.setStreamBufferFlushDelay(false); + conf.setFromObject(clientConfig); + + // If SCM detects dead node too quickly, then container would be moved to + // closed state and all in progress writes will get exception. To avoid + // that, we are just keeping higher timeout and none of the tests depending + // on deadnode detection timeout currently. 
+ conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS); + conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 60, TimeUnit.SECONDS); + conf.setTimeDuration("hdds.ratis.raft.server.rpc.slowness.timeout", 300, + TimeUnit.SECONDS); + conf.setTimeDuration( + "hdds.ratis.raft.server.notification.no-leader.timeout", 300, + TimeUnit.SECONDS); + conf.setQuietMode(false); + conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4, + StorageUnit.MB); + conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 500, + TimeUnit.MILLISECONDS); + conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1, + TimeUnit.SECONDS); + conf.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 10); + // "Enable" hsync to verify that hsync would be blocked by ECKeyOutputStream + conf.setBoolean(OzoneConfigKeys.OZONE_HBASE_ENHANCEMENTS_ALLOWED, true); + conf.setBoolean("ozone.client.hbase.enhancements.allowed", true); + conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true); + + ClientConfigForTesting.newBuilder(StorageUnit.BYTES) + .setBlockSize(blockSize) + .setChunkSize(chunkSize) + .setStreamBufferFlushSize(flushSize) + .setStreamBufferMaxSize(maxFlushSize) + .applyTo(conf); + + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(10) + .build(); + cluster.waitForClusterToBeReady(); + client = OzoneClientFactory.getRpcClient(conf); + objectStore = client.getObjectStore(); + keyString = UUID.randomUUID().toString(); + volumeName = "testeckeyoutputstream"; + bucketName = volumeName; + objectStore.createVolume(volumeName); + objectStore.getVolume(volumeName).createBucket(bucketName); + initInputChunks(); } + + /** + * Shutdown MiniDFSCluster. + */ + @AfterAll + public static void shutdown() { + IOUtils.closeQuietly(client); + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testCreateKeyWithECReplicationConfig() throws Exception { + try (OzoneOutputStream key = TestHelper + .createKey(keyString, new ECReplicationConfig(3, 2, + ECReplicationConfig.EcCodec.RS, chunkSize), inputSize, + objectStore, volumeName, bucketName)) { + assertInstanceOf(ECKeyOutputStream.class, key.getOutputStream()); + } + } + + @Test + public void testCreateKeyWithOutBucketDefaults() throws Exception { + OzoneVolume volume = objectStore.getVolume(volumeName); + OzoneBucket bucket = volume.getBucket(bucketName); + try (OzoneOutputStream out = bucket.createKey("myKey", inputSize)) { + assertInstanceOf(KeyOutputStream.class, out.getOutputStream()); + for (byte[] inputChunk : inputChunks) { + out.write(inputChunk); + } + } + } + + @Test + public void testCreateKeyWithBucketDefaults() throws Exception { + String myBucket = UUID.randomUUID().toString(); + OzoneVolume volume = objectStore.getVolume(volumeName); + final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder(); + bucketArgs.setDefaultReplicationConfig( + new DefaultReplicationConfig( + new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS, + chunkSize))); + + volume.createBucket(myBucket, bucketArgs.build()); + OzoneBucket bucket = volume.getBucket(myBucket); + + try (OzoneOutputStream out = bucket.createKey(keyString, inputSize)) { + assertInstanceOf(ECKeyOutputStream.class, out.getOutputStream()); + for (byte[] inputChunk : inputChunks) { + out.write(inputChunk); + } + } + byte[] buf = new byte[chunkSize]; + try (OzoneInputStream in = bucket.readKey(keyString)) { + for (byte[] inputChunk : inputChunks) { + int read = in.read(buf, 0, chunkSize); + assertEquals(chunkSize, read); + 
assertArrayEquals(buf, inputChunk); + } + } + } + + @Test + public void testOverwriteECKeyWithRatisKey() throws Exception { + String myBucket = UUID.randomUUID().toString(); + OzoneVolume volume = objectStore.getVolume(volumeName); + final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder(); + volume.createBucket(myBucket, bucketArgs.build()); + OzoneBucket bucket = volume.getBucket(myBucket); + createKeyAndCheckReplicationConfig(keyString, bucket, + new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS, + chunkSize)); + + //Overwrite with RATIS/THREE + createKeyAndCheckReplicationConfig(keyString, bucket, + RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE)); + + //Overwrite with RATIS/ONE + createKeyAndCheckReplicationConfig(keyString, bucket, + RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE)); + } + + @Test + public void testOverwriteRatisKeyWithECKey() throws Exception { + String myBucket = UUID.randomUUID().toString(); + OzoneVolume volume = objectStore.getVolume(volumeName); + final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder(); + volume.createBucket(myBucket, bucketArgs.build()); + OzoneBucket bucket = volume.getBucket(myBucket); + + createKeyAndCheckReplicationConfig(keyString, bucket, + RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE)); + // Overwrite with EC key + createKeyAndCheckReplicationConfig(keyString, bucket, + new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS, + chunkSize)); + } + + private void createKeyAndCheckReplicationConfig(String keyName, + OzoneBucket bucket, ReplicationConfig replicationConfig) + throws IOException { + try (OzoneOutputStream out = bucket + .createKey(keyName, inputSize, replicationConfig, new HashMap<>())) { + for (byte[] inputChunk : inputChunks) { + out.write(inputChunk); + } + } + OzoneKeyDetails key = bucket.getKey(keyName); + assertEquals(replicationConfig, key.getReplicationConfig()); + } + + @Test + public void testCreateRatisKeyAndWithECBucketDefaults() throws Exception { + OzoneBucket bucket = getOzoneBucket(); + try (OzoneOutputStream out = bucket.createKey( + "testCreateRatisKeyAndWithECBucketDefaults", 2000, + RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE), + new HashMap<>())) { + assertInstanceOf(KeyOutputStream.class, out.getOutputStream()); + for (byte[] inputChunk : inputChunks) { + out.write(inputChunk); + } + } + } + + @Test + public void test13ChunksInSingleWriteOp() throws IOException { + testMultipleChunksInSingleWriteOp(13); + } + + @Test + public void testChunksInSingleWriteOpWithOffset() throws IOException { + testMultipleChunksInSingleWriteOp(11, 25, 19); + } + + @Test + public void test15ChunksInSingleWriteOp() throws IOException { + testMultipleChunksInSingleWriteOp(15); + } + + @Test + public void test20ChunksInSingleWriteOp() throws IOException { + testMultipleChunksInSingleWriteOp(20); + } + + @Test + public void test21ChunksInSingleWriteOp() throws IOException { + testMultipleChunksInSingleWriteOp(21); + } + + private void testMultipleChunksInSingleWriteOp(int offset, + int bufferChunks, int numChunks) + throws IOException { + byte[] inputData = getInputBytes(offset, bufferChunks, numChunks); + final OzoneBucket bucket = getOzoneBucket(); + String keyName = + String.format("testMultipleChunksInSingleWriteOpOffset" + + "%dBufferChunks%dNumChunks", offset, bufferChunks, + numChunks); + try (OzoneOutputStream out = bucket.createKey(keyName, 4096, + new ECReplicationConfig(3, 2, 
ECReplicationConfig.EcCodec.RS, + chunkSize), new HashMap<>())) { + out.write(inputData, offset, numChunks * chunkSize); + } + + validateContent(offset, numChunks * chunkSize, inputData, bucket, + bucket.getKey(keyName)); + } + + private void testMultipleChunksInSingleWriteOp(int numChunks) + throws IOException { + testMultipleChunksInSingleWriteOp(0, numChunks, numChunks); + } + + @Test + public void testECContainerKeysCountAndNumContainerReplicas() + throws IOException, InterruptedException, TimeoutException { + byte[] inputData = getInputBytes(1); + final OzoneBucket bucket = getOzoneBucket(); + ContainerOperationClient containerOperationClient = + new ContainerOperationClient(conf); + + ECReplicationConfig repConfig = new ECReplicationConfig( + 3, 2, ECReplicationConfig.EcCodec.RS, chunkSize); + // Close all EC pipelines so we must get a fresh pipeline and hence + // container for this test. + PipelineManager pm = + cluster.getStorageContainerManager().getPipelineManager(); + for (Pipeline p : pm.getPipelines(repConfig)) { + pm.closePipeline(p, true); + } + + String keyName = UUID.randomUUID().toString(); + try (OzoneOutputStream out = bucket.createKey(keyName, 4096, + repConfig, new HashMap<>())) { + out.write(inputData); + } + OzoneKeyDetails key = bucket.getKey(keyName); + long currentKeyContainerID = + key.getOzoneKeyLocations().get(0).getContainerID(); + + GenericTestUtils.waitFor(() -> { + try { + return (containerOperationClient.getContainer(currentKeyContainerID) + .getNumberOfKeys() == 1) && (containerOperationClient + .getContainerReplicas(currentKeyContainerID).size() == 5); + } catch (IOException exception) { + fail("Unexpected exception " + exception); + return false; + } + }, 100, 10000); + validateContent(inputData, bucket, key); + } + + private void validateContent(byte[] inputData, OzoneBucket bucket, + OzoneKey key) throws IOException { + validateContent(0, inputData.length, inputData, bucket, key); + } + + private void validateContent(int offset, int length, byte[] inputData, + OzoneBucket bucket, + OzoneKey key) throws IOException { + try (OzoneInputStream is = bucket.readKey(key.getName())) { + byte[] fileContent = new byte[length]; + assertEquals(length, is.read(fileContent)); + assertEquals(new String(Arrays.copyOfRange(inputData, offset, + offset + length), UTF_8), + new String(fileContent, UTF_8)); + } + } + + private OzoneBucket getOzoneBucket() throws IOException { + String myBucket = UUID.randomUUID().toString(); + OzoneVolume volume = objectStore.getVolume(volumeName); + final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder(); + bucketArgs.setDefaultReplicationConfig( + new DefaultReplicationConfig( + new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS, + chunkSize))); + + volume.createBucket(myBucket, bucketArgs.build()); + return volume.getBucket(myBucket); + } + + private static void initInputChunks() { + for (int i = 0; i < dataBlocks; i++) { + inputChunks[i] = getBytesWith(i + 1, chunkSize); + } + } + + private static byte[] getBytesWith(int singleDigitNumber, int total) { + StringBuilder builder = new StringBuilder(singleDigitNumber); + for (int i = 1; i <= total; i++) { + builder.append(singleDigitNumber); + } + return builder.toString().getBytes(UTF_8); + } + + @Test + public void testWriteShouldSucceedWhenDNKilled() throws Exception { + int numChunks = 3; + byte[] inputData = getInputBytes(numChunks); + final OzoneBucket bucket = getOzoneBucket(); + String keyName = "testWriteShouldSucceedWhenDNKilled" + numChunks; + DatanodeDetails 
nodeToKill = null; + try { + try (OzoneOutputStream out = bucket.createKey(keyName, 1024, + new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS, + chunkSize), new HashMap<>())) { + ECKeyOutputStream ecOut = (ECKeyOutputStream) out.getOutputStream(); + out.write(inputData); + // Kill a node from first pipeline + nodeToKill = ecOut.getStreamEntries() + .get(0).getPipeline().getFirstNode(); + cluster.shutdownHddsDatanode(nodeToKill); + + out.write(inputData); + + // Wait for flushing thread to finish its work. + final long checkpoint = System.currentTimeMillis(); + ecOut.insertFlushCheckpoint(checkpoint); + GenericTestUtils.waitFor(() -> ecOut.getFlushCheckpoint() == checkpoint, + 100, 10000); + + // Check the second blockGroup pipeline to make sure that the failed + // node is not selected. + assertThat(ecOut.getStreamEntries().get(1).getPipeline().getNodes()) + .doesNotContain(nodeToKill); + } + + try (OzoneInputStream is = bucket.readKey(keyName)) { + // We wrote "inputData" twice, so do two reads and ensure the correct + // data comes back. + for (int i = 0; i < 2; i++) { + byte[] fileContent = new byte[inputData.length]; + assertEquals(inputData.length, is.read(fileContent)); + assertEquals(new String(inputData, UTF_8), + new String(fileContent, UTF_8)); + } + } + } finally { + cluster.restartHddsDatanode(nodeToKill, true); + } + } + + private byte[] getInputBytes(int numChunks) { + return getInputBytes(0, numChunks, numChunks); + } + + private byte[] getInputBytes(int offset, int bufferChunks, int numChunks) { + byte[] inputData = new byte[offset + bufferChunks * chunkSize]; + for (int i = 0; i < numChunks; i++) { + int start = offset + (i * chunkSize); + Arrays.fill(inputData, start, start + chunkSize - 1, + String.valueOf(i % 9).getBytes(UTF_8)[0]); + } + return inputData; + } + + @Test + public void testBlockedHflushAndHsync() throws Exception { + // Expect ECKeyOutputStream hflush and hsync calls to throw exception + try (OzoneOutputStream oOut = TestHelper.createKey( + keyString, new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS, chunkSize), + inputSize, objectStore, volumeName, bucketName)) { + assertInstanceOf(ECKeyOutputStream.class, oOut.getOutputStream()); + KeyOutputStream kOut = (KeyOutputStream) oOut.getOutputStream(); + + assertThrows(NotImplementedException.class, () -> kOut.hflush()); + assertThrows(NotImplementedException.class, () -> kOut.hsync()); + } + } + } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStreamWithZeroCopy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStreamWithZeroCopy.java deleted file mode 100644 index 47c94e03cb2..00000000000 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStreamWithZeroCopy.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *
- * http://www.apache.org/licenses/LICENSE-2.0 - *
- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.ozone.client.rpc; - -import org.junit.jupiter.api.BeforeAll; - -/** - * Tests key output stream with zero-copy enabled. - */ -public class TestECKeyOutputStreamWithZeroCopy extends - AbstractTestECKeyOutputStream { - @BeforeAll - public static void init() throws Exception { - init(true); - } -}