Skip to content

Commit

Permalink
ENH: use new columnar GetArrowStream if GDAL>=3.6 and pyarrow availab…
Browse files Browse the repository at this point in the history
…le (#155)
  • Loading branch information
jorisvandenbossche authored Jan 2, 2023
1 parent 8669117 commit 0188fdb
Show file tree
Hide file tree
Showing 11 changed files with 387 additions and 6 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/docker-gdal.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ jobs:
python3 -m pip install --no-cache-dir -U pip wheel
python3 -m pip install --no-cache-dir -e .[dev,test,geopandas]
- name: Install pyarrow
if: matrix.container == "osgeo/gdal:ubuntu-small-latest"
run: |
python3 -m pip install pyarrow
- name: Test with pytest
run: |
pytest --cov=pyogrio --cov-report term-missing pyogrio/tests
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
include versioneer.py
include pyogrio/_version.py
include pyogrio/*.pyx pyogrio/*.pxd
include pyogrio/arrow_bridge.h
exclude pyogrio/*.c
recursive-include pyogrio/tests/fixtures *
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ Supports Python 3.8 - 3.11 and GDAL 3.1.x - 3.6.x.
Reading to GeoDataFrames requires `geopandas>=0.8` with `pygeos`
or `geopandas>=0.12` with `shapely>=2`.

Additionally, installing `pyarrow` in combination with GDAL 3.6+ enables
a further speed-up when specifying `use_arrow=True`.

## Installation

Pyogrio is currently available on
Expand Down
2 changes: 2 additions & 0 deletions ci/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ dependencies:
- pytest
- shapely>=2
- geopandas-base
- pyarrow

3 changes: 3 additions & 0 deletions docs/source/install.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ Supports Python 3.8 - 3.11 and GDAL 3.1.x - 3.6.x
Reading to GeoDataFrames requires requires `geopandas>=0.8` with `pygeos`
or `geopandas>=0.12` with `shapely>=2`.

Additionally, installing `pyarrow` in combination with GDAL 3.6+ enables
a further speed-up when specifying `use_arrow=True`.

## Installation

### Conda-forge
Expand Down
150 changes: 147 additions & 3 deletions pyogrio/_io.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import math
import os
import warnings

from libc.stdint cimport uint8_t
from libc.stdint cimport uint8_t, uintptr_t
from libc.stdlib cimport malloc, free
from libc.string cimport strlen
from libc.math cimport isnan
Expand Down Expand Up @@ -300,14 +300,17 @@ cdef detect_encoding(OGRDataSourceH ogr_dataset, OGRLayerH ogr_layer):
return None


cdef get_fields(OGRLayerH ogr_layer, str encoding):
cdef get_fields(OGRLayerH ogr_layer, str encoding, use_arrow=False):
"""Get field names and types for layer.
Parameters
----------
ogr_layer : pointer to open OGR layer
encoding : str
encoding to use when reading field name
use_arrow : bool, default False
If using arrow, all types are supported, and we don't have to
raise warnings
Returns
-------
Expand Down Expand Up @@ -351,7 +354,7 @@ cdef get_fields(OGRLayerH ogr_layer, str encoding):

field_type = OGR_Fld_GetType(ogr_fielddef)
np_type = FIELD_TYPES[field_type]
if not np_type:
if not np_type and not use_arrow:
skipped_fields = True
log.warning(
f"Skipping field {field_name}: unsupported OGR type: {field_type}")
Expand Down Expand Up @@ -956,6 +959,147 @@ def ogr_read(
)


def ogr_read_arrow(
str path,
object layer=None,
object encoding=None,
int read_geometry=True,
int force_2d=False,
object columns=None,
int skip_features=0,
int max_features=0,
object where=None,
tuple bbox=None,
object fids=None,
str sql=None,
str sql_dialect=None,
int return_fids=False,
**kwargs):

cdef int err = 0
cdef const char *path_c = NULL
cdef const char *where_c = NULL
cdef OGRDataSourceH ogr_dataset = NULL
cdef OGRLayerH ogr_layer = NULL
cdef char **fields_c = NULL
cdef const char *field_c = NULL
cdef char **options = NULL
cdef ArrowArrayStream stream
cdef ArrowSchema schema

path_b = path.encode('utf-8')
path_c = path_b

if force_2d:
raise ValueError("forcing 2D is not supported for Arrow")

if fids is not None:
raise ValueError("reading by FID is not supported for Arrow")

if skip_features or max_features:
raise ValueError(
"specifying 'skip_features' or 'max_features' is not supported for Arrow"
)

if sql is not None and layer is not None:
raise ValueError("'sql' paramater cannot be combined with 'layer'")

ogr_dataset = ogr_open(path_c, 0, kwargs)
try:
if sql is None:
# layer defaults to index 0
if layer is None:
layer = 0
ogr_layer = get_ogr_layer(ogr_dataset, layer)
else:
ogr_layer = execute_sql(ogr_dataset, sql, sql_dialect)

crs = get_crs(ogr_layer)

# Encoding is derived from the user, from the dataset capabilities / type,
# or from the system locale
encoding = (
encoding
or detect_encoding(ogr_dataset, ogr_layer)
or locale.getpreferredencoding()
)

fields = get_fields(ogr_layer, encoding, use_arrow=True)

ignored_fields = []
if columns is not None:
# Fields are matched exactly by name, duplicates are dropped.
ignored_fields = list(set(fields[:,2]) - set(columns))
if not read_geometry:
ignored_fields.append("OGR_GEOMETRY")

geometry_type = get_geometry_type(ogr_layer)

geometry_name = get_string(OGR_L_GetGeometryColumn(ogr_layer))

# Apply the attribute filter
if where is not None and where != "":
apply_where_filter(ogr_layer, where)

# Apply the spatial filter
if bbox is not None:
apply_spatial_filter(ogr_layer, bbox)

# Limit to specified columns
if ignored_fields:
for field in ignored_fields:
field_b = field.encode("utf-8")
field_c = field_b
fields_c = CSLAddString(fields_c, field_c)

OGR_L_SetIgnoredFields(ogr_layer, <const char**>fields_c)

if not return_fids:
options = CSLSetNameValue(options, "INCLUDE_FID", "NO")

# make sure layer is read from beginning
OGR_L_ResetReading(ogr_layer)

IF CTE_GDAL_VERSION < (3, 6, 0):
raise RuntimeError("Need GDAL>=3.6 for Arrow support")

if not OGR_L_GetArrowStream(ogr_layer, &stream, options):
raise RuntimeError("Failed to open ArrowArrayStream from Layer")

stream_ptr = <uintptr_t> &stream

# stream has to be consumed before the Dataset is closed
import pyarrow as pa
table = pa.RecordBatchStreamReader._import_from_c(stream_ptr).read_all()

meta = {
'crs': crs,
'encoding': encoding,
'fields': fields[:,2], # return only names
'geometry_type': geometry_type,
'geometry_name': geometry_name,
}

finally:
CSLDestroy(options)
if fields_c != NULL:
CSLDestroy(fields_c)
fields_c = NULL
if ogr_dataset != NULL:
if sql is not None:
GDALDatasetReleaseResultSet(ogr_dataset, ogr_layer)

GDALClose(ogr_dataset)
ogr_dataset = NULL

return (
meta,
table,
None, #geometries,
None, #field_data
)


def ogr_read_bounds(
str path,
object layer=None,
Expand Down
18 changes: 17 additions & 1 deletion pyogrio/_ogr.pxd
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Contains declarations against GDAL / OGR API
from libc.stdint cimport int64_t
from libc.stdint cimport int64_t, int8_t
from libc.stdio cimport FILE


Expand Down Expand Up @@ -184,6 +184,14 @@ cdef extern from "ogr_srs_api.h":
void OSRRelease(OGRSpatialReferenceH srs)


cdef extern from "arrow_bridge.h":
struct ArrowSchema:
int64_t n_children

struct ArrowArrayStream:
int (*get_schema)(ArrowArrayStream* stream, ArrowSchema* out)


cdef extern from "ogr_api.h":
int OGRGetDriverCount()
OGRSFDriverH OGRGetDriver(int)
Expand Down Expand Up @@ -264,6 +272,7 @@ cdef extern from "ogr_api.h":
OGRErr OGR_L_CreateFeature(OGRLayerH layer, OGRFeatureH feature)
OGRErr OGR_L_CreateField(OGRLayerH layer, OGRFieldDefnH fielddefn, int flexible)
const char* OGR_L_GetName(OGRLayerH layer)
const char* OGR_L_GetGeometryColumn(OGRLayerH layer)
OGRSpatialReferenceH OGR_L_GetSpatialRef(OGRLayerH layer)
int OGR_L_TestCapability(OGRLayerH layer, const char *name)
OGRFeatureDefnH OGR_L_GetLayerDefn(OGRLayerH layer)
Expand All @@ -274,6 +283,7 @@ cdef extern from "ogr_api.h":
OGRErr OGR_L_SetNextByIndex(OGRLayerH layer, int nIndex)
int OGR_L_GetFeatureCount(OGRLayerH layer, int m)
void OGR_L_SetSpatialFilterRect(OGRLayerH layer, double xmin, double ymin, double xmax, double ymax)
OGRErr OGR_L_SetIgnoredFields(OGRLayerH layer, const char** fields)

void OGRSetNonLinearGeometriesEnabledFlag(int bFlag)
int OGRGetNonLinearGeometriesEnabledFlag()
Expand All @@ -286,6 +296,12 @@ cdef extern from "ogr_api.h":
const char* OLCFastSpatialFilter


IF CTE_GDAL_VERSION >= (3, 6, 0):

cdef extern from "ogr_api.h":
int8_t OGR_L_GetArrowStream(OGRLayerH hLayer, ArrowArrayStream *out_stream, char** papszOptions)


cdef extern from "gdal.h":
ctypedef enum GDALDataType:
GDT_Unknown
Expand Down
115 changes: 115 additions & 0 deletions pyogrio/arrow_bridge.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// This file is an extract https://github.com/apache/arrow/blob/master/cpp/src/arrow/c/abi.h
// commit 9cbb8a1a626ee301cfe85905b6c18c5d880e176b (2022-06-14)
// WARNING: DO NOT MODIFY the content as it would break interoperability !

#pragma once

#include <stdint.h>

#ifdef __cplusplus
extern "C" {
#endif

#ifndef ARROW_C_DATA_INTERFACE
#define ARROW_C_DATA_INTERFACE

#define ARROW_FLAG_DICTIONARY_ORDERED 1
#define ARROW_FLAG_NULLABLE 2
#define ARROW_FLAG_MAP_KEYS_SORTED 4

struct ArrowSchema {
// Array type description
const char* format;
const char* name;
const char* metadata;
int64_t flags;
int64_t n_children;
struct ArrowSchema** children;
struct ArrowSchema* dictionary;

// Release callback
void (*release)(struct ArrowSchema*);
// Opaque producer-specific data
void* private_data;
};

struct ArrowArray {
// Array data description
int64_t length;
int64_t null_count;
int64_t offset;
int64_t n_buffers;
int64_t n_children;
const void** buffers;
struct ArrowArray** children;
struct ArrowArray* dictionary;

// Release callback
void (*release)(struct ArrowArray*);
// Opaque producer-specific data
void* private_data;
};

#endif // ARROW_C_DATA_INTERFACE

#ifndef ARROW_C_STREAM_INTERFACE
#define ARROW_C_STREAM_INTERFACE

struct ArrowArrayStream {
// Callback to get the stream type
// (will be the same for all arrays in the stream).
//
// Return value: 0 if successful, an `errno`-compatible error code otherwise.
//
// If successful, the ArrowSchema must be released independently from the stream.
int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);

// Callback to get the next array
// (if no error and the array is released, the stream has ended)
//
// Return value: 0 if successful, an `errno`-compatible error code otherwise.
//
// If successful, the ArrowArray must be released independently from the stream.
int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);

// Callback to get optional detailed error information.
// This must only be called if the last stream operation failed
// with a non-0 return code.
//
// Return value: pointer to a null-terminated character array describing
// the last error, or NULL if no description is available.
//
// The returned pointer is only valid until the next operation on this stream
// (including release).
const char* (*get_last_error)(struct ArrowArrayStream*);

// Release callback: release the stream's own resources.
// Note that arrays returned by `get_next` must be individually released.
void (*release)(struct ArrowArrayStream*);

// Opaque producer-specific data
void* private_data;
};

#endif // ARROW_C_STREAM_INTERFACE

#ifdef __cplusplus
}
#endif
Loading

0 comments on commit 0188fdb

Please sign in to comment.