import { Injectable } from '@angular/core';
import { forkJoin, from, Observable } from 'rxjs';
import { HelperService } from '../helper.service';
import RealTimeConnection from './real-time-connection';
import { RealTimeConnectionDeps } from "./real-time-connection.deps";
import RealTimeStreamConnection from './real-time-stream-connection';
import { RealTimeStreamSubscriber } from './real-time-stream-subscriber';

/**
 * Application-wide entry point for real-time messaging.
 *
 * Owns one long-lived "steady" connection plus any number of dedicated
 * per-stream connections. Every registered handler and steady-stream
 * subscriber is remembered here so it can be handed back to the steady
 * connection whenever `connectAll()` runs.
 */
@Injectable({
  providedIn: 'root'
})
export class RealTimeService {

  /** Handlers registered through `registerHandler`, keyed by hub method name. */
  private eventListeners: { [key: string]: (data: unknown) => void } = {};
  /** Subscribers streaming over the steady connection, keyed by hub method name. */
  private streamSubscribers: { [key: string]: RealTimeStreamSubscriber } = {};
  /** Dedicated stream connections, keyed by hub method name. */
  private streamConnections: { [key: string]: RealTimeStreamConnection } = {};

  private readonly _steadyConnection: RealTimeConnection;

  constructor(
    private realTimeConnectionDeps: RealTimeConnectionDeps,
    private helperService: HelperService
  ) {
    this._steadyConnection = new RealTimeConnection(realTimeConnectionDeps);
    // NOTE(review): presumably the first callback fires when the user becomes
    // active and the second when they go inactive. Whether HelperService
    // subscribes to the Observables these callbacks return is not visible
    // from this file; confirm that connections are actually started/stopped.
    this.helperService.onUserIsActive(() => this.connectAll(), () => this.disconnectAll());
  }

  /** The shared connection used for handlers, `send`/`invoke` and steady streams. */
  get steadyConnection() {
    return this._steadyConnection;
  }

  /**
   * Connects the steady connection (passing it all known handlers and steady
   * stream subscribers) and every dedicated stream connection.
   *
   * @returns An Observable combining all connect results via `forkJoin`.
   */
  public connectAll(): Observable<void[]> {
    const pending: Observable<void>[] = [
      this.steadyConnection.connect(this.eventListeners, Object.values(this.streamSubscribers)),
      ...Object.values(this.streamConnections).map(streamConnection => streamConnection.connect())
    ];
    return forkJoin(pending);
  }

  /**
   * Disconnects the steady connection and every dedicated stream connection.
   * Registered handlers, subscribers and stream connections stay recorded.
   *
   * @returns An Observable combining all disconnect results via `forkJoin`.
   */
  public disconnectAll(): Observable<unknown[]> {
    const pending: Observable<void>[] = [
      this.steadyConnection.disconnect(),
      ...Object.values(this.streamConnections).map(streamConnection => streamConnection.disconnect())
    ];
    return forkJoin(pending);
  }

  /**
   * Calls `send` on the steady connection's underlying connection.
   *
   * The call is issued immediately (not on subscription) when a connection
   * exists. Each entry of `args` is forwarded as its own argument; previously
   * the whole array was forwarded as a single argument.
   * NOTE(review): assumes the underlying `send` is variadic (as on a SignalR
   * `HubConnection`) — confirm against RealTimeConnection.
   *
   * @param methodName Name of the remote method.
   * @param args Arguments forwarded to the remote method.
   * @returns An Observable wrapping the underlying call; it errors if there is
   *          no underlying connection.
   */
  public send(methodName: string, ...args: unknown[]): Observable<void> {
    const connection = this.steadyConnection.connection;
    if (!connection) {
      return this.connectionUnavailable<void>(methodName);
    }
    return from(connection.send(methodName, ...args));
  }

  /**
   * Calls `invoke` on the steady connection's underlying connection.
   *
   * The call is issued immediately (not on subscription) when a connection
   * exists. Each entry of `args` is forwarded as its own argument; previously
   * the whole array was forwarded as a single argument.
   * NOTE(review): assumes the underlying `invoke` is variadic (as on a SignalR
   * `HubConnection`) — confirm against RealTimeConnection.
   *
   * @param methodName Name of the remote method.
   * @param args Arguments forwarded to the remote method.
   * @returns An Observable wrapping the underlying call; it errors if there is
   *          no underlying connection.
   */
  public invoke(methodName: string, ...args: unknown[]): Observable<unknown> {
    const connection = this.steadyConnection.connection;
    if (!connection) {
      return this.connectionUnavailable<unknown>(methodName);
    }
    return from(connection.invoke(methodName, ...args));
  }

  /**
   * Registers `listener` for `methodName`, replacing any handler previously
   * registered for that name. The listener is recorded so `connectAll()` can
   * pass it to the steady connection, and is attached right away when an
   * underlying connection already exists.
   *
   * @param methodName Name of the method to listen for.
   * @param listener Callback receiving the payload.
   */
  public registerHandler<T>(methodName: string, listener: (data: T) => void): void {
    this.unRegisterHandler(methodName);
    // The map stores listeners with an `unknown` payload; the caller-chosen T
    // is not checked at runtime.
    this.eventListeners[methodName] = listener as (data: unknown) => void;
    this.steadyConnection.connection?.on(methodName, listener);
  }

  /**
   * Forgets the handler recorded for `methodName` and calls `off(methodName)`
   * on the underlying connection when one exists.
   *
   * @param methodName Name of the method to stop listening for.
   */
  public unRegisterHandler(methodName: string): void {
    delete this.eventListeners[methodName];
    this.steadyConnection.connection?.off(methodName);
  }

  /**
   * Starts a stream over the steady connection, first disposing any stream
   * already recorded under the same hub method name.
   *
   * @param subscriber Describes the stream; keyed by `hubInfo.methodName`.
   * @returns Whatever `steadyConnection.stream(subscriber)` returns.
   */
  public startSteadyStream(subscriber: RealTimeStreamSubscriber) {
    this.disposeSteadyStream(subscriber);
    this.streamSubscribers[subscriber.hubInfo.methodName] = subscriber;
    return this.steadyConnection.stream(subscriber);
  }

  /**
   * Removes the steady stream recorded under `subscriber.hubInfo.methodName`.
   * The recorded entry (not necessarily the instance passed in) has its
   * connection disposed when its `streamSubscriber` is not yet closed.
   *
   * @param subscriber Identifies the stream via `hubInfo.methodName`.
   */
  public disposeSteadyStream(subscriber: RealTimeStreamSubscriber) {
    const methodName = subscriber.hubInfo.methodName;
    const recorded = this.streamSubscribers[methodName];
    if (recorded?.streamSubscriber.closed === false) {
      recorded.connection?.dispose();
    }
    delete this.streamSubscribers[methodName];
  }

  /**
   * Creates a dedicated connection for `subscriber`, replacing (and disposing)
   * any existing one recorded under the same hub method name. The connect call
   * is handed to `HelperService.ifUserIsActiveNow`.
   * NOTE(review): presumably that runs the callback only while the user is
   * active; otherwise the connection waits for the next `connectAll()` —
   * confirm against HelperService.
   *
   * @param subscriber Describes the stream; keyed by `hubInfo.methodName`.
   */
  public startStreamConnection(subscriber: RealTimeStreamSubscriber) {
    const methodName = subscriber.hubInfo.methodName;
    this.disposeStreamConnection(methodName);
    const streamConnection = new RealTimeStreamConnection(
      this.realTimeConnectionDeps,
      subscriber
    );
    this.streamConnections[methodName] = streamConnection;
    this.helperService.ifUserIsActiveNow(() => streamConnection.connect());
  }

  /**
   * Disposes and forgets the dedicated stream connection recorded under
   * `methodName`, if any.
   *
   * @param methodName Hub method name the connection was recorded under.
   */
  public disposeStreamConnection(methodName: string) {
    this.streamConnections[methodName]?.dispose();
    delete this.streamConnections[methodName];
  }

  /**
   * Builds an Observable that errors on subscription, used when a call is
   * attempted while the steady connection has no underlying connection.
   */
  private connectionUnavailable<T>(methodName: string): Observable<T> {
    return new Observable<T>(observer =>
      observer.error(new Error(`Cannot call '${methodName}': the real-time connection is not established.`))
    );
  }
}
