Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-13831 #8563

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
d5976e5
IGNITE-13831 wip
tkalkirill Dec 9, 2020
892def9
IGNITE-13831 wip add archiveSize
tkalkirill Dec 10, 2020
cc1564f
IGNITE-13831 wip move clean wal archive to rollOver
tkalkirill Dec 10, 2020
380b672
IGNITE-13831 wip add reserved wal archive size and some test
tkalkirill Dec 10, 2020
4aaef36
IGNITE-13831 wip fix FileDecomressor
tkalkirill Dec 11, 2020
968b4a4
IGNITE-13831 wip fix clean wal archive and tests
tkalkirill Dec 11, 2020
1cae418
IGNITE-13831 wip fix error tmp file for archiver
tkalkirill Dec 11, 2020
9279cf7
IGNITE-13831 wip
tkalkirill Dec 11, 2020
b057c15
IGNITE-13831 wip fix test
tkalkirill Dec 11, 2020
0d12527
Merge branch 'ai-master' into ignite-13831
tkalkirill Dec 14, 2020
bc51b9d
IGNITE-13831 fix after merge master
tkalkirill Dec 14, 2020
670ad93
IGNITE-13831 wip some minor fix
tkalkirill Dec 14, 2020
626599e
IGNITE-13831 wip
tkalkirill Dec 14, 2020
80ffe09
Merge branch 'ai-master' into ignite-13831
tkalkirill Dec 15, 2020
7b35509
IGNITE-13831 wip add SegmentArchiveSizeStorage
tkalkirill Dec 15, 2020
99151df
IGNITE-13831 wip add FileCleaner
tkalkirill Dec 16, 2020
17f4a4b
IGNITE-13831 wip
tkalkirill Dec 16, 2020
2c3cc8d
IGNITE-13831 wip
tkalkirill Dec 16, 2020
0ae12d6
IGNITE-13831 wip add SegmentTruncateStorage
tkalkirill Dec 17, 2020
93b5354
IGNITE-13831 wip modify clean logic
tkalkirill Dec 17, 2020
eb17323
Merge branch 'ai-master' into ignite-13831
tkalkirill Dec 17, 2020
058b134
IGNITE-13831 wip fix after merge
tkalkirill Dec 17, 2020
a0d342b
Merge branch 'ai-master' into ignite-13831
tkalkirill Dec 17, 2020
ae82e60
IGNITE-13831 wip after merge
tkalkirill Dec 17, 2020
87a0e8b
IGNITE-13831 wip little changes
tkalkirill Dec 21, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,6 @@ public WALIterator replay(
*/
public long lastCompactedSegment();

/**
* @return Max allowed index of archived segment to delete or -1 if it does not exist.
*/
public long maxArchivedSegmentToDelete();

/**
* Checks if WAL segment is under lock or reserved
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2743,11 +2743,12 @@ private byte convertToTxState(TransactionState state) {
}

/**
* Wal truncate callBack.
* Wal truncate callback.
*
* @param highBound WALPointer.
* @param highBound Upper bound.
* @throws IgniteCheckedException If failed.
*/
public void onWalTruncated(WALPointer highBound) throws IgniteCheckedException {
public void onWalTruncated(@Nullable WALPointer highBound) throws IgniteCheckedException {
checkpointManager.removeCheckpointsUntil(highBound);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
import org.apache.ignite.internal.util.GridConcurrentMultiPairQueue;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
Expand All @@ -34,26 +33,25 @@ class Checkpoint {
/** Checkpoint pages. */
final GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> cpPages;

/** */
/** Checkpoint progress status. */
final CheckpointProgressImpl progress;

/** Number of deleted WAL files. */
int walFilesDeleted;

/** WAL segments fully covered by this checkpoint. */
IgniteBiTuple<Long, Long> walSegsCoveredRange;

/** */
/** Number of dirty pages. */
final int pagesSize;

/**
* Constructor.
*
* @param cpEntry Checkpoint entry.
* @param cpPages Pages to write to the page store.
* @param progress Checkpoint progress status.
*/
Checkpoint(
@Nullable CheckpointEntry cpEntry,
@NotNull GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> cpPages,
GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> cpPages,
CheckpointProgressImpl progress
) {
this.cpEntry = cpEntry;
Expand All @@ -70,13 +68,6 @@ public boolean hasDelta() {
return pagesSize != 0;
}

/**
* @param walFilesDeleted Wal files deleted.
*/
public void walFilesDeleted(int walFilesDeleted) {
this.walFilesDeleted = walFilesDeleted;
}

/**
* @param walSegsCoveredRange WAL segments fully covered by this checkpoint.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,22 +87,26 @@ public class CheckpointHistory {
* Constructor.
*
* @param dsCfg Data storage configuration.
* @param wal Write ahead log.
* @param logFun Function for getting a logger.
* @param wal WAL manager.
* @param inapplicable Checkpoint inapplicable filter.
*/
CheckpointHistory(
DataStorageConfiguration dsCfg,
Function<Class<?>, IgniteLogger> logger,
Function<Class<?>, IgniteLogger> logFun,
IgniteWriteAheadLogManager wal,
IgniteThrowableBiPredicate<Long, Integer> inapplicable
) {
this.log = logger.apply(getClass());
this.log = logFun.apply(getClass());
this.wal = wal;
this.checkpointInapplicable = inapplicable;

isWalTruncationEnabled = dsCfg.getMaxWalArchiveSize() != DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE;

maxCpHistMemSize = IgniteSystemProperties.getInteger(IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE, DFLT_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE);
maxCpHistMemSize = IgniteSystemProperties.getInteger(
IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE,
DFLT_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE
);

reservationDisabled = dsCfg.getWalMode() == WALMode.NONE;
}
Expand Down Expand Up @@ -228,7 +232,7 @@ private void updateEarliestCpMap(CheckpointEntry entry) {
addCpGroupStatesToEarliestCpMap(entry, states);
}
catch (IgniteCheckedException ex) {
U.warn(log, "Failed to process checkpoint: " + (entry != null ? entry : "none"), ex);
U.warn(log, "Failed to process checkpoint: " + entry, ex);

earliestCp.clear();
}
Expand Down Expand Up @@ -314,6 +318,7 @@ public boolean hasSpace() {
/**
* Clears checkpoint history after WAL truncation.
*
* @param highBound Upper bound.
* @return List of checkpoint entries removed from history.
*/
public List<CheckpointEntry> onWalTruncated(WALPointer highBound) {
Expand Down Expand Up @@ -391,100 +396,13 @@ private boolean removeCheckpoint(CheckpointEntry checkpoint) {
/**
* Logs and clears checkpoint history after checkpoint finish.
*
* @param chp Finished checkpoint.
* @return List of checkpoints removed from history.
*/
public List<CheckpointEntry> onCheckpointFinished(Checkpoint chp) {
chp.walSegsCoveredRange(calculateWalSegmentsCovered());

int removeCount = isWalTruncationEnabled
? checkpointCountUntilDeleteByArchiveSize()
: (histMap.size() - maxCpHistMemSize);

if (removeCount <= 0)
return Collections.emptyList();

List<CheckpointEntry> deletedCheckpoints = removeCheckpoints(removeCount);

if (isWalTruncationEnabled) {
int deleted = wal.truncate(firstCheckpointPointer());

chp.walFilesDeleted(deleted);
}

return deletedCheckpoints;
}

/**
* @param first One of pointers to choose the newest.
* @param second One of pointers to choose the newest.
* @return The newest pointer from input ones.
*/
private WALPointer newerPointer(WALPointer first, WALPointer second) {
if (first == null)
return second;

if (second == null)
return first;

return first.index() > second.index() ? first : second;
}

/**
* Calculate mark until delete by maximum checkpoint history memory size.
*
* @return Checkpoint mark until which checkpoints can be deleted(not including this pointer).
*/
private WALPointer checkpointMarkUntilDeleteByMemorySize() {
if (histMap.size() <= maxCpHistMemSize)
return null;

int calculatedCpHistSize = maxCpHistMemSize;

for (Map.Entry<Long, CheckpointEntry> entry : histMap.entrySet()) {
if (histMap.size() <= calculatedCpHistSize++)
return entry.getValue().checkpointMark();
}

return lastCheckpoint().checkpointMark();
}

/**
* Calculate count of checkpoints to delete by maximum allowed archive size.
*
* @return Checkpoint count to be deleted.
*/
private int checkpointCountUntilDeleteByArchiveSize() {
long absFileIdxToDel = wal.maxArchivedSegmentToDelete();

if (absFileIdxToDel < 0)
return 0;

long fileUntilDel = absFileIdxToDel + 1;

long checkpointFileIdx = absFileIdx(lastCheckpoint());

int countToRemove = 0;

for (CheckpointEntry cpEntry : histMap.values()) {
long currFileIdx = absFileIdx(cpEntry);

if (checkpointFileIdx <= currFileIdx || fileUntilDel <= currFileIdx)
return countToRemove;

countToRemove++;
}

return histMap.size() - 1;
}

/**
* Retrieve absolute file index by checkpoint entry.
*
* @param pointer checkpoint entry for which need to calculate absolute file index.
* @return absolute file index for given checkpoint entry.
*/
private long absFileIdx(CheckpointEntry pointer) {
return pointer.checkpointMark().index();
return removeCheckpoints(isWalTruncationEnabled ? 0 : histMap.size() - maxCpHistMemSize);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,11 +300,12 @@ public void initializeStorage() throws IgniteCheckedException {
}

/**
* Wal truncate callBack.
* Wal truncate callback.
*
* @param highBound WALPointer.
* @param highBound Upper bound.
* @throws IgniteCheckedException If failed.
*/
public void removeCheckpointsUntil(WALPointer highBound) throws IgniteCheckedException {
public void removeCheckpointsUntil(@Nullable WALPointer highBound) throws IgniteCheckedException {
checkpointMarkersStorage.removeCheckpointsUntil(highBound);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,24 +141,28 @@ public void initialize() throws IgniteCheckedException {
}

/**
* Wal truncate callBack.
* Wal truncate callback.
*
* @param highBound WALPointer.
* @param highBound Upper bound.
* @throws IgniteCheckedException If failed.
*/
public void removeCheckpointsUntil(WALPointer highBound) throws IgniteCheckedException {
List<CheckpointEntry> removedFromHistory = history().onWalTruncated(highBound);
public void removeCheckpointsUntil(@Nullable WALPointer highBound) throws IgniteCheckedException {
List<CheckpointEntry> rmvFromHist = history().onWalTruncated(highBound);

for (CheckpointEntry cp : removedFromHistory)
for (CheckpointEntry cp : rmvFromHist)
removeCheckpointFiles(cp);
}

/**
* Logs and clears checkpoint history after checkpoint finish.
*
* @param chp Finished checkpoint.
* @throws IgniteCheckedException If failed.
*/
public void onCheckpointFinished(Checkpoint chp) throws IgniteCheckedException {
List<CheckpointEntry> removedFromHistory = history().onCheckpointFinished(chp);
List<CheckpointEntry> rmvFromHist = history().onCheckpointFinished(chp);

for (CheckpointEntry cp : removedFromHistory)
for (CheckpointEntry cp : rmvFromHist)
removeCheckpointFiles(cp);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,16 +456,12 @@ private void doCheckpoint() {

if (chp.hasDelta() || destroyedPartitionsCnt > 0) {
if (log.isInfoEnabled()) {
String walSegsCoveredMsg = chp.walSegsCoveredRange == null ? "" : prepareWalSegsCoveredMsg(chp.walSegsCoveredRange);

log.info(String.format("Checkpoint finished [cpId=%s, pages=%d, markPos=%s, " +
"walSegmentsCleared=%d, walSegmentsCovered=%s, markDuration=%dms, pagesWrite=%dms, fsync=%dms, " +
"total=%dms]",
"walSegmentsCovered=%s, markDuration=%dms, pagesWrite=%dms, fsync=%dms, total=%dms]",
chp.cpEntry != null ? chp.cpEntry.checkpointId() : "",
chp.pagesSize,
chp.cpEntry != null ? chp.cpEntry.checkpointMark() : "",
chp.walFilesDeleted,
walSegsCoveredMsg,
walRangeStr(chp.walSegsCoveredRange),
tracker.markDuration(),
tracker.pagesWriteDuration(),
tracker.fsyncDuration(),
Expand Down Expand Up @@ -591,9 +587,15 @@ private void updateMetrics(Checkpoint chp, CheckpointMetricsTracker tracker) {
}

/**
* Creates a string of a range WAL segments.
*
* @param walRange Range of WAL segments.
* @return The message about how many WAL segments was between previous checkpoint and current one.
*/
private String prepareWalSegsCoveredMsg(IgniteBiTuple<Long, Long> walRange) {
private String walRangeStr(@Nullable IgniteBiTuple<Long, Long> walRange) {
if (walRange == null)
return "";

String res;

long startIdx = walRange.get1();
Expand Down
Loading