diff --git a/native/Cargo.lock b/native/Cargo.lock index bdd221e95f..1546af4e6b 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -196,8 +196,8 @@ dependencies = [ "serde_bytes", "serde_json", "snap", - "strum", - "strum_macros", + "strum 0.27.2", + "strum_macros 0.27.2", "thiserror 2.0.18", "uuid", "zstd", @@ -359,6 +359,7 @@ dependencies = [ "arrow-select", "flatbuffers", "lz4_flex", + "zstd", ] [[package]] @@ -563,6 +564,17 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-recursion" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "async-std" version = "1.13.2" @@ -606,6 +618,15 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "async_cell" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "447ab28afbb345f5408b120702a44e5529ebf90b1796ec76e9528df8e288e6c2" +dependencies = [ + "loom", +] + [[package]] name = "atoi" version = "2.0.0" @@ -969,9 +990,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.4.9" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53f93074121a1be41317b9aa607143ae17900631f7f59a99f2b905d519d6783b" +checksum = "32b42fcf341259d85ca10fac9a2f6448a8ec691c6955a18e45bc3b71a85fab85" dependencies = [ "base64-simd", "bytes", @@ -1136,6 +1157,27 @@ version = "2.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4388bee8683e3d04af747c73422af53102d2bd24d9eadb6cbc100baef4b43f8" +[[package]] +name = "bitpacking" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96a7139abd3d9cebf8cd6f920a389cf3dc9576172e32f4563f188cae3c3eb019" +dependencies = [ + "crunchy", +] + +[[package]] +name = "bitvec" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c" +dependencies = [ + "funty", + "radium", + "tap", + "wyz", +] + [[package]] name = "blake2" version = "0.10.6" @@ -1170,9 +1212,9 @@ dependencies = [ [[package]] name = "block-buffer" -version = "0.12.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cdd35008169921d80bc60d3d0ab416eecb028c4cd653352907921d95084790be" +checksum = "d2f6c7dbe95a6ed67ad9f18e57daf93a2f034c524b99fd2b76d18fdfeb6660aa" dependencies = [ "hybrid-array", ] @@ -1346,7 +1388,7 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" dependencies = [ - "nom", + "nom 7.1.3", ] [[package]] @@ -1717,6 +1759,25 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-skiplist" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -2010,6 +2071,7 @@ dependencies = [ "iceberg-storage-opendal", "itertools 0.14.0", "jni 0.22.4", + "lance", "lazy_static", "log", "log4rs", @@ -2748,6 +2810,26 @@ dependencies = [ "uuid", ] +[[package]] +name = "deepsize" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cdb987ec36f6bf7bfbea3f928b75590b736fc42af8e54d97592481351b2b96c" +dependencies = [ + "deepsize_derive", +] + +[[package]] +name = "deepsize_derive" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990101d41f3bc8c1a45641024377ee284ecc338e5ecf3ea0f0e236d897c72796" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "der" version = "0.7.10" @@ -2846,12 +2928,33 @@ version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1dd6dbb5841937940781866fa1281a1ff7bd3bf827091440879f9994983d5c2" dependencies = [ - "block-buffer 0.12.0", + "block-buffer 0.12.1", "const-oid 0.10.2", "crypto-common 0.2.2", "ctutils", ] +[[package]] +name = "dirs" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3e8aa94d75141228480295a7d0e7feb620b1a5ad9f12bc40be62411e38cce4e" +dependencies = [ + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e01a3366d27ee9890022452ee61b2b63a67e6f13f58900b651ff5665f0bb1fab" +dependencies = [ + "libc", + "option-ext", + "redox_users", + "windows-sys 0.61.2", +] + [[package]] name = "displaydoc" version = "0.2.6" @@ -2896,6 +2999,15 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91622ff5e7162018101f2fea40d6ebf4a78bbe5a49736a2020649edf9693679e" +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + [[package]] name = "equator" version = "0.4.2" @@ -2943,6 +3055,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "ethnum" +version = "1.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40404c3f5f511ec4da6fe866ddf6a717c309fdbb69fbbad7b0f3edab8f2e835f" + [[package]] name = "event-listener" version = "2.5.3" @@ -2980,6 +3098,12 @@ dependencies = [ "once_cell", ] +[[package]] +name = "fast-float2" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8eb564c5c7423d25c886fb561d1e4ee69f72354d16918afa32c08811f6b6a55" + [[package]] name = "fastnum" version = "0.7.5" @@ -3090,6 +3214,30 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "fsst" +version = "7.0.0-beta.17" +source = "git+https://github.com/lance-format/lance?rev=c7c5626d4e830b46c239b5bf4e2a17e32ab901b8#c7c5626d4e830b46c239b5bf4e2a17e32ab901b8" +dependencies = [ + "arrow-array", + "rand 0.9.4", +] + +[[package]] +name = "fst" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ab85b9b05e3978cc9a9cf8fea7f01b494e1a09ed3037e16ba39edc7a29eb61a" +dependencies = [ + "utf8-ranges", +] + +[[package]] +name = "funty" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" + [[package]] name = "futures" version = "0.3.32" @@ -3191,6 +3339,21 @@ dependencies = [ "slab", ] +[[package]] +name = "generator" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3b854b0e584ead1a33f18b2fcad7cf7be18b3875c78816b753639aa501513ae" +dependencies = [ + "cc", + "cfg-if", + "libc", + "log", + "rustversion", + "windows-link", + "windows-result", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -3550,6 +3713,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "hyperloglogplus" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "621debdf94dcac33e50475fdd76d34d5ea9c0362a834b9db08c3024696c1fbe3" +dependencies = [ + "serde", +] + [[package]] name = "iana-time-zone" version = "0.1.65" @@ -3619,7 +3791,7 @@ dependencies = [ "serde_json", "serde_repr", "serde_with", - "strum", + "strum 0.27.2", "tokio", "tracing", "typed-builder", @@ -3830,6 +4002,17 @@ dependencies = [ "rustversion", ] +[[package]] +name = "io-uring" +version = "0.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d09b98f7eace8982db770e4408e7470b028ce513ac28fecdc6bf4c30fe92b62" +dependencies = [ + "bitflags 2.13.0", + "cfg-if", + "libc", +] + [[package]] name = "ipnet" version = "2.12.0" @@ -4027,83 +4210,554 @@ dependencies = [ ] [[package]] -name = "kv-log-macro" -version = "1.0.7" +name = "jsonb" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +checksum = "eb98fb29636087c40ad0d1274d9a30c0c1e83e03ae93f6e7e89247b37fcc6953" dependencies = [ - "log", + "byteorder", + "ethnum", + "fast-float2", + "itoa", + "jiff", + "nom 8.0.0", + "num-traits", + "ordered-float 5.3.0", + "rand 0.9.4", + "serde", + "serde_json", + "zmij", ] [[package]] -name = "lazy_static" -version = "1.5.0" +name = "kv-log-macro" +version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" dependencies = [ - "spin 0.9.8", + "log", ] [[package]] -name = "lazycell" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" - -[[package]] -name = "leb128fmt" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" - -[[package]] -name = "lexical-core" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d8d125a277f807e55a77304455eb7b1cb52f2b18c143b60e766c120bd64a594" +name = "lance" +version = "7.0.0-beta.17" +source = "git+https://github.com/lance-format/lance?rev=c7c5626d4e830b46c239b5bf4e2a17e32ab901b8#c7c5626d4e830b46c239b5bf4e2a17e32ab901b8" dependencies = [ - "lexical-parse-float", - "lexical-parse-integer", - "lexical-util", - "lexical-write-float", - "lexical-write-integer", + "arc-swap", + "arrow", + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-ipc", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "async-recursion", + "async-trait", + "async_cell", + "bitpacking", + "byteorder", + "bytes", + "chrono", + "crossbeam-queue", + "crossbeam-skiplist", + "dashmap", + "datafusion", + "datafusion-expr", + "datafusion-functions", + "datafusion-physical-expr", + "datafusion-physical-plan", + "deepsize", + "either", + "futures", + "half", + "humantime", + "itertools 0.13.0", + "lance-arrow", + "lance-core", + "lance-datafusion", + "lance-encoding", + "lance-file", + "lance-index", + "lance-io", + "lance-linalg", + "lance-namespace", + "lance-table", + "lance-tokenizer", + "log", + "moka", + "object_store", + "permutation", + "pin-project", + "prost", + "prost-build", + "prost-types", + "rand 0.9.4", + "rayon", + "roaring", + "semver", + "serde", + "serde_json", + "snafu", + "tokio", + "tokio-stream", + "tokio-util", + "tracing", + "url", + "uuid", ] [[package]] -name = "lexical-parse-float" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52a9f232fbd6f550bc0137dcb5f99ab674071ac2d690ac69704593cb4abbea56" +name = "lance-arrow" +version = "7.0.0-beta.17" +source = "git+https://github.com/lance-format/lance?rev=c7c5626d4e830b46c239b5bf4e2a17e32ab901b8#c7c5626d4e830b46c239b5bf4e2a17e32ab901b8" dependencies = [ - "lexical-parse-integer", - "lexical-util", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-ipc", + "arrow-ord", + "arrow-schema", + "arrow-select", + "bytes", + "futures", + "getrandom 0.2.17", + "half", + "jsonb", + "num-traits", + "rand 0.9.4", ] [[package]] -name = "lexical-parse-integer" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a7a039f8fb9c19c996cd7b2fcce303c1b2874fe1aca544edc85c4a5f8489b34" +name = "lance-bitpacking" +version = "7.0.0-beta.17" +source = "git+https://github.com/lance-format/lance?rev=c7c5626d4e830b46c239b5bf4e2a17e32ab901b8#c7c5626d4e830b46c239b5bf4e2a17e32ab901b8" dependencies = [ - "lexical-util", + "arrayref", + "paste", + "seq-macro", ] [[package]] -name = "lexical-util" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2604dd126bb14f13fb5d1bd6a66155079cb9fa655b37f875b3a742c705dbed17" - -[[package]] -name = "lexical-write-float" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50c438c87c013188d415fbabbb1dceb44249ab81664efbd31b14ae55dabb6361" +name = "lance-core" +version = "7.0.0-beta.17" +source = "git+https://github.com/lance-format/lance?rev=c7c5626d4e830b46c239b5bf4e2a17e32ab901b8#c7c5626d4e830b46c239b5bf4e2a17e32ab901b8" dependencies = [ - "lexical-util", - "lexical-write-integer", -] - + "arrow-array", + "arrow-buffer", + "arrow-schema", + "async-trait", + "byteorder", + "bytes", + "datafusion-common", + "datafusion-sql", + "deepsize", + "futures", + "itertools 0.13.0", + "lance-arrow", + "libc", + "log", + "moka", + "num_cpus", + "object_store", + "pin-project", + "prost", + "rand 0.9.4", + "roaring", + "serde_json", + "snafu", + "tempfile", + "tokio", + "tokio-stream", + "tokio-util", + "tracing", + "url", +] + +[[package]] +name = "lance-datafusion" +version = "7.0.0-beta.17" +source = "git+https://github.com/lance-format/lance?rev=c7c5626d4e830b46c239b5bf4e2a17e32ab901b8#c7c5626d4e830b46c239b5bf4e2a17e32ab901b8" +dependencies = [ + "arrow", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-ord", + "arrow-schema", + "arrow-select", + "async-trait", + "chrono", + "datafusion", + "datafusion-common", + "datafusion-functions", + "datafusion-physical-expr", + "futures", + "jsonb", + "lance-arrow", + "lance-core", + "lance-datagen", + "log", + "pin-project", + "prost", + "prost-build", + "tokio", + "tracing", +] + +[[package]] +name = "lance-datagen" +version = "7.0.0-beta.17" +source = "git+https://github.com/lance-format/lance?rev=c7c5626d4e830b46c239b5bf4e2a17e32ab901b8#c7c5626d4e830b46c239b5bf4e2a17e32ab901b8" +dependencies = [ + "arrow", + "arrow-array", + "arrow-cast", + "arrow-schema", + "chrono", + "futures", + "half", + "hex", + "rand 0.9.4", + "rand_distr", + "rand_xoshiro", + "random_word", +] + +[[package]] +name = "lance-encoding" +version = "7.0.0-beta.17" +source = "git+https://github.com/lance-format/lance?rev=c7c5626d4e830b46c239b5bf4e2a17e32ab901b8#c7c5626d4e830b46c239b5bf4e2a17e32ab901b8" +dependencies = [ + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "arrow-select", + "bytemuck", + "byteorder", + "bytes", + "fsst", + "futures", + "hex", + "hyperloglogplus", + "itertools 0.13.0", + "lance-arrow", + "lance-bitpacking", + "lance-core", + "log", + "lz4", + "num-traits", + "prost", + "prost-build", + "rand 0.9.4", + "strum 0.26.3", + "tokio", + "tracing", + "xxhash-rust", + "zstd", +] + +[[package]] +name = "lance-file" +version = "7.0.0-beta.17" +source = "git+https://github.com/lance-format/lance?rev=c7c5626d4e830b46c239b5bf4e2a17e32ab901b8#c7c5626d4e830b46c239b5bf4e2a17e32ab901b8" +dependencies = [ + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "async-recursion", + "async-trait", + "byteorder", + "bytes", + "datafusion-common", + "deepsize", + "futures", + "lance-arrow", + "lance-core", + "lance-encoding", + "lance-io", + "log", + "num-traits", + "object_store", + "prost", + "prost-build", + "prost-types", + "tokio", + "tracing", +] + +[[package]] +name = "lance-index" +version = "7.0.0-beta.17" +source = "git+https://github.com/lance-format/lance?rev=c7c5626d4e830b46c239b5bf4e2a17e32ab901b8#c7c5626d4e830b46c239b5bf4e2a17e32ab901b8" +dependencies = [ + "arc-swap", + "arrow", + "arrow-arith", + "arrow-array", + "arrow-ord", + "arrow-schema", + "arrow-select", + "async-channel 2.5.0", + "async-recursion", + "async-trait", + "bitpacking", + "bitvec", + "bytes", + "chrono", + "crossbeam-queue", + "datafusion", + "datafusion-common", + "datafusion-expr", + "datafusion-physical-expr", + "deepsize", + "dirs", + "fst", + "futures", + "half", + "itertools 0.13.0", + "jsonb", + "lance-arrow", + "lance-core", + "lance-datafusion", + "lance-datagen", + "lance-encoding", + "lance-file", + "lance-io", + "lance-linalg", + "lance-table", + "lance-tokenizer", + "libm", + "log", + "ndarray", + "num-traits", + "object_store", + "prost", + "prost-build", + "prost-types", + "rand 0.9.4", + "rand_distr", + "rangemap", + "rayon", + "roaring", + "serde", + "serde_json", + "smallvec", + "tempfile", + "tokio", + "tracing", + "twox-hash", + "uuid", +] + +[[package]] +name = "lance-io" +version = "7.0.0-beta.17" +source = "git+https://github.com/lance-format/lance?rev=c7c5626d4e830b46c239b5bf4e2a17e32ab901b8#c7c5626d4e830b46c239b5bf4e2a17e32ab901b8" +dependencies = [ + "arrow", + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "arrow-select", + "async-recursion", + "async-trait", + "byteorder", + "bytes", + "chrono", + "deepsize", + "futures", + "http 1.4.2", + "io-uring", + "lance-arrow", + "lance-core", + "lance-namespace", + "log", + "moka", + "object_store", + "path_abs", + "pin-project", + "prost", + "rand 0.9.4", + "serde", + "tempfile", + "tokio", + "tracing", + "url", +] + +[[package]] +name = "lance-linalg" +version = "7.0.0-beta.17" +source = "git+https://github.com/lance-format/lance?rev=c7c5626d4e830b46c239b5bf4e2a17e32ab901b8#c7c5626d4e830b46c239b5bf4e2a17e32ab901b8" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-schema", + "cc", + "deepsize", + "half", + "lance-arrow", + "lance-core", + "num-traits", + "rand 0.9.4", +] + +[[package]] +name = "lance-namespace" +version = "7.0.0-beta.17" +source = "git+https://github.com/lance-format/lance?rev=c7c5626d4e830b46c239b5bf4e2a17e32ab901b8#c7c5626d4e830b46c239b5bf4e2a17e32ab901b8" +dependencies = [ + "arrow", + "async-trait", + "bytes", + "lance-core", + "lance-namespace-reqwest-client", + "snafu", +] + +[[package]] +name = "lance-namespace-reqwest-client" +version = "0.7.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6369eee4682fb11edf538388b43c61ce288b8302fe89bb40944d7daa7faaae99" +dependencies = [ + "reqwest 0.12.28", + "serde", + "serde_json", + "serde_repr", + "serde_with", + "url", +] + +[[package]] +name = "lance-table" +version = "7.0.0-beta.17" +source = "git+https://github.com/lance-format/lance?rev=c7c5626d4e830b46c239b5bf4e2a17e32ab901b8#c7c5626d4e830b46c239b5bf4e2a17e32ab901b8" +dependencies = [ + "arrow", + "arrow-array", + "arrow-buffer", + "arrow-ipc", + "arrow-schema", + "async-trait", + "byteorder", + "bytes", + "chrono", + "deepsize", + "futures", + "lance-arrow", + "lance-core", + "lance-file", + "lance-io", + "log", + "object_store", + "prost", + "prost-build", + "prost-types", + "rand 0.9.4", + "rangemap", + "roaring", + "semver", + "serde", + "serde_json", + "snafu", + "tokio", + "tracing", + "url", + "uuid", +] + +[[package]] +name = "lance-tokenizer" +version = "7.0.0-beta.17" +source = "git+https://github.com/lance-format/lance?rev=c7c5626d4e830b46c239b5bf4e2a17e32ab901b8#c7c5626d4e830b46c239b5bf4e2a17e32ab901b8" +dependencies = [ + "rust-stemmers", + "serde", + "unicode-normalization", +] + +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +dependencies = [ + "spin 0.9.8", +] + +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + +[[package]] +name = "leb128fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" + +[[package]] +name = "lexical-core" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d8d125a277f807e55a77304455eb7b1cb52f2b18c143b60e766c120bd64a594" +dependencies = [ + "lexical-parse-float", + "lexical-parse-integer", + "lexical-util", + "lexical-write-float", + "lexical-write-integer", +] + +[[package]] +name = "lexical-parse-float" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52a9f232fbd6f550bc0137dcb5f99ab674071ac2d690ac69704593cb4abbea56" +dependencies = [ + "lexical-parse-integer", + "lexical-util", +] + +[[package]] +name = "lexical-parse-integer" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a7a039f8fb9c19c996cd7b2fcce303c1b2874fe1aca544edc85c4a5f8489b34" +dependencies = [ + "lexical-util", +] + +[[package]] +name = "lexical-util" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2604dd126bb14f13fb5d1bd6a66155079cb9fa655b37f875b3a742c705dbed17" + +[[package]] +name = "lexical-write-float" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50c438c87c013188d415fbabbb1dceb44249ab81664efbd31b14ae55dabb6361" +dependencies = [ + "lexical-util", + "lexical-write-integer", +] + [[package]] name = "lexical-write-integer" version = "1.0.6" @@ -4170,6 +4824,15 @@ dependencies = [ "cc", ] +[[package]] +name = "libredox" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f02ab6bace2054fb888a3c16f990117b579d14a3088e472d63c6011fa185c9d3" +dependencies = [ + "libc", +] + [[package]] name = "link-section" version = "0.18.2" @@ -4255,10 +4918,42 @@ dependencies = [ ] [[package]] -name = "lru-slab" -version = "0.1.2" +name = "loom" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca" +dependencies = [ + "cfg-if", + "generator", + "scoped-tls", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "lru-slab" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" + +[[package]] +name = "lz4" +version = "1.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a20b523e860d03443e98350ceaac5e71c6ba89aea7d960769ec3ce37f4de5af4" +dependencies = [ + "lz4-sys", +] + +[[package]] +name = "lz4-sys" +version = "1.11.1+lz4-1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" +checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6" +dependencies = [ + "cc", + "libc", +] [[package]] name = "lz4_flex" @@ -4269,6 +4964,28 @@ dependencies = [ "twox-hash", ] +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + +[[package]] +name = "matrixmultiply" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06de3016e9fae57a36fd14dba131fccf49f74b40b7fbdb472f96e361ec71a08" +dependencies = [ + "autocfg", + "num_cpus", + "once_cell", + "rawpointer", + "thread-tree", +] + [[package]] name = "md-5" version = "0.10.6" @@ -4300,9 +5017,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.8.1" +version = "2.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b947ae49db0d222b1dbc6b113ce7248a3fc3a6ca21b696717bfc000ba4484d8" +checksum = "88904434abc2901f197fe8cc55f0445e7ded921dba5911dad2e2b39b48e663c4" [[package]] name = "memmap2" @@ -4322,6 +5039,22 @@ dependencies = [ "libmimalloc-sys", ] +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + +[[package]] +name = "mime_guess" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -4387,6 +5120,21 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9252111cf132ba0929b6f8e030cac2a24b507f3a4d6db6fb2896f27b354c714b" +[[package]] +name = "ndarray" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "882ed72dce9365842bf196bdeedf5055305f11fc8c03dee7bb0194a6cad34841" +dependencies = [ + "matrixmultiply", + "num-complex", + "num-integer", + "num-traits", + "portable-atomic", + "portable-atomic-util", + "rawpointer", +] + [[package]] name = "nix" version = "0.26.4" @@ -4408,6 +5156,24 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nom" +version = "8.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df9761775871bdef83bee530e60050f7e54b1105350d6884eb0fb4f46c2f9405" +dependencies = [ + "memchr", +] + +[[package]] +name = "nu-ansi-term" +version = "0.50.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "num" version = "0.4.3" @@ -4515,6 +5281,16 @@ dependencies = [ "libm", ] +[[package]] +name = "num_cpus" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "object" version = "0.37.3" @@ -4820,6 +5596,12 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" +[[package]] +name = "option-ext" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" + [[package]] name = "ordered-float" version = "2.10.1" @@ -4838,6 +5620,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-float" +version = "5.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7d950ca161dc355eaf28f82b11345ed76c6e1f6eb1f4f4479e0323b9e2fbd0e" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-multimap" version = "0.7.3" @@ -4976,6 +5767,18 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "path_abs" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05ef02f6342ac01d8a93b65f96db53fe68a92a15f41144f97fb00a9e669633c3" +dependencies = [ + "serde", + "serde_derive", + "std_prelude", + "stfu8", +] + [[package]] name = "pbkdf2" version = "0.12.2" @@ -5017,6 +5820,12 @@ version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" +[[package]] +name = "permutation" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df202b0b0f5b8e389955afd5f27b007b00fb948162953f1db9c70d2c7e3157d7" + [[package]] name = "petgraph" version = "0.8.3" @@ -5455,6 +6264,12 @@ version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" +[[package]] +name = "radium" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" + [[package]] name = "rand" version = "0.8.6" @@ -5530,6 +6345,50 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63b8176103e19a2643978565ca18b50549f6101881c443590420e4dc998a3c69" +[[package]] +name = "rand_distr" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a8615d50dcf34fa31f7ab52692afec947c4dd0ab803cc87cb3b0b4570ff7463" +dependencies = [ + "num-traits", + "rand 0.9.4", +] + +[[package]] +name = "rand_xoshiro" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f703f4665700daf5512dcca5f43afa6af89f09db47fb56be587f80636bda2d41" +dependencies = [ + "rand_core 0.9.5", +] + +[[package]] +name = "random_word" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e47a395bdb55442b883c89062d6bcff25dc90fa5f8369af81e0ac6d49d78cf81" +dependencies = [ + "ahash", + "brotli", + "paste", + "rand 0.9.4", + "unicase", +] + +[[package]] +name = "rangemap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "973443cf09a9c8656b574a866ab68dfa19f0867d0340648c7d2f6a71b8a8ea68" + +[[package]] +name = "rawpointer" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a357793950651c4ed0f3f52338f53b2f809f32d83a07f72909fa13e4c6c1e3" + [[package]] name = "rayon" version = "1.12.0" @@ -5559,6 +6418,17 @@ dependencies = [ "bitflags 2.13.0", ] +[[package]] +name = "redox_users" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4e608c6638b9c18977b00b475ac1f28d14e84b27d8d42f70e0bf1e3dec127ac" +dependencies = [ + "getrandom 0.2.17", + "libredox", + "thiserror 2.0.18", +] + [[package]] name = "ref-cast" version = "1.0.25" @@ -5736,6 +6606,7 @@ checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" dependencies = [ "base64", "bytes", + "encoding_rs", "futures-core", "futures-util", "h2", @@ -5747,6 +6618,8 @@ dependencies = [ "hyper-util", "js-sys", "log", + "mime", + "mime_guess", "percent-encoding", "pin-project-lite", "quinn", @@ -5872,6 +6745,16 @@ dependencies = [ "ordered-multimap", ] +[[package]] +name = "rust-stemmers" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e46a2036019fdb888131db7a4c847a1063a7493f971ed94ea82c67eada63ca54" +dependencies = [ + "serde", + "serde_derive", +] + [[package]] name = "rustc-demangle" version = "0.1.27" @@ -6064,6 +6947,12 @@ dependencies = [ "serde_json", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" @@ -6301,6 +7190,15 @@ dependencies = [ "digest 0.11.3", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -6369,9 +7267,30 @@ checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" [[package]] name = "smallvec" -version = "1.15.1" +version = "1.15.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ed6a63f02c8539c91a8685a86f4099661ba3da017932f6ebbea6de3f0fa7c90" + +[[package]] +name = "snafu" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1a012328be2e3f5d5f6f3218147ca02588cea4cb865e876849ab6debcf36522" +dependencies = [ + "snafu-derive", +] + +[[package]] +name = "snafu-derive" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +checksum = "5f103c50866b8743da9429b8a581d81a27c2d3a9c4ac7df8f8571c1dd7896eda" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.117", +] [[package]] name = "snap" @@ -6441,6 +7360,18 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" +[[package]] +name = "std_prelude" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8207e78455ffdf55661170876f88daf85356e4edd54e0a3dbc79586ca1e50cbe" + +[[package]] +name = "stfu8" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51f1e89f093f99e7432c491c382b88a6860a5adbe6bf02574bf0a08efff1978" + [[package]] name = "str_stack" version = "0.1.1" @@ -6453,13 +7384,35 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "strum" +version = "0.26.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" +dependencies = [ + "strum_macros 0.26.4", +] + [[package]] name = "strum" version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af23d6f6c1a224baef9d3f61e287d2761385a5b88fdab4eb4c6f11aeb54c4bcf" dependencies = [ - "strum_macros", + "strum_macros 0.27.2", +] + +[[package]] +name = "strum_macros" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.117", ] [[package]] @@ -6551,6 +7504,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" +[[package]] +name = "tap" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" + [[package]] name = "tempfile" version = "3.27.0" @@ -6614,6 +7573,24 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "thread-tree" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffbd370cb847953a25954d9f63e14824a36113f8c72eecf6eccef5dc4b45d630" +dependencies = [ + "crossbeam-channel", +] + +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "thrift" version = "0.17.0" @@ -6815,12 +7792,17 @@ version = "0.6.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4cfcf7e2740e6fc6d4d688b4ef00650406bb94adf4731e43c096c3a19fe40840" dependencies = [ + "async-compression", "bitflags 2.13.0", "bytes", + "futures-core", "futures-util", "http 1.4.2", "http-body 1.0.1", + "http-body-util", "pin-project-lite", + "tokio", + "tokio-util", "tower", "tower-layer", "tower-service", @@ -6868,6 +7850,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex-automata", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] [[package]] @@ -6950,12 +7962,27 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "unicase" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142" + [[package]] name = "unicode-ident" version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" +[[package]] +name = "unicode-normalization" +version = "0.1.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fd4f6878c9cb28d874b009da9e8d183b5abc80117c40bbd187a1fde336be6e8" +dependencies = [ + "tinyvec", +] + [[package]] name = "unicode-segmentation" version = "1.13.3" @@ -7023,6 +8050,12 @@ version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" +[[package]] +name = "utf8-ranges" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcfc827f90e53a02eaef5e535ee14266c1d569214c6aa70133a624d8a3164ba" + [[package]] name = "utf8_iter" version = "1.0.4" @@ -7047,6 +8080,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "value-bag" version = "1.12.0" @@ -7685,6 +8724,15 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ffae5123b2d3fc086436f8834ae3ab053a283cfac8fe0a0b8eaae044768a4c4" +[[package]] +name = "wyz" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f360fc0b24296329c78fda852a1e9ae82de9cf7b27dae4b7f62f118f77b9ed" +dependencies = [ + "tap", +] + [[package]] name = "xattr" version = "1.6.1" @@ -7701,6 +8749,12 @@ version = "0.13.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" +[[package]] +name = "xxhash-rust" +version = "0.8.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3" + [[package]] name = "yoke" version = "0.8.3" diff --git a/native/Cargo.toml b/native/Cargo.toml index d764350e02..a34c9d0271 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -30,8 +30,8 @@ readme = "README.md" license = "Apache-2.0" edition = "2021" -# Comet uses the same minimum Rust version as DataFusion -rust-version = "1.88" +# Comet native follows the highest MSRV required by its native dependencies. +rust-version = "1.92" [workspace.dependencies] arrow = { version = "58.3.0", features = ["prettyprint", "ffi", "chrono-tz"] } @@ -61,6 +61,7 @@ aws-credential-types = "1.2.13" iceberg = { git = "https://github.com/apache/iceberg-rust", rev = "80a30d3" } iceberg-storage-opendal = { git = "https://github.com/apache/iceberg-rust", rev = "80a30d3", features = ["opendal-memory", "opendal-fs", "opendal-s3", "opendal-gcs", "opendal-oss", "opendal-azdls"] } reqsign-core = "3" +lance = { git = "https://github.com/lance-format/lance", rev = "c7c5626d4e830b46c239b5bf4e2a17e32ab901b8", default-features = false } [profile.release] debug = true diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index d9c45a08a3..cbec95a705 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -76,6 +76,7 @@ opendal = { version = "0.57.0", optional = true, features = ["services-hdfs"] } iceberg = { workspace = true } iceberg-storage-opendal = { workspace = true } reqsign-core = { workspace = true } +lance = { workspace = true, optional = true } serde_json = "1.0" uuid = "1.23.3" @@ -97,6 +98,7 @@ datafusion-functions-nested = { version = "53.1.0" } [features] backtrace = ["datafusion/backtrace"] default = ["hdfs-opendal"] +contrib-lance = ["dep:lance"] hdfs = ["datafusion-comet-objectstore-hdfs"] hdfs-opendal = ["opendal", "object_store_opendal", "hdfs-sys"] jemalloc = ["tikv-jemallocator", "tikv-jemalloc-ctl"] diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 4a7a21006d..805be1560d 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -240,6 +240,7 @@ fn op_name(op: &OpStruct) -> &'static str { OpStruct::CsvScan(_) => "CsvScan", OpStruct::ShuffleScan(_) => "ShuffleScan", OpStruct::BroadcastNestedLoopJoin(_) => "BroadcastNestedLoopJoin", + OpStruct::LanceScan(_) => "LanceScan", } } diff --git a/native/core/src/execution/operators/lance_scan.rs b/native/core/src/execution/operators/lance_scan.rs new file mode 100644 index 0000000000..5551ef9ae4 --- /dev/null +++ b/native/core/src/execution/operators/lance_scan.rs @@ -0,0 +1,424 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Native Lance table scan operator. + +use std::any::Any; +use std::collections::HashMap; +use std::fmt; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use arrow::array::RecordBatch; +use arrow::datatypes::SchemaRef; +use datafusion::common::{DataFusionError, Result as DFResult}; +use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; +use datafusion::physical_expr::EquivalenceProperties; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion::physical_plan::metrics::{ + BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, +}; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, +}; +use futures::future::{BoxFuture, FutureExt}; +use futures::Stream; +use lance::dataset::builder::DatasetBuilder; + +use crate::execution::operators::ExecutionError; + +#[derive(Debug)] +pub struct LanceScanExec { + dataset_uri: String, + resolved_version: u64, + storage_options: HashMap, + output_schema: SchemaRef, + projection_names: Vec, + filter_sql: Option, + limit: Option, + offset: Option, + batch_size: Option, + spark_partition_index: u32, + fragment_ids: Vec, + plan_properties: Arc, + metrics: ExecutionPlanMetricsSet, +} + +impl LanceScanExec { + pub fn try_new( + dataset_uri: String, + resolved_version: i64, + storage_options: HashMap, + output_schema: SchemaRef, + filter_sql: Option, + limit: Option, + offset: Option, + batch_size: u32, + spark_partition_index: u32, + fragment_ids: Vec, + ) -> Result { + if dataset_uri.is_empty() { + return Err(ExecutionError::GeneralError( + "LanceScan missing dataset_uri".to_string(), + )); + } + + let resolved_version = resolved_version.try_into().map_err(|_| { + ExecutionError::GeneralError(format!( + "LanceScan resolved_version must be non-negative, got {resolved_version}" + )) + })?; + + validate_optional_non_negative("limit", limit)?; + validate_optional_non_negative("offset", offset)?; + validate_ordered_fragment_ids(&fragment_ids)?; + + let projection_names = output_schema + .fields() + .iter() + .map(|field| field.name().to_string()) + .collect(); + let plan_properties = Self::compute_properties(Arc::clone(&output_schema)); + let metrics = ExecutionPlanMetricsSet::new(); + let filter_sql = filter_sql.filter(|filter| !filter.is_empty()); + let limit = limit.filter(|limit| *limit != 0); + let offset = offset.filter(|offset| *offset != 0); + let batch_size = (batch_size != 0).then_some(batch_size as usize); + + Ok(Self { + dataset_uri, + resolved_version, + storage_options, + output_schema, + projection_names, + filter_sql, + limit, + offset, + batch_size, + spark_partition_index, + fragment_ids, + plan_properties, + metrics, + }) + } + + fn compute_properties(schema: SchemaRef) -> Arc { + Arc::new(PlanProperties::new( + EquivalenceProperties::new(schema), + Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + )) + } + + async fn open_stream( + dataset_uri: String, + resolved_version: u64, + storage_options: HashMap, + projection_names: Vec, + filter_sql: Option, + limit: Option, + offset: Option, + batch_size: Option, + fragment_ids: Vec, + ) -> DFResult { + let dataset = DatasetBuilder::from_uri(dataset_uri) + .with_version(resolved_version) + .with_storage_options(storage_options) + .load() + .await + .map_err(lance_error)?; + + let file_fragments = dataset.get_frags_from_ordered_ids(&fragment_ids); + let mut fragments = Vec::with_capacity(file_fragments.len()); + for (fragment_id, fragment) in fragment_ids.iter().zip(file_fragments.into_iter()) { + let fragment = fragment.ok_or_else(|| { + DataFusionError::Execution(format!( + "LanceScan requested missing fragment id {fragment_id}" + )) + })?; + fragments.push(fragment.metadata().clone()); + } + + let mut scanner = dataset.scan(); + scanner.with_fragments(fragments); + if projection_names.is_empty() { + scanner.project(&[] as &[&str]).map_err(lance_error)?; + } else { + scanner.project(&projection_names).map_err(lance_error)?; + } + if let Some(filter) = filter_sql { + scanner.filter(&filter).map_err(lance_error)?; + } + if limit.is_some() || offset.is_some() { + scanner.limit(limit, offset).map_err(lance_error)?; + } + if let Some(batch_size) = batch_size { + scanner.batch_size(batch_size); + } + + Ok(scanner.try_into_stream().await.map_err(lance_error)?.into()) + } +} + +impl ExecutionPlan for LanceScanExec { + fn name(&self) -> &str { + "LanceScanExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.output_schema) + } + + fn properties(&self) -> &Arc { + &self.plan_properties + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> DFResult> { + Ok(self) + } + + fn execute( + &self, + partition: usize, + _context: Arc, + ) -> DFResult { + if partition != 0 { + return Err(DataFusionError::Execution(format!( + "LanceScanExec has one native partition, got partition {partition}" + ))); + } + + let metrics = LanceScanMetrics::new(&self.metrics); + metrics.fragment_count.add(self.fragment_ids.len()); + + let open_future = Self::open_stream( + self.dataset_uri.clone(), + self.resolved_version, + self.storage_options.clone(), + self.projection_names.clone(), + self.filter_sql.clone(), + self.limit, + self.offset, + self.batch_size, + self.fragment_ids.clone(), + ) + .boxed(); + + Ok(Box::pin(LanceScanStream { + state: LanceScanStreamState::Opening(open_future), + schema: Arc::clone(&self.output_schema), + baseline_metrics: metrics.baseline, + })) + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } +} + +impl DisplayAs for LanceScanExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "LanceScanExec: dataset_uri={}, version={}, spark_partition={}, fragments={}", + self.dataset_uri, + self.resolved_version, + self.spark_partition_index, + self.fragment_ids.len() + ) + } +} + +struct LanceScanMetrics { + baseline: BaselineMetrics, + fragment_count: Count, +} + +impl LanceScanMetrics { + fn new(metrics: &ExecutionPlanMetricsSet) -> Self { + Self { + baseline: BaselineMetrics::new(metrics, 0), + fragment_count: MetricBuilder::new(metrics).counter("fragment_count", 0), + } + } +} + +enum LanceScanStreamState { + Opening(BoxFuture<'static, DFResult>), + Scanning(SendableRecordBatchStream), + Done, +} + +struct LanceScanStream { + state: LanceScanStreamState, + schema: SchemaRef, + baseline_metrics: BaselineMetrics, +} + +impl Stream for LanceScanStream { + type Item = DFResult; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + loop { + match &mut this.state { + LanceScanStreamState::Opening(open_future) => match open_future.as_mut().poll(cx) { + Poll::Ready(Ok(stream)) => { + this.state = LanceScanStreamState::Scanning(stream); + } + Poll::Ready(Err(err)) => { + this.state = LanceScanStreamState::Done; + return this + .baseline_metrics + .record_poll(Poll::Ready(Some(Err(err)))); + } + Poll::Pending => return this.baseline_metrics.record_poll(Poll::Pending), + }, + LanceScanStreamState::Scanning(stream) => { + let poll = stream.as_mut().poll_next(cx); + if matches!(poll, Poll::Ready(None)) { + this.state = LanceScanStreamState::Done; + } + return this.baseline_metrics.record_poll(poll); + } + LanceScanStreamState::Done => return Poll::Ready(None), + } + } + } +} + +impl RecordBatchStream for LanceScanStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} + +fn validate_optional_non_negative(name: &str, value: Option) -> Result<(), ExecutionError> { + if matches!(value, Some(value) if value < 0) { + return Err(ExecutionError::GeneralError(format!( + "LanceScan {name} must be non-negative, got {}", + value.unwrap() + ))); + } + Ok(()) +} + +fn validate_ordered_fragment_ids(fragment_ids: &[u32]) -> Result<(), ExecutionError> { + if fragment_ids.windows(2).any(|window| window[0] >= window[1]) { + return Err(ExecutionError::GeneralError( + "LanceScan fragment_ids must be strictly increasing".to_string(), + )); + } + Ok(()) +} + +fn lance_error(error: impl fmt::Display) -> DataFusionError { + DataFusionError::Execution(format!("Lance scan error: {error}")) +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::datatypes::{DataType, Field, Schema}; + + #[test] + fn rejects_invalid_descriptor_values() { + struct TestCase { + name: &'static str, + resolved_version: i64, + limit: Option, + offset: Option, + fragment_ids: Vec, + expected_error: &'static str, + } + + let cases = vec![ + TestCase { + name: "negative version", + resolved_version: -1, + limit: None, + offset: None, + fragment_ids: vec![1], + expected_error: "resolved_version must be non-negative", + }, + TestCase { + name: "negative limit", + resolved_version: 1, + limit: Some(-1), + offset: None, + fragment_ids: vec![1], + expected_error: "limit must be non-negative", + }, + TestCase { + name: "negative offset", + resolved_version: 1, + limit: None, + offset: Some(-1), + fragment_ids: vec![1], + expected_error: "offset must be non-negative", + }, + TestCase { + name: "unordered fragments", + resolved_version: 1, + limit: None, + offset: None, + fragment_ids: vec![2, 1], + expected_error: "fragment_ids must be strictly increasing", + }, + ]; + + for case in cases { + let err = LanceScanExec::try_new( + "file:///tmp/table.lance".to_string(), + case.resolved_version, + HashMap::new(), + test_schema(), + None, + case.limit, + case.offset, + 0, + 0, + case.fragment_ids, + ) + .expect_err(case.name); + assert!( + err.to_string().contains(case.expected_error), + "{}: expected error containing {:?}, got {:?}", + case.name, + case.expected_error, + err + ); + } + } + + fn test_schema() -> SchemaRef { + Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, true)])) + } +} diff --git a/native/core/src/execution/operators/mod.rs b/native/core/src/execution/operators/mod.rs index d68252bd9b..25e3f9bbd0 100644 --- a/native/core/src/execution/operators/mod.rs +++ b/native/core/src/execution/operators/mod.rs @@ -22,6 +22,8 @@ pub use crate::errors::ExecutionError; pub use aligned_stream_reader::*; pub use copy::*; pub use iceberg_scan::*; +#[cfg(feature = "contrib-lance")] +pub use lance_scan::LanceScanExec; pub use scan::*; mod aligned_stream_reader; @@ -29,6 +31,8 @@ mod copy; mod expand; pub use expand::ExpandExec; mod iceberg_scan; +#[cfg(feature = "contrib-lance")] +mod lance_scan; mod parquet_writer; pub use parquet_writer::ParquetWriterExec; mod csv_scan; diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index c07a92d700..c67af46e5b 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -79,6 +79,8 @@ use datafusion_spark::function::aggregate::collect::SparkCollectSet; use iceberg::expr::Bind; use crate::execution::operators::ExecutionError::GeneralError; +#[cfg(feature = "contrib-lance")] +use crate::execution::operators::LanceScanExec; use crate::execution::shuffle::{CometPartitioning, CompressionCodec}; use crate::execution::spark_plan::SparkPlan; use crate::parquet::parquet_support::prepare_object_store_with_configs; @@ -1597,6 +1599,62 @@ impl PhysicalPlanner { )), )) } + OpStruct::LanceScan(scan) => { + #[cfg(feature = "contrib-lance")] + { + let common = scan + .common + .as_ref() + .ok_or_else(|| GeneralError("LanceScan missing common data".into()))?; + let partition = scan + .partition + .as_ref() + .ok_or_else(|| GeneralError("LanceScan missing partition data".into()))?; + + let output_schema_fields = if common.projected_schema.is_empty() { + common.required_schema.as_slice() + } else { + common.projected_schema.as_slice() + }; + let output_schema = convert_spark_types_to_arrow_schema(output_schema_fields); + let storage_options: HashMap = common + .storage_options + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + + let lance_scan = LanceScanExec::try_new( + common.dataset_uri.clone(), + common.resolved_version, + storage_options, + output_schema, + common.filter_sql.clone(), + common.limit, + common.offset, + common.batch_size, + partition.partition_index, + partition.fragment_ids.clone(), + )?; + + Ok(( + vec![], + vec![], + Arc::new(SparkPlan::new( + spark_plan.plan_id, + Arc::new(lance_scan), + vec![], + )), + )) + } + #[cfg(not(feature = "contrib-lance"))] + { + let _ = scan; + Err(GeneralError( + "Native Lance scan requires a Comet native build with the contrib-lance feature" + .to_string(), + )) + } + } OpStruct::ShuffleWriter(writer) => { assert_eq!(children.len(), 1); let (scans, shuffle_scans, child) = diff --git a/native/core/src/execution/planner/operator_registry.rs b/native/core/src/execution/planner/operator_registry.rs index dc98a32fb0..314f8fa005 100644 --- a/native/core/src/execution/planner/operator_registry.rs +++ b/native/core/src/execution/planner/operator_registry.rs @@ -46,6 +46,7 @@ pub enum OperatorType { Scan, NativeScan, IcebergScan, + LanceScan, Projection, Filter, HashAgg, @@ -142,6 +143,7 @@ fn get_operator_type(spark_operator: &Operator) -> Option { OpStruct::Scan(_) => Some(OperatorType::Scan), OpStruct::NativeScan(_) => Some(OperatorType::NativeScan), OpStruct::IcebergScan(_) => Some(OperatorType::IcebergScan), + OpStruct::LanceScan(_) => Some(OperatorType::LanceScan), OpStruct::ShuffleWriter(_) => Some(OperatorType::ShuffleWriter), OpStruct::ParquetWriter(_) => Some(OperatorType::ParquetWriter), OpStruct::Expand(_) => Some(OperatorType::Expand), diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 15265f1d86..63a2b9dd6a 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -54,6 +54,7 @@ message Operator { CsvScan csv_scan = 115; ShuffleScan shuffle_scan = 116; BroadcastNestedLoopJoin broadcast_nested_loop_join = 117; + LanceScan lance_scan = 118; } } @@ -220,6 +221,53 @@ message IcebergScan { repeated IcebergFileScanTask file_scan_tasks = 2; } +// Common data shared by all partitions for native Lance scans. +message LanceScanCommon { + // Stable key for matching this scan during per-partition data injection. + string scan_id = 1; + + // Schema Spark requested from the Lance scan. + repeated SparkStructField required_schema = 2; + + // Informational class name of Lance's native scan plan object. + string native_scan_plan_class = 3; + + // Lance dataset location and version resolved during Spark-side planning. + string dataset_uri = 4; + int64 resolved_version = 5; + + // Storage options captured by Lance Spark planning. + map storage_options = 6; + + // Schema after projection pruning, as planned by Lance Spark. + repeated SparkStructField projected_schema = 7; + + // Optional pushdowns captured by Lance Spark planning. + optional string filter_sql = 8; + optional int64 limit = 9; + optional int64 offset = 10; + + // Preferred native batch size. + uint32 batch_size = 11; + + // Version of the Lance Spark native scan descriptor contract. + uint32 descriptor_version = 12; +} + +// Per-partition Lance scan split. +message LanceScanPartition { + uint32 partition_index = 1; + repeated uint32 fragment_ids = 2; +} + +message LanceScan { + // Common data shared across partitions. + LanceScanCommon common = 1; + + // Single partition's scan data, injected at execution time. + LanceScanPartition partition = 2; +} + // Helper message for deduplicating field ID lists message ProjectFieldIdList { repeated int32 field_ids = 1; diff --git a/pom.xml b/pom.xml index 95444845e1..19e31c1b22 100644 --- a/pom.xml +++ b/pom.xml @@ -744,6 +744,13 @@ under the License. + + contrib-lance + + true + + + scala-2.12 diff --git a/spark/pom.xml b/spark/pom.xml index 7dd3c6fe33..4fcfc389ac 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -331,6 +331,31 @@ under the License. + + contrib-lance + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-contrib-lance-source + generate-sources + + add-source + + + + src/contrib-lance/scala + + + + + + + + generate-docs diff --git a/spark/src/contrib-lance/scala/org/apache/comet/lance/CometLanceSupport.scala b/spark/src/contrib-lance/scala/org/apache/comet/lance/CometLanceSupport.scala new file mode 100644 index 0000000000..69d2ed2a44 --- /dev/null +++ b/spark/src/contrib-lance/scala/org/apache/comet/lance/CometLanceSupport.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.lance + +import scala.collection.mutable.ListBuffer + +import org.apache.spark.sql.comet.CometBatchScanExec +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec + +import org.apache.comet.CometSparkSessionExtensions.withFallbackReasons +import org.apache.comet.serde.OperatorOuterClass.Operator +import org.apache.comet.serde.operator.CometLanceNativeScan + +object CometLanceSupport { + + def tryTransform(scanExec: BatchScanExec, nativeScanPlan: Object): Option[SparkPlan] = { + val fallbackReasons = new ListBuffer[String]() + val schemaSupported = + CometBatchScanExec.isSchemaSupported(scanExec.scan.readSchema(), fallbackReasons) + + if (!schemaSupported) { + fallbackReasons += s"Schema ${scanExec.scan.readSchema()} is not supported" + withFallbackReasons(scanExec, fallbackReasons.toSet) + None + } else { + val builder = Operator.newBuilder().setPlanId(scanExec.id) + CometLanceNativeScan + .convert(scanExec, builder, Option(nativeScanPlan)) + .map { nativeOp => + CometLanceNativeScan.createExec(nativeOp, scanExec, Option(nativeScanPlan)) + } + } + } +} diff --git a/spark/src/contrib-lance/scala/org/apache/comet/serde/operator/CometLanceNativeScan.scala b/spark/src/contrib-lance/scala/org/apache/comet/serde/operator/CometLanceNativeScan.scala new file mode 100644 index 0000000000..e20ecf1df8 --- /dev/null +++ b/spark/src/contrib-lance/scala/org/apache/comet/serde/operator/CometLanceNativeScan.scala @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.serde.operator + +import java.lang.reflect.InvocationTargetException + +import scala.jdk.CollectionConverters._ +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.comet.{CometLanceNativeScanExec, CometNativeExec, SerializedPlan} +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.types.{DataType, StructType} + +import org.apache.comet.{CometConf, ConfigEntry} +import org.apache.comet.serde.{CometOperatorSerde, Compatible, OperatorOuterClass, SupportLevel} +import org.apache.comet.serde.OperatorOuterClass.Operator + +object CometLanceNativeScan extends CometOperatorSerde[BatchScanExec] with Logging { + + case class LanceNativeScanSplitDescriptor(partitionIndex: Int, fragmentIds: Seq[Int]) + + case class LanceNativeScanDescriptor( + descriptorVersion: Int, + scanId: String, + datasetUri: String, + resolvedVersion: Long, + storageOptions: Map[String, String], + requiredSchema: StructType, + projectedSchema: StructType, + filterSql: Option[String], + limit: Option[Long], + offset: Option[Long], + batchSize: Int, + nativeScanPlanClass: String, + splits: Seq[LanceNativeScanSplitDescriptor]) + + override def enabledConfig: Option[ConfigEntry[Boolean]] = + Some(CometConf.COMET_LANCE_NATIVE_ENABLED) + + override def getSupportLevel(operator: BatchScanExec): SupportLevel = Compatible() + + override def convert( + scanExec: BatchScanExec, + builder: Operator.Builder, + childOp: Operator*): Option[Operator] = + convert(scanExec, builder, None) + + def convert( + scanExec: BatchScanExec, + builder: Operator.Builder, + nativeScanPlan: Option[Any]): Option[Operator] = { + val descriptor = descriptorFor(scanExec, nativeScanPlan) + + val lanceScanBuilder = OperatorOuterClass.LanceScan + .newBuilder() + .setCommon(commonFromDescriptor(descriptor)) + + builder.clearChildren() + Some(builder.setLanceScan(lanceScanBuilder).build()) + } + + override def createExec(nativeOp: Operator, op: BatchScanExec): CometNativeExec = + createExec(nativeOp, op, None) + + def createExec( + nativeOp: Operator, + op: BatchScanExec, + nativeScanPlan: Option[Any]): CometNativeExec = { + val descriptor = descriptorFor(op, nativeScanPlan) + val exec = CometLanceNativeScanExec( + nativeOp, + op.output, + op.runtimeFilters, + op, + SerializedPlan(None), + descriptor.scanId, + descriptor) + op.logicalLink.foreach(exec.setLogicalLink) + exec + } + + def serializePartitions(descriptor: LanceNativeScanDescriptor): (Array[Byte], Array[Array[Byte]]) = + ( + commonFromDescriptor(descriptor).toByteArray, + descriptor.splits.map { split => + val partition = OperatorOuterClass.LanceScanPartition + .newBuilder() + .setPartitionIndex(split.partitionIndex) + .addAllFragmentIds(split.fragmentIds.map(Int.box).asJava) + .build() + + OperatorOuterClass.LanceScan + .newBuilder() + .setPartition(partition) + .build() + .toByteArray + }.toArray) + + private[comet] def serializeNativePlan( + nativeScanPlan: Any, + fallbackScanId: String, + fallbackRequiredSchema: StructType): (Array[Byte], Array[Array[Byte]]) = { + serializePartitions(descriptorFromNativePlan( + nativeScanPlan, + fallbackScanId, + fallbackRequiredSchema)) + } + + private def descriptorFor( + scanExec: BatchScanExec, + nativeScanPlan: Option[Any]): LanceNativeScanDescriptor = { + val fallbackScanId = scanKey(scanExec) + val fallbackRequiredSchema = scanExec.scan.readSchema() + nativeScanPlan + .map(descriptorFromNativePlan(_, fallbackScanId, fallbackRequiredSchema)) + .getOrElse(fallbackDescriptor(fallbackScanId, fallbackRequiredSchema)) + } + + private def descriptorFromNativePlan( + nativeScanPlan: Any, + fallbackScanId: String, + fallbackRequiredSchema: StructType): LanceNativeScanDescriptor = { + val requiredSchema = + structTypeFromJson( + requireString(invokeRequired(nativeScanPlan, "getSparkReadSchemaJson")), + "getSparkReadSchemaJson") + val projectedSchema = + structTypeFromJson( + requireString(invokeRequired(nativeScanPlan, "getProjectedReadSchemaJson")), + "getProjectedReadSchemaJson") + + LanceNativeScanDescriptor( + descriptorVersion = toUInt32( + invokeRequired(nativeScanPlan, "getDescriptorVersion"), + "getDescriptorVersion"), + scanId = nonEmptyString( + invokeRequired(nativeScanPlan, "getScanId"), + fallbackScanId), + datasetUri = requireString(invokeRequired(nativeScanPlan, "getDatasetUri")), + resolvedVersion = toLong(invokeRequired(nativeScanPlan, "getResolvedVersion")), + storageOptions = toStringMap(invokeRequired(nativeScanPlan, "getStorageOptions")), + requiredSchema = requiredSchema, + projectedSchema = projectedSchema, + filterSql = optionalString(nativeScanPlan, "hasPushedFilterSql", "getPushedFilterSql"), + limit = optionalLong(nativeScanPlan, "hasLimit", "getLimit"), + offset = optionalLong(nativeScanPlan, "hasOffset", "getOffset"), + batchSize = toUInt32(invokeRequired(nativeScanPlan, "getBatchSize"), "getBatchSize"), + nativeScanPlanClass = nativeScanPlan.getClass.getName, + splits = toSeq(invokeRequired(nativeScanPlan, "getSplits")).map(splitFromNativeSplit)) + } + + private def fallbackDescriptor( + scanId: String, + requiredSchema: StructType): LanceNativeScanDescriptor = + LanceNativeScanDescriptor( + descriptorVersion = 0, + scanId = scanId, + datasetUri = "", + resolvedVersion = 0L, + storageOptions = Map.empty, + requiredSchema = requiredSchema, + projectedSchema = requiredSchema, + filterSql = None, + limit = None, + offset = None, + batchSize = 0, + nativeScanPlanClass = "", + splits = Seq(LanceNativeScanSplitDescriptor(0, Nil))) + + private def commonFromDescriptor( + descriptor: LanceNativeScanDescriptor): OperatorOuterClass.LanceScanCommon = { + val commonBuilder = OperatorOuterClass.LanceScanCommon + .newBuilder() + .setScanId(descriptor.scanId) + .setNativeScanPlanClass(descriptor.nativeScanPlanClass) + .setDatasetUri(descriptor.datasetUri) + .setResolvedVersion(descriptor.resolvedVersion) + .putAllStorageOptions(descriptor.storageOptions.asJava) + .addAllRequiredSchema(schema2Proto(descriptor.requiredSchema.fields).toSeq.asJava) + .addAllProjectedSchema(schema2Proto(descriptor.projectedSchema.fields).toSeq.asJava) + .setBatchSize(descriptor.batchSize) + .setDescriptorVersion(descriptor.descriptorVersion) + + descriptor.filterSql.foreach(commonBuilder.setFilterSql) + descriptor.limit.foreach(commonBuilder.setLimit) + descriptor.offset.foreach(commonBuilder.setOffset) + commonBuilder.build() + } + + private def splitFromNativeSplit(nativeSplit: Any): LanceNativeScanSplitDescriptor = + LanceNativeScanSplitDescriptor( + partitionIndex = toUInt32(invokeRequired(nativeSplit, "getSplitIndex"), "getSplitIndex"), + fragmentIds = toSeq(invokeRequired(nativeSplit, "getFragmentIds")) + .map(toUInt32(_, "getFragmentIds"))) + + private def structTypeFromJson(json: String, methodName: String): StructType = + try { + DataType.fromJson(json) match { + case schema: StructType => schema + case other => + throw new IllegalArgumentException( + s"expected StructType JSON but got ${other.typeName}") + } + } catch { + case NonFatal(e) => + throw new IllegalArgumentException( + s"Native Lance scan descriptor method $methodName returned invalid Spark schema JSON", + e) + } + + private def optionalString(target: Any, hasMethod: String, valueMethod: String): Option[String] = + if (toBoolean(invokeRequired(target, hasMethod))) { + Some(requireString(invokeRequired(target, valueMethod))) + } else { + None + } + + private def optionalLong(target: Any, hasMethod: String, valueMethod: String): Option[Long] = + if (toBoolean(invokeRequired(target, hasMethod))) { + Some(toLong(invokeRequired(target, valueMethod))) + } else { + None + } + + private def invokeRequired(target: Any, methodName: String): Any = { + require(target != null, s"Native Lance scan descriptor target is null for $methodName") + try { + findNoArgMethod(target.getClass, methodName) + .getOrElse { + throw new NoSuchMethodException(s"${target.getClass.getName}.$methodName()") + } + .invoke(target.asInstanceOf[AnyRef]) + } catch { + case e: InvocationTargetException if e.getCause != null => + throw e.getCause + case NonFatal(e) => + throw new IllegalArgumentException( + s"Unable to read native Lance scan descriptor method $methodName", + e) + } + } + + private def findNoArgMethod( + clazz: Class[_], + methodName: String): Option[java.lang.reflect.Method] = { + var current = clazz + while (current != null) { + try { + val method = current.getDeclaredMethod(methodName) + method.setAccessible(true) + return Some(method) + } catch { + case _: NoSuchMethodException => + current = current.getSuperclass + } + } + None + } + + private def toSeq(value: Any): Seq[Any] = value match { + case null => Seq.empty + case values: java.lang.Iterable[_] => values.asScala.toSeq + case values: Iterable[_] => values.toSeq + case values: Array[_] => values.toSeq + case other => + throw new IllegalArgumentException( + s"Expected a collection in native Lance scan descriptor, got ${other.getClass.getName}") + } + + private def toStringMap(value: Any): Map[String, String] = value match { + case null => Map.empty + case values: java.util.Map[_, _] => + values.asScala.map { case (key, value) => key.toString -> value.toString }.toMap + case values: collection.Map[_, _] => + values.map { case (key, value) => key.toString -> value.toString }.toMap + case other => + throw new IllegalArgumentException( + s"Expected a map in native Lance scan descriptor, got ${other.getClass.getName}") + } + + private def toBoolean(value: Any): Boolean = value match { + case value: java.lang.Boolean => value.booleanValue() + case value: Boolean => value + case other => + throw new IllegalArgumentException( + s"Expected boolean in native Lance scan descriptor, got ${typeName(other)}") + } + + private def toLong(value: Any): Long = value match { + case value: java.lang.Number => value.longValue() + case value: String => value.toLong + case other => + throw new IllegalArgumentException( + s"Expected integer in native Lance scan descriptor, got ${typeName(other)}") + } + + private def toUInt32(value: Any, methodName: String): Int = { + val longValue = toLong(value) + if (longValue < 0 || longValue > 0xffffffffL) { + throw new IllegalArgumentException( + s"Native Lance scan descriptor method $methodName returned out-of-range uint32 " + + s"value $longValue") + } + longValue.toInt + } + + private def requireString(value: Any): String = value match { + case null => "" + case value: String => value + case other => other.toString + } + + private def nonEmptyString(value: Any, fallback: String): String = { + val stringValue = requireString(value) + if (stringValue.nonEmpty) stringValue else fallback + } + + private def typeName(value: Any): String = + Option(value).map(_.getClass.getName).getOrElse("null") + + private def scanKey(scanExec: BatchScanExec): String = + s"lance_${scanExec.id}_${scanExec.scan.hashCode()}" +} diff --git a/spark/src/contrib-lance/scala/org/apache/spark/sql/comet/CometLanceNativeScanExec.scala b/spark/src/contrib-lance/scala/org/apache/spark/sql/comet/CometLanceNativeScanExec.scala new file mode 100644 index 0000000000..5843e92536 --- /dev/null +++ b/spark/src/contrib-lance/scala/org/apache/spark/sql/comet/CometLanceNativeScanExec.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet + +import scala.jdk.CollectionConverters._ + +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, SortOrder} +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.vectorized.ColumnarBatch + +import com.google.common.base.Objects + +import org.apache.comet.serde.OperatorOuterClass.Operator +import org.apache.comet.serde.operator.CometLanceNativeScan +import org.apache.comet.serde.operator.CometLanceNativeScan.LanceNativeScanDescriptor + +case class CometLanceNativeScanExec( + override val nativeOp: Operator, + override val output: Seq[Attribute], + runtimeFilters: Seq[Expression], + @transient originalPlan: BatchScanExec, + override val serializedPlanOpt: SerializedPlan, + override val sourceKey: String, + lanceDescriptor: LanceNativeScanDescriptor) + extends CometLeafExec + with CometLanceNativeScanLike { + + override val supportsColumnar: Boolean = true + + override val nodeName: String = "CometLanceNativeScan" + + @transient private lazy val serializedPartitionData: (Array[Byte], Array[Array[Byte]]) = + CometLanceNativeScan.serializePartitions(lanceDescriptor) + + override def commonData: Array[Byte] = serializedPartitionData._1 + + override def perPartitionData: Array[Array[Byte]] = serializedPartitionData._2 + + override lazy val outputPartitioning: Partitioning = + UnknownPartitioning(perPartitionData.length) + + override lazy val outputOrdering: Seq[SortOrder] = Nil + + override def doExecuteColumnar(): RDD[ColumnarBatch] = { + val nativeMetrics = CometMetricNode.fromCometPlan(this) + val serializedPlan = CometExec.serializeNativePlan(nativeOp) + new CometExecRDD( + sparkContext, + inputRDDs = Seq.empty, + commonByKey = Map(sourceKey -> commonData), + perPartitionByKey = Map(sourceKey -> perPartitionData), + serializedPlan = serializedPlan, + defaultNumPartitions = perPartitionData.length, + numOutputCols = output.length, + nativeMetrics = nativeMetrics, + subqueries = Seq.empty) { + override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = { + val res = super.compute(split, context) + Option(context).foreach(nativeMetrics.reportScanInputMetrics) + res + } + } + } + + override def convertBlock(): CometLanceNativeScanExec = { + val newSerializedPlan = if (serializedPlanOpt.isEmpty) { + SerializedPlan(Some(CometExec.serializeNativePlan(nativeOp))) + } else { + serializedPlanOpt + } + + CometLanceNativeScanExec( + nativeOp, + output, + runtimeFilters, + originalPlan, + newSerializedPlan, + sourceKey, + lanceDescriptor) + } + + override protected def doCanonicalize(): CometLanceNativeScanExec = { + CometLanceNativeScanExec( + nativeOp, + output.map(QueryPlan.normalizeExpressions(_, output)), + QueryPlan.normalizePredicates( + CometScanUtils.filterUnusedDynamicPruningExpressions(runtimeFilters), + output), + null, + SerializedPlan(None), + sourceKey, + lanceDescriptor) + } + + override def stringArgs: Iterator[Any] = + Iterator(output, s"$sourceKey, nativeScanPlan=${lanceDescriptor.nativeScanPlanClass}") + + override def equals(obj: Any): Boolean = obj match { + case other: CometLanceNativeScanExec => + this.sourceKey == other.sourceKey && + this.output == other.output && + this.runtimeFilters == other.runtimeFilters && + this.serializedPlanOpt == other.serializedPlanOpt + case _ => false + } + + override def hashCode(): Int = + Objects.hashCode(sourceKey, output.asJava, runtimeFilters, serializedPlanOpt) +} diff --git a/spark/src/main/scala/org/apache/comet/CometConf.scala b/spark/src/main/scala/org/apache/comet/CometConf.scala index aabd64b9b3..5b9e0365a6 100644 --- a/spark/src/main/scala/org/apache/comet/CometConf.scala +++ b/spark/src/main/scala/org/apache/comet/CometConf.scala @@ -121,6 +121,14 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(true) + val COMET_LANCE_NATIVE_ENABLED: ConfigEntry[Boolean] = + conf("spark.comet.scan.lanceNative.enabled") + .category(CATEGORY_SCAN) + .doc("Whether to enable native Lance table scans through the optional contrib-lance " + + "integration. This is an experimental scaffold and is disabled by default.") + .booleanConf + .createWithDefault(false) + val COMET_ICEBERG_DATA_FILE_CONCURRENCY_LIMIT: ConfigEntry[Int] = conf("spark.comet.scan.icebergNative.dataFileConcurrencyLimit") .category(CATEGORY_SCAN) diff --git a/spark/src/main/scala/org/apache/comet/lance/LanceIntegration.scala b/spark/src/main/scala/org/apache/comet/lance/LanceIntegration.scala new file mode 100644 index 0000000000..5424bb5f6d --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/lance/LanceIntegration.scala @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.lance + +import java.lang.reflect.InvocationTargetException +import java.util.{Optional => JOptional} + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec + +import org.apache.comet.CometConf +import org.apache.comet.CometSparkSessionExtensions.withFallbackReason + +/** + * Reflection-only bridge for optional Lance Spark integration. + * + * Default Comet builds must not depend on Lance classes. This object treats both Lance Spark and + * the Comet contrib-lance scaffold as optional runtime classes and falls back cleanly when either + * side is absent. + */ +object LanceIntegration extends Logging { + + private val LanceScanClassName = "org.lance.spark.read.LanceScan" + private val NativeScanPlanMethod = "nativeScanPlan" + private val ContribSupportModule = "org.apache.comet.lance.CometLanceSupport$" + + def isLanceScan(scan: Any): Boolean = { + scan != null && { + scan.getClass.getName == LanceScanClassName || + loadClass(LanceScanClassName).exists(_.isInstance(scan)) + } + } + + def nativeScanPlan(scan: Any): Option[Any] = + if (isLanceScan(scan)) { + invokeNativeScanPlan(scan) + } else { + None + } + + def tryCreateNativeScan(scanExec: BatchScanExec): Option[SparkPlan] = { + if (!CometConf.COMET_LANCE_NATIVE_ENABLED.get(scanExec.conf)) { + withFallbackReason( + scanExec, + s"Native Lance scan disabled because ${CometConf.COMET_LANCE_NATIVE_ENABLED.key} " + + "is not enabled") + return None + } + + if (!CometConf.COMET_EXEC_ENABLED.get(scanExec.conf)) { + withFallbackReason( + scanExec, + s"Native Lance scan disabled because ${CometConf.COMET_EXEC_ENABLED.key} is not enabled") + return None + } + + val nativePlan = nativeScanPlan(scanExec.scan) match { + case Some(plan) => plan + case None => + withFallbackReason( + scanExec, + s"Native Lance scan disabled because $LanceScanClassName.$NativeScanPlanMethod() " + + "is not available") + return None + } + + val support = loadContribSupport match { + case Some(module) => module + case None => + withFallbackReason( + scanExec, + "Native Lance scan disabled because the contrib-lance build profile is not present") + return None + } + + try { + val method = + support.getClass.getMethod("tryTransform", classOf[BatchScanExec], classOf[Object]) + method.invoke(support, scanExec, nativePlan.asInstanceOf[AnyRef]) match { + case plan: Option[_] => plan.asInstanceOf[Option[SparkPlan]] + case other => + logWarning( + "Native Lance scan disabled because contrib-lance returned unexpected " + + s"result: ${Option(other).map(_.getClass.getName).getOrElse("null")}") + None + } + } catch { + case e: InvocationTargetException => + val cause = Option(e.getCause).getOrElse(e) + logWarning( + "Native Lance scan disabled because contrib-lance threw during reflection: " + + s"${cause.getClass.getName}: ${cause.getMessage}", + cause) + None + case NonFatal(e) => + logWarning(s"Native Lance scan disabled by contrib-lance reflection failure: $e") + None + } + } + + private[comet] def invokeNativeScanPlan(scan: Any): Option[Any] = { + try { + findNoArgMethod(scan.getClass, NativeScanPlanMethod) + .flatMap { method => + optionalResult(method.invoke(scan)) + } + } catch { + case e: InvocationTargetException => + logWarning( + s"Native Lance scan disabled because $NativeScanPlanMethod() threw: " + + s"${Option(e.getCause).map(_.getMessage).getOrElse(e.getMessage)}") + None + case NonFatal(e) => + logWarning(s"Native Lance scan disabled by reflection failure: $e") + None + } + } + + private def optionalResult(value: Any): Option[Any] = value match { + case null => None + case option: Option[_] => option + case option: JOptional[_] if option.isPresent => Some(option.get) + case _: JOptional[_] => None + case other => Some(other) + } + + private def findNoArgMethod( + clazz: Class[_], + methodName: String): Option[java.lang.reflect.Method] = { + var current = clazz + while (current != null) { + try { + val method = current.getDeclaredMethod(methodName) + method.setAccessible(true) + return Some(method) + } catch { + case _: NoSuchMethodException => + current = current.getSuperclass + case NonFatal(_) => + return None + } + } + None + } + + private def loadContribSupport: Option[AnyRef] = + loadClass(ContribSupportModule).flatMap { clazz => + try { + Some(clazz.getField("MODULE$").get(null).asInstanceOf[AnyRef]) + } catch { + case NonFatal(_) => None + } + } + + private def loadClass(className: String): Option[Class[_]] = { + try { + val classLoader = Thread.currentThread().getContextClassLoader + // scalastyle:off classforname + val clazz = + if (classLoader != null) { + Class.forName(className, false, classLoader) + } else { + Class.forName(className) + } + // scalastyle:on classforname + Some(clazz) + } catch { + case _: ClassNotFoundException | _: NoClassDefFoundError => None + case NonFatal(e) => + logDebug(s"Unable to load optional class $className", e) + None + } + } +} diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index f2ed7920c2..5a89141d5b 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -45,6 +45,7 @@ import org.apache.comet.CometConf._ import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isSpark35Plus, withFallbackReason, withFallbackReasons} import org.apache.comet.DataTypeSupport.isComplexType import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection} +import org.apache.comet.lance.LanceIntegration import org.apache.comet.objectstore.NativeConfig import org.apache.comet.parquet.CometParquetUtils.{encryptionEnabled, isEncryptionConfigSupported} import org.apache.comet.serde.operator.{CometIcebergNativeScan, CometNativeScan} @@ -293,6 +294,9 @@ case class CometScanRule(session: SparkSession) withFallbackReasons(scanExec, fallbackReasons.toSet) } + case _ if LanceIntegration.isLanceScan(scanExec.scan) => + LanceIntegration.tryCreateNativeScan(scanExec).getOrElse(scanExec) + // Iceberg scan - detected by class name. SparkStagedScan covers reads issued by // RewriteDataFiles (and similar maintenance actions) where the planner has already // staged FileScanTasks via ScanTaskSetManager. diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometLanceNativeScanLike.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometLanceNativeScanLike.scala new file mode 100644 index 0000000000..72b5326b7b --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometLanceNativeScanLike.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet + +private[comet] trait CometLanceNativeScanLike extends CometLeafExec { + def sourceKey: String + + def commonData: Array[Byte] + + def perPartitionData: Array[Array[Byte]] +} diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index 53b09e92b3..361ad2ea83 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -86,6 +86,7 @@ private[comet] object PlanDataInjector { // Registry of injectors for different operator types private val injectors: Seq[PlanDataInjector] = Seq( IcebergPlanDataInjector, + LancePlanDataInjector, NativeScanPlanDataInjector // Future: DeltaPlanDataInjector, HudiPlanDataInjector, etc. ) @@ -235,6 +236,34 @@ private[comet] object NativeScanPlanDataInjector extends PlanDataInjector { } } +/** + * Injector for LanceScan operators. + */ +private[comet] object LancePlanDataInjector extends PlanDataInjector { + + override def canInject(op: Operator): Boolean = + op.hasLanceScan && + op.getLanceScan.hasCommon && + !op.getLanceScan.hasPartition + + override def getKey(op: Operator): Option[String] = + Some(op.getLanceScan.getCommon.getScanId) + + override def inject( + op: Operator, + commonBytes: Array[Byte], + partitionBytes: Array[Byte]): Operator = { + val common = OperatorOuterClass.LanceScanCommon.parseFrom(commonBytes) + val partitionOnly = OperatorOuterClass.LanceScan.parseFrom(partitionBytes) + + val scanBuilder = OperatorOuterClass.LanceScan.newBuilder() + scanBuilder.setCommon(common) + scanBuilder.setPartition(partitionOnly.getPartition) + + op.toBuilder.setLanceScan(scanBuilder).build() + } +} + /** * A Comet physical operator */ @@ -685,6 +714,7 @@ abstract class CometNativeExec extends CometExec { * - CometScanExec - Comet scan node * - CometBatchScanExec - Comet scan node * - CometIcebergNativeScanExec - Native Iceberg scan node + * - CometLanceNativeScanLike - Native Lance scan node * - ShuffleQueryStageExec - AQE shuffle stage node on top of Comet shuffle * - AQEShuffleReadExec - AQE shuffle read node on top of Comet shuffle * - CometShuffleExchangeExec - Comet shuffle exchange node @@ -699,11 +729,11 @@ abstract class CometNativeExec extends CometExec { def foreachUntilCometInput(plan: SparkPlan)(func: SparkPlan => Unit): Unit = { plan match { case _: CometNativeScanExec | _: CometScanExec | _: CometBatchScanExec | - _: CometIcebergNativeScanExec | _: CometCsvNativeScanExec | _: ShuffleQueryStageExec | - _: AQEShuffleReadExec | _: CometShuffleExchangeExec | _: CometUnionExec | - _: CometTakeOrderedAndProjectExec | _: CometCoalesceExec | _: ReusedExchangeExec | - _: CometBroadcastExchangeExec | _: BroadcastQueryStageExec | - _: CometSparkToColumnarExec | _: CometLocalTableScanExec => + _: CometIcebergNativeScanExec | _: CometLanceNativeScanLike | + _: CometCsvNativeScanExec | _: ShuffleQueryStageExec | _: AQEShuffleReadExec | + _: CometShuffleExchangeExec | _: CometUnionExec | _: CometTakeOrderedAndProjectExec | + _: CometCoalesceExec | _: ReusedExchangeExec | _: CometBroadcastExchangeExec | + _: BroadcastQueryStageExec | _: CometSparkToColumnarExec | _: CometLocalTableScanExec => func(plan) case _: CometPlan => // Other Comet operators, continue to traverse the tree. @@ -772,6 +802,10 @@ abstract class CometNativeExec extends CometExec { Map(nativeScan.sourceKey -> nativeScan.commonData), Map(nativeScan.sourceKey -> nativeScan.perPartitionData)) + case lance: CometLanceNativeScanLike => + lance.ensureSubqueriesResolved() + (Map(lance.sourceKey -> lance.commonData), Map(lance.sourceKey -> lance.perPartitionData)) + // Broadcast stages are boundaries - don't collect per-partition data from inside them. // After DPP filtering, broadcast scans may have different partition counts than the // probe side, causing ArrayIndexOutOfBoundsException in CometExecRDD.getPartitions. diff --git a/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala b/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala index f444fe62c9..a3eee6e424 100644 --- a/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala @@ -19,7 +19,11 @@ package org.apache.comet.rules +import java.util.{Arrays, LinkedHashMap, Optional => JOptional} + +import scala.jdk.CollectionConverters._ import scala.util.Random +import scala.util.Try import org.apache.spark.sql._ import org.apache.spark.sql.comet._ @@ -28,6 +32,8 @@ import org.apache.spark.sql.execution.adaptive.QueryStageExec import org.apache.spark.sql.types.{DataTypes, StructField, StructType} import org.apache.comet.CometConf +import org.apache.comet.lance.LanceIntegration +import org.apache.comet.serde.OperatorOuterClass import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator} /** @@ -134,4 +140,201 @@ class CometScanRuleSuite extends CometTestBase { } } + test("Lance native scan config defaults to disabled") { + assert(!CometConf.COMET_LANCE_NATIVE_ENABLED.get()) + } + + test("LanceIntegration nativeScanPlan reflection handles fallback paths") { + class WithNativeScanPlan { + def nativeScanPlan(): String = "native-plan" + } + class WithJavaOptionalNativeScanPlan { + def nativeScanPlan(): JOptional[String] = JOptional.of("native-plan") + } + class WithEmptyJavaOptionalNativeScanPlan { + def nativeScanPlan(): JOptional[String] = JOptional.empty() + } + class WithScalaOptionNativeScanPlan { + def nativeScanPlan(): Option[String] = Some("native-plan") + } + class WithoutNativeScanPlan + class ThrowingNativeScanPlan { + def nativeScanPlan(): String = throw new IllegalStateException("boom") + } + + val cases = Seq( + ("present method", new WithNativeScanPlan, Some("native-plan")), + ("java optional method", new WithJavaOptionalNativeScanPlan, Some("native-plan")), + ("empty java optional method", new WithEmptyJavaOptionalNativeScanPlan, None), + ("scala option method", new WithScalaOptionNativeScanPlan, Some("native-plan")), + ("missing method", new WithoutNativeScanPlan, None), + ("throwing method", new ThrowingNativeScanPlan, None)) + + cases.foreach { case (name, scan, expected) => + assert( + LanceIntegration.invokeNativeScanPlan(scan).map(_.toString) == expected, + s"unexpected reflection result for $name") + } + + val nonLanceScan = new WithNativeScanPlan + assert(!LanceIntegration.isLanceScan(nonLanceScan)) + assert(LanceIntegration.nativeScanPlan(nonLanceScan).isEmpty) + } + + test("Lance native scan serde reflects descriptor common fields and split fragments") { + val serde = loadContribLanceSerde.getOrElse { + cancel("contrib-lance profile is not enabled") + } + + val requiredSchema = StructType( + Seq( + StructField("id", DataTypes.IntegerType, nullable = false), + StructField("name", DataTypes.StringType, nullable = true))) + val projectedSchema = StructType(Seq(StructField("id", DataTypes.IntegerType, false))) + val storageOptions = new LinkedHashMap[String, String]() + storageOptions.put("region", "us-west-2") + storageOptions.put("endpoint", "http://127.0.0.1:9000") + + val descriptor = new FakeLanceNativeScanPlan( + descriptorVersion = 1, + scanId = "scan-123", + datasetUri = "s3://bucket/table.lance", + resolvedVersion = 42L, + sparkReadSchemaJson = requiredSchema.json, + projectedReadSchemaJson = projectedSchema.json, + pushedFilterSql = Some("id > 10"), + limit = Some(100L), + offset = Some(5L), + batchSize = 4096, + storageOptions = storageOptions, + splits = Arrays.asList( + new FakeLanceNativeScanSplit(0, Arrays.asList(Int.box(7), Int.box(8))), + new FakeLanceNativeScanSplit(1, Arrays.asList(Int.box(9))))) + + val (common, partitions) = + serializeFakeLanceDescriptor(serde, descriptor, "fallback-scan", requiredSchema) + + assert(common.getScanId == "scan-123") + assert(common.getDatasetUri == "s3://bucket/table.lance") + assert(common.getResolvedVersion == 42L) + assert(common.getDescriptorVersion == 1) + assert(common.getBatchSize == 4096) + assert(common.getNativeScanPlanClass.contains("FakeLanceNativeScanPlan")) + assert(common.getStorageOptionsMap.get("region") == "us-west-2") + assert(common.getStorageOptionsMap.get("endpoint") == "http://127.0.0.1:9000") + assert(common.getRequiredSchemaList.asScala.map(_.getName) == Seq("id", "name")) + assert(common.getProjectedSchemaList.asScala.map(_.getName) == Seq("id")) + assert(common.hasFilterSql) + assert(common.getFilterSql == "id > 10") + assert(common.hasLimit) + assert(common.getLimit == 100L) + assert(common.hasOffset) + assert(common.getOffset == 5L) + + assert(partitions.length == 2) + assert(partitions(0).getPartition.getPartitionIndex == 0) + assert(partitions(0).getPartition.getFragmentIdsList.asScala.map(_.intValue()) == Seq(7, 8)) + assert(partitions(1).getPartition.getPartitionIndex == 1) + assert(partitions(1).getPartition.getFragmentIdsList.asScala.map(_.intValue()) == Seq(9)) + } + + test("Lance native scan serde leaves absent optional pushdowns unset") { + val serde = loadContribLanceSerde.getOrElse { + cancel("contrib-lance profile is not enabled") + } + + val schema = StructType(Seq(StructField("id", DataTypes.IntegerType, nullable = true))) + val descriptor = new FakeLanceNativeScanPlan( + descriptorVersion = 1, + scanId = "", + datasetUri = "/tmp/table.lance", + resolvedVersion = 3L, + sparkReadSchemaJson = schema.json, + projectedReadSchemaJson = schema.json, + pushedFilterSql = None, + limit = None, + offset = None, + batchSize = 1024, + storageOptions = new LinkedHashMap[String, String](), + splits = Arrays.asList(new FakeLanceNativeScanSplit(0, Arrays.asList(Int.box(1))))) + + val (common, partitions) = + serializeFakeLanceDescriptor(serde, descriptor, "fallback-scan", schema) + + assert(common.getScanId == "fallback-scan") + assert(!common.hasFilterSql) + assert(!common.hasLimit) + assert(!common.hasOffset) + assert(partitions.length == 1) + assert(partitions.head.getPartition.getFragmentIdsList.asScala.map(_.intValue()) == Seq(1)) + } + + private def loadContribLanceSerde: Option[AnyRef] = + Try { + Class + .forName("org.apache.comet.serde.operator.CometLanceNativeScan$") + .getField("MODULE$") + .get(null) + .asInstanceOf[AnyRef] + }.toOption + + private def serializeFakeLanceDescriptor( + serde: AnyRef, + descriptor: AnyRef, + fallbackScanId: String, + fallbackRequiredSchema: StructType) + : (OperatorOuterClass.LanceScanCommon, Array[OperatorOuterClass.LanceScan]) = { + val method = serde.getClass.getMethods + .find(method => + method.getName == "serializeNativePlan" && method.getParameterTypes.length == 3) + .getOrElse { + throw new AssertionError("CometLanceNativeScan.serializeNativePlan was not found") + } + + val serialized = method + .invoke(serde, descriptor, fallbackScanId, fallbackRequiredSchema) + .asInstanceOf[Product] + val commonBytes = serialized.productElement(0).asInstanceOf[Array[Byte]] + val partitionBytes = serialized.productElement(1).asInstanceOf[Array[Array[Byte]]] + + ( + OperatorOuterClass.LanceScanCommon.parseFrom(commonBytes), + partitionBytes.map(OperatorOuterClass.LanceScan.parseFrom)) + } + + private class FakeLanceNativeScanPlan( + descriptorVersion: Int, + scanId: String, + datasetUri: String, + resolvedVersion: Long, + sparkReadSchemaJson: String, + projectedReadSchemaJson: String, + pushedFilterSql: Option[String], + limit: Option[Long], + offset: Option[Long], + batchSize: Int, + storageOptions: java.util.Map[String, String], + splits: java.util.List[FakeLanceNativeScanSplit]) { + def getDescriptorVersion(): Int = descriptorVersion + def getScanId(): String = scanId + def getDatasetUri(): String = datasetUri + def getResolvedVersion(): Long = resolvedVersion + def getSparkReadSchemaJson(): String = sparkReadSchemaJson + def getProjectedReadSchemaJson(): String = projectedReadSchemaJson + def hasPushedFilterSql(): Boolean = pushedFilterSql.isDefined + def getPushedFilterSql(): String = pushedFilterSql.get + def hasLimit(): Boolean = limit.isDefined + def getLimit(): Long = limit.get + def hasOffset(): Boolean = offset.isDefined + def getOffset(): Long = offset.get + def getBatchSize(): Int = batchSize + def getStorageOptions(): java.util.Map[String, String] = storageOptions + def getSplits(): java.util.List[FakeLanceNativeScanSplit] = splits + } + + private class FakeLanceNativeScanSplit(splitIndex: Int, fragmentIds: java.util.List[Integer]) { + def getSplitIndex(): Int = splitIndex + def getFragmentIds(): java.util.List[Integer] = fragmentIds + } + }