Skip to content

Commit

Permalink
HBASE-28547 Support specifying connection configuration through queri…
Browse files Browse the repository at this point in the history
…es of the connection uri (#5853)

Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
(cherry picked from commit 2dc7e15)
  • Loading branch information
Apache9 committed May 20, 2024
1 parent a21c649 commit 03fa8b8
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -410,9 +411,16 @@ public static Connection createConnection(URI connectionUri, Configuration conf,
Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
ExecutorService.class, User.class, ConnectionRegistry.class, Map.class);
constructor.setAccessible(true);
ConnectionRegistry registry = connectionUri != null
? ConnectionRegistryFactory.create(connectionUri, conf, user)
: ConnectionRegistryFactory.create(conf, user);
ConnectionRegistry registry;
Configuration appliedConf;
if (connectionUri != null) {
appliedConf = new Configuration(conf);
Strings.applyURIQueriesToConf(connectionUri, appliedConf);
registry = ConnectionRegistryFactory.create(connectionUri, appliedConf, user);
} else {
appliedConf = conf;
registry = ConnectionRegistryFactory.create(appliedConf, user);
}
return user.runAs((PrivilegedExceptionAction<Connection>) () -> (Connection) constructor
.newInstance(conf, pool, user, registry, connectionAttributes));
} catch (NoSuchMethodException e) {
Expand Down Expand Up @@ -577,8 +585,16 @@ public static CompletableFuture<AsyncConnection> createAsyncConnection(URI conne
Configuration conf, final User user, Map<String, byte[]> connectionAttributes) {
return TraceUtil.tracedFuture(() -> {
ConnectionRegistry registry;
Configuration appliedConf;
try {
registry = createConnectionRegistry(connectionUri, conf, user);
if (connectionUri != null) {
appliedConf = new Configuration(conf);
Strings.applyURIQueriesToConf(connectionUri, appliedConf);
registry = ConnectionRegistryFactory.create(connectionUri, appliedConf, user);
} else {
appliedConf = conf;
registry = ConnectionRegistryFactory.create(appliedConf, user);
}
} catch (Exception e) {
return FutureUtils.failedFuture(e);
}
Expand All @@ -594,8 +610,8 @@ public static CompletableFuture<AsyncConnection> createAsyncConnection(URI conne
future.completeExceptionally(new IOException("clusterid came back null"));
return;
}
Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
AsyncConnectionImpl.class, AsyncConnection.class);
Class<? extends AsyncConnection> clazz = appliedConf.getClass(
HBASE_CLIENT_ASYNC_CONNECTION_IMPL, AsyncConnectionImpl.class, AsyncConnection.class);
try {
future.complete(
user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;

import java.net.URI;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.ArgumentCaptor;
import org.mockito.MockedStatic;

@Category({ ClientTests.class, SmallTests.class })
public class TestConnectionFactoryApplyURIQueries {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestConnectionFactoryApplyURIQueries.class);

private Configuration conf;

private MockedStatic<ConnectionRegistryFactory> mockedConnectionRegistryFactory;

private ConnectionRegistry registry;

@Before
public void setUp() {
conf = HBaseConfiguration.create();
mockedConnectionRegistryFactory = mockStatic(ConnectionRegistryFactory.class);
registry = mock(ConnectionRegistry.class);
mockedConnectionRegistryFactory
.when(() -> ConnectionRegistryFactory.create(any(), any(), any())).thenReturn(registry);
when(registry.getClusterId()).thenReturn(CompletableFuture.completedFuture("cluster"));
}

@After
public void tearDown() {
mockedConnectionRegistryFactory.closeOnDemand();
}

@Test
public void testApplyURIQueries() throws Exception {
ConnectionFactory.createConnection(new URI("hbase+rpc://server:16010?a=1&b=2&c"), conf);
ArgumentCaptor<Configuration> captor = ArgumentCaptor.forClass(Configuration.class);
mockedConnectionRegistryFactory
.verify(() -> ConnectionRegistryFactory.create(any(), captor.capture(), any()));
Configuration c = captor.getValue();
assertEquals("1", c.get("a"));
assertEquals("2", c.get("b"));
assertEquals("", c.get("c"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,15 @@
*/
package org.apache.hadoop.hbase.util;

import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
Expand Down Expand Up @@ -93,4 +101,37 @@ public static String padFront(String input, char padding, int length) {
int numPaddingCharacters = length - input.length();
return StringUtils.repeat(padding, numPaddingCharacters) + input;
}

/**
* Parse the query string of an URI to a key value map. If a single key occurred multiple times,
* only the first one will take effect.
*/
public static Map<String, String> parseURIQueries(URI uri) {
if (StringUtils.isBlank(uri.getRawQuery())) {
return Collections.emptyMap();
}
return Splitter.on('&').trimResults().splitToStream(uri.getRawQuery()).map(kv -> {
int idx = kv.indexOf('=');
try {
if (idx > 0) {
return Pair.newPair(
URLDecoder.decode(kv.substring(0, idx), StandardCharsets.UTF_8.name()),
URLDecoder.decode(kv.substring(idx + 1), StandardCharsets.UTF_8.name()));
} else {
return Pair.newPair(URLDecoder.decode(kv, StandardCharsets.UTF_8.name()), "");
}
} catch (UnsupportedEncodingException e) {
// should not happen
throw new AssertionError(e);
}
}).collect(Collectors.toMap(Pair::getFirst, Pair::getSecond, (v1, v2) -> v1));
}

/**
* Apply the key value pairs in the query string of the given URI to the given Configuration. If a
* single key occurred multiple times, only the first one will take effect.
*/
public static void applyURIQueriesToConf(URI uri, Configuration conf) {
parseURIQueries(uri).forEach(conf::set);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ SmallTests.class })
public class TestStrings {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestStrings.class);

@Test
public void testAppendKeyValue() {
assertEquals("foo, bar=baz",
Strings.appendKeyValue(new StringBuilder("foo"), "bar", "baz").toString());
assertEquals("bar->baz",
Strings.appendKeyValue(new StringBuilder(), "bar", "baz", "->", "| ").toString());
assertEquals("foo, bar=baz",
Strings.appendKeyValue(new StringBuilder("foo"), "bar", "baz", "=", ", ").toString());
assertEquals("foo| bar->baz",
Strings.appendKeyValue(new StringBuilder("foo"), "bar", "baz", "->", "| ").toString());
}

@Test
public void testDomainNamePointerToHostName() {
assertNull(Strings.domainNamePointerToHostName(null));
assertEquals("foo", Strings.domainNamePointerToHostName("foo"));
assertEquals("foo.com", Strings.domainNamePointerToHostName("foo.com"));
assertEquals("foo.bar.com", Strings.domainNamePointerToHostName("foo.bar.com"));
assertEquals("foo.bar.com", Strings.domainNamePointerToHostName("foo.bar.com."));
}

@Test
public void testPadFront() {
assertEquals("ddfoo", Strings.padFront("foo", 'd', 5));
assertThrows(IllegalArgumentException.class, () -> Strings.padFront("foo", 'd', 1));
}

@Test
public void testParseURIQueries() throws Exception {
Map<String,
String> queries = Strings.parseURIQueries(new URI("hbase+rpc://server01:123?a=1&b=2&a=3&"
+ URLEncoder.encode("& ?", StandardCharsets.UTF_8.name()) + "=&"
+ URLEncoder.encode("===", StandardCharsets.UTF_8.name())));
assertEquals("1", queries.get("a"));
assertEquals("2", queries.get("b"));
assertEquals("", queries.get("& ?"));
assertEquals("", queries.get("==="));
assertEquals(4, queries.size());

assertTrue(Strings.parseURIQueries(new URI("hbase+zk://zk1:2181/")).isEmpty());
assertTrue(Strings.parseURIQueries(new URI("hbase+zk://zk1:2181/?")).isEmpty());
assertTrue(Strings.parseURIQueries(new URI("hbase+zk://zk1:2181/?#anchor")).isEmpty());
}

@Test
public void testApplyURIQueriesToConf() throws Exception {
Configuration conf = new Configuration();
Strings.applyURIQueriesToConf(new URI("hbase+zk://aaa:2181/root?a=1&b=2&c"), conf);
assertEquals("1", conf.get("a"));
assertEquals("2", conf.get("b"));
assertEquals("", conf.get("c"));
}
}

0 comments on commit 03fa8b8

Please sign in to comment.