Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 4207 - Transaction log follower logic #4225

Draft
wants to merge 7 commits into
base: 4247-read-snapshots-by-range
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package sleeper.core.statestore.transactionlog;

import sleeper.core.statestore.transactionlog.state.StateListenerBeforeApply;
import sleeper.core.statestore.transactionlog.transaction.FileReferenceTransaction;
import sleeper.core.statestore.transactionlog.transaction.PartitionTransaction;
import sleeper.core.statestore.transactionlog.transaction.StateStoreTransaction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import sleeper.core.statestore.exception.ReplaceRequestsFailedException;
import sleeper.core.statestore.exception.SplitRequestsFailedException;
import sleeper.core.statestore.transactionlog.log.TransactionLogEntry;
import sleeper.core.statestore.transactionlog.state.StateListenerBeforeApply;
import sleeper.core.statestore.transactionlog.state.StateStoreFiles;
import sleeper.core.statestore.transactionlog.transaction.impl.AddFilesTransaction;
import sleeper.core.statestore.transactionlog.transaction.impl.AssignJobIdsTransaction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import sleeper.core.statestore.transactionlog.log.TransactionLogStore;
import sleeper.core.statestore.transactionlog.snapshot.TransactionLogSnapshot;
import sleeper.core.statestore.transactionlog.snapshot.TransactionLogSnapshotLoader;
import sleeper.core.statestore.transactionlog.state.StateListenerBeforeApply;
import sleeper.core.statestore.transactionlog.state.StateStoreFiles;
import sleeper.core.statestore.transactionlog.state.StateStorePartitions;
import sleeper.core.statestore.transactionlog.transaction.FileReferenceTransaction;
Expand Down Expand Up @@ -167,9 +168,9 @@ private void attemptAddTransaction(Instant updateTime, AddTransactionRequest req
} catch (RuntimeException e) {
throw new StateStoreException("Failed adding transaction", e);
}
request.getBeforeApplyListener().beforeApply(entry, state);
Instant startApplyTime = Instant.now();
StateStoreTransaction<T> transaction = request.getTransaction();
request.getBeforeApplyListener().beforeApply(entry, transaction, state);
Instant startApplyTime = Instant.now();
transaction.apply(state, updateTime);
lastTransactionNumber = transactionNumber;
LOGGER.debug("Applied transaction {} in {}",
Expand Down Expand Up @@ -287,7 +288,7 @@ private void applyTransaction(TransactionLogEntry entry, StateListenerBeforeAppl
}
StateStoreTransaction<T> transaction = transactionType.cast(
entry.getTransactionOrLoadFromPointer(sleeperTable.getTableUniqueId(), transactionBodyStore));
listener.beforeApply(entry, state);
listener.beforeApply(entry, transaction, state);
transaction.apply(state, entry.getUpdateTime());
lastTransactionNumber = entry.getTransactionNumber();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import sleeper.core.statestore.transactionlog.log.TransactionLogEntry;
import sleeper.core.statestore.transactionlog.log.TransactionLogStore;
import sleeper.core.statestore.transactionlog.snapshot.TransactionLogSnapshotLoader;
import sleeper.core.statestore.transactionlog.state.StateListenerBeforeApply;
import sleeper.core.statestore.transactionlog.state.StateStoreFiles;
import sleeper.core.statestore.transactionlog.state.StateStorePartitions;
import sleeper.core.statestore.transactionlog.transaction.FileReferenceTransaction;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2022-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.core.statestore.transactionlog.state;

import sleeper.core.statestore.transactionlog.log.TransactionLogEntry;
import sleeper.core.statestore.transactionlog.transaction.impl.ReplaceFileReferencesTransaction;
import sleeper.core.table.TableStatus;
import sleeper.core.tracker.compaction.job.CompactionJobTracker;

/**
* A listener to update the compaction job tracker based on compaction commit transactions.
*/
public class CompactionJobTrackerStateListener implements StateListenerBeforeApplyByType<StateStoreFiles, ReplaceFileReferencesTransaction> {

private final TableStatus sleeperTable;
private final CompactionJobTracker tracker;

public CompactionJobTrackerStateListener(TableStatus sleeperTable, CompactionJobTracker tracker) {
this.sleeperTable = sleeperTable;
this.tracker = tracker;
}

@Override
public void beforeApply(TransactionLogEntry entry, ReplaceFileReferencesTransaction transaction, StateStoreFiles state) {
transaction.reportJobCommits(tracker, sleeperTable, state, entry.getUpdateTime());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright 2022-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.core.statestore.transactionlog.state;

import sleeper.core.statestore.transactionlog.log.TransactionLogEntry;
import sleeper.core.statestore.transactionlog.transaction.StateStoreTransaction;
import sleeper.core.statestore.transactionlog.transaction.impl.ReplaceFileReferencesTransaction;
import sleeper.core.table.TableStatus;
import sleeper.core.tracker.compaction.job.CompactionJobTracker;
import sleeper.core.tracker.ingest.job.IngestJobTracker;

import java.util.List;
import java.util.function.Consumer;

/**
* Listens to see the state before a transaction is applied to the local state. This is when the transaction is in the
* log, but before it is applied locally.
*
* @param <S> the type of state to listen on
*/
@FunctionalInterface
public interface StateListenerBeforeApply<S> {

/**
* Informs the listener that the transaction is about to be applied to the local state.
*
* @param transaction the transaction
* @param state the state
*/
void beforeApply(TransactionLogEntry entry, StateStoreTransaction<?> transaction, S state);

/**
* Creates a transaction listener that does nothing.
*
* @param <S> the type of state the transaction operates on
* @return the listener
*/
static <S> StateListenerBeforeApply<S> none() {
return (entry, transaction, state) -> {
};
}

/**
* Creates a transaction listener that updates job trackers.
*
* @param sleeperTable the Sleeper table status
* @param ingestTracker the ingest job tracker
* @param compactionTracker the compaction job tracker
* @return the listener
*/
static StateListenerBeforeApply<StateStoreFiles> updateTrackers(
TableStatus sleeperTable, IngestJobTracker ingestTracker, CompactionJobTracker compactionTracker) {
return and(List.of(
byTransactionType(ReplaceFileReferencesTransaction.class,
new CompactionJobTrackerStateListener(sleeperTable, compactionTracker))));
}

/**
* Creates a transaction listener that operates on just the state.
*
* @param <S> the type of state the transaction operates on
* @return the listener
*/
static <S> StateListenerBeforeApply<S> withState(Consumer<S> run) {
return (entry, transaction, state) -> run.accept(state);
}

/**
* Creates a transaction listener that runs if the transaction is a certain type.
*
* @param <S> the type of state the transaction operates on
* @param <T> the type of the transaction
* @return the listener
*/
static <S, T extends StateStoreTransaction<S>> StateListenerBeforeApply<S> byTransactionType(Class<T> transactionType, StateListenerBeforeApplyByType<S, T> listener) {
return (entry, transaction, state) -> {
if (transactionType.isInstance(transaction)) {
listener.beforeApply(entry, transactionType.cast(transaction), state);
}
};
}

/**
* Creates a transaction listener that operates on just the state.
*
* @param <S> the type of state the transaction operates on
* @return the listener
*/
static <S> StateListenerBeforeApply<S> and(List<StateListenerBeforeApply<S>> listeners) {
return (entry, transaction, state) -> listeners.forEach(listener -> listener.beforeApply(entry, transaction, state));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.core.statestore.transactionlog;
package sleeper.core.statestore.transactionlog.state;

import sleeper.core.statestore.transactionlog.log.TransactionLogEntry;
import sleeper.core.statestore.transactionlog.transaction.StateStoreTransaction;

/**
* Listens to see the state before a transaction is applied to the local state. This is when the transaction is in the
* log, but before it is applied locally.
*
* @param <S> the type of state to listen on
* @param <T> the type of transaction to listen for
*/
@FunctionalInterface
public interface StateListenerBeforeApply<S> {
public interface StateListenerBeforeApplyByType<S, T extends StateStoreTransaction<S>> {

/**
* Informs the listener that the transaction is about to be applied to the local state.
*
* @param entry the transaction log entry
* @param state the state
* @param transaction the transaction
* @param state the state
*/
void beforeApply(TransactionLogEntry entry, S state);
void beforeApply(TransactionLogEntry entry, T transaction, S state);

/**
* Creates a transaction listener that does nothing.
*
* @param <S> the type of state the transaction operates on
* @return the listener
*/
static <S> StateListenerBeforeApply<S> none() {
return (entry, state) -> {
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
package sleeper.core.statestore.transactionlog;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import sleeper.core.partition.PartitionsBuilder;
import sleeper.core.schema.type.LongType;
import sleeper.core.statestore.FileReference;
import sleeper.core.statestore.ReplaceFileReferencesRequest;
import sleeper.core.statestore.transactionlog.log.TransactionLogEntry;
import sleeper.core.tracker.compaction.job.update.CompactionJobCreatedEvent;
import sleeper.core.statestore.transactionlog.state.StateListenerBeforeApply;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -33,11 +33,10 @@
import static sleeper.core.schema.SchemaTestHelper.schemaWithKey;
import static sleeper.core.statestore.AssignJobIdRequest.assignJobOnPartitionToFiles;

public class TransactionLogStateStoreFollowTransactionsTest extends InMemoryTransactionLogStateStoreCompactionTrackerTestBase {
public class TransactionLogStateStoreFollowTransactionsTest extends InMemoryTransactionLogStateStoreTestBase {

// Tests to add:
// - Follow partition transactions
// - Read a snapshot when updating from log
// - Local state has already applied the given transaction (fail or just ignore it?)

private TransactionLogStateStore committerStore;
Expand Down Expand Up @@ -122,7 +121,11 @@ void shouldFollowTransactionReadingPreviousSnapshot() {
// And a snapshot with file 2 replaced with file 3 (so that this will fail if reapplied on top of the second snapshot)
committerStore.assignJobIds(List.of(assignJobOnPartitionToFiles("test-job", "root", List.of("file2.parquet"))));
TransactionLogEntry entry3 = filesLogStore.getLastEntry();
committerStore.atomicallyReplaceFileReferencesWithNewOnes(List.of(replaceJobFileReferencesBuilder("test-job", List.of("file2.parquet"), file3).build()));
committerStore.atomicallyReplaceFileReferencesWithNewOnes(List.of(ReplaceFileReferencesRequest.builder()
.jobId("test-job")
.inputFiles(List.of("file2.parquet"))
.newReference(file3)
.build()));
TransactionLogEntry entry4 = filesLogStore.getLastEntry();
createSnapshots();
trackTransactionLogReads();
Expand All @@ -137,32 +140,9 @@ void shouldFollowTransactionReadingPreviousSnapshot() {
assertThat(transactionEntriesThatWereRead).containsExactly(entry3, entry4);
}

@Test
@Disabled("TODO")
void shouldUpdateTransactionLogBasedOnStateStoreProvided() {
// Given
FileReference oldFile = factory.rootFile("oldFile", 100L);
FileReference newFile = factory.rootFile("newFile", 100L);
committerStore.addFiles(List.of(oldFile));
committerStore.assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "root", List.of("oldFile"))));
CompactionJobCreatedEvent trackedJob = trackJobCreated("job1", "root", 1);
trackJobRun(trackedJob, "test-run");
committerStore.atomicallyReplaceFileReferencesWithNewOnes(List.of(
replaceJobFileReferencesBuilder("job1", List.of("oldFile"), newFile).jobRunId("test-run").build()));
TransactionLogEntry logEntry = filesLogStore.getLastEntry();

// When
loadNextTransaction(logEntry);

// Then
assertThat(tracker.getAllJobs(sleeperTable.getTableUniqueId()))
.containsExactly(defaultStatus(trackedJob, defaultCommittedRun(100)));
}

private void loadNextTransaction(TransactionLogEntry entry) {
followerStore.applyEntryFromLog(entry, (e, state) -> {
});
followerStore.applyEntryFromLog(entry, StateListenerBeforeApply.withState(state -> {
}));
}

private void trackTransactionLogReads() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2022-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.core.statestore.transactionlog.state;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import sleeper.core.partition.PartitionsBuilder;
import sleeper.core.schema.type.LongType;
import sleeper.core.statestore.FileReference;
import sleeper.core.statestore.transactionlog.InMemoryTransactionLogStateStoreCompactionTrackerTestBase;
import sleeper.core.statestore.transactionlog.TransactionLogStateStore;
import sleeper.core.statestore.transactionlog.log.TransactionLogEntry;
import sleeper.core.tracker.compaction.job.update.CompactionJobCreatedEvent;
import sleeper.core.tracker.ingest.job.IngestJobTracker;

import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
import static sleeper.core.schema.SchemaTestHelper.schemaWithKey;
import static sleeper.core.statestore.AssignJobIdRequest.assignJobOnPartitionToFiles;

public class CompactionJobTrackerStateListenerFollowTransactionTest extends InMemoryTransactionLogStateStoreCompactionTrackerTestBase {

private TransactionLogStateStore committerStore;
private TransactionLogStateStore followerStore;

@BeforeEach
void setUp() {
initialiseWithPartitions(new PartitionsBuilder(schemaWithKey("key", new LongType())).singlePartition("root"));
committerStore = (TransactionLogStateStore) super.store;
followerStore = stateStoreBuilder(schemaWithKey("key", new LongType())).build();
}

@Test
void shouldUpdateCompactionJobTrackerBasedOnTransaction() {
// Given
FileReference oldFile = factory.rootFile("oldFile", 100L);
FileReference newFile = factory.rootFile("newFile", 100L);
committerStore.addFiles(List.of(oldFile));
committerStore.assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "root", List.of("oldFile"))));
CompactionJobCreatedEvent trackedJob = trackJobCreated("job1", "root", 1);
trackJobRun(trackedJob, "test-run");
committerStore.fixFileUpdateTime(DEFAULT_COMMIT_TIME);
committerStore.atomicallyReplaceFileReferencesWithNewOnes(List.of(
replaceJobFileReferencesBuilder("job1", List.of("oldFile"), newFile).jobRunId("test-run").build()));
TransactionLogEntry logEntry = filesLogStore.getLastEntry();

// When
loadNextTransaction(logEntry);

// Then
assertThat(tracker.getAllJobs(sleeperTable.getTableUniqueId()))
.containsExactly(defaultStatus(trackedJob, defaultCommittedRun(100)));
}

private void loadNextTransaction(TransactionLogEntry entry) {
followerStore.applyEntryFromLog(entry, StateListenerBeforeApply.updateTrackers(sleeperTable, IngestJobTracker.NONE, tracker));
}

}
Loading
Loading