Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 222 additions & 0 deletions src/paimon/common/data/abstract_binary_writer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "paimon/common/data/abstract_binary_writer.h"

#include <cassert>
#include <memory>
#include <optional>

#include "paimon/common/data/binary_array.h"
#include "paimon/common/data/binary_map.h"
#include "paimon/common/data/binary_row.h"
#include "paimon/common/data/binary_section.h"
#include "paimon/common/data/binary_string.h"
#include "paimon/common/memory/memory_segment_utils.h"
#include "paimon/data/decimal.h"
#include "paimon/data/timestamp.h"
#include "paimon/io/byte_order.h"
#include "paimon/memory/bytes.h"
#include "paimon/memory/memory_pool.h"

namespace paimon {

void AbstractBinaryWriter::WriteBytes(int32_t pos, const Bytes& bytes) {
int32_t len = bytes.size();
if (len <= BinarySection::MAX_FIX_PART_DATA_SIZE) {
WriteBytesToFixLenPart(&segment_, GetFieldOffset(pos), bytes, len);
} else {
WriteBytesToVarLenPart(pos, bytes, len);
}
}

void AbstractBinaryWriter::WriteString(int32_t pos, const BinaryString& input) {
int32_t len = input.GetSizeInBytes();
if (len <= BinarySection::MAX_FIX_PART_DATA_SIZE) {
auto bytes = Bytes::AllocateBytes(len, pool_);
MemorySegmentUtils::CopyToBytes({input.GetSegment()}, input.GetOffset(), bytes.get(), 0,
len);
WriteBytesToFixLenPart(&segment_, GetFieldOffset(pos), *bytes, len);
} else {
WriteSegmentToVarLenPart(pos, input.GetSegment(), input.GetOffset(), len);
}
}

void AbstractBinaryWriter::WriteBinary(int32_t pos, const Bytes& bytes) {
int32_t len = bytes.size();
if (len <= BinarySection::MAX_FIX_PART_DATA_SIZE) {
WriteBytesToFixLenPart(&segment_, GetFieldOffset(pos), bytes, len);
} else {
WriteBytesToVarLenPart(pos, bytes, len);
}
}

void AbstractBinaryWriter::WriteStringView(int32_t pos, const std::string_view& view) {
int32_t len = view.size();
if (len <= BinarySection::MAX_FIX_PART_DATA_SIZE) {
WriteBytesToFixLenPart(&segment_, GetFieldOffset(pos), view, len);
} else {
WriteBytesToVarLenPart(pos, view, len);
}
}

void AbstractBinaryWriter::WriteRow(int32_t pos, const BinaryRow& input) {
return WriteSegmentToVarLenPart(pos, input.GetSegment(), input.GetOffset(),
input.GetSizeInBytes());
}

void AbstractBinaryWriter::WriteArray(int32_t pos, const BinaryArray& input) {
return WriteSegmentToVarLenPart(pos, input.GetSegment(), input.GetOffset(),
input.GetSizeInBytes());
}

void AbstractBinaryWriter::WriteMap(int32_t pos, const BinaryMap& input) {
return WriteSegmentToVarLenPart(pos, input.GetSegment(), input.GetOffset(),
input.GetSizeInBytes());
}
void AbstractBinaryWriter::WriteDecimal(int32_t pos, const std::optional<Decimal>& value,
int32_t precision) {
assert(value == std::nullopt || precision == value.value().Precision());
if (Decimal::IsCompact(precision)) {
assert(value != std::nullopt);
WriteLong(pos, value.value().ToUnscaledLong());
} else {
// grow the global buffer before writing data.
EnsureCapacity(16);
// zero-out 16 bytes
segment_.PutValue<int64_t>(cursor_, 0ll);
segment_.PutValue<int64_t>(cursor_ + 8, 0ll);

// Make sure Decimal object has the same scale as DecimalType.
// Note that we may pass in null Decimal object to set null for it.
if (value == std::nullopt) {
SetNullBit(pos);
SetOffsetAndSize(pos, cursor_, 0l);
} else {
auto bytes = value.value().ToUnscaledBytes();
segment_.Put(cursor_, bytes, 0, bytes.size());
SetOffsetAndSize(pos, cursor_, bytes.size());
}
// move the cursor forward.
cursor_ += 16;
}
}

void AbstractBinaryWriter::WriteTimestamp(int32_t pos, const std::optional<Timestamp>& value,
int32_t precision) {
if (Timestamp::IsCompact(precision)) {
assert(value != std::nullopt);
WriteLong(pos, value.value().GetMillisecond());
} else {
// store the nanoOfMillisecond in fixed-length part as offset and nanoOfMillisecond
EnsureCapacity(8);

if (value == std::nullopt) {
SetNullBit(pos);
// zero-out the bytes
segment_.PutValue<int64_t>(cursor_, 0l);
SetOffsetAndSize(pos, cursor_, 0l);
} else {
segment_.PutValue<int64_t>(cursor_, value.value().GetMillisecond());
SetOffsetAndSize(pos, cursor_, value.value().GetNanoOfMillisecond());
}
cursor_ += 8;
}
}

void AbstractBinaryWriter::ZeroOutPaddingBytes(int32_t num_bytes) {
if ((num_bytes & 0x07) > 0) {
segment_.PutValue<int64_t>(cursor_ + ((num_bytes >> 3) << 3), 0L);
}
}

void AbstractBinaryWriter::EnsureCapacity(int32_t needed_size) {
const int32_t length = cursor_ + needed_size;
if (segment_.Size() < length) {
Grow(length);
}
}

void AbstractBinaryWriter::WriteSegmentToVarLenPart(int32_t pos, const MemorySegment& segment,
int32_t offset, int32_t size) {
const int32_t rounded_size = RoundNumberOfBytesToNearestWord(size);
// grow the global buffer before writing data.
EnsureCapacity(rounded_size);
ZeroOutPaddingBytes(size);

segment.CopyTo(offset, &segment_, cursor_, size);
SetOffsetAndSize(pos, cursor_, size);
// move the cursor forward.
cursor_ += rounded_size;
}

template <typename T>
void AbstractBinaryWriter::WriteBytesToVarLenPart(int32_t pos, const T& bytes, int32_t len) {
const int32_t rounded_size = RoundNumberOfBytesToNearestWord(len);
// grow the global buffer before writing data.
EnsureCapacity(rounded_size);
ZeroOutPaddingBytes(len);
// Write the bytes to the variable length portion.
segment_.Put(cursor_, bytes, 0, len);
SetOffsetAndSize(pos, cursor_, len);
// move the cursor forward.
cursor_ += rounded_size;
}

void AbstractBinaryWriter::Grow(int32_t min_capacity) {
int32_t old_capacity = segment_.Size();
int32_t new_capacity = old_capacity + (old_capacity >> 1);
if (new_capacity - min_capacity < 0) {
new_capacity = min_capacity;
}
std::shared_ptr<Bytes> new_bytes =
Bytes::CopyOf(*(segment_.GetOrCreateHeapMemory(pool_)), new_capacity, pool_);
segment_ = MemorySegment::Wrap(new_bytes);
AfterGrow();
}

int32_t AbstractBinaryWriter::RoundNumberOfBytesToNearestWord(int32_t num_bytes) {
int32_t remainder = num_bytes & 0x07;
if (remainder == 0) {
return num_bytes;
} else {
return num_bytes + (8 - remainder);
}
}

template <typename T>
void AbstractBinaryWriter::WriteBytesToFixLenPart(MemorySegment* segment, int32_t field_offset,
const T& bytes, int32_t len) {
const uint64_t first_byte =
static_cast<uint64_t>(len) | 0x80U; // first bit is 1, low 7 bits are len
uint64_t seven_bytes = 0U; // real data
if ((SystemByteOrder() == ByteOrder::PAIMON_LITTLE_ENDIAN)) {
for (int32_t i = 0; i < len; i++) {
seven_bytes |= (static_cast<uint64_t>(bytes[i]) & 0xFFU) << (i * 8U);
}
} else {
for (int32_t i = 0; i < len; i++) {
seven_bytes |= (static_cast<uint64_t>(bytes[i]) & 0xFFU) << ((6 - i) * 8U);
}
}
const uint64_t offset_and_size = (first_byte << 56) | seven_bytes;
segment->PutValue<int64_t>(field_offset, static_cast<int64_t>(offset_and_size));
}

} // namespace paimon
104 changes: 104 additions & 0 deletions src/paimon/common/data/abstract_binary_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <cstdint>
#include <optional>
#include <string_view>

#include "paimon/common/data/binary_writer.h"
#include "paimon/common/memory/memory_segment.h"

namespace paimon {
class BinaryArray;
class BinaryRow;
class BinaryString;
class Bytes;
class Decimal;
class MemoryPool;
class Timestamp;

/// Use the special format to write data to a `MemorySegment` (its capacity grows
/// automatically). If write a format binary: 1. New a writer. 2. Write each field by `WriteXXX()`
/// or `SetNullAt()`. (Variable length fields can not be written repeatedly.) 3. Invoke
/// `Complete()`. If want to reuse this writer, please invoke `Reset()` first.
class AbstractBinaryWriter : public BinaryWriter {
public:
void WriteString(int32_t pos, const BinaryString& value) override;

void WriteBinary(int32_t pos, const Bytes& bytes) override;

void WriteDecimal(int32_t pos, const std::optional<Decimal>& value, int32_t precision) override;

void WriteTimestamp(int32_t pos, const std::optional<Timestamp>& value,
int32_t precision) override;

void WriteArray(int32_t pos, const BinaryArray& value) override;

void WriteRow(int32_t pos, const BinaryRow& value) override;

void WriteMap(int32_t pos, const BinaryMap& input) override;

void WriteStringView(int32_t pos, const std::string_view& view) override;

const MemorySegment& GetSegment() const {
return segment_;
}

protected:
static int32_t RoundNumberOfBytesToNearestWord(int32_t num_bytes);

/// Set offset and size to fix len part.
virtual void SetOffsetAndSize(int32_t pos, int32_t offset, int64_t size) = 0;

/// Get field offset.
virtual int32_t GetFieldOffset(int32_t pos) const = 0;

/// After grow, need point to new memory.
virtual void AfterGrow() = 0;

virtual void SetNullBit(int32_t ordinal) = 0;

void ZeroOutPaddingBytes(int32_t num_bytes);
void EnsureCapacity(int32_t needed_size);

private:
/// Increases the capacity to ensure that it can hold at least the minimum capacity argument.
void Grow(int32_t min_capacity);

void WriteBytes(int32_t pos, const Bytes& bytes);

template <typename T>
void WriteBytesToVarLenPart(int32_t pos, const T& bytes, int32_t len);

template <typename T>
static void WriteBytesToFixLenPart(MemorySegment* segment, int32_t field_offset, const T& bytes,
int32_t len);

void WriteSegmentToVarLenPart(int32_t pos, const MemorySegment& segment, int32_t offset,
int32_t size);

protected:
int32_t cursor_ = 0;
MemoryPool* pool_ = nullptr;
MemorySegment segment_;
};

} // namespace paimon
Loading