import {ObjectUnsubscribedError, Subject, Subscriber, Subscription} from 'rxjs';

/**
 * Construction options for a `ProcessQueue`.
 */
export interface ProcessQueueConfig<T> {
  /** Optional value queued at construction, ahead of `initialValues`. */
  initialValue?: T;
  /** Optional list of values queued at construction, in order. */
  initialValues?: T[];
  /**
   * Transform functions for queued values.
   * NOTE(review): confirm the consuming class actually registers these;
   * functions can also be registered via `addFunction` / `addFunctions`.
   */
  processFunctions?: Function[];
  /**
   * When true, values are buffered instead of being processed right away,
   * until `start()` is called. `ProcessQueue` treats an absent value as false.
   */
  delayProcessing?: boolean;
}

/**
 * A `Subject` that passes every value through a chain of transform functions
 * before emitting it, and replays the most recently emitted value to each new
 * subscriber.
 *
 * When `delayProcessing` is enabled, incoming values are buffered untouched
 * until `start()` is called.
 *
 * Values that are `null` or `undefined` after processing are never emitted or
 * replayed; every other value (including `0`, `''` and `false`) is.
 */
export class ProcessQueue<T> extends Subject<T> {
  /* values not processed by functions yet */
  private _rawQueue: T[] = [];

  /* values processed by functions but not emitted yet */
  private _processedQueue: T[] = [];

  /* most recent value taken off the processed queue;
     replayed to late subscribers.
   */
  private _currentValue: T | undefined;

  /* functions to transform values, applied in registration order */
  private _processFunctions: Function[] = [];

  /* while true, incoming values are buffered instead of processed */
  private _delayProcessing = false;

  /**
   * @param config Optional settings: transform functions, initial value(s),
   *   and whether processing is deferred until `start()`.
   */
  constructor(private config?: ProcessQueueConfig<T>) {
    super();

    this._delayProcessing = this.config?.delayProcessing ?? false;

    // Register configured transforms first so they also apply to the
    // initial values processed below.
    const processFunctions = this.config?.processFunctions;
    if (processFunctions?.length) {
      this._processFunctions.push(...processFunctions);
    }

    // `!= null` rather than truthiness: 0, '' and false are valid values.
    const initialValue = this.config?.initialValue;
    if (initialValue != null) {
      this._rawQueue.push(initialValue);
    }

    const initialValues = this.config?.initialValues;
    if (initialValues) {
      this._rawQueue.push(...initialValues);
    }

    if (!this._delayProcessing) {
      this._flushRawQueue();
    }
  }

  /** Alias for `getValues()`. */
  get value(): T[] {
    return this.getValues();
  }

  /** @internal */
  protected _subscribe(subscriber: Subscriber<T>): Subscription {
    // @ts-ignore
    const subscription = super._subscribe(subscriber);
    // Replay the latest value to the new subscriber, unless the subscription
    // came back already closed.
    if (!subscription.closed && this._currentValue != null) {
      subscriber.next(this._currentValue);
    }
    return subscription;
  }

  /**
   * Returns the queue of processed values that have not been emitted yet.
   * Emission drains this queue synchronously, so it is normally empty unless
   * an emission was interrupted by a thrown error.
   *
   * @returns The internal processed queue (not a copy).
   * @throws The stored `thrownError` if this subject has errored.
   * @throws ObjectUnsubscribedError if this subject is closed.
   */
  getValues(): T[] {
    const {hasError, thrownError, _processedQueue} = this;
    if (hasError) {
      throw thrownError;
    }
    this._throwIfClosed();
    return _processedQueue;
  }

  /**
   * @throws ObjectUnsubscribedError if this subject is closed.
   */
  protected _throwIfClosed() {
    if (this.closed) {
      throw new ObjectUnsubscribedError();
    }
  }

  /**
   * Drains the processed queue in FIFO order, emitting each value to
   * subscribers and remembering it for future subscriptions.
   *
   * Implemented as a loop (not recursion) so large batches cannot overflow
   * the call stack.
   */
  private _iterate(): void {
    while (this._processedQueue.length) {
      this._currentValue = this._processedQueue.shift();
      if (this._currentValue != null) {
        super.next(this._currentValue);
      }
    }
  }

  /**
   * Applies every registered function, in registration order, to each value,
   * then queues the results and emits them.
   *
   * @param values Raw values to process; the array itself is not mutated.
   */
  private _processValues(values: T[]): void {
    let processed = [...values];

    for (const processFunction of this._processFunctions) {
      processed = processed.map((value) => processFunction(value) as T);
    }

    this._processedQueue.push(...processed);

    this._iterate();
  }

  /**
   * Processes and emits everything buffered in the raw queue.
   *
   * The batch is detached before processing so that an exception thrown by a
   * transform or a subscriber cannot leave already-handled values buffered
   * for a second pass.
   */
  private _flushRawQueue(): void {
    if (!this._rawQueue.length) {
      return;
    }
    const batch = this._rawQueue;
    this._rawQueue = [];
    this._processValues(batch);
  }

  /**
   * Registers a transform applied to values processed from now on.
   *
   * @param f Function receiving a value and returning its replacement.
   */
  addFunction(f: Function): void {
    this._processFunctions.push(f);
  }

  /**
   * Registers several transforms, preserving their order.
   *
   * @param f Functions each receiving a value and returning its replacement.
   */
  addFunctions(f: Function[]): void {
    this._processFunctions.push(...f);
  }

  /**
   * Ends delayed mode: buffered values are processed and emitted now, and
   * subsequent values are processed immediately.
   */
  start(): void {
    this._delayProcessing = false;
    this._flushRawQueue();
  }

  /**
   * Buffers the value while processing is delayed; otherwise processes and
   * emits it immediately.
   *
   * @param value Raw value to enqueue.
   */
  override next(value: T): void {
    if (this._delayProcessing) {
      this._rawQueue.push(value);
    } else {
      this._processValues([value]);
    }
  }

  /**
   * Batch variant of `next`: buffers or processes all values in order.
   *
   * @param values Raw values to enqueue.
   */
  nextMany(values: T[]): void {
    if (this._delayProcessing) {
      this._rawQueue.push(...values);
    } else {
      this._processValues(values);
    }
  }
}
