// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // Public API for different memory sharing / IO mechanisms #pragma once #include #include #include #include #include "arrow/io/concurrency.h" #include "arrow/io/interfaces.h" #include "arrow/type_fwd.h" #include "arrow/util/visibility.h" namespace arrow { class Status; namespace io { /// \brief An output stream that writes to a resizable buffer class ARROW_EXPORT BufferOutputStream : public OutputStream { public: explicit BufferOutputStream(const std::shared_ptr& buffer); /// \brief Create in-memory output stream with indicated capacity using a /// memory pool /// \param[in] initial_capacity the initial allocated internal capacity of /// the OutputStream /// \param[in,out] pool a MemoryPool to use for allocations /// \return the created stream static Result> Create( int64_t initial_capacity = 4096, MemoryPool* pool = default_memory_pool()); ~BufferOutputStream() override; // Implement the OutputStream interface /// Close the stream, preserving the buffer (retrieve it with Finish()). Status Close() override; bool closed() const override; Result Tell() const override; Status Write(const void* data, int64_t nbytes) override; /// \cond FALSE using OutputStream::Write; /// \endcond /// Close the stream and return the buffer Result> Finish(); /// \brief Initialize state of OutputStream with newly allocated memory and /// set position to 0 /// \param[in] initial_capacity the starting allocated capacity /// \param[in,out] pool the memory pool to use for allocations /// \return Status Status Reset(int64_t initial_capacity = 1024, MemoryPool* pool = default_memory_pool()); int64_t capacity() const { return capacity_; } private: BufferOutputStream(); // Ensures there is sufficient space available to write nbytes Status Reserve(int64_t nbytes); std::shared_ptr buffer_; bool is_open_; int64_t capacity_; int64_t position_; uint8_t* mutable_data_; }; /// \brief A helper class to track the size of allocations /// /// Writes to this stream do not copy or retain any data, they just bump /// a size counter that can be later used to know exactly which data size /// needs to be allocated for actual writing. class ARROW_EXPORT MockOutputStream : public OutputStream { public: MockOutputStream() : extent_bytes_written_(0), is_open_(true) {} // Implement the OutputStream interface Status Close() override; bool closed() const override; Result Tell() const override; Status Write(const void* data, int64_t nbytes) override; /// \cond FALSE using Writable::Write; /// \endcond int64_t GetExtentBytesWritten() const { return extent_bytes_written_; } private: int64_t extent_bytes_written_; bool is_open_; }; /// \brief An output stream that writes into a fixed-size mutable buffer class ARROW_EXPORT FixedSizeBufferWriter : public WritableFile { public: /// Input buffer must be mutable, will abort if not explicit FixedSizeBufferWriter(const std::shared_ptr& buffer); ~FixedSizeBufferWriter() override; Status Close() override; bool closed() const override; Status Seek(int64_t position) override; Result Tell() const override; Status Write(const void* data, int64_t nbytes) override; /// \cond FALSE using Writable::Write; /// \endcond Status WriteAt(int64_t position, const void* data, int64_t nbytes) override; void set_memcopy_threads(int num_threads); void set_memcopy_blocksize(int64_t blocksize); void set_memcopy_threshold(int64_t threshold); protected: class FixedSizeBufferWriterImpl; std::unique_ptr impl_; }; /// \class BufferReader /// \brief Random access zero-copy reads on an arrow::Buffer class ARROW_EXPORT BufferReader : public internal::RandomAccessFileConcurrencyWrapper { public: /// \brief Instantiate from std::shared_ptr. /// /// This is a zero-copy constructor. explicit BufferReader(std::shared_ptr buffer); ARROW_DEPRECATED( "Deprecated in 14.0.0. Use FromString or BufferReader(std::shared_ptr " "buffer) instead.") explicit BufferReader(const Buffer& buffer); ARROW_DEPRECATED( "Deprecated in 14.0.0. Use FromString or BufferReader(std::shared_ptr " "buffer) instead.") BufferReader(const uint8_t* data, int64_t size); /// \brief Instantiate from std::string_view. Does not own data /// \deprecated Deprecated in 14.0.0. Use FromString or /// BufferReader(std::shared_ptr buffer) instead. ARROW_DEPRECATED( "Deprecated in 14.0.0. Use FromString or BufferReader(std::shared_ptr " "buffer) instead.") explicit BufferReader(std::string_view data); /// \brief Instantiate from std::string. Owns data. static std::unique_ptr FromString(std::string data); bool closed() const override; bool supports_zero_copy() const override; std::shared_ptr buffer() const { return buffer_; } // Synchronous ReadAsync override Future> ReadAsync(const IOContext&, int64_t position, int64_t nbytes) override; Status WillNeed(const std::vector& ranges) override; protected: friend RandomAccessFileConcurrencyWrapper; Status DoClose(); Result DoRead(int64_t nbytes, void* buffer); Result> DoRead(int64_t nbytes); Result DoReadAt(int64_t position, int64_t nbytes, void* out); Result> DoReadAt(int64_t position, int64_t nbytes); Result DoPeek(int64_t nbytes) override; Result DoTell() const; Status DoSeek(int64_t position); Result DoGetSize(); Status CheckClosed() const { if (!is_open_) { return Status::Invalid("Operation forbidden on closed BufferReader"); } return Status::OK(); } std::shared_ptr buffer_; const uint8_t* data_; int64_t size_; int64_t position_; bool is_open_; }; } // namespace io } // namespace arrow