Skip to content

Commit

Permalink
Record metrics when INIT revision or an old revision is used (#874)
Browse files Browse the repository at this point in the history
Motivation:
Before we apply #681 that removes old commits, it would be nice if we could find any usages that access data with the INIT Revision or revisions that are more than 5000 (default minimum number of commit retentions) commits ahead from the head.

Modifications:
- Add a counter that is increased when INIT Revision or revisions that are more than 5000 ahead from the head are used.

Result:
- Track the number of usages.
- This commit will be reverted after we find the usage and fix it.
  • Loading branch information
minwoox authored Sep 4, 2023
1 parent 079daeb commit c0fdd1a
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 68 deletions.
3 changes: 3 additions & 0 deletions common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,8 @@ if (tasks.findByName('trimShadedJar')) {
keep "class com.linecorp.centraldogma.internal.shaded.caffeine.** { *; }"
// Prevent ProGuard from removing all enum values from Option because otherwise it becomes a non-enum class.
keep "class com.linecorp.centraldogma.internal.shaded.jsonpath.Option { *; }"

// Reduces the verbosity of ProGuardTask when running in parallel.
dontnote
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ void testSuccessMetrics() {
Mirror mirror = newMirror("git://a.com/b.git", GitMirror.class, "foo", "bar");
mirror = spy(mirror);
doNothing().when(mirror).mirror(any(), any(), anyInt(), anyLong());
new MirroringTask(mirror, meterRegistry).run(null, null, 0, 0L);
new MirroringTask(mirror, "foo", meterRegistry).run(null, null, 0, 0L);
assertThat(MoreMeters.measureAll(meterRegistry))
.contains(entry("mirroring.result#count{direction=LOCAL_TO_REMOTE,localPath=/," +
"localRepo=bar,remoteBranch=,remotePath=/,success=true}", 1.0));
"localRepo=bar,project=foo,remoteBranch=,remotePath=/,success=true}", 1.0));
}

@Test
Expand All @@ -58,12 +58,12 @@ void testFailureMetrics() {
mirror = spy(mirror);
final RuntimeException e = new RuntimeException();
doThrow(e).when(mirror).mirror(any(), any(), anyInt(), anyLong());
final MirroringTask task = new MirroringTask(mirror, meterRegistry);
final MirroringTask task = new MirroringTask(mirror, "foo", meterRegistry);
assertThatThrownBy(() -> task.run(null, null, 0, 0L))
.isSameAs(e);
assertThat(MoreMeters.measureAll(meterRegistry))
.contains(entry("mirroring.result#count{direction=LOCAL_TO_REMOTE,localPath=/," +
"localRepo=bar,remoteBranch=main,remotePath=/," +
"localRepo=bar,project=foo,remoteBranch=main,remotePath=/," +
"success=false}", 1.0));
}

Expand All @@ -76,11 +76,11 @@ void testTimerMetrics() {
Thread.sleep(1000);
return null;
}).when(mirror).mirror(any(), any(), anyInt(), anyLong());
new MirroringTask(mirror, meterRegistry).run(null, null, 0, 0L);
new MirroringTask(mirror, "foo", meterRegistry).run(null, null, 0, 0L);
assertThat(MoreMeters.measureAll(meterRegistry))
.hasEntrySatisfying(
"mirroring.task#total{direction=LOCAL_TO_REMOTE,localPath=/," +
"localRepo=bar,remoteBranch=,remotePath=/}",
"localRepo=bar,project=foo,remoteBranch=,remotePath=/}",
v -> assertThat(v).isCloseTo(1, withPercentage(30)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static com.linecorp.centraldogma.internal.Util.isValidFilePath;
import static com.linecorp.centraldogma.server.internal.api.DtoConverter.convert;
import static com.linecorp.centraldogma.server.internal.api.HttpApiUtil.returnOrThrow;
import static com.linecorp.centraldogma.server.internal.api.RepositoryServiceV1.increaseCounterIfOldRevisionUsed;
import static com.linecorp.centraldogma.server.internal.storage.repository.DefaultMetaRepository.metaRepoFiles;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -114,11 +115,13 @@ public ContentServiceV1(ProjectManager projectManager, CommandExecutor executor,
* <p>Returns the list of files in the path.
*/
@Get("regex:/projects/(?<projectName>[^/]+)/repos/(?<repoName>[^/]+)/list(?<path>(|/.*))$")
public CompletableFuture<List<EntryDto<?>>> listFiles(@Param String path,
public CompletableFuture<List<EntryDto<?>>> listFiles(ServiceRequestContext ctx,
@Param String path,
@Param @Default("-1") String revision,
Repository repository) {
final String normalizedPath = normalizePath(path);
final Revision normalizedRev = repository.normalizeNow(new Revision(revision));
increaseCounterIfOldRevisionUsed(ctx, repository, normalizedRev);
final CompletableFuture<List<EntryDto<?>>> future = new CompletableFuture<>();
listFiles(repository, normalizedPath, normalizedRev, false, future);
return future;
Expand Down Expand Up @@ -214,12 +217,14 @@ private CompletableFuture<Revision> push(long commitTimeMills, Author author, Re
*/
@Post("/projects/{projectName}/repos/{repoName}/preview")
public CompletableFuture<Iterable<ChangeDto<?>>> preview(
ServiceRequestContext ctx,
@Param @Default("-1") String revision,
Repository repository,
@RequestConverter(ChangesRequestConverter.class) Iterable<Change<?>> changes) {

final Revision baseRevision = new Revision(revision);
increaseCounterIfOldRevisionUsed(ctx, repository, baseRevision);
final CompletableFuture<Map<String, Change<?>>> changesFuture =
repository.previewDiff(new Revision(revision), changes);
repository.previewDiff(baseRevision, changes);

return changesFuture.thenApply(previewDiffs -> previewDiffs.values().stream()
.map(DtoConverter::convert)
Expand All @@ -245,6 +250,7 @@ public CompletableFuture<?> getFiles(
Repository repository,
@RequestConverter(WatchRequestConverter.class) @Nullable WatchRequest watchRequest,
@RequestConverter(QueryRequestConverter.class) @Nullable Query<?> query) {
increaseCounterIfOldRevisionUsed(ctx, repository, new Revision(revision));
final String normalizedPath = normalizePath(path);

// watch repository or a file
Expand Down Expand Up @@ -325,7 +331,8 @@ private static Object handleWatchFailure(Throwable thrown) {
* specify {@code to}, this will return the list of commits.
*/
@Get("regex:/projects/(?<projectName>[^/]+)/repos/(?<repoName>[^/]+)/commits(?<revision>(|/.*))$")
public CompletableFuture<?> listCommits(@Param String revision,
public CompletableFuture<?> listCommits(ServiceRequestContext ctx,
@Param String revision,
@Param @Default("/**") String path,
@Param @Nullable String to,
@Param @Nullable Integer maxCommits,
Expand All @@ -346,6 +353,10 @@ public CompletableFuture<?> listCommits(@Param String revision,
}

final RevisionRange range = repository.normalizeNow(fromRevision, toRevision).toDescending();

increaseCounterIfOldRevisionUsed(ctx, repository, range.from());
increaseCounterIfOldRevisionUsed(ctx, repository, range.to());

final int maxCommits0 = firstNonNull(maxCommits, Repository.DEFAULT_MAX_COMMITS);
return repository
.history(range.from(), range.to(), normalizePath(path), maxCommits0)
Expand All @@ -368,17 +379,21 @@ public CompletableFuture<?> listCommits(@Param String revision,
*/
@Get("/projects/{projectName}/repos/{repoName}/compare")
public CompletableFuture<?> getDiff(
ServiceRequestContext ctx,
@Param @Default("/**") String pathPattern,
@Param @Default("1") String from, @Param @Default("head") String to,
Repository repository,
@RequestConverter(QueryRequestConverter.class) @Nullable Query<?> query) {

final Revision fromRevision = new Revision(from);
final Revision toRevision = new Revision(to);
increaseCounterIfOldRevisionUsed(ctx, repository, fromRevision);
increaseCounterIfOldRevisionUsed(ctx, repository, toRevision);
if (query != null) {
return repository.diff(new Revision(from), new Revision(to), query)
return repository.diff(fromRevision, toRevision, query)
.thenApply(DtoConverter::convert);
} else {
return repository
.diff(new Revision(from), new Revision(to), normalizePath(pathPattern))
.diff(fromRevision, toRevision, normalizePath(pathPattern))
.thenApply(changeMap -> changeMap.values().stream()
.map(DtoConverter::convert).collect(toImmutableList()));
}
Expand All @@ -402,9 +417,12 @@ private static <T> Object objectOrList(Collection<T> collection, boolean toList,
*/
@Get("/projects/{projectName}/repos/{repoName}/merge")
public <T> CompletableFuture<MergedEntryDto<T>> mergeFiles(
ServiceRequestContext ctx,
@Param @Default("-1") String revision, Repository repository,
@RequestConverter(MergeQueryRequestConverter.class) MergeQuery<T> query) {
return repository.mergeFiles(new Revision(revision), query).thenApply(DtoConverter::convert);
final Revision rev = new Revision(revision);
increaseCounterIfOldRevisionUsed(ctx, repository, rev);
return repository.mergeFiles(rev, query).thenApply(DtoConverter::convert);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.linecorp.centraldogma.server.internal.api;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.linecorp.centraldogma.server.internal.api.HttpApiUtil.checkUnremoveArgument;
import static com.linecorp.centraldogma.server.internal.api.HttpApiUtil.returnOrThrow;
Expand All @@ -28,9 +29,11 @@
import javax.annotation.Nullable;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.logging.RequestOnlyLog;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.annotation.Consumes;
import com.linecorp.armeria.server.annotation.Delete;
Expand Down Expand Up @@ -58,6 +61,8 @@
import com.linecorp.centraldogma.server.storage.project.ProjectManager;
import com.linecorp.centraldogma.server.storage.repository.Repository;

import io.micrometer.core.instrument.Tag;

/**
* Annotated service object for managing repositories.
*/
Expand Down Expand Up @@ -186,8 +191,48 @@ public CompletableFuture<RepositoryDto> patchRepository(@Param String repoName,
*/
@Get("/projects/{projectName}/repos/{repoName}/revision/{revision}")
@RequiresReadPermission
public Map<String, Integer> normalizeRevision(Repository repository, @Param String revision) {
public Map<String, Integer> normalizeRevision(ServiceRequestContext ctx,
Repository repository, @Param String revision) {
final Revision normalizedRevision = repository.normalizeNow(new Revision(revision));
final Revision head = repository.normalizeNow(Revision.HEAD);
increaseCounterIfOldRevisionUsed(ctx, repository, normalizedRevision, head);
return ImmutableMap.of("revision", normalizedRevision.major());
}

static void increaseCounterIfOldRevisionUsed(ServiceRequestContext ctx, Repository repository,
Revision revision) {
final Revision normalized = repository.normalizeNow(revision);
final Revision head = repository.normalizeNow(Revision.HEAD);
increaseCounterIfOldRevisionUsed(ctx, repository, normalized, head);
}

public static void increaseCounterIfOldRevisionUsed(
ServiceRequestContext ctx, Repository repository, Revision normalized, Revision head) {
final String projectName = repository.parent().name();
final String repoName = repository.name();
if (normalized.major() == 1) {
ctx.log().whenRequestComplete().thenAccept(
log -> ctx.meterRegistry()
.counter("revisions.init", generateTags(projectName, repoName, log).build())
.increment());
}
if (head.major() - normalized.major() >= 5000) {
ctx.log().whenRequestComplete().thenAccept(
log -> ctx.meterRegistry()
.summary("revisions.old",
generateTags(projectName, repoName, log)
.add(Tag.of("init", Boolean.toString(normalized.major() == 1)))
.build())
.record(head.major() - normalized.major()));
}
}

private static ImmutableList.Builder<Tag> generateTags(
String projectName, String repoName, RequestOnlyLog log) {
final ImmutableList.Builder<Tag> builder = ImmutableList.builder();
return builder.add(Tag.of("project", projectName),
Tag.of("repo", repoName),
Tag.of("service", firstNonNull(log.serviceName(), "none")),
Tag.of("method", log.name()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
import java.io.File;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -188,35 +188,25 @@ private void schedulePendingMirrors() {
final ZonedDateTime currentLastExecutionTime = lastExecutionTime;
lastExecutionTime = now;

projectManager.list().values().stream()
.map(Project::metaRepo)
.flatMap(r -> {
projectManager.list()
.values()
.forEach(project -> {
final Set<Mirror> mirrors;
try {
return r.mirrors().stream();
mirrors = project.metaRepo().mirrors();
} catch (Exception e) {
logger.warn("Failed to load the mirror list from: {}", r.parent().name(), e);
return Stream.empty();
logger.warn("Failed to load the mirror list from: {}", project.name(), e);
return;
}
})
.filter(m -> {
try {
return m.nextExecutionTime(currentLastExecutionTime).compareTo(now) < 0;
} catch (Exception e) {
logger.warn("Failed to calculate the next execution time of: {}", m, e);
return false;
}
})
.forEach(m -> {
final ListenableFuture<?> future = worker.submit(() -> run(m, true));
Futures.addCallback(future, new FutureCallback<Object>() {
@Override
public void onSuccess(@Nullable Object result) {}

@Override
public void onFailure(Throwable cause) {
logger.warn("Unexpected Git mirroring failure: {}", m, cause);
mirrors.forEach(m -> {
try {
if (m.nextExecutionTime(currentLastExecutionTime).compareTo(now) < 0) {
run(project, m);
}
} catch (Exception e) {
logger.warn("Unexpected exception while mirroring: {}", m, e);
}
}, MoreExecutors.directExecutor());
});
});
}

Expand All @@ -229,14 +219,27 @@ public CompletableFuture<Void> mirror() {
return CompletableFuture.runAsync(
() -> projectManager.list().values()
.forEach(p -> p.metaRepo().mirrors()
.forEach(m -> run(m, false))),
.forEach(m -> run(m, p.name(), false))),
worker);
}

private void run(Mirror m, boolean logOnFailure) {
private void run(Project project, Mirror m) {
final ListenableFuture<?> future = worker.submit(() -> run(m, project.name(), true));
Futures.addCallback(future, new FutureCallback<Object>() {
@Override
public void onSuccess(@Nullable Object result) {}

@Override
public void onFailure(Throwable cause) {
logger.warn("Unexpected Git mirroring failure: {}", m, cause);
}
}, MoreExecutors.directExecutor());
}

private void run(Mirror m, String projectName, boolean logOnFailure) {
logger.info("Mirroring: {}", m);
try {
new MirroringTask(m, meterRegistry)
new MirroringTask(m, projectName, meterRegistry)
.run(workDir, commandExecutor, maxNumFilesPerMirror, maxNumBytesPerMirror);
} catch (Exception e) {
if (logOnFailure) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@

final class MirroringTask {

private static Iterable<Tag> generateTags(Mirror mirror) {
private static Iterable<Tag> generateTags(Mirror mirror, String projectName) {
return ImmutableList.of(
Tag.of("project", projectName),
Tag.of("direction", mirror.direction().name()),
Tag.of("remoteBranch", firstNonNull(mirror.remoteBranch(), "")),
Tag.of("remotePath", mirror.remotePath()),
Expand All @@ -44,10 +45,10 @@ private static Iterable<Tag> generateTags(Mirror mirror) {
private final Mirror mirror;
private final Iterable<Tag> tags;

MirroringTask(Mirror mirror, MeterRegistry meterRegistry) {
MirroringTask(Mirror mirror, String projectName, MeterRegistry meterRegistry) {
this.mirror = mirror;
this.meterRegistry = meterRegistry;
tags = generateTags(mirror);
tags = generateTags(mirror, projectName);
}

private Counter counter(boolean success) {
Expand Down
Loading

0 comments on commit c0fdd1a

Please sign in to comment.