// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "io.h"

#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <mutex>
#include <new>
#include <string>
#include <utility>

#include "arrow/io/memory.h"
#include "arrow/memory_pool.h"
#include "arrow/status.h"
#include "arrow/util/logging.h"

#include "arrow/python/common.h"
#include "arrow/python/pyarrow.h"

namespace arrow {

using arrow::io::TransformInputStream;

namespace py {

// ----------------------------------------------------------------------
// Python file

// A common interface to a Python file-like object.
Must acquire GIL before // calling any methods class PythonFile { public: explicit PythonFile(PyObject* file) : file_(file), checked_read_buffer_(false) { Py_INCREF(file); } Status CheckClosed() const { if (!file_) { return Status::Invalid("operation on closed Python file"); } return Status::OK(); } Status Close() { if (file_) { PyObject* result = cpp_PyObject_CallMethod(file_.obj(), "close", "()"); Py_XDECREF(result); file_.reset(); PY_RETURN_IF_ERROR(StatusCode::IOError); } return Status::OK(); } Status Abort() { file_.reset(); return Status::OK(); } bool closed() const { if (!file_) { return true; } PyObject* result = PyObject_GetAttrString(file_.obj(), "closed"); if (result == NULL) { // Can't propagate the error, so write it out and return an arbitrary value PyErr_WriteUnraisable(NULL); return true; } int ret = PyObject_IsTrue(result); Py_XDECREF(result); if (ret < 0) { PyErr_WriteUnraisable(NULL); return true; } return ret != 0; } Status Seek(int64_t position, int whence) { RETURN_NOT_OK(CheckClosed()); // NOTE: `long long` is at least 64 bits in the C standard, the cast below is // therefore safe. 
// whence: 0 for relative to start of file, 2 for end of file PyObject* result = cpp_PyObject_CallMethod(file_.obj(), "seek", "(Li)", static_cast(position), whence); Py_XDECREF(result); PY_RETURN_IF_ERROR(StatusCode::IOError); return Status::OK(); } Status Read(int64_t nbytes, PyObject** out) { RETURN_NOT_OK(CheckClosed()); PyObject* result = cpp_PyObject_CallMethod(file_.obj(), "read", "(L)", static_cast(nbytes)); PY_RETURN_IF_ERROR(StatusCode::IOError); *out = result; return Status::OK(); } Status ReadBuffer(int64_t nbytes, PyObject** out) { PyObject* result = cpp_PyObject_CallMethod(file_.obj(), "read_buffer", "(L)", static_cast(nbytes)); PY_RETURN_IF_ERROR(StatusCode::IOError); *out = result; return Status::OK(); } Status Write(const void* data, int64_t nbytes) { RETURN_NOT_OK(CheckClosed()); // Since the data isn't owned, we have to make a copy PyObject* py_data = PyBytes_FromStringAndSize(reinterpret_cast(data), nbytes); PY_RETURN_IF_ERROR(StatusCode::IOError); PyObject* result = cpp_PyObject_CallMethod(file_.obj(), "write", "(O)", py_data); Py_XDECREF(py_data); Py_XDECREF(result); PY_RETURN_IF_ERROR(StatusCode::IOError); return Status::OK(); } Status Write(const std::shared_ptr& buffer) { RETURN_NOT_OK(CheckClosed()); PyObject* py_data = wrap_buffer(buffer); PY_RETURN_IF_ERROR(StatusCode::IOError); PyObject* result = cpp_PyObject_CallMethod(file_.obj(), "write", "(O)", py_data); Py_XDECREF(py_data); Py_XDECREF(result); PY_RETURN_IF_ERROR(StatusCode::IOError); return Status::OK(); } Result Tell() { RETURN_NOT_OK(CheckClosed()); PyObject* result = cpp_PyObject_CallMethod(file_.obj(), "tell", "()"); PY_RETURN_IF_ERROR(StatusCode::IOError); int64_t position = PyLong_AsLongLong(result); Py_DECREF(result); // PyLong_AsLongLong can raise OverflowError PY_RETURN_IF_ERROR(StatusCode::IOError); return position; } std::mutex& lock() { return lock_; } bool HasReadBuffer() { if (!checked_read_buffer_) { // we don't want to check this each time has_read_buffer_ = 
PyObject_HasAttrString(file_.obj(), "read_buffer") == 1; checked_read_buffer_ = true; } return has_read_buffer_; } private: std::mutex lock_; OwnedRefNoGIL file_; bool has_read_buffer_; bool checked_read_buffer_; }; // ---------------------------------------------------------------------- // Seekable input stream PyReadableFile::PyReadableFile(PyObject* file) { file_.reset(new PythonFile(file)); } // The destructor does not close the underlying Python file object, as // there may be multiple references to it. Instead let the Python // destructor do its job. PyReadableFile::~PyReadableFile() {} Status PyReadableFile::Abort() { return SafeCallIntoPython([this]() { return file_->Abort(); }); } Status PyReadableFile::Close() { return SafeCallIntoPython([this]() { return file_->Close(); }); } bool PyReadableFile::closed() const { bool res; Status st = SafeCallIntoPython([this, &res]() { res = file_->closed(); return Status::OK(); }); return res; } Status PyReadableFile::Seek(int64_t position) { return SafeCallIntoPython([=] { return file_->Seek(position, 0); }); } Result PyReadableFile::Tell() const { return SafeCallIntoPython([=]() -> Result { return file_->Tell(); }); } Result PyReadableFile::Read(int64_t nbytes, void* out) { return SafeCallIntoPython([=]() -> Result { OwnedRef bytes; RETURN_NOT_OK(file_->Read(nbytes, bytes.ref())); PyObject* bytes_obj = bytes.obj(); DCHECK(bytes_obj != NULL); Py_buffer py_buf; if (!PyObject_GetBuffer(bytes_obj, &py_buf, PyBUF_ANY_CONTIGUOUS)) { const uint8_t* data = reinterpret_cast(py_buf.buf); std::memcpy(out, data, py_buf.len); int64_t len = py_buf.len; PyBuffer_Release(&py_buf); return len; } else { return Status::TypeError( "Python file read() should have returned a bytes object or an object " "supporting the buffer protocol, got '", Py_TYPE(bytes_obj)->tp_name, "' (did you open the file in binary mode?)"); } }); } Result> PyReadableFile::Read(int64_t nbytes) { return SafeCallIntoPython([=]() -> Result> { OwnedRef buffer_obj; if 
(file_->HasReadBuffer()) { RETURN_NOT_OK(file_->ReadBuffer(nbytes, buffer_obj.ref())); } else { RETURN_NOT_OK(file_->Read(nbytes, buffer_obj.ref())); } DCHECK(buffer_obj.obj() != NULL); return PyBuffer::FromPyObject(buffer_obj.obj()); }); } Result PyReadableFile::ReadAt(int64_t position, int64_t nbytes, void* out) { std::lock_guard guard(file_->lock()); return SafeCallIntoPython([=]() -> Result { RETURN_NOT_OK(Seek(position)); return Read(nbytes, out); }); } Result> PyReadableFile::ReadAt(int64_t position, int64_t nbytes) { std::lock_guard guard(file_->lock()); return SafeCallIntoPython([=]() -> Result> { RETURN_NOT_OK(Seek(position)); return Read(nbytes); }); } Result PyReadableFile::GetSize() { return SafeCallIntoPython([=]() -> Result { ARROW_ASSIGN_OR_RAISE(int64_t current_position, file_->Tell()); RETURN_NOT_OK(file_->Seek(0, 2)); ARROW_ASSIGN_OR_RAISE(int64_t file_size, file_->Tell()); // Restore previous file position RETURN_NOT_OK(file_->Seek(current_position, 0)); return file_size; }); } // ---------------------------------------------------------------------- // Output stream PyOutputStream::PyOutputStream(PyObject* file) : position_(0) { file_.reset(new PythonFile(file)); } // The destructor does not close the underlying Python file object, as // there may be multiple references to it. Instead let the Python // destructor do its job. 
PyOutputStream::~PyOutputStream() {} Status PyOutputStream::Abort() { return SafeCallIntoPython([=]() { return file_->Abort(); }); } Status PyOutputStream::Close() { return SafeCallIntoPython([=]() { return file_->Close(); }); } bool PyOutputStream::closed() const { bool res; Status st = SafeCallIntoPython([this, &res]() { res = file_->closed(); return Status::OK(); }); return res; } Result PyOutputStream::Tell() const { return position_; } Status PyOutputStream::Write(const void* data, int64_t nbytes) { return SafeCallIntoPython([=]() { position_ += nbytes; return file_->Write(data, nbytes); }); } Status PyOutputStream::Write(const std::shared_ptr& buffer) { return SafeCallIntoPython([=]() { position_ += buffer->size(); return file_->Write(buffer); }); } // ---------------------------------------------------------------------- // Foreign buffer Status PyForeignBuffer::Make(const uint8_t* data, int64_t size, PyObject* base, std::shared_ptr* out) { PyForeignBuffer* buf = new PyForeignBuffer(data, size, base); if (buf == NULL) { return Status::OutOfMemory("could not allocate foreign buffer object"); } else { *out = std::shared_ptr(buf); return Status::OK(); } } // ---------------------------------------------------------------------- // TransformInputStream::TransformFunc wrapper struct TransformFunctionWrapper { TransformFunctionWrapper(TransformCallback cb, PyObject* arg) : cb_(std::move(cb)), arg_(std::make_shared(arg)) { Py_INCREF(arg); } Result> operator()(const std::shared_ptr& src) { return SafeCallIntoPython([=]() -> Result> { std::shared_ptr dest; cb_(arg_->obj(), src, &dest); RETURN_NOT_OK(CheckPyError()); return dest; }); } protected: // Need to wrap OwnedRefNoGIL because std::function needs the callable // to be copy-constructible... 
TransformCallback cb_; std::shared_ptr arg_; }; std::shared_ptr<::arrow::io::InputStream> MakeTransformInputStream( std::shared_ptr<::arrow::io::InputStream> wrapped, TransformInputStreamVTable vtable, PyObject* handler) { TransformInputStream::TransformFunc transform( TransformFunctionWrapper{std::move(vtable.transform), handler}); return std::make_shared(std::move(wrapped), std::move(transform)); } std::shared_ptr MakeStreamTransformFunc(TransformInputStreamVTable vtable, PyObject* handler) { TransformInputStream::TransformFunc transform( TransformFunctionWrapper{std::move(vtable.transform), handler}); StreamWrapFunc func = [transform](std::shared_ptr<::arrow::io::InputStream> wrapped) { return std::make_shared(wrapped, transform); }; return std::make_shared(func); } } // namespace py } // namespace arrow