From 7728272c914a7a4033aeeb6fc2ebd20b1650b842 Mon Sep 17 00:00:00 2001 From: YeChao Chen Date: Mon, 3 Aug 2020 16:02:48 +0800 Subject: [PATCH 1/2] HBASE-24791 Improve HFileOutputFormat2 to avoid always call getTableRelativePath method (#2167) Signed-off-by: Anoop Signed-off-by: Ted Yu Change-Id: Ibf946930c20b15595a846d7b62e8b59cee42be07 --- .../hbase/mapreduce/HFileOutputFormat2.java | 28 ++++++++----------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index bd08e5bab46b..0c66183f0f1c 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -242,6 +242,7 @@ static RecordWriter createRecordWrit private final Map writers = new TreeMap<>(Bytes.BYTES_COMPARATOR); private final Map previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR); private final long now = EnvironmentEdgeManager.currentTime(); + private byte[] tableNameBytes = writeMultipleTables ? null : Bytes.toBytes(writeTableNames); @Override public void write(ImmutableBytesWritable row, V cell) throws IOException { @@ -255,7 +256,6 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException { byte[] rowKey = CellUtil.cloneRow(kv); int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT; byte[] family = CellUtil.cloneFamily(kv); - byte[] tableNameBytes = null; if (writeMultipleTables) { tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get()); tableNameBytes = TableName.valueOf(tableNameBytes).toBytes(); @@ -263,10 +263,7 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException { throw new IllegalArgumentException( "TableName " + Bytes.toString(tableNameBytes) + " not expected"); } - } else { - tableNameBytes = Bytes.toBytes(writeTableNames); } - Path tableRelPath = getTableRelativePath(tableNameBytes); byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family); WriterLength wl = this.writers.get(tableAndFamily); @@ -274,6 +271,7 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException { if (wl == null) { Path writerPath = null; if (writeMultipleTables) { + Path tableRelPath = getTableRelativePath(tableNameBytes); writerPath = new Path(outputDir, new Path(tableRelPath, Bytes.toString(family))); } else { writerPath = new Path(outputDir, Bytes.toString(family)); @@ -292,15 +290,15 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException { // create a new WAL writer, if necessary if (wl == null || wl.writer == null) { + InetSocketAddress[] favoredNodes = null; if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { HRegionLocation loc = null; String tableName = Bytes.toString(tableNameBytes); if (tableName != null) { - try ( - Connection connection = - ConnectionFactory.createConnection(createRemoteClusterConf(conf)); - RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) { + try (Connection connection = ConnectionFactory.createConnection(createRemoteClusterConf(conf)); + RegionLocator locator = + connection.getRegionLocator(TableName.valueOf(tableName))) { loc = locator.getRegionLocation(rowKey); } catch (Throwable e) { LOG.warn("Something wrong locating rowkey {} in {}", Bytes.toString(rowKey), @@ -308,26 +306,22 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException { loc = null; } } - if (null == loc) { LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey)); - wl = getNewWriter(tableNameBytes, family, conf, null); } else { LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey)); InetSocketAddress initialIsa = new InetSocketAddress(loc.getHostname(), loc.getPort()); if (initialIsa.isUnresolved()) { LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort()); - wl = getNewWriter(tableNameBytes, family, conf, null); } else { LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString()); - wl = getNewWriter(tableNameBytes, family, conf, - new InetSocketAddress[] { initialIsa }); + favoredNodes = new InetSocketAddress[] { initialIsa }; } } - } else { - wl = getNewWriter(tableNameBytes, family, conf, null); } + wl = getNewWriter(tableNameBytes, family, conf, favoredNodes); + } // we now have the proper WAL writer. full steam ahead @@ -342,9 +336,9 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException { private Path getTableRelativePath(byte[] tableNameBytes) { String tableName = Bytes.toString(tableNameBytes); String[] tableNameParts = tableName.split(":"); - Path tableRelPath = new Path(tableName.split(":")[0]); + Path tableRelPath = new Path(tableNameParts[0]); if (tableNameParts.length > 1) { - tableRelPath = new Path(tableRelPath, tableName.split(":")[1]); + tableRelPath = new Path(tableRelPath, tableNameParts[1]); } return tableRelPath; } From 7ae50459b08725f7ea46c83b5a5d30b0ea889e26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vill=C5=91=20Sz=C5=B1cs?= Date: Fri, 10 May 2024 13:46:33 +0200 Subject: [PATCH 2/2] spotless fix Change-Id: I74529a80ed2fd0109764edab791bb70aa2aff054 --- .../apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index 0c66183f0f1c..d86956b684c9 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -296,9 +296,10 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException { String tableName = Bytes.toString(tableNameBytes); if (tableName != null) { - try (Connection connection = ConnectionFactory.createConnection(createRemoteClusterConf(conf)); - RegionLocator locator = - connection.getRegionLocator(TableName.valueOf(tableName))) { + try ( + Connection connection = + ConnectionFactory.createConnection(createRemoteClusterConf(conf)); + RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) { loc = locator.getRegionLocation(rowKey); } catch (Throwable e) { LOG.warn("Something wrong locating rowkey {} in {}", Bytes.toString(rowKey),