From 9e7681a6696a769b350b491af5b79a1ed7eb22a1 Mon Sep 17 00:00:00 2001 From: lxy264173 Date: Tue, 26 May 2026 18:23:00 +0800 Subject: [PATCH 1/3] feat: migrate common/data/columnar module --- .../common/data/columnar/columnar_array.cpp | 157 ++++++ .../common/data/columnar/columnar_array.h | 149 ++++++ .../data/columnar/columnar_array_test.cpp | 312 ++++++++++++ .../data/columnar/columnar_batch_context.h | 40 ++ .../common/data/columnar/columnar_map.cpp | 45 ++ .../common/data/columnar/columnar_map.h | 53 ++ .../common/data/columnar/columnar_row.cpp | 81 ++++ .../common/data/columnar/columnar_row.h | 151 ++++++ .../common/data/columnar/columnar_row_ref.cpp | 84 ++++ .../common/data/columnar/columnar_row_ref.h | 136 ++++++ .../data/columnar/columnar_row_test.cpp | 455 ++++++++++++++++++ .../common/data/columnar/columnar_utils.h | 106 ++++ .../data/columnar/columnar_utils_test.cpp | 56 +++ src/paimon/common/utils/date_time_utils.h | 213 ++++++++ 14 files changed, 2038 insertions(+) create mode 100644 src/paimon/common/data/columnar/columnar_array.cpp create mode 100644 src/paimon/common/data/columnar/columnar_array.h create mode 100644 src/paimon/common/data/columnar/columnar_array_test.cpp create mode 100644 src/paimon/common/data/columnar/columnar_batch_context.h create mode 100644 src/paimon/common/data/columnar/columnar_map.cpp create mode 100644 src/paimon/common/data/columnar/columnar_map.h create mode 100644 src/paimon/common/data/columnar/columnar_row.cpp create mode 100644 src/paimon/common/data/columnar/columnar_row.h create mode 100644 src/paimon/common/data/columnar/columnar_row_ref.cpp create mode 100644 src/paimon/common/data/columnar/columnar_row_ref.h create mode 100644 src/paimon/common/data/columnar/columnar_row_test.cpp create mode 100644 src/paimon/common/data/columnar/columnar_utils.h create mode 100644 src/paimon/common/data/columnar/columnar_utils_test.cpp create mode 100644 src/paimon/common/utils/date_time_utils.h diff --git a/src/paimon/common/data/columnar/columnar_array.cpp b/src/paimon/common/data/columnar/columnar_array.cpp new file mode 100644 index 0000000..2fd1507 --- /dev/null +++ b/src/paimon/common/data/columnar/columnar_array.cpp @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "paimon/common/data/columnar/columnar_array.h" + +#include + +#include "arrow/api.h" +#include "arrow/array/array_decimal.h" +#include "arrow/array/array_nested.h" +#include "arrow/array/array_primitive.h" +#include "arrow/type_traits.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/decimal.h" +#include "fmt/format.h" +#include "paimon/common/data/columnar/columnar_batch_context.h" +#include "paimon/common/data/columnar/columnar_map.h" +#include "paimon/common/data/columnar/columnar_row_ref.h" +#include "paimon/common/utils/date_time_utils.h" + +namespace paimon { +Status ColumnarArray::CheckNoNull() const { + for (int32_t i = 0; i < length_; i++) { + if (IsNullAt(i)) { + return Status::Invalid(fmt::format("row {} is null", i)); + } + } + return Status::OK(); +} + +Decimal ColumnarArray::GetDecimal(int32_t pos, int32_t precision, int32_t scale) const { + using ArrayType = typename arrow::TypeTraits::ArrayType; + auto array = arrow::internal::checked_cast(array_); + assert(array); + arrow::Decimal128 decimal(array->GetValue(offset_ + pos)); + return Decimal(precision, scale, + static_cast(decimal.high_bits()) << 64 | decimal.low_bits()); +} + +Timestamp ColumnarArray::GetTimestamp(int32_t pos, int32_t precision) const { + using ArrayType = typename arrow::TypeTraits::ArrayType; + auto array = arrow::internal::checked_cast(array_); + assert(array); + int64_t data = array->Value(offset_ + pos); + auto timestamp_type = + arrow::internal::checked_pointer_cast(array->type()); + // for orc format, data is saved as nano, therefore, Timestamp convert should consider precision + // in arrow array rather than input precision + DateTimeUtils::TimeType time_type = DateTimeUtils::GetTimeTypeFromArrowType(timestamp_type); + auto [milli, nano] = DateTimeUtils::TimestampConverter( + data, time_type, DateTimeUtils::TimeType::MILLISECOND, DateTimeUtils::TimeType::NANOSECOND); + return Timestamp(milli, nano); +} + +std::shared_ptr ColumnarArray::GetArray(int32_t pos) const { + auto list_array = arrow::internal::checked_cast(array_); + assert(list_array); + int32_t offset = list_array->value_offset(offset_ + pos); + int32_t length = list_array->value_length(offset_ + pos); + return std::make_shared(list_array->values(), pool_, offset, length); +} + +std::shared_ptr ColumnarArray::GetMap(int32_t pos) const { + auto map_array = arrow::internal::checked_cast(array_); + assert(map_array); + int32_t offset = map_array->value_offset(offset_ + pos); + int32_t length = map_array->value_length(offset_ + pos); + return std::make_shared(map_array->keys(), map_array->items(), pool_, offset, + length); +} + +std::shared_ptr ColumnarArray::GetRow(int32_t pos, int32_t num_fields) const { + auto struct_array = arrow::internal::checked_cast(array_); + assert(struct_array); + auto row_ctx = std::make_shared(struct_array->fields(), pool_); + return std::make_shared(std::move(row_ctx), offset_ + pos); +} + +Result> ColumnarArray::ToBooleanArray() const { + PAIMON_RETURN_NOT_OK(CheckNoNull()); + std::vector res(length_); + for (int32_t i = 0; i < length_; i++) { + bool element = GetBoolean(i); + res[i] = element ? static_cast(1) : static_cast(0); + } + return res; +} + +Result> ColumnarArray::ToByteArray() const { + PAIMON_RETURN_NOT_OK(CheckNoNull()); + std::vector res(length_); + for (int32_t i = 0; i < length_; i++) { + res[i] = GetByte(i); + } + return res; +} + +Result> ColumnarArray::ToShortArray() const { + PAIMON_RETURN_NOT_OK(CheckNoNull()); + std::vector res(length_); + for (int32_t i = 0; i < length_; i++) { + res[i] = GetShort(i); + } + return res; +} + +Result> ColumnarArray::ToIntArray() const { + PAIMON_RETURN_NOT_OK(CheckNoNull()); + std::vector res(length_); + for (int32_t i = 0; i < length_; i++) { + res[i] = GetInt(i); + } + return res; +} + +Result> ColumnarArray::ToLongArray() const { + PAIMON_RETURN_NOT_OK(CheckNoNull()); + std::vector res(length_); + for (int32_t i = 0; i < length_; i++) { + res[i] = GetLong(i); + } + return res; +} + +Result> ColumnarArray::ToFloatArray() const { + PAIMON_RETURN_NOT_OK(CheckNoNull()); + std::vector res(length_); + for (int32_t i = 0; i < length_; i++) { + res[i] = GetFloat(i); + } + return res; +} + +Result> ColumnarArray::ToDoubleArray() const { + PAIMON_RETURN_NOT_OK(CheckNoNull()); + std::vector res(length_); + for (int32_t i = 0; i < length_; i++) { + res[i] = GetDouble(i); + } + return res; +} + +} // namespace paimon diff --git a/src/paimon/common/data/columnar/columnar_array.h b/src/paimon/common/data/columnar/columnar_array.h new file mode 100644 index 0000000..9554845 --- /dev/null +++ b/src/paimon/common/data/columnar/columnar_array.h @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#include +#include +#include +#include +#include + +#include "arrow/array/array_base.h" +#include "paimon/common/data/binary_string.h" +#include "paimon/common/data/columnar/columnar_utils.h" +#include "paimon/common/data/internal_array.h" +#include "paimon/common/data/internal_map.h" +#include "paimon/common/data/internal_row.h" +#include "paimon/data/decimal.h" +#include "paimon/data/timestamp.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace arrow { +class BinaryType; +class BooleanType; +class Date32Type; +class DoubleType; +class FloatType; +class Int16Type; +class Int32Type; +class Int64Type; +class Int8Type; +class StringType; +} // namespace arrow + +namespace paimon { +class Bytes; +class MemoryPool; + +/// Columnar array to support access to vector column data. +class ColumnarArray : public InternalArray { + public: + ColumnarArray(const std::shared_ptr& array, + const std::shared_ptr& pool, int32_t offset, int32_t length) + : pool_(pool), array_(array.get()), offset_(offset), length_(length) { + assert(array_); + assert(array_->length() >= offset + length); + } + + int32_t Size() const override { + return length_; + } + + bool IsNullAt(int32_t pos) const override { + return array_->IsNull(offset_ + pos); + } + + bool GetBoolean(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(array_, offset_ + pos); + } + + char GetByte(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(array_, offset_ + pos); + } + + int16_t GetShort(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(array_, offset_ + pos); + } + + int32_t GetInt(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(array_, offset_ + pos); + } + + int32_t GetDate(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(array_, offset_ + pos); + } + + int64_t GetLong(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(array_, offset_ + pos); + } + + float GetFloat(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(array_, offset_ + pos); + } + + double GetDouble(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(array_, offset_ + pos); + } + + BinaryString GetString(int32_t pos) const override { + auto bytes = ColumnarUtils::GetBytes(array_, offset_ + pos, pool_.get()); + return BinaryString::FromBytes(bytes); + } + + std::string_view GetStringView(int32_t pos) const override { + return ColumnarUtils::GetView(array_, offset_ + pos); + } + + Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const override; + + Timestamp GetTimestamp(int32_t pos, int32_t precision) const override; + + std::shared_ptr GetBinary(int32_t pos) const override { + return ColumnarUtils::GetBytes(array_, offset_ + pos, pool_.get()); + } + + std::shared_ptr GetArray(int32_t pos) const override; + + std::shared_ptr GetMap(int32_t pos) const override; + + std::shared_ptr GetRow(int32_t pos, int32_t num_fields) const override; + + Result> ToBooleanArray() const override; + + Result> ToByteArray() const override; + + Result> ToShortArray() const override; + + Result> ToIntArray() const override; + + Result> ToLongArray() const override; + + Result> ToFloatArray() const override; + + Result> ToDoubleArray() const override; + + private: + Status CheckNoNull() const; + + private: + std::shared_ptr pool_; + const arrow::Array* array_; + int32_t offset_; + int32_t length_; +}; +} // namespace paimon diff --git a/src/paimon/common/data/columnar/columnar_array_test.cpp b/src/paimon/common/data/columnar/columnar_array_test.cpp new file mode 100644 index 0000000..c976ae2 --- /dev/null +++ b/src/paimon/common/data/columnar/columnar_array_test.cpp @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "paimon/common/data/columnar/columnar_array.h" + +#include +#include + +#include "arrow/api.h" +#include "arrow/array/array_nested.h" +#include "arrow/ipc/json_simple.h" +#include "arrow/util/checked_cast.h" +#include "gtest/gtest.h" +#include "paimon/common/data/internal_map.h" +#include "paimon/common/data/internal_row.h" +#include "paimon/common/utils/date_time_utils.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +TEST(ColumnarArrayTest, TestSimple) { + auto pool = GetDefaultPool(); + { + auto f1 = + arrow::ipc::internal::json::ArrayFromJSON( + arrow::list(arrow::boolean()), "[[true, false], [true], [false], [false, true]]") + .ValueOrDie(); + auto list_array = arrow::internal::checked_pointer_cast(f1); + auto array = ColumnarArray(list_array->values(), pool, /*offset=*/2, 1); + ASSERT_EQ(array.Size(), 1); + ASSERT_EQ(array.GetBoolean(0), true); + std::vector expected_array = {static_cast(1)}; + ASSERT_EQ(array.ToBooleanArray().value(), expected_array); + } + { + auto f1 = arrow::ipc::internal::json::ArrayFromJSON(arrow::list(arrow::int8()), + "[[1, 1, 2], [3], [2], [2]]") + .ValueOrDie(); + auto list_array = arrow::internal::checked_pointer_cast(f1); + auto array = ColumnarArray(list_array->values(), pool, /*offset=*/5, 1); + ASSERT_EQ(array.GetByte(0), 2); + std::vector expected_array = {static_cast(2)}; + ASSERT_EQ(array.ToByteArray().value(), expected_array); + } + { + auto f1 = arrow::ipc::internal::json::ArrayFromJSON(arrow::list(arrow::int16()), + "[[1, 1, 2], [3], [2], [-4]]") + .ValueOrDie(); + auto list_array = arrow::internal::checked_pointer_cast(f1); + auto array = ColumnarArray(list_array->values(), pool, /*offset=*/0, 3); + ASSERT_EQ(array.GetShort(0), 1); + ASSERT_EQ(array.GetShort(1), 1); + ASSERT_EQ(array.GetShort(2), 2); + std::vector expected_array = {1, 1, 2}; + ASSERT_EQ(array.ToShortArray().value(), expected_array); + } + { + auto f1 = arrow::ipc::internal::json::ArrayFromJSON(arrow::list(arrow::int32()), + "[[1, 1, 2], [3], [2], [-4]]") + .ValueOrDie(); + auto list_array = arrow::internal::checked_pointer_cast(f1); + auto array = ColumnarArray(list_array->values(), pool, /*offset=*/3, 1); + ASSERT_EQ(array.GetInt(0), 3); + std::vector expected_array = {3}; + ASSERT_EQ(array.ToIntArray().value(), expected_array); + } + { + auto f1 = arrow::ipc::internal::json::ArrayFromJSON(arrow::list(arrow::int64()), + "[[1, 1, 2], [3], [2], [-4]]") + .ValueOrDie(); + auto list_array = arrow::internal::checked_pointer_cast(f1); + auto array = ColumnarArray(list_array->values(), pool, /*offset=*/4, 1); + ASSERT_EQ(array.GetLong(0), 2); + std::vector expected_array = {2}; + ASSERT_EQ(array.ToLongArray().value(), expected_array); + } + { + auto f1 = arrow::ipc::internal::json::ArrayFromJSON(arrow::list(arrow::int64()), + "[[1, 1, 2], [3], [null], null]") + .ValueOrDie(); + auto list_array = arrow::internal::checked_pointer_cast(f1); + auto array = ColumnarArray(list_array->values(), pool, /*offset=*/4, 1); + ASSERT_NOK_WITH_MSG(array.ToLongArray(), "is null"); + } + { + auto f1 = arrow::ipc::internal::json::ArrayFromJSON( + arrow::list(arrow::float32()), "[[0.0, 1.1, 2.2], [3.3], [4.4], [5.5]]") + .ValueOrDie(); + auto list_array = arrow::internal::checked_pointer_cast(f1); + auto array = ColumnarArray(list_array->values(), pool, /*offset=*/0, 3); + ASSERT_NEAR(array.GetFloat(1), 1.1, 0.001); + std::vector expected_array = {0.0, 1.1, 2.2}; + ASSERT_EQ(array.ToFloatArray().value(), expected_array); + } + { + auto f1 = arrow::ipc::internal::json::ArrayFromJSON( + arrow::list(arrow::float64()), "[[0.0, 1.1, 2.2], [3.3], [4.4], [5.5]]") + .ValueOrDie(); + auto list_array = arrow::internal::checked_pointer_cast(f1); + auto array = ColumnarArray(list_array->values(), pool, /*offset=*/3, 1); + ASSERT_NEAR(array.GetDouble(0), 3.3, 0.001); + std::vector expected_array = {3.3}; + ASSERT_EQ(array.ToDoubleArray().value(), expected_array); + } + { + auto f1 = arrow::ipc::internal::json::ArrayFromJSON( + arrow::list(arrow::utf8()), R"([["abc", "def"], ["efg"], ["hello"], ["hi"]])") + .ValueOrDie(); + auto list_array = arrow::internal::checked_pointer_cast(f1); + auto array = ColumnarArray(list_array->values(), pool, /*offset=*/4, 1); + ASSERT_EQ(array.GetString(0).ToString(), "hi"); + ASSERT_EQ(std::string(array.GetStringView(0)), "hi"); + } +} + +TEST(ColumnarArrayTest, TestComplexAndNestedType) { + auto pool = GetDefaultPool(); + { + auto f1 = arrow::ipc::internal::json::ArrayFromJSON(arrow::list(arrow::date32()), + "[[1, 1, 2], [3], [2], [-4]]") + .ValueOrDie(); + auto list_array = arrow::internal::checked_pointer_cast(f1); + auto array = ColumnarArray(list_array->values(), pool, /*offset=*/3, 1); + ASSERT_EQ(array.GetDate(0), 3); + } + { + auto f1 = arrow::ipc::internal::json::ArrayFromJSON( + arrow::list(arrow::decimal128(10, 3)), + R"([["1.234", "1234.000"], ["-9876.543"], ["666.888"]])") + .ValueOrDie(); + auto list_array = arrow::internal::checked_pointer_cast(f1); + auto array = ColumnarArray(list_array->values(), pool, /*offset=*/0, 2); + ASSERT_EQ(array.GetDecimal(0, 10, 3), Decimal(10, 3, 1234)); + } + { + auto f1 = arrow::ipc::internal::json::ArrayFromJSON( + arrow::list(arrow::timestamp(arrow::TimeUnit::NANO)), + R"([["1970-01-01T00:00:59"],["2000-02-29T23:23:23", + "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])") + .ValueOrDie(); + auto list_array = arrow::internal::checked_pointer_cast(f1); + auto array = ColumnarArray(list_array->values(), pool, /*offset=*/0, 1); + auto ts = array.GetTimestamp(0, 9); + ASSERT_EQ(ts, Timestamp(59000, 0)); + } + { + auto f1 = arrow::ipc::internal::json::ArrayFromJSON(arrow::list(arrow::binary()), + R"([["aaa", "bb"], ["ccc"], ["bbb"]])") + .ValueOrDie(); + auto list_array = arrow::internal::checked_pointer_cast(f1); + auto array = ColumnarArray(list_array->values(), pool, /*offset=*/0, 2); + ASSERT_EQ(*array.GetBinary(1), Bytes("bb", pool.get())); + ASSERT_EQ(std::string(array.GetStringView(1)), "bb"); + } + { + auto f1 = arrow::ipc::internal::json::ArrayFromJSON(arrow::list(arrow::struct_({ + field("sub1", arrow::int64()), + field("sub2", arrow::int64()), + field("sub3", arrow::int64()), + field("sub4", arrow::int64()), + })), + R"([ + [[1, 3, 2, 5], + [2, 2, 1, 3]], + [[3, 2, 1, 3]], + [[4, 1, 0, 2]] + ])") + .ValueOrDie(); + auto list_array = arrow::internal::checked_pointer_cast(f1); + auto array = ColumnarArray(list_array->values(), pool, /*offset=*/0, 2); + auto result_row = array.GetRow(1, 4); + ASSERT_EQ(result_row->GetLong(0), 2); + ASSERT_EQ(result_row->GetLong(1), 2); + ASSERT_EQ(result_row->GetLong(2), 1); + ASSERT_EQ(result_row->GetLong(3), 3); + } + { + auto f1 = arrow::ipc::internal::json::ArrayFromJSON( + arrow::list(arrow::list(arrow::int64())), "[[[1, 2, 3], [4, 5, 6]], []]") + .ValueOrDie(); + auto list_array = arrow::internal::checked_pointer_cast(f1); + auto array = ColumnarArray(list_array->values(), pool, /*offset=*/0, 1); + auto result_array = array.GetArray(0); + auto inner_result_array = array.GetArray(0); + std::vector values = {1, 2, 3}; + ASSERT_EQ(inner_result_array->ToLongArray().value(), values); + } + { + auto f1 = arrow::ipc::internal::json::ArrayFromJSON( + arrow::list(arrow::map(arrow::int32(), arrow::int64())), + R"([ + [[[1, 3], [4, 4]]], [] + ])") + .ValueOrDie(); + auto list_array = arrow::internal::checked_pointer_cast(f1); + ASSERT_TRUE(list_array); + auto array = ColumnarArray(list_array->values(), pool, /*offset=*/0, 1); + auto result_key = array.GetMap(0)->KeyArray(); + auto result_value = array.GetMap(0)->ValueArray(); + ASSERT_EQ(result_key->ToIntArray().value(), std::vector({1, 4})); + ASSERT_EQ(result_value->ToLongArray().value(), std::vector({3, 4})); + } +} +TEST(ColumnarArrayTest, TestTimestampType) { + auto pool = GetDefaultPool(); + auto timezone = DateTimeUtils::GetLocalTimezoneName(); + { + auto f1 = arrow::ipc::internal::json::ArrayFromJSON( + arrow::list(arrow::timestamp(arrow::TimeUnit::SECOND)), + R"([["1970-01-01T00:00:59"],["2000-02-29T23:23:23", + "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])") + .ValueOrDie(); + auto list_array = arrow::internal::checked_pointer_cast(f1); + auto array = ColumnarArray(list_array->values(), pool, /*offset=*/1, 2); + auto ts = array.GetTimestamp(0, 0); + ASSERT_EQ(ts, Timestamp(951866603000, 0)) << ts.GetMillisecond(); + } + { + auto f1 = arrow::ipc::internal::json::ArrayFromJSON( + arrow::list(arrow::timestamp(arrow::TimeUnit::MILLI)), + R"([["1970-01-01T00:00:59"],["2000-02-29T23:23:23.001", + "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])") + .ValueOrDie(); + auto list_array = arrow::internal::checked_pointer_cast(f1); + auto array = ColumnarArray(list_array->values(), pool, /*offset=*/1, 2); + auto ts = array.GetTimestamp(0, 3); + ASSERT_EQ(ts, Timestamp(951866603001, 0)) << ts.GetMillisecond(); + } + { + auto f1 = arrow::ipc::internal::json::ArrayFromJSON( + arrow::list(arrow::timestamp(arrow::TimeUnit::MICRO)), + R"([["1970-01-01T00:00:59"],["2000-02-29T23:23:23.001001", + "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])") + .ValueOrDie(); + auto list_array = arrow::internal::checked_pointer_cast(f1); + auto array = ColumnarArray(list_array->values(), pool, /*offset=*/1, 2); + auto ts = array.GetTimestamp(0, 6); + ASSERT_EQ(ts, Timestamp(951866603001, 1000)) << ts.GetMillisecond(); + } + { + auto f1 = arrow::ipc::internal::json::ArrayFromJSON( + arrow::list(arrow::timestamp(arrow::TimeUnit::NANO)), + R"([["1970-01-01T00:00:59"],["2000-02-29T23:23:23.001001001", + "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])") + .ValueOrDie(); + auto list_array = arrow::internal::checked_pointer_cast(f1); + auto array = ColumnarArray(list_array->values(), pool, /*offset=*/1, 2); + auto ts = array.GetTimestamp(0, 9); + ASSERT_EQ(ts, Timestamp(951866603001, 1001)) << ts.GetMillisecond(); + } + { + auto f1 = arrow::ipc::internal::json::ArrayFromJSON( + arrow::list(arrow::timestamp(arrow::TimeUnit::SECOND, timezone)), + R"([["1970-01-01T00:00:59"],["2000-02-29T23:23:23", + "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])") + .ValueOrDie(); + auto list_array = arrow::internal::checked_pointer_cast(f1); + auto array = ColumnarArray(list_array->values(), pool, /*offset=*/1, 2); + auto ts = array.GetTimestamp(0, 0); + ASSERT_EQ(ts, Timestamp(951866603000, 0)) << ts.GetMillisecond(); + } + { + auto f1 = arrow::ipc::internal::json::ArrayFromJSON( + arrow::list(arrow::timestamp(arrow::TimeUnit::MILLI, timezone)), + R"([["1970-01-01T00:00:59"],["2000-02-29T23:23:23.001", + "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])") + .ValueOrDie(); + auto list_array = arrow::internal::checked_pointer_cast(f1); + auto array = ColumnarArray(list_array->values(), pool, /*offset=*/1, 2); + auto ts = array.GetTimestamp(0, 3); + ASSERT_EQ(ts, Timestamp(951866603001, 0)) << ts.GetMillisecond(); + } + { + auto f1 = arrow::ipc::internal::json::ArrayFromJSON( + arrow::list(arrow::timestamp(arrow::TimeUnit::MICRO, timezone)), + R"([["1970-01-01T00:00:59"],["2000-02-29T23:23:23.001001", + "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])") + .ValueOrDie(); + auto list_array = arrow::internal::checked_pointer_cast(f1); + auto array = ColumnarArray(list_array->values(), pool, /*offset=*/1, 2); + auto ts = array.GetTimestamp(0, 6); + ASSERT_EQ(ts, Timestamp(951866603001, 1000)) << ts.GetMillisecond(); + } + { + auto f1 = arrow::ipc::internal::json::ArrayFromJSON( + arrow::list(arrow::timestamp(arrow::TimeUnit::NANO, timezone)), + R"([["1970-01-01T00:00:59"],["2000-02-29T23:23:23.001001001", + "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])") + .ValueOrDie(); + auto list_array = arrow::internal::checked_pointer_cast(f1); + auto array = ColumnarArray(list_array->values(), pool, /*offset=*/1, 2); + auto ts = array.GetTimestamp(0, 9); + ASSERT_EQ(ts, Timestamp(951866603001, 1001)) << ts.GetMillisecond(); + } +} + +} // namespace paimon::test diff --git a/src/paimon/common/data/columnar/columnar_batch_context.h b/src/paimon/common/data/columnar/columnar_batch_context.h new file mode 100644 index 0000000..2d35c0d --- /dev/null +++ b/src/paimon/common/data/columnar/columnar_batch_context.h @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +#include "arrow/array/array_base.h" + +namespace arrow { +class StructArray; +} // namespace arrow + +namespace paimon { +class MemoryPool; + +struct ColumnarBatchContext { + ColumnarBatchContext(const arrow::ArrayVector& array_vec_in, + const std::shared_ptr& pool_in) + : pool(pool_in), array_vec(array_vec_in) {} + + std::shared_ptr pool; + arrow::ArrayVector array_vec; +}; +} // namespace paimon diff --git a/src/paimon/common/data/columnar/columnar_map.cpp b/src/paimon/common/data/columnar/columnar_map.cpp new file mode 100644 index 0000000..3d720fe --- /dev/null +++ b/src/paimon/common/data/columnar/columnar_map.cpp @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "paimon/common/data/columnar/columnar_map.h" + +#include "paimon/common/data/columnar/columnar_array.h" + +namespace arrow { +class Array; +} // namespace arrow + +namespace paimon { +class MemoryPool; + +ColumnarMap::ColumnarMap(const std::shared_ptr& key_array, + const std::shared_ptr& value_array, + const std::shared_ptr& pool, int32_t offset, int32_t length) + : pool_(pool), + key_array_(key_array), + value_array_(value_array), + offset_(offset), + length_(length) {} + +std::shared_ptr ColumnarMap::KeyArray() const { + return std::make_shared(key_array_, pool_, offset_, length_); +} +std::shared_ptr ColumnarMap::ValueArray() const { + return std::make_shared(value_array_, pool_, offset_, length_); +} + +} // namespace paimon diff --git a/src/paimon/common/data/columnar/columnar_map.h b/src/paimon/common/data/columnar/columnar_map.h new file mode 100644 index 0000000..549a75a --- /dev/null +++ b/src/paimon/common/data/columnar/columnar_map.h @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +#include "paimon/common/data/internal_array.h" +#include "paimon/common/data/internal_map.h" + +namespace arrow { +class Array; +} // namespace arrow + +namespace paimon { +class MemoryPool; + +/// Columnar map to support access to vector column data. +class ColumnarMap : public InternalMap { + public: + ColumnarMap(const std::shared_ptr& key_array, + const std::shared_ptr& value_array, + const std::shared_ptr& pool, int32_t offset, int32_t length); + + int32_t Size() const override { + return length_; + } + std::shared_ptr KeyArray() const override; + std::shared_ptr ValueArray() const override; + + private: + std::shared_ptr pool_; + std::shared_ptr key_array_; + std::shared_ptr value_array_; + int32_t offset_; + int32_t length_; +}; +} // namespace paimon diff --git a/src/paimon/common/data/columnar/columnar_row.cpp b/src/paimon/common/data/columnar/columnar_row.cpp new file mode 100644 index 0000000..fd8509d --- /dev/null +++ b/src/paimon/common/data/columnar/columnar_row.cpp @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "paimon/common/data/columnar/columnar_row.h" + +#include + +#include "arrow/array/array_base.h" +#include "arrow/array/array_decimal.h" +#include "arrow/array/array_nested.h" +#include "arrow/array/array_primitive.h" +#include "arrow/type_traits.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/decimal.h" +#include "paimon/common/data/columnar/columnar_array.h" +#include "paimon/common/data/columnar/columnar_map.h" +#include "paimon/common/utils/date_time_utils.h" + +namespace paimon { +Decimal ColumnarRow::GetDecimal(int32_t pos, int32_t precision, int32_t scale) const { + using ArrayType = typename arrow::TypeTraits::ArrayType; + auto array = arrow::internal::checked_cast(array_vec_[pos]); + assert(array); + arrow::Decimal128 decimal(array->GetValue(row_id_)); + return Decimal(precision, scale, + static_cast(decimal.high_bits()) << 64 | decimal.low_bits()); +} + +Timestamp ColumnarRow::GetTimestamp(int32_t pos, int32_t precision) const { + using ArrayType = typename arrow::TypeTraits::ArrayType; + auto array = arrow::internal::checked_cast(array_vec_[pos]); + assert(array); + int64_t data = array->Value(row_id_); + auto timestamp_type = + arrow::internal::checked_pointer_cast(array->type()); + // for orc format, data is saved as nano, therefore, Timestamp convert should consider precision + // in arrow array rather than input precision + DateTimeUtils::TimeType time_type = DateTimeUtils::GetTimeTypeFromArrowType(timestamp_type); + auto [milli, nano] = DateTimeUtils::TimestampConverter( + data, time_type, DateTimeUtils::TimeType::MILLISECOND, DateTimeUtils::TimeType::NANOSECOND); + return Timestamp(milli, nano); +} + +std::shared_ptr ColumnarRow::GetRow(int32_t pos, int32_t num_fields) const { + auto struct_array = arrow::internal::checked_cast(array_vec_[pos]); + assert(struct_array); + return std::make_shared(struct_array->fields(), pool_, row_id_); +} + +std::shared_ptr ColumnarRow::GetArray(int32_t pos) const { + auto list_array = arrow::internal::checked_cast(array_vec_[pos]); + assert(list_array); + int32_t offset = list_array->value_offset(row_id_); + int32_t length = list_array->value_length(row_id_); + return std::make_shared(list_array->values(), pool_, offset, length); +} + +std::shared_ptr ColumnarRow::GetMap(int32_t pos) const { + auto map_array = arrow::internal::checked_cast(array_vec_[pos]); + assert(map_array); + int32_t offset = map_array->value_offset(row_id_); + int32_t length = map_array->value_length(row_id_); + return std::make_shared(map_array->keys(), map_array->items(), pool_, offset, + length); +} + +} // namespace paimon diff --git a/src/paimon/common/data/columnar/columnar_row.h b/src/paimon/common/data/columnar/columnar_row.h new file mode 100644 index 0000000..f0163bf --- /dev/null +++ b/src/paimon/common/data/columnar/columnar_row.h @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/array/array_base.h" +#include "fmt/format.h" +#include "paimon/common/data/binary_string.h" +#include "paimon/common/data/columnar/columnar_utils.h" +#include "paimon/common/data/internal_array.h" +#include "paimon/common/data/internal_map.h" +#include "paimon/common/data/internal_row.h" +#include "paimon/common/types/row_kind.h" +#include "paimon/data/decimal.h" +#include "paimon/data/timestamp.h" +#include "paimon/result.h" + +namespace arrow { +class StructArray; +} // namespace arrow + +namespace paimon { +class Bytes; +class MemoryPool; + +/// Columnar row to support access to vector column data. It is a row view in arrow::Array. +class ColumnarRow : public InternalRow { + public: + ColumnarRow(const arrow::ArrayVector& array_vec, const std::shared_ptr& pool, + int64_t row_id) + : ColumnarRow(/*struct_array holder*/ nullptr, array_vec, pool, row_id) {} + + ColumnarRow(const std::shared_ptr& struct_array, + const arrow::ArrayVector& array_vec, const std::shared_ptr& pool, + int64_t row_id) + : struct_array_(struct_array), pool_(pool), row_id_(row_id) { + array_vec_.reserve(array_vec.size()); + for (const auto& array : array_vec) { + array_vec_.push_back(array.get()); + } + } + + Result GetRowKind() const override { + return row_kind_; + } + + void SetRowKind(const RowKind* kind) override { + row_kind_ = kind; + } + + int32_t GetFieldCount() const override { + return array_vec_.size(); + } + + bool IsNullAt(int32_t pos) const override { + return array_vec_[pos]->IsNull(row_id_); + } + bool GetBoolean(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(array_vec_[pos], row_id_); + } + + char GetByte(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(array_vec_[pos], row_id_); + } + + int16_t GetShort(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(array_vec_[pos], row_id_); + } + + int32_t GetInt(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(array_vec_[pos], row_id_); + } + + int32_t GetDate(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(array_vec_[pos], row_id_); + } + + int64_t GetLong(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(array_vec_[pos], row_id_); + } + + float GetFloat(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(array_vec_[pos], row_id_); + } + + double GetDouble(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(array_vec_[pos], row_id_); + } + + /// @note `GetString()` and `GetBinary()` will deep copy string data to pool, use + /// `GetStringView()` to avoid copy + BinaryString GetString(int32_t pos) const override { + auto bytes = + ColumnarUtils::GetBytes(array_vec_[pos], row_id_, pool_.get()); + return BinaryString::FromBytes(bytes); + } + + std::string_view GetStringView(int32_t pos) const override { + return ColumnarUtils::GetView(array_vec_[pos], row_id_); + } + + Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const override; + + Timestamp GetTimestamp(int32_t pos, int32_t precision) const override; + + std::shared_ptr GetBinary(int32_t pos) const override { + return ColumnarUtils::GetBytes(array_vec_[pos], row_id_, pool_.get()); + } + + std::shared_ptr GetRow(int32_t pos, int32_t num_fields) const override; + + std::shared_ptr GetArray(int32_t pos) const override; + + std::shared_ptr GetMap(int32_t pos) const override; + + std::string ToString() const override { + return fmt::format("ColumnarRow, row_id {}", row_id_); + } + + private: + /// @note `struct_array_` is the data holder for columnar row, ensure that the data life + /// cycle is consistent with the columnar row, `array_vec_` maybe a subset of + /// `struct_array_`, so `struct_array_` cannot be used for `GetXXX()` + std::shared_ptr struct_array_; + std::shared_ptr pool_; + std::vector array_vec_; + const RowKind* row_kind_ = RowKind::Insert(); + int64_t row_id_; +}; +} // namespace paimon diff --git a/src/paimon/common/data/columnar/columnar_row_ref.cpp b/src/paimon/common/data/columnar/columnar_row_ref.cpp new file mode 100644 index 0000000..ba362e9 --- /dev/null +++ b/src/paimon/common/data/columnar/columnar_row_ref.cpp @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "paimon/common/data/columnar/columnar_row_ref.h" + +#include + +#include "arrow/array/array_decimal.h" +#include "arrow/array/array_nested.h" +#include "arrow/array/array_primitive.h" +#include "arrow/type_traits.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/decimal.h" +#include "paimon/common/data/columnar/columnar_array.h" +#include "paimon/common/data/columnar/columnar_map.h" +#include "paimon/common/utils/date_time_utils.h" + +namespace paimon { +Decimal ColumnarRowRef::GetDecimal(int32_t pos, int32_t precision, int32_t scale) const { + using ArrayType = typename arrow::TypeTraits::ArrayType; + auto array = arrow::internal::checked_cast(ctx_->array_vec[pos].get()); + assert(array); + arrow::Decimal128 decimal(array->GetValue(row_id_)); + return Decimal(precision, scale, + static_cast(decimal.high_bits()) << 64 | decimal.low_bits()); +} + +Timestamp ColumnarRowRef::GetTimestamp(int32_t pos, int32_t precision) const { + using ArrayType = typename arrow::TypeTraits::ArrayType; + auto array = arrow::internal::checked_cast(ctx_->array_vec[pos].get()); + assert(array); + int64_t data = array->Value(row_id_); + auto timestamp_type = + arrow::internal::checked_pointer_cast(array->type()); + // for orc format, data is saved as nano, therefore, Timestamp convert should consider precision + // in arrow array rather than input precision + DateTimeUtils::TimeType time_type = DateTimeUtils::GetTimeTypeFromArrowType(timestamp_type); + auto [milli, nano] = DateTimeUtils::TimestampConverter( + data, time_type, DateTimeUtils::TimeType::MILLISECOND, DateTimeUtils::TimeType::NANOSECOND); + return Timestamp(milli, nano); +} + +std::shared_ptr ColumnarRowRef::GetRow(int32_t pos, int32_t num_fields) const { + auto struct_array = + arrow::internal::checked_cast(ctx_->array_vec[pos].get()); + assert(struct_array); + auto nested_ctx = std::make_shared(struct_array->fields(), ctx_->pool); + return std::make_shared(std::move(nested_ctx), row_id_); +} + +std::shared_ptr ColumnarRowRef::GetArray(int32_t pos) const { + auto list_array = + arrow::internal::checked_cast(ctx_->array_vec[pos].get()); + assert(list_array); + int32_t offset = list_array->value_offset(row_id_); + int32_t length = list_array->value_length(row_id_); + return std::make_shared(list_array->values(), ctx_->pool, offset, length); +} + +std::shared_ptr ColumnarRowRef::GetMap(int32_t pos) const { + auto map_array = + arrow::internal::checked_cast(ctx_->array_vec[pos].get()); + assert(map_array); + int32_t offset = map_array->value_offset(row_id_); + int32_t length = map_array->value_length(row_id_); + return std::make_shared(map_array->keys(), map_array->items(), ctx_->pool, offset, + length); +} + +} // namespace paimon diff --git a/src/paimon/common/data/columnar/columnar_row_ref.h b/src/paimon/common/data/columnar/columnar_row_ref.h new file mode 100644 index 0000000..b08fdf6 --- /dev/null +++ b/src/paimon/common/data/columnar/columnar_row_ref.h @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include + +#include "fmt/format.h" +#include "paimon/common/data/binary_string.h" +#include "paimon/common/data/columnar/columnar_batch_context.h" +#include "paimon/common/data/columnar/columnar_utils.h" +#include "paimon/common/data/internal_array.h" +#include "paimon/common/data/internal_map.h" +#include "paimon/common/data/internal_row.h" +#include "paimon/common/types/row_kind.h" +#include "paimon/data/decimal.h" +#include "paimon/data/timestamp.h" +#include "paimon/result.h" + +namespace paimon { +class Bytes; + +/// Columnar row view which shares batch-level context to reduce per-row overhead. +class ColumnarRowRef : public InternalRow { + public: + ColumnarRowRef(std::shared_ptr ctx, int64_t row_id) + : ctx_(std::move(ctx)), row_id_(row_id) {} + + Result GetRowKind() const override { + return row_kind_; + } + + void SetRowKind(const RowKind* kind) override { + row_kind_ = kind; + } + + int32_t GetFieldCount() const override { + return static_cast(ctx_->array_vec.size()); + } + + bool IsNullAt(int32_t pos) const override { + return ctx_->array_vec[pos]->IsNull(row_id_); + } + + bool GetBoolean(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(ctx_->array_vec[pos].get(), + row_id_); + } + + char GetByte(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(ctx_->array_vec[pos].get(), + row_id_); + } + + int16_t GetShort(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(ctx_->array_vec[pos].get(), + row_id_); + } + + int32_t GetInt(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(ctx_->array_vec[pos].get(), + row_id_); + } + + int32_t GetDate(int32_t pos) const override { + return ColumnarUtils::GetGenericValue( + ctx_->array_vec[pos].get(), row_id_); + } + + int64_t GetLong(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(ctx_->array_vec[pos].get(), + row_id_); + } + + float GetFloat(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(ctx_->array_vec[pos].get(), + row_id_); + } + + double GetDouble(int32_t pos) const override { + return ColumnarUtils::GetGenericValue(ctx_->array_vec[pos].get(), + row_id_); + } + + BinaryString GetString(int32_t pos) const override { + auto bytes = ColumnarUtils::GetBytes(ctx_->array_vec[pos].get(), row_id_, + ctx_->pool.get()); + return BinaryString::FromBytes(bytes); + } + + std::string_view GetStringView(int32_t pos) const override { + return ColumnarUtils::GetView(ctx_->array_vec[pos].get(), row_id_); + } + + Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const override; + + Timestamp GetTimestamp(int32_t pos, int32_t precision) const override; + + std::shared_ptr GetBinary(int32_t pos) const override { + return ColumnarUtils::GetBytes(ctx_->array_vec[pos].get(), row_id_, + ctx_->pool.get()); + } + + std::shared_ptr GetRow(int32_t pos, int32_t num_fields) const override; + + std::shared_ptr GetArray(int32_t pos) const override; + + std::shared_ptr GetMap(int32_t pos) const override; + + std::string ToString() const override { + return fmt::format("ColumnarRowRef, row_id {}", row_id_); + } + + private: + std::shared_ptr ctx_; + const RowKind* row_kind_ = RowKind::Insert(); + int64_t row_id_; +}; +} // namespace paimon diff --git a/src/paimon/common/data/columnar/columnar_row_test.cpp b/src/paimon/common/data/columnar/columnar_row_test.cpp new file mode 100644 index 0000000..6301cd2 --- /dev/null +++ b/src/paimon/common/data/columnar/columnar_row_test.cpp @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "paimon/common/data/columnar/columnar_row.h" + +#include + +#include "arrow/api.h" +#include "arrow/array/array_base.h" +#include "arrow/array/array_dict.h" +#include "arrow/array/array_nested.h" +#include "arrow/ipc/json_simple.h" +#include "gtest/gtest.h" +#include "paimon/common/data/columnar/columnar_row_ref.h" +#include "paimon/common/utils/date_time_utils.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" + +namespace paimon::test { +TEST(ColumnarRowTest, TestSimple) { + auto pool = GetDefaultPool(); + std::shared_ptr target_type = + arrow::struct_({arrow::field("f1", arrow::boolean()), arrow::field("f2", arrow::int8()), + arrow::field("f3", arrow::int16()), arrow::field("f4", arrow::int32()), + arrow::field("f5", arrow::int64()), arrow::field("f6", arrow::float32()), + arrow::field("f7", arrow::float64()), arrow::field("f8", arrow::utf8())}); + auto f1 = + arrow::ipc::internal::json::ArrayFromJSON(arrow::boolean(), R"([true, false, false, true])") + .ValueOrDie(); + auto f2 = + arrow::ipc::internal::json::ArrayFromJSON(arrow::int8(), R"([0, 1, 2, 3])").ValueOrDie(); + auto f3 = + arrow::ipc::internal::json::ArrayFromJSON(arrow::int16(), R"([4, 5, 6, 7])").ValueOrDie(); + auto f4 = arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), R"([10, 11, 12, 13])") + .ValueOrDie(); + auto f5 = arrow::ipc::internal::json::ArrayFromJSON(arrow::int64(), R"([15, 16, 17, 18])") + .ValueOrDie(); + auto f6 = arrow::ipc::internal::json::ArrayFromJSON(arrow::float32(), R"([0.0, 1.1, 2.2, 3.3])") + .ValueOrDie(); + auto f7 = arrow::ipc::internal::json::ArrayFromJSON(arrow::float64(), R"([5.5, 6.6, 7.7, 8.8])") + .ValueOrDie(); + auto f8 = arrow::ipc::internal::json::ArrayFromJSON(arrow::utf8(), + R"(["Hello", "World", "HELLO", "WORLD"])") + .ValueOrDie(); + auto data = arrow::StructArray::Make({f1, f2, f3, f4, f5, f6, f7, f8}, target_type->fields()) + .ValueOrDie(); + + auto row = ColumnarRow(data->fields(), pool, 0); + ASSERT_EQ(row.GetFieldCount(), 8); + ASSERT_EQ(row.GetBoolean(0), true); + ASSERT_EQ(row.GetByte(1), 0); + ASSERT_EQ(row.GetShort(2), 4); + ASSERT_EQ(row.GetInt(3), 10); + ASSERT_EQ(row.GetLong(4), 15); + ASSERT_EQ(row.GetFloat(5), 0.0); + ASSERT_EQ(row.GetDouble(6), 5.5); + ASSERT_EQ(row.GetString(7).ToString(), "Hello"); + ASSERT_EQ(std::string(row.GetStringView(7)), "Hello"); +} + +TEST(ColumnarRowRefTest, TestSimple) { + auto pool = GetDefaultPool(); + std::shared_ptr target_type = + arrow::struct_({arrow::field("f1", arrow::int32()), arrow::field("f2", arrow::utf8())}); + auto f1 = + arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), R"([1, 2, 3])").ValueOrDie(); + auto f2 = + arrow::ipc::internal::json::ArrayFromJSON(arrow::utf8(), R"(["alpha", "beta", "gamma"])") + .ValueOrDie(); + auto data = arrow::StructArray::Make({f1, f2}, target_type->fields()).ValueOrDie(); + + auto ctx = std::make_shared(data->fields(), pool); + ColumnarRowRef row(ctx, 1); + ASSERT_EQ(row.GetFieldCount(), 2); + ASSERT_EQ(row.GetInt(0), 2); + ASSERT_EQ(std::string(row.GetStringView(1)), "beta"); + + auto row_kind = row.GetRowKind(); + ASSERT_TRUE(row_kind.ok()); + ASSERT_EQ(row_kind.value(), RowKind::Insert()); + row.SetRowKind(RowKind::Delete()); + auto updated_kind = row.GetRowKind(); + ASSERT_TRUE(updated_kind.ok()); + ASSERT_EQ(updated_kind.value(), RowKind::Delete()); +} + +TEST(ColumnarRowTest, TestComplexAndNestedType) { + auto pool = GetDefaultPool(); + std::shared_ptr target_type = arrow::struct_({ + arrow::field("f0", arrow::date32()), + arrow::field("f1", arrow::decimal128(10, 3)), + arrow::field("f2", arrow::timestamp(arrow::TimeUnit::NANO)), + arrow::field("f3", arrow::binary()), + arrow::field( + "f4", arrow::struct_({field("sub1", arrow::int64()), field("sub2", arrow::int64()), + field("sub3", arrow::int64()), field("sub4", arrow::int64())})), + arrow::field("f5", arrow::list(arrow::int64())), + arrow::field("f6", arrow::map(arrow::int32(), arrow::int64())), + arrow::field("f7", arrow::map(arrow::int32(), arrow::list(arrow::int64()))), + arrow::field("f8", arrow::timestamp(arrow::TimeUnit::SECOND)), + arrow::field("f9", arrow::timestamp(arrow::TimeUnit::MILLI)), + arrow::field("f10", arrow::timestamp(arrow::TimeUnit::MICRO)), + }); + auto f0 = + arrow::ipc::internal::json::ArrayFromJSON(arrow::date32(), R"([109, 1000, -1000, 555])") + .ValueOrDie(); + auto f1 = arrow::ipc::internal::json::ArrayFromJSON( + arrow::decimal128(10, 3), R"(["1.234", "1234.000", "-9876.543", "666.888"])") + .ValueOrDie(); + auto f2 = + arrow::ipc::internal::json::ArrayFromJSON(arrow::timestamp(arrow::TimeUnit::NANO), + R"(["1970-01-01T00:00:59","2000-02-29T23:23:23", + "1899-01-01T00:59:20","2033-05-18T03:33:20"])") + .ValueOrDie(); + auto f3 = + arrow::ipc::internal::json::ArrayFromJSON(arrow::binary(), R"(["aaa", "bb", "ccc", "bbb"])") + .ValueOrDie(); + auto f4 = arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({ + field("sub1", arrow::int64()), + field("sub2", arrow::int64()), + field("sub3", arrow::int64()), + field("sub4", arrow::int64()), + }), + R"([ + [1, 3, 2, 5], + [2, 2, 1, 3], + [3, 2, 1, 3], + [4, 1, 0, 2] + ])") + .ValueOrDie(); + auto f5 = arrow::ipc::internal::json::ArrayFromJSON(arrow::list(arrow::int64()), + "[[1, 1, 2], [3], [2], [-4]]") + .ValueOrDie(); + auto f6 = arrow::ipc::internal::json::ArrayFromJSON(arrow::map(arrow::int32(), arrow::int64()), + R"([[[1, 3], [4, 4]], + [[1, 5], [7, 6], [100, 7]], + [[0, 9]], + null])") + .ValueOrDie(); + auto f7 = arrow::ipc::internal::json::ArrayFromJSON( + arrow::map(arrow::int32(), arrow::list(arrow::int64())), + R"([[[1, [10, 20]], [4, [40, 50, 100]]], + [[1, [1, 2]], [7, [6]], [100, [8]]], + [[0, [9]]], + null])") + .ValueOrDie(); + auto f8 = + arrow::ipc::internal::json::ArrayFromJSON(arrow::timestamp(arrow::TimeUnit::SECOND), + R"(["1970-01-01T00:00:59","2000-02-29T23:23:23", + "1899-01-01T00:59:20","2033-05-18T03:33:20"])") + .ValueOrDie(); + auto f9 = arrow::ipc::internal::json::ArrayFromJSON( + arrow::timestamp(arrow::TimeUnit::MILLI), + R"(["1970-01-01T00:00:59.001","2000-02-29T23:23:23", + "1899-01-01T00:59:20","2033-05-18T03:33:20"])") + .ValueOrDie(); + auto f10 = arrow::ipc::internal::json::ArrayFromJSON( + arrow::timestamp(arrow::TimeUnit::MICRO), + R"(["1970-01-01T00:00:59.000001","2000-02-29T23:23:23", + "1899-01-01T00:59:20","2033-05-18T03:33:20"])") + .ValueOrDie(); + + auto data = arrow::StructArray::Make({f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10}, + target_type->fields()) + .ValueOrDie(); + + auto row = ColumnarRow(data->fields(), pool, 0); + ASSERT_EQ(row.GetFieldCount(), 11); + ASSERT_EQ(row.GetDate(0), 109); + ASSERT_EQ(row.GetDecimal(1, 10, 3), Decimal(10, 3, 1234)); + + auto ts = row.GetTimestamp(2, /*precision=*/9); + ASSERT_EQ(ts, Timestamp(59000, 0)); + + ASSERT_EQ(*row.GetBinary(3), Bytes("aaa", pool.get())); + ASSERT_EQ(std::string(row.GetStringView(3)), "aaa"); + + auto result_row = row.GetRow(4, 4); + ASSERT_EQ(result_row->GetLong(0), 1); + ASSERT_EQ(result_row->GetLong(1), 3); + ASSERT_EQ(result_row->GetLong(2), 2); + ASSERT_EQ(result_row->GetLong(3), 5); + + std::vector values = {1, 1, 2}; + auto result_array = row.GetArray(5); + ASSERT_EQ(result_array->ToLongArray().value(), values); + + auto result_key = row.GetMap(6)->KeyArray(); + auto result_value = row.GetMap(6)->ValueArray(); + ASSERT_EQ(result_key->ToIntArray().value(), std::vector({1, 4})); + ASSERT_EQ(result_value->ToLongArray().value(), std::vector({3, 4})); + + result_key = row.GetMap(7)->KeyArray(); + result_value = row.GetMap(7)->ValueArray(); + + ASSERT_EQ(result_key->ToIntArray().value(), std::vector({1, 4})); + ASSERT_EQ(2, result_value->Size()); + ASSERT_EQ(result_value->GetArray(0)->ToLongArray().value(), std::vector({10, 20})); + ASSERT_EQ(result_value->GetArray(1)->ToLongArray().value(), + std::vector({40, 50, 100})); + + auto ts_second = row.GetTimestamp(8, /*precision=*/0); + ASSERT_EQ(ts_second, Timestamp(59000, 0)); + + auto ts_milli = row.GetTimestamp(9, /*precision=*/3); + ASSERT_EQ(ts_milli, Timestamp(59001, 0)); + + auto ts_micro = row.GetTimestamp(10, /*precision=*/6); + ASSERT_EQ(ts_micro, Timestamp(59000, 1000)); +} + +TEST(ColumnarRowTest, TestTimestampType) { + auto pool = GetDefaultPool(); + auto timezone = DateTimeUtils::GetLocalTimezoneName(); + arrow::FieldVector fields = { + arrow::field("ts_sec", arrow::timestamp(arrow::TimeUnit::SECOND)), + arrow::field("ts_milli", arrow::timestamp(arrow::TimeUnit::MILLI)), + arrow::field("ts_micro", arrow::timestamp(arrow::TimeUnit::MICRO)), + arrow::field("ts_nano", arrow::timestamp(arrow::TimeUnit::NANO)), + arrow::field("ts_tz_sec", arrow::timestamp(arrow::TimeUnit::SECOND, timezone)), + arrow::field("ts_tz_milli", arrow::timestamp(arrow::TimeUnit::MILLI, timezone)), + arrow::field("ts_tz_micro", arrow::timestamp(arrow::TimeUnit::MICRO, timezone)), + arrow::field("ts_tz_nano", arrow::timestamp(arrow::TimeUnit::NANO, timezone)), + }; + auto array = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ +["1970-01-01 00:00:01", "1970-01-01 00:00:00.001", "1970-01-01 00:00:00.000001", "1970-01-01 00:00:00.000000001", "1970-01-01 00:00:02", "1970-01-01 00:00:00.002", "1970-01-01 00:00:00.000002", "1970-01-01 00:00:00.000000002"], +["1970-01-01 00:00:03", "1970-01-01 00:00:00.003", null, "1970-01-01 00:00:00.000000003", "1970-01-01 00:00:04", "1970-01-01 00:00:00.004", "1970-01-01 00:00:00.000004", "1970-01-01 00:00:00.000000004"], +["1970-01-01 00:00:05", "1970-01-01 00:00:00.005", null, null, "1970-01-01 00:00:06", null, "1970-01-01 00:00:00.000006", null] + ])") + .ValueOrDie()); + { + auto row = ColumnarRow(array->fields(), pool, 0); + ASSERT_EQ(row.GetFieldCount(), 8); + ASSERT_EQ(row.GetTimestamp(0, /*precision=*/0), Timestamp(1000, 0)); + ASSERT_EQ(row.GetTimestamp(1, /*precision=*/3), Timestamp(1, 0)); + ASSERT_EQ(row.GetTimestamp(2, /*precision=*/6), Timestamp(0, 1000)); + ASSERT_EQ(row.GetTimestamp(3, /*precision=*/9), Timestamp(0, 1)); + ASSERT_EQ(row.GetTimestamp(4, /*precision=*/0), Timestamp(2000, 0)); + ASSERT_EQ(row.GetTimestamp(5, /*precision=*/3), Timestamp(2, 0)); + ASSERT_EQ(row.GetTimestamp(6, /*precision=*/6), Timestamp(0, 2000)); + ASSERT_EQ(row.GetTimestamp(7, /*precision=*/9), Timestamp(0, 2)); + } + { + auto row = ColumnarRow(array->fields(), pool, 2); + ASSERT_EQ(row.GetFieldCount(), 8); + ASSERT_EQ(row.GetTimestamp(0, /*precision=*/0), Timestamp(5000, 0)); + ASSERT_EQ(row.GetTimestamp(1, /*precision=*/3), Timestamp(5, 0)); + ASSERT_TRUE(row.IsNullAt(2)); + ASSERT_TRUE(row.IsNullAt(3)); + ASSERT_EQ(row.GetTimestamp(4, /*precision=*/0), Timestamp(6000, 0)); + ASSERT_TRUE(row.IsNullAt(5)); + ASSERT_EQ(row.GetTimestamp(6, /*precision=*/6), Timestamp(0, 6000)); + } +} + +TEST(ColumnarRowTest, TestNull) { + auto pool = GetDefaultPool(); + std::shared_ptr target_type = + arrow::struct_({arrow::field("f1", arrow::boolean()), arrow::field("f2", arrow::int8())}); + auto f1 = + arrow::ipc::internal::json::ArrayFromJSON(arrow::boolean(), R"([null, false, false, true])") + .ValueOrDie(); + auto f2 = + arrow::ipc::internal::json::ArrayFromJSON(arrow::int8(), R"([null, 1, 2, 3])").ValueOrDie(); + auto data = arrow::StructArray::Make({f1, f2}, target_type->fields()).ValueOrDie(); + + auto row = ColumnarRow(data->fields(), pool, 0); + row.SetRowKind(RowKind::Insert()); + ASSERT_EQ(row.GetFieldCount(), 2); + ASSERT_EQ(row.GetRowKind().value(), RowKind::Insert()); + ASSERT_TRUE(row.IsNullAt(0)); + ASSERT_TRUE(row.IsNullAt(1)); +} + +TEST(ColumnarRowTest, TestDictionary) { + auto pool = GetDefaultPool(); + auto dict = arrow::ipc::internal::json::ArrayFromJSON(arrow::utf8(), R"(["foo", "bar", "baz"])") + .ValueOrDie(); + auto dict_type = arrow::dictionary(arrow::int32(), arrow::utf8()); + auto indices = + arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), "[1, 2, 0, 2, 0]").ValueOrDie(); + std::shared_ptr dict_array = + std::make_shared(dict_type, indices, dict); + + auto f0 = arrow::field("f0", arrow::list(dict_type)); + auto f1 = arrow::field("f1", arrow::struct_({arrow::field("sub1", arrow::int64()), + arrow::field("sub2", arrow::binary()), + arrow::field("sub3", dict_type)})); + + auto list_offsets = + arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), R"([0, 5])").ValueOrDie(); + auto f0_array = arrow::ListArray::FromArrays(*list_offsets, *dict_array).ValueOrDie(); + + auto sub1_array = + arrow::ipc::internal::json::ArrayFromJSON(arrow::int64(), R"([1])").ValueOrDie(); + auto sub2_array = + arrow::ipc::internal::json::ArrayFromJSON(arrow::binary(), R"(["apple"])").ValueOrDie(); + auto sub3_array = + std::make_shared(dict_type, indices->Slice(0, 1), dict); + auto f1_array = std::make_shared( + f1->type(), /*length=*/1, arrow::ArrayVector({sub1_array, sub2_array, sub3_array})); + + auto struct_type = arrow::struct_({f0, f1}); + // data: [["bar", "baz", "foo", "baz", "foo"], [1, "apple", "bar"]] + auto data = std::make_shared(struct_type, /*length=*/1, + arrow::ArrayVector({f0_array, f1_array})); + + auto row = ColumnarRow(data->fields(), pool, 0); + ASSERT_FALSE(row.IsNullAt(0)); + auto internal_array = row.GetArray(0); + ASSERT_TRUE(internal_array); + ASSERT_EQ(5, internal_array->Size()); + ASSERT_EQ("bar", internal_array->GetString(0).ToString()); + ASSERT_EQ("baz", internal_array->GetString(1).ToString()); + ASSERT_EQ("foo", internal_array->GetString(2).ToString()); + ASSERT_EQ("baz", internal_array->GetString(3).ToString()); + ASSERT_EQ("foo", internal_array->GetString(4).ToString()); + + ASSERT_EQ("bar", std::string(internal_array->GetStringView(0))); + ASSERT_EQ("baz", std::string(internal_array->GetStringView(1))); + ASSERT_EQ("foo", std::string(internal_array->GetStringView(2))); + ASSERT_EQ("baz", std::string(internal_array->GetStringView(3))); + ASSERT_EQ("foo", std::string(internal_array->GetStringView(4))); + + ASSERT_FALSE(row.IsNullAt(1)); + auto internal_row = row.GetRow(1, 3); + ASSERT_TRUE(internal_row); + ASSERT_EQ(1, internal_row->GetLong(0)); + auto bytes = internal_row->GetBinary(1); + ASSERT_EQ("apple", std::string(bytes->data(), bytes->size())); + ASSERT_EQ("apple", std::string(internal_row->GetStringView(1))); + ASSERT_EQ("bar", internal_row->GetString(2).ToString()); + ASSERT_EQ("bar", std::string(internal_row->GetStringView(2))); +} + +TEST(ColumnarRowTest, TestDataLifeCycle) { + auto pool = GetDefaultPool(); + std::shared_ptr target_type = arrow::struct_({arrow::field( + "f0", arrow::struct_({field("sub1", arrow::int64()), field("sub2", arrow::int64()), + field("sub3", arrow::int64()), field("sub4", arrow::int64())}))}); + auto f0 = arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({ + field("sub1", arrow::int64()), + field("sub2", arrow::int64()), + field("sub3", arrow::int64()), + field("sub4", arrow::int64()), + }), + R"([ + [1, 3, 2, 5], + [2, 2, 1, 3], + [3, 2, 1, 3], + [4, 1, 0, 2] + ])") + .ValueOrDie(); + auto data = arrow::StructArray::Make({f0}, target_type->fields()).ValueOrDie(); + auto row = std::make_unique(data, data->fields(), pool, 0); + data.reset(); + f0.reset(); + // array data is only held by columnar row + ASSERT_EQ(1, row->struct_array_.use_count()); + + ASSERT_EQ(row->GetFieldCount(), 1); + ASSERT_FALSE(row->IsNullAt(0)); + auto result_row = row->GetRow(0, 4); + + ASSERT_FALSE(result_row->IsNullAt(0)); + ASSERT_EQ(result_row->GetLong(0), 1); + ASSERT_FALSE(result_row->IsNullAt(1)); + ASSERT_EQ(result_row->GetLong(1), 3); + ASSERT_FALSE(result_row->IsNullAt(2)); + ASSERT_EQ(result_row->GetLong(2), 2); + ASSERT_FALSE(result_row->IsNullAt(3)); + ASSERT_EQ(result_row->GetLong(3), 5); +} + +TEST(ColumnarRowTest, TestColumnarRowRefGetBinary) { + auto pool = GetDefaultPool(); + std::shared_ptr target_type = arrow::struct_({ + arrow::field("f0", arrow::binary()), + arrow::field("f1", arrow::binary()), + }); + auto f0 = + arrow::ipc::internal::json::ArrayFromJSON(arrow::binary(), R"(["hello", "world", null])") + .ValueOrDie(); + auto f1 = arrow::ipc::internal::json::ArrayFromJSON(arrow::binary(), R"(["abc", "", "xyz"])") + .ValueOrDie(); + auto data = arrow::StructArray::Make({f0, f1}, target_type->fields()).ValueOrDie(); + + auto ctx = std::make_shared(data->fields(), pool); + + { + ColumnarRowRef row(ctx, 0); + auto binary = row.GetBinary(0); + ASSERT_TRUE(binary); + ASSERT_EQ(std::string(binary->data(), binary->size()), "hello"); + + auto binary1 = row.GetBinary(1); + ASSERT_TRUE(binary1); + ASSERT_EQ(std::string(binary1->data(), binary1->size()), "abc"); + } + { + ColumnarRowRef row(ctx, 1); + auto binary = row.GetBinary(0); + ASSERT_TRUE(binary); + ASSERT_EQ(std::string(binary->data(), binary->size()), "world"); + + auto binary1 = row.GetBinary(1); + ASSERT_TRUE(binary1); + ASSERT_EQ(binary1->size(), 0); + } + { + ColumnarRowRef row(ctx, 2); + ASSERT_TRUE(row.IsNullAt(0)); + + auto binary1 = row.GetBinary(1); + ASSERT_TRUE(binary1); + ASSERT_EQ(std::string(binary1->data(), binary1->size()), "xyz"); + } +} + +TEST(ColumnarRowTest, TestColumnarRowRefToString) { + auto pool = GetDefaultPool(); + std::shared_ptr target_type = + arrow::struct_({arrow::field("f0", arrow::int32())}); + auto f0 = + arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), R"([1, 2, 3])").ValueOrDie(); + auto data = arrow::StructArray::Make({f0}, target_type->fields()).ValueOrDie(); + + auto ctx = std::make_shared(data->fields(), pool); + + { + ColumnarRowRef row(ctx, 0); + ASSERT_EQ(row.ToString(), "ColumnarRowRef, row_id 0"); + } + { + ColumnarRowRef row(ctx, 2); + ASSERT_EQ(row.ToString(), "ColumnarRowRef, row_id 2"); + } +} + +} // namespace paimon::test diff --git a/src/paimon/common/data/columnar/columnar_utils.h b/src/paimon/common/data/columnar/columnar_utils.h new file mode 100644 index 0000000..a1fa31e --- /dev/null +++ b/src/paimon/common/data/columnar/columnar_utils.h @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/array/array_base.h" +#include "arrow/array/array_binary.h" +#include "arrow/array/array_dict.h" +#include "arrow/array/array_primitive.h" +#include "arrow/type_traits.h" +#include "arrow/util/checked_cast.h" +#include "paimon/memory/bytes.h" + +namespace paimon { +class MemoryPool; + +class ColumnarUtils { + public: + ColumnarUtils() = delete; + ~ColumnarUtils() = delete; + + template + static ValueType GetGenericValue(const arrow::Array* array, int32_t pos) { + using ArrayType = typename arrow::TypeTraits::ArrayType; + const auto* typed_array = arrow::internal::checked_cast(array); + assert(typed_array); + return typed_array->Value(pos); + } + + static std::string_view GetView(const arrow::Array* array, int32_t pos) { + auto type_id = array->type_id(); + bool is_dict = (type_id == arrow::Type::type::DICTIONARY); + if (!is_dict) { + const auto* typed_array = + arrow::internal::checked_cast(array); + assert(typed_array); + return typed_array->GetView(pos); + } else { + const auto* typed_array = + arrow::internal::checked_cast(array); + assert(typed_array); + auto dict_type = + arrow::internal::checked_pointer_cast(array->type()); + assert(dict_type); + auto value_type_id = dict_type->value_type()->id(); + auto index_type_id = dict_type->index_type()->id(); + int64_t dict_index = -1; + if (index_type_id == arrow::Type::type::INT32) { + auto indices = + arrow::internal::checked_cast(typed_array->indices().get()); + assert(indices); + dict_index = indices->Value(pos); + } else if (index_type_id == arrow::Type::type::INT64) { + auto indices = + arrow::internal::checked_cast(typed_array->indices().get()); + assert(indices); + dict_index = indices->Value(pos); + } + assert(dict_index >= 0); + if (value_type_id == arrow::Type::type::STRING) { + auto dictionary = arrow::internal::checked_cast( + typed_array->dictionary().get()); + assert(dictionary); + return dictionary->GetView(dict_index); + } else if (value_type_id == arrow::Type::type::LARGE_STRING) { + auto dictionary = arrow::internal::checked_cast( + typed_array->dictionary().get()); + assert(dictionary); + return dictionary->GetView(dict_index); + } + assert(false); + return std::string_view(); + } + } + + template + static std::shared_ptr GetBytes(const arrow::Array* array, int32_t pos, + MemoryPool* pool) { + auto view = GetView(array, pos); + std::shared_ptr bytes = Bytes::AllocateBytes(view.size(), pool); + memcpy(bytes->data(), view.data(), view.size()); + return bytes; + } +}; +} // namespace paimon diff --git a/src/paimon/common/data/columnar/columnar_utils_test.cpp b/src/paimon/common/data/columnar/columnar_utils_test.cpp new file mode 100644 index 0000000..7eb8187 --- /dev/null +++ b/src/paimon/common/data/columnar/columnar_utils_test.cpp @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "paimon/common/data/columnar/columnar_utils.h" + +#include + +#include "arrow/api.h" +#include "arrow/array/array_dict.h" +#include "arrow/ipc/json_simple.h" +#include "gtest/gtest.h" +#include "paimon/memory/memory_pool.h" + +namespace paimon::test { +TEST(ColumnarUtilsTest, TestGetViewAndBytes) { + auto pool = GetDefaultPool(); + auto array = arrow::ipc::internal::json::ArrayFromJSON(arrow::utf8(), R"(["abc", "def", "hi"])") + .ValueOrDie(); + std::string_view view = ColumnarUtils::GetView(array.get(), 2); + ASSERT_EQ(std::string(view), "hi"); + auto bytes = ColumnarUtils::GetBytes(array.get(), 1, pool.get()); + ASSERT_EQ(*std::make_shared("def", pool.get()), *bytes); +} + +TEST(ColumnarUtilsTest, TestGetViewAndBytesOfDict) { + auto pool = GetDefaultPool(); + auto dict = arrow::ipc::internal::json::ArrayFromJSON(arrow::utf8(), R"(["foo", "bar", "baz"])") + .ValueOrDie(); + auto dict_type = arrow::dictionary(arrow::int32(), arrow::utf8()); + auto indices = + arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), "[1, 2, 0, 2, 0]").ValueOrDie(); + std::shared_ptr dict_array = + std::make_shared(dict_type, indices, dict); + + ASSERT_EQ("bar", std::string(ColumnarUtils::GetView(dict_array.get(), 0))); + ASSERT_EQ("baz", std::string(ColumnarUtils::GetView(dict_array.get(), 1))); + ASSERT_EQ("foo", std::string(ColumnarUtils::GetView(dict_array.get(), 2))); + ASSERT_EQ("baz", std::string(ColumnarUtils::GetView(dict_array.get(), 3))); + ASSERT_EQ("foo", std::string(ColumnarUtils::GetView(dict_array.get(), 4))); +} + +} // namespace paimon::test diff --git a/src/paimon/common/utils/date_time_utils.h b/src/paimon/common/utils/date_time_utils.h new file mode 100644 index 0000000..b72a043 --- /dev/null +++ b/src/paimon/common/utils/date_time_utils.h @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/compute/api.h" +#include "arrow/vendored/datetime.h" +#include "fmt/format.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/data/timestamp.h" +#include "paimon/result.h" +namespace paimon { +/// Utils for date time. +class DateTimeUtils { + public: + DateTimeUtils() = delete; + ~DateTimeUtils() = delete; + + /// The number of milliseconds in a day. + /// + /// This is the modulo 'mask' used when converting TIMESTAMP values to DATE and TIME values. + static constexpr int64_t MILLIS_PER_DAY = 86400000l; // = 24 * 60 * 60 * 1000 + static constexpr int64_t SECONDS_PER_DAY = 86400l; // = 24 * 60 * 60 + static constexpr int64_t NANOS_PER_MILLIS = 1000000l; + enum TimeType { + SECOND = 0, + MILLISECOND = 1, + MICROSECOND = 2, + NANOSECOND = 3, + }; + constexpr static int64_t CONVERSION_FACTORS[] = {1L, 1000L, 1000000L, 1000000000L}; + + // convert a timestamp of a certain type into a combination of two specified types + // e.g., src_timestamp = 12345678, src_type = ns, dst_first_type = ms, dst_second_type = ns + // return: {12, 345678} + static std::pair TimestampConverter(int64_t src_timestamp, + const TimeType& src_type, + const TimeType& dst_first_type, + const TimeType& dst_second_type) { + if (src_type <= dst_first_type) { + // e.g., ms -> {us, ns} or {ms, ns} or {us, us} or {ns, ms} + int64_t conversion_factor_to_first_type = + CONVERSION_FACTORS[dst_first_type] / CONVERSION_FACTORS[src_type]; + // TODO(jinli.zjw): maybe overflow int64 + assert(src_timestamp * conversion_factor_to_first_type < + std::numeric_limits::max()); + return std::make_pair(src_timestamp * conversion_factor_to_first_type, 0L); + } else { + // e.g., ns -> {ms, ns} or {ms, s} or {ms, us} + int64_t conversion_factor_to_first_type = + CONVERSION_FACTORS[src_type] / CONVERSION_FACTORS[dst_first_type]; + double conversion_factor_to_second_type = + static_cast(CONVERSION_FACTORS[dst_second_type]) / + CONVERSION_FACTORS[src_type]; + + int64_t first_value = src_timestamp / conversion_factor_to_first_type; + int64_t second_value = src_timestamp % conversion_factor_to_first_type; + if (second_value < 0) { + second_value += conversion_factor_to_first_type; + first_value--; + } + second_value = conversion_factor_to_second_type * second_value; + return std::make_pair(first_value, second_value); + } + } + + static int64_t TimestampToInteger(const Timestamp& timestamp, const TimeType& dst_type) { + if (dst_type == TimeType::SECOND) { + return timestamp.GetMillisecond() / CONVERSION_FACTORS[MILLISECOND]; + } else if (dst_type == TimeType::MILLISECOND) { + return timestamp.GetMillisecond(); + } else if (dst_type == TimeType::MICROSECOND) { + return timestamp.ToMicrosecond(); + } + return timestamp.ToNanosecond(); + } + + static inline uint64_t GetCurrentUTCTimeUs() { + struct timeval ts; + gettimeofday(&ts, nullptr); + return static_cast(ts.tv_sec) * 1000000ULL + static_cast(ts.tv_usec); + } + + static inline Result GetCurrentLocalTimeUs() { + uint64_t utc_micro = GetCurrentUTCTimeUs(); + auto utc_ts_scalar = std::make_shared( + static_cast(utc_micro), arrow::TimeUnit::MICRO, GetLocalTimezoneName()); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + arrow::Datum local_micro, arrow::compute::LocalTimestamp(arrow::Datum(utc_ts_scalar))); + auto local_ts_scalar = + std::dynamic_pointer_cast(local_micro.scalar()); + return *(static_cast(local_ts_scalar->data())); + } + + static inline Result GetCurrentLocalHour() { + PAIMON_ASSIGN_OR_RAISE(uint64_t local_us, GetCurrentLocalTimeUs()); + auto local_seconds = static_cast(local_us / 1000000); + std::tm local_tm{}; + gmtime_r(&local_seconds, &local_tm); + return local_tm.tm_hour; + } + + static inline int32_t GetPrecisionFromType( + const std::shared_ptr& timestamp_type) { + int32_t precision = Timestamp::MAX_PRECISION; + if (timestamp_type->unit() == arrow::TimeUnit::type::SECOND) { + precision = Timestamp::MIN_PRECISION; + } else if (timestamp_type->unit() == arrow::TimeUnit::type::MILLI) { + precision = Timestamp::MILLIS_PRECISION; + } else if (timestamp_type->unit() == arrow::TimeUnit::type::MICRO) { + precision = Timestamp::DEFAULT_PRECISION; + } + return precision; + } + + static inline TimeType GetTimeTypeFromArrowType( + const std::shared_ptr& timestamp_type) { + if (timestamp_type->unit() == arrow::TimeUnit::type::SECOND) { + return TimeType::SECOND; + } else if (timestamp_type->unit() == arrow::TimeUnit::type::MILLI) { + return TimeType::MILLISECOND; + } else if (timestamp_type->unit() == arrow::TimeUnit::type::MICRO) { + return TimeType::MICROSECOND; + } + return TimeType::NANOSECOND; + } + + static inline Result> GetTypeFromPrecision( + int32_t precision, bool with_timezone) { + std::string timezone = with_timezone ? GetLocalTimezoneName() : ""; + if (precision == Timestamp::MIN_PRECISION) { + return arrow::timestamp(arrow::TimeUnit::type::SECOND, timezone); + } else if (precision == Timestamp::MILLIS_PRECISION) { + return arrow::timestamp(arrow::TimeUnit::type::MILLI, timezone); + } else if (precision == Timestamp::DEFAULT_PRECISION) { + return arrow::timestamp(arrow::TimeUnit::type::MICRO, timezone); + } else if (precision == Timestamp::MAX_PRECISION) { + return arrow::timestamp(arrow::TimeUnit::type::NANO, timezone); + } + return Status::Invalid("only support precision 0/3/6/9 in timestamp type"); + } + + static std::string GetLocalTimezoneName() { + // find local tz in env + const char* timezone = std::getenv("TZ"); + if (timezone != nullptr && *timezone != '\0') { + return std::string(timezone); + } + // find local tz in file + auto* tz = arrow_vendored::date::current_zone(); + return tz ? tz->name() : "UTC"; + } + + static std::string GetArrowTimeUnitStr(arrow::TimeUnit::type unit) { + switch (unit) { + case arrow::TimeUnit::SECOND: + return "SECOND"; + case arrow::TimeUnit::MILLI: + return "MILLISECOND"; + case arrow::TimeUnit::MICRO: + return "MICROSECOND"; + case arrow::TimeUnit::NANO: + return "NANOSECOND"; + default: + break; + } + return "UNKNOWN"; + } + + // there may be a precision loss for nano + static Result ToUTCTimestamp(const Timestamp& timestamp) { + int64_t micro_second = timestamp.ToMicrosecond(); + auto local_ts_scalar = + std::make_shared(micro_second, arrow::TimeUnit::MICRO); + arrow::compute::AssumeTimezoneOptions options(DateTimeUtils::GetLocalTimezoneName()); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + arrow::Datum target_scalar, + arrow::compute::AssumeTimezone(arrow::Datum(local_ts_scalar), options)); + auto utc_ts_scalar = + std::dynamic_pointer_cast(target_scalar.scalar()); + auto [milli, nano] = DateTimeUtils::TimestampConverter( + *(static_cast(utc_ts_scalar->data())), + DateTimeUtils::TimeType::MICROSECOND, DateTimeUtils::TimeType::MILLISECOND, + DateTimeUtils::TimeType::NANOSECOND); + return Timestamp(milli, nano); + } +}; +} // namespace paimon From 056a165a9b38694b8858a2833ab1f41fef3070ff Mon Sep 17 00:00:00 2001 From: lxy264173 Date: Wed, 27 May 2026 11:44:10 +0800 Subject: [PATCH 2/3] refactor ColumnarArray with raw pointer constructor parameter --- .../common/data/columnar/columnar_array.cpp | 2 +- .../common/data/columnar/columnar_array.h | 11 +++-- .../data/columnar/columnar_array_test.cpp | 48 +++++++++---------- .../common/data/columnar/columnar_map.cpp | 4 +- .../common/data/columnar/columnar_row.cpp | 2 +- .../common/data/columnar/columnar_row_ref.cpp | 2 +- 6 files changed, 37 insertions(+), 32 deletions(-) diff --git a/src/paimon/common/data/columnar/columnar_array.cpp b/src/paimon/common/data/columnar/columnar_array.cpp index 2fd1507..28a41f3 100644 --- a/src/paimon/common/data/columnar/columnar_array.cpp +++ b/src/paimon/common/data/columnar/columnar_array.cpp @@ -71,7 +71,7 @@ std::shared_ptr ColumnarArray::GetArray(int32_t pos) const { assert(list_array); int32_t offset = list_array->value_offset(offset_ + pos); int32_t length = list_array->value_length(offset_ + pos); - return std::make_shared(list_array->values(), pool_, offset, length); + return std::make_shared(list_array->values().get(), pool_, offset, length); } std::shared_ptr ColumnarArray::GetMap(int32_t pos) const { diff --git a/src/paimon/common/data/columnar/columnar_array.h b/src/paimon/common/data/columnar/columnar_array.h index 9554845..773e46f 100644 --- a/src/paimon/common/data/columnar/columnar_array.h +++ b/src/paimon/common/data/columnar/columnar_array.h @@ -51,11 +51,16 @@ class Bytes; class MemoryPool; /// Columnar array to support access to vector column data. +/// +/// NOTE: This class holds a non-owning raw pointer to the underlying arrow::Array for efficiency. +/// The caller must ensure that the pointed-to Array outlives this ColumnarArray instance. +/// Typically, lifetime is guaranteed by the owning ColumnarBatchContext or the parent +/// arrow container (e.g., ListArray, MapArray) that holds the shared_ptr. class ColumnarArray : public InternalArray { public: - ColumnarArray(const std::shared_ptr& array, - const std::shared_ptr& pool, int32_t offset, int32_t length) - : pool_(pool), array_(array.get()), offset_(offset), length_(length) { + ColumnarArray(const arrow::Array* array, const std::shared_ptr& pool, + int32_t offset, int32_t length) + : pool_(pool), array_(array), offset_(offset), length_(length) { assert(array_); assert(array_->length() >= offset + length); } diff --git a/src/paimon/common/data/columnar/columnar_array_test.cpp b/src/paimon/common/data/columnar/columnar_array_test.cpp index c976ae2..650d7eb 100644 --- a/src/paimon/common/data/columnar/columnar_array_test.cpp +++ b/src/paimon/common/data/columnar/columnar_array_test.cpp @@ -41,7 +41,7 @@ TEST(ColumnarArrayTest, TestSimple) { arrow::list(arrow::boolean()), "[[true, false], [true], [false], [false, true]]") .ValueOrDie(); auto list_array = arrow::internal::checked_pointer_cast(f1); - auto array = ColumnarArray(list_array->values(), pool, /*offset=*/2, 1); + auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/2, 1); ASSERT_EQ(array.Size(), 1); ASSERT_EQ(array.GetBoolean(0), true); std::vector expected_array = {static_cast(1)}; @@ -52,7 +52,7 @@ TEST(ColumnarArrayTest, TestSimple) { "[[1, 1, 2], [3], [2], [2]]") .ValueOrDie(); auto list_array = arrow::internal::checked_pointer_cast(f1); - auto array = ColumnarArray(list_array->values(), pool, /*offset=*/5, 1); + auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/5, 1); ASSERT_EQ(array.GetByte(0), 2); std::vector expected_array = {static_cast(2)}; ASSERT_EQ(array.ToByteArray().value(), expected_array); @@ -62,7 +62,7 @@ TEST(ColumnarArrayTest, TestSimple) { "[[1, 1, 2], [3], [2], [-4]]") .ValueOrDie(); auto list_array = arrow::internal::checked_pointer_cast(f1); - auto array = ColumnarArray(list_array->values(), pool, /*offset=*/0, 3); + auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/0, 3); ASSERT_EQ(array.GetShort(0), 1); ASSERT_EQ(array.GetShort(1), 1); ASSERT_EQ(array.GetShort(2), 2); @@ -74,7 +74,7 @@ TEST(ColumnarArrayTest, TestSimple) { "[[1, 1, 2], [3], [2], [-4]]") .ValueOrDie(); auto list_array = arrow::internal::checked_pointer_cast(f1); - auto array = ColumnarArray(list_array->values(), pool, /*offset=*/3, 1); + auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/3, 1); ASSERT_EQ(array.GetInt(0), 3); std::vector expected_array = {3}; ASSERT_EQ(array.ToIntArray().value(), expected_array); @@ -84,7 +84,7 @@ TEST(ColumnarArrayTest, TestSimple) { "[[1, 1, 2], [3], [2], [-4]]") .ValueOrDie(); auto list_array = arrow::internal::checked_pointer_cast(f1); - auto array = ColumnarArray(list_array->values(), pool, /*offset=*/4, 1); + auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/4, 1); ASSERT_EQ(array.GetLong(0), 2); std::vector expected_array = {2}; ASSERT_EQ(array.ToLongArray().value(), expected_array); @@ -94,7 +94,7 @@ TEST(ColumnarArrayTest, TestSimple) { "[[1, 1, 2], [3], [null], null]") .ValueOrDie(); auto list_array = arrow::internal::checked_pointer_cast(f1); - auto array = ColumnarArray(list_array->values(), pool, /*offset=*/4, 1); + auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/4, 1); ASSERT_NOK_WITH_MSG(array.ToLongArray(), "is null"); } { @@ -102,7 +102,7 @@ TEST(ColumnarArrayTest, TestSimple) { arrow::list(arrow::float32()), "[[0.0, 1.1, 2.2], [3.3], [4.4], [5.5]]") .ValueOrDie(); auto list_array = arrow::internal::checked_pointer_cast(f1); - auto array = ColumnarArray(list_array->values(), pool, /*offset=*/0, 3); + auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/0, 3); ASSERT_NEAR(array.GetFloat(1), 1.1, 0.001); std::vector expected_array = {0.0, 1.1, 2.2}; ASSERT_EQ(array.ToFloatArray().value(), expected_array); @@ -112,7 +112,7 @@ TEST(ColumnarArrayTest, TestSimple) { arrow::list(arrow::float64()), "[[0.0, 1.1, 2.2], [3.3], [4.4], [5.5]]") .ValueOrDie(); auto list_array = arrow::internal::checked_pointer_cast(f1); - auto array = ColumnarArray(list_array->values(), pool, /*offset=*/3, 1); + auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/3, 1); ASSERT_NEAR(array.GetDouble(0), 3.3, 0.001); std::vector expected_array = {3.3}; ASSERT_EQ(array.ToDoubleArray().value(), expected_array); @@ -122,7 +122,7 @@ TEST(ColumnarArrayTest, TestSimple) { arrow::list(arrow::utf8()), R"([["abc", "def"], ["efg"], ["hello"], ["hi"]])") .ValueOrDie(); auto list_array = arrow::internal::checked_pointer_cast(f1); - auto array = ColumnarArray(list_array->values(), pool, /*offset=*/4, 1); + auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/4, 1); ASSERT_EQ(array.GetString(0).ToString(), "hi"); ASSERT_EQ(std::string(array.GetStringView(0)), "hi"); } @@ -135,7 +135,7 @@ TEST(ColumnarArrayTest, TestComplexAndNestedType) { "[[1, 1, 2], [3], [2], [-4]]") .ValueOrDie(); auto list_array = arrow::internal::checked_pointer_cast(f1); - auto array = ColumnarArray(list_array->values(), pool, /*offset=*/3, 1); + auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/3, 1); ASSERT_EQ(array.GetDate(0), 3); } { @@ -144,7 +144,7 @@ TEST(ColumnarArrayTest, TestComplexAndNestedType) { R"([["1.234", "1234.000"], ["-9876.543"], ["666.888"]])") .ValueOrDie(); auto list_array = arrow::internal::checked_pointer_cast(f1); - auto array = ColumnarArray(list_array->values(), pool, /*offset=*/0, 2); + auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/0, 2); ASSERT_EQ(array.GetDecimal(0, 10, 3), Decimal(10, 3, 1234)); } { @@ -154,7 +154,7 @@ TEST(ColumnarArrayTest, TestComplexAndNestedType) { "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])") .ValueOrDie(); auto list_array = arrow::internal::checked_pointer_cast(f1); - auto array = ColumnarArray(list_array->values(), pool, /*offset=*/0, 1); + auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/0, 1); auto ts = array.GetTimestamp(0, 9); ASSERT_EQ(ts, Timestamp(59000, 0)); } @@ -163,7 +163,7 @@ TEST(ColumnarArrayTest, TestComplexAndNestedType) { R"([["aaa", "bb"], ["ccc"], ["bbb"]])") .ValueOrDie(); auto list_array = arrow::internal::checked_pointer_cast(f1); - auto array = ColumnarArray(list_array->values(), pool, /*offset=*/0, 2); + auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/0, 2); ASSERT_EQ(*array.GetBinary(1), Bytes("bb", pool.get())); ASSERT_EQ(std::string(array.GetStringView(1)), "bb"); } @@ -182,7 +182,7 @@ TEST(ColumnarArrayTest, TestComplexAndNestedType) { ])") .ValueOrDie(); auto list_array = arrow::internal::checked_pointer_cast(f1); - auto array = ColumnarArray(list_array->values(), pool, /*offset=*/0, 2); + auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/0, 2); auto result_row = array.GetRow(1, 4); ASSERT_EQ(result_row->GetLong(0), 2); ASSERT_EQ(result_row->GetLong(1), 2); @@ -194,7 +194,7 @@ TEST(ColumnarArrayTest, TestComplexAndNestedType) { arrow::list(arrow::list(arrow::int64())), "[[[1, 2, 3], [4, 5, 6]], []]") .ValueOrDie(); auto list_array = arrow::internal::checked_pointer_cast(f1); - auto array = ColumnarArray(list_array->values(), pool, /*offset=*/0, 1); + auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/0, 1); auto result_array = array.GetArray(0); auto inner_result_array = array.GetArray(0); std::vector values = {1, 2, 3}; @@ -209,7 +209,7 @@ TEST(ColumnarArrayTest, TestComplexAndNestedType) { .ValueOrDie(); auto list_array = arrow::internal::checked_pointer_cast(f1); ASSERT_TRUE(list_array); - auto array = ColumnarArray(list_array->values(), pool, /*offset=*/0, 1); + auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/0, 1); auto result_key = array.GetMap(0)->KeyArray(); auto result_value = array.GetMap(0)->ValueArray(); ASSERT_EQ(result_key->ToIntArray().value(), std::vector({1, 4})); @@ -226,7 +226,7 @@ TEST(ColumnarArrayTest, TestTimestampType) { "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])") .ValueOrDie(); auto list_array = arrow::internal::checked_pointer_cast(f1); - auto array = ColumnarArray(list_array->values(), pool, /*offset=*/1, 2); + auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/1, 2); auto ts = array.GetTimestamp(0, 0); ASSERT_EQ(ts, Timestamp(951866603000, 0)) << ts.GetMillisecond(); } @@ -237,7 +237,7 @@ TEST(ColumnarArrayTest, TestTimestampType) { "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])") .ValueOrDie(); auto list_array = arrow::internal::checked_pointer_cast(f1); - auto array = ColumnarArray(list_array->values(), pool, /*offset=*/1, 2); + auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/1, 2); auto ts = array.GetTimestamp(0, 3); ASSERT_EQ(ts, Timestamp(951866603001, 0)) << ts.GetMillisecond(); } @@ -248,7 +248,7 @@ TEST(ColumnarArrayTest, TestTimestampType) { "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])") .ValueOrDie(); auto list_array = arrow::internal::checked_pointer_cast(f1); - auto array = ColumnarArray(list_array->values(), pool, /*offset=*/1, 2); + auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/1, 2); auto ts = array.GetTimestamp(0, 6); ASSERT_EQ(ts, Timestamp(951866603001, 1000)) << ts.GetMillisecond(); } @@ -259,7 +259,7 @@ TEST(ColumnarArrayTest, TestTimestampType) { "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])") .ValueOrDie(); auto list_array = arrow::internal::checked_pointer_cast(f1); - auto array = ColumnarArray(list_array->values(), pool, /*offset=*/1, 2); + auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/1, 2); auto ts = array.GetTimestamp(0, 9); ASSERT_EQ(ts, Timestamp(951866603001, 1001)) << ts.GetMillisecond(); } @@ -270,7 +270,7 @@ TEST(ColumnarArrayTest, TestTimestampType) { "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])") .ValueOrDie(); auto list_array = arrow::internal::checked_pointer_cast(f1); - auto array = ColumnarArray(list_array->values(), pool, /*offset=*/1, 2); + auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/1, 2); auto ts = array.GetTimestamp(0, 0); ASSERT_EQ(ts, Timestamp(951866603000, 0)) << ts.GetMillisecond(); } @@ -281,7 +281,7 @@ TEST(ColumnarArrayTest, TestTimestampType) { "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])") .ValueOrDie(); auto list_array = arrow::internal::checked_pointer_cast(f1); - auto array = ColumnarArray(list_array->values(), pool, /*offset=*/1, 2); + auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/1, 2); auto ts = array.GetTimestamp(0, 3); ASSERT_EQ(ts, Timestamp(951866603001, 0)) << ts.GetMillisecond(); } @@ -292,7 +292,7 @@ TEST(ColumnarArrayTest, TestTimestampType) { "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])") .ValueOrDie(); auto list_array = arrow::internal::checked_pointer_cast(f1); - auto array = ColumnarArray(list_array->values(), pool, /*offset=*/1, 2); + auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/1, 2); auto ts = array.GetTimestamp(0, 6); ASSERT_EQ(ts, Timestamp(951866603001, 1000)) << ts.GetMillisecond(); } @@ -303,7 +303,7 @@ TEST(ColumnarArrayTest, TestTimestampType) { "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])") .ValueOrDie(); auto list_array = arrow::internal::checked_pointer_cast(f1); - auto array = ColumnarArray(list_array->values(), pool, /*offset=*/1, 2); + auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/1, 2); auto ts = array.GetTimestamp(0, 9); ASSERT_EQ(ts, Timestamp(951866603001, 1001)) << ts.GetMillisecond(); } diff --git a/src/paimon/common/data/columnar/columnar_map.cpp b/src/paimon/common/data/columnar/columnar_map.cpp index 3d720fe..13351ad 100644 --- a/src/paimon/common/data/columnar/columnar_map.cpp +++ b/src/paimon/common/data/columnar/columnar_map.cpp @@ -36,10 +36,10 @@ ColumnarMap::ColumnarMap(const std::shared_ptr& key_array, length_(length) {} std::shared_ptr ColumnarMap::KeyArray() const { - return std::make_shared(key_array_, pool_, offset_, length_); + return std::make_shared(key_array_.get(), pool_, offset_, length_); } std::shared_ptr ColumnarMap::ValueArray() const { - return std::make_shared(value_array_, pool_, offset_, length_); + return std::make_shared(value_array_.get(), pool_, offset_, length_); } } // namespace paimon diff --git a/src/paimon/common/data/columnar/columnar_row.cpp b/src/paimon/common/data/columnar/columnar_row.cpp index fd8509d..9e43fbc 100644 --- a/src/paimon/common/data/columnar/columnar_row.cpp +++ b/src/paimon/common/data/columnar/columnar_row.cpp @@ -66,7 +66,7 @@ std::shared_ptr ColumnarRow::GetArray(int32_t pos) const { assert(list_array); int32_t offset = list_array->value_offset(row_id_); int32_t length = list_array->value_length(row_id_); - return std::make_shared(list_array->values(), pool_, offset, length); + return std::make_shared(list_array->values().get(), pool_, offset, length); } std::shared_ptr ColumnarRow::GetMap(int32_t pos) const { diff --git a/src/paimon/common/data/columnar/columnar_row_ref.cpp b/src/paimon/common/data/columnar/columnar_row_ref.cpp index ba362e9..8b04662 100644 --- a/src/paimon/common/data/columnar/columnar_row_ref.cpp +++ b/src/paimon/common/data/columnar/columnar_row_ref.cpp @@ -68,7 +68,7 @@ std::shared_ptr ColumnarRowRef::GetArray(int32_t pos) const { assert(list_array); int32_t offset = list_array->value_offset(row_id_); int32_t length = list_array->value_length(row_id_); - return std::make_shared(list_array->values(), ctx_->pool, offset, length); + return std::make_shared(list_array->values().get(), ctx_->pool, offset, length); } std::shared_ptr ColumnarRowRef::GetMap(int32_t pos) const { From 6819671e6ff4af8309fd2660074e35636a2682e7 Mon Sep 17 00:00:00 2001 From: lxy264173 Date: Thu, 28 May 2026 16:23:10 +0800 Subject: [PATCH 3/3] fix review --- src/paimon/common/data/columnar/columnar_array.cpp | 7 +++++-- src/paimon/common/data/columnar/columnar_row.cpp | 12 ++++++++++-- src/paimon/common/data/columnar/columnar_row.h | 9 +++++++++ src/paimon/common/data/columnar/columnar_row_ref.cpp | 7 +++++-- src/paimon/common/data/columnar/columnar_utils.h | 12 +++++++++++- 5 files changed, 40 insertions(+), 7 deletions(-) diff --git a/src/paimon/common/data/columnar/columnar_array.cpp b/src/paimon/common/data/columnar/columnar_array.cpp index 28a41f3..7a96106 100644 --- a/src/paimon/common/data/columnar/columnar_array.cpp +++ b/src/paimon/common/data/columnar/columnar_array.cpp @@ -47,8 +47,11 @@ Decimal ColumnarArray::GetDecimal(int32_t pos, int32_t precision, int32_t scale) auto array = arrow::internal::checked_cast(array_); assert(array); arrow::Decimal128 decimal(array->GetValue(offset_ + pos)); - return Decimal(precision, scale, - static_cast(decimal.high_bits()) << 64 | decimal.low_bits()); + return Decimal( + precision, scale, + static_cast( + static_cast(static_cast(decimal.high_bits())) << 64 | + decimal.low_bits())); } Timestamp ColumnarArray::GetTimestamp(int32_t pos, int32_t precision) const { diff --git a/src/paimon/common/data/columnar/columnar_row.cpp b/src/paimon/common/data/columnar/columnar_row.cpp index 9e43fbc..bc7b150 100644 --- a/src/paimon/common/data/columnar/columnar_row.cpp +++ b/src/paimon/common/data/columnar/columnar_row.cpp @@ -27,7 +27,9 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/decimal.h" #include "paimon/common/data/columnar/columnar_array.h" +#include "paimon/common/data/columnar/columnar_batch_context.h" #include "paimon/common/data/columnar/columnar_map.h" +#include "paimon/common/data/columnar/columnar_row_ref.h" #include "paimon/common/utils/date_time_utils.h" namespace paimon { @@ -36,8 +38,11 @@ Decimal ColumnarRow::GetDecimal(int32_t pos, int32_t precision, int32_t scale) c auto array = arrow::internal::checked_cast(array_vec_[pos]); assert(array); arrow::Decimal128 decimal(array->GetValue(row_id_)); - return Decimal(precision, scale, - static_cast(decimal.high_bits()) << 64 | decimal.low_bits()); + return Decimal( + precision, scale, + static_cast( + static_cast(static_cast(decimal.high_bits())) << 64 | + decimal.low_bits())); } Timestamp ColumnarRow::GetTimestamp(int32_t pos, int32_t precision) const { @@ -58,6 +63,9 @@ Timestamp ColumnarRow::GetTimestamp(int32_t pos, int32_t precision) const { std::shared_ptr ColumnarRow::GetRow(int32_t pos, int32_t num_fields) const { auto struct_array = arrow::internal::checked_cast(array_vec_[pos]); assert(struct_array); + // NOTE: For performance, the returned nested row does NOT hold shared ownership of the parent + // StructArray. Callers must ensure the parent ColumnarRow (or its underlying RecordBatch) + // outlives the returned row to avoid dangling pointers. return std::make_shared(struct_array->fields(), pool_, row_id_); } diff --git a/src/paimon/common/data/columnar/columnar_row.h b/src/paimon/common/data/columnar/columnar_row.h index f0163bf..2156c81 100644 --- a/src/paimon/common/data/columnar/columnar_row.h +++ b/src/paimon/common/data/columnar/columnar_row.h @@ -47,10 +47,19 @@ class MemoryPool; /// Columnar row to support access to vector column data. It is a row view in arrow::Array. class ColumnarRow : public InternalRow { public: + /// @brief Construct a ColumnarRow without holding ownership of the underlying arrays. + /// @warning The caller MUST ensure the data source (e.g., RecordBatch or parent StructArray) + /// outlives this ColumnarRow. The internal array_vec_ stores raw pointers only; if the + /// source is freed first, these pointers will dangle. This design is intentional for + /// performance—avoiding per-row shared_ptr ref-count overhead on the hot read path. ColumnarRow(const arrow::ArrayVector& array_vec, const std::shared_ptr& pool, int64_t row_id) : ColumnarRow(/*struct_array holder*/ nullptr, array_vec, pool, row_id) {} + /// @brief Construct a ColumnarRow that holds shared ownership of a StructArray. + /// @note When struct_array is non-null it keeps the underlying buffers alive, making it safe + /// to outlive the original batch. Prefer this overload when the row may escape the scope of + /// its parent container. ColumnarRow(const std::shared_ptr& struct_array, const arrow::ArrayVector& array_vec, const std::shared_ptr& pool, int64_t row_id) diff --git a/src/paimon/common/data/columnar/columnar_row_ref.cpp b/src/paimon/common/data/columnar/columnar_row_ref.cpp index 8b04662..9aaba46 100644 --- a/src/paimon/common/data/columnar/columnar_row_ref.cpp +++ b/src/paimon/common/data/columnar/columnar_row_ref.cpp @@ -35,8 +35,11 @@ Decimal ColumnarRowRef::GetDecimal(int32_t pos, int32_t precision, int32_t scale auto array = arrow::internal::checked_cast(ctx_->array_vec[pos].get()); assert(array); arrow::Decimal128 decimal(array->GetValue(row_id_)); - return Decimal(precision, scale, - static_cast(decimal.high_bits()) << 64 | decimal.low_bits()); + return Decimal( + precision, scale, + static_cast( + static_cast(static_cast(decimal.high_bits())) << 64 | + decimal.low_bits())); } Timestamp ColumnarRowRef::GetTimestamp(int32_t pos, int32_t precision) const { diff --git a/src/paimon/common/data/columnar/columnar_utils.h b/src/paimon/common/data/columnar/columnar_utils.h index a1fa31e..ca9f58e 100644 --- a/src/paimon/common/data/columnar/columnar_utils.h +++ b/src/paimon/common/data/columnar/columnar_utils.h @@ -66,7 +66,17 @@ class ColumnarUtils { auto value_type_id = dict_type->value_type()->id(); auto index_type_id = dict_type->index_type()->id(); int64_t dict_index = -1; - if (index_type_id == arrow::Type::type::INT32) { + if (index_type_id == arrow::Type::type::INT8) { + auto indices = + arrow::internal::checked_cast(typed_array->indices().get()); + assert(indices); + dict_index = indices->Value(pos); + } else if (index_type_id == arrow::Type::type::INT16) { + auto indices = + arrow::internal::checked_cast(typed_array->indices().get()); + assert(indices); + dict_index = indices->Value(pos); + } else if (index_type_id == arrow::Type::type::INT32) { auto indices = arrow::internal::checked_cast(typed_array->indices().get()); assert(indices);