Replace streams Sequence/Flow/Reaktive backends with a round-based Step engine#2740
Draft
slisson wants to merge 7 commits into
Draft
Replace streams Sequence/Flow/Reaktive backends with a round-based Step engine#2740slisson wants to merge 7 commits into
slisson wants to merge 7 commits into
Conversation
…ased Step engine
The streams module previously carried three interchangeable backends (Sequence,
Flow, Reaktive) plus a deferred builder. Reaktive existed solely to provide the
push semantics needed for bulk-request batching: collecting a set of independent
data requests before forcing any of them.
That batching requirement is an *applicative* property and can be satisfied
structurally by a single round-based interpreter, removing the need for three
backends and the third-party Reaktive dependency.
This change:
- Adds streams2: a standalone, dependency-free reference implementation of the
Step-based engine (prototype + tests + README).
- Ports the engine into the streams module behind its existing public API:
- engine/StepEngine.kt: Step IR, applicative/monadic combinators, blocking +
suspending round drivers, fetch leaves, async leaves (fromFlow/
singleFromCoroutine).
- StreamImpl.kt: one backing impl for every IStream cardinality, plus
CompletableImpl and the single StreamBuilderImpl.
- SimpleStreamExecutor / BlockingStreamExecutor / BulkRequestStreamExecutor are
now thin wrappers over the driver; enqueue() is a fetch leaf and batching is
structural (per source, per round).
- Deletes the four backend builders, the concrete stream classes, the Reaktive
helpers, and the Reaktive dependency.
The public API (IStream.*, IStreamBuilder, IStreamExecutor, IExecutableStream,
IStreamInternal, IBulkExecutor, BulkRequestStreamExecutor, all operators) is
unchanged. No downstream module required API-driven changes; the only other edits
remove dead code (unused Reaktive imports/helpers in model-api and
modelql-untyped) that compiled only via the former transitive Reaktive export.
Verified: streams compiles JVM+JS and tests pass; datastructures and
model-datastructure compile untouched and their suites pass; modelql-core/-untyped
tests pass; model-client/model-server/modelql-* compile; model-server
LazyLoadingTest (batching/access-pattern correctness) passes.
See streams-redesign.md and streams2/README.md for details and tradeoffs.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
4c9c626 to
e7b72ae
Compare
The streams2 module duplicated both the IStream interface hierarchy and the Step engine. Since the engine was already ported into streams in a more complete form (Pending with async leaves, recover/doOnError, the suspending driver, batch chunking) and nothing depended on streams2, the separate module was pure redundancy. - Remove the streams2 module and its settings.gradle.kts registration. - Keep its batching/dedup/stack-safety tests, rewritten against the real BulkRequestStreamExecutor / enqueue API (BulkRequestBatchingTest). - Add streams/README.md with the design overview (previously in streams2/README). - Update streams-redesign.md to describe the prototype-then-merge history. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Contributor
Test Results 265 files 265 suites 43m 29s ⏱️ Results for commit fbe71a6. ♻️ This comment has been updated with latest results. |
…ource
The IStreamExecutor passed to getBlocking/getSuspending/iterate*/execute* no
longer carried any batching context: fetch leaves carry their own IBulkExecutor
source and the per-run Execution provides caching/dedup. Two changes remove the
residual coupling.
B. Batch size on the source: IBulkExecutor now declares `val batchSize`
(default DEFAULT_BULK_REQUEST_BATCH_SIZE = 5000). The round driver chunks each
source's keys to its own batchSize and no longer takes a batch-size parameter.
BulkRequestStreamExecutor(source, batchSize) still works (it exposes the
constructor batch size on the source the leaves bind to).
A. Executor-less terminals: add getBlocking()/getSuspending()/iterateBlocking{}/
iterateSuspending{}/executeBlocking()/executeSuspending(); deprecate the
executor-taking overloads with ReplaceWith, keeping their original behavior for
compatibility. All ~180 in-repo terminal call sites migrated to the
executor-less form.
IStreamExecutor / IStreamExecutorProvider and BulkRequestStreamExecutor.enqueue
remain (enqueue creates fetch leaves; CONTEXT is still set during
BulkRequestStreamExecutor runs because ModelQL resolves the current executor via
IStreamExecutor.getInstance()).
Verified: streams JVM+JS compile and tests pass; datastructures and
model-datastructure compile + full suites pass; model-client (JVM+JS),
model-server, modelql-* compile; model-server LazyLoadingTest (batching
correctness) passes.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
cached() was a no-op, which broke ModelQLClientTest.testCaching: a query step consumed by multiple branches (with a side-effecting setProperty) was evaluated once per consumer instead of once. Implement real multicast in the engine: cached() resolves its inner stream through a shared MemoCell stored per-Execution and keyed by a stable token. Every consumer gets a view of the same cell, and the cell's inner step is advanced at most once per round, so the underlying work and any side effects run exactly once and the result is shared. Pending.union now dedupes async leaves by token so a memoized leaf shared across branches isn't run twice in a round. Verified: new CachedStreamTest (side-effect-once across consumers, including via zip); ModelQLClientTest passes (16/16, incl. testCaching); datastructures, model-datastructure, modelql-core/-untyped suites pass. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
aade9f4 to
051474e
Compare
Contributor
sarif-multitool 5.x removed --merge-runs; merging runs by tool is now the default merge behavior, so the flag caused an unknown-option error. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
7440151 to
23790a0
Compare
Add frequently-needed operators to IStream as extension functions composed from existing engine primitives (no StreamImpl changes): boolean reductions (any/all/none), filterNot/filterIsInstance/ filterIndexed, first/last accessors, mapIndexed, collection conversions (toSet/groupBy/associateBy/associateWith/toMap), sorting, distinctBy, reduce/sum/sumOf/min-maxBy, startWith/endWith, joinToString. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
284bb68 to
30f7a44
Compare
Rework the Step engine from a Blocked/Pending/resume lockstep model to a lazily-evaluated node graph (Done/FetchStep/MapStep/FlatMapStep/ZipStep/ FanStep/AsyncStep/Recover/OnError/MemoStep) driven by a depth-first walk. Each round collects the first-unmet request of every pending branch into a shared pool capped at the source's batchSize, and FanStep builds its children lazily (left-to-right, dropped once resolved). The peak request frontier is therefore <= batchSize regardless of how wide a tree level is, restoring the property the old BulkRequestStreamExecutor.RequestQueue.sendNextBatch provided (a depth-first traversal that bounds the live request set). Sibling requests still share a round when they fit under the cap, so applicative batching is preserved; an unbounded cap collapses to one round per level. Combinators evaluate eagerly when their inputs are already Done, so a synchronous prefix and its side effects run at build time in build order. Some call sites depend on this (e.g. HamtLeafNode.getChanges sets a var in one synchronous stream and reads it in a later deferZeroOrOne); fully-lazy evaluation broke tree diffing until eager-on-Done was restored. The public IStream API and combinator signatures are unchanged; only flatMapOrdered/flatMapZeroOrOne switch to the new lazy fanOut, and the driver functions became Execution members. Tests: FrontierSizeTest asserts the frontier stays within batchSize on a wide+deep tree (widest level 16384, cap 1000) and collapses to one round per level when uncapped. HamtGetChangesTest adds direct coverage of the HAMT diff (guards the eager-evaluation dependency). Validated across streams (JVM + JS), datastructures, model-datastructure, modelql-core/untyped/typed, model-api, and bulk-model-sync-lib. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What
Replaces the
streamsmodule's three interchangeable backends (Sequence, Flow, Reaktive) and deferred builder with a single round-basedStepinterpreter, and removes the Reaktive third-party dependency — while keeping the entire public API unchanged.A new
streams2module is added as a standalone, dependency-free reference implementation of the engine (with its own tests and README). The same design is then ported intostreamsto back its richer legacy API.Why
streamscarried three backends because of one capability plain coroutines/Flowcan't give cheaply: automatic bulk-request batching for lazy loading of large content-addressed models. Reaktive was there only for the push semantics that let the runtime collect a set of independent requests before forcing any of them.That requirement is an applicative property (independent branches → same batch round;
flatMapdependency → next round). It can be satisfied structurally by one interpreter, eliminating the three backends and the dependency. Prior art: Haxl, ZIO Query, Stitch.How
engine/StepEngine.kt—StepIR (Done/Blocked/Failed), applicative/monadic combinators, blocking + suspending round drivers, fetch leaves, and async leaves (forfromFlow/singleFromCoroutine).StreamImpl.kt— one backing impl for everyIStreamcardinality, plusCompletableImpland the singleStreamBuilderImpl.SimpleStreamExecutor/BlockingStreamExecutor/BulkRequestStreamExecutorare now thin wrappers over the driver.enqueue(key)is a fetch leaf; batching is structural (per source, per round) rather than tied to an executor-context queue.API compatibility / migration
The public surface (
IStream.*,IStreamBuilder,IStreamExecutor,IExecutableStream,IStreamInternal,IBulkExecutor,BulkRequestStreamExecutor, all operators/extensions) is unchanged. No downstream module needed API-driven changes. The only other edits remove dead code — unused Reaktive imports in threemodelql-untypedfiles and two unused private helpers inNodeAsAsyncNode.kt— which compiled solely becausestreamsused to re-export Reaktive viaapi.Verification
streamscompiles JVM + JS; unit tests pass.datastructuresandmodel-datastructurecompile untouched; full JVM suites pass.modelql-core/modelql-untypedtests pass.model-client,model-server,model-server-api,modelql-typed/-html/-client/-server,bulk-model-sync-libcompile (JVM + JS where applicable).model-serverLazyLoadingTestpasses — the access-pattern / batching-correctness test, confirming structural batching reproduces the old Reaktive collect-and-batch behavior.Reviewer notes — behavioral tradeoffs (intentional, "no incremental emission")
iterate/iterateSuspendingnow fully materialize before visiting (Reaktive previously drained between batches). Raises peak memory for very large server-side iterations; clean follow-up is per-round streaming in just theiterate*drivers.cached()is currently a no-op — fetch-level dedup (the expensive part) is handled by the per-run cache; only pure-recompute memoization is lost.take/skipoperate on materialized results (don't prune upstream fetches).SimpleStreamExecutornow also batches per source/round → strictly fewer round-trips, never more.See
streams-redesign.md(repo root) andstreams2/README.mdfor the full design writeup and known limitations.🤖 Generated with Claude Code