Skip to content

Commit

Permalink
Merge branch 'main' of github.com:geo-engine/geoengine into copernicu…
Browse files Browse the repository at this point in the history
…s-provider
  • Loading branch information
michaelmattig committed Sep 12, 2024
2 parents f1e31d5 + 46a5336 commit fb2de76
Show file tree
Hide file tree
Showing 25 changed files with 645 additions and 437 deletions.
517 changes: 317 additions & 200 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 6 additions & 5 deletions datatypes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@ repository.workspace = true
[features]

[dependencies]
arrow = { version = "50.0", features = ["ipc_compression"] }
arrow-array = "50.0"
arrow-ord = "50.0"
arrow = { version = "53.0", features = ["ipc_compression"] }
arrow-array = "53.0"
arrow-ord = "53.0"
arrow-schema = { version = "53", features = ["serde"] }
bytes = "1.5" # for postgres-types impls
chrono = "0.4"
fallible-iterator = "0.2" # only for postgres-protocol
float-cmp = "0.9"
gdal = "0.16"
geo = "0.27"
gdal = "0.17"
geo = "0.28"
geojson = "0.24"
image = "0.25"
num = "0.4"
Expand Down
104 changes: 100 additions & 4 deletions datatypes/src/collections/feature_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1515,7 +1515,6 @@ mod struct_serde {
use serde::de::{SeqAccess, Visitor};
use serde::ser::Error;
use serde::{Deserializer, Serializer};

use std::fmt::Formatter;
use std::io::Cursor;

Expand Down Expand Up @@ -1599,6 +1598,97 @@ mod struct_serde {
self.visit_byte_buf(bytes)
}
}

/// Test-only helper that serializes a `StructArray` as a two-field struct
/// named `FeatureCollection`:
///
/// * `schema` – the Arrow schema of the record batch built from the array
/// * `data` – the JSON that Arrow's `ArrayWriter` emits for that batch
///
/// Any Arrow or JSON failure is converted into the serializer's custom error.
#[cfg(test)]
pub fn serialize_json<S>(struct_array: &StructArray, serializer: S) -> Result<S::Ok, S::Error>
where
    S: Serializer,
{
    use serde::ser::SerializeStruct;

    let record_batch = RecordBatch::from(struct_array);

    // Let Arrow write the batch as JSON into an in-memory byte buffer.
    let mut writer = arrow::json::ArrayWriter::new(Vec::<u8>::new());
    writer.write(&record_batch).map_err(S::Error::custom)?;
    writer.finish().map_err(S::Error::custom)?;
    let json_bytes = writer.into_inner();

    // Re-parse the bytes so `data` is embedded as structured JSON
    // rather than as an opaque string.
    let rows: serde_json::Value = serde_json::from_slice(&json_bytes).map_err(S::Error::custom)?;

    let mut state = serializer.serialize_struct("FeatureCollection", 2)?;
    state.serialize_field("schema", record_batch.schema_ref())?;
    state.serialize_field("data", &rows)?;
    state.end()
}

/// Test-only counterpart of `serialize_json`: reads a struct with the fields
/// `schema` (an Arrow schema) and `data` (JSON values) and rebuilds a
/// `StructArray` from it via Arrow's JSON reader.
///
/// # Errors
///
/// Fails if an unknown field is encountered, if `schema` or `data` is
/// missing, if Arrow cannot decode the data with the given schema, or if the
/// decoded data does not consist of exactly one record batch.
#[cfg(test)]
pub fn deserialize_json<'de, D>(deserializer: D) -> Result<StructArray, D::Error>
where
    D: Deserializer<'de>,
{
    struct StructArrayJsonDeserializer;

    impl<'de> Visitor<'de> for StructArrayJsonDeserializer {
        type Value = StructArray;

        fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
            formatter.write_str("an Arrow StructArray")
        }

        fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
        where
            A: serde::de::MapAccess<'de>,
        {
            use serde::de::Error;

            let (mut schema, mut data) = (None, None);

            // Keys are read as owned `String`s: a borrowed `&str` key can only be
            // produced by deserializers that are able to lend string data from
            // their input, which is not guaranteed for every `Deserializer`.
            while let Some(key) = map.next_key::<String>()? {
                match key.as_str() {
                    "schema" => {
                        schema = Some(map.next_value::<arrow_schema::Schema>()?);
                    }
                    "data" => {
                        // The Arrow JSON reader consumes bytes, so turn the parsed
                        // value back into its serialized form.
                        let value = map.next_value::<serde_json::Value>()?;
                        data = Some(serde_json::to_vec(&value).map_err(A::Error::custom)?);
                    }
                    other => return Err(A::Error::custom(format!("Unexpected field {other}"))),
                }
            }

            // Report precisely which field is absent.
            let schema = schema.ok_or_else(|| A::Error::missing_field("schema"))?;
            let data = data.ok_or_else(|| A::Error::missing_field("data"))?;

            // `Cursor` over a byte slice already implements `BufRead`,
            // so no additional buffering layer is required.
            let mut reader = arrow::json::ReaderBuilder::new(Arc::new(schema))
                .build(Cursor::new(data.as_slice()))
                .map_err(A::Error::custom)?;

            let batch = reader
                .next()
                .ok_or_else(|| {
                    A::Error::custom("Must be one batch inside the serialized data. Found none.")
                })?
                .map_err(A::Error::custom)?;

            if reader.next().is_some() {
                return Err(A::Error::custom(
                    "Must be one batch inside the serialized data. Found more.",
                ));
            }

            Ok(batch.into())
        }
    }

    deserializer.deserialize_struct(
        "FeatureCollection",
        &["schema", "data"],
        StructArrayJsonDeserializer,
    )
}
}

impl<P, G> Reproject<P> for FeatureCollection<G>
Expand Down Expand Up @@ -1766,6 +1856,7 @@ mod tests {
.is_err());
}

/// If this test fails, change serialization to JSON (cf. `struct_serde::serialize_json` and `struct_serde::deserialize_json`) instead of IPC.
#[test]
fn it_does_not_json_serialize() {
let collection = FeatureCollection::<MultiPoint>::from_data(
Expand All @@ -1783,9 +1874,14 @@ mod tests {
.unwrap();

let struct_array = collection.table;
let array: Arc<dyn arrow::array::Array> = Arc::new(struct_array);

// TODO: if this stops failing, change the strange custom byte serialization to use JSON
arrow::json::writer::array_to_json_array(&array).unwrap_err();
let serialized_struct_array =
struct_serde::serialize_json(&struct_array, serde_json::value::Serializer)
.unwrap()
.to_string();

let mut deserializer = serde_json::Deserializer::from_str(&serialized_struct_array);

assert!(struct_serde::deserialize_json(&mut deserializer).is_err());
}
}
2 changes: 1 addition & 1 deletion datatypes/src/primitives/circle.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use geo::intersects::Intersects;
use geo::Intersects;
use num_traits::abs;

use crate::operations::Contains;
Expand Down
4 changes: 2 additions & 2 deletions datatypes/tests/example-arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ fn multipoints() {
false,
))))
.len(2) // number of multipoints
.add_buffer(Buffer::from(&[0_i32, 2, 5].to_byte_slice()))
.add_buffer(Buffer::from([0_i32, 2, 5].to_byte_slice()))
.add_child_data(
ArrayData::builder(DataType::FixedSizeList(
Arc::new(Field::new("", DataType::Float64, false)),
Expand All @@ -456,7 +456,7 @@ fn multipoints() {
ArrayData::builder(DataType::Float64)
.len(10) // number of floats
.add_buffer(Buffer::from(
&[
[
1_f64, 2., 11., 12., 21., 22., 31., 32., 41., 42., 51., 52., 61., 62.,
71., 72., 81., 82., 91., 92.,
]
Expand Down
2 changes: 1 addition & 1 deletion expression/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ syn = "2.0"
tempfile = "3.10"

[dev-dependencies]
geo = "0.27.0"
geo = "0.28"
pretty_assertions = "1.4"

[lints]
Expand Down
11 changes: 5 additions & 6 deletions operators/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,25 @@ repository.workspace = true
[features]

[dependencies]
arrow = { version = "50.0" }
arrow = { version = "53.0" }
async-trait = "0.1"
bb8-postgres = "0.8"
bytes = "1.5" # for postgres-types impls
chrono = "0.4"
csv = "1.3"
float-cmp = "0.9"
futures = "0.3"
gdal = "0.16"
gdal-sys = "0.9"
geo = "0.27"
gdal = "0.17"
gdal-sys = "0.10"
geo = "0.28"
geoengine-datatypes = { path = "../datatypes" }
geoengine-expression = { path = "../expression" }
itertools = "0.13"
libloading = "0.8"
log = "0.4"
lru = "0.12"
lz4_flex = { version = "0.11" }
ndarray = { version = "0.15", features = ["approx"] }
ndarray = { version = "0.15", features = ["approx"] } # has to match with `ort`
num-traits = "0.2"
num = "0.4"
ouroboros = "0.18"
Expand Down Expand Up @@ -68,7 +68,6 @@ async-stream = "0.3"
approx = "0.5"
geo-rand = { git = "https://github.com/lelongg/geo-rand", tag = "v0.3.0" }
httptest = "0.16.1"
ndarray = "0.15"
tracing-subscriber = { version = "0.3", features = ["json"] }
rand = { version = "0.8", features = ["small_rng"] }

Expand Down
Loading

0 comments on commit fb2de76

Please sign in to comment.