Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 48 additions & 107 deletions arrow-pyarrow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@

use std::convert::{From, TryFrom};
use std::ffi::CStr;
use std::ptr::{addr_of, addr_of_mut};
use std::sync::Arc;

use arrow_array::ffi;
Expand Down Expand Up @@ -156,36 +155,27 @@ impl FromPyArrow for DataType {
// method, so prefer it over _export_to_c.
// See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
if value.hasattr("__arrow_c_schema__")? {
let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
let capsule = capsule.cast::<PyCapsule>()?;
validate_pycapsule(capsule, "arrow_schema")?;
let capsule = value.call_method0("__arrow_c_schema__")?.extract()?;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validate_pycapsule(&capsule, "arrow_schema")?;

let schema_ptr = capsule
.pointer_checked(Some(ARROW_SCHEMA_CAPSULE_NAME))?
.cast::<FFI_ArrowSchema>();
unsafe {
let dtype = DataType::try_from(schema_ptr.as_ref()).map_err(to_py_err)?;
return Ok(dtype);
}
return unsafe { DataType::try_from(schema_ptr.as_ref()) }.map_err(to_py_err);
}

validate_class(data_type_class(value.py())?, value)?;

let c_schema = FFI_ArrowSchema::empty();
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
value.call_method1("_export_to_c", (c_schema_ptr as Py_uintptr_t,))?;
let dtype = DataType::try_from(&c_schema).map_err(to_py_err)?;
Ok(dtype)
let mut c_schema = FFI_ArrowSchema::empty();
value.call_method1("_export_to_c", (&raw mut c_schema as Py_uintptr_t,))?;
DataType::try_from(&c_schema).map_err(to_py_err)
}
}

impl ToPyArrow for DataType {
fn to_pyarrow<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
let dtype =
data_type_class(py)?.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
Ok(dtype)
data_type_class(py)?.call_method1("_import_from_c", (&raw const c_schema as Py_uintptr_t,))
}
}

Expand All @@ -195,36 +185,27 @@ impl FromPyArrow for Field {
// method, so prefer it over _export_to_c.
// See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
if value.hasattr("__arrow_c_schema__")? {
let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
let capsule = capsule.cast::<PyCapsule>()?;
validate_pycapsule(capsule, "arrow_schema")?;
let capsule = value.call_method0("__arrow_c_schema__")?.extract()?;
validate_pycapsule(&capsule, "arrow_schema")?;

let schema_ptr = capsule
.pointer_checked(Some(ARROW_SCHEMA_CAPSULE_NAME))?
.cast::<FFI_ArrowSchema>();
unsafe {
let field = Field::try_from(schema_ptr.as_ref()).map_err(to_py_err)?;
return Ok(field);
}
return unsafe { Field::try_from(schema_ptr.as_ref()) }.map_err(to_py_err);
}

validate_class(field_class(value.py())?, value)?;

let c_schema = FFI_ArrowSchema::empty();
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
value.call_method1("_export_to_c", (c_schema_ptr as Py_uintptr_t,))?;
let field = Field::try_from(&c_schema).map_err(to_py_err)?;
Ok(field)
let mut c_schema = FFI_ArrowSchema::empty();
value.call_method1("_export_to_c", (&raw mut c_schema as Py_uintptr_t,))?;
Field::try_from(&c_schema).map_err(to_py_err)
}
}

impl ToPyArrow for Field {
fn to_pyarrow<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
let dtype =
field_class(py)?.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
Ok(dtype)
field_class(py)?.call_method1("_import_from_c", (&raw const c_schema as Py_uintptr_t,))
}
}

Expand All @@ -234,36 +215,27 @@ impl FromPyArrow for Schema {
// method, so prefer it over _export_to_c.
// See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
if value.hasattr("__arrow_c_schema__")? {
let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
let capsule = capsule.cast::<PyCapsule>()?;
validate_pycapsule(capsule, "arrow_schema")?;
let capsule = value.call_method0("__arrow_c_schema__")?.extract()?;
validate_pycapsule(&capsule, "arrow_schema")?;

let schema_ptr = capsule
.pointer_checked(Some(ARROW_SCHEMA_CAPSULE_NAME))?
.cast::<FFI_ArrowSchema>();
unsafe {
let schema = Schema::try_from(schema_ptr.as_ref()).map_err(to_py_err)?;
return Ok(schema);
}
return unsafe { Schema::try_from(schema_ptr.as_ref()) }.map_err(to_py_err);
}

validate_class(schema_class(value.py())?, value)?;

let c_schema = FFI_ArrowSchema::empty();
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
value.call_method1("_export_to_c", (c_schema_ptr as Py_uintptr_t,))?;
let schema = Schema::try_from(&c_schema).map_err(to_py_err)?;
Ok(schema)
let mut c_schema = FFI_ArrowSchema::empty();
value.call_method1("_export_to_c", (&raw mut c_schema as Py_uintptr_t,))?;
Schema::try_from(&c_schema).map_err(to_py_err)
}
}

impl ToPyArrow for Schema {
fn to_pyarrow<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
let schema =
schema_class(py)?.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
Ok(schema)
schema_class(py)?.call_method1("_import_from_c", (&raw const c_schema as Py_uintptr_t,))
}
}

Expand All @@ -273,21 +245,11 @@ impl FromPyArrow for ArrayData {
// method, so prefer it over _export_to_c.
// See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
if value.hasattr("__arrow_c_array__")? {
let tuple = value.getattr("__arrow_c_array__")?.call0()?;

if !tuple.is_instance_of::<PyTuple>() {
return Err(PyTypeError::new_err(
"Expected __arrow_c_array__ to return a tuple.",
));
}
Comment on lines -278 to -282
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, previously there was a nice user-facing error here, whereas now the error generated by `extract` will be much more obscure. Ideally this exception will never be raised unless the producer fails to follow the spec correctly.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will make a follow-on PR to keep the message.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


let schema_capsule = tuple.get_item(0)?;
let schema_capsule = schema_capsule.cast::<PyCapsule>()?;
let array_capsule = tuple.get_item(1)?;
let array_capsule = array_capsule.cast::<PyCapsule>()?;
let (schema_capsule, array_capsule) =
value.call_method0("__arrow_c_array__")?.extract()?;

validate_pycapsule(schema_capsule, "arrow_schema")?;
validate_pycapsule(array_capsule, "arrow_array")?;
validate_pycapsule(&schema_capsule, "arrow_schema")?;
validate_pycapsule(&array_capsule, "arrow_array")?;

let schema_ptr = schema_capsule
.pointer_checked(Some(ARROW_SCHEMA_CAPSULE_NAME))?
Expand Down Expand Up @@ -315,8 +277,8 @@ impl FromPyArrow for ArrayData {
value.call_method1(
"_export_to_c",
(
addr_of_mut!(array) as Py_uintptr_t,
addr_of_mut!(schema) as Py_uintptr_t,
&raw mut array as Py_uintptr_t,
&raw mut schema as Py_uintptr_t,
),
)?;

Expand All @@ -328,15 +290,13 @@ impl ToPyArrow for ArrayData {
fn to_pyarrow<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let array = FFI_ArrowArray::new(self);
let schema = FFI_ArrowSchema::try_from(self.data_type()).map_err(to_py_err)?;

let array = array_class(py)?.call_method1(
array_class(py)?.call_method1(
"_import_from_c",
(
addr_of!(array) as Py_uintptr_t,
addr_of!(schema) as Py_uintptr_t,
&raw const array as Py_uintptr_t,
&raw const schema as Py_uintptr_t,
),
)?;
Ok(array)
)
}
}

Expand Down Expand Up @@ -364,21 +324,11 @@ impl FromPyArrow for RecordBatch {
// See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html

if value.hasattr("__arrow_c_array__")? {
let tuple = value.getattr("__arrow_c_array__")?.call0()?;
let (schema_capsule, array_capsule) =
value.call_method0("__arrow_c_array__")?.extract()?;

if !tuple.is_instance_of::<PyTuple>() {
return Err(PyTypeError::new_err(
"Expected __arrow_c_array__ to return a tuple.",
));
}

let schema_capsule = tuple.get_item(0)?;
let schema_capsule = schema_capsule.cast::<PyCapsule>()?;
let array_capsule = tuple.get_item(1)?;
let array_capsule = array_capsule.cast::<PyCapsule>()?;

validate_pycapsule(schema_capsule, "arrow_schema")?;
validate_pycapsule(array_capsule, "arrow_array")?;
validate_pycapsule(&schema_capsule, "arrow_schema")?;
validate_pycapsule(&array_capsule, "arrow_array")?;

let schema_ptr = schema_capsule
.pointer_checked(Some(ARROW_SCHEMA_CAPSULE_NAME))?
Expand Down Expand Up @@ -455,9 +405,9 @@ impl FromPyArrow for ArrowArrayStreamReader {
// method, so prefer it over _export_to_c.
// See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
if value.hasattr("__arrow_c_stream__")? {
let capsule = value.getattr("__arrow_c_stream__")?.call0()?;
let capsule = capsule.cast::<PyCapsule>()?;
validate_pycapsule(capsule, "arrow_array_stream")?;
let capsule = value.call_method0("__arrow_c_stream__")?.extract()?;

validate_pycapsule(&capsule, "arrow_array_stream")?;

let stream = unsafe {
FFI_ArrowArrayStream::from_raw(
Expand All @@ -476,20 +426,17 @@ impl FromPyArrow for ArrowArrayStreamReader {

validate_class(record_batch_reader_class(value.py())?, value)?;

// prepare a pointer to receive the stream struct
// prepare the stream struct to receive the content
let mut stream = FFI_ArrowArrayStream::empty();
let stream_ptr = &mut stream as *mut FFI_ArrowArrayStream;

// make the conversion through PyArrow's private API
// this changes the pointer's memory and is thus unsafe.
// In particular, `_export_to_c` can go out of bounds
let args = PyTuple::new(value.py(), [stream_ptr as Py_uintptr_t])?;
let args = PyTuple::new(value.py(), [&raw mut stream as Py_uintptr_t])?;
value.call_method1("_export_to_c", args)?;

let stream_reader = ArrowArrayStreamReader::try_new(stream)
.map_err(|err| PyValueError::new_err(err.to_string()))?;

Ok(stream_reader)
ArrowArrayStreamReader::try_new(stream)
.map_err(|err| PyValueError::new_err(err.to_string()))
}
}

Expand All @@ -498,13 +445,9 @@ impl IntoPyArrow for Box<dyn RecordBatchReader + Send> {
// We can't implement `ToPyArrow` for `T: RecordBatchReader + Send` because
// there is already a blanket implementation for `T: ToPyArrow`.
fn into_pyarrow<'py>(self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let mut stream = FFI_ArrowArrayStream::new(self);

let stream_ptr = (&mut stream) as *mut FFI_ArrowArrayStream;
let reader = record_batch_reader_class(py)?
.call_method1("_import_from_c", (stream_ptr as Py_uintptr_t,))?;

Ok(reader)
let stream = FFI_ArrowArrayStream::new(self);
record_batch_reader_class(py)?
.call_method1("_import_from_c", (&raw const stream as Py_uintptr_t,))
}
}

Expand Down Expand Up @@ -588,7 +531,7 @@ impl FromPyArrow for Table {
fn from_pyarrow_bound(ob: &Bound<PyAny>) -> PyResult<Self> {
let reader: Box<dyn RecordBatchReader> =
Box::new(ArrowArrayStreamReader::from_pyarrow_bound(ob)?);
Self::try_from(reader).map_err(|err| PyErr::new::<PyValueError, _>(err.to_string()))
Self::try_from(reader).map_err(|err| PyValueError::new_err(err.to_string()))
}
}

Expand All @@ -601,9 +544,7 @@ impl IntoPyArrow for Table {
let kwargs = PyDict::new(py);
kwargs.set_item("schema", py_schema)?;

let reader = table_class(py)?.call_method("from_batches", (py_batches,), Some(&kwargs))?;

Ok(reader)
table_class(py)?.call_method("from_batches", (py_batches,), Some(&kwargs))
}
}

Expand Down Expand Up @@ -664,7 +605,7 @@ impl<'py, T: IntoPyArrow> IntoPyObject<'py> for PyArrowType<T> {

type Error = PyErr;

fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, PyErr> {
fn into_pyobject(self, py: Python<'py>) -> PyResult<Self::Output> {
self.0.into_pyarrow(py)
}
}
Expand Down
Loading