/* eslint-disable no-console */
import { DOCUMENT } from '@angular/common';
import { inject, Injectable } from '@angular/core';
import { ComponentStore } from '@ngrx/component-store';
import { EventType, SubscriptionEvent, SubscriptionMessage, WsMessage, WsMessageContent } from './types';
import { assertDefined } from './utils';
import { combineLatest, EMPTY, Observable, Subject, timer } from 'rxjs';
import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket';
import { SocketStatsStore } from './socket-stats.store';
import {
  catchError,
  distinctUntilChanged,
  exhaustMap,
  filter,
  map,
  switchMap,
  takeWhile,
  tap,
  withLatestFrom
} from 'rxjs/operators';

import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { DownloadTokenStore } from '../state/download-token.store';
import { DOWNLOAD_API_WS_PATH, DOWNLOAD_ENABLED } from '../../utils/constants';

const RETRY_SECONDS = 5;
const MAX_RETRIES = 30;
const DEBUG_MODE = true;

interface SocketState {
  wsSubjectConfig?: WebSocketSubjectConfig<WsMessage>;

  subMessages: WsMessageContent[];

  socket?: WebSocketSubject<WsMessage>;

  connectError?: unknown;
}

@Injectable({
  providedIn: 'root'
})
export class SocketService extends ComponentStore<SocketState> {
  readonly document = inject(DOCUMENT);
  readonly downloadApiTokenStore = inject(DownloadTokenStore);

  private readonly statsStore = inject(SocketStatsStore);

  private messages = new Subject<WsMessageContent>();

  private readonly connected = new Subject<void>();

  private readonly wsSubjectConfig$ = this.select(({ wsSubjectConfig }) => wsSubjectConfig);

  /**
   * The current state of the websocket connection.
   */
  readonly isConnected$ = this.statsStore.isConnected$;

  /**
   * A stream of messages received
   */
  private messages$ = this.messages.asObservable();

  private readonly subMessages$ = this.select(({ subMessages }) => subMessages);

  private readonly socket$ = this.select(({ socket }) => socket);

  /**
   * A stream of messages to send, combined with whether the websocket is connected.
   * This will emit when the websocket is connected, and there are messages to send.
   */
  private readonly toSend$ = combineLatest([this.isConnected$, this.subMessages$]).pipe(
    filter(([isConnected, queue]) => !!(isConnected && queue.length)),
    map(([, queue]) => queue)
  );

  /**
   * Constructs the WebSocketSubjectConfig object, with open and close observers
   * to handle connection status, and trying to re-connect when disconnected.
   */
  private readonly setUpWebSocketSubjectConfig = this.effect((token$: Observable<string>) =>
    token$.pipe(
      tap(token => {
        const url = `${DOWNLOAD_API_WS_PATH}/files/progress?token=${token}`.replace(/^http/, 'ws') + '';

        const config: WebSocketSubjectConfig<WsMessage> = {
          url,
          closeObserver: {
            next: event => {
              DEBUG_MODE && console.log('closeObserver', event);
              this.statsStore.setConnected(false);

              this.tryReconnect();
            }
          },
          openObserver: {
            next: event => {
              DEBUG_MODE && console.log('openObserver', event);

              this.patchState({ connectError: undefined });

              this.statsStore.setConnected(true);
              this.statsStore.bumpConnections();

              // Notify connected
              this.connected.next();
            }
          }
        };

        this.patchState({ wsSubjectConfig: config });
      })
    )
  );

  /**
   * Attempts to connect to the websocket.
   */
  readonly connect = this.effect(trigger$ =>
    trigger$.pipe(
      withLatestFrom(this.wsSubjectConfig$),
      filter(([, config]) => !!config),
      switchMap(([, config]) => {
        assertDefined(config);

        // Create a new socket and listen for messages, pushing them into the messages Subject.
        const socket = new WebSocketSubject(config);
        this.patchState({ socket });
        return socket.pipe(
          tap(msg => {
            this.messages.next(msg);
            this.statsStore.bumpMessagesReceived();
          }),
          catchError(err => {
            this.patchState({ connectError: err });

            DEBUG_MODE && console.log('error in connect', err);
            return EMPTY;
          })
        );
      })
    )
  );

  /**
   * Disconnects the socket. For simulation purposes. The service will automatically try to reconnect.
   */
  readonly disconnect = this.effect(trigger$ =>
    trigger$.pipe(
      withLatestFrom(this.isConnected$, this.socket$),
      tap(([, isConnected, socket]) => {
        if (isConnected && socket) {
          socket.complete();
        }
      })
    )
  );

  /**
   * Handles attempting to reconnect to the websocket until connected or
   * the max retries have been reached.
   */
  private readonly tryReconnect = this.effect(trigger$ =>
    trigger$.pipe(
      exhaustMap(() => {
        return timer(RETRY_SECONDS * 1000).pipe(
          withLatestFrom(this.isConnected$),
          takeWhile(([, isConnected]) => {
            if (!isConnected) {
              this.statsStore.bumpConnectionRetries();
            }

            return !isConnected && this.statsStore.reconnectionTries < MAX_RETRIES;
          }),
          tap(() => {
            this.connect();
          })
        );
      })
    )
  );

  /**
   * Watches the queue for changes, and when the socket exists,
   * sends the messages in the queue.
   */
  readonly watchQueue = this.effect((queue$: Observable<WsMessageContent[]>) =>
    queue$.pipe(
      withLatestFrom(this.socket$),
      tap(([queue, socket]) => {
        DEBUG_MODE && console.log('watchQueue', queue, socket);

        if (!socket) {
          return;
        }

        while (queue.length) {
          const msg = queue.shift();
          assertDefined(msg);

          DEBUG_MODE && console.log('Sending queued message', msg);
          socket.next(msg);

          this.patchState({ subMessages: queue });
        }
      })
    )
  );

  /**
   * Adds a message to the queue to send to the server to subscribe or unsubscribe to/from a notification.
   */
  private readonly queueSubMessage = this.effect((msg$: Observable<SubscriptionMessage>) =>
    msg$.pipe(
      withLatestFrom(this.subMessages$),
      tap(([msg, queue]) => {
        if (msg.isSubscribe) {
          this.statsStore.bumpSubscriptionCount();
        } else {
          this.statsStore.dropSubscriptionCount();
        }

        this.patchState({ subMessages: [...queue, msg] });
      })
    )
  );

  constructor() {
    super({
      subMessages: []
    });

    DOWNLOAD_ENABLED && this.initWebsocket();
  }

  private initWebsocket() {
    this.statsStore.setConnected(false);

    const downloadApi$ = this.downloadApiTokenStore.downloadToken$;
    downloadApi$
      .pipe(
        takeUntilDestroyed(),
        filter(token => !!token),
        distinctUntilChanged()
      )
      .subscribe(token => {
        this.disconnect();
        this.setUpWebSocketSubjectConfig(token);
        setTimeout(() => {
          this.connect();
        });
      });

    this.watchQueue(this.toSend$);
  }

  /**
   * Begins listening to a type of events or events.
   *
   * Sets up the subscription with the server, sending a subscribe message, and returning a stream
   * of filtered messages.
   *
   * When the client closes the stream, sends an unsubscribe message to the server.
   *
   * @param eventType
   * @returns A stream of messages of the specified type.
   */
  listen<T extends SubscriptionEvent>(eventType?: EventType | EventType[] | SubscriptionEvent<any>): Observable<T> {
    return this.messages$.pipe(
      map(msg => msg as SubscriptionEvent),
      filter(msg => {
        if (!eventType) {
          return true;
        }
        if (typeof eventType === 'string') {
          return msg.event_type === eventType;
        } else if (Array.isArray(eventType)) {
          return eventType.includes(msg.event_type);
        } else {
          return eventType.event_type === msg.event_type;
        }
      }),
      map(msg => msg as T)
    );
  }

  subscribeTo(msg: SubscriptionMessage) {
    this.queueSubMessage(msg);
  }
}
