diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e6f6f47..4f120643 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Change Log +## 24.0.0 + +* Breaking: Added `unsubscribe()`, `update()`, and `close()` for Realtime subscription lifecycle. +* Added: Added `userPhone` to the `Membership` model. +* Updated: Updated `X-Appwrite-Response-Format` header to `1.9.2`. + ## 23.1.0 * Added `x` OAuth provider to `OAuthProvider` enum diff --git a/README.md b/README.md index a2f1e3ab..1e2528ba 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ ![Maven Central](https://img.shields.io/maven-central/v/io.appwrite/sdk-for-android.svg?color=green&style=flat-square) ![License](https://img.shields.io/github/license/appwrite/sdk-for-android.svg?style=flat-square) -![Version](https://img.shields.io/badge/api%20version-1.9.1-blue.svg?style=flat-square) +![Version](https://img.shields.io/badge/api%20version-1.9.2-blue.svg?style=flat-square) [![Build Status](https://img.shields.io/travis/com/appwrite/sdk-generator?style=flat-square)](https://travis-ci.com/appwrite/sdk-generator) [![Twitter Account](https://img.shields.io/twitter/follow/appwrite?color=00acee&label=twitter&style=flat-square)](https://twitter.com/appwrite) [![Discord](https://img.shields.io/discord/564160730845151244?label=discord&style=flat-square)](https://appwrite.io/discord) @@ -38,7 +38,7 @@ repositories { Next, add the dependency to your project's `build.gradle(.kts)` file: ```groovy -implementation("io.appwrite:sdk-for-android:23.1.0") +implementation("io.appwrite:sdk-for-android:24.0.0") ``` ### Maven @@ -49,7 +49,7 @@ Add this to your project's `pom.xml` file: io.appwrite sdk-for-android - 23.1.0 + 24.0.0 ``` diff --git a/library/src/main/java/io/appwrite/Client.kt b/library/src/main/java/io/appwrite/Client.kt index 4eb6a51c..ccd93894 100644 --- a/library/src/main/java/io/appwrite/Client.kt +++ b/library/src/main/java/io/appwrite/Client.kt @@ -87,8 +87,8 @@ class Client @JvmOverloads constructor( "x-sdk-name" to "Android", "x-sdk-platform" to "client", "x-sdk-language" to "android", - "x-sdk-version" to "23.1.0", - "x-appwrite-response-format" to "1.9.1" + "x-sdk-version" to "24.0.0", + "x-appwrite-response-format" to "1.9.2" ) config = mutableMapOf() diff --git a/library/src/main/java/io/appwrite/models/Membership.kt b/library/src/main/java/io/appwrite/models/Membership.kt index 4a899eec..eb13be3b 100644 --- a/library/src/main/java/io/appwrite/models/Membership.kt +++ b/library/src/main/java/io/appwrite/models/Membership.kt @@ -43,6 +43,12 @@ data class Membership( @SerializedName("userEmail") val userEmail: String, + /** + * User phone number. Hide this attribute by toggling membership privacy in the Console. + */ + @SerializedName("userPhone") + val userPhone: String, + /** * Team ID. */ @@ -93,6 +99,7 @@ data class Membership( "userId" to userId as Any, "userName" to userName as Any, "userEmail" to userEmail as Any, + "userPhone" to userPhone as Any, "teamId" to teamId as Any, "teamName" to teamName as Any, "invited" to invited as Any, @@ -114,6 +121,7 @@ data class Membership( userId = map["userId"] as String, userName = map["userName"] as String, userEmail = map["userEmail"] as String, + userPhone = map["userPhone"] as String, teamId = map["teamId"] as String, teamName = map["teamName"] as String, invited = map["invited"] as String, diff --git a/library/src/main/java/io/appwrite/models/RealtimeModels.kt b/library/src/main/java/io/appwrite/models/RealtimeModels.kt index 93131f0c..23caa5ae 100644 --- a/library/src/main/java/io/appwrite/models/RealtimeModels.kt +++ b/library/src/main/java/io/appwrite/models/RealtimeModels.kt @@ -3,9 +3,29 @@ package io.appwrite.models import kotlin.collections.Collection import java.io.Closeable +data class RealtimeSubscriptionUpdate( + val channels: Collection? = null, + val queries: Collection? = null +) + data class RealtimeSubscription( + /** + * Remove this subscription only. The WebSocket stays open so other subscriptions keep + * receiving events — use [Realtime.disconnect] for full teardown. + */ + val unsubscribe: () -> Unit, + + /** + * Replace the channels and/or queries on this subscription without recreating it. + */ + val update: (RealtimeSubscriptionUpdate) -> Unit, + private val close: () -> Unit ) : Closeable { + /** + * Alias of [unsubscribe] that also tears the socket down when this was the last active + * subscription. Prefer [unsubscribe] plus [Realtime.disconnect] for explicit control. + */ override fun close() = close.invoke() } diff --git a/library/src/main/java/io/appwrite/services/Account.kt b/library/src/main/java/io/appwrite/services/Account.kt index a288fc10..bb9bb4fd 100644 --- a/library/src/main/java/io/appwrite/services/Account.kt +++ b/library/src/main/java/io/appwrite/services/Account.kt @@ -2242,4 +2242,4 @@ class Account(client: Client) : Service(client) { } -} \ No newline at end of file +} diff --git a/library/src/main/java/io/appwrite/services/Avatars.kt b/library/src/main/java/io/appwrite/services/Avatars.kt index 7694c0d1..004440f3 100644 --- a/library/src/main/java/io/appwrite/services/Avatars.kt +++ b/library/src/main/java/io/appwrite/services/Avatars.kt @@ -347,4 +347,4 @@ class Avatars(client: Client) : Service(client) { } -} \ No newline at end of file +} diff --git a/library/src/main/java/io/appwrite/services/Databases.kt b/library/src/main/java/io/appwrite/services/Databases.kt index 2b1b3d18..2fc95889 100644 --- a/library/src/main/java/io/appwrite/services/Databases.kt +++ b/library/src/main/java/io/appwrite/services/Databases.kt @@ -859,4 +859,4 @@ class Databases(client: Client) : Service(client) { nestedType = classOf(), ) -} \ No newline at end of file +} diff --git a/library/src/main/java/io/appwrite/services/Functions.kt b/library/src/main/java/io/appwrite/services/Functions.kt index 279d3127..12a5fee5 100644 --- a/library/src/main/java/io/appwrite/services/Functions.kt +++ b/library/src/main/java/io/appwrite/services/Functions.kt @@ -137,4 +137,4 @@ class Functions(client: Client) : Service(client) { } -} \ No newline at end of file +} diff --git a/library/src/main/java/io/appwrite/services/Graphql.kt b/library/src/main/java/io/appwrite/services/Graphql.kt index c168f5fd..1c714748 100644 --- a/library/src/main/java/io/appwrite/services/Graphql.kt +++ b/library/src/main/java/io/appwrite/services/Graphql.kt @@ -78,4 +78,4 @@ class Graphql(client: Client) : Service(client) { } -} \ No newline at end of file +} diff --git a/library/src/main/java/io/appwrite/services/Locale.kt b/library/src/main/java/io/appwrite/services/Locale.kt index 95e7d96b..d6acb84d 100644 --- a/library/src/main/java/io/appwrite/services/Locale.kt +++ b/library/src/main/java/io/appwrite/services/Locale.kt @@ -240,4 +240,4 @@ class Locale(client: Client) : Service(client) { } -} \ No newline at end of file +} diff --git a/library/src/main/java/io/appwrite/services/Messaging.kt b/library/src/main/java/io/appwrite/services/Messaging.kt index 82213f36..ee2444d9 100644 --- a/library/src/main/java/io/appwrite/services/Messaging.kt +++ b/library/src/main/java/io/appwrite/services/Messaging.kt @@ -82,4 +82,4 @@ class Messaging(client: Client) : Service(client) { } -} \ No newline at end of file +} diff --git a/library/src/main/java/io/appwrite/services/Realtime.kt b/library/src/main/java/io/appwrite/services/Realtime.kt index 8f88864f..841fe279 100644 --- a/library/src/main/java/io/appwrite/services/Realtime.kt +++ b/library/src/main/java/io/appwrite/services/Realtime.kt @@ -3,11 +3,13 @@ package io.appwrite.services import io.appwrite.Service import io.appwrite.Client import io.appwrite.Channel +import io.appwrite.ID import io.appwrite.Query import io.appwrite.exceptions.AppwriteException import io.appwrite.extensions.forEachAsync import io.appwrite.extensions.fromJson import io.appwrite.extensions.jsonCast +import io.appwrite.extensions.toJson import io.appwrite.models.* import kotlinx.coroutines.* import kotlinx.coroutines.Dispatchers.IO @@ -35,23 +37,18 @@ class Realtime(client: Client) : Service(client), CoroutineScope { private const val TYPE_ERROR = "error" private const val TYPE_EVENT = "event" private const val TYPE_PONG = "pong" + private const val TYPE_RESPONSE = "response" private const val HEARTBEAT_INTERVAL = 20_000L // 20 seconds private var socket: RealWebSocket? = null - // Slot-centric state: Map - private val activeSubscriptions = ConcurrentHashMap() - // Map slot index -> subscriptionId (from backend) - private val slotToSubscriptionId = ConcurrentHashMap() - // Inverse map: subscriptionId -> slot index (for O(1) lookup) - private val subscriptionIdToSlot = ConcurrentHashMap() + private val activeSubscriptions = ConcurrentHashMap() + private val pendingSubscribes = LinkedHashMap>() private var reconnectAttempts = 0 - private val subscriptionsCounter = AtomicInteger(0) private val socketGeneration = AtomicInteger(0) private var reconnect = true private var heartbeatJob: Job? = null - // Lock to coordinate multi-map updates (activeSubscriptions, slotToSubscriptionId, subscriptionIdToSlot) private val subscriptionLock = Any() } @@ -61,51 +58,14 @@ class Realtime(client: Client) : Service(client), CoroutineScope { val request: Request val newSocket: RealWebSocket? synchronized(subscriptionLock) { - // Rebuild activeChannels from all slots - val allChannels = mutableSetOf() - activeSubscriptions.values.forEach { subscription -> - allChannels.addAll(subscription.channels) - } - - if (allChannels.isEmpty()) { + if (activeSubscriptions.isEmpty()) { reconnect = false closeSocket() return } val encodedProject = java.net.URLEncoder.encode(client.config["project"].toString(), "UTF-8") - var queryParams = "project=$encodedProject" - - allChannels.forEach { channel -> - val encodedChannel = java.net.URLEncoder.encode(channel, "UTF-8") - queryParams += "&channels[]=$encodedChannel" - } - - // Build query string from slots → channels → queries - // Format: channel[slot][]=query (each query sent as separate parameter) - // For each slot, repeat its queries under each channel it subscribes to - // Example: slot 1 → channels [tests, prod], queries [q1, q2] - // Produces: tests[1][]=q1&tests[1][]=q2&prod[1][]=q1&prod[1][]=q2 - val selectAllQuery = Query.select(listOf("*")).toString() - activeSubscriptions.forEach { (slot, subscription) -> - // Get queries array - each query is a separate string - val queries = if (subscription.queries.isEmpty()) { - listOf(selectAllQuery) - } else { - subscription.queries.toList() - } - - // Repeat this slot's queries under each channel it subscribes to - // Each query is sent as a separate parameter: channel[slot][]=q1&channel[slot][]=q2 - subscription.channels.forEach { channel -> - val encodedChannel = java.net.URLEncoder.encode(channel, "UTF-8") - queries.forEach { query -> - val encodedQuery = java.net.URLEncoder.encode(query, "UTF-8") - queryParams += "&$encodedChannel[$slot][]=$encodedQuery" - } - } - } - + val queryParams = "project=$encodedProject" val url = "${client.endpointRealtime}/realtime?$queryParams" request = Request.Builder().url(url).build() @@ -137,6 +97,66 @@ class Realtime(client: Client) : Service(client), CoroutineScope { socket?.close(RealtimeCode.POLICY_VIOLATION.value, null) } + private fun sendUnsubscribeMessage(subscriptionIds: List) { + val ws = socket ?: return + val ids = subscriptionIds.filter { it.isNotEmpty() } + if (ids.isEmpty()) { + return + } + ws.send( + mapOf( + "type" to "unsubscribe", + "data" to ids.map { mapOf("subscriptionId" to it) } + ).toJson() + ) + } + + private fun generateUniqueSubscriptionIdLocked(): String { + repeat(activeSubscriptions.size + 1) { + val id = ID.unique() + if (!activeSubscriptions.containsKey(id)) { + return id + } + } + throw AppwriteException("Failed to generate unique subscription id") + } + + private fun enqueuePendingSubscribeLocked(subscriptionId: String) { + val subscription = activeSubscriptions[subscriptionId] ?: return + pendingSubscribes[subscriptionId] = mapOf( + "subscriptionId" to subscriptionId, + "channels" to subscription.channels.toList(), + "queries" to subscription.queries.toList() + ) + } + + /** + * Close the WebSocket connection and drop all active subscriptions client-side. + * Use this instead of calling [RealtimeSubscription.unsubscribe] on every subscription + * when you want to tear everything down. + */ + fun disconnect() { + synchronized(subscriptionLock) { + activeSubscriptions.clear() + pendingSubscribes.clear() + reconnect = false + closeSocket() + } + } + + private fun sendPendingSubscribes() { + val ws = socket ?: return + val rows: List> + synchronized(subscriptionLock) { + if (pendingSubscribes.isEmpty()) { + return + } + rows = pendingSubscribes.values.toList() + pendingSubscribes.clear() + } + ws.send(mapOf("type" to "subscribe", "data" to rows).toJson()) + } + private fun startHeartbeat() { stopHeartbeat() heartbeatJob = launch { @@ -209,39 +229,91 @@ class Realtime(client: Client) : Service(client), CoroutineScope { queries: Set = emptySet(), callback: (RealtimeResponseEvent) -> Unit, ): RealtimeSubscription { - // Allocate a new slot index atomically - val slot = subscriptionsCounter.incrementAndGet() - - // Store slot-centric data: channels, queries, and callback belong to the slot. - // We only touch activeSubscriptions here, but keep the pattern consistent - // and future-proof by guarding multi-map writes with a shared lock. + val subscriptionId: String + var shouldSendPending = false + var shouldCreateSocket = false synchronized(subscriptionLock) { - activeSubscriptions[slot] = RealtimeCallback( + subscriptionId = generateUniqueSubscriptionIdLocked() + activeSubscriptions[subscriptionId] = RealtimeCallback( channels.toSet(), queries, payloadType, callback as (RealtimeResponseEvent<*>) -> Unit ) + enqueuePendingSubscribeLocked(subscriptionId) + if (socket != null) { + shouldSendPending = true + } else { + shouldCreateSocket = true + } + } + if (shouldSendPending) { + sendPendingSubscribes() + } else if (shouldCreateSocket) { + val shouldCreateAfterRecheck: Boolean + synchronized(subscriptionLock) { + shouldCreateAfterRecheck = (socket == null) + } + + if (shouldCreateAfterRecheck) { + createSocket() + } else { + // Another concurrent subscribe() created the socket first. + // Send pending rows without holding the lock to avoid ws.send() blocking others. + sendPendingSubscribes() + } } - // Sequential: rebuild socket immediately for A -> A+B -> A+B+C. - createSocket() - return RealtimeSubscription { - // Unsubscribe must update all three maps atomically so that - // no reader observes a half-updated state. + val unsubscribeFn: () -> Unit = { + val removed: Boolean synchronized(subscriptionLock) { - val subscriptionId = slotToSubscriptionId[slot] - activeSubscriptions.remove(slot) - slotToSubscriptionId.remove(slot) - subscriptionId?.let { subscriptionIdToSlot.remove(it) } + removed = activeSubscriptions.remove(subscriptionId) != null + pendingSubscribes.remove(subscriptionId) + } + if (removed) { + sendUnsubscribeMessage(listOf(subscriptionId)) + } + } + + val updateFn: (RealtimeSubscriptionUpdate) -> Unit = { changes -> + synchronized(subscriptionLock) { + val current = activeSubscriptions[subscriptionId] + if (current != null) { + val nextChannels = changes.channels?.map { channelToString(it) }?.toSet() + ?: current.channels + val nextQueries = changes.queries?.toSet() ?: current.queries + activeSubscriptions[subscriptionId] = RealtimeCallback( + nextChannels, + nextQueries, + current.payloadClass, + current.callback + ) + enqueuePendingSubscribeLocked(subscriptionId) + } + } + if (socket != null) { + sendPendingSubscribes() + } else { + createSocket() + } + } + + val closeFn: () -> Unit = { + unsubscribeFn() + synchronized(subscriptionLock) { + if (activeSubscriptions.isEmpty()) { + reconnect = false + closeSocket() + } } - // Sequential: rebuild socket immediately after unsubscribe. - this@Realtime.createSocket() } - } - // cleanUp is no longer needed - slots are removed directly in subscribe().close() - // Channels are automatically rebuilt from remaining slots in createSocket() + return RealtimeSubscription( + unsubscribe = unsubscribeFn, + update = updateFn, + close = closeFn + ) + } private inner class AppwriteWebSocketListener( private val generation: Int @@ -268,6 +340,7 @@ class Realtime(client: Client) : Service(client), CoroutineScope { when (message.type) { TYPE_ERROR -> handleResponseError(message) TYPE_CONNECTED -> handleResponseConnected(message) + TYPE_RESPONSE -> handleResponseAction(message) TYPE_EVENT -> handleResponseEvent(message) TYPE_PONG -> {} } @@ -275,28 +348,18 @@ class Realtime(client: Client) : Service(client), CoroutineScope { } private fun handleResponseConnected(message: RealtimeResponse) { - val messageData = message.data?.jsonCast>() ?: return - val subscriptions = messageData["subscriptions"] as? Map<*, *> ?: return + if (message.data == null) return - // Store subscription ID mappings from backend - // Format: { "0": "sub_a1f9", "1": "sub_b83c", ... } synchronized(subscriptionLock) { - val newSlotToSub = mutableMapOf() - val newSubToSlot = mutableMapOf() - - subscriptions.forEach { (slotStr, subscriptionId) -> - val slot = (slotStr as? String)?.toIntOrNull() - if (slot != null && subscriptionId is String) { - newSlotToSub[slot] = subscriptionId - newSubToSlot[subscriptionId] = slot - } - } - - slotToSubscriptionId.clear() - slotToSubscriptionId.putAll(newSlotToSub) - subscriptionIdToSlot.clear() - subscriptionIdToSlot.putAll(newSubToSlot) + activeSubscriptions.keys.forEach { enqueuePendingSubscribeLocked(it) } } + sendPendingSubscribes() + } + + private fun handleResponseAction(message: RealtimeResponse) { + // The SDK generates subscriptionIds client-side and sends them on every + // subscribe/unsubscribe, so subscribe/unsubscribe acks carry no state + // the SDK needs to reconcile. } private fun handleResponseError(message: RealtimeResponse) { @@ -319,19 +382,12 @@ class Realtime(client: Client) : Service(client), CoroutineScope { return } - // Use backend-provided subscription IDs for O(1) dispatch subscriptions.forEach { subscriptionId -> - // O(1) lookup using subscriptionId - val slot = subscriptionIdToSlot[subscriptionId] - if (slot != null) { - val subscription = activeSubscriptions[slot] - if (subscription != null) { - val typedEvent = event.copy( - payload = rawPayload.jsonCast(subscription.payloadClass) - ) - subscription.callback(typedEvent) - } - } + val subscription = activeSubscriptions[subscriptionId] ?: return@forEach + val typedEvent = event.copy( + payload = rawPayload.jsonCast(subscription.payloadClass) + ) + subscription.callback(typedEvent) } } diff --git a/library/src/main/java/io/appwrite/services/Storage.kt b/library/src/main/java/io/appwrite/services/Storage.kt index 6a8e9737..8276d082 100644 --- a/library/src/main/java/io/appwrite/services/Storage.kt +++ b/library/src/main/java/io/appwrite/services/Storage.kt @@ -343,4 +343,4 @@ class Storage(client: Client) : Service(client) { } -} \ No newline at end of file +} diff --git a/library/src/main/java/io/appwrite/services/TablesDB.kt b/library/src/main/java/io/appwrite/services/TablesDB.kt index d662045e..9c5a955e 100644 --- a/library/src/main/java/io/appwrite/services/TablesDB.kt +++ b/library/src/main/java/io/appwrite/services/TablesDB.kt @@ -799,4 +799,4 @@ class TablesDB(client: Client) : Service(client) { nestedType = classOf(), ) -} \ No newline at end of file +} diff --git a/library/src/main/java/io/appwrite/services/Teams.kt b/library/src/main/java/io/appwrite/services/Teams.kt index 56fc955b..78019a1d 100644 --- a/library/src/main/java/io/appwrite/services/Teams.kt +++ b/library/src/main/java/io/appwrite/services/Teams.kt @@ -600,4 +600,4 @@ class Teams(client: Client) : Service(client) { nestedType = classOf(), ) -} \ No newline at end of file +}