Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] introduce two phase deletion protocol. #15834

Closed
wants to merge 46 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
0c1a891
init snip-145
horizonzy May 28, 2022
df055a6
redefine TrashDataComponent proto
horizonzy May 28, 2022
4731f73
fix compile problem.
horizonzy May 28, 2022
e37e5c4
fix code style.
horizonzy May 28, 2022
5d04e3d
fix code format.
horizonzy May 28, 2022
b009d7e
complete ManagedTrashMXBean
horizonzy May 28, 2022
93ca968
fix trashData value null problem.
horizonzy May 28, 2022
a133007
fix EMPTY_LEDGER_INFO initialize problem.
horizonzy May 29, 2022
c5ca979
remove triggerDelete schedule task.
horizonzy May 30, 2022
61a0e91
code clean.
horizonzy May 30, 2022
464a407
revert broker.conf
horizonzy May 30, 2022
2a34693
code format.
horizonzy May 30, 2022
3e571e2
optimize persist data when the context is empty.
horizonzy May 30, 2022
699d05a
complete admin api.
horizonzy May 30, 2022
2f9acb2
support trigger config.
horizonzy May 30, 2022
79deefd
fix check style
horizonzy May 30, 2022
510da88
compatible
horizonzy May 31, 2022
9dd35c3
support config to control is support two phase deletion.
horizonzy May 31, 2022
0f97c20
code style fix.
horizonzy May 31, 2022
488d2ce
make ManagedTrash cover more case.
horizonzy May 31, 2022
234dd94
make triggerDeleteInBackground to interface.
horizonzy May 31, 2022
cfb950d
make triggerDeleteInBackground to interface.
horizonzy May 31, 2022
428bf11
fix ignore uncompleted offload ledger problem.
horizonzy Jun 1, 2022
cb6e122
when close managedLedger, close managedTrash together.
horizonzy Jun 1, 2022
02c7867
use object to take place of string on trashData key.
horizonzy Jun 2, 2022
26e55a1
change TrashDataKey -> TrashKey.
horizonzy Jun 2, 2022
5a2c4a4
add backoff policy to avoid same key to delete frequently in a short …
horizonzy Jun 2, 2022
59a21fc
code clean.
horizonzy Jun 2, 2022
c121e42
complete unit test.
horizonzy Jun 6, 2022
7e6c007
Merge branch 'master' into snip-145
horizonzy Jun 6, 2022
9dd87e2
code clean
horizonzy Jun 6, 2022
82dd67c
check retryCount is 0 when persist archive data.
horizonzy Jun 6, 2022
9db6dec
use timestamp to take place of ledgerId as archive node suffix.
horizonzy Jun 6, 2022
896720e
add test unit to cover code.
horizonzy Jun 8, 2022
82011c0
Merge remote-tracking branch 'upstream/master' into snip-145
horizonzy Jun 8, 2022
6552514
when didn't get lock to delete, make next delete procedure no delay.
horizonzy Jun 8, 2022
6cfa99d
when didn't get lock to delete, make next delete procedure no delay.
horizonzy Jun 8, 2022
0b21f49
when no more trashData to delete, make continueDeleteImmediately = 0
horizonzy Jun 8, 2022
b4c54d5
1.make maxDeleteCount support config.
horizonzy Jun 8, 2022
fc70690
trigger delete after update trashData.
horizonzy Jun 10, 2022
3c7303c
make allTrashDataDeleteOnce async.
horizonzy Jun 10, 2022
056372a
add test case for getToDeleteData
horizonzy Jun 13, 2022
9cba2ab
enhance unit test.
horizonzy Jun 13, 2022
8fa183d
Add api doc info.
horizonzy Jun 13, 2022
91d8fcc
code clean.
horizonzy Jun 13, 2022
793687c
fix code style.
horizonzy Jun 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ public class ManagedLedgerConfig {
@Getter
@Setter
private boolean cacheEvictionByMarkDeletedPosition = false;
private boolean supportTwoPhaseDeletion = false;
private int archiveDataLimitSize = 500;
private int retryDeleteIntervalSeconds = 60;
private int maxDeleteCount = 5;

public boolean isCreateIfMissing() {
return createIfMissing;
Expand Down Expand Up @@ -683,4 +687,35 @@ public void setInactiveLedgerRollOverTime(int inactiveLedgerRollOverTimeMs, Time
this.inactiveLedgerRollOverTimeMs = (int) unit.toMillis(inactiveLedgerRollOverTimeMs);
}

public boolean isSupportTwoPhaseDeletion() {
return supportTwoPhaseDeletion;
}

public void setSupportTwoPhaseDeletion(boolean supportTwoPhaseDeletion) {
this.supportTwoPhaseDeletion = supportTwoPhaseDeletion;
}

public int getArchiveDataLimitSize() {
return archiveDataLimitSize;
}

public void setArchiveDataLimitSize(int archiveDataLimitSize) {
this.archiveDataLimitSize = archiveDataLimitSize;
}

public int getRetryDeleteIntervalSeconds() {
return retryDeleteIntervalSeconds;
}

public void setRetryDeleteIntervalSeconds(int retryDeleteIntervalSeconds) {
this.retryDeleteIntervalSeconds = retryDeleteIntervalSeconds;
}

public void setMaxDeleteCount(int maxDeleteCount) {
this.maxDeleteCount = maxDeleteCount;
}

public int getMaxDeleteCount() {
return maxDeleteCount;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.impl.ManagedTrashImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;

public interface ManagedTrash {

enum ManagedType {
MANAGED_LEDGER("managed-ledger"),
MANAGED_CURSOR("managed-cursor"),
SCHEMA("schema");
private final String name;

ManagedType(String name) {
this.name = name;
}

public String getName() {
return name;
}
}

enum LedgerType {
BOTH,
OFFLOAD_LEDGER,
LEDGER
}

/**
* ManagedTrash name.
*
* @return full topic name + type
*/
String name();

/**
* Initialize.
*/
CompletableFuture<?> initialize();

/**
* Append waiting to delete ledger.
*
* @param ledgerId ledgerId
* @param context ledgerInfo, if offload ledger, need offload context
* @param type LEDGER or OFFLOAD_LEDGER
* @throws ManagedLedgerException
*/
void appendLedgerTrashData(long ledgerId, LedgerInfo context, LedgerType type) throws ManagedLedgerException;

/**
* Persist trash data to meta store.
*/
CompletableFuture<?> asyncUpdateTrashData();

/**
* Trigger deletion procedure.
*/
void triggerDeleteInBackground();

/**
* Get all archive index, it needs combine with getArchiveData.
*/
CompletableFuture<List<Long>> getAllArchiveIndex();

/**
* Get archive data detail info.
*
* @param index archive index
* @return
*/
CompletableFuture<Map<ManagedTrashImpl.TrashKey, LedgerInfo>> getArchiveData(long index);

/**
* Async close managedTrash, it will persist trash data to meta store.
* @return
*/
CompletableFuture<?> asyncClose();

/**
* Async close managedTrash, it can ensure that all ledger least delete once (exclude offload_ledger).
* @return
*/
CompletableFuture<?> asyncCloseAfterAllLedgerDeleteOnce();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;

import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;

/**
* JMX Bean interface for ManagedTrash stats.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Stable
public interface ManagedTrashMXBean {

String getName();

long getCurrentNumberOfLedgersWaitingToDelete();

void increaseTotalNumberOfDeleteLedgers();

long getTotalNumberOfDeleteLedgers();

long getCurrentNumberOfLedgersWaitingToArchive();

void increaseTotalNumberOfArchiveLedgers();

long getTotalNumberOfArchiveLedgers();
}
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final
bookkeeperFactory.get(
new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
config.getBookKeeperEnsemblePlacementPolicyProperties())),
store, config, scheduledExecutor, name, mlOwnershipChecker);
store, metadataStore, config, scheduledExecutor, name, mlOwnershipChecker);
PendingInitializeManagedLedger pendingLedger = new PendingInitializeManagedLedger(newledger);
pendingInitializeLedgers.put(name, pendingLedger);
newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
Expand Down Expand Up @@ -480,7 +480,7 @@ public void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosi
bookkeeperFactory
.get(new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
config.getBookKeeperEnsemblePlacementPolicyProperties())),
store, config, scheduledExecutor, managedLedgerName);
store, metadataStore, config, scheduledExecutor, managedLedgerName);

roManagedLedger.initializeAndCreateCursor((PositionImpl) startPosition)
.thenAccept(roCursor -> callback.openReadOnlyCursorComplete(roCursor, ctx))
Expand Down
Loading