Skip to content

Commit

Permalink
HBASE-28187 NPE when flushing a non-existing column family (apache#5692)
Browse files Browse the repository at this point in the history
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
Signed-off-by: Hui Ruan <huiruan@apache.org>
  • Loading branch information
guluo2016 authored and ndimiduk committed Sep 25, 2024
1 parent f97b903 commit 62244ba
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.hadoop.hbase.quotas.QuotaSettings;
import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
Expand Down Expand Up @@ -965,6 +966,8 @@ public CompletableFuture<Void> flush(TableName tableName) {

/**
 * Flushes a single column family of the given table by delegating to the list-based overload.
 * A {@code null} family is rejected up front with a message pointing callers at
 * {@code flush(TableName)}.
 * @param tableName    table to flush
 * @param columnFamily column family to flush; must not be {@code null}
 * @return the future produced by the list-based {@code flush} overload
 */
@Override
public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
  // checkNotNull hands back its argument, so the validated reference is used from here on.
  byte[] family = Preconditions.checkNotNull(columnFamily,
    "columnFamily is null, If you don't specify a columnFamily, use flush(TableName) instead.");
  List<byte[]> singleFamily = Collections.singletonList(family);
  return flush(tableName, singleFamily);
}

Expand All @@ -974,6 +977,8 @@ public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFam
// If the server version is lower than the client version, it's possible that the
// flushTable method is not present in the server side, if so, we need to fall back
// to the old implementation.
Preconditions.checkNotNull(columnFamilyList,
"columnFamily is null, If you don't specify a columnFamily, use flush(TableName) instead.");
List<byte[]> columnFamilies = columnFamilyList.stream()
.filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList());
FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamilies,
Expand All @@ -984,7 +989,10 @@ public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFam
CompletableFuture<Void> future = new CompletableFuture<>();
addListener(procFuture, (ret, error) -> {
if (error != null) {
if (error instanceof TableNotFoundException || error instanceof TableNotEnabledException) {
if (
error instanceof TableNotFoundException || error instanceof TableNotEnabledException
|| error instanceof NoSuchColumnFamilyException
) {
future.completeExceptionally(error);
} else if (error instanceof DoNotRetryIOException) {
// usually this is caused by the method is not present on the server or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -111,6 +114,29 @@ protected Flow executeFromState(MasterProcedureEnv env, FlushTableState state)
return Flow.HAS_MORE_STATE;
}

/**
 * Runs the parent preflight checks and then, when specific column families were requested,
 * verifies that each of them is present in the table descriptor.
 * @param env     master procedure environment used to reach the table descriptors
 * @param enabled passed through unchanged to the parent preflight check
 * @throws HBaseIOException a {@link NoSuchColumnFamilyException} when at least one requested
 *                          family is missing from the table; any other {@link IOException}
 *                          raised during the check is wrapped in an {@link HBaseIOException}
 */
@Override
protected void preflightChecks(MasterProcedureEnv env, Boolean enabled) throws HBaseIOException {
  super.preflightChecks(env, enabled);
  if (columnFamilies == null) {
    // No families were specified, so there is nothing to validate.
    return;
  }
  MasterServices masterServices = env.getMasterServices();
  try {
    TableDescriptor descriptor = masterServices.getTableDescriptors().get(tableName);
    List<String> missingFamilies = columnFamilies.stream()
      .filter(family -> !descriptor.hasColumnFamily(family))
      .map(Bytes::toString)
      .toList();
    if (!missingFamilies.isEmpty()) {
      throw new NoSuchColumnFamilyException("Column families " + missingFamilies
        + " don't exist in table " + tableName.getNameAsString());
    }
  } catch (HBaseIOException e) {
    // Already the declared exception type (this includes the exception thrown above).
    throw e;
  } catch (IOException e) {
    throw new HBaseIOException(e);
  }
}

@Override
protected void rollbackState(MasterProcedureEnv env, FlushTableState state)
throws IOException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.StreamSupport;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
Expand All @@ -40,9 +42,12 @@
import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -152,7 +157,21 @@ public void execProcedure(ProcedureDescription desc) throws IOException {
families = nsp;
}
}
byte[] procArgs = families != null ? families.toByteArray() : new byte[0];

byte[] procArgs;
if (families != null) {
TableDescriptor tableDescriptor = master.getTableDescriptors().get(tableName);
List<String> noSuchFamilies =
StreamSupport.stream(Strings.SPLITTER.split(families.getValue()).spliterator(), false)
.filter(cf -> !tableDescriptor.hasColumnFamily(Bytes.toBytes(cf))).toList();
if (!noSuchFamilies.isEmpty()) {
throw new NoSuchColumnFamilyException("Column families " + noSuchFamilies
+ " don't exist in table " + tableName.getNameAsString());
}
procArgs = families.toByteArray();
} else {
procArgs = new byte[0];
}

// Kick off the global procedure from the master coordinator to the region servers.
// We rely on the existing Distributed Procedure framework to prevent any concurrent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1776,8 +1776,15 @@ public FlushRegionResponse flushRegion(final RpcController controller,
// Go behind the curtain so we can manage writing of the flush WAL marker
HRegion.FlushResultImpl flushResult = null;
if (request.hasFamily()) {
List families = new ArrayList();
List<byte[]> families = new ArrayList();
families.add(request.getFamily().toByteArray());
TableDescriptor tableDescriptor = region.getTableDescriptor();
List<String> noSuchFamilies = families.stream()
.filter(f -> !tableDescriptor.hasColumnFamily(f)).map(Bytes::toString).toList();
if (!noSuchFamilies.isEmpty()) {
throw new NoSuchColumnFamilyException("Column families " + noSuchFamilies
+ " don't exist in table " + tableDescriptor.getTableName().getNameAsString());
}
flushResult =
region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,27 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.After;
import org.junit.AfterClass;
Expand Down Expand Up @@ -185,6 +192,30 @@ public void testAsyncFlushRegionFamily() throws Exception {
}
}

/**
 * Flushing a table with a family list that mixes existing and non-existing families must fail
 * with {@link NoSuchColumnFamilyException}.
 */
@Test
public void testAsyncFlushTableWithNonExistingFamilies() throws IOException {
  AsyncAdmin asyncAdmin = asyncConn.getAdmin();
  // Two known families followed by two names that are not table families.
  List<byte[]> requestedFamilies = new ArrayList<>(Arrays.asList(FAMILY_1, FAMILY_2,
    Bytes.toBytes("non_family01"), Bytes.toBytes("non_family02")));
  CompletableFuture<Void> flushFuture =
    CompletableFuture.allOf(asyncAdmin.flush(tableName, requestedFamilies));
  assertThrows(NoSuchColumnFamilyException.class, () -> FutureUtils.get(flushFuture));
}

/**
 * Flushing a single region for a family that is not part of the table must fail with
 * {@link NoSuchColumnFamilyException}.
 */
@Test
public void testAsyncFlushRegionWithNonExistingFamily() throws IOException {
  AsyncAdmin asyncAdmin = asyncConn.getAdmin();
  List<HRegion> onlineRegions = getRegionInfo();
  assertNotNull(onlineRegions);
  assertFalse(onlineRegions.isEmpty());
  // Any region will do; the first one is used.
  byte[] encodedRegionName = onlineRegions.get(0).getRegionInfo().getEncodedNameAsBytes();
  byte[] unknownFamily = Bytes.toBytes("non_family");
  CompletableFuture<Void> flushFuture =
    CompletableFuture.allOf(asyncAdmin.flushRegion(encodedRegionName, unknownFamily));
  assertThrows(NoSuchColumnFamilyException.class, () -> FutureUtils.get(flushFuture));
}

@Test
public void testFlushRegionServer() throws Exception {
try (Admin admin = TEST_UTIL.getAdmin()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * Client-side flush test that runs against a mini cluster started with
 * {@link MasterFlushTableProcedureManager#FLUSH_PROCEDURE_ENABLED} set to {@code false}, and
 * checks that flushing non-existing column families is reported as
 * {@link NoSuchColumnFamilyException}.
 */
@Category({ MediumTests.class, ClientTests.class })
public class TestFlushFromClientWithDisabledFlushProcedure {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestFlushFromClientWithDisabledFlushProcedure.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestFlushFromClientWithDisabledFlushProcedure.class);
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static AsyncConnection asyncConn;
  private static final byte[] FAMILY = Bytes.toBytes("info");
  private static final byte[] QUALIFIER = Bytes.toBytes("name");

  @Rule
  public TestName name = new TestName();

  /** Table created for the currently running test method; named after that method. */
  private TableName tableName;

  /** Disables the flush procedure in the configuration, then starts the cluster and connection. */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration configuration = TEST_UTIL.getConfiguration();
    configuration.setBoolean(MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED, false);
    TEST_UTIL.startMiniCluster(1);
    asyncConn = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
  }

  /** Closes the shared connection (swallowing close failures) and stops the cluster. */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    Closeables.close(asyncConn, true);
    TEST_UTIL.shutdownMiniCluster();
  }

  /** Creates the per-test table with one family and writes rows 0..10 into it. */
  @Before
  public void setUp() throws Exception {
    tableName = TableName.valueOf(name.getMethodName());
    try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
      List<Put> puts = new ArrayList<>();
      for (int i = 0; i <= 10; ++i) {
        Put put = new Put(Bytes.toBytes(i));
        put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i));
        puts.add(put);
      }
      t.put(puts);
    }
    List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
    assertFalse(regions.isEmpty());
  }

  /** Deletes every table listed by the admin so each test starts from a clean cluster. */
  @After
  public void tearDown() throws Exception {
    for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) {
      // Parameterized logging: same output, no eager string concatenation.
      LOG.info("Tear down, remove table={}", htd.getTableName());
      TEST_UTIL.deleteTable(htd.getTableName());
    }
  }

  /**
   * With the flush procedure disabled, flushing a mix of existing and non-existing families must
   * fail with {@link NoSuchColumnFamilyException}.
   */
  @Test
  public void flushTableWithNonExistingFamily() {
    AsyncAdmin admin = asyncConn.getAdmin();
    List<byte[]> families = new ArrayList<>();
    families.add(FAMILY);
    families.add(Bytes.toBytes("non_family01"));
    families.add(Bytes.toBytes("non_family02"));
    // Guard: make sure the configuration under test really has the flush procedure disabled.
    assertFalse(TEST_UTIL.getConfiguration().getBoolean(
      MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED,
      MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED_DEFAULT));
    CompletableFuture<Void> future = CompletableFuture.allOf(admin.flush(tableName, families));
    assertThrows(NoSuchColumnFamilyException.class, () -> FutureUtils.get(future));
  }
}

0 comments on commit 62244ba

Please sign in to comment.