fix(sync): robuste WebSocket-Verbindung und Token-Refresh
- Backoff nur nach stabiler Verbindung (>30s) zurücksetzen → verhindert rapiden 2s-4s-2s-Reconnect-Oscillation - VIOLATED_POLICY Close-Reason erkennen → AuthRejected-Event → kein endloser Retry mit abgelaufenem Token - Token-Refresh bei AuthRejected: MainViewModel refresht Access-Token und reconnectet WS automatisch; bei Fehlschlag Session-Expired - executeItemRequest: fehlende 401-Retry-Logik ergänzt (Bug 4) - SyncService.refreshAccessToken() als neue Interface-Methode
This commit is contained in:
parent
e52f041d31
commit
ea3bd6dc97
5 changed files with 57 additions and 5 deletions
|
|
@ -121,6 +121,12 @@ internal class SyncServiceImpl @Inject constructor(
|
||||||
settingsRepository.setString(StringKey.AuthUsername, "")
|
settingsRepository.setString(StringKey.AuthUsername, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override suspend fun refreshAccessToken(): Boolean {
|
||||||
|
val serverUrl = settingsRepository.getString(StringKey.ServerUrl)
|
||||||
|
if (serverUrl.isBlank()) return false
|
||||||
|
return refreshToken(serverUrl.trimEnd('/'))
|
||||||
|
}
|
||||||
|
|
||||||
private suspend fun executeRequest(
|
private suspend fun executeRequest(
|
||||||
block: suspend (serverUrl: String, token: String) -> Result<InventoryDto>
|
block: suspend (serverUrl: String, token: String) -> Result<InventoryDto>
|
||||||
): Result<InventoryDto> = withContext(Dispatchers.IO) {
|
): Result<InventoryDto> = withContext(Dispatchers.IO) {
|
||||||
|
|
@ -169,7 +175,20 @@ internal class SyncServiceImpl @Inject constructor(
|
||||||
return@withContext Result.failure(SyncError.NotConfigured("Nicht angemeldet"))
|
return@withContext Result.failure(SyncError.NotConfigured("Nicht angemeldet"))
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
block(serverUrl.trimEnd('/'), token)
|
val result = block(serverUrl.trimEnd('/'), token)
|
||||||
|
if (result.exceptionOrNull() is SyncError.AuthError) {
|
||||||
|
val refreshed = refreshToken(serverUrl.trimEnd('/'))
|
||||||
|
if (refreshed) {
|
||||||
|
val newToken = settingsRepository.getString(StringKey.AuthAccessToken)
|
||||||
|
if (newToken.isBlank()) return@withContext result
|
||||||
|
block(serverUrl.trimEnd('/'), newToken)
|
||||||
|
} else {
|
||||||
|
authEventBus.notifySessionExpired()
|
||||||
|
result
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
result
|
||||||
|
}
|
||||||
} catch (e: SocketTimeoutException) {
|
} catch (e: SocketTimeoutException) {
|
||||||
Result.failure(SyncError.Timeout(e))
|
Result.failure(SyncError.Timeout(e))
|
||||||
} catch (e: ConnectException) {
|
} catch (e: ConnectException) {
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@ internal sealed interface WebSocketEvent {
|
||||||
data object Connected : WebSocketEvent
|
data object Connected : WebSocketEvent
|
||||||
data object Disconnected : WebSocketEvent
|
data object Disconnected : WebSocketEvent
|
||||||
data class ConnectionFailed(val message: String) : WebSocketEvent
|
data class ConnectionFailed(val message: String) : WebSocketEvent
|
||||||
|
data object AuthRejected : WebSocketEvent
|
||||||
data class NewMessage(val message: MessageDto) : WebSocketEvent
|
data class NewMessage(val message: MessageDto) : WebSocketEvent
|
||||||
data class KeyUpdated(val userId: String, val publicKey: String) : WebSocketEvent
|
data class KeyUpdated(val userId: String, val publicKey: String) : WebSocketEvent
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ import io.ktor.client.plugins.websocket.WebSockets
|
||||||
import io.ktor.client.plugins.websocket.webSocket
|
import io.ktor.client.plugins.websocket.webSocket
|
||||||
import io.ktor.client.request.header
|
import io.ktor.client.request.header
|
||||||
import io.ktor.http.HttpHeaders
|
import io.ktor.http.HttpHeaders
|
||||||
|
import io.ktor.websocket.CloseReason
|
||||||
import io.ktor.websocket.Frame
|
import io.ktor.websocket.Frame
|
||||||
import io.ktor.websocket.readText
|
import io.ktor.websocket.readText
|
||||||
import kotlinx.coroutines.CancellationException
|
import kotlinx.coroutines.CancellationException
|
||||||
|
|
@ -60,13 +61,13 @@ internal class WebSocketClientImpl @Inject constructor() : WebSocketClient {
|
||||||
while (isActive) {
|
while (isActive) {
|
||||||
_connectionState.value = ConnectionState.Connecting
|
_connectionState.value = ConnectionState.Connecting
|
||||||
Log.d(TAG, "WebSocket: Verbindungsversuch #${consecutiveFailures + 1} (Backoff: ${backoffMs}ms)")
|
Log.d(TAG, "WebSocket: Verbindungsversuch #${consecutiveFailures + 1} (Backoff: ${backoffMs}ms)")
|
||||||
|
var authRejected = false
|
||||||
try {
|
try {
|
||||||
wsHttpClient.webSocket(
|
wsHttpClient.webSocket(
|
||||||
urlString = "$wsUrl/ws/sync",
|
urlString = "$wsUrl/ws/sync",
|
||||||
request = { header(HttpHeaders.Authorization, "Bearer $accessToken") }
|
request = { header(HttpHeaders.Authorization, "Bearer $accessToken") }
|
||||||
) {
|
) {
|
||||||
backoffMs = INITIAL_BACKOFF_MS
|
val connectedAt = System.currentTimeMillis()
|
||||||
consecutiveFailures = 0
|
|
||||||
_connectionState.value = ConnectionState.Connected
|
_connectionState.value = ConnectionState.Connected
|
||||||
Log.i(TAG, "WebSocket: Verbunden mit $wsUrl/ws/sync")
|
Log.i(TAG, "WebSocket: Verbunden mit $wsUrl/ws/sync")
|
||||||
_events.emit(WebSocketEvent.Connected)
|
_events.emit(WebSocketEvent.Connected)
|
||||||
|
|
@ -77,7 +78,19 @@ internal class WebSocketClientImpl @Inject constructor() : WebSocketClient {
|
||||||
handleFrame(text)
|
handleFrame(text)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Log.i(TAG, "WebSocket: Session normal beendet")
|
val reason = closeReason.await()
|
||||||
|
val durationMs = System.currentTimeMillis() - connectedAt
|
||||||
|
Log.i(TAG, "WebSocket: Session beendet nach ${durationMs}ms – Reason: ${reason?.message}")
|
||||||
|
when {
|
||||||
|
reason?.knownReason == CloseReason.Codes.VIOLATED_POLICY -> {
|
||||||
|
authRejected = true
|
||||||
|
_events.emit(WebSocketEvent.AuthRejected)
|
||||||
|
}
|
||||||
|
durationMs >= STABLE_CONNECTION_MS -> {
|
||||||
|
backoffMs = INITIAL_BACKOFF_MS
|
||||||
|
consecutiveFailures = 0
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (e: CancellationException) {
|
} catch (e: CancellationException) {
|
||||||
Log.d(TAG, "WebSocket: Verbindung abgebrochen (disconnect)")
|
Log.d(TAG, "WebSocket: Verbindung abgebrochen (disconnect)")
|
||||||
|
|
@ -92,7 +105,7 @@ internal class WebSocketClientImpl @Inject constructor() : WebSocketClient {
|
||||||
_events.emit(WebSocketEvent.ConnectionFailed(msg))
|
_events.emit(WebSocketEvent.ConnectionFailed(msg))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!isActive) break
|
if (!isActive || authRejected) break
|
||||||
_events.emit(WebSocketEvent.Disconnected)
|
_events.emit(WebSocketEvent.Disconnected)
|
||||||
val jitter = backoffMs * JITTER_FACTOR * (Random.nextDouble() * 2 - 1)
|
val jitter = backoffMs * JITTER_FACTOR * (Random.nextDouble() * 2 - 1)
|
||||||
val totalDelayMs = backoffMs + jitter.toLong()
|
val totalDelayMs = backoffMs + jitter.toLong()
|
||||||
|
|
@ -159,6 +172,7 @@ internal class WebSocketClientImpl @Inject constructor() : WebSocketClient {
|
||||||
const val TAG = "WebSocketClient"
|
const val TAG = "WebSocketClient"
|
||||||
const val INITIAL_BACKOFF_MS = 2_000L
|
const val INITIAL_BACKOFF_MS = 2_000L
|
||||||
const val MAX_BACKOFF_MS = 60_000L
|
const val MAX_BACKOFF_MS = 60_000L
|
||||||
|
const val STABLE_CONNECTION_MS = 30_000L
|
||||||
const val MAX_RETRIES = 5
|
const val MAX_RETRIES = 5
|
||||||
const val JITTER_FACTOR = 0.25
|
const val JITTER_FACTOR = 0.25
|
||||||
val json = Json { ignoreUnknownKeys = true }
|
val json = Json { ignoreUnknownKeys = true }
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@ internal interface SyncService {
|
||||||
suspend fun uploadInventory(inventory: InventoryDto): Result<InventoryDto>
|
suspend fun uploadInventory(inventory: InventoryDto): Result<InventoryDto>
|
||||||
suspend fun login(serverUrl: String, username: String, password: String): Result<Unit>
|
suspend fun login(serverUrl: String, username: String, password: String): Result<Unit>
|
||||||
suspend fun logout()
|
suspend fun logout()
|
||||||
|
suspend fun refreshAccessToken(): Boolean = false
|
||||||
suspend fun patchItem(itemId: String, item: ItemDto): Result<Unit>
|
suspend fun patchItem(itemId: String, item: ItemDto): Result<Unit>
|
||||||
suspend fun deleteItem(itemId: String): Result<Unit>
|
suspend fun deleteItem(itemId: String): Result<Unit>
|
||||||
suspend fun listInventories(): Result<List<InventoryInfoDto>>
|
suspend fun listInventories(): Result<List<InventoryInfoDto>>
|
||||||
|
|
|
||||||
|
|
@ -90,6 +90,23 @@ internal class MainViewModel @Inject constructor(
|
||||||
Log.d(TAG, "Inventar-Update empfangen (id=${event.itemId})")
|
Log.d(TAG, "Inventar-Update empfangen (id=${event.itemId})")
|
||||||
pullSync()
|
pullSync()
|
||||||
}
|
}
|
||||||
|
is WebSocketEvent.AuthRejected -> {
|
||||||
|
Log.w(TAG, "WebSocket: Auth abgelehnt – versuche Token-Refresh")
|
||||||
|
val refreshed = syncService.refreshAccessToken()
|
||||||
|
if (refreshed) {
|
||||||
|
val token = settingsRepository.getString(StringKey.AuthAccessToken)
|
||||||
|
val serverUrl = settingsRepository.getString(StringKey.ServerUrl)
|
||||||
|
if (token.isNotBlank() && serverUrl.isNotBlank()) {
|
||||||
|
Log.i(TAG, "Token refreshed – reconnekte WebSocket")
|
||||||
|
webSocketClient.connect(serverUrl, token)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Log.w(TAG, "Token-Refresh fehlgeschlagen – Session expired")
|
||||||
|
syncService.logout()
|
||||||
|
webSocketClient.disconnect()
|
||||||
|
_navigateToSettings.emit(Unit)
|
||||||
|
}
|
||||||
|
}
|
||||||
else -> {}
|
else -> {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue