Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 171 additions & 20 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1347,6 +1347,149 @@ where
.push_back(ExtendedProtocolData::create_new_close(message, close));
}

// Flush
// Frontend asks the server to push pending responses
// without ending the extended-query sequence. The
// server stays in extended-query mode (no
// ReadyForQuery) and we keep the server checked out.
// Used by drivers like postgres.js between
// Parse/Describe and Bind/Execute (`describeFirst`).
'H' => {
debug!("Flushing buffered extended-protocol messages to server");

// Drain the buffered extended-protocol messages
// into self.buffer the same way Sync does, but
// without sending Sync — append the Flush byte
// instead so the server pushes responses but
// stays in extended-query state.
while let Some(protocol_data) =
self.extended_protocol_data_buffer.pop_front()
{
match protocol_data {
ExtendedProtocolData::Parse { data, metadata } => {
let (parse, hash) = match metadata {
Some(metadata) => metadata,
None => {
let first_char_in_name = *data.get(5).unwrap_or(&0);
if first_char_in_name != 0 {
server.mark_dirty();
}
self.buffer.put(&data[..]);
continue;
}
};

if server.has_prepared_statement(&parse.name) {
self.response_message_queue_buffer.put(parse_complete());
} else {
self.register_parse_to_server_cache(
false, &hash, &parse, &pool, server, &address,
)
.await?;
self.buffer.put(&data[..]);
}
}
ExtendedProtocolData::Bind { data, metadata } => {
if let Some(client_given_name) = metadata {
self.ensure_prepared_statement_is_on_server(
client_given_name,
&pool,
server,
&address,
)
.await?;
}
self.buffer.put(&data[..]);
}
ExtendedProtocolData::Describe { data, metadata } => {
if let Some(client_given_name) = metadata {
self.ensure_prepared_statement_is_on_server(
client_given_name,
&pool,
server,
&address,
)
.await?;
}
self.buffer.put(&data[..]);
}
ExtendedProtocolData::Execute { data } => {
self.buffer.put(&data[..])
}
ExtendedProtocolData::Close { data, close } => {
if self.prepared_statements_enabled
&& close.is_prepared_statement()
&& !close.anonymous()
{
self.prepared_statements.remove(&close.name);
self.response_message_queue_buffer.put(close_complete());
} else {
self.buffer.put(&data[..]);
}
}
}
}

// Append the Flush byte so the server pushes its
// pending responses now.
self.buffer.put(&message[..]);

// If the buffer contains only the Flush byte (no
// pending extended-protocol work), there is
// nothing for the server to flush — just emit any
// queued client responses and continue.
let only_flush = *self.buffer.first().unwrap() == b'H';

if !self.response_message_queue_buffer.is_empty() {
if let Err(err) = write_all_flush(
&mut self.write,
&self.response_message_queue_buffer,
)
.await
{
server.mark_bad(err.to_string().as_str());
return Err(err);
}
self.response_message_queue_buffer.clear();
}

if !only_flush {
// Send to server.
self.send_server_message(server, &self.buffer, &address, &pool)
.await?;

// Read the Flush response — must NOT use the
// normal recv loop because the server does not
// send ReadyForQuery in response to Flush.
let response = match server
.recv_flush_response(Some(&mut self.server_parameters))
.await
{
Ok(r) => r,
Err(err) => {
pool.ban(&address, BanReason::MessageReceiveFailed, Some(&self.stats));
error_response_terminal(
&mut self.write,
&format!("error receiving Flush response: {:?}", err),
)
.await?;
return Err(err);
}
};

if let Err(err) = write_all_flush(&mut self.write, &response).await {
server.mark_bad(err.to_string().as_str());
return Err(err);
}
}

self.buffer.clear();

// Do NOT release the server: extended-query
// exchange is still open. A Sync from the client
// will eventually end it.
}

// Sync
// Frontend (client) is asking for the query result now.
'S' => {
Expand Down Expand Up @@ -1743,29 +1886,37 @@ where
.register_parse_to_server_cache(true, hash, parse, pool, server, address)
.await
{
Ok(_) => (),
Err(err) => match err {
Error::PreparedStatementError => {
debug!("Removed {} from client cache", client_name);
self.prepared_statements.remove(&client_name);
}

_ => {
return Err(err);
}
},
Ok(_) => Ok(()),
Err(Error::PreparedStatementError) => {
// The backend rejected our Parse. This used to silently
// drop the statement from the *client* cache and return
// Ok, which left tokio-postgres clients believing the
// statement was still valid: the very next Bind would
// miss in `buffer_bind` and abort the whole TCP conn
// with "Prepared statement sN doesn't exist", killing
// pgraft-style ETL workloads that prepare-then-reuse
// across many transactions.
//
// Keep the client cache intact (it represents what the
// client believes), mark this server bad so the pool
// replaces it on next checkout, and surface the error
// so the caller can retry on a fresh backend.
warn!(
"Server {:?} rejected re-prepare of `{}` — marking bad so pool replaces it",
address, client_name
);
server.mark_bad("prepared statement re-register failed");
Err(Error::PreparedStatementError)
}
Err(err) => Err(err),
}
}

None => {
return Err(Error::ClientError(format!(
"prepared statement `{}` not found",
client_name
)))
}
};

Ok(())
None => Err(Error::ClientError(format!(
"prepared statement `{}` not found",
client_name
))),
}
}

/// Register the parse to the server cache and send it to the server if requested (ie. requested by pgcat)
Expand Down
75 changes: 75 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,81 @@ impl Server {
Ok(bytes)
}

/// Read server messages emitted in response to a Flush.
///
/// Unlike `recv`, this loop never waits for ReadyForQuery (`Z`) —
/// the server does not send one in response to Flush. Instead we
/// read every message available, and stop once we hit a terminal
/// describe-response marker (RowDescription `T`, NoData `n`,
/// ParameterDescription `t`-followed-by-T-or-n, or ErrorResponse
/// `E`). On error we still keep buffering until ReadyForQuery
/// is NOT expected — we just return what we have.
pub async fn recv_flush_response(
&mut self,
mut client_server_parameters: Option<&mut ServerParameters>,
) -> Result<BytesMut, Error> {
loop {
let message = match read_message(&mut self.stream).await {
Ok(message) => message,
Err(err) => {
error!(
"Terminating server {:?} during Flush response: {:?}",
self.address, err
);
self.bad = true;
return Err(err);
}
};

// Inspect framing byte without consuming the message — avoid
// re-parsing length and avoid the double copy of body bytes.
let code = *message.first().unwrap_or(&0);

// Only ParameterStatus needs body parsing (to track session
// params). Skip get_u8/get_i32/read_string for everything
// else — these messages are 5–N bytes and we only need to
// know the framing byte.
if code == b'S' {
// ParameterStatus body: code(1) + len(4) + key\0 + value\0.
// Parse into an owned BytesMut just for the body so we
// can use the existing BytesMutReader impl without
// disturbing the original `message` we still need to
// append to self.buffer below.
let mut body = BytesMut::from(&message[5..]);
if let (Ok(key), Ok(value)) = (body.read_string(), body.read_string()) {
if let Some(client_server_parameters) = client_server_parameters.as_mut() {
client_server_parameters.set_param(key.clone(), value.clone(), false);
}
self.server_parameters.set_param(key, value, false);
}
} else if code == b'1' {
// ParseComplete — consume one pending registration.
self.registering_prepared_statement.pop_front();
}

self.buffer.extend_from_slice(&message);

// Terminal describe/bind-response markers — server done
// emitting in response to this Flush.
// T = RowDescription, n = NoData, E = ErrorResponse,
// 2 = BindComplete (Flush after Bind w/o Execute).
if matches!(code, b'T' | b'n' | b'E' | b'2') {
self.data_available = false;
break;
}
}

// mem::take instead of clone — hands the BytesMut to caller
// without a memcpy+alloc. self.buffer is left empty with retained
// capacity is NOT a concern; the empty BytesMut here is fine
// because we reuse it next call (push triggers fresh alloc on
// first put, identical cost to the prior clear()).
let bytes = std::mem::take(&mut self.buffer);
self.stats().data_received(bytes.len());
self.last_activity = SystemTime::now();
Ok(bytes)
}

// Determines if the server already has a prepared statement with the given name
// Increments the prepared statement cache hit counter
pub fn has_prepared_statement(&mut self, name: &str) -> bool {
Expand Down