Skip to content

Commit

Permalink
HBASE-24791 Improve HFileOutputFormat2 to avoid always call getTableR…
Browse files Browse the repository at this point in the history
…elativePath method (apache#2167)

Signed-off-by: Anoop <anoopsamjohn@apache.org>
Signed-off-by: Ted Yu <tyu@apache.org>
Change-Id: Ief23ecd94cba02af177a8187ad27a1d13fd0243e
  • Loading branch information
utf7 authored and Villő Szűcs committed May 10, 2024
1 parent 40beff0 commit 9022f2d
Showing 1 changed file with 11 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWrit
private final Map<byte[], WriterLength> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
private final Map<byte[], byte[]> previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR);
private final long now = EnvironmentEdgeManager.currentTime();
private byte[] tableNameBytes = writeMultipleTables ? null : Bytes.toBytes(writeTableNames);

@Override
public void write(ImmutableBytesWritable row, V cell) throws IOException {
Expand All @@ -265,25 +266,22 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException {
byte[] rowKey = CellUtil.cloneRow(kv);
int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
byte[] family = CellUtil.cloneFamily(kv);
byte[] tableNameBytes = null;
if (writeMultipleTables) {
tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
tableNameBytes = TableName.valueOf(tableNameBytes).toBytes();
if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
throw new IllegalArgumentException(
"TableName " + Bytes.toString(tableNameBytes) + " not expected");
}
} else {
tableNameBytes = Bytes.toBytes(writeTableNames);
}
Path tableRelPath = getTableRelativePath(tableNameBytes);
byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
WriterLength wl = this.writers.get(tableAndFamily);

// If this is a new column family, verify that the directory exists
if (wl == null) {
Path writerPath = null;
if (writeMultipleTables) {
Path tableRelPath = getTableRelativePath(tableNameBytes);
writerPath = new Path(outputDir, new Path(tableRelPath, Bytes.toString(family)));
} else {
writerPath = new Path(outputDir, Bytes.toString(family));
Expand All @@ -302,42 +300,38 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException {

// create a new WAL writer, if necessary
if (wl == null || wl.writer == null) {
InetSocketAddress[] favoredNodes = null;
if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
HRegionLocation loc = null;

String tableName = Bytes.toString(tableNameBytes);
if (tableName != null) {
try (
Connection connection =
ConnectionFactory.createConnection(createRemoteClusterConf(conf));
RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
try (Connection connection = ConnectionFactory.createConnection(createRemoteClusterConf(conf));
RegionLocator locator =
connection.getRegionLocator(TableName.valueOf(tableName))) {
loc = locator.getRegionLocation(rowKey);
} catch (Throwable e) {
LOG.warn("Something wrong locating rowkey {} in {}", Bytes.toString(rowKey),
tableName, e);
loc = null;
}
}

if (null == loc) {
LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));
wl = getNewWriter(tableNameBytes, family, conf, null);
} else {
LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
InetSocketAddress initialIsa =
new InetSocketAddress(loc.getHostname(), loc.getPort());
if (initialIsa.isUnresolved()) {
LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort());
wl = getNewWriter(tableNameBytes, family, conf, null);
} else {
LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
wl = getNewWriter(tableNameBytes, family, conf,
new InetSocketAddress[] { initialIsa });
favoredNodes = new InetSocketAddress[] { initialIsa };
}
}
} else {
wl = getNewWriter(tableNameBytes, family, conf, null);
}
wl = getNewWriter(tableNameBytes, family, conf, favoredNodes);

}

// we now have the proper WAL writer. full steam ahead
Expand All @@ -352,9 +346,9 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException {
private Path getTableRelativePath(byte[] tableNameBytes) {
String tableName = Bytes.toString(tableNameBytes);
String[] tableNameParts = tableName.split(":");
Path tableRelPath = new Path(tableName.split(":")[0]);
Path tableRelPath = new Path(tableNameParts[0]);
if (tableNameParts.length > 1) {
tableRelPath = new Path(tableRelPath, tableName.split(":")[1]);
tableRelPath = new Path(tableRelPath, tableNameParts[1]);
}
return tableRelPath;
}
Expand Down

0 comments on commit 9022f2d

Please sign in to comment.