Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to set raw_datums in avro schema editor #685

Merged
merged 2 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions crates/arroyo-connectors/src/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use arroyo_rpc::api_types::connections::{ConnectionProfile, ConnectionSchema, Te
use arroyo_rpc::df::ArroyoSchema;
use arroyo_rpc::formats::{BadData, Format, JsonFormat};
use arroyo_rpc::schema_resolver::{
ConfluentSchemaRegistry, ConfluentSchemaRegistryClient, FailingSchemaResolver, SchemaResolver,
ConfluentSchemaRegistry, ConfluentSchemaRegistryClient, SchemaResolver,
};
use arroyo_rpc::{schema_resolver, var_str::VarStr, OperatorConfig};
use arroyo_types::string_to_map;
Expand Down Expand Up @@ -342,24 +342,24 @@ impl Connector for KafkaConnector {
.insert("isolation.level".to_string(), "read_committed".to_string());
}

let schema_resolver: Arc<dyn SchemaResolver + Sync> =
let schema_resolver: Option<Arc<dyn SchemaResolver + Sync>> =
if let Some(SchemaRegistry::ConfluentSchemaRegistry {
endpoint,
api_key,
api_secret,
}) = &profile.schema_registry_enum
{
Arc::new(
Some(Arc::new(
ConfluentSchemaRegistry::new(
endpoint,
&table.subject(),
api_key.clone(),
api_secret.clone(),
)
.expect("failed to construct confluent schema resolver"),
)
))
} else {
Arc::new(FailingSchemaResolver::new())
None
};

Ok(OperatorNode::from_source(Box::new(KafkaSourceFunc {
Expand Down
22 changes: 15 additions & 7 deletions crates/arroyo-connectors/src/kafka/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct KafkaSourceFunc {
pub format: Format,
pub framing: Option<Framing>,
pub bad_data: Option<BadData>,
pub schema_resolver: Arc<dyn SchemaResolver + Sync>,
pub schema_resolver: Option<Arc<dyn SchemaResolver + Sync>>,
pub client_configs: HashMap<String, String>,
pub messages_per_second: NonZeroU32,
}
Expand Down Expand Up @@ -150,12 +150,20 @@ impl KafkaSourceFunc {
.await;
}

ctx.initialize_deserializer_with_resolver(
self.format.clone(),
self.framing.clone(),
self.bad_data.clone(),
self.schema_resolver.clone(),
);
if let Some(schema_resolver) = &self.schema_resolver {
ctx.initialize_deserializer_with_resolver(
self.format.clone(),
self.framing.clone(),
self.bad_data.clone(),
schema_resolver.clone(),
);
} else {
ctx.initialize_deserializer(
self.format.clone(),
self.framing.clone(),
self.bad_data.clone(),
);
}

let mut flush_ticker = tokio::time::interval(Duration::from_millis(50));
flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
Expand Down
3 changes: 1 addition & 2 deletions crates/arroyo-connectors/src/kafka/source/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use arroyo_operator::operator::SourceOperator;
use arroyo_rpc::df::ArroyoSchema;
use arroyo_rpc::formats::{Format, RawStringFormat};
use arroyo_rpc::grpc::rpc::{CheckpointMetadata, OperatorCheckpointMetadata, OperatorMetadata};
use arroyo_rpc::schema_resolver::FailingSchemaResolver;
use arroyo_rpc::{CheckpointCompleted, ControlMessage, ControlResp};
use arroyo_types::{
single_item_hash_map, to_micros, ArrowMessage, CheckpointBarrier, SignalMessage, TaskInfo,
Expand Down Expand Up @@ -85,7 +84,7 @@ impl KafkaTopicTester {
format: Format::RawString(RawStringFormat {}),
framing: None,
bad_data: None,
schema_resolver: Arc::new(FailingSchemaResolver::new()),
schema_resolver: None,
client_configs: HashMap::new(),
messages_per_second: NonZeroU32::new(100).unwrap(),
});
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-formats/src/avro/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ fn to_arrow_datatype(schema: &Schema) -> (DataType, bool, Option<ArroyoExtension
),
Schema::Float => (DataType::Float32, false, None),
Schema::Double => (DataType::Float64, false, None),
Schema::Bytes | Schema::Fixed(_) | Schema::Decimal(_) => (DataType::Binary, false, None),
Schema::Bytes | Schema::Fixed(_) | Schema::Decimal(_) => (DataType::Utf8, false, None),
Schema::String | Schema::Enum(_) | Schema::Uuid => (DataType::Utf8, false, None),
Schema::Union(union) => {
// currently just support unions that have [t, null] as variants, which is the
Expand Down
49 changes: 48 additions & 1 deletion webui/src/routes/connections/SchemaEditor.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
import { Dispatch, useEffect, useRef, useState } from 'react';
import { CreateConnectionState } from './CreateConnection';
import { Alert, AlertIcon, Box, Button, List, ListItem, Stack } from '@chakra-ui/react';
import {
Alert,
AlertIcon,
Box,
Button,
Checkbox,
FormControl,
FormHelperText,
List,
ListItem,
Stack,
} from '@chakra-ui/react';
import * as monaco from 'monaco-editor/esm/vs/editor/editor.api';
import { ConnectionSchema, post } from '../../lib/data_fetching';
import { formatError } from '../../lib/util';
Expand All @@ -22,10 +33,22 @@ export function SchemaEditor({
const [errors, setErrors] = useState<Array<string> | null>(null);
const [testing, setTesting] = useState<boolean>(false);
const [tested, setTested] = useState<string | undefined>();
const [rawDatum, setRawDatum] = useState<boolean>(false);

const valid = tested == editor?.getValue() && errors?.length == 0;

const testSchema = async () => {
// if avro and raw datum, then we need to add the raw datum encoding
if (format == 'avro' && rawDatum) {
// @ts-ignore
state.schema!.format['avro']!.rawDatums = rawDatum;
// update the state
setState({
...state,
schema: state.schema,
});
}

setTesting(true);
setErrors(null);
const { error } = await post('/v1/connection_tables/schemas/test', {
Expand Down Expand Up @@ -68,6 +91,28 @@ export function SchemaEditor({
}
}

let avroOptions = null;
if (format == 'avro') {
avroOptions = (
<Box maxW={'lg'}>
<FormControl>
<Checkbox
onChange={e => {
console.log('CHECKED = ', e.target.checked);
setRawDatum(e.target.checked);
}}
>
Raw datum encoding
</Checkbox>
<FormHelperText>
This encoding should be used for streams composed of individual <i>avro datums</i>,
rather than complete Avro documents with embedded schemas
</FormHelperText>
</FormControl>
</Box>
);
}

useEffect(() => {
if (monacoEl && !editor && !created.current) {
let e = monaco.editor.create(monacoEl.current!, {
Expand All @@ -90,6 +135,7 @@ export function SchemaEditor({

// @ts-ignore
schema.format![format] = {};

// @ts-ignore
schema.definition![format + '_schema'] = e.getValue();

Expand All @@ -108,6 +154,7 @@ export function SchemaEditor({

return (
<Stack spacing={4}>
{avroOptions}
<Box marginTop={5} width="100%">
<div className="editor" ref={monacoEl}></div>
</Box>
Expand Down
Loading