From c771aa95478aa831cd3964ce4dc41e4ad5e2d714 Mon Sep 17 00:00:00 2001 From: Jens Reinemann Date: Mon, 18 May 2026 09:17:15 +0200 Subject: [PATCH] feat(messaging): enforce 10 MB mailbox limit per receiver with FIFO eviction (#103) - Add getUndeliveredStorageBytes() and evictOldestUndelivered() to MessageRepository - Check mailbox size before saving; evict oldest undelivered messages if over 10 MB - Return systemMessage in SendMessageResponse when eviction occurs - App parses systemMessage and displays it in the sender's conversation - Add SendMessageResponse to shared module for server/app interop - Update existing tests to use new response format - Add 3 new tests for eviction behavior --- .../data/repository/MessageRepositoryImpl.kt | 41 +++++++- .../server/repository/MessageRepository.kt | 27 ++++++ .../bollwerk/server/routes/MessageRoutes.kt | 18 +++- .../de/bollwerk/server/MessageApiTest.kt | 96 ++++++++++++++++++- .../de/bollwerk/server/Utf8MessagingTest.kt | 14 +-- .../shared/model/SendMessageResponse.kt | 9 ++ 6 files changed, 191 insertions(+), 14 deletions(-) create mode 100644 shared/src/main/kotlin/de/bollwerk/shared/model/SendMessageResponse.kt diff --git a/app/src/main/java/de/bollwerk/app/data/repository/MessageRepositoryImpl.kt b/app/src/main/java/de/bollwerk/app/data/repository/MessageRepositoryImpl.kt index d2d5452..f2e33d8 100644 --- a/app/src/main/java/de/bollwerk/app/data/repository/MessageRepositoryImpl.kt +++ b/app/src/main/java/de/bollwerk/app/data/repository/MessageRepositoryImpl.kt @@ -14,6 +14,7 @@ import de.bollwerk.app.domain.model.SettingsKey.StringKey import de.bollwerk.app.domain.model.SyncError import de.bollwerk.app.domain.repository.MessageRepository import de.bollwerk.app.domain.repository.SettingsRepository +import de.bollwerk.shared.model.SendMessageResponse import de.bollwerk.shared.model.UserListItemDto import io.ktor.client.HttpClient import io.ktor.client.call.body @@ -126,6 +127,20 @@ internal class MessageRepositoryImpl @Inject constructor( val result = attemptSendToServer(localId, recipientId, encryptedBody, sentAt) if (result.isSuccess) { dao.markDelivered(localId) + val systemMessage = result.getOrNull() + if (systemMessage != null) { + dao.upsert( + MessageEntity( + id = UUID.randomUUID().toString(), + senderId = recipientId, + senderUsername = SYSTEM_SENDER_USERNAME, + receiverId = myId, + body = systemMessage, + sentAt = System.currentTimeMillis(), + isPending = false + ) + ) + } } } @@ -180,6 +195,22 @@ internal class MessageRepositoryImpl @Inject constructor( val result = attemptSendToServer(msg.id, msg.receiverId, encryptedBody, msg.sentAt) if (result.isSuccess) { withContext(Dispatchers.IO) { dao.markDelivered(msg.id) } + val systemMessage = result.getOrNull() + if (systemMessage != null) { + withContext(Dispatchers.IO) { + dao.upsert( + MessageEntity( + id = UUID.randomUUID().toString(), + senderId = msg.receiverId, + senderUsername = SYSTEM_SENDER_USERNAME, + receiverId = msg.senderId, + body = systemMessage, + sentAt = System.currentTimeMillis(), + isPending = false + ) + ) + } + } } } } @@ -237,7 +268,7 @@ internal class MessageRepositoryImpl @Inject constructor( recipientId: String, body: String, sentAt: Long - ): Result = withContext(Dispatchers.IO) { + ): Result = withContext(Dispatchers.IO) { val serverUrl = settingsRepository.getStringOrNull(StringKey.ServerUrl) ?: return@withContext Result.failure(SyncError.NotConfigured("Server-URL nicht gesetzt")) var token = settingsRepository.getStringOrNull(StringKey.AuthAccessToken) @@ -258,7 +289,12 @@ internal class MessageRepositoryImpl @Inject constructor( } } if (response.status == HttpStatusCode.Created || response.status == HttpStatusCode.OK) { - Result.success(Unit) + val systemMessage = try { + response.body().systemMessage + } catch (_: Exception) { + null + } + Result.success(systemMessage) } else { Result.failure(SyncError.ServerError(response.status.value, response.status.description)) } @@ -273,5 +309,6 @@ internal class MessageRepositoryImpl @Inject constructor( private companion object { const val TAG = "MessageRepository" + const val SYSTEM_SENDER_USERNAME = "⚙️ System" } } diff --git a/server/src/main/kotlin/de/bollwerk/server/repository/MessageRepository.kt b/server/src/main/kotlin/de/bollwerk/server/repository/MessageRepository.kt index ea04bbf..13fd29b 100644 --- a/server/src/main/kotlin/de/bollwerk/server/repository/MessageRepository.kt +++ b/server/src/main/kotlin/de/bollwerk/server/repository/MessageRepository.kt @@ -7,6 +7,7 @@ import org.jetbrains.exposed.sql.JoinType import org.jetbrains.exposed.sql.SortOrder import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq import org.jetbrains.exposed.sql.and +import org.jetbrains.exposed.sql.deleteWhere import org.jetbrains.exposed.sql.insert import org.jetbrains.exposed.sql.or import org.jetbrains.exposed.sql.selectAll @@ -102,4 +103,30 @@ internal class MessageRepository { ) } } + + /// Returns the total storage size (in bytes) of undelivered messages for a receiver. + fun getUndeliveredStorageBytes(receiverId: String): Long = transaction { + Messages.selectAll() + .where { (Messages.receiverId eq receiverId) and Messages.deliveredAt.isNull() } + .sumOf { it[Messages.body].toByteArray(Charsets.UTF_8).size.toLong() } + } + + /// Deletes oldest undelivered messages for a receiver until at least [bytesToFree] bytes are freed. + /// Returns the number of deleted messages. + fun evictOldestUndelivered(receiverId: String, bytesToFree: Long): Int = transaction { + val undelivered = Messages.selectAll() + .where { (Messages.receiverId eq receiverId) and Messages.deliveredAt.isNull() } + .orderBy(Messages.sentAt to SortOrder.ASC) + .toList() + + var freedBytes = 0L + var deletedCount = 0 + for (row in undelivered) { + if (freedBytes >= bytesToFree) break + freedBytes += row[Messages.body].toByteArray(Charsets.UTF_8).size.toLong() + Messages.deleteWhere { Messages.id eq row[Messages.id] } + deletedCount++ + } + deletedCount + } } diff --git a/server/src/main/kotlin/de/bollwerk/server/routes/MessageRoutes.kt b/server/src/main/kotlin/de/bollwerk/server/routes/MessageRoutes.kt index 498b612..7e6b519 100644 --- a/server/src/main/kotlin/de/bollwerk/server/routes/MessageRoutes.kt +++ b/server/src/main/kotlin/de/bollwerk/server/routes/MessageRoutes.kt @@ -5,6 +5,7 @@ import de.bollwerk.server.repository.MessageRepository import de.bollwerk.server.repository.UserRepository import de.bollwerk.server.security.UserPrincipal import de.bollwerk.server.websocket.WebSocketManager +import de.bollwerk.shared.model.SendMessageResponse import io.ktor.http.* import io.ktor.server.auth.* import io.ktor.server.request.* @@ -13,6 +14,8 @@ import io.ktor.server.routing.* import kotlinx.serialization.Serializable import java.util.UUID +private const val MAX_MAILBOX_BYTES = 10L * 1024 * 1024 // 10 MB + @Serializable internal data class SendMessageRequest( val id: String? = null, @@ -49,6 +52,19 @@ internal fun Route.messageRoutes( return@post } val msgId = request.id ?: UUID.randomUUID().toString() + val newMessageBytes = request.body.toByteArray(Charsets.UTF_8).size.toLong() + val currentSize = messageRepository.getUndeliveredStorageBytes(request.receiverId) + var systemMessage: String? = null + + if (currentSize + newMessageBytes > MAX_MAILBOX_BYTES) { + val bytesToFree = (currentSize + newMessageBytes) - MAX_MAILBOX_BYTES + val deletedCount = messageRepository.evictOldestUndelivered(request.receiverId, bytesToFree) + val receiverUser = userRepository.findById(request.receiverId) + val receiverName = receiverUser?.username ?: request.receiverId + systemMessage = "Nachrichtenlimit für $receiverName überschritten. " + + "$deletedCount ältere Nachricht(en) wurden gelöscht, da der Empfänger nicht abgeholt hat." + } + val message = messageRepository.save( id = msgId, senderId = principal.userId, @@ -61,7 +77,7 @@ internal fun Route.messageRoutes( if (wsManager.isOnline(request.receiverId)) { messageRepository.markDelivered(msgId) } - call.respond(HttpStatusCode.Created, message) + call.respond(HttpStatusCode.Created, SendMessageResponse(message = message, systemMessage = systemMessage)) } get("/{userId}") { diff --git a/server/src/test/kotlin/de/bollwerk/server/MessageApiTest.kt b/server/src/test/kotlin/de/bollwerk/server/MessageApiTest.kt index 0b783cc..1cba17e 100644 --- a/server/src/test/kotlin/de/bollwerk/server/MessageApiTest.kt +++ b/server/src/test/kotlin/de/bollwerk/server/MessageApiTest.kt @@ -3,6 +3,7 @@ package de.bollwerk.server import de.bollwerk.server.db.DatabaseFactory import de.bollwerk.server.model.ErrorResponse import de.bollwerk.shared.model.MessageDto +import de.bollwerk.shared.model.SendMessageResponse import io.ktor.client.request.* import io.ktor.client.statement.* import io.ktor.http.* @@ -15,6 +16,7 @@ import kotlinx.serialization.json.jsonPrimitive import org.junit.Assert.assertEquals import org.junit.Assert.assertFalse import org.junit.Assert.assertNotNull +import org.junit.Assert.assertNull import org.junit.Assert.assertTrue import org.junit.Test @@ -76,7 +78,8 @@ class MessageApiTest { // Then assertEquals(HttpStatusCode.Created, response.status) - val msg = json.decodeFromString(response.bodyAsText()) + val resp = json.decodeFromString(response.bodyAsText()) + val msg = resp.message assertEquals("Hallo Bob!", msg.body) assertEquals(aliceId, msg.senderId) assertEquals(bobId, msg.receiverId) @@ -213,8 +216,8 @@ class MessageApiTest { // Then assertEquals(HttpStatusCode.Created, response.status) - val msg = json.decodeFromString(response.bodyAsText()) - assertEquals(customId, msg.id) + val resp = json.decodeFromString(response.bodyAsText()) + assertEquals(customId, resp.message.id) } // ── Response format ────────────────────────────────────────────────────── @@ -235,7 +238,8 @@ class MessageApiTest { // Then assertEquals(HttpStatusCode.Created, response.status) - val msg = json.decodeFromString(response.bodyAsText()) + val resp = json.decodeFromString(response.bodyAsText()) + val msg = resp.message assertFalse(msg.id.isBlank()) assertEquals(aliceId, msg.senderId) assertEquals("alice", msg.senderUsername) @@ -243,4 +247,88 @@ class MessageApiTest { assertEquals("Test message", msg.body) assertEquals(1700000000000L, msg.sentAt) } + + // ── Mailbox Size Limit ─────────────────────────────────────────────────── + + @Test + fun test_sendMessage_belowLimit_noSystemMessage() = testApp { + // Given + val carolId = createUser("carol") + val daveId = createUser("dave") + val carolToken = createTestAccessToken(userId = carolId, username = "carol") + + // When + val response = client.post("/api/messages") { + bearerAuth(carolToken) + contentType(ContentType.Application.Json) + setBody("""{"receiverId":"$daveId","body":"Short message","sentAt":1700000000000}""") + } + + // Then + assertEquals(HttpStatusCode.Created, response.status) + val resp = json.decodeFromString(response.bodyAsText()) + assertNull(resp.systemMessage) + } + + @Test + fun test_sendMessage_exceedsLimit_evictsOldestAndReturnsSystemMessage() = testApp { + // Given + val ericId = createUser("eric") + val frankId = createUser("frank") + val ericToken = createTestAccessToken(userId = ericId, username = "eric") + + // Fill frank's mailbox with 11 large messages (11 * 900 KB = 9.9 MB < 10 MB) + val largeBody = "X".repeat(900_000) // ~900 KB, within 1 MB request limit + for (i in 1..11) { + client.post("/api/messages") { + bearerAuth(ericToken) + contentType(ContentType.Application.Json) + setBody("""{"receiverId":"$frankId","body":"$largeBody","sentAt":${1700000000000 + i}}""") + } + } + + // When – send one more large message that triggers eviction (11*900KB + 900KB = 10.8 MB > 10 MB) + val response = client.post("/api/messages") { + bearerAuth(ericToken) + contentType(ContentType.Application.Json) + setBody("""{"receiverId":"$frankId","body":"$largeBody","sentAt":1700000100000}""") + } + + // Then + assertEquals(HttpStatusCode.Created, response.status) + val resp = json.decodeFromString(response.bodyAsText()) + assertNotNull(resp.systemMessage) + assertTrue(resp.systemMessage!!.contains("frank")) + assertTrue(resp.systemMessage!!.contains("gelöscht")) + } + + @Test + fun test_sendMessage_deliveredMessagesNotCountedForLimit() = testApp { + // Given + val garyId = createUser("gary") + val helenId = createUser("helen") + val garyToken = createTestAccessToken(userId = garyId, username = "gary") + + // Fill helen's mailbox with 11 large messages (11 * 900 KB = 9.9 MB < 10 MB) + val largeBody = "X".repeat(900_000) + for (i in 1..11) { + client.post("/api/messages") { + bearerAuth(garyToken) + contentType(ContentType.Application.Json) + setBody("""{"receiverId":"$helenId","body":"$largeBody","sentAt":${1700000000000 + i}}""") + } + } + + // Send the 12th message that triggers eviction (11*900KB + 900KB > 10 MB) + val response = client.post("/api/messages") { + bearerAuth(garyToken) + contentType(ContentType.Application.Json) + setBody("""{"receiverId":"$helenId","body":"$largeBody","sentAt":1700000100000}""") + } + + // Then – eviction happened since all messages are undelivered + assertEquals(HttpStatusCode.Created, response.status) + val resp = json.decodeFromString(response.bodyAsText()) + assertNotNull(resp.systemMessage) + } } diff --git a/server/src/test/kotlin/de/bollwerk/server/Utf8MessagingTest.kt b/server/src/test/kotlin/de/bollwerk/server/Utf8MessagingTest.kt index c8a026b..f78c69b 100644 --- a/server/src/test/kotlin/de/bollwerk/server/Utf8MessagingTest.kt +++ b/server/src/test/kotlin/de/bollwerk/server/Utf8MessagingTest.kt @@ -1,7 +1,7 @@ package de.bollwerk.server import de.bollwerk.server.db.DatabaseFactory -import de.bollwerk.shared.model.MessageDto +import de.bollwerk.shared.model.SendMessageResponse import io.ktor.client.request.* import io.ktor.client.statement.* import io.ktor.http.* @@ -100,8 +100,8 @@ class Utf8MessagingTest { // Then assertEquals(HttpStatusCode.Created, response.status) - val msg = json.decodeFromString(response.bodyAsText()) - assertEquals(bodyText, msg.body) + val resp = json.decodeFromString(response.bodyAsText()) + assertEquals(bodyText, resp.message.body) } @Test @@ -121,8 +121,8 @@ class Utf8MessagingTest { // Then assertEquals(HttpStatusCode.Created, response.status) - val msg = json.decodeFromString(response.bodyAsText()) - assertEquals(bodyText, msg.body) + val resp = json.decodeFromString(response.bodyAsText()) + assertEquals(bodyText, resp.message.body) } @Test @@ -189,7 +189,7 @@ class Utf8MessagingTest { // Then assertEquals(HttpStatusCode.Created, response.status) - val msg = json.decodeFromString(response.bodyAsText()) - assertEquals(bodyText, msg.body) + val resp = json.decodeFromString(response.bodyAsText()) + assertEquals(bodyText, resp.message.body) } } diff --git a/shared/src/main/kotlin/de/bollwerk/shared/model/SendMessageResponse.kt b/shared/src/main/kotlin/de/bollwerk/shared/model/SendMessageResponse.kt new file mode 100644 index 0000000..048d731 --- /dev/null +++ b/shared/src/main/kotlin/de/bollwerk/shared/model/SendMessageResponse.kt @@ -0,0 +1,9 @@ +package de.bollwerk.shared.model + +import kotlinx.serialization.Serializable + +@Serializable +data class SendMessageResponse( + val message: MessageDto, + val systemMessage: String? = null +)