feat: WebSocket-Lifecycle und Sync ab App-Start unabhaengig von Settings-Screen

- MainViewModel: verbindet WebSocket beim App-Start (connectOnStartup) und
  nach Login (via AuthEventBus.loginSuccess). Behandelt alle WebSocket-Events
  (Connected/FullSyncRequired/InventoryUpdated) -> pullSync/pushSync.
  Auto-pushSync wenn Server leer ist und lokale Daten vorhanden (Daten-Recovery).
- AuthEventBus: loginSuccess-Signal ergaenzt (serverUrl + token)
- SyncServiceImpl: emittiert loginSuccess nach erfolgreichem Login
- SettingsViewModel: WebSocket-Lifecycle entfernt (nur noch ConnectionFailed
  fuer UI-Fehlermeldung). Manueller Sync-Button bleibt erhalten.
- WebSocketClientImpl: vollstaendiges Logging, wiederholende User-Benachrichtigung
  bei Verbindungsfehlern (alle MAX_RETRIES Versuche statt nur einmalig)
This commit is contained in:
Jens Reinemann 2026-05-18 01:17:47 +02:00
parent 575c0ad709
commit dad2907481
5 changed files with 139 additions and 47 deletions

View file

@ -98,6 +98,7 @@ internal class SyncServiceImpl @Inject constructor(
loginResponse.inventoryName?.let { loginResponse.inventoryName?.let {
settingsRepository.setString(StringKey.ActiveInventoryName, it) settingsRepository.setString(StringKey.ActiveInventoryName, it)
} }
authEventBus.notifyLoginSuccess(serverUrl.trimEnd('/'), loginResponse.accessToken)
Result.success(Unit) Result.success(Unit)
} }
HttpStatusCode.Unauthorized -> Result.failure(SyncError.AuthError()) HttpStatusCode.Unauthorized -> Result.failure(SyncError.AuthError())

View file

@ -51,50 +51,57 @@ internal class WebSocketClientImpl @Inject constructor() : WebSocketClient {
connectionJob = scope.launch { connectionJob = scope.launch {
var backoffMs = INITIAL_BACKOFF_MS var backoffMs = INITIAL_BACKOFF_MS
var consecutiveFailures = 0 var consecutiveFailures = 0
var connectionFailedEmitted = false val wsUrl = serverUrl.trimEnd('/')
.replace("https://", "wss://")
.replace("http://", "ws://")
Log.i(TAG, "WebSocket: Starte Verbindung zu $wsUrl/ws/sync")
while (isActive) { while (isActive) {
_connectionState.value = ConnectionState.Connecting _connectionState.value = ConnectionState.Connecting
Log.d(TAG, "WebSocket: Verbindungsversuch #${consecutiveFailures + 1} (Backoff: ${backoffMs}ms)")
try { try {
val wsUrl = serverUrl.trimEnd('/')
.replace("https://", "wss://")
.replace("http://", "ws://")
wsHttpClient.webSocket("$wsUrl/ws/sync?token=$accessToken") { wsHttpClient.webSocket("$wsUrl/ws/sync?token=$accessToken") {
backoffMs = INITIAL_BACKOFF_MS backoffMs = INITIAL_BACKOFF_MS
consecutiveFailures = 0 consecutiveFailures = 0
connectionFailedEmitted = false
_connectionState.value = ConnectionState.Connected _connectionState.value = ConnectionState.Connected
Log.i(TAG, "WebSocket: Verbunden mit $wsUrl/ws/sync")
_events.emit(WebSocketEvent.Connected) _events.emit(WebSocketEvent.Connected)
for (frame in incoming) { for (frame in incoming) {
if (frame is Frame.Text) { if (frame is Frame.Text) {
handleFrame(frame.readText()) val text = frame.readText()
Log.d(TAG, "WebSocket: Frame empfangen: ${text.take(200)}")
handleFrame(text)
} }
} }
Log.i(TAG, "WebSocket: Session normal beendet")
} }
} catch (e: CancellationException) { } catch (e: CancellationException) {
Log.d(TAG, "WebSocket: Verbindung abgebrochen (disconnect)")
break break
} catch (_: Exception) { } catch (e: Exception) {
consecutiveFailures++ consecutiveFailures++
if (consecutiveFailures >= MAX_RETRIES && !connectionFailedEmitted) { Log.w(TAG, "WebSocket: Verbindungsfehler #$consecutiveFailures ${e::class.simpleName}: ${e.message}", e)
_events.emit( // Nutzer nach jeweils MAX_RETRIES Fehlern benachrichtigen
WebSocketEvent.ConnectionFailed( if (consecutiveFailures % MAX_RETRIES == 0) {
"Verbindung nach $MAX_RETRIES Versuchen fehlgeschlagen" val msg = "WebSocket-Verbindung fehlgeschlagen (${consecutiveFailures}x): ${e.message ?: e::class.simpleName}"
) Log.e(TAG, "WebSocket: $msg")
) _events.emit(WebSocketEvent.ConnectionFailed(msg))
connectionFailedEmitted = true
} }
} }
if (!isActive) break if (!isActive) break
_events.emit(WebSocketEvent.Disconnected) _events.emit(WebSocketEvent.Disconnected)
val jitter = backoffMs * JITTER_FACTOR * (Random.nextDouble() * 2 - 1) val jitter = backoffMs * JITTER_FACTOR * (Random.nextDouble() * 2 - 1)
val totalDelayMs = backoffMs + jitter.toLong() val totalDelayMs = backoffMs + jitter.toLong()
Log.d(TAG, "WebSocket: Nächster Versuch in ${totalDelayMs / 1000}s")
startCountdown(totalDelayMs) startCountdown(totalDelayMs)
delay(totalDelayMs) delay(totalDelayMs)
backoffMs = minOf(backoffMs * 2, MAX_BACKOFF_MS) backoffMs = minOf(backoffMs * 2, MAX_BACKOFF_MS)
} }
Log.i(TAG, "WebSocket: Verbindungsschleife beendet")
} }
} }
override fun disconnect() { override fun disconnect() {
Log.i(TAG, "WebSocket: Verbindung wird getrennt")
connectionJob?.cancel() connectionJob?.cancel()
connectionJob = null connectionJob = null
countdownJob?.cancel() countdownJob?.cancel()

View file

@ -15,8 +15,11 @@ import javax.inject.Singleton
internal class AuthEventBus @Inject constructor() { internal class AuthEventBus @Inject constructor() {
private val _sessionExpired = MutableSharedFlow<Unit>(extraBufferCapacity = 1) private val _sessionExpired = MutableSharedFlow<Unit>(extraBufferCapacity = 1)
val sessionExpired: SharedFlow<Unit> = _sessionExpired.asSharedFlow() val sessionExpired: SharedFlow<Unit> = _sessionExpired.asSharedFlow()
fun notifySessionExpired() { _sessionExpired.tryEmit(Unit) }
fun notifySessionExpired() { private val _loginSuccess = MutableSharedFlow<Pair<String, String>>(extraBufferCapacity = 1)
_sessionExpired.tryEmit(Unit) val loginSuccess: SharedFlow<Pair<String, String>> = _loginSuccess.asSharedFlow()
fun notifyLoginSuccess(serverUrl: String, accessToken: String) {
_loginSuccess.tryEmit(serverUrl to accessToken)
} }
} }

View file

@ -1,36 +1,139 @@
package de.bollwerk.app.ui package de.bollwerk.app.ui
import android.util.Log
import androidx.lifecycle.ViewModel import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope import androidx.lifecycle.viewModelScope
import dagger.hilt.android.lifecycle.HiltViewModel import dagger.hilt.android.lifecycle.HiltViewModel
import de.bollwerk.app.data.sync.WebSocketClient import de.bollwerk.app.data.sync.WebSocketClient
import de.bollwerk.app.data.sync.WebSocketEvent
import de.bollwerk.app.domain.AuthEventBus import de.bollwerk.app.domain.AuthEventBus
import de.bollwerk.app.domain.model.SettingsKey.StringKey
import de.bollwerk.app.domain.repository.ImportExportRepository
import de.bollwerk.app.domain.repository.SettingsRepository
import de.bollwerk.app.domain.repository.SyncService import de.bollwerk.app.domain.repository.SyncService
import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import java.time.Instant
import javax.inject.Inject import javax.inject.Inject
@HiltViewModel @HiltViewModel
internal class MainViewModel @Inject constructor( internal class MainViewModel @Inject constructor(
private val authEventBus: AuthEventBus, private val authEventBus: AuthEventBus,
private val syncService: SyncService, private val syncService: SyncService,
private val webSocketClient: WebSocketClient private val webSocketClient: WebSocketClient,
private val settingsRepository: SettingsRepository,
private val importExportRepository: ImportExportRepository,
) : ViewModel() { ) : ViewModel() {
private val _navigateToSettings = MutableSharedFlow<Unit>(extraBufferCapacity = 1) private val _navigateToSettings = MutableSharedFlow<Unit>(extraBufferCapacity = 1)
val navigateToSettings: SharedFlow<Unit> = _navigateToSettings.asSharedFlow() val navigateToSettings: SharedFlow<Unit> = _navigateToSettings.asSharedFlow()
init { init {
connectOnStartup()
observeLoginSuccess()
observeSessionExpiry()
observeWebSocketEvents()
}
/** Beim App-Start: wenn Token + Server-URL vorhanden → WebSocket verbinden */
private fun connectOnStartup() {
viewModelScope.launch {
val token = settingsRepository.getString(StringKey.AuthAccessToken)
val serverUrl = settingsRepository.getString(StringKey.ServerUrl)
if (token.isNotBlank() && serverUrl.isNotBlank()) {
Log.i(TAG, "App-Start: Token vorhanden verbinde WebSocket")
webSocketClient.connect(serverUrl, token)
} else {
Log.d(TAG, "App-Start: Kein Token kein WebSocket")
}
}
}
/** Nach erfolgreichem Login → WebSocket verbinden */
private fun observeLoginSuccess() {
viewModelScope.launch {
authEventBus.loginSuccess.collect { (serverUrl, token) ->
Log.i(TAG, "Login erfolgreich verbinde WebSocket")
webSocketClient.connect(serverUrl, token)
}
}
}
/** Session abgelaufen → Token löschen, WebSocket trennen, zu Settings navigieren */
private fun observeSessionExpiry() {
viewModelScope.launch { viewModelScope.launch {
authEventBus.sessionExpired.collect { authEventBus.sessionExpired.collect {
// Tokens löschen (kein Datenverlust nur Auth-Daten) Log.w(TAG, "Session abgelaufen Forced-Logout")
syncService.logout() syncService.logout()
webSocketClient.disconnect() webSocketClient.disconnect()
// Navigation zu Settings (Login-Formular)
_navigateToSettings.emit(Unit) _navigateToSettings.emit(Unit)
} }
} }
} }
/** WebSocket-Events → Sync auslösen */
private fun observeWebSocketEvents() {
viewModelScope.launch {
webSocketClient.events.collect { event ->
when (event) {
is WebSocketEvent.Connected -> {
Log.i(TAG, "WebSocket verbunden starte initialen Sync")
pullSync(isInitialConnect = true)
}
is WebSocketEvent.FullSyncRequired -> {
Log.i(TAG, "Server fordert Full-Sync")
pullSync(fullSync = true)
}
is WebSocketEvent.InventoryUpdated -> {
Log.d(TAG, "Inventar-Update empfangen (id=${event.itemId})")
pullSync()
}
else -> {}
}
}
}
}
private suspend fun pullSync(fullSync: Boolean = false, isInitialConnect: Boolean = false) {
val since = if (fullSync) null
else settingsRepository.getStringOrNull(StringKey.SyncLastTimestamp)?.toLongOrNull()
Log.d(TAG, "pullSync: fullSync=$fullSync, isInitialConnect=$isInitialConnect, since=$since")
syncService.downloadInventory(since).fold(
onSuccess = { inventoryDto ->
importExportRepository.importFromInventoryDto(inventoryDto)
.onSuccess {
val now = Instant.now().toEpochMilli().toString()
settingsRepository.setString(StringKey.SyncLastTimestamp, now)
Log.i(TAG, "pullSync: ${inventoryDto.items.size} Items vom Server")
// Server leer aber lokal Daten vorhanden → hochladen
if (isInitialConnect && inventoryDto.items.isEmpty()) {
Log.i(TAG, "Server hat keine Daten uploade lokales Inventar")
pushSync()
}
}
.onFailure { e ->
Log.e(TAG, "pullSync: Import fehlgeschlagen: ${e.message}", e)
}
},
onFailure = { e ->
Log.e(TAG, "pullSync: Download fehlgeschlagen: ${e.message}", e)
}
)
}
private suspend fun pushSync() {
val inventoryDto = importExportRepository.exportToInventoryDto()
Log.i(TAG, "pushSync: Uploade ${inventoryDto.items.size} Items zum Server")
syncService.uploadInventory(inventoryDto).fold(
onSuccess = { Log.i(TAG, "pushSync erfolgreich") },
onFailure = { e -> Log.e(TAG, "pushSync fehlgeschlagen: ${e.message}", e) }
)
}
companion object {
private const val TAG = "MainViewModel"
}
} }

View file

@ -52,7 +52,7 @@ internal class SettingsViewModel @Inject constructor(
init { init {
loadSettings() loadSettings()
observeWebSocketEvents() observeConnectionFailedEvent()
observeConnectionState() observeConnectionState()
observePendingQueueCount() observePendingQueueCount()
} }
@ -76,25 +76,11 @@ internal class SettingsViewModel @Inject constructor(
} }
} }
private fun observeWebSocketEvents() { private fun observeConnectionFailedEvent() {
viewModelScope.launch { viewModelScope.launch {
webSocketClient.events.collect { event -> webSocketClient.events.collect { event ->
when (event) { if (event is WebSocketEvent.ConnectionFailed) {
is WebSocketEvent.FullSyncRequired -> { showActivity(SyncActivityMessage.Error(event.message))
showActivity(SyncActivityMessage.ReceivingUpdate)
pullSync(fullSync = true)
}
is WebSocketEvent.InventoryUpdated -> {
showActivity(SyncActivityMessage.ReceivingUpdate)
pullSync(fullSync = false)
}
is WebSocketEvent.ConnectionFailed -> {
showActivity(SyncActivityMessage.Error(event.message))
}
is WebSocketEvent.Connected -> {
pullSync(fullSync = false, isInitialConnect = true)
}
else -> {}
} }
} }
} }
@ -133,14 +119,7 @@ internal class SettingsViewModel @Inject constructor(
) )
} }
if (isLoggedIn) { // WebSocket-Lifecycle wird von MainViewModel verwaltet
val currentState = webSocketClient.connectionState.value
if (currentState == ConnectionState.NotConfigured
|| currentState is ConnectionState.Disconnected
) {
webSocketClient.connect(serverUrl, accessToken)
}
}
} catch (e: Exception) { } catch (e: Exception) {
_uiState.update { _uiState.update {
it.copy( it.copy(
@ -224,8 +203,7 @@ internal class SettingsViewModel @Inject constructor(
_uiState.update { it.copy(isLoggingIn = true, loginError = null) } _uiState.update { it.copy(isLoggingIn = true, loginError = null) }
syncService.login(serverUrl, username, password).fold( syncService.login(serverUrl, username, password).fold(
onSuccess = { onSuccess = {
val accessToken = settingsRepository.getString(StringKey.AuthAccessToken) // WebSocket-Connect durch MainViewModel via AuthEventBus.loginSuccess
webSocketClient.connect(serverUrl, accessToken)
_uiState.update { _uiState.update {
it.copy( it.copy(
isLoggingIn = false, isLoggingIn = false,