// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #pragma once #include #include #include #include #include #include #include #include #include "parquet/column_reader.h" #include "parquet/file_reader.h" #include "parquet/stream_writer.h" namespace parquet { /// \brief A class for reading Parquet files using an output stream type API. /// /// The values given must be of the correct type i.e. the type must /// match the file schema exactly otherwise a ParquetException will be /// thrown. /// /// The user must explicitly advance to the next row using the /// EndRow() function or EndRow input manipulator. /// /// Required and optional fields are supported: /// - Required fields are read using operator>>(T) /// - Optional fields are read with /// operator>>(std::optional) /// /// Note that operator>>(std::optional) can be used to read /// required fields. /// /// Similarly operator>>(T) can be used to read optional fields. /// However, if the value is not present then a ParquetException will /// be raised. /// /// Currently there is no support for repeated fields. /// class PARQUET_EXPORT StreamReader { public: template using optional = ::std::optional; // N.B. Default constructed objects are not usable. 
This // constructor is provided so that the object may be move // assigned afterwards. StreamReader() = default; explicit StreamReader(std::unique_ptr reader); ~StreamReader() = default; bool eof() const { return eof_; } int current_column() const { return column_index_; } int64_t current_row() const { return current_row_; } int num_columns() const; int64_t num_rows() const; // Moving is possible. StreamReader(StreamReader&&) = default; StreamReader& operator=(StreamReader&&) = default; // Copying is not allowed. StreamReader(const StreamReader&) = delete; StreamReader& operator=(const StreamReader&) = delete; StreamReader& operator>>(bool& v); StreamReader& operator>>(int8_t& v); StreamReader& operator>>(uint8_t& v); StreamReader& operator>>(int16_t& v); StreamReader& operator>>(uint16_t& v); StreamReader& operator>>(int32_t& v); StreamReader& operator>>(uint32_t& v); StreamReader& operator>>(int64_t& v); StreamReader& operator>>(uint64_t& v); StreamReader& operator>>(std::chrono::milliseconds& v); StreamReader& operator>>(std::chrono::microseconds& v); StreamReader& operator>>(float& v); StreamReader& operator>>(double& v); StreamReader& operator>>(char& v); template StreamReader& operator>>(char (&v)[N]) { ReadFixedLength(v, N); return *this; } template StreamReader& operator>>(std::array& v) { ReadFixedLength(v.data(), static_cast(N)); return *this; } // N.B. Cannot allow for reading to a arbitrary char pointer as the // length cannot be verified. Also it would overshadow the // char[N] input operator. // StreamReader& operator>>(char * v); StreamReader& operator>>(std::string& v); StreamReader& operator>>(::arrow::Decimal128& v); // Input operators for optional fields. 
StreamReader& operator>>(optional& v); StreamReader& operator>>(optional& v); StreamReader& operator>>(optional& v); StreamReader& operator>>(optional& v); StreamReader& operator>>(optional& v); StreamReader& operator>>(optional& v); StreamReader& operator>>(optional& v); StreamReader& operator>>(optional& v); StreamReader& operator>>(optional& v); StreamReader& operator>>(optional& v); StreamReader& operator>>(optional& v); StreamReader& operator>>(optional& v); StreamReader& operator>>(optional& v); StreamReader& operator>>(optional& v); StreamReader& operator>>(optional& v); StreamReader& operator>>(optional<::arrow::Decimal128>& v); template StreamReader& operator>>(optional>& v) { CheckColumn(Type::FIXED_LEN_BYTE_ARRAY, ConvertedType::NONE, N); FixedLenByteArray flba; if (ReadOptional(&flba)) { v = std::array{}; std::memcpy(v->data(), flba.ptr, N); } else { v.reset(); } return *this; } /// \brief Terminate current row and advance to next one. /// \throws ParquetException if all columns in the row were not /// read or skipped. void EndRow(); /// \brief Skip the data in the next columns. /// If the number of columns exceeds the columns remaining on the /// current row then skipping is terminated - it does _not_ continue /// skipping columns on the next row. /// Skipping of columns still requires the use 'EndRow' even if all /// remaining columns were skipped. /// \return Number of columns actually skipped. int64_t SkipColumns(int64_t num_columns_to_skip); /// \brief Skip the data in the next rows. /// Skipping of rows is not allowed if reading of data for the /// current row is not finished. /// Skipping of rows will be terminated if the end of file is /// reached. /// \return Number of rows actually skipped. 
int64_t SkipRows(int64_t num_rows_to_skip); protected: [[noreturn]] void ThrowReadFailedException( const std::shared_ptr& node); template void Read(T* v) { const auto& node = nodes_[column_index_]; auto reader = static_cast(column_readers_[column_index_++].get()); int16_t def_level; int16_t rep_level; int64_t values_read; reader->ReadBatch(kBatchSizeOne, &def_level, &rep_level, v, &values_read); if (values_read != 1) { ThrowReadFailedException(node); } } template void Read(T* v) { const auto& node = nodes_[column_index_]; auto reader = static_cast(column_readers_[column_index_++].get()); int16_t def_level; int16_t rep_level; ReadType tmp; int64_t values_read; reader->ReadBatch(kBatchSizeOne, &def_level, &rep_level, &tmp, &values_read); if (values_read == 1) { *v = tmp; } else { ThrowReadFailedException(node); } } template void ReadOptional(optional* v) { const auto& node = nodes_[column_index_]; auto reader = static_cast(column_readers_[column_index_++].get()); int16_t def_level; int16_t rep_level; ReadType tmp; int64_t values_read; reader->ReadBatch(kBatchSizeOne, &def_level, &rep_level, &tmp, &values_read); if (values_read == 1) { *v = T(tmp); } else if ((values_read == 0) && (def_level == 0)) { v->reset(); } else { ThrowReadFailedException(node); } } void ReadFixedLength(char* ptr, int len); void Read(ByteArray* v); void Read(FixedLenByteArray* v); bool ReadOptional(ByteArray* v); bool ReadOptional(FixedLenByteArray* v); void NextRowGroup(); void CheckColumn(Type::type physical_type, ConvertedType::type converted_type, int length = 0); void SkipRowsInColumn(ColumnReader* reader, int64_t num_rows_to_skip); void SetEof(); private: std::unique_ptr file_reader_; std::shared_ptr file_metadata_; std::shared_ptr row_group_reader_; std::vector> column_readers_; std::vector> nodes_; bool eof_{true}; int row_group_index_{0}; int column_index_{0}; int64_t current_row_{0}; int64_t row_group_row_offset_{0}; static constexpr int64_t kBatchSizeOne = 1; }; // namespace parquet 
/// \brief Stream-style input operator taking an EndRowType tag.
///
/// Only the declaration is visible here; presumably it ends the current row
/// in the same way as StreamReader::EndRow() - confirm against the definition.
PARQUET_EXPORT
StreamReader& operator>>(StreamReader& reader, EndRowType end_row);

}  // namespace parquet