Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 62 additions & 29 deletions integration/rust/tests/tokio_postgres/copy.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,65 @@
// use futures_util::{TryStreamExt, pin_mut};
// use tokio_postgres::binary_copy::{BinaryCopyInWriter, BinaryCopyOutStream};
// use tokio_postgres::types::Type;
use bytes::{BufMut, BytesMut};
use futures_util::SinkExt;
use tokio_postgres::NoTls;

// use rust::setup::connections;
/// Demonstrate that COPY FROM STDIN via extended protocol (tokio-postgres)
/// works correctly through pgdog.
///
/// tokio-postgres sends COPY using Bind+Execute+Sync (extended protocol).
/// PostgreSQL ignores the Sync during COPY IN mode, producing only one
/// ReadyForQuery instead of two. Without the remove_one_rfq() fix, the
/// stale ReadyForQuery expectation desyncs the state machine and the
/// connection becomes unusable for subsequent queries.
#[tokio::test]
async fn test_copy_in_extended_protocol() {
let (conn, connection) = tokio_postgres::connect(
"host=127.0.0.1 user=pgdog dbname=pgdog password=pgdog port=6432",
NoTls,
)
.await
.unwrap();

// #[tokio::test]
// async fn test_copy() {
// for conn in connections().await {
// conn.batch_execute(
// "DROP SCHEMA IF EXISTS rust_test_insert CASCADE;
// CREATE SCHEMA rust_test_insert;
// CREATE TABLE rust_test_insert.sharded (id BIGINT PRIMARY KEY, value VARCHAR);
// SET search_path TO rust_test_insert,public;",
// )
// .await
// .unwrap();
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});

// let sink = conn
// .copy_in("COPY sharded (id, value) FROM STDIN BINARY")
// .await
// .unwrap();
// let writer = BinaryCopyInWriter::new(sink, &[Type::INT8, Type::TEXT]);
// for i in 0..25 {
// let writer = tokio::pin!(writer);
// writer.
// .write(&[&1_i64, &"foobar"])
// .await
// .unwrap();
// }
// }
// }
// Setup: clean slate
conn.batch_execute(
"DROP TABLE IF EXISTS _copy_test;
CREATE TABLE _copy_test (id BIGINT, value TEXT);",
)
.await
.unwrap();

// COPY FROM STDIN — tokio-postgres sends this via extended protocol
// (Parse, Bind, Execute, Sync), triggering the double-Sync pattern.
let sink = conn
.copy_in("COPY _copy_test (id, value) FROM STDIN")
.await
.unwrap();

// Write some tab-delimited rows
let mut buf = BytesMut::new();
for i in 0..10_i64 {
buf.put_slice(format!("{}\trow_{}\n", i, i).as_bytes());
}
futures_util::pin_mut!(sink);
sink.send(buf.freeze()).await.unwrap();
let rows_copied = sink.finish().await.unwrap();
assert_eq!(rows_copied, 10);

// This query AFTER the copy is the real test.
// Without the fix, the state machine has a stale ReadyForQuery
// and this query will either hang, error, or return wrong results.
let rows = conn
.query("SELECT count(*) FROM _copy_test", &[])
.await
.unwrap();
let count: i64 = rows[0].get(0);
assert_eq!(count, 10);

// Cleanup
conn.execute("DROP TABLE _copy_test", &[]).await.unwrap();
}
54 changes: 54 additions & 0 deletions pgdog/src/backend/prepared_statements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,10 @@ impl PreparedStatements {

'G' => {
self.state.prepend('G'); // Next thing we'll see is a CopyFail or CopyDone.
// PostgreSQL ignores Sync during COPY IN (protocol spec §55.2.6).
// Remove the ReadyForQuery that was expected from the initial
// Bind+Execute+Sync — the server won't send it.
self.state.remove_one_rfq();
}

// Backend told us the copy is done.
Expand Down Expand Up @@ -374,3 +378,53 @@ impl PreparedStatements {
close
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::net::messages::Payload;

/// Build a minimal backend Message with the given code byte.
fn msg(code: char) -> Message {
Message::new(Payload::named(code).freeze())
}

/// Simulate a client that sends Bind+Execute+Sync for COPY FROM STDIN
/// (the tokio-postgres double-Sync pattern). PostgreSQL ignores Sync
/// during COPY IN mode, so only one ReadyForQuery is produced.
///
/// This test exercises the real `forward()` code path — it must clean
/// up the stale ReadyForQuery when it sees CopyInResponse.
#[test]
fn test_copy_in_with_client_double_sync() {
let mut ps = PreparedStatements::new();

// Client: Bind + Execute + Sync → state expects [BindComplete, ExecutionCompleted, RFQ]
ps.state_mut().add('2');
ps.state_mut().add(ExecutionCode::ExecutionCompleted);
ps.state_mut().add('Z');

// Server responds: BindComplete
assert!(ps.forward(&msg('2')).unwrap());

// Server responds: CopyInResponse — forward() should drop the stale RFQ
assert!(ps.forward(&msg('G')).unwrap());

// Queue should be [Copy] only — no stale ReadyForQuery.
assert!(ps.in_copy_mode());
assert!(!ps.done()); // still in COPY mode

// Client sends CopyDone (handled through handle())
ps.state_mut().action('c').unwrap();

// Client sends second Sync
ps.state_mut().add('Z');

// Server: CommandComplete + ReadyForQuery (one pair, not two)
assert!(ps.forward(&msg('C')).unwrap());
assert!(ps.forward(&msg('Z')).unwrap());

// Clean — no stale entries.
assert!(ps.done());
}
}
16 changes: 16 additions & 0 deletions pgdog/src/backend/protocol/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,21 @@ impl ProtocolState {
self.queue.front() == Some(&ExecutionItem::Code(ExecutionCode::Copy))
}

/// Remove one ReadyForQuery expectation from the queue.
///
/// Called when the server enters COPY IN mode (sends CopyInResponse).
/// PostgreSQL ignores Sync during COPY IN (protocol spec §55.2.6),
/// so the ReadyForQuery that was expected from the initial
/// Bind+Execute+Sync will never arrive. Leaving it in the queue
/// would desync the state machine on the next query.
pub(crate) fn remove_one_rfq(&mut self) {
if let Some(pos) = self.queue.iter().position(|item| {
matches!(item, ExecutionItem::Code(ExecutionCode::ReadyForQuery))
}) {
self.queue.remove(pos);
}
}

pub(crate) fn is_empty(&self) -> bool {
self.len() == 0
}
Expand Down Expand Up @@ -847,4 +862,5 @@ mod test {
assert_eq!(state.action('Z').unwrap(), Action::Forward);
assert!(state.is_empty());
}

}
Loading