Skip to content

Commit

Permalink
fill in todos
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu committed Jul 10, 2024
1 parent 17ab8c1 commit b67cf20
Showing 1 changed file with 29 additions and 2 deletions.
31 changes: 29 additions & 2 deletions src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,26 @@ impl AvroAccessBuilder {
}
}
WriterSchemaCache::Glue(resolver) => {
todo!()
if payload.len() < 18 {
bail!("payload shorter than 18-byte glue header");
}
if payload[0] != 3 {
bail!(
"Only support glue header version 3 but found {}",
payload[0]
);
}
if payload[1] != 0 {
bail!("Non-zero compression {} not supported", payload[1]);
}
let schema_version_id = uuid::Uuid::from_slice(&payload[2..18]).unwrap();
let writer_schema = resolver.get_by_id(schema_version_id).await?;
let mut raw_payload = &payload[18..];
Ok(Some(from_avro_datum(
writer_schema.as_ref(),
&mut raw_payload,
Some(&self.schema.original_schema),
)?))
}
}
}
Expand Down Expand Up @@ -218,7 +237,15 @@ impl AvroParserConfig {
schema_arn,
aws_auth_props,
} => {
todo!()
let client = aws_sdk_glue::Client::new(&aws_auth_props.build_config().await?);
let resolver = GlueSchemaCache::new(client);
let schema = resolver.get_by_name(&schema_arn).await?;
Ok(Self {
schema: Arc::new(ResolvedAvroSchema::create(schema)?),
key_schema: None,
writer_schema_cache: WriterSchemaCache::Glue(Arc::new(resolver)),
map_handling,
})
}
}
}
Expand Down

0 comments on commit b67cf20

Please sign in to comment.