
Records in the queues should be more than just a payload #2834

Closed
fulmicoton opened this issue Feb 17, 2023 · 14 comments

@fulmicoton
Contributor

The PushAPI relies on mrecordlog to store the ingested documents as is.

This is fine for the moment, but it will prevent further evolution. In particular, #2699 will most likely require us to append some kind of commit command to the queue.

The records in the queue should not be raw documents, but rather something closer to

enum Command<'a> {
    Ingest {
        json_payload: &'a [u8]
    },
    Commit,
    // ... more to come?
}

with an ad hoc, simple, versioned, and extensible format.
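
A minimal sketch of what such a versioned encoding could look like, assuming a one-byte format version followed by a one-byte command tag (the concrete layout and tag values here are illustrative, not something decided in this issue):

const FORMAT_VERSION: u8 = 0; // hypothetical version byte

enum Command<'a> {
    Ingest { json_payload: &'a [u8] },
    Commit,
}

fn encode(command: &Command, buffer: &mut Vec<u8>) {
    buffer.push(FORMAT_VERSION);
    match command {
        Command::Ingest { json_payload } => {
            buffer.push(0u8); // hypothetical tag for Ingest
            buffer.extend_from_slice(json_payload);
        }
        Command::Commit => {
            buffer.push(1u8); // hypothetical tag for Commit
        }
    }
}

fn decode(record: &[u8]) -> Option<Command<'_>> {
    match record {
        [FORMAT_VERSION, 0, json_payload @ ..] => Some(Command::Ingest { json_payload }),
        [FORMAT_VERSION, 1] => Some(Command::Commit),
        _ => None, // unknown version or command tag
    }
}

Adding a new command or bumping the version then only extends the match arms, which keeps the format extensible.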

fulmicoton added the enhancement (New feature or request) label Feb 17, 2023
@imotov
Collaborator

imotov commented Feb 23, 2023

I can see two driving forces that should influence how this change is implemented. First, since the data we handle here can be huge, it is performance sensitive. That part is pretty clear. The second aspect (whose motivation I don't quite understand) is keeping mrecordlog as a separate project. Depending on how generic we want mrecordlog to remain, I can see three ways of implementing this change:

  1. Leave mrecordlog as is and add some logic to encode Command into &'a [u8]. This would allow mrecordlog to remain unchanged, but it means we would have to copy the payload twice, which is not good from a performance perspective. So I would say this option is a no-go.
  2. Change mrecordlog's append_records function to accept an iterator over something like mrecordlog::record::Serializable instead of &'a [u8].
  3. Make mrecordlog aware of Commit and possibly other marker records that we might want to add to the log in the future.

Assuming we want to keep mrecordlog generic, 2) seems to be the best solution. But like any abstraction, it comes at the price of mrecordlog not knowing what it is storing. If we later want to add a viewer to mrecordlog or make it more intelligent about what it actually stores, option 3) might be more useful, but in that case it might also make sense to consider merging mrecordlog into quickwit.
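
For illustration, the shape of option 2) could be something like this (the trait is hypothetical; mrecordlog::record::Serializable does not exist today):

// Hypothetical sketch of option 2): mrecordlog stays generic and lets callers
// describe how a record turns into bytes, instead of taking &'a [u8] directly.
pub trait Serializable {
    fn serialize_into(&self, buffer: &mut Vec<u8>);
}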

@guilload
Member

@imotov, we're planning on switching from &[u8] to Bytes (a reference-counted slice of memory) in the ingest pipeline in 0.6.0, so we can still consider option 1).

@imotov
Collaborator

imotov commented Feb 23, 2023

@guilload, did you mean using bytes::Buf instead of &'a [u8] in 2)? In other words, we would change the mrecordlog interface to look something like this:

    pub async fn append_records<T, B>(
        &mut self,
        queue: &str,
        position_opt: Option<u64>,
        payloads: T,
    ) -> Result<Option<u64>, AppendError>
    where
        T: Iterator<Item = B>,
        B: Buf,
    {
....
    }

    pub fn range<R>(
        &self,
        queue: &str,
        range: R,
    ) -> Option<impl Iterator<Item = (u64, impl Buf)>>
    where
        R: RangeBounds<u64> + 'static,
    {
...
    }

The interface remains backward compatible with the current implementation, so it is kind of a combination of 1) and 2). It should also simplify migration to Bytes later on.
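
Backward compatibility follows from the fact that both &[u8] and Bytes implement bytes::Buf, so existing call sites keep compiling; a minimal illustration, assuming the signature sketched above:

use bytes::{Buf, Bytes};

// Both of these satisfy an `Item: Buf` bound, so current &[u8] callers keep
// working while future callers can hand over reference-counted Bytes without copies.
fn assert_buf<B: Buf>(_: B) {}

fn main() {
    assert_buf(&b"legacy payload"[..]);                // &[u8] implements Buf
    assert_buf(Bytes::from_static(b"shared payload")); // Bytes implements Buf
}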

@fulmicoton
Contributor Author

We have plenty of contradictory needs and nice-to-have things we are considering:

  • compression (most likely LZ4), both in memory and on disk. Logs compress really well.
  • zero-copy: we would prefer not to copy data around.
  • validation: right now we ditch documents that do not match the schema at index time. This is too late; we should do it on the ingest API side. That will require some sort of JSON parsing. We could also convert the message to a more machine-friendly, zero-copy-friendly format.

@imotov
Collaborator

imotov commented Feb 24, 2023

One way to approach this is by separating the concerns.

Compression sounds like something mrecordlog should be concerned about, especially considering that logs compress really well only if you compress a block of many individual records. So it feels like something we benefit from more once a large number of records has been collected (in mrecordlog), compared to when those records are in smaller batches in flight and need to be analyzed individually.

Validation is something quickwit should worry about. There are a couple of approaches there, but at the end of the day, if we want a record to be stored on disk, we still need to turn it into a series of bytes, so I think this concern can stay in quickwit land.

And zero-copy is the concern of this interface. By introducing the Buf trait, we can delay the actual act of serialization as long as possible, assuming that serialization always happens. If we think some sort of pre-parsed representation of records will be needed, we can add a provision for a dual representation: records could be kept in memory in their pre-parsed object form, or serialized into bytes for compression or on-disk storage at mrecordlog's discretion, when and if needed.

@fulmicoton
Contributor Author

I personally vote for giving up on chasing zero-copy and sticking to JSON for the moment:
deserialize and validate in the REST API, and don't do any conversion.

@guilload What do you think?

@guilload
Member

We're all aligned on what we want the perfect ingest pipeline (early commit, early schema validation, zero-copy, compression, etc.) to look like by Quickwit 0.6.0 or 0.7.0. So how do we get there? I'd like to land the user-friendly features before the performance features because we have some headroom there. What we do today is not particularly clever, yet our indexing throughput is still good because the throughput of memcpy is much higher than that of the doc processor or the indexer.

So here's my proposal:

Iteration 1: early commit

  1. Receive an HTTP body of NDJSON documents -> Bytes
  2. Split the body on newline -> Iterator<Item = Bytes>
  3. Parse each doc to a JsonValue and return 400 if any doc is malformed. Yes, we throw away the JSON value.
  4. Group docs by index -> HashMap<IndexId, Vec<Bytes>>
  5. Form ingest and commit commands described above -> command header + command payload = Bytes.chain(Bytes) = Bytes
  6. Store commands in mrecordlog as &[u8]
  7. Fetch and decode commands from mrecordlog in ingest source
  8. Ingest source propagates ingest and commit commands to the indexer
  9. Indexer commits on receiving a commit command, the rest of the indexing pipeline remains unchanged
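
A rough sketch of steps 2, 3, and 5 above, assuming the bytes and serde_json crates; the helper names and single-byte command headers are placeholders, not the actual implementation:

use bytes::{Buf, Bytes};

const INGEST_HEADER: &[u8] = &[0u8]; // hypothetical command tags
const COMMIT_HEADER: &[u8] = &[1u8];

// Step 2: split the NDJSON body on newlines; Bytes::slice keeps it copy-free.
fn split_ndjson(body: &Bytes) -> Vec<Bytes> {
    let mut docs = Vec::new();
    let mut start = 0;
    for (pos, byte) in body.iter().enumerate() {
        if *byte == b'\n' {
            if pos > start {
                docs.push(body.slice(start..pos));
            }
            start = pos + 1;
        }
    }
    if start < body.len() {
        docs.push(body.slice(start..));
    }
    docs
}

// Step 3: parse each doc into a JsonValue (and throw the value away).
fn validate(doc: &Bytes) -> Result<(), serde_json::Error> {
    serde_json::from_slice::<serde_json::Value>(doc).map(|_| ())
}

// Step 5: command header + command payload; Buf::chain avoids copying and the
// result still implements Buf, ready to be written out as a record.
fn ingest_command(doc: Bytes) -> impl Buf {
    Bytes::from_static(INGEST_HEADER).chain(doc)
}

fn commit_command() -> Bytes {
    Bytes::from_static(COMMIT_HEADER)
}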

Iteration 2: early schema validation

After step 3 from iteration 1, we validate each document with the appropriate doc mapper. Yes, we throw away the tantivy::Document emitted by the doc mapper.

Iteration 3: mrecordlog optimization

  • mrecordlog receives records as Bytes instead of &[u8] and no longer manages record buffers
  • Enable compression for records stored on disk
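
For instance, on-disk block compression could look roughly like this (using the lz4_flex crate purely as an example; mrecordlog's actual storage layout is not specified here):

use bytes::Bytes;

// Example only: batch many records into one block (logs compress well in bulk),
// length-prefix each record to keep boundaries, then LZ4-compress the block.
fn compress_block(records: &[Bytes]) -> Vec<u8> {
    let mut block = Vec::new();
    for record in records {
        block.extend_from_slice(&(record.len() as u32).to_le_bytes());
        block.extend_from_slice(record);
    }
    lz4_flex::compress_prepend_size(&block)
}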

Iteration 4: parse and validate documents only once

  • We choose a better format than JSON (bincode, MessagePack, Protobuf, rkyv, etc.) to store the docs in mrecordlog. Things to consider: zero-copy vs. compression ratio, self-describing capabilities, etc. Note that picking a non-self-describing format imposes storing the schema version with the records.
  • We still have to transform records serialized in that format into tantivy::Document, which is what tantivy currently accepts as indexing input.

Iteration 5: remove the need for tantivy::Document and allocations

We've already experimented with borrowed formats (TantivyValue<'a>), but another approach could be for tantivy to drive a visitor that emits a stream of field values such as FieldValue<'a> { field: Cow<'a, [u8]>, value: Value<'a> }. That would remove most allocations.
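
A possible shape for such a visitor-driven API, purely as a sketch (none of these types exist in tantivy today):

use std::borrow::Cow;

// Hypothetical value types borrowed directly from the raw record.
enum Value<'a> {
    Str(&'a str),
    U64(u64),
    Bytes(&'a [u8]),
}

struct FieldValue<'a> {
    field: Cow<'a, [u8]>,
    value: Value<'a>,
}

// Hypothetical visitor that tantivy would drive instead of requiring an owned
// tantivy::Document, so field values never need to be allocated up front.
trait DocumentVisitor<'a> {
    fn visit(&mut self, field_value: FieldValue<'a>);
}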

Iteration 3 is independent.

Of course, iterations 4 and 5 require more thinking.

@fulmicoton
Contributor Author

OK on iterations 1 and 2 as planned, probably with 4 before 3, since the validation is index-specific.
TIL about Buf and Chain. That's perfect.

I don't understand iteration 3, but I don't think we need to discuss it right now. My high-level point is that I think compression and zero-copy are mutually exclusive.

Let's go with iteration 1 anyway.

imotov added a commit to imotov/quickwit that referenced this issue Mar 2, 2023
fulmicoton pushed a commit that referenced this issue Mar 2, 2023
@imotov
Collaborator

imotov commented Mar 2, 2023

Since we are changing the binary format of data that can persist across server restarts, what's your current view on the upgrade experience? I cannot find any place where the format version is stored, and a current record is basically just a list of bytes, so theoretically it can be anything. Is handling this gracefully a valid goal?

If it is, I can think of a couple of solutions, but they would abuse the fact that a current record (if it is valid) will most likely start with { or some whitespace character. We could use these characters as a "magic" number indicating "legacy" (current format) records and otherwise try to parse the new record format (which should probably include a version number in case we want to change this again in the future). Alternatively, we could parse the version+command bytes and, if that fails, fall back to treating the record as "legacy". The former solution is more brittle during migration, but would help us detect if somebody tries to downgrade after a future format change. The latter solution could cause issues on upgrade if somebody had tried to index something other than valid JSON, but would be more stable in the long term.
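
A sketch of the first approach (treating { or whitespace as a marker for legacy records), purely illustrative:

enum Record<'a> {
    // Current format: the record is the raw JSON document itself.
    Legacy { json_payload: &'a [u8] },
    // New format: version byte, command byte, then the payload.
    Versioned { version: u8, command: u8, payload: &'a [u8] },
}

fn decode(record: &[u8]) -> Record<'_> {
    match record {
        // A record starting with '{' or whitespace is assumed to be a legacy JSON doc.
        [b'{', ..] | [b' ' | b'\t' | b'\r' | b'\n', ..] => Record::Legacy { json_payload: record },
        [version, command, payload @ ..] => Record::Versioned {
            version: *version,
            command: *command,
            payload,
        },
        // Anything too short to carry a version and command is treated as legacy.
        _ => Record::Legacy { json_payload: record },
    }
}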

@guilload
Member

guilload commented Mar 2, 2023

Unfortunately, in Quickwit 0.5.0 and potentially 0.6.0, the file format (tantivy) will change and won't be backward compatible. Consequently, we don't need to worry about the upgrade experience for this specific feature because I suspect our users will upgrade with downtime and will reingest their data from scratch.

Starting with Quickwit 0.6.0, we want to maintain backward compatibility with future versions.

@imotov
Collaborator

imotov commented Mar 2, 2023

I see, so you have to start with an empty qwdata, and therefore the situation I described will never happen? Then I think it is a good opportunity to build in some versioning mechanism to make life easier for whoever works on this next.

Ideally, it would also be nice to build a mechanism into qwdata that prevents a downgraded server from starting on a queue that was touched by a server with a later serialization version. That way we only have to worry about reading older records, and can treat anything we cannot recognize as record log corruption and safely ignore it without fear of losing good new records.

imotov added a commit to imotov/quickwit that referenced this issue Mar 5, 2023
Makes it possible to inject different commands, such as force commit, alongside documents into the ingest stream.

See quickwit-oss#2834
imotov added a commit that referenced this issue Mar 10, 2023
Makes it possible to inject different commands, such as force commit, alongside documents into the ingest stream.

See #2834
@imotov
Collaborator

imotov commented Mar 21, 2023

Should we close this issue and move iterations 1, 2, 4, and 5 into separate issues or a meta issue with an appropriate title?

@guilload
Member

Yeah, that makes sense. You can open new issues.

@imotov
Collaborator

imotov commented Mar 21, 2023

Closing this as done. We discussed some interesting ideas on this issue that relate to the following other issues:

imotov closed this as completed Mar 21, 2023