diff --git a/Cargo.lock b/Cargo.lock
index 234e4a8c..9e07a0b8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -192,6 +192,7 @@ dependencies = [
  "arrow-data",
  "arrow-schema",
  "chrono",
+ "chrono-tz",
  "half",
  "hashbrown",
  "num",
@@ -590,6 +591,28 @@ dependencies = [
  "windows-targets 0.52.4",
 ]
 
+[[package]]
+name = "chrono-tz"
+version = "0.8.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d59ae0466b83e838b81a54256c39d5d7c20b9d7daa10510a242d9b75abd5936e"
+dependencies = [
+ "chrono",
+ "chrono-tz-build",
+ "phf",
+]
+
+[[package]]
+name = "chrono-tz-build"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "433e39f13c9a060046954e0592a8d0a4bcb1040125cbf91cb8ee58964cfb350f"
+dependencies = [
+ "parse-zoneinfo",
+ "phf",
+ "phf_codegen",
+]
+
 [[package]]
 name = "ciborium"
 version = "0.2.2"
@@ -1316,6 +1339,7 @@ dependencies = [
  "arrow-cast",
  "arrow-data",
  "arrow-ipc",
+ "arrow-json",
  "arrow-schema",
  "async-stream",
  "async-trait",
@@ -1330,11 +1354,14 @@ dependencies = [
  "geo",
  "geo-index",
  "geodesy",
+ "geojson",
  "geos",
  "geozero",
+ "half",
  "http-range-client",
  "indexmap",
  "itertools 0.12.1",
+ "lexical-core",
  "num_enum",
  "object_store",
  "parquet",
@@ -2313,6 +2340,15 @@ dependencies = [
  "zstd",
 ]
 
+[[package]]
+name = "parse-zoneinfo"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c705f256449c60da65e11ff6626e0c16a0a0b96aaa348de61376b249bc340f41"
+dependencies = [
+ "regex",
+]
+
 [[package]]
 name = "paste"
 version = "1.0.14"
@@ -2350,6 +2386,16 @@ dependencies = [
  "phf_shared",
 ]
 
+[[package]]
+name = "phf_codegen"
+version = "0.11.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e8d39688d359e6b34654d328e262234662d16cc0f60ec8dcbe5e718709342a5a"
+dependencies = [
+ "phf_generator",
+ "phf_shared",
+]
+
 [[package]]
 name = "phf_generator"
 version = "0.11.2"
diff --git a/Cargo.toml b/Cargo.toml
index 39bbef4b..44e7ed16 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -47,11 +47,12 @@ rayon = ["dep:rayon"]
 [dependencies]
 anyhow = "1"
 arrow = { version = "51", features = ["ffi"] }
-arrow-array = "51"
+arrow-array = { version = "51", features = ["chrono-tz"] }
 arrow-buffer = "51"
 arrow-cast = "51"
 arrow-data = "51"
 arrow-ipc = "51"
+arrow-json = "51"
 arrow-schema = "51"
 async-stream = { version = "0.3", optional = true }
 async-trait = { version = "0.1", optional = true }
@@ -96,6 +97,13 @@ sqlx = { version = "0.7", optional = true, default-features = false, features =
 thiserror = "1"
 tokio = { version = "1", default-features = false, optional = true }
 
+# Temporary until https://github.com/georust/geozero/pull/208 is merged and released.
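+# geojson backs the vendored reader below; half and lexical-core are required by the
+# vendored arrow-json encoder in src/io/geozero/table/json_encoder.rs.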
+geojson = { version = "0.24.1", default-features = false }
+
+half = "*"
+lexical-core = "*"
+
+
 [dev-dependencies]
 approx = "0.5.1"
 bytes = "1.5.0"
diff --git a/js/Cargo.lock b/js/Cargo.lock
index 534428ed..edf63c64 100644
--- a/js/Cargo.lock
+++ b/js/Cargo.lock
@@ -186,6 +186,7 @@ dependencies = [
  "arrow-data",
  "arrow-schema",
  "chrono",
+ "chrono-tz",
  "half",
  "hashbrown",
  "num",
@@ -546,6 +547,28 @@ dependencies = [
  "windows-targets 0.52.4",
 ]
 
+[[package]]
+name = "chrono-tz"
+version = "0.8.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d59ae0466b83e838b81a54256c39d5d7c20b9d7daa10510a242d9b75abd5936e"
+dependencies = [
+ "chrono",
+ "chrono-tz-build",
+ "phf",
+]
+
+[[package]]
+name = "chrono-tz-build"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "433e39f13c9a060046954e0592a8d0a4bcb1040125cbf91cb8ee58964cfb350f"
+dependencies = [
+ "parse-zoneinfo",
+ "phf",
+ "phf_codegen",
+]
+
 [[package]]
 name = "clap"
 version = "4.5.4"
@@ -994,6 +1017,7 @@ dependencies = [
  "arrow-cast",
  "arrow-data",
  "arrow-ipc",
+ "arrow-json",
  "arrow-schema",
  "bumpalo",
  "byteorder",
@@ -1003,9 +1027,12 @@ dependencies = [
  "geo",
  "geo-index",
  "geodesy",
+ "geojson",
  "geozero",
+ "half",
  "indexmap",
  "itertools 0.12.1",
+ "lexical-core",
  "num_enum",
  "object_store",
  "parquet",
@@ -1924,6 +1951,15 @@ dependencies = [
  "zstd",
 ]
 
+[[package]]
+name = "parse-zoneinfo"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1f2a05b18d44e2957b88f96ba460715e295bc1d7510468a2f3d3b44535d26c24"
+dependencies = [
+ "regex",
+]
+
 [[package]]
 name = "paste"
 version = "1.0.14"
@@ -1946,6 +1982,16 @@ dependencies = [
  "phf_shared",
 ]
 
+[[package]]
+name = "phf_codegen"
+version = "0.11.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e8d39688d359e6b34654d328e262234662d16cc0f60ec8dcbe5e718709342a5a"
+dependencies = [
+ "phf_generator",
+ "phf_shared",
+]
+
 [[package]]
 name = "phf_generator"
 version = "0.11.2"
diff --git a/js/src/io/geojson.rs b/js/src/io/geojson.rs
index 25e998c1..ea450473 100644
--- a/js/src/io/geojson.rs
+++ b/js/src/io/geojson.rs
@@ -1,6 +1,6 @@
 use std::io::Cursor;
 
-use geoarrow::io::geojson::read_geojson as _read_geojson;
+use geoarrow::io::geojson::{read_geojson as _read_geojson, write_geojson as _write_geojson};
 // use parquet_wasm::utils::assert_parquet_file_not_empty;
 use wasm_bindgen::prelude::*;
 
@@ -32,3 +32,15 @@ pub fn read_geojson(file: &[u8], batch_size: Option<usize>) -> WasmResult<Table
     let (schema, batches) = geo_table.into_inner();
     Ok(Table::new(schema, batches))
 }
+
+/// Write table to GeoJSON
+///
+/// Note that this consumes the table input
+#[wasm_bindgen(js_name = writeGeoJSON)]
+pub fn write_geojson(table: Table) -> WasmResult<Vec<u8>> {
+    let (schema, batches) = table.into_inner();
+    let mut rust_table = geoarrow::table::Table::try_new(schema, batches)?;
+    let mut output_file: Vec<u8> = vec![];
+    _write_geojson(&mut rust_table, &mut output_file)?;
+    Ok(output_file)
+}
diff --git a/python/core/Cargo.lock b/python/core/Cargo.lock
index 07ed18d4..781553e4 100644
--- a/python/core/Cargo.lock
+++ b/python/core/Cargo.lock
@@ -138,6 +138,7 @@ dependencies = [
  "arrow-data",
  "arrow-schema",
  "chrono",
+ "chrono-tz",
  "half",
  "hashbrown",
  "num",
@@ -492,6 +493,28 @@ dependencies = [
  "windows-targets 0.52.4",
 ]
 
+[[package]]
+name = "chrono-tz"
+version = "0.8.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d59ae0466b83e838b81a54256c39d5d7c20b9d7daa10510a242d9b75abd5936e"
"d59ae0466b83e838b81a54256c39d5d7c20b9d7daa10510a242d9b75abd5936e" +dependencies = [ + "chrono", + "chrono-tz-build", + "phf", +] + +[[package]] +name = "chrono-tz-build" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "433e39f13c9a060046954e0592a8d0a4bcb1040125cbf91cb8ee58964cfb350f" +dependencies = [ + "parse-zoneinfo", + "phf", + "phf_codegen", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -999,6 +1022,7 @@ dependencies = [ "arrow-cast", "arrow-data", "arrow-ipc", + "arrow-json", "arrow-schema", "async-stream", "async-trait", @@ -1010,10 +1034,13 @@ dependencies = [ "futures", "geo", "geo-index", + "geojson", "geozero", + "half", "http-range-client", "indexmap", "itertools 0.12.1", + "lexical-core", "num_enum", "object_store", "parquet", @@ -2105,6 +2132,15 @@ dependencies = [ "zstd", ] +[[package]] +name = "parse-zoneinfo" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c705f256449c60da65e11ff6626e0c16a0a0b96aaa348de61376b249bc340f41" +dependencies = [ + "regex", +] + [[package]] name = "paste" version = "1.0.14" @@ -2136,6 +2172,16 @@ dependencies = [ "phf_shared", ] +[[package]] +name = "phf_codegen" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8d39688d359e6b34654d328e262234662d16cc0f60ec8dcbe5e718709342a5a" +dependencies = [ + "phf_generator", + "phf_shared", +] + [[package]] name = "phf_generator" version = "0.11.2" diff --git a/python/core/src/io/parquet/reader.rs b/python/core/src/io/parquet/reader.rs index a112e410..8363be26 100644 --- a/python/core/src/io/parquet/reader.rs +++ b/python/core/src/io/parquet/reader.rs @@ -384,7 +384,6 @@ impl ParquetFile { #[pyclass(module = "geoarrow.rust.core._rust")] pub struct ParquetDataset { inner: _ParquetDataset, - #[allow(dead_code)] rt: Arc, } diff --git a/src/io/geojson/geojson_reader.rs b/src/io/geojson/geojson_reader.rs new file mode 100644 index 00000000..47ec5772 --- /dev/null +++ b/src/io/geojson/geojson_reader.rs @@ -0,0 +1,293 @@ +use geojson::{Feature, FeatureReader}; +use geojson::{GeoJson as GeoGeoJson, Geometry, Value}; +use geozero::error::{GeozeroError, Result}; +use geozero::{ + ColumnValue, FeatureProcessor, GeomProcessor, GeozeroDatasource, GeozeroGeometry, + PropertyProcessor, +}; +use serde_json::map::Map; +use serde_json::value::Value as JsonValue; +use std::io::Read; + +/// GeoJSON String. +#[derive(Debug)] +pub struct GeoJsonString(pub String); + +impl GeozeroGeometry for GeoJsonString { + fn process_geom(&self, processor: &mut P) -> Result<()> { + read_geojson_geom(&mut self.0.as_bytes(), processor) + } +} + +impl GeozeroDatasource for GeoJsonString { + fn process(&mut self, processor: &mut P) -> Result<()> { + read_geojson(&mut self.0.as_bytes(), processor) + } +} + +/// GeoJSON String slice. +pub struct GeoJson<'a>(pub &'a str); + +impl GeozeroGeometry for GeoJson<'_> { + fn process_geom(&self, processor: &mut P) -> Result<()> { + read_geojson_geom(&mut self.0.as_bytes(), processor) + } +} + +impl GeozeroDatasource for GeoJson<'_> { + fn process(&mut self, processor: &mut P) -> Result<()> { + read_geojson(&mut self.0.as_bytes(), processor) + } +} + +/// GeoJSON Reader. +pub struct GeoJsonReader(pub R); + +impl GeozeroDatasource for GeoJsonReader { + fn process(&mut self, processor: &mut P) -> Result<()> { + read_geojson(&mut self.0, processor) + } +} + +/// Read and process GeoJSON. 
+pub fn read_geojson<R: Read, P: FeatureProcessor>(mut reader: R, processor: &mut P) -> Result<()> {
+    let mut geojson_str = String::new();
+    reader.read_to_string(&mut geojson_str)?;
+    let geojson = geojson_str.parse::<GeoGeoJson>()?;
+    process_geojson(&geojson, processor)
+}
+
+#[allow(dead_code)]
+pub fn read_geojson_fc<R: Read, P: FeatureProcessor>(reader: R, processor: &mut P) -> Result<()> {
+    for (idx, feature) in FeatureReader::from_reader(reader).features().enumerate() {
+        process_geojson_feature(&feature?, idx, processor)?;
+    }
+
+    Ok(())
+}
+
+/// Read and process GeoJSON geometry.
+pub fn read_geojson_geom<R: Read, P: GeomProcessor>(
+    reader: &mut R,
+    processor: &mut P,
+) -> Result<()> {
+    let mut geojson_str = String::new();
+    reader.read_to_string(&mut geojson_str)?;
+    let geojson = geojson_str.parse::<GeoGeoJson>()?;
+    process_geojson_geom(&geojson, processor)
+}
+
+/// Process top-level GeoJSON items
+fn process_geojson<P: FeatureProcessor>(gj: &GeoGeoJson, processor: &mut P) -> Result<()> {
+    match *gj {
+        GeoGeoJson::FeatureCollection(ref collection) => {
+            processor.dataset_begin(None)?;
+            for (idx, feature) in collection.features.iter().enumerate() {
+                processor.feature_begin(idx as u64)?;
+                if let Some(ref properties) = feature.properties {
+                    processor.properties_begin()?;
+                    process_properties(properties, processor)?;
+                    processor.properties_end()?;
+                }
+                if let Some(ref geometry) = feature.geometry {
+                    processor.geometry_begin()?;
+                    process_geojson_geom_n(geometry, idx, processor)?;
+                    processor.geometry_end()?;
+                }
+                processor.feature_end(idx as u64)?;
+            }
+            processor.dataset_end()
+        }
+        GeoGeoJson::Feature(ref feature) => process_geojson_feature(feature, 0, processor),
+        GeoGeoJson::Geometry(ref geometry) => process_geojson_geom_n(geometry, 0, processor),
+    }
+}
+
+/// Process top-level GeoJSON items
+fn process_geojson_feature<P: FeatureProcessor>(
+    feature: &Feature,
+    idx: usize,
+    processor: &mut P,
+) -> Result<()> {
+    processor.dataset_begin(None)?;
+    if feature.geometry.is_some() || feature.properties.is_some() {
+        processor.feature_begin(idx as u64)?;
+        if let Some(ref properties) = feature.properties {
+            processor.properties_begin()?;
+            process_properties(properties, processor)?;
+            processor.properties_end()?;
+        }
+        if let Some(ref geometry) = feature.geometry {
+            processor.geometry_begin()?;
+            process_geojson_geom_n(geometry, idx, processor)?;
+            processor.geometry_end()?;
+        }
+        processor.feature_end(idx as u64)?;
+    }
+    processor.dataset_end()
+}
+
+/// Process top-level GeoJSON items (geometry only)
+fn process_geojson_geom<P: GeomProcessor>(gj: &GeoGeoJson, processor: &mut P) -> Result<()> {
+    match *gj {
+        GeoGeoJson::FeatureCollection(ref collection) => {
+            for (idx, geometry) in collection
+                .features
+                .iter()
+                // Only pass on non-empty geometries, doing so by reference
+                .filter_map(|feature| feature.geometry.as_ref())
+                .enumerate()
+            {
+                process_geojson_geom_n(geometry, idx, processor)?;
+            }
+        }
+        GeoGeoJson::Feature(ref feature) => {
+            if let Some(ref geometry) = feature.geometry {
+                process_geojson_geom_n(geometry, 0, processor)?;
+            }
+        }
+        GeoGeoJson::Geometry(ref geometry) => {
+            process_geojson_geom_n(geometry, 0, processor)?;
+        }
+    }
+    Ok(())
+}
+
+/// Process GeoJSON geometries
+pub(crate) fn process_geojson_geom_n<P: GeomProcessor>(
+    geom: &Geometry,
+    idx: usize,
+    processor: &mut P,
+) -> Result<()> {
+    match geom.value {
+        Value::Point(ref geometry) => {
+            processor.point_begin(idx)?;
+            process_coord(geometry, processor.multi_dim(), 0, processor)?;
+            processor.point_end(idx)
+        }
+        Value::MultiPoint(ref geometry) => {
+            processor.multipoint_begin(geometry.len(), idx)?;
+            let multi_dim = processor.multi_dim();
+            for (idxc, point_type) in geometry.iter().enumerate() {
+                process_coord(point_type, multi_dim, idxc, processor)?;
+            }
+            processor.multipoint_end(idx)
+        }
+        Value::LineString(ref geometry) => process_linestring(geometry, true, idx, processor),
+        Value::MultiLineString(ref geometry) => {
+            processor.multilinestring_begin(geometry.len(), idx)?;
+            for (idx2, linestring_type) in geometry.iter().enumerate() {
+                process_linestring(linestring_type, false, idx2, processor)?;
+            }
+            processor.multilinestring_end(idx)
+        }
+        Value::Polygon(ref geometry) => process_polygon(geometry, true, idx, processor),
+        Value::MultiPolygon(ref geometry) => {
+            processor.multipolygon_begin(geometry.len(), idx)?;
+            for (idx2, polygon_type) in geometry.iter().enumerate() {
+                process_polygon(polygon_type, false, idx2, processor)?;
+            }
+            processor.multipolygon_end(idx)
+        }
+        Value::GeometryCollection(ref collection) => {
+            processor.geometrycollection_begin(collection.len(), idx)?;
+            for (idx2, geometry) in collection.iter().enumerate() {
+                process_geojson_geom_n(geometry, idx2, processor)?;
+            }
+            processor.geometrycollection_end(idx)
+        }
+    }
+}
+
+/// Process GeoJSON properties
+pub(crate) fn process_properties<P: PropertyProcessor>(
+    properties: &Map<String, JsonValue>,
+    processor: &mut P,
+) -> Result<()> {
+    for (i, (key, value)) in properties.iter().enumerate() {
+        // Could we provide a stable property index?
+        match value {
+            JsonValue::String(v) => processor.property(i, key, &ColumnValue::String(v))?,
+            JsonValue::Number(v) => {
+                if v.is_f64() {
+                    processor.property(i, key, &ColumnValue::Double(v.as_f64().unwrap()))?
+                } else if v.is_i64() {
+                    processor.property(i, key, &ColumnValue::Long(v.as_i64().unwrap()))?
+                } else if v.is_u64() {
+                    processor.property(i, key, &ColumnValue::ULong(v.as_u64().unwrap()))?
+                } else {
+                    unreachable!()
+                }
+            }
+            JsonValue::Bool(v) => processor.property(i, key, &ColumnValue::Bool(*v))?,
+            JsonValue::Array(v) => {
+                let json_string =
+                    serde_json::to_string(v).map_err(|_err| GeozeroError::Property(key.clone()))?;
+                processor.property(i, key, &ColumnValue::Json(&json_string))?
+            }
+            JsonValue::Object(v) => {
+                let json_string =
+                    serde_json::to_string(v).map_err(|_err| GeozeroError::Property(key.clone()))?;
+                processor.property(i, key, &ColumnValue::Json(&json_string))?
+            }
+            // For null values omit the property
+            JsonValue::Null => false,
+        };
+    }
+    Ok(())
+}
+
+type Position = Vec<f64>;
+type PointType = Position;
+type LineStringType = Vec<PointType>;
+type PolygonType = Vec<Vec<PointType>>;
+
+fn process_coord<P: GeomProcessor>(
+    point_type: &PointType,
+    multi_dim: bool,
+    idx: usize,
+    processor: &mut P,
+) -> Result<()> {
+    if multi_dim {
+        processor.coordinate(
+            point_type[0],
+            point_type[1],
+            point_type.get(2).copied(),
+            None,
+            None,
+            None,
+            idx,
+        )
+    } else {
+        processor.xy(point_type[0], point_type[1], idx)
+    }
+}
+
+fn process_linestring<P: GeomProcessor>(
+    linestring_type: &LineStringType,
+    tagged: bool,
+    idx: usize,
+    processor: &mut P,
+) -> Result<()> {
+    processor.linestring_begin(tagged, linestring_type.len(), idx)?;
+    let multi_dim = processor.multi_dim();
+    for (idxc, point_type) in linestring_type.iter().enumerate() {
+        process_coord(point_type, multi_dim, idxc, processor)?;
+    }
+    processor.linestring_end(tagged, idx)
+}
+
+fn process_polygon<P: GeomProcessor>(
+    polygon_type: &PolygonType,
+    tagged: bool,
+    idx: usize,
+    processor: &mut P,
+) -> Result<()> {
+    processor.polygon_begin(tagged, polygon_type.len(), idx)?;
+    for (idx2, linestring_type) in polygon_type.iter().enumerate() {
+        process_linestring(linestring_type, false, idx2, processor)?;
+    }
+    processor.polygon_end(tagged, idx)
+}
+
+// Note: we excluded the upstream geozero geojson reader tests
diff --git a/src/io/geojson/geojson_writer.rs b/src/io/geojson/geojson_writer.rs
new file mode 100644
index 00000000..f58ae682
--- /dev/null
+++ b/src/io/geojson/geojson_writer.rs
@@ -0,0 +1,248 @@
+//! Vendored from geozero under the MIT/Apache 2 license until
+//! https://github.com/georust/geozero/pull/208 is merged and released.
+
+use geozero::error::Result;
+use geozero::{ColumnValue, CoordDimensions, FeatureProcessor, GeomProcessor, PropertyProcessor};
+use std::fmt::Display;
+use std::io::Write;
+
+/// GeoJSON writer.
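+///
+/// A minimal sketch of standalone use (illustrative; any `std::io::Write` sink works):
+/// ```ignore
+/// let mut out: Vec<u8> = vec![];
+/// let mut writer = GeoJsonWriter::new(&mut out);
+/// // Drive `writer` with geozero events, e.g. via GeozeroDatasource::process
+/// // or the vendored read_geojson in the sibling geojson_reader module.
+/// ```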
+pub struct GeoJsonWriter<W: Write> {
+    dims: CoordDimensions,
+    pub(crate) out: W,
+}
+
+impl<W: Write> GeoJsonWriter<W> {
+    pub fn new(out: W) -> Self {
+        GeoJsonWriter {
+            dims: CoordDimensions::default(),
+            out,
+        }
+    }
+    #[allow(dead_code)]
+    pub fn with_dims(out: W, dims: CoordDimensions) -> Self {
+        GeoJsonWriter { dims, out }
+    }
+    fn comma(&mut self, idx: usize) -> Result<()> {
+        if idx > 0 {
+            self.out.write_all(b",")?;
+        }
+        Ok(())
+    }
+}
+
+impl<W: Write> FeatureProcessor for GeoJsonWriter<W> {
+    fn dataset_begin(&mut self, name: Option<&str>) -> Result<()> {
+        self.out.write_all(
+            br#"{
+"type": "FeatureCollection""#,
+        )?;
+        if let Some(name) = name {
+            write!(self.out, ",\n\"name\": \"{name}\"")?;
+        }
+        self.out.write_all(
+            br#",
+"features": ["#,
+        )?;
+        Ok(())
+    }
+    fn dataset_end(&mut self) -> Result<()> {
+        self.out.write_all(b"]}")?;
+        Ok(())
+    }
+    fn feature_begin(&mut self, idx: u64) -> Result<()> {
+        if idx > 0 {
+            self.out.write_all(b",\n")?;
+        }
+        self.out.write_all(br#"{"type": "Feature""#)?;
+        Ok(())
+    }
+    fn feature_end(&mut self, _idx: u64) -> Result<()> {
+        self.out.write_all(b"}")?;
+        Ok(())
+    }
+    fn properties_begin(&mut self) -> Result<()> {
+        self.out.write_all(br#", "properties": {"#)?;
+        Ok(())
+    }
+    fn properties_end(&mut self) -> Result<()> {
+        self.out.write_all(b"}")?;
+        Ok(())
+    }
+    fn geometry_begin(&mut self) -> Result<()> {
+        self.out.write_all(br#", "geometry": "#)?;
+        Ok(())
+    }
+    fn geometry_end(&mut self) -> Result<()> {
+        Ok(())
+    }
+}
+
+impl<W: Write> GeomProcessor for GeoJsonWriter<W> {
+    fn dimensions(&self) -> CoordDimensions {
+        self.dims
+    }
+    fn xy(&mut self, x: f64, y: f64, idx: usize) -> Result<()> {
+        self.comma(idx)?;
+        self.out.write_all(format!("[{x},{y}]").as_bytes())?;
+        Ok(())
+    }
+    fn coordinate(
+        &mut self,
+        x: f64,
+        y: f64,
+        z: Option<f64>,
+        _m: Option<f64>,
+        _t: Option<f64>,
+        _tm: Option<u64>,
+        idx: usize,
+    ) -> Result<()> {
+        self.comma(idx)?;
+        self.out.write_all(format!("[{x},{y}").as_bytes())?;
+        if let Some(z) = z {
+            self.out.write_all(format!(",{z}").as_bytes())?;
+        }
+        self.out.write_all(b"]")?;
+        Ok(())
+    }
+    fn empty_point(&mut self, idx: usize) -> Result<()> {
+        self.comma(idx)?;
+        self.out
+            .write_all(br#"{"type": "Point", "coordinates": []}"#)?;
+        Ok(())
+    }
+    fn point_begin(&mut self, idx: usize) -> Result<()> {
+        self.comma(idx)?;
+        self.out
+            .write_all(br#"{"type": "Point", "coordinates": "#)?;
+        Ok(())
+    }
+    fn point_end(&mut self, _idx: usize) -> Result<()> {
+        self.out.write_all(b"}")?;
+        Ok(())
+    }
+    fn multipoint_begin(&mut self, _size: usize, idx: usize) -> Result<()> {
+        self.comma(idx)?;
+        self.out
+            .write_all(br#"{"type": "MultiPoint", "coordinates": ["#)?;
+        Ok(())
+    }
+    fn multipoint_end(&mut self, _idx: usize) -> Result<()> {
+        self.out.write_all(b"]}")?;
+        Ok(())
+    }
+    fn linestring_begin(&mut self, tagged: bool, _size: usize, idx: usize) -> Result<()> {
+        self.comma(idx)?;
+        if tagged {
+            self.out
+                .write_all(br#"{"type": "LineString", "coordinates": ["#)?;
+        } else {
+            self.out.write_all(b"[")?;
+        }
+        Ok(())
+    }
+    fn linestring_end(&mut self, tagged: bool, _idx: usize) -> Result<()> {
+        if tagged {
+            self.out.write_all(b"]}")?;
+        } else {
+            self.out.write_all(b"]")?;
+        }
+        Ok(())
+    }
+    fn multilinestring_begin(&mut self, _size: usize, idx: usize) -> Result<()> {
+        self.comma(idx)?;
+        self.out
+            .write_all(br#"{"type": "MultiLineString", "coordinates": ["#)?;
+        Ok(())
+    }
+    fn multilinestring_end(&mut self, _idx: usize) -> Result<()> {
+        self.out.write_all(b"]}")?;
+        Ok(())
+    }
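+    // Untagged geometries (polygon rings, members of Multi* types) are emitted as bare
+    // coordinate arrays; only tagged geometries receive a {"type": ...} wrapper.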
+    fn polygon_begin(&mut self, tagged: bool, _size: usize, idx: usize) -> Result<()> {
+        self.comma(idx)?;
+        if tagged {
+            self.out
+                .write_all(br#"{"type": "Polygon", "coordinates": ["#)?;
+        } else {
+            self.out.write_all(b"[")?;
+        }
+        Ok(())
+    }
+    fn polygon_end(&mut self, tagged: bool, _idx: usize) -> Result<()> {
+        if tagged {
+            self.out.write_all(b"]}")?;
+        } else {
+            self.out.write_all(b"]")?;
+        }
+        Ok(())
+    }
+    fn multipolygon_begin(&mut self, _size: usize, idx: usize) -> Result<()> {
+        self.comma(idx)?;
+        self.out
+            .write_all(br#"{"type": "MultiPolygon", "coordinates": ["#)?;
+        Ok(())
+    }
+    fn multipolygon_end(&mut self, _idx: usize) -> Result<()> {
+        self.out.write_all(b"]}")?;
+        Ok(())
+    }
+    fn geometrycollection_begin(&mut self, _size: usize, idx: usize) -> Result<()> {
+        self.comma(idx)?;
+        self.out
+            .write_all(br#"{"type": "GeometryCollection", "geometries": ["#)?;
+        Ok(())
+    }
+    fn geometrycollection_end(&mut self, _idx: usize) -> Result<()> {
+        self.out.write_all(b"]}")?;
+        Ok(())
+    }
+}
+
+fn write_num_prop<W: Write>(mut out: W, colname: &str, v: &dyn Display) -> Result<()> {
+    let colname = colname.replace('\"', "\\\"");
+    out.write_all(format!(r#""{colname}": {v}"#).as_bytes())?;
+    Ok(())
+}
+
+fn write_str_prop<W: Write>(mut out: W, colname: &str, v: &str) -> Result<()> {
+    let colname = colname.replace('\"', "\\\"");
+    let value = v.replace('\"', "\\\"");
+    out.write_all(format!(r#""{colname}": "{value}""#).as_bytes())?;
+    Ok(())
+}
+
+fn write_json_prop<W: Write>(mut out: W, colname: &str, v: &str) -> Result<()> {
+    let colname = colname.replace('\"', "\\\"");
+    out.write_all(format!(r#""{colname}": {v}"#).as_bytes())?;
+    Ok(())
+}
+
+impl<W: Write> PropertyProcessor for GeoJsonWriter<W> {
+    fn property(&mut self, i: usize, colname: &str, colval: &ColumnValue) -> Result<bool> {
+        if i > 0 {
+            self.out.write_all(b", ")?;
+        }
+        match colval {
+            ColumnValue::Byte(v) => write_num_prop(&mut self.out, colname, &v)?,
+            ColumnValue::UByte(v) => write_num_prop(&mut self.out, colname, &v)?,
+            ColumnValue::Bool(v) => write_num_prop(&mut self.out, colname, &v)?,
+            ColumnValue::Short(v) => write_num_prop(&mut self.out, colname, &v)?,
+            ColumnValue::UShort(v) => write_num_prop(&mut self.out, colname, &v)?,
+            ColumnValue::Int(v) => write_num_prop(&mut self.out, colname, &v)?,
+            ColumnValue::UInt(v) => write_num_prop(&mut self.out, colname, &v)?,
+            ColumnValue::Long(v) => write_num_prop(&mut self.out, colname, &v)?,
+            ColumnValue::ULong(v) => write_num_prop(&mut self.out, colname, &v)?,
+            ColumnValue::Float(v) => write_num_prop(&mut self.out, colname, &v)?,
+            ColumnValue::Double(v) => write_num_prop(&mut self.out, colname, &v)?,
+            ColumnValue::String(v) | ColumnValue::DateTime(v) => {
+                write_str_prop(&mut self.out, colname, v)?;
+            }
+            ColumnValue::Json(v) => write_json_prop(&mut self.out, colname, v)?,
+            ColumnValue::Binary(_v) => (),
+        };
+        Ok(false)
+    }
+}
+
+// Note: we excluded the upstream geozero geojson writer tests
diff --git a/src/io/geojson/mod.rs b/src/io/geojson/mod.rs
index 2d389947..2b865a85 100644
--- a/src/io/geojson/mod.rs
+++ b/src/io/geojson/mod.rs
@@ -3,5 +3,7 @@
 pub use reader::read_geojson;
 pub use writer::write_geojson;
 
+mod geojson_reader;
+mod geojson_writer;
 mod reader;
 mod writer;
diff --git a/src/io/geojson/reader.rs b/src/io/geojson/reader.rs
index 46d072fc..bfb193bc 100644
--- a/src/io/geojson/reader.rs
+++ b/src/io/geojson/reader.rs
@@ -1,4 +1,4 @@
-use geozero::geojson::GeoJsonReader;
+use super::geojson_reader::GeoJsonReader;
 use geozero::GeozeroDatasource;
 use std::io::Read;
 
diff --git a/src/io/geojson/writer.rs b/src/io/geojson/writer.rs
index b497993f..e789c3da 100644
--- a/src/io/geojson/writer.rs
+++ b/src/io/geojson/writer.rs
@@ -1,6 +1,6 @@
+use super::geojson_writer::GeoJsonWriter;
 use crate::error::Result;
 use crate::table::Table;
-use geozero::geojson::GeoJsonWriter;
 use geozero::GeozeroDatasource;
 use std::io::Write;
 
diff --git a/src/io/geozero/table/data_source.rs b/src/io/geozero/table/data_source.rs
index cf2d7c12..8afa1c54 100644
--- a/src/io/geozero/table/data_source.rs
+++ b/src/io/geozero/table/data_source.rs
@@ -1,14 +1,16 @@
 #![allow(deprecated)]
 
+use std::str::FromStr;
+
 use crate::array::geometry::GeometryArray;
 use crate::io::geozero::scalar::process_geometry;
+use crate::io::geozero::table::json_encoder::{make_encoder, EncoderOptions};
 use crate::table::Table;
 use crate::trait_::GeometryArrayAccessor;
-use arrow_array::{
-    BinaryArray, Float16Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
-    Int8Array, LargeBinaryArray, LargeStringArray, RecordBatch, StringArray, UInt16Array,
-    UInt32Array, UInt64Array, UInt8Array,
-};
+use arrow::array::AsArray;
+use arrow::datatypes::*;
+use arrow_array::timezone::Tz;
+use arrow_array::{Array, RecordBatch};
 use arrow_schema::{DataType, Schema};
 use geozero::error::GeozeroError;
 use geozero::{ColumnValue, FeatureProcessor, GeomProcessor, GeozeroDatasource, PropertyProcessor};
@@ -104,9 +106,22 @@ fn process_properties(
         }
         let name = field.name();
 
+        // Don't pass null properties to geozero
+        if array.is_null(within_batch_row_idx) {
+            continue;
+        }
+
         match field.data_type() {
+            DataType::Boolean => {
+                let arr = array.as_boolean();
+                processor.property(
+                    property_idx,
+                    name,
+                    &ColumnValue::Bool(arr.value(within_batch_row_idx)),
+                )?;
+            }
             DataType::UInt8 => {
-                let arr = array.as_any().downcast_ref::<UInt8Array>().unwrap();
+                let arr = array.as_primitive::<UInt8Type>();
                 processor.property(
                     property_idx,
                     name,
@@ -114,7 +129,7 @@
                 )?;
             }
             DataType::Int8 => {
-                let arr = array.as_any().downcast_ref::<Int8Array>().unwrap();
+                let arr = array.as_primitive::<Int8Type>();
                 processor.property(
                     property_idx,
                     name,
@@ -122,7 +137,7 @@
                 )?;
             }
             DataType::UInt16 => {
-                let arr = array.as_any().downcast_ref::<UInt16Array>().unwrap();
+                let arr = array.as_primitive::<UInt16Type>();
                 processor.property(
                     property_idx,
                     name,
@@ -130,7 +145,7 @@
                 )?;
             }
             DataType::Int16 => {
-                let arr = array.as_any().downcast_ref::<Int16Array>().unwrap();
+                let arr = array.as_primitive::<Int16Type>();
                 processor.property(
                     property_idx,
                     name,
@@ -138,7 +153,7 @@
                 )?;
             }
             DataType::UInt32 => {
-                let arr = array.as_any().downcast_ref::<UInt32Array>().unwrap();
+                let arr = array.as_primitive::<UInt32Type>();
                 processor.property(
                     property_idx,
                     name,
@@ -146,7 +161,7 @@
                )?;
             }
             DataType::Int32 => {
-                let arr = array.as_any().downcast_ref::<Int32Array>().unwrap();
+                let arr = array.as_primitive::<Int32Type>();
                 processor.property(
                     property_idx,
                     name,
@@ -154,7 +169,7 @@
                 )?;
             }
             DataType::UInt64 => {
-                let arr = array.as_any().downcast_ref::<UInt64Array>().unwrap();
+                let arr = array.as_primitive::<UInt64Type>();
                 processor.property(
                     property_idx,
                     name,
@@ -162,7 +177,7 @@
                 )?;
             }
             DataType::Int64 => {
-                let arr = array.as_any().downcast_ref::<Int64Array>().unwrap();
+                let arr = array.as_primitive::<Int64Type>();
                 processor.property(
                     property_idx,
                     name,
@@ -170,7 +185,7 @@
                 )?;
             }
             DataType::Float16 => {
-                let arr = array.as_any().downcast_ref::<Float16Array>().unwrap();
+                let arr = array.as_primitive::<Float16Type>();
                 processor.property(
                     property_idx,
                     name,
@@ -178,7 +193,7 @@
                 )?;
             }
             DataType::Float32 => {
-                let arr = array.as_any().downcast_ref::<Float32Array>().unwrap();
+                let arr = array.as_primitive::<Float32Type>();
                 processor.property(
                     property_idx,
                     name,
@@ -186,7 +201,7 @@
                 )?;
             }
             DataType::Float64 => {
-                let arr = array.as_any().downcast_ref::<Float64Array>().unwrap();
+                let arr = array.as_primitive::<Float64Type>();
                 processor.property(
                     property_idx,
                     name,
@@ -194,7 +209,7 @@
                 )?;
             }
             DataType::Utf8 => {
-                let arr = array.as_any().downcast_ref::<StringArray>().unwrap();
+                let arr = array.as_string::<i32>();
                 processor.property(
                     property_idx,
                     name,
@@ -202,7 +217,7 @@
                 )?;
             }
             DataType::LargeUtf8 => {
-                let arr = array.as_any().downcast_ref::<LargeStringArray>().unwrap();
+                let arr = array.as_string::<i64>();
                 processor.property(
                     property_idx,
                     name,
@@ -210,7 +225,7 @@
                 )?;
             }
             DataType::Binary => {
-                let arr = array.as_any().downcast_ref::<BinaryArray>().unwrap();
+                let arr = array.as_binary::<i32>();
                 processor.property(
                     property_idx,
                     name,
@@ -218,15 +233,79 @@
                 )?;
             }
             DataType::LargeBinary => {
-                let arr = array.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
+                let arr = array.as_binary::<i64>();
                 processor.property(
                     property_idx,
                     name,
                     &ColumnValue::Binary(arr.value(within_batch_row_idx)),
                 )?;
             }
-            // geozero type system also supports json and datetime
-            _ => todo!("json and datetime types"),
+            DataType::Struct(_) | DataType::List(_) | DataType::LargeList(_) => {
+                if array.is_valid(within_batch_row_idx) {
+                    let mut encoder = make_encoder(
+                        array,
+                        &EncoderOptions {
+                            explicit_nulls: false,
+                        },
+                    )
+                    .map_err(|err| GeozeroError::Property(err.to_string()))?;
+                    let mut buf = vec![];
+                    encoder.encode(within_batch_row_idx, &mut buf);
+                    let json_string = String::from_utf8(buf)
+                        .map_err(|err| GeozeroError::Property(err.to_string()))?;
+                    processor.property(property_idx, name, &ColumnValue::Json(&json_string))?;
+                }
+            }
+            DataType::Date32 => {
+                let arr = array.as_primitive::<Date32Type>();
+                if arr.is_valid(within_batch_row_idx) {
+                    let datetime = arr.value_as_datetime(within_batch_row_idx).unwrap();
+                    let dt_str = datetime.and_utc().to_rfc3339();
+                    processor.property(property_idx, name, &ColumnValue::DateTime(&dt_str))?;
+                }
+            }
+            DataType::Date64 => {
+                let arr = array.as_primitive::<Date64Type>();
+                if arr.is_valid(within_batch_row_idx) {
+                    let datetime = arr.value_as_datetime(within_batch_row_idx).unwrap();
+                    let dt_str = datetime.and_utc().to_rfc3339();
+                    processor.property(property_idx, name, &ColumnValue::DateTime(&dt_str))?;
+                }
+            }
+            DataType::Timestamp(unit, tz) => {
+                let arrow_tz = if let Some(tz) = tz {
+                    Some(Tz::from_str(tz).map_err(|err| GeozeroError::Property(err.to_string()))?)
+                } else {
+                    None
+                };
+
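+                // The macro below expands the same conversion for each Arrow time unit,
+                // rendering the value as an RFC 3339 string for geozero.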
+                macro_rules! impl_timestamp {
+                    ($arrow_type:ty) => {{
+                        let arr = array.as_primitive::<$arrow_type>();
+                        let dt_str = if let Some(arrow_tz) = arrow_tz {
+                            arr.value_as_datetime_with_tz(within_batch_row_idx, arrow_tz)
+                                .unwrap()
+                                .to_rfc3339()
+                        } else {
+                            arr.value_as_datetime(within_batch_row_idx)
+                                .unwrap()
+                                .and_utc()
+                                .to_rfc3339()
+                        };
+                        processor.property(property_idx, name, &ColumnValue::DateTime(&dt_str))?;
+                    }};
+                }
+
+                if array.is_valid(within_batch_row_idx) {
+                    match unit {
+                        TimeUnit::Microsecond => impl_timestamp!(TimestampMicrosecondType),
+                        TimeUnit::Millisecond => impl_timestamp!(TimestampMillisecondType),
+                        TimeUnit::Nanosecond => impl_timestamp!(TimestampNanosecondType),
+                        TimeUnit::Second => impl_timestamp!(TimestampSecondType),
+                    }
+                }
+            }
+            dt => todo!("unsupported type: {:?}", dt),
         }
         property_idx += 1;
     }
diff --git a/src/io/geozero/table/json_encoder.rs b/src/io/geozero/table/json_encoder.rs
new file mode 100644
index 00000000..f51d65e3
--- /dev/null
+++ b/src/io/geozero/table/json_encoder.rs
@@ -0,0 +1,526 @@
+//! This is vendored from arrow-json under the Apache 2 license
+//! We want our own JSON encoder so that we can encode specific struct/list rows to JSON to fit
+//! into the geozero type system.
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_array::cast::AsArray;
+use arrow_array::types::*;
+use arrow_array::*;
+use arrow_buffer::{ArrowNativeType, NullBuffer, OffsetBuffer, ScalarBuffer};
+use arrow_cast::display::{ArrayFormatter, FormatOptions};
+use arrow_schema::{ArrowError, DataType, FieldRef};
+use half::f16;
+use lexical_core::FormattedSize;
+use serde::Serializer;
+use std::io::Write;
+
+#[derive(Debug, Clone, Default)]
+pub struct EncoderOptions {
+    pub explicit_nulls: bool,
+}
+
+/// A trait to format array values as JSON values
+///
+/// Nullability is handled by the caller to allow encoding nulls implicitly, i.e. `{}` instead of `{"a": null}`
+pub trait Encoder {
+    /// Encode the non-null value at index `idx` to `out`
+    ///
+    /// The behaviour is unspecified if `idx` corresponds to a null index
+    fn encode(&mut self, idx: usize, out: &mut Vec<u8>);
+}
+
+pub fn make_encoder<'a>(
+    array: &'a dyn Array,
+    options: &EncoderOptions,
+) -> Result<Box<dyn Encoder + 'a>, ArrowError> {
+    let (encoder, _nulls) = make_encoder_impl(array, options)?;
+    // Note: we comment this out because we're encoding _inner_ columns, our struct array does not
+    // represent a RecordBatch.
+    // assert!(nulls.is_none(), "root cannot be nullable");
+    Ok(encoder)
+}
+
+fn make_encoder_impl<'a>(
+    array: &'a dyn Array,
+    options: &EncoderOptions,
+) -> Result<(Box<dyn Encoder + 'a>, Option<NullBuffer>), ArrowError> {
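+    // downcast_integer! (below) routes every integer DataType through this helper with
+    // the matching ArrowPrimitiveType, so one PrimitiveEncoder covers all of them.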
primitive_helper { + ($t:ty) => {{ + let array = array.as_primitive::<$t>(); + let nulls = array.nulls().cloned(); + (Box::new(PrimitiveEncoder::new(array)) as _, nulls) + }}; + } + + Ok(downcast_integer! { + array.data_type() => (primitive_helper), + DataType::Float16 => primitive_helper!(Float16Type), + DataType::Float32 => primitive_helper!(Float32Type), + DataType::Float64 => primitive_helper!(Float64Type), + DataType::Boolean => { + let array = array.as_boolean(); + (Box::new(BooleanEncoder(array)), array.nulls().cloned()) + } + DataType::Null => (Box::new(NullEncoder), array.logical_nulls()), + DataType::Utf8 => { + let array = array.as_string::(); + (Box::new(StringEncoder(array)) as _, array.nulls().cloned()) + } + DataType::LargeUtf8 => { + let array = array.as_string::(); + (Box::new(StringEncoder(array)) as _, array.nulls().cloned()) + } + DataType::List(_) => { + let array = array.as_list::(); + (Box::new(ListEncoder::try_new(array, options)?) as _, array.nulls().cloned()) + } + DataType::LargeList(_) => { + let array = array.as_list::(); + (Box::new(ListEncoder::try_new(array, options)?) as _, array.nulls().cloned()) + } + DataType::FixedSizeList(_, _) => { + let array = array.as_fixed_size_list(); + (Box::new(FixedSizeListEncoder::try_new(array, options)?) as _, array.nulls().cloned()) + } + + DataType::Dictionary(_, _) => downcast_dictionary_array! { + array => (Box::new(DictionaryEncoder::try_new(array, options)?) as _, array.logical_nulls()), + _ => unreachable!() + } + + DataType::Map(_, _) => { + let array = array.as_map(); + (Box::new(MapEncoder::try_new(array, options)?) as _, array.nulls().cloned()) + } + + DataType::FixedSizeBinary(_) => { + let array = array.as_fixed_size_binary(); + (Box::new(FixedSizeBinaryEncoder::new(array)) as _, array.nulls().cloned()) + } + + DataType::Struct(fields) => { + let array = array.as_struct(); + let encoders = fields.iter().zip(array.columns()).map(|(field, array)| { + let (encoder, nulls) = make_encoder_impl(array, options)?; + Ok(FieldEncoder{ + field: field.clone(), + encoder, nulls + }) + }).collect::, ArrowError>>()?; + + let encoder = StructArrayEncoder{ + encoders, + explicit_nulls: options.explicit_nulls, + }; + (Box::new(encoder) as _, array.nulls().cloned()) + } + d => match d.is_temporal() { + true => { + // Note: the implementation of Encoder for ArrayFormatter assumes it does not produce + // characters that would need to be escaped within a JSON string, e.g. `'"'`. 
+                // If support for user-provided format specifications is added, this assumption
+                // may need to be revisited
+                let options = FormatOptions::new().with_display_error(true);
+                let formatter = ArrayFormatter::try_new(array, &options)?;
+                (Box::new(formatter) as _, array.nulls().cloned())
+            }
+            false => return Err(ArrowError::InvalidArgumentError(format!("JSON Writer does not support data type: {d}"))),
+        }
+    })
+}
+
+fn encode_string(s: &str, out: &mut Vec<u8>) {
+    let mut serializer = serde_json::Serializer::new(out);
+    serializer.serialize_str(s).unwrap();
+}
+
+struct FieldEncoder<'a> {
+    field: FieldRef,
+    encoder: Box<dyn Encoder + 'a>,
+    nulls: Option<NullBuffer>,
+}
+
+struct StructArrayEncoder<'a> {
+    encoders: Vec<FieldEncoder<'a>>,
+    explicit_nulls: bool,
+}
+
+impl<'a> Encoder for StructArrayEncoder<'a> {
+    fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
+        out.push(b'{');
+        let mut is_first = true;
+        for field_encoder in &mut self.encoders {
+            let is_null = field_encoder.nulls.as_ref().is_some_and(|n| n.is_null(idx));
+            if is_null && !self.explicit_nulls {
+                continue;
+            }
+
+            if !is_first {
+                out.push(b',');
+            }
+            is_first = false;
+
+            encode_string(field_encoder.field.name(), out);
+            out.push(b':');
+
+            match is_null {
+                true => out.extend_from_slice(b"null"),
+                false => field_encoder.encoder.encode(idx, out),
+            }
+        }
+        out.push(b'}');
+    }
+}
+
+trait PrimitiveEncode: ArrowNativeType {
+    type Buffer;
+
+    // Workaround https://github.com/rust-lang/rust/issues/61415
+    fn init_buffer() -> Self::Buffer;
+
+    /// Encode the primitive value as bytes, returning a reference to that slice.
+    ///
+    /// `buf` is temporary space that may be used
+    fn encode(self, buf: &mut Self::Buffer) -> &[u8];
+}
+
+macro_rules! integer_encode {
+    ($($t:ty),*) => {
+        $(
+            impl PrimitiveEncode for $t {
+                type Buffer = [u8; Self::FORMATTED_SIZE];
+
+                fn init_buffer() -> Self::Buffer {
+                    [0; Self::FORMATTED_SIZE]
+                }
+
+                fn encode(self, buf: &mut Self::Buffer) -> &[u8] {
+                    lexical_core::write(self, buf)
+                }
+            }
+        )*
+    };
+}
+integer_encode!(i8, i16, i32, i64, u8, u16, u32, u64);
+
+macro_rules! float_encode {
+    ($($t:ty),*) => {
+        $(
+            impl PrimitiveEncode for $t {
+                type Buffer = [u8; Self::FORMATTED_SIZE];
+
+                fn init_buffer() -> Self::Buffer {
+                    [0; Self::FORMATTED_SIZE]
+                }
+
+                fn encode(self, buf: &mut Self::Buffer) -> &[u8] {
+                    if self.is_infinite() || self.is_nan() {
+                        b"null"
+                    } else {
+                        lexical_core::write(self, buf)
+                    }
+                }
+            }
+        )*
+    };
+}
+float_encode!(f32, f64);
+
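+// f16 has no lexical-core formatter of its own; widen to f32 and reuse its buffer.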
+impl PrimitiveEncode for f16 {
+    type Buffer = <f32 as PrimitiveEncode>::Buffer;
+
+    fn init_buffer() -> Self::Buffer {
+        f32::init_buffer()
+    }
+
+    fn encode(self, buf: &mut Self::Buffer) -> &[u8] {
+        self.to_f32().encode(buf)
+    }
+}
+
+struct PrimitiveEncoder<N: PrimitiveEncode> {
+    values: ScalarBuffer<N>,
+    buffer: N::Buffer,
+}
+
+impl<N: PrimitiveEncode> PrimitiveEncoder<N> {
+    fn new<P: ArrowPrimitiveType<Native = N>>(array: &PrimitiveArray<P>) -> Self {
+        Self {
+            values: array.values().clone(),
+            buffer: N::init_buffer(),
+        }
+    }
+}
+
+impl<N: PrimitiveEncode> Encoder for PrimitiveEncoder<N> {
+    fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
+        out.extend_from_slice(self.values[idx].encode(&mut self.buffer));
+    }
+}
+
+struct BooleanEncoder<'a>(&'a BooleanArray);
+
+impl<'a> Encoder for BooleanEncoder<'a> {
+    fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
+        match self.0.value(idx) {
+            true => out.extend_from_slice(b"true"),
+            false => out.extend_from_slice(b"false"),
+        }
+    }
+}
+
+struct StringEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
+
+impl<'a, O: OffsetSizeTrait> Encoder for StringEncoder<'a, O> {
+    fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
+        encode_string(self.0.value(idx), out);
+    }
+}
+
+struct ListEncoder<'a, O: OffsetSizeTrait> {
+    offsets: OffsetBuffer<O>,
+    nulls: Option<NullBuffer>,
+    encoder: Box<dyn Encoder + 'a>,
+}
+
+impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> {
+    fn try_new(
+        array: &'a GenericListArray<O>,
+        options: &EncoderOptions,
+    ) -> Result<Self, ArrowError> {
+        let (encoder, nulls) = make_encoder_impl(array.values().as_ref(), options)?;
+        Ok(Self {
+            offsets: array.offsets().clone(),
+            encoder,
+            nulls,
+        })
+    }
+}
+
+impl<'a, O: OffsetSizeTrait> Encoder for ListEncoder<'a, O> {
+    fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
+        let end = self.offsets[idx + 1].as_usize();
+        let start = self.offsets[idx].as_usize();
+        out.push(b'[');
+        match self.nulls.as_ref() {
+            Some(n) => (start..end).for_each(|idx| {
+                if idx != start {
+                    out.push(b',')
+                }
+                match n.is_null(idx) {
+                    true => out.extend_from_slice(b"null"),
+                    false => self.encoder.encode(idx, out),
+                }
+            }),
+            None => (start..end).for_each(|idx| {
+                if idx != start {
+                    out.push(b',')
+                }
+                self.encoder.encode(idx, out);
+            }),
+        }
+        out.push(b']');
+    }
+}
+
+struct FixedSizeListEncoder<'a> {
+    value_length: usize,
+    nulls: Option<NullBuffer>,
+    encoder: Box<dyn Encoder + 'a>,
+}
+
+impl<'a> FixedSizeListEncoder<'a> {
+    fn try_new(
+        array: &'a FixedSizeListArray,
+        options: &EncoderOptions,
+    ) -> Result<Self, ArrowError> {
+        let (encoder, nulls) = make_encoder_impl(array.values().as_ref(), options)?;
+        Ok(Self {
+            encoder,
+            nulls,
+            value_length: array.value_length().as_usize(),
+        })
+    }
+}
+
+impl<'a> Encoder for FixedSizeListEncoder<'a> {
+    fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
+        let start = idx * self.value_length;
+        let end = start + self.value_length;
+        out.push(b'[');
+        match self.nulls.as_ref() {
+            Some(n) => (start..end).for_each(|idx| {
+                if idx != start {
+                    out.push(b',');
+                }
+                if n.is_null(idx) {
+                    out.extend_from_slice(b"null");
+                } else {
+                    self.encoder.encode(idx, out);
+                }
+            }),
+            None => (start..end).for_each(|idx| {
+                if idx != start {
+                    out.push(b',');
+                }
+                self.encoder.encode(idx, out);
+            }),
+        }
+        out.push(b']');
+    }
+}
+
+struct DictionaryEncoder<'a, K: ArrowDictionaryKeyType> {
+    keys: ScalarBuffer<K::Native>,
+    encoder: Box<dyn Encoder + 'a>,
+}
+
+impl<'a, K: ArrowDictionaryKeyType> DictionaryEncoder<'a, K> {
+    fn try_new(
+        array: &'a DictionaryArray<K>,
+        options: &EncoderOptions,
+    ) -> Result<Self, ArrowError> {
+        let encoder = make_encoder(array.values().as_ref(), options)?;
+
+        Ok(Self {
+            keys: array.keys().values().clone(),
+            encoder,
+        })
+    }
+}
+
+impl<'a, K: ArrowDictionaryKeyType> Encoder for DictionaryEncoder<'a, K> {
+    fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
+        self.encoder.encode(self.keys[idx].as_usize(), out)
+    }
+}
+
+impl<'a> Encoder for ArrayFormatter<'a> {
+    fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
+        out.push(b'"');
+        // Should be infallible
+        // Note: We are making an assumption that the formatter does not produce characters
+        // that require escaping
+        let _ = write!(out, "{}", self.value(idx));
+        out.push(b'"')
+    }
+}
+
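+// The DataType::Null encoder is never actually invoked: every slot reports null via
+// logical_nulls(), so callers emit `null` (or omit the field) instead of calling it.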
+struct NullEncoder;
+
+impl Encoder for NullEncoder {
+    fn encode(&mut self, _idx: usize, _out: &mut Vec<u8>) {
+        unreachable!()
+    }
+}
+
+struct MapEncoder<'a> {
+    offsets: OffsetBuffer<i32>,
+    keys: Box<dyn Encoder + 'a>,
+    values: Box<dyn Encoder + 'a>,
+    value_nulls: Option<NullBuffer>,
+    explicit_nulls: bool,
+}
+
+impl<'a> MapEncoder<'a> {
+    fn try_new(array: &'a MapArray, options: &EncoderOptions) -> Result<Self, ArrowError> {
+        let values = array.values();
+        let keys = array.keys();
+
+        if !matches!(keys.data_type(), DataType::Utf8 | DataType::LargeUtf8) {
+            return Err(ArrowError::JsonError(format!(
+                "Only UTF8 keys supported by JSON MapArray Writer: got {:?}",
+                keys.data_type()
+            )));
+        }
+
+        let (keys, key_nulls) = make_encoder_impl(keys, options)?;
+        let (values, value_nulls) = make_encoder_impl(values, options)?;
+
+        // We sanity check nulls as these are currently not enforced by MapArray (#1697)
+        if key_nulls.is_some_and(|x| x.null_count() != 0) {
+            return Err(ArrowError::InvalidArgumentError(
+                "Encountered nulls in MapArray keys".to_string(),
+            ));
+        }
+
+        if array.entries().nulls().is_some_and(|x| x.null_count() != 0) {
+            return Err(ArrowError::InvalidArgumentError(
+                "Encountered nulls in MapArray entries".to_string(),
+            ));
+        }
+
+        Ok(Self {
+            offsets: array.offsets().clone(),
+            keys,
+            values,
+            value_nulls,
+            explicit_nulls: options.explicit_nulls,
+        })
+    }
+}
+
+impl<'a> Encoder for MapEncoder<'a> {
+    fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
+        let end = self.offsets[idx + 1].as_usize();
+        let start = self.offsets[idx].as_usize();
+
+        let mut is_first = true;
+
+        out.push(b'{');
+        for idx in start..end {
+            let is_null = self.value_nulls.as_ref().is_some_and(|n| n.is_null(idx));
+            if is_null && !self.explicit_nulls {
+                continue;
+            }
+
+            if !is_first {
+                out.push(b',');
+            }
+            is_first = false;
+
+            self.keys.encode(idx, out);
+            out.push(b':');
+
+            match is_null {
+                true => out.extend_from_slice(b"null"),
+                false => self.values.encode(idx, out),
+            }
+        }
+        out.push(b'}');
+    }
+}
+
+struct FixedSizeBinaryEncoder<'a>(&'a FixedSizeBinaryArray);
+
+impl<'a> FixedSizeBinaryEncoder<'a> {
+    fn new(array: &'a FixedSizeBinaryArray) -> Self {
+        Self(array)
+    }
+}
+
+impl<'a> Encoder for FixedSizeBinaryEncoder<'a> {
+    fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
+        out.push(b'"');
+        for byte in self.0.value(idx) {
+            // this write is infallible
+            write!(out, "{byte:02x}").unwrap();
+        }
+        out.push(b'"');
+    }
+}
diff --git a/src/io/geozero/table/mod.rs b/src/io/geozero/table/mod.rs
index cd7b4407..344b4872 100644
--- a/src/io/geozero/table/mod.rs
+++ b/src/io/geozero/table/mod.rs
@@ -1,4 +1,5 @@
 mod builder;
 mod data_source;
+mod json_encoder;
 
 pub use builder::{GeoTableBuilder, GeoTableBuilderOptions};