import { ClientReadableStream } from 'grpc-web';
import log from 'loglevel';
import {
  from,
  Observable,
  ObservableInput,
  ObservedValueOf,
  Subject,
  throwError,
} from 'rxjs';
import { catchError } from 'rxjs/operators';
import { ApiConsts } from '@/api/ApiConsts';
import { ErrorWithStatus } from '@/api/ErrorWithStatus';
import { apiUrl } from '@/constants/common';
import { store } from '@/redux/store';
import { authMetaDataSelector, isAuthenticatedSelector } from '@/redux/user';
import { WindowsUserInfo } from './owpb/pbFiles/basic_pb';
import { DeviceConnected } from './owpb/pbFiles/pushsub_events_pb';
import { PushSubServiceClient } from './owpb/pbFiles/Pushsub_serviceServiceClientPb';
import {
  EventMessage,
  PublishRequest,
  PublishResponse, SubscribeRequest,
} from './owpb/pbFiles/pushsub_service_pb';
import { CollectionUpdate } from './owpb/pbFiles/updates_service_pb';
import { Subscription } from './owpb/pbFiles/subscription_service_pb';
import { License } from './owpb/pbFiles/customer_pb';


/**
 * Variant of rxjs `from` that re-throws any error raised by the source as an
 * `ErrorWithStatus`.
 *
 * @param input - any value accepted by rxjs `from` (promise, observable, iterable, ...)
 * @returns an observable of the input's values whose errors are wrapped in `ErrorWithStatus`
 */
function rxjsFromWithErrorHandle<O extends ObservableInput<unknown>>(
  input: O
): Observable<ObservedValueOf<O>> {
  const source$ = from(input);

  return source$.pipe(
    catchError((cause) => {
      const wrapped = new ErrorWithStatus(cause);
      const authenticated = isAuthenticatedSelector(store.getState());
      if (authenticated) {
        // TODO: handle critical errors, e.g. loss of the connection to the server
      }
      return throwError(wrapped);
    })
  );
}


/**
 * Client for the PushSub gRPC-web service.
 *
 * Opens the server event stream, decodes every incoming `EventMessage`
 * according to its topic, re-emits it on a topic-specific RxJS subject and
 * allows publishing messages back to the service.
 */
class PushSub {
  /** gRPC service client */
  private readonly pushSubServiceClient = new PushSubServiceClient(apiUrl);

  /** Stream of messages from the backend; `null` while no stream is open */
  private pushSubStream: ClientReadableStream<EventMessage> | null = null;

  /** Last automatically assigned publish id (see `publish` with an empty id) */
  private lastPublishId = 0;

  /** Emits every event message received from the API server */
  private readonly eventReceivedSubject = new Subject<EventMessage>();

  /** Emits every status reported by the event stream, wrapped as an error */
  private readonly eventStreamErrorSubject = new Subject<ErrorWithStatus>();

  /** Emits collection updates */
  private readonly collectionsUpdateSubject = new Subject<CollectionUpdate>();

  /** Emits `[publisherId, payload]` when a device connects */
  private readonly deviceConnectedSubject = new Subject<[string, DeviceConnected]>();

  /** Emits the publisher id when a device disconnects */
  private readonly deviceDisconnectedSubject = new Subject<string>();

  /**
   * Emits license updates.
   * NOTE(review): the only subject that is not `private`; kept public so that
   * possible external users keep compiling. Prefer `licenseChanged$`.
   */
  licenseChangedSubject: Subject<License> = new Subject<License>();

  /** Emits subscription updates */
  private readonly subscriptionChangedSubject = new Subject<Subscription>();

  /** Emits `[publisherId, payload]` when a Windows user session changes */
  private readonly windowsSessionChangedSubject = new Subject<[string, WindowsUserInfo]>();

  /** Every event message received from the API server. */
  get eventReceived$(): Observable<EventMessage> {
    return this.eventReceivedSubject.asObservable();
  }

  /** Statuses reported by the event stream, wrapped as `ErrorWithStatus`. */
  get eventStreamError$(): Observable<ErrorWithStatus> {
    return this.eventStreamErrorSubject.asObservable();
  }

  /** Collection updates. */
  get collectionsUpdate$(): Observable<CollectionUpdate> {
    return this.collectionsUpdateSubject.asObservable();
  }

  /** Device connections as `[publisherId, payload]`. */
  get deviceConnected$(): Observable<[string, DeviceConnected]> {
    return this.deviceConnectedSubject.asObservable();
  }

  /** Publisher ids of disconnected devices. */
  get deviceDisconnected$(): Observable<string> {
    return this.deviceDisconnectedSubject.asObservable();
  }

  /** License updates. */
  get licenseChanged$(): Observable<License> {
    return this.licenseChangedSubject.asObservable();
  }

  /** Subscription updates. */
  get subscriptionChanged$(): Observable<Subscription> {
    return this.subscriptionChangedSubject.asObservable();
  }

  /** Windows user session changes as `[publisherId, payload]`. */
  get windowsSessionChanged$(): Observable<[string, WindowsUserInfo]> {
    return this.windowsSessionChangedSubject.asObservable();
  }

  /**
   * Starts handling the message stream.
   *
   * Any stream left over from a previous start is cancelled first, so events
   * are never delivered twice through two parallel streams.
   */
  public startHandleEvents(): void {
    this.stopHandleEvents();
    // '#' is presumably a wildcard matching every topic - confirm against the backend
    this.initEventsStream(['#']);
  }

  /**
   * Stops handling the message stream. Safe to call when no stream is open.
   */
  public stopHandleEvents(): void {
    this.pushSubStream?.cancel();
    this.pushSubStream = null;
  }

  /**
   * Opens the API event stream for the given topics and wires its callbacks
   * to the subjects of this class.
   *
   * @param topicsList - topics to subscribe to
   */
  private initEventsStream(topicsList: Array<string>): void {
    const request = new SubscribeRequest();
    request.setTopicsList(topicsList);

    const authMetaData = authMetaDataSelector(store.getState());
    const stream = this.pushSubServiceClient.initEventsStream(
      request,
      authMetaData
    ) as ClientReadableStream<EventMessage>;
    this.pushSubStream = stream;

    stream.on('data', (eventMessage: EventMessage) => {
      const payload = eventMessage.getPayload_asU8();
      const topic = eventMessage.getTopic();
      log.info(`PushSub Message: ${topic}`);

      // Decode the payload according to the topic and notify the matching subject
      switch (topic) {
        case ApiConsts.TopicCollectionUpdate: {
          this.collectionsUpdateSubject.next(CollectionUpdate.deserializeBinary(payload));
          break;
        }
        case ApiConsts.TopicDeviceConnected: {
          this.deviceConnectedSubject.next([
            eventMessage.getPublisherId(),
            DeviceConnected.deserializeBinary(payload),
          ]);
          break;
        }
        case ApiConsts.TopicDeviceDisconnected: {
          this.deviceDisconnectedSubject.next(eventMessage.getPublisherId());
          break;
        }
        case ApiConsts.TopicPing: {
          // Nothing topic-specific to do for a ping
          break;
        }
        case ApiConsts.TopicLicenseChanged: {
          this.licenseChangedSubject.next(License.deserializeBinary(payload));
          break;
        }
        case ApiConsts.TopicSubscriptionChanged: {
          this.subscriptionChangedSubject.next(Subscription.deserializeBinary(payload));
          break;
        }
        case ApiConsts.TopicWindowsSessionChanged: {
          this.windowsSessionChangedSubject.next([
            eventMessage.getPublisherId(),
            WindowsUserInfo.deserializeBinary(payload),
          ]);
          break;
        }
        default: {
          log.warn('PushSub unimplemented message topic');
          break;
        }
      }

      // Every message, known topic or not, is also forwarded as a raw event
      this.eventReceivedSubject.next(eventMessage);
    });

    // If stream.on('error') is enabled, stream.on('status') stops working.
    // Currently only 'status' is handled.
    // NOTE(review): every reported status is forwarded as an error; confirm
    // whether a final OK status should be treated differently.
    stream.on('status', (status) => {
      const error = new ErrorWithStatus(status);
      log.error(`PushSub EventsStream error. Code: ${error.code} (${error.codeName}) Message: ${error.message} apiError: ${error.apiError}`);
      this.eventStreamErrorSubject.next(error);
    });
  }


  /**
   * Implementation of IPushSub: publishes a message on the given topic.
   *
   * @param topic - topic to publish on
   * @param publishId - id of the publication; pass `''` to have the next
   *   sequential id assigned automatically
   * @param payload - message payload
   * @returns observable of the service response; errors are emitted as `ErrorWithStatus`
   */
  publish(
    topic: string,
    publishId: string,
    payload: Uint8Array | string
  ): Observable<PublishResponse> {
    const request = new PublishRequest();
    request.setTopic(topic);
    request.setPayload(payload);
    request.setPublishId(this.resolvePublishId(publishId));

    const authMetaData = authMetaDataSelector(store.getState());
    const response = this.pushSubServiceClient.publish(request, authMetaData);
    return rxjsFromWithErrorHandle(response);
  }

  /** Returns the caller's publish id, or the next sequential id when it is empty. */
  private resolvePublishId(publishId: string): string {
    if (publishId !== '') {
      return publishId;
    }
    this.lastPublishId += 1;
    return this.lastPublishId.toString();
  }
}

// Module-level singleton: one PushSub instance is created when this module is
// first imported and is shared by every importer.
export default new PushSub();
