package org.tigase.officialtea.common.services

import co.touchlab.kermit.Logger
import com.arkivanov.decompose.value.MutableValue
import com.badoo.reaktive.completable.*
import com.badoo.reaktive.maybe.*
import com.badoo.reaktive.observable.*
import com.badoo.reaktive.scheduler.computationScheduler
import com.badoo.reaktive.scheduler.ioScheduler
import com.badoo.reaktive.scheduler.mainScheduler
import com.badoo.reaktive.single.*
import com.badoo.reaktive.subject.behavior.BehaviorSubject
import com.badoo.reaktive.subject.publish.PublishSubject
import com.squareup.sqldelight.db.SqlDriver
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import org.tigase.officialtea.common.database.*
import org.tigase.officialtea.common.services.chat.*
import org.tigase.officialtea.common.services.halcyon.lastMessageCorrection
import org.tigase.officialtea.common.services.roster.RosterPresenceServiceImpl
import org.tigase.officialtea.common.services.utils.generateRandomRoomJid
import tigase.halcyon.core.AbstractHalcyon
import tigase.halcyon.core.HalcyonStateChangeEvent
import tigase.halcyon.core.ReflectionModuleManager
import tigase.halcyon.core.xmpp.*
import tigase.halcyon.core.xmpp.modules.MessageReceivedEvent
import tigase.halcyon.core.xmpp.modules.RSM
import tigase.halcyon.core.xmpp.modules.chatmarkers.ChatMarkersModule
import tigase.halcyon.core.xmpp.modules.chatmarkers.isChatMarkerRequested
import tigase.halcyon.core.xmpp.modules.chatmarkers.markable
import tigase.halcyon.core.xmpp.modules.chatstates.ChatStateModule
import tigase.halcyon.core.xmpp.modules.chatstates.OwnChatStateChangeEvent
import tigase.halcyon.core.xmpp.modules.discovery.DiscoveryModule
import tigase.halcyon.core.xmpp.modules.mam.ForwardedStanza
import tigase.halcyon.core.xmpp.modules.mam.MAMModule
import tigase.halcyon.core.xmpp.modules.mix.*
import tigase.halcyon.core.xmpp.modules.receipts.DeliveryReceiptsModule
import tigase.halcyon.core.xmpp.modules.receipts.deliveryReceiptRequest
import tigase.halcyon.core.xmpp.modules.receipts.isDeliveryReceiptRequested
import tigase.halcyon.core.xmpp.modules.roster.RosterEvent
import tigase.halcyon.core.xmpp.modules.roster.RosterItem
import tigase.halcyon.core.xmpp.modules.uniqueId.getOriginID
import tigase.halcyon.core.xmpp.modules.uniqueId.getStanzaIDBy
import tigase.halcyon.core.xmpp.stanzas.Message
import tigase.halcyon.core.xmpp.stanzas.MessageType
import tigase.halcyon.core.xmpp.stanzas.message
import tigase.halcyon.rx.asSingle
import tigase.halcyon.rx.observe

class ChatsServiceImpl(
    private val driver: Single<SqlDriver>,
    private val connectionService: ConnectionService,
    private val rosterPresenceService: RosterPresenceServiceImpl,
) : ChatsService {

    private val log = Logger.withTag("ChatsService")

    override val unreadCount: MutableValue<Int> = MutableValue(0)

    val attachmentsDownloaded = PublishSubject<Unit>()

    private val attachmentProvider: AttachmentProvider = AttachmentProvider()

    constructor(
        driver: SqlDriver, connectionService: ConnectionService, rosterPresenceService: RosterPresenceServiceImpl,
    ) : this(singleOf(driver), connectionService, rosterPresenceService)

    private val openChatsQueries: Single<OpenChatsDatabaseQueries> =
        driver.map { SharedDatabase(it).openChatsDatabaseQueries }.asObservable().replay().autoConnect().firstOrError()

    val chatMarkers: Single<ChatMarkersQueries> =
        driver.map { SharedDatabase(it).chatMarkersQueries }.asObservable().replay().autoConnect().firstOrError()

    val chatsQueries: Single<MessagesDatabaseQueries> =
        driver.map { SharedDatabase(it).messagesDatabaseQueries }.asObservable().replay().autoConnect().firstOrError()

    val attachmentQueries =
        driver.map { SharedDatabase(it).attachmentsQueries }.asObservable().replay().autoConnect().firstOrError()

    val fileUploader = FileUploader(attachmentProvider, attachmentQueries, connectionService)

    private fun processReceivedMessageEvent(event: MIXMessageEvent) {
        val account = event.context.boundJID?.bareJID ?: throw RuntimeException("Unknown account")
        processReceivedMessages(
            type = MessageCollectionType.FromServer,
            account = account,
            events = listOf(event.stanza.toMessageToProcess())
        ).doOnAfterSuccess {
            prepareNotifications(it)
        }.flatMapCompletable {
            confirmReceivingMessage(event.context,
                event.stanza,
                stanzaId = { message -> message.getStanzaIDBy(message.from!!.bareJID) })
        }.subscribe { loadOpenChats() }
    }

    private fun processReceivedMessageEvent(event: MessageReceivedEvent) {
        val account = event.context.boundJID?.bareJID ?: throw RuntimeException("Unknown account")
        if (event.stanza.getChildrenNS("result", "urn:xmpp:mam:2") != null) return
        processReceivedMessages(
            type = MessageCollectionType.FromServer,
            account = account,
            events = listOf(event.stanza.toMessageToProcess())
        ).doOnAfterSuccess {
            prepareNotifications(it)
        }.flatMapCompletable {
            confirmReceivingMessage(event.context, event.stanza, stanzaId = { message ->
                message.getOriginID() ?: message.attributes["id"]
            })
        }
//			.delay(128, mainScheduler)
            .subscribe {
                loadOpenChats()
            }
    }

//	fun markAsRead2(list: List<MsgMeta>): Completable = chatsQueries.execute {
//		log.i { "Marking as read ${list.size}" }
//		list.forEach { meta ->
//			log.i { "READ openCaht=${meta.openChat.id}, timestamp=${meta.timestamp.toEpochMilliseconds()}" }
//			it.updateMessagesState(
//				MessageState.IncomingRead, meta.openChat.id, meta.timestamp, listOf(MessageState.IncomingUnread)
//			)
//		}
//	}

    private fun confirmReceivingMessage(
        halcyon: AbstractHalcyon,
        stanza: Message,
        stanzaId: (Message) -> String?,
    ): Completable = completableFromFunction {
        if (stanza.type == MessageType.Error) return@completableFromFunction
        val sender = stanza.from?.bareJID ?: return@completableFromFunction
        val originId = stanzaId.invoke(stanza) ?: return@completableFromFunction

        val resp = message {
            type = stanza.type
            to = sender

            if (stanza.isDeliveryReceiptRequested()) {
                "received" {
                    xmlns = DeliveryReceiptsModule.XMLNS
                    attribute("id", originId)
                }
            }
            if (stanza.isChatMarkerRequested()) {
                ChatMarkersModule.Marker.Received.xmppValue {
                    this.xmlns = ChatMarkersModule.XMLNS
                    this.attributes["id"] = originId
                }
            }
        }
        if (resp.children.isNotEmpty()) {
            halcyon.request.message(resp, true).send()
        }
    }


    override fun selectedChat(chatId: Long?) {
        FocusService.setActiveChat(chatId)
    }

    private fun dataToOpenChatUnreadsItem(openChats: SelectWithUnread): OpenChatUnreadsItem {
        return OpenChatUnreadsItem(
            openChatId = openChats.id,
            account = openChats.account,
            jid = openChats.jid,
            name = openChats.name,
            unread = openChats.cnt ?: 0,
            contactInRoster = openChats.inroster == 1L,
            type = openChats.type
        )
    }

    val _observeOpenChats = BehaviorSubject<List<OpenChatUnreadsItem>>(emptyList())

    override val notification: PublishSubject<Array<NotificationData>> = PublishSubject()

    override fun observeOpenChats(): Observable<List<OpenChatUnreadsItem>> = _observeOpenChats

    init {
        connectionService.eventBus.observe(HalcyonStateChangeEvent)
            .filter { e -> e.newState == AbstractHalcyon.State.Connected }
            .subscribe(onNext = { doAfterConnected(it.context) })
//        chatsQueries.query { it.selectUnsentMessages() }.observe {
//            Logger.w("IMPORTANT") { "Czytamy listę niewysłabych" }
//            it.executeAsList()
//        }.doOnBeforeNext {
//            Logger.w("IMPORTANT") { "Lista: ${it.size}" }
//
//        }
//            .subscribe(onNext = this::sendUnsentMessages)
        connectionService.eventBus.observe(MessageReceivedEvent).subscribe(onNext = ::processReceivedMessageEvent)
        connectionService.eventBus.observe(MIXMessageEvent).subscribe(onNext = ::processReceivedMessageEvent)
        connectionService.eventBus.observe(RosterEvent).subscribe(onNext = ::processReceivedRosterEvent)
        connectionService.eventBus.register(OwnChatStateChangeEvent, ::doOnOwnChatStateChangeEvent)
        openChatsQueries.query { it.selectWithUnread() }.observe {
            it.executeAsList().map(this::dataToOpenChatUnreadsItem)
        }.subscribe(onNext = _observeOpenChats::onNext)

        observeOpenChats().debounce(500, computationScheduler).observeOn(mainScheduler).map {
            it.sumOf { it.unread }.toInt()
        }.subscribe(onNext = { unreadCount.value = it })

        FocusService.observe()
//			.delay(128, mainScheduler)
            .subscribe {
                loadOpenChats()
            }
    }

    fun findRowIdByOriginId(openChatId: Long, authorJid: BareJID, originId: String) = chatsQueries.query {
        it.getRowIdByOriginId(openChatId, "${authorJid}%", originId)
    }.mapNotNull { it.executeAsOneOrNull() }

    fun findByOriginId(openChatId: Long, authorJid: BareJID, originId: String) = chatsQueries.query {
        it.getByOriginId(openChatId, "${authorJid}%", originId)
    }.mapNotNull { it.executeAsOneOrNull() }

    fun findByRemoteId(openChatId: Long, originId: String) = chatsQueries.query {
        it.getByRemoteId(openChatId, originId)
    }.mapNotNull { it.executeAsOneOrNull() }

    private fun doAfterConnected(halcyon: AbstractHalcyon) {
        sendUnsentMessages()
        loadNewHistory(halcyon)
        halcyon.boundJID?.bareJID?.let {
            loadAttachments(it)
        }
    }


    fun loadAttachments(account: BareJID, messages: List<MsgMeta>? = null) {
        log.i { "Loading attachments started ids=${messages?.map { it.originId }}. " }
        attachmentQueries.query {
            if (messages != null) it.allNotDownloadedByAccountAndIds(
                account = account,
                identifiers = messages.map { it.originId })
            else it.allNotDownloadedByAccount(account = account)
        }.map { it.executeAsList() }.flatten().flatMapMaybe {
            attachmentProvider.download(it)
        }.mapNotNull { if (it is DownloadResult.Success) it else null }.flatMapCompletable { att ->
            log.i { "Set downloaded mark: ${att.id}" }
            attachmentQueries.execute {
                it.markAsDownloaded(
                    id = att.id, size = att.size, contentType = att.contentType, localFile = att.imageLocation
                )
            }
        }.subscribe(onComplete = {
            log.i { "Loading attachments complete" }
            attachmentsDownloaded.onNext(Unit)
        })
    }

    private fun loadNewHistory(halcyon: AbstractHalcyon) {
        halcyon.boundJID?.bareJID?.let { account ->

            chatsQueries.query { it.getLatestMamStanzaId(account) }.map { it.executeAsOneOrNull() }
                .map { entry -> entry?.mam_stanza_id }.flatMapObservable { lastStanzaId ->
                    loadHistory(account = account, afterStanzaId = lastStanzaId).map {
                        it.map(ForwardedStanza<Message>::toMessageToProcess)
                    }.flatMapSingle {
                        processReceivedMessages(
                            type = MessageCollectionType.FromMAM,
                            account = account,
                            events = it,
                            forceCreateNewChat = true
                        )
                    }
                }.subscribe {
//                    loadAttachments(account)
                    loadOpenChats()
                }
        }
    }

    private fun loadOpenChats() {
        log.i { "Load openchats" }
        openChatsQueries.query { it.selectWithUnread() }.map {
            it.executeAsList().map(this::dataToOpenChatUnreadsItem)
        }.subscribe(onSuccess = _observeOpenChats::onNext)
    }

    override fun observeChats(chatId: Long, size: Long): Observable<List<MessageWithAttachment>> =
        chatsQueries.query { it.messageWithAttachment(chatId, size) }.observe {
            Logger.w("IMPORTANT") { "Czytamy listę wiadomości $chatId" }
            it.executeAsList()
        }.powtorzGdy(attachmentsDownloaded)

    override fun observeMessageCount(chatId: Long): Observable<Long> =
        chatsQueries.query { it.countByChatId(chatId) }.observe { it.executeAsOne() }

    override fun observeMarkers(chatId: Long): Observable<List<ChatMarkers>> =
        chatMarkers.query { it.listMarkers(chatId) }.observe { it.executeAsList() }

    private fun createOpenChat(account: BareJID, jid: BareJID, type: OpenChatType): Single<OpenChats> {
        return rosterPresenceService.findRosterEntry(account, jid).map {
            it.name ?: jid.toString()
        }.asSingle(jid.toString()).flatMap { contactName ->
            openChatsQueries.subscribeOn(ioScheduler).observeOn(ioScheduler).map {
                it.transactionWithResult<OpenChats> {
                    it.add(account = account, jid = jid, name = contactName, type = type)
                    val rowId = it.lastInsertRowId().executeAsOne()
                    it.selectById(rowId).executeAsOne()
                }
            }.onErrorResumeNext {
                openChatsQueries.map { it.selectByAccountJid(account = account, jid = jid) }
                    .subscribeOn(ioScheduler).observeOn(ioScheduler).map { it.executeAsOne() }
            }.doOnAfterSuccess { openChat ->
                when (openChat.type) {
                    OpenChatType.Direct -> loadHistory(account = account, with = jid)
                    OpenChatType.Mix -> loadHistory(
                        account = account, archiveJID = jid, node = MIXModule.NODE_MESSAGES
                    )
                }.map { it.map(ForwardedStanza<Message>::toMessageToProcess) }.flatMapSingle {
                    processReceivedMessages(
                        when (openChat.type) {
                            OpenChatType.Mix -> MessageCollectionType.FromMIX
                            OpenChatType.Direct -> MessageCollectionType.FromMAM
                        }, account = account, events = it
                    )
                }.subscribe { }
            }
        }
    }

    private fun loadHistory(
        account: BareJID,
        archiveJID: BareJID? = null,
        with: BareJID? = null,
        node: String? = null,
        afterStanzaId: String? = null,
    ): Observable<List<ForwardedStanza<Message>>> {
        val result = PublishSubject<List<ForwardedStanza<Message>>>()
        val list = mutableListOf<ForwardedStanza<Message>>()
        connectionService[account]!!.getModule<MAMModule>(MAMModule.TYPE).queryForMessages(
            to = archiveJID, with = with?.toString(), node = node, expectedBodies = 256, rsm = RSM.query {
                if (afterStanzaId == null) before() else after(afterStanzaId)
            }, ignoreComplete = true
        ) {
            consumer(list::add)
            summary {
                result.onNext(list)
                result.onComplete()
            }
        }
        return result.observeOn(ioScheduler).subscribeOn(ioScheduler)
    }

    fun openChatWith(account: BareJID, jid: BareJID, createNewChats: Boolean): Single<OpenChats?> {
        return openChatsQueries.query { it.selectByAccountJid(account = account, jid = jid) }
            .map { it.executeAsOneOrNull() }.flatMap {
                when {
                    it != null -> singleOf(it)
                    !createNewChats -> singleOf(null)
                    else -> createOpenChat(account, jid, OpenChatType.Direct)
                }
            }

    }

    override fun openChatWith(account: BareJID, jid: BareJID): Single<OpenChats> {
        return openChatsQueries.query { it.selectByAccountJid(account = account, jid = jid) }
            .map { it.executeAsOneOrNull() }.flatMap { openChat ->
                if (openChat != null) single { emitter -> emitter.onSuccess(openChat) }
                else createOpenChat(account, jid, OpenChatType.Direct)
            }
    }

    override fun findOpenChat(chatId: Long): Single<OpenChats?> =
        openChatsQueries.query { it.selectById(chatId) }.map { it.executeAsOneOrNull() }

    override fun findOpenChat(account: BareJID, jid: BareJID): Single<OpenChats?> =
        openChatsQueries.query { it.selectByAccountJid(account, jid) }.map { it.executeAsOneOrNull() }

    override fun sendMessage(openChat: OpenChats, text: String?, fileToUpload: FileToUpload?, editedMessageId: Long?) {
        if (fileToUpload != null) {
            sendMessageInternal(openChat, null, fileToUpload, editedMessageId)
        }
        if (text != null) {
            sendMessageInternal(openChat, text, null, editedMessageId)
        }
    }

    private fun sendMessageInternal(
        openChat: OpenChats,
        text: String?,
        fileToUpload: FileToUpload?,
        editedMessageId: Long?
    ) {
        log.i { "Sending message request: text=${text != null}; file=${fileToUpload != null}" }
        if (editedMessageId != null) {
            chatsQueries.execute {
                it.updateBody(
                    id = editedMessageId,
                    body = text,
                    correction_stanza_id = null,
                    correction_timestamp = Clock.System.now()
                )
                it.updateMessageState(state = MessageState.OutNotSent, listOf(editedMessageId))
            }.subscribe {
                sendUnsentMessage(editedMessageId)
            }
        } else {
            val uniqueId = nextUID()

            if (fileToUpload != null) {
                log.i { "Attachment found!" }
                attachmentProvider.prepareAttachment(fileToUpload).flatMapCompletable { att ->
                    attachmentQueries.execute {
                        it.addOutgoingAttachment(
                            chatId = openChat.id,
                            account = openChat.account,
                            originalStanzaId = uniqueId,
                            fileName = att.fileName,
                            localFile = att.localFile,
                            size = att.size,
                            contentType = att.contentType
                        )
                    }
                }
            } else {
                completableFromFunction { }
            }.andThen(driver.map { SharedDatabase(it).accountsDatabaseQueries }
                .query { it.selectByName(openChat.account) }.map { it.executeAsOne() }
                .map { it.nickname ?: it.userjid.localpart ?: "Me" }
                .flatMap { nickname ->
                    chatsQueries.map {
                        it.transactionWithResult<Long> {
                            when (openChat.type) {
                                OpenChatType.Direct -> it.add(
                                    openchat_id = openChat.id,
                                    account = openChat.account,
                                    author_jid = openChat.account,
                                    author_nickname = nickname,
                                    recipient_jid = openChat.jid,
                                    recipient_nickname = null,
                                    origin_stanza_id = uniqueId,
                                    mam_stanza_id = null,
                                    body = text,
                                    timestamp = Clock.System.now(),
                                    state = MessageState.OutNotSent,
                                    remote_stanza_id = null,
                                    participant_id = null,
                                    stanza_type = MessageType.Chat,
                                    markable = true
                                )

                                OpenChatType.Mix -> {
                                    it.add(
                                        openchat_id = openChat.id,
                                        account = openChat.account,
                                        author_jid = openChat.account,
                                        author_nickname = nickname,
                                        recipient_jid = openChat.jid,
                                        recipient_nickname = null,
                                        origin_stanza_id = uniqueId,
                                        mam_stanza_id = null,
                                        body = text,
                                        timestamp = Clock.System.now(),
                                        state = MessageState.OutNotSent,
                                        remote_stanza_id = null,
                                        participant_id = null,
                                        stanza_type = MessageType.Groupchat,
                                        markable = true
                                    )
                                }
                            }
                            it.lastInsertRowId().executeAsOne()
//                            it.selectById(rowId).executeAsOne()
                        }
                    }.map {
                        sendUnsentMessage(it)
                    }
                })
                .subscribe {
                    log.i { "Message saved!" }
                }
        }
    }

    private fun doOnOwnChatStateChangeEvent(event: OwnChatStateChangeEvent) {
        if (event.context.state != AbstractHalcyon.State.Connected) return
        log.i { "Chat state: ${event.oldState} -> ${event.state}" }
        findOpenChat(event.context.boundJID?.bareJID!!, event.jid).notNull().filter { it.type == OpenChatType.Direct }
            .subscribe {
                event.context.getModule<ChatStateModule>(ChatStateModule.TYPE).publishChatState(event.jid, event.state)
            }
    }

    override fun markAsRead(messages: MessageWithAttachment) {
        if (messages.openchat_id == FocusService.visibleChatId && FocusService.focused) {
            chatsQueries.execute {
                log.d { "Messages in openchat_id=${messages.openchat_id} marked as read." }
                it.updateMessagesState(
                    MessageState.IncomingRead,
                    messages.openchat_id,
                    messages.timestamp,
                    listOf(MessageState.IncomingUnread)
                )
            }.subscribe {
                loadOpenChats()
                sendReadConfirmation(messages)
            }
        }
    }

    private fun sendReadConfirmation(stanza: MessageWithAttachment) {
//		if (!stanza.markable) return
        findOpenChat(stanza.openchat_id!!).filter { it != null }.mapNotNull {
            connectionService[it!!.account]
        }.subscribe { halcyon ->
            val resp = message {
                type = MessageType.Chat
                to = stanza.author_jid

                ChatMarkersModule.Marker.Displayed.xmppValue {
                    this.xmlns = ChatMarkersModule.XMLNS
                    this.attributes["id"] = when (stanza.stanza_type) {
                        MessageType.Groupchat -> stanza.remote_stanza_id
                        else -> stanza.origin_stanza_id
                    }
                }
            }
            halcyon.request.message(resp, true).send()
        }
    }

    override fun markCurrentAsRead() {
        val states = listOf(MessageState.IncomingUnread)
        FocusService.visibleChatId?.let { chatId ->
            chatsQueries.query { it.getLatestMessage(chatId, states) }.mapNotNull { it.executeAsOneOrNull() }
                .flatMapCompletable { msg ->
                    chatsQueries.execute {
                        it.updateMessagesState(MessageState.IncomingRead, chatId, msg.timestamp, states)
                    }
                }.subscribe()
        }
    }

    override fun loadLastMessage(openChatId: Long): Maybe<Messages> = chatsQueries.query {
        it.selectNewest(openchat_id = openChatId, MessageState.values().filter { it.outgoing })
    }.map { it.executeAsOneOrNull() }.notNull()

    override fun closeChat(openChatId: Long): Completable {
        return findOpenChat(openChatId).subscribeOn(ioScheduler).observeOn(ioScheduler).notNull().doOnBeforeSuccess {
            if (it.type == OpenChatType.Mix) connectionService[it.account]!!.getModule<MIXModule>(MIXModule.TYPE)
                .leave(it.jid).asSingle().subscribe { }
        }.flatMapCompletable {
            openChatsQueries.execute { it.delete(openChatId) }
        }
    }

    private fun sendUnsentMessages(list: List<SelectUnsentMessages>) {
        log.i { "sendUnsentMessages(list)" }
        list.forEach(this::sendUnsentMessage)
    }

    private fun sendUnsentMessages() {
        log.i { "sendUnsentMessages()" }
        chatsQueries.query { it.selectUnsentMessages() }.map { it.executeAsList() }
            .subscribe(onSuccess = this::sendUnsentMessages)
    }

    private fun sendUnsentMessage(messageId: Long) {
        chatsQueries.query { it.selectUnsentMessages() }.map {
            it.executeAsList()
        }.flatten().filter { it.id == messageId }
            .subscribe {
                sendUnsentMessage(it)
            }
    }

    private fun sendUnsentMessage(message: SelectUnsentMessages) {
        log.i { "sendUnsentMessage(${message.id})" }
        val halcyon = connectionService[message.account] ?: return
        if (halcyon.state != AbstractHalcyon.State.Connected) {
            log.w { "Halcyon[${message.account}] is not connected" }
            return
        }

        log.i { "Starting! " }
        fileUploader.upload(message.account, message.origin_stanza_id)
            .onErrorResumeNext {
                log.i { "Error! ${it.message}" }
                singleOf(null)
            }
            .flatMap { slot ->
                log.i { "Trying to save attachment" }
                attachmentQueries.execute {
                    if (slot != null) it.markAsUploaded(
                        account = message.account, originStanzaId = message.origin_stanza_id, url = slot.getUrl
                    )
                }.asSingle { slot }
            }.map { slot ->
                log.i { "Preparing message. slot==${slot != null}" }
                message {
                    if (message.correction_timestamp != null) {
                        attributes["id"] = nextUID()
                        lastMessageCorrection(message.origin_stanza_id)
                    } else {
                        attributes["id"] = message.origin_stanza_id
                    }
                    to = message.recipient_jid
                    type = message.stanza_type
                    body = if (message.body?.isEmpty() == true && slot != null) slot.getUrl else message.body
                    if (message.markable) {
                        markable()
                    }
                    deliveryReceiptRequest()
                    if (slot != null) {
                        "x"("jabber:x:oob") {
                            "url" { +slot.getUrl }
                        }
                    }
                }
            }.flatMap { stanza ->
                halcyon.request.message(stanza).onSend {
                    log.i { "On send" }
                    chatsQueries.execute { query ->
                        query.updateMessageState(id = listOf(message.id), state = MessageState.OutSent)
                    }.subscribe()
                }.asSingle()
            }.subscribe {
                log.i { "Message sent" }
            }
    }

    private fun processReceivedRosterEvent(event: RosterEvent) {
        if (event is RosterEvent.ItemAdded && event.item.annotations.any { it is MIXRosterItemAnnotation }) {
            addMixChannel(event.context.boundJID?.bareJID!!, event.item).flatMapCompletable {
                updateMIXChannelsNames(event.context, event.item.jid)
            }.subscribe()
        }
    }

    private fun updateMIXChannelsNames(halcyon: AbstractHalcyon, jid: BareJID? = null) =
        openChatsQueries.query { it.selectAll() }.map { it.executeAsList() }.flatten()
            .filter { it.account == halcyon.boundJID!!.bareJID && it.type == OpenChatType.Mix }
            .filter { jid == null || jid == it.jid }.flatMapMaybe { openChat ->
                log.e { "Requesting" }
                halcyon.getModule(DiscoveryModule).info(openChat.jid).asSingle().map { it.identities }.flatten()
                    .filter { it.type == "mix" && !it.name.isNullOrBlank() }.firstOrComplete().map {
                        openChat.copy(name = it.name!!)
                    }
            }.flatMapCompletable { openChat ->
                log.e { "JID=${openChat.jid}; name=${openChat.name}" }
                openChatsQueries.execute { it.updateName(id = openChat.id, name = openChat.name) }
            }


    private fun addMixChannel(account: BareJID, item: RosterItem) =
        openChatsQueries.query { it.selectByAccountJid(account = account, jid = item.jid) }
            .map { it.executeAsOneOrNull() }.flatMap { openChats: OpenChats? ->
                if (openChats == null) createOpenChat(account, item.jid, OpenChatType.Mix) else single { openChats }
            }


// 	fun addMixChannel(account: BareJID, item: RosterItem) {
// 		openChatsQueries.query { it.selectByAccountJid(account = account, jid = item.jid) }
// 			.map { it.executeAsOneOrNull() }
// 			.subscribe { openChats: OpenChats? ->
// 				if (openChats == null) createOpenChat(account, item.jid, OpenChatType.Mix).subscribe()
// 			}
// 	}

    // i do not know what name is, and there is no documentation. xmpp docs on mix are not helpful :P
    @OptIn(ReflectionModuleManager::class)
    override fun createMixChannel(account: BareJID, participants: List<BareJID>, channelJID: String?) {
        val connection = connectionService[account]!!.getModule<MIXModule>(MIXModule.TYPE)
        val channelComponentName = channelJID?.toBareJID() ?: generateRandomRoomJid(account)
        val directMessage = participants.size == 1
        if (directMessage) {
            println("direct messaging")
            openChatWith(account, participants[0])
        } else {
            println("mix chatting")
            println(participants[0])
            connection
                .create(channelComponentName, "Mix Channel")
                .asSingle()
                .subscribeOn(ioScheduler)
            for (participant in participants) {
                // token not necessary for now. Not standardized in XMPP docs, replace with validity time eventually
                connection.createInvitation(participant, channelComponentName, account, "token")
            }
        }
    }

}


fun MessagesDatabaseQueries.saveOrUpdate(
    openChatId: Long,
    account: BareJID,
    senderJid: JID,
    annotation: MixAnnotation?,
    recipientJid: JID,
    stanzaType: MessageType,
    originId: String,
    mamStanzaId: String?,
    remoteStanzaId: String?,
    participantId: String?,
    body: String,
    timestamp: Instant,
    state: MessageState,
    markable: Boolean,
) {
    val log = Logger.withTag("saveOrUpdate")
    val author_jid = annotation?.jid ?: senderJid
    val updateId = getRowIdByOriginId(openChatId, "${author_jid.bareJID}%", originId).executeAsOneOrNull()
    if (updateId != null) {
        log.d { "Update entry originId=$originId" }
        update(
            id = updateId,
            mam_stanza_id = mamStanzaId,
            author_nickname = annotation?.nick,
            remote_stanza_id = remoteStanzaId,
            participant_id = participantId,
            timestamp = timestamp,
        )
    } else {
        log.d { "Add entry originId=$originId" }
        add(
            openchat_id = openChatId,
            account = account,
            author_jid = author_jid,
            author_nickname = annotation?.nick,
            recipient_jid = recipientJid,
            recipient_nickname = null,
            participant_id = participantId,
            origin_stanza_id = originId,
            mam_stanza_id = mamStanzaId,
            remote_stanza_id = remoteStanzaId,
            body = body,
            timestamp = timestamp,
            state = state,
            stanza_type = stanzaType,
            markable = markable
        )

    }
}
