type EOF = "EOF"

/**
 * The "backend" of a flow,
 * that does only implement the minimum required operations.
 *
 * Convenience operations are covered by {@link Flow}.
 */
interface FlowSource<T> {
  /**
   * Awaits the next item or returns {@link EOF},
   * if the flow is completely drained.
   */
  next(): Promise<Awaited<T> | EOF>;
}

class FlowSourceImpl<T> implements FlowSource<T> {
  private inFlight = new Set<{ promise: Promise<T> }>()
  private closed = false

  public addItem(job: Promise<T>) {
    if (this.closed) {
      throw new Error(`Invalid State Error: Cannot add item to closed source stream. `)
    }

    this.inFlight.add({promise: job})
    this.interruptWait()
  }

  public async next(): Promise<Awaited<T> | EOF> {
    while (true) {
      if (this.inFlight.size === 0 && this.closed) {
        return "EOF"
      }

      const {interrupt, promise} = createInterruptablePromiseForNextItem(this.inFlight)
      this.interruptWait = interrupt
      const result = await promise

      if (result === "interrupt") {
        continue
      }

      this.inFlight.delete(result.item)

      return result.resolved
    }
  }

  /**
   * Closes the flow source, so that no more new items are accepted.
   * After each of the remaining items have been awaited,
   * the next call to {@link next} will return {@link EOF}
   */
  public close() {
    this.closed = true
    this.interruptWait()
  }

  private interruptWait = () => {
  }
}

const createInterruptablePromiseForNextItem = <T>(inFlight: Set<{ promise: Promise<T> }>) => {
  const abortController = new AbortController()

  const interruptPromise = new Promise<"interrupt">((resolve) => {
    abortController.signal.onabort = () => resolve("interrupt")
  })

  const createNextItemPromise = async (): Promise<{ resolved: Awaited<T>, item: { promise: Promise<T> } }> =>
    await Promise.race(
      [...inFlight.values()]
        .map((item) => (async () => {
          const resolved = await item.promise
          return {resolved, item}
        })())
    )

  return {
    interrupt: () => abortController.abort(),
    promise: Promise.race([createNextItemPromise(), interruptPromise] as const)
  }
}

/**
 * A flow takes an amount of promises from a source and transports their resolved values to consumes
 * in an unspecified order.
 */
export interface Flow<T> extends FlowSource<T> {
  /**
   * Take every item and execute the block with it once.
   *
   * The result of the block will be forgotten.
   *
   * Kinda like an array forEach.
   */
  take(block: (item: Awaited<T>) => void): Flow<T>

  /**
   * Apply the block to each item asynchronously.
   * A stream of these operations will be returned.
   *
   * Kinda like an async version of the array map.
   */

  each<R>(block: (item: Awaited<T>) => Promise<R>): Flow<R>

  /**
   * Waits for every item of the flow until the flow eventually closes.
   * All items are captured into a list, which is returned after the last item was awaited.
   */

  consume(): Promise<Awaited<T>[]>

  /**
   * Executes the block once with {@link this} stream,
   * returning the result of this operation.
   *
   * Its main purpose is to be used in beautiful method chains
   * with flow extensions.
   */
  with<R>(block: (stream: Flow<T>) => R): R

  /**
   * Returns a filtered stream by the given {@link predicate}.
   *
   * Kinda like an array filter.
   */

  filter<S extends T>(predicate: (item: Awaited<T>) => item is Awaited<S>): Flow<S>
}

class ReadableFlowImpl<T> implements Flow<T> {
  constructor(private readonly source: FlowSource<T>) {
  }

  public next() {
    return this.source.next()
  }

  public take(block: (item: Awaited<T>) => void): Flow<T> {
    return this.each(async (item) => {
      block(item)
      return item
    })
  }

  public each<R>(block: (item: Awaited<T>) => Promise<R>): Flow<R> {
    return produceTaskFlow<R>(async (emit, close) => {
      while (true) {
        const waitResult = await this.next()

        if (waitResult === "EOF") {
          close()
          return
        }

        emit(block(waitResult))
      }
    })
  }

  public with<R>(block: (stream: Flow<T>) => R): R {
    return block(this)
  }

  public filter<S extends Awaited<T>>(predicate: (item: Awaited<T>) => item is S): Flow<S> {
    return produceItemFlow(async (emit, close) => {
      while (true) {
        const awaited = await this.next()

        if (awaited === "EOF") {
          break
        }

        if (!predicate(awaited)) {
          continue
        }

        emit(awaited)
      }

      close()
    })
  }

  public async consume(): Promise<Awaited<T>[]> {
    const resolvedItems: Awaited<T>[] = []
    while (true) {
      const awaited = await this.next()

      if (awaited === "EOF") {
        return resolvedItems
      }

      resolvedItems.push(awaited)
    }
  }
}

export const produceTaskFlow = <T>(block: (emit: (promise: Promise<T>) => void, close: () => void) => void): Flow<T> => {
  const sourceStream = new FlowSourceImpl<T>()
  const emit = (promise: Promise<T>) => {
    sourceStream.addItem(promise)
  }

  block(emit, () => {
    sourceStream.close()
  })
  return new ReadableFlowImpl(sourceStream)
}

export const produceItemFlow = <T>(block: (emit: (item: T) => void, close: () => void) => void): Flow<T> => {
  const sourceStream = new FlowSourceImpl<T>()

  const emit = (item: T) => {
    sourceStream.addItem(Promise.resolve(item))
  }

  block(emit, () => {
    sourceStream.close()
  })
  return new ReadableFlowImpl(sourceStream)
}

export const createReadWriteFlow = <T>() => {
  const sourceStream = new FlowSourceImpl<T>()

  return [
    new ReadableFlowImpl(sourceStream),
    (item: Promise<T>) => {
      sourceStream.addItem(item)
    },
    () => {
      sourceStream.close()
    }
  ] as const
}

export const createCombinedFlow = <T, R>(first: Flow<T>, second: Flow<R>): Flow<T | R> => {
  const source: FlowSource<T | R> = {
    async next(): Promise<Awaited<T | R> | "EOF"> {
      let firstFlowEnded = false
      let secondFlowEnded = false

      while (true) {
        if (firstFlowEnded && secondFlowEnded) {
          return "EOF"
        }

        const promises = []
        if (!firstFlowEnded) {
          promises.push((async () => ({name: "first", result: await first.next()} as const))())
        }
        if (!secondFlowEnded) {
          promises.push((async () => ({name: "second", result: await second.next()} as const))())
        }

        const awaited = await Promise.race(promises)

        if (awaited.result === "EOF") {
          if (awaited.name === "first") {
            firstFlowEnded = true
          } else {
            secondFlowEnded = true
          }
          continue
        }

        return awaited.result
      }
    }
  }

  return new ReadableFlowImpl(source)
}
