Skip to content

Commit

Permalink
Revert "Get rid of persistanceManager, persistanceMAnagerFactory and …
Browse files Browse the repository at this point in the history
…Transaction"

This reverts commit 1f1ab1d.
khys95 committed Jun 18, 2024
1 parent 1f1ab1d commit d772420
Showing 2 changed files with 101 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -34,14 +34,16 @@
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.jdo.PersistenceManager;
import javax.jdo.PersistenceManagerFactory;
import javax.jdo.Transaction;
import org.dcache.cells.CellStub;
import org.dcache.poolmanager.PoolManagerStub;
import org.dcache.util.Args;
import org.dcache.util.CDCExecutorServiceDecorator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;


@@ -79,13 +81,15 @@ public abstract class TransferManager extends AbstractCellComponent
public final Set<PnfsId> justRequestedIDs = new HashSet<>();
private final ExecutorService executor =
new CDCExecutorServiceDecorator<>(Executors.newCachedThreadPool());
private PersistenceManagerFactory _pmf;

public void cleanUp() {
executor.shutdown();
}

@Override
public void getInfo(PrintWriter pw) {
pw.printf("DB logging : %b\n", doDbLogging());
pw.printf("Transfer ID generated : %s\n", idGenerator == null ? "locally" : "from DB");
pw.printf("Next Transfer ID : %d\n", nextMessageID);
pw.printf("Active transfers : %d\n", _numTransfers);
@@ -337,16 +341,55 @@ public void stopTimer(long id) {
}
}

// TODO: get rid of?
public void addActiveTransfer(long id, TransferManagerHandler handler) {
_activeTransfers.put(id, handler);
if (doDbLogging()) {
PersistenceManager pm = _pmf.getPersistenceManager();
try {
Transaction tx = pm.currentTransaction();
try {
tx.begin();
pm.makePersistent(handler);
tx.commit();
LOGGER.debug("Recording new handler into database.");
} catch (Exception e) {
LOGGER.error(e.toString());
} finally {
rollbackIfActive(tx);
}
} finally {
pm.close();
}
}
}
// TODO: get rid of?

public void removeActiveTransfer(long id) {
TransferManagerHandler handler = _activeTransfers.remove(id);
if (handler == null) {
return;
}
if (doDbLogging()) {
PersistenceManager pm = _pmf.getPersistenceManager();
try {
Transaction tx = pm.currentTransaction();
TransferManagerHandlerBackup handlerBackup
= new TransferManagerHandlerBackup(handler);
try {
tx.begin();
pm.makePersistent(handler);
pm.deletePersistent(handler);
pm.makePersistent(handlerBackup);
tx.commit();
LOGGER.debug("handler removed from db");
} catch (Exception e) {
LOGGER.error(e.toString());
} finally {
rollbackIfActive(tx);
}
} finally {
pm.close();
}
}
}

public CellStub getPoolStub() {
@@ -381,10 +424,43 @@ public String getIoQueueName() {
return _ioQueueName;
}

public static void rollbackIfActive(Transaction tx) {
if (tx != null && tx.isActive()) {
tx.rollback();
}
}

public boolean doDbLogging() {
return _pmf != null;
}

public int getMaxNumberOfDeleteRetries() {
return _maxNumberOfDeleteRetries;
}

public void persist(Object o) {
if (doDbLogging()) {
PersistenceManager pm = _pmf.getPersistenceManager();
try {
Transaction tx = pm.currentTransaction();
try {
tx.begin();
pm.makePersistent(o);
tx.commit();
LOGGER.debug("[{}]: Recording new state of handler into database.",
o);
} catch (Exception e) {
LOGGER.error("[{}]: failed to persist object: {}.",
o, e.getMessage());
} finally {
rollbackIfActive(tx);
}
} finally {
pm.close();
}
}
}

@Override
public CellAddressCore getCellAddress() {
return super.getCellAddress();
@@ -399,8 +475,7 @@ public void setBilling(CellStub billingStub) {
}

@Autowired(required = false)
@Qualifier("")
public void setTransferTemplate( KafkaTemplate kafkaTemplate) {
public void setTransferTemplate(KafkaTemplate kafkaTemplate) {
_kafkaSender = kafkaTemplate::sendDefault;
}

@@ -436,6 +511,10 @@ public void setOverwrite(boolean overwrite) {
_overwrite = overwrite;
}

public void setPersistenceManagerFactory(PersistenceManagerFactory pmf) {
_pmf = pmf;
}

public void setTLogRoot(String tLogRoot) {
_tLogRoot = tLogRoot;
}
Original file line number Diff line number Diff line change
@@ -65,6 +65,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.KafkaTemplate;

public class TransferManagerHandler extends AbstractMessageCallback<Message> {

@@ -268,6 +269,7 @@ public void handle() {
}

private void sentToPnfsManager(PnfsMessage message) {
manager.persist(this);
CellStub.addCallback(manager.getPnfsManagerStub().send(message), this, executor);
}

@@ -337,6 +339,7 @@ public void success(Message message) {
sendErrorReply();
}
}
manager.persist(this);
} catch (RuntimeException e) {
LOGGER.error(
"Bug detected in transfermanager, please report this to <support@dCache.org>", e);
@@ -451,6 +454,7 @@ private void retryPoolSelection(int rc, Object error) {

public void createEntryResponseArrived(PnfsCreateEntryMessage create) {
created = true;
manager.persist(this);

fileAttributes = create.getFileAttributes();
pnfsId = create.getPnfsId();
@@ -466,12 +470,15 @@ public void createEntryResponseArrived(PnfsCreateEntryMessage create) {
}

private void getFileAttributesArrived(FileAttributes attributes) {
manager.persist(this);

fileAttributes = attributes;
info.setStorageInfo(attributes.getStorageInfo());
if (attributes.isDefined(STORAGEINFO)
&& attributes.getStorageInfo().getKey("path") != null) {
info.setBillingPath(attributes.getStorageInfo().getKey("path"));
}

selectPool();
}

@@ -483,6 +490,7 @@ private void storageInfoArrived(PnfsId id, FileAttributes attributes) {
info.setPnfsId(pnfsId);
info.setStorageInfo(attributes.getStorageInfo());
pnfsIdString = pnfsId.toString();
manager.persist(this);
if (store) {
synchronized (manager.justRequestedIDs) {
if (manager.justRequestedIDs.contains(id)) {
@@ -518,6 +526,7 @@ public void selectPool() {
request.setSubject(transferRequest.getSubject());
LOGGER.debug("PoolMgrSelectPoolMsg: {}", request);
setState(WAITING_FOR_POOL_INFO_STATE);
manager.persist(this);
CellStub.addCallback(manager.getPoolManagerStub().sendAsync(request), this, executor);
}

@@ -531,6 +540,7 @@ public void poolInfoArrived(PoolMgrSelectPoolMsg pool_info) {

setPool(pool_info.getPool());
fileAttributes = pool_info.getFileAttributes();
manager.persist(this);
LOGGER.debug("Positive reply from pool {}", pool);
startMoverOnThePool();
}
@@ -557,6 +567,7 @@ public void startMoverOnThePool() {
poolMessage.setInitiator(info.getTransaction());
poolMessage.setId(id);
setState(WAITING_FIRST_POOL_REPLY_STATE);
manager.persist(this);
CellStub.addCallback(
manager.getPoolManagerStub().startAsync(pool.getAddress(), poolMessage), this,
executor);
@@ -570,12 +581,15 @@ public void poolFirstReplyArrived(PoolIoFileMessage poolMessage) {
LOGGER.debug("Starting moverTimeout timer");
manager.startTimer(id);
setMoverId(poolMessage.getMoverId());
manager.persist(this);

}

public void deletePnfsEntry() {
if (state == RECEIVED_PNFS_CHECK_BEFORE_DELETE_STATE) {
PnfsDeleteEntryMessage pnfsMsg = new PnfsDeleteEntryMessage(pnfsPath);
setState(WAITING_FOR_PNFS_ENTRY_DELETE);
manager.persist(this);
pnfsMsg.setReplyRequired(true);
CellStub.addCallback(manager.getPnfsManagerStub().send(pnfsMsg), this, executor);
} else {
@@ -627,6 +641,7 @@ private void sendErrorReply(int replyCode, Serializable errorObject) {
sendDoorRequestInfo(replyCode, errorObject.toString());

setState(SENT_ERROR_REPLY_STATE, errorObject);
manager.persist(this);
manager.stopTimer(id);

if (store) {
@@ -669,6 +684,7 @@ private void sendErrorReply() {
sendDoorRequestInfo(replyCode, errorObject.toString());

setState(SENT_ERROR_REPLY_STATE, errorObject);
manager.persist(this);
manager.stopTimer(id);

if (store) {
@@ -703,6 +719,7 @@ public void sendSuccessReply() {
}
sendDoorRequestInfo(0, "");
setState(SENT_SUCCESS_REPLY_STATE);
manager.persist(this);
manager.stopTimer(id);
if (store) {
synchronized (manager.justRequestedIDs) {

0 comments on commit d772420

Please sign in to comment.