Skip to content

Commit

Permalink
Improve stream id finder
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Aug 22, 2022
1 parent 329d559 commit 4912fc6
Showing 1 changed file with 17 additions and 16 deletions.
33 changes: 17 additions & 16 deletions shotover-proxy/src/transforms/cassandra/sink_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use cql3_parser::common::{FQName, Identifier};
use cql3_parser::select::SelectElement;
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use itertools::Itertools;
use metrics::{register_counter, Counter};
use rand::prelude::*;
use serde::Deserialize;
Expand Down Expand Up @@ -176,22 +177,7 @@ impl CassandraSinkCluster {
.collect();

for table_to_rewrite in tables_to_rewrite.iter().rev() {
let mut stream_id = 0;
let mut restart = true;
while restart {
restart = false;
for message in &mut messages {
if let Some(Frame::Cassandra(frame)) = message.frame() {
if stream_id == frame.stream_id {
match stream_id.checked_add(1) {
Some(new_stream_id) => stream_id = new_stream_id,
None => return Err(anyhow!("Ran out of stream_ids")),
}
restart = true;
}
}
}
}
let stream_id = get_unused_stream_id(&messages)?;

if let RewriteTableTy::Local = table_to_rewrite.ty {
messages.insert(
Expand Down Expand Up @@ -771,6 +757,21 @@ fn is_use_statement_successful(response: Option<Result<Response>>) -> bool {
false
}

fn get_unused_stream_id(messages: &Messages) -> Result<i16> {
// start at an unusual number to hopefully avoid looping many times when we receive stream ids that look like [0, 1, 2, ..]
// We can quite happily give up 358 stream ids as that still allows for shotover message batches containing 2 ** 16 - 358 = 65178 messages
for i in 358..i16::MAX {
if !messages
.iter()
.filter_map(|message| message.stream_id())
.contains(&i)
{
return Ok(i);
}
}
Err(anyhow!("Ran out of stream ids"))
}

struct SystemPeer {
tokens: Vec<MessageValue>,
schema_version: Uuid,
Expand Down

0 comments on commit 4912fc6

Please sign in to comment.