package org.botdesigner.blueprint.context

import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.launch
import kotlinx.coroutines.supervisorScope
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

sealed interface ExecutionReturn<out Context> {

    /**
     * Execution has finished.
     */
    object Complete : ExecutionReturn<Nothing>

    /**
     * Execution is paused until a [Context] accepted by [tryRelease] is received
     * or the execution timeout is exceeded.
     * Executions that do not satisfy [tryRelease] are executed in parallel.
     *
     * @property tryRelease predicate deciding whether a received context releases this suspension
     * @property job job associated with the suspended execution
     */
    class Suspend<Context>(
        val tryRelease: (Context) -> Boolean,
        val job: Job,
    ) : ExecutionReturn<Context>
}

/**
 * Lets a [Context] execution pause until another context is received.
 */
interface SuspensionContext<Context> {

    val key: String?

    /**
     * Entry point used by the blueprint runner to manage context suspension.
     * Starts the blueprint through [block] and invokes [onSuspended] every time that blueprint suspends.
     *
     * @param onSuspended invoked whenever the launched blueprint was suspended
     * @param block starts the blueprint on this [Context]
     */
    suspend fun launch(
        onSuspended: suspend (ExecutionReturn.Suspend<Context>) -> Unit,
        block: suspend () -> Unit,
    )

    /**
     * Used by a blueprint to wait for the next execution whose [Context] satisfies [matcher].
     *
     * @param timeout how long to wait before giving up
     * @param matcher predicate selecting the context to wait for
     * @return the new context accepted by [matcher], or null if [timeout] > 0 and the request timed out
     */
    suspend fun await(
        timeout: Long = Long.MAX_VALUE,
        matcher: (Context) -> Boolean,
    ): Context?
}

/**
 * Base [SuspensionContext] implementation.
 *
 * [await] publishes [ExecutionReturn.Suspend] requests into a private shared flow and
 * [launch] collects them, handing each one to its `onSuspended` callback.
 */
abstract class SuspensionContextImpl<Context> : SuspensionContext<Context> {

    // Created with default parameters: no replay cache and no extra buffer.
    private val _suspensionFlow =
        MutableSharedFlow<ExecutionReturn<Context>>()

    /**
     * Runs [block] in a child coroutine and forwards every [ExecutionReturn.Suspend]
     * published while it runs to [onSuspended].
     * Returns once the coroutine running [block] has completed and the resulting
     * [ExecutionReturn.Complete] marker has been collected.
     *
     * @param onSuspended called for each suspension request published through the flow
     * @param block the blueprint body; it is started only after the flow subscription is active
     */
    override suspend fun launch(
        onSuspended: suspend (ExecutionReturn.Suspend<Context>) -> Unit,
        block: suspend () -> Unit,
    ): Unit = coroutineScope {
        _suspensionFlow
            .asSharedFlow()
            .onSubscription {
                // Started from onSubscription so the collector is already registered
                // when the blueprint publishes its first suspension request.
                val blueprint = launch { block() }
                launch {
                    // join() returns on any completion of the blueprint, including its own cancellation.
                    // If the blueprint fails, the enclosing scope is cancelled and this watcher with it.
                    blueprint.join()
                    // Must be the suspending emit: on a shared flow without buffer tryEmit succeeds
                    // only when there are no subscribers, and this function is subscribed right now,
                    // so tryEmit would drop the marker and the collection below would never end.
                    _suspensionFlow.emit(ExecutionReturn.Complete)
                }
            }
            .takeWhile { it !is ExecutionReturn.Complete }
            .filterIsInstance<ExecutionReturn.Suspend<Context>>()
            .collect(onSuspended)
    }

    /**
     * Publishes an [ExecutionReturn.Suspend] request and suspends until its `tryRelease`
     * is called with a context accepted by [matcher], or until [timeout] elapses.
     *
     * @param timeout value passed to [delay]; values that are not positive are replaced by [Long.MAX_VALUE]
     * @param matcher predicate selecting the context that releases this call
     * @return the context accepted by [matcher], or null when the timeout fired first
     */
    @OptIn(ExperimentalCoroutinesApi::class)
    override suspend fun await(
        timeout: Long,
        matcher: (Context) -> Boolean,
    ): Context? = coroutineScope {

        // Separate parent for the timer coroutine so it can be cancelled as soon as the wait is over.
        val timeoutJob = Job(coroutineContext[Job])

        val deferredContext = async {
            val result = CompletableDeferred<Context>()
            val suspension = ExecutionReturn.Suspend<Context>(
                tryRelease = { ctx ->
                    // Report the match to the caller and, on success, hand the context to the waiter.
                    matcher(ctx).also {
                        if (it) result.complete(ctx)
                    }
                },
                // Job of this async coroutine.
                job = requireNotNull(coroutineContext[Job])
            )
            _suspensionFlow.emit(suspension)
            result.await()
        }.apply {
            invokeOnCompletion {
                // The wait is over (result, timeout or failure): stop the timer.
                timeoutJob.cancel()
            }
        }

        launch(timeoutJob) {
            delay(timeout.takeIf { it > 0 } ?: Long.MAX_VALUE)
        }.apply {
            invokeOnCompletion {
                // If the wait is still in progress, cancel it with the dedicated marker
                // so the handler below can tell a timeout from any other cancellation.
                deferredContext.takeIf { it.isActive }
                    ?.cancel(SuspenderCancellationException)
            }
        }

        // deferredContext.await() cannot be used here: it would cancel the parent coroutine
        // instead of returning null on timeout.
        suspendCancellableCoroutine { cont ->
            deferredContext.invokeOnCompletion {
                when (it) {
                    null -> cont.resume(deferredContext.getCompleted())
                    is SuspenderCancellationException -> cont.resume(null)
                    else -> cont.resumeWithException(it)
                }
            }
        }
    }
}

/**
 * Cancellation cause used by [SuspensionContextImpl.await] to mark a wait that was
 * cancelled by its own timeout, so that it can be told apart from any other cancellation.
 */
private object SuspenderCancellationException :
    kotlinx.coroutines.CancellationException("Suspension context was timed out")