feat(capabilities)!: sleep & spawn capabilities #1873
Conversation
Codecov Report: patch coverage details and impacted files:

```
@@           Coverage Diff            @@
##             main    #1873    +/-  ##
==========================================
- Coverage   71.78%   71.77%   -0.01%
==========================================
  Files         434      437      +3
  Lines       69951    70138    +187
==========================================
+ Hits        50211    50344    +133
- Misses      19740    19794     +54
```
🎉 All green! ❄️ No new flaky tests detected. 🔗 Commit SHA: 1e64b86
Artifact Size Benchmark Report:
- aarch64-alpine-linux-musl
- aarch64-unknown-linux-gnu
- libdatadog-x64-windows
- libdatadog-x86-windows
- x86_64-alpine-linux-musl
- x86_64-unknown-linux-gnu
Force-pushed 0f2d27f → 628bd2c, then 628bd2c → 4145e42.
```rust
} else {
    // The task has been aborted and the worker can't be retrieved.
    *self = PausableWorker::InvalidState;
    Err(PausableWorkerError::TaskAborted)
```
Why has the error handling been removed? Won't the new code panic?
Tokio's JoinHandle implements Future<Output = Result<T, JoinError>>, but I now use Box<dyn Future<Output = T>> for our WorkerJoinHandle<T>, so we can't directly reuse the same pattern.
I could make our JoinHandle type yield a Result as well, but futures::remote_handle() yields a T and panics on drop-cancel, so this way everything has the same API.
Looking at solutions a little, I guess I could do something along the lines of std::panic::AssertUnwindSafe(handle).catch_unwind().await to get a result back here, but I'm not sure it's better.
> I could make it so our JoinHandle type also is a Result

I think this is the way to go. We want to avoid panics, and this also allows us to follow through with the other workers' shutdown rather than panicking and dropping data on all workers.
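A minimal sketch of the caller-side consequence, assuming a tokio-style join error (the `JoinError` enum and `shutdown_all` helper here are hypothetical, not the PR's actual code): once the handle yields a `Result`, a failed join on one worker can be recorded while shutdown proceeds for the rest instead of unwinding.

```rust
// Hypothetical error type mirroring tokio's JoinError: the task either
// panicked or was cancelled before completion.
#[derive(Debug, PartialEq)]
enum JoinError {
    Panicked,
    Cancelled,
}

// Caller-side shape: a failed join on one worker is recorded, and shutdown
// continues for the remaining workers instead of panicking.
fn shutdown_all(results: Vec<Result<&'static str, JoinError>>) -> (usize, usize) {
    let (mut ok, mut failed) = (0, 0);
    for res in results {
        match res {
            Ok(_worker_state) => ok += 1,
            Err(_e) => failed += 1, // log and keep going
        }
    }
    (ok, failed)
}

fn main() {
    let results = vec![Ok("worker-a"), Err(JoinError::Cancelled), Ok("worker-b")];
    let (ok, failed) = shutdown_all(results);
    println!("ok={ok} failed={failed}");
}
```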
```rust
    stats = StatsComputationStatus::DisabledByAgent { bucket_size };
}

#[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))]
```
Can't this #[cfg...] mess be separated out in some way? This is barely readable.
I'm not sure how to do that without making spaghetti; do you have something in mind?
One possibility would be to accept that the two implementations are too different and make two separate versions of build. If there's really code duplication, maybe some parts can be moved into separate helpers?
(or, dually, move the platform-dependent bits into bespoke functions?)
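A sketch of that second suggestion, with hypothetical names (`Worker` and `build_platform_parts` are not from the PR): the shared skeleton stays in a single `build`, and only the platform-dependent pieces live behind `cfg` gates, so each body is readable on its own.

```rust
// Hypothetical builder split: the shared skeleton stays in `build`, and the
// platform-dependent wiring moves into cfg-gated helper functions.
struct Worker {
    telemetry: bool,
}

#[cfg(not(target_arch = "wasm32"))]
fn build_platform_parts() -> bool {
    // Native-only wiring (e.g. a telemetry worker) would go here.
    true
}

#[cfg(target_arch = "wasm32")]
fn build_platform_parts() -> bool {
    // No telemetry worker on wasm32 in this sketch.
    false
}

fn build() -> Worker {
    Worker {
        telemetry: build_platform_parts(),
    }
}

fn main() {
    let worker = build();
    println!("telemetry={}", worker.telemetry);
}
```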
Force-pushed 166e5c7 → f4b19a7, 59f8c03 → f4b19a7, then f4b19a7 → be149e6.
```rust
match Pin::new(&mut self.get_mut().0).poll(cx) {
    Poll::Ready(Ok(val)) => Poll::Ready(val),
    // JoinError means the task panicked or was aborted. Both unrecoverable.
    Poll::Ready(Err(e)) => panic!("spawned task failed: {e}"),
```
I think tokio's JoinHandle returns a JoinError instead of panicking, which is probably better, to be able to recover in some cases.
I agree. We're just wrapping JoinHandle which returns a result, so there's no obvious reason to depart from this behavior and introduce panics instead.
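A sketch of the Result-forwarding variant being suggested, assuming the wrapped future already yields a `Result` (the minimal `block_on` executor is only there so the example runs without tokio; the wrapper name follows the PR's `RuntimeJoinHandle`, but the body is hypothetical):

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical wrapper: instead of panicking on a failed join, forward the
// inner future's Result so callers decide how to recover.
struct RuntimeJoinHandle<F>(F);

impl<F, T, E> Future for RuntimeJoinHandle<F>
where
    F: Future<Output = Result<T, E>> + Unpin,
{
    type Output = Result<T, E>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Forward Ok and Err alike; no panic on join failure.
        Pin::new(&mut self.0).poll(cx)
    }
}

// Minimal single-future executor so the sketch runs without tokio.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
    }
}

fn main() {
    let handle = RuntimeJoinHandle(std::future::ready(Ok::<_, String>(42)));
    match block_on(handle) {
        Ok(v) => println!("joined: {v}"),
        Err(e) => println!("join failed: {e}"),
    }
}
```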
```rust
    {
        RuntimeJoinHandle(ctx.spawn(future))
    }
}
```
Should this be replaced by NativeSpawnCapability?
I don't think it should:
- NativeSpawnCapability lives in libdd-capabilities-impl, which is a leaf crate; libdd-shared-runtime (a core crate) can't depend on it without inverting the layering.
- libdd-capabilities-impl doesn't compile on wasm32, and this module needs to (the non-fork parts are wasm-compatible).
- We could make the fork methods generic and have the FFI caller pass a spawner in, but that adds API surface for no real benefit; the internal RuntimeSpawner is a 5-line ZST that avoids the dependency while keeping the fork hooks simple.
I might be missing something, but when you say:
> NativeSpawnCapability lives in libdd-capabilities-impl, which is a leaf crate; libdd-shared-runtime (a core crate) can't depend on it without inverting the layering

Why should shared_runtime depend on it once we move everything to libdd-capabilities-impl? I think (but I'm not sure) @VianneyRuhlmann's question, or at least the one I have as well, is why this code has to be in shared_runtime at all. It naively looks like it's entirely capability- and native-specific, so it should live in libdd-capabilities-impl. Do you need some private symbols from here?
The fork methods are on impl SharedRuntime and access private fields. They can't move to libdd-capabilities-impl without making those fields public. RuntimeSpawner is only consumed by these methods, so it's co-located with them. Moving just the spawner out wouldn't buy anything, since the methods that use it must stay here regardless.
Force-pushed be149e6 → 2bc0664, then ce7a92b → 29583cd.
I have mixed feelings about the complexity and boilerplate added. Part of it is inherent to the capability approach. So, the elephant in the room: is it agreed and 100% decided that we want to go the WASM route and that we should support it? It has an impact on the whole of libdatadog, and it seems for now that the route for dd-trace-js is not entirely decided yet. It's obviously not one person's decision, but I wonder if we should move forward and merge things if this is not fully committed yet; since it's far-reaching, it might be harder to revert than just a git revert, as people will quickly build on it.

Another point is the shared runtime. My understanding was that the whole promise of capabilities (vs a more classical platform-specific swappable module implementation, as we do for linux vs windows for example) was to push all of the WASM- and node-specific concerns out of libdatadog. However, the impression when looking at the shared runtime's changes is that it's full of WASM-specific concerns, which feels like we got the drawbacks of both approaches (complexity + wasm-specific code in libdatadog and cfg gates). Some functions are available on WASM, some others are not. I'm not sure yet what the conclusion is, but I think the interface of the shared runtime should be reworked if we want it to work with capabilities. Maybe the conclusion is that the whole shared runtime should be a capability, since it's offering an API that has fundamentally different implementations on WASM and native (and it seems impossible to delegate that to existing capabilities). Or maybe the answer is that we need to move some of the shared runtime into new capability/ies for async, but can still keep some code in libdd-shared-runtime. In any case, the current situation feels a bit wrong.
```rust
pub use http::NativeHttpClient;
use libdd_capabilities::http::HttpError;
pub use libdd_capabilities::HttpClientCapability;
use libdd_capabilities::MaybeSend;
pub use libdd_capabilities::SleepCapability;
pub use libdd_capabilities::SpawnCapability;
pub use sleep::NativeSleepCapability;
pub use spawn::{NativeJoinHandle, NativeSpawnCapability};
```
Nitpick: those could be compacted a bit (pub use libdd_capabilities::{A, B, C, D})
I would have expected that to be handled by cargo fmt; I'll check if it's something we can enable.
The codebase doesn't have any standard on that front from what I can tell: whether you put crate or module in the rustfmt.toml, there are dozens of files that change.
```rust
        Self {
            http: DefaultHttpClient::new_client(),
        }
impl NativeCapabilities {
```
I wonder if bundling all capabilities together has a cost. For example, does it mean we need to instantiate an http client even if all we want to do is to sleep?
If a function just needs to sleep, it should not use the bundle, because the NativeHttp type is not a ZST. Ideally everything would be a ZST, but even in this case it's not that big a deal, since using the bundle is a convenience option and not an obligation: if you just sleep, the function's bound should be SleepCapability and it should just be passed the corresponding ZST.
I think there would be a way to make the HttpCapability a ZST, but that would be out of scope here.
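To illustrate the point with a simplified, synchronous stand-in (the real capabilities are async, and the `backoff` helper here is hypothetical): a function that only sleeps bounds on `SleepCapability` alone and receives the ZST, so no HTTP client is ever constructed for it.

```rust
use std::thread;
use std::time::Duration;

// Simplified, synchronous stand-in for the capability trait; the real one
// is async, but the bound-narrowing point is the same.
trait SleepCapability {
    fn sleep(&self, d: Duration);
}

// Zero-sized sleep capability: using it costs nothing to construct.
struct NativeSleepCapability;
impl SleepCapability for NativeSleepCapability {
    fn sleep(&self, d: Duration) {
        thread::sleep(d);
    }
}

// A function that only sleeps bounds on SleepCapability alone,
// not on a full capability bundle.
fn backoff<C: SleepCapability>(cap: &C, attempt: u32) {
    cap.sleep(Duration::from_millis(u64::from(attempt)));
}

fn main() {
    // Size 0 confirms nothing extra (e.g. an HTTP client) is carried along.
    assert_eq!(std::mem::size_of::<NativeSleepCapability>(), 0);
    backoff(&NativeSleepCapability, 1);
    println!("slept");
}
```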
```rust
}

impl SleepCapability for NativeCapabilities {
    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + MaybeSend {
```
Any reason not to make this an async function instead of the explicit impl Future? Is it for the MaybeSend bound?
Yes, it is for the MaybeSend bound. But given we are in NativeCapabilities, I guess MaybeSend will always be Send, so we could make it an explicit async fn. Do you think that would be better?
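For context, a sketch of why the explicit return type is needed, with a hypothetical `MaybeSend` definition (the trait name follows the PR, but this body is an assumption): `async fn` in a trait cannot attach extra bounds to its returned future, while spelling out return-position `impl Trait` can.

```rust
use std::future::Future;

// Hypothetical MaybeSend: Send on native targets, a no-op bound on wasm32.
#[cfg(not(target_arch = "wasm32"))]
trait MaybeSend: Send {}
#[cfg(not(target_arch = "wasm32"))]
impl<T: Send> MaybeSend for T {}

#[cfg(target_arch = "wasm32")]
trait MaybeSend {}
#[cfg(target_arch = "wasm32")]
impl<T> MaybeSend for T {}

trait SleepCapability {
    // `async fn sleep(&self)` would desugar to `-> impl Future<Output = ()>`
    // *without* the extra bound; writing the return type out lets us attach
    // `+ MaybeSend`, so the future is Send exactly when the target needs it.
    fn sleep(&self) -> impl Future<Output = ()> + MaybeSend;
}

struct NativeSleep;
impl SleepCapability for NativeSleep {
    fn sleep(&self) -> impl Future<Output = ()> + MaybeSend {
        std::future::ready(())
    }
}

fn main() {
    // Compile-time demonstration only; the returned future is immediately ready.
    let _fut = NativeSleep.sleep();
    println!("ok");
}
```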
```rust
/// `T` instead of `Result<T, JoinError>`.
///
/// A `JoinError` means the spawned task panicked or was aborted. Workers use
/// `CancellationToken` for graceful shutdown, so `JoinError` indicates a bug.
```
It's really a detail, but I think they don't use cancellation tokens any more with the new shared runtime?
```rust
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        // JoinHandle<T>: Unpin, so Pin::new is safe.
```
Nit: the safety comment feels a bit unnecessary. There's no unsafe code here, so it wouldn't actually compile if the precondition was false.
```rust
// JoinHandle<T>: Unpin, so Pin::new is safe.
```
```rust
///
/// The runtime handle is passed at spawn time via [`SpawnCapability::RuntimeContext`],
/// not stored, so the spawner is always using the current (potentially rebuilt) runtime.
#[cfg(not(target_arch = "wasm32"))]
```
Instead of all those cfg gates, could we move everything spawner-related to a submodule and gate the submodule once and for all, as we do with xxx::linux for example elsewhere in libdatadog?
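A sketch of that submodule-gating suggestion (module and item bodies are hypothetical): the `cfg` appears once on the module and once on the re-export, instead of being repeated on every item.

```rust
// Hypothetical layout: all spawner-related items live in one submodule that
// is cfg-gated a single time, instead of gating each item individually.
#[cfg(not(target_arch = "wasm32"))]
mod native_spawner {
    pub struct RuntimeSpawner;

    impl RuntimeSpawner {
        pub fn name(&self) -> &'static str {
            "native"
        }
    }
}

#[cfg(not(target_arch = "wasm32"))]
use native_spawner::RuntimeSpawner;

fn main() {
    #[cfg(not(target_arch = "wasm32"))]
    println!("spawner={}", RuntimeSpawner.name());
    #[cfg(target_arch = "wasm32")]
    println!("spawner=none");
}
```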
```rust
    spawner: &S,
) -> Result<WorkerHandle, SharedRuntimeError> {
    let boxed_worker: BoxedWorker = Box::new(worker);
    debug!(?boxed_worker, "Spawning worker on SharedRuntime");
```
Is this a left-over? (might not be, just asking)
I did not add this? I don't know.
```rust
    Err(PausableWorkerError::TaskAborted)
}
let mut worker = handle.await;
debug!(?worker, "Worker paused successfully");
```
Same, is this supposed to stay?
```rust
assert!(agent_info::get_agent_info().is_none());
let shared_runtime = SharedRuntime::new().unwrap();
shared_runtime.spawn_worker(fetcher, true).unwrap();
let spawner = NativeCapabilities::new();
```
Can we remove the spawner and pass capabilities at shared_runtime init?
Storing the spawner in SharedRuntime would make it generic (SharedRuntime<S>) or require type-erasing a non-object-safe trait. It's also not straightforward because the fork recovery paths use their own internal RuntimeSpawner, not the caller-provided one. In practice, production callers already have capabilities in scope, so passing them to spawn_worker isn't a burden imho.
```rust
impl<T: Worker + MaybeSend + Sync + 'static> std::fmt::Debug for PausableWorker<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Running { .. } => f.debug_struct("PausableWorker::Running").finish(),
            Self::Paused { worker } => f
                .debug_struct("PausableWorker::Paused")
                .field("worker", worker)
                .finish(),
            Self::InvalidState => write!(f, "PausableWorker::InvalidState"),
        }
    }
}
```
Why do you need this?
I went from tokio's JoinHandle to ours, so we can't derive Debug automatically, and WorkerEntry needs Debug.
```rust
PausableWorker::Paused { .. } => {
    debug!(?self, "Starting pausable worker");
```
Try to avoid (or train your AI to avoid) this kind of unrelated change, which clutters the PR. (If it's actually related, disregard.)
```rust
/// Pause the worker and wait for it to complete, storing its state for restart.
///
/// # Errors
/// Fails if the worker handle has been aborted, preventing the worker from being retrieved.
```

```rust
#[cfg(not(target_arch = "wasm32"))]
pub type SpawnRuntimeContext = tokio::runtime::Handle;

/// See the non-wasm variant for documentation.
#[cfg(target_arch = "wasm32")]
pub type SpawnRuntimeContext = ();
```
Can't you reuse the NativeSpawnCapability associated type?
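A sketch of reusing an associated type instead of a free-standing cfg-gated alias (the trait and type names follow the PR's vocabulary, but the bodies are hypothetical): each capability implementation names its own context type, and generic code refers to it through the trait.

```rust
// Hypothetical sketch: the runtime-context type hangs off the capability
// trait rather than being a separate, per-target `SpawnRuntimeContext` alias.
trait SpawnCapability {
    type RuntimeContext;
    fn describe() -> &'static str;
}

struct NativeSpawnCapability;
impl SpawnCapability for NativeSpawnCapability {
    // On native this would be tokio::runtime::Handle; a placeholder here.
    type RuntimeContext = u32;
    fn describe() -> &'static str {
        "native"
    }
}

struct WasmSpawnCapability;
impl SpawnCapability for WasmSpawnCapability {
    // wasm needs no runtime handle.
    type RuntimeContext = ();
    fn describe() -> &'static str {
        "wasm"
    }
}

// Generic code names the context via the trait, no cfg gates needed here.
fn spawn_with<S: SpawnCapability>(_ctx: S::RuntimeContext) -> &'static str {
    S::describe()
}

fn main() {
    println!("{}", spawn_with::<NativeSpawnCapability>(7));
    println!("{}", spawn_with::<WasmSpawnCapability>(()));
}
```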
```rust
meta: StatsMetadata,
sequence_id: AtomicU64,
client: H,
capabilities: Cap,
```
ultra nit: Since we're going to pass the capabilities to a lot of functions, it would be nice to have a "standardized" way of passing them, e.g. always as the first param, to make it easier to recognize.
```rust
get_test_metadata(),
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
NativeCapabilities::new_client(),
caps.clone(),
```
Maybe implementing Copy on NativeCapabilities would make it more ergonomic (since it's zero-sized anyway).
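A sketch of that suggestion with a stand-in type (the real NativeCapabilities may carry other derives): deriving `Copy` on a zero-sized struct costs nothing and removes the `.clone()` at every call site.

```rust
// Since the capability bundle is zero-sized, deriving Copy (and Clone)
// is free and lets call sites pass it by value repeatedly without clone().
#[derive(Clone, Copy, Debug)]
struct NativeCapabilities;

fn use_caps(_caps: NativeCapabilities) {}

fn main() {
    let caps = NativeCapabilities;
    use_caps(caps); // passed by copy
    use_caps(caps); // still usable, no clone() needed
    assert_eq!(std::mem::size_of::<NativeCapabilities>(), 0);
    println!("copyable zst");
}
```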
```rust
    backoff_type: RetryBackoffType::Constant,
    jitter: None,
};
let sleeper = NativeSleepCapability;
```
nit: I think it's better to always call it cap or capabilities than to use a different name depending on the capabilities available.
Force-pushed c6789b6 → e8146c9.
- fix: ZST spawner
- fix: docs and fixes
- fix(shared_runtime): implicit debug impl
- chore: format
- feat: change the Output to Result
- feat: mod native cfg gated for better readability
- chore: revert to main's version of thing
Force-pushed e8146c9 → 1e64b86.
What does this PR do?
Motivation
Previously, spawning tasks on wasm would have required a current-thread tokio runtime, blocking the JS event loop until the end of the tokio runtime itself. Now on wasm we delegate tasks to the event loop itself, making the whole thing non-blocking and enabling things like sleeping in a JS-compatible way.
Additional Notes
/
How to test the change?
DataDog/libdatadog-nodejs#70