feat: task based archiving #659
base: main
Conversation
Force-pushed 5b8f59b to eaa21f5
server/src/main/java/com/hedera/block/server/ack/AckHandlerImpl.java (outdated; resolved)
server/src/main/java/com/hedera/block/server/config/ServerMappedConfigSourceInitializer.java (outdated; resolved)
Nice work. Extensive and detailed javadoc and comments!
server/src/main/java/com/hedera/block/server/config/ServerMappedConfigSourceInitializer.java (outdated; resolved)
@@ -52,6 +52,9 @@ public record PersistenceStorageConfig(
     */
    public PersistenceStorageConfig {
        Objects.requireNonNull(type);
        if (archiveBatchSize <= 1) {
            throw new IllegalArgumentException("The archive group size must be greater than [1]");
        }
Could we use Preconditions.requireGreaterOrEqual(2, archiveBatchSize) here?
Really this should just require > 0, because we also test that the batch is a power of 10 on the next line.
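The combined validation the reviewers converge on (positive, and a power of 10) can be sketched in plain Java; the helper class name here is hypothetical, standing in for the project's own Preconditions utility:

```java
// Hypothetical sketch of the suggested validation: require a positive
// batch size, then verify it is a power of 10.
final class ArchiveBatchSizeValidator {
    private ArchiveBatchSizeValidator() {}

    static int requirePositivePowerOfTen(final int archiveBatchSize) {
        if (archiveBatchSize <= 0) {
            throw new IllegalArgumentException(
                    "The archive batch size [%d] must be positive".formatted(archiveBatchSize));
        }
        // a positive power of 10 reduces to 1 by repeated division by 10
        int value = archiveBatchSize;
        while (value % 10 == 0) {
            value /= 10;
        }
        if (value != 1) {
            throw new IllegalArgumentException(
                    "The archive batch size [%d] must be a power of 10".formatted(archiveBatchSize));
        }
        return archiveBatchSize;
    }
}
```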
✅ Done, the precondition is improved.
...er/src/test/java/com/hedera/block/server/config/ServerMappedConfigSourceInitializerTest.java (outdated; resolved)
I attempted to start the container but it fails with
About 1/3 finished.
server/src/main/java/com/hedera/block/server/persistence/StreamPersistenceHandlerImpl.java (outdated; resolved)
.../java/com/hedera/block/server/persistence/storage/archive/AsyncBlockAsLocalFileArchiver.java (outdated; resolved)
.../java/com/hedera/block/server/persistence/storage/archive/AsyncBlockAsLocalFileArchiver.java (outdated; resolved)
.../java/com/hedera/block/server/persistence/storage/archive/AsyncBlockAsLocalFileArchiver.java (outdated; resolved)
    // either a bug in the archiving task, or an unhandled case
    throw new RuntimeException(e.getCause());
} catch (final InterruptedException e) {
    // @todo(517) What shall we do here? How to handle?
Most likely, what's here is all we need to do.
We might call something like shutdownNow on the executor, if we have a reference. Otherwise just resetting the interrupt flag for this thread should be sufficient.
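A minimal sketch of what the reviewer describes, with the class and method names being hypothetical: restore the interrupt flag (catching InterruptedException clears it), and stop the executor only if a reference happens to be available.

```java
import java.util.concurrent.ExecutorService;

// Hypothetical sketch of the suggested InterruptedException handling.
final class InterruptHandlingSketch {
    static void handleInterrupt(final ExecutorService executorOrNull) {
        // re-set the flag that was cleared when InterruptedException was caught,
        // so callers higher up the stack can still observe the interrupt
        Thread.currentThread().interrupt();
        if (executorOrNull != null) {
            // best-effort cancellation of pending archiving tasks
            executorOrNull.shutdownNow();
        }
    }
}
```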
.../java/com/hedera/block/server/persistence/storage/archive/AsyncBlockAsLocalFileArchiver.java (outdated; resolved)
try (final Stream<Path> pathsToDelete = Files.walk(movedToDelete)) {
    pathsToDelete.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
}
A 150ms sleep (or some other mechanism to delay the removal a bit) before deleting is a wise precaution to ensure that the files and folders are all closed before we try to remove them. That's needed to reduce the risk an IOException will be thrown just because some other thread hasn't ended in time.
Perhaps a better approach would be to scan for open files/folders in the path, but that's likely to take quite a bit longer than just sleeping a little...
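The suggested delay plus the reverse-order walk from the diff could be combined as below; this is a sketch only, and the 150 ms pause is the reviewer's suggested precaution, not a verified requirement.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical sketch: pause briefly, then delete deepest paths first so
// directories are empty by the time they are deleted.
final class DelayedDelete {
    static void deleteRecursively(final Path movedToDelete) throws IOException, InterruptedException {
        // brief pause to reduce the risk that another thread still holds
        // an open handle somewhere in this tree
        Thread.sleep(150L);
        try (final Stream<Path> pathsToDelete = Files.walk(movedToDelete)) {
            pathsToDelete.sorted(Comparator.reverseOrder())
                    .map(Path::toFile)
                    .forEach(File::delete);
        }
    }
}
```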
/**
 * This type of archive does nothing, essentially acting as if it were
 * disabled.
 */
NO_OP
Given this is just for testing, I wonder if we just remove the type configuration and only use the no-op implementation via injection in tests.
We shouldn't plan on creating more archive options and choosing between them based on configuration...
.../java/com/hedera/block/server/persistence/storage/archive/AsyncBlockAsLocalFileArchiver.java (outdated; resolved)
Force-pushed f20d1af to 6d94da8
Force-pushed 6d94da8 to 3565aa4
- introduce a task-based archiver
- the new archiver is currently part of the persistence service (this will change in the future; the archiver will become a standalone component, which needs additional infrastructure)
- updates to the reading logic to accommodate proper zip entry naming
- currently we rely on a gap between the latest live-written block and what will be zipped for proper operation, due to the lack of certain infrastructure
- tests for the new logic
- live and archive roots are created by the persistence handler

Signed-off-by: Atanas Atanasov <a.v.atanasov98@gmail.com>
Force-pushed 1b7d063 to 31544fd
- changed zip out stream method to STORED
- changed zip out stream level to NO_COMPRESSION
- changed each individual zip entry method to STORED

Signed-off-by: Atanas Atanasov <a.v.atanasov98@gmail.com>
- deletion of unnecessary classes/wrappers and simplification of the archivers Signed-off-by: Atanas Atanasov <a.v.atanasov98@gmail.com>
Signed-off-by: Atanas Atanasov <a.v.atanasov98@gmail.com>
Signed-off-by: Atanas Atanasov <a.v.atanasov98@gmail.com>
Signed-off-by: Atanas Atanasov <a.v.atanasov98@gmail.com>
…nd proper naming for archiver task Signed-off-by: Atanas Atanasov <a.v.atanasov98@gmail.com>
Signed-off-by: Atanas Atanasov <a.v.atanasov98@gmail.com>
@mattp-swirldslabs to answer your question: in the previous chore that was just merged into main (#371), the roots were created in two places, which was wrong, but that was to be fixed immediately with this PR. Now both roots are created in the proper place and you should be able to start it without any issues. It was a matter of rebasing.
…ckhandler get or create from map Signed-off-by: Atanas Atanasov <a.v.atanasov98@gmail.com>
Some additional suggestions.
Also, as a side note, when we get to where we're trying to read these files, we should look at the java zip filesystem so we can treat the zip file just like it's the folder it replaced...
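The zip filesystem idea mentioned above can be sketched as follows: open the archive as a java.nio FileSystem and resolve entries like ordinary paths. This is a hypothetical reader, and the single-argument FileSystems.newFileSystem(Path) overload requires Java 13 or newer.

```java
import java.io.IOException;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch: treat the zip file like the folder it replaced.
final class ZipReadSketch {
    static byte[] readEntry(final Path zipFilePath, final String entryName) throws IOException {
        try (final FileSystem zipFs = FileSystems.newFileSystem(zipFilePath)) {
            // entries resolve just like paths in a regular directory tree
            return Files.readAllBytes(zipFs.getPath(entryName));
        }
    }
}
```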
/**
 * A simple exception class to represent an error during block archiving.
 */
public final class BlockArchivingException extends RuntimeException {
When creating exceptions, it is important to always include all four standard constructors:
BlockArchivingException()
BlockArchivingException(String message)
BlockArchivingException(Throwable cause)
BlockArchivingException(String message, Throwable cause)
This keeps exception throwing semantics consistent and reliable.
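Spelled out, the four standard constructors the reviewer asks for look like this (the real class is public and final; the modifier is relaxed here only so the sketch compiles standalone):

```java
// Sketch of BlockArchivingException with all four standard constructors,
// each delegating to the matching RuntimeException constructor.
final class BlockArchivingException extends RuntimeException {
    public BlockArchivingException() {
        super();
    }

    public BlockArchivingException(final String message) {
        super(message);
    }

    public BlockArchivingException(final Throwable cause) {
        super(cause);
    }

    public BlockArchivingException(final String message, final Throwable cause) {
        super(message, cause);
    }
}
```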
Preconditions.requireExactlyDivisibleBy(
        blockNumberThreshold, archiveGroupSize, "Block Number [%d] is required to be a positive power of 10.");
The test here does not match the message. You're checking divisible by the group size, but the message is about powers of 10.
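A plain-Java sketch of the check with a message that actually matches it; the class and the exact message wording are hypothetical, not the project's Preconditions API.

```java
// Hypothetical sketch: the message describes the divisibility check performed.
final class DivisibilitySketch {
    static long requireExactlyDivisibleBy(final long toTest, final long divisor) {
        if (toTest % divisor != 0) {
            throw new IllegalArgumentException(
                    "Block Number [%d] must be exactly divisible by the archive group size [%d]"
                            .formatted(toTest, divisor));
        }
        return toTest;
    }
}
```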
private final BlockPathResolver pathResolver;
private final long blockNumberThreshold;

LocalGroupZipArchiveTask(
Javadoc would be helpful here to explain the difference between threshold, group size, and block number.
Also, describing the expectations for the config and path resolver would be helpful.
public Void call() {
    try {
        doArchive();
    } catch (final IOException e) {
call declares throws Exception in the interface; we don't need to catch the exception here and, lacking a good means of recovery, we should not catch it. It will be handled in the Archiver.
    return null;
}

private void doArchive() throws IOException {
As a simplification, you could just rename this to call, make it public, and this would be correct.
One possible improvement would be to return an int with the number of files archived, and the Archiver could use that to publish a metric, and perhaps to publish an "archive complete" message as well (in the future).
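The shape suggested above might look like the following; the class name and the counting logic are placeholders, not the PR's actual implementation.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch: the task implements Callable<Integer> directly and
// returns the number of files archived, which the Archiver could publish
// as a metric. IOException propagates via the Callable contract instead
// of being caught inside the task.
final class LocalGroupZipArchiveTaskSketch implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        int filesArchived = 0;
        // ... walk the group, add each block file to the zip,
        // incrementing filesArchived for each one written
        return filesArchived;
    }
}
```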
final byte[] blockFileBytes = Files.readAllBytes(pathToArchive);
zipEntry.setSize(blockFileBytes.length);
zipEntry.setCompressedSize(blockFileBytes.length);
final CRC32 crc32 = new CRC32();
Have we considered using Adler32 here? It's less computationally expensive, and nearly as good.
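Both classes implement java.util.zip.Checksum, so the swap is mechanical, as sketched below. One caveat worth noting: the zip format itself mandates CRC-32 for entry checksums (ZipEntry.setCrc expects a CRC-32 value), so Adler-32 would only apply to checksums computed outside the zip entries themselves.

```java
import java.util.zip.Checksum;

// Sketch: any Checksum implementation (CRC32, Adler32) plugs in here.
final class ChecksumSketch {
    static long checksumOf(final byte[] data, final Checksum checksum) {
        checksum.update(data, 0, data.length);
        return checksum.getValue();
    }
}
```

Usage would be `checksumOf(bytes, new Adler32())` versus `checksumOf(bytes, new CRC32())`.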
final ZipEntry zipEntry = new ZipEntry(relativizedEntryName);
LOGGER.log(Level.TRACE, "Adding Zip Entry [%s] to zip file [%s]".formatted(zipEntry, zipFilePath));
zipEntry.setMethod(ZipEntry.STORED);
// @todo(517) we should put a limit to the size that could be read into memory
We should pipe the data instead of reading into memory. A stream of high TPS blocks with jumbo transactions could easily be large enough that we cannot hold it in memory safely.
I'll send you a code suggestion on Monday (it's tricky with the ZipEntry structure, but still possible).
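One possible shape for this (a sketch, not the reviewer's promised suggestion): STORED entries need the size and CRC-32 up front, so a first streaming pass computes them without buffering the file, and a second pass copies the bytes into the zip stream. The class name is hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

// Hypothetical sketch: write a STORED entry without holding the whole
// block file in memory, at the cost of reading the file twice.
final class StoredEntryWriter {
    static void writeStoredEntry(
            final ZipOutputStream zipOut, final Path pathToArchive, final String entryName) throws IOException {
        final long size = Files.size(pathToArchive);
        final CRC32 crc32 = new CRC32();
        // first pass: stream through the file just to compute the CRC
        try (final InputStream in = new CheckedInputStream(Files.newInputStream(pathToArchive), crc32)) {
            in.transferTo(OutputStream.nullOutputStream());
        }
        final ZipEntry zipEntry = new ZipEntry(entryName);
        zipEntry.setMethod(ZipEntry.STORED);
        zipEntry.setSize(size);
        zipEntry.setCompressedSize(size);
        zipEntry.setCrc(crc32.getValue());
        zipOut.putNextEntry(zipEntry);
        // second pass: copy the file contents straight into the zip stream
        Files.copy(pathToArchive, zipOut);
        zipOut.closeEntry();
    }
}
```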
    Files.createLink(livelink, zipFilePath);
} catch (final Exception e) {
    LOGGER.log(
            Level.ERROR,
This isn't an error (which usually triggers alerts), it's more of an expected condition.
I recommend logging this at either info or trace level.
} catch (final Exception e) {
    LOGGER.log(
            Level.ERROR,
            "Error while creating link [%s <-> %s], attempting to create a symlink"
I would avoid the word error here; it causes problems for operations.
- "Error while creating link [%s <-> %s], attempting to create a symlink"
+ "Unable to create hard link [%s <-> %s], attempting to create a symbolic link"
final Path livelink = FileUtilities.appendExtension(rootToArchive, ".zip");
try {
    Files.createLink(livelink, zipFilePath);
} catch (final Exception e) {
I would only catch IOException here; other exceptions, and runtime exceptions, should bubble up.
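Pulling the review threads together, a sketch of the fallback might look like this: catch only IOException, log below error level with the reworded message, and fall back to a symbolic link. The class name is hypothetical and the logger choice (System.Logger) is an assumption.

```java
import java.io.IOException;
import java.lang.System.Logger;
import java.lang.System.Logger.Level;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch: hard link first, symbolic link as a fallback.
final class LiveLinkSketch {
    private static final Logger LOGGER = System.getLogger(LiveLinkSketch.class.getName());

    static void createLiveLink(final Path livelink, final Path zipFilePath) throws IOException {
        try {
            Files.createLink(livelink, zipFilePath);
        } catch (final IOException e) {
            // expected on filesystems without hard-link support; runtime
            // exceptions bubble up, and this is logged below error level
            LOGGER.log(Level.INFO,
                    "Unable to create hard link [%s <-> %s], attempting to create a symbolic link"
                            .formatted(livelink, zipFilePath));
            Files.createSymbolicLink(livelink, zipFilePath);
        }
    }
}
```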
Reviewer Notes
Related Issue(s)
Closes #517