Skip to content

Commit

Permalink
storage-mgr: synchro copy - no unlock after timeout of a passive thread
Browse files Browse the repository at this point in the history
  • Loading branch information
DenChaykovskiy committed May 8, 2024
1 parent 6fa4498 commit 933667a
Showing 1 changed file with 160 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,30 +74,15 @@ public ResponseEntity<RestFileInfo> getRestFileInfoByPathInfo(String pathInfo) {
return new ResponseEntity<>(http.errorHeaders(msg), HttpStatus.BAD_REQUEST);
}

// 1. copy Storage -> Cache
// 1. copy Storage -> Cache (synchronized)
// 2. add to cache cache.put(cache file)
// pathInfo is absolute path s3://bucket/.. or /storagePath/..
// Info: pathInfo is absolute path s3://bucket/.. or /storagePath/..

String absoluteStoragePath = pathInfo;

StorageFile cacheFile;
try {

String relativePath = storageProvider.getRelativePath(absoluteStoragePath);
cacheFile = storageProvider.getCacheFile(relativePath);

} catch (IOException e) {

String msg = logger.log(StorageMgrMessage.PRODUCT_FILE_CANNOT_BE_DOWNLOADED, e.getMessage());
return new ResponseEntity<>(http.errorHeaders(msg), HttpStatus.BAD_REQUEST);
}

StorageFileLocker fileLocker = new StorageFileLocker(cacheFile.getFullPath(), cfg.getFileCheckWaitTime(),
cfg.getFileCheckMaxCycles());

try {

RestFileInfo restFileInfo = copyFileStorageToCache(absoluteStoragePath, fileLocker);
RestFileInfo restFileInfo = copyStorageFileToCache(absoluteStoragePath); // synchronized
return new ResponseEntity<>(restFileInfo, HttpStatus.OK);

} catch (FileLockedAfterMaxCyclesException e) {
Expand All @@ -116,10 +101,6 @@ public ResponseEntity<RestFileInfo> getRestFileInfoByPathInfo(String pathInfo) {

String msg = logger.log(StorageMgrMessage.INTERNAL_ERROR, e.getMessage());
return new ResponseEntity<>(http.errorHeaders(msg), HttpStatus.INTERNAL_SERVER_ERROR);

} finally {

fileLocker.unlock();
}
}

Expand All @@ -129,16 +110,18 @@ public ResponseEntity<RestFileInfo> getRestFileInfoByPathInfo(String pathInfo) {
*
* @param pathInfo Source file name
* @param productId Product id
* @return Target file name
* @param fileSize File Size
* @return RestFileInfo Rest File Info
* @throws Exception Internal Server Error
*/
@Override
public ResponseEntity<RestFileInfo> updateProductfiles(String pathInfo, Long productId, Long fileSize) {

if (logger.isTraceEnabled())
logger.trace(">>> updateProductfiles({}, {})", pathInfo, productId);
logger.trace(">>> updateProductfiles({}, {}, {})", pathInfo, productId, fileSize);

// copies absolute external file -> cache file -> storage file
// 1. copy external -> cache
// 1. copy external -> cache (synchronized)
// 2. add to cache cache.put(cache file)
// 3. copy cache -> storage
// pathInfo is absolute external path
Expand All @@ -150,17 +133,12 @@ public ResponseEntity<RestFileInfo> updateProductfiles(String pathInfo, Long pro
String externalPath = pathInfo;

String relativePath = getProductFolderWithFilename(externalPath, productId);
StorageFile cacheFile = storageProvider.getCacheFile(relativePath);

StorageFileLocker fileLocker = new StorageFileLocker(cacheFile.getFullPath(), cfg.getFileCheckWaitTime(),
cfg.getFileCheckMaxCycles());

try {

RestFileInfo restFileInfo = copyFileExternalToCache(externalPath, productId, fileSize, fileLocker);
fileLocker.unlock();
RestFileInfo restFileInfo = copyExternalFileToCache(externalPath, productId); // synchronized

restFileInfo = copyFileCacheToStorage(relativePath);
restFileInfo = copyCacheFileToStorage(relativePath);

logger.log(StorageMgrMessage.PRODUCT_FILE_UPLOADED_TO_STORAGE, externalPath, productId);
return new ResponseEntity<>(restFileInfo, HttpStatus.CREATED);
Expand All @@ -169,11 +147,49 @@ public ResponseEntity<RestFileInfo> updateProductfiles(String pathInfo, Long pro

String msg = logger.log(StorageMgrMessage.INTERNAL_ERROR, e.getMessage());
return new ResponseEntity<>(http.errorHeaders(msg), HttpStatus.INTERNAL_SERVER_ERROR);
}
}

} finally {
/**
* Checks if the file is in the cache. If yes (in the cache), returns the file
* from the cache, no copy needed. If no, copies the file from the storage to
* the cache using synchronization. During the copying to the cache, the status
* of the file will be "not exists", after the completion the status will be set
* to "ready". Sets the file permission 444.
*
* @param absoluteStorageFilePath the file path in the storage
* @return RestFileInfo
* @throws FileLockedAfterMaxCyclesException
* @throws IOException
* @throws Exception
*/
private RestFileInfo copyStorageFileToCache(String absoluteStorageFilePath)
throws FileLockedAfterMaxCyclesException, IOException, Exception {

fileLocker.unlock();
if (logger.isTraceEnabled())
logger.trace(">>> copyFileStorageToCache({})", absoluteStorageFilePath);

// relative path depends on path, not on actual storage
String relativePath = storageProvider.getRelativePath(absoluteStorageFilePath);

StorageFile storageFile = storageProvider.getStorageFile(relativePath);
StorageFile cacheFile = storageProvider.getCacheFile(storageFile.getRelativePath());

FileCache cache = FileCache.getInstance();

if (!cache.containsKey(cacheFile.getFullPath())) {

synchroCopyStorageFileToCache(storageFile, cacheFile); // synchronized

} else {

logger.debug("... no download and no lock - the file is in cache: {}", cacheFile.getFullPath());
}

RestFileInfo restFileInfo = convertToRestFileInfo(cacheFile,
storageProvider.getCacheFileSize(storageFile.getRelativePath()));

return restFileInfo;
}

/**
Expand All @@ -182,64 +198,108 @@ public ResponseEntity<RestFileInfo> updateProductfiles(String pathInfo, Long pro
* the completion the status will be set to "ready". Sets the file permission
* 444.
*
* @param storageFilePath the file path in the storage
* @param fileLocker file locker is used for synchronization
* @return RestFileInfo
* @param srcStorageFile Source Storage File
* @param destCacheFile Destination Cache File
* @throws FileLockedAfterMaxCyclesException
* @throws IOException
* @throws Exception
*/
private RestFileInfo copyFileStorageToCache(String storageFilePath, StorageFileLocker fileLocker)
private void synchroCopyStorageFileToCache(StorageFile srcStorageFile, StorageFile destCacheFile)
throws FileLockedAfterMaxCyclesException, IOException, Exception {

if (logger.isTraceEnabled())
logger.trace(">>> copyFileStorageToCache({}, {})", storageFilePath, fileLocker);

// x-to-cache-copy method, status "not exists" is used
logger.trace(">>> synchroCopyFileStorageToCache({}, {})", srcStorageFile, destCacheFile);

// relative path depends on path, not on actual storage
String relativePath = storageProvider.getRelativePath(storageFilePath);
// synchronized x-to-cache-copy method, status "not exists" is used

StorageFile storageFile = storageProvider.getStorageFile(relativePath);
StorageFile cacheFile = storageProvider.getCacheFile(storageFile.getRelativePath());
StorageFileLocker fileLocker = new StorageFileLocker(destCacheFile.getFullPath(), cfg.getFileCheckWaitTime(),
cfg.getFileCheckMaxCycles());

FileCache cache = FileCache.getInstance();

if (!cache.containsKey(cacheFile.getFullPath())) {
fileLocker.lockOrWaitUntilUnlockedAndLock();

fileLocker.lockOrWaitUntilUnlockedAndLock();
// check again, the file could be copied to cache from another thread after lock
if (!cache.containsKey(destCacheFile.getFullPath())) {

// check again, the file could be copied to cache from another thread after lock
if (!cache.containsKey(cacheFile.getFullPath())) {
try {

// active thread - copies the file to the cache storage and puts it to the cache
logger.debug("... active-thread: copies the file to the cache storage and puts it to the cache: {}", cacheFile.getFullPath());
logger.debug("... active-thread: copies the file to the cache storage and puts it to the cache: {}",
destCacheFile.getFullPath());

cache.setCacheFileStatus(cacheFile.getFullPath(), CacheFileStatus.INCOMPLETE);
cache.setCacheFileStatus(destCacheFile.getFullPath(), CacheFileStatus.INCOMPLETE);

storageProvider.getStorage().downloadFile(storageFile, cacheFile);
storageProvider.getStorage().downloadFile(srcStorageFile, destCacheFile);

logger.log(StorageMgrMessage.PRODUCT_FILE_DOWNLOADED_FROM_STORAGE, cacheFile.getFullPath());
logger.log(StorageMgrMessage.PRODUCT_FILE_DOWNLOADED_FROM_STORAGE, destCacheFile.getFullPath());

Files.setPosixFilePermissions(Paths.get(cacheFile.getFullPath()),
Files.setPosixFilePermissions(Paths.get(destCacheFile.getFullPath()),
PosixFilePermissions.fromString(READ_ONLY_FOR_ALL_USERS));

cache.put(cacheFile.getFullPath()); // cache file status = READY
cache.put(destCacheFile.getFullPath()); // cache file status = READY

} catch (Exception e) {

} else {
throw e;

// passive thread - did nothing, waited for copied file and use it from cache
logger.debug("... waiting-thread: waited until the file was downloaded to cache from external storage and use it from cache: {}",
cacheFile.getFullPath());
} finally {

fileLocker.unlock();
}

} else {

logger.debug("... no download and no lock - the file is in cache: {}", cacheFile.getFullPath());
// passive thread - did nothing, waited for copied file and use it from cache.
// In case of FileLockedAfterMaxCyclesException from this waiting thread the
// file is locked not by this thread and thats's why will be not unlocked.

logger.debug(
"... waiting-thread: waited until the file was downloaded to cache from external storage and use it from cache: {}",
destCacheFile.getFullPath());

fileLocker.unlock();
}
}

RestFileInfo restFileInfo = convertToRestFileInfo(cacheFile,
storageProvider.getCacheFileSize(storageFile.getRelativePath()));
/**
* Checks if the file is in the cache. If yes (in the cache), returns the file
* from the cache, no copy needed. If no, copies the file from the external
* source to the cache using synchronization. During the copying to the cache,
* the status of the file will be "not exists", after the completion the status
* will be set to "ready". Sets the file permission 444.
*
* @param srcExternalPath external absolute path of the file, which will be copied to the
* cache
* @param productId product id is used as a directory to store copied file in
* cache
* @return Rest File Info
* @throws FileLockedAfterMaxCyclesException
* @throws IOException
* @throws Exception
*/
private RestFileInfo copyExternalFileToCache(String srcExternalPath, Long productId)
throws FileLockedAfterMaxCyclesException, IOException, Exception {

if (logger.isTraceEnabled())
logger.trace(">>> copyFileExternalToCache({}, {}, {}, {})", srcExternalPath, productId);

String productFolderWithFilename = getProductFolderWithFilename(srcExternalPath, productId);
StorageFile destCacheFile = storageProvider.getCacheFile(productFolderWithFilename);

FileCache cache = FileCache.getInstance();

if (!cache.containsKey(destCacheFile.getFullPath())) {

synchroCopyExternalFileToCache(srcExternalPath, productId, destCacheFile); // synchronized

} else {

logger.debug("... no download and no lock - the file is in cache: {}", destCacheFile.getFullPath());
}

RestFileInfo restFileInfo = convertToRestFileInfo(destCacheFile,
storageProvider.getCacheFileSize(destCacheFile.getRelativePath()));

return restFileInfo;
}
Expand All @@ -250,69 +310,72 @@ private RestFileInfo copyFileStorageToCache(String storageFilePath, StorageFileL
* after the completion the status will be set to "ready". Sets the file
* permission 444.
*
* @param externalPath external path of the file, which will be copied to the
* cache
* @param productId product id is used as a directory to store copied file in
* cache
* @param fileSize file size
* @param fileLocker file locker is used for synchronization
* @return Rest File Info
* @param externalPath external path of the file, which will be copied to the
* cache
* @param productId product id is used as a directory to store copied file
* in cache
* @param destCacheFile Information about destination cache file
* @throws FileLockedAfterMaxCyclesException
* @throws IOException
* @throws Exception
*/
private RestFileInfo copyFileExternalToCache(String externalPath, Long productId, Long fileSize,
StorageFileLocker fileLocker) throws FileLockedAfterMaxCyclesException, IOException, Exception {
private void synchroCopyExternalFileToCache(String srcExternalPath, Long productId, StorageFile destCacheFile)
throws FileLockedAfterMaxCyclesException, IOException, Exception {

if (logger.isTraceEnabled())
logger.trace(">>> copyFileExternalToCache({}, {}, {}, {})", externalPath, productId, fileSize, fileLocker);
logger.trace(">>> synchroCopyFileExternalToCache({}, {}, {})", srcExternalPath, productId, destCacheFile);

// x-to-cache-copy method, status "not exists" is used

String productFolderWithFilename = getProductFolderWithFilename(externalPath, productId);
StorageFile cacheFile = storageProvider.getCacheFile(productFolderWithFilename);
// synchronized x-to-cache-copy method, status "not exists" is used

FileCache cache = FileCache.getInstance();

if (!cache.containsKey(cacheFile.getFullPath())) {
StorageFileLocker fileLocker = new StorageFileLocker(destCacheFile.getFullPath(), cfg.getFileCheckWaitTime(),
cfg.getFileCheckMaxCycles());

fileLocker.lockOrWaitUntilUnlockedAndLock();

fileLocker.lockOrWaitUntilUnlockedAndLock();
// check again, the file could be copied to cache from another thread after lock
if (!cache.containsKey(destCacheFile.getFullPath())) {

// check again, the file could be copied to cache from another thread after lock
if (!cache.containsKey(cacheFile.getFullPath())) {
try {

// active thread - copies the file to the cache storage and puts it to the cache
logger.debug("... active-thread: copies the file to the cache storage and puts it to the cache: {}",
cacheFile.getFullPath());
destCacheFile.getFullPath());

cache.setCacheFileStatus(cacheFile.getFullPath(), CacheFileStatus.INCOMPLETE);
cache.setCacheFileStatus(destCacheFile.getFullPath(), CacheFileStatus.INCOMPLETE);

storageProvider.copyAbsoluteFilesToCache(externalPath, productId);
storageProvider.copyAbsoluteFilesToCache(srcExternalPath, productId);

logger.log(StorageMgrMessage.PRODUCT_FILE_DOWNLOADED_FROM_EXTERNAL_TO_CACHE, cacheFile.getFullPath());
logger.log(StorageMgrMessage.PRODUCT_FILE_DOWNLOADED_FROM_EXTERNAL_TO_CACHE,
destCacheFile.getFullPath());

Files.setPosixFilePermissions(Paths.get(cacheFile.getFullPath()),
Files.setPosixFilePermissions(Paths.get(destCacheFile.getFullPath()),
PosixFilePermissions.fromString(READ_ONLY_FOR_ALL_USERS));

cache.put(cacheFile.getFullPath()); // cache file status = READY
cache.put(destCacheFile.getFullPath()); // cache file status = READY

} else {
} catch (Exception e) {

// passive thread - did nothing, waited for copied file and use it from cache
logger.debug(
"... waiting-thread: waited until the file was downloaded to cache from external storage and use it from cache: {}",
cacheFile.getFullPath());
throw e;

} finally {

fileLocker.unlock();
}

} else {

logger.debug("... no download and no lock - the file is in cache: {}", cacheFile.getFullPath());
}
// passive thread - did nothing, waited for copied file and use it from cache.
// In case of FileLockedAfterMaxCyclesException from this waiting thread the
// file is locked not by this thread and thats's why will be not unlocked.

RestFileInfo restFileInfo = convertToRestFileInfo(cacheFile,
storageProvider.getCacheFileSize(cacheFile.getRelativePath()));
logger.debug(
"... waiting-thread: waited until the file was downloaded to cache from external storage and use it from cache: {}",
destCacheFile.getFullPath());

return restFileInfo;
fileLocker.unlock();
}
}

/**
Expand All @@ -323,7 +386,7 @@ private RestFileInfo copyFileExternalToCache(String externalPath, Long productId
* @throws IOException
* @throws Exception
*/
private RestFileInfo copyFileCacheToStorage(String relativeCachePath)
private RestFileInfo copyCacheFileToStorage(String relativeCachePath)
throws FileLockedAfterMaxCyclesException, IOException, Exception {

if (logger.isTraceEnabled())
Expand Down

0 comments on commit 933667a

Please sign in to comment.