diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index a3642794a8..f0711506f2 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2102,6 +2102,30 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, )(implicit F: Concurrent[F2]): Stream[F2, O2] = merge_(that) { case (s, fin) => s.onFinalize(fin) } + /** Like `mergeAndAwaitDownstream`, but halts as soon as _either_ branch halts. */ + def mergeAndAwaitDownstreamHaltBoth[F2[x] >: F[x]: Concurrent, O2 >: O]( + that: Stream[F2, O2] + ): Stream[F2, O2] = + noneTerminate.mergeAndAwaitDownstream(that.noneTerminate).unNoneTerminate + + /** Like `mergeAndAwaitDownstream`, but halts as soon as the `s1` branch halts. + * + * Note: it is *not* guaranteed that the last element of the stream will come from `s1`. + */ + def mergeAndAwaitDownstreamHaltL[F2[x] >: F[x]: Concurrent, O2 >: O]( + that: Stream[F2, O2] + ): Stream[F2, O2] = + noneTerminate.mergeAndAwaitDownstream(that.map(Some(_))).unNoneTerminate + + /** Like `mergeAndAwaitDownstream`, but halts as soon as the `s2` branch halts. + * + * Note: it is *not* guaranteed that the last element of the stream will come from `s2`. + */ + def mergeAndAwaitDownstreamHaltR[F2[x] >: F[x]: Concurrent, O2 >: O]( + that: Stream[F2, O2] + ): Stream[F2, O2] = + that.mergeAndAwaitDownstreamHaltL(this) + /** Interleaves the two inputs nondeterministically. The output stream * halts after BOTH `s1` and `s2` terminate normally, or in the event * of an uncaught failure on either `s1` or `s2`. Has the property that