Skip to content

Commit

Permalink
Run git gc periodically (#564)
Browse files Browse the repository at this point in the history
Motivation:

Central Dogma git repository does not run git gc automatically.
git gc will save disk storage and improve performance.

Modifications:

- Add `RepositoryGarbageCollectionPlugin` that schedules GC with a Quartz cron expression
- Add `RepositoryGarbageCollectionConfig` in order to configure a GC schedule and the minimum number of new commits for the GC
- Add `GitGcRevision` that reads and writes the last revision when GC was run. 

Result:

- You can now schedule git gc with a Quartz cron expression and the minimum number of new commits.
  ```json
  "repositoryGarbageCollection": {
    "minNumNewCommits": 1000,
    "schedule": "0 0 * * * ?"
  }
  ```
- Fixes #264
  • Loading branch information
ikhoon authored Mar 16, 2021
1 parent 50eba3a commit e6caf26
Show file tree
Hide file tree
Showing 13 changed files with 720 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,13 @@ public final class CentralDogmaBuilder {
private String sessionValidationSchedule = DEFAULT_SESSION_VALIDATION_SCHEDULE;
@Nullable
private Object authProviderProperties;

private int writeQuota;
private int timeWindowSeconds;

@Nullable
private RepositoryGarbageCollectionConfig repositoryGarbageCollection;

/**
* Creates a new builder with the specified data directory.
*/
Expand Down Expand Up @@ -520,6 +524,18 @@ public CentralDogmaBuilder writeQuotaPerRepository(int writeQuota, int timeWindo
return this;
}

/**
* Sets the minimum required number of commits newly added to run a garbage collection and the
* <a href="https://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html">
* Quartz cron expression</a> when garbage collections is suppose to be triggered.
*/
public CentralDogmaBuilder repositoryGarbageCollection(int minNumNewCommits, String schedule) {
checkArgument(minNumNewCommits > 0, "minNumNewCommits: %s (expected: > 0)", minNumNewCommits);
requireNonNull(schedule, "schedule");
repositoryGarbageCollection = new RepositoryGarbageCollectionConfig(minNumNewCommits, schedule);
return this;
}

/**
* Returns a newly-created {@link CentralDogma} server.
*/
Expand Down Expand Up @@ -553,6 +569,6 @@ private CentralDogmaConfig buildConfig() {
maxRemovedRepositoryAgeMillis, gracefulShutdownTimeout,
webAppEnabled, webAppTitle, mirroringEnabled, numMirroringThreads,
maxNumFilesPerMirror, maxNumBytesPerMirror, replicationConfig,
null, accessLogFormat, authCfg, quotaConfig);
null, accessLogFormat, authCfg, quotaConfig, repositoryGarbageCollection);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ public final class CentralDogmaConfig {
@Nullable
private final QuotaConfig writeQuotaPerRepository;

@Nullable
private final RepositoryGarbageCollectionConfig repositoryGarbageCollection;

CentralDogmaConfig(
@JsonProperty(value = "dataDir", required = true) File dataDir,
@JsonProperty(value = "ports", required = true)
Expand Down Expand Up @@ -165,7 +168,9 @@ public final class CentralDogmaConfig {
@JsonProperty("csrfTokenRequiredForThrift") @Nullable Boolean csrfTokenRequiredForThrift,
@JsonProperty("accessLogFormat") @Nullable String accessLogFormat,
@JsonProperty("authentication") @Nullable AuthConfig authConfig,
@JsonProperty("writeQuotaPerRepository") @Nullable QuotaConfig writeQuotaPerRepository) {
@JsonProperty("writeQuotaPerRepository") @Nullable QuotaConfig writeQuotaPerRepository,
@JsonProperty("repositoryGarbageCollection")
@Nullable RepositoryGarbageCollectionConfig repositoryGarbageCollection) {

this.dataDir = requireNonNull(dataDir, "dataDir");
this.ports = ImmutableList.copyOf(requireNonNull(ports, "ports"));
Expand Down Expand Up @@ -219,6 +224,7 @@ public final class CentralDogmaConfig {
ports.stream().anyMatch(ServerPort::hasProxyProtocol));

this.writeQuotaPerRepository = writeQuotaPerRepository;
this.repositoryGarbageCollection = repositoryGarbageCollection;
}

/**
Expand Down Expand Up @@ -456,6 +462,16 @@ public QuotaConfig writeQuotaPerRepository() {
return writeQuotaPerRepository;
}

/**
* Returns the {@link RepositoryGarbageCollectionConfig} for
* cleanuping unnecessary files and optimizing {@link Repository}s.
*/
@Nullable
@JsonProperty("repositoryGarbageCollection")
public RepositoryGarbageCollectionConfig repositoryGarbageCollection() {
return repositoryGarbageCollection;
}

@Override
public String toString() {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2021 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.centraldogma.server;

import static com.google.common.base.MoreObjects.firstNonNull;
import static org.apache.curator.shaded.com.google.common.base.Preconditions.checkArgument;

import javax.annotation.Nullable;

import com.cronutils.model.Cron;
import com.cronutils.model.CronType;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.parser.CronParser;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import com.linecorp.centraldogma.server.storage.repository.Repository;

/**
* A configuration for {@link Repository} garbage collection.
*/
public final class RepositoryGarbageCollectionConfig {

private static final String DEFAULT_SCHEDULE = "0 0 * * * ?"; // Every day
private static final CronParser cronParser =
new CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ));

private final int minNumNewCommits;
private final Cron schedule;

/**
* Creates a new instance.
*/
@JsonCreator
public RepositoryGarbageCollectionConfig(@JsonProperty("minNumNewCommits") int minNumNewCommits,
@JsonProperty("schedule") @Nullable String schedule) {
checkArgument(minNumNewCommits > 0, "minNumNewCommits: %s (expected: > 0)", minNumNewCommits);

this.minNumNewCommits = minNumNewCommits;
this.schedule = cronParser.parse(firstNonNull(schedule, DEFAULT_SCHEDULE));
}

/**
* Returns the minimum required number of commits newly added to run a garbage collection.
*/
public int minNumNewCommits() {
return minNumNewCommits;
}

/**
* Returns the schedule when garbage collections is supposed to be triggered.
*/
public Cron schedule() {
return schedule;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Copyright 2021 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.centraldogma.server.internal.storage;

import static java.util.Objects.requireNonNull;

import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cronutils.model.time.ExecutionTime;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import com.linecorp.armeria.common.util.TextFormatter;
import com.linecorp.centraldogma.common.Revision;
import com.linecorp.centraldogma.server.CentralDogmaConfig;
import com.linecorp.centraldogma.server.RepositoryGarbageCollectionConfig;
import com.linecorp.centraldogma.server.plugin.Plugin;
import com.linecorp.centraldogma.server.plugin.PluginContext;
import com.linecorp.centraldogma.server.plugin.PluginTarget;
import com.linecorp.centraldogma.server.storage.project.Project;
import com.linecorp.centraldogma.server.storage.project.ProjectManager;
import com.linecorp.centraldogma.server.storage.repository.Repository;

import io.netty.util.concurrent.DefaultThreadFactory;

public final class RepositoryGarbageCollectionPlugin implements Plugin {

private static final Logger logger =
LoggerFactory.getLogger(RepositoryGarbageCollectionPlugin.class);

@Nullable
private RepositoryGarbageCollectionConfig gcConfig;
@Nullable
private ListeningScheduledExecutorService gcWorker;
@Nullable
private ListenableScheduledFuture<?> scheduledFuture;

private volatile boolean stopping;

@Override
public PluginTarget target() {
return PluginTarget.ALL_REPLICAS;
}

@Override
public boolean isEnabled(CentralDogmaConfig config) {
return config.repositoryGarbageCollection() != null;
}

@Override
public synchronized CompletionStage<Void> start(PluginContext context) {
requireNonNull(context, "context");

initialize(context);
scheduleGc(context);

return CompletableFuture.completedFuture(null);
}

@VisibleForTesting
void initialize(PluginContext context) {
gcConfig = context.config().repositoryGarbageCollection();
gcWorker = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory("repository-gc-worker", true)));
}

private void scheduleGc(PluginContext context) {
if (stopping) {
return;
}

final Duration nextExecution = ExecutionTime.forCron(gcConfig.schedule())
.timeToNextExecution(ZonedDateTime.now());
scheduledFuture = gcWorker.schedule(() -> gc(context), nextExecution);

Futures.addCallback(scheduledFuture, new FutureCallback<Object>() {
@Override
public void onSuccess(@Nullable Object result) {}

@Override
public void onFailure(Throwable cause) {
if (!stopping) {
logger.warn("Repository gc scheduler stopped due to an unexpected exception:", cause);
}
}
}, gcWorker);
}

@Override
public synchronized CompletionStage<Void> stop(PluginContext context) {
stopping = true;
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
}

try {
if (gcWorker != null && !gcWorker.isTerminated()) {
logger.info("Stopping the repository gc worker ..");
boolean interruptLater = false;
while (!gcWorker.isTerminated()) {
gcWorker.shutdownNow();
try {
gcWorker.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// Interrupt later.
interruptLater = true;
}
}
logger.info("Stopped the repository gc worker.");

if (interruptLater) {
Thread.currentThread().interrupt();
}
}
} catch (Throwable t) {
logger.warn("Failed to stop the repository gc worker:", t);
}
return CompletableFuture.completedFuture(null);
}

@VisibleForTesting
void gc(PluginContext context) {
if (stopping) {
return;
}

final ProjectManager pm = context.projectManager();
final Stopwatch stopwatch = Stopwatch.createUnstarted();
for (Project project : pm.list().values()) {
for (Repository repo : project.repos().list().values()) {
runGc(project, repo, stopwatch);
}
}

scheduleGc(context);
}

private void runGc(Project project, Repository repo, Stopwatch stopwatch) {
try {
if (!needsGc(repo)) {
return;
}

logger.info("Starting repository gc on {}/{} ..", project.name(), repo.name());
stopwatch.reset();
repo.gc();
final long elapsedNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
logger.info("Finished repository gc on {}/{} - took {}", project.name(), repo.name(),
TextFormatter.elapsed(elapsedNanos));
} catch (Exception e) {
logger.warn("Failed to run repository gc on {}/{}", project.name(), repo.name(), e);
}
}

private boolean needsGc(Repository repo) {
final Revision endRevision = repo.normalizeNow(Revision.HEAD);
final Revision gcRevision = repo.lastGcRevision();
final int newCommitsSinceLastGc;
if (gcRevision == null) {
newCommitsSinceLastGc = endRevision.major();
} else {
newCommitsSinceLastGc = endRevision.major() - gcRevision.major();
}

return newCommitsSinceLastGc >= gcConfig.minNumNewCommits();
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.omitNullValues()
.add("target", target())
.add("scheduledFuture", scheduledFuture)
.add("stopping", stopping)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,16 @@ public <T> CompletableFuture<MergedEntry<T>> mergeFiles(Revision revision, Merge
return unwrap().mergeFiles(revision, query);
}

@Override
public Revision gc() throws Exception {
return unwrap().gc();
}

@Override
public Revision lastGcRevision() {
return unwrap().lastGcRevision();
}

@Override
public String toString() {
return Util.simpleTypeName(this) + '(' + unwrap() + ')';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,16 @@ public <T> CompletableFuture<MergedEntry<T>> mergeFiles(Revision revision, Merge
});
}

@Override
public Revision gc() throws Exception {
return repo.gc();
}

@Override
public Revision lastGcRevision() {
return repo.lastGcRevision();
}

@Override
public String toString() {
return toStringHelper(this)
Expand Down
Loading

0 comments on commit e6caf26

Please sign in to comment.