|
| 1 | +From 87e178e98713539ac5e543c34520cc7aca3d8c03 Mon Sep 17 00:00:00 2001 |
| 2 | +From: Lars Francke <git@lars-francke.de> |
| 3 | +Date: Mon, 20 Oct 2025 08:36:40 +0200 |
| 4 | +Subject: Fix problem where multi-chars delimiters fail to be parsed if they |
| 5 | + happen right at a buffer boundary. |
| 6 | + |
| 7 | +This can be removed when we use the version of Vector that includes this |
| 8 | +fix. |
| 9 | +See: https://github.com/vectordotdev/vector/pull/24028 |
| 10 | + |
| 11 | +NOTE: async/await removed from the patch, because async is added after |
| 12 | +0.49.0. |
| 13 | +--- |
| 14 | + lib/file-source/src/buffer.rs | 209 ++++++++++++++++++++++++++++++++-- |
| 15 | + 1 file changed, 202 insertions(+), 7 deletions(-) |
| 16 | + |
| 17 | +diff --git a/lib/file-source/src/buffer.rs b/lib/file-source/src/buffer.rs |
| 18 | +index 55dd48133..1ef892aaf 100644 |
| 19 | +--- a/lib/file-source/src/buffer.rs |
| 20 | ++++ b/lib/file-source/src/buffer.rs |
| 21 | +@@ -48,6 +48,12 @@ pub fn read_until_with_max_size<'a, R: BufRead + ?Sized>( |
| 22 | + let delim_finder = Finder::new(delim); |
| 23 | + let delim_len = delim.len(); |
| 24 | + let mut discarded_for_size_and_truncated = Vec::new(); |
| 25 | ++ // The following line from the upstream change is only needed once the funciton becomes async |
| 26 | ++ // let mut reader = Box::new(reader); |
| 27 | ++ |
| 28 | ++ // Track partial delimiter matches across buffer boundaries |
| 29 | ++ let mut partial_delim: BytesMut = BytesMut::with_capacity(delim_len); |
| 30 | ++ |
| 31 | + loop { |
| 32 | + let available: &[u8] = match reader.fill_buf() { |
| 33 | + Ok(n) => n, |
| 34 | +@@ -55,6 +61,35 @@ pub fn read_until_with_max_size<'a, R: BufRead + ?Sized>( |
| 35 | + Err(e) => return Err(e), |
| 36 | + }; |
| 37 | + |
| 38 | ++ // First, check if we have a partial delimiter from the previous iteration |
| 39 | ++ if !partial_delim.is_empty() { |
| 40 | ++ let expected_suffix = &delim[partial_delim.len()..]; |
| 41 | ++ let expected_suffix_len = expected_suffix.len(); |
| 42 | ++ |
| 43 | ++ if available.len() >= expected_suffix_len |
| 44 | ++ && &available[..expected_suffix_len] == expected_suffix |
| 45 | ++ { |
| 46 | ++ // Complete delimiter found! Consume the suffix |
| 47 | ++ reader.consume(expected_suffix_len); |
| 48 | ++ *position += expected_suffix_len as u64; |
| 49 | ++ total_read += expected_suffix_len; |
| 50 | ++ partial_delim.clear(); |
| 51 | ++ |
| 52 | ++ // Found a complete delimiter, return the current buffer |
| 53 | ++ return Ok(ReadResult { |
| 54 | ++ successfully_read: Some(total_read), |
| 55 | ++ discarded_for_size_and_truncated, |
| 56 | ++ }); |
| 57 | ++ } else { |
| 58 | ++ // Not a complete delimiter after all. Add partial_delim to output buffer |
| 59 | ++ if !discarding { |
| 60 | ++ buf.extend_from_slice(&partial_delim); |
| 61 | ++ } |
| 62 | ++ partial_delim.clear(); |
| 63 | ++ // Continue processing current available buffer |
| 64 | ++ } |
| 65 | ++ } |
| 66 | ++ |
| 67 | + let (done, used) = { |
| 68 | + match delim_finder.find(available) { |
| 69 | + Some(i) => { |
| 70 | +@@ -64,13 +99,47 @@ pub fn read_until_with_max_size<'a, R: BufRead + ?Sized>( |
| 71 | + (true, i + delim_len) |
| 72 | + } |
| 73 | + None => { |
| 74 | +- if !discarding { |
| 75 | +- buf.extend_from_slice(available); |
| 76 | ++ // No delimiter found in current buffer. Check if buffer ends with a |
| 77 | ++ // partial delimiter match. For multi-byte delimiters like \r\n, we need |
| 78 | ++ // to handle the case where the delimiter is split across buffer boundaries. |
| 79 | ++ let mut partial_match_len = 0; |
| 80 | ++ if !available.is_empty() && delim_len > 1 { |
| 81 | ++ // Check if the end matches a prefix of the delimiter. |
| 82 | ++ // We iterate from longest to shortest prefix and break on first match. |
| 83 | ++ // Performance: For typical 2-byte delimiters (CRLF), this is 1 iteration. |
| 84 | ++ // For longer delimiters, this runs O(delim_len) times but only occurs |
| 85 | ++ // at buffer boundaries (~every 8KB), making the impact negligible. |
| 86 | ++ for prefix_len in (1..delim_len).rev() { |
| 87 | ++ if available.len() >= prefix_len |
| 88 | ++ && available.ends_with(&delim[..prefix_len]) |
| 89 | ++ { |
| 90 | ++ partial_match_len = prefix_len; |
| 91 | ++ break; |
| 92 | ++ } |
| 93 | ++ } |
| 94 | + } |
| 95 | ++ |
| 96 | ++ let bytes_to_copy = available.len() - partial_match_len; |
| 97 | ++ |
| 98 | ++ if !discarding && bytes_to_copy > 0 { |
| 99 | ++ buf.extend_from_slice(&available[..bytes_to_copy]); |
| 100 | ++ } |
| 101 | ++ |
| 102 | ++ // If we found a potential partial delimiter, save it for the next iteration |
| 103 | ++ if partial_match_len > 0 { |
| 104 | ++ partial_delim.clear(); |
| 105 | ++ partial_delim.extend_from_slice(&available[bytes_to_copy..]); |
| 106 | ++ } |
| 107 | ++ |
| 108 | + (false, available.len()) |
| 109 | + } |
| 110 | + } |
| 111 | + }; |
| 112 | ++ |
| 113 | ++ // Check if we're at EOF before we start processing |
| 114 | ++ // (for borrow checker, has to come before `consume`) |
| 115 | ++ let at_eof = available.is_empty(); |
| 116 | ++ |
| 117 | + reader.consume(used); |
| 118 | + *position += used as u64; // do this at exactly same time |
| 119 | + total_read += used; |
| 120 | +@@ -94,11 +163,12 @@ pub fn read_until_with_max_size<'a, R: BufRead + ?Sized>( |
| 121 | + discarding = false; |
| 122 | + buf.clear(); |
| 123 | + } |
| 124 | +- } else if used == 0 { |
| 125 | +- // We've hit EOF but not yet seen a newline. This can happen when unlucky timing causes |
| 126 | +- // us to observe an incomplete write. We return None here and let the loop continue |
| 127 | +- // next time the method is called. This is safe because the buffer is specific to this |
| 128 | +- // FileWatcher. |
| 129 | ++ } else if used == 0 && at_eof { |
| 130 | ++ // We've hit EOF but haven't seen a delimiter. This can happen when: |
| 131 | ++ // 1. The file ends without a trailing delimiter |
| 132 | ++ // 2. We're observing an incomplete write |
| 133 | ++ // |
| 134 | ++ // Return None to signal the caller to retry later. |
| 135 | + return Ok(ReadResult { |
| 136 | + successfully_read: None, |
| 137 | + discarded_for_size_and_truncated, |
| 138 | +@@ -253,4 +323,129 @@ mod test { |
| 139 | + .max_tests(2_000) |
| 140 | + .quickcheck(qc_inner as fn(Vec<Vec<u8>>, u8, NonZeroU8) -> TestResult); |
| 141 | + } |
| 142 | ++ |
| 143 | ++ /// Generic test helper that tests delimiter splits across buffer boundaries |
| 144 | ++ /// for any delimiter length. This function: |
| 145 | ++ /// 1. Creates test data with delimiters positioned to split at buffer boundaries |
| 146 | ++ /// 2. Tests multiple iterations to ensure state tracking works correctly |
| 147 | ++ /// 3. Verifies all lines are correctly separated without merging |
| 148 | ++ fn test_delimiter_boundary_split_helper(delimiter: &[u8], num_lines: usize) { |
| 149 | ++ let delimiter_len = delimiter.len(); |
| 150 | ++ |
| 151 | ++ // Use a buffer capacity that will force splits |
| 152 | ++ // We'll position delimiters to split at this boundary |
| 153 | ++ let buffer_capacity = 10; |
| 154 | ++ |
| 155 | ++ println!( |
| 156 | ++ "\n=== Testing delimiter: {:?} (length: {}) ===", |
| 157 | ++ String::from_utf8_lossy(delimiter), |
| 158 | ++ delimiter_len |
| 159 | ++ ); |
| 160 | ++ println!("Buffer capacity: {} bytes", buffer_capacity); |
| 161 | ++ |
| 162 | ++ // Build test data where each delimiter is positioned to split across buffer boundary |
| 163 | ++ // Strategy: For each line, calculate position so delimiter starts at boundary - (delimiter_len - 1) |
| 164 | ++ let mut data = Vec::new(); |
| 165 | ++ let mut expected_lines = Vec::new(); |
| 166 | ++ |
| 167 | ++ for i in 0..num_lines { |
| 168 | ++ // Create line content that positions the delimiter to split at buffer boundary |
| 169 | ++ // We want the delimiter to straddle a buffer_capacity boundary |
| 170 | ++ |
| 171 | ++ // Calculate how many bytes until the next buffer boundary |
| 172 | ++ let current_pos = data.len(); |
| 173 | ++ let bytes_until_boundary = buffer_capacity - (current_pos % buffer_capacity); |
| 174 | ++ |
| 175 | ++ // Create line content that will position delimiter to split |
| 176 | ++ // We want (delimiter_len - 1) bytes before boundary, then 1 byte after |
| 177 | ++ let line_content = if bytes_until_boundary > delimiter_len { |
| 178 | ++ let content_len = bytes_until_boundary - (delimiter_len - 1); |
| 179 | ++ format!("line{:0width$}", i, width = content_len.saturating_sub(4)).into_bytes() |
| 180 | ++ } else { |
| 181 | ++ // Not enough room in this buffer, pad to next boundary |
| 182 | ++ let padding = bytes_until_boundary; |
| 183 | ++ let extra_content = buffer_capacity - (delimiter_len - 1); |
| 184 | ++ let mut content = vec![b'X'; padding]; |
| 185 | ++ content.extend_from_slice( |
| 186 | ++ format!("L{:0width$}", i, width = extra_content.saturating_sub(1)).as_bytes(), |
| 187 | ++ ); |
| 188 | ++ content |
| 189 | ++ }; |
| 190 | ++ |
| 191 | ++ println!( |
| 192 | ++ "Line {}: '{}' (len: {}, data pos: {})", |
| 193 | ++ i, |
| 194 | ++ String::from_utf8_lossy(&line_content), |
| 195 | ++ line_content.len(), |
| 196 | ++ current_pos |
| 197 | ++ ); |
| 198 | ++ |
| 199 | ++ expected_lines.push(line_content.clone()); |
| 200 | ++ data.extend_from_slice(&line_content); |
| 201 | ++ data.extend_from_slice(delimiter); |
| 202 | ++ } |
| 203 | ++ |
| 204 | ++ println!("Total test data size: {} bytes\n", data.len()); |
| 205 | ++ |
| 206 | ++ // Now test reading this data |
| 207 | ++ let cursor = Cursor::new(data); |
| 208 | ++ let mut reader = BufReader::with_capacity(buffer_capacity, cursor); |
| 209 | ++ let mut position = 0; |
| 210 | ++ let max_size = 1024; |
| 211 | ++ |
| 212 | ++ // Read each line and verify it matches expected |
| 213 | ++ for (i, expected_line) in expected_lines.iter().enumerate() { |
| 214 | ++ let mut buffer = BytesMut::new(); |
| 215 | ++ let result = read_until_with_max_size( |
| 216 | ++ Box::pin(&mut reader), |
| 217 | ++ &mut position, |
| 218 | ++ delimiter, |
| 219 | ++ &mut buffer, |
| 220 | ++ max_size, |
| 221 | ++ ) |
| 222 | ++ .unwrap(); |
| 223 | ++ |
| 224 | ++ assert_eq!( |
| 225 | ++ buffer.as_ref(), |
| 226 | ++ expected_line.as_slice(), |
| 227 | ++ "Line {} should match expected content. Got: {:?}, Expected: {:?}", |
| 228 | ++ i, |
| 229 | ++ String::from_utf8_lossy(&buffer), |
| 230 | ++ String::from_utf8_lossy(expected_line) |
| 231 | ++ ); |
| 232 | ++ |
| 233 | ++ assert!( |
| 234 | ++ result.successfully_read.is_some(), |
| 235 | ++ "Should find delimiter for line {}", |
| 236 | ++ i |
| 237 | ++ ); |
| 238 | ++ } |
| 239 | ++ } |
| 240 | ++ |
| 241 | ++ #[test] |
| 242 | ++ fn test_single_byte_delimiter_boundary() { |
| 243 | ++ // Test single-byte delimiter (should work without any special handling) |
| 244 | ++ test_delimiter_boundary_split_helper(b"\n", 5); |
| 245 | ++ } |
| 246 | ++ |
| 247 | ++ #[test] |
| 248 | ++ fn test_two_byte_delimiter_boundary() { |
| 249 | ++ // Test two-byte delimiter (CRLF case) |
| 250 | ++ test_delimiter_boundary_split_helper(b"\r\n", 5); |
| 251 | ++ } |
| 252 | ++ |
| 253 | ++ #[test] |
| 254 | ++ fn test_three_byte_delimiter_boundary() { |
| 255 | ++ test_delimiter_boundary_split_helper(b"|||", 5); |
| 256 | ++ } |
| 257 | ++ |
| 258 | ++ #[test] |
| 259 | ++ fn test_four_byte_delimiter_boundary() { |
| 260 | ++ test_delimiter_boundary_split_helper(b"<|>|", 5); |
| 261 | ++ } |
| 262 | ++ |
| 263 | ++ #[test] |
| 264 | ++ fn test_five_byte_delimiter_boundary() { |
| 265 | ++ test_delimiter_boundary_split_helper(b"<<>>>", 5); |
| 266 | ++ } |
| 267 | + } |
0 commit comments