From a2f6e6eb81f385daf4e534eded6668d017a21583 Mon Sep 17 00:00:00 2001 From: "lisizhuo.lsz" Date: Tue, 26 May 2026 10:12:31 +0000 Subject: [PATCH 1/5] feat: introduce binary row format with reader/writer support --- .../common/data/abstract_binary_writer.cpp | 222 +++++++ .../common/data/abstract_binary_writer.h | 104 ++++ .../common/data/binary_data_read_utils.h | 129 ++++ src/paimon/common/data/binary_row.cpp | 311 ++++++++++ src/paimon/common/data/binary_row.h | 169 ++++++ src/paimon/common/data/binary_row_test.cpp | 563 ++++++++++++++++++ src/paimon/common/data/binary_row_writer.cpp | 181 ++++++ src/paimon/common/data/binary_row_writer.h | 125 ++++ .../common/data/binary_row_writer_test.cpp | 318 ++++++++++ src/paimon/common/data/binary_writer.h | 85 +++ 10 files changed, 2207 insertions(+) create mode 100644 src/paimon/common/data/abstract_binary_writer.cpp create mode 100644 src/paimon/common/data/abstract_binary_writer.h create mode 100644 src/paimon/common/data/binary_data_read_utils.h create mode 100644 src/paimon/common/data/binary_row.cpp create mode 100644 src/paimon/common/data/binary_row.h create mode 100644 src/paimon/common/data/binary_row_test.cpp create mode 100644 src/paimon/common/data/binary_row_writer.cpp create mode 100644 src/paimon/common/data/binary_row_writer.h create mode 100644 src/paimon/common/data/binary_row_writer_test.cpp create mode 100644 src/paimon/common/data/binary_writer.h diff --git a/src/paimon/common/data/abstract_binary_writer.cpp b/src/paimon/common/data/abstract_binary_writer.cpp new file mode 100644 index 0000000..1487493 --- /dev/null +++ b/src/paimon/common/data/abstract_binary_writer.cpp @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/common/data/abstract_binary_writer.h" + +#include +#include +#include + +#include "paimon/common/data/binary_array.h" +#include "paimon/common/data/binary_map.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/common/data/binary_section.h" +#include "paimon/common/data/binary_string.h" +#include "paimon/common/memory/memory_segment_utils.h" +#include "paimon/data/decimal.h" +#include "paimon/data/timestamp.h" +#include "paimon/io/byte_order.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" + +namespace paimon { + +void AbstractBinaryWriter::WriteBytes(int32_t pos, const Bytes& bytes) { + int32_t len = bytes.size(); + if (len <= BinarySection::MAX_FIX_PART_DATA_SIZE) { + WriteBytesToFixLenPart(&segment_, GetFieldOffset(pos), bytes, len); + } else { + WriteBytesToVarLenPart(pos, bytes, len); + } +} + +void AbstractBinaryWriter::WriteString(int32_t pos, const BinaryString& input) { + int32_t len = input.GetSizeInBytes(); + if (len <= BinarySection::MAX_FIX_PART_DATA_SIZE) { + auto bytes = Bytes::AllocateBytes(len, pool_); + MemorySegmentUtils::CopyToBytes({input.GetSegment()}, input.GetOffset(), bytes.get(), 0, + len); + WriteBytesToFixLenPart(&segment_, GetFieldOffset(pos), *bytes, len); + } else { + WriteSegmentToVarLenPart(pos, input.GetSegment(), input.GetOffset(), len); + } +} + +void 
AbstractBinaryWriter::WriteBinary(int32_t pos, const Bytes& bytes) { + int32_t len = bytes.size(); + if (len <= BinarySection::MAX_FIX_PART_DATA_SIZE) { + WriteBytesToFixLenPart(&segment_, GetFieldOffset(pos), bytes, len); + } else { + WriteBytesToVarLenPart(pos, bytes, len); + } +} + +void AbstractBinaryWriter::WriteStringView(int32_t pos, const std::string_view& view) { + int32_t len = view.size(); + if (len <= BinarySection::MAX_FIX_PART_DATA_SIZE) { + WriteBytesToFixLenPart(&segment_, GetFieldOffset(pos), view, len); + } else { + WriteBytesToVarLenPart(pos, view, len); + } +} + +void AbstractBinaryWriter::WriteRow(int32_t pos, const BinaryRow& input) { + return WriteSegmentToVarLenPart(pos, input.GetSegment(), input.GetOffset(), + input.GetSizeInBytes()); +} + +void AbstractBinaryWriter::WriteArray(int32_t pos, const BinaryArray& input) { + return WriteSegmentToVarLenPart(pos, input.GetSegment(), input.GetOffset(), + input.GetSizeInBytes()); +} + +void AbstractBinaryWriter::WriteMap(int32_t pos, const BinaryMap& input) { + return WriteSegmentToVarLenPart(pos, input.GetSegment(), input.GetOffset(), + input.GetSizeInBytes()); +} +void AbstractBinaryWriter::WriteDecimal(int32_t pos, const std::optional& value, + int32_t precision) { + assert(value == std::nullopt || precision == value.value().Precision()); + if (Decimal::IsCompact(precision)) { + assert(value != std::nullopt); + WriteLong(pos, value.value().ToUnscaledLong()); + } else { + // grow the global buffer before writing data. + EnsureCapacity(16); + // zero-out 16 bytes + segment_.PutValue(cursor_, 0ll); + segment_.PutValue(cursor_ + 8, 0ll); + + // Make sure Decimal object has the same scale as DecimalType. + // Note that we may pass in null Decimal object to set null for it. 
+ if (value == std::nullopt) { + SetNullBit(pos); + SetOffsetAndSize(pos, cursor_, 0l); + } else { + auto bytes = value.value().ToUnscaledBytes(); + segment_.Put(cursor_, bytes, 0, bytes.size()); + SetOffsetAndSize(pos, cursor_, bytes.size()); + } + // move the cursor forward. + cursor_ += 16; + } +} + +void AbstractBinaryWriter::WriteTimestamp(int32_t pos, const std::optional& value, + int32_t precision) { + if (Timestamp::IsCompact(precision)) { + assert(value != std::nullopt); + WriteLong(pos, value.value().GetMillisecond()); + } else { + // store the nanoOfMillisecond in fixed-length part as offset and nanoOfMillisecond + EnsureCapacity(8); + + if (value == std::nullopt) { + SetNullBit(pos); + // zero-out the bytes + segment_.PutValue(cursor_, 0l); + SetOffsetAndSize(pos, cursor_, 0l); + } else { + segment_.PutValue(cursor_, value.value().GetMillisecond()); + SetOffsetAndSize(pos, cursor_, value.value().GetNanoOfMillisecond()); + } + cursor_ += 8; + } +} + +void AbstractBinaryWriter::ZeroOutPaddingBytes(int32_t num_bytes) { + if ((num_bytes & 0x07) > 0) { + segment_.PutValue(cursor_ + ((num_bytes >> 3) << 3), 0L); + } +} + +void AbstractBinaryWriter::EnsureCapacity(int32_t needed_size) { + const int32_t length = cursor_ + needed_size; + if (segment_.Size() < length) { + Grow(length); + } +} + +void AbstractBinaryWriter::WriteSegmentToVarLenPart(int32_t pos, const MemorySegment& segment, + int32_t offset, int32_t size) { + const int32_t rounded_size = RoundNumberOfBytesToNearestWord(size); + // grow the global buffer before writing data. + EnsureCapacity(rounded_size); + ZeroOutPaddingBytes(size); + + segment.CopyTo(offset, &segment_, cursor_, size); + SetOffsetAndSize(pos, cursor_, size); + // move the cursor forward. 
+ cursor_ += rounded_size; +} + +template +void AbstractBinaryWriter::WriteBytesToVarLenPart(int32_t pos, const T& bytes, int32_t len) { + const int32_t rounded_size = RoundNumberOfBytesToNearestWord(len); + // grow the global buffer before writing data. + EnsureCapacity(rounded_size); + ZeroOutPaddingBytes(len); + // Write the bytes to the variable length portion. + segment_.Put(cursor_, bytes, 0, len); + SetOffsetAndSize(pos, cursor_, len); + // move the cursor forward. + cursor_ += rounded_size; +} + +void AbstractBinaryWriter::Grow(int32_t min_capacity) { + int32_t old_capacity = segment_.Size(); + int32_t new_capacity = old_capacity + (old_capacity >> 1); + if (new_capacity - min_capacity < 0) { + new_capacity = min_capacity; + } + std::shared_ptr new_bytes = + Bytes::CopyOf(*(segment_.GetOrCreateHeapMemory(pool_)), new_capacity, pool_); + segment_ = MemorySegment::Wrap(new_bytes); + AfterGrow(); +} + +int32_t AbstractBinaryWriter::RoundNumberOfBytesToNearestWord(int32_t num_bytes) { + int32_t remainder = num_bytes & 0x07; + if (remainder == 0) { + return num_bytes; + } else { + return num_bytes + (8 - remainder); + } +} +template +void AbstractBinaryWriter::WriteBytesToFixLenPart(MemorySegment* segment, int32_t field_offset, + const T& bytes, int32_t len) { + int64_t first_byte = len | 0x80; // first bit is 1, other bits is len + int64_t seven_bytes = 0L; // real data + if ((SystemByteOrder() == ByteOrder::PAIMON_LITTLE_ENDIAN)) { + for (int32_t i = 0; i < len; i++) { + seven_bytes |= ((0x00000000000000FFL & bytes[i]) << (i * 8L)); + } + } else { + for (int32_t i = 0; i < len; i++) { + seven_bytes |= ((0x00000000000000FFL & bytes[i]) << ((6 - i) * 8L)); + } + } + const int64_t offset_and_size = + (first_byte << 56) | // NOLINT(clang-analyzer-core.UndefinedBinaryOperatorResult) + seven_bytes; + segment->PutValue(field_offset, offset_and_size); +} + +} // namespace paimon diff --git a/src/paimon/common/data/abstract_binary_writer.h 
b/src/paimon/common/data/abstract_binary_writer.h new file mode 100644 index 0000000..7565543 --- /dev/null +++ b/src/paimon/common/data/abstract_binary_writer.h @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/common/data/binary_writer.h" +#include "paimon/common/memory/memory_segment.h" + +namespace paimon { +class BinaryArray; +class BinaryRow; +class BinaryString; +class Bytes; +class Decimal; +class MemoryPool; +class Timestamp; + +/// Use the special format to write data to a `MemorySegment` (its capacity grows +/// automatically). If write a format binary: 1. New a writer. 2. Write each field by `WriteXXX()` +/// or `SetNullAt()`. (Variable length fields can not be written repeatedly.) 3. Invoke +/// `Complete()`. If want to reuse this writer, please invoke `Reset()` first. 
+class AbstractBinaryWriter : public BinaryWriter { + public: + void WriteString(int32_t pos, const BinaryString& value) override; + + void WriteBinary(int32_t pos, const Bytes& bytes) override; + + void WriteDecimal(int32_t pos, const std::optional& value, int32_t precision) override; + + void WriteTimestamp(int32_t pos, const std::optional& value, + int32_t precision) override; + + void WriteArray(int32_t pos, const BinaryArray& value) override; + + void WriteRow(int32_t pos, const BinaryRow& value) override; + + void WriteMap(int32_t pos, const BinaryMap& input) override; + + void WriteStringView(int32_t pos, const std::string_view& view) override; + + const MemorySegment& GetSegment() const { + return segment_; + } + + protected: + static int32_t RoundNumberOfBytesToNearestWord(int32_t num_bytes); + + /// Set offset and size to fix len part. + virtual void SetOffsetAndSize(int32_t pos, int32_t offset, int64_t size) = 0; + + /// Get field offset. + virtual int32_t GetFieldOffset(int32_t pos) const = 0; + + /// After grow, need point to new memory. + virtual void AfterGrow() = 0; + + virtual void SetNullBit(int32_t ordinal) = 0; + + void ZeroOutPaddingBytes(int32_t num_bytes); + void EnsureCapacity(int32_t needed_size); + + private: + /// Increases the capacity to ensure that it can hold at least the minimum capacity argument. 
+ void Grow(int32_t min_capacity); + + void WriteBytes(int32_t pos, const Bytes& bytes); + + template + void WriteBytesToVarLenPart(int32_t pos, const T& bytes, int32_t len); + + template + static void WriteBytesToFixLenPart(MemorySegment* segment, int32_t field_offset, const T& bytes, + int32_t len); + + void WriteSegmentToVarLenPart(int32_t pos, const MemorySegment& segment, int32_t offset, + int32_t size); + + protected: + int32_t cursor_ = 0; + MemoryPool* pool_ = nullptr; + MemorySegment segment_; +}; + +} // namespace paimon diff --git a/src/paimon/common/data/binary_data_read_utils.h b/src/paimon/common/data/binary_data_read_utils.h new file mode 100644 index 0000000..e47b248 --- /dev/null +++ b/src/paimon/common/data/binary_data_read_utils.h @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#pragma once + +#include +#include + +#include "paimon/common/data/binary_array.h" +#include "paimon/common/data/binary_map.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/common/data/binary_string.h" +#include "paimon/common/data/internal_map.h" +#include "paimon/data/decimal.h" +#include "paimon/data/timestamp.h" +#include "paimon/type_fwd.h" + +namespace paimon { +/// Utils for `MemorySegment`. +class BinaryDataReadUtils { + public: + BinaryDataReadUtils() = delete; + ~BinaryDataReadUtils() = delete; + + /// Gets an instance of `Timestamp` from underlying `MemorySegment`. + /// @param segment the underlying `MemorySegment`s + /// @param base_offset the base offset of current instance of TimestampData + /// @param offset_and_nanos the offset of milli-seconds part and nanoseconds + /// @return an instance of `Timestamp` + static Timestamp ReadTimestampData(const MemorySegment& segment, int32_t base_offset, + int64_t offset_and_nanos) { + auto nano_of_millisecond = static_cast(offset_and_nanos & LOW_BYTES_MASK); + auto sub_offset = static_cast(offset_and_nanos >> 32); + auto millisecond = + MemorySegmentUtils::GetValue({segment}, base_offset + sub_offset); + return Timestamp::FromEpochMillis(millisecond, nano_of_millisecond); + } + + /// Gets an instance of `Decimal` from underlying `MemorySegment`. + static Decimal ReadDecimal(const MemorySegment& segment, int32_t base_offset, + int64_t offset_and_size, int32_t precision, int32_t scale) { + auto size = static_cast(offset_and_size & LOW_BYTES_MASK); + auto sub_offset = static_cast(offset_and_size >> 32); + auto bytes = Bytes::AllocateBytes(size, GetDefaultPool().get()); + std::memset(bytes->data(), 0, bytes->size()); + + MemorySegmentUtils::CopyToBytes({segment}, base_offset + sub_offset, bytes.get(), 0, + size); + return Decimal::FromUnscaledBytes(precision, scale, bytes.get()); + } + + /// Get binary string, if len less than 8, will be include in variable_part_offset_and_len. 
+ /// @note Need to consider the ByteOrder. + /// @param base_offset base offset of composite binary format. + /// @param field_offset absolute start offset of variable_part_offset_and_len. + /// @param variable_part_offset_and_len a long value, real data or offset and len. + + static BinaryString ReadBinaryString(const MemorySegment& segment, int32_t base_offset, + int64_t field_offset, + int64_t variable_part_offset_and_len) { + int64_t mark = variable_part_offset_and_len & BinaryString::HIGHEST_FIRST_BIT; + if (mark == 0) { + const auto sub_offset = static_cast(variable_part_offset_and_len >> 32); + const auto len = static_cast(variable_part_offset_and_len & LOW_BYTES_MASK); + return BinaryString::FromAddress(segment, base_offset + sub_offset, len); + } else { + auto len = static_cast( + (static_cast(variable_part_offset_and_len & + BinaryString::HIGHEST_SECOND_TO_EIGHTH_BIT)) >> + 56); + if (SystemByteOrder() == ByteOrder::PAIMON_LITTLE_ENDIAN) { + return BinaryString::FromAddress(segment, field_offset, len); + } else { + // field_offset + 1 to skip header. + return BinaryString::FromAddress(segment, field_offset + 1, len); + } + } + } + + static std::shared_ptr ReadArrayData(const MemorySegment& segment, + int32_t base_offset, + int64_t offset_and_size) { + auto size = static_cast(offset_and_size & LOW_BYTES_MASK); + auto offset = static_cast(offset_and_size >> 32); + auto binary_array = std::make_shared(); + binary_array->PointTo(segment, offset + base_offset, size); + return binary_array; + } + + /// Gets an instance of `InternalRow` from underlying `MemorySegment`. 
+ static std::shared_ptr ReadRowData(const MemorySegment& segment, + int32_t num_fields, int32_t base_offset, + int64_t offset_and_size) { + auto size = static_cast(offset_and_size & LOW_BYTES_MASK); + auto offset = static_cast(offset_and_size >> 32); + auto row = std::make_shared(num_fields); + row->PointTo(segment, offset + base_offset, size); + return row; + } + + static std::shared_ptr ReadMapData(const MemorySegment& segment, + int32_t base_offset, int64_t offset_and_size) { + auto size = static_cast(offset_and_size & LOW_BYTES_MASK); + auto offset = static_cast(offset_and_size >> 32); + auto binary_map = std::make_shared(); + binary_map->PointTo(segment, offset + base_offset, size); + return binary_map; + } + + private: + static constexpr uint64_t LOW_BYTES_MASK = 0xFFFFFFFF; +}; +} // namespace paimon diff --git a/src/paimon/common/data/binary_row.cpp b/src/paimon/common/data/binary_row.cpp new file mode 100644 index 0000000..563f71f --- /dev/null +++ b/src/paimon/common/data/binary_row.cpp @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "paimon/common/data/binary_row.h" + +#include + +#include "fmt/format.h" +#include "paimon/common/data/binary_data_read_utils.h" +#include "paimon/common/memory/memory_segment.h" +#include "paimon/common/memory/memory_segment_utils.h" +#include "paimon/io/byte_order.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" + +namespace paimon { +const int64_t BinaryRow::FIRST_BYTE_ZERO = + (SystemByteOrder() == ByteOrder::PAIMON_LITTLE_ENDIAN) ? (~0xFFL) : (~(0xFFL << 56L)); + +const BinaryRow& BinaryRow::EmptyRow() { + static const BinaryRow empty_row = GetEmptyRow(); + return empty_row; +} + +BinaryRow::BinaryRow(int32_t arity) + : arity_(arity), null_bits_size_in_bytes_(CalculateBitSetWidthInBytes(arity)) { + assert(arity_ >= 0); +} + +int32_t BinaryRow::CalculateBitSetWidthInBytes(int32_t arity) { + return ((arity + 63 + HEADER_SIZE_IN_BITS) / 64) * 8; +} + +int32_t BinaryRow::CalculateFixPartSizeInBytes(int32_t arity) { + return CalculateBitSetWidthInBytes(arity) + 8 * arity; +} + +int32_t BinaryRow::GetFieldOffset(int32_t pos) const { + return offset_ + null_bits_size_in_bytes_ + pos * 8; +} + +void BinaryRow::AssertIndexIsValid(int32_t index) const { + assert(index >= 0); + assert(index < arity_); +} + +int32_t BinaryRow::GetFixedLengthPartSize() const { + return null_bits_size_in_bytes_ + 8 * arity_; +} + +BinaryRow BinaryRow::GetEmptyRow() { + BinaryRow row(0); + int32_t size = row.GetFixedLengthPartSize(); + auto bytes = Bytes::AllocateBytes(size, GetDefaultPool().get()); + row.PointTo(MemorySegment::Wrap(std::move(bytes)), 0, size); + return row; +} + +Result BinaryRow::GetRowKind() const { + char kind_value = MemorySegmentUtils::GetValue({segment_}, offset_); + return RowKind::FromByteValue(kind_value); +} + +void BinaryRow::SetRowKind(const RowKind* kind) { + segment_.PutValue(offset_, kind->ToByteValue()); +} + +void BinaryRow::SetTotalSize(int32_t size_in_bytes) { + size_in_bytes_ = size_in_bytes; +} + +bool 
BinaryRow::IsNullAt(int32_t pos) const { + AssertIndexIsValid(pos); + return MemorySegmentUtils::BitGet({segment_}, offset_, pos + HEADER_SIZE_IN_BITS); +} + +void BinaryRow::SetNotNullAt(int32_t i) { + AssertIndexIsValid(i); + MemorySegmentUtils::BitUnSet(&segment_, offset_, i + HEADER_SIZE_IN_BITS); +} + +void BinaryRow::SetNullAt(int32_t i) { + AssertIndexIsValid(i); + MemorySegmentUtils::BitSet(&segment_, offset_, i + HEADER_SIZE_IN_BITS); + // We must set the fixed length part zero. + // 1.Only int/long/boolean...(Fix length type) will invoke this SetNullAt. + // 2.Set to zero in order to equals and hash operation bytes calculation. + segment_.PutValue(GetFieldOffset(i), 0); +} + +void BinaryRow::SetInt(int32_t pos, int32_t value) { + AssertIndexIsValid(pos); + SetNotNullAt(pos); + segment_.PutValue(GetFieldOffset(pos), value); +} + +void BinaryRow::SetLong(int32_t pos, int64_t value) { + AssertIndexIsValid(pos); + SetNotNullAt(pos); + segment_.PutValue(GetFieldOffset(pos), value); +} + +void BinaryRow::SetDouble(int32_t pos, double value) { + AssertIndexIsValid(pos); + SetNotNullAt(pos); + segment_.PutValue(GetFieldOffset(pos), value); +} + +void BinaryRow::SetBoolean(int32_t pos, bool value) { + AssertIndexIsValid(pos); + SetNotNullAt(pos); + segment_.PutValue(GetFieldOffset(pos), value); +} + +void BinaryRow::SetShort(int32_t pos, int16_t value) { + AssertIndexIsValid(pos); + SetNotNullAt(pos); + segment_.PutValue(GetFieldOffset(pos), value); +} + +void BinaryRow::SetByte(int32_t pos, char value) { + AssertIndexIsValid(pos); + SetNotNullAt(pos); + segment_.PutValue(GetFieldOffset(pos), value); +} + +void BinaryRow::SetFloat(int32_t pos, float value) { + AssertIndexIsValid(pos); + SetNotNullAt(pos); + segment_.PutValue(GetFieldOffset(pos), value); +} + +bool BinaryRow::GetBoolean(int32_t pos) const { + AssertIndexIsValid(pos); + return MemorySegmentUtils::GetValue({segment_}, GetFieldOffset(pos)); +} + +char BinaryRow::GetByte(int32_t pos) const { + 
AssertIndexIsValid(pos); + return MemorySegmentUtils::GetValue({segment_}, GetFieldOffset(pos)); +} + +int16_t BinaryRow::GetShort(int32_t pos) const { + AssertIndexIsValid(pos); + return MemorySegmentUtils::GetValue({segment_}, GetFieldOffset(pos)); +} + +int32_t BinaryRow::GetInt(int32_t pos) const { + AssertIndexIsValid(pos); + return MemorySegmentUtils::GetValue({segment_}, GetFieldOffset(pos)); +} + +int32_t BinaryRow::GetDate(int32_t pos) const { + return GetInt(pos); +} + +int64_t BinaryRow::GetLong(int32_t pos) const { + AssertIndexIsValid(pos); + return MemorySegmentUtils::GetValue({segment_}, GetFieldOffset(pos)); +} + +float BinaryRow::GetFloat(int32_t pos) const { + AssertIndexIsValid(pos); + return MemorySegmentUtils::GetValue({segment_}, GetFieldOffset(pos)); +} + +double BinaryRow::GetDouble(int32_t pos) const { + AssertIndexIsValid(pos); + return MemorySegmentUtils::GetValue({segment_}, GetFieldOffset(pos)); +} + +BinaryString BinaryRow::GetString(int32_t pos) const { + AssertIndexIsValid(pos); + int32_t field_offset = GetFieldOffset(pos); + const auto offset_and_len = MemorySegmentUtils::GetValue({segment_}, field_offset); + return BinaryDataReadUtils::ReadBinaryString(segment_, offset_, field_offset, offset_and_len); +} + +std::string_view BinaryRow::GetStringView(int32_t pos) const { + BinaryString str = GetString(pos); + return str.GetStringView(); +} + +Decimal BinaryRow::GetDecimal(int32_t pos, int32_t precision, int32_t scale) const { + AssertIndexIsValid(pos); + int32_t field_offset = GetFieldOffset(pos); + if (Decimal::IsCompact(precision)) { + return Decimal::FromUnscaledLong( + MemorySegmentUtils::GetValue({segment_}, field_offset), precision, scale); + } + const auto offset_and_size = MemorySegmentUtils::GetValue({segment_}, field_offset); + return BinaryDataReadUtils::ReadDecimal(segment_, offset_, offset_and_size, precision, scale); +} + +Timestamp BinaryRow::GetTimestamp(int32_t pos, int32_t precision) const { + 
AssertIndexIsValid(pos); + int32_t field_offset = GetFieldOffset(pos); + if (Timestamp::IsCompact(precision)) { + return Timestamp::FromEpochMillis( + MemorySegmentUtils::GetValue({segment_}, field_offset)); + } + const auto offset_and_nano_of_milli = + MemorySegmentUtils::GetValue({segment_}, field_offset); + return BinaryDataReadUtils::ReadTimestampData(segment_, offset_, offset_and_nano_of_milli); +} + +std::shared_ptr BinaryRow::GetBinary(int32_t pos) const { + AssertIndexIsValid(pos); + int32_t field_offset = GetFieldOffset(pos); + const auto offset_and_len = MemorySegmentUtils::GetValue({segment_}, field_offset); + return BinarySection::ReadBinary(segment_, offset_, field_offset, offset_and_len, + GetDefaultPool().get()); +} + +std::shared_ptr BinaryRow::GetArray(int32_t pos) const { + AssertIndexIsValid(pos); + return BinaryDataReadUtils::ReadArrayData(segment_, offset_, GetLong(pos)); +} + +std::shared_ptr BinaryRow::GetMap(int32_t pos) const { + AssertIndexIsValid(pos); + return BinaryDataReadUtils::ReadMapData(segment_, offset_, GetLong(pos)); +} + +std::shared_ptr BinaryRow::GetRow(int32_t pos, int32_t num_fields) const { + AssertIndexIsValid(pos); + return BinaryDataReadUtils::ReadRowData(segment_, num_fields, offset_, GetLong(pos)); +} + +/// The bit is 1 when the field is null. Default is 0. +bool BinaryRow::AnyNull() const { + // Skip the header. 
+ if ((MemorySegmentUtils::GetValue({segment_}, offset_) & FIRST_BYTE_ZERO) != 0) { + return true; + } + for (int32_t i = 8; i < null_bits_size_in_bytes_; i += 8) { + if (MemorySegmentUtils::GetValue({segment_}, offset_ + i) != 0) { + return true; + } + } + return false; +} + +bool BinaryRow::AnyNull(const std::vector& fields) const { + for (int32_t field : fields) { + if (IsNullAt(field)) { + return true; + } + } + return false; +} + +BinaryRow BinaryRow::Copy(MemoryPool* pool) const { + BinaryRow row(arity_); + Copy(&row, pool); + return row; +} + +void BinaryRow::Copy(BinaryRow* reuse, MemoryPool* pool) const { + CopyInternal(reuse, pool); +} + +void BinaryRow::CopyInternal(BinaryRow* reuse, MemoryPool* pool) const { + std::shared_ptr bytes = + MemorySegmentUtils::CopyToBytes({segment_}, offset_, size_in_bytes_, pool); + reuse->PointTo(MemorySegment::Wrap(bytes), 0, size_in_bytes_); +} + +void BinaryRow::Clear() { + segment_ = MemorySegment(); + offset_ = 0; + size_in_bytes_ = 0; +} + +bool BinaryRow::operator==(const BinaryRow& other) const { + if (this == &other) { + return true; + } + return size_in_bytes_ == other.size_in_bytes_ && + MemorySegmentUtils::Equals({segment_}, offset_, {other.segment_}, other.offset_, + size_in_bytes_); +} + +int32_t BinaryRow::HashCode() const { + if (size_in_bytes_ == 0) { + return 0; + } + return MemorySegmentUtils::HashByWords({segment_}, offset_, size_in_bytes_, nullptr); +} + +std::string BinaryRow::ToString() const { + return fmt::format("BinaryRow@{:#x}", static_cast(HashCode())); +} + +} // namespace paimon diff --git a/src/paimon/common/data/binary_row.h b/src/paimon/common/data/binary_row.h new file mode 100644 index 0000000..a864063 --- /dev/null +++ b/src/paimon/common/data/binary_row.h @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "paimon/common/data/binary_section.h" +#include "paimon/common/data/binary_string.h" +#include "paimon/common/data/data_setters.h" +#include "paimon/common/data/internal_array.h" +#include "paimon/common/data/internal_map.h" +#include "paimon/common/data/internal_row.h" +#include "paimon/common/types/row_kind.h" +#include "paimon/common/utils/murmurhash_utils.h" +#include "paimon/data/decimal.h" +#include "paimon/data/timestamp.h" +#include "paimon/result.h" + +namespace paimon { +class Bytes; +class MemoryPool; + +/// An implementation of `InternalRow` which is backed by a single `MemorySegment`. +/// A Row has two parts: Fixed-length part and variable-length part. +/// +/// Fixed-length part contains 1 byte header and null bit set and field values. Null bit +/// set is used for null tracking and is aligned to 8-byte word boundaries. Field values +/// hold fixed-length primitive types and variable-length values which can be stored in 8 +/// bytes inside. If it does not fit the variable-length field, then store the length and +/// offset of variable-length part. 
+/// +/// @note: Unlike the Java implementation where variable-length data may span multiple +/// MemorySegments, in this C++ implementation both the fixed-length part and the +/// variable-length part reside within a single MemorySegment. +class BinaryRow final : public BinarySection, public InternalRow, public DataSetters { + public: + BinaryRow() : BinaryRow(0) {} + explicit BinaryRow(int32_t arity); + + static constexpr int32_t HEADER_SIZE_IN_BITS = 8; + static const BinaryRow& EmptyRow(); + static int32_t CalculateBitSetWidthInBytes(int32_t arity); + static int32_t CalculateFixPartSizeInBytes(int32_t arity); + + int32_t GetFixedLengthPartSize() const; + int32_t GetFieldCount() const override { + return arity_; + } + Result GetRowKind() const override; + + void SetRowKind(const RowKind* kind) override; + void SetTotalSize(int32_t size_in_bytes); + bool IsNullAt(int32_t pos) const override; + void SetNullAt(int32_t i) override; + + void SetByte(int32_t pos, char value) override; + void SetBoolean(int32_t pos, bool value) override; + void SetShort(int32_t pos, int16_t value) override; + void SetInt(int32_t pos, int32_t value) override; + void SetLong(int32_t pos, int64_t value) override; + void SetFloat(int32_t pos, float value) override; + void SetDouble(int32_t pos, double value) override; + + char GetByte(int32_t pos) const override; + bool GetBoolean(int32_t pos) const override; + int16_t GetShort(int32_t pos) const override; + int32_t GetInt(int32_t pos) const override; + int32_t GetDate(int32_t pos) const override; + int64_t GetLong(int32_t pos) const override; + float GetFloat(int32_t pos) const override; + double GetDouble(int32_t pos) const override; + BinaryString GetString(int32_t pos) const override; + std::string_view GetStringView(int32_t pos) const override; + + Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const override; + Timestamp GetTimestamp(int32_t pos, int32_t precision) const override; + + std::shared_ptr 
GetBinary(int32_t pos) const override; + std::shared_ptr GetArray(int32_t pos) const override; + std::shared_ptr GetMap(int32_t pos) const override; + std::shared_ptr GetRow(int32_t pos, int32_t num_fields) const override; + /// The bit is 1 when the field is null. Default is 0. + bool AnyNull() const; + bool AnyNull(const std::vector& fields) const; + BinaryRow Copy(MemoryPool* pool) const; + void Copy(BinaryRow* reuse, MemoryPool* pool) const; + void Clear(); + bool operator==(const BinaryRow& other) const; + // TODO(liancheng.lsz): single column to be implemented + + std::string ToString() const override; + + int32_t HashCode() const override; + + private: + static BinaryRow GetEmptyRow(); + int32_t GetFieldOffset(int32_t pos) const; + void AssertIndexIsValid(int32_t index) const; + void SetNotNullAt(int32_t i); + void CopyInternal(BinaryRow* reuse, MemoryPool* pool) const; + + static const int64_t FIRST_BYTE_ZERO; + + private: + int32_t arity_; + int32_t null_bits_size_in_bytes_; +}; + +} // namespace paimon + +namespace std { +/// for std::unordered_map> +template <> +struct hash> { + size_t operator()(const std::pair& partition_bucket) const { + const auto& [partition, bucket] = partition_bucket; + return paimon::MurmurHashUtils::HashUnsafeBytes(reinterpret_cast(&bucket), 0, + sizeof(bucket), partition.HashCode()); + } +}; + +/// for std::unordered_map, ...> +template <> +struct hash> { + size_t operator()( + const std::tuple& partition_bucket_type) const { + const auto& [partition, bucket, index_type] = partition_bucket_type; + size_t hash = paimon::MurmurHashUtils::HashUnsafeBytes( + reinterpret_cast(&bucket), 0, sizeof(bucket), partition.HashCode()); + return paimon::MurmurHashUtils::HashUnsafeBytes(index_type.data(), 0, index_type.size(), + hash); + } +}; + +template <> +struct hash { + size_t operator()(const paimon::BinaryRow& row) const { + return row.HashCode(); + } +}; + +} // namespace std diff --git a/src/paimon/common/data/binary_row_test.cpp 
b/src/paimon/common/data/binary_row_test.cpp new file mode 100644 index 0000000..17ee333 --- /dev/null +++ b/src/paimon/common/data/binary_row_test.cpp @@ -0,0 +1,563 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/common/data/binary_row.h" + +#include +#include +#include +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/common/data/binary_row_writer.h" +#include "paimon/common/data/serializer/binary_row_serializer.h" +#include "paimon/common/io/memory_segment_output_stream.h" +#include "paimon/common/memory/memory_segment.h" +#include "paimon/common/memory/memory_segment_utils.h" +#include "paimon/common/utils/date_time_utils.h" +#include "paimon/common/utils/decimal_utils.h" +#include "paimon/common/utils/serialization_utils.h" +#include "paimon/io/byte_array_input_stream.h" +#include "paimon/io/data_input_stream.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +class BinaryRowTest : public testing::Test { + private: + void AssertTestWriterRow(const BinaryRow& row) { + ASSERT_EQ(row.GetString(0).ToString(), "1"); + 
ASSERT_EQ(row.GetInt(8), 88); + ASSERT_EQ(row.GetShort(11), static_cast(292)); + ASSERT_EQ(row.GetLong(10), 284); + ASSERT_EQ(row.GetByte(2), static_cast(99)); + ASSERT_EQ(row.GetDouble(6), static_cast(87.1)); + ASSERT_EQ(row.GetFloat(7), 26.1f); + ASSERT_TRUE(row.GetBoolean(1)); + ASSERT_EQ(row.GetString(3).ToString(), "1234567"); + ASSERT_EQ(row.GetString(5).ToString(), "12345678"); + ASSERT_EQ(row.GetString(9).ToString(), "啦啦啦啦啦我是快乐的粉刷匠"); + ASSERT_EQ(row.GetString(9).HashCode(), + BinaryString::FromString("啦啦啦啦啦我是快乐的粉刷匠", GetDefaultPool().get()) + .HashCode()); + ASSERT_TRUE(row.IsNullAt(12)); + } +}; + +TEST_F(BinaryRowTest, TestBasic) { + // consider header 1 byte. + ASSERT_EQ(BinaryRow(0).GetFixedLengthPartSize(), 8); + ASSERT_EQ(BinaryRow(1).GetFixedLengthPartSize(), 16); + ASSERT_EQ(BinaryRow(65).GetFixedLengthPartSize(), 536); + ASSERT_EQ(BinaryRow(128).GetFixedLengthPartSize(), 1048); + + auto pool = GetDefaultPool(); + std::shared_ptr bytes = Bytes::AllocateBytes(100, pool.get()); + MemorySegment segment = MemorySegment::Wrap(bytes); + BinaryRow row(2); + row.PointTo(segment, 10, 48); + row.SetInt(0, 5); + row.SetDouble(1, 5.8); + ASSERT_EQ(5, row.GetInt(0)); + ASSERT_EQ(static_cast(5.8), row.GetDouble(1)); + + row.Clear(); + ASSERT_EQ(0, row.HashCode()); + std::shared_ptr bytes1 = Bytes::AllocateBytes(100, pool.get()); + MemorySegment segment1 = MemorySegment::Wrap(bytes1); + row.PointTo(segment1, 0, 20); + row.SetInt(0, 5); + ASSERT_EQ(5, row.GetInt(0)); +} + +TEST_F(BinaryRowTest, TestSetAndGet) { + auto pool = GetDefaultPool(); + std::shared_ptr bytes = Bytes::AllocateBytes(100, pool.get()); + MemorySegment segment = MemorySegment::Wrap(bytes); + BinaryRow row(9); + row.PointTo(segment, 20, 80); + row.SetNullAt(0); + row.SetInt(1, 11); + row.SetLong(2, 22); + row.SetDouble(3, 33); + row.SetBoolean(4, true); + row.SetShort(5, static_cast(55)); + row.SetByte(6, static_cast(66)); + row.SetFloat(7, static_cast(77)); + + ASSERT_EQ(row.GetInt(1), 11); + 
ASSERT_TRUE(row.IsNullAt(0)); + ASSERT_EQ(row.GetShort(5), static_cast(55)); + ASSERT_EQ(row.GetLong(2), 22L); + ASSERT_TRUE(row.GetBoolean(4)); + ASSERT_EQ(row.GetByte(6), static_cast(66)); + ASSERT_EQ(row.GetFloat(7), static_cast(77)); + ASSERT_EQ(row.GetDouble(3), static_cast(33)); +} + +TEST_F(BinaryRowTest, TestHeaderSize) { + ASSERT_EQ(BinaryRow::CalculateBitSetWidthInBytes(56), 8); + ASSERT_EQ(BinaryRow::CalculateBitSetWidthInBytes(57), 16); + ASSERT_EQ(BinaryRow::CalculateBitSetWidthInBytes(120), 16); + ASSERT_EQ(BinaryRow::CalculateBitSetWidthInBytes(121), 24); +} + +TEST_F(BinaryRowTest, TestWriter) { + auto pool = GetDefaultPool(); + int32_t arity = 13; + BinaryRow row(arity); + BinaryRowWriter writer(&row, 20, pool.get()); + writer.WriteString(0, BinaryString::FromString("1", pool.get())); + writer.WriteString(3, BinaryString::FromString("1234567", pool.get())); + writer.WriteString(5, BinaryString::FromString("12345678", pool.get())); + writer.WriteString(9, BinaryString::FromString("啦啦啦啦啦我是快乐的粉刷匠", pool.get())); + + writer.WriteBoolean(1, true); + writer.WriteByte(2, static_cast(99)); + writer.WriteDouble(6, 87.1); + writer.WriteFloat(7, 26.1f); + writer.WriteInt(8, 88); + writer.WriteLong(10, 284); + writer.WriteShort(11, static_cast(292)); + writer.SetNullAt(12); + + writer.Complete(); + + AssertTestWriterRow(row); + + // test copy + auto copied1 = row.Copy(pool.get()); + AssertTestWriterRow(copied1); + ASSERT_EQ(row, copied1); + + // test copy + BinaryRow copied2(arity); + row.Copy(&copied2, pool.get()); + AssertTestWriterRow(copied2); + ASSERT_EQ(row, copied2); +} + +TEST_F(BinaryRowTest, TestWriteString) { + auto pool = GetDefaultPool(); + { + // little byte[] + BinaryRow row(1); + BinaryRowWriter writer(&row, 0, pool.get()); + char chars[2]; + chars[0] = static_cast(0xFFFF); + chars[1] = 0; + writer.WriteString(0, BinaryString::FromString(std::string(chars, 2), pool.get())); + writer.Complete(); + std::string str = row.GetString(0).ToString(); + 
ASSERT_EQ(2, str.length()); + ASSERT_EQ(str[0], chars[0]); + ASSERT_EQ(str[1], chars[1]); + } + { + // big byte[] + std::string str = "啦啦啦啦啦我是快乐的粉刷匠"; + BinaryRow row(2); + BinaryRowWriter writer(&row, 0, pool.get()); + writer.WriteString(0, BinaryString::FromString(str, pool.get())); + writer.WriteString(1, BinaryString::FromBytes(Bytes::AllocateBytes(str, pool.get()))); + writer.Complete(); + + ASSERT_EQ(row.GetString(0).ToString(), str); + ASSERT_EQ(row.GetString(1).ToString(), str); + } +} + +TEST_F(BinaryRowTest, TestWriteBytes) { + auto pool = GetDefaultPool(); + { + BinaryRow row(1); + BinaryRowWriter writer(&row, 0, pool.get()); + std::string str = "啦啦啦啦啦我是快乐的粉刷匠"; + Bytes bytes(str, pool.get()); + writer.WriteBytes(0, bytes); + writer.Complete(); + ASSERT_EQ(row.GetString(0).ToString(), str); + } + { + BinaryRow row(1); + BinaryRowWriter writer(&row, 0, pool.get()); + std::string str = "啦"; + Bytes bytes(str, pool.get()); + writer.WriteBytes(0, bytes); + writer.Complete(); + ASSERT_EQ(row.GetString(0).ToString(), str); + } +} + +TEST_F(BinaryRowTest, TestReuseWriter) { + auto pool = GetDefaultPool(); + BinaryRow row(2); + BinaryRowWriter writer(&row, 0, pool.get()); + writer.WriteString(0, BinaryString::FromString("01234567", pool.get())); + writer.WriteString(1, BinaryString::FromString("012345678", pool.get())); + writer.Complete(); + ASSERT_EQ(row.GetString(0).ToString(), "01234567"); + ASSERT_EQ(row.GetString(1).ToString(), "012345678"); + + writer.Reset(); + writer.WriteString(0, BinaryString::FromString("1", pool.get())); + writer.WriteString(1, BinaryString::FromString("0123456789", pool.get())); + writer.Complete(); + ASSERT_EQ(row.GetString(0).ToString(), "1"); + ASSERT_EQ(row.GetString(1).ToString(), "0123456789"); +} + +TEST_F(BinaryRowTest, TestAnyNull) { + auto pool = GetDefaultPool(); + { + BinaryRow row(3); + BinaryRowWriter writer(&row, 0, pool.get()); + + ASSERT_FALSE(row.AnyNull()); + + // test header should not compute by anyNull + 
row.SetRowKind(RowKind::UpdateBefore()); + ASSERT_FALSE(row.AnyNull()); + + writer.SetNullAt(2); + ASSERT_TRUE(row.AnyNull()); + + writer.SetNullAt(0); + ASSERT_TRUE(row.AnyNull({0, 1, 2})); + ASSERT_FALSE(row.AnyNull({1})); + + writer.SetNullAt(1); + ASSERT_TRUE(row.AnyNull()); + } + + int32_t num_fields = 80; + for (int32_t i = 0; i < num_fields; i++) { + BinaryRow row(num_fields); + BinaryRowWriter writer(&row, 0, pool.get()); + row.SetRowKind(RowKind::Delete()); + ASSERT_FALSE(row.AnyNull()); + writer.SetNullAt(i); + ASSERT_TRUE(row.AnyNull()); + } +} + +TEST_F(BinaryRowTest, TestSingleSegmentBinaryRowHashCode) { + auto pool = GetDefaultPool(); + int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs(); + std::srand(seed); + // test hash stabilization + BinaryRow row(13); + BinaryRowWriter writer(&row, 0, pool.get()); + for (int32_t i = 0; i < 99; i++) { + writer.Reset(); + writer.WriteString( + 0, BinaryString::FromString("" + std::to_string(static_cast(std::rand())), + pool.get())); + writer.WriteString(3, BinaryString::FromString("01234567", pool.get())); + writer.WriteString(5, BinaryString::FromString("012345678", pool.get())); + writer.WriteString(9, BinaryString::FromString("啦啦啦啦啦我是快乐的粉刷匠", pool.get())); + writer.WriteBoolean(1, true); + writer.WriteByte(2, static_cast(99)); + writer.WriteDouble(6, 87.1); + writer.WriteFloat(7, 26.1f); + writer.WriteInt(8, 88); + writer.WriteLong(10, 284); + writer.WriteShort(11, static_cast(292)); + writer.SetNullAt(12); + writer.Complete(); + BinaryRow copy = row.Copy(pool.get()); + ASSERT_EQ(copy.HashCode(), row.HashCode()) << "seed: " << seed << ", idx: " << i; + } + + // test hash distribution + int32_t count = 999999; + std::set hash_codes; + for (int32_t i = 0; i < count; i++) { + row.SetInt(8, i); + hash_codes.insert(row.HashCode()); + } + ASSERT_EQ(hash_codes.size(), count); + hash_codes.clear(); + BinaryRow row2(1); + BinaryRowWriter writer2(&row2, 0, pool.get()); + for (int32_t i = 0; i < count; i++) { + 
writer2.Reset(); + writer2.WriteString(0, BinaryString::FromString( + "啦啦啦啦啦我是快乐的粉刷匠" + std::to_string(i), pool.get())); + writer2.Complete(); + hash_codes.insert(row2.HashCode()); + } + ASSERT_GT(hash_codes.size(), static_cast(count * 0.997)); +} + +TEST_F(BinaryRowTest, TestHeader) { + auto pool = GetDefaultPool(); + BinaryRow row(2); + BinaryRowWriter writer(&row, 0, pool.get()); + + writer.WriteInt(0, 10); + writer.SetNullAt(1); + writer.WriteRowKind(RowKind::UpdateBefore()); + writer.Complete(); + + BinaryRow new_row = row.Copy(pool.get()); + ASSERT_EQ(new_row, row); + ASSERT_EQ(new_row.GetRowKind().value(), RowKind::UpdateBefore()); + + new_row.SetRowKind(RowKind::Delete()); + ASSERT_EQ(new_row.GetRowKind().value(), RowKind::Delete()); +} + +TEST_F(BinaryRowTest, TestDefaultRowKind) { + auto pool = GetDefaultPool(); + BinaryRow row(1); + BinaryRowWriter writer(&row, 0, pool.get()); + writer.WriteInt(0, 10); + writer.Complete(); + ASSERT_EQ(row.GetRowKind().value(), RowKind::Insert()); +} + +TEST_F(BinaryRowTest, TestBinary) { + auto pool = GetDefaultPool(); + BinaryRow row(2); + BinaryRowWriter writer(&row, 0, pool.get()); + char chars1[3] = {1, -1, 5}; + char chars2[8] = {1, -1, 5, 5, 1, 5, 1, 5}; + std::string str1(chars1, 3); + std::string str2(chars2, 8); + Bytes bytes1(str1, pool.get()); + Bytes bytes2(str2, pool.get()); + + writer.WriteBinary(0, bytes1); + writer.WriteBinary(1, bytes2); + writer.Complete(); + + ASSERT_EQ(*row.GetBinary(0), bytes1); + ASSERT_EQ(*row.GetBinary(1), bytes2); +} + +TEST_F(BinaryRowTest, TestCompatibleWithJava) { + auto pool = GetDefaultPool(); + { + int32_t arity = 1; + BinaryRow row(arity); + BinaryRowWriter writer(&row, 0, pool.get()); + writer.WriteString(0, BinaryString::FromString("Alice2", pool.get())); + writer.Complete(); + + ASSERT_EQ(row.GetString(0).ToString(), "Alice2"); + auto bytes = SerializationUtils::SerializeBinaryRow(row, pool.get()); + + std::string bytes_view(bytes->data(), bytes->size()); + std::vector 
expect_bytes = {0, 0, 0, 1, 0, 0, 0, 0, 0, 0, + 0, 0, 65, 108, 105, 99, 101, 50, 0, 134}; + std::string expect_view(reinterpret_cast(expect_bytes.data()), expect_bytes.size()); + ASSERT_EQ(bytes_view, expect_view); + + ASSERT_OK_AND_ASSIGN(auto de_row, SerializationUtils::DeserializeBinaryRow(bytes)); + ASSERT_EQ(1, de_row.GetFieldCount()); + ASSERT_EQ(de_row.GetString(0).ToString(), "Alice2"); + } + { + int32_t arity = 1; + BinaryRow row(arity); + BinaryRowWriter writer(&row, 0, pool.get()); + writer.WriteInt(0, 18); + writer.Complete(); + + ASSERT_EQ(row.GetInt(0), 18); + + auto bytes = SerializationUtils::SerializeBinaryRow(row, pool.get()); + std::string bytes_view(bytes->data(), bytes->size()); + std::vector expect_bytes = {0, 0, 0, 1, 0, 0, 0, 0, 0, 0, + 0, 0, 18, 0, 0, 0, 0, 0, 0, 0}; + std::string expect_view(reinterpret_cast(expect_bytes.data()), expect_bytes.size()); + ASSERT_EQ(bytes_view, expect_view); + ASSERT_OK_AND_ASSIGN(auto de_row, SerializationUtils::DeserializeBinaryRow(bytes)); + ASSERT_EQ(1, de_row.GetFieldCount()); + ASSERT_EQ(de_row.GetInt(0), 18); + } +} + +TEST_F(BinaryRowTest, TestZeroOutPaddingString) { + srand(static_cast(time(nullptr))); + auto pool = GetDefaultPool(); + int32_t bytes_size = 1024; + std::shared_ptr bytes = Bytes::AllocateBytes(1024, pool.get()); + + BinaryRow row(1); + BinaryRowWriter writer(&row, 0, pool.get()); + + writer.Reset(); + for (int32_t i = 0; i < bytes_size; i++) { + (*bytes)[i] = paimon::test::RandomNumber(0, 255); + } + writer.WriteBinary(0, *bytes); + writer.Reset(); + writer.WriteString(0, BinaryString::FromString("wahahah", pool.get())); + writer.Complete(); + int32_t hash1 = row.HashCode(); + + writer.Reset(); + for (int32_t i = 0; i < bytes_size; i++) { + (*bytes)[i] = paimon::test::RandomNumber(0, 255); + } + writer.WriteBinary(0, *bytes); + writer.Reset(); + writer.WriteString(0, BinaryString::FromString("wahahah", pool.get())); + writer.Complete(); + int32_t hash2 = row.HashCode(); + + 
ASSERT_EQ(hash2, hash1); +} + +TEST_F(BinaryRowTest, TestWriteTimestampAndDecimal) { + auto pool = GetDefaultPool(); + BinaryRow row(7); + BinaryRowWriter writer(&row, 0, pool.get()); + // timestamp with millis precision, compact + Timestamp timestamp(1723535714123ll, 0); + writer.WriteTimestamp(0, timestamp, Timestamp::MILLIS_PRECISION); + // timestamp with default precision, not compact + Timestamp timestamp1(1723535713000ll, 1234); + writer.WriteTimestamp(1, timestamp1, Timestamp::DEFAULT_PRECISION); + // 1234.56, compact, precision 6, scale 2 + Decimal decimal(6, 2, 123456); + writer.WriteDecimal(2, decimal, 6); + + // 123456789987654321.45678, not compact, precision 23, scale 5 + Decimal decimal3(23, 5, DecimalUtils::StrToInt128("12345678998765432145678").value()); + writer.WriteDecimal(3, decimal3, 23); + + // 0, not compact, precision 27, scale 4 + Decimal decimal4(27, 4, 0); + writer.WriteDecimal(4, decimal4, 27); + // -1234.56, compact, precision 6, scale 2 + Decimal decimal5(6, 2, -123456); + writer.WriteDecimal(5, decimal5, 6); + // -123456789987654321.45678, not compact, precision 23, scale 5 + Decimal decimal6(23, 5, DecimalUtils::StrToInt128("-12345678998765432145678").value()); + writer.WriteDecimal(6, decimal6, 23); + writer.Complete(); + + // test serialize + std::vector expected = { + 0, 0, 0, 0, 0, 0, 0, 0, 75, 231, 187, 74, 145, 1, 0, 0, 210, 4, 0, 0, + 64, 0, 0, 0, 64, 226, 1, 0, 0, 0, 0, 0, 10, 0, 0, 0, 72, 0, 0, 0, + 1, 0, 0, 0, 88, 0, 0, 0, 192, 29, 254, 255, 255, 255, 255, 255, 10, 0, 0, 0, + 104, 0, 0, 0, 232, 226, 187, 74, 145, 1, 0, 0, 2, 157, 66, 182, 167, 42, 157, 199, + 7, 14, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 253, 98, 189, 73, 88, 213, 98, 56, 248, 242, 0, 0, 0, 0, 0, 0}; + auto result_bytes = row.ToBytes(pool.get()); + std::vector result(result_bytes->size(), 0); + memcpy(result.data(), result_bytes->data(), result_bytes->size()); + ASSERT_EQ(expected, result); + // test hash code + 
ASSERT_EQ(0x773cf266, row.HashCode()); + + ASSERT_EQ(timestamp, row.GetTimestamp(0, Timestamp::MILLIS_PRECISION)); + ASSERT_EQ(timestamp1, row.GetTimestamp(1, Timestamp::DEFAULT_PRECISION)); + ASSERT_EQ(decimal, row.GetDecimal(2, 6, 2)); + ASSERT_EQ(decimal3, row.GetDecimal(3, 23, 5)); + ASSERT_EQ(decimal4, row.GetDecimal(4, 27, 4)); + ASSERT_EQ(decimal5, row.GetDecimal(5, 6, 2)); + ASSERT_EQ(decimal6, row.GetDecimal(6, 23, 5)); +} + +TEST_F(BinaryRowTest, TestWriteTimestampAndDecimalWithNull) { + auto pool = GetDefaultPool(); + BinaryRow row(5); + BinaryRowWriter writer(&row, 0, pool.get()); + writer.Reset(); + + // timestamp with max precision, compact + writer.WriteTimestamp(0, std::nullopt, Timestamp::MAX_PRECISION); + // timestamp with MAX_PRECISION, compact + Timestamp timestamp(1723535713567ll, 1234); + writer.WriteTimestamp(1, timestamp, Timestamp::MAX_PRECISION); + + // decimal precision 27, not compact + writer.WriteDecimal(2, std::nullopt, 27); + // -123456789987654321.45678, not compact, precision 23, scale 5 + Decimal decimal(23, 5, DecimalUtils::StrToInt128("-12345678998765432145678").value()); + writer.WriteDecimal(3, decimal, 23); + + writer.SetNullAt(4); + writer.Complete(); + + // test serialize + std::vector expected = { + 0, 21, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 48, 0, 0, 0, 210, 4, 0, 0, 56, 0, 0, 0, + 0, 0, 0, 0, 64, 0, 0, 0, 10, 0, 0, 0, 80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 31, 229, 187, 74, 145, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 253, 98, 189, 73, 88, 213, 98, 56, 248, 242, 0, 0, 0, 0, 0, 0}; + auto result_bytes = row.ToBytes(pool.get()); + std::vector result(result_bytes->size(), 0); + memcpy(result.data(), result_bytes->data(), result_bytes->size()); + ASSERT_EQ(expected, result); + // test hash code + ASSERT_EQ(0xedbc7d65, row.HashCode()); + + // test value in binary row + ASSERT_TRUE(row.IsNullAt(0)); + ASSERT_EQ(timestamp, row.GetTimestamp(1, Timestamp::MAX_PRECISION)); + + 
ASSERT_TRUE(row.IsNullAt(2)); + ASSERT_EQ(decimal, row.GetDecimal(3, 23, 5)); + ASSERT_TRUE(row.IsNullAt(4)); +} + +TEST_F(BinaryRowTest, TestBinaryRowSerializer) { + srand(static_cast(time(nullptr))); + auto pool = GetDefaultPool(); + BinaryRow row(3); + BinaryRowWriter writer(&row, /*initial_size=*/1024, pool.get()); + writer.WriteInt(0, paimon::test::RandomNumber(0, 2000000)); + + int32_t str_size = 1024; + std::string test_string1, test_string2; + test_string1.reserve(str_size); + test_string2.reserve(str_size); + for (int32_t j = 0; j < str_size; j++) { + test_string1 += static_cast(paimon::test::RandomNumber(0, 25) + 'a'); + test_string2 += static_cast(paimon::test::RandomNumber(0, 25) + 'a'); + } + writer.WriteStringView(1, std::string_view{test_string1}); + writer.WriteStringView(2, std::string_view{test_string2}); + writer.Complete(); + + BinaryRowSerializer row_serializer(3, pool); + MemorySegmentOutputStream out(MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool); + ASSERT_OK(row_serializer.Serialize(row, &out)); + PAIMON_UNIQUE_PTR bytes_serialize = + MemorySegmentUtils::CopyToBytes(out.Segments(), 0, out.CurrentSize(), pool.get()); + ASSERT_GE(bytes_serialize->size(), sizeof(int32_t) + test_string1.size() + test_string2.size()); + DataInputStream input( + std::make_shared(bytes_serialize->data(), bytes_serialize->size())); + ASSERT_OK_AND_ASSIGN(auto de_row, row_serializer.Deserialize(&input)); + ASSERT_EQ(de_row, row); + ASSERT_EQ(de_row.GetString(1).ToString(), test_string1); + ASSERT_EQ(de_row.GetString(2).ToString(), test_string2); +} + +} // namespace paimon::test diff --git a/src/paimon/common/data/binary_row_writer.cpp b/src/paimon/common/data/binary_row_writer.cpp new file mode 100644 index 0000000..eb4898e --- /dev/null +++ b/src/paimon/common/data/binary_row_writer.cpp @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/common/data/binary_row_writer.h" + +#include +#include +#include +#include +#include + +#include "arrow/type.h" +#include "arrow/util/checked_cast.h" +#include "fmt/format.h" +#include "paimon/common/data/binary_string.h" +#include "paimon/common/memory/memory_segment.h" +#include "paimon/common/utils/date_time_utils.h" +#include "paimon/data/decimal.h" +#include "paimon/data/timestamp.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/status.h" + +namespace paimon { + +BinaryRowWriter::BinaryRowWriter(BinaryRow* row, int32_t initial_size, MemoryPool* pool) + : null_bits_size_in_bytes_(BinaryRow::CalculateBitSetWidthInBytes(row->GetFieldCount())), + fixed_size_(row->GetFixedLengthPartSize()) { + cursor_ = fixed_size_; + row_ = row; + segment_ = MemorySegment::Wrap(Bytes::AllocateBytes(fixed_size_ + initial_size, pool)); + row_->PointTo(segment_, 0, segment_.Size()); + pool_ = pool; +} + +Result BinaryRowWriter::CreateFieldSetter( + int32_t field_idx, const std::shared_ptr& field_type) { + arrow::Type::type type = field_type->id(); + BinaryRowWriter::FieldSetterFunc field_setter; + switch (type) { + case arrow::Type::type::BOOL: { + field_setter = [field_idx](const VariantType& field, BinaryRowWriter* 
writer) -> void { + return writer->WriteBoolean(field_idx, DataDefine::GetVariantValue(field)); + }; + break; + } + case arrow::Type::type::INT8: { + field_setter = [field_idx](const VariantType& field, BinaryRowWriter* writer) -> void { + return writer->WriteByte(field_idx, DataDefine::GetVariantValue(field)); + }; + break; + } + case arrow::Type::type::INT16: { + field_setter = [field_idx](const VariantType& field, BinaryRowWriter* writer) -> void { + return writer->WriteShort(field_idx, DataDefine::GetVariantValue(field)); + }; + break; + } + case arrow::Type::type::INT32: { + field_setter = [field_idx](const VariantType& field, BinaryRowWriter* writer) -> void { + return writer->WriteInt(field_idx, DataDefine::GetVariantValue(field)); + }; + break; + } + case arrow::Type::type::INT64: { + field_setter = [field_idx](const VariantType& field, BinaryRowWriter* writer) -> void { + return writer->WriteLong(field_idx, DataDefine::GetVariantValue(field)); + }; + break; + } + case arrow::Type::type::FLOAT: { + field_setter = [field_idx](const VariantType& field, BinaryRowWriter* writer) -> void { + return writer->WriteFloat(field_idx, DataDefine::GetVariantValue(field)); + }; + break; + } + case arrow::Type::type::DOUBLE: { + field_setter = [field_idx](const VariantType& field, BinaryRowWriter* writer) -> void { + return writer->WriteDouble(field_idx, DataDefine::GetVariantValue(field)); + }; + break; + } + case arrow::Type::type::DATE32: { + field_setter = [field_idx](const VariantType& field, BinaryRowWriter* writer) -> void { + return writer->WriteInt(field_idx, DataDefine::GetVariantValue(field)); + }; + break; + } + case arrow::Type::type::STRING: { + field_setter = [field_idx](const VariantType& field, BinaryRowWriter* writer) -> void { + const auto* view = DataDefine::GetVariantPtr(field); + if (view) { + return writer->WriteStringView(field_idx, *view); + } + return writer->WriteString(field_idx, + DataDefine::GetVariantValue(field)); + }; + break; + } + case 
arrow::Type::type::BINARY: { + field_setter = [field_idx](const VariantType& field, BinaryRowWriter* writer) -> void { + const auto* view = DataDefine::GetVariantPtr(field); + if (view) { + return writer->WriteStringView(field_idx, *view); + } + return writer->WriteBinary( + field_idx, *DataDefine::GetVariantValue>(field)); + }; + break; + } + case arrow::Type::type::TIMESTAMP: { + auto timestamp_type = + arrow::internal::checked_pointer_cast(field_type); + int32_t precision = DateTimeUtils::GetPrecisionFromType(timestamp_type); + field_setter = [field_idx, precision](const VariantType& field, + BinaryRowWriter* writer) -> void { + if (DataDefine::IsVariantNull(field)) { + if (!Timestamp::IsCompact(precision)) { + writer->WriteTimestamp(field_idx, std::nullopt, precision); + } else { + writer->SetNullAt(field_idx); + } + return; + } + return writer->WriteTimestamp( + field_idx, DataDefine::GetVariantValue(field), precision); + }; + return field_setter; + } + case arrow::Type::type::DECIMAL: { + auto* decimal_type = + arrow::internal::checked_cast(field_type.get()); + assert(decimal_type); + auto precision = decimal_type->precision(); + field_setter = [field_idx, precision](const VariantType& field, + BinaryRowWriter* writer) -> void { + if (DataDefine::IsVariantNull(field)) { + if (!Decimal::IsCompact(precision)) { + writer->WriteDecimal(field_idx, std::nullopt, precision); + } else { + writer->SetNullAt(field_idx); + } + return; + } + return writer->WriteDecimal(field_idx, DataDefine::GetVariantValue(field), + precision); + }; + return field_setter; + } + default: + return Status::Invalid( + fmt::format("type {} not support in data setter", field_type->ToString())); + } + BinaryRowWriter::FieldSetterFunc ret = + [field_idx, field_setter](const VariantType& field, BinaryRowWriter* writer) -> void { + if (DataDefine::IsVariantNull(field)) { + writer->SetNullAt(field_idx); + return; + } + field_setter(field, writer); + }; + return ret; +} + +} // namespace paimon 
diff --git a/src/paimon/common/data/binary_row_writer.h b/src/paimon/common/data/binary_row_writer.h new file mode 100644 index 0000000..06a5816 --- /dev/null +++ b/src/paimon/common/data/binary_row_writer.h @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/common/data/abstract_binary_writer.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/common/data/data_define.h" +#include "paimon/common/memory/memory_segment.h" +#include "paimon/common/memory/memory_segment_utils.h" +#include "paimon/common/types/row_kind.h" +#include "paimon/result.h" + +namespace arrow { +class DataType; +} // namespace arrow + +namespace paimon { +class MemoryPool; +/// Writer for `BinaryRow`. +class BinaryRowWriter : public AbstractBinaryWriter { + public: + BinaryRowWriter(BinaryRow* row, int32_t initial_size, MemoryPool* pool); + + /// First, call `Reset()` before set/write value. + void Reset() override { + cursor_ = fixed_size_; + for (int32_t i = 0; i < null_bits_size_in_bytes_; i += 8) { + segment_.PutValue(i, 0L); + } + } + + /// Default not null. 
+ /// @note If type is decimal or timestamp with no compact, use + /// `WriteTimestamp(pos, null, precision)` to set null rather than `SetNullAt(pos)` + void SetNullAt(int32_t pos) override { + SetNullBit(pos); + segment_.PutValue(GetFieldOffset(pos), 0L); + } + + void SetNullBit(int32_t pos) override { + MemorySegmentUtils::BitSet(&segment_, 0, pos + BinaryRow::HEADER_SIZE_IN_BITS); + } + + void WriteRowKind(const RowKind* kind) { + segment_.Put(0, kind->ToByteValue()); + } + + void WriteBoolean(int32_t pos, bool value) override { + segment_.PutValue(GetFieldOffset(pos), value); + } + + void WriteByte(int32_t pos, int8_t value) override { + segment_.PutValue(GetFieldOffset(pos), value); + } + + void WriteShort(int32_t pos, int16_t value) override { + segment_.PutValue(GetFieldOffset(pos), value); + } + + void WriteInt(int32_t pos, int32_t value) override { + segment_.PutValue(GetFieldOffset(pos), value); + } + + void WriteLong(int32_t pos, int64_t value) override { + segment_.PutValue(GetFieldOffset(pos), value); + } + + void WriteFloat(int32_t pos, float value) override { + segment_.PutValue(GetFieldOffset(pos), value); + } + + void WriteDouble(int32_t pos, double value) override { + segment_.PutValue(GetFieldOffset(pos), value); + } + + void Complete() override { + row_->SetTotalSize(cursor_); + } + + int32_t GetFieldOffset(int32_t pos) const override { + return null_bits_size_in_bytes_ + 8 * pos; + } + + void SetOffsetAndSize(int32_t pos, int32_t offset, int64_t size) override { + const int64_t offset_and_size = (static_cast(offset) << 32) | size; + segment_.PutValue(GetFieldOffset(pos), offset_and_size); + } + + void AfterGrow() override { + row_->PointTo(segment_, 0, segment_.Size()); + } + + /// only support non-nested type + using FieldSetterFunc = std::function; + static Result CreateFieldSetter( + int32_t field_idx, const std::shared_ptr& field_type); + + private: + int32_t null_bits_size_in_bytes_; + int32_t fixed_size_; + BinaryRow* row_; +}; + +} // 
namespace paimon diff --git a/src/paimon/common/data/binary_row_writer_test.cpp b/src/paimon/common/data/binary_row_writer_test.cpp new file mode 100644 index 0000000..fc5ca42 --- /dev/null +++ b/src/paimon/common/data/binary_row_writer_test.cpp @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "paimon/common/data/binary_row_writer.h" + +#include +#include +#include + +#include "arrow/type_fwd.h" +#include "gtest/gtest.h" +#include "paimon/common/data/binary_array.h" +#include "paimon/common/data/binary_map.h" +#include "paimon/common/data/binary_string.h" +#include "paimon/common/data/internal_row.h" +#include "paimon/common/utils/date_time_utils.h" +#include "paimon/data/decimal.h" +#include "paimon/data/timestamp.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/status.h" +#include "paimon/testing/utils/binary_row_generator.h" +#include "paimon/testing/utils/testharness.h" +namespace paimon::test { +TEST(BinaryRowWriterTest, TestFieldSetter) { + int32_t arity = 24; + BinaryRow row(arity); + BinaryRowWriter writer(&row, /*initial_size=*/0, GetDefaultPool().get()); + writer.Reset(); + + auto pool = GetDefaultPool(); + // set process + ASSERT_OK_AND_ASSIGN(BinaryRowWriter::FieldSetterFunc setter0, + BinaryRowWriter::CreateFieldSetter(0, arrow::boolean())); + ASSERT_OK_AND_ASSIGN(BinaryRowWriter::FieldSetterFunc setter1, + BinaryRowWriter::CreateFieldSetter(1, arrow::int8())); + ASSERT_OK_AND_ASSIGN(BinaryRowWriter::FieldSetterFunc setter2, + BinaryRowWriter::CreateFieldSetter(2, arrow::int16())); + ASSERT_OK_AND_ASSIGN(BinaryRowWriter::FieldSetterFunc setter3, + BinaryRowWriter::CreateFieldSetter(3, arrow::int32())); + ASSERT_OK_AND_ASSIGN(BinaryRowWriter::FieldSetterFunc setter4, + BinaryRowWriter::CreateFieldSetter(4, arrow::int64())); + ASSERT_OK_AND_ASSIGN(BinaryRowWriter::FieldSetterFunc setter5, + BinaryRowWriter::CreateFieldSetter(5, arrow::float32())); + ASSERT_OK_AND_ASSIGN(BinaryRowWriter::FieldSetterFunc setter6, + BinaryRowWriter::CreateFieldSetter(6, arrow::float64())); + ASSERT_OK_AND_ASSIGN(BinaryRowWriter::FieldSetterFunc setter7, + BinaryRowWriter::CreateFieldSetter(7, arrow::utf8())); + ASSERT_OK_AND_ASSIGN(BinaryRowWriter::FieldSetterFunc setter8, + 
BinaryRowWriter::CreateFieldSetter(8, arrow::binary())); + ASSERT_OK_AND_ASSIGN(BinaryRowWriter::FieldSetterFunc setter9, + BinaryRowWriter::CreateFieldSetter(9, arrow::date32())); + ASSERT_OK_AND_ASSIGN(BinaryRowWriter::FieldSetterFunc setter10, + BinaryRowWriter::CreateFieldSetter(10, arrow::decimal128(5, 2))); + ASSERT_OK_AND_ASSIGN( + BinaryRowWriter::FieldSetterFunc setter11, + BinaryRowWriter::CreateFieldSetter(11, arrow::timestamp(arrow::TimeUnit::NANO))); + ASSERT_OK_AND_ASSIGN(BinaryRowWriter::FieldSetterFunc setter12, + BinaryRowWriter::CreateFieldSetter(12, arrow::utf8())); + ASSERT_OK_AND_ASSIGN(BinaryRowWriter::FieldSetterFunc setter13, + BinaryRowWriter::CreateFieldSetter(13, arrow::binary())); + ASSERT_OK_AND_ASSIGN(BinaryRowWriter::FieldSetterFunc setter14, + BinaryRowWriter::CreateFieldSetter(14, arrow::decimal128(37, 2))); + ASSERT_OK_AND_ASSIGN( + BinaryRowWriter::FieldSetterFunc setter15, + BinaryRowWriter::CreateFieldSetter(15, arrow::timestamp(arrow::TimeUnit::NANO))); + // for timestamp type + ASSERT_OK_AND_ASSIGN( + BinaryRowWriter::FieldSetterFunc setter16, + BinaryRowWriter::CreateFieldSetter(16, arrow::timestamp(arrow::TimeUnit::SECOND))); + ASSERT_OK_AND_ASSIGN( + BinaryRowWriter::FieldSetterFunc setter17, + BinaryRowWriter::CreateFieldSetter(17, arrow::timestamp(arrow::TimeUnit::MILLI))); + ASSERT_OK_AND_ASSIGN( + BinaryRowWriter::FieldSetterFunc setter18, + BinaryRowWriter::CreateFieldSetter(18, arrow::timestamp(arrow::TimeUnit::MICRO))); + ASSERT_OK_AND_ASSIGN( + BinaryRowWriter::FieldSetterFunc setter19, + BinaryRowWriter::CreateFieldSetter(19, arrow::timestamp(arrow::TimeUnit::NANO))); + auto timezone = DateTimeUtils::GetLocalTimezoneName(); + ASSERT_OK_AND_ASSIGN(BinaryRowWriter::FieldSetterFunc setter20, + BinaryRowWriter::CreateFieldSetter( + 20, arrow::timestamp(arrow::TimeUnit::SECOND, timezone))); + ASSERT_OK_AND_ASSIGN( + BinaryRowWriter::FieldSetterFunc setter21, + BinaryRowWriter::CreateFieldSetter(21, 
arrow::timestamp(arrow::TimeUnit::MILLI, timezone))); + ASSERT_OK_AND_ASSIGN( + BinaryRowWriter::FieldSetterFunc setter22, + BinaryRowWriter::CreateFieldSetter(22, arrow::timestamp(arrow::TimeUnit::MICRO, timezone))); + ASSERT_OK_AND_ASSIGN( + BinaryRowWriter::FieldSetterFunc setter23, + BinaryRowWriter::CreateFieldSetter(23, arrow::timestamp(arrow::TimeUnit::NANO, timezone))); + + setter0(true, &writer); + setter1(static_cast(1), &writer); + setter2(static_cast(2), &writer); + setter3(static_cast(3), &writer); + setter4(static_cast(4), &writer); + setter5(static_cast(5.5), &writer); + setter6(6.66, &writer); + std::string data = "abc"; + setter7(std::string_view(data.data(), data.size()), &writer); + setter8(std::string_view(data.data(), data.size()), &writer); + setter9(9, &writer); + setter10(Decimal(5, 2, 123), &writer); + setter11(Timestamp(11, 12), &writer); + setter12(BinaryString::FromString("hello", pool.get()), &writer); + auto bytes = std::make_shared("world", pool.get()); + setter13(bytes, &writer); + setter14(NullType(), &writer); + setter15(NullType(), &writer); + + setter16(Timestamp(1758030901000l, 0), &writer); + setter17(Timestamp(1758030901001l, 0), &writer); + setter18(NullType(), &writer); + setter19(Timestamp(1758030901001l, 1001), &writer); + + setter20(NullType(), &writer); + setter21(Timestamp(1758030902002l, 0), &writer); + setter22(Timestamp(1758030902002l, 2000), &writer); + setter23(Timestamp(1758030902002l, 2002), &writer); + + writer.Complete(); + + // get process + ASSERT_OK_AND_ASSIGN(InternalRow::FieldGetterFunc getter0, + InternalRow::CreateFieldGetter(0, arrow::boolean(), /*use_view=*/true)); + ASSERT_OK_AND_ASSIGN(InternalRow::FieldGetterFunc getter1, + InternalRow::CreateFieldGetter(1, arrow::int8(), /*use_view=*/true)); + ASSERT_OK_AND_ASSIGN(InternalRow::FieldGetterFunc getter2, + InternalRow::CreateFieldGetter(2, arrow::int16(), /*use_view=*/true)); + ASSERT_OK_AND_ASSIGN(InternalRow::FieldGetterFunc getter3, + 
InternalRow::CreateFieldGetter(3, arrow::int32(), /*use_view=*/true)); + ASSERT_OK_AND_ASSIGN(InternalRow::FieldGetterFunc getter4, + InternalRow::CreateFieldGetter(4, arrow::int64(), /*use_view=*/true)); + ASSERT_OK_AND_ASSIGN(InternalRow::FieldGetterFunc getter5, + InternalRow::CreateFieldGetter(5, arrow::float32(), /*use_view=*/true)); + ASSERT_OK_AND_ASSIGN(InternalRow::FieldGetterFunc getter6, + InternalRow::CreateFieldGetter(6, arrow::float64(), /*use_view=*/true)); + ASSERT_OK_AND_ASSIGN(InternalRow::FieldGetterFunc getter7, + InternalRow::CreateFieldGetter(7, arrow::utf8(), /*use_view=*/true)); + ASSERT_OK_AND_ASSIGN(InternalRow::FieldGetterFunc getter8, + InternalRow::CreateFieldGetter(8, arrow::binary(), + /*use_view=*/true)); + ASSERT_OK_AND_ASSIGN(InternalRow::FieldGetterFunc getter9, + InternalRow::CreateFieldGetter(9, arrow::date32(), /*use_view=*/true)); + ASSERT_OK_AND_ASSIGN( + InternalRow::FieldGetterFunc getter10, + InternalRow::CreateFieldGetter(10, arrow::decimal128(5, 2), /*use_view=*/true)); + ASSERT_OK_AND_ASSIGN(InternalRow::FieldGetterFunc getter11, + InternalRow::CreateFieldGetter(11, arrow::timestamp(arrow::TimeUnit::NANO), + /*use_view=*/true)); + ASSERT_OK_AND_ASSIGN(InternalRow::FieldGetterFunc getter12, + InternalRow::CreateFieldGetter(12, arrow::utf8(), /*use_view=*/false)); + ASSERT_OK_AND_ASSIGN(InternalRow::FieldGetterFunc getter13, + InternalRow::CreateFieldGetter(13, arrow::binary(), + /*use_view=*/false)); + ASSERT_OK_AND_ASSIGN( + InternalRow::FieldGetterFunc getter14, + InternalRow::CreateFieldGetter(14, arrow::decimal128(37, 2), /*use_view=*/true)); + ASSERT_OK_AND_ASSIGN(InternalRow::FieldGetterFunc getter15, + InternalRow::CreateFieldGetter(15, arrow::timestamp(arrow::TimeUnit::NANO), + /*use_view=*/true)); + // for timestamp type + ASSERT_OK_AND_ASSIGN( + InternalRow::FieldGetterFunc getter16, + InternalRow::CreateFieldGetter(16, arrow::timestamp(arrow::TimeUnit::SECOND), + /*use_view=*/true)); + ASSERT_OK_AND_ASSIGN( + 
InternalRow::FieldGetterFunc getter17, + InternalRow::CreateFieldGetter(17, arrow::timestamp(arrow::TimeUnit::MILLI), + /*use_view=*/true)); + ASSERT_OK_AND_ASSIGN( + InternalRow::FieldGetterFunc getter18, + InternalRow::CreateFieldGetter(18, arrow::timestamp(arrow::TimeUnit::MICRO), + /*use_view=*/true)); + ASSERT_OK_AND_ASSIGN(InternalRow::FieldGetterFunc getter19, + InternalRow::CreateFieldGetter(19, arrow::timestamp(arrow::TimeUnit::NANO), + /*use_view=*/true)); + ASSERT_OK_AND_ASSIGN( + InternalRow::FieldGetterFunc getter20, + InternalRow::CreateFieldGetter(20, arrow::timestamp(arrow::TimeUnit::SECOND), + /*use_view=*/true)); + ASSERT_OK_AND_ASSIGN( + InternalRow::FieldGetterFunc getter21, + InternalRow::CreateFieldGetter(21, arrow::timestamp(arrow::TimeUnit::MILLI), + /*use_view=*/true)); + ASSERT_OK_AND_ASSIGN( + InternalRow::FieldGetterFunc getter22, + InternalRow::CreateFieldGetter(22, arrow::timestamp(arrow::TimeUnit::MICRO), + /*use_view=*/true)); + ASSERT_OK_AND_ASSIGN(InternalRow::FieldGetterFunc getter23, + InternalRow::CreateFieldGetter(23, arrow::timestamp(arrow::TimeUnit::NANO), + /*use_view=*/true)); + + ASSERT_EQ(DataDefine::GetVariantValue(getter0(row)), true); + ASSERT_EQ(DataDefine::GetVariantValue(getter1(row)), static_cast(1)); + ASSERT_EQ(DataDefine::GetVariantValue(getter2(row)), static_cast(2)); + ASSERT_EQ(DataDefine::GetVariantValue(getter3(row)), static_cast(3)); + ASSERT_EQ(DataDefine::GetVariantValue(getter4(row)), static_cast(4)); + ASSERT_EQ(DataDefine::GetVariantValue(getter5(row)), static_cast(5.5)); + ASSERT_EQ(DataDefine::GetVariantValue(getter6(row)), static_cast(6.66)); + + ASSERT_EQ(DataDefine::GetVariantValue(getter7(row)), std::string_view(data)); + ASSERT_EQ(DataDefine::GetVariantValue(getter8(row)), std::string_view(data)); + ASSERT_EQ(DataDefine::GetVariantValue(getter9(row)), 9); + ASSERT_EQ(DataDefine::GetVariantValue(getter10(row)), Decimal(5, 2, 123)); + ASSERT_EQ(DataDefine::GetVariantValue(getter11(row)), 
Timestamp(11, 12)); + ASSERT_EQ(DataDefine::GetVariantValue(getter12(row)).ToString(), "hello"); + ASSERT_EQ(*DataDefine::GetVariantValue>(getter13(row)), + Bytes("world", pool.get())); + ASSERT_TRUE(DataDefine::IsVariantNull(getter14(row))); + ASSERT_TRUE(DataDefine::IsVariantNull(getter15(row))); + + ASSERT_EQ(DataDefine::GetVariantValue(getter16(row)), Timestamp(1758030901000l, 0)); + ASSERT_EQ(DataDefine::GetVariantValue(getter17(row)), Timestamp(1758030901001l, 0)); + ASSERT_TRUE(DataDefine::IsVariantNull(getter18(row))); + ASSERT_EQ(DataDefine::GetVariantValue(getter19(row)), + Timestamp(1758030901001l, 1001)); + + ASSERT_TRUE(DataDefine::IsVariantNull(getter20(row))); + ASSERT_EQ(DataDefine::GetVariantValue(getter21(row)), Timestamp(1758030902002l, 0)); + ASSERT_EQ(DataDefine::GetVariantValue(getter22(row)), + Timestamp(1758030902002l, 2000)); + ASSERT_EQ(DataDefine::GetVariantValue(getter23(row)), + Timestamp(1758030902002l, 2002)); +} + +TEST(BinaryRowWriterTest, TestFieldSetterWithNull) { + int32_t arity = 5; + BinaryRow row(arity); + BinaryRowWriter writer(&row, /*initial_size=*/0, GetDefaultPool().get()); + writer.Reset(); + auto timezone = DateTimeUtils::GetLocalTimezoneName(); + ASSERT_OK_AND_ASSIGN(BinaryRowWriter::FieldSetterFunc setter0, + BinaryRowWriter::CreateFieldSetter(0, arrow::boolean())); + ASSERT_OK_AND_ASSIGN(BinaryRowWriter::FieldSetterFunc setter1, + BinaryRowWriter::CreateFieldSetter(1, arrow::int8())); + ASSERT_OK_AND_ASSIGN(BinaryRowWriter::FieldSetterFunc setter2, + BinaryRowWriter::CreateFieldSetter(2, arrow::decimal128(5, 2))); + ASSERT_OK_AND_ASSIGN( + BinaryRowWriter::FieldSetterFunc setter3, + BinaryRowWriter::CreateFieldSetter(3, arrow::timestamp(arrow::TimeUnit::SECOND))); + ASSERT_OK_AND_ASSIGN( + BinaryRowWriter::FieldSetterFunc setter4, + BinaryRowWriter::CreateFieldSetter(4, arrow::timestamp(arrow::TimeUnit::NANO, timezone))); + + setter0(true, &writer); + setter1(NullType(), &writer); + setter2(NullType(), &writer); + 
+ setter3(NullType(), &writer); + setter4(NullType(), &writer); + writer.Complete(); + + ASSERT_EQ(row.GetBoolean(0), true); + ASSERT_TRUE(row.IsNullAt(1)); + ASSERT_TRUE(row.IsNullAt(2)); + ASSERT_TRUE(row.IsNullAt(3)); + ASSERT_TRUE(row.IsNullAt(4)); +} + +// Writes a nested row, an array and a map into a 3-field row and checks that each one reads back +// with the same hash code and the same contents. +TEST(BinaryRowWriterTest, TestWriteNested) { + auto pool = GetDefaultPool(); + BinaryRow inner_row = + BinaryRowGenerator::GenerateRow({std::string("Alice"), 30, 12.1, NullType()}, pool.get()); + BinaryArray inner_array = BinaryArray::FromIntArray({10, 20, 30}, pool.get()); + auto key = BinaryArray::FromIntArray({1, 2, 3, 5}, pool.get()); + auto value = BinaryArray::FromLongArray({100ll, 200ll, 300ll, 500ll}, pool.get()); + auto inner_map = BinaryMap::ValueOf(key, value, pool.get()); + + BinaryRow row(3); + BinaryRowWriter writer(&row, /*initial_size=*/1024, pool.get()); + // Writer protocol: call Reset() before the first write, as the other tests in this file do, so + // the cursor and the null bits start from a defined state. + writer.Reset(); + writer.WriteRow(0, inner_row); + writer.WriteArray(1, inner_array); + writer.WriteMap(2, *inner_map); + writer.Complete(); + + ASSERT_EQ(3, row.GetFieldCount()); + ASSERT_FALSE(row.IsNullAt(0)); + + auto de_row = row.GetRow(0, 4); + ASSERT_EQ(std::dynamic_pointer_cast(de_row)->HashCode(), inner_row.HashCode()); + ASSERT_EQ(4, de_row->GetFieldCount()); + ASSERT_EQ(de_row->GetString(0).ToString(), "Alice"); + ASSERT_EQ(de_row->GetInt(1), 30); + ASSERT_EQ(de_row->GetDouble(2), 12.1); + ASSERT_TRUE(de_row->IsNullAt(3)); + + auto de_array = row.GetArray(1); + ASSERT_EQ(std::dynamic_pointer_cast(de_array)->HashCode(), inner_array.HashCode()); + ASSERT_EQ(de_array->ToIntArray().value(), std::vector({10, 20, 30})); + + auto de_map = row.GetMap(2); + ASSERT_EQ(std::dynamic_pointer_cast(de_map->KeyArray())->HashCode(), + key.HashCode()); + ASSERT_EQ(std::dynamic_pointer_cast(de_map->ValueArray())->HashCode(), + value.HashCode()); + ASSERT_EQ(de_map->KeyArray()->ToIntArray().value(), std::vector({1, 2, 3, 5})); + ASSERT_EQ(de_map->ValueArray()->ToLongArray().value(), + std::vector({100ll, 200ll, 300ll, 500ll})); +} +} // namespace paimon::test diff --git 
a/src/paimon/common/data/binary_writer.h b/src/paimon/common/data/binary_writer.h new file mode 100644 index 0000000..d9871f7 --- /dev/null +++ b/src/paimon/common/data/binary_writer.h @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include + +namespace paimon { + +class Bytes; +class BinaryString; +class BinaryRow; +class BinaryArray; +class BinaryMap; +class Timestamp; +class Decimal; + +/// Writer for a composite binary data format, such as a row or an array. Usage: 1. Invoke `Reset()`. +/// 2. Write each field with `WriteXX()` or `SetNullAt()` (the same field cannot be written twice). +/// 3. Invoke `Complete()`. +class BinaryWriter { + public: + virtual ~BinaryWriter() = default; + /// Reset the writer to prepare for the next write. + virtual void Reset() = 0; + + /// Set the field at `pos` to null. 
+ virtual void SetNullAt(int32_t pos) = 0; + + /// Write a fixed-width primitive `value` into the field at `pos`. + virtual void WriteBoolean(int32_t pos, bool value) = 0; + + virtual void WriteByte(int32_t pos, int8_t value) = 0; + + virtual void WriteShort(int32_t pos, int16_t value) = 0; + + virtual void WriteInt(int32_t pos, int32_t value) = 0; + + virtual void WriteLong(int32_t pos, int64_t value) = 0; + + virtual void WriteFloat(int32_t pos, float value) = 0; + + virtual void WriteDouble(int32_t pos, double value) = 0; + + /// Write string or binary data into the field at `pos`. + virtual void WriteString(int32_t pos, const BinaryString& value) = 0; + + virtual void WriteBinary(int32_t pos, const Bytes& bytes) = 0; + + virtual void WriteStringView(int32_t pos, const std::string_view& view) = 0; + + /// Write a decimal or timestamp into the field at `pos`; `value` is passed as an optional. + /// NOTE(review): an empty optional presumably writes null - confirm against the implementation. + virtual void WriteDecimal(int32_t pos, const std::optional& value, + int32_t precision) = 0; + + virtual void WriteTimestamp(int32_t pos, const std::optional& value, + int32_t precision) = 0; + + /// Write a nested array, row or map into the field at `pos`. + virtual void WriteArray(int32_t pos, const BinaryArray& value) = 0; + + virtual void WriteRow(int32_t pos, const BinaryRow& value) = 0; + + virtual void WriteMap(int32_t pos, const BinaryMap& input) = 0; + + /// Finally, complete the write so that the real size is set on the binary. 
+ virtual void Complete() = 0; +}; + +} // namespace paimon From ab5fe20dcaa24533699eec77febcf549f4d767f1 Mon Sep 17 00:00:00 2001 From: "lisizhuo.lsz" Date: Tue, 26 May 2026 11:44:45 +0000 Subject: [PATCH 2/5] add binary_map & binary_array --- src/paimon/common/data/binary_array.cpp | 301 ++++++++++ src/paimon/common/data/binary_array.h | 123 +++++ src/paimon/common/data/binary_array_test.cpp | 518 ++++++++++++++++++ .../common/data/binary_array_writer.cpp | 139 +++++ src/paimon/common/data/binary_array_writer.h | 98 ++++ src/paimon/common/data/binary_map.h | 90 +++ src/paimon/common/data/binary_map_test.cpp | 42 ++ 7 files changed, 1311 insertions(+) create mode 100644 src/paimon/common/data/binary_array.cpp create mode 100644 src/paimon/common/data/binary_array.h create mode 100644 src/paimon/common/data/binary_array_test.cpp create mode 100644 src/paimon/common/data/binary_array_writer.cpp create mode 100644 src/paimon/common/data/binary_array_writer.h create mode 100644 src/paimon/common/data/binary_map.h create mode 100644 src/paimon/common/data/binary_map_test.cpp diff --git a/src/paimon/common/data/binary_array.cpp b/src/paimon/common/data/binary_array.cpp new file mode 100644 index 0000000..de195b8 --- /dev/null +++ b/src/paimon/common/data/binary_array.cpp @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/common/data/binary_array.h" + +#include +#include +#include + +#include "paimon/common/data/binary_array_writer.h" +#include "paimon/common/data/binary_data_read_utils.h" +#include "paimon/common/memory/memory_segment.h" +#include "paimon/memory/memory_pool.h" + +namespace paimon { + +// Header size in bytes: 4 bytes in front, plus one 4-byte word of null bits per 32 elements (rounded up). +int32_t BinaryArray::CalculateHeaderInBytes(int32_t num_fields) { + return 4 + ((num_fields + 31) / 32) * 4; +} + +// Bounds check via `assert` only: requires 0 <= ordinal < size_. +void BinaryArray::AssertIndexIsValid(int32_t ordinal) const { + assert(ordinal >= 0); + assert(ordinal < size_); +} +// Byte offset of element `ordinal` when every element occupies `element_size` bytes. +int32_t BinaryArray::GetElementOffset(int32_t ordinal, int32_t element_size) const { + return element_offset_ + ordinal * element_size; +} + +// Binds this array to `segment` at `offset`: the element count is read from `offset`, and the first +// element starts right after the header computed by `CalculateHeaderInBytes()`. +void BinaryArray::PointTo(const MemorySegment& segment, int32_t offset, int32_t size_in_bytes) { + auto size = MemorySegmentUtils::GetValue({segment}, offset); + assert(size >= 0); + size_ = size; + segment_ = segment; + offset_ = offset; + size_in_bytes_ = size_in_bytes; + element_offset_ = offset_ + CalculateHeaderInBytes(size_); +} + +// Reads bit `pos` of the null bits, which start 4 bytes after `offset_`. +bool BinaryArray::IsNullAt(int32_t pos) const { + AssertIndexIsValid(pos); + return MemorySegmentUtils::BitGet({segment_}, offset_ + 4, pos); +} + +// Reads element `pos` using an 8-byte element stride. +int64_t BinaryArray::GetLong(int32_t pos) const { + AssertIndexIsValid(pos); + return MemorySegmentUtils::GetValue({segment_}, GetElementOffset(pos, 8)); +} + +// Reads element `pos` using a 4-byte element stride. +int32_t BinaryArray::GetInt(int32_t pos) const { + AssertIndexIsValid(pos); + return MemorySegmentUtils::GetValue({segment_}, GetElementOffset(pos, 4)); +} +// Dates are read exactly like ints. +int32_t BinaryArray::GetDate(int32_t pos) const { + return GetInt(pos); +} + 
+// Reads the 8-byte slot of element `pos` and lets `BinaryDataReadUtils::ReadBinaryString` decode it. +BinaryString BinaryArray::GetString(int32_t pos) const { + AssertIndexIsValid(pos); + int32_t field_offset = GetElementOffset(pos, 8); + const auto offset_and_size = MemorySegmentUtils::GetValue({segment_}, field_offset); + return BinaryDataReadUtils::ReadBinaryString(segment_, offset_, field_offset, offset_and_size); +} + +// NOTE(review): the view is taken from a local `BinaryString`; presumably it refers to memory owned +// by `segment_` and therefore outlives the local - confirm. +std::string_view BinaryArray::GetStringView(int32_t pos) const { + BinaryString binary_string = GetString(pos); + return binary_string.GetStringView(); +} + +// Compact precisions are built from the slot value via `Decimal::FromUnscaledLong`; otherwise the slot +// is decoded by `BinaryDataReadUtils::ReadDecimal`. +Decimal BinaryArray::GetDecimal(int32_t pos, int32_t precision, int32_t scale) const { + AssertIndexIsValid(pos); + if (Decimal::IsCompact(precision)) { + return Decimal::FromUnscaledLong( + MemorySegmentUtils::GetValue({segment_}, GetElementOffset(pos, 8)), precision, + scale); + } + + int32_t field_offset = GetElementOffset(pos, 8); + const auto offset_and_size = MemorySegmentUtils::GetValue({segment_}, field_offset); + return BinaryDataReadUtils::ReadDecimal(segment_, offset_, offset_and_size, precision, scale); +} + +// Compact precisions are built from the slot value via `Timestamp::FromEpochMillis`; otherwise the slot +// is decoded by `BinaryDataReadUtils::ReadTimestampData`. +Timestamp BinaryArray::GetTimestamp(int32_t pos, int32_t precision) const { + AssertIndexIsValid(pos); + + if (Timestamp::IsCompact(precision)) { + return Timestamp::FromEpochMillis( + MemorySegmentUtils::GetValue({segment_}, GetElementOffset(pos, 8))); + } + + int32_t field_offset = GetElementOffset(pos, 8); + const auto offset_and_nano_of_milli = + MemorySegmentUtils::GetValue({segment_}, field_offset); + return BinaryDataReadUtils::ReadTimestampData(segment_, offset_, offset_and_nano_of_milli); +} + +// Decodes the slot of element `pos` via `BinarySection::ReadBinary`, passing the default memory pool. +std::shared_ptr BinaryArray::GetBinary(int32_t pos) const { + AssertIndexIsValid(pos); + int32_t field_offset = GetElementOffset(pos, 8); + const auto offset_and_size = MemorySegmentUtils::GetValue({segment_}, field_offset); + return BinarySection::ReadBinary(segment_, offset_, field_offset, offset_and_size, + GetDefaultPool().get()); +} + +// Nested array: the 8-byte slot value from `GetLong(pos)` is decoded by `ReadArrayData`. +std::shared_ptr BinaryArray::GetArray(int32_t pos) const { + AssertIndexIsValid(pos); + return 
BinaryDataReadUtils::ReadArrayData(segment_, offset_, GetLong(pos)); +} + +// Nested map: the 8-byte slot value from `GetLong(pos)` is decoded by `ReadMapData`. +std::shared_ptr BinaryArray::GetMap(int32_t pos) const { + AssertIndexIsValid(pos); + return BinaryDataReadUtils::ReadMapData(segment_, offset_, GetLong(pos)); +} + +// Nested row with `num_fields` fields, decoded from the 8-byte slot by `ReadRowData`. +std::shared_ptr BinaryArray::GetRow(int32_t pos, int32_t num_fields) const { + AssertIndexIsValid(pos); + int32_t field_offset = GetElementOffset(pos, 8); + const auto offset_and_size = MemorySegmentUtils::GetValue({segment_}, field_offset); + return BinaryDataReadUtils::ReadRowData(segment_, num_fields, offset_, offset_and_size); +} + +// The remaining primitive getters use the element stride passed to `GetElementOffset()`. +bool BinaryArray::GetBoolean(int32_t pos) const { + AssertIndexIsValid(pos); + return MemorySegmentUtils::GetValue({segment_}, GetElementOffset(pos, 1)); +} + +char BinaryArray::GetByte(int32_t pos) const { + AssertIndexIsValid(pos); + return MemorySegmentUtils::GetValue({segment_}, GetElementOffset(pos, 1)); +} + +int16_t BinaryArray::GetShort(int32_t pos) const { + AssertIndexIsValid(pos); + return MemorySegmentUtils::GetValue({segment_}, + GetElementOffset(pos, sizeof(int16_t))); +} + +float BinaryArray::GetFloat(int32_t pos) const { + AssertIndexIsValid(pos); + return MemorySegmentUtils::GetValue({segment_}, GetElementOffset(pos, sizeof(float))); +} + +double BinaryArray::GetDouble(int32_t pos) const { + AssertIndexIsValid(pos); + return MemorySegmentUtils::GetValue({segment_}, GetElementOffset(pos, sizeof(double))); +} + +// Scans the header in 4-byte steps from `offset_ + 4` up to the first element; any non-zero value +// means at least one null bit is set. +bool BinaryArray::AnyNull() const { + for (int32_t i = offset_ + 4; i < element_offset_; i += 4) { + if (MemorySegmentUtils::GetValue({segment_}, i) != 0) { + return true; + } + } + return false; +} + +// The To*Array() conversions below return the error from `CheckNoNull()` when the array holds a null; +// otherwise they copy the element region into a vector of `size_` elements in one call. +Result> BinaryArray::ToBooleanArray() const { + PAIMON_RETURN_NOT_OK(CheckNoNull()); + std::vector values; + values.resize(size_); + MemorySegmentUtils::CopyToUnsafe({segment_}, element_offset_, + const_cast(static_cast(values.data())), + size_); + return values; +} + +Result> BinaryArray::ToByteArray() const { + PAIMON_RETURN_NOT_OK(CheckNoNull()); + std::vector values; + 
values.resize(size_); + MemorySegmentUtils::CopyToUnsafe({segment_}, element_offset_, + const_cast(static_cast(values.data())), + size_); + return values; +} + +Result> BinaryArray::ToShortArray() const { + PAIMON_RETURN_NOT_OK(CheckNoNull()); + std::vector values; + values.resize(size_); + MemorySegmentUtils::CopyToUnsafe({segment_}, element_offset_, + const_cast(static_cast(values.data())), + size_ * sizeof(int16_t)); + return values; +} + +Result> BinaryArray::ToIntArray() const { + PAIMON_RETURN_NOT_OK(CheckNoNull()); + std::vector values; + values.resize(size_); + MemorySegmentUtils::CopyToUnsafe({segment_}, element_offset_, + const_cast(static_cast(values.data())), + size_ * sizeof(int32_t)); + return values; +} + +Result> BinaryArray::ToLongArray() const { + PAIMON_RETURN_NOT_OK(CheckNoNull()); + std::vector values; + values.resize(size_); + MemorySegmentUtils::CopyToUnsafe({segment_}, element_offset_, + const_cast(static_cast(values.data())), + size_ * sizeof(int64_t)); + return values; +} + +Result> BinaryArray::ToFloatArray() const { + PAIMON_RETURN_NOT_OK(CheckNoNull()); + std::vector values; + values.resize(size_); + MemorySegmentUtils::CopyToUnsafe({segment_}, element_offset_, + const_cast(static_cast(values.data())), + size_ * sizeof(float)); + return values; +} + +Result> BinaryArray::ToDoubleArray() const { + PAIMON_RETURN_NOT_OK(CheckNoNull()); + std::vector values; + values.resize(size_); + MemorySegmentUtils::CopyToUnsafe({segment_}, element_offset_, + const_cast(static_cast(values.data())), + size_ * sizeof(double)); + return values; +} + +// Returns a copy of this array produced by `Copy(&array, pool)`. +BinaryArray BinaryArray::Copy(MemoryPool* pool) const { + BinaryArray array; + Copy(&array, pool); + return array; +} + +// Copies `size_in_bytes_` bytes starting at `offset_` into bytes obtained with `pool`, then points +// `reuse` at the copy (offset 0). +void BinaryArray::Copy(BinaryArray* reuse, MemoryPool* pool) const { + std::shared_ptr bytes = + MemorySegmentUtils::CopyToBytes({segment_}, offset_, size_in_bytes_, pool); + reuse->PointTo(MemorySegment::Wrap(bytes), 0, size_in_bytes_); +} + +// Builds a BinaryArray by writing every element of `arr` with `WriteInt`. +BinaryArray BinaryArray::FromIntArray(const 
std::vector& arr, MemoryPool* pool) { + BinaryArray array; + BinaryArrayWriter writer = BinaryArrayWriter(&array, arr.size(), sizeof(int32_t), pool); + for (size_t i = 0; i < arr.size(); i++) { + int32_t v = arr[i]; + writer.WriteInt(i, v); + } + writer.Complete(); + return array; +} + +// Builds a BinaryArray by writing every element of `arr` with `WriteLong`. +BinaryArray BinaryArray::FromLongArray(const std::vector& arr, MemoryPool* pool) { + BinaryArray array; + BinaryArrayWriter writer = BinaryArrayWriter(&array, arr.size(), sizeof(int64_t), pool); + for (size_t i = 0; i < arr.size(); i++) { + int64_t v = arr[i]; + writer.WriteLong(i, v); + } + writer.Complete(); + return array; +} + +// Converts any `InternalArray` of longs. When the `dynamic_cast` of `arr` succeeds, the cast object +// is returned by copy; otherwise the elements are rewritten one by one, preserving nulls. +BinaryArray BinaryArray::FromLongArray(const InternalArray* arr, MemoryPool* pool) { + assert(arr); + auto cast_array = dynamic_cast(arr); + if (cast_array) { + return *cast_array; + } + + BinaryArray array; + BinaryArrayWriter writer = BinaryArrayWriter(&array, arr->Size(), 8, pool); + std::vector is_null(arr->Size(), false); + // Read all null flags first, so that memory access is more concentrated. + for (int32_t i = 0; i < arr->Size(); i++) { + is_null[i] = arr->IsNullAt(i); + } + for (int32_t i = 0; i < arr->Size(); i++) { + if (is_null[i]) { + writer.SetNullValue(i); + } else { + writer.WriteLong(i, arr->GetLong(i)); + } + } + writer.Complete(); + return array; +} + +} // namespace paimon diff --git a/src/paimon/common/data/binary_array.h b/src/paimon/common/data/binary_array.h new file mode 100644 index 0000000..c505ce7 --- /dev/null +++ b/src/paimon/common/data/binary_array.h @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "paimon/common/data/binary_section.h" +#include "paimon/common/data/binary_string.h" +#include "paimon/common/data/internal_array.h" +#include "paimon/common/data/internal_map.h" +#include "paimon/common/data/internal_row.h" +#include "paimon/common/memory/memory_segment_utils.h" +#include "paimon/data/decimal.h" +#include "paimon/data/timestamp.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace paimon { +class MemorySegment; + +/// A binary implementation of `InternalArray` which is backed by a single `MemorySegment`. +/// Elements of fixed-length primitive types, such as long, double or int, are +/// stored compactly in bytes, just like a plain C array. +/// +/// @note Unlike the Java implementation, where data may span multiple MemorySegments, +/// in this C++ implementation all data resides within a single MemorySegment. +/// +/// The binary layout of BinaryArray: +/// [size(int)] + [null bits(4-byte word boundaries)] + [values or offset&length] + [variable length +/// part]. 
+ +class BinaryArray final : public BinarySection, public InternalArray { + public: + BinaryArray() = default; + + /// Size in bytes of the header (element count plus null bits) for `num_fields` elements. + static int32_t CalculateHeaderInBytes(int32_t num_fields); + /// Number of elements in this array. + int32_t Size() const override { + return size_; + } + /// Binds this array to the given region of `segment`. + void PointTo(const MemorySegment& segment, int32_t offset, int32_t size_in_bytes) override; + bool IsNullAt(int32_t pos) const override; + + /// Element getters; `pos` is the element index. + bool GetBoolean(int32_t pos) const override; + char GetByte(int32_t pos) const override; + int16_t GetShort(int32_t pos) const override; + int32_t GetInt(int32_t pos) const override; + int32_t GetDate(int32_t pos) const override; + int64_t GetLong(int32_t pos) const override; + float GetFloat(int32_t pos) const override; + double GetDouble(int32_t pos) const override; + BinaryString GetString(int32_t pos) const override; + std::string_view GetStringView(int32_t pos) const override; + + Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const override; + Timestamp GetTimestamp(int32_t pos, int32_t precision) const override; + std::shared_ptr GetBinary(int32_t pos) const override; + std::shared_ptr GetArray(int32_t pos) const override; + std::shared_ptr GetMap(int32_t pos) const override; + std::shared_ptr GetRow(int32_t pos, int32_t num_fields) const override; + + /// Whether at least one element is null. + bool AnyNull() const; + + /// Conversions to primitive vectors. + Result> ToBooleanArray() const override; + Result> ToByteArray() const override; + Result> ToShortArray() const override; + Result> ToIntArray() const override; + Result> ToLongArray() const override; + Result> ToFloatArray() const override; + Result> ToDoubleArray() const override; + + /// Copy this array; the overload taking `reuse` points `reuse` at the copy. + BinaryArray Copy(MemoryPool* pool) const; + void Copy(BinaryArray* reuse, MemoryPool* pool) const; + + /// Hash of the `size_in_bytes_` bytes starting at `offset_`, computed by `HashByWords` with the default pool. + int32_t HashCode() const override { + return MemorySegmentUtils::HashByWords({segment_}, offset_, size_in_bytes_, + GetDefaultPool().get()); + } + + /// Factory helpers that build a BinaryArray from existing values. + static BinaryArray FromIntArray(const std::vector& arr, MemoryPool* pool); + static BinaryArray FromLongArray(const std::vector& arr, MemoryPool* pool); + static 
BinaryArray FromLongArray(const InternalArray* arr, MemoryPool* pool); + + private: + void AssertIndexIsValid(int32_t ordinal) const; + int32_t GetElementOffset(int32_t ordinal, int32_t element_size) const; + /// Returns `Status::Invalid` when `AnyNull()` is true, `Status::OK()` otherwise. + Status CheckNoNull() const { + if (AnyNull()) { + return Status::Invalid("Primitive array must not contain a null value."); + } + return Status::OK(); + } + + private: + /// The number of elements in this array. + int32_t size_ = 0; + /// The position to start storing array elements. + int32_t element_offset_ = -1; +}; + +} // namespace paimon diff --git a/src/paimon/common/data/binary_array_test.cpp b/src/paimon/common/data/binary_array_test.cpp new file mode 100644 index 0000000..06a3cad --- /dev/null +++ b/src/paimon/common/data/binary_array_test.cpp @@ -0,0 +1,518 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "paimon/common/data/binary_array.h" + +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/array/array_nested.h" +#include "arrow/ipc/json_simple.h" +#include "arrow/util/checked_cast.h" +#include "gtest/gtest.h" +#include "paimon/common/data/binary_array_writer.h" +#include "paimon/common/data/binary_map.h" +#include "paimon/common/data/columnar/columnar_array.h" +#include "paimon/common/utils/date_time_utils.h" +#include "paimon/testing/utils/binary_row_generator.h" +#include "paimon/testing/utils/testharness.h" +namespace paimon::test { + +TEST(BinaryArrayTest, TestBinaryArraySimple) { + auto pool = GetDefaultPool(); + int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs(); + std::srand(seed); + std::vector values(100, 0); + for (auto& value : values) { + value = paimon::test::RandomNumber(0, 10000000); + } + auto binary_array = BinaryArray::FromLongArray(values, pool.get()); + ASSERT_EQ(values.size(), binary_array.Size()) << "seed:" << seed; + for (size_t i = 0; i < values.size(); i++) { + ASSERT_EQ(values[i], binary_array.GetLong(i)) << "seed:" << seed << ", idx: " << i; + } + + ASSERT_FALSE(binary_array.IsNullAt(40)) << "seed:" << seed; + ASSERT_FALSE(binary_array.IsNullAt(70)) << "seed:" << seed; + auto res = binary_array.ToLongArray(); + ASSERT_EQ(res.value(), values); +} + +TEST(BinaryArrayTest, TestSetAndGet) { + auto pool = GetDefaultPool(); + { + std::vector arr = {true, false}; + BinaryArray array; + BinaryArrayWriter writer = BinaryArrayWriter(&array, arr.size(), sizeof(bool), pool.get()); + for (size_t i = 0; i < arr.size(); i++) { + writer.WriteBoolean(i, arr[i]); + } + writer.Complete(); + ASSERT_EQ(true, array.GetBoolean(0)); + ASSERT_EQ(false, array.GetBoolean(1)); + ASSERT_OK_AND_ASSIGN(auto res, array.ToBooleanArray()); + std::vector char_arr = {1, 0}; + ASSERT_EQ(res, char_arr); + } + { + std::vector arr = {1, 2, 3, 4, 5}; + BinaryArray array; + BinaryArrayWriter writer = + BinaryArrayWriter(&array, 
arr.size(), sizeof(int8_t), pool.get()); + for (size_t i = 0; i < arr.size(); i++) { + writer.WriteByte(i, arr[i]); + } + writer.Complete(); + ASSERT_EQ(1, array.GetByte(0)); + ASSERT_EQ(5, array.GetByte(4)); + ASSERT_OK_AND_ASSIGN(auto res, array.ToByteArray()); + std::vector char_arr = {1, 2, 3, 4, 5}; + ASSERT_EQ(res, char_arr); + } + { + std::vector arr = {1, 2, 3, 4, 5}; + BinaryArray array; + BinaryArrayWriter writer = + BinaryArrayWriter(&array, arr.size(), sizeof(int16_t), pool.get()); + for (size_t i = 0; i < arr.size(); i++) { + writer.WriteShort(i, arr[i]); + } + writer.Complete(); + ASSERT_EQ(1, array.GetShort(0)); + ASSERT_EQ(5, array.GetShort(4)); + ASSERT_OK_AND_ASSIGN(auto res, array.ToShortArray()); + ASSERT_EQ(res, arr); + } + { + std::vector arr = {1, 2, 3, 4, 5}; + BinaryArray array; + BinaryArrayWriter writer = + BinaryArrayWriter(&array, arr.size(), sizeof(int32_t), pool.get()); + for (size_t i = 0; i < arr.size(); i++) { + writer.WriteInt(i, arr[i]); + } + writer.Complete(); + ASSERT_EQ(1, array.GetInt(0)); + ASSERT_EQ(5, array.GetInt(4)); + ASSERT_OK_AND_ASSIGN(auto res, array.ToIntArray()); + ASSERT_EQ(res, arr); + } + { + // test date + std::vector arr = {10000, 20006}; + BinaryArray array; + BinaryArrayWriter writer = + BinaryArrayWriter(&array, arr.size(), sizeof(int32_t), pool.get()); + for (size_t i = 0; i < arr.size(); i++) { + writer.WriteInt(i, arr[i]); + } + writer.Complete(); + ASSERT_EQ(10000, array.GetDate(0)); + ASSERT_EQ(20006, array.GetDate(1)); + ASSERT_OK_AND_ASSIGN(auto res, array.ToIntArray()); + ASSERT_EQ(res, arr); + } + { + std::vector arr = {1.0, 2.0}; + BinaryArray array; + BinaryArrayWriter writer = BinaryArrayWriter(&array, arr.size(), sizeof(float), pool.get()); + for (size_t i = 0; i < arr.size(); i++) { + writer.WriteFloat(i, arr[i]); + } + writer.Complete(); + ASSERT_EQ(1.0, array.GetFloat(0)); + ASSERT_EQ(2.0, array.GetFloat(1)); + ASSERT_OK_AND_ASSIGN(auto res, array.ToFloatArray()); + ASSERT_EQ(res, arr); + 
} + { + std::vector arr = {1.0, 2.0}; + BinaryArray array; + BinaryArrayWriter writer = + BinaryArrayWriter(&array, arr.size(), sizeof(double), pool.get()); + for (size_t i = 0; i < arr.size(); i++) { + writer.WriteDouble(i, arr[i]); + } + writer.Complete(); + ASSERT_EQ(2, writer.GetNumElements()); + ASSERT_EQ(1.0, array.GetDouble(0)); + ASSERT_EQ(2.0, array.GetDouble(1)); + ASSERT_OK_AND_ASSIGN(auto res, array.ToDoubleArray()); + ASSERT_EQ(res, arr); + } + // decimal + { + // not compact (precision <= 18) + std::vector arr = {Decimal(6, 2, 123456), Decimal(6, 3, 123456)}; + BinaryArray array; + BinaryArrayWriter writer = BinaryArrayWriter(&array, arr.size(), 8, pool.get()); + for (size_t i = 0; i < arr.size(); i++) { + writer.WriteDecimal(i, arr[i], 6); + } + writer.Complete(); + ASSERT_EQ(arr[0], array.GetDecimal(0, 6, 2)); + ASSERT_EQ(arr[1], array.GetDecimal(1, 6, 3)); + } + { + // compact (precision > 18) + std::vector arr = {Decimal(/*precision=*/20, /*scale=*/3, 123456), + Decimal(/*precision=*/20, /*scale=*/3, 123456789)}; + BinaryArray array; + BinaryArrayWriter writer = BinaryArrayWriter(&array, arr.size(), 8, pool.get()); + for (size_t i = 0; i < arr.size(); i++) { + writer.WriteDecimal(i, arr[i], /*precision=*/20); + } + writer.Complete(); + ASSERT_EQ(arr[0], array.GetDecimal(0, /*precision=*/20, /*scale=*/3)); + ASSERT_EQ(arr[1], array.GetDecimal(1, /*precision=*/20, /*scale=*/3)); + } + // timestamp + { + // not compact (precision > 3) + std::vector arr = {Timestamp(0, 0), Timestamp(12345, 1)}; + BinaryArray array; + BinaryArrayWriter writer = BinaryArrayWriter(&array, arr.size(), 8, pool.get()); + for (size_t i = 0; i < arr.size(); i++) { + writer.WriteTimestamp(i, arr[i], 9); + } + writer.Complete(); + ASSERT_EQ(arr[0], array.GetTimestamp(0, 9)); + ASSERT_EQ(arr[1], array.GetTimestamp(1, 9)); + } + { + // compact (precision <= 3) + std::vector arr = {Timestamp(0, 0), Timestamp(12345, 0)}; + BinaryArray array; + BinaryArrayWriter writer = 
BinaryArrayWriter(&array, arr.size(), 8, pool.get()); + for (size_t i = 0; i < arr.size(); i++) { + writer.WriteTimestamp(i, arr[i], 3); + } + writer.Complete(); + ASSERT_EQ(arr[0], array.GetTimestamp(0, 3)); + ASSERT_EQ(arr[1], array.GetTimestamp(1, 3)); + } + // binary + { + std::vector arr; + arr.emplace_back("hello", pool.get()); + arr.emplace_back("world", pool.get()); + BinaryArray array; + BinaryArrayWriter writer = BinaryArrayWriter(&array, arr.size(), 8, pool.get()); + for (size_t i = 0; i < arr.size(); i++) { + writer.WriteBinary(i, arr[i]); + } + writer.Complete(); + ASSERT_EQ(arr[0], *array.GetBinary(0)); + ASSERT_EQ(arr[1], *array.GetBinary(1)); + } + // array + { + // element1 + std::vector arr1 = {1, 2}; + BinaryArray array1; + BinaryArrayWriter writer1 = + BinaryArrayWriter(&array1, arr1.size(), sizeof(int32_t), pool.get()); + for (size_t i = 0; i < arr1.size(); i++) { + writer1.WriteInt(i, arr1[i]); + } + writer1.Complete(); + // element2 + std::vector arr2 = {100, 200}; + BinaryArray array2; + BinaryArrayWriter writer2 = + BinaryArrayWriter(&array2, arr2.size(), sizeof(int32_t), pool.get()); + for (size_t i = 0; i < arr2.size(); i++) { + writer2.WriteInt(i, arr2[i]); + } + writer2.Complete(); + // array + std::vector arr; + arr.push_back(array1); + arr.push_back(array2); + BinaryArray array; + BinaryArrayWriter writer = BinaryArrayWriter(&array, arr.size(), 8, pool.get()); + for (size_t i = 0; i < arr.size(); i++) { + writer.WriteArray(i, arr[i]); + } + writer.Complete(); + ASSERT_EQ(arr[0].ToIntArray().value(), array.GetArray(0)->ToIntArray().value()); + ASSERT_EQ(arr[1].ToIntArray().value(), array.GetArray(1)->ToIntArray().value()); + } + // row + { + BinaryRow row1 = BinaryRowGenerator::GenerateRow( + {std::string("Alice"), 30, 12.1, NullType()}, pool.get()); + BinaryRow row2 = + BinaryRowGenerator::GenerateRow({std::string("Bob"), 40, 14.1, NullType()}, pool.get()); + BinaryArray array; + BinaryArrayWriter writer = + BinaryArrayWriter(&array, 
/*num_elements=*/2, /*element_size=*/8, pool.get()); + writer.WriteRow(0, row1); + writer.WriteRow(1, row2); + writer.Complete(); + + ASSERT_EQ(2, array.Size()); + auto de_row1 = array.GetRow(0, 4); + ASSERT_EQ(row1, *(std::dynamic_pointer_cast(de_row1))); + auto de_row2 = array.GetRow(1, 4); + ASSERT_EQ(row2, *(std::dynamic_pointer_cast(de_row2))); + } + // map + { + auto key = BinaryArray::FromIntArray({1, 2, 3, 5}, pool.get()); + auto value = BinaryArray::FromLongArray({100ll, 200ll, 300ll, 500ll}, pool.get()); + auto map = BinaryMap::ValueOf(key, value, pool.get()); + BinaryArray array; + BinaryArrayWriter writer = + BinaryArrayWriter(&array, /*num_elements=*/1, /*element_size=*/8, pool.get()); + writer.WriteMap(0, *map); + writer.Complete(); + + ASSERT_EQ(1, array.Size()); + auto de_map = array.GetMap(0); + ASSERT_EQ(key.HashCode(), + std::dynamic_pointer_cast(de_map->KeyArray())->HashCode()); + ASSERT_EQ(value.HashCode(), + std::dynamic_pointer_cast(de_map->ValueArray())->HashCode()); + } +} + +TEST(BinaryArrayTest, TestCopy) { + auto pool = GetDefaultPool(); + + std::vector arr = {true, false}; + BinaryArray array; + BinaryArrayWriter writer = BinaryArrayWriter(&array, arr.size(), sizeof(bool), pool.get()); + for (size_t i = 0; i < arr.size(); i++) { + writer.WriteBoolean(i, arr[i]); + } + writer.Complete(); + + auto copy_array = array.Copy(pool.get()); + ASSERT_EQ(array.ToBooleanArray().value(), copy_array.ToBooleanArray().value()); +} + +TEST(BinaryArrayTest, TestNullValue) { + auto pool = GetDefaultPool(); + std::vector arr = {1, 2, 3, 4, 5}; + BinaryArray array; + BinaryArrayWriter writer = + BinaryArrayWriter(&array, arr.size() + 2, sizeof(int64_t), pool.get()); + for (size_t i = 0; i < arr.size(); i++) { + writer.WriteLong(i, arr[i]); + } + // last two element is null + writer.SetNullValue(5); + writer.SetNullAt(6); + writer.Complete(); + ASSERT_TRUE(array.AnyNull()); + + auto ret = BinaryArray::FromLongArray(&array, pool.get()); + ASSERT_EQ(ret, 
array); +} + +TEST(BinaryArrayTest, TestFromLongArray) { + auto pool = GetDefaultPool(); + auto f1 = arrow::ipc::internal::json::ArrayFromJSON(arrow::list(arrow::int64()), + R"([[123, null], [789], [12345], [12]])") + .ValueOrDie(); + auto list_array = arrow::internal::checked_pointer_cast(f1); + auto array = ColumnarArray(list_array->values(), pool, /*offset=*/0, 2); + + BinaryArray ret = BinaryArray::FromLongArray(&array, pool.get()); + + BinaryArray expected_array; + BinaryArrayWriter writer = BinaryArrayWriter(&expected_array, 2, sizeof(int64_t), pool.get()); + writer.Reset(); + writer.WriteLong(0, 123); + writer.SetNullAt(1); + writer.Complete(); + + ASSERT_EQ(ret, expected_array); + + ASSERT_NOK_WITH_MSG(expected_array.ToLongArray(), + "Primitive array must not contain a null value."); +} + +TEST(BinaryArrayTest, TestFromIntArray) { + auto pool = GetDefaultPool(); + std::vector values = {10, 20, 30, 100}; + BinaryArray array = BinaryArray::FromIntArray(values, pool.get()); + ASSERT_OK_AND_ASSIGN(std::vector result, array.ToIntArray()); + ASSERT_EQ(result, values); +} + +TEST(BinaryArrayTest, TestFromAllNullLongArray) { + auto pool = GetDefaultPool(); + auto f1 = arrow::ipc::internal::json::ArrayFromJSON(arrow::list(arrow::int64()), + R"([[null, null], [789], [12345], [12]])") + .ValueOrDie(); + auto list_array = arrow::internal::checked_pointer_cast(f1); + auto array = ColumnarArray(list_array->values(), pool, /*offset=*/0, 2); + + BinaryArray ret = BinaryArray::FromLongArray(&array, pool.get()); + + BinaryArray expected_array; + BinaryArrayWriter writer = BinaryArrayWriter(&expected_array, 2, sizeof(int64_t), pool.get()); + writer.Reset(); + writer.SetNullAt(0); + writer.SetNullAt(1); + writer.Complete(); + + ASSERT_EQ(ret, expected_array); + + ASSERT_NOK_WITH_MSG(expected_array.ToLongArray(), + "Primitive array must not contain a null value."); +} +TEST(BinaryArrayTest, TestReset) { + auto pool = GetDefaultPool(); + std::vector arr = {1, 2, 3, 4, 5}; + 
BinaryArray array; + BinaryArrayWriter writer = BinaryArrayWriter(&array, arr.size(), sizeof(int64_t), pool.get()); + writer.Reset(); + for (size_t i = 0; i < arr.size(); i++) { + writer.WriteLong(i, arr[i]); + } + writer.Complete(); + ASSERT_EQ(arr, array.ToLongArray().value()); +} + +TEST(BinaryArrayTest, TestGetElementSize) { + ASSERT_EQ(sizeof(bool), BinaryArrayWriter::GetElementSize(arrow::Type::type::BOOL)); + ASSERT_EQ(sizeof(int8_t), BinaryArrayWriter::GetElementSize(arrow::Type::type::INT8)); + ASSERT_EQ(sizeof(int16_t), BinaryArrayWriter::GetElementSize(arrow::Type::type::INT16)); + ASSERT_EQ(sizeof(int32_t), BinaryArrayWriter::GetElementSize(arrow::Type::type::INT32)); + ASSERT_EQ(sizeof(int32_t), BinaryArrayWriter::GetElementSize(arrow::Type::type::DATE32)); + ASSERT_EQ(sizeof(int64_t), BinaryArrayWriter::GetElementSize(arrow::Type::type::INT64)); + ASSERT_EQ(sizeof(float), BinaryArrayWriter::GetElementSize(arrow::Type::type::FLOAT)); + ASSERT_EQ(sizeof(double), BinaryArrayWriter::GetElementSize(arrow::Type::type::DOUBLE)); + // default cases: variable-length types use 8 bytes (offset + length) + ASSERT_EQ(8, BinaryArrayWriter::GetElementSize(arrow::Type::type::STRING)); + ASSERT_EQ(8, BinaryArrayWriter::GetElementSize(arrow::Type::type::BINARY)); + ASSERT_EQ(8, BinaryArrayWriter::GetElementSize(arrow::Type::type::TIMESTAMP)); + ASSERT_EQ(8, BinaryArrayWriter::GetElementSize(arrow::Type::type::DECIMAL128)); +} + +TEST(BinaryArrayTest, TestSetNullAtWithArrowType) { + auto pool = GetDefaultPool(); + + { + // BOOL + BinaryArray array; + BinaryArrayWriter writer(&array, 2, sizeof(bool), pool.get()); + writer.WriteBoolean(0, true); + writer.SetNullAt(1, arrow::Type::type::BOOL); + writer.Complete(); + ASSERT_FALSE(array.IsNullAt(0)); + ASSERT_TRUE(array.GetBoolean(0)); + ASSERT_TRUE(array.IsNullAt(1)); + } + { + // INT8 + BinaryArray array; + BinaryArrayWriter writer(&array, 2, sizeof(int8_t), pool.get()); + writer.WriteByte(0, 42); + writer.SetNullAt(1, 
arrow::Type::type::INT8); + writer.Complete(); + ASSERT_FALSE(array.IsNullAt(0)); + ASSERT_EQ(42, array.GetByte(0)); + ASSERT_TRUE(array.IsNullAt(1)); + } + { + // INT16 + BinaryArray array; + BinaryArrayWriter writer(&array, 2, sizeof(int16_t), pool.get()); + writer.WriteShort(0, 1000); + writer.SetNullAt(1, arrow::Type::type::INT16); + writer.Complete(); + ASSERT_FALSE(array.IsNullAt(0)); + ASSERT_EQ(1000, array.GetShort(0)); + ASSERT_TRUE(array.IsNullAt(1)); + } + { + // INT32 + BinaryArray array; + BinaryArrayWriter writer(&array, 2, sizeof(int32_t), pool.get()); + writer.WriteInt(0, 100000); + writer.SetNullAt(1, arrow::Type::type::INT32); + writer.Complete(); + ASSERT_FALSE(array.IsNullAt(0)); + ASSERT_EQ(100000, array.GetInt(0)); + ASSERT_TRUE(array.IsNullAt(1)); + } + { + // DATE32 + BinaryArray array; + BinaryArrayWriter writer(&array, 2, sizeof(int32_t), pool.get()); + writer.WriteInt(0, 19000); + writer.SetNullAt(1, arrow::Type::type::DATE32); + writer.Complete(); + ASSERT_FALSE(array.IsNullAt(0)); + ASSERT_EQ(19000, array.GetDate(0)); + ASSERT_TRUE(array.IsNullAt(1)); + } + { + // INT64 + BinaryArray array; + BinaryArrayWriter writer(&array, 2, sizeof(int64_t), pool.get()); + writer.WriteLong(0, 123456789L); + writer.SetNullAt(1, arrow::Type::type::INT64); + writer.Complete(); + ASSERT_FALSE(array.IsNullAt(0)); + ASSERT_EQ(123456789L, array.GetLong(0)); + ASSERT_TRUE(array.IsNullAt(1)); + } + { + // FLOAT + BinaryArray array; + BinaryArrayWriter writer(&array, 2, sizeof(float), pool.get()); + writer.WriteFloat(0, 3.14f); + writer.SetNullAt(1, arrow::Type::type::FLOAT); + writer.Complete(); + ASSERT_FALSE(array.IsNullAt(0)); + ASSERT_FLOAT_EQ(3.14f, array.GetFloat(0)); + ASSERT_TRUE(array.IsNullAt(1)); + } + { + // DOUBLE + BinaryArray array; + BinaryArrayWriter writer(&array, 2, sizeof(double), pool.get()); + writer.WriteDouble(0, 2.718); + writer.SetNullAt(1, arrow::Type::type::DOUBLE); + writer.Complete(); + ASSERT_FALSE(array.IsNullAt(0)); + 
ASSERT_DOUBLE_EQ(2.718, array.GetDouble(0)); + ASSERT_TRUE(array.IsNullAt(1)); + } + { + // STRING (default path, uses 8-byte null) + BinaryArray array; + BinaryArrayWriter writer(&array, 2, 8, pool.get()); + writer.WriteString(0, BinaryString::FromString("hello", pool.get())); + writer.SetNullAt(1, arrow::Type::type::STRING); + writer.Complete(); + ASSERT_FALSE(array.IsNullAt(0)); + ASSERT_EQ("hello", std::string(array.GetStringView(0))); + ASSERT_TRUE(array.IsNullAt(1)); + } +} + +} // namespace paimon::test diff --git a/src/paimon/common/data/binary_array_writer.cpp b/src/paimon/common/data/binary_array_writer.cpp new file mode 100644 index 0000000..7845311 --- /dev/null +++ b/src/paimon/common/data/binary_array_writer.cpp @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "paimon/common/data/binary_array_writer.h" + +#include +#include + +#include "paimon/common/data/binary_array.h" +#include "paimon/common/memory/memory_segment_utils.h" +#include "paimon/memory/bytes.h" + +namespace paimon { +class MemoryPool; + +BinaryArrayWriter::BinaryArrayWriter(BinaryArray* array, int32_t num_elements, int32_t element_size, + MemoryPool* pool) { + null_bits_size_in_bytes_ = BinaryArray::CalculateHeaderInBytes(num_elements); + fixed_size_ = + RoundNumberOfBytesToNearestWord(null_bits_size_in_bytes_ + element_size * num_elements); + cursor_ = fixed_size_; + num_elements_ = num_elements; + auto bytes = Bytes::AllocateBytes(fixed_size_, pool); + segment_ = MemorySegment::Wrap(std::move(bytes)); + segment_.PutValue(0, num_elements); + array_ = array; + pool_ = pool; +} + +void BinaryArrayWriter::Reset() { + cursor_ = fixed_size_; + for (int32_t i = 0; i < null_bits_size_in_bytes_; i += 8) { + segment_.PutValue(i, 0L); + } + segment_.PutValue(0, num_elements_); +} + +void BinaryArrayWriter::SetNullBit(int32_t ordinal) { + MemorySegmentUtils::BitSet(&segment_, 4, ordinal); +} + +void BinaryArrayWriter::SetNullAt(int32_t ordinal) { + SetNullValue(ordinal); +} + +void BinaryArrayWriter::SetNullAt(int32_t ordinal, const arrow::Type::type& arrow_type) { + switch (arrow_type) { + case arrow::Type::type::BOOL: + SetNullValue(ordinal); + return; + case arrow::Type::type::INT8: + SetNullValue(ordinal); + return; + case arrow::Type::type::INT16: + SetNullValue(ordinal); + return; + case arrow::Type::type::DATE32: + case arrow::Type::type::INT32: + SetNullValue(ordinal); + return; + case arrow::Type::type::INT64: + SetNullValue(ordinal); + return; + case arrow::Type::type::FLOAT: + SetNullValue(ordinal); + return; + case arrow::Type::type::DOUBLE: + SetNullValue(ordinal); + return; + default: + SetNullAt(ordinal); + return; + } +} + +int32_t BinaryArrayWriter::GetElementOffset(int32_t pos, int32_t element_size) const { + return 
null_bits_size_in_bytes_ + element_size * pos; +} + +int32_t BinaryArrayWriter::GetFieldOffset(int32_t pos) const { + return GetElementOffset(pos, 8); +} + +void BinaryArrayWriter::SetOffsetAndSize(int32_t pos, int32_t offset, int64_t size) { + const int64_t offset_and_size = (static_cast(offset) << 32) | size; + segment_.PutValue(GetElementOffset(pos, 8), offset_and_size); +} + +void BinaryArrayWriter::AfterGrow() { + array_->PointTo(segment_, 0, segment_.Size()); +} + +void BinaryArrayWriter::Complete() { + array_->PointTo(segment_, 0, cursor_); +} + +int32_t BinaryArrayWriter::GetNumElements() const { + return num_elements_; +} + +int32_t BinaryArrayWriter::GetElementSize(const arrow::Type::type& arrow_type) { + switch (arrow_type) { + case arrow::Type::type::BOOL: + return sizeof(bool); + case arrow::Type::type::INT8: + return sizeof(int8_t); + case arrow::Type::type::INT16: + return sizeof(int16_t); + case arrow::Type::type::DATE32: + case arrow::Type::type::INT32: + return sizeof(int32_t); + case arrow::Type::type::INT64: + return sizeof(int64_t); + case arrow::Type::type::FLOAT: + return sizeof(float); + case arrow::Type::type::DOUBLE: + return sizeof(double); + default: + return 8; + } +} + +} // namespace paimon diff --git a/src/paimon/common/data/binary_array_writer.h b/src/paimon/common/data/binary_array_writer.h new file mode 100644 index 0000000..6b214d9 --- /dev/null +++ b/src/paimon/common/data/binary_array_writer.h @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include "arrow/api.h" +#include "paimon/common/data/abstract_binary_writer.h" +#include "paimon/common/memory/memory_segment.h" +namespace paimon { +class BinaryArray; +class MemoryPool; + +/// Writer for binary array. See `BinaryArray`. +class BinaryArrayWriter : public AbstractBinaryWriter { + public: + BinaryArrayWriter(BinaryArray* array, int32_t num_elements, int32_t element_size, + MemoryPool* pool); + + static int32_t GetElementSize(const arrow::Type::type& arrow_type); + + /// First, call `Reset()` before set/write value. + void Reset() override; + void SetNullBit(int32_t ordinal) override; + + template + void SetNullValue(int32_t ordinal) { + SetNullBit(ordinal); + // put zero into the corresponding field when set null + segment_.PutValue(GetElementOffset(ordinal, sizeof(T)), static_cast(0)); + } + + void SetNullAt(int32_t ordinal, const arrow::Type::type& arrow_type); + + void WriteBoolean(int32_t pos, bool value) override { + segment_.PutValue(GetElementOffset(pos, 1), value); + } + + void WriteByte(int32_t pos, int8_t value) override { + segment_.PutValue(GetElementOffset(pos, 1), value); + } + + void WriteShort(int32_t pos, int16_t value) override { + segment_.PutValue(GetElementOffset(pos, 2), value); + } + + void WriteInt(int32_t pos, int32_t value) override { + segment_.PutValue(GetElementOffset(pos, 4), value); + } + + void WriteLong(int32_t pos, int64_t value) override { + segment_.PutValue(GetElementOffset(pos, 8), value); + } + + void WriteFloat(int32_t pos, float value) override { + 
segment_.PutValue(GetElementOffset(pos, 4), value); + } + + void WriteDouble(int32_t pos, double value) override { + segment_.PutValue(GetElementOffset(pos, 8), value); + } + + void SetNullAt(int32_t ordinal) override; + int32_t GetFieldOffset(int32_t pos) const override; + void SetOffsetAndSize(int32_t pos, int32_t offset, int64_t size) override; + void AfterGrow() override; + /// Finally, complete write to set real size to row. + void Complete() override; + int32_t GetNumElements() const; + + private: + int32_t GetElementOffset(int32_t pos, int32_t element_size) const; + + private: + int32_t null_bits_size_in_bytes_; + BinaryArray* array_; + int32_t num_elements_; + int32_t fixed_size_; +}; + +} // namespace paimon diff --git a/src/paimon/common/data/binary_map.h b/src/paimon/common/data/binary_map.h new file mode 100644 index 0000000..7169461 --- /dev/null +++ b/src/paimon/common/data/binary_map.h @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#pragma once + +#include "paimon/common/data/binary_array.h" +#include "paimon/common/memory/memory_segment.h" +#include "paimon/common/memory/memory_segment_utils.h" +namespace paimon { +/// A binary implementation of `InternalMap` which is backed by a single `MemorySegment`. +/// Binary layout: [4 byte(keyArray size in bytes)] + [Key BinaryArray] + [Value BinaryArray]. +/// `BinaryMap` is influenced by Apache Spark UnsafeMapData. +/// +/// @note: Unlike the Java implementation where data may span multiple MemorySegments, +/// in this C++ implementation all data resides within a single MemorySegment. +class BinaryMap : public BinarySection, public InternalMap { + public: + BinaryMap() = default; + + int32_t Size() const override { + return keys_->Size(); + } + + std::shared_ptr KeyArray() const override { + return keys_; + } + std::shared_ptr ValueArray() const override { + return values_; + } + + void PointTo(const MemorySegment& segment, int32_t offset, int32_t size_in_bytes) override { + // Read the numBytes of key array from the first 4 bytes. 
+ auto key_array_bytes = MemorySegmentUtils::GetValue({segment}, offset); + assert(key_array_bytes >= 0); + int32_t value_array_bytes = size_in_bytes - key_array_bytes - kHeaderSize; + assert(value_array_bytes >= 0); + + assert(keys_); + keys_->PointTo(segment, offset + kHeaderSize, key_array_bytes); + assert(values_); + values_->PointTo(segment, offset + kHeaderSize + key_array_bytes, value_array_bytes); + + assert(keys_->Size() == values_->Size()); + + segment_ = segment; + offset_ = offset; + size_in_bytes_ = size_in_bytes; + } + + static std::shared_ptr ValueOf(const BinaryArray& key, const BinaryArray& value, + MemoryPool* pool) { + auto bytes = std::make_shared( + kHeaderSize + key.GetSizeInBytes() + value.GetSizeInBytes(), pool); + MemorySegment segment = MemorySegment::Wrap(bytes); + segment.PutValue(0, key.GetSizeInBytes()); + const auto& key_segment = key.GetSegment(); + key_segment.CopyTo(key.GetOffset(), &segment, /*target_offset=*/kHeaderSize, + key.GetSizeInBytes()); + const auto& value_segment = value.GetSegment(); + value_segment.CopyTo(value.GetOffset(), &segment, + /*target_offset=*/kHeaderSize + key.GetSizeInBytes(), + value.GetSizeInBytes()); + auto binary_map = std::make_shared(); + binary_map->PointTo(segment, /*offset=*/0, bytes->size()); + return binary_map; + } + + private: + static constexpr int32_t kHeaderSize = sizeof(int32_t); + + std::shared_ptr keys_ = std::make_shared(); + std::shared_ptr values_ = std::make_shared(); +}; +} // namespace paimon diff --git a/src/paimon/common/data/binary_map_test.cpp b/src/paimon/common/data/binary_map_test.cpp new file mode 100644 index 0000000..e27edee --- /dev/null +++ b/src/paimon/common/data/binary_map_test.cpp @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/common/data/binary_map.h" + +#include "gtest/gtest.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +TEST(BinaryMapTest, TestSimple) { + auto pool = GetDefaultPool(); + auto key = BinaryArray::FromIntArray({1, 2, 3, 5}, pool.get()); + auto value = BinaryArray::FromLongArray({100ll, 200ll, 300ll, 500ll}, pool.get()); + + auto binary_map = BinaryMap::ValueOf(key, value, pool.get()); + + ASSERT_EQ(binary_map->Size(), 4); + auto key_in_map = std::dynamic_pointer_cast(binary_map->KeyArray()); + auto value_in_map = std::dynamic_pointer_cast(binary_map->ValueArray()); + ASSERT_EQ(key_in_map->HashCode(), key.HashCode()); + ASSERT_EQ(value_in_map->HashCode(), value.HashCode()); + ASSERT_EQ(key_in_map->ToIntArray().value(), std::vector({1, 2, 3, 5})); + ASSERT_EQ(value_in_map->ToLongArray().value(), + std::vector({100ll, 200ll, 300ll, 500ll})); +} +} // namespace paimon::test From f815bd8361e32f59114cc3e9b75726cbcd28abdf Mon Sep 17 00:00:00 2001 From: "lisizhuo.lsz" Date: Wed, 27 May 2026 03:11:37 +0000 Subject: [PATCH 3/5] fix comment --- .../common/data/abstract_binary_writer.cpp | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/paimon/common/data/abstract_binary_writer.cpp b/src/paimon/common/data/abstract_binary_writer.cpp index 1487493..a0625c3 100644 --- 
a/src/paimon/common/data/abstract_binary_writer.cpp +++ b/src/paimon/common/data/abstract_binary_writer.cpp @@ -199,24 +199,24 @@ int32_t AbstractBinaryWriter::RoundNumberOfBytesToNearestWord(int32_t num_bytes) return num_bytes + (8 - remainder); } } + template void AbstractBinaryWriter::WriteBytesToFixLenPart(MemorySegment* segment, int32_t field_offset, const T& bytes, int32_t len) { - int64_t first_byte = len | 0x80; // first bit is 1, other bits is len - int64_t seven_bytes = 0L; // real data + const uint64_t first_byte = + static_cast(len) | 0x80U; // first bit is 1, low 7 bits are len + uint64_t seven_bytes = 0U; // real data if ((SystemByteOrder() == ByteOrder::PAIMON_LITTLE_ENDIAN)) { for (int32_t i = 0; i < len; i++) { - seven_bytes |= ((0x00000000000000FFL & bytes[i]) << (i * 8L)); + seven_bytes |= (static_cast(bytes[i]) & 0xFFU) << (i * 8U); } } else { for (int32_t i = 0; i < len; i++) { - seven_bytes |= ((0x00000000000000FFL & bytes[i]) << ((6 - i) * 8L)); + seven_bytes |= (static_cast(bytes[i]) & 0xFFU) << ((6 - i) * 8U); } } - const int64_t offset_and_size = - (first_byte << 56) | // NOLINT(clang-analyzer-core.UndefinedBinaryOperatorResult) - seven_bytes; - segment->PutValue(field_offset, offset_and_size); + const uint64_t offset_and_size = (first_byte << 56) | seven_bytes; + segment->PutValue(field_offset, static_cast(offset_and_size)); } } // namespace paimon From 94b3929e19f287ea44b863a14c3bc15703a32150 Mon Sep 17 00:00:00 2001 From: "lisizhuo.lsz" Date: Wed, 27 May 2026 03:34:54 +0000 Subject: [PATCH 4/5] fix comment in binary_row.cpp --- src/paimon/common/data/binary_row.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/paimon/common/data/binary_row.cpp b/src/paimon/common/data/binary_row.cpp index 563f71f..47bb703 100644 --- a/src/paimon/common/data/binary_row.cpp +++ b/src/paimon/common/data/binary_row.cpp @@ -31,7 +31,9 @@ namespace paimon { const int64_t BinaryRow::FIRST_BYTE_ZERO = - (SystemByteOrder() == 
ByteOrder::PAIMON_LITTLE_ENDIAN) ? (~0xFFL) : (~(0xFFL << 56L)); + (SystemByteOrder() == ByteOrder::PAIMON_LITTLE_ENDIAN) + ? static_cast(~0xFFULL) + : static_cast(~(0xFFULL << 56)); const BinaryRow& BinaryRow::EmptyRow() { static const BinaryRow empty_row = GetEmptyRow(); From 329611d1cc4ef1556347fcd8d23345860e1aaf2e Mon Sep 17 00:00:00 2001 From: "lisizhuo.lsz" Date: Thu, 28 May 2026 03:28:51 +0000 Subject: [PATCH 5/5] fix comment in binary_row_writer.cpp --- src/paimon/common/data/binary_row_writer.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/paimon/common/data/binary_row_writer.cpp b/src/paimon/common/data/binary_row_writer.cpp index eb4898e..3c1deb4 100644 --- a/src/paimon/common/data/binary_row_writer.cpp +++ b/src/paimon/common/data/binary_row_writer.cpp @@ -148,8 +148,9 @@ Result BinaryRowWriter::CreateFieldSetter( arrow::internal::checked_cast(field_type.get()); assert(decimal_type); auto precision = decimal_type->precision(); - field_setter = [field_idx, precision](const VariantType& field, - BinaryRowWriter* writer) -> void { + auto scale = decimal_type->scale(); + field_setter = [field_idx, precision, scale](const VariantType& field, + BinaryRowWriter* writer) -> void { if (DataDefine::IsVariantNull(field)) { if (!Decimal::IsCompact(precision)) { writer->WriteDecimal(field_idx, std::nullopt, precision); @@ -158,8 +159,9 @@ Result BinaryRowWriter::CreateFieldSetter( } return; } - return writer->WriteDecimal(field_idx, DataDefine::GetVariantValue(field), - precision); + auto decimal_value = DataDefine::GetVariantValue(field); + assert(decimal_value.Scale() == scale); + return writer->WriteDecimal(field_idx, decimal_value, precision); }; return field_setter; }