StreamBuffer is a single-class Java library that bridges an OutputStream and an InputStream through a dynamic, unbounded FIFO queue — solving the fixed-buffer and potential deadlock limitations of Java's built-in PipedInputStream/PipedOutputStream.
Pull it from the central Maven repositories:
<dependency>
<groupId>net.ladenthin</groupId>
<artifactId>streambuffer</artifactId>
<version>1.2.0</version>
</dependency>

StreamBuffer sb = new StreamBuffer();
OutputStream os = sb.getOutputStream();
InputStream is = sb.getInputStream();
os.write(new byte[]{1, 2, 3});
byte[] buf = new byte[3];
is.read(buf); // reads [1, 2, 3]
sb.close();

Calling any of sb.close(), os.close(), or is.close() closes the entire buffer.
Since JDK 1.0, PipedInputStream/PipedOutputStream has been available for connecting streams across threads, but it has notable limitations:
- Write operations block when the fixed circular buffer is full, potentially deadlocking the writing thread.
- The circular buffer has a fixed, 32-bit maximum size and does not grow or shrink dynamically.
- Every written byte array is copied into the circular buffer, doubling memory usage in the worst case.
- The buffer does not shrink after reads — a large circular buffer stays large.
- A pipe is considered broken if the thread that provided data is no longer alive, even if data remains in the buffer.
It is also not recommended to use both ends of a pipe from a single thread, as this may deadlock.
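The blocking behavior listed above can be observed with the JDK classes alone. The following is a minimal sketch and not part of StreamBuffer; the class name PipedBlockingDemo and the method writerBlocksWhenFull are made up for illustration. It writes one byte more than a small pipe can hold from a second thread and reports whether that writer is still parked until the reader drains a byte.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;

final class PipedBlockingDemo {

    /**
     * Writes pipeSize + 1 bytes into a pipe of capacity pipeSize from a second
     * thread and returns true if that writer was still blocked after waitMillis.
     */
    static boolean writerBlocksWhenFull(int pipeSize, long waitMillis) {
        try {
            PipedInputStream in = new PipedInputStream(pipeSize);
            PipedOutputStream out = new PipedOutputStream(in);
            Thread writer = new Thread(() -> {
                try {
                    out.write(new byte[pipeSize + 1]); // one byte more than fits
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            writer.setDaemon(true);
            writer.start();
            writer.join(waitMillis);            // give the writer time to fill the pipe
            boolean blocked = writer.isAlive(); // still waiting for free space?
            in.read();                          // drain one byte so the writer can finish
            writer.join(5000);
            return blocked;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("writer blocked: " + writerBlocksWhenFull(4, 300));
    }
}
```

If the reader in this sketch never called read(), the writer would stay blocked, which is the situation an unbounded FIFO avoids.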
StreamBuffer holds references to written byte arrays directly in a Deque<byte[]> (FIFO), without copying data into a secondary circular buffer. This avoids the fixed-size constraint and eliminates write-blocking caused by buffer saturation.
Instead of a fixed circular buffer, StreamBuffer uses a Deque<byte[]> that grows as data is written and shrinks as data is read. There is no upper bound on buffered data size at the FIFO level.
StreamBuffer exposes a standard InputStream (via getInputStream()) and a standard OutputStream (via getOutputStream()). Both are full subclass implementations and can be used anywhere those types are accepted.
The InputStream and OutputStream can be used concurrently from different threads without additional synchronization:
- All Deque accesses are guarded by a bufferLock object.
- State fields (streamClosed, safeWrite, availableBytes, positionAtCurrentBufferEntry, maxBufferElements, maxAllocationSize, isTrimRunning, maxObservedBytes, totalBytesWritten, totalBytesRead) are volatile.
- A Semaphore signalModification blocks reading threads until data is written or the stream is closed, avoiding busy-waiting. External semaphores can be registered via addSignal for thread-decoupled notification.
In contrast to PipedOutputStream, write operations on StreamBuffer never block or deadlock — the FIFO grows as needed.
StreamBuffer tracks a positionAtCurrentBufferEntry index within the head byte array. Partial reads consume bytes from the current head entry without copying the remaining data. The trim operation accounts for this index and only copies the unread remainder.
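The head-position idea can be sketched in a few lines. This is a simplified, single-threaded model written for this README, not the library's implementation; PartialReadSketch, positionAtHead, and chunkCount are illustrative names, and locking and blocking are left out on purpose.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class PartialReadSketch {
    private final Deque<byte[]> chunks = new ArrayDeque<>();
    private int positionAtHead; // index of the next unread byte in the head chunk

    void write(byte[] data) {
        if (data.length > 0) {
            chunks.addLast(data); // store the reference, no copy
        }
    }

    /** Copies up to len bytes into dest starting at off; returns the number copied. */
    int read(byte[] dest, int off, int len) {
        int copied = 0;
        while (copied < len && !chunks.isEmpty()) {
            byte[] head = chunks.peekFirst();
            int n = Math.min(len - copied, head.length - positionAtHead);
            System.arraycopy(head, positionAtHead, dest, off + copied, n);
            positionAtHead += n;
            copied += n;
            if (positionAtHead == head.length) { // head fully consumed: drop it
                chunks.pollFirst();
                positionAtHead = 0;
            }
        }
        return copied;
    }

    int chunkCount() {
        return chunks.size();
    }

    public static void main(String[] args) {
        PartialReadSketch fifo = new PartialReadSketch();
        fifo.write(new byte[] {1, 2, 3, 4, 5});
        byte[] dest = new byte[3];
        int n = fifo.read(dest, 0, 3);
        System.out.println(n + " bytes read, chunks left: " + fifo.chunkCount());
    }
}
```

A partial read only advances the index; the unread tail of the head array is never copied until it is actually requested.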
By default, StreamBuffer stores a direct reference to the written byte array (safeWrite = false). If the caller modifies the array after writing, the buffered content may be affected.
Enable safe write mode to clone every written byte array before buffering:
sb.setSafeWrite(true);

When writing with an offset (e.g., write(b, off, len)), a new byte array of exactly len bytes is always created, regardless of the safeWrite setting, since only the relevant portion is stored.
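Why the safeWrite flag matters can be shown without the library. The sketch below models the buffer as a plain Deque and is only an illustration of reference versus clone semantics; SafeWriteSketch and its method name are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class SafeWriteSketch {

    /**
     * Buffers an array by reference (safeWrite = false) or as a clone
     * (safeWrite = true), then mutates the caller's array and returns
     * the first byte that the buffer would hand to a reader.
     */
    static byte firstBufferedByteAfterCallerMutation(boolean safeWrite) {
        Deque<byte[]> fifo = new ArrayDeque<>();
        byte[] callerArray = {1, 2, 3};
        fifo.addLast(safeWrite ? callerArray.clone() : callerArray);
        callerArray[0] = 99; // the caller reuses its array after writing
        return fifo.peekFirst()[0];
    }

    public static void main(String[] args) {
        System.out.println("by reference: " + firstBufferedByteAfterCallerMutation(false));
        System.out.println("cloned:       " + firstBufferedByteAfterCallerMutation(true));
    }
}
```

Storing the reference saves one copy per write, so the default suits callers that hand over arrays they no longer touch; the clone is the safer choice when arrays are reused.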
When the Deque grows beyond maxBufferElements entries (default: 100), the next write operation consolidates all buffered data into a single byte array. This bounds the number of FIFO elements and can improve read performance for large accumulated buffers.
sb.setMaxBufferElements(50); // trim when more than 50 elements are queued
sb.setMaxBufferElements(0); // disable trimming entirely

Trimming is triggered by writes, not by setMaxBufferElements. The trim internally bypasses safeWrite (via an ignoreSafeWrite flag) because the byte arrays it produces are not reachable from outside the buffer.
If maxAllocationSize is set (see below), trim may produce multiple smaller chunks instead of one. Trim is also skipped automatically when consolidation would not reduce the chunk count below maxBufferElements — for example, when maxAllocationSize is small enough that the resulting chunk count would still exceed the threshold. This prevents repeated no-op trims on every write.
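The consolidation step can be pictured as follows. This is a sketch under simplifying assumptions, not the library's trim code: TrimSketch and consolidate are hypothetical names, the size limit is an int here for brevity, and the partially consumed head entry and all locking are ignored.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class TrimSketch {

    /** Repacks all queued bytes, in order, into chunks of at most maxAllocationSize bytes. */
    static Deque<byte[]> consolidate(Deque<byte[]> fifo, int maxAllocationSize) {
        if (maxAllocationSize <= 0) {
            throw new IllegalArgumentException("maxAllocationSize must be > 0");
        }
        long remaining = 0;
        for (byte[] chunk : fifo) {
            remaining += chunk.length;
        }
        Deque<byte[]> result = new ArrayDeque<>();
        byte[] target = null;
        int filled = 0;
        for (byte[] chunk : fifo) {
            int pos = 0;
            while (pos < chunk.length) {
                if (target == null) { // allocate the next output chunk
                    target = new byte[(int) Math.min(remaining, maxAllocationSize)];
                    filled = 0;
                }
                int n = Math.min(chunk.length - pos, target.length - filled);
                System.arraycopy(chunk, pos, target, filled, n);
                pos += n;
                filled += n;
                remaining -= n;
                if (filled == target.length) { // output chunk is full
                    result.addLast(target);
                    target = null;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Deque<byte[]> fifo = new ArrayDeque<>();
        fifo.add(new byte[] {1, 2});
        fifo.add(new byte[] {3});
        fifo.add(new byte[] {4, 5, 6, 7});
        System.out.println("chunks after trim: " + consolidate(fifo, 3).size());
    }
}
```

With a limit of Integer.MAX_VALUE the same routine yields a single array, which corresponds to the default behavior described above.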
Use isTrimRunning() to observe whether a trim is currently executing. This value is volatile and can change at any time in concurrent scenarios.
available() returns Integer.MAX_VALUE when the number of buffered bytes exceeds Integer.MAX_VALUE, correctly handling buffers larger than 2 GB.
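The clamping amounts to one expression. The sketch assumes the byte counter is kept as a long, which this README does not state explicitly; AvailableClampSketch and clampToInt are illustrative names.

```java
final class AvailableClampSketch {

    /** Maps a long byte count to the int range that InputStream.available() can report. */
    static int clampToInt(long availableBytes) {
        return (int) Math.min(availableBytes, Integer.MAX_VALUE);
    }

    public static void main(String[] args) {
        System.out.println(clampToInt(5L));
        System.out.println(clampToInt(3_000_000_000L) == Integer.MAX_VALUE);
    }
}
```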
Register external Semaphore objects to receive thread-decoupled notifications when the buffer is modified (data written or stream closed). Each registered semaphore is released using the same "max 1 permit" pattern as the internal reader/writer semaphore, enabling observers to block in their own threads and wake up when something changes:
Semaphore mySignal = new Semaphore(0);
sb.addSignal(mySignal);
// Observer's own thread:
boolean done = false;
while (!done) {
    mySignal.acquire(); // blocks until the writer signals; may throw InterruptedException
    // process in MY thread, fully decoupled from the writer
    if (sb.isClosed()) {
        done = true;
    }
}

API:

| Method | Description |
|---|---|
| addSignal(Semaphore) | Registers an external semaphore; throws NullPointerException if null |
| removeSignal(Semaphore) | Removes a semaphore; returns false if not found or null |
For trim lifecycle events specifically, see Trim Observer Signals below.
Signals are stored in a CopyOnWriteArrayList for thread-safe iteration. The "max 1 permit" pattern means rapid writes collapse into a single wake-up — the observer should check buffer state (e.g., isClosed(), available()) after waking to determine what changed.
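A minimal model of the "max 1 permit" release looks like this. It is a sketch, not the library's source: SignalSketch and notifyModified are hypothetical names, and the check-then-release step is not atomic here, so under heavy contention a semaphore could briefly hold more than one permit.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;

final class SignalSketch {
    private final List<Semaphore> signals = new CopyOnWriteArrayList<>();

    void addSignal(Semaphore s) {
        signals.add(Objects.requireNonNull(s, "signal")); // null is rejected
    }

    boolean removeSignal(Semaphore s) {
        return s != null && signals.remove(s); // false if null or not registered
    }

    /** Called after a write or close: wakes each observer at most once. */
    void notifyModified() {
        for (Semaphore s : signals) {
            if (s.availablePermits() == 0) { // only release when no permit is pending
                s.release();
            }
        }
    }

    public static void main(String[] args) {
        SignalSketch hub = new SignalSketch();
        Semaphore observer = new Semaphore(0);
        hub.addSignal(observer);
        hub.notifyModified();
        hub.notifyModified(); // collapses into the permit that is already pending
        System.out.println("pending permits: " + observer.availablePermits());
    }
}
```

Because several notifications can collapse into one permit, an observer built on this pattern has to re-read the buffer state after every wake-up instead of counting permits.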
StreamBuffer tracks cumulative I/O statistics for the bytes flowing through the user-facing API. Internal reads and writes performed by the trim operation are excluded — isTrimRunning is checked before updating the counters so the statistics always reflect user I/O only.
StreamBuffer sb = new StreamBuffer();
OutputStream os = sb.getOutputStream();
InputStream is = sb.getInputStream();
os.write(new byte[]{1, 2, 3}); // totalBytesWritten = 3
byte[] buf = new byte[2];
is.read(buf); // totalBytesRead = 2
System.out.println(sb.getTotalBytesWritten()); // 3
System.out.println(sb.getTotalBytesRead()); // 2
System.out.println(sb.getMaxObservedBytes()); // 3 (peak bytes in buffer)

API:

| Method | Description |
|---|---|
| getTotalBytesWritten() | Cumulative bytes written by user I/O operations (excludes internal trim) |
| getTotalBytesRead() | Cumulative bytes consumed by user reads (excludes internal trim) |
| getMaxObservedBytes() | Peak value of availableBytes ever observed |
By default trim consolidates all buffered data into a single byte array. maxAllocationSize limits how large each individual byte array can be during consolidation. When availableBytes > maxAllocationSize, trim produces multiple smaller chunks:
sb.setMaxAllocationSize(1024 * 1024); // cap each consolidated chunk at 1 MiB
long current = sb.getMaxAllocationSize();

- Default: Integer.MAX_VALUE (effectively one chunk per trim pass).
- Throws IllegalArgumentException if the value is ≤ 0.
- Trim is skipped automatically when the resulting chunk count would not be smaller than the current Deque size, which prevents repeated no-op trims when maxAllocationSize is very small.
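The skip rule can be written as a small pure function. The sketch below is derived only from the rules stated in this README (threshold exceeded, trimming not disabled, chunk count actually shrinks); it is not the library's decideTrimExecution, and TrimDecisionSketch, chunksAfterTrim, and shouldTrim are hypothetical names.

```java
final class TrimDecisionSketch {

    /** Number of chunks needed to hold availableBytes with chunks of at most maxAllocationSize. */
    static long chunksAfterTrim(long availableBytes, long maxAllocationSize) {
        long full = availableBytes / maxAllocationSize;
        return availableBytes % maxAllocationSize == 0 ? full : full + 1;
    }

    static boolean shouldTrim(int currentChunks, int maxBufferElements,
                              long availableBytes, long maxAllocationSize) {
        if (maxBufferElements <= 0) {
            return false; // trimming disabled
        }
        if (currentChunks <= maxBufferElements) {
            return false; // threshold not exceeded yet
        }
        // Skip when consolidation would not reduce the number of chunks.
        return chunksAfterTrim(availableBytes, maxAllocationSize) < currentChunks;
    }

    public static void main(String[] args) {
        System.out.println(shouldTrim(101, 100, 1000, Integer.MAX_VALUE));
        System.out.println(shouldTrim(101, 100, 1010, 10));
    }
}
```

In the second call 1010 bytes at 10 bytes per chunk still need 101 chunks, so a trim would do work without shrinking the queue and is skipped.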
Register Semaphore objects to be notified when a trim cycle starts or ends. This uses the same "max 1 permit" semaphore pattern as the general modification signals:
Semaphore trimStarted = new Semaphore(0);
Semaphore trimEnded = new Semaphore(0);
sb.addTrimStartSignal(trimStarted);
sb.addTrimEndSignal(trimEnded);
// Observer thread:
trimStarted.acquire(); // blocks until trim begins
// ... trim is running ...
trimEnded.acquire(); // blocks until trim finishes

API:

| Method | Description |
|---|---|
| addTrimStartSignal(Semaphore) | Registers a semaphore released when trim starts; throws NullPointerException if null |
| removeTrimStartSignal(Semaphore) | Removes a trim-start semaphore; returns false if not found or null |
| addTrimEndSignal(Semaphore) | Registers a semaphore released when trim ends; throws NullPointerException if null |
| removeTrimEndSignal(Semaphore) | Removes a trim-end semaphore; returns false if not found or null |
public class StreamBuffer implements Closeable

| Method | Description |
|---|---|
| StreamBuffer() | Constructs a new buffer |
| getInputStream() | Returns the InputStream end |
| getOutputStream() | Returns the OutputStream end |
| close() | Closes both ends of the buffer |
| isClosed() | Returns true if the buffer is closed |
| isSafeWrite() | Returns the current safeWrite flag |
| setSafeWrite(boolean) | Enables or disables safe write (byte array cloning) |
| getMaxBufferElements() | Returns the current trim threshold |
| setMaxBufferElements(int) | Sets the trim threshold; <= 0 disables trimming |
| getBufferSize() | Returns the current number of byte array entries in the FIFO (legacy) |
| getBufferElementCount() | Returns the current number of byte arrays in the internal queue (synchronized) |
| getTotalBytesWritten() | Cumulative bytes written by user I/O (excludes internal trim) |
| getTotalBytesRead() | Cumulative bytes consumed by user reads (excludes internal trim) |
| getMaxObservedBytes() | Peak value of available bytes ever observed |
| getMaxAllocationSize() | Returns the maximum byte-array size used during trim consolidation |
| setMaxAllocationSize(long) | Sets the maximum allocation size; throws IllegalArgumentException if ≤ 0 |
| isTrimRunning() | true while trim is executing; volatile, may change at any time |
| addSignal(Semaphore) | Registers an external semaphore for thread-decoupled notification |
| removeSignal(Semaphore) | Removes a registered semaphore |
| addTrimStartSignal(Semaphore) | Registers a semaphore released when trim starts |
| removeTrimStartSignal(Semaphore) | Removes a trim-start semaphore |
| addTrimEndSignal(Semaphore) | Registers a semaphore released when trim ends |
| removeTrimEndSignal(Semaphore) | Removes a trim-end semaphore |
| blockDataAvailable() | Deprecated. Blocks until at least one byte is available |
public static boolean correctOffsetAndLengthToRead(byte[] b, int off, int len)
public static boolean correctOffsetAndLengthToWrite(byte[] b, int off, int len)

Both methods mirror the parameter validation performed by InputStream.read(byte[], int, int) and OutputStream.write(byte[], int, int). They throw NullPointerException for null arrays, IndexOutOfBoundsException for invalid offsets or lengths (including integer overflow: off + len < 0), and return false for zero-length operations.
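The contract described above can be approximated as follows. This is an independent sketch of that contract, not the library's code; BoundsCheckSketch and checkOffsetAndLength are hypothetical names, and the real methods may order or phrase their checks differently.

```java
final class BoundsCheckSketch {

    /**
     * Returns true if there is something to transfer, false for len == 0.
     * Throws for a null array or an offset/length pair outside the array.
     */
    static boolean checkOffsetAndLength(byte[] b, int off, int len) {
        if (b == null) {
            throw new NullPointerException("b");
        }
        if (off < 0 || len < 0 || off > b.length
                || off + len > b.length
                || off + len < 0) { // off + len overflowed the int range
            throw new IndexOutOfBoundsException("off=" + off + ", len=" + len);
        }
        return len != 0;
    }

    public static void main(String[] args) {
        byte[] data = {1, 2, 3};
        System.out.println(checkOffsetAndLength(data, 0, 3));
        System.out.println(checkOffsetAndLength(data, 1, 0));
    }
}
```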
External observers register java.util.concurrent.Semaphore objects via addSignal(Semaphore). When the buffer is modified (write or close), each registered semaphore is released using the "max 1 permit" pattern — a permit is released only if the semaphore currently has zero permits. The observer blocks on semaphore.acquire() in its own thread and wakes up when data is available or the stream is closed. This provides full thread decoupling between writer and observer.
If no data is available and the stream is not closed, read() blocks the calling thread. To avoid blocking, only read as many bytes as available() reports. The blockDataAvailable() method (deprecated) can be used to wait before reading; tryWaitForEnoughBytes is the internal successor.
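Reading only what available() reports works with any InputStream, so the pattern can be sketched against the JDK alone. NonBlockingReadSketch, readAvailable, and the maxChunk cap are assumptions made for this example; the cap guards against allocating an array of Integer.MAX_VALUE bytes when a very large buffer reports the clamped value.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

final class NonBlockingReadSketch {

    /** Reads at most min(available(), maxChunk) bytes; returns an empty array if nothing is ready. */
    static byte[] readAvailable(InputStream in, int maxChunk) {
        try {
            int n = Math.min(in.available(), maxChunk);
            if (n <= 0) {
                return new byte[0]; // nothing buffered: do not call read(), it could block
            }
            byte[] buf = new byte[n];
            int read = in.read(buf, 0, n);
            return read <= 0 ? new byte[0] : Arrays.copyOf(buf, read);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // A ByteArrayInputStream stands in for sb.getInputStream() in this sketch.
        InputStream in = new ByteArrayInputStream(new byte[] {1, 2, 3});
        System.out.println(Arrays.toString(readAvailable(in, 1024)));
        System.out.println(readAvailable(in, 1024).length);
    }
}
```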
Write operations never block, regardless of how much data is already buffered.
streambuffer ships with zero transitive runtime dependencies. A downstream consumer that declares
<dependency>
<groupId>net.ladenthin</groupId>
<artifactId>streambuffer</artifactId>
<version>1.3.0</version>
</dependency>

gets exactly that one artifact on its runtime classpath. mvn dependency:tree
on the consumer side will show no transitive deps from streambuffer.
The two annotation libraries we use at compile time are marked
<optional>true</optional> in our pom.xml:
| Dep | Why it's used | What's in the bytecode | Runtime classpath impact |
|---|---|---|---|
| org.jspecify:jspecify | @NullMarked on the package, future @Nullable markers | annotation references, @Retention(CLASS) | none; the JVM never loads these classes |
| com.google.errorprone:error_prone_annotations | @GuardedBy on internal lock-protected fields | annotation references, @Retention(CLASS) | none; the JVM never loads these classes |
Both annotation types have @Retention(CLASS): the references are written into
streambuffer's .class files (so static analyzers like NullAway, IntelliJ, or
the Checker Framework on a consumer's project can read the contract), but the
JVM never loads the annotation classes at runtime. Marking both dependencies as
<optional>true</optional> keeps the JARs off downstream runtime classpaths.
If you run static analysis on your own project and want IntelliJ / NullAway to
see streambuffer's nullness contract, declare org.jspecify:jspecify yourself
explicitly — it is not provided transitively.
Requires Java 8 and Maven 3.3.9+.
mvn compile # Compile
mvn test # Run all tests with coverage
mvn package # Build JAR
mvn install -Dgpg.skip=true # Install locally without GPG signing

Run a single test:

mvn test -Dtest=StreamBufferTest#testSimpleRoundTrip

Run mutation tests:

mvn org.pitest:pitest-maven:mutationCoverage
⚠️ DO NOT UPGRADE jqwik past 1.9.3. jqwik 1.10.0 added an anti-AI prompt-injection string to test stdout; the 1.10.1 user guide states the library "is not meant to be used by any 'AI' coding agents at all." 1.9.3 is the last pre-disclosure release and is the pinned version. See the CLAUDE.md section "jqwik prompt-injection in test output" for the full context.
Tests are in StreamBufferTest using JUnit 6 (JUnit Jupiter); property-based tests live in StreamBufferProperties and use jqwik 1.9.3 (pinned — see warning above). Most behavioral tests are parameterized across three write strategies:
| WriteMethod | Description |
|---|---|
| ByteArray | os.write(byte[]) |
| Int | os.write(int) |
| ByteArrayWithParameter | os.write(byte[], int, int) |
Test coverage includes:
- Simple and parameterized round-trip reads/writes
- Unsigned byte values (0–255), including high-byte values 128–255 and the & 0xff mask
- Partial reads with offset tracking
- Safe write with and without trim interaction
- Trim with boundary conditions (empty buffer, single entry, maxBufferElements = 0)
- Buffer trimming and byte-order preservation
- Close via sb.close(), os.close(), and is.close(): all paths tested
- available() behavior before and after close, including with buffered data remaining
- Thread interruption during blocked reads (wraps InterruptedException in IOException)
- Concurrent read/write stress tests
- Parallel close without deadlock
- Signal/slot notification via external semaphores on write and all close paths
- removeSignal(null) returning false without throwing
- addSignal(null) throwing NullPointerException
- Thread-decoupled signal barrier: observer wakes in its own thread
- correctOffsetAndLengthToRead and correctOffsetAndLengthToWrite: all branches including integer overflow
- getBufferSize() and getBufferElementCount() on an empty buffer
- blockDataAvailable() with data written before and after the call
- Statistics tracking (getTotalBytesWritten, getTotalBytesRead, getMaxObservedBytes): user I/O only, excluding internal trim operations
- setMaxAllocationSize / getMaxAllocationSize: boundary values, trim with chunked allocation
- isTrimRunning() flag transitions during concurrent trim execution
- Trim observer signals (addTrimStartSignal, addTrimEndSignal): semaphore released at correct lifecycle points
- Configuration changes during active trim (setMaxBufferElements, setMaxAllocationSize): verified not to affect running trim
- Concurrent close during active trim: no exceptions or deadlock
- decideTrimExecution pure function: comprehensive table-driven tests covering all boundary conditions and the smart-skip edge case
Code is licensed under the Apache License, Version 2.0.