Introduce log writer thread #1146

Draft: mihirj1993 wants to merge 21 commits into main

Conversation

@mihirj1993 commented Dec 16, 2021

  • Introduce a log_writer thread that reads the txn_table and writes txn updates to the log.
  • The log_writer thread uses the log writer APIs (Introduce log writer APIs (log_io.cpp, log_io.hpp) #1144) to process the txn log and generate pwrite() requests for the log file.
  • txn_commit() signals the log_writer thread to wake up and consume any new writes.
  • The log_writer is signaled twice: at the onset of validation, to persist the txn updates, and after validation, to persist the txn decision.
  • The commit path blocks until the txn's decision has been persisted to the log.
  • Once awakened, the log_writer thread loops until a timeout expires (to let other txns get their writes in); this could be optimized to break out of the loop if two back-to-back iterations see no new changes. See the sketch after this list.
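
For illustration, a minimal sketch of the loop structure described in the bullets above; the eventfds, epoll setup, helper function, and timeout parameter are all hypothetical names, not the PR's actual code.

#include <sys/epoll.h>
#include <sys/eventfd.h>

#include <chrono>
#include <cstdint>

// Hypothetical helper: scans the txn_table and submits pwrite() requests for any
// new txn updates; returns true if it found new work.
bool process_new_txn_updates();

// Sketch: block until a session thread signals the eventfd, then keep consuming
// updates until no new work arrives or the group timeout expires.
void log_writer_loop(int epoll_fd, int signal_log_write_eventfd, int64_t txn_group_timeout_us)
{
    epoll_event events[1];
    while (true)
    {
        // Block until txn_commit() signals that new writes are available.
        int ready = ::epoll_wait(epoll_fd, events, 1, -1);
        if (ready <= 0)
        {
            continue;
        }

        // Drain the eventfd counter; its value is arbitrary.
        eventfd_t counter = 0;
        ::eventfd_read(signal_log_write_eventfd, &counter);

        auto start = std::chrono::steady_clock::now();
        auto end = start;
        bool updates_exist = true;

        // Loop until there are no more updates to consume or the timeout expires,
        // to let other txns get their writes into the same batch.
        while (updates_exist
            && std::chrono::duration_cast<std::chrono::microseconds>(end - start).count() < txn_group_timeout_us)
        {
            updates_exist = process_new_txn_updates();
            end = std::chrono::steady_clock::now();
        }
    }
}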

This PR could use some refactoring: the thread functionality can be moved out of db_server into a separate file. I have created a task for this:
https://gaiaplatform.atlassian.net/browse/GAIAPLAT-1818

This codepath is currently switched off; it will be enabled when recovery is in place (PR coming up next).

auto end = start;
bool updates_exist = false;

// Run the loop until there are no more updates to consume or the timeout expires.

Should there also be a batch-size limit?

Contributor

It's implicit, see later comment "Internally writes will get logged to disk when the batch becomes full."

Specifically, what you want is in production/db/core/inc/async_disk_writer.hpp:

static constexpr size_t c_async_batch_size = 32;

This value was chosen to approximate internal write parallelism in common SSD devices (within a factor of 2-4). It might be possible to get better information on device capabilities or current queuing depth from the Linux block driver or block I/O layer, but this seemed like a reasonable starting point (assuming an SSD logging device).
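
For illustration only, a sketch of how a fixed batch size implicitly bounds each flush; c_async_batch_size is the constant quoted above, but the batch wrapper and its flush behavior are hypothetical:

#include <cstddef>
#include <vector>

static constexpr size_t c_async_batch_size = 32;

// Placeholder for a pwrite()/io_uring request (fd, offset, iovecs elided).
struct pwrite_request_t
{
};

// Hypothetical batch wrapper: writes are flushed whenever the batch becomes
// full, so a per-iteration batch-size limit is implicit.
class async_write_batch_t
{
public:
    void add(const pwrite_request_t& request)
    {
        m_pending.push_back(request);
        if (m_pending.size() == c_async_batch_size)
        {
            flush();
        }
    }

    void flush()
    {
        // Submission of m_pending via pwrite()/io_uring would go here (elided).
        m_pending.clear();
    }

private:
    std::vector<pwrite_request_t> m_pending;
};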

ssize_t bytes_read = ::read(s_validate_persistence_batch_eventfd, &val, sizeof(val));
if (bytes_read == -1)
{
ASSERT_INVARIANT(errno == EAGAIN, "");

if (errno == EAGAIN) {
    continue;
} else {
    throw a proper exception!
}

Contributor

I disagree; as I noted in another comment, according to the eventfd manpage, the only other value that read() can return in this context is EINVAL, which definitely indicates a programming error. (However, the assert message needs to be filled in as I indicated in that comment.)

Author

Added the error message.
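
For reference, a minimal sketch of the read pattern under discussion, assuming a nonblocking eventfd; the helper name and assert message are illustrative:

#include <sys/eventfd.h>
#include <unistd.h>

#include <cassert>
#include <cerrno>

// Drain a nonblocking eventfd. EAGAIN just means no events are pending; per the
// discussion above, the only other failure expected here is EINVAL, which would
// indicate a programming error, so it is asserted rather than thrown.
bool try_consume_eventfd(int event_fd)
{
    eventfd_t val = 0;
    ssize_t bytes_read = ::read(event_fd, &val, sizeof(val));
    if (bytes_read == -1)
    {
        assert(errno == EAGAIN && "Unexpected error while reading from eventfd!");
        return false;
    }
    return true;
}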

@@ -114,6 +115,12 @@ class server_t
private:
static inline server_config_t s_server_conf{};

// TODO: Delete this once recovery/checkpointing implementation is in.

Contributor

Instead of adding all these pieces to db_server, can you instead create a separate abstraction for handling log writing and just have the server keep track of an instance of it? That should separate the main server code from the log writing component.
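
One possible shape for such an abstraction, sketched with hypothetical names (not the actual refactoring tracked in GAIAPLAT-1818):

#include <string>
#include <thread>

// Hypothetical sketch: encapsulate the log-writer thread and its eventfds in a
// separate component, so server_t only holds an instance and forwards signals.
class log_writer_t
{
public:
    void start(const std::string& log_dir);
    void stop();

    // Called from the commit path on session threads.
    void signal_txn_writes_available();
    void signal_txn_decision_available();

private:
    void writer_thread_main();

    std::thread m_writer_thread;
    int m_signal_log_write_eventfd = -1;
    int m_signal_decision_eventfd = -1;
};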

Contributor

@senderista left a comment

We probably should schedule a discussion after you've had a chance to review comments on this PR and the preceding PR.

@@ -132,6 +132,11 @@ inline void allocate_object(
gaia_offset_t object_offset = chunk_manager->allocate(size + c_db_object_header_size);
if (object_offset == c_invalid_gaia_offset)
{
if (gaia::db::get_mapped_log()->data()->chunk_count == c_max_chunks_per_txn)

Contributor

It's not clear to me that there's any reason for the chunk_count to be persisted in the txn log itself, rather than in client-side session thread TLS or shared session state (which we need anyway for crash recovery). Do we need the chunk count in the txn log as anything but a consistency check when we extract the set of used chunks from redo offsets during a scan on the server?

In general I'd prefer to avoid storing redundant information like this in persistent structures, unless there's a compelling reason to do so for performance or simplicity.

Author

You're correct. I can infer the set of chunks from the txn log and move this to client TLS.
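
A rough sketch of that inference, assuming a txn log laid out as an array of records holding object offsets and a fixed number of slots per chunk; every name and constant below is hypothetical:

#include <cstddef>
#include <cstdint>
#include <unordered_set>

using gaia_offset_t = uint32_t;  // hypothetical typedefs for the sketch
using chunk_offset_t = uint16_t;

constexpr gaia_offset_t c_invalid_offset = 0;  // assumed invalid-offset sentinel
constexpr size_t c_slots_per_chunk = 1 << 16;  // assumed, not the real constant

struct txn_log_record_t
{
    gaia_offset_t new_offset;
};

// Derive the set of chunks a txn touched from its redo offsets, instead of
// persisting a separate chunk list in the txn log.
std::unordered_set<chunk_offset_t> chunks_from_log(const txn_log_record_t* records, size_t count)
{
    std::unordered_set<chunk_offset_t> used_chunks;
    for (size_t i = 0; i < count; ++i)
    {
        if (records[i].new_offset != c_invalid_offset)
        {
            used_chunks.insert(static_cast<chunk_offset_t>(records[i].new_offset / c_slots_per_chunk));
        }
    }
    return used_chunks;
}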

@@ -155,6 +172,11 @@ class server_t
thread_local static inline bool s_session_shutdown = false;
thread_local static inline int s_session_shutdown_eventfd = -1;

thread_local static inline int s_session_decision_eventfd = -1;

// Signal to persistence thread that a batch is ready to be validated.

Contributor

This declaration should be moved out of the group labeled "These fields have session lifetime." since it's not thread-local and not scoped to a session.

Author

Fixed.

static inline std::atomic<gaia_txn_id_t> s_last_queued_commit_ts_upper_bound = c_invalid_gaia_txn_id;

// Keep a track of undecided txns submitted to the async_disk_writer.
static inline std::set<gaia_txn_id_t> s_seen_and_undecided_txn_set{};

Contributor

Does this need to be ordered? If not I'd prefer std::unordered_set. Also, I generally want to avoid allocation-heavy structures in the server, even though this isn't in the txn commit critical path. Why don't we discuss the requirements for this data structure further? (I think it may be worth adding a lock-free set that uses just std::array with linear search to our common library.)

Author

This structure tracks txn commit timestamps and enables txn decisions to be logged in commit order; for example, if txn 1 -> txn 2 -> txn 3, then these three decisions will be logged in that order and recovery will proceed in the same order.
Note that the txn data itself can be logged out of commit order.
Regarding the lock-free array: how will memory reclamation work? Is it similar to how it happens in the txn array? Will this also be a shared-memory array? I like the idea, but I will keep it out of the current PR since I need to understand more details.
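
A minimal sketch of the kind of fixed-capacity lock-free set suggested above, using std::array and linear search; removal/reclamation and duplicate suppression under concurrent inserts are deliberately out of scope:

#include <array>
#include <atomic>
#include <cstddef>

template <typename T_value, size_t N, T_value c_empty>
class lockfree_array_set_t
{
public:
    lockfree_array_set_t()
    {
        for (auto& slot : m_slots)
        {
            slot.store(c_empty, std::memory_order_relaxed);
        }
    }

    // Insert via CAS into the first empty slot. Returns false if the value was
    // already present in a scanned slot or the set is full.
    bool insert(T_value value)
    {
        for (auto& slot : m_slots)
        {
            T_value expected = c_empty;
            if (slot.compare_exchange_strong(expected, value))
            {
                return true;
            }
            if (expected == value)
            {
                return false;
            }
        }
        return false;
    }

    // Membership test via linear scan.
    bool contains(T_value value) const
    {
        for (const auto& slot : m_slots)
        {
            if (slot.load() == value)
            {
                return true;
            }
        }
        return false;
    }

private:
    std::array<std::atomic<T_value>, N> m_slots;
};

A commit-ts set could then be declared as, e.g., lockfree_array_set_t<gaia_txn_id_t, 64, c_invalid_gaia_txn_id>, assuming the invalid txn id is usable as a constant-expression sentinel.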

@@ -159,6 +164,9 @@ inline void allocate_object(
// on the server in case we crash.
gaia::db::get_mapped_log()->data()->current_chunk = new_chunk_offset;

auto& chunk = gaia::db::get_mapped_log()->data()->chunks[gaia::db::get_mapped_log()->data()->chunk_count++];

Contributor

I don't understand this initialization that is immediately overwritten. You aren't using the initial value of chunk anywhere, so what is the point of the initialization expression (besides incrementing chunk_count as a side effect)?

Contributor

Oh I think I see, you're actually initializing a reference to the new chunk offset's location and assigning to the dereferenced location. I think that's quite unclear and you should just take the address and assign to a dereferenced pointer.
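
For comparison, the two spellings under discussion, using a hypothetical struct in place of the mapped txn log:

#include <cstddef>
#include <cstdint>

struct txn_log_t  // hypothetical, for illustration only
{
    size_t chunk_count;
    uint16_t chunks[64];
};

void record_chunk_with_reference(txn_log_t* log, uint16_t new_chunk_offset)
{
    // Current form: bind a reference to the next slot (incrementing chunk_count
    // as a side effect), then assign through the reference.
    auto& chunk = log->chunks[log->chunk_count++];
    chunk = new_chunk_offset;
}

void record_chunk_with_pointer(txn_log_t* log, uint16_t new_chunk_offset)
{
    // Suggested form: take the address explicitly and assign through the pointer.
    auto* chunk_slot = &log->chunks[log->chunk_count++];
    *chunk_slot = new_chunk_offset;
}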

Contributor

(Besides which, I'm not sure we need to store the set of chunks at all in the log, but if we do, why are you reconstructing it again from the offsets when you scan the log?)

Author

The set of chunks is merely stored to retain chunk order. I have deleted this logic now.

// Mark txn as durable in metadata so we can GC the txn log.
// We only mark it durable after validation to simplify the
// state transitions:
// TXN_VALIDATING -> TXN_DECIDED -> TXN_DURABLE.

Contributor

You should ensure this comment is preserved in the new codepath, because the same considerations apply.

@@ -3226,6 +3507,26 @@ void server_t::run(server_config_t server_conf)
close_fd(s_server_shutdown_eventfd);
});

// To signal to the persistence thread to validate the return values of a batch of async I/O operations post batch flush.

Contributor

These eventfds should be commented at their declarations, not at initialization, and both the signaling and consuming threads should be identified.

// unnecessarily - in case several session threads signal log/decision eventfd writes.
s_signal_log_write_eventfd = make_nonblocking_eventfd();

s_signal_decision_eventfd = make_nonblocking_eventfd();

Contributor

This eventfd needs similar comments (assuming it is necessary--see earlier comments).

s_validate_persistence_batch_eventfd = make_nonblocking_eventfd();

// To signal to the persistence thread that new writes are available to be written.
// Semaphore semantics in this case will be correct but will lead to the thread being awake

Contributor

"awakened"


// To signal to the persistence thread that new writes are available to be written.
// Semaphore semantics in this case will be correct but will lead to the thread being awake
// unnecessarily - in case several session threads signal log/decision eventfd writes.

Contributor

Could you be a bit more explicit? I think what you're trying to say is that with non-semaphore semantics, multiple threads could concurrently increment the eventfd to a value > 1 before it's serviced by the epoll loop, but a single read will reset the eventfd to 0, effectively debouncing the multiple notifications?
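
A small self-contained illustration of that debouncing behavior with a non-semaphore eventfd:

#include <sys/eventfd.h>

#include <cstdio>

int main()
{
    // Non-semaphore, nonblocking eventfd: each write adds to an internal counter.
    int event_fd = ::eventfd(0, EFD_NONBLOCK);

    // Several session threads could each signal before the epoll loop runs.
    ::eventfd_write(event_fd, 1);
    ::eventfd_write(event_fd, 1);
    ::eventfd_write(event_fd, 1);

    // A single read returns the accumulated value (3) and resets the counter to 0,
    // so the multiple notifications collapse into one wakeup. With EFD_SEMAPHORE,
    // each read would return 1 and the thread would be woken once per signal.
    eventfd_t val = 0;
    ::eventfd_read(event_fd, &val);
    std::printf("consumed %llu notifications in one read\n", static_cast<unsigned long long>(val));
    return 0;
}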

@senderista (Contributor)

Just had a crazy thought: since we only care about Linux, we should investigate whether it's feasible to use the futex(2) system call to directly block on (FUTEX_WAIT) and signal (FUTEX_WAKE) the txn metadata word itself for a given commit_ts. This would avoid needing to introduce eventfds at all to signal that the TXN_DURABLE bit has been set in the txn metadata entry for a commit_ts (and to block until that condition is true).

@senderista (Contributor)

There's some discussion btw about adding futex support to io_uring for Postgres use cases: https://www.spinics.net/lists/io-uring/msg08827.html. But I think we can get by without direct futex support in io_uring.
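
A rough sketch of the futex idea. futex(2) operates only on 32-bit words, so the durable flag would need to live in an addressable 32-bit portion of the metadata entry; everything below (flag position, helper names) is hypothetical:

#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>

#include <atomic>
#include <climits>
#include <cstdint>

constexpr uint32_t c_txn_durable_flag = 1u << 0; // hypothetical bit position

static long futex(std::atomic<uint32_t>* addr, int op, uint32_t val)
{
    return ::syscall(SYS_futex, reinterpret_cast<uint32_t*>(addr), op, val, nullptr, nullptr, 0);
}

// Commit path: block until the durable bit is set for this commit_ts.
void wait_for_durable(std::atomic<uint32_t>* metadata_word)
{
    uint32_t observed = metadata_word->load();
    while (!(observed & c_txn_durable_flag))
    {
        // Sleeps only if the word still equals 'observed'; rechecks on every wakeup.
        futex(metadata_word, FUTEX_WAIT, observed);
        observed = metadata_word->load();
    }
}

// Log writer: set the durable bit, then wake any waiters blocked on this word.
void mark_durable_and_wake(std::atomic<uint32_t>* metadata_word)
{
    metadata_word->fetch_or(c_txn_durable_flag);
    futex(metadata_word, FUTEX_WAKE, INT_MAX);
}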

if (txn_metadata_t::is_uninitialized_ts(ts))
{
// Simply continue; validation is responsible for sealing ts.
continue;

Contributor

This seems incorrect: an uninitialized txn metadata entry could always be used for a commit_ts after your scan completed and you updated s_last_queued_commit_ts_upper_bound, so you would never see it in that case, right?

I think you have 2 alternatives: 1) seal all uninitialized entries during the scan, or 2) only update s_last_queued_commit_ts_upper_bound if you didn't encounter any uninitialized entries during the scan. Since 1) isn't required for correctness, and currently only validation code seals entries (which it must for correctness), I would lean toward 2). This would lead to repeated scans in rare cases, but I expect that overhead to be acceptable, and I think it's better than forcing commit_ts allocation to fail and retry due to sealed entries (which should also be rare, but it's in a critical path, unlike this code).

Author

Great catch! I will use the second solution as described. I can avoid duplicate log writes here by simply checking whether the txn commit_ts is already present in s_seen_and_undecided_txn_set.
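
A sketch of that second alternative, with hypothetical names standing in for the actual txn metadata predicates and queueing logic:

#include <cstdint>

using gaia_txn_id_t = uint64_t;               // hypothetical typedef for the sketch

bool is_uninitialized_ts(gaia_txn_id_t ts);   // assumed predicates and helpers
bool is_commit_ts(gaia_txn_id_t ts);
bool already_queued(gaia_txn_id_t commit_ts); // e.g. present in s_seen_and_undecided_txn_set
void queue_txn_writes(gaia_txn_id_t commit_ts);

// Scan (last_upper_bound, scan_end]; only advance the upper bound if no
// uninitialized entries were encountered, so they are re-examined on the next
// scan once validation has sealed or claimed them.
gaia_txn_id_t scan_and_queue(gaia_txn_id_t last_upper_bound, gaia_txn_id_t scan_end)
{
    bool saw_uninitialized = false;
    for (gaia_txn_id_t ts = last_upper_bound + 1; ts <= scan_end; ++ts)
    {
        if (is_uninitialized_ts(ts))
        {
            saw_uninitialized = true;
            continue;
        }
        // Skip entries already submitted to the disk writer to avoid duplicate log writes.
        if (is_commit_ts(ts) && !already_queued(ts))
        {
            queue_txn_writes(ts);
        }
    }
    return saw_uninitialized ? last_upper_bound : scan_end;
}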

Author

@mihirj1993 left a comment

Addressed feedback. Need to test with new changes.

@@ -132,6 +132,11 @@ inline void allocate_object(
gaia_offset_t object_offset = chunk_manager->allocate(size + c_db_object_header_size);
if (object_offset == c_invalid_gaia_offset)
{
if (gaia::db::get_mapped_log()->data()->chunk_count == c_max_chunks_per_txn)
{
throw memory_allocation_error_internal("Maximum number of chunks for this transaction has been reached.");

Author

Done.

@@ -409,6 +437,14 @@ class server_t

static void client_dispatch_handler(const std::string& socket_name);

static void log_writer_handler();

static void write_to_persistent_log(int64_t txn_group_timeout_us, bool sync_writes = false);

Author

Updated everywhere to use the constant.

if (c_use_gaia_log_implementation)
{
// Signal to the persistence thread to write txn log to disk.
eventfd_write(s_signal_log_write_eventfd, static_cast<eventfd_t>(commit_ts));

Author

Yes, you're right. I didn't need to use the commit ts; I have added a comment to point out that it is arbitrary.


// Keep track of every chunk used in a transaction. This helps retain the order in which chunks are
// assigned to a txn; with chunk reuse they can be assigned out of order.
auto& chunk = s_log.data()->chunks[s_log.data()->chunk_count++];

Author

I've removed additional fields from the txn_log_record.

I now use a topological sort to obtain the chunk order from the txn log.

@@ -82,6 +83,10 @@ class persistent_store_manager
*/
void destroy_persistent_store();

void init_log(const std::string& data_dir, const int validate_persistence_batch_eventfd);

Author

Will fix in recovery PR.

// TXN_VALIDATING -> TXN_DECIDED -> TXN_DURABLE.
txn_metadata_t::set_txn_durable(commit_ts);
// Signal to the persistence thread to write txn decision to disk.
// Use another decision fd to not lose events.

Author

The commit path signals the log_writer thread twice:

  1. Post submit_txn()
  2. Post validation

There is a possibility that the first scan of the txn table does not include the txn decision. After scanning all txn entries, the log_writer thread will block in epoll_wait, so a second eventfd write is required to wake it up and process any decisions that are now ready.

txn_metadata_t::set_txn_durable(commit_ts);
// Signal to the persistence thread to write txn decision to disk.
// Use another decision fd to not lose events.
eventfd_write(s_signal_decision_eventfd, static_cast<eventfd_t>(commit_ts));

Author

This was arbitrary. I have added a comment to that effect.

@LaurentiuCristofor marked this pull request as draft March 23, 2022 22:10
Base automatically changed from log_writer_apis to master May 17, 2022 02:20