@@ -25,7 +25,6 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@@ -46,8 +45,6 @@
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.EmptyEntriesPolicy;
import org.apache.hadoop.hbase.replication.ReplicationResult;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -104,7 +101,6 @@ public void init(Context context) throws IOException {

initializePeerUUID();
initializeBackupFileSystemManager();
startWalFlushExecutor();
LOG.info("{} Initialization complete", Utils.logPeerId(peerId));
}

@@ -137,26 +133,20 @@ private void initializeBackupFileSystemManager() throws IOException {
}
}

private void startWalFlushExecutor() {
int initialDelay = conf.getInt(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY,
DEFAULT_STAGED_WAL_FLUSH_INITIAL_DELAY_SECONDS);
int flushInterval =
conf.getInt(CONF_STAGED_WAL_FLUSH_INTERVAL, DEFAULT_STAGED_WAL_FLUSH_INTERVAL_SECONDS);

flushExecutor = Executors.newSingleThreadScheduledExecutor();
flushExecutor.scheduleAtFixedRate(this::flushAndBackupSafely, initialDelay, flushInterval,
TimeUnit.SECONDS);
LOG.info("{} Scheduled WAL flush executor started with initial delay {}s and interval {}s",
Utils.logPeerId(peerId), initialDelay, flushInterval);
public long getMaxBufferSize() {
return conf.getLong(CONF_BACKUP_MAX_WAL_SIZE, DEFAULT_MAX_WAL_SIZE);
}

private void flushAndBackupSafely() {
public long maxFlushInterval() {
return conf.getLong(CONF_STAGED_WAL_FLUSH_INTERVAL, DEFAULT_STAGED_WAL_FLUSH_INTERVAL_SECONDS);
}

public void beforePersistingReplicationOffset() {
lock.lock();
try {
LOG.info("{} Periodic WAL flush triggered", Utils.logPeerId(peerId));
LOG.info("{} WAL flush triggered", Utils.logPeerId(peerId));
flushWriters();
replicationSource.persistOffsets();
LOG.info("{} Periodic WAL flush and offset persistence completed successfully",
LOG.info("{} WAL flush and offset persistence completed successfully",
Utils.logPeerId(peerId));
} catch (IOException e) {
LOG.error("{} Error during WAL flush: {}", Utils.logPeerId(peerId), e.getMessage(), e);
@@ -212,19 +202,11 @@ protected void doStart() {
}

@Override
public EmptyEntriesPolicy getEmptyEntriesPolicy() {
// Since this endpoint writes to S3 asynchronously, an empty entry batch
// does not guarantee that all previously submitted entries were persisted.
// Hence, avoid committing the WAL position.
return EmptyEntriesPolicy.SUBMIT;
}

@Override
public ReplicationResult replicate(ReplicateContext replicateContext) {
public boolean replicate(ReplicateContext replicateContext) {
final List<WAL.Entry> entries = replicateContext.getEntries();
if (entries.isEmpty()) {
LOG.debug("{} No WAL entries to replicate", Utils.logPeerId(peerId));
return ReplicationResult.SUBMITTED;
return true;
}

LOG.debug("{} Received {} WAL entries for replication", Utils.logPeerId(peerId),
@@ -244,24 +226,15 @@ public ReplicationResult replicate(ReplicateContext replicateContext) {

// Capture the timestamp of the last WAL entry processed. This is used as the replication
// checkpoint so that point-in-time restores know the latest consistent time up to which
// replication has
// occurred.
// replication has occurred.
latestWALEntryTimestamp = entries.get(entries.size() - 1).getKey().getWriteTime();

if (isAnyWriterFull()) {
LOG.debug("{} Some WAL writers reached max size, triggering flush",
Utils.logPeerId(peerId));
flushWriters();
LOG.debug("{} Replication committed after WAL flush", Utils.logPeerId(peerId));
return ReplicationResult.COMMITTED;
}

LOG.debug("{} Replication submitted successfully", Utils.logPeerId(peerId));
return ReplicationResult.SUBMITTED;
return true;
} catch (IOException e) {
LOG.error("{} Replication failed. Error details: {}", Utils.logPeerId(peerId), e.getMessage(),
e);
return ReplicationResult.FAILED;
return false;
} finally {
lock.unlock();
}
@@ -288,15 +261,6 @@ private Map<Long, List<WAL.Entry>> groupEntriesByDay(List<WAL.Entry> entries) {
* ONE_DAY_IN_MILLISECONDS));
}

private boolean isAnyWriterFull() {
return walWriters.values().stream().anyMatch(this::isWriterFull);
}

private boolean isWriterFull(FSHLogProvider.Writer writer) {
long maxWalSize = conf.getLong(CONF_BACKUP_MAX_WAL_SIZE, DEFAULT_MAX_WAL_SIZE);
return writer.getLength() >= maxWalSize;
}

private void backupWalEntries(long day, List<WAL.Entry> walEntries) throws IOException {
LOG.debug("{} Starting backup of {} WAL entries for day {}", Utils.logPeerId(peerId),
walEntries.size(), day);
@@ -379,7 +343,6 @@ private void close() {
lock.lock();
try {
flushWriters();
replicationSource.persistOffsets();
} catch (IOException e) {
LOG.error("{} Failed to Flush Open Wal Writers: {}", Utils.logPeerId(peerId), e.getMessage(),
e);
@@ -216,7 +216,7 @@ public int getTimeout() {
* the context are assumed to be persisted in the target cluster.
* @param replicateContext a context where WAL entries and other parameters can be obtained.
*/
ReplicationResult replicate(ReplicateContext replicateContext);
boolean replicate(ReplicateContext replicateContext);

// The below methods are inspired by Guava Service. See
// https://github.com/google/guava/wiki/ServiceExplained for overview of Guava Service.
@@ -292,21 +292,25 @@ public int getTimeout() {
*/
Throwable failureCause();

/**
* Defines the behavior when the replication source encounters an empty entry batch.
* <p>
* By default, this method returns {@link EmptyEntriesPolicy#COMMIT}, meaning the replication
* source can safely consider the WAL position as committed and move on.
* </p>
* <p>
* However, certain endpoints like backup or asynchronous S3 writers may delay persistence (e.g.,
* writing to temporary files or buffers). In those cases, returning
* {@link EmptyEntriesPolicy#SUBMIT} avoids incorrectly advancing WAL position and risking data
* loss.
* </p>
* @return the {@link EmptyEntriesPolicy} to apply for empty entry batches.
*/
default EmptyEntriesPolicy getEmptyEntriesPolicy() {
return EmptyEntriesPolicy.COMMIT;
// ContinuousBackupReplicationEndpoint returns the config value CONF_BACKUP_MAX_WAL_SIZE because
// WAL entries are buffered before the WAL backup file is flushed. Other ReplicationEndpoint
// implementations return -1 because no buffering is required, so the replication offset is
// updated every time a WALEntryBatch is shipped. See ReplicationSourceShipper#shouldFlushStagedWal().
default long getMaxBufferSize() {
return -1;
}

// ContinuousBackupReplicationEndpoint returns the config value CONF_STAGED_WAL_FLUSH_INTERVAL
// because WAL entries are buffered before the WAL backup file is flushed. Other
// ReplicationEndpoint implementations return Long.MAX_VALUE because no buffering is required, so
// the replication offset is updated every time a WALEntryBatch is shipped. See
// ReplicationSourceShipper#shouldFlushStagedWal().
default long maxFlushInterval() {
return Long.MAX_VALUE;
}

// Invoked before the replication offset is persisted; ContinuousBackupReplicationEndpoint uses
// this hook to flush its staged WAL backup files
default void beforePersistingReplicationOffset() {

}
}
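
Taken together, these three defaults let a buffering endpoint such as ContinuousBackupReplicationEndpoint delay offset persistence until its staged WAL data is durable, while non-buffering endpoints keep persisting on every shipped batch. Below is a minimal sketch of the size/interval decision these hooks imply; the StagedWalFlushPolicy class, its field names, and the assumption that maxFlushInterval() is expressed in seconds (matching DEFAULT_STAGED_WAL_FLUSH_INTERVAL_SECONDS) are illustrative, not code from this PR.

import java.util.concurrent.TimeUnit;

// Illustrative only: models the flush decision implied by getMaxBufferSize() and
// maxFlushInterval(); the real check is referenced as ReplicationSourceShipper#shouldFlushStagedWal().
final class StagedWalFlushPolicy {
  private final long maxBufferSize;        // bytes; -1 means "no buffering, flush every batch"
  private final long maxFlushIntervalSecs; // Long.MAX_VALUE means "never flush on time alone"
  private long bufferedBytes = 0;
  private long lastFlushMillis;

  StagedWalFlushPolicy(long maxBufferSize, long maxFlushIntervalSecs, long nowMillis) {
    this.maxBufferSize = maxBufferSize;
    this.maxFlushIntervalSecs = maxFlushIntervalSecs;
    this.lastFlushMillis = nowMillis;
  }

  void onBatchShipped(long batchSizeBytes) {
    bufferedBytes += batchSizeBytes;
  }

  // True when staged WAL files should be flushed and the replication offset persisted.
  boolean shouldFlush(long nowMillis) {
    boolean sizeExceeded = bufferedBytes >= maxBufferSize; // always true when maxBufferSize == -1
    boolean intervalExceeded =
      nowMillis - lastFlushMillis >= TimeUnit.SECONDS.toMillis(maxFlushIntervalSecs);
    return sizeExceeded || intervalExceeded;
  }

  void onFlushed(long nowMillis) {
    bufferedBytes = 0;
    lastFlushMillis = nowMillis;
  }
}

With the defaults (-1 and Long.MAX_VALUE), shouldFlush() is true after every batch, which reproduces the existing behaviour for ordinary endpoints.
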
@@ -59,10 +59,10 @@ private void checkCell(Cell cell) {
}

@Override
public ReplicationResult replicate(ReplicateContext replicateContext) {
public boolean replicate(ReplicateContext replicateContext) {
replicateContext.entries.stream().map(WAL.Entry::getEdit).flatMap(e -> e.getCells().stream())
.forEach(this::checkCell);
return ReplicationResult.COMMITTED;
return true;
}

@Override
@@ -48,7 +48,6 @@
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationResult;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -425,7 +424,7 @@ private long parallelReplicate(ReplicateContext replicateContext, List<List<Entr
* Do the shipping logic
*/
@Override
public ReplicationResult replicate(ReplicateContext replicateContext) {
public boolean replicate(ReplicateContext replicateContext) {
int sleepMultiplier = 1;
int initialTimeout = replicateContext.getTimeout();

@@ -445,7 +444,7 @@ public ReplicationResult replicate(ReplicateContext replicateContext) {
lastSinkFetchTime = EnvironmentEdgeManager.currentTime();
}
sleepForRetries("No sinks available at peer", sleepMultiplier);
return ReplicationResult.FAILED;
return false;
}

List<List<Entry>> batches = createBatches(replicateContext.getEntries());
@@ -459,7 +458,7 @@ public ReplicationResult replicate(ReplicateContext replicateContext) {
try {
// replicate the batches to sink side.
parallelReplicate(replicateContext, batches);
return ReplicationResult.COMMITTED;
return true;
} catch (IOException ioe) {
if (ioe instanceof RemoteException) {
if (dropOnDeletedTables && isTableNotFoundException(ioe)) {
@@ -468,14 +467,14 @@ public ReplicationResult replicate(ReplicateContext replicateContext) {
batches = filterNotExistTableEdits(batches);
if (batches.isEmpty()) {
LOG.warn("After filter not exist table's edits, 0 edits to replicate, just return");
return ReplicationResult.COMMITTED;
return true;
}
} else if (dropOnDeletedColumnFamilies && isNoSuchColumnFamilyException(ioe)) {
batches = filterNotExistColumnFamilyEdits(batches);
if (batches.isEmpty()) {
LOG.warn("After filter not exist column family's edits, 0 edits to replicate, "
+ "just return");
return ReplicationResult.COMMITTED;
return true;
}
} else {
LOG.warn("{} Peer encountered RemoteException, rechecking all sinks: ", logPeerId(),
@@ -507,7 +506,7 @@ public ReplicationResult replicate(ReplicateContext replicateContext) {
}
}
}
return ReplicationResult.FAILED; // in case we exited before replicating
return false; // in case we exited before replicating
}

protected boolean isPeerEnabled() {
@@ -58,7 +58,6 @@
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationResult;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
@@ -865,32 +864,4 @@ public String logPeerId() {
public long getTotalReplicatedEdits() {
return totalReplicatedEdits.get();
}

@Override
public void logPositionAndCleanOldLogs(WALEntryBatch entryBatch, ReplicationResult replicated) {
String walName = entryBatch.getLastWalPath().getName();
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(walName);

synchronized (lastEntryBatch) { // Synchronize addition and processing
lastEntryBatch.put(walPrefix, entryBatch);

if (replicated == ReplicationResult.COMMITTED) {
processAndClearEntries();
}
}
}

public void persistOffsets() {
synchronized (lastEntryBatch) {
processAndClearEntries();
}
}

private void processAndClearEntries() {
// Process all entries
lastEntryBatch
.forEach((prefix, batch) -> getSourceManager().logPositionAndCleanOldLogs(this, batch));
// Clear all processed entries
lastEntryBatch.clear();
}
}
@@ -34,7 +34,6 @@
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationResult;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
@@ -208,11 +207,7 @@ default boolean isRecovered() {
* @param entryBatch the WAL entry batch we just shipped
*/
default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch, ReplicationResult replicated) {

}

default public void persistOffsets() {

default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
getSourceManager().logPositionAndCleanOldLogs(this, entryBatch);
}
}
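
With ReplicationResult removed, replicate() is a boolean again and offset persistence is decided separately by the shipper, which is expected to call beforePersistingReplicationOffset() first so a buffering endpoint can flush its staged WAL backup files. A rough sketch of that call order follows; the helper class, its name, and the stagedWalNeedsFlush parameter are placeholders for logic in ReplicationSourceShipper that this diff does not show.

import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.WALEntryBatch;

// Illustrative helper only; not ReplicationSourceShipper code.
final class OffsetPersistenceSketch {
  static void afterBatchShipped(ReplicationEndpoint endpoint, ReplicationSourceInterface source,
    WALEntryBatch entryBatch, boolean stagedWalNeedsFlush) {
    if (stagedWalNeedsFlush) {
      // Give buffering endpoints (e.g. ContinuousBackupReplicationEndpoint) a chance to flush
      // staged WAL backup files before the offset is advanced.
      endpoint.beforePersistingReplicationOffset();
      // Record the position and clean old logs, which persists the replication offset.
      source.logPositionAndCleanOldLogs(entryBatch);
    }
  }
}
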