Skip to content

Commit

Permalink
Issue #9270 . Ensure that file writes are finished on close.
Browse files Browse the repository at this point in the history
  • Loading branch information
andrii0lomakin committed Jul 15, 2020
1 parent 5322245 commit 98a2618
Showing 1 changed file with 17 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
Expand Down Expand Up @@ -191,7 +192,7 @@ public final class CASDiskWriteAheadLog implements OWriteAheadLog {
private final AtomicReference<WrittenUpTo> writtenUpTo = new AtomicReference<>();
private long segmentId = -1;

private volatile ScheduledFuture<?> recordsWriterFuture;
private final ScheduledFuture<?> recordsWriterFuture;

private final Path masterRecordPath;

Expand Down Expand Up @@ -223,8 +224,6 @@ public final class CASDiskWriteAheadLog implements OWriteAheadLog {
private final int fsyncInterval;
private volatile long segmentAdditionTs;

private final int commitDelay;

private long currentPosition = 0;

private boolean useFirstBuffer = true;
Expand Down Expand Up @@ -261,8 +260,6 @@ public final class CASDiskWriteAheadLog implements OWriteAheadLog {

private long reportTs = -1;

private volatile boolean stopWrite = false;

public CASDiskWriteAheadLog(
final String storageName,
final Path storagePath,
Expand Down Expand Up @@ -393,8 +390,6 @@ public CASDiskWriteAheadLog(

writtenUpTo.set(new WrittenUpTo(new OLogSequenceNumber(currentSegment, 0), 0));

this.commitDelay = commitDelay;

writeBufferPointerOne = allocator.allocate(bufferSize1, blockSize, false);
writeBufferOne = writeBufferPointerOne.getNativeByteBuffer().order(ByteOrder.nativeOrder());
assert writeBufferOne.position() == 0;
Expand All @@ -406,8 +401,8 @@ public CASDiskWriteAheadLog(
log(new EmptyWALRecord());

this.recordsWriterFuture =
commitExecutor.schedule(
new RecordsWriter(false, false, true), commitDelay, TimeUnit.MILLISECONDS);
commitExecutor.scheduleWithFixedDelay(
new RecordsWriter(false, false), commitDelay, commitDelay, TimeUnit.MILLISECONDS);

flush();
}
Expand Down Expand Up @@ -1511,16 +1506,18 @@ public void close(final boolean flush) throws IOException {
doFlush(true);
}

stopWrite = true;
if (!recordsWriterFuture.cancel(false) && !recordsWriterFuture.isDone()) {
throw new OStorageException("Can not cancel background write thread in WAL");
}

if (recordsWriterFuture != null) {
try {
recordsWriterFuture.get();
} catch (InterruptedException | ExecutionException e) {
throw OException.wrapException(
new OStorageException("Error during writing of WAL records in storage " + storageName),
e);
}
try {
recordsWriterFuture.get();
} catch (CancellationException e) {
// ignore, we canceled scheduled execution
} catch (InterruptedException | ExecutionException e) {
throw OException.wrapException(
new OStorageException("Error during writing of WAL records in storage " + storageName),
e);
}

if (writeFuture != null) {
Expand Down Expand Up @@ -1641,7 +1638,7 @@ public void addSegmentOverflowListener(final SegmentOverflowListener listener) {
}

private void doFlush(final boolean forceSync) {
final Future<?> future = commitExecutor.submit(new RecordsWriter(forceSync, true, false));
final Future<?> future = commitExecutor.submit(new RecordsWriter(forceSync, true));
try {
future.get();
} catch (final Exception e) {
Expand Down Expand Up @@ -1942,19 +1939,14 @@ private String getSegmentName(final long segment) {
private final class RecordsWriter implements Runnable {
private final boolean forceSync;
private final boolean fullWrite;
private final boolean reschedule;

private RecordsWriter(final boolean forceSync, final boolean fullWrite, boolean reschedule) {
private RecordsWriter(final boolean forceSync, final boolean fullWrite) {
this.forceSync = forceSync;
this.fullWrite = fullWrite;
this.reschedule = reschedule;
}

@Override
public void run() {
if (stopWrite) {
return;
}
try {
if (printPerformanceStatistic) {
printReport();
Expand Down Expand Up @@ -2280,10 +2272,6 @@ assert record != null;
} catch (final RuntimeException | Error e) {
OLogManager.instance().errorNoDb(this, "Error during WAL writing", e);
throw e;
} finally {
if (reschedule && !stopWrite) {
recordsWriterFuture = commitExecutor.schedule(this, commitDelay, TimeUnit.MILLISECONDS);
}
}
}

Expand Down

0 comments on commit 98a2618

Please sign in to comment.