From 4c77c34a4050bb37f01d1e6df764b0768645c6ef Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 15:58:57 +0300 Subject: [PATCH 01/13] Drop String alloc in is_hop_by_hop_header --- crates/rproxy/src/utils/utils_http.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/crates/rproxy/src/utils/utils_http.rs b/crates/rproxy/src/utils/utils_http.rs index 3bbba50..2bffd9a 100644 --- a/crates/rproxy/src/utils/utils_http.rs +++ b/crates/rproxy/src/utils/utils_http.rs @@ -1,8 +1,15 @@ // is_hop_by_hop_header ------------------------------------------------ -pub(crate) fn is_hop_by_hop_header(name: &actix_web::http::header::HeaderName) -> bool { +use actix_web::http::header; + +pub(crate) fn is_hop_by_hop_header(name: &header::HeaderName) -> bool { + // Fast path: compare against known HeaderName constants (no allocation). + // The original implementation only filtered these four (per its ASCII + // lowercase match): connection, host, keep-alive, transfer-encoding. matches!( - name.as_str().to_ascii_lowercase().as_str(), - "connection" | "host" | "keep-alive" | "transfer-encoding" - ) + name, + &header::CONNECTION | + &header::HOST | + &header::TRANSFER_ENCODING + ) || name.as_str().eq_ignore_ascii_case("keep-alive") } From 0da807613743174468815a97cb05b8b8bf96e0d7 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 16:01:16 +0300 Subject: [PATCH 02/13] Cache in-flight metric handles on ProxyHttp --- crates/rproxy/src/server/proxy/http/proxy.rs | 122 +++++++------------ 1 file changed, 43 insertions(+), 79 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 131b928..fd79100 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -40,6 +40,7 @@ use bytes::Bytes; use futures::TryStreamExt; use futures_core::Stream; use pin_project::pin_project; +use prometheus_client::metrics::{counter::Counter, gauge::Gauge}; use scc::HashMap; use time::{UtcDateTime, format_description::well_known::Iso8601}; use tokio::sync::broadcast; @@ -79,6 +80,13 @@ where backend: ProxyHttpBackendEndpoint, requests: HashMap, postprocessor: actix::Addr>, + + // Per-worker cached metric handles. These are cheap to clone (each + // wraps an `Arc` internally) and bypass the per-request + // `Family::get_or_create` lookup on the hot path. + in_flight_client: Gauge, + in_flight_backend: Gauge, + proxy_failure_count: Counter, } impl ProxyHttp @@ -135,7 +143,24 @@ where } .start(); - Self { id, shared, backend, requests: HashMap::default(), postprocessor } + let labels_proxy = LabelsProxy { proxy: P::name() }; + let in_flight_client = + shared.metrics.http_in_flight_requests_client.get_or_create(&labels_proxy).clone(); + let in_flight_backend = + shared.metrics.http_in_flight_requests_backend.get_or_create(&labels_proxy).clone(); + let proxy_failure_count = + shared.metrics.http_proxy_failure_count.get_or_create(&labels_proxy).clone(); + + Self { + id, + shared, + backend, + requests: HashMap::default(), + postprocessor, + in_flight_client, + in_flight_backend, + proxy_failure_count, + } } pub(crate) async fn run( @@ -407,10 +432,8 @@ where .inc(); } - metrics - .http_in_flight_requests_client - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + let in_flight_client = this.in_flight_client.clone(); + in_flight_client.inc(); let res = if this.shared.inner.might_intercept() { Self::send_to_backend_and_maybe_intercept(this, info, clnt_req_body, timestamp).await @@ -418,10 +441,7 @@ where Self::stream_to_backend(this, info, clnt_req_body, timestamp).await }; - metrics - .http_in_flight_requests_client - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); + in_flight_client.dec(); res } @@ -444,11 +464,7 @@ where timestamp, ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.in_flight_backend.inc(); #[cfg(debug_assertions)] debug!( @@ -477,16 +493,8 @@ where error = ?err, "Failed to proxy a request", ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); - this.shared - .metrics - .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.in_flight_backend.dec(); + this.proxy_failure_count.inc(); return Ok(HttpResponse::BadGateway().body(format!("Backend error: {err:?}"))); } }; @@ -503,11 +511,7 @@ where "Finished streaming http request to backend", ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); + this.in_flight_backend.dec(); Self::stream_to_client(this, req_id, conn_id, bknd_res) } @@ -532,11 +536,7 @@ where "Sending http request to backend...", ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.in_flight_backend.inc(); let body = match clnt_req_body.to_bytes_limited(this.shared.config().max_request_size()).await { @@ -554,16 +554,8 @@ where error = ?err, "Failed to proxy a request", ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); - this.shared - .metrics - .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.in_flight_backend.dec(); + this.proxy_failure_count.inc(); return Ok(HttpResponse::BadGateway().body(format!("Backend error: {err:?}"))); } @@ -583,25 +575,13 @@ where error = ?err, "Failed to proxy a request", ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); - this.shared - .metrics - .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.in_flight_backend.dec(); + this.proxy_failure_count.inc(); return Ok(HttpResponse::PayloadTooLarge().finish()); } }; - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); + this.in_flight_backend.dec(); #[cfg(debug_assertions)] debug!( @@ -672,11 +652,7 @@ where let bknd_req = this.backend.new_backend_request(&req.info); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.in_flight_backend.inc(); let bknd_res = match bknd_req.send_body(req.body.clone()).await { Ok(bknd_res) => bknd_res, @@ -692,25 +668,13 @@ where error = ?err, "Failed to proxy a request", ); - this.shared - .metrics - .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); + this.proxy_failure_count.inc(); + this.in_flight_backend.dec(); return Ok(HttpResponse::BadGateway().body(format!("Backend error: {err:?}"))); } }; - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); + this.in_flight_backend.dec(); this.postprocess_client_request(req); From bfc08dc26747310317b8ed62595b37026082b785 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 16:02:09 +0300 Subject: [PATCH 03/13] Cache user-agent counter per worker, cap at 100 --- crates/rproxy/src/server/proxy/http/proxy.rs | 36 ++++++++++++++++---- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index fd79100..513a11c 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -87,6 +87,11 @@ where in_flight_client: Gauge, in_flight_backend: Gauge, proxy_failure_count: Counter, + + // Per-worker cache of (user_agent -> Counter) so that we only pay the + // `Family::get_or_create` cost the first time a UA is seen on this + // worker. Lookup is `&str`-keyed (no allocation on hit). + client_info_cache: HashMap, } impl ProxyHttp @@ -160,6 +165,7 @@ where in_flight_client, in_flight_backend, proxy_failure_count, + client_info_cache: HashMap::default(), } } @@ -423,13 +429,29 @@ where !user_agent.is_empty() && let Ok(user_agent) = user_agent.to_str() { - metrics - .client_info - .get_or_create(&LabelsProxyClientInfo { - proxy: P::name(), - user_agent: user_agent.to_string(), - }) - .inc(); + // Hot path: read-only lookup keyed by &str (no allocation). + // Cold path: allocate the owned String key and resolve the + // counter from the metrics family exactly once per worker per + // distinct UA. + if let Some(entry) = this.client_info_cache.read_sync(user_agent, |_, c| c.clone()) { + entry.inc(); + } else { + let counter = metrics + .client_info + .get_or_create(&LabelsProxyClientInfo { + proxy: P::name(), + user_agent: user_agent.to_string(), + }) + .clone(); + counter.inc(); + // soft cap to prevent unbounded growth from hostile UA values + if this.client_info_cache.len() < 100 { + // Best-effort insert; races with another task on the same + // worker thread shouldn't happen given !Send actix workers, + // but tolerate insert errors regardless. + let _ = this.client_info_cache.insert_sync(user_agent.to_string(), counter); + } + } } let in_flight_client = this.in_flight_client.clone(); From 69e1189c1ea4d0a683d8779cdf858026f5c53fba Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 16:03:05 +0300 Subject: [PATCH 04/13] Bind connection_info() once per request --- crates/rproxy/src/server/proxy/http/proxy.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 513a11c..89f8731 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -1474,6 +1474,10 @@ pub(crate) struct ProxyHttpRequestInfo { impl ProxyHttpRequestInfo { pub(crate) fn new(req: &HttpRequest, guard: Option<&ConnectionGuard>) -> Self { + // Bind connection_info() once — actix recomputes/borrows on each + // call and the lookup showed up as a hot spot under load. + let ci = req.connection_info(); + // copy over only non hop-by-hop headers let mut headers = HeaderMap::new(); for (header, value) in req.headers().iter() { @@ -1483,7 +1487,7 @@ impl ProxyHttpRequestInfo { } // append remote ip to x-forwarded-for - if let Some(peer_addr) = req.connection_info().peer_addr() { + if let Some(peer_addr) = ci.peer_addr() { let mut forwarded_for = String::new(); if let Some(ff) = req.headers().get(header::X_FORWARDED_FOR) && let Ok(ff) = ff.to_str() @@ -1498,17 +1502,17 @@ impl ProxyHttpRequestInfo { } // set x-forwarded-proto if it's not already set - if !req.connection_info().scheme().is_empty() && + if !ci.scheme().is_empty() && req.headers().get(header::X_FORWARDED_PROTO).is_none() && - let Ok(forwarded_proto) = HeaderValue::from_str(req.connection_info().scheme()) + let Ok(forwarded_proto) = HeaderValue::from_str(ci.scheme()) { headers.insert(header::X_FORWARDED_PROTO, forwarded_proto); } // set x-forwarded-host if it's not already set - if !req.connection_info().host().is_empty() && + if !ci.host().is_empty() && req.headers().get(header::X_FORWARDED_HOST).is_none() && - let Ok(forwarded_host) = HeaderValue::from_str(req.connection_info().host()) + let Ok(forwarded_host) = HeaderValue::from_str(ci.host()) { headers.insert(header::X_FORWARDED_HOST, forwarded_host); } @@ -1518,9 +1522,9 @@ impl ProxyHttpRequestInfo { let remote_addr = match guard { Some(guard) => match guard.remote_addr.clone() { Some(remote_addr) => Some(remote_addr), - None => req.connection_info().peer_addr().map(String::from), + None => ci.peer_addr().map(String::from), }, - None => req.connection_info().peer_addr().map(String::from), + None => ci.peer_addr().map(String::from), }; let path = match req.path() { From 4f39ca91919847c5c30d114721a9ab560176c8eb Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 16:03:52 +0300 Subject: [PATCH 05/13] Append headers without per-iter revalidation --- crates/rproxy/src/server/proxy/http/proxy.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 89f8731..4cefee7 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -1354,8 +1354,13 @@ where let mut req = self.client.request(info.method.clone(), url.as_str()).no_decompress(); - for (header, value) in info.headers.iter() { - req = req.insert_header((header.clone(), value.clone())); + // Append directly to the awc request's HeaderMap (vs. the + // per-header `insert_header` builder pattern, which re-runs + // `TryIntoHeaderPair` validation each iteration). HeaderValue + // clones are refcount bumps; HeaderName clones are interned/cheap. + let dst = req.headers_mut(); + for (k, v) in info.headers.iter() { + dst.append(k.clone(), v.clone()); } req From c9a072561d82afa91a0f8926cfa2bcd21e11020b Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 16:04:45 +0300 Subject: [PATCH 06/13] Drop HeaderName::from_str on response forwarding --- crates/rproxy/src/server/proxy/http/proxy.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 4cefee7..8834958 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -5,7 +5,6 @@ use std::{ mem, ops::Add, pin::Pin, - str::FromStr, sync::{ Arc, atomic::{AtomicI64, AtomicUsize, Ordering}, @@ -389,13 +388,15 @@ where fn to_client_response(bknd_res: &ClientResponse) -> HttpResponseBuilder { let mut clnt_res = HttpResponse::build(bknd_res.status()); + // The backend's `HeaderName` instances are already validated by the + // http parser, so we forward them verbatim without round-tripping + // through `HeaderName::from_str` (which was showing up as + // surprisingly hot on engine_* call patterns). for (hkey, hval) in bknd_res.headers().iter() { if is_hop_by_hop_header(hkey) { continue; } - if let Ok(hkey) = header::HeaderName::from_str(hkey.as_str()) { - clnt_res.append_header((hkey, hval.clone())); - } + clnt_res.append_header((hkey.clone(), hval.clone())); } clnt_res From 9d16040966872d7b60c599c175989bc8b844c470 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Thu, 4 Jun 2026 11:48:28 +0300 Subject: [PATCH 07/13] Set TCP_NODELAY on backend connections --- crates/rproxy/src/server/proxy/http/proxy.rs | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 8834958..b5ec08f 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -1340,9 +1340,26 @@ where .unwrap() // safety: verified on start .to_string(); + // Build the inner TCP connector ourselves so we can flip + // TCP_NODELAY on after each connect + use actix_service::ServiceExt as _; + let tcp_nodelay = actix_tls::connect::Connector::new( + actix_tls::connect::Resolver::default(), + ) + .service() + .map(|conn: actix_tls::connect::Connection| { + let _ = conn.io_ref().set_nodelay(true); + conn + }); + let client = Client::builder() .add_default_header((header::HOST, host)) - .connector(Connector::new().conn_keep_alive(2 * timeout).limit(connections_limit)) + .connector( + Connector::new() + .connector(tcp_nodelay) + .conn_keep_alive(2 * timeout) + .limit(connections_limit), + ) .timeout(timeout) .finish(); From 5274b87c3649cd48e2a26828e2b894ca69482170 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 18:25:11 +0300 Subject: [PATCH 08/13] Tighten backend awc disconnect_timeout to 100ms --- crates/rproxy/src/server/proxy/http/proxy.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index b5ec08f..64b6274 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -1358,6 +1358,9 @@ where Connector::new() .connector(tcp_nodelay) .conn_keep_alive(2 * timeout) + // 100ms grace is fine for loopback (op-rbuilder); raise if backend + // is ever non-loopback (would risk truncating responses on shutdown). + .disconnect_timeout(Duration::from_millis(100)) .limit(connections_limit), ) .timeout(timeout) From f267b6d4779757807bbd2a73dc1bd74d7d876fff Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 18:26:08 +0300 Subject: [PATCH 09/13] Enable TCP_KEEPALIVE on backend connections --- crates/rproxy/src/server/proxy/http/proxy.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 64b6274..4f30bcc 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -1349,6 +1349,9 @@ where .service() .map(|conn: actix_tls::connect::Connection| { let _ = conn.io_ref().set_nodelay(true); + let _ = socket2::SockRef::from(conn.io_ref()).set_tcp_keepalive( + &socket2::TcpKeepalive::new().with_time(Duration::from_secs(60)), + ); conn }); From 9c777800ff8140d6a74931efbab1d8a5fdaec816 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 18:28:01 +0300 Subject: [PATCH 10/13] Drop HeaderMap clone, capture content-encoding only --- crates/rproxy/src/server/proxy/http/proxy.rs | 28 +++++++++++++------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 4f30bcc..ed622bd 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -719,12 +719,13 @@ where let mut clnt_res = Self::to_client_response(&bknd_res); let preallocate = this.shared.config().prealloacated_response_buffer_size(); + let content_encoding = bknd_res.headers().get(header::CONTENT_ENCODING).cloned(); let bknd_res_body = ProxyHttpResponseBody::new( this, req_id, conn_id, status, - bknd_res.headers().clone(), + content_encoding, bknd_res.into_stream(), preallocate, timestamp, @@ -1432,11 +1433,13 @@ where BodySize::Sized(size) => size, // Body is always sized BodySize::None | BodySize::Stream => 0, }; + let content_encoding = + bknd_res.headers().get(header::CONTENT_ENCODING).cloned(); let info = ProxyHttpResponseInfo::new( clnt_req.info.req_id, clnt_req.info.conn_id, bknd_res.status(), - bknd_res.headers().clone(), + content_encoding, ); let mirr_res = ProxiedHttpResponse { info, @@ -1616,18 +1619,23 @@ pub(crate) struct ProxyHttpResponseInfo { req_id: Uuid, conn_id: Uuid, status: StatusCode, - headers: HeaderMap, // TODO: perhaps we don't need all headers, just select ones + content_encoding: Option, } impl ProxyHttpResponseInfo { - pub(crate) fn new(req_id: Uuid, conn_id: Uuid, status: StatusCode, headers: HeaderMap) -> Self { - Self { req_id, conn_id, status, headers } + pub(crate) fn new( + req_id: Uuid, + conn_id: Uuid, + status: StatusCode, + content_encoding: Option, + ) -> Self { + Self { req_id, conn_id, status, content_encoding } } fn content_encoding(&self) -> String { - self.headers - .get(header::CONTENT_ENCODING) - .map(|h| h.to_str().unwrap_or_default()) + self.content_encoding + .as_ref() + .and_then(|h| h.to_str().ok()) .map(|h| h.to_string()) .unwrap_or_default() } @@ -1810,7 +1818,7 @@ where req_id: Uuid, conn_id: Uuid, status: StatusCode, - headers: HeaderMap, + content_encoding: Option, body: S, preallocate: usize, timestamp: UtcDateTime, @@ -1822,7 +1830,7 @@ where start: timestamp, body: Vec::with_capacity(preallocate), max_size, - info: Some(ProxyHttpResponseInfo::new(req_id, conn_id, status, headers)), + info: Some(ProxyHttpResponseInfo::new(req_id, conn_id, status, content_encoding)), } } } From d92937693dd3678ee6c8c757e9f2a4421030f993 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 18:29:29 +0300 Subject: [PATCH 11/13] Defer postprocess_client_request off the critical path --- crates/rproxy/src/server/proxy/http/proxy.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index ed622bd..b0c5f81 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -699,9 +699,17 @@ where this.in_flight_backend.dec(); - this.postprocess_client_request(req); - - Self::stream_to_client(this, req_id, conn_id, bknd_res) + // Initiate the response stream first so the client sees no extra + // bookkeeping latency on the critical path, then file the request + // into the in-flight map for the eventual response postprocessor. + // The insert must complete before the response body stream finishes + // (which is when `postprocess_backend_response` calls `remove_sync`); + // this is guaranteed because actix won't begin polling the streaming + // body until after this synchronous handler returns. + let this_clone = this.clone(); + let res = Self::stream_to_client(this, req_id, conn_id, bknd_res); + this_clone.postprocess_client_request(req); + res } fn stream_to_client( From 818595ff4858f237f5f7e14d47daa824dbb38e19 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 18:31:52 +0300 Subject: [PATCH 12/13] Skip postprocess entirely for non-logged, non-mirrored requests --- crates/rproxy/src/server/proxy/http/proxy.rs | 34 ++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index b0c5f81..76e8453 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -793,6 +793,40 @@ where mirroring_peers: Arc>>>, mut mirroring_peer_round_robin_index: usize, ) { + // Fast path: when nobody is logging the proxied request/response and + // there are no mirror peers configured, skip the (expensive) response + // decompress + parse and the (always-evaluated) `info!` formatting in + // `maybe_log_proxied_request_and_response`. We still parse the request + // for the jrpc method label so metrics keep their correct dimensions. + let log_off = + !inner.config().log_proxied_requests() && !inner.config().log_proxied_responses(); + let no_mirror = mirroring_peers.is_empty(); + if log_off && no_mirror { + if clnt_req.decompressed_size < clnt_req.size { + (clnt_req.decompressed_body, clnt_req.decompressed_size) = decompress( + clnt_req.body.clone(), + clnt_req.size, + clnt_req.info.content_encoding(), + ); + } + match serde_json::from_slice::(&clnt_req.decompressed_body) { + Ok(jrpc) => { + Self::emit_metrics_on_proxy_success(&jrpc, &clnt_req, &bknd_res, metrics); + } + Err(err) => { + warn!( + proxy = P::name(), + request_id = %clnt_req.info.req_id, + connection_id = %clnt_req.info.conn_id, + worker_id = %worker_id, + error = ?err, + "Failed to parse json-rpc request", + ); + } + } + return; + } + if clnt_req.decompressed_size < clnt_req.size { (clnt_req.decompressed_body, clnt_req.decompressed_size) = decompress(clnt_req.body.clone(), clnt_req.size, clnt_req.info.content_encoding()); From f86fd2d11c13f50bb074cc356809c138a29f870e Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Fri, 5 Jun 2026 18:35:33 +0300 Subject: [PATCH 13/13] Reuse parsed JrpcRequestMetaMaybeBatch in finalise_proxying --- crates/rproxy/src/server/proxy/http/proxy.rs | 31 ++++++++++++++++++-- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 76e8453..09ac1eb 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -621,7 +621,8 @@ where let (decompressed_body, decompressed_size) = decompress(body.clone(), size, info.content_encoding()); - match serde_json::from_slice::(&decompressed_body) { + let jrpc_meta = match serde_json::from_slice::(&decompressed_body) + { Ok(jrpc) => { if let Some(res) = this.shared.inner.should_intercept(&jrpc) { let json_req = if this.shared.config().log_proxied_requests() { @@ -647,6 +648,9 @@ where return res; } + // Stash the parsed jrpc so `finalise_proxying` can avoid + // re-parsing the same body. + Some(Arc::new(jrpc)) } Err(err) => { @@ -660,8 +664,9 @@ where error = ?err, "Failed to parse json-rpc request", ); + None } - } + }; let req = ProxiedHttpRequest { info, @@ -671,6 +676,7 @@ where decompressed_size, start: timestamp, end: UtcDateTime::now(), + jrpc_meta, }; let bknd_req = this.backend.new_backend_request(&req.info); @@ -802,6 +808,12 @@ where !inner.config().log_proxied_requests() && !inner.config().log_proxied_responses(); let no_mirror = mirroring_peers.is_empty(); if log_off && no_mirror { + // If the request body has already been parsed (intercept path) + // we can avoid a second decompress + parse. + if let Some(jrpc) = clnt_req.jrpc_meta.clone() { + Self::emit_metrics_on_proxy_success(&jrpc, &clnt_req, &bknd_res, metrics); + return; + } if clnt_req.decompressed_size < clnt_req.size { (clnt_req.decompressed_body, clnt_req.decompressed_size) = decompress( clnt_req.body.clone(), @@ -837,7 +849,15 @@ where decompress(bknd_res.body.clone(), bknd_res.size, bknd_res.info.content_encoding()); } - match serde_json::from_slice::(&clnt_req.decompressed_body) { + // Reuse the parsed jrpc body when the intercept path stashed it on + // the `ProxiedHttpRequest`, otherwise parse it now. + let parsed = if let Some(stashed) = clnt_req.jrpc_meta.clone() { + Ok(stashed) + } else { + serde_json::from_slice::(&clnt_req.decompressed_body) + .map(Arc::new) + }; + match parsed { Ok(jrpc) => { if inner.should_mirror(&jrpc, &clnt_req, &bknd_res) { let mirrors_count = match inner.config().mirroring_strategy() { @@ -1994,6 +2014,10 @@ pub(crate) struct ProxiedHttpRequest { decompressed_size: usize, start: UtcDateTime, end: UtcDateTime, + /// Optionally stashed jrpc meta parsed on the intercept path so that + /// `finalise_proxying` does not redo `serde_json::from_slice` on the + /// same body. `None` for the prod streaming path which never parses. + jrpc_meta: Option>, } impl ProxiedHttpRequest { @@ -2012,6 +2036,7 @@ impl ProxiedHttpRequest { decompressed_size: 0, start, end, + jrpc_meta: None, } }