diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/AuthenticatedProducerConsumerTest.java index ca379ecee9ae8..3415a90eb21d6 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/AuthenticatedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/AuthenticatedProducerConsumerTest.java @@ -165,7 +165,7 @@ public void testTlsSyncProducerAndConsumer(int batchMessageDelayMs) throws Excep * @throws Exception */ @Test - public void testAuthemticationFilterNegative() throws Exception { + public void testAuthenticationFilterNegative() throws Exception { log.info("-- Starting {} test --", methodName); Map authParams = new HashMap<>(); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/TlsProducerConsumerTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/TlsProducerConsumerTest.java new file mode 100644 index 0000000000000..f71e9a1d8b9b1 --- /dev/null +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/TlsProducerConsumerTest.java @@ -0,0 +1,121 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yahoo.pulsar.client.api; + +import static org.mockito.Mockito.spy; + +import java.net.URI; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.yahoo.pulsar.client.admin.PulsarAdmin; +import com.yahoo.pulsar.common.policies.data.ClusterData; +import com.yahoo.pulsar.common.policies.data.PropertyAdmin; + +public class TlsProducerConsumerTest extends ProducerConsumerBase { + private static final Logger log = LoggerFactory.getLogger(AuthenticatedProducerConsumerTest.class); + + private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem"; + private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem"; + private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem"; + + @BeforeMethod + @Override + protected void setup() throws Exception { + + conf.setTlsEnabled(true); + conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); + conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); + + conf.setClusterName("use"); + + super.init(); + } + + protected final void internalSetupForTls() throws Exception { + + com.yahoo.pulsar.client.api.ClientConfiguration clientConf = new com.yahoo.pulsar.client.api.ClientConfiguration(); + clientConf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); + clientConf.setUseTls(true); + + admin = spy(new PulsarAdmin(brokerUrlTls, clientConf)); + String lookupUrl = new URI("pulsar+ssl://localhost:" + BROKER_PORT_TLS).toString(); + pulsarClient = PulsarClient.create(lookupUrl, clientConf); + } + + @AfterMethod + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + /** + * verifies that messages whose size is larger than 2^14 bytes (max size of single TLS chunk) can be + * produced/consumed + * + * @throws Exception + */ + @Test + public void testTlsLargeSizeMessage() throws Exception { + log.info("-- Starting {} test --", methodName); + + final int MESSAGE_SIZE = 16 * 1024 + 1; + log.info("-- message size --", MESSAGE_SIZE); + + internalSetupForTls(); + + admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), + "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS)); + admin.properties().createProperty("my-property", + new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use"))); + admin.namespaces().createNamespace("my-property/use/my-ns"); + + ConsumerConfiguration conf = new ConsumerConfiguration(); + conf.setSubscriptionType(SubscriptionType.Exclusive); + Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name", + conf); + + ProducerConfiguration producerConf = new ProducerConfiguration(); + + Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf); + for (int i = 0; i < 10; i++) { + byte[] message = new byte[MESSAGE_SIZE]; + Arrays.fill(message, (byte) i); + producer.send(message); + } + + Message msg = null; + for (int i = 0; i < 10; i++) { + msg = consumer.receive(5, TimeUnit.SECONDS); + byte[] expected = new byte[MESSAGE_SIZE]; + Arrays.fill(expected, (byte) i); + Assert.assertArrayEquals(expected, msg.getData()); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + consumer.close(); + log.info("-- Exiting {} test --", methodName); + } +} diff --git a/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/DoubleByteBuf.java b/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/DoubleByteBuf.java index 67b39e7570285..ed3649493a76e 100644 --- a/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/DoubleByteBuf.java +++ b/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/DoubleByteBuf.java @@ -134,11 +134,6 @@ public int capacity() { return b1.capacity() + b2.capacity(); } - @Override - public int readableBytes() { - return b1.readableBytes() + b2.readableBytes(); - } - @Override public int writableBytes() { return 0; diff --git a/pulsar-common/src/test/java/com/yahoo/pulsar/common/api/DoubleByteBufTest.java b/pulsar-common/src/test/java/com/yahoo/pulsar/common/api/DoubleByteBufTest.java new file mode 100644 index 0000000000000..e48eed8b157cc --- /dev/null +++ b/pulsar-common/src/test/java/com/yahoo/pulsar/common/api/DoubleByteBufTest.java @@ -0,0 +1,49 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yahoo.pulsar.common.api; + +import static org.testng.Assert.assertEquals; +import org.testng.annotations.Test; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; + +public class DoubleByteBufTest { + + /** + * Verify that readableBytes() returns writerIndex - readerIndex. In this case writerIndex is the end of the buffer + * and readerIndex is increased by 64. + * + * @throws Exception + */ + @Test + public void testReadableBytes() throws Exception { + + ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128); + b1.writerIndex(b1.capacity()); + ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128); + b2.writerIndex(b2.capacity()); + ByteBuf buf = DoubleByteBuf.get(b1, b2); + + assertEquals(buf.readerIndex(), 0); + assertEquals(buf.writerIndex(), 256); + assertEquals(buf.readableBytes(), 256); + + for (int i = 0; i < 4; ++i) { + buf.skipBytes(64); + assertEquals(buf.readableBytes(), 256 - 64 * (i + 1)); + } + } +}