Skip to content

Commit

Permalink
fix(websocket sink): send encoded message as binary frame (vectordotd…
Browse files Browse the repository at this point in the history
…ev#18060)

* fix(websocket sink): send encoded message as binary frame

* fix: add an extra flag to decide frame format

* fix: address comments

* fix: binary frames for avro and native codecs

* fix: list all codes instead of using default

* fix: use use statement

* fix: extract the logic into a function

* fix: make the func const

* fix: fix format
  • Loading branch information
zhongchen authored Jul 25, 2023
1 parent c592cb1 commit b85f4f9
Showing 1 changed file with 18 additions and 1 deletion.
19 changes: 18 additions & 1 deletion src/sinks/websocket/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,17 @@ impl WebSocketSink {
Ok(())
}

const fn should_encode_as_binary(&self) -> bool {
use codecs::encoding::Serializer::{
Avro, Csv, Gelf, Json, Logfmt, Native, NativeJson, RawMessage, Text,
};

match self.encoder.serializer() {
RawMessage(_) | Avro(_) | Native(_) => true,
Csv(_) | Logfmt(_) | Gelf(_) | Json(_) | Text(_) | NativeJson(_) => false,
}
}

async fn handle_events<I, WS, O>(
&mut self,
input: &mut I,
Expand All @@ -258,6 +269,7 @@ impl WebSocketSink {

let bytes_sent = register!(BytesSent::from(Protocol("websocket".into())));
let events_sent = register!(EventsSent::from(Output(None)));
let encode_as_binary = self.should_encode_as_binary();

loop {
let result = tokio::select! {
Expand Down Expand Up @@ -298,7 +310,12 @@ impl WebSocketSink {
Ok(()) => {
finalizers.update_status(EventStatus::Delivered);

let message = Message::text(String::from_utf8_lossy(&bytes));
let message = if encode_as_binary {
Message::binary(bytes)
}
else {
Message::text(String::from_utf8_lossy(&bytes))
};
let message_len = message.len();

ws_sink.send(message).await.map(|_| {
Expand Down

0 comments on commit b85f4f9

Please sign in to comment.