diff --git a/src/duckdb/src/function/table/version/pragma_version.cpp b/src/duckdb/src/function/table/version/pragma_version.cpp index ed489243f..f68b2bd2d 100644 --- a/src/duckdb/src/function/table/version/pragma_version.cpp +++ b/src/duckdb/src/function/table/version/pragma_version.cpp @@ -1,5 +1,5 @@ #ifndef DUCKDB_PATCH_VERSION -#define DUCKDB_PATCH_VERSION "5-dev29" +#define DUCKDB_PATCH_VERSION "5-dev31" #endif #ifndef DUCKDB_MINOR_VERSION #define DUCKDB_MINOR_VERSION 4 @@ -8,10 +8,10 @@ #define DUCKDB_MAJOR_VERSION 1 #endif #ifndef DUCKDB_VERSION -#define DUCKDB_VERSION "v1.4.5-dev29" +#define DUCKDB_VERSION "v1.4.5-dev31" #endif #ifndef DUCKDB_SOURCE_ID -#define DUCKDB_SOURCE_ID "72a5d755e1" +#define DUCKDB_SOURCE_ID "7a3a63c5c0" #endif #include "duckdb/function/table/system_functions.hpp" #include "duckdb/main/database.hpp" diff --git a/src/duckdb/src/include/duckdb/storage/compression/zstd/zstd.hpp b/src/duckdb/src/include/duckdb/storage/compression/zstd/zstd.hpp new file mode 100644 index 000000000..30f000bef --- /dev/null +++ b/src/duckdb/src/include/duckdb/storage/compression/zstd/zstd.hpp @@ -0,0 +1,356 @@ +#pragma once + +#include "duckdb/common/constants.hpp" +#include "duckdb/common/numeric_utils.hpp" +#include "duckdb/function/compression_function.hpp" +#include "duckdb/common/types/string_type.hpp" +#include "duckdb/common/helper.hpp" +#include "duckdb/storage/storage_info.hpp" +#include "duckdb/common/optional_idx.hpp" +#include "duckdb/storage/buffer/buffer_handle.hpp" +#include "duckdb/storage/table/column_segment.hpp" + +namespace duckdb { + +using page_id_t = int64_t; +using page_offset_t = uint32_t; +using uncompressed_size_t = uint64_t; +using compressed_size_t = uint64_t; +using string_length_t = uint32_t; + +struct ZSTDCompressionBufferFlags { +public: + ZSTDCompressionBufferFlags() : value(0) { + } + ZSTDCompressionBufferFlags(const ZSTDCompressionBufferFlags &other) : value(other.value) { + } + ~ZSTDCompressionBufferFlags() = default; + + ZSTDCompressionBufferFlags &operator=(const ZSTDCompressionBufferFlags &other) { + value = other.value; + return *this; + } + + bool operator==(const ZSTDCompressionBufferFlags &other) const { + return other.value == value; + } + bool operator!=(const ZSTDCompressionBufferFlags &other) const { + return !(*this == other); + } + +public: + // Bit layout + static constexpr uint8_t VECTOR_METADATA_BIT = 0; + static constexpr uint8_t STRING_METADATA_BIT = 1; + static constexpr uint8_t DATA_BIT = 2; + + // Getters + bool HasVectorMetadata() const { + return IsSet(); + } + bool HasStringMetadata() const { + return IsSet(); + } + bool HasData() const { + return IsSet(); + } + + // Setters + void SetVectorMetadata() { + Set(); + } + void SetStringMetadata() { + Set(); + } + void SetData() { + Set(); + } + + // Unsetters + void UnsetVectorMetadata() { + Unset(); + } + void UnsetStringMetadata() { + Unset(); + } + void UnsetData() { + Unset(); + } + + // Clear all flags + void Clear() { + value = 0; + } + +protected: + template + bool IsSet() const { + static const uint8_t FLAG = (1 << BIT); + return (value & FLAG) == FLAG; + } + + template + void Set() { + static const uint8_t FLAG = (1 << BIT); + value |= FLAG; + } + + template + void Unset() { + static const uint8_t FLAG = (1 << BIT); + value &= ~FLAG; + } + +private: + uint8_t value; +}; + +struct ZSTDCompressionBufferState { + //! Flags indicating use of this buffer + ZSTDCompressionBufferFlags flags; + page_offset_t offset = 0; + bool full = false; +}; + +struct ZSTDCompressionBufferCollection { +public: + enum class Slot : uint8_t { SEGMENT, OVERFLOW_0, OVERFLOW_1 }; + +public: + struct BufferData { + public: + BufferData(BufferHandle &handle, ZSTDCompressionBufferState &state, Slot slot) + : handle(handle), state(state), slot(slot) { + } + + public: + BufferHandle &handle; + ZSTDCompressionBufferState &state; + Slot slot; + }; + +public: + page_id_t GetCurrentId() const { +#ifdef DEBUG + if (!buffer_index.IsValid() || buffer_index == 0) { + D_ASSERT(block_id == INVALID_BLOCK); + } else { + D_ASSERT(block_id != INVALID_BLOCK); + } +#endif + return block_id; + } + +public: + void SetCurrentBuffer(Slot slot, page_offset_t offset = 0) { + idx_t index; + switch (slot) { + case Slot::SEGMENT: + index = 0; + break; + case Slot::OVERFLOW_0: + index = 1; + break; + case Slot::OVERFLOW_1: + index = 2; + break; + default: + throw InternalException("ZSTDCompressionBufferCollection::Slot value not handled"); + }; + buffer_index = index; + buffer_states[index].offset = offset; + } + page_offset_t &GetCurrentOffset() { + if (!buffer_index.IsValid()) { + throw InternalException( + "(ZSTDCompressionBufferCollection::GetCurrentOffset) Can't get BufferHandle, no buffer set yet!"); + } + auto index = buffer_index.GetIndex(); + auto &offset = buffer_states[index].offset; + return offset; + } + void AlignCurrentOffset() { + auto &offset = GetCurrentOffset(); + offset = UnsafeNumericCast( + AlignValue(UnsafeNumericCast(offset))); + } + BufferHandle &BufferHandleMutable() { + if (!buffer_index.IsValid()) { + throw InternalException( + "(ZSTDCompressionBufferCollection::BufferHandleMutable) Can't get BufferHandle, no buffer set yet!"); + } + auto index = buffer_index.GetIndex(); + if (index == 0) { + return segment_handle; + } + D_ASSERT(index < 3); + return extra_pages[index - 1]; + } + vector GetBufferData(bool include_segment) { + vector res; + for (idx_t i = 0; i < 3; i++) { + if (!i) { + if (include_segment) { + res.emplace_back(segment_handle, buffer_states[i], Slot::SEGMENT); + } + continue; + } + res.emplace_back(extra_pages[i - 1], buffer_states[i], i == 1 ? Slot::OVERFLOW_0 : Slot::OVERFLOW_1); + } + return res; + } + data_ptr_t GetCurrentBufferPtr() { + if (!buffer_index.IsValid()) { + throw InternalException( + "(ZSTDCompressionBufferCollection::GetCurrentBufferPtr) Can't get BufferHandle, no buffer set yet!"); + } + auto index = buffer_index.GetIndex(); + auto &state = buffer_states[index]; + return BufferHandleMutable().Ptr() + state.offset; + } + bool CanFlush() const { + if (!buffer_index.IsValid()) { + throw InternalException( + "(ZSTDCompressionBufferCollection::CanFlush) Can't determine CanFlush, no buffer set yet!"); + } + auto index = buffer_index.GetIndex(); + if (index == 0) { + //! Can't flush the segment buffer + return false; + } + auto &flags = buffer_states[index].flags; + return !flags.HasVectorMetadata() && !flags.HasStringMetadata(); + } + ZSTDCompressionBufferFlags &GetCurrentFlags() { + return GetCurrentBufferState().flags; + } + ZSTDCompressionBufferState &GetCurrentBufferState() { + if (!buffer_index.IsValid()) { + throw InternalException( + "(ZSTDCompressionBufferCollection::GetCurrentBufferState) Can't get BufferState, no buffer set yet!"); + } + return buffer_states[buffer_index.GetIndex()]; + } + bool IsOnSegmentBuffer() const { + if (!buffer_index.IsValid()) { + return false; + } + return buffer_index.GetIndex() == 0; + } + +public: + //! Current block-id of the overflow page we're writing + //! NOTE: INVALID_BLOCK means we haven't spilled to an overflow page yet + block_id_t block_id = INVALID_BLOCK; + + //! The current segment + buffer of the segment + unique_ptr segment; + BufferHandle segment_handle; + // Non-segment buffers + BufferHandle extra_pages[2]; + + //! 0: segment_handle + //! 1: extra_pages[0]; + //! 2: extra_pages[1]; + ZSTDCompressionBufferState buffer_states[3]; + +private: + optional_idx buffer_index; +}; + +//! State for the current segment (a collection of vectors) +struct ZSTDCompressionSegmentState { +public: + ZSTDCompressionSegmentState() { + } + +public: + void InitializeSegment(ZSTDCompressionBufferCollection &buffer_collection, idx_t vectors_in_segment) { + total_vectors_in_segment = vectors_in_segment; + vector_in_segment_count = 0; + buffer_collection.block_id = INVALID_BLOCK; + + //! Have to be on the segment handle + if (!buffer_collection.IsOnSegmentBuffer()) { + throw InternalException("(ZSTDCompressionSegmentState::InitializeSegment) Can't InitializeSegment on a " + "non-segment buffer!"); + } + auto base = buffer_collection.segment_handle.Ptr(); + page_offset_t offset = 0; + page_ids = reinterpret_cast(base + offset); + offset += (sizeof(page_id_t) * vectors_in_segment); + + offset = AlignValue(offset); + page_offsets = reinterpret_cast(base + offset); + offset += (sizeof(page_offset_t) * vectors_in_segment); + + offset = AlignValue(offset); + uncompressed_sizes = reinterpret_cast(base + offset); + offset += (sizeof(uncompressed_size_t) * vectors_in_segment); + + offset = AlignValue(offset); + compressed_sizes = reinterpret_cast(base + offset); + offset += (sizeof(compressed_size_t) * vectors_in_segment); + + buffer_collection.buffer_states[0].offset = offset; + } + +public: + //! Amount of vectors in this segment, determined during analyze + idx_t total_vectors_in_segment = 0xDEADBEEF; + //! The amount of vectors we've seen in the current segment + idx_t vector_in_segment_count = 0; + + page_id_t *page_ids = nullptr; + page_offset_t *page_offsets = nullptr; + uncompressed_size_t *uncompressed_sizes = nullptr; + compressed_size_t *compressed_sizes = nullptr; +}; + +//===--------------------------------------------------------------------===// +// Vector metadata +//===--------------------------------------------------------------------===// +struct ZSTDCompressionVectorState { +public: + ZSTDCompressionVectorState() { + } + +public: + bool AddStringLength(const string_t &str) { + string_lengths[tuple_count++] = UnsafeNumericCast(str.GetSize()); + return tuple_count >= vector_size; + } + + void Initialize(idx_t expected_tuple_count, ZSTDCompressionBufferCollection &buffer_collection, + const CompressionInfo &info) { + vector_size = expected_tuple_count; + + auto current_offset = buffer_collection.GetCurrentOffset(); + //! Mark where the vector begins (page_id + page_offset) + starting_offset = current_offset; + starting_page = buffer_collection.GetCurrentId(); + + //! Set the string_lengths destination and save in what buffer its stored + buffer_collection.GetCurrentFlags().SetStringMetadata(); + string_lengths = reinterpret_cast(buffer_collection.GetCurrentBufferPtr()); + + //! Finally forward the current_buffer_ptr to point *after* all string lengths we'll write + buffer_collection.GetCurrentOffset() += expected_tuple_count * sizeof(string_length_t); + } + +public: + page_id_t starting_page; + page_offset_t starting_offset; + + idx_t uncompressed_size = 0; + idx_t compressed_size = 0; + string_length_t *string_lengths = nullptr; + + bool in_vector = false; + //! Amount of tuples we have seen for the current vector + idx_t tuple_count = 0; + //! The expected size of this vector (ZSTD_VECTOR_SIZE except for the last one) + idx_t vector_size; +}; + +} // namespace duckdb diff --git a/src/duckdb/src/storage/compression/zstd.cpp b/src/duckdb/src/storage/compression/zstd.cpp index 1ccb489a6..cd2540cb9 100644 --- a/src/duckdb/src/storage/compression/zstd.cpp +++ b/src/duckdb/src/storage/compression/zstd.cpp @@ -9,6 +9,7 @@ #include "duckdb/common/serializer/deserializer.hpp" #include "duckdb/storage/segment/uncompressed.hpp" +#include "duckdb/storage/compression/zstd/zstd.hpp" #include "zstd.h" /* @@ -32,12 +33,6 @@ Data layout per segment: +--------------------------------------------+ */ -using page_id_t = int64_t; -using page_offset_t = uint32_t; -using uncompressed_size_t = uint64_t; -using compressed_size_t = uint64_t; -using string_length_t = uint32_t; - static int32_t GetCompressionLevel() { return duckdb_zstd::ZSTD_defaultCLevel(); } @@ -123,7 +118,7 @@ struct ZSTDAnalyzeState : public AnalyzeState { public: DBConfig &config; - duckdb_zstd::ZSTD_CCtx *context; + duckdb_zstd::ZSTD_CCtx *context = nullptr; //! The combined string lengths for all values in the segment idx_t total_size = 0; //! The total amount of values in the segment @@ -199,6 +194,9 @@ idx_t ZSTDStorage::StringFinalAnalyze(AnalyzeState &state_p) { if (state.values_in_vector) { D_ASSERT(state.values_in_vector < ZSTD_VECTOR_SIZE); + state.vectors_in_segment++; + } + if (state.vectors_in_segment) { state.segment_count++; } @@ -235,87 +233,90 @@ class ZSTDCompressionState : public CompressionState { : CompressionState(analyze_state_p->info), analyze_state(std::move(analyze_state_p)), checkpoint_data(checkpoint_data), partial_block_manager(checkpoint_data.GetCheckpointState().GetPartialBlockManager()), - function(checkpoint_data.GetCompressionFunction(CompressionType::COMPRESSION_ZSTD)) { - - total_vector_count = GetVectorCount(analyze_state->count); - total_segment_count = analyze_state->segment_count; - vectors_per_segment = analyze_state->vectors_per_segment; + function(checkpoint_data.GetCompressionFunction(CompressionType::COMPRESSION_ZSTD)), + total_tuple_count(analyze_state->count), total_vector_count(GetVectorCount(total_tuple_count)), + total_segment_count(analyze_state->segment_count), vectors_per_segment(analyze_state->vectors_per_segment) { segment_count = 0; vector_count = 0; - vector_in_segment_count = 0; - tuple_count = 0; + vector_state.tuple_count = 0; - idx_t offset = NewSegment(); - SetCurrentBuffer(segment_handle); - current_buffer_ptr = segment_handle.Ptr() + offset; - D_ASSERT(GetCurrentOffset() <= GetWritableSpace(info)); + NewSegment(); + if (!(buffer_collection.GetCurrentOffset() <= GetWritableSpace(info))) { + throw InternalException( + "(ZSTDCompressionState::ZSTDCompressionState) Offset (%d) exceeds writable space! (%d)", + buffer_collection.GetCurrentOffset(), GetWritableSpace(info)); + } } public: void ResetOutBuffer() { - D_ASSERT(GetCurrentOffset() <= GetWritableSpace(info)); - out_buffer.dst = current_buffer_ptr; + if (!(buffer_collection.GetCurrentOffset() <= GetWritableSpace(info))) { + throw InternalException("(ZSTDCompressionState::ResetOutBuffer) Offset (%d) exceeds writable space! (%d)", + buffer_collection.GetCurrentOffset(), GetWritableSpace(info)); + } + out_buffer.dst = buffer_collection.GetCurrentBufferPtr(); out_buffer.pos = 0; - auto remaining_space = info.GetBlockSize() - GetCurrentOffset() - sizeof(block_id_t); + auto remaining_space = info.GetBlockSize() - buffer_collection.GetCurrentOffset() - sizeof(block_id_t); out_buffer.size = remaining_space; } - void SetCurrentBuffer(BufferHandle &handle) { - current_buffer = &handle; - current_buffer_ptr = handle.Ptr(); + void WriteBlockIdPointer(page_id_t block_id) { + auto ptr = buffer_collection.GetCurrentBufferPtr(); + Store(block_id, ptr); + buffer_collection.GetCurrentOffset() += sizeof(block_id_t); } - BufferHandle &GetExtraPageBuffer(block_id_t current_block_id) { + void GetExtraPageBuffer(block_id_t current_block_id) { auto &block_manager = partial_block_manager.GetBlockManager(); auto &buffer_manager = block_manager.buffer_manager; - optional_ptr to_use; - - if (in_vector) { - // Currently in a Vector, we have to be mindful of the buffer that the string_lengths lives on - // as that will have to stay writable until the Vector is finished - bool already_separated = current_buffer != vector_lengths_buffer; - if (already_separated) { - // Already separated, can keep using the other buffer (flush it first) - FlushPage(*current_buffer, current_block_id); - to_use = current_buffer; - } else { - // Not already separated, have to use the other page - to_use = current_buffer == &extra_pages[0] ? &extra_pages[1] : &extra_pages[0]; - } - } else { - // Start of a new Vector, the string_lengths did not fit on the previous page - bool previous_page_is_segment = current_buffer == &segment_handle; - if (!previous_page_is_segment) { - // We're asking for a fresh buffer to start the vectors data - // that means the previous vector is finished - so we can flush the current page and reuse it - D_ASSERT(current_block_id != INVALID_BLOCK); - FlushPage(*current_buffer, current_block_id); - to_use = current_buffer; - } else { - // Previous buffer was the segment, take the first extra page in this case - to_use = &extra_pages[0]; - } + auto ¤t_buffer_state = buffer_collection.GetCurrentBufferState(); + current_buffer_state.full = true; + + if (buffer_collection.CanFlush()) { + auto &buffer_state = buffer_collection.GetCurrentBufferState(); + FlushPage(buffer_collection.BufferHandleMutable(), current_block_id); + buffer_state.flags.Clear(); + buffer_state.full = false; + buffer_state.offset = 0; + return; } - if (!to_use->IsValid()) { - *to_use = buffer_manager.Allocate(MemoryTag::OVERFLOW_STRINGS, &block_manager); + //! Cycle through the extra pages, to figure out which one we can use + //! In the worst case, the segment handle is entirely filled with vector metadata + //! The last part of the first extra page is entirely filled with string metadata + //! So we can only use the second extra page for data + auto buffer_data = buffer_collection.GetBufferData(/*include_segment = */ false); + for (auto &buffer : buffer_data) { + auto &buffer_state = buffer.state; + auto &flags = buffer_state.flags; + if (flags.HasStringMetadata() || buffer_state.full) { + continue; + } + buffer_collection.SetCurrentBuffer(buffer.slot); + auto &buffer_handle = buffer_collection.BufferHandleMutable(); + if (!buffer_handle.IsValid()) { + buffer_handle = buffer_manager.Allocate(MemoryTag::OVERFLOW_STRINGS, &block_manager); + } + return; } - return *to_use; + throw InternalException( + "(ZSTDCompressionState::GetExtraPageBuffer) Wasn't able to find a buffer to write overflow data to!"); } - idx_t NewSegment() { - if (current_buffer == &segment_handle) { + void NewSegment() { + if (buffer_collection.IsOnSegmentBuffer()) { // This should never happen, the string lengths + vector metadata size should always exceed a page size, // even if the strings are all empty - throw InternalException("We are asking for a new segment, but somehow we're still writing vector data onto " + throw InternalException("(ZSTDCompressionState::NewSegment) We are asking for a new segment, but somehow " + "we're still writing vector data onto " "the initial (segment) page"); } idx_t row_start; - if (segment) { - row_start = segment->start + segment->count; + if (buffer_collection.segment) { + row_start = buffer_collection.segment->start + buffer_collection.segment->count; FlushSegment(); } else { row_start = checkpoint_data.GetRowGroup().start; @@ -325,65 +326,56 @@ class ZSTDCompressionState : public CompressionState { // Figure out how many vectors we are storing in this segment idx_t vectors_in_segment; if (segment_count + 1 >= total_segment_count) { - vectors_in_segment = total_vector_count - vector_count; + vectors_in_segment = total_vector_count - (segment_count * vectors_per_segment); } else { vectors_in_segment = vectors_per_segment; } - idx_t offset = 0; - page_ids = reinterpret_cast(segment_handle.Ptr() + offset); - offset += (sizeof(page_id_t) * vectors_in_segment); - - offset = AlignValue(offset); - page_offsets = reinterpret_cast(segment_handle.Ptr() + offset); - offset += (sizeof(page_offset_t) * vectors_in_segment); - - offset = AlignValue(offset); - uncompressed_sizes = reinterpret_cast(segment_handle.Ptr() + offset); - offset += (sizeof(uncompressed_size_t) * vectors_in_segment); - - offset = AlignValue(offset); - compressed_sizes = reinterpret_cast(segment_handle.Ptr() + offset); - offset += (sizeof(compressed_size_t) * vectors_in_segment); - - D_ASSERT(offset == GetVectorMetadataSize(vectors_in_segment)); - return offset; + buffer_collection.SetCurrentBuffer(ZSTDCompressionBufferCollection::Slot::SEGMENT); + buffer_collection.buffer_states[0].flags.SetVectorMetadata(); + segment_state.InitializeSegment(buffer_collection, vectors_in_segment); + if (!(buffer_collection.GetCurrentOffset() <= GetWritableSpace(info))) { + throw InternalException("(ZSTDCompressionState::NewSegment) Offset (%d) exceeds writable space! (%d)", + buffer_collection.GetCurrentOffset(), GetWritableSpace(info)); + } } void InitializeVector() { - D_ASSERT(!in_vector); + D_ASSERT(!vector_state.in_vector); + idx_t expected_tuple_count; if (vector_count + 1 >= total_vector_count) { //! Last vector - vector_size = analyze_state->count - (ZSTD_VECTOR_SIZE * vector_count); + expected_tuple_count = analyze_state->count - (ZSTD_VECTOR_SIZE * vector_count); } else { - vector_size = ZSTD_VECTOR_SIZE; + expected_tuple_count = ZSTD_VECTOR_SIZE; + } + buffer_collection.AlignCurrentOffset(); + if (!(buffer_collection.GetCurrentOffset() <= GetWritableSpace(info))) { + throw InternalException("(ZSTDCompressionState::InitializeVector) Offset (%d) exceeds writable space! (%d)", + buffer_collection.GetCurrentOffset(), GetWritableSpace(info)); } - auto current_offset = GetCurrentOffset(); - current_offset = UnsafeNumericCast( - AlignValue(UnsafeNumericCast(current_offset))); - current_buffer_ptr = current_buffer->Ptr() + current_offset; - D_ASSERT(GetCurrentOffset() <= GetWritableSpace(info)); - compressed_size = 0; - uncompressed_size = 0; - - if (GetVectorMetadataSize(vector_in_segment_count + 1) > GetWritableSpace(info)) { - D_ASSERT(vector_in_segment_count <= vectors_per_segment); - // Can't fit this vector on this segment anymore, have to flush and a grab new one + vector_state.compressed_size = 0; + vector_state.uncompressed_size = 0; + vector_state.string_lengths = nullptr; + vector_state.tuple_count = 0; + vector_state.vector_size = 0; + vector_state.starting_page = 0XDEADBEEF; + vector_state.starting_offset = 0XDEADBEEF; + + if (segment_state.vector_in_segment_count + 1 > segment_state.total_vectors_in_segment) { + //! Last vector in the segment NewSegment(); } - if (current_offset + (vector_size * sizeof(string_length_t)) >= GetWritableSpace(info)) { + if (buffer_collection.GetCurrentOffset() + (expected_tuple_count * sizeof(string_length_t)) >= + GetWritableSpace(info)) { // Check if there is room on the current page for the vector data NewPage(); } - current_offset = GetCurrentOffset(); - starting_offset = current_offset; - starting_page = GetCurrentId(); - - vector_lengths_buffer = current_buffer; - string_lengths = reinterpret_cast(current_buffer->Ptr() + current_offset); - current_buffer_ptr = reinterpret_cast(string_lengths); - current_buffer_ptr += vector_size * sizeof(string_length_t); + + buffer_collection.AlignCurrentOffset(); + vector_state.Initialize(expected_tuple_count, buffer_collection, info); + // 'out_buffer' should be set to point directly after the string_lengths ResetOutBuffer(); @@ -392,7 +384,7 @@ class ZSTDCompressionState : public CompressionState { duckdb_zstd::ZSTD_CCtx_refCDict(analyze_state->context, nullptr); duckdb_zstd::ZSTD_CCtx_setParameter(analyze_state->context, duckdb_zstd::ZSTD_c_compressionLevel, GetCompressionLevel()); - in_vector = true; + vector_state.in_vector = true; } void CompressString(const string_t &string, bool end_of_vector) { @@ -403,7 +395,7 @@ class ZSTDCompressionState : public CompressionState { if (!end_of_vector && string.GetSize() == 0) { return; } - uncompressed_size += string.GetSize(); + vector_state.uncompressed_size += string.GetSize(); const auto end_mode = end_of_vector ? duckdb_zstd::ZSTD_e_end : duckdb_zstd::ZSTD_e_continue; size_t compress_result; @@ -414,14 +406,18 @@ class ZSTDCompressionState : public CompressionState { duckdb_zstd::ZSTD_compressStream2(analyze_state->context, &out_buffer, &in_buffer, end_mode); D_ASSERT(out_buffer.pos >= old_pos); auto diff = out_buffer.pos - old_pos; - compressed_size += diff; - current_buffer_ptr += diff; + vector_state.compressed_size += diff; + buffer_collection.GetCurrentOffset() += diff; if (duckdb_zstd::ZSTD_isError(compress_result)) { throw InvalidInputException("ZSTD Compression failed: %s", duckdb_zstd::ZSTD_getErrorName(compress_result)); } - D_ASSERT(GetCurrentOffset() <= GetWritableSpace(info)); + if (!(buffer_collection.GetCurrentOffset() <= GetWritableSpace(info))) { + throw InternalException( + "(ZSTDCompressionState::CompressString) Offset (%d) exceeds writable space! (%d)", + buffer_collection.GetCurrentOffset(), GetWritableSpace(info)); + } if (compress_result == 0) { // Finished break; @@ -430,44 +426,43 @@ class ZSTDCompressionState : public CompressionState { } } - void AddString(const string_t &string) { - if (!tuple_count) { + void AddStringInternal(const string_t &string) { + if (!vector_state.tuple_count) { InitializeVector(); } - - string_lengths[tuple_count] = UnsafeNumericCast(string.GetSize()); - bool final_tuple = tuple_count + 1 >= vector_size; - CompressString(string, final_tuple); - - tuple_count++; - if (tuple_count == vector_size) { + auto is_final_string = vector_state.AddStringLength(string); + CompressString(string, is_final_string); + if (is_final_string) { // Reached the end of this vector FlushVector(); } + } - UncompressedStringStorage::UpdateStringStats(segment->stats, string); + void AddString(const string_t &string) { + AddStringInternal(string); + UncompressedStringStorage::UpdateStringStats(buffer_collection.segment->stats, string); } void NewPage(bool additional_data_page = false) { block_id_t new_id = FinalizePage(); - block_id_t current_block_id = block_id; - auto &buffer = GetExtraPageBuffer(current_block_id); - block_id = new_id; - SetCurrentBuffer(buffer); + block_id_t current_block_id = buffer_collection.GetCurrentId(); + GetExtraPageBuffer(current_block_id); + buffer_collection.block_id = new_id; ResetOutBuffer(); } block_id_t FinalizePage() { auto &block_manager = partial_block_manager.GetBlockManager(); auto new_id = block_manager.GetFreeBlockId(); - auto &state = segment->GetSegmentState()->Cast(); + auto &state = buffer_collection.segment->GetSegmentState()->Cast(); state.RegisterBlock(block_manager, new_id); - D_ASSERT(GetCurrentOffset() <= GetWritableSpace(info)); + auto &buffer_state = buffer_collection.GetCurrentBufferState(); + buffer_state.full = true; // Write the new id at the end of the last page - Store(new_id, current_buffer_ptr); - current_buffer_ptr += sizeof(block_id_t); + WriteBlockIdPointer(new_id); + D_ASSERT(buffer_state.offset <= info.GetBlockSize()); return new_id; } @@ -483,46 +478,63 @@ class ZSTDCompressionState : public CompressionState { void FlushVector() { // Write the metadata for this Vector - page_ids[vector_in_segment_count] = starting_page; - page_offsets[vector_in_segment_count] = starting_offset; - compressed_sizes[vector_in_segment_count] = compressed_size; - uncompressed_sizes[vector_in_segment_count] = uncompressed_size; + segment_state.page_ids[segment_state.vector_in_segment_count] = vector_state.starting_page; + segment_state.page_offsets[segment_state.vector_in_segment_count] = vector_state.starting_offset; + segment_state.compressed_sizes[segment_state.vector_in_segment_count] = vector_state.compressed_size; + segment_state.uncompressed_sizes[segment_state.vector_in_segment_count] = vector_state.uncompressed_size; + if (segment_state.vector_in_segment_count >= segment_state.total_vectors_in_segment) { + throw InternalException( + "(ZSTDCompressionState::FlushVector) Written too many vectors (%d) to this segment! (expected: %d)", + segment_state.vector_in_segment_count, segment_state.total_vectors_in_segment); + } vector_count++; - vector_in_segment_count++; - in_vector = false; - segment->count += tuple_count; - - const bool is_last_vector = vector_count == total_vector_count; - tuple_count = 0; - if (is_last_vector) { - FlushPage(*current_buffer, block_id); - if (starting_page != block_id) { - FlushPage(*vector_lengths_buffer, starting_page); - } - } else { - if (vector_lengths_buffer == current_buffer) { - // We did not cross a page boundary writing this vector - return; + segment_state.vector_in_segment_count++; + vector_state.in_vector = false; + buffer_collection.segment->count += vector_state.tuple_count; + + vector_state.tuple_count = 0; + + //! If the string lengths live on an overflow page, and that buffer is full + //! then we want to flush it + + auto buffer_data = buffer_collection.GetBufferData(/*include_segment=*/true); + ZSTDCompressionBufferCollection::Slot slot = ZSTDCompressionBufferCollection::Slot::SEGMENT; + optional_ptr buffer_handle_ptr; + optional_ptr buffer_state_ptr; + for (auto &buffer : buffer_data) { + auto &buffer_state = buffer.state; + if (buffer_state.flags.HasStringMetadata()) { + if (buffer_handle_ptr) { + throw InternalException("(ZSTDCompressionState::FlushVector) Multiple buffers (%d and %d) have " + "string metadata on them, this is impossible and indicates a bug!", + static_cast(slot), static_cast(buffer.slot)); + } + slot = buffer.slot; + buffer_state_ptr = buffer.state; + buffer_handle_ptr = buffer.handle; } - // Flush the page that holds the vector lengths - FlushPage(*vector_lengths_buffer, starting_page); + buffer_state.flags.UnsetStringMetadata(); + buffer_state.flags.UnsetData(); } - } - page_id_t GetCurrentId() { - if (&segment_handle == current_buffer.get()) { - return INVALID_BLOCK; + if (!buffer_handle_ptr) { + throw InternalException("(ZSTDCompressionState::FlushVector) None of the buffers have string metadata on " + "them, this is impossible and indicates a bug!"); } - return block_id; - } - - page_offset_t GetCurrentOffset() { - auto &handle = *current_buffer; - auto start_of_buffer = handle.Ptr(); - D_ASSERT(current_buffer_ptr >= start_of_buffer); - auto res = (page_offset_t)(current_buffer_ptr - start_of_buffer); - D_ASSERT(res <= GetWritableSpace(info)); - return res; + if (slot == ZSTDCompressionBufferCollection::Slot::SEGMENT) { + //! This is the segment handle, we don't need to flush that here, it'll get flushed when the segment is done + return; + } + auto &buffer_state = *buffer_state_ptr; + if (!buffer_state.full) { + //! It contains the string metadata of the current vector, but the buffer isn't full + //! so we don't need to flush it yet + return; + } + auto &buffer_handle = *buffer_handle_ptr; + FlushPage(buffer_handle, vector_state.starting_page); + buffer_state.offset = 0; + buffer_state.full = false; } void CreateEmptySegment(idx_t row_start) { @@ -530,36 +542,63 @@ class ZSTDCompressionState : public CompressionState { auto &type = checkpoint_data.GetType(); auto compressed_segment = ColumnSegment::CreateTransientSegment(db, function, type, row_start, info.GetBlockSize(), info.GetBlockManager()); - segment = std::move(compressed_segment); + buffer_collection.segment = std::move(compressed_segment); auto &buffer_manager = BufferManager::GetBufferManager(checkpoint_data.GetDatabase()); - segment_handle = buffer_manager.Pin(segment->block); + buffer_collection.segment_handle = buffer_manager.Pin(buffer_collection.segment->block); } void FlushSegment() { - auto &state = checkpoint_data.GetCheckpointState(); - idx_t segment_block_size; + if (segment_state.vector_in_segment_count != segment_state.total_vectors_in_segment) { + throw InternalException("(ZSTDCompressionState::FlushSegment) We haven't written all vectors that we were " + "expecting to write (%d instead of %d)!", + segment_state.vector_in_segment_count, segment_state.total_vectors_in_segment); + } - if (current_buffer.get() == &segment_handle) { - segment_block_size = GetCurrentOffset(); - } else { - // Block is fully used - segment_block_size = info.GetBlockSize(); + auto &segment_buffer_state = buffer_collection.buffer_states[0]; + auto segment_block_size = segment_buffer_state.offset; + if (segment_block_size < GetVectorMetadataSize(segment_state.total_vectors_in_segment)) { + throw InternalException( + "(ZSTDCompressionState::FlushSegment) Expected offset to be at least %d, but found %d instead", + GetVectorMetadataSize(segment_state.total_vectors_in_segment), segment_block_size); } - state.FlushSegment(std::move(segment), std::move(segment_handle), segment_block_size); + bool seen_dirty_buffer = false; + auto buffer_data = buffer_collection.GetBufferData(/*include_segment=*/false); + for (auto &buffer : buffer_data) { + auto &buffer_state = buffer.state; + auto &buffer_handle = buffer.handle; + if (buffer_state.offset != 0) { + if (seen_dirty_buffer) { + throw InternalException("(ZSTDCompressionState::FlushSegment) Both extra pages were dirty (needed " + "to be flushed), this should be impossible"); + } + FlushPage(buffer_handle, buffer_collection.block_id); + buffer_state.full = false; + buffer_state.offset = 0; + buffer_state.flags.Clear(); + seen_dirty_buffer = true; + } + } + auto &state = checkpoint_data.GetCheckpointState(); + state.FlushSegment(std::move(buffer_collection.segment), std::move(buffer_collection.segment_handle), + segment_block_size); + segment_buffer_state.flags.Clear(); + segment_buffer_state.full = true; + segment_buffer_state.offset = 0; segment_count++; - vector_in_segment_count = 0; } void Finalize() { - D_ASSERT(!tuple_count); + D_ASSERT(!vector_state.tuple_count); FlushSegment(); - segment.reset(); + buffer_collection.segment.reset(); } void AddNull() { - AddString(""); + buffer_collection.segment->stats.statistics.SetHasNullFast(); + string_t empty(static_cast(0)); + AddStringInternal(empty); } public: @@ -568,56 +607,29 @@ class ZSTDCompressionState : public CompressionState { PartialBlockManager &partial_block_manager; CompressionFunction &function; - // The segment state - //! Current segment index we're at - idx_t segment_count = 0; + //! --- Analyzed Data --- + //! The amount of tuples we're writing + const idx_t total_tuple_count; + //! The amount of vectors we're writing + const idx_t total_vector_count; //! The total amount of segments we're writing - idx_t total_segment_count = 0; - //! The vectors to store in the last segment - idx_t vectors_in_last_segment = 0; + const idx_t total_segment_count; //! The vectors to store in a segment (not the last one) - idx_t vectors_per_segment = 0; - unique_ptr segment; - BufferHandle segment_handle; - - // Non-segment buffers - BufferHandle extra_pages[2]; - block_id_t block_id = INVALID_BLOCK; + const idx_t vectors_per_segment; - // Current block state - optional_ptr current_buffer; - //! The buffer that contains the vector lengths - optional_ptr vector_lengths_buffer; - data_ptr_t current_buffer_ptr; - - //===--------------------------------------------------------------------===// - // Vector metadata - //===--------------------------------------------------------------------===// - page_id_t starting_page; - page_offset_t starting_offset; - - page_id_t *page_ids; - page_offset_t *page_offsets; - uncompressed_size_t *uncompressed_sizes; - compressed_size_t *compressed_sizes; //! The amount of vectors we've seen so far idx_t vector_count = 0; - //! The amount of vectors we've seen in the current segment - idx_t vector_in_segment_count = 0; - //! The amount of vectors we're writing - idx_t total_vector_count = 0; - //! Whether we are currently in a Vector - bool in_vector = false; + //! Current segment index we're at + idx_t segment_count = 0; + + ZSTDCompressionBufferCollection buffer_collection; + //! The compression context indicating where we are in the output buffer duckdb_zstd::ZSTD_outBuffer out_buffer; - idx_t uncompressed_size = 0; - idx_t compressed_size = 0; - string_length_t *string_lengths; - - //! Amount of tuples we have seen for the current vector - idx_t tuple_count = 0; - //! The expected size of this vector (ZSTD_VECTOR_SIZE except for the last one) - idx_t vector_size; + //! State of the current Vector we are writing + ZSTDCompressionVectorState vector_state; + //! State of the current Segment we are writing + ZSTDCompressionSegmentState segment_state; }; unique_ptr ZSTDStorage::InitCompression(ColumnDataCheckpointData &checkpoint_data, @@ -637,7 +649,7 @@ void ZSTDStorage::Compress(CompressionState &state_p, Vector &input, idx_t count for (idx_t i = 0; i < count; i++) { auto idx = vdata.sel->get_index(i); // Note: we treat nulls and empty strings the same - if (!vdata.validity.RowIsValid(idx) || data[idx].GetSize() == 0) { + if (!vdata.validity.RowIsValid(idx)) { state.AddNull(); continue; }