Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 160 additions & 0 deletions src/paimon/common/data/columnar/columnar_array.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "paimon/common/data/columnar/columnar_array.h"

#include <utility>

#include "arrow/api.h"
#include "arrow/array/array_decimal.h"
#include "arrow/array/array_nested.h"
#include "arrow/array/array_primitive.h"
#include "arrow/type_traits.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/decimal.h"
#include "fmt/format.h"
#include "paimon/common/data/columnar/columnar_batch_context.h"
#include "paimon/common/data/columnar/columnar_map.h"
#include "paimon/common/data/columnar/columnar_row_ref.h"
#include "paimon/common/utils/date_time_utils.h"

namespace paimon {
Status ColumnarArray::CheckNoNull() const {
for (int32_t i = 0; i < length_; i++) {
if (IsNullAt(i)) {
return Status::Invalid(fmt::format("row {} is null", i));
}
}
return Status::OK();
}

Decimal ColumnarArray::GetDecimal(int32_t pos, int32_t precision, int32_t scale) const {
using ArrayType = typename arrow::TypeTraits<arrow::Decimal128Type>::ArrayType;
auto array = arrow::internal::checked_cast<const ArrayType*>(array_);
assert(array);
arrow::Decimal128 decimal(array->GetValue(offset_ + pos));
return Decimal(
precision, scale,
static_cast<Decimal::int128_t>(
static_cast<Decimal::uint128_t>(static_cast<uint64_t>(decimal.high_bits())) << 64 |
decimal.low_bits()));
}

Timestamp ColumnarArray::GetTimestamp(int32_t pos, int32_t precision) const {
using ArrayType = typename arrow::TypeTraits<arrow::TimestampType>::ArrayType;
auto array = arrow::internal::checked_cast<const ArrayType*>(array_);
assert(array);
int64_t data = array->Value(offset_ + pos);
auto timestamp_type =
arrow::internal::checked_pointer_cast<arrow::TimestampType>(array->type());
// for orc format, data is saved as nano, therefore, Timestamp convert should consider precision
// in arrow array rather than input precision
DateTimeUtils::TimeType time_type = DateTimeUtils::GetTimeTypeFromArrowType(timestamp_type);
auto [milli, nano] = DateTimeUtils::TimestampConverter(
data, time_type, DateTimeUtils::TimeType::MILLISECOND, DateTimeUtils::TimeType::NANOSECOND);
return Timestamp(milli, nano);
}

std::shared_ptr<InternalArray> ColumnarArray::GetArray(int32_t pos) const {
auto list_array = arrow::internal::checked_cast<const arrow::ListArray*>(array_);
assert(list_array);
int32_t offset = list_array->value_offset(offset_ + pos);
int32_t length = list_array->value_length(offset_ + pos);
return std::make_shared<ColumnarArray>(list_array->values().get(), pool_, offset, length);
}

std::shared_ptr<InternalMap> ColumnarArray::GetMap(int32_t pos) const {
auto map_array = arrow::internal::checked_cast<const arrow::MapArray*>(array_);
assert(map_array);
int32_t offset = map_array->value_offset(offset_ + pos);
int32_t length = map_array->value_length(offset_ + pos);
return std::make_shared<ColumnarMap>(map_array->keys(), map_array->items(), pool_, offset,
length);
}

std::shared_ptr<InternalRow> ColumnarArray::GetRow(int32_t pos, int32_t num_fields) const {
auto struct_array = arrow::internal::checked_cast<const arrow::StructArray*>(array_);
assert(struct_array);
auto row_ctx = std::make_shared<ColumnarBatchContext>(struct_array->fields(), pool_);
return std::make_shared<ColumnarRowRef>(std::move(row_ctx), offset_ + pos);
}

Result<std::vector<char>> ColumnarArray::ToBooleanArray() const {
PAIMON_RETURN_NOT_OK(CheckNoNull());
std::vector<char> res(length_);
for (int32_t i = 0; i < length_; i++) {
bool element = GetBoolean(i);
res[i] = element ? static_cast<char>(1) : static_cast<char>(0);
}
return res;
}

Result<std::vector<char>> ColumnarArray::ToByteArray() const {
PAIMON_RETURN_NOT_OK(CheckNoNull());
std::vector<char> res(length_);
for (int32_t i = 0; i < length_; i++) {
res[i] = GetByte(i);
}
return res;
}

Result<std::vector<int16_t>> ColumnarArray::ToShortArray() const {
PAIMON_RETURN_NOT_OK(CheckNoNull());
std::vector<int16_t> res(length_);
for (int32_t i = 0; i < length_; i++) {
res[i] = GetShort(i);
}
return res;
}

Result<std::vector<int32_t>> ColumnarArray::ToIntArray() const {
PAIMON_RETURN_NOT_OK(CheckNoNull());
std::vector<int32_t> res(length_);
for (int32_t i = 0; i < length_; i++) {
res[i] = GetInt(i);
}
return res;
}

Result<std::vector<int64_t>> ColumnarArray::ToLongArray() const {
PAIMON_RETURN_NOT_OK(CheckNoNull());
std::vector<int64_t> res(length_);
for (int32_t i = 0; i < length_; i++) {
res[i] = GetLong(i);
}
return res;
}

Result<std::vector<float>> ColumnarArray::ToFloatArray() const {
PAIMON_RETURN_NOT_OK(CheckNoNull());
std::vector<float> res(length_);
for (int32_t i = 0; i < length_; i++) {
res[i] = GetFloat(i);
}
return res;
}

Result<std::vector<double>> ColumnarArray::ToDoubleArray() const {
PAIMON_RETURN_NOT_OK(CheckNoNull());
std::vector<double> res(length_);
for (int32_t i = 0; i < length_; i++) {
res[i] = GetDouble(i);
}
return res;
}

} // namespace paimon
154 changes: 154 additions & 0 deletions src/paimon/common/data/columnar/columnar_array.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <cassert>
#include <cstdint>
#include <memory>
#include <string_view>
#include <vector>

#include "arrow/array/array_base.h"
#include "paimon/common/data/binary_string.h"
#include "paimon/common/data/columnar/columnar_utils.h"
#include "paimon/common/data/internal_array.h"
#include "paimon/common/data/internal_map.h"
#include "paimon/common/data/internal_row.h"
#include "paimon/data/decimal.h"
#include "paimon/data/timestamp.h"
#include "paimon/result.h"
#include "paimon/status.h"

namespace arrow {
class BinaryType;
class BooleanType;
class Date32Type;
class DoubleType;
class FloatType;
class Int16Type;
class Int32Type;
class Int64Type;
class Int8Type;
class StringType;
} // namespace arrow

namespace paimon {
class Bytes;
class MemoryPool;

/// Columnar array to support access to vector column data.
///
/// NOTE: This class holds a non-owning raw pointer to the underlying arrow::Array for efficiency.
/// The caller must ensure that the pointed-to Array outlives this ColumnarArray instance.
/// Typically, lifetime is guaranteed by the owning ColumnarBatchContext or the parent
/// arrow container (e.g., ListArray, MapArray) that holds the shared_ptr.
class ColumnarArray : public InternalArray {
public:
ColumnarArray(const arrow::Array* array, const std::shared_ptr<MemoryPool>& pool,
int32_t offset, int32_t length)
: pool_(pool), array_(array), offset_(offset), length_(length) {
assert(array_);
assert(array_->length() >= offset + length);
}

int32_t Size() const override {
return length_;
}

bool IsNullAt(int32_t pos) const override {
return array_->IsNull(offset_ + pos);
}

bool GetBoolean(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::BooleanType, bool>(array_, offset_ + pos);
}

char GetByte(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::Int8Type, char>(array_, offset_ + pos);
}

int16_t GetShort(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::Int16Type, int16_t>(array_, offset_ + pos);
}

int32_t GetInt(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::Int32Type, int32_t>(array_, offset_ + pos);
}

int32_t GetDate(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::Date32Type, int32_t>(array_, offset_ + pos);
}

int64_t GetLong(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::Int64Type, int64_t>(array_, offset_ + pos);
}

float GetFloat(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::FloatType, float>(array_, offset_ + pos);
}

double GetDouble(int32_t pos) const override {
return ColumnarUtils::GetGenericValue<arrow::DoubleType, double>(array_, offset_ + pos);
}

BinaryString GetString(int32_t pos) const override {
auto bytes = ColumnarUtils::GetBytes<arrow::StringType>(array_, offset_ + pos, pool_.get());
return BinaryString::FromBytes(bytes);
}

std::string_view GetStringView(int32_t pos) const override {
return ColumnarUtils::GetView(array_, offset_ + pos);
}

Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const override;

Timestamp GetTimestamp(int32_t pos, int32_t precision) const override;

std::shared_ptr<Bytes> GetBinary(int32_t pos) const override {
return ColumnarUtils::GetBytes<arrow::BinaryType>(array_, offset_ + pos, pool_.get());
}

std::shared_ptr<InternalArray> GetArray(int32_t pos) const override;

std::shared_ptr<InternalMap> GetMap(int32_t pos) const override;

std::shared_ptr<InternalRow> GetRow(int32_t pos, int32_t num_fields) const override;

Result<std::vector<char>> ToBooleanArray() const override;

Result<std::vector<char>> ToByteArray() const override;

Result<std::vector<int16_t>> ToShortArray() const override;

Result<std::vector<int32_t>> ToIntArray() const override;

Result<std::vector<int64_t>> ToLongArray() const override;

Result<std::vector<float>> ToFloatArray() const override;

Result<std::vector<double>> ToDoubleArray() const override;

private:
Status CheckNoNull() const;

private:
std::shared_ptr<MemoryPool> pool_;
const arrow::Array* array_;
int32_t offset_;
int32_t length_;
};
} // namespace paimon
Loading