More hooks for the stopping-harvest-in-progress functionality (#7940)
landreev committed Jun 15, 2022
1 parent 49424f0 commit 744fa10
Showing 2 changed files with 86 additions and 106 deletions.
@@ -153,9 +153,6 @@ public void doHarvest(DataverseRequest dataverseRequest, Long harvestingClientId


List<Long> harvestedDatasetIds = null;

List<Long> harvestedDatasetIdsThisBatch = new ArrayList<Long>();

List<String> failedIdentifiers = new ArrayList<String>();
List<String> deletedIdentifiers = new ArrayList<String>();

@@ -170,37 +167,20 @@ public void doHarvest(DataverseRequest dataverseRequest, Long harvestingClientId

} else {
harvestingClientService.resetHarvestInProgress(harvestingClientId);
harvestingClientService.setHarvestInProgress(harvestingClientId, harvestStartTime);


ClientHarvestRun currentRun = harvestingClientService.setHarvestInProgress(harvestingClientId, harvestStartTime);

if (harvestingClientConfig.isOai()) {
harvestedDatasetIds = harvestOAI(dataverseRequest, harvestingClientConfig, hdLogger, importCleanupLog, harvestErrorOccurred, failedIdentifiers, deletedIdentifiers, harvestedDatasetIdsThisBatch);
harvestedDatasetIds = harvestOAI(dataverseRequest, harvestingClientConfig, currentRun.getId(), hdLogger, importCleanupLog, harvestErrorOccurred, failedIdentifiers, deletedIdentifiers);

} else {
throw new IOException("Unsupported harvest type");
}
harvestingClientService.setHarvestSuccess(harvestingClientId, new Date(), harvestedDatasetIds.size(), failedIdentifiers.size(), deletedIdentifiers.size());
hdLogger.log(Level.INFO, "COMPLETED HARVEST, server=" + harvestingClientConfig.getArchiveUrl() + ", metadataPrefix=" + harvestingClientConfig.getMetadataPrefix());
hdLogger.log(Level.INFO, "Datasets created/updated: " + harvestedDatasetIds.size() + ", datasets deleted: " + deletedIdentifiers.size() + ", datasets failed: " + failedIdentifiers.size());

// now index all the datasets we have harvested - created, modified or deleted:
/* (TODO: may not be needed at all. In Dataverse4, we may be able to get away with the normal
reindexing after every import. See the rest of the comments about batch indexing throughout
this service bean)
if (this.processedSizeThisBatch > 0) {
hdLogger.log(Level.INFO, "POST HARVEST, reindexing the remaining studies.");
if (this.harvestedDatasetIdsThisBatch != null) {
hdLogger.log(Level.INFO, this.harvestedDatasetIdsThisBatch.size()+" studies in the batch");
}
hdLogger.log(Level.INFO, this.processedSizeThisBatch + " bytes of content");
indexService.updateIndexList(this.harvestedDatasetIdsThisBatch);
hdLogger.log(Level.INFO, "POST HARVEST, calls to index finished.");
} else {
hdLogger.log(Level.INFO, "(All harvested content already reindexed)");
}
*/

harvestingClientService.setHarvestSuccess(harvestingClientId, new Date(), harvestedDatasetIds.size(), failedIdentifiers.size(), deletedIdentifiers.size());
hdLogger.log(Level.INFO, "COMPLETED HARVEST, server=" + harvestingClientConfig.getArchiveUrl() + ", metadataPrefix=" + harvestingClientConfig.getMetadataPrefix());
hdLogger.log(Level.INFO, "Datasets created/updated: " + harvestedDatasetIds.size() + ", datasets deleted: " + deletedIdentifiers.size() + ", datasets failed: " + failedIdentifiers.size());

}
//mailService.sendHarvestNotification(...getSystemEmail(), harvestingDataverse.getName(), logFileName, logTimestamp, harvestErrorOccurred.booleanValue(), harvestedDatasetIds.size(), failedIdentifiers);
} catch (Throwable e) {
harvestErrorOccurred.setValue(true);
String message = "Exception processing harvest, server= " + harvestingClientConfig.getHarvestingUrl() + ",format=" + harvestingClientConfig.getMetadataPrefix() + " " + e.getClass().getName() + " " + e.getMessage();
@@ -224,18 +204,22 @@ public void doHarvest(DataverseRequest dataverseRequest, Long harvestingClientId

/**
*
* @param dataverseRequest DataverseRequest object that will be used for imports
* @param harvestingClient the harvesting client object
* @param thisJobId the numeric id of this ongoing harvesting job
* @param hdLogger custom logger (specific to this harvesting run)
* @param importCleanupLog PrintWriter for the Cleanup Log
* @param harvestErrorOccurred have we encountered any errors during harvest?
* @param failedIdentifiers Study Identifiers for failed "GetRecord" requests
* @param failedIdentifiers Identifiers that we failed to harvest
* @param deletedIdentifiers Identifiers that the Server tagged as "deleted"
* @return List of database ids of the successfully imported datasets
*/
private List<Long> harvestOAI(DataverseRequest dataverseRequest, HarvestingClient harvestingClient, Logger hdLogger, PrintWriter importCleanupLog, MutableBoolean harvestErrorOccurred, List<String> failedIdentifiers, List<String> deletedIdentifiers, List<Long> harvestedDatasetIdsThisBatch)
private List<Long> harvestOAI(DataverseRequest dataverseRequest, HarvestingClient harvestingClient, Long thisJobId, Logger hdLogger, PrintWriter importCleanupLog, MutableBoolean harvestErrorOccurred, List<String> failedIdentifiers, List<String> deletedIdentifiers)
throws IOException, ParserConfigurationException, SAXException, TransformerException {

logBeginOaiHarvest(hdLogger, harvestingClient);

List<Long> harvestedDatasetIds = new ArrayList<Long>();
MutableLong processedSizeThisBatch = new MutableLong(0L);
List<Long> harvestedDatasetIds = new ArrayList<>();
OaiHandler oaiHandler;

try {
@@ -252,48 +236,38 @@ private List<Long> harvestOAI(DataverseRequest dataverseRequest, HarvestingClien
try {
for (Iterator<Header> idIter = oaiHandler.runListIdentifiers(); idIter.hasNext();) {

// Before each iteration, check if this harvesting job needs to be aborted:
if (false) { // TODO: harvestingClientService.checkIfStoppingJob(thisJobId)

}
Header h = idIter.next();
String identifier = h.getIdentifier();
Date dateStamp = h.getDatestamp();

hdLogger.info("processing identifier: " + identifier + ", date: " + dateStamp);

if (h.isDeleted()) {
hdLogger.info("Deleting harvesting dataset for " + identifier + ", per ListIdentifiers.");

deleteHarvestedDatasetIfExists(identifier, oaiHandler.getHarvestingClient().getDataverse(), dataverseRequest, deletedIdentifiers, hdLogger);
continue;
}

MutableBoolean getRecordErrorOccurred = new MutableBoolean(false);

// Retrieve and process this record with a separate GetRecord call:
Long datasetId = processRecord(dataverseRequest, hdLogger, importCleanupLog, oaiHandler, identifier, getRecordErrorOccurred, processedSizeThisBatch, deletedIdentifiers, dateStamp);
Long datasetId = processRecord(dataverseRequest, hdLogger, importCleanupLog, oaiHandler, identifier, getRecordErrorOccurred, deletedIdentifiers, dateStamp);

hdLogger.info("Total content processed in this batch so far: "+processedSizeThisBatch);
if (datasetId != null) {
harvestedDatasetIds.add(datasetId);

if ( harvestedDatasetIdsThisBatch == null ) {
harvestedDatasetIdsThisBatch = new ArrayList<Long>();
}
harvestedDatasetIdsThisBatch.add(datasetId);

}

if (getRecordErrorOccurred.booleanValue() == true) {
if (getRecordErrorOccurred.booleanValue()) {
failedIdentifiers.add(identifier);
harvestErrorOccurred.setValue(true);
//temporary:
//throw new IOException("Exception occurred, stopping harvest");
}

// reindexing in batches? - this is from DVN 3;
// we may not need it anymore.
if ( processedSizeThisBatch.longValue() > INDEXING_CONTENT_BATCH_SIZE ) {

hdLogger.log(Level.INFO, "REACHED CONTENT BATCH SIZE LIMIT; calling index ("+ harvestedDatasetIdsThisBatch.size()+" datasets in the batch).");
//indexService.updateIndexList(this.harvestedDatasetIdsThisBatch);
hdLogger.log(Level.INFO, "REINDEX DONE.");


processedSizeThisBatch.setValue(0L);
harvestedDatasetIdsThisBatch = null;
}

}
} catch (OaiHandlerException e) {
throw new IOException("Failed to run ListIdentifiers: " + e.getMessage());
@@ -307,32 +281,22 @@ private List<Long> harvestOAI(DataverseRequest dataverseRequest, HarvestingClien



private Long processRecord(DataverseRequest dataverseRequest, Logger hdLogger, PrintWriter importCleanupLog, OaiHandler oaiHandler, String identifier, MutableBoolean recordErrorOccurred, MutableLong processedSizeThisBatch, List<String> deletedIdentifiers, Date dateStamp) {
String errMessage = null;
private Long processRecord(DataverseRequest dataverseRequest, Logger hdLogger, PrintWriter importCleanupLog, OaiHandler oaiHandler, String identifier, MutableBoolean recordErrorOccurred, List<String> deletedIdentifiers, Date dateStamp) {
Dataset harvestedDataset = null;
logGetRecord(hdLogger, oaiHandler, identifier);
File tempFile = null;

try {
FastGetRecord record = oaiHandler.runGetRecord(identifier);
errMessage = record.getErrorMessage();
String errorMessage = record.getErrorMessage();

if (errMessage != null) {
hdLogger.log(Level.SEVERE, "Error calling GetRecord - " + errMessage);
if (errorMessage != null) {
hdLogger.log(Level.SEVERE, "Error calling GetRecord - " + errorMessage);
recordErrorOccurred.setValue(true);
} else if (record.isDeleted()) {
hdLogger.info("Deleting harvesting dataset for "+identifier+", per the OAI server's instructions.");

Dataset dataset = datasetService.getDatasetByHarvestInfo(oaiHandler.getHarvestingClient().getDataverse(), identifier);
if (dataset != null) {
hdLogger.info("Deleting dataset " + dataset.getGlobalIdString());
datasetService.deleteHarvestedDataset(dataset, dataverseRequest, hdLogger);
// TODO:
// check the status of that Delete - see if it actually succeeded
deletedIdentifiers.add(identifier);
} else {
hdLogger.info("No dataset found for "+identifier+", skipping delete. ");
}

deleteHarvestedDatasetIfExists(identifier, oaiHandler.getHarvestingClient().getDataverse(), dataverseRequest, deletedIdentifiers, hdLogger);
} else {
hdLogger.info("Successfully retrieved GetRecord response.");

@@ -348,38 +312,30 @@ private Long processRecord(DataverseRequest dataverseRequest, Logger hdLogger, P

hdLogger.fine("Harvest Successful for identifier " + identifier);
hdLogger.fine("Size of this record: " + record.getMetadataFile().length());
processedSizeThisBatch.add(record.getMetadataFile().length());
}
} catch (Throwable e) {
logGetRecordException(hdLogger, oaiHandler, identifier, e);
errMessage = "Caught exception while executing GetRecord on "+identifier;
//logException(e, hdLogger);

recordErrorOccurred.setValue(true);
} finally {
if (tempFile != null) {
// temporary - let's not delete the temp metadata file if anything went wrong, for now:
if (errMessage == null) {
try{tempFile.delete();}catch(Throwable t){};
}
}
}

// TODO: the message below is taken from DVN3; - figure out what it means...
//
// If we got an Error from the OAI server or an exception happened during import, then
// set recordErrorOccurred to true (if recordErrorOccurred is being used)
// otherwise throw an exception (if recordErrorOccurred is not used, i.e null)

if (errMessage != null) {
if (recordErrorOccurred != null) {
recordErrorOccurred.setValue(true);
} else {
throw new EJBException(errMessage);
try{tempFile.delete();}catch(Throwable t){};
}
}

return harvestedDataset != null ? harvestedDataset.getId() : null;
}

private void deleteHarvestedDatasetIfExists(String persistentIdentifier, Dataverse harvestingDataverse, DataverseRequest dataverseRequest, List<String> deletedIdentifiers, Logger hdLogger) {
Dataset dataset = datasetService.getDatasetByHarvestInfo(harvestingDataverse, persistentIdentifier);
if (dataset != null) {
datasetService.deleteHarvestedDataset(dataset, dataverseRequest, hdLogger);
// TODO:
// check the status of that Delete - see if it actually succeeded
deletedIdentifiers.add(persistentIdentifier);
return;
}
hdLogger.info("No dataset found for " + persistentIdentifier + ", skipping delete. ");
}
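
The TODO above notes that the outcome of the delete is never verified. One possible refinement, sketched under the assumption that success can be confirmed by re-querying for the dataset afterwards (the re-query strategy is an assumption, not part of this commit):

// Sketch: confirm the delete by checking that the dataset is really gone.
// getDatasetByHarvestInfo() is the same lookup used above.
datasetService.deleteHarvestedDataset(dataset, dataverseRequest, hdLogger);
if (datasetService.getDatasetByHarvestInfo(harvestingDataverse, persistentIdentifier) == null) {
    deletedIdentifiers.add(persistentIdentifier);
} else {
    hdLogger.warning("Failed to delete the harvested dataset for " + persistentIdentifier);
}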

private void logBeginOaiHarvest(Logger hdLogger, HarvestingClient harvestingClient) {
hdLogger.log(Level.INFO, "BEGIN HARVEST, oaiUrl="
@@ -85,20 +85,25 @@ public void resetHarvestInProgress(Long hcId) {
em.refresh(harvestingClient);
harvestingClient.setHarvestingNow(false);

// And if there is an unfinished RunResult object, we'll
// just mark it as a failure:
if (harvestingClient.getLastRun() != null
&& harvestingClient.getLastRun().isInProgress()) {
harvestingClient.getLastRun().setFailed();
// And if there is still an unfinished RunResult object, we'll
// just mark it as a failure; similarly a Job in a "stopping"
// state should be marked as "Aborted":
if (harvestingClient.getLastRun() != null) {

if (harvestingClient.getLastRun().isInProgress()) {
harvestingClient.getLastRun().setFailed();
} else if (harvestingClient.getLastRun().isStopping()) {
harvestingClient.getLastRun().markAsAborted();
}
}

}
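
The branches above imply a small state lifecycle on ClientHarvestRun. A hypothetical sketch of that state handling, assuming a simple status enum; only isInProgress(), isStopping(), setFailed(), and markAsAborted() are confirmed by this diff, while the enum values and the backing field are illustrative:

// Hypothetical sketch of the ClientHarvestRun run states used in this commit.
public class ClientHarvestRun {
    enum RunResultType { IN_PROGRESS, STOPPING, SUCCESS, FAILED, ABORTED } // assumed values

    private RunResultType harvestResult; // assumed field

    public boolean isInProgress() { return harvestResult == RunResultType.IN_PROGRESS; }
    public boolean isStopping() { return harvestResult == RunResultType.STOPPING; }
    public void setFailed() { harvestResult = RunResultType.FAILED; }
    public void markAsAborted() { harvestResult = RunResultType.ABORTED; }
}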

@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
public void setHarvestInProgress(Long hcId, Date startTime) {
public ClientHarvestRun setHarvestInProgress(Long hcId, Date startTime) {
HarvestingClient harvestingClient = em.find(HarvestingClient.class, hcId);
if (harvestingClient == null) {
return;
return null;
}
em.refresh(harvestingClient);
harvestingClient.setHarvestingNow(true);
@@ -110,6 +115,7 @@ public void setHarvestInProgress(Long hcId, Date startTime) {
currentRun.setStartTime(startTime);
currentRun.setInProgress();
harvestingClient.getRunHistory().add(currentRun);
return currentRun;
}
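
Note that setHarvestInProgress() now returns null when no client with the given id exists. A defensive caller sketch; the null check is an assumption, since the committed caller in doHarvest() does not perform it:

ClientHarvestRun currentRun = harvestingClientService.setHarvestInProgress(harvestingClientId, harvestStartTime);
if (currentRun == null) {
    // Assumed guard: the client could have been deleted between scheduling and execution.
    throw new IOException("Harvesting client " + harvestingClientId + " no longer exists");
}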

@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
@@ -171,19 +177,23 @@ public void setHarvestSuccess(Long hcId, Date currentTime, int harvestedCount, i
if (harvestingClient == null) {
return;
}
em.refresh(harvestingClient);
//em.refresh(harvestingClient);

ClientHarvestRun currentRun = harvestingClient.getLastRun();

if (currentRun != null && currentRun.isInProgress()) {
// TODO: what if there's no current run in progress? should we just
// give up quietly, or should we make a noise of some kind? -- L.A. 4.4
if (currentRun != null) {


currentRun.setSuccess();
currentRun.setFinishTime(currentTime);
currentRun.setHarvestedDatasetCount(new Long(harvestedCount));
currentRun.setFailedDatasetCount(new Long(failedCount));
currentRun.setDeletedDatasetCount(new Long(deletedCount));

if (currentRun.isInProgress()) {
currentRun.setSuccess();
} else if (currentRun.isStopping()) {
currentRun.markAsAborted();
}
}
}

@@ -204,7 +214,21 @@ public void setHarvestFailure(Long hcId, Date currentTime) {
currentRun.setFailed();
currentRun.setFinishTime(currentTime);
}
}
}

public ClientHarvestRun refreshHarvestingJobState(ClientHarvestRun harvestingRun) {
//return em.find(ClientHarvestRun.class, jobId);
em.refresh(harvestingRun);
return harvestingRun;
}

public boolean checkIfStoppingJob(Long jobId) {
ClientHarvestRun harvestingJob = em.find(ClientHarvestRun.class, jobId);
if (harvestingJob != null) {
return harvestingJob.isStopping();
}
return false;
}
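
checkIfStoppingJob() covers the polling side of the stop mechanism; this commit does not show how a run enters the "stopping" state in the first place. A hypothetical sketch of the requesting side, assuming a setStopping() mutator on ClientHarvestRun (only isStopping() appears in this diff):

// Hypothetical: flag the client's current in-progress run as "stopping",
// to be picked up by the checkIfStoppingJob() polls in the harvest loop.
@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
public void requestHarvestStop(Long hcId) {
    HarvestingClient harvestingClient = em.find(HarvestingClient.class, hcId);
    if (harvestingClient == null || harvestingClient.getLastRun() == null) {
        return;
    }
    if (harvestingClient.getLastRun().isInProgress()) {
        harvestingClient.getLastRun().setStopping(); // assumed mutator
    }
}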

public Long getNumberOfHarvestedDatasetByClients(List<HarvestingClient> clients) {
String dvs = null;
