import type { Consumer } from "@rails/actioncable";
import { replicateGraphQL } from "rxdb/plugins/replication-graphql";
import { RxCollection } from "rxdb";
import { singular } from "pluralize";
import { camelCase, upperFirst } from "lodash";
import { createActionCableHandler } from "graphql-ruby-client/subscriptions/createActionCableHandler";
import { RxReplicationState } from "rxdb/plugins/replication";

import { DatabaseConfig } from "./types";

type Checkpoint = { id: string; updatedAt: number };

// This function tries to automatically map the schema to a GraphQL fields
// query. Therefore, every sync step will include all fields defined in the
// schema.
function resolveFields(properties: { [p: string]: any }, level = 0): string {
  const SPACER = "  ";
  let output = "";

  for (const [name, definition] of Object.entries(properties)) {
    // remove "private" fields (RxDB specific fields like _deleted)
    if (name.startsWith("_")) {
      continue;
    }

    if (definition.type === "array") {
      if (definition.items.type === "object") {
        output += `${SPACER.repeat(level)}${name} {\n`;
        output += resolveFields(definition.items.properties, level + 1);
        output += `${SPACER.repeat(level)}}\n`;
      } else {
        output += `${SPACER.repeat(level)}${name}\n`;
      }
    } else if (definition.type === "object") {
      output += `${SPACER.repeat(level)}${name} {\n`;
      output += resolveFields(definition.properties, level + 1);
      output += `${SPACER.repeat(level)}}\n`;
    } else {
      output += `${SPACER.repeat(level)}${name}\n`;
    }
  }

  return output;
}

function resolveVariables(
  properties: { [k: string]: Record<string, any> },
  documentState: Record<string, any>,
): Record<string, any> {
  const alwaysIncludeKeys = ["id", "deleted"];
  const finalVariables: Record<string, any> = {};
  for (const [key, value] of Object.entries(documentState)) {
    const definition = properties[key];
    if (definition?.final && !alwaysIncludeKeys.includes(key)) {
      continue;
    }

    // recursively resolve nested properties
    if (definition?.properties) {
      // nested properties where the base is null, are just null
      if (value === null) {
        finalVariables[key] = null;
      } else {
        finalVariables[key] = resolveVariables(definition.properties, value);
      }
    } else if (
      definition?.type === "array" &&
      definition?.items &&
      definition?.items?.properties
    ) {
      if (!value) continue;
      finalVariables[key] = value.map((item: Record<string, any>) =>
        resolveVariables(definition.items.properties, item),
      );
    } else {
      finalVariables[key] = value;
    }
  }

  return finalVariables;
}

const streamQueryBuilder =
  (
    consumer: Consumer,
    collection: RxCollection,
    replicationState: RxReplicationState<any, Checkpoint>,
  ) =>
  () => {
    const className = upperFirst(camelCase(singular(collection.name)));
    const handler = createActionCableHandler({ cable: consumer });
    const query = `subscription stream${className} {
      stream${className} {
        documents {
          ${resolveFields(collection.schema.jsonSchema.properties)}
          deleted
        }
        checkpoint {
          id
          updatedAt
        }
      }
    }`;

    handler(
      { name: `stream${className}`, text: query },
      {},
      {},
      {
        // eslint-disable-next-line @typescript-eslint/no-empty-function
        onCompleted() {},
        onNext: ({ data }: any) => {
          if (data) {
            replicationState.emitEvent(data[`stream${className}`]);
          }
        },
        onError: (error: Error) => console.error(error),
      },
    );
  };

const pullQueryBuilder =
  (collection: RxCollection) =>
  (checkpoint: Checkpoint | undefined, limit?: number) => {
    if (!checkpoint) {
      checkpoint = {
        id: "",
        updatedAt: 0,
      };
    }

    const className = upperFirst(camelCase(singular(collection.name)));
    const query = `query pull${className}($checkpoint: CheckpointInput!, $limit: Int!) {
      pull${className}(checkpoint: $checkpoint, limit: $limit) {
        documents {
            ${resolveFields(collection.schema.jsonSchema.properties)}
            deleted
        }
        checkpoint {
          id
          updatedAt
        }
      }
    }`;

    return {
      query,
      variables: {
        checkpoint,
        limit,
      },
    };
  };

const pushQueryBuilder = (collection: RxCollection) => (records: any[]) => {
  const className = upperFirst(camelCase(singular(collection.name)));
  const query = `
    mutation push${className}($records: [Push${className}Input!]!) {
      push${className}(records: $records) {
        documents {
          ${resolveFields(collection.schema.jsonSchema.properties)}
          deleted
        }
      }
    }
  `;

  const variables = {
    records: records.map((record) => ({
      assumedMasterState: record.assumedMasterState
        ? resolveVariables(
            collection.schema.jsonSchema.properties,
            record.assumedMasterState,
          )
        : null,
      newDocumentState: resolveVariables(
        collection.schema.jsonSchema.properties,
        record.newDocumentState,
      ),
    })),
  };

  return {
    query,
    variables,
  };
};

export async function replicate(
  config: DatabaseConfig,
  consumer: Consumer,
  collection: RxCollection,
) {
  // declare headers
  const headers: { Authorization?: string } = {};

  if (config.auth?.currentToken?.accessToken?.accessToken) {
    headers.Authorization = `Bearer ${config.auth?.currentToken?.accessToken?.accessToken}`;
  }

  const replicationState = replicateGraphQL({
    collection,
    replicationIdentifier: "graphql",
    deletedField: "deleted",
    headers,
    url: {
      http: config.api.http,
    },
    pull: {
      queryBuilder: pullQueryBuilder(collection),
      modifier: (doc) => {
        // We have to remove optional non-existent field values
        // they are set as null by GraphQL but should be undefined
        Object.entries(doc).forEach(([k, v]) => {
          if (v === null) {
            delete doc[k];
          }
        });

        return doc;
      },
      batchSize: 100,
    },
    push: {
      queryBuilder: pushQueryBuilder(collection),
      responseModifier: (response) => response.documents,
      batchSize: 100,
    },
  });
  replicationState.setCredentials("include");
  replicationState.error$.subscribe((error) => {
    if (
      -1 !==
      error.parameters?.errors?.findIndex(
        (err) => err.message === "Unauthenticated client",
      )
    ) {
      // on web, we redirect to login page
      if (
        window &&
        typeof window === "object" &&
        window.location &&
        window.location.href
      ) {
        console.log("user is not authenticated, redirect to login");
        window.location.href = config.url.userSignIn;
      }
    } else {
      console.error("replication error:", error.parameters.errors);
    }
  });
  streamQueryBuilder(consumer, collection, replicationState)();

  return replicationState;
}
