index.mjs

// Copyright 2025 KtorZ <matthias.benkort@gmail.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/** @namespace OgmiosWebSocket */
/** @module @cardano-ogmios/mdk */

import * as SafeJson from "./safe-json.mjs";
import { IsoWebSocket } from "./iso-websocket.mjs";

/**
 * @memberOf module:@cardano-ogmios/mdk
 * @public
 * @type {object}
 * @property {function} parse
 *  A drop-in replacement for JSON.parse that handles big numbers.
 * @property {function} stringify
 *  A drop-in replacement for JSON.stringify that handles big numbers.
 *
 * @see {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/JSON/parse}
 * @see {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/JSON/stringify}
 */
export const Json = SafeJson.Json;

/** @private */
const DEFAULT_CONNECTION_STRING = "ws://127.0.0.1:1337";

/** @private */
const INITIAL_PIPELINE_BURST = 100;

/**
 * @memberOf module:@cardano-ogmios/mdk
 *
 * @description
 *   Creates an RPC-2.0 payload from a method, optional params and an optional id.
 * @param {string} method
 * @param {object} [params]
 * @param {any} [id]
 * @return {string}
 */
export function newRpcRequest(method, params = {}, id) {
  return Json.stringify({
    jsonrpc: "2.0",
    method,
    params,
    ...(id && { id }),
  });
}

/**
 * @callback application
 * @memberOf module:@cardano-ogmios/mdk
 * @param {OgmiosWebSocket.OgmiosWebSocket} ws
 *  An isomorphic WebSocket-like interface connected to a running Ogmios server. In addition to the
 *  usual event listeners methods, the object is enhanced with handful of useful methods.
 * @param {module:@cardano-ogmios/mdk.done} done
 *  A callback to mark the end of the application, possibly yielding a result.
 */

/**
 * @memberOf module:@cardano-ogmios/mdk
 *
 * @description
 *  Open a connection to Ogmios, and run the provided callback. The websocket is passed as a
 *  callback for subsequent operations.
 * @async
 * @param {module:@cardano-ogmios/mdk.application} callback
 *  Continuation once the connection has been established.
 * @param {string} [connectionString="ws://127.0.0.1:1337"]
 *  Server's connection string, or localhost.
 * @return {Promise}
 *  A promise returning whatever is passed to the `done` callback.
 *
 * @example
 * await ogmios((ws, done) => {
 *   ws.once('message', (data) => {
 *     done(Json.parse(data).result);
 *   });
 *
 *   ws.rpc('queryLedgerState/tip');
 * });
 */
export function ogmios(callback, connectionString = DEFAULT_CONNECTION_STRING) {
  return new Promise((resolve, reject) => {
    /**
     * @memberOf OgmiosWebSocket
     *
     * @typedef OgmiosWebSocket
     * @mixes Websocket
     * @type {object}
     * @property {OgmiosWebSocket.rpc} rpc
     *  Perform an arbitrary rpc query using the given method name and optional parameters.
     * @property {OgmiosWebSocket.queryLedgerState} queryLedgerState
     *  A short-hand for running a single ledger-state query.
     * @property {OgmiosWebSocket.queryNetwork} queryNetwork
     *  A short-hand for running a single network query.
     * @property {OgmiosWebSocket.newChainFollower} newChainFollower
     *  Create a generator for following the chain from any given point.
     * @see {@link https://github.com/websockets/ws/blob/master/doc/ws.md#class-websocket WebSocket}
     */
    const ws = new IsoWebSocket(connectionString, {
      maxPayload: Number.MAX_SAFE_INTEGER,
    });

    let output;

    ws.once("close", (code, reason, data) => {
      if (code !== 1000) {
        reject({
          message: "connection ended with error.",
          code,
          reason: reason.toString(),
          ...(data && { data }),
        });
      } else {
        resolve(output);
      }
    });

    ws.once("error", reject);

    ws.once("open", async () => {
      /**
       * @callback done
       * @memberOf module:@cardano-ogmios/mdk
       * @param {any} [result]
       *  An optional result returned by the application.
       */
      output = await new Promise((done) => callback(ws, done));
      ws.close();
    });

    // ----------------------------------------------------------------------------- General Helpers
    /**
     * @memberOf OgmiosWebSocket
     *
     * @description
     *  Perform an arbitrary rpc query using the given method name and optional parameters. This supposes
     *  that an event listener on `"message"` has been installed.
     *
     * @callback rpc
     * @param {string} method
     *  The Ogmios RPC method.
     * @param {object} [params]
     *  Optional parameters for the method.
     * @param {any} [id]
     *  An optional request id.
     *
     * @see {@link https://ogmios.dev/api/}
     *
     * @example
     * ws.once("message", (data) => { ... });
     * ws.rpc('findIntersection', { point: [ "origin" ] }, "my-request-id");
     * @example
     * ws.once("message", (data) => { ... });
     * ws.rpc('queryLedgerState/tip');
     */
    ws.rpc = function (method, params, id) {
      ws.send(newRpcRequest(method, params, id));
    };

    // ------------------------------------------------------------------- Local-State-Query Helpers

    function request(method, params) {
      return new Promise((resolve, reject) => {
        ws.once("message", (data) => {
          const { error, result } = Json.parse(data);
          if (error !== undefined) {
            reject(error);
          } else {
            resolve(result);
          }
        });

        ws.rpc(method, params);
      });
    }

    /**
     * @memberOf OgmiosWebSocket
     *
     * @description
     *  A short-hand for running a single ledger-state query. If a state has been acquired, then it is ran on
     *  that state. If not, it is ran from the most recent ledger state available.
     *
     * @callback queryLedgerState
     * @param {string} method
     *  The ledger state query name (without "queryLedgerState/").
     * @param {object} [params]
     *  Optional parameters for the query, if any.
     * @return {Promise<any>}
     *  A promise holding the result for that query.
     * @example
     * const tip = await ws.queryLedgerState("tip");
     */
    ws.queryLedgerState = function (method, params) {
      return request(`queryLedgerState/${method}`, params);
    };

    /**
     * @memberOf OgmiosWebSocket
     *
     * @description
     *  A short-hand for running a single network query.
     *
     * @callback queryNetwork
     * @param {string} method
     *  The ledger state query name (without "queryNetwork/").
     * @param {object} [params]
     *  Optional parameters for the query, if any.
     * @return {Promise<any>}
     *  A promise holding the result for that query.
     * @example
     * const startTime = await ws.queryNetwork("startTime");
     */
    ws.queryNetwork = function (method, params) {
      return request(`queryNetwork/${method}`, params);
    };

    // -------------------------------------------------------------------- Local-Chain-Sync helpers

    /**
     * @memberOf OgmiosWebSocket
     *
     * @description
     *  Create a generator of RollForward|RollBackward for following the chain from any given point.
     *
     * @callback newChainFollower
     * @param {Array<OgmiosWebSocket.Point|"origin">} [points]
     *  Points to use for intersection in order to negotiate for an intersection.
     * @param {integer} [count]
     *  Number of blocks to fetch. If omitted, will run indefinitely.
     * @return {OgmiosWebSocket.asyncChainFollower}
     *
     * @example
     * const chainFollower = await ws.newChainFollower();
     *
     * for await (const { block } of chainFollower()) {
     *   console.log(block);
     * }
     *
     * @example
     *  const chainFollower = await ws.newChainFollower(
     *    [
     *      {
     *        id: "3d6f139f9f019668fe0412cacfaeb9e0be42e7b0f6ab21d6bddbc12d771ec18a",
     *        slot: 86268539,
     *      },
     *    ],
     *    1
     *  );
     *
     *  for await (const { block } of chainFollower()) {
     *    console.log(block);
     *  }
     *
     * @see {@link https://ogmios.dev/api/#operation-subscribe-/?NextBlock}
     */
    ws.newChainFollower = async function newChainFollower(start, count) {
      // Sanitize inputs, but allow for shorthand syntax passing only a count.
      if (typeof start === "number" && count == undefined) {
        count = start;
        start = undefined;
      }

      if (start != undefined && !Array.isArray(start)) {
        throw new Error(`expected an Array to start 'start', got something else: ${start}`);
      }

      if (count != undefined && !Number.isInteger(count)) {
        throw new Error(`expected an Integer 'count', got something else: ${count}`);
      }

      // On new block received, notify the consumer.
      function addBlockListener(signals) {
        let ignoredFirstRollBack = false;

        ws.on("message", (data) => {
          const { direction, block } = Json.parse(data).result;

          // First message is always backward, ignore.
          if (direction === "backward" && !ignoredFirstRollBack) {
            ignoredFirstRollBack = true;
            return;
          }

          signals.pop().notify({ direction, block });
        });
      }

      // Enqueue a nextBlock request, and another one if needs be once the former as
      // resolved.
      function enqueue(tasks, signals, again) {
        tasks.unshift(
          new Promise((resolve) => {
            signals.unshift(
              again()
                ? {
                    notify(block) {
                      enqueue(tasks, signals, again);
                      resolve(block);
                    },
                  }
                : { notify: resolve },
            );
            ws.rpc("nextBlock");
          }),
        );
      }

      function requestBlocks(tasks, signals) {
        // Pipeline some initial request, up to INITIAL_PIPELINE_BURST but no more than
        // 'count' if 'count' is defined.
        const pipelined = Math.min(count ?? INITIAL_PIPELINE_BURST, INITIAL_PIPELINE_BURST);

        let n = pipelined;

        for (let i = 0; i <= pipelined; i += 1) {
          // Enqueue a "nextBlock" request, and count how many we've enqueued so far.
          // If needed, enqueue a next one. The lambda we pass to enqueue captures "n"
          // in its closure, so that we can have interior mutability without bothering
          // the "enqueue" function with the explicit management of "n".
          enqueue(tasks, signals, () => {
            n += 1;
            return n <= (count ?? Number.MAX_SAFE_INTEGER);
          });
        }
      }

      const { tasks, signals } = await new Promise((resolve, reject) => {
        const signals = [];
        const tasks = [];

        if (start != undefined) {
          // If an intersection is provided, negotiate that intersection and proceed
          // from there. Provided it is valid.
          ws.once("message", (data) => {
            const { error } = Json.parse(data);

            if (error !== undefined) {
              return reject(error);
            }

            addBlockListener(signals);
            requestBlocks(tasks, signals);
            resolve({ tasks, signals });
          });

          ws.rpc("findIntersection", { points: start });
        } else {
          // If no intersection is provided, find the tip and negotiate an intersection
          // from the tip.
          ws.once("message", (data) => {
            const { tip } = Json.parse(data).result;
            ws.once("message", (data) => {
              addBlockListener(signals);
              requestBlocks(tasks, signals);
              resolve({ tasks, signals });
            });
            ws.rpc("findIntersection", { points: [tip] });
          });

          ws.rpc("nextBlock");
        }
      });

      /**
       * @memberOf OgmiosWebSocket
       * @function asyncChainFollower
       * @generator
       * @async
       * @description
       *  Yields roll-forward or roll-backward events from a node. This generator terminates after
       *  yielding all the requested blocks (count) or never if following the chain indefinitely.
       * @returns {Promise<RollForward|RollBackward>}
       * @see {@link https://ogmios.dev/api/#operation-subscribe-/?NextBlock}
       */
      return async function* () {
        if (count != undefined) {
          for (let i = 0; i < count; i += 1) {
            yield await tasks.pop();
          }
          // Clear up the last promise corresponding to the first rollback,
          // to avoid dangling promises. Although, yield no result for that one.
          signals.pop().notify();
          await tasks.pop();
        } else {
          // When no count is provided, we effectively have an infinite generator.
          // Requests are properly pipelined, so we can simply wait for the next task
          // to be resolved. At any point in time, we have INITIAL_PIPELINE_BURST + 1
          // tasks pending.
          let block;
          while ((block = await tasks.pop())) {
            yield block;
          }
        }
      };
    };
  });
}

/**
 * @memberOf OgmiosWebSocket
 *
 * @typedef Point
 * @description
 *  A point on chain.
 * @type {object}
 * @property {string} id
 *  A base-16 encoded header hash.
 * @property {integer} slot
 *  An absolute slot number.
 */