Skip to content
This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

Commit

Permalink
add more clarity to schema mismatch
Browse files Browse the repository at this point in the history
  • Loading branch information
Rashad Alston authored and Rashad Alston committed Aug 16, 2023
1 parent 9db959f commit 1b690d8
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 55 deletions.
15 changes: 10 additions & 5 deletions packages/fuel-indexer-api-server/src/uses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,11 +316,16 @@ pub(crate) async fn register_indexer_assets(

queries::commit_transaction(&mut conn).await?;

tx.send(ServiceRequest::Reload(ReloadRequest {
namespace,
identifier,
}))
.await?;
if let Err(e) = tx
.send(ServiceRequest::Reload(ReloadRequest {
namespace,
identifier,
}))
.await
{
error!("Failed to send ServiceRequest::Reload: {e:?}");
return Err(e.into());
}

return Ok(Json(json!({
"success": "true",
Expand Down
73 changes: 46 additions & 27 deletions packages/fuel-indexer/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,13 +688,10 @@ impl WasmIndexExecutor {
let schema_version_from_wasm = ffi::get_version(&mut store_mut, &instance)?;

if schema_version_from_wasm != schema_version {
// Just an FYI this error below is not getting unwrapped into a panic...
let msg = format!(
"Schema version from WASM {} does not match schema version {}",
return Err(IndexerError::SchemaVersionMismatch(format!(
"Schema version from WASM {} does not match schema version from database {}",
schema_version_from_wasm, schema_version
);
error!("error: {msg}");
return Err(IndexerError::SchemaVersionMismatch(msg));
)));
}

data_mut.memory = Some(instance.exports.get_memory("memory")?.clone());
Expand Down Expand Up @@ -744,6 +741,7 @@ impl WasmIndexExecutor {
schema_version: String,
) -> IndexerResult<(JoinHandle<()>, ExecutorSource, Arc<AtomicBool>)> {
let killer = Arc::new(AtomicBool::new(false));
let uid = manifest.uid();

match &exec_source {
ExecutorSource::Manifest => match manifest.module() {
Expand All @@ -752,41 +750,62 @@ impl WasmIndexExecutor {
let mut file = File::open(module).await?;
file.read_to_end(&mut bytes).await?;

let executor = WasmIndexExecutor::new(
match WasmIndexExecutor::new(
config,
manifest,
bytes.clone(),
pool,
schema_version,
)
.await?;

let handle = tokio::spawn(run_executor(
config,
manifest,
executor,
killer.clone(),
));

Ok((handle, ExecutorSource::Registry(bytes), killer))
.await
{
Ok(executor) => {
let handle = tokio::spawn(run_executor(
config,
manifest,
executor,
killer.clone(),
));

Ok((handle, ExecutorSource::Registry(bytes), killer))
}
Err(e) => {
error!(
"Could not instantiate WasmIndexExecutor({uid}) from ExecutorSource::Manifest: {e:?}."
);
Err(IndexerError::WasmExecutionInstantiationError)
}
}
}
crate::Module::Native => {
Err(IndexerError::NativeExecutionInstantiationError)
}
},
ExecutorSource::Registry(bytes) => {
let executor =
WasmIndexExecutor::new(config, manifest, bytes, pool, schema_version)
.await?;

let handle = tokio::spawn(run_executor(
match WasmIndexExecutor::new(
config,
manifest,
executor,
killer.clone(),
));

Ok((handle, exec_source, killer))
bytes,
pool,
schema_version,
)
.await
{
Ok(executor) => {
let handle = tokio::spawn(run_executor(
config,
manifest,
executor,
killer.clone(),
));

Ok((handle, exec_source, killer))
}
Err(e) => {
error!("Could not instantiate WasmIndexExecutor({uid}) from ExecutorSource::Registry: {e:?}.");
Err(IndexerError::WasmExecutionInstantiationError)
}
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions packages/fuel-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ pub enum IndexerError {
SchemaError(#[from] IndexerSchemaDbError),
#[error("Manifest error: {0:?}")]
ManifestError(#[from] ManifestError),
#[error("Error creating wasm executor.")]
WasmExecutionInstantiationError,
#[error("Error creating native executor.")]
NativeExecutionInstantiationError,
#[error("Native execution runtime error.")]
Expand Down
62 changes: 39 additions & 23 deletions packages/fuel-indexer/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,18 +181,24 @@ impl IndexerService {
let start_block = get_start_block(&mut conn, &manifest).await.unwrap_or(1);
manifest.set_start_block(start_block);

let (handle, _module_bytes, killer) = WasmIndexExecutor::create(
if let Ok((handle, _module_bytes, killer)) = WasmIndexExecutor::create(
&self.config,
&manifest,
ExecutorSource::Registry(assets.wasm.bytes),
self.pool.clone(),
assets.schema.digest,
)
.await?;

info!("Registered Indexer({})", manifest.uid());
self.handles.insert(manifest.uid(), handle);
self.killers.insert(manifest.uid(), killer);
.await
{
info!("Registered Indexer({})", manifest.uid());
self.handles.insert(manifest.uid(), handle);
self.killers.insert(manifest.uid(), killer);
} else {
error!(
"Failed to register Indexer({}) from registry.",
manifest.uid()
);
}
}

Ok(())
Expand Down Expand Up @@ -308,24 +314,34 @@ async fn create_service_task(
get_start_block(&mut conn, &manifest).await?;
manifest.set_start_block(start_block);

let (handle, _module_bytes, killer) =
WasmIndexExecutor::create(
&config,
&manifest,
ExecutorSource::Registry(assets.wasm.bytes),
pool.clone(),
assets.schema.digest,
)
.await?;

futs.push(handle);

if let Some(killer_for_prev_executor) =
killers.insert(manifest.uid(), killer)
match WasmIndexExecutor::create(
&config,
&manifest,
ExecutorSource::Registry(assets.wasm.bytes),
pool.clone(),
assets.schema.digest,
)
.await
{
let uid = manifest.uid();
info!("Indexer({uid}) was replaced. Stopping previous version of Indexer({uid}).");
killer_for_prev_executor.store(true, Ordering::SeqCst);
Ok((handle, _module_bytes, killer)) => {
futs.push(handle);

if let Some(killer_for_prev_executor) =
killers.insert(manifest.uid(), killer)
{
let uid = manifest.uid();
info!("Indexer({uid}) was replaced. Stopping previous version of Indexer({uid}).");
killer_for_prev_executor
.store(true, Ordering::SeqCst);
}
}
Err(e) => {
error!(
"Failed to reload Indexer({}.{}): {e:?}",
&request.namespace, &request.identifier
);
return Ok(());
}
}
}
Err(e) => {
Expand Down
7 changes: 7 additions & 0 deletions scripts/utils/build_modules.bash
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ bash scripts/stripper.bash fuel_indexer_test.wasm
cp fuel_indexer_test.wasm target/wasm32-unknown-unknown/release/
rm -fv fuel_indexer_test.wasm

# This is a test index, keep its assets for now
cargo build -p simple-wasm --release --target wasm32-unknown-unknown
bash scripts/stripper.bash simple_wasm.wasm
cp simple_wasm.wasm target/wasm32-unknown-unknown/release/
cp simple_wasm.wasm packages/fuel-indexer-tests/indexers/simple-wasm
rm -fv simple_wasm.wasm

cargo build -p fuel_explorer --release --target wasm32-unknown-unknown
bash scripts/stripper.bash fuel_explorer.wasm
cp fuel_explorer.wasm target/wasm32-unknown-unknown/release/
Expand Down

0 comments on commit 1b690d8

Please sign in to comment.