import { SocketMessage, SessionDeviceType, GatewayCommandMessage, GatewayCommandMessageType, GatewayResponseMessage, GatewayResponseMessageType } from '@ncss/models';

import { EventEmitter, Injectable } from '@angular/core';
import * as _ from 'lodash';
import * as io from 'socket.io-client';

import { AppSettingsService } from './../app-settings.service';
import { filter, first, timeout } from 'rxjs/operators';

/** Handler invoked with each parsed message delivered to a subscriber. */
export type SocketCallback = (msg: SocketMessage) => void;
/** Handle returned by `PropertyEvents.subscribe`, used to end that subscription. */
export interface PropertyEventSubscription {
  /** Ends the subscription that produced this handle. */
  unsubscribe: () => void;
}
/** Subscribable source of socket messages, as returned by `MobileLiveDeviceService.propertyEvents`. */
export interface PropertyEvents {
  /** Registers `cb` as the message handler and returns a handle for unsubscribing. */
  subscribe: (cb: SocketCallback) => PropertyEventSubscription;
}

/**
 * Maintains a single socket.io connection to the `/live` namespace for the
 * mobile client and relays gateway messages in both directions.
 *
 * Only one connection is kept at a time: subscribing through
 * `propertyEvents(...).subscribe` closes any previous socket before opening a
 * new one. Every parsed incoming message is handed to the active subscriber
 * callback (if any) and is also emitted on `onMessage`.
 */
@Injectable()
export class MobileLiveDeviceService {
  /** Emits every successfully parsed message received on the live socket. */
  public onMessage = new EventEmitter<SocketMessage>();

  /** Callback of the currently active subscription, or null when there is none. */
  private cb: SocketCallback;
  /** Currently open socket, or null/undefined when not subscribed. */
  private socket;
  /** socket.io client factory, held on the instance and used by `connectToNamespace`. */
  private io;
  /** Last auth token read from `localStorage`. */
  private token: string;
  /** Recently seen message ids mapped to the epoch-millisecond time at which they expire. */
  private receivedMessages: { [key: string]: number } = {};
  /** Handle of the periodic cleanup timer, kept so it can be cleared on destroy. */
  private cleanupTimer: ReturnType<typeof setInterval>;

  constructor(private appSettingsService: AppSettingsService) {
    this.io = io;
    // Sweep expired message ids every 10 seconds.
    this.cleanupTimer = setInterval(() => this.cleanReceivedMessages(), 10 * 1000);
  }

  /**
   * Angular lifecycle hook: stops the cleanup timer and closes any open socket
   * so the service does not keep running after it has been destroyed.
   */
  public ngOnDestroy(): void {
    clearInterval(this.cleanupTimer);
    if (this.socket) {
      this.socket.close();
      this.socket = null;
    }
    this.cb = null;
  }

  /**
   * Returns a subscribable source of live messages for one gateway.
   *
   * @param gatewayId Gateway whose id is sent in the connection query.
   * @returns An object whose `subscribe` opens the connection and returns an
   *   `unsubscribe` handle. No connection is opened until `subscribe` is called.
   */
  public propertyEvents(gatewayId: number): PropertyEvents {
    return {
      subscribe: (cb: SocketCallback) => {
        // A new subscription replaces the previous connection.
        if (this.socket) {
          this.socket.close();
          this.socket = null;
        }
        const socket = this.connectToNamespace('/live', { gatewayId });
        socket.on('message', (data) => this.message(data));
        this.cb = cb;
        this.socket = socket;
        return {
          unsubscribe: () => {
            socket.close();
            // Only clear shared state if this subscription still owns it; a
            // stale handle must not tear down a newer subscription.
            if (this.socket === socket) {
              this.socket = null;
              this.cb = null;
            }
          },
        };
      },
    };
  }

  /**
   * Opens a websocket-only socket.io connection to the given namespace.
   *
   * @param ns Namespace suffix appended to the configured socket endpoint.
   * @param query Extra query parameters; `token` and `deviceType` are always added.
   * @returns The socket returned by the socket.io client factory.
   */
  private connectToNamespace(ns: string = '', query: any = {}) {
    // Re-read on every connect so a token stored after the first connection is
    // picked up; fall back to the last known token when storage has none.
    this.token = localStorage.getItem('token') || this.token;
    return this.io(this.appSettingsService.appSettings.socketEndpoint + ns, {
      transports: ['websocket'],
      query: {
        ...query,
        token: this.token,
        deviceType: SessionDeviceType.MOBILE,
      },
    });
  }

  /**
   * Handles a raw 'message' event: parses it and forwards the result to the
   * active subscriber callback and to `onMessage`. Payloads for which
   * `SocketMessage.create` returns a falsy value are ignored.
   */
  private message(data: any) {
    const msg = SocketMessage.create(data);
    if (!msg) {
      return;
    }
    // NOTE(review): ids are recorded here, but delivery below is not gated on
    // whether the id is new, so repeated ids are still dispatched. Confirm
    // whether duplicates are meant to be suppressed before changing this.
    if (this.isMessageIdNew(msg.id)) {
      this.recordReceivedMessage(msg.id);
    }
    if (this.cb) {
      this.cb(msg);
    }
    this.onMessage.next(msg);
  }

  /** Returns true when `id` has no recorded entry. */
  private isMessageIdNew(id) {
    return !this.receivedMessages[id];
  }

  /** Records `id` with an expiry 9 seconds from now. */
  private recordReceivedMessage(id) {
    this.receivedMessages[id] = Date.now() + (9 * 1000);
  }

  /** Drops every recorded id whose expiry time has passed. */
  private cleanReceivedMessages() {
    const now = Date.now();
    this.receivedMessages = _.omitBy(this.receivedMessages, (expiresAt) => expiresAt < now);
  }

  /**
   * Sends a FORCE_SYNC command for the given gateway.
   *
   * @param gatewayId Gateway to synchronise.
   * @throws Error when there is no active subscription.
   */
  public syncGateway(gatewayId: number) {
    if (!this.socket) {
      throw new Error('Must subscribe first');
    }
    const msg = new GatewayCommandMessage({ type: GatewayCommandMessageType.FORCE_SYNC, gatewayId });
    this.send(msg);
  }

  /**
   * Sends a PING_POWERED_DEVICE command and waits for the matching response.
   *
   * @param deviceId Device to ping; the response is matched on `pingedDevice.id`.
   * @param gatewayId Gateway the command is addressed to.
   * @returns The matching response, or null when there is no active
   *   subscription or no matching response arrives within 30 seconds.
   */
  public async pingPoweredDevice(deviceId: number, gatewayId: number): Promise<GatewayResponseMessage | null> {
    if (!this.socket) {
      return null;
    }
    const msg = new GatewayCommandMessage({ type: GatewayCommandMessageType.PING_POWERED_DEVICE, gatewayId });
    msg.deviceId = deviceId;
    this.send(msg);
    try {
      // The listener is attached synchronously after the send (no await in
      // between), so it is in place before any later event can be delivered.
      const res = await this.onMessage.pipe(
        filter((r) =>
          r instanceof GatewayResponseMessage &&
          r.type === GatewayResponseMessageType.PING_POWERED_DEVICE_RESPONSE &&
          !!r.pingedDevice &&
          r.pingedDevice.id === deviceId),
        first(),
        timeout(30_000),
      ).toPromise();
      return res as GatewayResponseMessage;
    } catch (e) {
      // A timeout (or any stream error) is reported to the caller as "no response".
      return null;
    }
  }

  /**
   * Emits a command on the active socket as a 'message' event.
   *
   * @param msg Command to send.
   * @throws Error when there is no active subscription.
   */
  public send(msg: GatewayCommandMessage) {
    if (!this.socket) {
      throw new Error('Must subscribe first');
    }
    this.socket.emit('message', msg);
  }
}
