This repository was archived by the owner on Nov 14, 2024. It is now read-only.

Refactor Logger to SafeLogger, part 1 #5619

Merged · 11 commits · Sep 13, 2021
@@ -19,16 +19,17 @@
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import com.palantir.refreshable.Refreshable;
import com.palantir.timestamp.ManagedTimestampService;
import com.palantir.timestamp.TimestampStoreInvalidator;
import java.util.Optional;
import java.util.function.LongSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public interface AtlasDbFactory<MERGED_CONFIG extends KeyValueServiceConfig> {
Logger log = LoggerFactory.getLogger(AtlasDbFactory.class);
SafeLogger log = SafeLoggerFactory.get(AtlasDbFactory.class);

long NO_OP_FAST_FORWARD_TIMESTAMP = Long.MIN_VALUE + 1; // Note: Long.MIN_VALUE itself is not allowed.
boolean DEFAULT_INITIALIZE_ASYNC = false;
@@ -81,7 +82,9 @@ ManagedTimestampService createManagedTimestampService(
default TimestampStoreInvalidator createTimestampStoreInvalidator(
KeyValueService rawKvs, Optional<TableReference> tableReferenceOverride) {
return () -> {
log.warn("AtlasDB doesn't yet support automated migration for KVS type {}.", getType());
log.warn(
"AtlasDB doesn't yet support automated migration for KVS type {}.",
SafeArg.of("kvsType", getType()));
return NO_OP_FAST_FORWARD_TIMESTAMP;
};
}
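The same mechanical change repeats through every file in this PR: replace the slf4j Logger with a SafeLogger, and wrap each logged value in a named SafeArg or UnsafeArg. A minimal before/after sketch of the pattern (the class ExampleMigration is hypothetical; the logsafe calls mirror the hunks in this diff):

import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;

public final class ExampleMigration {
    // Before: private static final Logger log = LoggerFactory.getLogger(ExampleMigration.class);
    private static final SafeLogger log = SafeLoggerFactory.get(ExampleMigration.class);

    void logKvsType(String kvsType, byte[] row) {
        // Before: log.warn("Unsupported KVS type {}.", kvsType);
        // After: each placeholder gets a named arg, classified as safe or unsafe.
        log.warn("Unsupported KVS type {}.", SafeArg.of("kvsType", kvsType));
        log.trace("Skipping row {}.", UnsafeArg.of("row", row));
    }
}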
@@ -31,6 +31,9 @@
import com.palantir.atlasdb.spi.KeyValueServiceRuntimeConfig;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.atlasdb.versions.AtlasDbVersion;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import com.palantir.refreshable.Refreshable;
import com.palantir.timestamp.ManagedTimestampService;
import com.palantir.timestamp.PersistentTimestampServiceImpl;
@@ -39,12 +42,10 @@
import java.util.Optional;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AutoService(AtlasDbFactory.class)
public class CassandraAtlasDbFactory implements AtlasDbFactory<CassandraReloadableKvsConfig> {
private static Logger log = LoggerFactory.getLogger(CassandraAtlasDbFactory.class);
private static SafeLogger log = SafeLoggerFactory.get(CassandraAtlasDbFactory.class);
private CassandraKeyValueServiceRuntimeConfig latestValidRuntimeConfig =
CassandraKeyValueServiceRuntimeConfig.getDefault();

@@ -104,7 +105,7 @@ Supplier<CassandraKeyValueServiceRuntimeConfig> preprocessKvsRuntimeConfig(
"Invalid KeyValueServiceRuntimeConfig. Expected a KeyValueServiceRuntimeConfig of"
+ " type CassandraKeyValueServiceRuntimeConfig, found {}. Using latest valid"
+ " CassandraKeyValueServiceRuntimeConfig.",
config.getClass());
SafeArg.of("configClass", config.getClass()));
return latestValidRuntimeConfig;
}

@@ -91,7 +91,7 @@ public static TimestampBoundStore create(CassandraKeyValueService kvs, boolean i
private CassandraTimestampBoundStore(CassandraClientPool clientPool, CassandraKeyValueService kvs) {
DebugLogger.logger.info(
"Creating CassandraTimestampBoundStore object on thread {}. This should only happen once.",
Thread.currentThread().getName());
SafeArg.of("thread", Thread.currentThread().getName()));
this.clientPool = Preconditions.checkNotNull(clientPool, "clientPool cannot be null");
this.kvs = kvs;
}
@@ -125,7 +125,7 @@ public Long apply(CassandraClient client) {
if (result == null) {
DebugLogger.logger.info(
"[GET] Null result, setting timestamp limit to {}",
CassandraTimestampUtils.INITIAL_VALUE);
SafeArg.of("newLimit", CassandraTimestampUtils.INITIAL_VALUE));
cas(client, null, CassandraTimestampUtils.INITIAL_VALUE);
return CassandraTimestampUtils.INITIAL_VALUE;
}
@@ -152,14 +152,15 @@ private long extractUpperLimit(ColumnOrSuperColumn result) {
}
});

DebugLogger.logger.debug("[GET] Setting cached timestamp limit to {}.", currentLimit);
DebugLogger.logger.debug(
"[GET] Setting cached timestamp limit to {}.", SafeArg.of("currentLimit", currentLimit));
currentLimit = upperLimit;
return currentLimit;
}

@Override
public synchronized void storeUpperLimit(final long limit) {
DebugLogger.logger.debug("[PUT] Storing upper limit of {}.", limit);
DebugLogger.logger.debug("[PUT] Storing upper limit of {}.", SafeArg.of("limit", limit));

clientPool.runWithRetry(new FunctionCheckedException<CassandraClient, Void, RuntimeException>() {
@GuardedBy("CassandraTimestampBoundStore.this")
@@ -175,7 +176,10 @@ public Void apply(CassandraClient client) {
private void cas(CassandraClient client, Long oldVal, long newVal) {
final CASResult result;
try {
DebugLogger.logger.info("[CAS] Trying to set upper limit from {} to {}.", oldVal, newVal);
DebugLogger.logger.info(
"[CAS] Trying to set upper limit from {} to {}.",
SafeArg.of("oldLimit", oldVal),
SafeArg.of("newLimit", newVal));
result = client.cas(
AtlasDbConstants.TIMESTAMP_TABLE,
getRowName(),
@@ -226,7 +230,7 @@ private void cas(CassandraClient client, Long oldVal, long newVal) {
DebugLogger.logger.error("Thread dump: {}", SafeArg.of("threadDump", ThreadDumps.programmaticThreadDump()));
throw err;
} else {
DebugLogger.logger.debug("[CAS] Setting cached limit to {}.", newVal);
DebugLogger.logger.debug("[CAS] Setting cached limit to {}.", SafeArg.of("newLimit", newVal));
currentLimit = newVal;
}
}
@@ -29,6 +29,8 @@
import com.palantir.logsafe.Arg;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@@ -51,12 +53,10 @@
import org.apache.cassandra.thrift.CqlResult;
import org.apache.cassandra.thrift.CqlRow;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CqlExecutorImpl implements CqlExecutor {
private final QueryExecutor queryExecutor;
private static final Logger log = LoggerFactory.getLogger(CqlExecutorImpl.class);
private static final SafeLogger log = SafeLoggerFactory.get(CqlExecutorImpl.class);

public interface QueryExecutor {
CqlResult execute(CqlQuery cqlQuery, byte[] rowHintForHostSelection);
@@ -151,7 +151,7 @@ private void scheduleSweepRowTask(
// RejectedExecutionException are expected.
// The executor is shutdown when we already fetched all the values we were interested
// for the current iteration.
log.trace("Rejecting row {} because executor is closed", rows.get(rowIndex), e);
log.trace("Rejecting row {} because executor is closed", UnsafeArg.of("row", rows.get(rowIndex)), e);
Reviewer comment (Contributor): These kinds of args are crap anyway, because we are logging only a reference. I guess it could therefore also be logged as safe, even though the intention was almost certainly to log the hex representation of the byte array (and therefore unsafe). So this is not wrong, but I would drive-by fix it and log the actual array.

}
}

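A sketch of the drive-by fix the reviewer suggests, assuming PtBytes.encodeHexString (used elsewhere in this PR) yields the hex form they have in mind:

// Log the row's hex contents rather than the byte[] reference; the bytes
// come from user data, so the arg stays unsafe.
log.trace(
        "Rejecting row {} because executor is closed",
        UnsafeArg.of("row", PtBytes.encodeHexString(rows.get(rowIndex))),
        e);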
@@ -42,7 +42,10 @@
import com.palantir.common.base.Throwables;
import com.palantir.common.streams.KeyedStream;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
@@ -63,12 +66,10 @@
import java.util.stream.Collectors;
import org.apache.cassandra.thrift.EndpointDetails;
import org.apache.cassandra.thrift.TokenRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CassandraService implements AutoCloseable {
// TODO(tboam): keep logging on old class?
private static final Logger log = LoggerFactory.getLogger(CassandraService.class);
private static final SafeLogger log = SafeLoggerFactory.get(CassandraService.class);
private static final Interner<RangeMap<LightweightOppToken, List<InetSocketAddress>>> tokensInterner =
Interners.newWeakInterner();

@@ -341,7 +342,7 @@ public void debugLogStateOfPool() {
activeCheckouts > 0 ? Integer.toString(activeCheckouts) : "(unknown)",
totalAllowed > 0 ? Integer.toString(totalAllowed) : "(not bounded)"));
}
log.debug("Current pool state: {}", currentState.toString());
log.debug("Current pool state: {}", UnsafeArg.of("currentState", currentState.toString()));
Reviewer comment (Contributor): Why unsafe?

}
}

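A guess at the reviewer's point, since the construction of currentState is elided above: if the pool-state string carries only counts, the safe classification would look like the sketch below; if it can embed host names or addresses, UnsafeArg is the conservative choice.

// Only if currentState is known to contain no host-identifying data:
log.debug("Current pool state: {}", SafeArg.of("currentState", currentState.toString()));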
@@ -33,6 +33,7 @@
import com.palantir.atlasdb.keyvalue.api.RangeRequest;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.keyvalue.api.Value;
import com.palantir.atlasdb.logging.LoggingArgs;
import com.palantir.atlasdb.schema.SweepSchema;
import com.palantir.atlasdb.schema.generated.SweepPriorityTable;
import com.palantir.atlasdb.schema.generated.SweepPriorityTable.SweepPriorityNamedColumn;
@@ -43,6 +44,8 @@
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import com.palantir.timestamp.TimestampService;
import java.util.Collection;
import java.util.Map;
@@ -55,8 +58,6 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This kvs wrapper tracks the approximate number of writes to every table
@@ -65,7 +66,7 @@
*/
public final class SweepStatsKeyValueService extends ForwardingKeyValueService {

private static final Logger log = LoggerFactory.getLogger(SweepStatsKeyValueService.class);
private static final SafeLogger log = SafeLoggerFactory.get(SweepStatsKeyValueService.class);
private static final int CLEAR_WEIGHT = 1 << 14; // 16384
private static final long FLUSH_DELAY_SECONDS = 42;

@@ -262,7 +263,7 @@ private void flushTask() {
}
} catch (Throwable t) {
if (!Thread.interrupted()) {
log.warn("Error occurred while flushing sweep stats: {}", t, t);
log.warn("Error occurred while flushing sweep stats: {}", UnsafeArg.of("throwable", t), t);
Reviewer comment (Contributor): Is there a benefit to attaching the throwable also as an arg?

}
}
}
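For the reviewer's question — a minimal variant, assuming SafeLogger follows the usual slf4j convention of treating a trailing Throwable as the attached exception, so the extra arg only duplicates what the stack trace already shows:

// The trailing throwable is logged with its stack trace; the message
// then needs no placeholder for it.
log.warn("Error occurred while flushing sweep stats", t);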
@@ -311,7 +312,11 @@ private void flushWrites(Multiset<TableReference> writes, Set<TableReference> cl
.hydrateFromBytes(oldValue.getContents())
.getValue();
long newValue = clears.contains(tableRef) ? writes.count(tableRef) : oldCount + writes.count(tableRef);
log.debug("Sweep priority for {} has {} writes (was {})", tableRef, newValue, oldCount);
log.debug(
"Sweep priority for {} has {} writes (was {})",
LoggingArgs.tableRef(tableRef),
SafeArg.of("newValue", newValue),
SafeArg.of("oldCount", oldCount));
newWriteCounts.put(
cell, SweepPriorityTable.WriteCount.of(newValue).persistValue());
}
Expand All @@ -331,7 +336,11 @@ private void flushWrites(Multiset<TableReference> writes, Set<TableReference> cl
// ignore problems when sweep or transaction tables don't exist
log.warn("Ignoring failed sweep stats flush due to ", e);
}
log.warn("Unable to flush sweep stats for writes {} and clears {}: ", writes, clears, e);
log.warn(
"Unable to flush sweep stats for writes {} and clears {}: ",
UnsafeArg.of("writes", writes),
UnsafeArg.of("clears", clears),
e);
throw e;
}
}

Reviewer comment (Contributor): We could add a method to convert these in LoggingArgs?
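A sketch of the helper the reviewer floats. LoggingArgs.tableRef exists in this PR; a collection variant like tableRefs below is hypothetical:

// Hypothetical addition to LoggingArgs: classify a collection of table
// references in one call. Conservatively unsafe here, since safety is
// decided per table; a real implementation would split safe from unsafe.
static Arg<String> tableRefs(String name, Collection<TableReference> tableRefs) {
    return UnsafeArg.of(name, tableRefs.toString());
}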
@@ -28,6 +28,7 @@
import com.palantir.atlasdb.keyvalue.api.RowResult;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.keyvalue.impl.Cells;
import com.palantir.atlasdb.logging.LoggingArgs;
import com.palantir.atlasdb.transaction.api.Transaction;
import com.palantir.atlasdb.transaction.api.TransactionManager;
import com.palantir.atlasdb.transaction.impl.TransactionConstants;
@@ -36,16 +37,18 @@
import com.palantir.common.base.AbortingVisitors;
import com.palantir.common.base.BatchingVisitable;
import com.palantir.common.collect.Maps2;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import com.palantir.util.Mutable;
import com.palantir.util.Mutables;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.mutable.MutableLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KvsRangeMigrator implements RangeMigrator {
private static final Logger log = LoggerFactory.getLogger(KvsRangeMigrator.class);
private static final SafeLogger log = SafeLoggerFactory.get(KvsRangeMigrator.class);

private final TableReference srcTable;
private final TableReference destTable;
@@ -92,15 +95,18 @@ private void logStatus(Transaction tx, int numRangeBoundaries) {
if (checkpoint != null) {
log.info(
"({}/{}) Migration from table {} to table {} will start/resume at {}",
rangeId,
numRangeBoundaries,
srcTable,
destTable,
PtBytes.encodeHexString(checkpoint));
SafeArg.of("rangeId", rangeId),
SafeArg.of("numRangeBoundaries", numRangeBoundaries),
LoggingArgs.tableRef("srcTable", srcTable),
LoggingArgs.tableRef("destTable", destTable),
UnsafeArg.of("checkpoint", PtBytes.encodeHexString(checkpoint)));
Reviewer comment (Contributor): (this is how you encode the hex string)

return;
}
}
log.info("Migration from table {} to {} has already been completed", srcTable, destTable);
log.info(
"Migration from table {} to {} has already been completed",
LoggingArgs.tableRef("srcTable", srcTable),
LoggingArgs.tableRef("destTable", destTable));
}

@Override
@@ -144,18 +150,26 @@ private byte[] copyOneTransactionInternal(RangeRequest range, long rangeId, Tran
if (log.isTraceEnabled()) {
log.trace(
"Copying table {} range {} from {} to {}",
srcTable,
rangeId,
BaseEncoding.base16().lowerCase().encode(rangeToUse.getStartInclusive()),
BaseEncoding.base16().lowerCase().encode(rangeToUse.getEndExclusive()));
LoggingArgs.tableRef("srcTable", srcTable),
SafeArg.of("rangeId", rangeId),
UnsafeArg.of(
"rangeStartInclusive",
BaseEncoding.base16().lowerCase().encode(rangeToUse.getStartInclusive())),
UnsafeArg.of(
"rangeEndExclusive",
BaseEncoding.base16().lowerCase().encode(rangeToUse.getEndExclusive())));
}

Reviewer comment (Contributor): nit: use the PtBytes method instead to clean up code.
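The nit, applied — assuming PtBytes.encodeHexString produces the same lowercase hex as BaseEncoding.base16().lowerCase().encode (it is already used for checkpoints elsewhere in this diff):

UnsafeArg.of("rangeStartInclusive", PtBytes.encodeHexString(rangeToUse.getStartInclusive())),
UnsafeArg.of("rangeEndExclusive", PtBytes.encodeHexString(rangeToUse.getEndExclusive())));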

BatchingVisitable<RowResult<byte[]>> bv = readT.getRange(srcTable, rangeToUse);

Map<Cell, byte[]> writeMap = new HashMap<>();
byte[] lastRow = internalCopyRange(bv, maxBytes, writeMap);
if (log.isTraceEnabled() && (lastRow != null)) {
log.trace("Copying {} bytes for range {} on table {}", lastRow.length, rangeId, srcTable);
log.trace(
"Copying {} bytes for range {} on table {}",
SafeArg.of("lengths", lastRow.length),
SafeArg.of("rangeId", rangeId),
LoggingArgs.tableRef("srcTable", srcTable));
}
writeToKvs(writeMap);

@@ -23,22 +23,25 @@
import com.palantir.atlasdb.keyvalue.api.RowResult;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.keyvalue.impl.Cells;
import com.palantir.atlasdb.logging.LoggingArgs;
import com.palantir.atlasdb.transaction.api.Transaction;
import com.palantir.atlasdb.transaction.api.TransactionManager;
import com.palantir.atlasdb.transaction.impl.TransactionConstants;
import com.palantir.common.annotation.Output;
import com.palantir.common.base.AbortingVisitor;
import com.palantir.common.base.AbortingVisitors;
import com.palantir.common.base.BatchingVisitable;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import com.palantir.util.Mutable;
import com.palantir.util.Mutables;
import java.util.Map;
import org.apache.commons.lang3.mutable.MutableLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionRangeMigrator implements RangeMigrator {
private static final Logger log = LoggerFactory.getLogger(TransactionRangeMigrator.class);
private static final SafeLogger log = SafeLoggerFactory.get(TransactionRangeMigrator.class);

private final TableReference srcTable;
private final TableReference destTable;
@@ -79,15 +82,18 @@ private void logStatus(Transaction tx, int numRangeBoundaries) {
if (checkpoint != null) {
log.info(
"({}/{}) Migration from table {} to table {} will start/resume at {}",
rangeId,
numRangeBoundaries,
srcTable,
destTable,
PtBytes.encodeHexString(checkpoint));
SafeArg.of("rangeId", rangeId),
SafeArg.of("numRangeBoundaries", numRangeBoundaries),
LoggingArgs.tableRef("srcTable", srcTable),
LoggingArgs.tableRef("destTable", destTable),
UnsafeArg.of("checkpoint", PtBytes.encodeHexString(checkpoint)));
return;
}
}
log.info("Migration from table {} to {} has already been completed", srcTable, destTable);
log.info(
"Migration from table {} to {} has already been completed",
LoggingArgs.tableRef("srcTable", srcTable),
LoggingArgs.tableRef("destTable", destTable));
}

@Override