Skip to content

Commit

Permalink
HBASE-28719 Use ExtendedCell in WALEdit (#6108)
Browse files Browse the repository at this point in the history
Signed-off-by: Xin Sun <sunxin@apache.org>
(cherry picked from commit 836f2d9)
  • Loading branch information
Apache9 committed Jul 28, 2024
1 parent 5d9c023 commit aa96377
Show file tree
Hide file tree
Showing 44 changed files with 305 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* TODO: A better solution is to separate the data structures used in client and server.
*/
@InterfaceAudience.Private
public class PackagePrivateFieldAccessor {
public class ClientInternalHelper {

public static void setMvccReadPoint(Scan scan, long mvccReadPoint) {
scan.setMvccReadPoint(mvccReadPoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.hadoop.hbase.client.BalancerDecision;
import org.apache.hadoop.hbase.client.BalancerRejection;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.ClientInternalHelper;
import org.apache.hadoop.hbase.client.ClientUtil;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
Expand All @@ -89,7 +90,6 @@
import org.apache.hadoop.hbase.client.LogEntry;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.OnlineLogRecord;
import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLoadStats;
Expand Down Expand Up @@ -1082,7 +1082,7 @@ public static ClientProtos.Scan toScan(final Scan scan) throws IOException {
if (scan.getCaching() > 0) {
scanBuilder.setCaching(scan.getCaching());
}
long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan);
long mvccReadPoint = ClientInternalHelper.getMvccReadPoint(scan);
if (mvccReadPoint > 0) {
scanBuilder.setMvccReadPoint(mvccReadPoint);
}
Expand Down Expand Up @@ -1192,7 +1192,7 @@ public static Scan toScan(final ClientProtos.Scan proto) throws IOException {
scan.setCaching(proto.getCaching());
}
if (proto.hasMvccReadPoint()) {
PackagePrivateFieldAccessor.setMvccReadPoint(scan, proto.getMvccReadPoint());
ClientInternalHelper.setMvccReadPoint(scan, proto.getMvccReadPoint());
}
if (proto.hasReadType()) {
scan.setReadType(toReadType(proto.getReadType()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClientInternalHelper;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
Expand Down Expand Up @@ -205,7 +205,7 @@ public void map(ImmutableBytesWritable row, Result value, Context context) throw
filter == null || !filter.filterRowKey(
PrivateCellUtil.createFirstOnRow(row.get(), row.getOffset(), (short) row.getLength()))
) {
for (ExtendedCell kv : PackagePrivateFieldAccessor.getExtendedRawCells(value)) {
for (ExtendedCell kv : ClientInternalHelper.getExtendedRawCells(value)) {
kv = filterKv(filter, kv);
// skip if we filtered it out
if (kv == null) {
Expand Down Expand Up @@ -271,7 +271,7 @@ public void map(ImmutableBytesWritable row, Result value, Context context) throw
filter == null || !filter.filterRowKey(
PrivateCellUtil.createFirstOnRow(row.get(), row.getOffset(), (short) row.getLength()))
) {
for (ExtendedCell kv : PackagePrivateFieldAccessor.getExtendedRawCells(value)) {
for (ExtendedCell kv : ClientInternalHelper.getExtendedRawCells(value)) {
kv = filterKv(filter, kv);
// skip if we filtered it out
if (kv == null) {
Expand Down Expand Up @@ -336,7 +336,7 @@ private void writeResult(ImmutableBytesWritable key, Result result, Context cont

protected void processKV(ImmutableBytesWritable key, Result result, Context context, Put put,
Delete delete) throws IOException, InterruptedException {
for (ExtendedCell kv : PackagePrivateFieldAccessor.getExtendedRawCells(result)) {
for (ExtendedCell kv : ClientInternalHelper.getExtendedRawCells(result)) {
kv = filterKv(filter, kv);
// skip if we filter it out
if (kv == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
import org.apache.hadoop.hbase.client.ClientInternalHelper;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -55,9 +55,9 @@ protected void reduce(K row, Iterable<Put> vals, Context context)
cnt++;
if (combinedPut == null) {
combinedPut = p;
combinedFamilyMap = PackagePrivateFieldAccessor.getExtendedFamilyCellMap(combinedPut);
combinedFamilyMap = ClientInternalHelper.getExtendedFamilyCellMap(combinedPut);
} else {
for (Entry<byte[], List<ExtendedCell>> entry : PackagePrivateFieldAccessor
for (Entry<byte[], List<ExtendedCell>> entry : ClientInternalHelper
.getExtendedFamilyCellMap(p).entrySet()) {
List<ExtendedCell> existCells = combinedFamilyMap.get(entry.getKey());
if (existCells == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
import org.apache.hadoop.hbase.client.ClientInternalHelper;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
Expand Down Expand Up @@ -100,8 +100,7 @@ protected void reduce(ImmutableBytesWritable row, Iterable<Put> puts,
// just ignoring the bad one?
throw new IOException("Invalid visibility expression found in mutation " + p, e);
}
for (List<ExtendedCell> cells : PackagePrivateFieldAccessor.getExtendedFamilyCellMap(p)
.values()) {
for (List<ExtendedCell> cells : ClientInternalHelper.getExtendedFamilyCellMap(p).values()) {
for (ExtendedCell cell : cells) {
// Creating the KV which needs to be directly written to HFiles. Using the Facade
// KVCreator for creation of kvs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.ClientInternalHelper;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
Expand Down Expand Up @@ -366,7 +366,7 @@ public void testWithDeletes() throws Throwable {
s.setRaw(true);
ResultScanner scanner = t.getScanner(s);
Result r = scanner.next();
ExtendedCell[] res = PackagePrivateFieldAccessor.getExtendedRawCells(r);
ExtendedCell[] res = ClientInternalHelper.getExtendedRawCells(r);
assertTrue(PrivateCellUtil.isDeleteFamily(res[0]));
assertEquals(now + 4, res[1].getTimestamp());
assertEquals(now + 3, res[2].getTimestamp());
Expand Down Expand Up @@ -934,8 +934,7 @@ public void testTagsWithEmptyCodec() throws Exception {
int count = 0;
Result result;
while ((result = scanner.next()) != null) {
List<ExtendedCell> cells =
Arrays.asList(PackagePrivateFieldAccessor.getExtendedRawCells(result));
List<ExtendedCell> cells = Arrays.asList(ClientInternalHelper.getExtendedRawCells(result));
assertEquals(2, cells.size());
ExtendedCell cell = cells.get(0);
assertTrue(CellUtil.isDelete(cell));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
Expand Down Expand Up @@ -143,10 +144,12 @@ public void testPartialRead() throws Exception {
// being millisecond based.
long ts = EnvironmentEdgeManager.currentTime();
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
WALEditInternalHelper.addExtendedCell(edit,
new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
log.appendData(info, getWalKeyImpl(ts, scopes), edit);
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts + 1, value));
WALEditInternalHelper.addExtendedCell(edit,
new KeyValue(rowName, family, Bytes.toBytes("2"), ts + 1, value));
log.appendData(info, getWalKeyImpl(ts + 1, scopes), edit);
log.sync();
Threads.sleep(10);
Expand All @@ -158,10 +161,12 @@ public void testPartialRead() throws Exception {
long ts1 = EnvironmentEdgeManager.currentTime();

edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1 + 1, value));
WALEditInternalHelper.addExtendedCell(edit,
new KeyValue(rowName, family, Bytes.toBytes("3"), ts1 + 1, value));
log.appendData(info, getWalKeyImpl(ts1 + 1, scopes), edit);
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1 + 2, value));
WALEditInternalHelper.addExtendedCell(edit,
new KeyValue(rowName, family, Bytes.toBytes("4"), ts1 + 2, value));
log.appendData(info, getWalKeyImpl(ts1 + 2, scopes), edit);
log.sync();
log.shutdown();
Expand Down Expand Up @@ -203,8 +208,8 @@ public void testWALRecordReader() throws Exception {
WAL log = walfactory.getWAL(info);
byte[] value = Bytes.toBytes("value");
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), EnvironmentEdgeManager.currentTime(),
value));
WALEditInternalHelper.addExtendedCell(edit, new KeyValue(rowName, family, Bytes.toBytes("1"),
EnvironmentEdgeManager.currentTime(), value));
long txid =
log.appendData(info, getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit);
log.sync(txid);
Expand All @@ -214,8 +219,8 @@ public void testWALRecordReader() throws Exception {
log.rollWriter();

edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), EnvironmentEdgeManager.currentTime(),
value));
WALEditInternalHelper.addExtendedCell(edit, new KeyValue(rowName, family, Bytes.toBytes("2"),
EnvironmentEdgeManager.currentTime(), value));
txid = log.appendData(info, getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit);
log.sync(txid);
log.shutdown();
Expand Down Expand Up @@ -261,17 +266,17 @@ public void testWALRecordReaderActiveArchiveTolerance() throws Exception {
WAL log = walfactory.getWAL(info);
byte[] value = Bytes.toBytes("value");
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), EnvironmentEdgeManager.currentTime(),
value));
WALEditInternalHelper.addExtendedCell(edit, new KeyValue(rowName, family, Bytes.toBytes("1"),
EnvironmentEdgeManager.currentTime(), value));
long txid =
log.appendData(info, getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit);
log.sync(txid);

Thread.sleep(10); // make sure 2nd edit gets a later timestamp

edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), EnvironmentEdgeManager.currentTime(),
value));
WALEditInternalHelper.addExtendedCell(edit, new KeyValue(rowName, family, Bytes.toBytes("2"),
EnvironmentEdgeManager.currentTime(), value));
txid = log.appendData(info, getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit);
log.sync(txid);
log.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.ClientInternalHelper;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.Delete;
Expand All @@ -112,7 +113,6 @@
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
Expand Down Expand Up @@ -181,6 +181,7 @@
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
Expand Down Expand Up @@ -3513,7 +3514,7 @@ protected void checkAndPrepareMutation(int index, long timestamp) throws IOExcep
// store the family map reference to allow for mutations
// we know that in mutation, only ExtendedCells are allow so here we do a fake cast, to
// simplify later logic
familyCellMaps[index] = PackagePrivateFieldAccessor.getExtendedFamilyCellMap(mutation);
familyCellMaps[index] = ClientInternalHelper.getExtendedFamilyCellMap(mutation);
}

// store durability for the batch (highest durability of all operations in the batch)
Expand Down Expand Up @@ -3709,7 +3710,9 @@ public boolean visit(int index) throws IOException {

// Add WAL edits from CPs.
WALEdit fromCP = walEditsFromCoprocessors[index];
List<Cell> cellsFromCP = fromCP == null ? Collections.emptyList() : fromCP.getCells();
List<ExtendedCell> cellsFromCP = fromCP == null
? Collections.emptyList()
: WALEditInternalHelper.getExtendedCells(fromCP);
addNonSkipWALMutationsToWALEdit(miniBatchOp, walEdit, cellsFromCP, familyCellMaps[index]);
return true;
}
Expand All @@ -3719,14 +3722,14 @@ public boolean visit(int index) throws IOException {

protected void addNonSkipWALMutationsToWALEdit(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, WALEdit walEdit,
List<Cell> cellsFromCP, Map<byte[], List<ExtendedCell>> familyCellMap) {
List<ExtendedCell> cellsFromCP, Map<byte[], List<ExtendedCell>> familyCellMap) {
doAddCellsToWALEdit(walEdit, cellsFromCP, familyCellMap);
}

protected static void doAddCellsToWALEdit(WALEdit walEdit, List<Cell> cellsFromCP,
protected static void doAddCellsToWALEdit(WALEdit walEdit, List<ExtendedCell> cellsFromCP,
Map<byte[], List<ExtendedCell>> familyCellMap) {
walEdit.add(cellsFromCP);
walEdit.add((Map) familyCellMap);
WALEditInternalHelper.addExtendedCell(walEdit, cellsFromCP);
WALEditInternalHelper.addMap(walEdit, familyCellMap);
}

protected abstract void cacheSkipWALMutationForRegionReplication(
Expand Down Expand Up @@ -4064,7 +4067,7 @@ private Map<byte[], List<ExtendedCell>> reckonDeltas(Mutation mutation,
assert mutation instanceof Increment || mutation instanceof Append;
Map<byte[], List<ExtendedCell>> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
// Process a Store/family at a time.
for (Map.Entry<byte[], List<ExtendedCell>> entry : PackagePrivateFieldAccessor
for (Map.Entry<byte[], List<ExtendedCell>> entry : ClientInternalHelper
.getExtendedFamilyCellMap(mutation).entrySet()) {
final byte[] columnFamilyName = entry.getKey();
List<ExtendedCell> deltas = (List) entry.getValue();
Expand Down Expand Up @@ -4260,7 +4263,7 @@ protected void cacheSkipWALMutationForRegionReplication(
this.createWALEditForReplicateSkipWAL(miniBatchOp, nonceKeyAndWALEdits);
miniBatchOp.setWalEditForReplicateIfExistsSkipWAL(walEditForReplicateIfExistsSkipWAL);
}
walEditForReplicateIfExistsSkipWAL.add((Map) familyCellMap);
WALEditInternalHelper.addMap(walEditForReplicateIfExistsSkipWAL, familyCellMap);

}

Expand All @@ -4279,8 +4282,7 @@ private WALEdit createWALEditForReplicateSkipWAL(
@Override
protected void addNonSkipWALMutationsToWALEdit(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, WALEdit walEdit,
List<Cell> cellsFromCP, Map<byte[], List<ExtendedCell>> familyCellMap) {

List<ExtendedCell> cellsFromCP, Map<byte[], List<ExtendedCell>> familyCellMap) {
super.addNonSkipWALMutationsToWALEdit(miniBatchOp, walEdit, cellsFromCP, familyCellMap);
WALEdit walEditForReplicateIfExistsSkipWAL =
miniBatchOp.getWalEditForReplicateIfExistsSkipWAL();
Expand Down Expand Up @@ -4524,7 +4526,7 @@ private void checkAndMergeCPMutations(final MiniBatchOperationInProgress<Mutatio
// Returned mutations from coprocessor correspond to the Mutation at index i. We can
// directly add the cells from those mutations to the familyMaps of this mutation.
Map<byte[], List<ExtendedCell>> cpFamilyMap =
PackagePrivateFieldAccessor.getExtendedFamilyCellMap(cpMutation);
ClientInternalHelper.getExtendedFamilyCellMap(cpMutation);
region.rewriteCellTags(cpFamilyMap, mutation);
// will get added to the memStore later
mergeFamilyMaps(familyCellMaps[i], cpFamilyMap);
Expand Down Expand Up @@ -5096,16 +5098,16 @@ private CheckAndMutateResult checkAndMutateInternal(CheckAndMutate checkAndMutat
byte[] byteTs = Bytes.toBytes(ts);
if (mutation != null) {
if (mutation instanceof Put) {
updateCellTimestamps(
PackagePrivateFieldAccessor.getExtendedFamilyCellMap(mutation).values(), byteTs);
updateCellTimestamps(ClientInternalHelper.getExtendedFamilyCellMap(mutation).values(),
byteTs);
}
// And else 'delete' is not needed since it already does a second get, and sets the
// timestamp from get (see prepareDeleteTimestamps).
} else {
for (Mutation m : rowMutations.getMutations()) {
if (m instanceof Put) {
updateCellTimestamps(
PackagePrivateFieldAccessor.getExtendedFamilyCellMap(m).values(), byteTs);
updateCellTimestamps(ClientInternalHelper.getExtendedFamilyCellMap(m).values(),
byteTs);
}
}
// And else 'delete' is not needed since it already does a second get, and sets the
Expand Down
Loading

0 comments on commit aa96377

Please sign in to comment.