Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion packages/realtime_client/lib/realtime_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
library;

export 'src/constants.dart'
show RealtimeConstants, RealtimeLogLevel, SocketStates;
show
RealtimeConstants,
RealtimeLogLevel,
RealtimeProtocolVersion,
SocketStates;
export 'src/realtime_channel.dart';
export 'src/realtime_client.dart';
export 'src/realtime_presence.dart';
Expand Down
14 changes: 13 additions & 1 deletion packages/realtime_client/lib/src/constants.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import 'package:realtime_client/src/version.dart';

class Constants {
static const String vsn = '1.0.0';
static const Duration defaultTimeout = Duration(milliseconds: 10000);
static const int defaultHeartbeatIntervalMs = 25000;
static const int wsCloseNormal = 1000;
Expand All @@ -12,6 +11,19 @@ class Constants {

typedef RealtimeConstants = Constants;

enum RealtimeProtocolVersion {
/// Legacy protocol: object-shaped JSON text frames only.
v1('1.0.0'),

/// Positional JSON array text frames plus binary frames.
v2('2.0.0');

const RealtimeProtocolVersion(this.vsn);

/// The value sent as the `vsn` connection parameter.
final String vsn;
}

enum SocketStates {
/// Client attempting to establish a connection
connecting,
Expand Down
25 changes: 19 additions & 6 deletions packages/realtime_client/lib/src/realtime_channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ class RealtimeChannel {
]
};

final res = await (socket.httpClient?.post ?? post)(
final response = await (socket.httpClient?.post ?? post)(
Uri.parse(broadcastEndpointURL),
headers: headers,
body: json.encode(body),
Expand All @@ -574,13 +574,13 @@ class RealtimeChannel {
onTimeout: () => throw TimeoutException('Request timeout'),
);

if (res.statusCode == 202) {
if (response.statusCode == 202) {
return;
}

String errorMessage = res.reasonPhrase ?? 'Unknown error';
String errorMessage = response.reasonPhrase ?? 'Unknown error';
try {
final errorBody = json.decode(res.body) as Map<String, dynamic>;
final errorBody = json.decode(response.body) as Map<String, dynamic>;
errorMessage = (errorBody['error'] ??
errorBody['message'] ??
errorMessage) as String;
Expand All @@ -592,6 +592,19 @@ class RealtimeChannel {
}

/// Sends a realtime broadcast message.
///
/// With protocol `2.0.0` the message is sent as a positional JSON text frame.
///
/// To send a raw binary payload over a WebSocket binary frame (avoiding JSON
/// encoding on the server), set a `Uint8List` (or any `TypedData`) under the
/// `payload` key:
///
/// ```dart
/// channel.sendBroadcastMessage(
/// event: 'file',
/// payload: {'payload': myUint8List},
/// );
/// ```
Future<ChannelResponse> sendBroadcastMessage({
required String event,
required Map<String, dynamic> payload,
Expand Down Expand Up @@ -643,12 +656,12 @@ class RealtimeChannel {
]
};
try {
final res = await (socket.httpClient?.post ?? post)(
final response = await (socket.httpClient?.post ?? post)(
Uri.parse(broadcastEndpointURL),
headers: headers,
body: json.encode(body),
);
if (200 <= res.statusCode && res.statusCode < 300) {
if (200 <= response.statusCode && response.statusCode < 300) {
completer.complete(ChannelResponse.ok);
} else {
completer.complete(ChannelResponse.error);
Expand Down
136 changes: 80 additions & 56 deletions packages/realtime_client/lib/src/realtime_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import 'package:realtime_client/realtime_client.dart';
import 'package:realtime_client/src/constants.dart';
import 'package:realtime_client/src/message.dart';
import 'package:realtime_client/src/retry_timer.dart';
import 'package:realtime_client/src/serializer.dart';
import 'package:realtime_client/src/websocket/websocket.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

Expand All @@ -18,15 +19,13 @@ typedef WebSocketTransport = WebSocketChannel Function(
Map<String, String> headers,
);

typedef RealtimeEncode = void Function(
dynamic payload,
void Function(String result) callback,
);
/// Serializes an outgoing message into the `String` or binary frame written to
/// the WebSocket.
typedef RealtimeEncode = Object Function(Map<String, dynamic> payload);

typedef RealtimeDecode = void Function(
String payload,
void Function(dynamic result) callback,
);
/// Deserializes a raw incoming WebSocket frame (`String` or binary) into a
/// message map.
typedef RealtimeDecode = Map<String, dynamic> Function(Object payload);

/// Event details for when the connection closed.
class RealtimeCloseEvent {
Expand Down Expand Up @@ -89,13 +88,14 @@ class RealtimeCloseEvent {
/// - Works on all Dart platforms (Flutter mobile/desktop, web, server).
/// - On web, the underlying [WebSocketChannel] uses the browser WebSocket API.
class RealtimeClient {
// This is named `accessTokenValue` in supabase-js
String? accessToken;
List<RealtimeChannel> channels = [];
final String endPoint;

final Map<String, String> headers;
final Map<String, dynamic> params;

final RealtimeProtocolVersion version;
final Duration timeout;
final WebSocketTransport transport;
final Client? httpClient;
Expand All @@ -111,9 +111,10 @@ class RealtimeClient {
/// Unique reference ID for every heartbeat.
int ref = 0;
late RetryTimer reconnectTimer;
void Function(String? kind, String? msg, dynamic data)? logger;
late RealtimeEncode encode;
late RealtimeDecode decode;
void Function(String? kind, String? message, dynamic data)? logger;
static final Serializer _serializer = Serializer();
final RealtimeEncode encode;
final RealtimeDecode decode;
late TimerCalculation reconnectAfterMs;
WebSocketChannel? conn;
List sendBuffer = [];
Expand All @@ -127,7 +128,6 @@ class RealtimeClient {
@Deprecated("No longer used. Will be removed in the next major version.")
int longpollerTimeout = 20000;
SocketStates? connState;
// This is called `accessToken` in realtime-js
Future<String?> Function()? customAccessToken;

/// Initializes the Socket
Expand All @@ -144,15 +144,21 @@ class RealtimeClient {
///
/// [heartbeatIntervalMs] The millisec interval to send a heartbeat message.
///
/// [logger] The optional function for specialized logging, ie: logger: (kind, msg, data) => { console.log(`$kind: $msg`, data) }
/// [logger] The optional function for specialized logging, ie: logger: (kind, message, data) => { console.log(`$kind: $message`, data) }
///
/// [encode] The function to encode outgoing messages. Defaults to JSON: (payload, callback) => callback(JSON.stringify(payload))
/// [encode] Overrides how outgoing messages are serialized, for example to
/// use a faster JSON implementation. Defaults to the codec for [version].
///
/// [decode] The function to decode incoming messages. Defaults to JSON: (payload, callback) => callback(JSON.parse(payload))
/// [decode] Overrides how incoming frames are deserialized. Defaults to the
/// codec for [version].
///
/// [reconnectAfterMs] The optional function that returns the millsec reconnect interval. Defaults to stepped backoff off.
///
/// [logLevel] Specifies the log level for the connection on the server.
///
/// [version] The Realtime protocol version. Defaults to
/// [RealtimeProtocolVersion.v2]; pass [RealtimeProtocolVersion.v1] for the
/// legacy object-shaped JSON frames.
RealtimeClient(
String endPoint, {
WebSocketTransport? transport,
Expand All @@ -168,6 +174,7 @@ class RealtimeClient {
RealtimeLogLevel? logLevel,
this.httpClient,
this.customAccessToken,
this.version = RealtimeProtocolVersion.v2,
}) : endPoint = Uri.parse('$endPoint/${Transports.websocket}')
.replace(
queryParameters:
Expand All @@ -178,7 +185,15 @@ class RealtimeClient {
...Constants.defaultHeaders,
if (headers != null) ...headers,
},
transport = transport ?? createWebSocketClient {
transport = transport ?? createWebSocketClient,
encode = encode ??
(version == RealtimeProtocolVersion.v1
? _encodeLegacy
: _serializer.encode),
decode = decode ??
(version == RealtimeProtocolVersion.v1
? _decodeLegacy
: _serializer.decode) {
_log.config(
'Initialize RealtimeClient with endpoint: $endPoint, timeout: $timeout, heartbeatIntervalMs: $heartbeatIntervalMs, logLevel: $logLevel');
_log.finest('Initialize with headers: $headers, params: $params');
Expand All @@ -187,12 +202,6 @@ class RealtimeClient {

this.reconnectAfterMs =
reconnectAfterMs ?? RetryTimer.createRetryFunction();
this.encode = encode ??
(dynamic payload, Function(String result) callback) =>
callback(json.encode(payload));
this.decode = decode ??
(String payload, Function(dynamic result) callback) =>
callback(json.decode(payload));
reconnectTimer = RetryTimer(
() async {
await disconnect();
Expand Down Expand Up @@ -244,8 +253,7 @@ class RealtimeClient {

_onConnOpen();
localConn.stream.listen(
// incoming messages
(message) => onConnMessage(message as String),
(message) => onConnMessage(message),
onError: _onConnError,
onDone: () {
// communication has been closed
Expand Down Expand Up @@ -322,9 +330,12 @@ class RealtimeClient {
///
/// [level] must be [Level.FINEST] for senitive data
void log(
[String? kind, String? msg, dynamic data, Level level = Level.FINEST]) {
_log.log(level, '$kind: $msg', data);
logger?.call(kind, msg, data);
[String? kind,
String? message,
dynamic data,
Level level = Level.FINEST]) {
_log.log(level, '$kind: $message', data);
logger?.call(kind, message, data);
}

/// Registers callbacks for connection state change events
Expand Down Expand Up @@ -394,7 +405,7 @@ class RealtimeClient {
/// If the socket is not connected, the message gets enqueued within a local buffer, and sent out when a connection is next established.
String? push(Message message) {
void callback() {
encode(message.toJson(), (result) => conn?.sink.add(result));
conn?.sink.add(encode(message.toJson()));
}

log('push', '${message.topic} ${message.event} (${message.ref})',
Expand All @@ -408,39 +419,52 @@ class RealtimeClient {
return null;
}

void onConnMessage(String rawMessage) {
decode(rawMessage, (msg) {
final topic = msg['topic'] as String;
final event = msg['event'] as String;
final payload = msg['payload'];
final ref = msg['ref'] as String?;
if (ref != null && ref == pendingHeartbeatRef) {
pendingHeartbeatRef = null;
}
void onConnMessage(Object rawMessage) {
final Map<String, dynamic> message;
try {
message = decode(rawMessage);
} catch (error) {
log('transport', 'failed to decode message', error);
return;
}

log(
'receive',
"${payload['status'] ?? ''} $topic $event ${ref != null ? '($ref)' : ''}",
payload,
);
final topic = message['topic'] as String;
final event = message['event'] as String;
final payload = message['payload'];
final ref = message['ref'] as String?;
if (ref != null && ref == pendingHeartbeatRef) {
pendingHeartbeatRef = null;
}

channels
.where((channel) => channel.isMember(topic))
.forEach((channel) => channel.trigger(
event,
payload,
ref,
));
for (final callback in stateChangeCallbacks['message']!) {
callback(msg);
}
});
final status = payload is Map ? (payload['status'] ?? '') : '';
log(
'receive',
"$status $topic $event ${ref != null ? '($ref)' : ''}",
payload,
);

channels.where((channel) => channel.isMember(topic)).forEach(
(channel) => channel.trigger(
event,
payload,
ref,
),
);
for (final callback in stateChangeCallbacks['message']!) {
callback(message);
}
}

static Object _encodeLegacy(Map<String, dynamic> message) =>
jsonEncode(message);

static Map<String, dynamic> _decodeLegacy(Object rawMessage) =>
Map<String, dynamic>.from(jsonDecode(rawMessage as String) as Map);

/// Returns the URL of the websocket.
String get endPointURL {
final params = Map<String, String>.from(this.params);
params['vsn'] = Constants.vsn;
params['vsn'] = version.vsn;
return _appendParams(endPoint, params);
}

Expand Down Expand Up @@ -577,7 +601,7 @@ class RealtimeClient {
pendingHeartbeatRef = null;
log(
'transport',
'heartbeat timeout. Attempting to re-establish connection',
'heartbeat timeout. Attempting to re-establish conn',
);
conn?.sink.close(Constants.wsCloseNormal, 'heartbeat timeout');
return;
Expand Down
Loading
Loading