Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [14.5.0] - 2026-06-03
- Shared awaitable

## [14.1.0] - 2026-05-10
- Add header-only async building blocks for sender-driven storage code. cqe_state bridges io_uring CQEs to callback-owned operation state; io_uring_scheduler
exposes single-threaded schedule_at() and async_submit() senders on a caller-owned ring, batches submission in poll_once(), and flushes pending SQEs when the
Expand Down
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class SISLConan(ConanFile):
name = "sisl"
version = "14.4.1"
version = "14.5.0"

homepage = "https://github.com/eBay/sisl"
description = "Library for fast data structures, utilities"
Expand Down
98 changes: 98 additions & 0 deletions include/sisl/async/shared_awaitable.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#pragma once

#include <coroutine>
#include <mutex>
#include <optional>
#include <utility>
#include <vector>

namespace sisl::async {

// Cross-thread, multi-consumer (broadcast) awaitable carrying a value of type T.
//
// The multi-waiter generalization of value_awaitable: where value_awaitable resumes exactly ONE waiter and
// MOVES its result out (single-shot), shared_awaitable resumes ALL installed waiters and hands each a COPY of
// the result (broadcast). It is the folly::SharedPromise<T> replacement -- the pattern where several callers
// await the same in-flight operation and all observe its single completion (e.g. N threads triggering one
// checkpoint flush, or several openers of one log store). T must therefore be copyable.
//
// USAGE: one producer holds the shared_awaitable (typically inside a std::shared_ptr) and calls complete(value)
// once when the operation finishes. Each consumer co_awaits the SAME object (the object IS the awaitable, like
// value_awaitable) -- usually a coroutine that captures a std::shared_ptr to it so it stays alive across the
// suspend. A consumer that co_awaits AFTER completion takes the fast path and resumes inline.
//
// THREAD SAFETY: complete() and any number of await_suspend()/await_resume() may run on different threads. A
// single mutex guards the waiter list and the result; resumes are performed OUTSIDE the lock (the waiter list
// is swapped out under the lock, then drained) so a resumed coroutine can re-enter (await again, or complete a
// nested awaitable) without self-deadlock. The handshake has no lost-wakeup: a consumer whose await_suspend
// races a concurrent complete() either gets installed-then-resumed, or observes _done and resumes itself --
// exactly once either way.
//
// LIFETIME: the object must outlive every waiter's resume AND any late (post-completion) co_await. complete()
// resumes installed waiters inline, so they have run past await_resume by the time it returns; the canonical
// arrangement is a std::shared_ptr held by BOTH the producer and each awaiting coroutine frame, so the last
// reference (producer or consumer) keeps the result readable.
//
// EXACTLY-ONCE: complete() must be called; a second call is a no-op (the broadcast already fired). await_resume
// copies the result, so the awaitable can be observed any number of times after completion.
//
// EXCEPTION DISCIPLINE: complete() resumes waiters via handle.resume(); an exception escaping a resumed frame
// propagates out of complete() (same contract as value_awaitable). A producer running on a noexcept completion
// boundary must ensure the resumed coroutine bodies cannot throw across it.
template < typename T >
struct shared_awaitable {
mutable std::mutex _mtx{};
bool _done{false};
std::optional< T > _result{};
std::vector< std::coroutine_handle<> > _waiters{};

shared_awaitable() = default;
shared_awaitable(const shared_awaitable&) = delete;
shared_awaitable& operator=(const shared_awaitable&) = delete;

// Producer side (any thread). Publishes the value, marks done, and resumes every installed waiter. The
// waiters are swapped out under the lock and resumed outside it so a resumed coroutine may re-enter safely.
// A second complete() is ignored.
void complete(T value) {
std::vector< std::coroutine_handle<> > to_resume;
{
std::lock_guard lg{_mtx};
if (_done) { return; }
_result.emplace(std::move(value));
_done = true;
to_resume.swap(_waiters);
}
for (auto h : to_resume) {
h.resume();
}
}

// True once complete() has fired. Also the awaiter fast-path check.
[[nodiscard]] bool is_ready() const {
std::lock_guard lg{_mtx};
return _done;
}

// ----- awaiter interface (the object is co_await-ed directly; many coroutines may await one object) -----

[[nodiscard]] bool await_ready() const {
std::lock_guard lg{_mtx};
return _done;
}

// Install this coroutine as a waiter, unless completion already landed (return false -> resume immediately).
bool await_suspend(std::coroutine_handle<> h) {
std::lock_guard lg{_mtx};
if (_done) { return false; }
_waiters.push_back(h);
return true;
}

// Broadcast: hand back a COPY so every waiter observes the result independently.
T await_resume() const {
std::lock_guard lg{_mtx};
return *_result;
}
};

} // namespace sisl::async
7 changes: 7 additions & 0 deletions src/async/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ target_sources(test_value_awaitable PRIVATE tests/test_value_awaitable.cpp)
target_link_libraries(test_value_awaitable PRIVATE GTest::gtest GTest::gtest_main)
add_test(NAME ValueAwaitable COMMAND test_value_awaitable)

# shared_awaitable is the multi-consumer (broadcast) sibling of value_awaitable (the folly::SharedPromise
# replacement); also header-only and stdexec-free.
add_executable(test_shared_awaitable)
target_sources(test_shared_awaitable PRIVATE tests/test_shared_awaitable.cpp)
target_link_libraries(test_shared_awaitable PRIVATE GTest::gtest GTest::gtest_main)
add_test(NAME SharedAwaitable COMMAND test_shared_awaitable)

find_package(stdexec QUIET)
if (TARGET stdexec::stdexec)
add_executable(test_cqe_awaitable)
Expand Down
135 changes: 135 additions & 0 deletions src/async/tests/test_shared_awaitable.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Unit tests for sisl::async::shared_awaitable<T> -- the multi-consumer (broadcast) generalization of
// value_awaitable used to replace folly::SharedPromise<T> (N callers awaiting one in-flight completion).
//
// Mirrors test_value_awaitable.cpp: std::noop_coroutine() stands in for suspended coroutine handles (no real
// frames), and the protocol is driven through the public complete()/await_* API so both completion orderings
// and the broadcast (many waiters, one complete) are exercised deterministically. Header-only, stdexec-free.

#include <atomic>
#include <coroutine>
#include <memory>
#include <string>
#include <thread>
#include <vector>

#include <gtest/gtest.h>

#include <sisl/async/shared_awaitable.hpp>

namespace {

using string_await = sisl::async::shared_awaitable< std::string >;

// not-ready before completion
TEST(shared_awaitable, AwaitReadyFalseWhenNotReady) {
string_await state{};
EXPECT_FALSE(state.await_ready());
EXPECT_FALSE(state.is_ready());
}

// synchronous fast path: completion BEFORE the consumer co_awaits
TEST(shared_awaitable, CompleteThenAwaitReadyTrue) {
string_await state{};
state.complete("hello");
EXPECT_TRUE(state.await_ready());
EXPECT_EQ(state.await_resume(), "hello");
}

// suspend-then-complete: consumer suspends first, completer resumes it
TEST(shared_awaitable, SuspendThenCompleteResumesWaiter) {
string_await state{};
auto const h = std::noop_coroutine();
EXPECT_TRUE(state.await_suspend(h)); // not yet completed -> stay suspended
state.complete("world");
EXPECT_EQ(state.await_resume(), "world");
}

// complete-then-suspend race: completion lands before await_suspend installs the waiter; no lost wakeup.
TEST(shared_awaitable, CompleteThenSuspendDoesNotSuspend) {
string_await state{};
state.complete("early");
auto const h = std::noop_coroutine();
EXPECT_FALSE(state.await_suspend(h)); // do not suspend; result already available
EXPECT_EQ(state.await_resume(), "early");
}

// broadcast: MANY waiters installed before completion are ALL resumed, each observing a copy.
TEST(shared_awaitable, BroadcastResumesAllWaiters) {
sisl::async::shared_awaitable< int > state{};

// A handful of distinct suspended coroutines (noop handles all compare/resume fine).
constexpr int kWaiters = 8;
for (int i = 0; i < kWaiters; ++i) {
EXPECT_TRUE(state.await_suspend(std::noop_coroutine()));
}
state.complete(77); // single completion fans out to all installed waiters

// Every (late) observation returns the same broadcast value -- the result was copied, not moved out.
for (int i = 0; i < kWaiters; ++i) {
EXPECT_EQ(state.await_resume(), 77);
}
}

// second complete() is a no-op (the broadcast already fired)
TEST(shared_awaitable, SecondCompleteIsIgnored) {
sisl::async::shared_awaitable< int > state{};
state.complete(1);
state.complete(2); // ignored
EXPECT_TRUE(state.await_ready());
EXPECT_EQ(state.await_resume(), 1);
}

// copyable payload broadcasts to multiple late observers (the shared_ptr<HomeLogStore> shape)
TEST(shared_awaitable, CopyablePayloadBroadcasts) {
sisl::async::shared_awaitable< std::shared_ptr< int > > state{};
state.complete(std::make_shared< int >(123));
auto a = state.await_resume();
auto b = state.await_resume();
ASSERT_TRUE(a);
ASSERT_TRUE(b);
EXPECT_EQ(a.get(), b.get()); // same underlying object, shared
EXPECT_EQ(*a, 123);
}

// refcounted shared state: the producer drops its ref after completing while a consumer ref keeps it readable.
TEST(shared_awaitable, SharedStateSurvivesProducerDestruction) {
auto state = std::make_shared< string_await >();
{
auto producer_ref = state;
producer_ref->complete("persisted");
}
ASSERT_EQ(state.use_count(), 1);
EXPECT_TRUE(state->await_ready());
EXPECT_EQ(state->await_resume(), "persisted");
}

// Cross-thread broadcast: several threads install waiters while one delivers the completion. Every waiter is
// resumed exactly once and observes the value without a data race. Run under TSAN to catch ordering regressions.
TEST(shared_awaitable, CrossThreadBroadcastIsRaceFree) {
constexpr int kIters = 1000;
for (int i = 0; i < kIters; ++i) {
sisl::async::shared_awaitable< int > state{};
std::atomic< bool > go{false};

std::vector< std::thread > waiters;
for (int w = 0; w < 4; ++w) {
waiters.emplace_back([&] {
while (!go.load(std::memory_order_acquire)) {}
(void)state.await_suspend(std::noop_coroutine());
});
}
std::thread completer([&] {
while (!go.load(std::memory_order_acquire)) {}
state.complete(i);
});

go.store(true, std::memory_order_release);
for (auto& t : waiters) {
t.join();
}
completer.join();
EXPECT_EQ(state.await_resume(), i);
}
}

} // namespace
Loading