import {
  isServerValue,
  RTDBServerValueTIMESTAMP,
} from '@lp-lib/firebase-typesafe';
import { type Logger } from '@lp-lib/logger-base';
import { ConnectionStatus } from '@lp-lib/shared-schema';

import { apiService } from '../../../services/api-service';
import { backoffSleep } from '../../../utils/backoffSleep';
import {
  err2s,
  TimeoutError,
  uuidv4,
  waitForCondition,
} from '../../../utils/common';
import { Emitter, type EmitterListener } from '../../../utils/emitter';
import { type FirebaseEvents, type FirebaseService } from '../../Firebase';
import { CloudHostingUtils } from './shared';
import {
  type ControllerKind,
  FIREBASE_PREFIX,
  type OndGameCommand,
  type OnDGameController,
} from './types';

type OnDGameControlMessage = {
  command: OndGameCommand;
};

type MessageCallback = (message: OnDGameControlMessage) => Promise<void>;

type ControlRequest = {
  id: string;
  state: 'pending' | 'completed';
  message: OnDGameControlMessage;
  createdAt: number | RTDBServerValueTIMESTAMP;
  completedAt?: number | RTDBServerValueTIMESTAMP;
};

export class Consumer {
  private terminiated: boolean;
  private batchRef;
  private cleanup?: () => void;
  constructor(
    readonly controllerId: string,
    readonly venueId: string,
    readonly svc: FirebaseService,
    private log: Logger
  ) {
    this.terminiated = false;
    this.batchRef = svc.prefixedRef(`${FIREBASE_PREFIX}/${venueId}/control`);
    const controlRequestRef = svc.prefixedRef<ControlRequest>(
      `${FIREBASE_PREFIX}/${venueId}/control/request`
    );
    // remove the request when disconnected
    // case 1:
    //  if there is no pending/complete request, everything is okay
    // case 2:
    //  if there is a pending request, it will be removed when controller
    //  disconnected. It's okay since the whole game will be paused. The
    //  coordinator needs to retry.
    // case 3:
    //  if there is a complete request, the coordinator will be timeout.
    //  we should ensure the real command consumer is idempotent.
    controlRequestRef.onDisconnect().remove();
  }
  private async consume(
    req: ControlRequest | null,
    cb: MessageCallback,
    maxAttempts = 3,
    maxBackOffMs = 5000
  ) {
    let attempts = 1;
    while (attempts <= maxAttempts) {
      try {
        if (req?.state === 'pending') {
          await cb(req.message);
          req.state = 'completed';
          req.completedAt = RTDBServerValueTIMESTAMP;
          const newCompletedRef = this.batchRef.child('completed').push();
          await this.batchRef.update({
            '/request': req,
            [`/completed/${newCompletedRef.key}`]: req, // add to completed list
          });
          this.log.info(`request consumed: ${req.message.command}`, {
            id: req.id,
            state: req.state,
          });
        }
        break;
      } catch (error) {
        this.log.error(
          `comsume request failed, req: ${req?.id}, attempts: ${attempts}`,
          err2s(error)
        );
        await backoffSleep(attempts - 1, maxBackOffMs);
        attempts++;
      }
    }
  }
  start(cb: MessageCallback): void {
    if (this.terminiated)
      throw new Error('cannot start a terminiated consumer');
    const ref = this.batchRef.child<'request', ControlRequest>('request');
    let lastReq: ControlRequest | null = null;
    const callback = ref.on('value', async (snap) => {
      const req = snap.val();
      // RTDBServerValueTIMESTAMP causes the same value being triggered
      // more than once
      if (req?.id === lastReq?.id) return;
      lastReq = req;
      // the publisher cannot enqueue a new req if the current one is not
      // consumed.
      await this.consume(req, cb);
    });
    this.cleanup = () => {
      ref.off('value', callback);
    };
  }
  stop(terminate = false): void {
    this.log.info('stop consumer', { terminate });
    this.cleanup?.();
    this.terminiated = terminate;
  }
}

export class CommandQueueFullError extends Error {
  name = 'CommandQueueFullError';
}

export class CommandTimeoutError extends Error {
  name = 'CommandTimeoutError';
}

export class OnDGameCommandDispatcher {
  private ref;
  constructor(venueId: string, svc: FirebaseService, private log: Logger) {
    this.ref = svc.prefixedRef<ControlRequest>(
      `${FIREBASE_PREFIX}/${venueId}/control/request`
    );
  }

  async dispatch(
    payload: OnDGameControlMessage,
    options?: {
      callTimeout?: number;
      checkInterval?: number;
    }
  ): Promise<void> {
    const config = CloudHostingUtils.GetDefaultConfig();
    const opts = {
      callTimeout: config.coordinator.callTimeout,
      checkInterval: config.coordinator.checkInterval,
      ...options,
    };
    const newRequest: ControlRequest = {
      id: uuidv4(),
      state: 'pending',
      createdAt: RTDBServerValueTIMESTAMP,
      message: payload,
    };
    const result = await this.ref.transaction(
      (req) => {
        if (req && req.state === 'pending') return req;
        return newRequest;
      },
      undefined,
      true
    );
    this.log.info(`send command: ${newRequest.message.command}`, {
      id: newRequest.id,
      state: newRequest.state,
      committed: result.committed,
    });
    if (!result.committed) {
      this.log.error(
        'send command failed, transaction is not committed.',
        null
      );
      throw new Error('send command failed');
    }
    const committed = result.snapshot?.val();
    if (newRequest.id !== committed?.id) {
      this.log.error('command queue is full', null, {
        local: newRequest.id,
        remote: committed?.id,
      });
      throw new CommandQueueFullError('command queue is full');
    }

    // wait the remote controller to complete the command
    return (async () => {
      const reqId = newRequest.id;
      try {
        await waitForCondition(
          async () => {
            const snap = await this.ref.get();
            const synced = snap.val();
            if (!synced) return false;
            if (synced.id === reqId) {
              if (synced.state === 'completed') {
                await this.ref.set(null);
                return true;
              }
              return false;
            } else {
              this.log.warn('command changed', {
                local: reqId,
                remote: synced.id,
              });
              throw new Error('command is not processed in sequence');
            }
          },
          opts.callTimeout,
          opts.checkInterval
        );
      } catch (e) {
        if (e instanceof TimeoutError) {
          throw new CommandTimeoutError(
            `command timeout after ${opts.callTimeout} ms`
          );
        } else {
          throw e;
        }
      }
    })();
  }
}

// for testing
export class CallRequestHistory {
  private ref;
  constructor(venueId: string, svc: FirebaseService) {
    this.ref = svc.prefixedRef(`${FIREBASE_PREFIX}/${venueId}/control`);
  }

  async completed(): Promise<ControlRequest[]> {
    const snap = await this.ref.child('completed').orderByKey().get();
    const results = snap.val() as { [key: string]: ControlRequest };
    return Object.values(results);
  }
}

type HeartbeatOptions = {
  interval?: number;
  firebase?: boolean;
  inHouse?: boolean;
};

type AcquireOptions = {
  pollInterval?: number;
  heartbeatInterval?: number;
  heartbeat?: HeartbeatOptions;
};

export type ControllerEvents = {
  'taken-over': (next: OnDGameController) => void;
  'controller-changed': (
    curr: Nullable<OnDGameController>,
    next: Nullable<OnDGameController>
  ) => void;
};

type SignalResponse = {
  event: 'taken-over';
  instance: OnDGameController;
};

type Signal = () => Promise<SignalResponse>;

class SyncedInstance {
  // latest synced
  private _latest: Nullable<OnDGameController>;
  // latest synced only if it's not-null/undefined
  private _latestValued: Nullable<OnDGameController>;

  set(next: Nullable<OnDGameController>) {
    this._latest = next;
    if (!!next) {
      this._latestValued = next;
    }
  }

  get latest() {
    return this._latest;
  }

  get latestValued() {
    return this._latestValued;
  }
}

export class OndGameControllerSemaphore {
  private ref;
  private connectionStatusHandle;
  private consumer: Consumer | null;
  private syncedInstance: SyncedInstance;
  private emitter = new Emitter<ControllerEvents>();
  // heartbeat
  private hb: {
    id: ReturnType<typeof setTimeout> | null;
    pause: boolean;
    logging: {
      log: Logger;
      sendingEnabled: boolean;
      receiveEnabled: boolean;
    };
  };
  on = this.emitter.on.bind(this.emitter);
  off = this.emitter.off.bind(this.emitter);

  constructor(
    readonly venueId: string,
    private readonly firebase: {
      svc: FirebaseService;
      emitter: EmitterListener<FirebaseEvents>;
    },
    private log: Logger,
    private deps = {
      syncCloudInstanceBinding: apiService.venue.syncCloudInstanceBinding.bind(
        apiService.venue
      ),
      getCloudInstanceBinding: apiService.venue.getCloudInstanceBinding.bind(
        apiService.venue
      ),
    }
  ) {
    this.consumer = null;
    this.syncedInstance = new SyncedInstance();
    this.hb = {
      id: null,
      pause: true,
      logging: {
        log: log.scoped('ond-game-controller-heartbeat'),
        sendingEnabled: true,
        receiveEnabled: false,
      },
    };
    this.ref = firebase.svc.prefixedSafeRef<Nullable<OnDGameController>>(
      `${FIREBASE_PREFIX}/${venueId}/controller`
    );
    this.connectionStatusHandle = firebase.svc.createConnectionStatusHandle();
  }

  watch(): void {
    this.ref.on('value', async (snap) => {
      const next = snap.val();
      if (this.hb.logging.receiveEnabled) {
        this.hb.logging.log.info('heartbeat received', { ...next });
      }
      if (this.syncedInstance.latest?.id !== next?.id) {
        this.log.info('controller updated', {
          curr: this.syncedInstance ? { ...this.syncedInstance } : null,
          next,
          consumerControllerId: this.consumer?.controllerId,
        });
        this.emitter.emit(
          'controller-changed',
          this.syncedInstance.latest,
          next
        );
      }
      this.syncedInstance.set(next);
      if (!this.consumer) return;
      if (
        this.syncedInstance.latest &&
        this.syncedInstance.latest.id !== this.consumer.controllerId
      ) {
        this.log.info('controller take over, auto release');
        await this.internalRelease();
        this.emitter.emit('taken-over', this.syncedInstance.latest);
      }
    });
  }

  unwatch(): void {
    this.ref.off();
  }

  async acquire(
    id: string,
    kind: ControllerKind,
    cb: MessageCallback,
    options?: AcquireOptions
  ): Promise<Signal> {
    this.log.info('acquire attempt', { id, kind });
    if (this.consumer) throw new Error('controller is already accquired');
    const result = await this.ref.transaction(
      (current) => {
        this.log.info('acquire transaction', { controller: current });

        const next: OnDGameController = {
          id,
          kind,
          acquiredAt: RTDBServerValueTIMESTAMP,
          pingedAt: RTDBServerValueTIMESTAMP,
          status: ConnectionStatus.Connected,
          env: this.firebase.svc.prefix(),
        };

        switch (current?.kind) {
          case 'local':
          case 'cloud':
            // in local dev, a page refresh will cause the controller to be
            // disconnected. We'd like to take over the control in this case.
            if (
              current.status === ConnectionStatus.Disconnected ||
              CloudHostingUtils.NoHeartbeat(current)
            ) {
              // take over the control, the old instance (browser) should
              // receive _taken-over_ event and do the cleanup.
              return next;
            }
            return; // abort transaction
          default:
            return next;
        }
      },
      undefined,
      true
    );
    if (!result.committed) {
      throw new Error('Acquire failed, transaction is not committed.');
    }
    const committed = result.snapshot?.val();
    if (committed?.id !== id) {
      throw new Error(
        `Acquire failed, id mismatch: ${committed?.id} !== ${id}.`
      );
    }
    this.log.info('acquired', { controller: committed });
    await this.registerOnDisconnect();
    this.startHeartbeat(committed, options?.heartbeat);
    this.consumer = new Consumer(id, this.venueId, this.firebase.svc, this.log);
    this.consumer.start(cb);

    this.firebase.emitter.on('connection-state-changed', async (connected) => {
      if (connected) {
        let backfilled = false;
        const syncedController = this.syncedInstance.latestValued;
        if (syncedController && this.consumer) {
          const result = await this.ref.transaction((current) => {
            this.log.info('recovery transaction', { controller: current });
            if (current?.id !== syncedController.id) {
              this.log.info('abort recovery transaction, id mismatch', {
                local: syncedController.id,
                remote: current?.id,
              });
              return;
            }
            return {
              id: syncedController.id,
              kind: syncedController.kind,
              acquiredAt: syncedController.acquiredAt,
              pingedAt: syncedController.pingedAt,
              status: ConnectionStatus.Connected,
              env: syncedController.env,
            };
          });
          backfilled = result.committed;
        }
        if (backfilled) {
          await this.registerOnDisconnect();
        }
        this.hb.pause = false;
        this.log.info(`firebase reconnected, resume heartbeat`, {
          consumer: !!this.consumer,
          backfilled,
          latestValued: syncedController,
        });
      } else {
        this.hb.pause = true;
        this.log.info('firebase disconnected, pause heartbeat');
      }
    });

    return () => {
      return new Promise<SignalResponse>((resolve) => {
        this.emitter.on('taken-over', (next) => {
          resolve({ event: 'taken-over', instance: next });
        });
      });
    };
  }

  async release(): Promise<boolean> {
    if (!this.consumer) return false;
    const id = this.consumer.controllerId;
    this.log.info('release attempt', { id });
    const result = await this.ref.transaction(
      (controller) => {
        this.log.info('release transaction', { controller });
        if (!controller) return controller;
        if (controller.id !== id) return;
        return null;
      },
      undefined,
      true
    );
    if (!result.committed) {
      throw new Error('Release failed, transaction is not committed.');
    }
    const committed = result.snapshot?.val();
    if (committed) {
      throw new Error(
        `Release failed, id mismatch: ${committed?.id} !== ${id}.`
      );
    }
    await this.internalRelease();
    this.log.info('released', { id });
    return true;
  }

  updateHeartbeatLogging(args: {
    sendingEnabled?: boolean;
    receiveEnabled?: boolean;
  }): void {
    if (args.sendingEnabled !== undefined) {
      this.hb.logging.sendingEnabled = args.sendingEnabled;
    }
    if (args.receiveEnabled !== undefined) {
      this.hb.logging.receiveEnabled = args.receiveEnabled;
    }
  }

  private async registerOnDisconnect() {
    this.log.info('register onDisconnect handler');
    await this.ref
      .onDisconnect()
      .update({
        status: ConnectionStatus.Disconnected,
        disconnectedAt: RTDBServerValueTIMESTAMP,
        disconnectedReason: 'OndGameControl#registerOnDisconnect',
      })
      .catch((e) => {
        this.log.error('register onDisconnect.remove failed', e);
      });
    this.log.info('registered onDisconnect handler');
  }

  private async unRegisterOnDisconnect() {
    this.log.info('removing onDisconnect handler');
    await this.ref.onDisconnect().cancel();
    this.log.info('removed onDisconnect handler');
  }

  private async internalRelease(): Promise<void> {
    await this.unRegisterOnDisconnect();
    this.stopHeartbeat();
    if (this.consumer) {
      this.consumer.stop();
      this.consumer = null;
    }
  }

  async dispose(): Promise<void> {
    if (this.consumer) {
      await this.release();
    }
    this.unwatch();
    this.connectionStatusHandle.destroy();
  }

  private startHeartbeat(
    c: OnDGameController,
    options?: HeartbeatOptions
  ): void {
    const {
      interval = CloudHostingUtils.GetDefaultConfig().heartbeatInterval,
      firebase = true,
      inHouse = true,
    } = { ...options };
    this.stopHeartbeat();
    this.hb.pause = false;
    this.hb.id = setInterval(async () => {
      if (this.hb.pause) {
        if (this.hb.logging.sendingEnabled) {
          this.hb.logging.log.info('heartbeat paused');
        }
        return;
      }
      try {
        const p1 = firebase
          ? this.ref.transaction(
              (v) => {
                if (this.hb.logging.sendingEnabled) {
                  this.hb.logging.log.info('heartbeat transaction', {
                    controller: v,
                  });
                }
                if (!v) return;
                if (
                  this.connectionStatusHandle.connected &&
                  !isServerValue(v.disconnectedAt) &&
                  v.disconnectedAt !== null &&
                  v.disconnectedAt !== undefined
                ) {
                  // Fixup the connection. We're obviously online, but there is
                  // a state mismatch in the database. This is likely due to the
                  // onDisconnect hook executing serverside _after_ the user has
                  // reconnected. There is nothing today that can receive
                  // confirmation that the onDisconnect hook has _not_ executed.
                  // The heartbeat is still running because this client knows it
                  // is connected. So fixup this data.
                  return {
                    ...v,
                    status: ConnectionStatus.Connected,
                    disconnectedAt: null as never,
                    disconnectedReason: null as never,
                    pingedAt: RTDBServerValueTIMESTAMP,
                  };
                } else {
                  return { ...v, pingedAt: RTDBServerValueTIMESTAMP };
                }
              },
              undefined,
              true
            )
          : Promise.resolve();
        // local controller doesn't have cloud instance binding
        const p2 =
          inHouse && c.kind === 'cloud'
            ? this.deps.syncCloudInstanceBinding({
                venueId: this.venueId,
                id: c.id,
                kind: c.kind,
                acquiredAt: isServerValue(c.acquiredAt) ? 0 : c.acquiredAt,
                env: c.env,
                status: c.status,
                disconnectedAt: isServerValue(c.disconnectedAt)
                  ? 0
                  : c.disconnectedAt,
              })
            : Promise.resolve();
        const [a, b] = await Promise.all([p1, p2]);
        if (this.hb.logging.sendingEnabled) {
          this.hb.logging.log.info('heartbeat sent', {
            firebase: {
              committed: a?.committed,
              val: a?.snapshot?.val(),
            },
            inHouse: {
              status: b?.status,
            },
          });
        }
      } catch (error) {
        this.hb.logging.log.error('heartbeat sent failed', error);
      }
    }, interval);
  }

  private stopHeartbeat(): void {
    if (this.hb.id) {
      clearInterval(this.hb.id);
      this.hb.id = null;
    }
    this.hb.pause = true;
  }

  // For backward compatibility, the default exposed controller must be connected.
  async instance(
    allowDisconnected = false,
    cached = false
  ): Promise<OnDGameController | null> {
    let val = this.syncedInstance.latestValued;
    if (!cached) {
      const snap = await this.ref.get();
      val = snap.val();
    }
    if (!val) return null;
    return val.status === ConnectionStatus.Connected || allowDisconnected
      ? val
      : null;
  }

  // For backward compatibility, the default exposed controller must be connected.
  async ensureInstance(opts?: {
    getTimeoutMs?: number;
    heartbeatTimeoutMs?: number;
    allowDisconnected?: boolean;
  }): Promise<OnDGameController> {
    const { getTimeoutMs, allowDisconnected, heartbeatTimeoutMs } = {
      getTimeoutMs: 5000,
      allowDisconnected: false,
      heartbeatTimeoutMs: 15000,
      ...opts,
    };
    // 1. check firebase
    const controller = await waitForCondition(
      async () => {
        // if _allowDisconnected_ is true, the condition is satisfied if
        // _latest_ is not null.
        if (allowDisconnected) {
          return this.syncedInstance.latest;
        }
        // otherwise, the condition is satisfied if _latest_ is not null and
        // connected
        if (this.syncedInstance.latest?.status === ConnectionStatus.Connected) {
          return this.syncedInstance.latest;
        }
        return null;
      },
      getTimeoutMs,
      100,
      `controller not found in firebase after ${getTimeoutMs}ms`
    );

    if (!CloudHostingUtils.NoHeartbeat(controller, heartbeatTimeoutMs)) {
      return controller;
    }

    const msg = `controller has no heartbeat in firebase`;
    this.log.info(msg, { controller, heartbeatTimeoutMs });

    if (controller.kind !== 'cloud') throw new Error(msg);

    // 2. in case the firebase connection is bad, try 2nd check from the backend
    // of cloud instance binding
    const resp = await this.deps.getCloudInstanceBinding(this.venueId);
    const binding = resp.data.binding;
    this.log.info('check binding', { binding });
    if (!binding.controller || binding.controller.id !== controller.id) {
      throw new Error(`${msg}, binding does not match`);
    }
    if (CloudHostingUtils.NoHeartbeat(binding.controller, heartbeatTimeoutMs)) {
      throw new Error(`${msg}, also no heartbeat in binding`);
    } else {
      this.log.info('binding controller is still alive, backfill');
      this.syncedInstance.set(controller);
      return binding.controller;
    }
  }
}
