import {
  IsYSequentialMessage,
  IsYUpdateMessage,
  WsMessage,
  YDocContentType,
  YSequentialMessage,
  YUpdateMessage
} from '@property-folders/contract';
import * as Y from 'yjs';
import * as awarenessProtocol from 'y-protocols/awareness';
import * as wsc from 'lib0/websocket';
import { fromBase64, toBase64 } from 'lib0/buffer';
import { Observable } from 'lib0/observable';
import { uuidv4 } from 'lib0/random';

/**
 * Payload handed to `update` listeners of a y-protocols `Awareness` instance.
 *
 * Each list holds client ids (numbers). The fields were previously typed as
 * the empty tuple `[]`, which made the concatenated list `never[]` and hid the
 * real element type from the compiler.
 */
interface AwarenessUpdate {
  /** Client ids whose awareness state was added. */
  added: number[];
  /** Client ids whose awareness state changed. */
  updated: number[];
  /** Client ids whose awareness state was removed. */
  removed: number[];
}

/**
 * Per-document bookkeeping kept by YClient for every bound Y.Doc.
 */
type DocBindingInfo = {
  // document id used as the key of the id => binding map and in outgoing messages
  id: string;
  // content type sent alongside the id in outgoing messages
  ct: YDocContentType;
  // the bound document
  doc: Y.Doc;
  // listener registered on the doc's 'update' event by bind(); removed again on unbind
  updateHandler: (update: Uint8Array, origin: unknown) => void;
  // sequential messages waiting for a 'y-ack'; the head of the queue is the one that was sent last
  queueSequential: YSequentialMessage[];
  // awareness instance, present only after bindAwareness()
  awareness?: awarenessProtocol.Awareness;
  // listener registered on the awareness 'update' event, present only after bindAwareness()
  awarenessUpdateHandler?: (update: AwarenessUpdate, origin: unknown) => void;
  // true once an update received from the network has been applied; reset to false on disconnect
  synced: boolean;
  /**
   * True while there is outbound work for this doc that has not been acknowledged:
   * set when a sequential message is queued, or when an update / pending
   * notification could not be sent because the socket was disconnected.
   */
  pendingUpdate: boolean;
};

/**
 * Controls console logging of websocket traffic:
 * - 'full': logs the direction and the whole message
 * - 'type': logs the direction and only the message type
 * - 'none': message and general logging are no-ops
 */
export type MessageLogLevel = 'full' | 'type' | 'none';

/**
 * Describes a set of documents (id + content type) for which access was denied.
 *
 * NOTE(review): this type is not referenced elsewhere in this file. The
 * `denied` event is emitted with the `ids` of the incoming `y-denied`
 * message - confirm whether listeners actually receive this shape.
 */
export interface DeniedEvent {
  docs: {id: string, ct: YDocContentType}[]
}

/**
 * Names of the events emitted by YClient.
 */
export enum YClientEventName {
  // emitted with no arguments when the websocket connects, before resubscribing
  connect = 'connect',
  // emitted with no arguments after per-document state has been reset on disconnect
  disconnect = 'disconnect',
  // NOTE(review): not emitted anywhere in this file
  queueCleared = 'queueCleared',
  // emitted with (doc, synced) whenever a binding's synced flag changes
  synced = 'synced',
  // emitted with (doc, pendingUpdate, id) whenever a binding's pendingUpdate flag changes
  pendingUpdate = 'pendingUpdate',
  // emitted with (message) for any received message type the client does not handle itself
  message = 'message',
  // emitted with (ids) when a 'y-denied' message is received
  denied = 'denied',
  // emitted with (id) when a 'y-get' message is received for a bound document
  hasAccess = 'hasAccess'
}

// Largest raw (binary) update, in bytes, that is sent inline over the websocket.
// base64 turns every 3 bytes into 4 characters, so 23250 bytes encode to 31000 characters.
const wsMaxLengthForRawUpdate = (3 * 31000) / 4;
// Largest JSON-serialised non-sequential message (in characters) that is sent directly over the
// websocket; anything longer goes through the large-message upload queue.
const wsMaxLengthForEncodedMessage = 32000;
// Predicate telling the client whether the document with the given id is read-only.
export type IsRoFunc = (id: string) => boolean;

/**
 * Synchronises bound Y.Doc instances (and, optionally, their awareness state)
 * with a server over a websocket, and re-emits connection / sync state as
 * events (see YClientEventName).
 */
export class YClient extends Observable<string> {
  /**
   * Map url => client
   * Keys are the lower-cased url; populated by `instance()` and cleared by the static `destroy()`.
   */
  static _instances: Map<string, YClient> = new Map();

  /**
   * Returns the shared client for `url`, creating and registering it on first use.
   *
   * Lookup is case-insensitive (the registry key is the lower-cased url).
   * `opts` is only used when a new client is created; an already registered
   * client is returned as-is. New clients are created with message and update
   * logging disabled.
   */
  public static instance(url: string, opts: { isRo: IsRoFunc }) {
    const registryKey = url.toLowerCase();
    let client = this._instances.get(registryKey);
    if (!client) {
      client = new YClient(url, { ...opts, messageLogLevel: 'none', logUpdates: false });
      this._instances.set(registryKey, client);
    }
    return client;
  }

  /**
   * Tears down and unregisters the shared client for `url`, if one exists.
   * Lookup is case-insensitive; unknown urls are ignored.
   */
  public static destroy(url: string) {
    const registryKey = url.toLowerCase();
    const client = this._instances.get(registryKey);
    if (client) {
      client.destroyInternal();
      this._instances.delete(registryKey);
    }
  }

  // underlying lib0 websocket client carrying all traffic
  private _client: wsc.WebsocketClient;
  // bindings keyed by document instance
  private _boundDocs: Map<Y.Doc, DocBindingInfo> = new Map();
  // reverse-map of _boundDocs (keyed by document id), these should be managed in sync
  private _boundIds: Map<string, DocBindingInfo> = new Map();
  // upload targets received via 'psurl-provide' / 'psreq-provide', consumed by processLargeMessageQueue
  private _preSignedReqs: {
    url: string;
    headers?: Record<string,string>;
  }[] = [];
  // non-sequential messages too long to send over the socket, waiting for an upload target
  private _largeMessageQueue: WsMessage[] = [];
  // loggers are chosen once in the constructor from the logging options; disabled ones are no-ops
  private _logGeneral: (...data: any[]) => void;
  private _logMessage: (dir: 'send' | 'recv', message: WsMessage) => void;
  private _logUpdate: (update: Uint8Array) => void;
  private _logStateVector: (update: Uint8Array) => void;
  // tells whether a document id is read-only; read-only docs never send 'y-up' or change pendingUpdate
  private _isRo: IsRoFunc;

  /**
   * Creates the websocket client for `url`, wires its message / connect /
   * disconnect events to this instance and selects the logging functions.
   *
   * @param url websocket endpoint
   * @param opts.isRo predicate telling whether a document id is read-only
   * @param opts.messageLogLevel how much of each websocket message is logged
   * @param opts.logUpdates when true, applied updates are logged via `Y.logUpdate`
   * @param opts.logStateVectors when true, the state vector of every local doc update is logged
   */
  constructor(
    private url: string,
    opts: {
      isRo: IsRoFunc,
      messageLogLevel: MessageLogLevel,
      logUpdates: boolean,
      logStateVectors?: boolean
    }
  ) {
    super();
    this._client = new wsc.WebsocketClient(this.url);
    this._client.on('message', (message: WsMessage) => this.receive(message));
    this._client.on('connect', () => this.connected());
    this._client.on('disconnect', () => this.disconnected());

    this._isRo = opts.isRo;

    // shared stand-in for every logger that is switched off
    const noop = () => {
      return;
    };

    if (opts.messageLogLevel === 'full') {
      this._logMessage = (dir, message) => console.log(dir, message);
      this._logGeneral = console.log;
    } else if (opts.messageLogLevel === 'type') {
      this._logMessage = (dir, message) => console.log(dir, { type: message.type });
      this._logGeneral = console.log;
    } else {
      this._logMessage = noop;
      this._logGeneral = noop;
    }

    this._logUpdate = noop;
    if (opts.logUpdates) {
      this._logUpdate = (update) => Y.logUpdate(update);
    }

    this._logStateVector = noop;
    if (opts.logStateVectors) {
      this._logStateVector = (update) => {
        const sv = Y.encodeStateVectorFromUpdate(update);
        console.log({
          sv,
          enc: toBase64(sv)
        });
      };
    }
  }

  /**
   * Handles the websocket 'connect' event: announces the connection to
   * listeners, then re-sends the subscription for every bound document.
   */
  private connected() {
    this.emit(YClientEventName.connect, []);
    // auth is handled at the connection layer so we can just go right ahead and resub
    this.resubscribe();
  }

  /**
   * Handles the websocket 'disconnect' event.
   *
   * Outbound queues are dropped rather than replayed: once the client
   * reconnects, the subscription exchange brings both sides up to date again.
   * Every binding is marked unsynced, bindings that lost queued messages are
   * flagged as having a pending update, and remote awareness states are
   * removed because they can no longer be trusted.
   */
  private disconnected() {
    this._logGeneral('disconnected event fired');
    this._largeMessageQueue = [];

    this._boundDocs.forEach(binding => {
      if (binding.queueSequential.length > 0) {
        binding.queueSequential = [];
        this.setPendingUpdate(binding, true);
      }
      this.setSynced(binding, false);

      const { awareness } = binding;
      if (awareness) {
        // keep only our own state; everything else is stale without a connection
        const localClientId = awareness.doc.clientID;
        const remoteClientIds = Array.from(awareness.getStates().keys())
          .filter(clientId => clientId !== localClientId);
        awarenessProtocol.removeAwarenessStates(awareness, remoteClientIds, 'connection closed');
      }
    });

    this.emit(YClientEventName.disconnect, []);
  }

  /**
   * Dispatches a message received from the websocket.
   *
   * Messages without a type and 'pong' messages are ignored. Message types the
   * client does not handle itself are re-emitted as a `message` event.
   *
   * @param message decoded websocket message
   */
  private receive(message: WsMessage) {
    if (!(message && message.type)) {
      return;
    }
    if (message.type === 'pong') {
      return;
    }
    this._logMessage('recv', message);
    switch (message.type) {
      case 'y-up': {
        // document update, either inline (base64) or referenced by url
        const binding = this._boundIds.get(message.id);

        if (!binding) {
          break;
        }

        const url = message.url || message.uri;
        if (url) {
          fetch(url).then(resp => {
            // fetch only rejects on network failure; without this check the body of an
            // HTTP error response would be handed to Yjs as if it were an update
            if (!resp.ok) {
              throw new Error(`unexpected response status ${resp.status}`);
            }
            return resp.arrayBuffer();
          }).then(ab => {
            const update = new Uint8Array(ab);
            this.applyUpdate(binding, update);
          }).catch((err: unknown) => {
            console.error('failed to fetch update content from uri', {
              url,
              id: message.id,
              err
            });
          });
          break;
        }

        if (message.update) {
          const update = fromBase64(message.update);
          this.applyUpdate(binding, update);
        }
        break;
      }
      case 'y-aw-up': {
        // awareness update; `this` is passed as origin so our own handler does not echo it back
        const binding = this._boundIds.get(message.id);

        if (!binding?.awareness) {
          break;
        }

        awarenessProtocol.applyAwarenessUpdate(binding.awareness, fromBase64(message.update), this);
        break;
      }
      case 'y-get': {
        // the other side asks for our state, optionally relative to its state vector
        const binding = this._boundIds.get(message.id);
        if (!binding) {
          break;
        }
        this.emit(YClientEventName.hasAccess, [message.id]);
        const doc = binding.doc;
        const diff = message.vector
          ? Y.encodeStateAsUpdate(doc, fromBase64(message.vector))
          : Y.encodeStateAsUpdate(doc);

        this.sendUpdate(binding.id, diff, message.url, message.headers);
        break;
      }
      case 'y-aw-get': {
        // deprecated
        break;
      }
      case 'y-ack': {
        // acknowledgement of the sequential message at the head of the queue
        const binding = this._boundIds.get(message.id);
        if (!binding) {
          break;
        }
        const expectedMid = binding.queueSequential[0]?.mid;
        if (expectedMid !== message.omid) {
          // the ack does not match what we last sent: the queue can no longer be trusted
          console.error(
            `Unexpected ack for document ${message.id}: expected message ${expectedMid} but received ack for ${message.omid}`
          );
          binding.queueSequential = [];
          break;
        }
        binding.queueSequential.shift();
        if (binding.queueSequential.length === 0) {
          this.setPendingUpdate(binding, false);
        }
        this.sendNextAckable(message.id);
        break;
      }
      case 'y-nack': {
        // the server failed to process our message: drop the queue and announce a pending update
        const binding = this._boundIds.get(message.id);
        if (!binding) {
          break;
        }
        console.error(`Server processing message ${message.id} failed. Initialising re-sync`);
        binding.queueSequential = [];
        this.send({
          type: 'y-pend',
          id: message.id,
          ct: binding.ct
        });
        break;
      }
      case 'y-denied': {
        // access denied: nothing queued for these documents will ever be accepted
        this.emit(YClientEventName.denied, [message.ids]);
        for (const id of message.ids) {
          const binding = this._boundIds.get(id);
          if (!binding) {
            continue;
          }

          binding.queueSequential = [];
          this.setPendingUpdate(binding, false);
        }
        break;
      }
      case 'psurl-provide': {
        // upload targets given as bare urls
        this._preSignedReqs.push(...message.urls.map(url => ({ url })));
        this.processLargeMessageQueue();
        break;
      }
      case 'psreq-provide': {
        // upload targets given as url + headers
        this._preSignedReqs.push(...message.reqs);
        this.processLargeMessageQueue();
        break;
      }
      default:
        this.emit(YClientEventName.message, [message]);
        break;
    }
  }

  /**
   * Applies an update received from the network to the bound document and
   * marks the binding as synced.
   */
  private applyUpdate(binding: DocBindingInfo, update: Uint8Array) {
    this._logUpdate(update);
    // `this` is the transaction origin, which lets the doc's update handler skip re-sending it
    Y.applyUpdate(binding.doc, update, this);
    this.setSynced(binding, true);
  }

  /**
   * Updates the binding's synced flag and emits a `synced` event carrying
   * (doc, synced), but only when the value actually changes.
   */
  private setSynced(binding: DocBindingInfo, synced: boolean) {
    if (binding.synced !== synced) {
      binding.synced = synced;
      this.emit(YClientEventName.synced, [binding.doc, synced]);
    }
  }

  /**
   * Updates the binding's pendingUpdate flag and emits a `pendingUpdate` event
   * carrying (doc, pendingUpdate, id) when the value changes.
   *
   * Does nothing when there is no binding or the document is read-only.
   */
  private setPendingUpdate(binding: DocBindingInfo | undefined, pendingUpdate: boolean) {
    if (!binding || this._isRo(binding.id)) {
      return;
    }

    if (binding.pendingUpdate !== pendingUpdate) {
      binding.pendingUpdate = pendingUpdate;
      this.emit(YClientEventName.pendingUpdate, [binding.doc, pendingUpdate, binding.id]);
    }
  }

  /**
   * Waits until the given document has been synced with the server.
   *
   * @param doc a document previously passed to `bind`
   * @returns `false` immediately when the document is not bound, otherwise
   *   `true` once the binding is (or becomes) synced. The promise stays pending
   *   for as long as no `synced` event with a true value arrives for this doc.
   */
  public async waitForSync(doc: Y.Doc): Promise<boolean> {
    const binding = this._boundDocs.get(doc);
    if (!binding) {
      return false;
    }

    if (binding.synced) {
      return true;
    }

    // The executor runs synchronously, so the synced flag cannot have changed
    // since the check above; just wait for the next matching event.
    return new Promise<boolean>(resolve => {
      const handler = (eventDoc: Y.Doc, synced: boolean) => {
        if (eventDoc === doc && synced) {
          this.off(YClientEventName.synced, handler);
          resolve(true);
        }
      };
      this.on(YClientEventName.synced, handler);
    });
  }

  /**
   * Central entry point for outgoing messages.
   *
   * - 'y-up' messages for read-only documents are dropped.
   * - While disconnected nothing is sent; 'y-up' / 'y-pend' only flag the
   *   binding as having a pending update.
   * - Sequential messages are queued per document and sent one at a time.
   * - Any other message is sent directly, or through the large-message upload
   *   queue when its JSON form exceeds the websocket limit.
   */
  private send(message: WsMessage) {
    if (message.type === 'y-up' && this._isRo(message.id)) {
      return;
    }

    if (!this._client.connected) {
      this._logGeneral('cannot send, disconnected', message.type);
      if (message.type === 'y-up' || message.type === 'y-pend') {
        this.setPendingUpdate(this._boundIds.get(message.id), true);
      }
      return;
    }

    if (!IsYSequentialMessage(message)) {
      this._logMessage('send', message);
      const tooLargeForSocket = JSON.stringify(message).length > wsMaxLengthForEncodedMessage;
      if (tooLargeForSocket) {
        this._largeMessageQueue.push(message);
        this.processLargeMessageQueue();
      } else {
        this._client.send(message);
      }
      return;
    }

    const binding = this._boundIds.get(message.id);
    if (!binding) {
      console.error(`no binding found for id ${message.id}`);
      return;
    }

    binding.queueSequential.push(message);
    this.setPendingUpdate(binding, true);
    // a queue longer than one means a message is already in flight; the ack handler sends the next one
    if (binding.queueSequential.length === 1) {
      this.sendNextAckable(message.id);
    }
  }

  /**
   * Sends the message at the head of the document's sequential queue, after
   * giving the queue a chance to merge consecutive small updates.
   * The message stays in the queue until it is acknowledged.
   */
  private sendNextAckable(id: string) {
    const binding = this._boundIds.get(id);
    if (!binding) {
      console.error(`no binding found for id ${id}`);
      return;
    }

    this.flattenQueueSequential(binding);

    const head = binding.queueSequential[0] as WsMessage | undefined;
    if (head) {
      this._logMessage('send', head);
      this._client.send(head);
    }
  }

  /**
   * Collapses the leading run of inline update messages in the queue into the
   * first message, so that fewer messages have to be sent and acknowledged.
   *
   * for now, we only care about sequences of small update messages
   * only call this just before sending a new message.
   * assumption is that there is not currently a message in-flight.
   *
   * The first message keeps its identity and only has its `update` payload
   * replaced by the merged update; the messages merged into it are removed.
   */
  private flattenQueueSequential(binding: DocBindingInfo) {
    // nothing to merge with fewer than two queued messages
    if (binding.queueSequential.length < 2) return;

    let first: YUpdateMessage | undefined;
    let accumulatedUpdate: Uint8Array | undefined;
    // number of leading messages covered by accumulatedUpdate (including `first`)
    let accumulatedCount = 0;

    for (let i = 0; i < binding.queueSequential.length; i++) {
      const message = binding.queueSequential.at(i);
      // the run ends at the first message that is not an update message ...
      if (!IsYUpdateMessage(message)) break;
      // ... or that carries no inline payload
      if (!message.update) break;

      if (first && accumulatedUpdate) {
        const newUpdate = fromBase64(message.update);
        const newAccumulatedUpdate = Y.mergeUpdates([accumulatedUpdate, newUpdate]);

        // stop! it's too big!
        // (this message is left in the queue and is not counted)
        if (newAccumulatedUpdate.length > wsMaxLengthForRawUpdate) break;

        accumulatedUpdate = newAccumulatedUpdate;
      } else {
        first = message;
        accumulatedUpdate = fromBase64(message.update);
      }
      accumulatedCount += 1;
    }

    if (!first) return;
    if (!accumulatedUpdate) return;
    // a run of one means nothing was merged
    if (accumulatedCount < 2) return;

    first.update = toBase64(accumulatedUpdate);
    // drop the messages that were folded into `first`
    binding.queueSequential.splice(1, accumulatedCount - 1);
  }

  /**
   * Sends queued oversized messages by uploading each one (as JSON, via HTTP
   * PUT) to an upload target and then asking the server over the websocket to
   * process the uploaded url.
   *
   * When no upload target is available, a 'psreq-request' is sent and
   * processing stops; it resumes when targets arrive and this method is called
   * again. Uploads are fire-and-forget: a message whose upload fails is
   * dropped, and the failure is reported on the console.
   */
  private processLargeMessageQueue() {
    while (this._largeMessageQueue.length) {
      if (!this._preSignedReqs.length) {
        this._client.send({
          type: 'psreq-request'
        });
        return;
      }

      const req = this._preSignedReqs.shift();
      const message = this._largeMessageQueue.shift();

      // this is more for the benefit of the typescript compiler
      if (!(req && message)) continue;
      const { url, headers } = req;

      fetch(url, {
        method: 'PUT',
        body: JSON.stringify(message),
        headers: {
          'Content-Type': 'application/json',
          ...headers
        }
      }).then(resp => {
        // fetch resolves for HTTP error statuses too; do not ask the server to
        // process an upload that was rejected
        if (!resp.ok) {
          throw new Error(`upload rejected with status ${resp.status}`);
        }
        this._client.send({
          type: 'psurl-process',
          url
        });
      }).catch((err: unknown) => {
        // reported unconditionally: the message is lost at this point
        console.error('Error sending large message for processing', {
          type: message.type,
          err
        });
      });
    }
  }

  /**
   * Sends a document update for the binding with the given id.
   *
   * - Empty updates, and the two-byte `[0, 0]` update that encodes "no
   *   changes", are not sent; they clear the pending-update flag instead.
   * - Updates small enough for the websocket are sent inline as base64.
   * - Larger updates are uploaded to `uri` via HTTP PUT and then announced with
   *   a 'y-up' message referencing that uri. Without a `uri`, a 'y-pend'
   *   message is sent instead.
   *
   * @param id document id of the binding
   * @param update raw update bytes
   * @param uri optional upload target for updates too large to send inline
   * @param headers optional extra headers for the upload request
   */
  private sendUpdate(id: string, update: Uint8Array, uri: string | undefined = undefined, headers?: Record<string,string>) {
    const binding = this._boundIds.get(id);

    if (!binding) {
      return;
    }

    if (!update.length) {
      this.setPendingUpdate(binding, false);
      return;
    }

    if (update.length === 2 && update[0] === 0 && update[1] === 0) {
      // don't think we need to send this. it's just saying there's no changes
      this.setPendingUpdate(binding, false);
      return;
    }

    if (update.length < 4) {
      // just curious
      console.log(update);
      Y.logUpdate(update);
    }

    if (update.length <= wsMaxLengthForRawUpdate) {
      this.send({
        type: 'y-up',
        id,
        ct: binding.ct,
        update: toBase64(update),
        mid: uuidv4()
      });
      return;
    }

    if (!uri) {
      // too large to send inline and nowhere to upload it: only announce that an update is pending
      this.send({
        type: 'y-pend',
        id,
        ct: binding.ct
      });
      return;
    }

    fetch(uri, {
      method: 'PUT',
      body: update,
      headers: {
        'Content-Type': 'application/octet-stream',
        ...headers
      }
    }).then(resp => {
      // fetch resolves for HTTP error statuses too; do not announce an
      // update whose upload was rejected
      if (!resp.ok) {
        throw new Error(`upload rejected with status ${resp.status}`);
      }
      this.send({
        type: 'y-up',
        id,
        ct: binding.ct,
        uri,
        mid: uuidv4()
      });
    }).catch((err: unknown) => {
      // reported unconditionally: the update did not reach the server
      console.error('Error putting y-up update', { id, err });
    });
  }

  /**
   * Sends an already encoded awareness update for the given document as a
   * base64 'y-aw-up' message.
   */
  private sendPreparedAwarenessUpdate(id: string, ct: YDocContentType, update: Uint8Array) {
    this.send({ type: 'y-aw-up', id, ct, update: toBase64(update) });
  }

  /**
   * Releases everything owned by this client: unbinds every document, destroys
   * the websocket client and finally clears this observable's listeners.
   */
  private destroyInternal() {
    this._boundDocs.forEach(binding => this.unbindInternal(binding));
    this._client.destroy();
    super.destroy();
  }

  /**
   * Tears down this client and removes it from the shared registry.
   *
   * Teardown always acts on this instance. Previously it was delegated to the
   * url-keyed registry, which did nothing for a client created directly with
   * `new YClient(...)`, and could destroy a different, newer client that had
   * since been registered under the same url.
   */
  destroy() {
    this.destroyInternal();

    // only forget the registry entry if it really is this instance
    const key = this.url.toLowerCase();
    if (YClient._instances.get(key) === this) {
      YClient._instances.delete(key);
    }
  }

  /**
   * Asks the underlying websocket client to connect.
   */
  connect() {
    this._client.connect();
  }

  /**
   * Asks the underlying websocket client to disconnect.
   */
  disconnect() {
    this._client.disconnect();
  }

  /**
   * Starts synchronising `doc` under the given document id and content type.
   *
   * Registers an update listener on the doc that forwards local changes,
   * records the binding in both lookup maps, then subscribes and requests
   * updates. Binding a doc that is already bound is a no-op.
   *
   * NOTE(review): there is no check for `id` already being bound to a different
   * doc; in that case the id lookup is overwritten while the earlier doc stays
   * registered - confirm callers never do this.
   */
  bind(doc: Y.Doc, id: string, type: YDocContentType) {
    if (this._boundDocs.get(doc)) {
      return;
    }

    const forwardLocalUpdate = (update: Uint8Array, origin: unknown) => {
      // log all update state vectors
      this._logStateVector(update);
      // updates applied by this client came from the network and must not be echoed back
      if (origin !== this) {
        this.sendUpdate(id, update);
      }
    };

    const binding: DocBindingInfo = {
      id,
      ct: type,
      doc,
      updateHandler: forwardLocalUpdate,
      queueSequential: [],
      synced: false,
      pendingUpdate: false
    };

    doc.on('update', binding.updateHandler);
    this._boundDocs.set(doc, binding);
    this._boundIds.set(id, binding);

    this.subscribe(doc);
    this.getUpdates(doc);
  }

  /**
   * Stops synchronising `doc`. Unknown documents are ignored.
   */
  unbind(doc: Y.Doc) {
    this.unbindInternal(this._boundDocs.get(doc));
  }

  /**
   * Removes a binding: releases its awareness (if any), unsubscribes from the
   * document, detaches the doc's update listener and drops the binding from
   * both lookup maps. Does nothing when no binding is given.
   */
  private unbindInternal(binding: DocBindingInfo | undefined) {
    if (!binding) {
      return;
    }

    const { doc, id, updateHandler } = binding;
    this.unbindAwarenessInternal(binding);
    // must run while the binding is still registered, since unsubscribe looks it up by doc
    this.unsubscribe(doc);
    doc.off('update', updateHandler);
    this._boundDocs.delete(doc);
    this._boundIds.delete(id);
  }

  /**
   * Destroys the binding's awareness instance and clears the awareness fields.
   * Does nothing unless both the awareness instance and its handler are set.
   */
  private unbindAwarenessInternal(binding: DocBindingInfo | undefined) {
    const awareness = binding?.awareness;
    if (!binding || !awareness || !binding.awarenessUpdateHandler) {
      return;
    }

    // no explicit clear-states message is sent here: destroy() appears to take care of that itself
    awareness.destroy();
    binding.awareness = undefined;
    binding.awarenessUpdateHandler = undefined;
  }

  /**
   * Returns the awareness instance for a bound document, creating it on first
   * use and forwarding its local changes to the server.
   *
   * @returns the awareness instance, or `undefined` when `doc` is not bound
   */
  bindAwareness(doc: Y.Doc) {
    const binding = this._boundDocs.get(doc);
    if (!binding) {
      return undefined;
    }

    if (binding.awareness) {
      return binding.awareness;
    }

    const awareness = new awarenessProtocol.Awareness(doc);
    const forwardAwarenessChange = (event: AwarenessUpdate, origin: unknown) => {
      // changes applied by this client came from the network and must not be echoed back
      if (origin === this) {
        return;
      }
      const changedClients = [...event.added, ...event.updated, ...event.removed];
      const encodedChange = awarenessProtocol.encodeAwarenessUpdate(awareness, changedClients);
      this.sendPreparedAwarenessUpdate(binding.id, binding.ct, encodedChange);
    };

    binding.awareness = awareness;
    binding.awarenessUpdateHandler = forwardAwarenessChange;
    awareness.on('update', forwardAwarenessChange);
    return awareness;
  }

  /**
   * Releases the awareness instance of a bound document, if it has one.
   */
  unbindAwareness(doc: Y.Doc) {
    this.unbindAwarenessInternal(this._boundDocs.get(doc));
  }

  /**
   * Sends a 'y-sub' message for a single bound document, including its current
   * state vector. Unbound documents are ignored.
   */
  private subscribe(doc: Y.Doc) {
    const binding = this._boundDocs.get(doc);
    if (binding) {
      const { id, ct } = binding;
      const vector = toBase64(Y.encodeStateVector(binding.doc));
      this.send({ type: 'y-sub', docs: [{ id, ct, vector }] });
    }
  }

  /**
   * Sends one 'y-sub' message covering every bound document, each with its
   * current state vector.
   */
  private resubscribe() {
    const docs = Array.from(this._boundDocs.values(), ({ id, ct, doc }) => ({
      id,
      ct,
      vector: toBase64(Y.encodeStateVector(doc))
    }));
    this.send({ type: 'y-sub', docs });
  }

  /**
   * Sends a 'y-unsub' message for a bound document. Unbound documents are ignored.
   */
  private unsubscribe(doc: Y.Doc) {
    const binding = this._boundDocs.get(doc);
    if (binding) {
      this.send({ type: 'y-unsub', ids: [binding.id] });
    }
  }

  /**
   * Predicts the length of the padded base64 encoding of `size` bytes.
   *
   * Every started group of 3 bytes produces 4 characters, so the result is
   * `4 * ceil(size / 3)`. The previous bit-masking form rounded down to a
   * multiple of 4 and therefore under-estimated every size that is not a
   * multiple of 3 (e.g. 1 byte => 0 instead of 4).
   *
   * @param size number of raw bytes
   * @returns number of base64 characters, including padding
   */
  private predictedBase64Size(size: number): number {
    return 4 * Math.ceil(size / 3);
  }

  /**
   * Requests updates for a bound document with a 'y-get' message.
   *
   * @param doc a bound document; unbound documents are ignored
   * @param full when true the request carries no state vector; otherwise the
   *   doc's current state vector is included
   */
  getUpdates(doc: Y.Doc, full?: boolean) {
    const binding = this._boundDocs.get(doc);
    if (!binding) {
      return;
    }

    const { id, ct } = binding;
    if (!full) {
      const vector = toBase64(Y.encodeStateVector(doc));
      this.send({ type: 'y-get', id, ct, vector });
      return;
    }

    this.send({ type: 'y-get', id, ct });
  }

  /**
   * Tells whether the document with the given id still has unacknowledged
   * outbound work: queued sequential messages or a pending update.
   *
   * @returns `false` when no document is bound under `id`
   */
  hasPendingOutboundMessagesFor(id: string) {
    const binding = this._boundIds.get(id);
    if (binding) {
      return binding.queueSequential.length > 0 || binding.pendingUpdate;
    }
    return false;
  }

  /**
   * Tells whether any bound document still has unacknowledged outbound work.
   * The first such binding found is passed to the general logger.
   */
  hasPendingOutboundMessages() {
    for (const binding of this._boundDocs.values()) {
      const idle = !binding.queueSequential.length && !binding.pendingUpdate;
      if (idle) {
        continue;
      }
      this._logGeneral('found pending binding', binding);
      return true;
    }

    return false;
  }

  /**
   * Disconnects the underlying websocket client (same call as `disconnect()`).
   */
  suppressConnection() {
    this._client.disconnect();
  }
  /**
   * Reconnects the underlying websocket client (same call as `connect()`).
   */
  unsuppressConnection() {
    this._client.connect();
  }
}
