diff --git a/crates/rproxy/src/server/metrics.rs b/crates/rproxy/src/server/metrics.rs index 638846d..ba167ad 100644 --- a/crates/rproxy/src/server/metrics.rs +++ b/crates/rproxy/src/server/metrics.rs @@ -47,6 +47,8 @@ pub(crate) struct Metrics { pub(crate) http_proxy_success_count: Family, pub(crate) http_proxy_failure_count: Family, + pub(crate) proxy_oneshot_spawn_fallback_count: Family, + pub(crate) http_request_size: Family, pub(crate) http_response_size: Family, @@ -91,6 +93,8 @@ impl Metrics { http_proxy_success_count: Family::default(), http_proxy_failure_count: Family::default(), + proxy_oneshot_spawn_fallback_count: Family::default(), + http_request_size: Family::default(), http_response_size: Family::default(), @@ -191,6 +195,12 @@ impl Metrics { this.http_proxy_failure_count.clone(), ); + this.registry.register( + "proxy_oneshot_spawn_fallback_count", + "count of postprocess pairings that fell back to spawned await on the request oneshot", + this.proxy_oneshot_spawn_fallback_count.clone(), + ); + this.registry.register_with_unit( "http_request_size", "sizes of incoming http requests", diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 131b928..5062f87 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -5,7 +5,6 @@ use std::{ mem, ops::Add, pin::Pin, - str::FromStr, sync::{ Arc, atomic::{AtomicI64, AtomicUsize, Ordering}, @@ -40,9 +39,10 @@ use bytes::Bytes; use futures::TryStreamExt; use futures_core::Stream; use pin_project::pin_project; +use prometheus_client::metrics::{counter::Counter, gauge::Gauge}; use scc::HashMap; use time::{UtcDateTime, format_description::well_known::Iso8601}; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, oneshot}; use tracing::{debug, error, info, warn}; use url::Url; use uuid::Uuid; @@ -77,8 +77,20 @@ where shared: ProxyHttpSharedState, backend: ProxyHttpBackendEndpoint, - requests: HashMap, postprocessor: actix::Addr>, + + // Per-worker cached metric handles. These are cheap to clone (each + // wraps an `Arc` internally) and bypass the per-request + // `Family::get_or_create` lookup on the hot path. + in_flight_client: Gauge, + in_flight_backend: Gauge, + proxy_failure_count: Counter, + oneshot_spawn_fallback_count: Counter, + + // Per-worker cache of (user_agent -> Counter) so that we only pay the + // `Family::get_or_create` cost the first time a UA is seen on this + // worker. Lookup is `&str`-keyed (no allocation on hit). + client_info_cache: HashMap, } impl ProxyHttp @@ -135,7 +147,30 @@ where } .start(); - Self { id, shared, backend, requests: HashMap::default(), postprocessor } + let labels_proxy = LabelsProxy { proxy: P::name() }; + let in_flight_client = + shared.metrics.http_in_flight_requests_client.get_or_create(&labels_proxy).clone(); + let in_flight_backend = + shared.metrics.http_in_flight_requests_backend.get_or_create(&labels_proxy).clone(); + let proxy_failure_count = + shared.metrics.http_proxy_failure_count.get_or_create(&labels_proxy).clone(); + let oneshot_spawn_fallback_count = shared + .metrics + .proxy_oneshot_spawn_fallback_count + .get_or_create(&labels_proxy) + .clone(); + + Self { + id, + shared, + backend, + postprocessor, + in_flight_client, + in_flight_backend, + proxy_failure_count, + oneshot_spawn_fallback_count, + client_info_cache: HashMap::default(), + } } pub(crate) async fn run( @@ -358,13 +393,15 @@ where fn to_client_response(bknd_res: &ClientResponse) -> HttpResponseBuilder { let mut clnt_res = HttpResponse::build(bknd_res.status()); + // The backend's `HeaderName` instances are already validated by the + // http parser, so we forward them verbatim without round-tripping + // through `HeaderName::from_str` (which was showing up as + // surprisingly hot on engine_* call patterns). for (hkey, hval) in bknd_res.headers().iter() { if is_hop_by_hop_header(hkey) { continue; } - if let Ok(hkey) = header::HeaderName::from_str(hkey.as_str()) { - clnt_res.append_header((hkey, hval.clone())); - } + clnt_res.append_header((hkey.clone(), hval.clone())); } clnt_res @@ -398,19 +435,30 @@ where !user_agent.is_empty() && let Ok(user_agent) = user_agent.to_str() { - metrics - .client_info - .get_or_create(&LabelsProxyClientInfo { - proxy: P::name(), - user_agent: user_agent.to_string(), - }) - .inc(); + // Hot path: read-only lookup keyed by &str (no allocation). + // Cold path: allocate the owned String key and resolve the + // counter from the metrics family exactly once per worker per + // distinct UA. + if let Some(entry) = this.client_info_cache.read_sync(user_agent, |_, c| c.clone()) { + entry.inc(); + } else { + let counter = metrics + .client_info + .get_or_create(&LabelsProxyClientInfo { + proxy: P::name(), + user_agent: user_agent.to_string(), + }) + .clone(); + counter.inc(); + // Best-effort insert; races with another task on the same + // worker thread shouldn't happen given !Send actix workers, + // but tolerate insert errors regardless. + let _ = this.client_info_cache.insert_sync(user_agent.to_string(), counter); + } } - metrics - .http_in_flight_requests_client - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + let in_flight_client = this.in_flight_client.clone(); + in_flight_client.inc(); let res = if this.shared.inner.might_intercept() { Self::send_to_backend_and_maybe_intercept(this, info, clnt_req_body, timestamp).await @@ -418,10 +466,7 @@ where Self::stream_to_backend(this, info, clnt_req_body, timestamp).await }; - metrics - .http_in_flight_requests_client - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); + in_flight_client.dec(); res } @@ -436,19 +481,20 @@ where let conn_id = info.conn_id; let bknd_req = this.backend.new_backend_request(&info); + // Oneshot hands the completed ProxiedHttpRequest from the + // request body's terminal poll to the response body's terminal + // poll, replacing the previous `proxy.requests` HashMap lookup. + let (req_tx, req_rx) = oneshot::channel::(); let bknd_req_body = ProxyHttpRequestBody::new( this.clone(), info, clnt_req_body, this.shared.config().prealloacated_request_buffer_size(), timestamp, + req_tx, ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.in_flight_backend.inc(); #[cfg(debug_assertions)] debug!( @@ -477,16 +523,8 @@ where error = ?err, "Failed to proxy a request", ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); - this.shared - .metrics - .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.in_flight_backend.dec(); + this.proxy_failure_count.inc(); return Ok(HttpResponse::BadGateway().body(format!("Backend error: {err:?}"))); } }; @@ -503,13 +541,9 @@ where "Finished streaming http request to backend", ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); + this.in_flight_backend.dec(); - Self::stream_to_client(this, req_id, conn_id, bknd_res) + Self::stream_to_client(this, req_id, conn_id, bknd_res, req_rx) } async fn send_to_backend_and_maybe_intercept( @@ -532,11 +566,7 @@ where "Sending http request to backend...", ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.in_flight_backend.inc(); let body = match clnt_req_body.to_bytes_limited(this.shared.config().max_request_size()).await { @@ -554,16 +584,8 @@ where error = ?err, "Failed to proxy a request", ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); - this.shared - .metrics - .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.in_flight_backend.dec(); + this.proxy_failure_count.inc(); return Ok(HttpResponse::BadGateway().body(format!("Backend error: {err:?}"))); } @@ -583,25 +605,13 @@ where error = ?err, "Failed to proxy a request", ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); - this.shared - .metrics - .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.in_flight_backend.dec(); + this.proxy_failure_count.inc(); return Ok(HttpResponse::PayloadTooLarge().finish()); } }; - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); + this.in_flight_backend.dec(); #[cfg(debug_assertions)] debug!( @@ -618,7 +628,8 @@ where let (decompressed_body, decompressed_size) = decompress(body.clone(), size, info.content_encoding()); - match serde_json::from_slice::(&decompressed_body) { + let jrpc_meta = match serde_json::from_slice::(&decompressed_body) + { Ok(jrpc) => { if let Some(res) = this.shared.inner.should_intercept(&jrpc) { let json_req = if this.shared.config().log_proxied_requests() { @@ -644,6 +655,9 @@ where return res; } + // Stash the parsed jrpc so `finalise_proxying` can avoid + // re-parsing the same body. + Some(Arc::new(jrpc)) } Err(err) => { @@ -657,8 +671,9 @@ where error = ?err, "Failed to parse json-rpc request", ); + None } - } + }; let req = ProxiedHttpRequest { info, @@ -668,15 +683,12 @@ where decompressed_size, start: timestamp, end: UtcDateTime::now(), + jrpc_meta, }; let bknd_req = this.backend.new_backend_request(&req.info); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.in_flight_backend.inc(); let bknd_res = match bknd_req.send_body(req.body.clone()).await { Ok(bknd_res) => bknd_res, @@ -692,29 +704,24 @@ where error = ?err, "Failed to proxy a request", ); - this.shared - .metrics - .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); + this.proxy_failure_count.inc(); + this.in_flight_backend.dec(); return Ok(HttpResponse::BadGateway().body(format!("Backend error: {err:?}"))); } }; - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); + this.in_flight_backend.dec(); - this.postprocess_client_request(req); + // Hand the already-buffered request to the response body via + // oneshot. The response body's terminal poll receives it and + // dispatches the combo to the postprocessor. `tx.send` is a + // trivial slot-store (no HashMap insert, no contention), so we + // no longer need PR 1's stream-first-then-file ordering trick — + // the bookkeeping cost is gone entirely. + let (req_tx, req_rx) = oneshot::channel::(); + let _ = req_tx.send(req); - Self::stream_to_client(this, req_id, conn_id, bknd_res) + Self::stream_to_client(this, req_id, conn_id, bknd_res, req_rx) } fn stream_to_client( @@ -722,6 +729,7 @@ where req_id: Uuid, conn_id: Uuid, bknd_res: ClientResponse, + req_rx: oneshot::Receiver, ) -> Result where S: Stream> + Unpin + 'static, @@ -732,15 +740,17 @@ where let mut clnt_res = Self::to_client_response(&bknd_res); let preallocate = this.shared.config().prealloacated_response_buffer_size(); + let content_encoding = bknd_res.headers().get(header::CONTENT_ENCODING).cloned(); let bknd_res_body = ProxyHttpResponseBody::new( this, req_id, conn_id, status, - bknd_res.headers().clone(), + content_encoding, bknd_res.into_stream(), preallocate, timestamp, + req_rx, ); #[cfg(debug_assertions)] @@ -756,38 +766,6 @@ where Ok(clnt_res.streaming(bknd_res_body)) } - fn postprocess_client_request(&self, req: ProxiedHttpRequest) { - let id = req.info.req_id; - let conn_id = req.info.conn_id; - - if self.requests.insert_sync(id, req).is_err() { - error!( - proxy = P::name(), - request_id = %id, - connection_id = %conn_id, - worker_id = %self.id, - "Duplicate request id", - ); - }; - } - - fn postprocess_backend_response(&self, bknd_res: ProxiedHttpResponse) { - let Some((_, clnt_req)) = self.requests.remove_sync(&bknd_res.info.req_id) else { - error!( - proxy = P::name(), - request_id = %bknd_res.info.req_id, - connection_id = %bknd_res.info.conn_id, - worker_id = %self.id, - "Proxied http response for unmatching request", - ); - return - }; - - // hand over to postprocessor asynchronously so that we can return the - // response to the client as early as possible - self.postprocessor.do_send(ProxiedHttpCombo { req: clnt_req, res: bknd_res }); - } - fn finalise_proxying( mut clnt_req: ProxiedHttpRequest, mut bknd_res: ProxiedHttpResponse, @@ -797,6 +775,46 @@ where mirroring_peers: Arc>>>, mut mirroring_peer_round_robin_index: usize, ) { + // Fast path: when nobody is logging the proxied request/response and + // there are no mirror peers configured, skip the (expensive) response + // decompress + parse and the (always-evaluated) `info!` formatting in + // `maybe_log_proxied_request_and_response`. We still parse the request + // for the jrpc method label so metrics keep their correct dimensions. + let log_off = + !inner.config().log_proxied_requests() && !inner.config().log_proxied_responses(); + let no_mirror = mirroring_peers.is_empty(); + if log_off && no_mirror { + // If the request body has already been parsed (intercept path) + // we can avoid a second decompress + parse. + if let Some(jrpc) = clnt_req.jrpc_meta.clone() { + Self::emit_metrics_on_proxy_success(&jrpc, &clnt_req, &bknd_res, metrics); + return; + } + if clnt_req.decompressed_size < clnt_req.size { + (clnt_req.decompressed_body, clnt_req.decompressed_size) = decompress( + clnt_req.body.clone(), + clnt_req.size, + clnt_req.info.content_encoding(), + ); + } + match serde_json::from_slice::(&clnt_req.decompressed_body) { + Ok(jrpc) => { + Self::emit_metrics_on_proxy_success(&jrpc, &clnt_req, &bknd_res, metrics); + } + Err(err) => { + warn!( + proxy = P::name(), + request_id = %clnt_req.info.req_id, + connection_id = %clnt_req.info.conn_id, + worker_id = %worker_id, + error = ?err, + "Failed to parse json-rpc request", + ); + } + } + return; + } + if clnt_req.decompressed_size < clnt_req.size { (clnt_req.decompressed_body, clnt_req.decompressed_size) = decompress(clnt_req.body.clone(), clnt_req.size, clnt_req.info.content_encoding()); @@ -807,7 +825,15 @@ where decompress(bknd_res.body.clone(), bknd_res.size, bknd_res.info.content_encoding()); } - match serde_json::from_slice::(&clnt_req.decompressed_body) { + // Reuse the parsed jrpc body when the intercept path stashed it on + // the `ProxiedHttpRequest`, otherwise parse it now. + let parsed = if let Some(stashed) = clnt_req.jrpc_meta.clone() { + Ok(stashed) + } else { + serde_json::from_slice::(&clnt_req.decompressed_body) + .map(Arc::new) + }; + match parsed { Ok(jrpc) => { if inner.should_mirror(&jrpc, &clnt_req, &bknd_res) { let mirrors_count = match inner.config().mirroring_strategy() { @@ -1353,9 +1379,31 @@ where .unwrap() // safety: verified on start .to_string(); + // Build the inner TCP connector ourselves so we can flip + // TCP_NODELAY on after each connect + use actix_service::ServiceExt as _; + let tcp_nodelay = actix_tls::connect::Connector::new( + actix_tls::connect::Resolver::default(), + ) + .service() + .map(|conn: actix_tls::connect::Connection| { + let _ = conn.io_ref().set_nodelay(true); + let _ = socket2::SockRef::from(conn.io_ref()).set_tcp_keepalive( + &socket2::TcpKeepalive::new().with_time(Duration::from_secs(60)), + ); + conn + }); + let client = Client::builder() .add_default_header((header::HOST, host)) - .connector(Connector::new().conn_keep_alive(2 * timeout).limit(connections_limit)) + .connector( + Connector::new() + .connector(tcp_nodelay) + .conn_keep_alive(2 * timeout) + .conn_lifetime(Duration::from_secs(86_400)) + .disconnect_timeout(Duration::from_millis(100)) + .limit(connections_limit), + ) .timeout(timeout) .finish(); @@ -1368,8 +1416,13 @@ where let mut req = self.client.request(info.method.clone(), url.as_str()).no_decompress(); - for (header, value) in info.headers.iter() { - req = req.insert_header((header.clone(), value.clone())); + // Append directly to the awc request's HeaderMap (vs. the + // per-header `insert_header` builder pattern, which re-runs + // `TryIntoHeaderPair` validation each iteration). HeaderValue + // clones are refcount bumps; HeaderName clones are interned/cheap. + let dst = req.headers_mut(); + for (k, v) in info.headers.iter() { + dst.append(k.clone(), v.clone()); } req @@ -1417,11 +1470,13 @@ where BodySize::Sized(size) => size, // Body is always sized BodySize::None | BodySize::Stream => 0, }; + let content_encoding = + bknd_res.headers().get(header::CONTENT_ENCODING).cloned(); let info = ProxyHttpResponseInfo::new( clnt_req.info.req_id, clnt_req.info.conn_id, bknd_res.status(), - bknd_res.headers().clone(), + content_encoding, ); let mirr_res = ProxiedHttpResponse { info, @@ -1488,6 +1543,10 @@ pub(crate) struct ProxyHttpRequestInfo { impl ProxyHttpRequestInfo { pub(crate) fn new(req: &HttpRequest, guard: Option<&ConnectionGuard>) -> Self { + // Bind connection_info() once — actix recomputes/borrows on each + // call and the lookup showed up as a hot spot under load. + let ci = req.connection_info(); + // copy over only non hop-by-hop headers let mut headers = HeaderMap::new(); for (header, value) in req.headers().iter() { @@ -1497,7 +1556,7 @@ impl ProxyHttpRequestInfo { } // append remote ip to x-forwarded-for - if let Some(peer_addr) = req.connection_info().peer_addr() { + if let Some(peer_addr) = ci.peer_addr() { let mut forwarded_for = String::new(); if let Some(ff) = req.headers().get(header::X_FORWARDED_FOR) && let Ok(ff) = ff.to_str() @@ -1512,17 +1571,17 @@ impl ProxyHttpRequestInfo { } // set x-forwarded-proto if it's not already set - if !req.connection_info().scheme().is_empty() && + if !ci.scheme().is_empty() && req.headers().get(header::X_FORWARDED_PROTO).is_none() && - let Ok(forwarded_proto) = HeaderValue::from_str(req.connection_info().scheme()) + let Ok(forwarded_proto) = HeaderValue::from_str(ci.scheme()) { headers.insert(header::X_FORWARDED_PROTO, forwarded_proto); } // set x-forwarded-host if it's not already set - if !req.connection_info().host().is_empty() && + if !ci.host().is_empty() && req.headers().get(header::X_FORWARDED_HOST).is_none() && - let Ok(forwarded_host) = HeaderValue::from_str(req.connection_info().host()) + let Ok(forwarded_host) = HeaderValue::from_str(ci.host()) { headers.insert(header::X_FORWARDED_HOST, forwarded_host); } @@ -1532,9 +1591,9 @@ impl ProxyHttpRequestInfo { let remote_addr = match guard { Some(guard) => match guard.remote_addr.clone() { Some(remote_addr) => Some(remote_addr), - None => req.connection_info().peer_addr().map(String::from), + None => ci.peer_addr().map(String::from), }, - None => req.connection_info().peer_addr().map(String::from), + None => ci.peer_addr().map(String::from), }; let path = match req.path() { @@ -1597,18 +1656,23 @@ pub(crate) struct ProxyHttpResponseInfo { req_id: Uuid, conn_id: Uuid, status: StatusCode, - headers: HeaderMap, // TODO: perhaps we don't need all headers, just select ones + content_encoding: Option, } impl ProxyHttpResponseInfo { - pub(crate) fn new(req_id: Uuid, conn_id: Uuid, status: StatusCode, headers: HeaderMap) -> Self { - Self { req_id, conn_id, status, headers } + pub(crate) fn new( + req_id: Uuid, + conn_id: Uuid, + status: StatusCode, + content_encoding: Option, + ) -> Self { + Self { req_id, conn_id, status, content_encoding } } fn content_encoding(&self) -> String { - self.headers - .get(header::CONTENT_ENCODING) - .map(|h| h.to_str().unwrap_or_default()) + self.content_encoding + .as_ref() + .and_then(|h| h.to_str().ok()) .map(|h| h.to_string()) .unwrap_or_default() } @@ -1626,9 +1690,16 @@ where info: Option, start: UtcDateTime, - body: Vec, + body: Vec, + body_len: usize, max_size: usize, + /// Oneshot for handing the completed `ProxiedHttpRequest` off to the + /// response stream (which combines it with the response and forwards + /// the pair to the postprocessor). Replaces the previous + /// `ProxyHttp.requests` HashMap. + req_tx: Option>, + #[pin] stream: S, } @@ -1638,12 +1709,17 @@ where C: ConfigProxyHttp, P: ProxyHttpInner, { + /// `_preallocate` is retained for API stability but is no longer used + /// to pre-size a flat buffer — chunks are now refcounted (`Bytes`) + /// and stored in a `Vec`, so there's no flat capacity to + /// reserve. fn new( proxy: web::Data>, info: ProxyHttpRequestInfo, body: S, - preallocate: usize, + _preallocate: usize, timestamp: UtcDateTime, + req_tx: oneshot::Sender, ) -> Self { let max_size = proxy.shared.config().max_request_size(); Self { @@ -1651,8 +1727,10 @@ where info: Some(info), stream: body, start: timestamp, - body: Vec::with_capacity(preallocate), + body: Vec::new(), + body_len: 0, max_size, + req_tx: Some(req_tx), } } } @@ -1691,14 +1769,16 @@ where thread_id = ?std::thread::current().id(), latency_total = (UtcDateTime::now() - *this.start).as_seconds_f64(), chunk_size = chunk.len(), - total_size = this.body.len() + chunk.len(), + total_size = *this.body_len + chunk.len(), "Polled chunk of http request body", ); } - if this.body.len() + chunk.len() > *this.max_size { + if *this.body_len + chunk.len() > *this.max_size { return Poll::Ready(Some(Err(E::from(PayloadError::Overflow)))); } - this.body.extend_from_slice(&chunk); + // Refcount bump only — no memcpy of the chunk payload. + *this.body_len += chunk.len(); + this.body.push(chunk.clone()); Poll::Ready(Some(Ok(chunk))) } @@ -1720,7 +1800,7 @@ where worker_id = %this.proxy.id, thread_id = ?std::thread::current().id(), latency_total = (UtcDateTime::now() - *this.start).as_seconds_f64(), - total_size = this.body.len(), + total_size = *this.body_len, error = ?err, "Error while polling http request body", ); @@ -1746,14 +1826,22 @@ where worker_id = %this.proxy.id, thread_id = ?std::thread::current().id(), latency_total = (UtcDateTime::now() - *this.start).as_seconds_f64(), - total_size = this.body.len(), + total_size = *this.body_len, "Done polling http request body", ); } let end = UtcDateTime::now(); if let Some(info) = mem::take(this.info) { - let req = ProxiedHttpRequest::new(info, mem::take(this.body), *this.start, end); - this.proxy.postprocess_client_request(req); + let body = concat_bytes(mem::take(this.body)); + let req = ProxiedHttpRequest::new(info, body, *this.start, end); + if let Some(tx) = this.req_tx.take() { + // Receiver lives on the matching response body. If + // the client disconnected before reading the + // response, the receiver is dropped and we silently + // discard the postprocess hand-off — there is no + // matching response to combine with anyway. + let _ = tx.send(req); + } } Poll::Ready(None) } @@ -1773,9 +1861,16 @@ where info: Option, start: UtcDateTime, - body: Vec, + body: Vec, + body_len: usize, max_size: usize, + /// Oneshot for receiving the matching `ProxiedHttpRequest` from the + /// request body stream. Combined with the completed response and + /// forwarded to the postprocessor on terminal poll. Replaces the + /// previous `ProxyHttp.requests` HashMap lookup. + req_rx: Option>, + #[pin] stream: S, } @@ -1785,25 +1880,32 @@ where C: ConfigProxyHttp, P: ProxyHttpInner, { + /// `_preallocate` is retained for API stability but is no longer used + /// to pre-size a flat buffer — chunks are now refcounted (`Bytes`) + /// and stored in a `Vec`, so there's no flat capacity to + /// reserve. #[allow(clippy::too_many_arguments)] fn new( proxy: web::Data>, req_id: Uuid, conn_id: Uuid, status: StatusCode, - headers: HeaderMap, + content_encoding: Option, body: S, - preallocate: usize, + _preallocate: usize, timestamp: UtcDateTime, + req_rx: oneshot::Receiver, ) -> Self { let max_size = proxy.shared.config().max_response_size(); Self { proxy, stream: body, start: timestamp, - body: Vec::with_capacity(preallocate), + body: Vec::new(), + body_len: 0, max_size, - info: Some(ProxyHttpResponseInfo::new(req_id, conn_id, status, headers)), + info: Some(ProxyHttpResponseInfo::new(req_id, conn_id, status, content_encoding)), + req_rx: Some(req_rx), } } } @@ -1842,14 +1944,16 @@ where thread_id = ?std::thread::current().id(), latency_total = (UtcDateTime::now() - *this.start).as_seconds_f64(), chunk_size = chunk.len(), - total_size = this.body.len() + chunk.len(), + total_size = *this.body_len + chunk.len(), "Polled chunk of http response body", ); } - if this.body.len() + chunk.len() > *this.max_size { + if *this.body_len + chunk.len() > *this.max_size { return Poll::Ready(Some(Err(E::from(PayloadError::Overflow)))); } - this.body.extend_from_slice(&chunk); + // Refcount bump only — no memcpy of the chunk payload. + *this.body_len += chunk.len(); + this.body.push(chunk.clone()); Poll::Ready(Some(Ok(chunk))) } @@ -1871,7 +1975,7 @@ where worker_id = %this.proxy.id, thread_id = ?std::thread::current().id(), latency_total = (UtcDateTime::now() - *this.start).as_seconds_f64(), - total_size = this.body.len(), + total_size = *this.body_len, error = ?err, "Error while polling http response body", ); @@ -1897,15 +2001,79 @@ where worker_id = %this.proxy.id, thread_id = ?std::thread::current().id(), latency_total = (UtcDateTime::now() - *this.start).as_seconds_f64(), - total_size = this.body.len(), + total_size = *this.body_len, "Done polling http response body", ); } let end = UtcDateTime::now(); if let Some(info) = mem::take(this.info) { - let res = - ProxiedHttpResponse::new(info, mem::take(this.body), *this.start, end); - this.proxy.postprocess_backend_response(res); + let body = concat_bytes(mem::take(this.body)); + let res = ProxiedHttpResponse::new(info, body, *this.start, end); + // Combine with the matching request and dispatch to + // the postprocessor. The request body's terminal poll + // sends on the oneshot — usually before the response + // completes (backend processing time dominates), so + // `try_recv` typically resolves immediately. On a + // small/fast response that races ahead of the request + // body completion, fall back to awaiting on a spawned + // task. Closed/cancelled => no request to combine with; + // log and drop the postprocessing for this exchange. + let postprocessor = this.proxy.postprocessor.clone(); + let worker_id = this.proxy.id; + match this.req_rx.take() { + Some(mut rx) => match rx.try_recv() { + Ok(req) => { + postprocessor.do_send(ProxiedHttpCombo { req, res }); + } + Err(oneshot::error::TryRecvError::Empty) => { + let req_id = res.info.req_id; + let conn_id = res.info.conn_id; + this.proxy.oneshot_spawn_fallback_count.inc(); + warn!( + proxy = P::name(), + request_id = %req_id, + connection_id = %conn_id, + worker_id = %worker_id, + "Proxied http response terminal poll raced ahead of request; spawning oneshot await fallback", + ); + actix::spawn(async move { + match rx.await { + Ok(req) => { + postprocessor + .do_send(ProxiedHttpCombo { req, res }); + } + Err(_) => { + error!( + proxy = P::name(), + request_id = %req_id, + connection_id = %conn_id, + worker_id = %worker_id, + "Proxied http response for unmatching request (oneshot closed)", + ); + } + } + }); + } + Err(oneshot::error::TryRecvError::Closed) => { + error!( + proxy = P::name(), + request_id = %res.info.req_id, + connection_id = %res.info.conn_id, + worker_id = %worker_id, + "Proxied http response for unmatching request (oneshot closed)", + ); + } + }, + None => { + error!( + proxy = P::name(), + request_id = %res.info.req_id, + connection_id = %res.info.conn_id, + worker_id = %worker_id, + "Proxied http response without a request receiver", + ); + } + } } Poll::Ready(None) } @@ -1913,6 +2081,20 @@ where } } +/// Coalesce a vector of refcounted chunks into a single `Bytes`. +/// +/// - Empty: returns `Bytes::new()`. +/// - Single chunk: returns the lone `Bytes` directly (zero-copy). +/// - Multi-chunk: concatenates into one contiguous allocation. +#[inline] +fn concat_bytes(mut chunks: Vec) -> Bytes { + match chunks.len() { + 0 => Bytes::new(), + 1 => chunks.pop().unwrap(), + _ => Bytes::from(chunks.concat()), + } +} + // ProxiedHttpRequest -------------------------------------------------- #[derive(Clone, actix::Message)] @@ -1925,24 +2107,29 @@ pub(crate) struct ProxiedHttpRequest { decompressed_size: usize, start: UtcDateTime, end: UtcDateTime, + /// Optionally stashed jrpc meta parsed on the intercept path so that + /// `finalise_proxying` does not redo `serde_json::from_slice` on the + /// same body. `None` for the prod streaming path which never parses. + jrpc_meta: Option>, } impl ProxiedHttpRequest { pub(crate) fn new( info: ProxyHttpRequestInfo, - body: Vec, + body: Bytes, start: UtcDateTime, end: UtcDateTime, ) -> Self { let size = body.len(); Self { info, - body: Bytes::from(body), + body, size, decompressed_body: Bytes::new(), decompressed_size: 0, start, end, + jrpc_meta: None, } } @@ -1979,14 +2166,14 @@ pub(crate) struct ProxiedHttpResponse { impl ProxiedHttpResponse { pub(crate) fn new( info: ProxyHttpResponseInfo, - body: Vec, + body: Bytes, start: UtcDateTime, end: UtcDateTime, ) -> Self { let size = body.len(); Self { info, - body: Bytes::from(body), + body, size, decompressed_body: Bytes::new(), decompressed_size: 0, diff --git a/crates/rproxy/src/utils/utils_http.rs b/crates/rproxy/src/utils/utils_http.rs index 3bbba50..2bffd9a 100644 --- a/crates/rproxy/src/utils/utils_http.rs +++ b/crates/rproxy/src/utils/utils_http.rs @@ -1,8 +1,15 @@ // is_hop_by_hop_header ------------------------------------------------ -pub(crate) fn is_hop_by_hop_header(name: &actix_web::http::header::HeaderName) -> bool { +use actix_web::http::header; + +pub(crate) fn is_hop_by_hop_header(name: &header::HeaderName) -> bool { + // Fast path: compare against known HeaderName constants (no allocation). + // The original implementation only filtered these four (per its ASCII + // lowercase match): connection, host, keep-alive, transfer-encoding. matches!( - name.as_str().to_ascii_lowercase().as_str(), - "connection" | "host" | "keep-alive" | "transfer-encoding" - ) + name, + &header::CONNECTION | + &header::HOST | + &header::TRANSFER_ENCODING + ) || name.as_str().eq_ignore_ascii_case("keep-alive") }