diff --git a/server/src/main/java/com/linecorp/centraldogma/server/command/AbstractPushCommand.java b/server/src/main/java/com/linecorp/centraldogma/server/command/AbstractPushCommand.java index 2905513fc7..b7f83c3a13 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/command/AbstractPushCommand.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/command/AbstractPushCommand.java @@ -106,7 +106,7 @@ public boolean equals(Object obj) { return false; } - final AbstractPushCommand that = (AbstractPushCommand) obj; + final AbstractPushCommand that = (AbstractPushCommand) obj; return super.equals(that) && baseRevision.equals(that.baseRevision) && summary.equals(that.summary) && diff --git a/server/src/main/java/com/linecorp/centraldogma/server/command/Command.java b/server/src/main/java/com/linecorp/centraldogma/server/command/Command.java index 11b5fa583e..1732648e74 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/command/Command.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/command/Command.java @@ -341,6 +341,20 @@ static Command push(@Nullable Long timestamp, Author author, summary, detail, markup, changes); } + /** + * Returns a new {@link Command} that transforms the content at the base revision with + * the specified {@link ContentTransformer} and pushed the result of transformation. + * You can find the result of transformation from {@link CommitResult#changes()}. + */ + static Command transform(@Nullable Long timestamp, Author author, + String projectName, String repositoryName, + Revision baseRevision, String summary, + String detail, Markup markup, + ContentTransformer transformer) { + return TransformCommand.of(timestamp, author, projectName, repositoryName, + baseRevision, summary, detail, markup, transformer); + } + /** * Returns a new {@link Command} which is used to create a new session. * diff --git a/server/src/main/java/com/linecorp/centraldogma/server/command/CommandType.java b/server/src/main/java/com/linecorp/centraldogma/server/command/CommandType.java index a688455339..f9a599cf9a 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/command/CommandType.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/command/CommandType.java @@ -29,6 +29,7 @@ public enum CommandType { REMOVE_REPOSITORY(Void.class), UNREMOVE_REPOSITORY(Void.class), NORMALIZING_PUSH(CommitResult.class), + TRANSFORM(CommitResult.class), PUSH(Revision.class), SAVE_NAMED_QUERY(Void.class), REMOVE_NAMED_QUERY(Void.class), diff --git a/server/src/main/java/com/linecorp/centraldogma/server/command/ContentTransformer.java b/server/src/main/java/com/linecorp/centraldogma/server/command/ContentTransformer.java new file mode 100644 index 0000000000..0b874535fc --- /dev/null +++ b/server/src/main/java/com/linecorp/centraldogma/server/command/ContentTransformer.java @@ -0,0 +1,75 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.centraldogma.server.command; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +import java.util.function.Function; + +import com.google.common.base.MoreObjects; + +import com.linecorp.centraldogma.common.EntryType; + +/** + * A {@link Function} which is used for transforming the content at the specified path of the repository. + */ +public final class ContentTransformer { + + private final String path; + private final EntryType entryType; + private final Function transformer; + + /** + * Creates a new instance. + */ + public ContentTransformer(String path, EntryType entryType, Function transformer) { + this.path = requireNonNull(path, "path"); + checkArgument(entryType == EntryType.JSON, "entryType: %s (expected: %s)", entryType, EntryType.JSON); + this.entryType = requireNonNull(entryType, "entryType"); + this.transformer = requireNonNull(transformer, "transformer"); + } + + /** + * Returns the path of the content to be transformed. + */ + public String path() { + return path; + } + + /** + * Returns the {@link EntryType} of the content to be transformed. + */ + public EntryType entryType() { + return entryType; + } + + /** + * Returns the {@link Function} which transforms the content. + */ + public Function transformer() { + return transformer; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("path", path) + .add("entryType", entryType) + .add("transformer", transformer) + .toString(); + } +} diff --git a/server/src/main/java/com/linecorp/centraldogma/server/command/NormalizableCommit.java b/server/src/main/java/com/linecorp/centraldogma/server/command/NormalizableCommit.java new file mode 100644 index 0000000000..ad29a95108 --- /dev/null +++ b/server/src/main/java/com/linecorp/centraldogma/server/command/NormalizableCommit.java @@ -0,0 +1,29 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.centraldogma.server.command; + +/** + * A {@link Command} that can be transformed to a {@link PushAsIsCommand} via {@link #asIs(CommitResult)}. + */ +@FunctionalInterface +public interface NormalizableCommit { + + /** + * Returns a new {@link PushAsIsCommand} which is converted using {@link CommitResult} + * for replicating to other replicas. + */ + PushAsIsCommand asIs(CommitResult commitResult); +} diff --git a/server/src/main/java/com/linecorp/centraldogma/server/command/NormalizingPushCommand.java b/server/src/main/java/com/linecorp/centraldogma/server/command/NormalizingPushCommand.java index 33756196bc..c0401e0e84 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/command/NormalizingPushCommand.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/command/NormalizingPushCommand.java @@ -34,7 +34,8 @@ * You can find the normalized changes from the {@link CommitResult#changes()} that is the result of * {@link CommandExecutor#execute(Command)}. */ -public final class NormalizingPushCommand extends AbstractPushCommand { +public final class NormalizingPushCommand extends AbstractPushCommand + implements NormalizableCommit { @JsonCreator NormalizingPushCommand(@JsonProperty("timestamp") @Nullable Long timestamp, @@ -50,11 +51,7 @@ public final class NormalizingPushCommand extends AbstractPushCommand CompletableFuture doExecute(Command command) throws Exceptio .thenApply(CommitResult::revision); } + if (command instanceof TransformCommand) { + return (CompletableFuture) push((TransformCommand) command, true); + } + if (command instanceof CreateSessionCommand) { return (CompletableFuture) createSession((CreateSessionCommand) command); } @@ -288,7 +292,7 @@ private CompletableFuture purgeRepository(PurgeRepositoryCommand c) { }, repositoryWorker); } - private CompletableFuture push(AbstractPushCommand c, boolean normalizing) { + private CompletableFuture push(RepositoryCommand c, boolean normalizing) { if (c.projectName().equals(INTERNAL_PROJECT_DOGMA) || c.repositoryName().equals(Project.REPO_DOGMA) || !writeQuotaEnabled()) { return push0(c, normalizing); @@ -305,7 +309,7 @@ private CompletableFuture push(AbstractPushCommand c, boolean n } private CompletableFuture tryPush( - AbstractPushCommand c, boolean normalizing, @Nullable RateLimiter rateLimiter) { + RepositoryCommand c, boolean normalizing, @Nullable RateLimiter rateLimiter) { if (rateLimiter == null || rateLimiter == UNLIMITED || rateLimiter.tryAcquire()) { return push0(c, normalizing); } else { @@ -314,9 +318,19 @@ private CompletableFuture tryPush( } } - private CompletableFuture push0(AbstractPushCommand c, boolean normalizing) { - return repo(c).commit(c.baseRevision(), c.timestamp(), c.author(), c.summary(), c.detail(), c.markup(), - c.changes(), normalizing); + private CompletableFuture push0(RepositoryCommand c, boolean normalizing) { + if (c instanceof TransformCommand) { + final TransformCommand transformCommand = (TransformCommand) c; + return repo(c).commit(transformCommand.baseRevision(), transformCommand.timestamp(), + transformCommand.author(), transformCommand.summary(), + transformCommand.detail(), transformCommand.markup(), + transformCommand.transformer()); + } + assert c instanceof AbstractPushCommand; + final AbstractPushCommand pushCommand = (AbstractPushCommand) c; + return repo(c).commit(pushCommand.baseRevision(), pushCommand.timestamp(), pushCommand.author(), + pushCommand.summary(), pushCommand.detail(), pushCommand.markup(), + pushCommand.changes(), normalizing); } private CompletableFuture getRateLimiter(String projectName, String repoName) { diff --git a/server/src/main/java/com/linecorp/centraldogma/server/command/TransformCommand.java b/server/src/main/java/com/linecorp/centraldogma/server/command/TransformCommand.java new file mode 100644 index 0000000000..868b7d81c0 --- /dev/null +++ b/server/src/main/java/com/linecorp/centraldogma/server/command/TransformCommand.java @@ -0,0 +1,112 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.centraldogma.server.command; + +import static java.util.Objects.requireNonNull; + +import javax.annotation.Nullable; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import com.linecorp.centraldogma.common.Author; +import com.linecorp.centraldogma.common.Markup; +import com.linecorp.centraldogma.common.Revision; + +/** + * A {@link Command} that transforms the content at the base revision with + * the specified {@link ContentTransformer} and pushed the result of transformation. + * You can find the result of transformation from {@link CommitResult#changes()}. + * Note that this command is not serialized and deserialized. + */ +public final class TransformCommand extends RepositoryCommand implements NormalizableCommit { + + /** + * Creates a new instance. + */ + public static TransformCommand of(@Nullable Long timestamp, + @Nullable Author author, String projectName, + String repositoryName, Revision baseRevision, + String summary, String detail, Markup markup, + ContentTransformer transformer) { + return new TransformCommand(timestamp, author, projectName, repositoryName, + baseRevision, summary, detail, markup, transformer); + } + + private final Revision baseRevision; + private final String summary; + private final String detail; + private final Markup markup; + private final ContentTransformer transformer; + + private TransformCommand(@Nullable Long timestamp, @Nullable Author author, + String projectName, String repositoryName, Revision baseRevision, + String summary, String detail, Markup markup, + ContentTransformer transformer) { + super(CommandType.TRANSFORM, timestamp, author, projectName, repositoryName); + this.baseRevision = baseRevision; + this.summary = summary; + this.detail = detail; + this.markup = markup; + this.transformer = transformer; + } + + /** + * Returns the base {@link Revision}. + */ + @JsonProperty + public Revision baseRevision() { + return baseRevision; + } + + /** + * Returns the human-readable summary of the commit. + */ + @JsonProperty + public String summary() { + return summary; + } + + /** + * Returns the human-readable detail of the commit. + */ + @JsonProperty + public String detail() { + return detail; + } + + /** + * Returns the {@link Markup} of the {@link #detail()}. + */ + @JsonProperty + public Markup markup() { + return markup; + } + + /** + * Returns the {@link ContentTransformer} which is used for transforming the content. + */ + public ContentTransformer transformer() { + return transformer; + } + + @Override + public PushAsIsCommand asIs(CommitResult commitResult) { + requireNonNull(commitResult, "commitResult"); + return new PushAsIsCommand(timestamp(), author(), projectName(), repositoryName(), + commitResult.revision().backward(1), summary(), detail(), + markup(), commitResult.changes()); + } +} diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/replication/ZooKeeperCommandExecutor.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/replication/ZooKeeperCommandExecutor.java index d73c7a9bc2..1ea949c563 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/internal/replication/ZooKeeperCommandExecutor.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/replication/ZooKeeperCommandExecutor.java @@ -104,6 +104,7 @@ import com.linecorp.centraldogma.server.command.CommandType; import com.linecorp.centraldogma.server.command.CommitResult; import com.linecorp.centraldogma.server.command.ForcePushCommand; +import com.linecorp.centraldogma.server.command.NormalizableCommit; import com.linecorp.centraldogma.server.command.NormalizingPushCommand; import com.linecorp.centraldogma.server.command.RemoveRepositoryCommand; import com.linecorp.centraldogma.server.command.UpdateServerStatusCommand; @@ -1317,19 +1318,14 @@ private T blockingExecute(Command command) throws Exception { final T result = delegate.execute(command).get(); final ReplicationLog log; - if (command.type() == CommandType.NORMALIZING_PUSH) { - final NormalizingPushCommand normalizingPushCommand = (NormalizingPushCommand) command; + final Command maybeUnwrapped = unwrapForcePush(command); + if (maybeUnwrapped instanceof NormalizableCommit) { + final NormalizableCommit normalizingPushCommand = (NormalizableCommit) maybeUnwrapped; assert result instanceof CommitResult : result; final CommitResult commitResult = (CommitResult) result; final Command pushAsIsCommand = normalizingPushCommand.asIs(commitResult); - log = new ReplicationLog<>(replicaId(), pushAsIsCommand, commitResult.revision()); - } else if (command.type() == CommandType.FORCE_PUSH && - ((ForcePushCommand) command).delegate().type() == CommandType.NORMALIZING_PUSH) { - final NormalizingPushCommand delegated = - (NormalizingPushCommand) ((ForcePushCommand) command).delegate(); - final CommitResult commitResult = (CommitResult) result; - final Command command0 = Command.forcePush(delegated.asIs(commitResult)); - log = new ReplicationLog<>(replicaId(), command0, commitResult.revision()); + log = new ReplicationLog<>(replicaId(), + maybeWrap(command, pushAsIsCommand), commitResult.revision()); } else { log = new ReplicationLog<>(replicaId(), command, result); } @@ -1349,6 +1345,20 @@ private T blockingExecute(Command command) throws Exception { } } + private static Command unwrapForcePush(Command command) { + if (command.type() == CommandType.FORCE_PUSH) { + return ((ForcePushCommand) command).delegate(); + } + return command; + } + + private static Command maybeWrap(Command oldCommand, Command pushAsIsCommand) { + if (oldCommand.type() == CommandType.FORCE_PUSH) { + return Command.forcePush(pushAsIsCommand); + } + return pushAsIsCommand; + } + private void createParentNodes() throws Exception { if (createdParentNodes) { return; diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/RepositoryWrapper.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/RepositoryWrapper.java index 51366e7cc6..7ae8102f3a 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/RepositoryWrapper.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/RepositoryWrapper.java @@ -34,6 +34,7 @@ import com.linecorp.centraldogma.common.RevisionRange; import com.linecorp.centraldogma.internal.Util; import com.linecorp.centraldogma.server.command.CommitResult; +import com.linecorp.centraldogma.server.command.ContentTransformer; import com.linecorp.centraldogma.server.storage.project.Project; import com.linecorp.centraldogma.server.storage.repository.DiffResultType; import com.linecorp.centraldogma.server.storage.repository.FindOption; @@ -184,6 +185,13 @@ public CompletableFuture commit(Revision baseRevision, long commit return unwrap().commit(baseRevision, commitTimeMillis, author, summary, detail, markup, changes); } + @Override + public CompletableFuture commit(Revision baseRevision, long commitTimeMillis, Author author, + String summary, String detail, Markup markup, + ContentTransformer transformer) { + return unwrap().commit(baseRevision, commitTimeMillis, author, summary, detail, markup, transformer); + } + @Override public CompletableFuture findLatestRevision(Revision lastKnownRevision, String pathPattern, boolean errorOnEntryNotFound) { diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/cache/CachingRepository.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/cache/CachingRepository.java index 75e7126071..ca3732ebcf 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/cache/CachingRepository.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/cache/CachingRepository.java @@ -44,6 +44,7 @@ import com.linecorp.centraldogma.common.Revision; import com.linecorp.centraldogma.common.RevisionRange; import com.linecorp.centraldogma.server.command.CommitResult; +import com.linecorp.centraldogma.server.command.ContentTransformer; import com.linecorp.centraldogma.server.internal.storage.repository.RepositoryCache; import com.linecorp.centraldogma.server.storage.project.Project; import com.linecorp.centraldogma.server.storage.repository.DiffResultType; @@ -327,6 +328,13 @@ public CompletableFuture commit(Revision baseRevision, long commit normalizing); } + @Override + public CompletableFuture commit(Revision baseRevision, long commitTimeMillis, Author author, + String summary, String detail, Markup markup, + ContentTransformer transformer) { + return repo.commit(baseRevision, commitTimeMillis, author, summary, detail, markup, transformer); + } + @Override public String toString() { return toStringHelper(this) diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/git/AbstractChangesApplier.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/git/AbstractChangesApplier.java new file mode 100644 index 0000000000..3860315daa --- /dev/null +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/git/AbstractChangesApplier.java @@ -0,0 +1,136 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.centraldogma.server.internal.storage.repository.git; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.IOException; + +import javax.annotation.Nullable; + +import org.eclipse.jgit.dircache.DirCache; +import org.eclipse.jgit.dircache.DirCacheBuilder; +import org.eclipse.jgit.dircache.DirCacheEditor; +import org.eclipse.jgit.dircache.DirCacheEditor.PathEdit; +import org.eclipse.jgit.dircache.DirCacheEntry; +import org.eclipse.jgit.lib.Constants; +import org.eclipse.jgit.lib.FileMode; +import org.eclipse.jgit.lib.ObjectId; +import org.eclipse.jgit.lib.ObjectInserter; +import org.eclipse.jgit.lib.ObjectReader; +import org.eclipse.jgit.lib.Repository; + +import com.fasterxml.jackson.databind.JsonNode; + +import com.linecorp.centraldogma.common.CentralDogmaException; +import com.linecorp.centraldogma.common.Revision; +import com.linecorp.centraldogma.internal.Jackson; +import com.linecorp.centraldogma.server.storage.StorageException; + +abstract class AbstractChangesApplier { + + private static final byte[] EMPTY_BYTE = new byte[0]; + + int apply(Repository jGitRepository, @Nullable Revision baseRevision, + @Nullable ObjectId baseTreeId, DirCache dirCache) { + try (ObjectInserter inserter = jGitRepository.newObjectInserter(); + ObjectReader reader = jGitRepository.newObjectReader()) { + + if (baseTreeId != null) { + // the DirCacheBuilder is to used for doing update operations on the given DirCache object + final DirCacheBuilder builder = dirCache.builder(); + + // Add the tree object indicated by the prevRevision to the temporary DirCache object. + builder.addTree(EMPTY_BYTE, 0, reader, baseTreeId); + builder.finish(); + } + + return doApply(dirCache, reader, inserter); + } catch (CentralDogmaException | IllegalArgumentException e) { + throw e; + } catch (Exception e) { + throw new StorageException("failed to apply changes on revision: " + + (baseRevision != null ? baseRevision.major() : 0), e); + } + } + + abstract int doApply(DirCache dirCache, ObjectReader reader, ObjectInserter inserter) throws IOException; + + static void applyPathEdit(DirCache dirCache, PathEdit edit) { + final DirCacheEditor e = dirCache.editor(); + e.add(edit); + e.finish(); + } + + // PathEdit implementations which is used when applying changes. + + static final class InsertJson extends PathEdit { + private final ObjectInserter inserter; + private final JsonNode jsonNode; + + InsertJson(String entryPath, ObjectInserter inserter, JsonNode jsonNode) { + super(entryPath); + this.inserter = inserter; + this.jsonNode = jsonNode; + } + + @Override + public void apply(DirCacheEntry ent) { + try { + ent.setObjectId(inserter.insert(Constants.OBJ_BLOB, Jackson.writeValueAsBytes(jsonNode))); + ent.setFileMode(FileMode.REGULAR_FILE); + } catch (IOException e) { + throw new StorageException("failed to create a new JSON blob", e); + } + } + } + + static final class InsertText extends PathEdit { + private final ObjectInserter inserter; + private final String text; + + InsertText(String entryPath, ObjectInserter inserter, String text) { + super(entryPath); + this.inserter = inserter; + this.text = text; + } + + @Override + public void apply(DirCacheEntry ent) { + try { + ent.setObjectId(inserter.insert(Constants.OBJ_BLOB, text.getBytes(UTF_8))); + ent.setFileMode(FileMode.REGULAR_FILE); + } catch (IOException e) { + throw new StorageException("failed to create a new text blob", e); + } + } + } + + static final class CopyOldEntry extends PathEdit { + private final DirCacheEntry oldEntry; + + CopyOldEntry(String entryPath, DirCacheEntry oldEntry) { + super(entryPath); + this.oldEntry = oldEntry; + } + + @Override + public void apply(DirCacheEntry ent) { + ent.setFileMode(oldEntry.getFileMode()); + ent.setObjectId(oldEntry.getObjectId()); + } + } +} diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/git/CommitExecutor.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/git/CommitExecutor.java new file mode 100644 index 0000000000..bfd2ae8921 --- /dev/null +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/git/CommitExecutor.java @@ -0,0 +1,212 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.centraldogma.server.internal.storage.repository.git; + +import static com.linecorp.centraldogma.server.internal.storage.repository.git.GitRepository.R_HEADS_MASTER; +import static com.linecorp.centraldogma.server.internal.storage.repository.git.GitRepository.doRefUpdate; +import static com.linecorp.centraldogma.server.internal.storage.repository.git.GitRepository.newRevWalk; +import static com.linecorp.centraldogma.server.internal.storage.repository.git.GitRepository.toTree; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Objects.requireNonNull; + +import java.util.List; +import java.util.function.Function; + +import javax.annotation.Nullable; + +import org.eclipse.jgit.diff.DiffEntry; +import org.eclipse.jgit.diff.DiffFormatter; +import org.eclipse.jgit.dircache.DirCache; +import org.eclipse.jgit.dircache.DirCacheIterator; +import org.eclipse.jgit.lib.CommitBuilder; +import org.eclipse.jgit.lib.ObjectId; +import org.eclipse.jgit.lib.ObjectInserter; +import org.eclipse.jgit.lib.ObjectReader; +import org.eclipse.jgit.lib.PersonIdent; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.revwalk.RevWalk; +import org.eclipse.jgit.treewalk.CanonicalTreeParser; + +import com.google.common.collect.ImmutableList; + +import com.linecorp.centraldogma.common.Author; +import com.linecorp.centraldogma.common.CentralDogmaException; +import com.linecorp.centraldogma.common.Change; +import com.linecorp.centraldogma.common.ChangeConflictException; +import com.linecorp.centraldogma.common.Markup; +import com.linecorp.centraldogma.common.RedundantChangeException; +import com.linecorp.centraldogma.common.Revision; +import com.linecorp.centraldogma.server.command.CommitResult; +import com.linecorp.centraldogma.server.storage.StorageException; + +final class CommitExecutor { + + final GitRepository gitRepository; + private final long commitTimeMillis; + private final Author author; + private final String summary; + private final String detail; + private final Markup markup; + private final boolean allowEmptyCommit; + + CommitExecutor(GitRepository gitRepository, long commitTimeMillis, Author author, + String summary, String detail, Markup markup, boolean allowEmptyCommit) { + this.gitRepository = gitRepository; + this.commitTimeMillis = commitTimeMillis; + this.author = author; + this.summary = summary; + this.detail = detail; + this.markup = markup; + this.allowEmptyCommit = allowEmptyCommit; + } + + Author author() { + return author; + } + + String summary() { + return summary; + } + + void executeInitialCommit(Iterable> changes) { + commit(null, Revision.INIT, changes); + } + + CommitResult execute(Revision baseRevision, + Function>> applyingChangesProvider) { + final RevisionAndEntries res; + final Iterable> applyingChanges; + gitRepository.writeLock(); + try { + final Revision normBaseRevision = gitRepository.normalizeNow(baseRevision); + final Revision headRevision = gitRepository.cachedHeadRevision(); + if (headRevision.major() != normBaseRevision.major()) { + throw new ChangeConflictException( + "invalid baseRevision: " + baseRevision + " (expected: " + headRevision + + " or equivalent)"); + } + + applyingChanges = applyingChangesProvider.apply(normBaseRevision); + res = commit(headRevision, headRevision.forward(1), applyingChanges); + + gitRepository.setHeadRevision(res.revision); + } finally { + gitRepository.writeUnLock(); + } + + // Note that the notification is made while no lock is held to avoid the risk of a dead lock. + gitRepository.notifyWatchers(res.revision, res.diffEntries); + return CommitResult.of(res.revision, applyingChanges); + } + + RevisionAndEntries commit(@Nullable Revision prevRevision, Revision nextRevision, + Iterable> changes) { + requireNonNull(nextRevision, "nextRevision"); + requireNonNull(changes, "changes"); + + assert prevRevision == null || prevRevision.major() > 0; + assert nextRevision.major() > 0; + + final Repository jGitRepository = gitRepository.jGitRepository(); + try (ObjectInserter inserter = jGitRepository.newObjectInserter(); + ObjectReader reader = jGitRepository.newObjectReader(); + RevWalk revWalk = newRevWalk(reader)) { + + final CommitIdDatabase commitIdDatabase = gitRepository.commitIdDatabase(); + + final ObjectId prevTreeId = + prevRevision != null ? toTree(commitIdDatabase, revWalk, prevRevision) : null; + + // The staging area that keeps the entries of the new tree. + // It starts with the entries of the tree at the prevRevision (or with no entries if the + // prevRevision is the initial commit), and then this method will apply the requested changes + // to build the new tree. + final DirCache dirCache = DirCache.newInCore(); + + // Apply the changes and retrieve the list of the affected files. + final int numEdits = new DefaultChangesApplier(changes) + .apply(jGitRepository, prevRevision, prevTreeId, dirCache); + + // Reject empty commit if necessary. + final List diffEntries; + boolean isEmpty = numEdits == 0; + if (!isEmpty) { + // Even if there are edits, the resulting tree might be identical with the previous tree. + final CanonicalTreeParser p = new CanonicalTreeParser(); + p.reset(reader, prevTreeId); + final DiffFormatter diffFormatter = new DiffFormatter(null); + diffFormatter.setRepository(jGitRepository); + diffEntries = diffFormatter.scan(p, new DirCacheIterator(dirCache)); + isEmpty = diffEntries.isEmpty(); + } else { + diffEntries = ImmutableList.of(); + } + + if (!allowEmptyCommit && isEmpty) { + throw new RedundantChangeException( + "changes did not change anything in " + gitRepository.parent().name() + '/' + + gitRepository.name() + " at revision " + + (prevRevision != null ? prevRevision.major() : 0) + ": " + changes); + } + + // flush the current index to repository and get the result tree object id. + final ObjectId nextTreeId = dirCache.writeTree(inserter); + + // build a commit object + final PersonIdent personIdent = new PersonIdent(author.name(), author.email(), + commitTimeMillis / 1000L * 1000L, 0); + + final CommitBuilder commitBuilder = new CommitBuilder(); + + commitBuilder.setAuthor(personIdent); + commitBuilder.setCommitter(personIdent); + commitBuilder.setTreeId(nextTreeId); + commitBuilder.setEncoding(UTF_8); + + // Write summary, detail and revision to commit's message as JSON format. + commitBuilder.setMessage(CommitUtil.toJsonString(summary, detail, markup, nextRevision)); + + // if the head commit exists, use it as the parent commit. + if (prevRevision != null) { + commitBuilder.setParentId(commitIdDatabase.get(prevRevision)); + } + + final ObjectId nextCommitId = inserter.insert(commitBuilder); + inserter.flush(); + + // tagging the revision object, for history lookup purpose. + commitIdDatabase.put(nextRevision, nextCommitId); + doRefUpdate(jGitRepository, revWalk, R_HEADS_MASTER, nextCommitId); + + return new RevisionAndEntries(nextRevision, diffEntries); + } catch (CentralDogmaException | IllegalArgumentException e) { + throw e; + } catch (Exception e) { + throw new StorageException("failed to push at '" + gitRepository.parent().name() + '/' + + gitRepository.name() + '\'', e); + } + } + + static final class RevisionAndEntries { + final Revision revision; + final List diffEntries; + + RevisionAndEntries(Revision revision, List diffEntries) { + this.revision = revision; + this.diffEntries = diffEntries; + } + } +} diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/git/DefaultChangesApplier.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/git/DefaultChangesApplier.java new file mode 100644 index 0000000000..269f7a66f6 --- /dev/null +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/git/DefaultChangesApplier.java @@ -0,0 +1,312 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License + */ +package com.linecorp.centraldogma.server.internal.storage.repository.git; + +import static com.google.common.base.MoreObjects.firstNonNull; +import static com.linecorp.centraldogma.server.internal.storage.repository.git.GitRepository.sanitizeText; +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.StringJoiner; + +import javax.annotation.Nullable; + +import org.eclipse.jgit.dircache.DirCache; +import org.eclipse.jgit.dircache.DirCacheEditor; +import org.eclipse.jgit.dircache.DirCacheEditor.DeletePath; +import org.eclipse.jgit.dircache.DirCacheEditor.DeleteTree; +import org.eclipse.jgit.dircache.DirCacheEntry; +import org.eclipse.jgit.lib.Constants; +import org.eclipse.jgit.lib.ObjectInserter; +import org.eclipse.jgit.lib.ObjectReader; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.google.common.base.MoreObjects; + +import com.linecorp.centraldogma.common.Change; +import com.linecorp.centraldogma.common.ChangeConflictException; +import com.linecorp.centraldogma.internal.Jackson; +import com.linecorp.centraldogma.internal.Util; +import com.linecorp.centraldogma.internal.jsonpatch.JsonPatch; + +import difflib.DiffUtils; +import difflib.Patch; + +final class DefaultChangesApplier extends AbstractChangesApplier { + + private final Iterable> changes; + + DefaultChangesApplier(Iterable> changes) { + this.changes = changes; + } + + @Override + int doApply(DirCache dirCache, ObjectReader reader, ObjectInserter inserter) throws IOException { + int numEdits = 0; + // loop over the specified changes. + for (Change change : changes) { + final String changePath = change.path().substring(1); // Strip the leading '/'. + final DirCacheEntry oldEntry = dirCache.getEntry(changePath); + final byte[] oldContent = oldEntry != null ? reader.open(oldEntry.getObjectId()).getBytes() + : null; + + switch (change.type()) { + case UPSERT_JSON: { + final JsonNode oldJsonNode = oldContent != null ? Jackson.readTree(oldContent) : null; + final JsonNode newJsonNode = firstNonNull((JsonNode) change.content(), + JsonNodeFactory.instance.nullNode()); + + // Upsert only when the contents are really different. + if (!Objects.equals(newJsonNode, oldJsonNode)) { + applyPathEdit(dirCache, new InsertJson(changePath, inserter, newJsonNode)); + numEdits++; + } + break; + } + case UPSERT_TEXT: { + final String sanitizedOldText; + if (oldContent != null) { + sanitizedOldText = sanitizeText(new String(oldContent, UTF_8)); + } else { + sanitizedOldText = null; + } + + final String sanitizedNewText = sanitizeText(change.contentAsText()); + + // Upsert only when the contents are really different. + if (!sanitizedNewText.equals(sanitizedOldText)) { + applyPathEdit(dirCache, new InsertText(changePath, inserter, sanitizedNewText)); + numEdits++; + } + break; + } + case REMOVE: + if (oldEntry != null) { + applyPathEdit(dirCache, new DeletePath(changePath)); + numEdits++; + break; + } + + // The path might be a directory. + if (applyDirectoryEdits(dirCache, changePath, null, change)) { + numEdits++; + } else { + // Was not a directory either; conflict. + reportNonExistentEntry(change); + break; + } + break; + case RENAME: { + final String newPath = + ((String) change.content()).substring(1); // Strip the leading '/'. + + if (dirCache.getEntry(newPath) != null) { + throw new ChangeConflictException("a file exists at the target path: " + change); + } + + if (oldEntry != null) { + if (changePath.equals(newPath)) { + // Redundant rename request - old path and new path are same. + break; + } + + final DirCacheEditor editor = dirCache.editor(); + editor.add(new DeletePath(changePath)); + editor.add(new CopyOldEntry(newPath, oldEntry)); + editor.finish(); + numEdits++; + break; + } + + // The path might be a directory. + if (applyDirectoryEdits(dirCache, changePath, newPath, change)) { + numEdits++; + } else { + // Was not a directory either; conflict. + reportNonExistentEntry(change); + } + break; + } + case APPLY_JSON_PATCH: { + final JsonNode oldJsonNode; + if (oldContent != null) { + oldJsonNode = Jackson.readTree(oldContent); + } else { + oldJsonNode = Jackson.nullNode; + } + + final JsonNode newJsonNode; + try { + newJsonNode = JsonPatch.fromJson((JsonNode) change.content()).apply(oldJsonNode); + } catch (Exception e) { + throw new ChangeConflictException("failed to apply JSON patch: " + change, e); + } + + // Apply only when the contents are really different. + if (!newJsonNode.equals(oldJsonNode)) { + applyPathEdit(dirCache, new InsertJson(changePath, inserter, newJsonNode)); + numEdits++; + } + break; + } + case APPLY_TEXT_PATCH: + final Patch patch = DiffUtils.parseUnifiedDiff( + Util.stringToLines(sanitizeText((String) change.content()))); + + final String sanitizedOldText; + final List sanitizedOldTextLines; + if (oldContent != null) { + sanitizedOldText = sanitizeText(new String(oldContent, UTF_8)); + sanitizedOldTextLines = Util.stringToLines(sanitizedOldText); + } else { + sanitizedOldText = null; + sanitizedOldTextLines = Collections.emptyList(); + } + + final String newText; + try { + final List newTextLines = DiffUtils.patch(sanitizedOldTextLines, patch); + if (newTextLines.isEmpty()) { + newText = ""; + } else { + final StringJoiner joiner = new StringJoiner("\n", "", "\n"); + for (String line : newTextLines) { + joiner.add(line); + } + newText = joiner.toString(); + } + } catch (Exception e) { + throw new ChangeConflictException("failed to apply text patch: " + change, e); + } + + // Apply only when the contents are really different. + if (!newText.equals(sanitizedOldText)) { + applyPathEdit(dirCache, new InsertText(changePath, inserter, newText)); + numEdits++; + } + break; + } + } + return numEdits; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("changes", changes) + .toString(); + } + + /** + * Applies recursive directory edits. + * + * @param oldDir the path to the directory to make a recursive change + * @param newDir the path to the renamed directory, or {@code null} to remove the directory. + * + * @return {@code true} if any edits were made to {@code dirCache}, {@code false} otherwise + */ + private static boolean applyDirectoryEdits(DirCache dirCache, + String oldDir, @Nullable String newDir, Change change) { + + if (!oldDir.endsWith("/")) { + oldDir += '/'; + } + if (newDir != null && !newDir.endsWith("/")) { + newDir += '/'; + } + + final byte[] rawOldDir = Constants.encode(oldDir); + final byte[] rawNewDir = newDir != null ? Constants.encode(newDir) : null; + final int numEntries = dirCache.getEntryCount(); + DirCacheEditor editor = null; + + loop: + for (int i = 0; i < numEntries; i++) { + final DirCacheEntry e = dirCache.getEntry(i); + final byte[] rawPath = e.getRawPath(); + + // Ensure that there are no entries under the newDir; we have a conflict otherwise. + if (rawNewDir != null) { + boolean conflict = true; + if (rawPath.length > rawNewDir.length) { + // Check if there is a file whose path starts with 'newDir'. + for (int j = 0; j < rawNewDir.length; j++) { + if (rawNewDir[j] != rawPath[j]) { + conflict = false; + break; + } + } + } else if (rawPath.length == rawNewDir.length - 1) { + // Check if there is a file whose path is exactly same with newDir without trailing '/'. + for (int j = 0; j < rawNewDir.length - 1; j++) { + if (rawNewDir[j] != rawPath[j]) { + conflict = false; + break; + } + } + } else { + conflict = false; + } + + if (conflict) { + throw new ChangeConflictException("target directory exists already: " + change); + } + } + + // Skip the entries that do not belong to the oldDir. + if (rawPath.length <= rawOldDir.length) { + continue; + } + for (int j = 0; j < rawOldDir.length; j++) { + if (rawOldDir[j] != rawPath[j]) { + continue loop; + } + } + + // Do not create an editor until we find an entry to rename/remove. + // We can tell if there was any matching entries or not from the nullness of editor later. + if (editor == null) { + editor = dirCache.editor(); + editor.add(new DeleteTree(oldDir)); + if (newDir == null) { + // Recursive removal + break; + } + } + + assert newDir != null; // We should get here only when it's a recursive rename. + + final String oldPath = e.getPathString(); + final String newPath = newDir + oldPath.substring(oldDir.length()); + editor.add(new CopyOldEntry(newPath, e)); + } + + if (editor != null) { + editor.finish(); + return true; + } else { + return false; + } + } + + private static void reportNonExistentEntry(Change change) { + throw new ChangeConflictException("non-existent file/directory: " + change); + } +} diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/git/GitRepository.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/git/GitRepository.java index 3a2f6422c5..97087c954c 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/git/GitRepository.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/git/GitRepository.java @@ -16,7 +16,6 @@ package com.linecorp.centraldogma.server.internal.storage.repository.git; -import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkState; import static com.linecorp.centraldogma.server.internal.storage.repository.git.FailFastUtil.context; import static com.linecorp.centraldogma.server.internal.storage.repository.git.FailFastUtil.failFastIfTimedOut; @@ -35,9 +34,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Properties; -import java.util.StringJoiner; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; @@ -47,6 +44,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiConsumer; +import java.util.function.Function; import java.util.function.Supplier; import java.util.regex.Pattern; @@ -55,19 +53,10 @@ import org.eclipse.jgit.diff.DiffEntry; import org.eclipse.jgit.diff.DiffFormatter; import org.eclipse.jgit.dircache.DirCache; -import org.eclipse.jgit.dircache.DirCacheBuilder; -import org.eclipse.jgit.dircache.DirCacheEditor; -import org.eclipse.jgit.dircache.DirCacheEditor.DeletePath; -import org.eclipse.jgit.dircache.DirCacheEditor.DeleteTree; -import org.eclipse.jgit.dircache.DirCacheEditor.PathEdit; -import org.eclipse.jgit.dircache.DirCacheEntry; import org.eclipse.jgit.dircache.DirCacheIterator; -import org.eclipse.jgit.lib.CommitBuilder; import org.eclipse.jgit.lib.Constants; -import org.eclipse.jgit.lib.FileMode; import org.eclipse.jgit.lib.ObjectId; import org.eclipse.jgit.lib.ObjectIdOwnerMap; -import org.eclipse.jgit.lib.ObjectInserter; import org.eclipse.jgit.lib.ObjectReader; import org.eclipse.jgit.lib.PersonIdent; import org.eclipse.jgit.lib.Ref; @@ -89,7 +78,6 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; @@ -100,7 +88,6 @@ import com.linecorp.centraldogma.common.Author; import com.linecorp.centraldogma.common.CentralDogmaException; import com.linecorp.centraldogma.common.Change; -import com.linecorp.centraldogma.common.ChangeConflictException; import com.linecorp.centraldogma.common.Commit; import com.linecorp.centraldogma.common.Entry; import com.linecorp.centraldogma.common.EntryNotFoundException; @@ -117,6 +104,7 @@ import com.linecorp.centraldogma.internal.jsonpatch.JsonPatch; import com.linecorp.centraldogma.internal.jsonpatch.ReplaceMode; import com.linecorp.centraldogma.server.command.CommitResult; +import com.linecorp.centraldogma.server.command.ContentTransformer; import com.linecorp.centraldogma.server.internal.IsolatedSystemReader; import com.linecorp.centraldogma.server.internal.JGitUtil; import com.linecorp.centraldogma.server.internal.storage.repository.RepositoryCache; @@ -128,8 +116,6 @@ import com.linecorp.centraldogma.server.storage.repository.Repository; import com.linecorp.centraldogma.server.storage.repository.RepositoryListener; -import difflib.DiffUtils; -import difflib.Patch; import io.netty.channel.EventLoop; /** @@ -141,7 +127,6 @@ class GitRepository implements Repository { static final String R_HEADS_MASTER = Constants.R_HEADS + Constants.MASTER; - private static final byte[] EMPTY_BYTE = new byte[0]; private static final Pattern CR = Pattern.compile("\r", Pattern.LITERAL); private static final Field revWalkObjectsField; @@ -262,10 +247,9 @@ class GitRepository implements Repository { // Initialize the commit ID database. commitIdDatabase = new CommitIdDatabase(jGitRepository); - // Insert the initial commit into the master branch. - commit0(null, Revision.INIT, creationTimeMillis, author, - "Create a new repository", "", Markup.PLAINTEXT, - Collections.emptyList(), true); + new CommitExecutor(this, creationTimeMillis, author, "Create a new repository", "", + Markup.PLAINTEXT, true) + .executeInitialCommit(Collections.emptyList()); headRevision = Revision.INIT; success = true; @@ -394,6 +378,10 @@ void internalClose() { close(() -> new CentralDogmaException("should never reach here")); } + CommitIdDatabase commitIdDatabase() { + return commitIdDatabase; + } + @Override public org.eclipse.jgit.lib.Repository jGitRepository() { return jGitRepository; @@ -726,13 +714,11 @@ public CompletableFuture>> previewDiff(Revision baseRevisi final ServiceRequestContext ctx = context(); return CompletableFuture.supplyAsync(() -> { failFastIfTimedOut(this, logger, ctx, "previewDiff", baseRevision); - return blockingPreviewDiff(baseRevision, changes); + return blockingPreviewDiff(baseRevision, new DefaultChangesApplier(changes)); }, repositoryWorker); } - private Map> blockingPreviewDiff(Revision baseRevision, Iterable> changes) { - requireNonNull(baseRevision, "baseRevision"); - requireNonNull(changes, "changes"); + Map> blockingPreviewDiff(Revision baseRevision, AbstractChangesApplier changesApplier) { baseRevision = normalizeNow(baseRevision); readLock(); @@ -742,7 +728,7 @@ private Map> blockingPreviewDiff(Revision baseRevision, Iterab final ObjectId baseTreeId = toTree(revWalk, baseRevision); final DirCache dirCache = DirCache.newInCore(); - final int numEdits = applyChanges(baseRevision, baseTreeId, dirCache, changes); + final int numEdits = changesApplier.apply(jGitRepository, baseRevision, baseTreeId, dirCache); if (numEdits == 0) { return Collections.emptyMap(); } @@ -868,313 +854,49 @@ public CompletableFuture commit( requireNonNull(detail, "detail"); requireNonNull(markup, "markup"); requireNonNull(changes, "changes"); - - final ServiceRequestContext ctx = context(); - return CompletableFuture.supplyAsync(() -> { - failFastIfTimedOut(this, logger, ctx, "commit", baseRevision, author, summary); - return blockingCommit(baseRevision, commitTimeMillis, - author, summary, detail, markup, changes, false, directExecution); - }, repositoryWorker); - } - - private CommitResult blockingCommit( - Revision baseRevision, long commitTimeMillis, Author author, String summary, - String detail, Markup markup, Iterable> changes, boolean allowEmptyCommit, - boolean directExecution) { - - requireNonNull(baseRevision, "baseRevision"); - - final RevisionAndEntries res; - final Iterable> applyingChanges; - writeLock(); - try { - final Revision normBaseRevision = normalizeNow(baseRevision); - final Revision headRevision = cachedHeadRevision(); - if (headRevision.major() != normBaseRevision.major()) { - throw new ChangeConflictException( - "invalid baseRevision: " + baseRevision + " (expected: " + headRevision + - " or equivalent)"); - } - - if (directExecution) { - applyingChanges = blockingPreviewDiff(normBaseRevision, changes).values(); - } else { - applyingChanges = changes; + final CommitExecutor commitExecutor = + new CommitExecutor(this, commitTimeMillis, author, summary, detail, markup, false); + return commit(baseRevision, commitExecutor, normBaseRevision -> { + if (!directExecution) { + return changes; } - res = commit0(headRevision, headRevision.forward(1), commitTimeMillis, - author, summary, detail, markup, applyingChanges, allowEmptyCommit); - - this.headRevision = res.revision; - } finally { - writeUnLock(); - } - - // Note that the notification is made while no lock is held to avoid the risk of a dead lock. - notifyWatchers(res.revision, res.diffEntries); - return CommitResult.of(res.revision, applyingChanges); + return blockingPreviewDiff(normBaseRevision, new DefaultChangesApplier(changes)).values(); + }); } - private RevisionAndEntries commit0(@Nullable Revision prevRevision, Revision nextRevision, - long commitTimeMillis, Author author, String summary, - String detail, Markup markup, - Iterable> changes, boolean allowEmpty) { - + @Override + public CompletableFuture commit(Revision baseRevision, long commitTimeMillis, Author author, + String summary, String detail, Markup markup, + ContentTransformer transformer) { + requireNonNull(baseRevision, "baseRevision"); requireNonNull(author, "author"); requireNonNull(summary, "summary"); - requireNonNull(changes, "changes"); requireNonNull(detail, "detail"); requireNonNull(markup, "markup"); - - assert prevRevision == null || prevRevision.major() > 0; - assert nextRevision.major() > 0; - - try (ObjectInserter inserter = jGitRepository.newObjectInserter(); - ObjectReader reader = jGitRepository.newObjectReader(); - RevWalk revWalk = newRevWalk(reader)) { - - final ObjectId prevTreeId = prevRevision != null ? toTree(revWalk, prevRevision) : null; - - // The staging area that keeps the entries of the new tree. - // It starts with the entries of the tree at the prevRevision (or with no entries if the - // prevRevision is the initial commit), and then this method will apply the requested changes - // to build the new tree. - final DirCache dirCache = DirCache.newInCore(); - - // Apply the changes and retrieve the list of the affected files. - final int numEdits = applyChanges(prevRevision, prevTreeId, dirCache, changes); - - // Reject empty commit if necessary. - final List diffEntries; - boolean isEmpty = numEdits == 0; - if (!isEmpty) { - // Even if there are edits, the resulting tree might be identical with the previous tree. - final CanonicalTreeParser p = new CanonicalTreeParser(); - p.reset(reader, prevTreeId); - final DiffFormatter diffFormatter = new DiffFormatter(null); - diffFormatter.setRepository(jGitRepository); - diffEntries = diffFormatter.scan(p, new DirCacheIterator(dirCache)); - isEmpty = diffEntries.isEmpty(); - } else { - diffEntries = ImmutableList.of(); - } - - if (!allowEmpty && isEmpty) { - throw new RedundantChangeException( - "changes did not change anything in " + parent().name() + '/' + name() + - " at revision " + (prevRevision != null ? prevRevision.major() : 0) + - ": " + changes); - } - - // flush the current index to repository and get the result tree object id. - final ObjectId nextTreeId = dirCache.writeTree(inserter); - - // build a commit object - final PersonIdent personIdent = new PersonIdent(author.name(), author.email(), - commitTimeMillis / 1000L * 1000L, 0); - - final CommitBuilder commitBuilder = new CommitBuilder(); - - commitBuilder.setAuthor(personIdent); - commitBuilder.setCommitter(personIdent); - commitBuilder.setTreeId(nextTreeId); - commitBuilder.setEncoding(UTF_8); - - // Write summary, detail and revision to commit's message as JSON format. - commitBuilder.setMessage(CommitUtil.toJsonString(summary, detail, markup, nextRevision)); - - // if the head commit exists, use it as the parent commit. - if (prevRevision != null) { - commitBuilder.setParentId(commitIdDatabase.get(prevRevision)); - } - - final ObjectId nextCommitId = inserter.insert(commitBuilder); - inserter.flush(); - - // tagging the revision object, for history lookup purpose. - commitIdDatabase.put(nextRevision, nextCommitId); - doRefUpdate(revWalk, R_HEADS_MASTER, nextCommitId); - - return new RevisionAndEntries(nextRevision, diffEntries); - } catch (CentralDogmaException | IllegalArgumentException e) { - throw e; - } catch (Exception e) { - throw new StorageException("failed to push at '" + parent.name() + '/' + name + '\'', e); - } - } - - private int applyChanges(@Nullable Revision baseRevision, @Nullable ObjectId baseTreeId, DirCache dirCache, - Iterable> changes) { - - int numEdits = 0; - - try (ObjectInserter inserter = jGitRepository.newObjectInserter(); - ObjectReader reader = jGitRepository.newObjectReader()) { - - if (baseTreeId != null) { - // the DirCacheBuilder is to used for doing update operations on the given DirCache object - final DirCacheBuilder builder = dirCache.builder(); - - // Add the tree object indicated by the prevRevision to the temporary DirCache object. - builder.addTree(EMPTY_BYTE, 0, reader, baseTreeId); - builder.finish(); - } - - // loop over the specified changes. - for (Change change : changes) { - final String changePath = change.path().substring(1); // Strip the leading '/'. - final DirCacheEntry oldEntry = dirCache.getEntry(changePath); - final byte[] oldContent = oldEntry != null ? reader.open(oldEntry.getObjectId()).getBytes() - : null; - - switch (change.type()) { - case UPSERT_JSON: { - final JsonNode oldJsonNode = oldContent != null ? Jackson.readTree(oldContent) : null; - final JsonNode newJsonNode = firstNonNull((JsonNode) change.content(), - JsonNodeFactory.instance.nullNode()); - - // Upsert only when the contents are really different. - if (!Objects.equals(newJsonNode, oldJsonNode)) { - applyPathEdit(dirCache, new InsertJson(changePath, inserter, newJsonNode)); - numEdits++; - } - break; - } - case UPSERT_TEXT: { - final String sanitizedOldText; - if (oldContent != null) { - sanitizedOldText = sanitizeText(new String(oldContent, UTF_8)); - } else { - sanitizedOldText = null; - } - - final String sanitizedNewText = sanitizeText(change.contentAsText()); - - // Upsert only when the contents are really different. - if (!sanitizedNewText.equals(sanitizedOldText)) { - applyPathEdit(dirCache, new InsertText(changePath, inserter, sanitizedNewText)); - numEdits++; - } - break; - } - case REMOVE: - if (oldEntry != null) { - applyPathEdit(dirCache, new DeletePath(changePath)); - numEdits++; - break; - } - - // The path might be a directory. - if (applyDirectoryEdits(dirCache, changePath, null, change)) { - numEdits++; - } else { - // Was not a directory either; conflict. - reportNonExistentEntry(change); - break; - } - break; - case RENAME: { - final String newPath = - ((String) change.content()).substring(1); // Strip the leading '/'. - - if (dirCache.getEntry(newPath) != null) { - throw new ChangeConflictException("a file exists at the target path: " + change); - } - - if (oldEntry != null) { - if (changePath.equals(newPath)) { - // Redundant rename request - old path and new path are same. - break; - } - - final DirCacheEditor editor = dirCache.editor(); - editor.add(new DeletePath(changePath)); - editor.add(new CopyOldEntry(newPath, oldEntry)); - editor.finish(); - numEdits++; - break; - } - - // The path might be a directory. - if (applyDirectoryEdits(dirCache, changePath, newPath, change)) { - numEdits++; - } else { - // Was not a directory either; conflict. - reportNonExistentEntry(change); - } - break; - } - case APPLY_JSON_PATCH: { - final JsonNode oldJsonNode; - if (oldContent != null) { - oldJsonNode = Jackson.readTree(oldContent); - } else { - oldJsonNode = Jackson.nullNode; - } - - final JsonNode newJsonNode; - try { - newJsonNode = JsonPatch.fromJson((JsonNode) change.content()).apply(oldJsonNode); - } catch (Exception e) { - throw new ChangeConflictException("failed to apply JSON patch: " + change, e); - } - - // Apply only when the contents are really different. - if (!newJsonNode.equals(oldJsonNode)) { - applyPathEdit(dirCache, new InsertJson(changePath, inserter, newJsonNode)); - numEdits++; - } - break; - } - case APPLY_TEXT_PATCH: - final Patch patch = DiffUtils.parseUnifiedDiff( - Util.stringToLines(sanitizeText((String) change.content()))); - - final String sanitizedOldText; - final List sanitizedOldTextLines; - if (oldContent != null) { - sanitizedOldText = sanitizeText(new String(oldContent, UTF_8)); - sanitizedOldTextLines = Util.stringToLines(sanitizedOldText); - } else { - sanitizedOldText = null; - sanitizedOldTextLines = Collections.emptyList(); - } - - final String newText; - try { - final List newTextLines = DiffUtils.patch(sanitizedOldTextLines, patch); - if (newTextLines.isEmpty()) { - newText = ""; - } else { - final StringJoiner joiner = new StringJoiner("\n", "", "\n"); - for (String line : newTextLines) { - joiner.add(line); - } - newText = joiner.toString(); - } - } catch (Exception e) { - throw new ChangeConflictException("failed to apply text patch: " + change, e); - } - - // Apply only when the contents are really different. - if (!newText.equals(sanitizedOldText)) { - applyPathEdit(dirCache, new InsertText(changePath, inserter, newText)); - numEdits++; - } - break; - } - } - } catch (CentralDogmaException | IllegalArgumentException e) { - throw e; - } catch (Exception e) { - throw new StorageException("failed to apply changes on revision " + baseRevision, e); - } - return numEdits; + requireNonNull(transformer, "transformer"); + final CommitExecutor commitExecutor = + new CommitExecutor(this, commitTimeMillis, author, summary, detail, markup, false); + return commit(baseRevision, commitExecutor, + normBaseRevision -> blockingPreviewDiff( + normBaseRevision, new TransformingChangesApplier(transformer)).values()); + } + + private CompletableFuture commit( + Revision baseRevision, + CommitExecutor commitExecutor, + Function>> applyingChangesProvider) { + final ServiceRequestContext ctx = context(); + return CompletableFuture.supplyAsync(() -> { + failFastIfTimedOut(this, logger, ctx, "commit", baseRevision, + commitExecutor.author(), commitExecutor.summary()); + return commitExecutor.execute(baseRevision, applyingChangesProvider); + }, repositoryWorker); } /** * Removes {@code \r} and appends {@code \n} on the last line if it does not end with {@code \n}. */ - private static String sanitizeText(String text) { + static String sanitizeText(String text) { if (text.indexOf('\r') >= 0) { text = CR.matcher(text).replaceAll(""); } @@ -1184,113 +906,6 @@ private static String sanitizeText(String text) { return text; } - private static void reportNonExistentEntry(Change change) { - throw new ChangeConflictException("non-existent file/directory: " + change); - } - - private static void applyPathEdit(DirCache dirCache, PathEdit edit) { - final DirCacheEditor e = dirCache.editor(); - e.add(edit); - e.finish(); - } - - /** - * Applies recursive directory edits. - * - * @param oldDir the path to the directory to make a recursive change - * @param newDir the path to the renamed directory, or {@code null} to remove the directory. - * - * @return {@code true} if any edits were made to {@code dirCache}, {@code false} otherwise - */ - private static boolean applyDirectoryEdits(DirCache dirCache, - String oldDir, @Nullable String newDir, Change change) { - - if (!oldDir.endsWith("/")) { - oldDir += '/'; - } - if (newDir != null && !newDir.endsWith("/")) { - newDir += '/'; - } - - final byte[] rawOldDir = Constants.encode(oldDir); - final byte[] rawNewDir = newDir != null ? Constants.encode(newDir) : null; - final int numEntries = dirCache.getEntryCount(); - DirCacheEditor editor = null; - - loop: - for (int i = 0; i < numEntries; i++) { - final DirCacheEntry e = dirCache.getEntry(i); - final byte[] rawPath = e.getRawPath(); - - // Ensure that there are no entries under the newDir; we have a conflict otherwise. - if (rawNewDir != null) { - boolean conflict = true; - if (rawPath.length > rawNewDir.length) { - // Check if there is a file whose path starts with 'newDir'. - for (int j = 0; j < rawNewDir.length; j++) { - if (rawNewDir[j] != rawPath[j]) { - conflict = false; - break; - } - } - } else if (rawPath.length == rawNewDir.length - 1) { - // Check if there is a file whose path is exactly same with newDir without trailing '/'. - for (int j = 0; j < rawNewDir.length - 1; j++) { - if (rawNewDir[j] != rawPath[j]) { - conflict = false; - break; - } - } - } else { - conflict = false; - } - - if (conflict) { - throw new ChangeConflictException("target directory exists already: " + change); - } - } - - // Skip the entries that do not belong to the oldDir. - if (rawPath.length <= rawOldDir.length) { - continue; - } - for (int j = 0; j < rawOldDir.length; j++) { - if (rawOldDir[j] != rawPath[j]) { - continue loop; - } - } - - // Do not create an editor until we find an entry to rename/remove. - // We can tell if there was any matching entries or not from the nullness of editor later. - if (editor == null) { - editor = dirCache.editor(); - editor.add(new DeleteTree(oldDir)); - if (newDir == null) { - // Recursive removal - break; - } - } - - assert newDir != null; // We should get here only when it's a recursive rename. - - final String oldPath = e.getPathString(); - final String newPath = newDir + oldPath.substring(oldDir.length()); - editor.add(new CopyOldEntry(newPath, e)); - } - - if (editor != null) { - editor.finish(); - return true; - } else { - return false; - } - } - - private void doRefUpdate(RevWalk revWalk, String ref, ObjectId commitId) throws IOException { - doRefUpdate(jGitRepository, revWalk, ref, commitId); - } - - @VisibleForTesting static void doRefUpdate(org.eclipse.jgit.lib.Repository jGitRepository, RevWalk revWalk, String ref, ObjectId commitId) throws IOException { @@ -1558,7 +1173,7 @@ public void removeListener(RepositoryListener listener) { listeners.remove(listener); } - private void notifyWatchers(Revision newRevision, List diffEntries) { + void notifyWatchers(Revision newRevision, List diffEntries) { for (DiffEntry entry : diffEntries) { switch (entry.getChangeType()) { case ADD: @@ -1574,10 +1189,14 @@ private void notifyWatchers(Revision newRevision, List diffEntries) { } } - private Revision cachedHeadRevision() { + Revision cachedHeadRevision() { return headRevision; } + void setHeadRevision(Revision headRevision) { + this.headRevision = headRevision; + } + /** * Returns the current revision. */ @@ -1598,6 +1217,10 @@ private Revision uncachedHeadRevision() { } private RevTree toTree(RevWalk revWalk, Revision revision) { + return toTree(commitIdDatabase, revWalk, revision); + } + + static RevTree toTree(CommitIdDatabase commitIdDatabase, RevWalk revWalk, Revision revision) { final ObjectId commitId = commitIdDatabase.get(revision); try { return revWalk.parseCommit(commitId).getTree(); @@ -1612,7 +1235,7 @@ private RevWalk newRevWalk() { return revWalk; } - private static RevWalk newRevWalk(ObjectReader reader) { + static RevWalk newRevWalk(ObjectReader reader) { final RevWalk revWalk = new RevWalk(reader); configureRevWalk(revWalk); return revWalk; @@ -1635,7 +1258,7 @@ private void readUnlock() { rwLock.readLock().unlock(); } - private void writeLock() { + void writeLock() { rwLock.writeLock().lock(); if (closePending.get() != null) { writeUnLock(); @@ -1643,7 +1266,7 @@ private void writeLock() { } } - private void writeUnLock() { + void writeUnLock() { rwLock.writeLock().unlock(); } @@ -1675,7 +1298,7 @@ public void cloneTo(File newRepoDir, BiConsumer progressListen final int batch = 16; final List commits = blockingHistory( new Revision(i), new Revision(Math.min(endRevision.major(), i + batch - 1)), - Repository.ALL_PATH, batch); + ALL_PATH, batch); checkState(!commits.isEmpty(), "empty commits"); if (previousNonEmptyRevision == null) { @@ -1688,19 +1311,21 @@ public void cloneTo(File newRepoDir, BiConsumer progressListen final Revision baseRevision = revision.backward(1); final Collection> changes = - diff(previousNonEmptyRevision, revision, Repository.ALL_PATH).join().values(); + diff(previousNonEmptyRevision, revision, ALL_PATH).join().values(); try { - newRepo.blockingCommit( - baseRevision, c.when(), c.author(), c.summary(), c.detail(), c.markup(), - changes, /* allowEmptyCommit */ false, false); + new CommitExecutor(newRepo, c.when(), c.author(), c.summary(), + c.detail(), c.markup(), false) + .execute(baseRevision, normBaseRevision -> blockingPreviewDiff( + normBaseRevision, new DefaultChangesApplier(changes)).values()); previousNonEmptyRevision = revision; } catch (RedundantChangeException e) { // NB: We allow an empty commit here because an old version of Central Dogma had a bug // which allowed the creation of an empty commit. - newRepo.blockingCommit( - baseRevision, c.when(), c.author(), c.summary(), c.detail(), c.markup(), - changes, /* allowEmptyCommit */ true, false); + new CommitExecutor(newRepo, c.when(), c.author(), c.summary(), + c.detail(), c.markup(), true) + .execute(baseRevision, normBaseRevision -> blockingPreviewDiff( + normBaseRevision, new DefaultChangesApplier(changes)).values()); } progressListener.accept(i, endRevision.major()); @@ -1731,73 +1356,4 @@ public String toString() { .add("dir", jGitRepository.getDirectory()) .toString(); } - - private static final class RevisionAndEntries { - final Revision revision; - final List diffEntries; - - RevisionAndEntries(Revision revision, List diffEntries) { - this.revision = revision; - this.diffEntries = diffEntries; - } - } - - // PathEdit implementations which is used when applying changes. - - private static final class InsertText extends PathEdit { - private final ObjectInserter inserter; - private final String text; - - InsertText(String entryPath, ObjectInserter inserter, String text) { - super(entryPath); - this.inserter = inserter; - this.text = text; - } - - @Override - public void apply(DirCacheEntry ent) { - try { - ent.setObjectId(inserter.insert(Constants.OBJ_BLOB, text.getBytes(UTF_8))); - ent.setFileMode(FileMode.REGULAR_FILE); - } catch (IOException e) { - throw new StorageException("failed to create a new text blob", e); - } - } - } - - private static final class InsertJson extends PathEdit { - private final ObjectInserter inserter; - private final JsonNode jsonNode; - - InsertJson(String entryPath, ObjectInserter inserter, JsonNode jsonNode) { - super(entryPath); - this.inserter = inserter; - this.jsonNode = jsonNode; - } - - @Override - public void apply(DirCacheEntry ent) { - try { - ent.setObjectId(inserter.insert(Constants.OBJ_BLOB, Jackson.writeValueAsBytes(jsonNode))); - ent.setFileMode(FileMode.REGULAR_FILE); - } catch (IOException e) { - throw new StorageException("failed to create a new JSON blob", e); - } - } - } - - private static final class CopyOldEntry extends PathEdit { - private final DirCacheEntry oldEntry; - - CopyOldEntry(String entryPath, DirCacheEntry oldEntry) { - super(entryPath); - this.oldEntry = oldEntry; - } - - @Override - public void apply(DirCacheEntry ent) { - ent.setFileMode(oldEntry.getFileMode()); - ent.setObjectId(oldEntry.getObjectId()); - } - } } diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/git/TransformingChangesApplier.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/git/TransformingChangesApplier.java new file mode 100644 index 0000000000..663f41c36c --- /dev/null +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/git/TransformingChangesApplier.java @@ -0,0 +1,77 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.centraldogma.server.internal.storage.repository.git; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.util.Objects; + +import org.eclipse.jgit.dircache.DirCache; +import org.eclipse.jgit.dircache.DirCacheEntry; +import org.eclipse.jgit.lib.ObjectInserter; +import org.eclipse.jgit.lib.ObjectReader; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.google.common.base.MoreObjects; + +import com.linecorp.centraldogma.common.ChangeConflictException; +import com.linecorp.centraldogma.common.EntryType; +import com.linecorp.centraldogma.internal.Jackson; +import com.linecorp.centraldogma.server.command.ContentTransformer; + +final class TransformingChangesApplier extends AbstractChangesApplier { + + private final ContentTransformer transformer; + + TransformingChangesApplier(ContentTransformer transformer) { + checkArgument(transformer.entryType() == EntryType.JSON, + "transformer: %s (expected: JSON type)", transformer); + //noinspection unchecked + this.transformer = (ContentTransformer) transformer; + } + + @Override + int doApply(DirCache dirCache, ObjectReader reader, ObjectInserter inserter) throws IOException { + final String changePath = transformer.path().substring(1); // Strip the leading '/'. + final DirCacheEntry oldEntry = dirCache.getEntry(changePath); + final byte[] oldContent = oldEntry != null ? reader.open(oldEntry.getObjectId()).getBytes() + : null; + final JsonNode oldJsonNode = oldContent != null ? Jackson.readTree(oldContent) + : JsonNodeFactory.instance.nullNode(); + try { + final JsonNode newJsonNode = transformer.transformer().apply(oldJsonNode.deepCopy()); + requireNonNull(newJsonNode, "transformer.transformer().apply() returned null"); + if (!Objects.equals(newJsonNode, oldJsonNode)) { + applyPathEdit(dirCache, new InsertJson(changePath, inserter, newJsonNode)); + return 1; + } + } catch (Exception e) { + throw new ChangeConflictException("failed to transform the content: " + oldJsonNode + + " transformer: " + transformer, e); + } + return 0; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("transformer", transformer) + .toString(); + } +} diff --git a/server/src/main/java/com/linecorp/centraldogma/server/storage/repository/Repository.java b/server/src/main/java/com/linecorp/centraldogma/server/storage/repository/Repository.java index bd6f6136dc..b374ca42dd 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/storage/repository/Repository.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/storage/repository/Repository.java @@ -54,6 +54,7 @@ import com.linecorp.centraldogma.common.RevisionRange; import com.linecorp.centraldogma.internal.HistoryConstants; import com.linecorp.centraldogma.server.command.CommitResult; +import com.linecorp.centraldogma.server.command.ContentTransformer; import com.linecorp.centraldogma.server.internal.replication.ReplicationLog; import com.linecorp.centraldogma.server.storage.StorageException; import com.linecorp.centraldogma.server.storage.project.Project; @@ -404,6 +405,14 @@ CompletableFuture commit(Revision baseRevision, long commitTimeMil Author author, String summary, String detail, Markup markup, Iterable> changes, boolean directExecution); + /** + * Adds the content that is transformed by the specified {@link ContentTransformer} to + * this {@link Repository}. + */ + CompletableFuture commit(Revision baseRevision, long commitTimeMillis, + Author author, String summary, String detail, Markup markup, + ContentTransformer transformer); + /** * Get a list of {@link Commit} for given pathPattern. * diff --git a/server/src/test/java/com/linecorp/centraldogma/server/command/StandaloneCommandExecutorTest.java b/server/src/test/java/com/linecorp/centraldogma/server/command/StandaloneCommandExecutorTest.java index a097f6d5ba..6c913bf9e2 100644 --- a/server/src/test/java/com/linecorp/centraldogma/server/command/StandaloneCommandExecutorTest.java +++ b/server/src/test/java/com/linecorp/centraldogma/server/command/StandaloneCommandExecutorTest.java @@ -21,6 +21,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.util.List; +import java.util.function.Function; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -34,6 +35,7 @@ import com.linecorp.centraldogma.common.Author; import com.linecorp.centraldogma.common.Change; +import com.linecorp.centraldogma.common.EntryType; import com.linecorp.centraldogma.common.Markup; import com.linecorp.centraldogma.common.ReadOnlyException; import com.linecorp.centraldogma.common.Revision; @@ -98,7 +100,8 @@ void jsonUpsertPushCommandConvertedIntoJsonPatchWhenApplicable() { Markup.PLAINTEXT, change)) .join(); // The same json upsert. - assertThat(commitResult).isEqualTo(CommitResult.of(new Revision(2), ImmutableList.of(change))); + final Revision previousRevision = commitResult.revision(); + assertThat(commitResult).isEqualTo(CommitResult.of(previousRevision, ImmutableList.of(change))); // Json upsert is converted into json patch. change = Change.ofJsonUpsert("/foo.json", "{\"a\": \"c\"}"); @@ -108,7 +111,7 @@ void jsonUpsertPushCommandConvertedIntoJsonPatchWhenApplicable() { Markup.PLAINTEXT, change)) .join(); - assertThat(commitResult.revision()).isEqualTo(new Revision(3)); + assertThat(commitResult.revision()).isEqualTo(previousRevision.forward(1)); final List> changes = commitResult.changes(); assertThat(changes).hasSize(1); assertThatJson(changes.get(0).content()).isEqualTo( @@ -116,14 +119,14 @@ void jsonUpsertPushCommandConvertedIntoJsonPatchWhenApplicable() { "\"path\":\"/a\"," + "\"oldValue\":\"b\"," + "\"value\":\"c\"}" + - "]"); + ']'); change = Change.ofJsonUpsert("/foo.json", "{\"a\": \"d\"}"); // PushAsIs just uses the json upsert. final Revision revision = executor.execute( new PushAsIsCommand(0L, Author.SYSTEM, TEST_PRJ, TEST_REPO2, Revision.HEAD, "", "", Markup.PLAINTEXT, ImmutableList.of(change))).join(); - assertThat(revision).isEqualTo(new Revision(4)); + assertThat(revision).isEqualTo(previousRevision.forward(2)); } @Test @@ -148,6 +151,8 @@ void shouldPerformAdministrativeCommandWithReadOnly() throws JsonParseException .join() .contentAsJson(); assertThat(json.get("a").asText()).isEqualTo("b"); + executor.execute(Command.updateServerStatus(ServerStatus.WRITABLE)).join(); + assertThat(executor.isWritable()).isTrue(); } @Test @@ -169,4 +174,46 @@ void createInternalProject() { .join(); assertThat(commitResult).isEqualTo(CommitResult.of(new Revision(2), ImmutableList.of(change))); } + + @Test + void transformCommandConvertedIntoJsonPatch() { + final StandaloneCommandExecutor executor = (StandaloneCommandExecutor) extension.executor(); + + // Initial commit. + final Change change = Change.ofJsonUpsert("/bar.json", "{\"a\": \"b\"}"); + CommitResult commitResult = + executor.execute(Command.push( + Author.SYSTEM, TEST_PRJ, TEST_REPO2, Revision.HEAD, "", "", + Markup.PLAINTEXT, change)) + .join(); + // The same json upsert. + final Revision previousRevision = commitResult.revision(); + assertThat(commitResult).isEqualTo(CommitResult.of(previousRevision, ImmutableList.of(change))); + + final Function transformer = jsonNode -> { + if (jsonNode.has("a")) { + ((ObjectNode) jsonNode).put("a", "c"); + } + return jsonNode; + }; + final ContentTransformer contentTransformer = + new ContentTransformer<>("/bar.json", EntryType.JSON, transformer); + + commitResult = + executor.execute(Command.transform( + null, Author.SYSTEM, TEST_PRJ, TEST_REPO2, Revision.HEAD, "", "", + Markup.PLAINTEXT, contentTransformer)).join(); + + // Json upsert is converted into json patch. + + assertThat(commitResult.revision()).isEqualTo(previousRevision.forward(1)); + final List> changes = commitResult.changes(); + assertThat(changes).hasSize(1); + assertThatJson(changes.get(0).content()).isEqualTo( + "[{\"op\":\"safeReplace\"," + + "\"path\":\"/a\"," + + "\"oldValue\":\"b\"," + + "\"value\":\"c\"}" + + ']'); + } } diff --git a/server/src/test/java/com/linecorp/centraldogma/server/internal/replication/ReplicationLogTest.java b/server/src/test/java/com/linecorp/centraldogma/server/internal/replication/ReplicationLogTest.java index bc82e83f44..effefa96be 100644 --- a/server/src/test/java/com/linecorp/centraldogma/server/internal/replication/ReplicationLogTest.java +++ b/server/src/test/java/com/linecorp/centraldogma/server/internal/replication/ReplicationLogTest.java @@ -30,9 +30,7 @@ import com.linecorp.centraldogma.server.command.CommitResult; import com.linecorp.centraldogma.server.command.NormalizingPushCommand; import com.linecorp.centraldogma.server.command.PushAsIsCommand; -import com.linecorp.centraldogma.testing.internal.FlakyTest; -@FlakyTest class ReplicationLogTest { private static final Author AUTHOR = new Author("foo", "bar@baz.com"); diff --git a/server/src/test/java/com/linecorp/centraldogma/server/internal/replication/ZooKeeperCommandExecutorTest.java b/server/src/test/java/com/linecorp/centraldogma/server/internal/replication/ZooKeeperCommandExecutorTest.java index 99a7baeb9d..e3f10e9ac5 100644 --- a/server/src/test/java/com/linecorp/centraldogma/server/internal/replication/ZooKeeperCommandExecutorTest.java +++ b/server/src/test/java/com/linecorp/centraldogma/server/internal/replication/ZooKeeperCommandExecutorTest.java @@ -49,9 +49,14 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.function.ThrowingConsumer; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.github.benmanes.caffeine.cache.Cache; import com.google.common.collect.ImmutableList; @@ -59,15 +64,19 @@ import com.linecorp.armeria.common.metric.MoreMeters; import com.linecorp.centraldogma.common.Author; import com.linecorp.centraldogma.common.Change; +import com.linecorp.centraldogma.common.EntryType; import com.linecorp.centraldogma.common.Markup; import com.linecorp.centraldogma.common.ReadOnlyException; import com.linecorp.centraldogma.common.Revision; import com.linecorp.centraldogma.server.QuotaConfig; import com.linecorp.centraldogma.server.command.Command; +import com.linecorp.centraldogma.server.command.CommandType; import com.linecorp.centraldogma.server.command.CommitResult; +import com.linecorp.centraldogma.server.command.ContentTransformer; import com.linecorp.centraldogma.server.command.ForcePushCommand; import com.linecorp.centraldogma.server.command.NormalizingPushCommand; import com.linecorp.centraldogma.server.command.PushAsIsCommand; +import com.linecorp.centraldogma.server.command.TransformCommand; import com.linecorp.centraldogma.server.management.ServerStatus; import com.linecorp.centraldogma.testing.internal.FlakyTest; @@ -75,9 +84,10 @@ class ZooKeeperCommandExecutorTest { private static final Logger logger = LoggerFactory.getLogger(ZooKeeperCommandExecutorTest.class); - - private static final Change pushChange = Change.ofTextUpsert("/foo", "bar"); - private static final Change normalizedChange = Change.ofTextUpsert("/foo", "bar_normalized"); + private static final Change pushChange = Change.ofJsonUpsert("/foo.json", "{\"a\":\"b\"}"); + private static final Change normalizedChange = + Change.ofJsonPatch("/foo.json", + "[{\"op\":\"safeReplace\",\"path\":\"/a\",\"oldValue\":\"b\",\"value\":\"c\"}]"); @Test void testLogWatch() throws Exception { @@ -276,9 +286,10 @@ void hierarchicalQuorums() throws Throwable { } } - @Test @Timeout(120) - void hierarchicalQuorumsWithFailOver() throws Throwable { + @ValueSource(booleans = { true, false }) + @ParameterizedTest + void hierarchicalQuorumsWithFailOver(boolean normalizingPushCommand) throws Throwable { try (Cluster cluster = Cluster.builder() .numReplicas(9) .numGroup(3) @@ -313,18 +324,7 @@ void hierarchicalQuorumsWithFailOver() throws Throwable { cluster.get(1).commandExecutor().stop().join(); cluster.get(4).commandExecutor().stop().join(); - final Command normalizingPushCommand = - Command.push(0L, Author.SYSTEM, "project", "repo1", new Revision(1), - "summary", "detail", - Markup.PLAINTEXT, - ImmutableList.of(pushChange)); - - assert normalizingPushCommand instanceof NormalizingPushCommand; - final PushAsIsCommand asIsCommand = ((NormalizingPushCommand) normalizingPushCommand).asIs( - CommitResult.of(new Revision(2), ImmutableList.of(normalizedChange))); - - assertThat(replica1.commandExecutor().execute(normalizingPushCommand).join().revision()) - .isEqualTo(new Revision(2)); + final PushAsIsCommand asIsCommand = executeCommand(replica1, normalizingPushCommand); final ReplicationLog commandResult2 = replica1.commandExecutor().loadLog(1, false).get(); assertThat(commandResult2.command()).isEqualTo(asIsCommand); assertThat(commandResult2.result()).isInstanceOf(Revision.class); @@ -359,6 +359,47 @@ void hierarchicalQuorumsWithFailOver() throws Throwable { } } + private static PushAsIsCommand executeCommand(Replica replica1, boolean normalizingPush) { + if (normalizingPush) { + final Command normalizingPushCommand = + Command.push(0L, Author.SYSTEM, "project", "repo1", new Revision(1), + "summary", "detail", + Markup.PLAINTEXT, + ImmutableList.of(pushChange)); + + assert normalizingPushCommand instanceof NormalizingPushCommand; + final PushAsIsCommand asIsCommand = ((NormalizingPushCommand) normalizingPushCommand).asIs( + CommitResult.of(new Revision(2), ImmutableList.of(normalizedChange))); + + assertThat(replica1.commandExecutor().execute(normalizingPushCommand).join().revision()) + .isEqualTo(new Revision(2)); + return asIsCommand; + } else { + final Function transformer = jsonNode -> { + final JsonNode oldContent = pushChange.content(); + assertThat(jsonNode).isEqualTo(oldContent); + final JsonNode newContent = oldContent.deepCopy(); + ((ObjectNode) newContent).put("a", "c"); + return newContent; + }; + final ContentTransformer contentTransformer = new ContentTransformer<>( + pushChange.path(), EntryType.JSON, transformer); + final Command transformCommand = + Command.transform(0L, Author.SYSTEM, "project", "repo1", new Revision(1), + "summary", "detail", + Markup.PLAINTEXT, contentTransformer); + + assert transformCommand instanceof TransformCommand; + final PushAsIsCommand asIsCommand = + ((TransformCommand) transformCommand).asIs( + CommitResult.of(new Revision(2), ImmutableList.of(normalizedChange))); + + assertThat(replica1.commandExecutor().execute(transformCommand).join().revision()) + .isEqualTo(new Revision(2)); + return asIsCommand; + } + } + @Test void hierarchicalQuorums_writingOnZeroWeightReplica() throws Throwable { try (Cluster cluster = Cluster.builder() @@ -628,12 +669,27 @@ static Function, CompletableFuture> newMockDelegate() { argument = ((ForcePushCommand) argument).delegate(); } if (argument instanceof NormalizingPushCommand) { - if (((NormalizingPushCommand) argument).changes().equals( + final NormalizingPushCommand normalizingPushCommand = + (NormalizingPushCommand) argument; + assertThat(normalizingPushCommand.type()).isSameAs(CommandType.NORMALIZING_PUSH); + if (normalizingPushCommand.changes().equals( ImmutableList.of(pushChange))) { return completedFuture( CommitResult.of(revision, ImmutableList.of(normalizedChange))); } } + + if (argument instanceof TransformCommand) { + final TransformCommand pushCommand = + (TransformCommand) argument; + assertThat(pushCommand.type()).isSameAs(CommandType.TRANSFORM); + final Function transformer = + (Function) pushCommand.transformer().transformer(); + final JsonNode applied = transformer.apply(pushChange.content()); + assertThat(applied).isEqualTo(JsonNodeFactory.instance.objectNode().put("a", "c")); + return completedFuture( + CommitResult.of(revision, ImmutableList.of(normalizedChange))); + } return completedFuture(CommitResult.of(revision, ImmutableList.of())); });