[reconfigurator] Reject clickhouse configurations from old generations #7347
base: main
Conversation
```rust
    log: &Logger,
) -> Self {
    let log = log.new(slog::o!("component" => "ClickhouseCli"));
```
This is part of the refactoring, the logs were a bit of a mess.
```rust
let clickhouse_server_config =
    PropertyGroupBuilder::new("config")
        .add_property(
            "config_path",
            "astring",
            format!("{CLICKHOUSE_SERVER_CONFIG_DIR}/{CLICKHOUSE_SERVER_CONFIG_FILE}"),
        );
```
Also part of the refactoring. Let's use the constants we are using for the configuration files in the SMF service as well, so we don't have to hardcode things into an SMF method script.
```rust
pub fn new(
    log: &Logger,
    binary_path: Utf8PathBuf,
    listen_address: SocketAddrV6,
) -> Result<Self> {
    let clickhouse_cli =
        ClickhouseCli::new(binary_path, listen_address, log);
```
Refactoring as well: there was no need to pass clickhouse_cli as a parameter but not clickward, etc.
Great stuff @karencfv!
clickhouse-admin/src/context.rs
Outdated
```rust
// If there is already a configuration file with a generation number we'll
// use that. Otherwise, we set the generation number to None.
let gen = read_generation_from_file(config_path)?;
let generation = Mutex::new(gen);
```
It's become practice at Oxide to avoid tokio mutexes wherever possible as they have significant problems when cancelled and generally just don't do what we want. I realize there's already some usage here with regards to initialization. We don't have to fix that in this PR, but we should avoid adding new uses. We should instead use a `std::sync::Mutex`. I left a comment below about this as well.
See the following for more details:
https://rfd.shared.oxide.computer/rfd/0400#no_mutex
https://rfd.shared.oxide.computer/rfd/0397#_example_with_mutexes
lol I was definitely on the fence on that one, I went for consistency in the end be1afc7#diff-c816600501b7aaa7de4a2eb9dc86498662030cea6390fa23e11a22c990efb510L28-L29
Thanks for the links! Hadn't seen those RFDs, will read them both
clickhouse-admin/src/context.rs
Outdated
```rust
@@ -36,6 +60,10 @@ impl KeeperServerContext {
    pub fn log(&self) -> &Logger {
        &self.log
    }

    pub async fn generation(&self) -> Option<Generation> {
        *self.generation.lock().await
```
We only need read access here, and so we can easily avoid an async mutex here. `Generation` is also `Copy`, so this is cheap. I'd suggest making this a synchronous function and calling `*self.generation.lock()` instead.
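A minimal sketch of what that could look like, assuming the context stores the generation in a `std::sync::Mutex<Option<Generation>>` (not code from this PR):

```rust
// Hypothetical accessor: a synchronous read of the current generation.
// Generation is Copy, so the guard is dropped as soon as this returns.
pub fn generation(&self) -> Option<Generation> {
    *self.generation.lock().unwrap()
}
```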
Yeah, I was wrong here. I wasn't considering the usage of the generation with regards to concurrent requests.
clickhouse-admin/src/context.rs
Outdated
```rust
    }

    pub fn initialization_lock(&self) -> Arc<Mutex<()>> {
        self.initialization_lock.clone()
```
I'm not sure if this usage of a tokio lock is safe or not due to cancellation. It looks like it aligns with the exact usage we have in our `ServerContext`. I also don't have an easy workaround for this right now, and so I guess I'm fine leaving this in to keep moving.
@sunshowers @jgallagher Do you have any ideas here?
Various thoughts; sorry if some of this is obvious, but I don't have much context here so am just hopping in:
- Cloning an `Arc<tokio::Mutex<_>>` is fine (the clone is fully at the `Arc` layer)
- ... that said I don't think we need to clone here? Returning `&Mutex<()>` looks like it'd be okay.
- `Mutex<()>` is kinda fishy and probably worthy of a comment, since typically the mutex is protecting some data. (Maybe there is one somewhere that I'm not seeing!)
- It looks like the use of this is to prevent the `/init_db` endpoint from running concurrently? That is definitely not cancel safe. If dropshot were configured to cancel handlers on client disconnect, a client could start an `/init_db`, drop the request (unlocking the mutex), then start it again while the first one was still running.
On the last point: I think this is "fine" as long as dropshot is configured correctly (i.e., to not cancel handlers). If we wanted this to be correct even under cancellation, I'd probably move the init process into a separate tokio task and manage that either with channels or a sync mutex. Happy to expand on those ideas if it'd be helpful.
Thanks for the input!
> `Mutex<()>` is kinda fishy and probably worthy of a comment, since typically the mutex is protecting some data. (Maybe there is one somewhere that I'm not seeing!)
Tbh, I'm just moving code around that was already here. I'm not really sure what the intention was initially.
> On the last point: I think this is "fine" as long as dropshot is configured correctly (i.e., to not cancel handlers). If we wanted this to be correct even under cancellation, I'd probably move the init process into a separate tokio task and manage that either with channels or a sync mutex.
That sounds like a good idea regardless of what the initial intention was. Do you mind expanding a little on those ideas? It'd definitely be helpful
Sure thing! One pattern we've used in a bunch places is to spawn a long-lived tokio task and then communicate with it via channels. This looks something like (untested and lots of details omitted):
```rust
// kinds of things we can ask the task to do
enum Request {
    DoSomeThing {
        // any inputs from us the task needs
        data: DataNeededToDoSomeThing,
        // a oneshot channel the task uses to send us the result of our request
        response: oneshot::Sender<ResultOfSomeThing>,
    },
}

// the long-lived task: loop over incoming requests and handle them
async fn long_running_task(mut incoming: mpsc::Receiver<Request>) {
    // run until the sending half of `incoming` is dropped
    while let Some(request) = incoming.recv().await {
        match request {
            Request::DoSomeThing { data, response } => {
                let result = do_some_thing(data);
                // ignore send errors: the requester may have gone away
                let _ = response.send(result);
            }
        }
    }
}

// our main code: one time up front, create the channel we use to talk to the
// inner task and spawn that task
let (inner_tx, inner_rx) = mpsc::channel(N); // picking N here can be hard
let join_handle = tokio::spawn(long_running_task(inner_rx));

// ... somewhere else, when we want the task to do something for us ...
let (response_tx, response_rx) = oneshot::channel();
inner_tx.send(Request::DoSomeThing { data, response: response_tx }).await;
let result = response_rx.await;
```
A real example of this pattern (albeit more complex; I'm not finding any super simple ones at the moment) is in the bootstrap agent: here's where we spawn the inner task. It has a couple different channels for incoming requests, so its run loop is a tokio::select over those channels but is otherwise pretty similar to the outline above.
This pattern is nice because regardless of how many concurrent callers try to send messages to the inner task, it itself can do things serially. In my pseudocode above, if the `... somewhere else` bit is an HTTP handler, even if we get a dozen concurrent requests, the inner task will process them one at a time because it's forcing serialization via the channel it's receiving on.
I really like this pattern. But it has some problems:
- Picking the channel depth is hard. Whatever `N` we pick, that means up to that many callers can be waiting in line. Sometimes we don't want that at all, but tokio's mpsc channels don't allow `N=0`. (There are other channel implementations that do if we decide we need this.)
- If we just use `inner_tx.send(_)` as in my pseudocode, even if the channel is full, that will just block until there's room, so we actually have an infinite line. This can be avoided via `try_send` instead, which allows us to bubble out some kind of "we're too busy for more requests" backpressure to our caller (see the sketch just after this list).
- If `do_some_thing()` is slow, this can all compound and make everybody slow.
- If `do_some_thing()` hangs, then everybody trying to send requests to the inner task hangs too. (This recently happened to us in sled-agent!)
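For the backpressure point above, a rough sketch of what a `try_send` call site could look like, assuming tokio's `mpsc::error::TrySendError` and an HTTP handler that maps a full channel to a "busy" response (the names are illustrative, not from the PR):

```rust
use tokio::sync::mpsc::error::TrySendError;

// Hypothetical call site: refuse new work when the inner task's queue is full,
// instead of waiting in an unbounded line.
match inner_tx.try_send(Request::DoSomeThing { data, response: response_tx }) {
    Ok(()) => { /* request accepted; await response_rx for the result */ }
    Err(TrySendError::Full(_)) => {
        // surface backpressure to the HTTP caller
        return Err(HttpError::for_unavail(None, "busy; try again later".to_string()));
    }
    Err(TrySendError::Closed(_)) => {
        return Err(HttpError::for_internal_error("inner task is gone".to_string()));
    }
}
```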
A "build your own" variant of the above in the case where you want at most one instance of some operation is to use a sync::Mutex
around a tokio task join handle. This would look something like (again untested, details omitted):
```rust
// one time up front, create a sync mutex around an optional tokio task join handle
let task_lock = sync::Mutex::new(None);

// ... somewhere else, where we want to do work ...
// acquire the lock
let mut task_lock = task_lock.lock().unwrap();
// if there's a previous task running, is it still running?
let still_running = match task_lock.as_ref() {
    Some(joinhandle) => !joinhandle.is_finished(),
    None => false,
};
if still_running {
    // return a "we're busy" error
}
// any previous task is done; start a new one
*task_lock = Some(tokio::spawn(do_some_work()));
```
This has its own problems; the biggest one is that we can't wait for the result of `do_some_work()` while holding the lock, so this really only works for background stuff that either doesn't need to return results at all, or the caller is in a position to poll us for completion at some point in the future. (In the `joinhandle.is_finished()` case, we can `.await` it to get the result of `do_some_work()`.)
We don't use this pattern as much. One example is in installinator, where we do want to get the result of previously-completed tasks.
Thanks for the write up John. I think, overall, it's probably simpler to have a long-running task and issue requests that way. As you mentioned, this has its own problems, but we know what those problems are and we use this pattern all over sled agent.
In this case we can constrain the problem such that we only want to handle one in-flight request at a time, since reconfigurator execution will retry again later anyway. I'd suggest using a flume bounded channel with a size of 0 to act as a rendezvous channel. That should give the behavior we want. We could have separate tasks for performing initialization and config writing so we don't have one block out the other.
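As a rough sketch of the rendezvous idea, assuming flume (the names here are illustrative, not the PR's actual types):

```rust
// A bound of 0 makes this a rendezvous channel: try_send only succeeds if the
// receiving task is currently waiting in recv_async, so at most one request is
// ever in flight and everyone else gets an immediate "busy" error.
let (config_tx, config_rx) = flume::bounded::<GenerateConfigRequest>(0);

tokio::spawn(async move {
    while let Ok(request) = config_rx.recv_async().await {
        // handle one config-generation request at a time
    }
});

// In the HTTP handler (hypothetical): refuse rather than queue.
if config_tx.try_send(request).is_err() {
    // map this to an HTTP "service unavailable" / busy error
}
```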
excellent! Thanks a bunch for the write up!
> We could have separate tasks for performing initialization and config writing so we don't have one block out the other.
@andrewjstone, do we really not want them to block each other out? It'd be problematic to have the db init job trying to run when the generate config one hasn't finished, and vice versa, no?
```rust
// file generation.
if let Some(current) = current_generation {
    if current > incoming_generation {
        return Err(HttpError::for_internal_error(
```
This doesn't feel like an internal error to me. This is an expected race condition, and so I think we should return a 400 level error instead of a 500 level error. I think 412 is an appropriate error code, even though we are not using etags for a precondition. @davepacheco does that make sense to you?
Definitely agreed it's not a 500. It looks like Sled Agent uses 409 (Conflict) for this and I'd suggest using that for consistency.
omicron/sled-agent/src/services.rs
Lines 3418 to 3424 in 1f0c185
```rust
// Absolutely refuse to downgrade the configuration.
if ledger_zone_config.omicron_generation > request.generation {
    return Err(Error::RequestedConfigOutdated {
        requested: request.generation,
        current: ledger_zone_config.omicron_generation,
    });
}
```
omicron/sled-agent/src/services.rs
Lines 308 to 310 in 1f0c185
```rust
Error::RequestedConfigOutdated { .. } => {
    omicron_common::api::external::Error::conflict(&err.to_string())
}
```
```rust
// file generation.
if let Some(current) = current_generation {
    if current > incoming_generation {
        return Err(HttpError::for_internal_error(
```
Same thing as above. I think this should be a 400-level error.
```rust
// We want to update the generation number only if the config file has been
// generated successfully.
*ctx.generation.lock().await = Some(incoming_generation);
```
Is there a TOCTOU problem here, in that `ctx.generation` could have changed between when we checked it above and when we reacquire the lock here to set it?
Hm, I guess that depends on how reconfigurator works? How often is the generation changing?
I decided to update the generation number once the config file had been successfully generated, because if it hadn't, then the zone wouldn't be fully in that generation. Do you think it makes more sense to update the generation immediately?
I think I'd consider this outside the context of reconfigurator. If this endpoint is called multiple times concurrently with different incoming generations, does it behave correctly? That way we don't have an implicit dependency between the correctness of this endpoint and the behavior or timing of reconfigurator.
Sorry for the dumb questions, but - is it safe for two instances of `generate_server_config()` to be running concurrently? I think that has implications on what we need to do with the lock on `generation`.
> I think I'd consider this outside the context of reconfigurator. If this endpoint is called multiple times concurrently with different incoming generations, does it behave correctly?
I guess there could be an error if two `generate_server_config()`s with different generation numbers are running and they both read an initial value for generation, but the one with the lower number manages to write after the one with the higher one.
Thanks for the input! I guess that settles it, I'll update the number immediately after reading. I was on the fence about this one anyway. Even if the config is borked, it'll be borked in that generation
Hm, I'm not sure that's enough. We may need to write the config file while holding the lock too, I think?
Imagine we're currently on gen 1 and we get two concurrent requests, one that gives us gen 2 and one that gives us gen 3. If our code is something like:
```rust
{
    let gen = acquire_generation_lock().await;
    if *gen > incoming_generation {
        return an error;
    }
    *gen = incoming_generation;
} // release `gen` lock
write_new_config_file();
```
then one possible ordering is:
- The request for gen 2 acquires the lock. We're currently on gen 1, so this is fine. We update to gen=2 and release the lock. Then we get parked for some reason.
- The request for gen 3 acquires the lock. We're currently on gen 2, so this is fine. We update to gen=3 and release the lock. We write our config file.
- The gen 2 request gets unparked. It writes its config file.
Then at this point we think we're on gen=3 but the config file on disk is the one from gen=2.
Thanks for the detailed answer!
> Hm, I'm not sure that's enough. We may need to write the config file while holding the lock too, I think?
Yep, that makes total sense
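A rough sketch of that shape, reusing the hypothetical names from the pseudocode above (not the final implementation):

```rust
{
    // Hold the lock across both the check and the config write so the stored
    // generation can never diverge from the file on disk.
    let mut gen = acquire_generation_lock().await;
    if let Some(current) = *gen {
        if current > incoming_generation {
            return Err(/* conflict error */);
        }
    }
    write_new_config_file()?;         // still holding the lock
    *gen = Some(incoming_generation); // bump only after the file is written
} // release `gen` lock
```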
Yeah, you are right @jgallagher. These requests all need to be serialized. (I know you are currently writing up some options, just wanted to drop a note).
Thanks for the reviews everyone! I'm not finished here, but leaving it for today.
I think I've addressed all of the comments, let me know if there's something I'm missing!
I've run all the manual tests I did before and received the same results as before.
clickhouse-admin/src/context.rs
Outdated
```rust
pub fn generate_config_and_enable_svc(
    &self,
    replica_settings: ServerConfigurableSettings,
) -> Result<ReplicaConfig, HttpError> {
    let mut current_generation = self.generation.lock().unwrap();
    let incoming_generation = replica_settings.generation();

    // If the incoming generation number is lower, then we have a problem.
    // We should return an error instead of silently skipping the configuration
    // file generation.
    if let Some(current) = *current_generation {
        if current > incoming_generation {
            return Err(HttpError::for_client_error(
                Some(String::from("Conflict")),
                StatusCode::CONFLICT,
                format!(
                    "current generation '{}' is greater than incoming generation '{}'",
                    current,
                    incoming_generation,
                )
            ));
        }
    };

    let output =
        self.clickward().generate_server_config(replica_settings)?;

    // We want to update the generation number only if the config file has been
    // generated successfully.
    *current_generation = Some(incoming_generation);

    // Once we have generated the client we can safely enable the clickhouse_server service
    let fmri = "svc:/oxide/clickhouse_server:default".to_string();
    Svcadm::enable_service(fmri)?;

    Ok(output)
}

pub async fn init_db(&self) -> Result<(), HttpError> {
    let log = self.log();
    // Initialize the database only if it was not previously initialized.
    // TODO: Migrate schema to newer version without wiping data.
    let client = self.oximeter_client();
    let version = client.read_latest_version().await.map_err(|e| {
        HttpError::for_internal_error(format!(
            "can't read ClickHouse version: {e}",
        ))
    })?;
    if version == 0 {
        info!(
            log,
            "initializing replicated ClickHouse cluster to version {OXIMETER_VERSION}"
        );
        let replicated = true;
        self.oximeter_client()
            .initialize_db_with_version(replicated, OXIMETER_VERSION)
            .await
            .map_err(|e| {
                HttpError::for_internal_error(format!(
                    "can't initialize replicated ClickHouse cluster \
                    to version {OXIMETER_VERSION}: {e}",
                ))
            })?;
    } else {
        info!(
            log,
            "skipping initialization of replicated ClickHouse cluster at version {version}"
        );
    }

    Ok(())
```
This is a mechanical change, moving most of the functionality from `context.rs` to here so we can call these from `long_running_ch_server_task`.
Thanks for taking the time to do a live review of this PR @jgallagher @andrewjstone 🙇♀️
I think I've addressed all of the changes we discussed. Please let me know if I missed anything!
```rust
} => {
    let result =
        init_db(clickhouse_address, log.clone(), replicated).await;
    if let Err(e) = response.send(result) {
```
@jgallagher Didn't change this to `try_send` because it's a oneshot channel so it doesn't have that method
Right, that makes sense: `oneshot` channels are single-use only, so it's not possible for them to block due to the channel being full.
Ran all the previous manual tests and additionally grabbed a bit of the logs to show they're happily moving along:

```
08:03:53.840Z INFO clickhouse-admin-server (dropshot): accepted connection
    file = /home/coatlicue/.cargo/registry/src/index.crates.io-6f17d22bba15001f/dropshot-0.13.0/src/server.rs:1023
    local_addr = [fd00:1122:3344:101::26]:8888
    remote_addr = [fd00:1122:3344:101::c]:33133
08:03:53.900Z INFO clickhouse-admin-server (dropshot): request completed
    file = /home/coatlicue/.cargo/registry/src/index.crates.io-6f17d22bba15001f/dropshot-0.13.0/src/server.rs:863
    latency_us = 32935
    local_addr = [fd00:1122:3344:101::26]:8888
    method = PUT
    remote_addr = [fd00:1122:3344:101::c]:33133
    req_id = d11525f4-c81c-492c-ae42-2f29d698e178
    response_code = 201
    uri = /config
08:03:53.904Z INFO clickhouse-admin-server (ServerContext): skipping initialization of oximeter database at version 13
    file = clickhouse-admin/src/context.rs:296
08:03:53.904Z INFO clickhouse-admin-server (dropshot): request completed
    file = /home/coatlicue/.cargo/registry/src/index.crates.io-6f17d22bba15001f/dropshot-0.13.0/src/server.rs:863
    latency_us = 3324
    local_addr = [fd00:1122:3344:101::26]:8888
    method = PUT
    remote_addr = [fd00:1122:3344:101::c]:33133
    req_id = 66a91f3e-af45-43ef-a570-175c04e2436e
    response_code = 204
    uri = /init
08:04:37.917Z INFO clickhouse-admin-server (dropshot): accepted connection
    file = /home/coatlicue/.cargo/registry/src/index.crates.io-6f17d22bba15001f/dropshot-0.13.0/src/server.rs:1023
    local_addr = [fd00:1122:3344:101::26]:8888
    remote_addr = [fd00:1122:3344:101::a]:61669
08:04:37.982Z INFO clickhouse-admin-server (dropshot): request completed
    file = /home/coatlicue/.cargo/registry/src/index.crates.io-6f17d22bba15001f/dropshot-0.13.0/src/server.rs:863
    latency_us = 30612
    local_addr = [fd00:1122:3344:101::26]:8888
    method = PUT
    remote_addr = [fd00:1122:3344:101::a]:61669
    req_id = a07d07b6-0c66-4df8-af55-ff74202d9822
    response_code = 201
    uri = /config
08:04:37.989Z INFO clickhouse-admin-server (ServerContext): skipping initialization of oximeter database at version 13
    file = clickhouse-admin/src/context.rs:296
08:04:37.989Z INFO clickhouse-admin-server (dropshot): request completed
    file = /home/coatlicue/.cargo/registry/src/index.crates.io-6f17d22bba15001f/dropshot-0.13.0/src/server.rs:863
    latency_us = 3178
    local_addr = [fd00:1122:3344:101::26]:8888
    method = PUT
    remote_addr = [fd00:1122:3344:101::a]:61669
    req_id = 99cbf23a-7995-4847-87d8-b7939d214ec1
    response_code = 204
    uri = /init
```
Thanks for all the back and forth on this! I think there are a few structural things to address on the async / inner task / channel side of things.
```rust
// If the incoming generation number is lower, then we have a problem.
// We should return an error instead of silently skipping the configuration
// file generation.
if let Some(current) = current_generation {
```
I think this method has lost its concurrency protection - it has neither a mutex nor a task that enforces serialization. I think `KeeperServerContext` needs to spawn a `long_running_generate_config_task` just like `ServerContext`, and then we need to communicate with that task in this endpoint.
whoops forgot that one 😅
clickhouse-admin/src/context.rs
Outdated
```rust
// If there is already a configuration file with a generation number we'll
// use that. Otherwise, we set the generation number to None.
let gen = read_generation_from_file(config_path)?;
let (generation_tx, _rx) = watch::channel(gen);
```
I think this is probably correct, but it isn't really making use of the `watch` channel (it's just treating it as a fancy mutex). I think what should happen here is (a rough sketch follows the list):

- When the watch channel is created, the `tx` side is given to `long_running_generate_config_task`. `ServerContext` should only hold the `rx` side.
- The watch channel should not be involved in the message to generate a config at all; `long_running_generate_config_task` already has the sending side, so it can update it as needed. This means the `generate_config` endpoint no longer needs to know anything about the generation channel at all.
- The `generation` endpoint can `.borrow()` the receiving end of the channel held by `ServerContext` to read the latest value set by `long_running_generate_config_task`.
```rust
Svcadm::enable_service(fmri)?;
let (response_tx, response_rx) = oneshot::channel();
ctx.generate_config_tx
    .send_async(GenerateConfigRequest::GenerateConfig {
```
A couple comments here, one substantive and one stylistic:

- I think based on our conversation, all the `.send_async`s in these endpoints should be `.try_send`, right? So they don't block if the channel is full and instead return some kind of HTTP busy error?
- The fact that we're using an inner task to serialize requests can be an implementation detail of `ServerContext`; the http endpoints shouldn't need to know that there's a channel underneath IMO. Could we move these into methods on the context types? Untested and probably has typos, but something like:
```rust
// inside impl ServerContext
pub async fn generate_config(&self, replica_settings: ReplicaSettings) -> Result<ReplicaConfig, SomeErrorType> {
    let (response_tx, response_rx) = oneshot::channel();
    self.generate_config_tx.try_send(GenerateConfigRequest::GenerateConfig {
        clickward: self.clickward,
        log: self.log.clone(),
        replica_settings,
        response: response_tx,
    }).map_err(/* error handling */)?;
    response_rx.await.map_err(/* error handling */)
}
```
Then this endpoint can probably be reduced to something like
```rust
let ctx = rqctx.context();
let replica_settings = body.into_inner();
let result = ctx.generate_config(replica_settings).await.map_err(/* error handling */)?;
Ok(HttpResponseCreated(result))
```
and not need to use all the exposed details from how the server contexts are implemented.
Thanks for taking the time to review and leave such detailed comments @jgallagher! I think I've addressed all of them. Please let me know if there's something missing :) Ran all my manual tests again, and received the same results.
Thanks, this is looking great! I left a bunch of nitpicky comments, mostly around error handling. I'll defer to @andrewjstone on all the clickhouse bits, but the async / concurrency stuff looks like it's in good shape. 👍
```rust
    rqctx: RequestContext<Self::Context>,
) -> Result<HttpResponseOk<Generation>, HttpError> {
    let ctx = rqctx.context();
    let gen = match *ctx.generation_rx.borrow() {
```
This might be a little cleaner if `ctx` exposed a `generation(&self) -> Option<Generation>` method? That way (a) `generation_rx` could be private and (b) HTTP handlers wouldn't need to know the details that the context is managing generations via a watch channel.
I think this is also better hygiene for watch channels. The docs on `borrow()` note:

> Outstanding borrows hold a read lock on the inner value. This means that long-lived borrows could cause the producer half to block. It is recommended to keep the borrow as short-lived as possible.
If context exposes the channel directly, then every user of it is responsible for keeping their borrows short; if instead it only provides a helper method for reading the current value, that helper method guarantees all borrows are short. (This is easy to do in this case because `Generation` is `Copy`; it's harder to do with watch channels over types that aren't cheap to clone.)
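For example, something along these lines (assuming `generation_rx` becomes a private field on `ServerContext`):

```rust
impl ServerContext {
    // Hypothetical helper: the borrow is scoped to this one expression, so it
    // is always short-lived and never blocks the sending half for long.
    pub fn generation(&self) -> Option<Generation> {
        *self.generation_rx.borrow()
    }
}
```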
```rust
Svcadm::enable_service(fmri)?;
let replica_settings = body.into_inner();
let result =
    ctx.send_generate_config_and_enable_svc(replica_settings).await?;
```
Naming nit - I think I'd remove the `send_` prefix on all of the generate config / init db methods. It makes sense when looking at the implementation of the method (it's send-ing messages on channels), but I think it's kinda confusing at the callsite here: where are we sending the config generation?
```rust
pub generate_config_tx: Sender<GenerateConfigRequest>,
pub generation_rx: watch::Receiver<Option<Generation>>,
```
Can we drop the `pub` from these? (In conjunction with the earlier comment about a helper method for reading the current generation)
```rust
let clickward = Clickward::new();
Self { clickward, clickhouse_cli, log }
let config_path = Utf8PathBuf::from_str(CLICKHOUSE_KEEPER_CONFIG_DIR)
```
Nit - I think all the uses of `Utf8PathBuf::from_str(..)` could instead be `Utf8PathBuf::from(..)` and then not need to be `.unwrap()`'d.
```rust
})
.map_err(|e| {
    HttpError::for_internal_error(format!(
        "failure to send request: {e}"
```
I'm not sure including the `{e}` here will be useful. Maybe instead we should match on it and return different kinds of HTTP errors for the two cases? Something like

```rust
.map_err(|e| match e {
    TrySendError::Full(_) => HttpError::for_unavail(
        None,
        "channel full: another config request is still running"
            .to_string(),
    ),
    TrySendError::Disconnected(_) => {
        HttpError::for_internal_error(
            "long-running generate-config task died".to_string(),
        )
    }
})?;
```
```rust
generation_tx.send(Some(incoming_generation)).map_err(|e| {
    HttpError::for_internal_error(format!("failure to send request: {e}"))
})?;
```
This is fine but kind of awkward: `send` can only fail if there are no subscribers, which in our case would mean the context object is gone, which presumably means there isn't anyone around to receive the HTTP error we're creating.

`watch::Sender` has a few methods that let you update the value without failing even if there are no receivers (`send_replace`, `send_if_modified`, `send_modify`). Since `Generation` is basically just an integer, `send_replace` might be the cleanest here?

```diff
-generation_tx.send(Some(incoming_generation)).map_err(|e| {
-    HttpError::for_internal_error(format!("failure to send request: {e}"))
-})?;
+generation_tx.send_replace(Some(incoming_generation));
```
```rust
// TODO: Migrate schema to newer version without wiping data.
let version = client.read_latest_version().await.map_err(|e| {
    HttpError::for_internal_error(format!(
        "can't read ClickHouse version: {e}",
```
Including the error here is good, but as written `{e}` will only include the top-most error and not the full error chain. Can we add a dependency on `slog-error-chain` and use it to get the full chain of errors?

```diff
-"can't read ClickHouse version: {e}",
+"can't read ClickHouse version: {}", InlineErrorChain::new(e),
```
```rust
.map_err(|e| {
    HttpError::for_internal_error(format!(
        "can't initialize oximeter database \
        to version {OXIMETER_VERSION}: {e}",
```
(Same note as above - suggest `InlineErrorChain::new(&e)` here)
```rust
    return Ok(None);
}

let file = File::open(&path)?;
```
Can we add context to this error? Something like

```diff
-let file = File::open(&path)?;
+let file = File::open(&path).with_context(|| format!("failed to open {path}"))?;
```
```rust
let line_parts: Vec<&str> = first_line.rsplit(':').collect();
if line_parts.len() != 2 {
    bail!("first line of configuration file is malformed: {}", first_line);
```
Can we add `path` to this error (and the other `bail!`s / `anyhow!`s below)?
Overview
This commit adds functionality to clickhouse-admin to keep track of the blueprint generation number. There is also a new validation check: if reconfigurator attempts to generate a configuration file from a previous generation, clickhouse-admin will not generate that configuration file and will return an error.
Additionally, there's been a small clean up of the clickhouse-admin code.
Manual testing
In a local omicron deployment, first tell reconfigurator to deploy a clickhouse policy `both` with the default number of replicas and keepers. We can see keepers and servers are at generation 2.
Now we zlogin into a keeper zone to check we have recorded that information and that the node has joined the quorum.
We zlogin into a replica zone and check we have recorded that information, and the database contains the expected oximeter table and fields.
Now we want to force a new generation number, so we set a clickhouse policy with an additional server and keeper.
We deploy it and do the same checks on the same zones we checked previously and in the new zones
Old keeper zone:
New keeper zone:
Old replica zone:
New replica zone:
To verify clickhouse-admin exits with an error if the incoming generation number is lower than the current one, I tested by running clickhouse-admin against a local clickward deployment:
Closes: #7137