// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstdint>
#include <memory>
#include <string>
#include <vector>

#include "arrow/compare.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/type_fwd.h"
#include "arrow/util/iterator.h"
#include "arrow/util/macros.h"
#include "arrow/util/visibility.h"

namespace arrow {

/// \class RecordBatch
/// \brief Collection of equal-length arrays matching a particular Schema
///
/// A record batch is a table-like data structure that is semantically a
/// sequence of fields, each a contiguous Arrow array
class ARROW_EXPORT RecordBatch {
 public:
  virtual ~RecordBatch() = default;

  /// \param[in] schema The record batch schema
  /// \param[in] num_rows length of fields in the record batch. Each array
  /// should have the same length as num_rows
  /// \param[in] columns the record batch fields as vector of arrays
  static std::shared_ptr<RecordBatch> Make(std::shared_ptr<Schema> schema,
                                           int64_t num_rows,
                                           std::vector<std::shared_ptr<Array>> columns);

  /// \brief Construct record batch from vector of internal data structures
  /// \since 0.5.0
  ///
  /// This method is intended for internal use, or advanced users.
  ///
  /// \param schema the record batch schema
  /// \param num_rows the number of semantic rows in the record batch. This
  /// should be equal to the length of each field
  /// \param columns the data for the batch's columns
  static std::shared_ptr<RecordBatch> Make(
      std::shared_ptr<Schema> schema, int64_t num_rows,
      std::vector<std::shared_ptr<ArrayData>> columns);

  /// \brief Create an empty RecordBatch of a given schema
  ///
  /// The output RecordBatch will be created with DataTypes from
  /// the given schema.
  ///
  /// \param[in] schema the schema of the empty RecordBatch
  /// \param[in] pool the memory pool to allocate memory from
  /// \return the resulting RecordBatch
  static Result<std::shared_ptr<RecordBatch>> MakeEmpty(
      std::shared_ptr<Schema> schema, MemoryPool* pool = default_memory_pool());

  /// \brief Convert record batch to struct array
  ///
  /// Create a struct array whose child arrays are the record batch's columns.
  /// Note that the record batch's top-level field metadata cannot be reflected
  /// in the resulting struct array.
  Result<std::shared_ptr<StructArray>> ToStructArray() const;
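
  // Usage sketch (illustrative only, not part of this header): building a batch
  // with Make() and converting it to a StructArray. Assumes function scope with
  // the convenience factories from <arrow/api.h> available; `id_array` and
  // `name_array` are hypothetical, pre-built Arrays of equal length.
  //
  //   std::shared_ptr<Schema> schema =
  //       arrow::schema({field("id", int32()), field("name", utf8())});
  //   std::shared_ptr<RecordBatch> batch =
  //       RecordBatch::Make(schema, id_array->length(), {id_array, name_array});
  //   ARROW_ASSIGN_OR_RAISE(std::shared_ptr<StructArray> as_struct,
  //                         batch->ToStructArray());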
  /// \brief Convert record batch with one data type to Tensor
  ///
  /// Create a Tensor object with shape (number of rows, number of columns).
  /// The layout is controlled by `row_major` (row-major by default); in the
  /// column-major case the strides are (type size in bytes, type size in
  /// bytes * number of rows).
  ///
  /// \param[in] null_to_nan if true, convert nulls to NaN
  /// \param[in] row_major if true, create row-major Tensor else column-major Tensor
  /// \param[in] pool the memory pool to allocate the tensor buffer
  /// \return the resulting Tensor
  Result<std::shared_ptr<Tensor>> ToTensor(
      bool null_to_nan = false, bool row_major = true,
      MemoryPool* pool = default_memory_pool()) const;

  /// \brief Construct record batch from struct array
  ///
  /// This constructs a record batch using the child arrays of the given
  /// array, which must be a struct array.
  ///
  /// \param[in] array the source array, must be a StructArray
  /// \param[in] pool the memory pool to allocate new validity bitmaps
  ///
  /// This operation will usually be zero-copy. However, if the struct array
  /// has an offset or a validity bitmap then these will need to be pushed into
  /// the child arrays. Pushing the offset is zero-copy but pushing the
  /// validity bitmap is not.
  static Result<std::shared_ptr<RecordBatch>> FromStructArray(
      const std::shared_ptr<Array>& array, MemoryPool* pool = default_memory_pool());
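
  // Usage sketch (illustrative only): converting an all-numeric batch to a
  // Tensor and rebuilding a batch from a StructArray. `double_batch` (holding
  // only float64 columns) and `struct_array` are hypothetical; assumes a scope
  // where ARROW_ASSIGN_OR_RAISE can be used (a function returning Status/Result).
  //
  //   ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Tensor> tensor,
  //                         double_batch->ToTensor(/*null_to_nan=*/true));
  //   ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> from_struct,
  //                         RecordBatch::FromStructArray(struct_array));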
  /// \brief Determine if two record batches are exactly equal
  ///
  /// \param[in] other the RecordBatch to compare with
  /// \param[in] check_metadata if true, check that Schema metadata is the same
  /// \param[in] opts the options for equality comparisons
  /// \return true if batches are equal
  bool Equals(const RecordBatch& other, bool check_metadata = false,
              const EqualOptions& opts = EqualOptions::Defaults()) const;

  /// \brief Determine if two record batches are approximately equal
  ///
  /// \param[in] other the RecordBatch to compare with
  /// \param[in] opts the options for equality comparisons
  /// \return true if batches are approximately equal
  bool ApproxEquals(const RecordBatch& other,
                    const EqualOptions& opts = EqualOptions::Defaults()) const;

  /// \return the record batch's schema
  const std::shared_ptr<Schema>& schema() const { return schema_; }

  /// \brief Replace the schema with another schema with the same types, but
  /// potentially different field names and/or metadata.
  Result<std::shared_ptr<RecordBatch>> ReplaceSchema(
      std::shared_ptr<Schema> schema) const;

  /// \brief Retrieve all columns at once
  virtual const std::vector<std::shared_ptr<Array>>& columns() const = 0;

  /// \brief Retrieve an array from the record batch
  /// \param[in] i field index, does not boundscheck
  /// \return an Array object
  virtual std::shared_ptr<Array> column(int i) const = 0;

  /// \brief Retrieve an array from the record batch
  /// \param[in] name field name
  /// \return an Array or null if no field was found
  std::shared_ptr<Array> GetColumnByName(const std::string& name) const;

  /// \brief Retrieve an array's internal data from the record batch
  /// \param[in] i field index, does not boundscheck
  /// \return an internal ArrayData object
  virtual std::shared_ptr<ArrayData> column_data(int i) const = 0;

  /// \brief Retrieve all arrays' internal data from the record batch.
  virtual const ArrayDataVector& column_data() const = 0;

  /// \brief Add column to the record batch, producing a new RecordBatch
  ///
  /// \param[in] i field index, which will be boundschecked
  /// \param[in] field field to be added
  /// \param[in] column column to be added
  virtual Result<std::shared_ptr<RecordBatch>> AddColumn(
      int i, const std::shared_ptr<Field>& field,
      const std::shared_ptr<Array>& column) const = 0;

  /// \brief Add new nullable column to the record batch, producing a new
  /// RecordBatch.
  ///
  /// For non-nullable columns, use the Field-based version of this method.
  ///
  /// \param[in] i field index, which will be boundschecked
  /// \param[in] field_name name of the field to be added
  /// \param[in] column column to be added
  virtual Result<std::shared_ptr<RecordBatch>> AddColumn(
      int i, std::string field_name, const std::shared_ptr<Array>& column) const;

  /// \brief Replace a column in the record batch, producing a new RecordBatch
  ///
  /// \param[in] i field index, which will be boundschecked
  /// \param[in] field the replacement field
  /// \param[in] column the replacement column
  virtual Result<std::shared_ptr<RecordBatch>> SetColumn(
      int i, const std::shared_ptr<Field>& field,
      const std::shared_ptr<Array>& column) const = 0;

  /// \brief Remove column from the record batch, producing a new RecordBatch
  ///
  /// \param[in] i field index, which will be boundschecked
  virtual Result<std::shared_ptr<RecordBatch>> RemoveColumn(int i) const = 0;

  virtual std::shared_ptr<RecordBatch> ReplaceSchemaMetadata(
      const std::shared_ptr<const KeyValueMetadata>& metadata) const = 0;

  /// \brief Name of the i-th column
  const std::string& column_name(int i) const;

  /// \return the number of columns in the record batch
  int num_columns() const;

  /// \return the number of rows (the corresponding length of each column)
  int64_t num_rows() const { return num_rows_; }

  /// \brief Copy the entire RecordBatch to destination MemoryManager
  ///
  /// This uses Array::CopyTo on each column of the record batch to create
  /// a new record batch where all underlying buffers for the columns have
  /// been copied to the destination MemoryManager. This uses
  /// MemoryManager::CopyBuffer under the hood.
  Result<std::shared_ptr<RecordBatch>> CopyTo(
      const std::shared_ptr<MemoryManager>& to) const;

  /// \brief View or copy the entire RecordBatch to destination MemoryManager
  ///
  /// This uses Array::ViewOrCopyTo on each column of the record batch to create
  /// a new record batch where all underlying buffers for the columns have
  /// been zero-copy viewed on the destination MemoryManager, falling back
  /// to performing a copy if it can't be viewed as a zero-copy buffer. This uses
  /// Buffer::ViewOrCopy under the hood.
  Result<std::shared_ptr<RecordBatch>> ViewOrCopyTo(
      const std::shared_ptr<MemoryManager>& to) const;

  /// \brief Slice each of the arrays in the record batch
  /// \param[in] offset the starting offset to slice, through end of batch
  /// \return new record batch
  virtual std::shared_ptr<RecordBatch> Slice(int64_t offset) const;

  /// \brief Slice each of the arrays in the record batch
  /// \param[in] offset the starting offset to slice
  /// \param[in] length the number of elements to slice from offset
  /// \return new record batch
  virtual std::shared_ptr<RecordBatch> Slice(int64_t offset, int64_t length) const = 0;

  /// \return PrettyPrint representation suitable for debugging
  std::string ToString() const;

  /// \brief Return names of all columns
  std::vector<std::string> ColumnNames() const;

  /// \brief Rename columns with provided names
  Result<std::shared_ptr<RecordBatch>> RenameColumns(
      const std::vector<std::string>& names) const;

  /// \brief Return new record batch with specified columns
  Result<std::shared_ptr<RecordBatch>> SelectColumns(
      const std::vector<int>& indices) const;

  /// \brief Perform cheap validation checks to determine obvious inconsistencies
  /// within the record batch's schema and internal data.
  ///
  /// This is O(k) where k is the total number of fields and array descendants.
  ///
  /// \return Status
  virtual Status Validate() const;

  /// \brief Perform extensive validation checks to determine inconsistencies
  /// within the record batch's schema and internal data.
  ///
  /// This is potentially O(k*n) where k is the total number of fields and
  /// array descendants, and n is the number of rows.
  ///
  /// \return Status
  virtual Status ValidateFull() const;
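
  // Usage sketch (illustrative only) for the selection and validation methods
  // above. `batch` is a hypothetical, already-constructed RecordBatch; assumes
  // a scope where the ARROW_* result/status macros can be used.
  //
  //   std::shared_ptr<RecordBatch> first_ten = batch->Slice(0, 10);
  //   ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> subset,
  //                         batch->SelectColumns({0, 2}));
  //   ARROW_RETURN_NOT_OK(subset->ValidateFull());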
 protected:
  RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows);

  std::shared_ptr<Schema> schema_;
  int64_t num_rows_;

 private:
  ARROW_DISALLOW_COPY_AND_ASSIGN(RecordBatch);
};

struct ARROW_EXPORT RecordBatchWithMetadata {
  std::shared_ptr<RecordBatch> batch;
  std::shared_ptr<KeyValueMetadata> custom_metadata;
};

/// \brief Abstract interface for reading a stream of record batches
class ARROW_EXPORT RecordBatchReader {
 public:
  using ValueType = std::shared_ptr<RecordBatch>;

  virtual ~RecordBatchReader();

  /// \return the shared schema of the record batches in the stream
  virtual std::shared_ptr<Schema> schema() const = 0;

  /// \brief Read the next record batch in the stream. Return null for batch
  /// when reaching end of stream
  ///
  /// \param[out] batch the next loaded batch, null at end of stream
  /// \return Status
  virtual Status ReadNext(std::shared_ptr<RecordBatch>* batch) = 0;

  virtual Result<RecordBatchWithMetadata> ReadNext() {
    return Status::NotImplemented("ReadNext with custom metadata");
  }

  /// \brief Iterator interface
  Result<std::shared_ptr<RecordBatch>> Next() {
    std::shared_ptr<RecordBatch> batch;
    ARROW_RETURN_NOT_OK(ReadNext(&batch));
    return batch;
  }

  /// \brief Finalize the reader
  virtual Status Close() { return Status::OK(); }

  class RecordBatchReaderIterator {
   public:
    using iterator_category = std::input_iterator_tag;
    using difference_type = std::ptrdiff_t;
    using value_type = std::shared_ptr<RecordBatch>;
    using pointer = value_type const*;
    using reference = value_type const&;

    RecordBatchReaderIterator() : batch_(RecordBatchEnd()), reader_(NULLPTR) {}

    explicit RecordBatchReaderIterator(RecordBatchReader* reader)
        : batch_(RecordBatchEnd()), reader_(reader) {
      Next();
    }

    bool operator==(const RecordBatchReaderIterator& other) const {
      return batch_ == other.batch_;
    }

    bool operator!=(const RecordBatchReaderIterator& other) const {
      return !(*this == other);
    }

    Result<std::shared_ptr<RecordBatch>> operator*() {
      ARROW_RETURN_NOT_OK(batch_.status());
      return batch_;
    }

    RecordBatchReaderIterator& operator++() {
      Next();
      return *this;
    }

    RecordBatchReaderIterator operator++(int) {
      RecordBatchReaderIterator tmp(*this);
      Next();
      return tmp;
    }

   private:
    std::shared_ptr<RecordBatch> RecordBatchEnd() {
      return std::shared_ptr<RecordBatch>(NULLPTR);
    }

    void Next() {
      if (reader_ == NULLPTR) {
        batch_ = RecordBatchEnd();
        return;
      }
      batch_ = reader_->Next();
    }

    Result<std::shared_ptr<RecordBatch>> batch_;
    RecordBatchReader* reader_;
  };

  /// \brief Return an iterator to the first record batch in the stream
  RecordBatchReaderIterator begin() { return RecordBatchReaderIterator(this); }

  /// \brief Return an iterator to the end of the stream
  RecordBatchReaderIterator end() { return RecordBatchReaderIterator(); }

  /// \brief Consume entire stream as a vector of record batches
  Result<RecordBatchVector> ToRecordBatches();

  /// \brief Read all batches and concatenate as arrow::Table
  Result<std::shared_ptr<Table>> ToTable();

  /// \brief Create a RecordBatchReader from a vector of RecordBatch.
  ///
  /// \param[in] batches the vector of RecordBatch to read from
  /// \param[in] schema schema to conform to. Will be inferred from the first
  /// element if not provided.
  static Result<std::shared_ptr<RecordBatchReader>> Make(
      RecordBatchVector batches, std::shared_ptr<Schema> schema = NULLPTR);

  /// \brief Create a RecordBatchReader from an Iterator of RecordBatch.
  ///
  /// \param[in] batches an iterator of RecordBatch to read from.
  /// \param[in] schema schema that each record batch in iterator will conform to.
  static Result<std::shared_ptr<RecordBatchReader>> MakeFromIterator(
      Iterator<std::shared_ptr<RecordBatch>> batches, std::shared_ptr<Schema> schema);
};
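
// Usage sketch (illustrative only): draining a RecordBatchReader. `reader` is a
// hypothetical std::shared_ptr<RecordBatchReader>, e.g. obtained from an IPC
// stream. Each dereferenced iterator value is a Result, so unwrap or check it;
// assumes a scope where the ARROW_* result macros can be used.
//
//   for (Result<std::shared_ptr<RecordBatch>> maybe_batch : *reader) {
//     ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> batch, maybe_batch);
//     // ... process `batch` ...
//   }
//   // Or collect the remaining stream into a single Table:
//   ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Table> table, reader->ToTable());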
/// \brief Concatenate record batches
///
/// The columns of the new batch are formed by concatenating the same columns
/// of each input batch. Concatenating multiple batches into a new batch
/// requires that the schemas be consistent. Batches without columns (only a
/// length) are supported, for scenarios such as count(*).
///
/// \param[in] batches a vector of record batches to be concatenated
/// \param[in] pool memory to store the result will be allocated from this memory pool
/// \return the concatenated record batch
ARROW_EXPORT
Result<std::shared_ptr<RecordBatch>> ConcatenateRecordBatches(
    const RecordBatchVector& batches, MemoryPool* pool = default_memory_pool());

}  // namespace arrow
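
// Usage sketch (illustrative only): concatenating batches that share a schema.
// `batch1` and `batch2` are hypothetical RecordBatches with identical schemas.
//
//   arrow::RecordBatchVector batches = {batch1, batch2};
//   ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::RecordBatch> combined,
//                         arrow::ConcatenateRecordBatches(batches));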