Skip to content

Commit

Permalink
Merge pull request #11213 from rouault/GetArrowStream_DATETIME_AS_STRING
Browse files Browse the repository at this point in the history
OGRLayer::GetArrowStream(): add a DATETIME_AS_STRING=YES/NO option
  • Loading branch information
rouault authored Nov 25, 2024
2 parents 181b6b9 + 324d024 commit c7606da
Show file tree
Hide file tree
Showing 18 changed files with 710 additions and 78 deletions.
32 changes: 29 additions & 3 deletions apps/ogr2ogr_lib.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3997,7 +3997,8 @@ static int GetArrowGeomFieldIndex(const struct ArrowSchema *psLayerSchema,
/************************************************************************/

static CPLStringList
BuildGetArrowStreamOptions(const GDALVectorTranslateOptions *psOptions,
BuildGetArrowStreamOptions(OGRLayer *poSrcLayer, OGRLayer *poDstLayer,
const GDALVectorTranslateOptions *psOptions,
bool bPreserveFID)
{
CPLStringList aosOptionsGetArrowStream;
Expand All @@ -4021,6 +4022,31 @@ BuildGetArrowStreamOptions(const GDALVectorTranslateOptions *psOptions,
"MAX_FEATURES_IN_BATCH",
CPLSPrintf("%d", psOptions->nGroupTransactions));
}

auto poSrcDS = poSrcLayer->GetDataset();
auto poDstDS = poDstLayer->GetDataset();
if (poSrcDS && poDstDS)
{
auto poSrcDriver = poSrcDS->GetDriver();
auto poDstDriver = poDstDS->GetDriver();

const auto IsArrowNativeDriver = [](GDALDriver *poDriver)
{
return EQUAL(poDriver->GetDescription(), "ARROW") ||
EQUAL(poDriver->GetDescription(), "PARQUET") ||
EQUAL(poDriver->GetDescription(), "ADBC");
};

if (poSrcDriver && poDstDriver && !IsArrowNativeDriver(poSrcDriver) &&
!IsArrowNativeDriver(poDstDriver))
{
// For non-Arrow-native drivers, request DateTime as string, to
// allow mix of timezones
aosOptionsGetArrowStream.SetNameValue(GAS_OPT_DATETIME_AS_STRING,
"YES");
}
}

return aosOptionsGetArrowStream;
}

Expand Down Expand Up @@ -4085,8 +4111,8 @@ bool SetupTargetLayer::CanUseWriteArrowBatch(
}
}

const CPLStringList aosGetArrowStreamOptions(
BuildGetArrowStreamOptions(psOptions, bPreserveFID));
const CPLStringList aosGetArrowStreamOptions(BuildGetArrowStreamOptions(
poSrcLayer, poDstLayer, psOptions, bPreserveFID));
if (poSrcLayer->GetArrowStream(streamSrc.get(),
aosGetArrowStreamOptions.List()))
{
Expand Down
2 changes: 1 addition & 1 deletion autotest/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ gdal_standard_includes(gdal_unit_test)
target_compile_options(gdal_unit_test PRIVATE ${GDAL_CXX_WARNING_FLAGS})
target_compile_definitions(gdal_unit_test PRIVATE -DGDAL_TEST_ROOT_DIR="${GDAL_ROOT_TEST_DIR}")
target_include_directories(
gdal_unit_test PRIVATE $<TARGET_PROPERTY:appslib,SOURCE_DIR> $<TARGET_PROPERTY:gdal_vrt,SOURCE_DIR>)
gdal_unit_test PRIVATE $<TARGET_PROPERTY:appslib,SOURCE_DIR> $<TARGET_PROPERTY:gdal_vrt,SOURCE_DIR> $<TARGET_PROPERTY:ogrsf_generic,SOURCE_DIR>)
if (GDAL_USE_SQLITE3)
target_compile_definitions(gdal_unit_test PRIVATE -DHAVE_SQLITE3)
target_include_directories(
Expand Down
40 changes: 40 additions & 0 deletions autotest/cpp/test_ogr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "ogrsf_frmts.h"
#include "../../ogr/ogrsf_frmts/osm/gpb.h"
#include "ogr_recordbatch.h"
#include "ogrlayerarrow.h"

#include <string>
#include <algorithm>
Expand Down Expand Up @@ -4498,4 +4499,43 @@ TEST_F(test_ogr, OGRFeature_SetGeomField)
poFeatureDefn->Release();
}

TEST_F(test_ogr, GetArrowStream_DateTime_As_String)
{
auto poDS = std::unique_ptr<GDALDataset>(
GetGDALDriverManager()->GetDriverByName("Memory")->Create(
"", 0, 0, 0, GDT_Unknown, nullptr));
auto poLayer = poDS->CreateLayer("test", nullptr, wkbNone);
OGRFieldDefn oFieldDefn("dt", OFTDateTime);
poLayer->CreateField(&oFieldDefn);
struct ArrowArrayStream stream;
CPLStringList aosOptions;
aosOptions.SetNameValue("INCLUDE_FID", "NO");
aosOptions.SetNameValue("DATETIME_AS_STRING", "YES");
ASSERT_TRUE(poLayer->GetArrowStream(&stream, aosOptions.List()));
struct ArrowSchema schema;
memset(&schema, 0, sizeof(schema));
EXPECT_EQ(stream.get_schema(&stream, &schema), 0);
EXPECT_TRUE(schema.n_children == 1 &&
strcmp(schema.children[0]->format, "u") == 0)
<< schema.n_children;
if (schema.n_children == 1 && strcmp(schema.children[0]->format, "u") == 0)
{
EXPECT_TRUE(schema.children[0]->metadata != nullptr);
if (schema.children[0]->metadata)
{
auto oMapKeyValue =
OGRParseArrowMetadata(schema.children[0]->metadata);
EXPECT_EQ(oMapKeyValue.size(), 1);
if (oMapKeyValue.size() == 1)
{
EXPECT_STREQ(oMapKeyValue.begin()->first.c_str(),
"GDAL:OGR:type");
EXPECT_STREQ(oMapKeyValue.begin()->second.c_str(), "DateTime");
}
}
}
schema.release(&schema);
stream.release(&stream);
}

} // namespace
30 changes: 30 additions & 0 deletions autotest/ogr/ogr_adbc.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,36 @@ def test_ogr_adbc_test_ogrsf_parquet_filename_with_glob():
assert "ERROR" not in ret


###############################################################################
# Test DATETIME_AS_STRING=YES GetArrowStream() option


def test_ogr_adbc_arrow_stream_numpy_datetime_as_string(tmp_vsimem):
pytest.importorskip("osgeo.gdal_array")
pytest.importorskip("numpy")

if not _has_libduckdb():
pytest.skip("libduckdb.so missing")

with gdal.OpenEx(
"data/parquet/test.parquet", gdal.OF_VECTOR, allowed_drivers=["ADBC"]
) as ds:
lyr = ds.GetLayer(0)
stream = lyr.GetArrowStreamAsNumPy(
options=["USE_MASKED_ARRAYS=NO", "DATETIME_AS_STRING=YES"]
)
batches = [batch for batch in stream]
batch = batches[0]
# Should be "2019-01-01T14:00:00.500-02:15" but DuckDB returns in UTC
# On my machine, for some reason it returns without the Z, whereas on
# the ubuntu_2404 it returns with the Z... despite both using libduckdb 1.1.3
# at time of writing...
assert batch["timestamp_ms_gmt_minus_0215"][0] in (
b"2019-01-01T16:15:00.500",
b"2019-01-01T16:15:00.500Z",
)


###############################################################################
# Run test_ogrsf on a DuckDB dataset

Expand Down
49 changes: 49 additions & 0 deletions autotest/ogr/ogr_flatgeobuf.py
Original file line number Diff line number Diff line change
Expand Up @@ -1564,3 +1564,52 @@ def test_ogr_flatgeobuf_sql_arrow(tmp_vsimem):
assert f["bar"] == "baz"
assert f.GetGeometryRef().ExportToWkt() == "POINT (1 2)"
f = tmp_lyr.GetNextFeature()


###############################################################################
# Test DATETIME_AS_STRING=YES GetArrowStream() option


def test_ogr_flatgeobuf_arrow_stream_numpy_datetime_as_string(tmp_vsimem):
pytest.importorskip("osgeo.gdal_array")
pytest.importorskip("numpy")

filename = str(tmp_vsimem / "datetime_as_string.fgb")
with ogr.GetDriverByName("FlatGeoBuf").CreateDataSource(filename) as ds:
lyr = ds.CreateLayer("test")

field = ogr.FieldDefn("datetime", ogr.OFTDateTime)
lyr.CreateField(field)

f = ogr.Feature(lyr.GetLayerDefn())
f.SetGeometry(ogr.CreateGeometryFromWkt("POINT (1 2)"))
lyr.CreateFeature(f)

f = ogr.Feature(lyr.GetLayerDefn())
f.SetField("datetime", "2022-05-31T12:34:56.789Z")
f.SetGeometry(ogr.CreateGeometryFromWkt("POINT (1 2)"))
lyr.CreateFeature(f)

f = ogr.Feature(lyr.GetLayerDefn())
f.SetField("datetime", "2022-05-31T12:34:56")
f.SetGeometry(ogr.CreateGeometryFromWkt("POINT (1 2)"))
lyr.CreateFeature(f)

f = ogr.Feature(lyr.GetLayerDefn())
f.SetField("datetime", "2022-05-31T12:34:56+12:30")
f.SetGeometry(ogr.CreateGeometryFromWkt("POINT (1 2)"))
lyr.CreateFeature(f)

with ogr.Open(filename) as ds:
lyr = ds.GetLayer(0)
stream = lyr.GetArrowStreamAsNumPy(
options=["USE_MASKED_ARRAYS=NO", "DATETIME_AS_STRING=YES"]
)
batches = [batch for batch in stream]
assert len(batches) == 1
batch = batches[0]
assert len(batch["datetime"]) == 4
assert batch["datetime"][0] == b""
assert batch["datetime"][1] == b"2022-05-31T12:34:56.789Z"
assert batch["datetime"][2] == b"2022-05-31T12:34:56"
assert batch["datetime"][3] == b"2022-05-31T12:34:56+12:30"
73 changes: 73 additions & 0 deletions autotest/ogr/ogr_gpkg.py
Original file line number Diff line number Diff line change
Expand Up @@ -10831,3 +10831,76 @@ def test_ogr_gpkg_write_check_golden_file(tmp_path, src_filename):
golden_data[96] = golden_data[97] = golden_data[98] = golden_data[99] = 0
got_data[96] = got_data[97] = got_data[98] = got_data[99] = 0
assert got_data == golden_data


###############################################################################
# Test DATETIME_AS_STRING=YES GetArrowStream() option


def test_ogr_gpkg_arrow_stream_numpy_datetime_as_string(tmp_vsimem):
pytest.importorskip("osgeo.gdal_array")
pytest.importorskip("numpy")

filename = str(tmp_vsimem / "datetime_as_string.gpkg")
ds = ogr.GetDriverByName("GPKG").CreateDataSource(filename)
lyr = ds.CreateLayer("test")

field = ogr.FieldDefn("datetime", ogr.OFTDateTime)
lyr.CreateField(field)

f = ogr.Feature(lyr.GetLayerDefn())
lyr.CreateFeature(f)

f = ogr.Feature(lyr.GetLayerDefn())
f.SetField("datetime", "2022-05-31T12:34:56.789Z")
lyr.CreateFeature(f)

f = ogr.Feature(lyr.GetLayerDefn())
f.SetField("datetime", "2022-05-31T12:34:56.000")
lyr.CreateFeature(f)

f = ogr.Feature(lyr.GetLayerDefn())
f.SetField("datetime", "2022-05-31T12:34:56.000+12:30")
lyr.CreateFeature(f)

# Test DATETIME_AS_STRING=YES
stream = lyr.GetArrowStreamAsNumPy(
options=["USE_MASKED_ARRAYS=NO", "DATETIME_AS_STRING=YES"]
)
batches = [batch for batch in stream]
assert len(batches) == 1
batch = batches[0]
assert len(batch["datetime"]) == 4
assert batch["datetime"][0] == b""
assert batch["datetime"][1] == b"2022-05-31T12:34:56.789Z"
assert batch["datetime"][2] == b"2022-05-31T12:34:56.000"
assert batch["datetime"][3] == b"2022-05-31T12:34:56.000+12:30"

# Setting a filer tests the use of the less optimized
# OGRGeoPackageTableLayer::GetNextArray() implementation
lyr.SetAttributeFilter("1 = 1")
stream = lyr.GetArrowStreamAsNumPy(
options=["USE_MASKED_ARRAYS=NO", "DATETIME_AS_STRING=YES"]
)
lyr.SetAttributeFilter(None)
batches = [batch for batch in stream]
assert len(batches) == 1
batch = batches[0]
assert len(batch["datetime"]) == 4
assert batch["datetime"][0] == b""
assert batch["datetime"][1] == b"2022-05-31T12:34:56.789Z"
assert batch["datetime"][2] == b"2022-05-31T12:34:56.000"
assert batch["datetime"][3] == b"2022-05-31T12:34:56.000+12:30"

with ds.ExecuteSQL("SELECT * FROM test") as sql_lyr:
stream = sql_lyr.GetArrowStreamAsNumPy(
options=["USE_MASKED_ARRAYS=NO", "DATETIME_AS_STRING=YES"]
)
batches = [batch for batch in stream]
assert len(batches) == 1
batch = batches[0]
assert len(batch["datetime"]) == 4
assert batch["datetime"][0] == b""
assert batch["datetime"][1] == b"2022-05-31T12:34:56.789Z"
assert batch["datetime"][2] == b"2022-05-31T12:34:56.000"
assert batch["datetime"][3] == b"2022-05-31T12:34:56.000+12:30"
95 changes: 95 additions & 0 deletions autotest/ogr/ogr_mem.py
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,101 @@ def test_ogr_mem_arrow_stream_numpy():
assert len(batches) == 0


###############################################################################
# Test DATETIME_AS_STRING=YES GetArrowStream() option


def test_ogr_mem_arrow_stream_numpy_datetime_as_string():
pytest.importorskip("osgeo.gdal_array")
pytest.importorskip("numpy")

ds = ogr.GetDriverByName("Memory").CreateDataSource("")
lyr = ds.CreateLayer("foo")

field = ogr.FieldDefn("datetime", ogr.OFTDateTime)
lyr.CreateField(field)

f = ogr.Feature(lyr.GetLayerDefn())
lyr.CreateFeature(f)

f = ogr.Feature(lyr.GetLayerDefn())
f.SetField("datetime", "2022-05-31T12:34:56.789Z")
lyr.CreateFeature(f)

f = ogr.Feature(lyr.GetLayerDefn())
f.SetField("datetime", "2022-05-31T12:34:56")
lyr.CreateFeature(f)

f = ogr.Feature(lyr.GetLayerDefn())
f.SetField("datetime", "2022-05-31T12:34:56+12:30")
lyr.CreateFeature(f)

# Test DATETIME_AS_STRING=YES
stream = lyr.GetArrowStreamAsNumPy(
options=["USE_MASKED_ARRAYS=NO", "DATETIME_AS_STRING=YES"]
)
batches = [batch for batch in stream]
assert len(batches) == 1
batch = batches[0]
assert len(batch["datetime"]) == 4
assert batch["datetime"][0] == b""
assert batch["datetime"][1] == b"2022-05-31T12:34:56.789Z"
assert batch["datetime"][2] == b"2022-05-31T12:34:56"
assert batch["datetime"][3] == b"2022-05-31T12:34:56+12:30"


###############################################################################
# Test CreateFieldFromArrowSchema() when there is a GDAL:OGR:type=DateTime
# Arrow schema metadata.


@gdaltest.enable_exceptions()
def test_ogr_mem_arrow_write_with_datetime_as_string():

src_ds = ogr.GetDriverByName("Memory").CreateDataSource("")
src_lyr = src_ds.CreateLayer("src_lyr", geom_type=ogr.wkbNone)

field = ogr.FieldDefn("dt", ogr.OFTDateTime)
src_lyr.CreateField(field)

f = ogr.Feature(src_lyr.GetLayerDefn())
src_lyr.CreateFeature(f)

f = ogr.Feature(src_lyr.GetLayerDefn())
f.SetField("dt", "2022-05-31T12:34:56.789Z")
src_lyr.CreateFeature(f)

f = ogr.Feature(src_lyr.GetLayerDefn())
f.SetField("dt", "2022-05-31T12:34:56")
src_lyr.CreateFeature(f)

f = ogr.Feature(src_lyr.GetLayerDefn())
f.SetField("dt", "2022-05-31T12:34:56+12:30")
src_lyr.CreateFeature(f)

ds = ogr.GetDriverByName("Memory").CreateDataSource("")
dst_lyr = ds.CreateLayer("dst_lyr")

stream = src_lyr.GetArrowStream(["DATETIME_AS_STRING=YES"])
schema = stream.GetSchema()

for i in range(schema.GetChildrenCount()):
dst_lyr.CreateFieldFromArrowSchema(schema.GetChild(i))

while True:
array = stream.GetNextRecordBatch()
if array is None:
break
dst_lyr.WriteArrowBatch(schema, array)

assert [f.GetField("dt") for f in dst_lyr] == [
None,
"2022/05/31 12:34:56.789+00",
"2022/05/31 12:34:56",
"2022/05/31 12:34:56+1230",
]


###############################################################################


Expand Down
Loading

0 comments on commit c7606da

Please sign in to comment.