Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
name: Rust

on: [push]
on:
push:
branches:
- master
paths:
- '**/*.rs'
- '**/Cargo.toml'
- '.github/workflows/rust.yml'
pull_request:
branches:
- master
paths:
- '**/*.rs'
- '**/Cargo.toml'
- '.github/workflows/rust.yml'

jobs:
build:
Expand All @@ -9,7 +23,9 @@ jobs:

steps:
- uses: actions/checkout@v1
- name: Setup Rust Toolchain
run: rustup toolchain install nightly
- name: Build
run: cargo build --all-features --verbose
run: cargo +nightly build --all-features --verbose
- name: Run tests
run: cargo test --all-features --verbose
run: cargo +nightly test --all-features --verbose
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/target/
**/*.rs.bk
Cargo.lock
.vscode/
14 changes: 8 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
[package]
name = "send_wrapper"
version = "0.6.0"
edition = "2018"
authors = ["Thomas Keh"]
name = "compio-send-wrapper"
version = "0.7.0"
edition = "2024"
license = "MIT/Apache-2.0"
description = """
This Rust library implements a wrapper type called SendWrapper which allows you to move around non-Send types
Expand All @@ -11,13 +10,16 @@ make sure that the wrapper is dropped from within the original thread. If any of
a panic occurs."""
keywords = ["send", "wrapper", "thread_local"]
readme = "README.md"
repository = "https://github.com/thk1/send_wrapper"
documentation = "https://docs.rs/send_wrapper"
repository = "https://github.com/compio-rs/send_wrapper"
documentation = "https://docs.rs/compio-send-wrapper"
categories = ["rust-patterns"]

[features]
futures = ["futures-core"]

current_thread_id = []
nightly = ["current_thread_id"]

[dependencies]
futures-core = { version = "0.3", optional = true }

Expand Down
38 changes: 8 additions & 30 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,14 @@ between threads, as long as you access the contained value only from within the
make sure that the wrapper is dropped from within the original thread. If any of these constraints is violated,
a panic occurs.

The idea for this crate was born in the context of a [`GTK+`]/[`gtk-rs`]-based application. [`GTK+`] applications
are strictly single-threaded. It is not allowed to call any [`GTK+`] method from a thread different to the main
thread. Consequently, all [`gtk-rs`] structs are non-[`Send`].

Sometimes you still want to do some work in background. It is possible to enqueue [`GTK+`] calls from there to be
executed in the main thread [using `Glib`]. This way you can know, that the [`gtk-rs`] structs involved are only
accessed in the main thread and will also be dropped there. This crate makes it possible for [`gtk-rs`] structs
to leave the main thread.

# Examples

```rust
use send_wrapper::SendWrapper;
use compio_send_wrapper::SendWrapper;
use std::rc::Rc;
use std::thread;
use std::sync::mpsc::channel;

// This import is important. It allows you to unwrap the value using deref(),
// deref_mut() or Deref coercion.
use std::ops::{Deref, DerefMut};

// Rc is a non-Send type.
let value = Rc::new(42);

Expand All @@ -38,8 +25,8 @@ let (sender, receiver) = channel();

let t = thread::spawn(move || {

// This would panic (because of dereferencing in wrong thread):
// let value = wrapped_value.deref();
// This would panic (because of accessing in the wrong thread):
// let value = wrapped_value.get().unwrap();

// Move SendWrapper back to main thread, so it can be dropped from there.
// If you leave this out the thread will panic because of dropping from wrong thread.
Expand All @@ -50,30 +37,24 @@ let t = thread::spawn(move || {
let wrapped_value = receiver.recv().unwrap();

// Now you can use the value again.
let value = wrapped_value.deref();

// alternatives for dereferencing:
// let value = *wrapped_value;
// let value: &NonSendType = &wrapped_value;
let value = wrapped_value.get().unwrap();

// alternatives for mutable dereferencing (value and wrapped_value must be mutable too, then):
// let mut value = wrapped_value.deref_mut();
// let mut value = &mut *wrapped_value;
// let mut value: &mut NonSendType = &mut wrapped_value;
// let mut value = wrapped_value.get_mut().unwrap();
```


## Wrapping `Future`s and `Stream`s

To use `SendWrapper` on `Future`s or `Stream`s, you should enable the Cargo feature `futures` first:
```toml
send_wrapper = { version = "0.5", features = ["futures"] }
compio-send-wrapper = { version = "0.7", features = ["futures"] }
```

Then, you can transparently wrap your `Future` or `Stream`:
```rust
use futures::{executor, future::{self, BoxFuture}};
use send_wrapper::SendWrapper;
use compio_send_wrapper::SendWrapper;

// `Rc` is a `!Send` type,
let value = Rc::new(42);
Expand All @@ -98,13 +79,10 @@ See [CHANGELOG.md](CHANGELOG.md)

# License

`send_wrapper` is distributed under the terms of both the MIT license and the Apache License (Version 2.0).
`compio-send-wrapper` is distributed under the terms of both the MIT license and the Apache License (Version 2.0).

See LICENSE-APACHE, and LICENSE-MIT for details.


[Rust]: https://www.rust-lang.org
[`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
[`gtk-rs`]: http://gtk-rs.org/
[`GTK+`]: https://www.gtk.org/
[using `Glib`]: http://gtk-rs.org/docs/glib/source/fn.idle_add.html
20 changes: 19 additions & 1 deletion rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1 +1,19 @@
hard_tabs = true
unstable_features = true

style_edition = "2024"

group_imports = "StdExternalCrate"
imports_granularity = "Crate"
reorder_imports = true

wrap_comments = true
normalize_comments = true

reorder_impl_items = true
condense_wildcard_suffixes = true
enum_discrim_align_threshold = 20
use_field_init_shorthand = true

format_strings = true
format_code_in_doc_comments = true
format_macro_matchers = true
145 changes: 69 additions & 76 deletions src/futures.rs
Original file line number Diff line number Diff line change
@@ -1,99 +1,92 @@
//! [`Future`] and [`Stream`] support for [`SendWrapper`].
use std::{
future::Future,
ops::{Deref as _, DerefMut as _},
pin::Pin,
task,
};
use std::{future::Future, pin::Pin, task};

use futures_core::Stream;

use crate::SendWrapper;
use crate::{SendWrapper, invalid_deref, invalid_poll};

impl<F: Future> Future for SendWrapper<F> {
type Output = F::Output;
type Output = F::Output;

/// Polls this [`SendWrapper`] [`Future`].
///
/// # Panics
///
/// Polling panics if it is done from a different thread than the one the [`SendWrapper`]
/// instance has been created with.
#[track_caller]
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
self.assert_valid_for_poll();
// This is safe as `SendWrapper` itself points to the inner `Future`.
// So, as long as `SendWrapper` is pinned, the inner `Future` is pinned too.
unsafe { self.map_unchecked_mut(Self::deref_mut) }.poll(cx)
}
/// Polls this [`SendWrapper`] [`Future`].
///
/// # Panics
///
/// Polling panics if it is done from a different thread than the one the
/// [`SendWrapper`] instance has been created with.
#[track_caller]
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
self.get_pinned_mut()
.unwrap_or_else(|| invalid_poll())
.poll(cx)
}
}

impl<S: Stream> Stream for SendWrapper<S> {
type Item = S::Item;
type Item = S::Item;

/// Polls this [`SendWrapper`] [`Stream`].
///
/// # Panics
///
/// Polling panics if it is done from a different thread than the one the [`SendWrapper`]
/// instance has been created with.
#[track_caller]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<Option<Self::Item>> {
self.assert_valid_for_poll();
// This is safe as `SendWrapper` itself points to the inner `Stream`.
// So, as long as `SendWrapper` is pinned, the inner `Stream` is pinned too.
unsafe { self.map_unchecked_mut(Self::deref_mut) }.poll_next(cx)
}
/// Polls this [`SendWrapper`] [`Stream`].
///
/// # Panics
///
/// Polling panics if it is done from a different thread than the one the
/// [`SendWrapper`] instance has been created with.
#[track_caller]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<Option<Self::Item>> {
self.get_pinned_mut()
.unwrap_or_else(|| invalid_poll())
.poll_next(cx)
}

#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
self.deref().size_hint()
}
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
self.get().unwrap_or_else(|| invalid_deref()).size_hint()
}
}

#[cfg(test)]
mod tests {
use std::thread;
use std::thread;

use futures_executor as executor;
use futures_util::{future, stream, StreamExt};
use futures_executor as executor;
use futures_util::{StreamExt, future, stream};

use crate::SendWrapper;
use crate::SendWrapper;

#[test]
fn test_future() {
let w1 = SendWrapper::new(future::ready(42));
let w2 = w1.clone();
assert_eq!(
format!("{:?}", executor::block_on(w1)),
format!("{:?}", executor::block_on(w2)),
);
}
#[test]
fn test_future() {
let w1 = SendWrapper::new(future::ready(42));
let w2 = w1.clone();
assert_eq!(
format!("{:?}", executor::block_on(w1)),
format!("{:?}", executor::block_on(w2)),
);
}

#[test]
fn test_future_panic() {
let w = SendWrapper::new(future::ready(42));
let t = thread::spawn(move || executor::block_on(w));
assert!(t.join().is_err());
}
#[test]
fn test_future_panic() {
let w = SendWrapper::new(future::ready(42));
let t = thread::spawn(move || executor::block_on(w));
assert!(t.join().is_err());
}

#[test]
fn test_stream() {
let mut w1 = SendWrapper::new(stream::once(future::ready(42)));
let mut w2 = SendWrapper::new(stream::once(future::ready(42)));
assert_eq!(
format!("{:?}", executor::block_on(w1.next())),
format!("{:?}", executor::block_on(w2.next())),
);
}
#[test]
fn test_stream() {
let mut w1 = SendWrapper::new(stream::once(future::ready(42)));
let mut w2 = SendWrapper::new(stream::once(future::ready(42)));
assert_eq!(
format!("{:?}", executor::block_on(w1.next())),
format!("{:?}", executor::block_on(w2.next())),
);
}

#[test]
fn test_stream_panic() {
let mut w = SendWrapper::new(stream::once(future::ready(42)));
let t = thread::spawn(move || executor::block_on(w.next()));
assert!(t.join().is_err());
}
#[test]
fn test_stream_panic() {
let mut w = SendWrapper::new(stream::once(future::ready(42)));
let t = thread::spawn(move || executor::block_on(w.next()));
assert!(t.join().is_err());
}
}
Loading