/* eslint-disable @typescript-eslint/no-unsafe-return */
import { call, take, takeEvery, select, delay, race } from "redux-saga/effects";

import { ACTION } from "../../actions";

import {
  allLoadingFieldsDisable,
  allLoadingFieldsEnable,
  closeStream,
  flowriteModeEnable,
  interruptUpdate,
  loadingIndicatorDisable,
  loadingIndicatorEnable,
  resetPromptWarningSecondBody,
  secondBodyUpdate,
  timeoutError,
} from "../utils/helpers";

import subscribe from "../utils/subscribe";
import messageConsumer from "./consumers/message-consumer";
import errorConsumer from "./consumers/error-consumer";

// the main function receiving completion data from the backend
// takes listeners defined in utils/listeners to react on different types of messages
/* eslint-disable no-console */
/* eslint-disable @typescript-eslint/no-explicit-any */
/* eslint-disable @typescript-eslint/explicit-module-boundary-types */

function* listenStream(action: any): any {
  const { source } = action.payload;
  // attach listeners, start streaming and create an eventChannel for the arriving events
  const chan = yield call(subscribe, source);
  try {
    // set the appState to flowriting
    yield flowriteModeEnable();
    yield loadingIndicatorEnable();
    yield allLoadingFieldsEnable();
    const { appState } = yield select((state) => state);

    let pill = false;

    while (true) {
      const { payload, timeout } = yield race({
        payload: take(chan),
        timeout: delay(60000),
      });

      if (payload) {
        switch (payload.type) {
          // normal case of a new message
          case "message":
            yield messageConsumer(source, payload);
            break;

          case "error":
            yield errorConsumer(source);
            break;

          default:
            yield closeStream(source);
            yield takeEvery(
              ACTION.SAGAS.EVENT_STREAM.INTERRUPT,
              closeStream,
              source
            );
        }
      } else if (timeout) {
        if (
          !appState.active_stream_1 &&
          !appState.active_stream_2 &&
          !appState.active_stream_3
        ) {
          yield closeStream(source);
        } else {
          yield timeoutError(source);
          pill = true;
        }
        break;
      }

      if (appState.interrupt) {
        // clear the bodies
        yield secondBodyUpdate("", 1);
        yield secondBodyUpdate("", 2);
        yield secondBodyUpdate("", 3);

        // interrupt
        yield interruptUpdate(false);
        break;
      }

      // within each while-loop cycle we can interrupt the stream process
      // resulting in a quick closing of the source event channel
      yield takeEvery(ACTION.SAGAS.EVENT_STREAM.INTERRUPT, closeStream, source);
    }

    if (!pill) {
      yield resetPromptWarningSecondBody();
      yield loadingIndicatorDisable();
      yield allLoadingFieldsDisable();
    }
  } catch (error) {
    yield closeStream(source);
    yield call(console.log, "Error", error);
  }
}

export default listenStream;
