Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions streaming_data_types/fbschemas/stringevent_vs00/StringEvent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# automatically generated by the FlatBuffers compiler, do not modify

# namespace:

import flatbuffers
from flatbuffers.compat import import_numpy

np = import_numpy()


class StringEvent(object):
__slots__ = ["_tab"]

@classmethod
def GetRootAs(cls, buf, offset=0):
n = flatbuffers.encode.Get(flatbuffers.packer.uoffset, buf, offset)
x = StringEvent()
x.Init(buf, n + offset)
return x

@classmethod
def GetRootAsStringEvent(cls, buf, offset=0):
"""This method is deprecated. Please switch to GetRootAs."""
return cls.GetRootAs(buf, offset)

@classmethod
def StringEventBufferHasIdentifier(cls, buf, offset, size_prefixed=False):
return flatbuffers.util.BufferHasIdentifier(
buf, offset, b"\x76\x73\x30\x30", size_prefixed=size_prefixed
)

# StringEvent
def Init(self, buf, pos):
self._tab = flatbuffers.table.Table(buf, pos)

# StringEvent
def Timestamp(self):
o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(4))
if o != 0:
return self._tab.Get(flatbuffers.number_types.Int64Flags, o + self._tab.Pos)
return 0

# StringEvent
def SourceName(self):
o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(6))
if o != 0:
return self._tab.String(o + self._tab.Pos)
return None

# StringEvent
def Data(self):
o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(8))
if o != 0:
return self._tab.String(o + self._tab.Pos)
return None


def StringEventStart(builder):
builder.StartObject(3)


def Start(builder):
StringEventStart(builder)


def StringEventAddTimestamp(builder, timestamp):
builder.PrependInt64Slot(0, timestamp, 0)


def AddTimestamp(builder, timestamp):
StringEventAddTimestamp(builder, timestamp)


def StringEventAddSourceName(builder, sourceName):
builder.PrependUOffsetTRelativeSlot(
1, flatbuffers.number_types.UOffsetTFlags.py_type(sourceName), 0
)


def AddSourceName(builder, sourceName):
StringEventAddSourceName(builder, sourceName)


def StringEventAddData(builder, data):
builder.PrependUOffsetTRelativeSlot(
2, flatbuffers.number_types.UOffsetTFlags.py_type(data), 0
)


def AddData(builder, data):
StringEventAddData(builder, data)


def StringEventEnd(builder):
return builder.EndObject()


def End(builder):
return StringEventEnd(builder)
Empty file.
43 changes: 43 additions & 0 deletions streaming_data_types/stringevent_vs00.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from collections import namedtuple

import flatbuffers

import streaming_data_types.fbschemas.stringevent_vs00.StringEvent as StringEvent
from streaming_data_types.utils import check_schema_identifier

FILE_IDENTIFIER = b"vs00"

EventData = namedtuple(
"EventData",
("timestamp", "source_name", "data"),
)


def deserialise_vs00(buffer) -> EventData:
check_schema_identifier(buffer, FILE_IDENTIFIER)
event = StringEvent.StringEvent.GetRootAs(buffer, 0)

return EventData(
event.Timestamp(),
event.SourceName().decode("utf-8"),
event.Data().decode("utf-8"),
)


def serialise_vs00(source_name: str, timestamp: int, value: str) -> bytes:

builder = flatbuffers.Builder(128)
builder.ForceDefaults(True)

source = builder.CreateString(source_name)
data_str = builder.CreateString(value)

# Build the actual buffer
StringEvent.StringEventStart(builder)
StringEvent.StringEventAddSourceName(builder, source)
StringEvent.StringEventAddTimestamp(builder, timestamp)
StringEvent.StringEventAddData(builder, data_str)
data = StringEvent.StringEventEnd(builder)

builder.Finish(data, file_identifier=FILE_IDENTIFIER)
return bytes(builder.Output())
58 changes: 58 additions & 0 deletions tests/test_vs00.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import pytest

from streaming_data_types.stringevent_vs00 import deserialise_vs00, serialise_vs00


class TestSerialisationVS00:
def test_vs00_serialization_deserialization(self):
# Example data for vs00
source_name = "test_source"
value = "All happy?"
timestamp = 1625077800

# Serialize
serialized = serialise_vs00(source_name, timestamp, value)
assert isinstance(serialized, bytes)

# Deserialize
deserialized = deserialise_vs00(serialized)
assert deserialized.source_name == source_name
assert deserialized.data == value
assert deserialized.timestamp == timestamp

def test_vs00_invalid_value_serialization(self):
# Test with invalid types
with pytest.raises(TypeError):
serialise_vs00("source_123", 23, 123.4)

def test_vs00_invalid_timestamp_serialization(self):
# Test with invalid types
with pytest.raises(TypeError):
serialise_vs00("source_123", "23", "a failure")

def test_vs00_invalid_source_serialization(self):
# Test with invalid types
with pytest.raises(TypeError):
serialise_vs00(13, 23, "another failure")

def test_vs00_long_string(self):
# Test with edge float values
source_name = "long_test"
value = "a" * 70000 # Very long string
timestamp = 1625077800
serialized = serialise_vs00(source_name, timestamp, value)
deserialized = deserialise_vs00(serialized)
assert deserialized.source_name == source_name
assert deserialized.data == value
assert deserialized.timestamp == timestamp

def test_vs00_empty_string(self):
# Test with edge float values
source_name = "empty_test"
value = "" # Empty string
timestamp = 1625077800
serialized = serialise_vs00(source_name, timestamp, value)
deserialized = deserialise_vs00(serialized)
assert deserialized.source_name == source_name
assert deserialized.data == value
assert deserialized.timestamp == timestamp