Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/workflows/scripts/functions/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,10 @@ export { fetchAppCheckTokenV2 } from './fetchAppCheckToken';
export { sendFCM } from './sendFCM';

export { testFetchStream, testFetch } from './vertexaiFunctions';

export {
testStreamingCallable,
testProgressStream,
testComplexDataStream,
testStreamWithError,
} from './testStreamingCallable';
159 changes: 159 additions & 0 deletions .github/workflows/scripts/functions/src/testStreamingCallable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import { onCall, CallableRequest, CallableResponse } from 'firebase-functions/v2/https';
import { logger } from 'firebase-functions/v2';

/**
* Test streaming callable function that sends multiple chunks of data
* This function demonstrates Server-Sent Events (SSE) streaming
*/
export const testStreamingCallable = onCall(
async (
req: CallableRequest<{ count?: number; delay?: number }>,
response?: CallableResponse<any>,
) => {
const count = req.data.count || 5;
const delay = req.data.delay || 500;

logger.info('testStreamingCallable called', { count, delay });

// Send chunks of data over time
for (let i = 0; i < count; i++) {
// Wait for the specified delay
await new Promise(resolve => setTimeout(resolve, delay));

if (response) {
await response.sendChunk({
index: i,
message: `Chunk ${i + 1} of ${count}`,
timestamp: new Date().toISOString(),
data: {
value: i * 10,
isEven: i % 2 === 0,
},
});
}
}

// Return final result
return { totalCount: count, message: 'Stream complete' };
},
);

/**
* Test streaming callable that sends progressive updates
*/
export const testProgressStream = onCall(
async (
req: CallableRequest<{ task?: string }>,
response?: CallableResponse<any>,
) => {
const task = req.data.task || 'Processing';

logger.info('testProgressStream called', { task });

const updates = [
{ progress: 0, status: 'Starting...', task },
{ progress: 25, status: 'Loading data...', task },
{ progress: 50, status: 'Processing data...', task },
{ progress: 75, status: 'Finalizing...', task },
{ progress: 100, status: 'Complete!', task },
];

for (const update of updates) {
await new Promise(resolve => setTimeout(resolve, 300));
if (response) {
await response.sendChunk(update);
}
}

return { success: true };
},
);

/**
* Test streaming with complex data types
*/
export const testComplexDataStream = onCall(
async (req: CallableRequest, response?: CallableResponse<any>) => {
logger.info('testComplexDataStream called');

const items = [
{
id: 1,
name: 'Item One',
tags: ['test', 'streaming', 'firebase'],
metadata: {
created: new Date().toISOString(),
version: '1.0.0',
},
},
{
id: 2,
name: 'Item Two',
tags: ['react-native', 'functions'],
metadata: {
created: new Date().toISOString(),
version: '1.0.1',
},
},
{
id: 3,
name: 'Item Three',
tags: ['cloud', 'streaming'],
metadata: {
created: new Date().toISOString(),
version: '2.0.0',
},
},
];

// Stream each item individually
for (const item of items) {
await new Promise(resolve => setTimeout(resolve, 200));
if (response) {
await response.sendChunk(item);
}
}

// Return summary
return {
summary: {
totalItems: items.length,
processedAt: new Date().toISOString(),
},
};
},
);

/**
* Test streaming with error handling
*/
export const testStreamWithError = onCall(
async (
req: CallableRequest<{ shouldError?: boolean; errorAfter?: number }>,
response?: CallableResponse<any>,
) => {
const shouldError = req.data.shouldError !== false;
const errorAfter = req.data.errorAfter || 2;

logger.info('testStreamWithError called', { shouldError, errorAfter });

for (let i = 0; i < 5; i++) {
if (shouldError && i === errorAfter) {
throw new Error('Simulated streaming error after chunk ' + errorAfter);
}

await new Promise(resolve => setTimeout(resolve, 300));
if (response) {
await response.sendChunk({
chunk: i,
message: `Processing chunk ${i + 1}`,
});
}
}

return {
success: true,
message: 'All chunks processed successfully',
};
},
);
119 changes: 116 additions & 3 deletions packages/functions/__tests__/functions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,17 @@ describe('Cloud Functions', function () {
globalThis.RNFB_SILENCE_MODULAR_DEPRECATION_WARNINGS = false;
});

beforeEach(function () {
// Mock native module for streaming methods
// @ts-ignore test
jest.spyOn(FirebaseModule.prototype, 'native', 'get').mockImplementation(() => {
return {
httpsCallableStream: jest.fn(),
httpsCallableStreamFromUrl: jest.fn(),
};
});
});

it('accessible from firebase.app()', function () {
const app = firebase.app();
expect(app.functions).toBeDefined();
Expand Down Expand Up @@ -77,10 +88,53 @@ describe('Cloud Functions', function () {
'HttpsCallableOptions.timeout expected a Number in milliseconds',
);
});

it('has stream method', function () {
const app = firebase.app();
const callable = app.functions().httpsCallable('example');
expect(callable.stream).toBeDefined();
expect(typeof callable.stream).toBe('function');
});

it('stream method returns unsubscribe function', function () {
const app = firebase.app();
const callable = app.functions().httpsCallable('example');
const unsubscribe = callable.stream({ test: 'data' }, () => {});
expect(typeof unsubscribe).toBe('function');
unsubscribe();
});
});

describe('httpsCallableFromUrl()', function () {
it('has stream method', function () {
const app = firebase.app();
const callable = app.functions().httpsCallableFromUrl('https://example.com/example');
expect(callable.stream).toBeDefined();
expect(typeof callable.stream).toBe('function');
});

it('stream method returns unsubscribe function', function () {
const app = firebase.app();
const callable = app.functions().httpsCallableFromUrl('https://example.com/example');
const unsubscribe = callable.stream({ test: 'data' }, () => {});
expect(typeof unsubscribe).toBe('function');
unsubscribe();
});
});
});

describe('modular', function () {
beforeEach(function () {
// Mock native module for streaming methods
// @ts-ignore test
jest.spyOn(FirebaseModule.prototype, 'native', 'get').mockImplementation(() => {
return {
httpsCallableStream: jest.fn(),
httpsCallableStreamFromUrl: jest.fn(),
};
});
});

it('`getFunctions` function is properly exposed to end user', function () {
expect(getFunctions).toBeDefined();
});
Expand All @@ -101,6 +155,32 @@ describe('Cloud Functions', function () {
expect(HttpsErrorCode).toBeDefined();
});

it('`httpsCallable().stream()` method is properly exposed to end user', function () {
const callable = httpsCallable(getFunctions(), 'example');
expect(callable.stream).toBeDefined();
expect(typeof callable.stream).toBe('function');
});

it('`httpsCallableFromUrl().stream()` method is properly exposed to end user', function () {
const callable = httpsCallableFromUrl(getFunctions(), 'https://example.com/example');
expect(callable.stream).toBeDefined();
expect(typeof callable.stream).toBe('function');
});

it('`httpsCallable().stream()` returns unsubscribe function', function () {
const callable = httpsCallable(getFunctions(), 'example');
const unsubscribe = callable.stream({ test: 'data' }, () => {});
expect(typeof unsubscribe).toBe('function');
unsubscribe();
});

it('`httpsCallableFromUrl().stream()` returns unsubscribe function', function () {
const callable = httpsCallableFromUrl(getFunctions(), 'https://example.com/example');
const unsubscribe = callable.stream({ test: 'data' }, () => {});
expect(typeof unsubscribe).toBe('function');
unsubscribe();
});

describe('types', function () {
it('`HttpsCallableOptions` type is properly exposed to end user', function () {
const options: HttpsCallableOptions = { timeout: 5000 };
Expand All @@ -110,10 +190,18 @@ describe('Cloud Functions', function () {

it('`HttpsCallable` type is properly exposed to end user', function () {
// Type check - this will fail at compile time if type is not exported
const callable: HttpsCallableType<{ test: string }, { result: number }> = async () => {
return { data: { result: 42 } };
};
const callable: HttpsCallableType<{ test: string }, { result: number }> = Object.assign(
async () => {
return { data: { result: 42 } };
},
{
stream: (_data?: any, _onEvent?: any, _options?: any) => {
return () => {};
},
},
);
expect(callable).toBeDefined();
expect(callable.stream).toBeDefined();
});

it('`FunctionsModule` type is properly exposed to end user', function () {
Expand Down Expand Up @@ -191,6 +279,31 @@ describe('Cloud Functions', function () {
'httpsCallableFromUrl',
);
});

it('httpsCallable().stream()', function () {
const functions = (getApp() as unknown as FirebaseApp).functions();
functionsRefV9Deprecation(
() => httpsCallable(functions, 'example').stream({ test: 'data' }, () => {}),
() => functions.httpsCallable('example').stream({ test: 'data' }, () => {}),
'httpsCallable',
);
});

it('httpsCallableFromUrl().stream()', function () {
const functions = (getApp() as unknown as FirebaseApp).functions();
functionsRefV9Deprecation(
() =>
httpsCallableFromUrl(functions, 'https://example.com/example').stream(
{ test: 'data' },
() => {},
),
() =>
functions
.httpsCallableFromUrl('https://example.com/example')
.stream({ test: 'data' }, () => {}),
'httpsCallableFromUrl',
);
});
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package io.invertase.firebase.functions;

/*
* Copyright (c) 2016-present Invertase Limited & Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this library except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import com.facebook.react.bridge.Arguments;
import com.facebook.react.bridge.WritableMap;
import io.invertase.firebase.interfaces.NativeEvent;

public class FirebaseFunctionsStreamHandler implements NativeEvent {
static final String FUNCTIONS_STREAMING_EVENT = "functions_streaming_event";
private static final String KEY_ID = "listenerId";
private static final String KEY_BODY = "body";
private static final String KEY_APP_NAME = "appName";
private static final String KEY_EVENT_NAME = "eventName";
private String eventName;
private WritableMap eventBody;
private String appName;
private int listenerId;

FirebaseFunctionsStreamHandler(
String eventName, WritableMap eventBody, String appName, int listenerId) {
this.eventName = eventName;
this.eventBody = eventBody;
this.appName = appName;
this.listenerId = listenerId;
}

@Override
public String getEventName() {
return eventName;
}

@Override
public WritableMap getEventBody() {
WritableMap event = Arguments.createMap();
event.putInt(KEY_ID, listenerId);
event.putMap(KEY_BODY, eventBody);
event.putString(KEY_APP_NAME, appName);
event.putString(KEY_EVENT_NAME, eventName);
return event;
}

@Override
public String getFirebaseAppName() {
return appName;
}
}
Loading
Loading