import { Observable } from 'rxjs';
import { share } from 'rxjs/operators';
import { diffusionMessageDescriptor } from '../message-descriptor';
import TopicStatus from '../topic/topic-status';

const shouldIgnore = (message) =>
  !message || message.getStatus() !== TopicStatus.removal;

const handleWith = (observer) => (message) => {
  try {
    const descriptor = diffusionMessageDescriptor.for(message);
    observer.next(descriptor);
  } catch (error) {
    observer.error(error);
  }
};

const DIFFUSION_TIMEOUT_SECONDS = 10;

// eslint-disable-next-line import/prefer-default-export
export const fromDiffusion = ({
  Diffusion,
  diffusionHost,
  diffusionPort,
  onCallbackFunction,
}) =>
  Observable.create((observer) => {
    const handler = handleWith(observer);

    Diffusion.connect({
      // debug: true,
      wsTimeout: DIFFUSION_TIMEOUT_SECONDS * 1000,
      disableFlash: true,
      disableSilverlight: true,
      disableXHR: true,
      disableIframe: true,
      disableForeverFrame: true,
      serverHost: diffusionHost,
      serverPort: diffusionPort,
      onDataFunction: handler,
      onTopicStatusFunction: (message) => {
        if (shouldIgnore(message)) return;
        handler(message);
      },
      onLostConnectionFunction: () => {
        Diffusion.trace('onLostConnectionFunction');
        Diffusion.reconnect();
      },
      onAbortFunction: () => {
        Diffusion.trace('ERROR -> Aborted');
      },
      onCallbackFunction,
      onBeforeUnloadFunction: () => {
        Diffusion.trace('onBeforeUnloadFunction');
        observer.complete();
      },
      onServerRejectedCredentialsFunction: () => {
        Diffusion.trace('credentials rejected');
      },
      onConnectionRejectFunction: () => {
        Diffusion.trace('onConnectionRejectFunction');
      },
      onMessageNotAcknowledgedFunction: () => {
        Diffusion.trace('onMessageNotAcknowledgedFunction');
      },
      onInvalidClientFunction: () => {
        Diffusion.trace('onInvalidClientFunction');
      },
      onCascadeFunction: (transport: string) => {
        Diffusion.trace('onCascadeFunction');

        if (transport === 'None') {
          Diffusion.close();
          /** if we got None we cant connect to diffusion with any allowed transports */
          Diffusion.trace('Cant load any transport');
        }
      },
      onPingFunction: (message: string) => {
        Diffusion.trace(message);
      },
    });

    return () => {
      Diffusion.trace('fromDiffusion close');
      Diffusion.close();
    };
  }).pipe(share());
