// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #pragma once #include #include #include #include #include #include #include #include #include "parquet/column_writer.h" #include "parquet/file_writer.h" namespace parquet { /// \brief A class for writing Parquet files using an output stream type API. /// /// The values given must be of the correct type i.e. the type must /// match the file schema exactly otherwise a ParquetException will be /// thrown. /// /// The user must explicitly indicate the end of the row using the /// EndRow() function or EndRow output manipulator. /// /// A maximum row group size can be configured, the default size is /// 512MB. Alternatively the row group size can be set to zero and the /// user can create new row groups by calling the EndRowGroup() /// function or using the EndRowGroup output manipulator. /// /// Required and optional fields are supported: /// - Required fields are written using operator<<(T) /// - Optional fields are written using /// operator<<(std::optional). /// /// Note that operator<<(T) can be used to write optional fields. /// /// Similarly, operator<<(std::optional) can be used to /// write required fields. However if the optional parameter does not /// have a value (i.e. it is nullopt) then a ParquetException will be /// raised. /// /// Currently there is no support for repeated fields. /// class PARQUET_EXPORT StreamWriter { public: template using optional = ::std::optional; // N.B. Default constructed objects are not usable. This // constructor is provided so that the object may be move // assigned afterwards. StreamWriter() = default; explicit StreamWriter(std::unique_ptr writer); ~StreamWriter() = default; static void SetDefaultMaxRowGroupSize(int64_t max_size); void SetMaxRowGroupSize(int64_t max_size); int current_column() const { return column_index_; } int64_t current_row() const { return current_row_; } int num_columns() const; // Moving is possible. StreamWriter(StreamWriter&&) = default; StreamWriter& operator=(StreamWriter&&) = default; // Copying is not allowed. StreamWriter(const StreamWriter&) = delete; StreamWriter& operator=(const StreamWriter&) = delete; /// \brief Output operators for required fields. /// These can also be used for optional fields when a value must be set. StreamWriter& operator<<(bool v); StreamWriter& operator<<(int8_t v); StreamWriter& operator<<(uint8_t v); StreamWriter& operator<<(int16_t v); StreamWriter& operator<<(uint16_t v); StreamWriter& operator<<(int32_t v); StreamWriter& operator<<(uint32_t v); StreamWriter& operator<<(int64_t v); StreamWriter& operator<<(uint64_t v); StreamWriter& operator<<(const std::chrono::milliseconds& v); StreamWriter& operator<<(const std::chrono::microseconds& v); StreamWriter& operator<<(float v); StreamWriter& operator<<(double v); StreamWriter& operator<<(char v); /// \brief Helper class to write fixed length strings. /// This is useful as the standard string view (such as /// std::string_view) is for variable length data. struct PARQUET_EXPORT FixedStringView { FixedStringView() = default; explicit FixedStringView(const char* data_ptr); FixedStringView(const char* data_ptr, std::size_t data_len); const char* data{NULLPTR}; std::size_t size{0}; }; /// \brief Output operators for fixed length strings. template StreamWriter& operator<<(const char (&v)[N]) { return WriteFixedLength(v, N); } template StreamWriter& operator<<(const std::array& v) { return WriteFixedLength(v.data(), N); } StreamWriter& operator<<(FixedStringView v); /// \brief Output operators for variable length strings. StreamWriter& operator<<(const char* v); StreamWriter& operator<<(const std::string& v); StreamWriter& operator<<(::std::string_view v); /// \brief Output operator for optional fields. template StreamWriter& operator<<(const optional& v) { if (v) { return operator<<(*v); } SkipOptionalColumn(); return *this; } /// \brief Skip the next N columns of optional data. If there are /// less than N columns remaining then the excess columns are /// ignored. /// \throws ParquetException if there is an attempt to skip any /// required column. /// \return Number of columns actually skipped. int64_t SkipColumns(int num_columns_to_skip); /// \brief Terminate the current row and advance to next one. /// \throws ParquetException if all columns in the row were not /// written or skipped. void EndRow(); /// \brief Terminate the current row group and create new one. void EndRowGroup(); protected: template StreamWriter& Write(const T v) { auto writer = static_cast(row_group_writer_->column(column_index_++)); writer->WriteBatch(kBatchSizeOne, &kDefLevelOne, &kRepLevelZero, &v); if (max_row_group_size_ > 0) { row_group_size_ += writer->estimated_buffered_value_bytes(); } return *this; } StreamWriter& WriteVariableLength(const char* data_ptr, std::size_t data_len); StreamWriter& WriteFixedLength(const char* data_ptr, std::size_t data_len); void CheckColumn(Type::type physical_type, ConvertedType::type converted_type, int length = -1); /// \brief Skip the next column which must be optional. /// \throws ParquetException if the next column does not exist or is /// not optional. void SkipOptionalColumn(); void WriteNullValue(ColumnWriter* writer); private: using node_ptr_type = std::shared_ptr; struct null_deleter { void operator()(void*) {} }; int32_t column_index_{0}; int64_t current_row_{0}; int64_t row_group_size_{0}; int64_t max_row_group_size_{default_row_group_size_}; std::unique_ptr file_writer_; std::unique_ptr row_group_writer_; std::vector nodes_; static constexpr int16_t kDefLevelZero = 0; static constexpr int16_t kDefLevelOne = 1; static constexpr int16_t kRepLevelZero = 0; static constexpr int64_t kBatchSizeOne = 1; static int64_t default_row_group_size_; }; struct PARQUET_EXPORT EndRowType {}; constexpr EndRowType EndRow = {}; struct PARQUET_EXPORT EndRowGroupType {}; constexpr EndRowGroupType EndRowGroup = {}; PARQUET_EXPORT StreamWriter& operator<<(StreamWriter&, EndRowType); PARQUET_EXPORT StreamWriter& operator<<(StreamWriter&, EndRowGroupType); } // namespace parquet