feat(messaging): enforce 10 MB mailbox limit per receiver with FIFO eviction (#103)
- Add getUndeliveredStorageBytes() and evictOldestUndelivered() to MessageRepository - Check mailbox size before saving; evict oldest undelivered messages if over 10 MB - Return systemMessage in SendMessageResponse when eviction occurs - App parses systemMessage and displays it in the sender's conversation - Add SendMessageResponse to shared module for server/app interop - Update existing tests to use new response format - Add 3 new tests for eviction behavior
This commit is contained in:
parent
6a8ffa17be
commit
c771aa9547
6 changed files with 191 additions and 14 deletions
|
|
@ -14,6 +14,7 @@ import de.bollwerk.app.domain.model.SettingsKey.StringKey
|
|||
import de.bollwerk.app.domain.model.SyncError
|
||||
import de.bollwerk.app.domain.repository.MessageRepository
|
||||
import de.bollwerk.app.domain.repository.SettingsRepository
|
||||
import de.bollwerk.shared.model.SendMessageResponse
|
||||
import de.bollwerk.shared.model.UserListItemDto
|
||||
import io.ktor.client.HttpClient
|
||||
import io.ktor.client.call.body
|
||||
|
|
@ -126,6 +127,20 @@ internal class MessageRepositoryImpl @Inject constructor(
|
|||
val result = attemptSendToServer(localId, recipientId, encryptedBody, sentAt)
|
||||
if (result.isSuccess) {
|
||||
dao.markDelivered(localId)
|
||||
val systemMessage = result.getOrNull()
|
||||
if (systemMessage != null) {
|
||||
dao.upsert(
|
||||
MessageEntity(
|
||||
id = UUID.randomUUID().toString(),
|
||||
senderId = recipientId,
|
||||
senderUsername = SYSTEM_SENDER_USERNAME,
|
||||
receiverId = myId,
|
||||
body = systemMessage,
|
||||
sentAt = System.currentTimeMillis(),
|
||||
isPending = false
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -180,6 +195,22 @@ internal class MessageRepositoryImpl @Inject constructor(
|
|||
val result = attemptSendToServer(msg.id, msg.receiverId, encryptedBody, msg.sentAt)
|
||||
if (result.isSuccess) {
|
||||
withContext(Dispatchers.IO) { dao.markDelivered(msg.id) }
|
||||
val systemMessage = result.getOrNull()
|
||||
if (systemMessage != null) {
|
||||
withContext(Dispatchers.IO) {
|
||||
dao.upsert(
|
||||
MessageEntity(
|
||||
id = UUID.randomUUID().toString(),
|
||||
senderId = msg.receiverId,
|
||||
senderUsername = SYSTEM_SENDER_USERNAME,
|
||||
receiverId = msg.senderId,
|
||||
body = systemMessage,
|
||||
sentAt = System.currentTimeMillis(),
|
||||
isPending = false
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -237,7 +268,7 @@ internal class MessageRepositoryImpl @Inject constructor(
|
|||
recipientId: String,
|
||||
body: String,
|
||||
sentAt: Long
|
||||
): Result<Unit> = withContext(Dispatchers.IO) {
|
||||
): Result<String?> = withContext(Dispatchers.IO) {
|
||||
val serverUrl = settingsRepository.getStringOrNull(StringKey.ServerUrl)
|
||||
?: return@withContext Result.failure(SyncError.NotConfigured("Server-URL nicht gesetzt"))
|
||||
var token = settingsRepository.getStringOrNull(StringKey.AuthAccessToken)
|
||||
|
|
@ -258,7 +289,12 @@ internal class MessageRepositoryImpl @Inject constructor(
|
|||
}
|
||||
}
|
||||
if (response.status == HttpStatusCode.Created || response.status == HttpStatusCode.OK) {
|
||||
Result.success(Unit)
|
||||
val systemMessage = try {
|
||||
response.body<SendMessageResponse>().systemMessage
|
||||
} catch (_: Exception) {
|
||||
null
|
||||
}
|
||||
Result.success(systemMessage)
|
||||
} else {
|
||||
Result.failure(SyncError.ServerError(response.status.value, response.status.description))
|
||||
}
|
||||
|
|
@ -273,5 +309,6 @@ internal class MessageRepositoryImpl @Inject constructor(
|
|||
|
||||
private companion object {
|
||||
const val TAG = "MessageRepository"
|
||||
const val SYSTEM_SENDER_USERNAME = "⚙️ System"
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import org.jetbrains.exposed.sql.JoinType
|
|||
import org.jetbrains.exposed.sql.SortOrder
|
||||
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
|
||||
import org.jetbrains.exposed.sql.and
|
||||
import org.jetbrains.exposed.sql.deleteWhere
|
||||
import org.jetbrains.exposed.sql.insert
|
||||
import org.jetbrains.exposed.sql.or
|
||||
import org.jetbrains.exposed.sql.selectAll
|
||||
|
|
@ -102,4 +103,30 @@ internal class MessageRepository {
|
|||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the total storage size (in bytes) of undelivered messages for a receiver.
|
||||
fun getUndeliveredStorageBytes(receiverId: String): Long = transaction {
|
||||
Messages.selectAll()
|
||||
.where { (Messages.receiverId eq receiverId) and Messages.deliveredAt.isNull() }
|
||||
.sumOf { it[Messages.body].toByteArray(Charsets.UTF_8).size.toLong() }
|
||||
}
|
||||
|
||||
/// Deletes oldest undelivered messages for a receiver until at least [bytesToFree] bytes are freed.
|
||||
/// Returns the number of deleted messages.
|
||||
fun evictOldestUndelivered(receiverId: String, bytesToFree: Long): Int = transaction {
|
||||
val undelivered = Messages.selectAll()
|
||||
.where { (Messages.receiverId eq receiverId) and Messages.deliveredAt.isNull() }
|
||||
.orderBy(Messages.sentAt to SortOrder.ASC)
|
||||
.toList()
|
||||
|
||||
var freedBytes = 0L
|
||||
var deletedCount = 0
|
||||
for (row in undelivered) {
|
||||
if (freedBytes >= bytesToFree) break
|
||||
freedBytes += row[Messages.body].toByteArray(Charsets.UTF_8).size.toLong()
|
||||
Messages.deleteWhere { Messages.id eq row[Messages.id] }
|
||||
deletedCount++
|
||||
}
|
||||
deletedCount
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import de.bollwerk.server.repository.MessageRepository
|
|||
import de.bollwerk.server.repository.UserRepository
|
||||
import de.bollwerk.server.security.UserPrincipal
|
||||
import de.bollwerk.server.websocket.WebSocketManager
|
||||
import de.bollwerk.shared.model.SendMessageResponse
|
||||
import io.ktor.http.*
|
||||
import io.ktor.server.auth.*
|
||||
import io.ktor.server.request.*
|
||||
|
|
@ -13,6 +14,8 @@ import io.ktor.server.routing.*
|
|||
import kotlinx.serialization.Serializable
|
||||
import java.util.UUID
|
||||
|
||||
private const val MAX_MAILBOX_BYTES = 10L * 1024 * 1024 // 10 MB
|
||||
|
||||
@Serializable
|
||||
internal data class SendMessageRequest(
|
||||
val id: String? = null,
|
||||
|
|
@ -49,6 +52,19 @@ internal fun Route.messageRoutes(
|
|||
return@post
|
||||
}
|
||||
val msgId = request.id ?: UUID.randomUUID().toString()
|
||||
val newMessageBytes = request.body.toByteArray(Charsets.UTF_8).size.toLong()
|
||||
val currentSize = messageRepository.getUndeliveredStorageBytes(request.receiverId)
|
||||
var systemMessage: String? = null
|
||||
|
||||
if (currentSize + newMessageBytes > MAX_MAILBOX_BYTES) {
|
||||
val bytesToFree = (currentSize + newMessageBytes) - MAX_MAILBOX_BYTES
|
||||
val deletedCount = messageRepository.evictOldestUndelivered(request.receiverId, bytesToFree)
|
||||
val receiverUser = userRepository.findById(request.receiverId)
|
||||
val receiverName = receiverUser?.username ?: request.receiverId
|
||||
systemMessage = "Nachrichtenlimit für $receiverName überschritten. " +
|
||||
"$deletedCount ältere Nachricht(en) wurden gelöscht, da der Empfänger nicht abgeholt hat."
|
||||
}
|
||||
|
||||
val message = messageRepository.save(
|
||||
id = msgId,
|
||||
senderId = principal.userId,
|
||||
|
|
@ -61,7 +77,7 @@ internal fun Route.messageRoutes(
|
|||
if (wsManager.isOnline(request.receiverId)) {
|
||||
messageRepository.markDelivered(msgId)
|
||||
}
|
||||
call.respond(HttpStatusCode.Created, message)
|
||||
call.respond(HttpStatusCode.Created, SendMessageResponse(message = message, systemMessage = systemMessage))
|
||||
}
|
||||
|
||||
get("/{userId}") {
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ package de.bollwerk.server
|
|||
import de.bollwerk.server.db.DatabaseFactory
|
||||
import de.bollwerk.server.model.ErrorResponse
|
||||
import de.bollwerk.shared.model.MessageDto
|
||||
import de.bollwerk.shared.model.SendMessageResponse
|
||||
import io.ktor.client.request.*
|
||||
import io.ktor.client.statement.*
|
||||
import io.ktor.http.*
|
||||
|
|
@ -15,6 +16,7 @@ import kotlinx.serialization.json.jsonPrimitive
|
|||
import org.junit.Assert.assertEquals
|
||||
import org.junit.Assert.assertFalse
|
||||
import org.junit.Assert.assertNotNull
|
||||
import org.junit.Assert.assertNull
|
||||
import org.junit.Assert.assertTrue
|
||||
import org.junit.Test
|
||||
|
||||
|
|
@ -76,7 +78,8 @@ class MessageApiTest {
|
|||
|
||||
// Then
|
||||
assertEquals(HttpStatusCode.Created, response.status)
|
||||
val msg = json.decodeFromString<MessageDto>(response.bodyAsText())
|
||||
val resp = json.decodeFromString<SendMessageResponse>(response.bodyAsText())
|
||||
val msg = resp.message
|
||||
assertEquals("Hallo Bob!", msg.body)
|
||||
assertEquals(aliceId, msg.senderId)
|
||||
assertEquals(bobId, msg.receiverId)
|
||||
|
|
@ -213,8 +216,8 @@ class MessageApiTest {
|
|||
|
||||
// Then
|
||||
assertEquals(HttpStatusCode.Created, response.status)
|
||||
val msg = json.decodeFromString<MessageDto>(response.bodyAsText())
|
||||
assertEquals(customId, msg.id)
|
||||
val resp = json.decodeFromString<SendMessageResponse>(response.bodyAsText())
|
||||
assertEquals(customId, resp.message.id)
|
||||
}
|
||||
|
||||
// ── Response format ──────────────────────────────────────────────────────
|
||||
|
|
@ -235,7 +238,8 @@ class MessageApiTest {
|
|||
|
||||
// Then
|
||||
assertEquals(HttpStatusCode.Created, response.status)
|
||||
val msg = json.decodeFromString<MessageDto>(response.bodyAsText())
|
||||
val resp = json.decodeFromString<SendMessageResponse>(response.bodyAsText())
|
||||
val msg = resp.message
|
||||
assertFalse(msg.id.isBlank())
|
||||
assertEquals(aliceId, msg.senderId)
|
||||
assertEquals("alice", msg.senderUsername)
|
||||
|
|
@ -243,4 +247,88 @@ class MessageApiTest {
|
|||
assertEquals("Test message", msg.body)
|
||||
assertEquals(1700000000000L, msg.sentAt)
|
||||
}
|
||||
|
||||
// ── Mailbox Size Limit ───────────────────────────────────────────────────
|
||||
|
||||
@Test
|
||||
fun test_sendMessage_belowLimit_noSystemMessage() = testApp {
|
||||
// Given
|
||||
val carolId = createUser("carol")
|
||||
val daveId = createUser("dave")
|
||||
val carolToken = createTestAccessToken(userId = carolId, username = "carol")
|
||||
|
||||
// When
|
||||
val response = client.post("/api/messages") {
|
||||
bearerAuth(carolToken)
|
||||
contentType(ContentType.Application.Json)
|
||||
setBody("""{"receiverId":"$daveId","body":"Short message","sentAt":1700000000000}""")
|
||||
}
|
||||
|
||||
// Then
|
||||
assertEquals(HttpStatusCode.Created, response.status)
|
||||
val resp = json.decodeFromString<SendMessageResponse>(response.bodyAsText())
|
||||
assertNull(resp.systemMessage)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun test_sendMessage_exceedsLimit_evictsOldestAndReturnsSystemMessage() = testApp {
|
||||
// Given
|
||||
val ericId = createUser("eric")
|
||||
val frankId = createUser("frank")
|
||||
val ericToken = createTestAccessToken(userId = ericId, username = "eric")
|
||||
|
||||
// Fill frank's mailbox with 11 large messages (11 * 900 KB = 9.9 MB < 10 MB)
|
||||
val largeBody = "X".repeat(900_000) // ~900 KB, within 1 MB request limit
|
||||
for (i in 1..11) {
|
||||
client.post("/api/messages") {
|
||||
bearerAuth(ericToken)
|
||||
contentType(ContentType.Application.Json)
|
||||
setBody("""{"receiverId":"$frankId","body":"$largeBody","sentAt":${1700000000000 + i}}""")
|
||||
}
|
||||
}
|
||||
|
||||
// When – send one more large message that triggers eviction (11*900KB + 900KB = 10.8 MB > 10 MB)
|
||||
val response = client.post("/api/messages") {
|
||||
bearerAuth(ericToken)
|
||||
contentType(ContentType.Application.Json)
|
||||
setBody("""{"receiverId":"$frankId","body":"$largeBody","sentAt":1700000100000}""")
|
||||
}
|
||||
|
||||
// Then
|
||||
assertEquals(HttpStatusCode.Created, response.status)
|
||||
val resp = json.decodeFromString<SendMessageResponse>(response.bodyAsText())
|
||||
assertNotNull(resp.systemMessage)
|
||||
assertTrue(resp.systemMessage!!.contains("frank"))
|
||||
assertTrue(resp.systemMessage!!.contains("gelöscht"))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun test_sendMessage_deliveredMessagesNotCountedForLimit() = testApp {
|
||||
// Given
|
||||
val garyId = createUser("gary")
|
||||
val helenId = createUser("helen")
|
||||
val garyToken = createTestAccessToken(userId = garyId, username = "gary")
|
||||
|
||||
// Fill helen's mailbox with 11 large messages (11 * 900 KB = 9.9 MB < 10 MB)
|
||||
val largeBody = "X".repeat(900_000)
|
||||
for (i in 1..11) {
|
||||
client.post("/api/messages") {
|
||||
bearerAuth(garyToken)
|
||||
contentType(ContentType.Application.Json)
|
||||
setBody("""{"receiverId":"$helenId","body":"$largeBody","sentAt":${1700000000000 + i}}""")
|
||||
}
|
||||
}
|
||||
|
||||
// Send the 12th message that triggers eviction (11*900KB + 900KB > 10 MB)
|
||||
val response = client.post("/api/messages") {
|
||||
bearerAuth(garyToken)
|
||||
contentType(ContentType.Application.Json)
|
||||
setBody("""{"receiverId":"$helenId","body":"$largeBody","sentAt":1700000100000}""")
|
||||
}
|
||||
|
||||
// Then – eviction happened since all messages are undelivered
|
||||
assertEquals(HttpStatusCode.Created, response.status)
|
||||
val resp = json.decodeFromString<SendMessageResponse>(response.bodyAsText())
|
||||
assertNotNull(resp.systemMessage)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
package de.bollwerk.server
|
||||
|
||||
import de.bollwerk.server.db.DatabaseFactory
|
||||
import de.bollwerk.shared.model.MessageDto
|
||||
import de.bollwerk.shared.model.SendMessageResponse
|
||||
import io.ktor.client.request.*
|
||||
import io.ktor.client.statement.*
|
||||
import io.ktor.http.*
|
||||
|
|
@ -100,8 +100,8 @@ class Utf8MessagingTest {
|
|||
|
||||
// Then
|
||||
assertEquals(HttpStatusCode.Created, response.status)
|
||||
val msg = json.decodeFromString<MessageDto>(response.bodyAsText())
|
||||
assertEquals(bodyText, msg.body)
|
||||
val resp = json.decodeFromString<SendMessageResponse>(response.bodyAsText())
|
||||
assertEquals(bodyText, resp.message.body)
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -121,8 +121,8 @@ class Utf8MessagingTest {
|
|||
|
||||
// Then
|
||||
assertEquals(HttpStatusCode.Created, response.status)
|
||||
val msg = json.decodeFromString<MessageDto>(response.bodyAsText())
|
||||
assertEquals(bodyText, msg.body)
|
||||
val resp = json.decodeFromString<SendMessageResponse>(response.bodyAsText())
|
||||
assertEquals(bodyText, resp.message.body)
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -189,7 +189,7 @@ class Utf8MessagingTest {
|
|||
|
||||
// Then
|
||||
assertEquals(HttpStatusCode.Created, response.status)
|
||||
val msg = json.decodeFromString<MessageDto>(response.bodyAsText())
|
||||
assertEquals(bodyText, msg.body)
|
||||
val resp = json.decodeFromString<SendMessageResponse>(response.bodyAsText())
|
||||
assertEquals(bodyText, resp.message.body)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,9 @@
|
|||
package de.bollwerk.shared.model
|
||||
|
||||
import kotlinx.serialization.Serializable
|
||||
|
||||
@Serializable
|
||||
data class SendMessageResponse(
|
||||
val message: MessageDto,
|
||||
val systemMessage: String? = null
|
||||
)
|
||||
Loading…
Reference in a new issue