Skip to content

Commit

Permalink
#6 Add RotationCallback#onOpen() method support.
Browse files Browse the repository at this point in the history
  • Loading branch information
vy committed Nov 26, 2019
1 parent 29c7e6d commit 02f8c2e
Show file tree
Hide file tree
Showing 5 changed files with 377 additions and 82 deletions.
6 changes: 6 additions & 0 deletions src/main/java/com/vlkan/rfos/LoggingRotationCallback.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.OutputStream;
import java.time.Instant;

public class LoggingRotationCallback implements RotationCallback {
Expand All @@ -42,6 +43,11 @@ public void onTrigger(RotationPolicy policy, Instant instant) {
LOGGER.debug("rotation trigger {policy={}, instant={}}", policy, instant);
}

@Override
public void onOpen(RotationPolicy policy, Instant instant, OutputStream ignored) {
LOGGER.debug("file open {policy={}, instant={}}", policy, instant);
}

@Override
public void onSuccess(RotationPolicy policy, Instant instant, File file) {
LOGGER.debug("rotation success {policy={}, instant={}, file={}}", policy, instant, file);
Expand Down
7 changes: 4 additions & 3 deletions src/main/java/com/vlkan/rfos/RotatingFileOutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public RotatingFileOutputStream(RotationConfig config) {
this.config = config;
this.runningThreads = Collections.synchronizedList(new LinkedList<>());
this.writeSensitivePolicies = collectWriteSensitivePolicies(config.getPolicies());
this.stream = open();
this.stream = open(null, config.getClock().now());
startPolicies();
}

Expand All @@ -61,9 +61,10 @@ private void startPolicies() {
}
}

private ByteCountingOutputStream open() {
private ByteCountingOutputStream open(RotationPolicy policy, Instant instant) {
try {
FileOutputStream fileOutputStream = new FileOutputStream(config.getFile(), config.isAppend());
config.getCallback().onOpen(policy, instant, fileOutputStream);
long size = config.isAppend() ? config.getFile().length() : 0;
return new ByteCountingOutputStream(fileOutputStream, size);
} catch (IOException error) {
Expand Down Expand Up @@ -108,7 +109,7 @@ private synchronized void unsafeRotate(RotationPolicy policy, Instant instant) t

// Re-open the file.
LOGGER.debug("re-opening file {file={}}", config.getFile());
stream = open();
stream = open(policy, instant);

// Compress the old file, if necessary.
if (config.isCompress()) {
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/com/vlkan/rfos/RotationCallback.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.vlkan.rfos.policy.RotationPolicy;

import java.io.File;
import java.io.OutputStream;
import java.time.Instant;

public interface RotationCallback {
Expand All @@ -28,6 +29,14 @@ public interface RotationCallback {
*/
void onTrigger(RotationPolicy policy, Instant instant);

/**
* Triggered by {@link RotatingFileOutputStream} either at start or during
* rotation. At start, {@code policy} argument will be null. During rotation,
* the callback will be awaited to complete the rotation, hence make sure
* that the method doesn't block.
*/
void onOpen(RotationPolicy policy, Instant instant, OutputStream stream);

/**
* Triggered by {@link RotatingFileOutputStream} after a successful rotation.
* Note that the callback will be awaited to complete the rotation, hence make
Expand Down
129 changes: 129 additions & 0 deletions src/test/java/com/vlkan/rfos/RecordingRotationCallback.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package com.vlkan.rfos;

import com.vlkan.rfos.policy.RotationPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.OutputStream;
import java.time.Instant;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class RecordingRotationCallback implements RotationCallback {

private static final Logger LOGGER = LoggerFactory.getLogger(RecordingRotationCallback.class);

interface CallContext {}

final BlockingQueue<CallContext> receivedCallContexts;

RecordingRotationCallback(int receivedCallContextCapacity) {
this.receivedCallContexts = new LinkedBlockingQueue<>(receivedCallContextCapacity);
}

static final class OnTriggerContext implements CallContext {

final RotationPolicy policy;

final Instant instant;

private OnTriggerContext(RotationPolicy policy, Instant instant) {
this.policy = policy;
this.instant = instant;
}

}

@Override
public void onTrigger(RotationPolicy policy, Instant instant) {
LOGGER.trace("onTrigger({}, {})", policy, instant);
try {
receivedCallContexts.put(new OnTriggerContext(policy, instant));
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}

static final class OnOpenContext implements CallContext {

final RotationPolicy policy;

final Instant instant;

final OutputStream outputStream;

private OnOpenContext(RotationPolicy policy, Instant instant, OutputStream outputStream) {
this.policy = policy;
this.instant = instant;
this.outputStream = outputStream;
}

}

@Override
public void onOpen(RotationPolicy policy, Instant instant, OutputStream outputStream) {
LOGGER.trace("onOpen({}, {})", policy, instant);
try {
receivedCallContexts.put(new OnOpenContext(policy, instant, outputStream));
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}

static final class OnSuccessContext implements CallContext {

final RotationPolicy policy;

final Instant instant;

final File file;

private OnSuccessContext(RotationPolicy policy, Instant instant, File file) {
this.policy = policy;
this.instant = instant;
this.file = file;
}

}

@Override
public void onSuccess(RotationPolicy policy, Instant instant, File file) {
LOGGER.trace("onSuccess({}, {}, {})", policy, instant, file);
try {
receivedCallContexts.put(new OnSuccessContext(policy, instant, file));
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}

static final class OnFailureContext implements CallContext {

final RotationPolicy policy;

final Instant instant;

final File file;

final Exception error;

private OnFailureContext(RotationPolicy policy, Instant instant, File file, Exception error) {
this.policy = policy;
this.instant = instant;
this.file = file;
this.error = error;
}

}

@Override
public void onFailure(RotationPolicy policy, Instant instant, File file, Exception error) {
LOGGER.trace("onFailure({}, {}, {}, {})", policy, instant, file, error);
try {
receivedCallContexts.put(new OnFailureContext(policy, instant, file, error));
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}

}
Loading

0 comments on commit 02f8c2e

Please sign in to comment.