diff --git a/obp-api/src/main/scala/bootstrap/liftweb/Boot.scala b/obp-api/src/main/scala/bootstrap/liftweb/Boot.scala index 5c79a617a7..d64022e342 100644 --- a/obp-api/src/main/scala/bootstrap/liftweb/Boot.scala +++ b/obp-api/src/main/scala/bootstrap/liftweb/Boot.scala @@ -762,7 +762,7 @@ class Boot extends MdcLoggable { def schemifyAll() = { Schemifier.schemify(true, Schemifier.infoF _, ToSchemify.models: _*) - // Create default system-level "general" chat room (all_users_are_participants = true) + // Create default system-level "general" chat room (is_open_room = true) code.chat.ChatRoomTrait.chatRoomProvider.vend.getOrCreateDefaultRoom() } diff --git a/obp-api/src/main/scala/code/api/util/ApiRole.scala b/obp-api/src/main/scala/code/api/util/ApiRole.scala index 026dbbf765..213771b7e7 100644 --- a/obp-api/src/main/scala/code/api/util/ApiRole.scala +++ b/obp-api/src/main/scala/code/api/util/ApiRole.scala @@ -1379,10 +1379,10 @@ object ApiRole extends MdcLoggable{ lazy val canArchiveBankChatRoom = CanArchiveBankChatRoom() case class CanArchiveSystemChatRoom(requiresBankId: Boolean = false) extends ApiRole lazy val canArchiveSystemChatRoom = CanArchiveSystemChatRoom() - case class CanSetBankChatRoomAUAP(requiresBankId: Boolean = true) extends ApiRole - lazy val canSetBankChatRoomAUAP = CanSetBankChatRoomAUAP() - case class CanSetSystemChatRoomAUAP(requiresBankId: Boolean = false) extends ApiRole - lazy val canSetSystemChatRoomAUAP = CanSetSystemChatRoomAUAP() + case class CanSetBankChatRoomIsOpenRoom(requiresBankId: Boolean = true) extends ApiRole + lazy val canSetBankChatRoomIsOpenRoom = CanSetBankChatRoomIsOpenRoom() + case class CanSetSystemChatRoomIsOpenRoom(requiresBankId: Boolean = false) extends ApiRole + lazy val canSetSystemChatRoomIsOpenRoom = CanSetSystemChatRoomIsOpenRoom() private val dynamicApiRoles = new ConcurrentHashMap[String, ApiRole] diff --git a/obp-api/src/main/scala/code/api/util/Glossary.scala b/obp-api/src/main/scala/code/api/util/Glossary.scala index 12e2deb972..fe90ba0fce 100644 --- a/obp-api/src/main/scala/code/api/util/Glossary.scala +++ b/obp-api/src/main/scala/code/api/util/Glossary.scala @@ -5140,13 +5140,13 @@ object Glossary extends MdcLoggable { |### Chat Rooms |A Chat Room is a named space where participants exchange messages. | - |A system-level room called **general** is created automatically at startup with **all_users_are_participants = true** — meaning every authenticated user can read and send messages without needing an explicit Participant record. + |A system-level room called **general** is created automatically at startup with **is_open_room = true** — meaning every authenticated user can read and send messages without needing an explicit Participant record. | |Each room has: |- A unique **joining key** (UUID) that can be shared to invite others. The key can be refreshed to revoke access. |- A **name** that is unique within its scope (per bank, or globally for system-level rooms). |- An optional **bank_id** — if set, the room is scoped to that bank. If empty, it is a system-level room. - |- An **all_users_are_participants** flag — if true, all authenticated users are treated as implicit participants without needing a database record. They can read and send messages but have no special permissions. + |- An **is_open_room** flag — if true, all authenticated users are treated as implicit participants without needing a database record. They can read and send messages but have no special permissions. | |### Participants |A Participant is a user or consumer (application/bot) that belongs to a Chat Room. Participants can: @@ -5192,13 +5192,22 @@ object Glossary extends MdcLoggable { |### Polling |Clients retrieve new messages by polling the GET messages endpoint with a **since** parameter (timestamp). This avoids the complexity of WebSocket infrastructure while providing a simple, reliable mechanism for near-real-time updates. | + |### gRPC Streaming (real-time) + |For clients that need true real-time updates without polling, OBP exposes a **ChatStreamService** over gRPC (see `chat.proto`, package `code.obp.grpc.chat.g1`). It provides four server-streaming / bidirectional RPCs: + |- **StreamMessages(StreamMessagesRequest) → stream ChatMessageEvent** — push new/edited/deleted messages for a given chat room as they happen. + |- **StreamTyping(stream TypingEvent) → stream TypingIndicator** — bidirectional stream: clients send their own typing state, server fans out typing indicators from other participants. + |- **StreamPresence(StreamPresenceRequest) → stream PresenceEvent** — online/offline updates for participants in a room. + |- **StreamUnreadCounts(StreamUnreadCountsRequest) → stream UnreadCountEvent** — per-room unread counters for the authenticated user. + | + |gRPC calls are authenticated via the same credentials as REST (see `AuthInterceptor`). The REST polling endpoints remain the canonical API; the gRPC streams are an optional push channel for clients that want lower latency and less request overhead. + | |## API Endpoints | - |All chat endpoints are available in two forms: + |All chat REST endpoints are available in two forms: |- **Bank-scoped**: /banks/BANK_ID/chat-rooms/... |- **System-level**: /chat-rooms/... | - |See the API Explorer for the full list of Chat endpoints, tagged with **Chat**. + |See the API Explorer for the full list of Chat endpoints, tagged with **Chat**. For the real-time streaming surface, see `chat.proto` / `ChatStreamServiceImpl`. | """) diff --git a/obp-api/src/main/scala/code/api/util/migration/Migration.scala b/obp-api/src/main/scala/code/api/util/migration/Migration.scala index 544b3ee80a..7128c44ded 100644 --- a/obp-api/src/main/scala/code/api/util/migration/Migration.scala +++ b/obp-api/src/main/scala/code/api/util/migration/Migration.scala @@ -115,6 +115,7 @@ object Migration extends MdcLoggable { updateConsentViewAddJwtPayload(startedBeforeSchemifier) updateConsentViewAddJwtExpiresAt(startedBeforeSchemifier) updateAccountAccessWithViewsViewUnionAll(startedBeforeSchemifier) + migrateChatRoomIsOpenRoom() } private def dummyScript(): Boolean = { @@ -649,6 +650,13 @@ object Migration extends MdcLoggable { } } } + + private def migrateChatRoomIsOpenRoom(): Boolean = { + val name = nameOf(migrateChatRoomIsOpenRoom) + runOnce(name) { + MigrationOfChatRoomIsOpenRoom.migrateColumn(name) + } + } } /** diff --git a/obp-api/src/main/scala/code/api/util/migration/MigrationOfChatRoomIsOpenRoom.scala b/obp-api/src/main/scala/code/api/util/migration/MigrationOfChatRoomIsOpenRoom.scala new file mode 100644 index 0000000000..89367c97f5 --- /dev/null +++ b/obp-api/src/main/scala/code/api/util/migration/MigrationOfChatRoomIsOpenRoom.scala @@ -0,0 +1,87 @@ +package code.api.util.migration + +import code.api.util.APIUtil +import code.api.util.migration.Migration.{DbFunction, saveLog} +import code.chat.ChatRoom +import net.liftweb.common.Full +import net.liftweb.db.DB +import net.liftweb.mapper.Schemifier +import net.liftweb.util.DefaultConnectionIdentifier + +object MigrationOfChatRoomIsOpenRoom { + + /** + * Migrate the old `allusersareparticipants` column to the new `isopenroom` column. + * + * Schemifier will have already created the new `isopenroom` column (defaulting to false). + * This migration copies data from the old column and then drops it. + * + * If the old column does not exist (fresh install), this is a no-op. + */ + def migrateColumn(name: String): Boolean = { + DbFunction.tableExists(ChatRoom) match { + case true => + val startDate = System.currentTimeMillis() + val commitId: String = APIUtil.gitCommit + + // Check if the old column exists before attempting migration + val oldColumnExists = try { + DB.use(DefaultConnectionIdentifier) { conn => + val rs = conn.getMetaData.getColumns(null, null, "chatroom", "allusersareparticipants") + val exists = rs.next() + rs.close() + exists + } + } catch { + case _: Throwable => false + } + + if (!oldColumnExists) { + val endDate = System.currentTimeMillis() + val comment = "Old column allusersareparticipants does not exist (fresh install). No migration needed." + saveLog(name, commitId, true, startDate, endDate, comment) + return true + } + + var isSuccessful = false + + val executedSql = + DbFunction.maybeWrite(true, Schemifier.infoF _) { + APIUtil.getPropsValue("db.driver") match { + case Full(dbDriver) if dbDriver.contains("com.microsoft.sqlserver.jdbc.SQLServerDriver") => + () => + """ + |UPDATE chatroom SET isopenroom = allusersareparticipants WHERE allusersareparticipants IS NOT NULL; + |ALTER TABLE chatroom DROP COLUMN allusersareparticipants; + |""".stripMargin + case _ => + // PostgreSQL and MySQL + () => + """ + |UPDATE chatroom SET isopenroom = allusersareparticipants WHERE allusersareparticipants IS NOT NULL; + |ALTER TABLE chatroom DROP COLUMN allusersareparticipants; + |""".stripMargin + } + } + + val endDate = System.currentTimeMillis() + val comment: String = + s"""Executed SQL: + |$executedSql + |""".stripMargin + isSuccessful = true + saveLog(name, commitId, isSuccessful, startDate, endDate, comment) + isSuccessful + + case false => + val startDate = System.currentTimeMillis() + val commitId: String = APIUtil.gitCommit + val isSuccessful = false + val endDate = System.currentTimeMillis() + val comment: String = + s"""${ChatRoom._dbTableNameLC} table does not exist""" + saveLog(name, commitId, isSuccessful, startDate, endDate, comment) + isSuccessful + } + } +} diff --git a/obp-api/src/main/scala/code/api/v6_0_0/APIMethods600.scala b/obp-api/src/main/scala/code/api/v6_0_0/APIMethods600.scala index b85da9cac8..1a553f4f9d 100644 --- a/obp-api/src/main/scala/code/api/v6_0_0/APIMethods600.scala +++ b/obp-api/src/main/scala/code/api/v6_0_0/APIMethods600.scala @@ -12828,7 +12828,7 @@ trait APIMethods600 { created_by = "user-id-123", created_by_username = "robert.x.0.gh", created_by_provider = "https://github.com", - all_users_are_participants = false, + is_open_room = false, is_archived = false, created_at = new java.util.Date(), updated_at = new java.util.Date() @@ -12895,7 +12895,7 @@ trait APIMethods600 { created_by = "user-id-123", created_by_username = "robert.x.0.gh", created_by_provider = "https://github.com", - all_users_are_participants = false, + is_open_room = false, is_archived = false, created_at = new java.util.Date(), updated_at = new java.util.Date() @@ -12961,7 +12961,7 @@ trait APIMethods600 { created_by = "user-id-123", created_by_username = "robert.x.0.gh", created_by_provider = "https://github.com", - all_users_are_participants = false, + is_open_room = false, is_archived = false, created_at = new java.util.Date(), updated_at = new java.util.Date() @@ -13013,7 +13013,7 @@ trait APIMethods600 { created_by = "user-id-123", created_by_username = "robert.x.0.gh", created_by_provider = "https://github.com", - all_users_are_participants = false, + is_open_room = false, is_archived = false, created_at = new java.util.Date(), updated_at = new java.util.Date() @@ -13065,7 +13065,7 @@ trait APIMethods600 { created_by = "user-id-123", created_by_username = "robert.x.0.gh", created_by_provider = "https://github.com", - all_users_are_participants = false, + is_open_room = false, is_archived = false, created_at = new java.util.Date(), updated_at = new java.util.Date() @@ -13124,7 +13124,7 @@ trait APIMethods600 { created_by = "user-id-123", created_by_username = "robert.x.0.gh", created_by_provider = "https://github.com", - all_users_are_participants = false, + is_open_room = false, is_archived = false, created_at = new java.util.Date(), updated_at = new java.util.Date() @@ -13183,7 +13183,7 @@ trait APIMethods600 { created_by = "user-id-123", created_by_username = "robert.x.0.gh", created_by_provider = "https://github.com", - all_users_are_participants = false, + is_open_room = false, is_archived = false, created_at = new java.util.Date(), updated_at = new java.util.Date() @@ -13252,7 +13252,7 @@ trait APIMethods600 { created_by = "user-id-123", created_by_username = "robert.x.0.gh", created_by_provider = "https://github.com", - all_users_are_participants = false, + is_open_room = false, is_archived = false, created_at = new java.util.Date(), updated_at = new java.util.Date() @@ -13416,7 +13416,7 @@ trait APIMethods600 { created_by = "user-id-123", created_by_username = "robert.x.0.gh", created_by_provider = "https://github.com", - all_users_are_participants = false, + is_open_room = false, is_archived = true, created_at = new java.util.Date(), updated_at = new java.util.Date() @@ -13477,7 +13477,7 @@ trait APIMethods600 { created_by = "user-id-123", created_by_username = "robert.x.0.gh", created_by_provider = "https://github.com", - all_users_are_participants = false, + is_open_room = false, is_archived = true, created_at = new java.util.Date(), updated_at = new java.util.Date() @@ -13514,19 +13514,19 @@ trait APIMethods600 { } } - // 6c. setBankChatRoomAllUsersAreParticipants + // 6c. setBankChatRoomOpenRoom staticResourceDocs += ResourceDoc( - setBankChatRoomAllUsersAreParticipants, + setBankChatRoomOpenRoom, implementedInApiVersion, - nameOf(setBankChatRoomAllUsersAreParticipants), + nameOf(setBankChatRoomOpenRoom), "PUT", - "/banks/BANK_ID/chat-rooms/CHAT_ROOM_ID/all-users-are-participants", + "/banks/BANK_ID/chat-rooms/CHAT_ROOM_ID/open-room", "Set Chat Room All Users Are Participants", s"""Set whether all authenticated users are implicit participants of this chat room. | |If true, all users can read and send messages without needing an explicit Participant record. | - |Requires the CanSetBankChatRoomAUAP role. + |Requires the CanSetBankChatRoomIsOpenRoom role. | |Authentication is Required | @@ -13541,7 +13541,7 @@ trait APIMethods600 { created_by = "user-id-123", created_by_username = "username", created_by_provider = "provider", - all_users_are_participants = true, + is_open_room = true, is_archived = false, created_at = new java.util.Date(), updated_at = new java.util.Date() @@ -13553,23 +13553,23 @@ trait APIMethods600 { UnknownError ), List(apiTagChat), - Some(List(canSetBankChatRoomAUAP)) + Some(List(canSetBankChatRoomIsOpenRoom)) ) - lazy val setBankChatRoomAllUsersAreParticipants: OBPEndpoint = { - case "banks" :: BankId(bankId) :: "chat-rooms" :: chatRoomId :: "all-users-are-participants" :: Nil JsonPut json -> _ => { + lazy val setBankChatRoomOpenRoom: OBPEndpoint = { + case "banks" :: BankId(bankId) :: "chat-rooms" :: chatRoomId :: "open-room" :: Nil JsonPut json -> _ => { cc => implicit val ec = EndpointContext(Some(cc)) for { (Full(u), callContext) <- authenticatedAccess(cc) - _ <- NewStyle.function.hasEntitlement(bankId.value, u.userId, canSetBankChatRoomAUAP, callContext) + _ <- NewStyle.function.hasEntitlement(bankId.value, u.userId, canSetBankChatRoomIsOpenRoom, callContext) _ <- Future { code.chat.ChatRoomTrait.chatRoomProvider.vend.getChatRoom(chatRoomId) } map { x => unboxFullOrFail(x, callContext, ChatRoomNotFound, 404) } - allUsersAreParticipants = (json \ "all_users_are_participants").extractOrElse[Boolean](false) + isOpenRoom = (json \ "is_open_room").extractOrElse[Boolean](false) updatedRoom <- Future { - code.chat.ChatRoomTrait.chatRoomProvider.vend.setAllUsersAreParticipants(chatRoomId, allUsersAreParticipants) + code.chat.ChatRoomTrait.chatRoomProvider.vend.setIsOpenRoom(chatRoomId, isOpenRoom) } map { x => unboxFullOrFail(x, callContext, s"$UnknownError Cannot update chat room", 400) } @@ -13579,19 +13579,19 @@ trait APIMethods600 { } } - // 6d. setSystemChatRoomAllUsersAreParticipants + // 6d. setSystemChatRoomOpenRoom staticResourceDocs += ResourceDoc( - setSystemChatRoomAllUsersAreParticipants, + setSystemChatRoomOpenRoom, implementedInApiVersion, - nameOf(setSystemChatRoomAllUsersAreParticipants), + nameOf(setSystemChatRoomOpenRoom), "PUT", - "/chat-rooms/CHAT_ROOM_ID/all-users-are-participants", + "/chat-rooms/CHAT_ROOM_ID/open-room", "Set System Chat Room All Users Are Participants", s"""Set whether all authenticated users are implicit participants of this system-level chat room. | |If true, all users can read and send messages without needing an explicit Participant record. | - |Requires the CanSetSystemChatRoomAUAP role. + |Requires the CanSetSystemChatRoomIsOpenRoom role. | |Authentication is Required | @@ -13606,7 +13606,7 @@ trait APIMethods600 { created_by = "user-id-123", created_by_username = "username", created_by_provider = "provider", - all_users_are_participants = true, + is_open_room = true, is_archived = false, created_at = new java.util.Date(), updated_at = new java.util.Date() @@ -13618,23 +13618,23 @@ trait APIMethods600 { UnknownError ), List(apiTagChat), - Some(List(canSetSystemChatRoomAUAP)) + Some(List(canSetSystemChatRoomIsOpenRoom)) ) - lazy val setSystemChatRoomAllUsersAreParticipants: OBPEndpoint = { - case "chat-rooms" :: chatRoomId :: "all-users-are-participants" :: Nil JsonPut json -> _ => { + lazy val setSystemChatRoomOpenRoom: OBPEndpoint = { + case "chat-rooms" :: chatRoomId :: "open-room" :: Nil JsonPut json -> _ => { cc => implicit val ec = EndpointContext(Some(cc)) for { (Full(u), callContext) <- authenticatedAccess(cc) - _ <- NewStyle.function.hasEntitlement("", u.userId, canSetSystemChatRoomAUAP, callContext) + _ <- NewStyle.function.hasEntitlement("", u.userId, canSetSystemChatRoomIsOpenRoom, callContext) _ <- Future { code.chat.ChatRoomTrait.chatRoomProvider.vend.getChatRoom(chatRoomId) } map { x => unboxFullOrFail(x, callContext, ChatRoomNotFound, 404) } - allUsersAreParticipants = (json \ "all_users_are_participants").extractOrElse[Boolean](false) + isOpenRoom = (json \ "is_open_room").extractOrElse[Boolean](false) updatedRoom <- Future { - code.chat.ChatRoomTrait.chatRoomProvider.vend.setAllUsersAreParticipants(chatRoomId, allUsersAreParticipants) + code.chat.ChatRoomTrait.chatRoomProvider.vend.setIsOpenRoom(chatRoomId, isOpenRoom) } map { x => unboxFullOrFail(x, callContext, s"$UnknownError Cannot update chat room", 400) } @@ -15650,6 +15650,7 @@ trait APIMethods600 { x => unboxFullOrFail(x, callContext, s"$UnknownError Cannot add reaction", 400) } } yield { + code.chat.ChatEventPublisher.afterReactionAdd(chatRoomId, chatMessageId, postJson.emoji, u.userId, u.name, u.provider) (JSONFactory600.createReactionJson(reaction), HttpCode.`201`(callContext)) } } @@ -15724,6 +15725,7 @@ trait APIMethods600 { x => unboxFullOrFail(x, callContext, s"$UnknownError Cannot add reaction", 400) } } yield { + code.chat.ChatEventPublisher.afterReactionAdd(chatRoomId, chatMessageId, postJson.emoji, u.userId, u.name, u.provider) (JSONFactory600.createReactionJson(reaction), HttpCode.`201`(callContext)) } } @@ -15787,6 +15789,7 @@ trait APIMethods600 { x => unboxFullOrFail(x, callContext, s"$UnknownError Cannot remove reaction", 400) } } yield { + code.chat.ChatEventPublisher.afterReactionRemove(chatRoomId, chatMessageId, decodedEmoji, u.userId, u.name, u.provider) (EmptyBody, HttpCode.`204`(callContext)) } } @@ -15850,6 +15853,7 @@ trait APIMethods600 { x => unboxFullOrFail(x, callContext, s"$UnknownError Cannot remove reaction", 400) } } yield { + code.chat.ChatEventPublisher.afterReactionRemove(chatRoomId, chatMessageId, decodedEmoji, u.userId, u.name, u.provider) (EmptyBody, HttpCode.`204`(callContext)) } } @@ -16246,7 +16250,7 @@ trait APIMethods600 { created_by = "user-id-123", created_by_username = "robert.x.0.gh", created_by_provider = "https://github.com", - all_users_are_participants = false, + is_open_room = false, is_archived = false, created_at = new java.util.Date(), updated_at = new java.util.Date() @@ -16315,7 +16319,13 @@ trait APIMethods600 { } unreadCounts <- Future { participantRecords.flatMap { p => - val count = code.chat.ChatMessageTrait.chatMessageProvider.vend.getUnreadCount(p.chatRoomId, p.lastReadAt) + val room = code.chat.ChatRoomTrait.chatRoomProvider.vend.getChatRoom(p.chatRoomId) + val isOpenRoom = room.map(_.isOpenRoom).openOr(false) + val count = if (isOpenRoom) { + code.chat.ChatMessageTrait.chatMessageProvider.vend.getUnreadMentionCount(p.chatRoomId, p.userId, p.lastReadAt) + } else { + code.chat.ChatMessageTrait.chatMessageProvider.vend.getUnreadCount(p.chatRoomId, p.userId, p.lastReadAt) + } count.toList.map(c => UnreadCountJsonV600(chat_room_id = p.chatRoomId, unread_count = c)) } } diff --git a/obp-api/src/main/scala/code/api/v6_0_0/JSONFactory6.0.0.scala b/obp-api/src/main/scala/code/api/v6_0_0/JSONFactory6.0.0.scala index a91c879955..88dda50215 100644 --- a/obp-api/src/main/scala/code/api/v6_0_0/JSONFactory6.0.0.scala +++ b/obp-api/src/main/scala/code/api/v6_0_0/JSONFactory6.0.0.scala @@ -1190,7 +1190,7 @@ case class ChatRoomJsonV600( created_by: String, created_by_username: String, created_by_provider: String, - all_users_are_participants: Boolean, + is_open_room: Boolean, is_archived: Boolean, created_at: java.util.Date, updated_at: java.util.Date @@ -2972,7 +2972,7 @@ object JSONFactory600 extends CustomJsonFormats with MdcLoggable { created_by = room.createdBy, created_by_username = creator.map(_.name).getOrElse(""), created_by_provider = creator.map(_.provider).getOrElse(""), - all_users_are_participants = room.allUsersAreParticipants, + is_open_room = room.isOpenRoom, is_archived = room.isArchived, created_at = room.createdDate, updated_at = room.updatedDate diff --git a/obp-api/src/main/scala/code/chat/ChatEventPublisher.scala b/obp-api/src/main/scala/code/chat/ChatEventPublisher.scala index 275194a6f8..1a9ba50429 100644 --- a/obp-api/src/main/scala/code/chat/ChatEventPublisher.scala +++ b/obp-api/src/main/scala/code/chat/ChatEventPublisher.scala @@ -3,6 +3,7 @@ package code.chat import code.util.Helper.MdcLoggable import net.liftweb.json import net.liftweb.json.Serialization.write +import scala.concurrent.{Future, ExecutionContext} /** * Publishes chat events to ChatEventBus after REST operations. @@ -57,6 +58,9 @@ object ChatEventPublisher extends MdcLoggable { def afterCreate(msg: ChatMessageTrait, senderUsername: String, senderProvider: String, senderConsumerName: String): Unit = { publishMessageEvent("new", msg, senderUsername, senderProvider, senderConsumerName) + // Sending a message means the sender has "read" the room up to this point + ParticipantTrait.participantProvider.vend.updateLastReadAt(msg.chatRoomId, msg.senderUserId) + Future { broadcastUnreadCounts(msg) }(ExecutionContext.global) } def afterUpdate(msg: ChatMessageTrait, senderUsername: String, senderProvider: String, senderConsumerName: String): Unit = { @@ -67,6 +71,16 @@ object ChatEventPublisher extends MdcLoggable { publishMessageEvent("deleted", msg, senderUsername, senderProvider, senderConsumerName) } + def afterReactionAdd(chatRoomId: String, chatMessageId: String, emoji: String, + userId: String, username: String, provider: String): Unit = { + publishReactionEvent("reacted", chatRoomId, chatMessageId, emoji, userId, username, provider) + } + + def afterReactionRemove(chatRoomId: String, chatMessageId: String, emoji: String, + userId: String, username: String, provider: String): Unit = { + publishReactionEvent("unreacted", chatRoomId, chatMessageId, emoji, userId, username, provider) + } + def afterTyping(chatRoomId: String, userId: String, username: String, provider: String, isTyping: Boolean): Unit = { val event = TypingEvent(chatRoomId, userId, username, provider, isTyping) ChatEventBus.publishTyping(chatRoomId, write(event)) @@ -82,6 +96,45 @@ object ChatEventPublisher extends MdcLoggable { ChatEventBus.publishUnread(userId, write(event)) } + /** + * Broadcast unread counts to affected participants after a new message. + * + * "Open rooms" (isOpenRoom=true) + * only notify users who are explicitly @mentioned, to avoid generating + * hundreds of thousands of publish events for large rooms. + * + * Private rooms notify all participants except the sender. + * + * Unread counts respect a 60-day cutoff — older messages are ignored. + */ + private def broadcastUnreadCounts(msg: ChatMessageTrait): Unit = { + try { + val room = ChatRoomTrait.chatRoomProvider.vend.getChatRoom(msg.chatRoomId) + val isOpenRoom = room.map(_.isOpenRoom).openOr(false) + + val participants = ParticipantTrait.participantProvider.vend + .getParticipants(msg.chatRoomId).openOr(List.empty) + + for (p <- participants if p.userId != msg.senderUserId) { + if (isOpenRoom) { + // Open rooms: only notify explicitly mentioned users + if (msg.mentionedUserIds.contains(p.userId)) { + val count = ChatMessageTrait.chatMessageProvider.vend + .getUnreadMentionCount(msg.chatRoomId, p.userId, p.lastReadAt).openOr(0L) + afterUnreadCountChange(p.userId, msg.chatRoomId, count) + } + } else { + // Private rooms: notify all participants + val count = ChatMessageTrait.chatMessageProvider.vend + .getUnreadCount(msg.chatRoomId, p.userId, p.lastReadAt).openOr(0L) + afterUnreadCountChange(p.userId, msg.chatRoomId, count) + } + } + } catch { + case e: Throwable => logger.error(s"Failed to broadcast unread counts: ${e.getMessage}") + } + } + private def publishMessageEvent( eventType: String, msg: ChatMessageTrait, @@ -109,4 +162,35 @@ object ChatEventPublisher extends MdcLoggable { ) ChatEventBus.publishMessage(msg.chatRoomId, write(event)) } + + private def publishReactionEvent( + eventType: String, + chatRoomId: String, + chatMessageId: String, + emoji: String, + userId: String, + username: String, + provider: String + ): Unit = { + val now = dateFormat.format(new java.util.Date()) + val event = MessageEvent( + event_type = eventType, + chat_message_id = chatMessageId, + chat_room_id = chatRoomId, + sender_user_id = userId, + sender_consumer_id = "", + sender_username = username, + sender_provider = provider, + sender_consumer_name = "", + content = emoji, + message_type = "", + mentioned_user_ids = List.empty, + reply_to_message_id = "", + thread_id = "", + is_deleted = false, + created_at = now, + updated_at = now + ) + ChatEventBus.publishMessage(chatRoomId, write(event)) + } } diff --git a/obp-api/src/main/scala/code/chat/ChatMessageTrait.scala b/obp-api/src/main/scala/code/chat/ChatMessageTrait.scala index b6fe7bf05b..b3a1a5e9d0 100644 --- a/obp-api/src/main/scala/code/chat/ChatMessageTrait.scala +++ b/obp-api/src/main/scala/code/chat/ChatMessageTrait.scala @@ -25,7 +25,8 @@ trait ChatMessageProvider { def getMessages(chatRoomId: String, limit: Int, offset: Int, fromDate: Date, toDate: Date): Box[List[ChatMessageTrait]] def getThreadReplies(threadId: String): Box[List[ChatMessageTrait]] def getMentionsForUser(userId: String, limit: Int, offset: Int): Box[List[ChatMessageTrait]] - def getUnreadCount(chatRoomId: String, sinceDate: Date): Box[Long] + def getUnreadCount(chatRoomId: String, userId: String, sinceDate: Date): Box[Long] + def getUnreadMentionCount(chatRoomId: String, userId: String, sinceDate: Date): Box[Long] def updateMessage(chatMessageId: String, content: String): Box[ChatMessageTrait] def softDeleteMessage(chatMessageId: String): Box[ChatMessageTrait] diff --git a/obp-api/src/main/scala/code/chat/ChatPermissions.scala b/obp-api/src/main/scala/code/chat/ChatPermissions.scala index 33f9cb5884..8a6fa53dd0 100644 --- a/obp-api/src/main/scala/code/chat/ChatPermissions.scala +++ b/obp-api/src/main/scala/code/chat/ChatPermissions.scala @@ -4,7 +4,7 @@ import java.util.Date import net.liftweb.common.{Box, Empty, Failure, Full} /** - * A synthetic participant for rooms with allUsersAreParticipants = true. + * A synthetic participant for rooms with isOpenRoom = true. * No database row exists — the user is implicitly a member with no special permissions. */ case class ImplicitParticipant(chatRoomId: String, userId: String) extends ParticipantTrait { @@ -34,15 +34,15 @@ object ChatPermissions { /** * Check if user is a participant of the room. Returns the Participant record if found, - * or a synthetic participant (via the room's allUsersAreParticipants flag) with empty permissions. + * or a synthetic participant (via the room's isOpenRoom flag) with empty permissions. */ def isParticipant(chatRoomId: String, userId: String): Box[ParticipantTrait] = { ParticipantTrait.participantProvider.vend.getParticipant(chatRoomId, userId) match { case Full(p) => Full(p) case _ => - // Check if room has allUsersAreParticipants = true + // Check if room has isOpenRoom = true ChatRoomTrait.chatRoomProvider.vend.getChatRoom(chatRoomId) match { - case Full(room) if room.allUsersAreParticipants => + case Full(room) if room.isOpenRoom => Full(ImplicitParticipant(chatRoomId, userId)) case _ => Empty } diff --git a/obp-api/src/main/scala/code/chat/ChatRoomTrait.scala b/obp-api/src/main/scala/code/chat/ChatRoomTrait.scala index cf9a521d80..3b4c3615d6 100644 --- a/obp-api/src/main/scala/code/chat/ChatRoomTrait.scala +++ b/obp-api/src/main/scala/code/chat/ChatRoomTrait.scala @@ -29,7 +29,7 @@ trait ChatRoomProvider { description: Option[String] ): Box[ChatRoomTrait] - def setAllUsersAreParticipants(chatRoomId: String, allUsersAreParticipants: Boolean): Box[ChatRoomTrait] + def setIsOpenRoom(chatRoomId: String, isOpenRoom: Boolean): Box[ChatRoomTrait] def archiveChatRoom(chatRoomId: String): Box[ChatRoomTrait] def deleteChatRoom(chatRoomId: String): Box[Boolean] def refreshJoiningKey(chatRoomId: String): Box[ChatRoomTrait] @@ -43,7 +43,8 @@ trait ChatRoomTrait { def description: String def joiningKey: String def createdBy: String - def allUsersAreParticipants: Boolean + /** Whether this is an "open room" where all users are implicit participants. */ + def isOpenRoom: Boolean def isArchived: Boolean def createdDate: Date def updatedDate: Date diff --git a/obp-api/src/main/scala/code/chat/MappedChatMessage.scala b/obp-api/src/main/scala/code/chat/MappedChatMessage.scala index f320b6cb10..512f136391 100644 --- a/obp-api/src/main/scala/code/chat/MappedChatMessage.scala +++ b/obp-api/src/main/scala/code/chat/MappedChatMessage.scala @@ -70,11 +70,28 @@ object MappedChatMessageProvider extends ChatMessageProvider { } } - override def getUnreadCount(chatRoomId: String, sinceDate: Date): Box[Long] = { + private def effectiveSinceDate(sinceDate: Date): Date = { + val sixtyDaysAgo = new Date(System.currentTimeMillis() - 60L * 24 * 60 * 60 * 1000) + if (sinceDate.before(sixtyDaysAgo)) sixtyDaysAgo else sinceDate + } + + override def getUnreadCount(chatRoomId: String, userId: String, sinceDate: Date): Box[Long] = { + tryo { + ChatMessage.count( + By(ChatMessage.ChatRoomId, chatRoomId), + By_>(ChatMessage.createdAt, effectiveSinceDate(sinceDate)), + NotBy(ChatMessage.SenderUserId, userId) + ) + } + } + + override def getUnreadMentionCount(chatRoomId: String, userId: String, sinceDate: Date): Box[Long] = { tryo { ChatMessage.count( By(ChatMessage.ChatRoomId, chatRoomId), - By_>(ChatMessage.createdAt, sinceDate) + By_>(ChatMessage.createdAt, effectiveSinceDate(sinceDate)), + NotBy(ChatMessage.SenderUserId, userId), + Like(ChatMessage.MentionedUserIds, s"%$userId%") ) } } diff --git a/obp-api/src/main/scala/code/chat/MappedChatRoom.scala b/obp-api/src/main/scala/code/chat/MappedChatRoom.scala index 3a940ab3b8..061b29c507 100644 --- a/obp-api/src/main/scala/code/chat/MappedChatRoom.scala +++ b/obp-api/src/main/scala/code/chat/MappedChatRoom.scala @@ -53,7 +53,7 @@ object MappedChatRoomProvider extends ChatRoomProvider { ) val openRooms = ChatRoom.findAll( By(ChatRoom.BankId, bankId), - By(ChatRoom.AllUsersAreParticipants, true) + By(ChatRoom.IsOpenRoom, true) ) (explicitRooms ++ openRooms).groupBy(_.chatRoomId).values.map(_.head).toList } @@ -77,10 +77,10 @@ object MappedChatRoomProvider extends ChatRoomProvider { } } - override def setAllUsersAreParticipants(chatRoomId: String, allUsersAreParticipants: Boolean): Box[ChatRoomTrait] = { + override def setIsOpenRoom(chatRoomId: String, isOpenRoom: Boolean): Box[ChatRoomTrait] = { ChatRoom.find(By(ChatRoom.ChatRoomId, chatRoomId)).flatMap { room => tryo { - room.AllUsersAreParticipants(allUsersAreParticipants).saveMe() + room.IsOpenRoom(isOpenRoom).saveMe() } } } @@ -119,7 +119,7 @@ object MappedChatRoomProvider extends ChatRoomProvider { .Name("general") .Description("Default system-wide chat room for all users") .CreatedBy("system") - .AllUsersAreParticipants(true) + .IsOpenRoom(true) .IsArchived(false) .saveMe() } @@ -137,7 +137,7 @@ class ChatRoom extends ChatRoomTrait with LongKeyedMapper[ChatRoom] with IdPK wi object Description extends MappedText(this) object JoiningKey extends MappedUUID(this) object CreatedBy extends MappedString(this, 36) - object AllUsersAreParticipants extends MappedBoolean(this) + object IsOpenRoom extends MappedBoolean(this) object IsArchived extends MappedBoolean(this) override def chatRoomId: String = ChatRoomId.get @@ -146,7 +146,7 @@ class ChatRoom extends ChatRoomTrait with LongKeyedMapper[ChatRoom] with IdPK wi override def description: String = Description.get override def joiningKey: String = JoiningKey.get override def createdBy: String = CreatedBy.get - override def allUsersAreParticipants: Boolean = AllUsersAreParticipants.get + override def isOpenRoom: Boolean = IsOpenRoom.get override def isArchived: Boolean = IsArchived.get override def createdDate: Date = createdAt.get override def updatedDate: Date = updatedAt.get