moq-lite-05: move immutable track props to a Track Stream (TRACK_INFO) + model info()#1609
moq-lite-05: move immutable track props to a Track Stream (TRACK_INFO) + model info()#1609kixelated wants to merge 8 commits into
Conversation
Replace the per-response publisher metadata in SUBSCRIBE_OK with a dedicated, on-demand Track Stream, per moq-dev/drafts#25. Scoped to the WIP Lite05 version; Lite01-04 keep SUBSCRIBE_OK unchanged. - New Track Stream (0x6): a TRACK request (broadcast path + track name) answered with a single TRACK_INFO carrying the immutable publisher properties (Priority, Ordered, Cache, Timescale, Compression), then a FIN (or reset on error / missing track). - Removed the static props (compression/timescale/cache) from SUBSCRIBE_OK; on Lite05 a subscription is accepted implicitly (rejection is a reset) and the publisher sends nothing on the subscribe stream. - Subscriber flights TRACK and SUBSCRIBE in parallel, so the first group still arrives in one round trip. A pending TrackEntry is inserted before SUBSCRIBE, so group streams that race ahead of TRACK_INFO park on a resolved channel (buffered by QUIC flow control) instead of being dropped. The resolved (producer, compression, timescale) is reused for every group's decode instead of being re-derived per response, and is fetched once for the upstream subscription's lifetime (linger). Publisher resolves TRACK_INFO by subscribing to read the track's .info and dropping the subscription; a parallel SUBSCRIBE coalesces onto the same upstream producer. Priority/Ordered are sent as 0/false for now since the model Track carries no publisher priority/order field yet. Not included (no functional gap in the Rust impl, which never resolves a group range or emits drops): SUBSCRIBE_START/SUBSCRIBE_END and the SUBSCRIBE_DROP renumber. Cross-package sync to js/net and doc/concept is also deferred. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
bc017a9 to
0ba9972
Compare
| // alive across this brief gap. | ||
| let track = broadcast | ||
| .consume_track(track_name) | ||
| .subscribe(crate::Subscription::default()) |
There was a problem hiding this comment.
We would need a separate info().await method right? We definitely don't want to subscribe just to get this Track info.
There was a problem hiding this comment.
Oh or consume_track would have to be async. That's probably cleaner, but it also introduces an RTT I'd like to avoid. Pretty gross.
There was a problem hiding this comment.
Done — added a TrackConsumer::info() method to the model (commit f37b862). recv_track now calls consume_track(name).info().await, no subscription. Under the hood it is a dynamic info-request channel mirroring requested_track/TrackRequest (requested_info() resolves an InfoRequest with the Track), with a track_info cache. Warm (a live producer exists, or it is cached) resolves with 0 RTT; only a cold relay lookup costs one upstream TRACK fetch. accept warms the cache, so a SUBSCRIBE and a parallel TRACK coalesce, and group-by-group FETCH (no track-level producer) reuses the one lookup. (Written by Claude)
There was a problem hiding this comment.
Avoided the consume_track async + forced RTT. info() is the async bit, but it only hits the wire on a cold relay lookup; the origin (live producer) and any cached/warm case resolve synchronously. So no RTT in the common path. (Written by Claude)
| let broadcast = self.origin.get_broadcast(&req.broadcast); | ||
| let version = self.version; | ||
|
|
||
| web_async::spawn(async move { |
There was a problem hiding this comment.
Do we need this spawn? I forget why recv_track is async but I think we can inline run_track_info
There was a problem hiding this comment.
Kept the spawn, with a comment explaining why. recv_track is async only so run() can await it to decode the first message (same as recv_subscribe). The body has to stay spawned: resolving info can be a cold upstream TRACK fetch (relay case), and the publisher's accept loop in run() must stay free to handle other control streams meanwhile — inlining would head-of-line block it. (Written by Claude)
…cribe
Addresses PR review: the publisher must not open a subscription just to learn
a track's immutable properties. Add a first-class info path to the model and
route every property lookup (publisher TRACK reply, subscriber SUBSCRIBE) and
the cache through it.
Model (broadcast.rs):
- TrackConsumer::info() -> InfoPending: resolves a track's immutable Track
(timescale/compression/cache) without subscribing. Warm (a live producer
exists, or the value is cached) it resolves with no round trip; cold it
queues a dynamic info request.
- New dynamic info-request channel mirroring requested_track/TrackRequest:
BroadcastDynamic::requested_info() -> InfoRequest::resolve(Track)/deny, plus
a combined requested() -> DynamicRequest{Track,Info} so one loop serves both.
- track_info cache keyed by name (a re-announce replaces the broadcast and
State, invalidating it). TrackRequest::accept warms it, so a subscribe and a
concurrent TRACK coalesce. Group-by-group fetches (which keep no track-level
producer) reuse the one cached lookup.
Publisher: recv_track now calls consume_track(name).info().await instead of
subscribing-and-dropping. The spawn stays (a cold relay lookup is an upstream
round trip; the accept loop must not block on it).
Subscriber: the relay serves downstream info requests (run_info) by fetching
TRACK_INFO upstream and caching it; its own lite-05 SUBSCRIBE path now resolves
props through info() too, so a downstream's parallel TRACK + SUBSCRIBE collapse
to a single upstream TRACK fetch.
Tests cover warm/cold/coalesced/NotFound info() and accept-warms-cache.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Resolves a track's immutable properties via a TRACK stream (info(), no subscription), then subscribes and reads a frame. Guards the on-demand info path end-to-end ahead of the model refactor. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…quest Per review: collapse the separate InfoRequest/DynamicRequest channel into a single demand-driven request. info() and subscribe() now coalesce onto one PendingRequest, so a downstream's parallel TRACK + SUBSCRIBE for a track triggers exactly one upstream fetch (not two). Model (broadcast.rs): - TrackConsumer::info() coalesces onto the same dynamic request as subscribe (a new info-waiter list on PendingRequest); a single TrackRequest::accept resolves both the subscribers and the info waiters and caches the Track. - TrackRequest::subscription() is now Option: None means a pure info() request with no group demand, so the handler resolves info without subscribing. - BroadcastDynamic::cached_info() lets a handler skip the upstream TRACK fetch when the props are already known. Removed InfoRequest, DynamicRequest, requested_info(), requested(), and the parallel info_requests state. Relay subscriber (subscriber.rs): - run_subscribe is demand-gated: it resolves props (cached or one upstream TRACK fetch) and opens the upstream SUBSCRIBE only when there's group demand, flighting it alongside the info fetch so a fresh subscribe stays one round trip. A pure info() request resolves + caches the props and opens no subscription; if a subscriber coalesces meanwhile, it subscribes then. The linger lifecycle is factored into serve_lifecycle and reused. Publisher recv_track is unchanged (still consume_track(name).info().await); it now drives the unified request. Tested end-to-end: rs/moq-native broadcast tests cover info() + subscribe over a real session (cold fetch, then cached reuse), plus the existing timestamp round-trips. Model unit tests cover warm/cold-coalesced/NotFound/accept-warms. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Per review, drop the pending-vs-resolved split: consume_track() returns a single TrackConsumer (Clone) that Derefs to a name-only Track (so `.name` works) and exposes async info() / subscribe(). Every other track property is unknown until info() resolves it (for a relay it comes from upstream), so there's no separate "resolved" handle and no ok()/poll_ok ceremony. The previous subscribe-future TrackPending is renamed SubscribePending (one moq-mux reference). Existing callers (consume_track(x).subscribe(None).await / .info().await) are unchanged. This is the consumer-side of the model redesign; the demand-side (TrackState shared by TrackRequest/TrackProducer, async subscription(), accept upgrading the request into the producer) is the follow-up. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
3baae30 to
3a8c8cd
Compare
| pub use stream::*; | ||
| pub use subscribe::*; | ||
| use subscriber::*; | ||
| #[allow(unused_imports)] |
| /// The publisher replies with a single [`TrackInfo`] and then FINs the stream, | ||
| /// or resets it on error (e.g. the track does not exist). Lite05+ only. | ||
| #[derive(Clone, Debug)] | ||
| pub struct Track<'a> { |
There was a problem hiding this comment.
We should probably call this something else, like TrackRequest IDK.
| // resolution instead of re-fetching upstream. Keyed by track name; a | ||
| // re-announce replaces the whole broadcast (and this State), so the cache is | ||
| // implicitly invalidated then. | ||
| track_info: HashMap<String, Track>, |
There was a problem hiding this comment.
I think this is wrong. We should instead cache via tracks or dedupe via requests.
| /// producer already exists or the info is cached, otherwise once the dynamic | ||
| /// handler accepts the coalesced request via [`TrackRequest::accept`]. Implements | ||
| /// [`Future`]; poll-based callers can use [`Self::poll_info`]. | ||
| pub struct InfoPending { |
There was a problem hiding this comment.
Definitely remove.
…iring-hawking-baf03d # Conflicts: # rs/moq-net/src/lite/publisher.rs
| /// The track name this handle is bound to. | ||
| pub fn name(&self) -> &str { | ||
| &self.name | ||
| impl Deref for TrackConsumer { |
There was a problem hiding this comment.
Please remove. Just make a .name() method instead of placeholders.
…TrackConsumer::info() Restart of the model per review. Reverts the lite-layer wire/relay work back to dev (Track stream, SUBSCRIBE_OK changes, demand loop) to get the model right first; only the model refactor remains. track.rs: - Split TrackProducer into a headless `TrackRequest` (the shared state + info-free methods: subscription/used/unused/closed/finish/abort/...) and `TrackProducer`, a wrapper of `TrackRequest` + `info: Track` that Derefs to the Track. `TrackRequest::accept(info)` upgrades a request into a producer. The public `TrackProducer` API is unchanged, so all callers compile as-is. broadcast.rs: - `TrackConsumer::info() -> InfoPending` (+ `poll_info()` / `Future`): resolves a track's immutable Track without subscribing. Warm via the live producer in `tracks`; cold it coalesces onto the same dynamic request as subscribers and resolves on `accept`. No separate info cache or channel. - Renamed the broadcast-level dynamic request `TrackRequest` -> `PendingTrack` (to free the name for the track-level headless producer); updated its two consumers in lite/ietf subscriber.rs (type name only). Held off on rewiring publisher/subscriber to the new model, as requested. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Re-adds the wire/relay implementation reverted in the model restart, now built on the split TrackRequest/TrackProducer model and the clean TrackConsumer::info(). - lite/track.rs, stream.rs (ControlType::Track), mod.rs, subscribe.rs: TRACK / TRACK_INFO messages and the SUBSCRIBE_OK slimming for lite-05, restored. - lite/publisher.rs: recv_track serves TRACK_INFO via consume_track(name).info(); skips SUBSCRIBE_OK on lite-05. - lite/subscriber.rs: lite-05 fetches TRACK_INFO upstream (flighted with SUBSCRIBE), demand-gated on PendingTrack::subscription() (info-only requests don't subscribe); resolve_props drops the removed cached_info and fetches once. - model/broadcast.rs: PendingTrack::subscription() now returns Option (None = info-only, no group demand) so the relay can gate the upstream subscribe. Tested: 359 model + 57 moq-native e2e (incl. lite05 info() + timestamp round-trips), clippy + fmt clean, relay/hang/mux build. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
What
Implements moq-dev/drafts#25 in
rs/moq-net: move a track's immutable publisher properties out ofSUBSCRIBE_OKonto a dedicated, on-demand Track Stream, backed by a first-class modelinfo()path. Scoped to the WIPLite05version;Lite01-Lite04keepSUBSCRIBE_OKunchanged.Why
Priority/Ordered/Cache/Timescale/Compressionare fixed for a track's lifetime, yet were echoed on every response. The real payoff is FETCH (#1601): group-by-group history catch-up otherwise repeats aFetchOk(compression+timescale) on every group. A cachedTRACK_INFOlets a single lookup serve every SUBSCRIBE and FETCH of a track.Model:
TrackConsumer::info()(the key piece)The publisher must not subscribe just to learn properties (per review). Added a first-class info path to
model/broadcast.rs:TrackConsumer::info() -> InfoPendingresolves a track's immutableTrackwithout subscribing. Warm (a live producer exists, or the value is cached) → resolves with 0 RTT; cold → queues a dynamic info request.requested_track/TrackRequest:BroadcastDynamic::requested_info() -> InfoRequest::resolve(Track)/deny, plus a combinedrequested() -> DynamicRequest{Track,Info}so one relay loop serves both.track_infocache keyed by name (a re-announce replaces the broadcast +State, invalidating it).TrackRequest::acceptwarms it, so a SUBSCRIBE and a concurrent TRACK coalesce; group-by-group FETCH (no track-level producer) reuses the one lookup.Wire + lite layer
0x6) (lite/stream.rs,lite/track.rs):TRACKrequest → singleTRACK_INFO(Priority,Ordered,Cache,Timescale,Compression) → FIN, or reset on error / missing track. Roundtrip tests.SUBSCRIBE_OKslimmed (lite/subscribe.rs): static props removed. On Lite05 a subscription is accepted implicitly (rejection = reset); the publisher sends nothing on the subscribe stream.lite/publisher.rs):recv_track→consume_track(name).info().await→ replyTRACK_INFO. No subscribe-to-peek. The spawn stays so a cold relay lookup doesn't head-of-line blockrun()'s accept loop.lite/subscriber.rs): the relay serves downstream info requests (run_info) by fetchingTRACK_INFOupstream and caching it. Its own Lite05 SUBSCRIBE path resolves props throughinfo()too, so a downstream's parallel TRACK + SUBSCRIBE collapse to one upstream TRACK fetch and the first group still arrives in one round trip (groups that race ahead buffer onresolved).Coordination with #1601 (FETCH)
#1601 currently adds
FetchOk(per-FETCH compression+timescale) — exactly what #25 removes. Plan (agreed): landinfo()here; #1601 rebases to consumeinfo()for its timescale/compression and dropFetchOk, so group-by-group fetch reuses the cached lookup.Notes
Priority/Orderedare sent as0/false(the modelTrackhas no publisher priority/order field yet; oldSUBSCRIBE_OKechoed the subscriber's priority and hardcodedordered=false, so nothing real is lost).bool; the codec set is{none, deflate}, so it round-trips losslessly. A third codec would need the enum onTrack.SUBSCRIBE_START/SUBSCRIBE_ENDand theSUBSCRIBE_DROPrenumber (the impl never resolves a group range or emits drops — a wire-spec gap, not a functional one).js/net+doc/conceptdeferred.Test plan
cargo test -p moq-net(360 passing), incl.TRACK/TRACK_INFOroundtrips and modelinfo()tests (warm / cold-coalesced / NotFound / accept-warms-cache)cargo clippy -p moq-net --all-targetscleancargo fmt(pinned nix toolchain)cargo check -p moq-relay -p hang(downstream builds)(Written by Claude)