Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/blob-rpc-support.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"capnweb": minor
---

Add `Blob` as a serializable type over RPC. `Blob` objects can now be passed as call arguments and return values. The MIME type (`blob.type`) is preserved across the wire.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ The following types can be passed over RPC (in arguments or return values), and
* `Date`
* `Uint8Array`
* `Error` and its well-known subclasses
* `Blob`
* `ReadableStream` and `WritableStream`, with automatic flow control.
* `Headers`, `Request`, and `Response` from the Fetch API.

Expand Down
233 changes: 233 additions & 0 deletions __tests__/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,27 @@ describe("simple serialization", () => {

// =======================================================================================

describe("blob serialization", () => {
it("rejects malformed blob wire values", () => {
// Missing parts.
expect(() => deserialize('["blob"]')).toThrowError();
expect(() => deserialize('["blob","text/plain"]')).toThrowError();
// Non-string MIME type.
expect(() => deserialize('["blob",123,["readable",0]]')).toThrowError();
// Extra parts.
expect(() => deserialize('["blob","text/plain",["readable",0],"extra"]')).toThrowError();
});

it("throws when serializing Blob without an RPC session", () => {
// The encoder always uses a pipe, which requires an active RPC session. `serialize()` routes
// through NULL_EXPORTER and therefore cannot support Blob — same as streams and stubs.
let blob = new Blob(["hello"], {type: "text/plain"});
expect(() => serialize(blob)).toThrowError("Cannot create pipes without an RPC session");
});
});

// =======================================================================================

class TestTransport implements RpcTransport {
constructor(public name: string, private partner?: TestTransport) {
if (partner) {
Expand Down Expand Up @@ -2444,3 +2465,215 @@ describe("Fetch API types over RPC", () => {
expect(result.hasBody).toBe(false);
});
});

// =======================================================================================

describe("Blob over RPC", () => {
it("can send and receive a binary Blob", async () => {
await using harness = new TestHarness(new TestTarget());
let bytes = new TextEncoder().encode("hello from blob");
let blob = new Blob([bytes], {type: "application/octet-stream"});
using result = await harness.stub.echoBlob(blob);
expect(result).toBeInstanceOf(Blob);
expect(result.type).toBe("application/octet-stream");
expect(new Uint8Array(await result.arrayBuffer())).toStrictEqual(bytes);
});

it("preserves Blob MIME type", async () => {
await using harness = new TestHarness(new TestTarget());
let blob = new Blob(["<h1>hello</h1>"], {type: "text/html; charset=utf-8"});
using result = await harness.stub.echoBlob(blob);
expect(result.type).toBe("text/html; charset=utf-8");
expect(await result.text()).toBe("<h1>hello</h1>");
});

it("can send an empty Blob", async () => {
await using harness = new TestHarness(new TestTarget());
let blob = new Blob([], {type: "application/octet-stream"});
using result = await harness.stub.echoBlob(blob);
expect(result).toBeInstanceOf(Blob);
expect(result.size).toBe(0);
expect(result.type).toBe("application/octet-stream");
});

it("can send a Blob as part of a compound return value", async () => {
class BlobServer extends RpcTarget {
makePayload() {
return {
name: "test.txt",
blob: new Blob(["file content"], {type: "text/plain"}),
size: 12,
};
}
}

await using harness = new TestHarness(new BlobServer());
let stub = harness.stub as any;
let result = await stub.makePayload();
expect(result.name).toBe("test.txt");
expect(result.blob).toBeInstanceOf(Blob);
expect(result.blob.type).toBe("text/plain");
expect(await result.blob.text()).toBe("file content");
expect(result.size).toBe(12);
});

it("can send multiple Blobs in the same call", async () => {
// Each Blob produces its own RpcPromise entry in the Evaluator's `promises` list; all must
// resolve before the payload is delivered to user code.
class BlobCombiner extends RpcTarget {
async concatenate(a: Blob, b: Blob) {
let [textA, textB] = await Promise.all([a.text(), b.text()]);
return `${textA}|${textB}`;
}
}

await using harness = new TestHarness(new BlobCombiner());
let stub = harness.stub as any;
let result = await stub.concatenate(
new Blob(["hello"], {type: "text/plain"}),
new Blob(["world"], {type: "text/plain"}),
);
expect(result).toBe("hello|world");
});

it("can receive an array of Blobs in one return value", async () => {
// Multiple RpcPromise entries produced from a single return value, all substituted before
// the array reaches user code.
class BlobFactory extends RpcTarget {
makeBlobs() {
return [
new Blob(["first"], {type: "text/plain"}),
new Blob(["second"], {type: "text/plain"}),
new Blob(["third"], {type: "text/plain"}),
];
}
}

await using harness = new TestHarness(new BlobFactory());
let stub = harness.stub as any;
let [b1, b2, b3] = await stub.makeBlobs();
expect(await b1.text()).toBe("first");
expect(await b2.text()).toBe("second");
expect(await b3.text()).toBe("third");
});

it("round-trips a Blob with no MIME type", async () => {
// new Blob([bytes]) leaves .type as "" — the empty string must survive the round-trip
// and not become undefined or null.
await using harness = new TestHarness(new TestTarget());
let bytes = new TextEncoder().encode("untyped content");
let blob = new Blob([bytes]);
expect(blob.type).toBe("");
using result = await harness.stub.echoBlob(blob);
expect(result.type).toBe("");
expect(new Uint8Array(await result.arrayBuffer())).toStrictEqual(bytes);
});

it("preserves every possible byte value through the pipe", async () => {
// All 256 possible byte values in a single Blob — verifies the pipe mechanism
// neither corrupts nor truncates any byte.
await using harness = new TestHarness(new TestTarget());
let bytes = new Uint8Array(256);
for (let i = 0; i < 256; i++) bytes[i] = i;
let blob = new Blob([bytes], {type: "application/octet-stream"});
using result = await harness.stub.echoBlob(blob);
expect(result.size).toBe(256);
expect(new Uint8Array(await result.arrayBuffer())).toStrictEqual(bytes);
});

it("can send a large Blob over RPC", async () => {
// 1 MB blob — exercises multi-chunk stream collection in streamToBlob().
// Timeout is raised because CI machines can be slow to pump 1 MB through the
// fake in-process transport (default 5 s is too tight on some runners).
// Skipped in workerd: the isolate drops its connection when a large in-process
// stream is pumped through it (infrastructure limit, not a code bug).
if (navigator.userAgent === "Cloudflare-Workers") return;
await using harness = new TestHarness(new TestTarget());
let size = 1024 * 1024;
let bytes = new Uint8Array(size);
for (let i = 0; i < size; i++) bytes[i] = i & 0xff;
let blob = new Blob([bytes], {type: "application/octet-stream"});
using result = await harness.stub.echoBlob(blob);
expect(result.size).toBe(size);
expect(new Uint8Array(await result.arrayBuffer())).toStrictEqual(bytes);
}, 30_000);

it("can pass a Blob through a local (loopback) stub", async () => {
// No network — payload goes through deepCopy() rather than the Evaluator. Blobs are
// immutable so deepCopy() returns them as-is, without going through the pipe path.
using stub = new RpcStub(new TestTarget());
let bytes = new TextEncoder().encode("loopback content");
let blob = new Blob([bytes], {type: "text/plain"});
let result = await stub.echoBlob(blob);
expect(result).toBeInstanceOf(Blob);
expect(result.type).toBe("text/plain");
expect(await result.text()).toBe("loopback content");
result[Symbol.dispose]();
});

it("disposing a result containing a Blob does not throw", async () => {
// Blobs have no owned resources; disposeImpl() must be a silent no-op.
class BlobServer extends RpcTarget {
makeBlob() { return new Blob(["hello"], {type: "text/plain"}); }
}

await using harness = new TestHarness(new BlobServer());
let stub = harness.stub as any;
let result = await stub.makeBlob();
expect(result).toBeInstanceOf(Blob);
// Dispose without reading — should never throw.
expect(() => result[Symbol.dispose]()).not.toThrow();
});

it("is encoded as a readable pipe on the wire", async () => {
// Verify the wire format: ["blob", type, ["readable", pipeId]] — always. There is no inline
// fast path; reading a Blob's bytes is inherently async so we always stream.
class Server extends RpcTarget {
receiveBlob(_blob: Blob) { return "ok"; }
}

let clientTransport = new TestTransport("client");
let serverTransport = new TestTransport("server", clientTransport);

let client = new RpcSession<Server>(clientTransport);
let server = new RpcSession(serverTransport, new Server());

serverTransport.fence();

let stub = client.getRemoteMain();
let blob = new Blob(["hello"], {type: "text/plain"});
let p = stub.receiveBlob(blob);

// The call message is dispatched synchronously (the pipe path does not require pre-reading
// bytes on the sending side), but yield once to be safe across environments.
await Promise.resolve();

let blobExpr: any = undefined;
for (let i = 0; i < serverTransport.pendingCount; i++) {
let msg = JSON.parse((serverTransport as any).queue[i]);
if (msg[0] === "push") {
let findBlob = (v: any): any => {
if (v instanceof Array && v[0] === "blob") return v;
if (v instanceof Array) for (let e of v) { let r = findBlob(e); if (r) return r; }
if (v && typeof v === "object") for (let k in v) { let r = findBlob(v[k]); if (r) return r; }
return undefined;
};
blobExpr = findBlob(msg);
if (blobExpr) break;
}
}

expect(blobExpr).toBeDefined();
expect(blobExpr[0]).toBe("blob");
expect(blobExpr[1]).toBe("text/plain");
expect(blobExpr[2]).toBeInstanceOf(Array);
expect(blobExpr[2][0]).toBe("readable");
expect(typeof blobExpr[2][1]).toBe("number"); // pipe ID

serverTransport.releaseFence();
await p;

stub[Symbol.dispose]();
await pumpMicrotasks();
});
});
5 changes: 5 additions & 0 deletions __tests__/test-util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,9 @@ export class TestTarget extends RpcTarget {
returnNull() { return null; }
returnUndefined() { return undefined; }
returnNumber(i: number) { return i; }

async echoBlob(blob: Blob): Promise<Blob> {
let bytes = await blob.arrayBuffer();
return new Blob([bytes], {type: blob.type});
}
}
11 changes: 9 additions & 2 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ export let RpcTarget = workersModule ? workersModule.RpcTarget : class {};
export type PropertyPath = (string | number)[];

type TypeForRpc = "unsupported" | "primitive" | "object" | "function" | "array" | "date" |
"bigint" | "bytes" | "stub" | "rpc-promise" | "rpc-target" | "rpc-thenable" | "error" |
"undefined" | "writable" | "readable" | "headers" | "request" | "response";
"bigint" | "bytes" | "blob" | "stub" | "rpc-promise" | "rpc-target" | "rpc-thenable" |
"error" | "undefined" | "writable" | "readable" | "headers" | "request" | "response";

const AsyncFunction = (async function () {}).constructor;

Expand Down Expand Up @@ -112,6 +112,9 @@ export function typeForRpc(value: unknown): TypeForRpc {
case Response.prototype:
return "response";

case Blob.prototype:
return "blob";

// TODO: All other structured clone types.

case RpcStub.prototype:
Expand Down Expand Up @@ -947,6 +950,7 @@ export class RpcPayload {
case "bigint":
case "date":
case "bytes":
case "blob":
case "error":
case "undefined":
// immutable, no need to copy
Expand Down Expand Up @@ -1271,6 +1275,7 @@ export class RpcPayload {
case "primitive":
case "bigint":
case "bytes":
case "blob":
case "date":
case "error":
case "undefined":
Expand Down Expand Up @@ -1409,6 +1414,7 @@ export class RpcPayload {
case "primitive":
case "bigint":
case "bytes":
case "blob":
case "date":
case "error":
case "undefined":
Expand Down Expand Up @@ -1566,6 +1572,7 @@ function followPath(value: unknown, parent: object | undefined,
case "primitive":
case "bigint":
case "bytes":
case "blob":
case "date":
case "error":
case "headers":
Expand Down
Loading
Loading