My colleague Matt Jibson and I recently found ourselves in the unfortunate situation of debugging this hefty async/await-related error from the Rust compiler:

error[E0277]: `(dyn futures::Stream<Item = std::result::Result<std::vec::Vec<repr::Row>, comm::Error>> + std::marker::Send + std::marker::Unpin + 'static)` cannot be shared between threads safely --> src/materialized/src/mux.rs:138:100 |
138 | async fn handle_connection(&self, conn: SniffedStream<TcpStream>) -> Result<(), anyhow::Error> { | ____________________________________________________________________________________________________^
139 | | self.handle_connection(conn).await
140 | | } | |_____^ `(dyn futures::Stream<Item = std::result::Result<std::vec::Vec<repr::Row>, comm::Error>> + std::marker::Send + std::marker::Unpin + 'static)` cannot be shared between threads safely | = help: the trait `std::marker::Sync` is not implemented for `(dyn futures::Stream<Item = std::result::Result<std::vec::Vec<repr::Row>, comm::Error>> + std::marker::Send + std::marker::Unpin + 'static)` = note: required because of the requirements on the impl of `std::marker::Sync` for `std::ptr::Unique<(dyn futures::Stream<Item = std::result::Result<std::vec::Vec<repr::Row>, comm::Error>> + std::marker::Send + std::marker::Unpin + 'static)>` = note: required because it appears within the type `std::boxed::Box<(dyn futures::Stream<Item = std::result::Result<std::vec::Vec<repr::Row>, comm::Error>> + std::marker::Send + std::marker::Unpin + 'static)>` = note: required because it appears within the type `std::option::Option<std::boxed::Box<(dyn futures::Stream<Item = std::result::Result<std::vec::Vec<repr::Row>, comm::Error>> + std::marker::Send + std::marker::Unpin + 'static)>>` = note: required because it appears within the type `coord::session::Portal` = note: required because of the requirements on the impl of `std::marker::Send` for `&coord::session::Portal` = note: required because it appears within the type `for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8> {std::future::ResumeTy, &'r mut pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, std::string::String, pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, &'s mut coord::SessionClient, coord::SessionClient, &'t0 coord::session::Session, &'t1 mut coord::session::Session, &'t2 str, &'t3 std::string::String, std::option::Option<&'t4 coord::session::Portal>, tokio_postgres::error::sqlstate::SqlState, impl futures::Future, (), &'t7 coord::session::Portal, impl futures::Future}` = note: required because it appears within the type `[static generator@pgwire::protocol::StateMachine::<A>::describe_portal::#0 0:&mut pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, 1:std::string::String for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8> {std::future::ResumeTy, &'r mut pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, std::string::String, pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, &'s mut coord::SessionClient, coord::SessionClient, &'t0 coord::session::Session, &'t1 mut coord::session::Session, &'t2 str, &'t3 std::string::String, std::option::Option<&'t4 coord::session::Portal>, tokio_postgres::error::sqlstate::SqlState, impl futures::Future, (), &'t7 coord::session::Portal, impl futures::Future}]` = note: required because it appears within the type `std::future::from_generator::GenFuture<[static generator@pgwire::protocol::StateMachine::<A>::describe_portal::#0 0:&mut pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, 1:std::string::String for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8> {std::future::ResumeTy, &'r mut pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, std::string::String, pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, &'s mut coord::SessionClient, coord::SessionClient, &'t0 coord::session::Session, &'t1 mut coord::session::Session, &'t2 str, &'t3 std::string::String, std::option::Option<&'t4 coord::session::Portal>, tokio_postgres::error::sqlstate::SqlState, impl futures::Future, (), &'t7 coord::session::Portal, impl futures::Future}]>` = note: required because it appears within the type `impl futures::Future` = note: required because it appears within the type `impl futures::Future` = note: required because it appears within the type `for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10, 't11> {std::future::ResumeTy, &'r mut pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, &'s mut pgwire::codec::FramedConn<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, pgwire::codec::FramedConn<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, impl futures::Future, (), std::option::Option<pgwire::message::FrontendMessage>, std::time::Instant, &'t1 str, std::string::String, impl futures::Future, std::vec::Vec<u32>, impl futures::Future, std::vec::Vec<pgrepr::format::Format>, std::vec::Vec<std::option::Option<std::vec::Vec<u8>>>, impl futures::Future, i32, usize, impl futures::Future, impl futures::Future, impl futures::Future, impl futures::Future, impl futures::Future, impl futures::Future, impl futures::Future}` = note: required because it appears within the type `[static generator@pgwire::protocol::StateMachine::<A>::advance_ready::#0 0:&mut pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>> for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10, 't11> {std::future::ResumeTy, &'r mut pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, &'s mut pgwire::codec::FramedConn<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, pgwire::codec::FramedConn<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, impl futures::Future, (), std::option::Option<pgwire::message::FrontendMessage>, std::time::Instant, &'t1 str, std::string::String, impl futures::Future, std::vec::Vec<u32>, impl futures::Future, std::vec::Vec<pgrepr::format::Format>, std::vec::Vec<std::option::Option<std::vec::Vec<u8>>>, impl futures::Future, i32, usize, impl futures::Future, impl futures::Future, impl futures::Future, impl futures::Future, impl futures::Future, impl futures::Future, impl futures::Future}]` = note: required because it appears within the type `std::future::from_generator::GenFuture<[static generator@pgwire::protocol::StateMachine::<A>::advance_ready::#0 0:&mut pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>> for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10, 't11> {std::future::ResumeTy, &'r mut pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, &'s mut pgwire::codec::FramedConn<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, pgwire::codec::FramedConn<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, impl futures::Future, (), std::option::Option<pgwire::message::FrontendMessage>, std::time::Instant, &'t1 str, std::string::String, impl futures::Future, std::vec::Vec<u32>, impl futures::Future, std::vec::Vec<pgrepr::format::Format>, std::vec::Vec<std::option::Option<std::vec::Vec<u8>>>, impl futures::Future, i32, usize, impl futures::Future, impl futures::Future, impl futures::Future, impl futures::Future, impl futures::Future, impl futures::Future, impl futures::Future}]>` = note: required because it appears within the type `impl futures::Future` = note: required because it appears within the type `impl futures::Future` = note: required because it appears within the type `for<'r, 's, 't0, 't1> {std::future::ResumeTy, pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, i32, std::vec::Vec<(std::string::String, std::string::String)>, &'r mut pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, impl futures::Future, (), pgwire::protocol::State, impl futures::Future, impl futures::Future, coord::SessionClient, impl futures::Future}` = note: required because it appears within the type `[static generator@pgwire::protocol::StateMachine::<A>::run::#0 0:pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, 1:i32, 2:std::vec::Vec<(std::string::String, std::string::String)> for<'r, 's, 't0, 't1> {std::future::ResumeTy, pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, i32, std::vec::Vec<(std::string::String, std::string::String)>, &'r mut pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, impl futures::Future, (), pgwire::protocol::State, impl futures::Future, impl futures::Future, coord::SessionClient, impl futures::Future}]` = note: required because it appears within the type `std::future::from_generator::GenFuture<[static generator@pgwire::protocol::StateMachine::<A>::run::#0 0:pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, 1:i32, 2:std::vec::Vec<(std::string::String, std::string::String)> for<'r, 's, 't0, 't1> {std::future::ResumeTy, pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, i32, std::vec::Vec<(std::string::String, std::string::String)>, &'r mut pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, impl futures::Future, (), pgwire::protocol::State, impl futures::Future, impl futures::Future, coord::SessionClient, impl futures::Future}]>` = note: required because it appears within the type `impl futures::Future` = note: required because it appears within the type `impl futures::Future` = note: required because it appears within the type `for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10> {std::future::ResumeTy, &'r pgwire::Server, ore::netio::SniffedStream<tokio::net::TcpStream>, pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>, &'s mut pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>, impl futures::Future, (), std::result::Result<pgwire::message::FrontendStartupMessage, std::io::Error>, pgwire::message::FrontendStartupMessage, i32, std::vec::Vec<(std::string::String, std::string::String)>, u32, coord::SessionClient, pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, impl futures::Future, bool, pgwire::Server, &'t1 coord::Client, coord::Client, &'t2 mut coord::Client, impl futures::Future, &'t4 mut ore::netio::SniffedStream<tokio::net::TcpStream>, u8, [u8; 1], &'t5 [u8], &'t6 [u8; 1], tokio::io::util::write_all::WriteAll<'t7, ore::netio::SniffedStream<tokio::net::TcpStream>>, &'t8 openssl::ssl::SslAcceptor, impl futures::Future, tokio::io::util::write_all::WriteAll<'t10, pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>}` = note: required because it appears within the type `[static generator@pgwire::Server::handle_connection::#0 0:&pgwire::Server, 1:ore::netio::SniffedStream<tokio::net::TcpStream> for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10> {std::future::ResumeTy, &'r pgwire::Server, ore::netio::SniffedStream<tokio::net::TcpStream>, pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>, &'s mut pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>, impl futures::Future, (), std::result::Result<pgwire::message::FrontendStartupMessage, std::io::Error>, pgwire::message::FrontendStartupMessage, i32, std::vec::Vec<(std::string::String, std::string::String)>, u32, coord::SessionClient, pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, impl futures::Future, bool, pgwire::Server, &'t1 coord::Client, coord::Client, &'t2 mut coord::Client, impl futures::Future, &'t4 mut ore::netio::SniffedStream<tokio::net::TcpStream>, u8, [u8; 1], &'t5 [u8], &'t6 [u8; 1], tokio::io::util::write_all::WriteAll<'t7, ore::netio::SniffedStream<tokio::net::TcpStream>>, &'t8 openssl::ssl::SslAcceptor, impl futures::Future, tokio::io::util::write_all::WriteAll<'t10, pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>}]` = note: required because it appears within the type `std::future::from_generator::GenFuture<[static generator@pgwire::Server::handle_connection::#0 0:&pgwire::Server, 1:ore::netio::SniffedStream<tokio::net::TcpStream> for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10> {std::future::ResumeTy, &'r pgwire::Server, ore::netio::SniffedStream<tokio::net::TcpStream>, pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>, &'s mut pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>, impl futures::Future, (), std::result::Result<pgwire::message::FrontendStartupMessage, std::io::Error>, pgwire::message::FrontendStartupMessage, i32, std::vec::Vec<(std::string::String, std::string::String)>, u32, coord::SessionClient, pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, impl futures::Future, bool, pgwire::Server, &'t1 coord::Client, coord::Client, &'t2 mut coord::Client, impl futures::Future, &'t4 mut ore::netio::SniffedStream<tokio::net::TcpStream>, u8, [u8; 1], &'t5 [u8], &'t6 [u8; 1], tokio::io::util::write_all::WriteAll<'t7, ore::netio::SniffedStream<tokio::net::TcpStream>>, &'t8 openssl::ssl::SslAcceptor, impl futures::Future, tokio::io::util::write_all::WriteAll<'t10, pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>}]>` = note: required because it appears within the type `impl futures::Future` = note: required because it appears within the type `impl futures::Future` = note: required because it appears within the type `for<'r, 's> {std::future::ResumeTy, &'r pgwire::Server, ore::netio::SniffedStream<tokio::net::TcpStream>, impl futures::Future, ()}` = note: required because it appears within the type `[static generator@src/materialized/src/mux.rs:138:100: 140:6 _self:&pgwire::Server, conn:ore::netio::SniffedStream<tokio::net::TcpStream> for<'r, 's> {std::future::ResumeTy, &'r pgwire::Server, ore::netio::SniffedStream<tokio::net::TcpStream>, impl futures::Future, ()}]` = note: required because it appears within the type `std::future::from_generator::GenFuture<[static generator@src/materialized/src/mux.rs:138:100: 140:6 _self:&pgwire::Server, conn:ore::netio::SniffedStream<tokio::net::TcpStream> for<'r, 's> {std::future::ResumeTy, &'r pgwire::Server, ore::netio::SniffedStream<tokio::net::TcpStream>, impl futures::Future, ()}]>` = note: required because it appears within the type `impl futures::Future` = note: required because it appears within the type `impl futures::Future` = note: required for the cast to the object type `dyn futures::Future<Output = std::result::Result<(), anyhow::Error>> + std::marker::Send`

Good luck with that, eh?

It turns out there is a clever way to convince the compiler to spit out a far more helpful error messages. I’ve now forgotten this trick twice and so have had to invent it thrice, so I figured it was time to write it down. Maybe you’ll find it useful too.

If you’re not interested in the background, just skip down to the trick.

Background

The code in question is Materialize’s implementation of the PostgreSQL network protocol1. Roughly speaking, the protocol works like this:

  1. A client sends a SQL query of interest to the Materialize server.
  2. The server plans the SQL query. If the query is valid, the server establishes a “portal” for the query.2
  3. The client optionally asks questions about the portal, like “how many columns will be in the result set?” and “what are the types of those columns?”
  4. The client asks the server to execute the portal, optionally asking that no more than n rows are returned.
  5. The server executes the query, and sends up to n rows back to the client. If the result contains more than n rows, the server additionally tells the client “you might want to ask for more rows.”
  6. The client optionally asks for another batch of up to n rows.
  7. Steps 6 and 7 repeat until the result set is exhausted.

You can view the gory details of the full protocol on GitHub. I’m going to do my best to present a simplified version here. Like most modern networking code in Rust, our implementation is entirely asynchronous, using the recently-stabilized async/await syntax.

Today, the code that handles portal execution looks roughly like this:

struct Session { portals: HashMap<String, Portal>,
} struct Portal { sql: String, remaining_rows: Option<Vec<Row>>,
} async fn execute_query(sql: &str) -> Result<Vec<Row>, Error> { /* ... */ } async fn handle_execute( conn: Conn, session: Session, portal_name: &str, max_rows: usize,
) -> Result<(), Error> { let portal = match session.portals.get_mut(portal_name) { Some(portal) => portal, None => bail!("unknown portal {}", portal_name), }; if portal.remaining_rows.is_none() { portal.remaining_rows = Some(execute_query(&portal.sql).await?); }; let rows = portal.remaining_rows.as_mut().unwrap(); for row in rows.drain(..max_rows) { conn.send(row).await?; } if rows.is_empty() { conn.send(Message::CommandComplete).await?; } else { conn.send(Message::PortalSuspended).await?; }
}

Supporting streaming SQL

This code above works well enough, but it has the limitation that the execute_query method must produce all of the rows up front as a Vec<Row>. Materialize is, after all, a streaming database: it incrementally computes changes to queries as the input data changes. Wouldn’t it be nice if we could stream those changes to the client as they happened?

The goal is to change execute_query to return a Stream<Item = Row> so that new rows could be shipped off to the client as soon as they were ready. The naive diff would look something like this:

@@ -4,10 +4,10 @@ struct Portal { sql: String,
- remaining_rows: Option<Vec<Row>>,
+ remaining_rows: Option<Box<dyn Stream<Item = Row> + Send + Unpin>>,
 } -async fn execute_query(sql: &str) -> Result<Vec<Row>, Error> { /* ... */ }
+fn execute_query(sql: &str) -> Result<Box<dyn Stream<Item = Row> + Send + Unpin>, Error> { /* ... */ }
 async fn handle_execute( conn: Conn,
@@ -25,11 +25,11 @@ }; let rows = portal.remaining_rows.as_mut().unwrap(); - for row in rows.drain(..max_rows) {
+ while let Some(row) = rows.take(max_rows).next().await {
 conn.send(row).await?; } - if rows.is_empty() {
+ if rows.peek().is_none() {
 conn.send(Message::CommandComplete).await?; } else { conn.send(Message::PortalSuspended).await?;

But that would be too easy. This straightforward diff triggers the six-page “future cannot be shared between threads safely” error mentioned at the beginning of the post.

To explain the error message to the extent possible: our PostgreSQL server is compiled into a giant state machine by the Rust compiler. There is an entry point to the state machine that looks like this:

async fn handle_connection(conn: Connection)

This is desugared by the Rust compiler into:

fn handle_connection(conn: Conn) -> impl Future<Output = ()>

The returned Future has a poll method that drives the state machine from await point to await point. For some unknown reason the generated future does not implement the Sync trait, but the compiler insists that it needs to.

So what to do?

Failed approaches

My usual first approach to dealing with less-than-helpful error messages like this is (not sarcastically) to think really hard. Oftentimes you already know, subconciously, why the code you have written is unsafe—e.g., you’re trying to share an Rc across threads—and the compiler’s reminder is enough to jog your memory.

I remember a study, though I can’t find the source now, in which programmers were tasked with finding bugs in a code sample that contained several bugs. If the programmer was told nothing more about the code, they would often find only some of the bugs, or none at all. But if the programmer was told that the program contained n bugs, they were much more likely to find n bugs, if not more! Bad compiler errors remind me of that study. It’s weirdly much easier to find your bug when your compiler has told you that it exists, even if it can’t tell you where it is or how to fix it.

But the “think hard” strategy proved fruitless here. The code in question is very much safe according to my mental model. The connection future takes a Stream that it owns, stores it in a Portal that it owns, and reads messages out of the stream in response to execute requests. There are no other futures that have access to the Session, Portal, or Stream objects.

My next approach is to apply the most obvious, mechanical solution possible. Matt does this too, I learned. In this case the compiler is complaining about a future not implementing Sync. Maybe we just need to declare that our stream is Sync like so and everything will work out:

@@ -4,10 +4,10 @@ struct Portal { sql: String,
- remaining_rows: Option<Box<dyn Stream<Item = Row> + Send + Unpin>>,
+ remaining_rows: Option<Box<dyn Stream<Item = Row> + Send + Sync + Unpin>>,
 } -fn execute_query(sql: &str) -> Result<Box<dyn Stream<Item = Row> + Send + Unpin>, Error> { /* ... */ }
+fn execute_query(sql: &str) -> Result<Box<dyn Stream<Item = Row> + Send + Sync + Unpin>, Error> { /* ... */ }
 async fn handle_execute( conn: Conn, 

As you might have guessed, this doesn’t work. It turns out the stream that our execute_query function returns is not Sync and cannot be made Sync for reasons fundamental to the design of our communication plane.

But, still, why does the overall protocol future need to be Sync? This future is not shared between threads.

Successful strategy: demand Send earlier

If you Google enough terms related to “future”, “rust”, “async/await”, “bad error messages”, and “shared between threads”, eventually you will surface this blog post from the Rust team entitled “Improving async-await’s ‘Future is not Send’ diagnostic”. This blog post claims that many error messages of this class have been vastly improved, like:

error[E0277]: `std::sync::MutexGuard<'_, u32>` cannot be sent between threads safely --> src/main.rs:23:5 |
5 | fn is_send<T: Send>(t: T) { | ------- ---- required by this bound in `is_send`
...
23 | is_send(foo()); | ^^^^^^^ `std::sync::MutexGuard<'_, u32>` cannot be sent between threads safely | = help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, u32>`
note: future does not implement `std::marker::Send` as this value is used across an await --> src/main.rs:15:3 |
14 | let g = x.lock().unwrap(); | - has type `std::sync::MutexGuard<'_, u32>`
15 | baz().await; | ^^^^^^^^^^^ await occurs here, with `g` maybe used later
16 | } | - `g` is later dropped here

That is a far cry from the error message at the start of this blog post.

It turns out the trick for coaxing out these nicer error messages is to manually desugar an async fn or two. In our case, manually desugaring the entry point to the PostgreSQL protocol server from

async fn handle_connection(conn: Connection) { // Do stuff with `conn`.
}

into

fn handle_connection(conn: Conn) -> impl Future<Output = ()> + Send { async move { // Do stuff with `conn`. }
}

is literally all that’s required. The key is the explicit Send bound that we’ve added to the return type.

Here’s the fantastic error that the compiler spits out now:

error: future cannot be sent between threads safely --> src/pgwire/src/protocol.rs:93:10 |
93 | ) -> impl Future<Output = Result<(), comm::Error>> + Send { | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future created by async block is not `Send` | = help: the trait `std::marker::Sync` is not implemented for `(dyn futures::Stream<Item = std::result::Result<std::vec::Vec<repr::Row>, comm::Error>> + std::marker::Send + std::marker::Unpin + 'static)`
note: future is not `Send` as this value is used across an await --> src/pgwire/src/protocol.rs:537:9 |
525 | let portal = match self.coord_client.session().get_portal(&name) { | ------ has type `&coord::session::Portal` which is not `Send`
...
537 | self.send_describe_rows(stmt_name).await | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `portal` maybe used later
538 | } | - `portal` is later dropped here = note: the return type of a function must have a statically known size

With this laser-focused error message, the problem is now clear. There is a completely unrelated part of the protocol server, far away from execute_rows, that acquires a reference to a portal and inadvertently holds that reference across an await point. Since the Portal type is no longer Sync due to our changes, holding a reference to a portal across an await point is no longer permissible.

The fix is thankfully trivial, since the offending code can easily be refactored to scope the borrow of portal more tightly so that it is not alive across an await point.

What’s going on?

As to why this trick makes the error message so much better, I’m not entirely sure. If I had to guess, the heuristics for when the simplified error message applies are not perfect. Somehow being explicit about Send bounds more often helps these heuristics along.

In Materialize in particular, the function that calls handle_connection does so by way of the async-trait crate. There’s a decent bit of magic involved, since proper async traits don’t yet exist in the language. So possibly that is what is impeding the heuristics.

Particularly confusing is the fact that the original error message was complaining about a future not implementing Sync but the solution is to add a Send bound. But look closely at the original error message and it will make sense in hindsight:

 = help: the trait `std::marker::Sync` is not implemented for `(dyn futures::Stream<Item = std::result::Result<std::vec::Vec<repr::Row>, comm::Error>> + std::marker::Send + std::marker::Unpin + 'static)` = note: required because of the requirements on the impl of `std::marker::Sync` for `std::ptr::Unique<(dyn futures::Stream<Item = std::result::Result<std::vec::Vec<repr::Row>, comm::Error>> + std::marker::Send + std::marker::Unpin + 'static)>` ... = note: required for the cast to the object type `dyn futures::Future<Output = std::result::Result<(), anyhow::Error>> + std::marker::Send`

The notes indicate that the outer future must implement Send, but it cannot because there is an inner future which is not Sync. (The part left unsaid, which we learned from the improved error message, is that the inner future only needs to be Sync because it is unnecessarily held across an await point.)

If anyone reading this can offer more clues about what is going on, I’d love to file an issue upstream—or, better yet, a pull request. As it stands, I’ve been unable to reduce the reproduction to anything less than “all of Materialize”, which I don’t feel is appropriate for a bug report.

For now, this weird trick will have to suffice.3

If you look at the actual protocol code, rather than the simplified code in the blog post, you’ll notice something strange. According to the rules of non-lexical lifetimes (NLL), the lifetime of the borrow of the portal doesn’t span the await point! Here’s a verbatim reproduction of the code implicated by the improved error message:

let portal = match self.coord_client.session().get_portal(&name) { Some(portal) => portal, None => { return self .error( SqlState::INVALID_SQL_STATEMENT_NAME, "portal does not exist", ) .await }
};
let stmt_name = portal.statement_name.clone();
// the lifetime of the portal borrow ends here
self.send_describe_rows(stmt_name).await

By the time we get to the last line, we’ve cloned what we need out of the portal, so the lifetime of the portal borrow needn’t include the last line of the function.

Unfortunately, async/await generators do not use the same NLL analysis that the borrow checker does when determining what variables are alive across an await point. Instead, the generators use a much coarser analysis based on variable scope, like the old borrow checker. Since the scope of the variable named portal includes the last line of the function, the compiler considers its contents to be held across the await point.

If you want to experience this for yourself, I’ve reduced this behavior to a very simple snippet. Simply assigning an Rc to a variable in a scope that later calls await fails to compile, even if you immediately drop the Rc:

fn sendable() -> impl Future<Output = i64> + Send { async move { let x = Rc::new(()); drop(x); useless().await }
} async fn useless() -> i64 { 42
}

Try it on the Rust playground.

There has been much discussion about fixing this in rust-lang/rust#57017, but for many reasons that are above my pay grade to explain, the fix is extremely difficult.