// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>

#include "arrow/io/caching.h"
#include "arrow/type.h"
#include "arrow/util/compression.h"
#include "arrow/util/type_fwd.h"

#include "parquet/encryption/encryption.h"
#include "parquet/exception.h"
#include "parquet/parquet_version.h"
#include "parquet/platform.h"
#include "parquet/schema.h"
#include "parquet/type_fwd.h"
#include "parquet/types.h"

namespace parquet {

/// Controls serialization format of data pages. parquet-format v2.0.0
/// introduced a new data page metadata type DataPageV2 and serialized page
/// structure (for example, encoded levels are no longer compressed). Prior to
/// the completion of PARQUET-457 in 2020, this library did not implement
/// DataPageV2 correctly, so if you use the V2 data page format, you may have
/// forward compatibility issues (older versions of the library will be unable
/// to read the files). Note that some Parquet implementations do not implement
/// DataPageV2 at all.
enum class ParquetDataPageVersion { V1, V2 };

/// Align the default buffer size to a small multiple of a page size.
constexpr int64_t kDefaultBufferSize = 4096 * 4;

constexpr int32_t kDefaultThriftStringSizeLimit = 100 * 1000 * 1000;

// Structs in the thrift definition are relatively large (at least 300 bytes).
// This limits total memory to the same order of magnitude as
// kDefaultThriftStringSizeLimit.
constexpr int32_t kDefaultThriftContainerSizeLimit = 1000 * 1000;

class PARQUET_EXPORT ReaderProperties {
 public:
  explicit ReaderProperties(MemoryPool* pool = ::arrow::default_memory_pool())
      : pool_(pool) {}

  MemoryPool* memory_pool() const { return pool_; }

  std::shared_ptr<ArrowInputStream> GetStream(std::shared_ptr<ArrowInputFile> source,
                                              int64_t start, int64_t num_bytes);

  /// Buffered stream reading allows the user to control the memory usage of
  /// parquet readers. This ensures that all `RandomAccessFile::ReadAt` calls
  /// are wrapped in a buffered reader that uses a fixed-size buffer (of size
  /// `buffer_size()`) instead of the full size of the ReadAt.
  ///
  /// The primary purpose of this control knob is resource control, not
  /// performance.
  bool is_buffered_stream_enabled() const { return buffered_stream_enabled_; }
  /// Enable buffered stream reading.
  void enable_buffered_stream() { buffered_stream_enabled_ = true; }
  /// Disable buffered stream reading.
  void disable_buffered_stream() { buffered_stream_enabled_ = false; }

  bool read_dense_for_nullable() const { return read_dense_for_nullable_; }
  void enable_read_dense_for_nullable() { read_dense_for_nullable_ = true; }
  void disable_read_dense_for_nullable() { read_dense_for_nullable_ = false; }
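  // Illustrative usage sketch (not part of the API surface): enabling
  // buffered stream reading caps the memory used per ReadAt call at
  // `buffer_size()` rather than the full requested range.
  //
  //   parquet::ReaderProperties props(::arrow::default_memory_pool());
  //   props.enable_buffered_stream();
  //   props.set_buffer_size(64 * 1024);  // 64 KiB buffer instead of full ReadAt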
  /// Return the size of the buffered stream buffer.
  int64_t buffer_size() const { return buffer_size_; }
  /// Set the size of the buffered stream buffer in bytes.
  void set_buffer_size(int64_t size) { buffer_size_ = size; }

  /// \brief Return the size limit on thrift strings.
  ///
  /// This limit helps prevent space and time bombs in files, but may need to
  /// be increased in order to read files with especially large headers.
  int32_t thrift_string_size_limit() const { return thrift_string_size_limit_; }
  /// Set the size limit on thrift strings.
  void set_thrift_string_size_limit(int32_t size) { thrift_string_size_limit_ = size; }

  /// \brief Return the size limit on thrift containers.
  ///
  /// This limit helps prevent space and time bombs in files, but may need to
  /// be increased in order to read files with especially large headers.
  int32_t thrift_container_size_limit() const { return thrift_container_size_limit_; }
  /// Set the size limit on thrift containers.
  void set_thrift_container_size_limit(int32_t size) {
    thrift_container_size_limit_ = size;
  }

  /// Set the decryption properties.
  void file_decryption_properties(std::shared_ptr<FileDecryptionProperties> decryption) {
    file_decryption_properties_ = std::move(decryption);
  }
  /// Return the decryption properties.
  const std::shared_ptr<FileDecryptionProperties>& file_decryption_properties() const {
    return file_decryption_properties_;
  }

  bool page_checksum_verification() const { return page_checksum_verification_; }
  void set_page_checksum_verification(bool check_crc) {
    page_checksum_verification_ = check_crc;
  }
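  // Illustrative sketch (the large-footer scenario is an assumption): the
  // thrift limits guard against malformed or malicious metadata; they can be
  // raised to read legitimate files with unusually large headers.
  //
  //   parquet::ReaderProperties props;
  //   props.set_thrift_string_size_limit(2 * kDefaultThriftStringSizeLimit);
  //   props.set_thrift_container_size_limit(2 * kDefaultThriftContainerSizeLimit);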
 private:
  MemoryPool* pool_;
  int64_t buffer_size_ = kDefaultBufferSize;
  int32_t thrift_string_size_limit_ = kDefaultThriftStringSizeLimit;
  int32_t thrift_container_size_limit_ = kDefaultThriftContainerSizeLimit;
  bool buffered_stream_enabled_ = false;
  bool page_checksum_verification_ = false;
  // Used with a RecordReader.
  bool read_dense_for_nullable_ = false;
  std::shared_ptr<FileDecryptionProperties> file_decryption_properties_;
};

ReaderProperties PARQUET_EXPORT default_reader_properties();

static constexpr int64_t kDefaultDataPageSize = 1024 * 1024;
static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true;
static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = kDefaultDataPageSize;
static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
static constexpr int64_t DEFAULT_MAX_ROW_GROUP_LENGTH = 1024 * 1024;
static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true;
static constexpr int64_t DEFAULT_MAX_STATISTICS_SIZE = 4096;
static constexpr Encoding::type DEFAULT_ENCODING = Encoding::UNKNOWN;
static const char DEFAULT_CREATED_BY[] = CREATED_BY_VERSION;
static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = Compression::UNCOMPRESSED;
static constexpr bool DEFAULT_IS_PAGE_INDEX_ENABLED = false;

class PARQUET_EXPORT ColumnProperties {
 public:
  ColumnProperties(Encoding::type encoding = DEFAULT_ENCODING,
                   Compression::type codec = DEFAULT_COMPRESSION_TYPE,
                   bool dictionary_enabled = DEFAULT_IS_DICTIONARY_ENABLED,
                   bool statistics_enabled = DEFAULT_ARE_STATISTICS_ENABLED,
                   size_t max_stats_size = DEFAULT_MAX_STATISTICS_SIZE,
                   bool page_index_enabled = DEFAULT_IS_PAGE_INDEX_ENABLED)
      : encoding_(encoding),
        codec_(codec),
        dictionary_enabled_(dictionary_enabled),
        statistics_enabled_(statistics_enabled),
        max_stats_size_(max_stats_size),
        page_index_enabled_(page_index_enabled) {}

  void set_encoding(Encoding::type encoding) { encoding_ = encoding; }

  void set_compression(Compression::type codec) { codec_ = codec; }

  void set_dictionary_enabled(bool dictionary_enabled) {
    dictionary_enabled_ = dictionary_enabled;
  }

  void set_statistics_enabled(bool statistics_enabled) {
    statistics_enabled_ = statistics_enabled;
  }

  void set_max_statistics_size(size_t max_stats_size) {
    max_stats_size_ = max_stats_size;
  }

  void set_compression_level(int compression_level) {
    if (!codec_options_) {
      codec_options_ = std::make_shared<::arrow::util::CodecOptions>();
    }
    codec_options_->compression_level = compression_level;
  }

  void set_codec_options(
      const std::shared_ptr<::arrow::util::CodecOptions>& codec_options) {
    codec_options_ = codec_options;
  }

  void set_page_index_enabled(bool page_index_enabled) {
    page_index_enabled_ = page_index_enabled;
  }

  Encoding::type encoding() const { return encoding_; }

  Compression::type compression() const { return codec_; }

  bool dictionary_enabled() const { return dictionary_enabled_; }

  bool statistics_enabled() const { return statistics_enabled_; }

  size_t max_statistics_size() const { return max_stats_size_; }

  int compression_level() const {
    if (!codec_options_) {
      return ::arrow::util::kUseDefaultCompressionLevel;
    }
    return codec_options_->compression_level;
  }

  const std::shared_ptr<::arrow::util::CodecOptions>& codec_options() const {
    return codec_options_;
  }

  bool page_index_enabled() const { return page_index_enabled_; }

 private:
  Encoding::type encoding_;
  Compression::type codec_;
  bool dictionary_enabled_;
  bool statistics_enabled_;
  size_t max_stats_size_;
  std::shared_ptr<::arrow::util::CodecOptions> codec_options_;
  bool page_index_enabled_;
};
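// Illustrative sketch (not part of the API): ColumnProperties is a plain
// value type holding per-column settings; WriterProperties::Builder below
// copies its defaults into one instance per configured column path.
//
//   parquet::ColumnProperties col_props;
//   col_props.set_compression(parquet::Compression::SNAPPY);
//   col_props.set_dictionary_enabled(false);
//   // Until a level is set, compression_level() reports
//   // ::arrow::util::kUseDefaultCompressionLevel.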
class PARQUET_EXPORT WriterProperties {
 public:
  class Builder {
   public:
    Builder()
        : pool_(::arrow::default_memory_pool()),
          dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT),
          write_batch_size_(DEFAULT_WRITE_BATCH_SIZE),
          max_row_group_length_(DEFAULT_MAX_ROW_GROUP_LENGTH),
          pagesize_(kDefaultDataPageSize),
          version_(ParquetVersion::PARQUET_2_6),
          data_page_version_(ParquetDataPageVersion::V1),
          created_by_(DEFAULT_CREATED_BY),
          store_decimal_as_integer_(false),
          page_checksum_enabled_(false) {}

    explicit Builder(const WriterProperties& properties)
        : pool_(properties.memory_pool()),
          dictionary_pagesize_limit_(properties.dictionary_pagesize_limit()),
          write_batch_size_(properties.write_batch_size()),
          max_row_group_length_(properties.max_row_group_length()),
          pagesize_(properties.data_pagesize()),
          version_(properties.version()),
          data_page_version_(properties.data_page_version()),
          created_by_(properties.created_by()),
          store_decimal_as_integer_(properties.store_decimal_as_integer()),
          page_checksum_enabled_(properties.page_checksum_enabled()),
          sorting_columns_(properties.sorting_columns()),
          default_column_properties_(properties.default_column_properties()) {}

    virtual ~Builder() {}

    /// Specify the memory pool for the writer. Default default_memory_pool.
    Builder* memory_pool(MemoryPool* pool) {
      pool_ = pool;
      return this;
    }

    /// Enable dictionary encoding in general for all columns. Default enabled.
    Builder* enable_dictionary() {
      default_column_properties_.set_dictionary_enabled(true);
      return this;
    }

    /// Disable dictionary encoding in general for all columns. Default enabled.
    Builder* disable_dictionary() {
      default_column_properties_.set_dictionary_enabled(false);
      return this;
    }

    /// Enable dictionary encoding for the column specified by `path`.
    /// Default enabled.
    Builder* enable_dictionary(const std::string& path) {
      dictionary_enabled_[path] = true;
      return this;
    }

    /// Enable dictionary encoding for the column specified by `path`.
    /// Default enabled.
    Builder* enable_dictionary(const std::shared_ptr<schema::ColumnPath>& path) {
      return this->enable_dictionary(path->ToDotString());
    }

    /// Disable dictionary encoding for the column specified by `path`.
    /// Default enabled.
    Builder* disable_dictionary(const std::string& path) {
      dictionary_enabled_[path] = false;
      return this;
    }

    /// Disable dictionary encoding for the column specified by `path`.
    /// Default enabled.
    Builder* disable_dictionary(const std::shared_ptr<schema::ColumnPath>& path) {
      return this->disable_dictionary(path->ToDotString());
    }

    /// Specify the dictionary page size limit per row group. Default 1MB.
    Builder* dictionary_pagesize_limit(int64_t dictionary_psize_limit) {
      dictionary_pagesize_limit_ = dictionary_psize_limit;
      return this;
    }

    /// Specify the write batch size while writing batches of Arrow values
    /// into Parquet. Default 1024.
    Builder* write_batch_size(int64_t write_batch_size) {
      write_batch_size_ = write_batch_size;
      return this;
    }

    /// Specify the max number of rows to put in a single row group.
    /// Default 1Mi rows.
    Builder* max_row_group_length(int64_t max_row_group_length) {
      max_row_group_length_ = max_row_group_length;
      return this;
    }

    /// Specify the data page size. Default 1MB.
    Builder* data_pagesize(int64_t pg_size) {
      pagesize_ = pg_size;
      return this;
    }

    /// Specify the data page version. Default V1.
    Builder* data_page_version(ParquetDataPageVersion data_page_version) {
      data_page_version_ = data_page_version;
      return this;
    }

    /// Specify the Parquet file version. Default PARQUET_2_6.
    Builder* version(ParquetVersion::type version) {
      version_ = version;
      return this;
    }

    Builder* created_by(const std::string& created_by) {
      created_by_ = created_by;
      return this;
    }

    Builder* enable_page_checksum() {
      page_checksum_enabled_ = true;
      return this;
    }

    Builder* disable_page_checksum() {
      page_checksum_enabled_ = false;
      return this;
    }
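    // Illustrative sketch (not part of the API): the setters above return
    // `this`, so a typical configuration reads as one fluent expression.
    //
    //   parquet::WriterProperties::Builder builder;
    //   builder.version(parquet::ParquetVersion::PARQUET_2_6)
    //       ->data_page_version(parquet::ParquetDataPageVersion::V1)
    //       ->max_row_group_length(512 * 1024)
    //       ->enable_page_checksum();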
    /// \brief Define the encoding that is used when we don't utilise
    /// dictionary encoding.
    ///
    /// This applies if dictionary encoding is disabled or if we fall back
    /// because the dictionary grew too large.
    Builder* encoding(Encoding::type encoding_type) {
      if (encoding_type == Encoding::PLAIN_DICTIONARY ||
          encoding_type == Encoding::RLE_DICTIONARY) {
        throw ParquetException("Can't use dictionary encoding as fallback encoding");
      }
      default_column_properties_.set_encoding(encoding_type);
      return this;
    }

    /// \brief Define the encoding that is used when we don't utilise
    /// dictionary encoding.
    ///
    /// This applies if dictionary encoding is disabled or if we fall back
    /// because the dictionary grew too large.
    Builder* encoding(const std::string& path, Encoding::type encoding_type) {
      if (encoding_type == Encoding::PLAIN_DICTIONARY ||
          encoding_type == Encoding::RLE_DICTIONARY) {
        throw ParquetException("Can't use dictionary encoding as fallback encoding");
      }
      encodings_[path] = encoding_type;
      return this;
    }

    /// \brief Define the encoding that is used when we don't utilise
    /// dictionary encoding.
    ///
    /// This applies if dictionary encoding is disabled or if we fall back
    /// because the dictionary grew too large.
    Builder* encoding(const std::shared_ptr<schema::ColumnPath>& path,
                      Encoding::type encoding_type) {
      return this->encoding(path->ToDotString(), encoding_type);
    }

    /// Specify compression codec in general for all columns.
    /// Default UNCOMPRESSED.
    Builder* compression(Compression::type codec) {
      default_column_properties_.set_compression(codec);
      return this;
    }

    /// Specify max statistics size to store min/max values. Default 4KB.
    Builder* max_statistics_size(size_t max_stats_sz) {
      default_column_properties_.set_max_statistics_size(max_stats_sz);
      return this;
    }

    /// Specify compression codec for the column specified by `path`.
    /// Default UNCOMPRESSED.
    Builder* compression(const std::string& path, Compression::type codec) {
      codecs_[path] = codec;
      return this;
    }

    /// Specify compression codec for the column specified by `path`.
    /// Default UNCOMPRESSED.
    Builder* compression(const std::shared_ptr<schema::ColumnPath>& path,
                         Compression::type codec) {
      return this->compression(path->ToDotString(), codec);
    }

    /// \brief Specify the default compression level for the compressor in
    /// every column. In case a column does not have an explicitly specified
    /// compression level, the default one would be used.
    ///
    /// The provided compression level is compressor specific. The user would
    /// have to familiarize oneself with the available levels for the selected
    /// compressor. If the compressor does not allow for selecting different
    /// compression levels, calling this function would not have any effect.
    /// Parquet and Arrow do not validate the passed compression level. If no
    /// level is selected by the user or if the special
    /// std::numeric_limits<int>::min() value is passed, then Arrow selects the
    /// compression level.
    ///
    /// If other compressor-specific options need to be set in addition to the
    /// compression level, use the codec_options method.
    Builder* compression_level(int compression_level) {
      default_column_properties_.set_compression_level(compression_level);
      return this;
    }
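    // Illustrative sketch (the column name is an assumption): file-wide
    // compression with a per-column override; the level is passed through to
    // the codec unvalidated.
    //
    //   parquet::WriterProperties::Builder builder;
    //   builder.compression(parquet::Compression::ZSTD)
    //       ->compression_level(3)
    //       ->compression("my_column", parquet::Compression::SNAPPY);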
    /// \brief Specify a compression level for the compressor for the column
    /// described by path.
    ///
    /// The provided compression level is compressor specific. The user would
    /// have to familiarize oneself with the available levels for the selected
    /// compressor. If the compressor does not allow for selecting different
    /// compression levels, calling this function would not have any effect.
    /// Parquet and Arrow do not validate the passed compression level. If no
    /// level is selected by the user or if the special
    /// std::numeric_limits<int>::min() value is passed, then Arrow selects the
    /// compression level.
    Builder* compression_level(const std::string& path, int compression_level) {
      if (!codec_options_[path]) {
        codec_options_[path] = std::make_shared<::arrow::util::CodecOptions>();
      }
      codec_options_[path]->compression_level = compression_level;
      return this;
    }

    /// \brief Specify a compression level for the compressor for the column
    /// described by path.
    ///
    /// The provided compression level is compressor specific. The user would
    /// have to familiarize oneself with the available levels for the selected
    /// compressor. If the compressor does not allow for selecting different
    /// compression levels, calling this function would not have any effect.
    /// Parquet and Arrow do not validate the passed compression level. If no
    /// level is selected by the user or if the special
    /// std::numeric_limits<int>::min() value is passed, then Arrow selects the
    /// compression level.
    Builder* compression_level(const std::shared_ptr<schema::ColumnPath>& path,
                               int compression_level) {
      return this->compression_level(path->ToDotString(), compression_level);
    }

    /// \brief Specify the default codec options for the compressor in
    /// every column.
    ///
    /// The codec options allow configuring the compression level as well
    /// as other codec-specific options.
    Builder* codec_options(
        const std::shared_ptr<::arrow::util::CodecOptions>& codec_options) {
      default_column_properties_.set_codec_options(codec_options);
      return this;
    }

    /// \brief Specify the codec options for the compressor for the column
    /// described by path.
    Builder* codec_options(
        const std::string& path,
        const std::shared_ptr<::arrow::util::CodecOptions>& codec_options) {
      codec_options_[path] = codec_options;
      return this;
    }

    /// \brief Specify the codec options for the compressor for the column
    /// described by path.
    Builder* codec_options(
        const std::shared_ptr<schema::ColumnPath>& path,
        const std::shared_ptr<::arrow::util::CodecOptions>& codec_options) {
      return this->codec_options(path->ToDotString(), codec_options);
    }

    /// Define the file encryption properties. Default NULL.
    Builder* encryption(
        std::shared_ptr<FileEncryptionProperties> file_encryption_properties) {
      file_encryption_properties_ = std::move(file_encryption_properties);
      return this;
    }

    /// Enable statistics in general. Default enabled.
    Builder* enable_statistics() {
      default_column_properties_.set_statistics_enabled(true);
      return this;
    }

    /// Disable statistics in general. Default enabled.
    Builder* disable_statistics() {
      default_column_properties_.set_statistics_enabled(false);
      return this;
    }

    /// Enable statistics for the column specified by `path`. Default enabled.
    Builder* enable_statistics(const std::string& path) {
      statistics_enabled_[path] = true;
      return this;
    }

    /// Enable statistics for the column specified by `path`. Default enabled.
    Builder* enable_statistics(const std::shared_ptr<schema::ColumnPath>& path) {
      return this->enable_statistics(path->ToDotString());
    }

    /// Define the sorting columns. Default empty.
    ///
    /// If sorting columns are set, the user should ensure that records are
    /// sorted by the sorting columns. Otherwise, the stored data will be
    /// inconsistent with the sorting_columns metadata.
    Builder* set_sorting_columns(std::vector<SortingColumn> sorting_columns) {
      sorting_columns_ = std::move(sorting_columns);
      return this;
    }
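    // Illustrative sketch (the SortingColumn field names are assumptions
    // based on its declaration in parquet/types.h): declaring that rows are
    // already sorted ascending by the first schema column, nulls last. The
    // writer records this in metadata; it does not sort the data.
    //
    //   parquet::SortingColumn sort;
    //   sort.column_idx = 0;
    //   sort.descending = false;
    //   sort.nulls_first = false;
    //   builder.set_sorting_columns({sort});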
    /// Disable statistics for the column specified by `path`. Default enabled.
    Builder* disable_statistics(const std::string& path) {
      statistics_enabled_[path] = false;
      return this;
    }

    /// Disable statistics for the column specified by `path`. Default enabled.
    Builder* disable_statistics(const std::shared_ptr<schema::ColumnPath>& path) {
      return this->disable_statistics(path->ToDotString());
    }

    /// Allow decimals with 1 <= precision <= 18 to be stored as integers.
    ///
    /// In Parquet, DECIMAL can be stored in any of the following physical types:
    /// - int32: for 1 <= precision <= 9.
    /// - int64: for 10 <= precision <= 18.
    /// - fixed_len_byte_array: precision is limited by the array size.
    ///   Length n can store <= floor(log_10(2^(8*n - 1) - 1)) base-10 digits.
    /// - binary: precision is unlimited. The minimum number of bytes to store
    ///   the unscaled value is used.
    ///
    /// By default, this is DISABLED and all decimal types annotate
    /// fixed_len_byte_array.
    ///
    /// When enabled, the C++ writer will use the following physical types to
    /// store decimals:
    /// - int32: for 1 <= precision <= 9.
    /// - int64: for 10 <= precision <= 18.
    /// - fixed_len_byte_array: for precision > 18.
    ///
    /// As a consequence, decimal columns stored in integer types are more
    /// compact.
    Builder* enable_store_decimal_as_integer() {
      store_decimal_as_integer_ = true;
      return this;
    }

    /// Disable storing decimals with 1 <= precision <= 18 as integer
    /// physical types.
    ///
    /// Default disabled.
    Builder* disable_store_decimal_as_integer() {
      store_decimal_as_integer_ = false;
      return this;
    }

    /// Enable writing the page index in general for all columns. Default
    /// disabled.
    ///
    /// Writing statistics to the page index disables the old method of
    /// writing statistics to each data page header.
    /// The page index makes filtering more efficient than the page header, as
    /// it gathers all the statistics for a Parquet file in a single place,
    /// avoiding scattered I/O.
    ///
    /// Please check the link below for more details:
    /// https://github.com/apache/parquet-format/blob/master/PageIndex.md
    Builder* enable_write_page_index() {
      default_column_properties_.set_page_index_enabled(true);
      return this;
    }

    /// Disable writing the page index in general for all columns. Default
    /// disabled.
    Builder* disable_write_page_index() {
      default_column_properties_.set_page_index_enabled(false);
      return this;
    }

    /// Enable writing the page index for the column specified by `path`.
    /// Default disabled.
    Builder* enable_write_page_index(const std::string& path) {
      page_index_enabled_[path] = true;
      return this;
    }

    /// Enable writing the page index for the column specified by `path`.
    /// Default disabled.
    Builder* enable_write_page_index(const std::shared_ptr<schema::ColumnPath>& path) {
      return this->enable_write_page_index(path->ToDotString());
    }

    /// Disable writing the page index for the column specified by `path`.
    /// Default disabled.
    Builder* disable_write_page_index(const std::string& path) {
      page_index_enabled_[path] = false;
      return this;
    }

    /// Disable writing the page index for the column specified by `path`.
    /// Default disabled.
    Builder* disable_write_page_index(const std::shared_ptr<schema::ColumnPath>& path) {
      return this->disable_write_page_index(path->ToDotString());
    }
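    // Illustrative sketch (the column name is an assumption): the page index
    // can be enabled file-wide and then disabled for individual columns,
    // e.g. ones that are never filtered on.
    //
    //   parquet::WriterProperties::Builder builder;
    //   builder.enable_write_page_index()
    //       ->disable_write_page_index("blob_column");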
    /// \brief Build the WriterProperties with the builder parameters.
    /// \return The WriterProperties defined by the builder.
    std::shared_ptr<WriterProperties> build() {
      std::unordered_map<std::string, ColumnProperties> column_properties;
      auto get = [&](const std::string& key) -> ColumnProperties& {
        auto it = column_properties.find(key);
        if (it == column_properties.end())
          return column_properties[key] = default_column_properties_;
        else
          return it->second;
      };

      for (const auto& item : encodings_) get(item.first).set_encoding(item.second);
      for (const auto& item : codecs_) get(item.first).set_compression(item.second);
      for (const auto& item : codec_options_)
        get(item.first).set_codec_options(item.second);
      for (const auto& item : dictionary_enabled_)
        get(item.first).set_dictionary_enabled(item.second);
      for (const auto& item : statistics_enabled_)
        get(item.first).set_statistics_enabled(item.second);
      for (const auto& item : page_index_enabled_)
        get(item.first).set_page_index_enabled(item.second);

      return std::shared_ptr<WriterProperties>(new WriterProperties(
          pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_,
          pagesize_, version_, created_by_, page_checksum_enabled_,
          std::move(file_encryption_properties_), default_column_properties_,
          column_properties, data_page_version_, store_decimal_as_integer_,
          std::move(sorting_columns_)));
    }

   private:
    MemoryPool* pool_;
    int64_t dictionary_pagesize_limit_;
    int64_t write_batch_size_;
    int64_t max_row_group_length_;
    int64_t pagesize_;
    ParquetVersion::type version_;
    ParquetDataPageVersion data_page_version_;
    std::string created_by_;
    bool store_decimal_as_integer_;
    bool page_checksum_enabled_;
    std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;

    // If empty, there are no sorting columns.
    std::vector<SortingColumn> sorting_columns_;

    // Settings used for each column unless overridden in any of the maps below
    ColumnProperties default_column_properties_;
    std::unordered_map<std::string, Encoding::type> encodings_;
    std::unordered_map<std::string, Compression::type> codecs_;
    std::unordered_map<std::string, std::shared_ptr<::arrow::util::CodecOptions>>
        codec_options_;
    std::unordered_map<std::string, bool> dictionary_enabled_;
    std::unordered_map<std::string, bool> statistics_enabled_;
    std::unordered_map<std::string, bool> page_index_enabled_;
  };

  inline MemoryPool* memory_pool() const { return pool_; }

  inline int64_t dictionary_pagesize_limit() const { return dictionary_pagesize_limit_; }

  inline int64_t write_batch_size() const { return write_batch_size_; }

  inline int64_t max_row_group_length() const { return max_row_group_length_; }

  inline int64_t data_pagesize() const { return pagesize_; }

  inline ParquetDataPageVersion data_page_version() const {
    return parquet_data_page_version_;
  }

  inline ParquetVersion::type version() const { return parquet_version_; }

  inline std::string created_by() const { return parquet_created_by_; }

  inline bool store_decimal_as_integer() const { return store_decimal_as_integer_; }

  inline bool page_checksum_enabled() const { return page_checksum_enabled_; }

  inline Encoding::type dictionary_index_encoding() const {
    if (parquet_version_ == ParquetVersion::PARQUET_1_0) {
      return Encoding::PLAIN_DICTIONARY;
    } else {
      return Encoding::RLE_DICTIONARY;
    }
  }

  inline Encoding::type dictionary_page_encoding() const {
    if (parquet_version_ == ParquetVersion::PARQUET_1_0) {
      return Encoding::PLAIN_DICTIONARY;
    } else {
      return Encoding::PLAIN;
    }
  }
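  // Illustrative end-to-end sketch (not part of the API): configure a
  // Builder, then call build() to obtain an immutable, shareable
  // WriterProperties.
  //
  //   std::shared_ptr<parquet::WriterProperties> props =
  //       parquet::WriterProperties::Builder()
  //           .compression(parquet::Compression::ZSTD)
  //           ->enable_dictionary()
  //           ->build();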
  const ColumnProperties& column_properties(
      const std::shared_ptr<schema::ColumnPath>& path) const {
    auto it = column_properties_.find(path->ToDotString());
    if (it != column_properties_.end()) return it->second;
    return default_column_properties_;
  }

  Encoding::type encoding(const std::shared_ptr<schema::ColumnPath>& path) const {
    return column_properties(path).encoding();
  }

  Compression::type compression(const std::shared_ptr<schema::ColumnPath>& path) const {
    return column_properties(path).compression();
  }

  int compression_level(const std::shared_ptr<schema::ColumnPath>& path) const {
    return column_properties(path).compression_level();
  }

  const std::shared_ptr<::arrow::util::CodecOptions> codec_options(
      const std::shared_ptr<schema::ColumnPath>& path) const {
    return column_properties(path).codec_options();
  }

  bool dictionary_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
    return column_properties(path).dictionary_enabled();
  }

  const std::vector<SortingColumn>& sorting_columns() const { return sorting_columns_; }

  bool statistics_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
    return column_properties(path).statistics_enabled();
  }

  size_t max_statistics_size(const std::shared_ptr<schema::ColumnPath>& path) const {
    return column_properties(path).max_statistics_size();
  }

  bool page_index_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
    return column_properties(path).page_index_enabled();
  }

  bool page_index_enabled() const {
    if (default_column_properties_.page_index_enabled()) {
      return true;
    }
    for (const auto& item : column_properties_) {
      if (item.second.page_index_enabled()) {
        return true;
      }
    }
    return false;
  }

  inline FileEncryptionProperties* file_encryption_properties() const {
    return file_encryption_properties_.get();
  }

  std::shared_ptr<ColumnEncryptionProperties> column_encryption_properties(
      const std::string& path) const {
    if (file_encryption_properties_) {
      return file_encryption_properties_->column_encryption_properties(path);
    } else {
      return NULLPTR;
    }
  }

  /// \brief Return the default column properties
  const ColumnProperties& default_column_properties() const {
    return default_column_properties_;
  }

 private:
  explicit WriterProperties(
      MemoryPool* pool, int64_t dictionary_pagesize_limit, int64_t write_batch_size,
      int64_t max_row_group_length, int64_t pagesize, ParquetVersion::type version,
      const std::string& created_by, bool page_write_checksum_enabled,
      std::shared_ptr<FileEncryptionProperties> file_encryption_properties,
      const ColumnProperties& default_column_properties,
      const std::unordered_map<std::string, ColumnProperties>& column_properties,
      ParquetDataPageVersion data_page_version, bool store_short_decimal_as_integer,
      std::vector<SortingColumn> sorting_columns)
      : pool_(pool),
        dictionary_pagesize_limit_(dictionary_pagesize_limit),
        write_batch_size_(write_batch_size),
        max_row_group_length_(max_row_group_length),
        pagesize_(pagesize),
        parquet_data_page_version_(data_page_version),
        parquet_version_(version),
        parquet_created_by_(created_by),
        store_decimal_as_integer_(store_short_decimal_as_integer),
        page_checksum_enabled_(page_write_checksum_enabled),
        file_encryption_properties_(file_encryption_properties),
        sorting_columns_(std::move(sorting_columns)),
        default_column_properties_(default_column_properties),
        column_properties_(column_properties) {}

  MemoryPool* pool_;
  int64_t dictionary_pagesize_limit_;
  int64_t write_batch_size_;
  int64_t max_row_group_length_;
  int64_t pagesize_;
  ParquetDataPageVersion parquet_data_page_version_;
  ParquetVersion::type parquet_version_;
  std::string parquet_created_by_;
  bool store_decimal_as_integer_;
  bool page_checksum_enabled_;
  std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;
  std::vector<SortingColumn> sorting_columns_;
  ColumnProperties default_column_properties_;
  std::unordered_map<std::string, ColumnProperties> column_properties_;
};

PARQUET_EXPORT const std::shared_ptr<WriterProperties>& default_writer_properties();

// ----------------------------------------------------------------------
// Properties specific to Apache Arrow columnar read and write

static constexpr bool kArrowDefaultUseThreads = false;

// Default number of rows to read when using ::arrow::RecordBatchReader
static constexpr int64_t kArrowDefaultBatchSize = 64 * 1024;
/// EXPERIMENTAL: Properties for configuring FileReader behavior.
class PARQUET_EXPORT ArrowReaderProperties {
 public:
  explicit ArrowReaderProperties(bool use_threads = kArrowDefaultUseThreads)
      : use_threads_(use_threads),
        read_dict_indices_(),
        batch_size_(kArrowDefaultBatchSize),
        pre_buffer_(true),
        cache_options_(::arrow::io::CacheOptions::LazyDefaults()),
        coerce_int96_timestamp_unit_(::arrow::TimeUnit::NANO) {}

  /// \brief Set whether to use the IO thread pool to parse columns in
  /// parallel.
  ///
  /// Default is false.
  void set_use_threads(bool use_threads) { use_threads_ = use_threads; }

  /// Return whether multiple threads will be used.
  bool use_threads() const { return use_threads_; }

  /// \brief Set whether to read a particular column as dictionary encoded.
  ///
  /// If the file metadata contains a serialized Arrow schema, then ...
  ///
  /// This is only supported for columns with a Parquet physical type of
  /// BYTE_ARRAY, such as string or binary types.
  void set_read_dictionary(int column_index, bool read_dict) {
    if (read_dict) {
      read_dict_indices_.insert(column_index);
    } else {
      read_dict_indices_.erase(column_index);
    }
  }

  /// Return whether the column at the index will be read as dictionary.
  bool read_dictionary(int column_index) const {
    return read_dict_indices_.find(column_index) != read_dict_indices_.end();
  }

  /// \brief Set the maximum number of rows to read into a record batch.
  ///
  /// Batches will only contain fewer rows when there are no more rows in the
  /// file. Note that some APIs such as ReadTable may ignore this setting.
  void set_batch_size(int64_t batch_size) { batch_size_ = batch_size; }

  /// Return the batch size in rows.
  ///
  /// Note that some APIs such as ReadTable may ignore this setting.
  int64_t batch_size() const { return batch_size_; }

  /// Enable read coalescing (default true).
  ///
  /// When enabled, the Arrow reader will pre-buffer necessary regions
  /// of the file in-memory. This is intended to improve performance on
  /// high-latency filesystems (e.g. Amazon S3).
  void set_pre_buffer(bool pre_buffer) { pre_buffer_ = pre_buffer; }

  /// Return whether read coalescing is enabled.
  bool pre_buffer() const { return pre_buffer_; }

  /// Set options for read coalescing. This can be used to tune the
  /// implementation for characteristics of different filesystems.
  void set_cache_options(::arrow::io::CacheOptions options) { cache_options_ = options; }

  /// Return the options for read coalescing.
  const ::arrow::io::CacheOptions& cache_options() const { return cache_options_; }

  /// Set execution context for read coalescing.
  void set_io_context(const ::arrow::io::IOContext& ctx) { io_context_ = ctx; }

  /// Return the execution context used for read coalescing.
  const ::arrow::io::IOContext& io_context() const { return io_context_; }

  /// Set timestamp unit to use for deprecated INT96-encoded timestamps
  /// (default is NANO).
  void set_coerce_int96_timestamp_unit(::arrow::TimeUnit::type unit) {
    coerce_int96_timestamp_unit_ = unit;
  }

  ::arrow::TimeUnit::type coerce_int96_timestamp_unit() const {
    return coerce_int96_timestamp_unit_;
  }

 private:
  bool use_threads_;
  std::unordered_set<int> read_dict_indices_;
  int64_t batch_size_;
  bool pre_buffer_;
  ::arrow::io::IOContext io_context_;
  ::arrow::io::CacheOptions cache_options_;
  ::arrow::TimeUnit::type coerce_int96_timestamp_unit_;
};

/// EXPERIMENTAL: Constructs the default ArrowReaderProperties
PARQUET_EXPORT
ArrowReaderProperties default_arrow_reader_properties();
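// Illustrative sketch (not part of the API): a reader tuned for a
// high-latency object store, with dictionary decoding for column 0.
//
//   parquet::ArrowReaderProperties arrow_props =
//       parquet::default_arrow_reader_properties();
//   arrow_props.set_use_threads(true);
//   arrow_props.set_pre_buffer(true);  // coalesce reads, e.g. for Amazon S3
//   arrow_props.set_read_dictionary(/*column_index=*/0, true);
//   arrow_props.set_batch_size(128 * 1024);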
class PARQUET_EXPORT ArrowWriterProperties {
 public:
  enum EngineVersion {
    V1,  // Supports only nested lists.
    V2   // Full support for all nesting combinations
  };

  class Builder {
   public:
    Builder()
        : write_timestamps_as_int96_(false),
          coerce_timestamps_enabled_(false),
          coerce_timestamps_unit_(::arrow::TimeUnit::SECOND),
          truncated_timestamps_allowed_(false),
          store_schema_(false),
          compliant_nested_types_(true),
          engine_version_(V2),
          use_threads_(kArrowDefaultUseThreads),
          executor_(NULLPTR) {}

    virtual ~Builder() = default;

    /// \brief Disable writing legacy int96 timestamps (default disabled).
    Builder* disable_deprecated_int96_timestamps() {
      write_timestamps_as_int96_ = false;
      return this;
    }

    /// \brief Enable writing legacy int96 timestamps (default disabled).
    ///
    /// May be turned on to write timestamps compatible with older Parquet
    /// writers. This takes precedence over coerce_timestamps.
    Builder* enable_deprecated_int96_timestamps() {
      write_timestamps_as_int96_ = true;
      return this;
    }

    /// \brief Coerce all timestamps to the specified time unit.
    /// \param unit time unit to truncate to.
    /// For Parquet versions 1.0 and 2.4, nanoseconds are cast to microseconds.
    Builder* coerce_timestamps(::arrow::TimeUnit::type unit) {
      coerce_timestamps_enabled_ = true;
      coerce_timestamps_unit_ = unit;
      return this;
    }

    /// \brief Allow loss of data when truncating timestamps.
    ///
    /// This is disallowed by default and an error will be returned.
    Builder* allow_truncated_timestamps() {
      truncated_timestamps_allowed_ = true;
      return this;
    }

    /// \brief Disallow loss of data when truncating timestamps (default).
    Builder* disallow_truncated_timestamps() {
      truncated_timestamps_allowed_ = false;
      return this;
    }

    /// \brief EXPERIMENTAL: Write binary serialized Arrow schema to the file,
    /// to enable certain read options (like "read_dictionary") to be set
    /// automatically.
    Builder* store_schema() {
      store_schema_ = true;
      return this;
    }

    /// \brief When enabled, the writer will not preserve Arrow field names
    /// for list types.
    ///
    /// Instead of using the field names Arrow uses for the values array of
    /// list types (default "item"), it will use "element", as specified in
    /// the Parquet spec.
    ///
    /// This is enabled by default.
    Builder* enable_compliant_nested_types() {
      compliant_nested_types_ = true;
      return this;
    }

    /// Preserve the Arrow list field name.
    Builder* disable_compliant_nested_types() {
      compliant_nested_types_ = false;
      return this;
    }

    /// Set the version of the Parquet writer engine.
    Builder* set_engine_version(EngineVersion version) {
      engine_version_ = version;
      return this;
    }

    /// \brief Set whether to use multiple threads to write columns
    /// in parallel in the buffered row group mode.
    ///
    /// WARNING: If writing multiple files in parallel in the same
    /// executor, deadlock may occur if use_threads is true. Please
    /// disable it in this case.
    ///
    /// Default is false.
    Builder* set_use_threads(bool use_threads) {
      use_threads_ = use_threads;
      return this;
    }

    /// \brief Set the executor to write columns in parallel in the
    /// buffered row group mode.
    ///
    /// Default is nullptr and the default cpu executor will be used.
    Builder* set_executor(::arrow::internal::Executor* executor) {
      executor_ = executor;
      return this;
    }
    /// Create the final properties.
    std::shared_ptr<ArrowWriterProperties> build() {
      return std::shared_ptr<ArrowWriterProperties>(new ArrowWriterProperties(
          write_timestamps_as_int96_, coerce_timestamps_enabled_,
          coerce_timestamps_unit_, truncated_timestamps_allowed_, store_schema_,
          compliant_nested_types_, engine_version_, use_threads_, executor_));
    }

   private:
    bool write_timestamps_as_int96_;
    bool coerce_timestamps_enabled_;
    ::arrow::TimeUnit::type coerce_timestamps_unit_;
    bool truncated_timestamps_allowed_;
    bool store_schema_;
    bool compliant_nested_types_;
    EngineVersion engine_version_;
    bool use_threads_;
    ::arrow::internal::Executor* executor_;
  };

  bool support_deprecated_int96_timestamps() const { return write_timestamps_as_int96_; }

  bool coerce_timestamps_enabled() const { return coerce_timestamps_enabled_; }
  ::arrow::TimeUnit::type coerce_timestamps_unit() const {
    return coerce_timestamps_unit_;
  }

  bool truncated_timestamps_allowed() const { return truncated_timestamps_allowed_; }

  bool store_schema() const { return store_schema_; }

  /// \brief Enable nested type naming according to the parquet specification.
  ///
  /// Older versions of arrow wrote out field names for nested lists based on
  /// the name of the field. According to the parquet specification they
  /// should always be "element".
  bool compliant_nested_types() const { return compliant_nested_types_; }

  /// \brief The underlying engine version to use when writing Arrow data.
  ///
  /// V2 is currently the latest; V1 is considered deprecated but is left in
  /// place in case bugs are detected in V2.
  EngineVersion engine_version() const { return engine_version_; }

  /// \brief Returns whether the writer will use multiple threads
  /// to write columns in parallel in the buffered row group mode.
  bool use_threads() const { return use_threads_; }

  /// \brief Returns the executor used to write columns in parallel.
  ::arrow::internal::Executor* executor() const;

 private:
  explicit ArrowWriterProperties(bool write_nanos_as_int96,
                                 bool coerce_timestamps_enabled,
                                 ::arrow::TimeUnit::type coerce_timestamps_unit,
                                 bool truncated_timestamps_allowed, bool store_schema,
                                 bool compliant_nested_types,
                                 EngineVersion engine_version, bool use_threads,
                                 ::arrow::internal::Executor* executor)
      : write_timestamps_as_int96_(write_nanos_as_int96),
        coerce_timestamps_enabled_(coerce_timestamps_enabled),
        coerce_timestamps_unit_(coerce_timestamps_unit),
        truncated_timestamps_allowed_(truncated_timestamps_allowed),
        store_schema_(store_schema),
        compliant_nested_types_(compliant_nested_types),
        engine_version_(engine_version),
        use_threads_(use_threads),
        executor_(executor) {}

  const bool write_timestamps_as_int96_;
  const bool coerce_timestamps_enabled_;
  const ::arrow::TimeUnit::type coerce_timestamps_unit_;
  const bool truncated_timestamps_allowed_;
  const bool store_schema_;
  const bool compliant_nested_types_;
  const EngineVersion engine_version_;
  const bool use_threads_;
  ::arrow::internal::Executor* executor_;
};
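// Illustrative sketch (not part of the API): coercing timestamps to
// microseconds while allowing truncation, and storing the Arrow schema so
// readers can recover Arrow-level types automatically.
//
//   std::shared_ptr<parquet::ArrowWriterProperties> arrow_props =
//       parquet::ArrowWriterProperties::Builder()
//           .coerce_timestamps(::arrow::TimeUnit::MICRO)
//           ->allow_truncated_timestamps()
//           ->store_schema()
//           ->build();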
/// \brief State object used for writing Arrow data directly to a Parquet
/// column chunk.
///
/// API possibly not stable
struct ArrowWriteContext {
  ArrowWriteContext(MemoryPool* memory_pool, ArrowWriterProperties* properties)
      : memory_pool(memory_pool),
        properties(properties),
        data_buffer(AllocateBuffer(memory_pool)),
        def_levels_buffer(AllocateBuffer(memory_pool)) {}

  template <typename T>
  ::arrow::Status GetScratchData(const int64_t num_values, T** out) {
    ARROW_RETURN_NOT_OK(this->data_buffer->Resize(num_values * sizeof(T), false));
    *out = reinterpret_cast<T*>(this->data_buffer->mutable_data());
    return ::arrow::Status::OK();
  }

  MemoryPool* memory_pool;
  const ArrowWriterProperties* properties;

  // Buffer used for storing the data of an array converted to the physical type
  // as expected by parquet-cpp.
  std::shared_ptr<ResizableBuffer> data_buffer;

  // We use the shared ownership of this buffer
  std::shared_ptr<ResizableBuffer> def_levels_buffer;
};

PARQUET_EXPORT
std::shared_ptr<ArrowWriterProperties> default_arrow_writer_properties();

}  // namespace parquet