package org.botdesigner.blueprint.queue

import kotlinx.atomicfu.AtomicInt
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.botdesigner.blueprint.context.ExecutionReturn
import org.botdesigner.blueprint.context.SuspensionContext


interface BlueprintHandlerQueue<T : SuspensionContext<T>> {
    suspend fun enqueue(
        timeout: IncrementalTimeout,
        context: T,
        action: suspend (T) -> Unit
    )
}

internal class QueueInfo<T>{

    val mutex: Mutex = Mutex()
    val suspendedConsumers: MutableSet<ExecutionReturn.Suspend<T>> = LinkedHashSet()
    val runningTasks : AtomicInt = atomic(0)
}

class BlueprintHandlerQueueImpl<T : SuspensionContext<T>>() : BlueprintHandlerQueue<T> {

    private val map =
        mutableMapOf<String, QueueInfo<T>>()

    private val queueMutex = Mutex()

    override suspend fun enqueue(
        timeout: IncrementalTimeout,
        context: T,
        action: suspend (T) -> Unit
    ) {
        val key = context.key ?: return action(context)

        with(getOrCreate(key)) {
            try {
                runningTasks += 1
                runBlock(
                    delay = timeout,
                    context = context,
                    block = action
                )
            } finally {
                if (runningTasks.decrementAndGet() <= 0) {
                    queueMutex.withLock {
                        map.remove(key)
                    }
                }
            }
        }
    }

    private suspend fun getOrCreate(key: String): QueueInfo<T> {
        queueMutex.withLock {
            map[key]?.let { return it }
            val queue = QueueInfo<T>()
            map[key] = queue
            return queue
        }
    }
}

private suspend fun <T : SuspensionContext<T>> QueueInfo<T>.runBlock(
    delay: IncrementalTimeout,
    context: T,
    block: suspend (T) -> Unit
) {
    mutex.withLock {
        suspendedConsumers
            .firstOrNull { it.tryRelease(context) }
            ?.let {
                delay.reset()
                suspendedConsumers.remove(it)
                return
            }
    }

    val consumers: MutableSet<ExecutionReturn.Suspend<*>> = linkedSetOf()

    try {
        context.launch(
            onSuspended = { suspension ->
                mutex.withLock {
                    consumers += suspension
                    suspendedConsumers += suspension
                    suspension.job.invokeOnCompletion { _ ->
                        suspendedConsumers -= suspension
                        consumers -= suspension
                    }
                }
            }
        ) {
            block(context)
        }
    } finally {
        mutex.withLock {
            suspendedConsumers.removeAll(consumers)
        }
    }
}