Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions Tests/AsyncAlgorithmsTests/TestShare.swift
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,55 @@ final class TestShare: XCTestCase {
XCTAssertEqual(results1.withLock { $0 }.sorted(), [1, 2, 3, 4, 5])
XCTAssertEqual(results2.withLock { $0 }.sorted(), [1, 2, 3, 4, 5])
}

func test_share_with_no_buffering() async {
let shared = [1, 2, 3, 4, 5].async.share(bufferingPolicy: .bounded(0))

let expectation1 = XCTestExpectation(description: "Consumer 1 finished looping")
let expectation2 = XCTestExpectation(description: "Consumer 2 finished looping")

let results1 = Mutex([Int]())
let results2 = Mutex([Int]())
let gate1 = Gate()
let gate2 = Gate()

let consumer1 = Task {
var iterator = shared.makeAsyncIterator()
gate2.open()
await gate1.enter()
while let value = await iterator.next(isolation: nil) {
results1.withLock { $0.append(value) }
// Add some delay to consumer 1
try? await Task.sleep(for: .milliseconds(1))
}
expectation1.fulfill()
}

let consumer2 = Task {
var iterator = shared.makeAsyncIterator()
gate1.open()
await gate2.enter()
while let value = await iterator.next(isolation: nil) {
results2.withLock { $0.append(value) }
}
expectation2.fulfill()
}

await fulfillment(of: [expectation1, expectation2], timeout: 5)

await consumer1.value
await consumer2.value

// Both consumers should receive all elements
XCTAssertEqual(results1.withLock { $0 }.sorted(), [1, 2, 3, 4, 5])
XCTAssertEqual(results2.withLock { $0 }.sorted(), [1, 2, 3, 4, 5])
}

func test_share_with_no_buffering_multiple() async {
for _ in 0..<10 {
await test_share_with_no_buffering()
}
}

func test_share_with_unbounded_buffering() async {
let source = [1, 2, 3, 4, 5]
Expand Down