Skip to content
175 changes: 99 additions & 76 deletions app/src/main/java/to/bitkit/repositories/LightningRepo.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import kotlinx.coroutines.flow.update
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.tasks.await
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeoutOrNull
Expand Down Expand Up @@ -108,6 +109,7 @@ class LightningRepo @Inject constructor(
private val syncMutex = Mutex()
private val syncPending = AtomicBoolean(false)
private val syncRetryJob = AtomicReference<Job?>(null)
private val lifecycleMutex = Mutex()

init {
observeConnectivityForSyncRetry()
Expand Down Expand Up @@ -269,88 +271,98 @@ class LightningRepo @Inject constructor(

eventHandler?.let { _eventHandlers.add(it) }

val initialLifecycleState = _lightningState.value.nodeLifecycleState
if (initialLifecycleState.isRunningOrStarting()) {
Logger.info("LDK node start skipped, lifecycle state: $initialLifecycleState", context = TAG)
return@withContext Result.success(Unit)
}
// Track retry state outside mutex to avoid deadlock (Mutex is non-reentrant)
var shouldRetryStart = false
var initialLifecycleState: NodeLifecycleState = NodeLifecycleState.Stopped

runCatching {
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Starting) }

// Setup if needed
if (lightningService.node == null) {
val setupResult = setup(walletIndex, customServerUrl, customRgsServerUrl, channelMigration)
if (setupResult.isFailure) {
_lightningState.update {
it.copy(
nodeLifecycleState = NodeLifecycleState.ErrorStarting(
setupResult.exceptionOrNull() ?: NodeSetupError()
val result = lifecycleMutex.withLock {
initialLifecycleState = _lightningState.value.nodeLifecycleState
if (initialLifecycleState.isRunningOrStarting()) {
Logger.info("LDK node start skipped, lifecycle state: $initialLifecycleState", context = TAG)
return@withLock Result.success(Unit)
}

runCatching {
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Starting) }

// Setup if needed
if (lightningService.node == null) {
val setupResult = setup(walletIndex, customServerUrl, customRgsServerUrl, channelMigration)
if (setupResult.isFailure) {
_lightningState.update {
it.copy(
nodeLifecycleState = NodeLifecycleState.ErrorStarting(
setupResult.exceptionOrNull() ?: NodeSetupError()
)
)
)
}
return@withLock setupResult
}
return@withContext setupResult
}
}

if (getStatus()?.isRunning == true) {
Logger.info("LDK node already running", context = TAG)
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Running) }
lightningService.startEventListener(::onEvent).onFailure {
Logger.warn("Failed to start event listener", it, context = TAG)
return@withContext Result.failure(it)
if (getStatus()?.isRunning == true) {
Logger.info("LDK node already running", context = TAG)
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Running) }
lightningService.startEventListener(::onEvent).onFailure {
Logger.warn("Failed to start event listener", it, context = TAG)
return@withLock Result.failure(it)
}
return@withLock Result.success(Unit)
}
return@withContext Result.success(Unit)
}

// Start node
lightningService.start(timeout, ::onEvent)
// Start node
lightningService.start(timeout, ::onEvent)

_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Running) }
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Running) }

// Initial state sync
syncState()
updateGeoBlockState()
refreshChannelCache()
// Initial state sync
syncState()
updateGeoBlockState()
refreshChannelCache()

// Post-startup tasks (non-blocking)
connectToTrustedPeers().onFailure {
Logger.error("Failed to connect to trusted peers", it, context = TAG)
}
// Post-startup tasks (non-blocking)
connectToTrustedPeers().onFailure {
Logger.error("Failed to connect to trusted peers", it, context = TAG)
}

sync().onFailure { e ->
Logger.warn("Initial sync failed, event-driven sync will retry", e, context = TAG)
}
scope.launch { registerForNotifications() }
Unit
}.onFailure { e ->
val currentLifecycleState = _lightningState.value.nodeLifecycleState
if (currentLifecycleState.isRunning()) {
Logger.warn("Start error occurred but node is $currentLifecycleState, skipping retry", e, context = TAG)
return@withContext Result.success(Unit)
}
sync().onFailure { e ->
Logger.warn("Initial sync failed, event-driven sync will retry", e, context = TAG)
}
scope.launch { registerForNotifications() }
Result.success(Unit)
}.getOrElse { e ->
val currentState = _lightningState.value.nodeLifecycleState
if (currentState.isRunning()) {
Logger.warn("Start error but node is $currentState, skipping retry", e, context = TAG)
return@withLock Result.success(Unit)
}

if (shouldRetry) {
val retryDelay = 2.seconds
Logger.warn("Start error, retrying after $retryDelay...", e, context = TAG)
_lightningState.update { it.copy(nodeLifecycleState = initialLifecycleState) }

delay(retryDelay)
return@withContext start(
walletIndex = walletIndex,
timeout = timeout,
shouldRetry = false,
customServerUrl = customServerUrl,
customRgsServerUrl = customRgsServerUrl,
channelMigration = channelMigration,
)
} else {
_lightningState.update {
it.copy(nodeLifecycleState = NodeLifecycleState.ErrorStarting(e))
if (shouldRetry) {
Logger.warn("Start error, will retry...", e, context = TAG)
_lightningState.update { it.copy(nodeLifecycleState = initialLifecycleState) }
shouldRetryStart = true
Result.failure(e)
} else {
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.ErrorStarting(e)) }
Result.failure(e)
}
return@withContext Result.failure(e)
}
}

// Retry OUTSIDE the mutex to avoid deadlock (Kotlin Mutex is non-reentrant)
if (shouldRetryStart) {
delay(2.seconds)
return@withContext start(
walletIndex = walletIndex,
timeout = timeout,
shouldRetry = false,
customServerUrl = customServerUrl,
customRgsServerUrl = customRgsServerUrl,
channelMigration = channelMigration,
)
}

result
}

private suspend fun onEvent(event: Event) {
Expand All @@ -374,16 +386,27 @@ class LightningRepo @Inject constructor(
}

suspend fun stop(): Result<Unit> = withContext(bgDispatcher) {
if (_lightningState.value.nodeLifecycleState.isStoppedOrStopping()) {
return@withContext Result.success(Unit)
}
lifecycleMutex.withLock {
if (_lightningState.value.nodeLifecycleState.isStoppedOrStopping()) {
return@withLock Result.success(Unit)
}

runCatching {
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Stopping) }
lightningService.stop()
_lightningState.update { LightningState(nodeLifecycleState = NodeLifecycleState.Stopped) }
}.onFailure {
Logger.error("Node stop error", it, context = TAG)
runCatching {
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Stopping) }
lightningService.stop()
_lightningState.update { LightningState(nodeLifecycleState = NodeLifecycleState.Stopped) }
}.onFailure {
Logger.error("Node stop error", it, context = TAG)
// On failure, check actual node state and update accordingly
// If node is still running, revert to Running state to allow retry
if (lightningService.node != null && lightningService.status?.isRunning == true) {
Logger.warn("Stop failed but node is still running, reverting to Running state", context = TAG)
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Running) }
} else {
// Node appears stopped, update state
_lightningState.update { LightningState(nodeLifecycleState = NodeLifecycleState.Stopped) }
}
}
}
}

Expand Down
Loading