// Copyright (c) 2020-2021 Drew Lemmy // This file is part of KristWeb 2 under GPL-3.0. // Full details: https://github.com/tmpim/KristWeb2/blob/master/LICENSE.txt import { useState, useEffect } from "react"; import { useSelector, shallowEqual } from "react-redux"; import { store } from "../../App"; import { RootState } from "../../store"; import { WalletMap } from "../../store/reducers/WalletsReducer"; import * as wsActions from "../../store/actions/WebsocketActions"; import * as nodeActions from "../../store/actions/NodeActions"; import * as api from "../../krist/api"; import { KristAddress, KristBlock, KristTransaction, WSConnectionState, WSIncomingMessage, WSSubscriptionLevel } from "../../krist/api/types"; import { findWalletByAddress, syncWallet, syncWalletUpdate } from "../../krist/wallets/Wallet"; import WebSocketAsPromised from "websocket-as-promised"; import { throttle } from "lodash-es"; import Debug from "debug"; const debug = Debug("kristweb:ws"); const REFRESH_THROTTLE_MS = 500; const DEFAULT_CONNECT_DEBOUNCE_MS = 1000; const MAX_CONNECT_DEBOUNCE_MS = 360000; class WebsocketConnection { private wallets?: WalletMap; private ws?: WebSocketAsPromised; private reconnectionTimer?: number; private messageID = 1; private connectDebounce = DEFAULT_CONNECT_DEBOUNCE_MS; private forceClosing = false; // TODO: automatically clean this up? private refreshThrottles: Record<string, (address: string) => void> = {}; constructor(public syncNode: string) { debug("WS component init"); this.attemptConnect(); } setWallets(wallets: WalletMap): void { this.wallets = wallets; } private async connect() { debug("attempting connection to server..."); this.setConnectionState("disconnected"); // Get a websocket token const { url } = await api.post<{ url: string }>("ws/start"); this.setConnectionState("connecting"); // Connect to the websocket server this.ws = new WebSocketAsPromised(url, { packMessage: data => JSON.stringify(data), unpackMessage: data => JSON.parse(data.toString()) }); this.ws.onUnpackedMessage.addListener(this.handleMessage.bind(this)); this.ws.onClose.addListener(this.handleClose.bind(this)); this.messageID = 1; await this.ws.open(); this.connectDebounce = DEFAULT_CONNECT_DEBOUNCE_MS; } async attemptConnect() { try { await this.connect(); } catch (err) { this.handleDisconnect(err); } } private handleDisconnect(err?: Error) { if (this.reconnectionTimer) window.clearTimeout(this.reconnectionTimer); // TODO: show errors to the user? this.setConnectionState("disconnected"); debug("failed to connect to server, reconnecting in %d ms", this.connectDebounce, err); this.reconnectionTimer = window.setTimeout(() => { this.connectDebounce = Math.min(this.connectDebounce * 2, MAX_CONNECT_DEBOUNCE_MS); this.attemptConnect(); }, this.connectDebounce); } handleClose(event: { code: number; reason: string }) { debug("ws closed with code %d reason %s", event.code, event.reason); this.handleDisconnect(); } /** Forcibly disconnect this instance from the websocket server (e.g. on * component unmount) */ forceClose(): void { debug("received force close request"); if (this.forceClosing) return; this.forceClosing = true; if (!this.ws || !this.ws.isOpened || this.ws.isClosed) return; debug("force closing ws"); this.ws.close(); } private setConnectionState(state: WSConnectionState) { store.dispatch(wsActions.setConnectionState(state)); } handleMessage(data: WSIncomingMessage) { if (!this.ws || !this.ws.isOpened || this.ws.isClosed) return; if (data.ok === false || data.error) debug("message ERR: %d %s", data.id || -1, data.error); if (data.type === "hello") { // Initial connection debug("connected"); this.setConnectionState("connected"); // Subscribe to all the events this.subscribe("transactions"); this.subscribe("blocks"); this.subscribe("names"); this.subscribe("motd"); // Re-sync all balances just in case this.refreshBalances(); } else if (data.address && this.wallets) { // Probably a response to `refreshBalance` const address: KristAddress = data.address; const wallet = findWalletByAddress(this.wallets, address.address); if (!wallet) return; debug("syncing %s to %s (balance: %d)", address.address, wallet.id, address.balance); syncWalletUpdate(wallet, address); } else if (data.type === "event" && data.event && this.wallets) { // Handle events switch (data.event) { case "transaction": { // If we receive a transaction relevant to any of our wallets, refresh // the balances. const transaction = data.transaction as KristTransaction; debug("transaction [%s] from %s to %s", transaction.type, transaction.from || "null", transaction.to || "null"); const fromWallet = findWalletByAddress(this.wallets, transaction.from || undefined); const toWallet = findWalletByAddress(this.wallets, transaction.to); switch (transaction.type) { // Update the name counts using the address lookup case "name_purchase": case "name_transfer": if (fromWallet) syncWallet(fromWallet); if (toWallet) syncWallet(toWallet); break; // Any other transaction; refresh the balances via the websocket default: if (fromWallet) this.refreshBalance(fromWallet.address); if (toWallet) this.refreshBalance(toWallet.address); break; } break; } case "block": { // Update the last block ID, which will trigger a re-fetch for // work-related and block value-related components. const block = data.block as KristBlock; debug("block id now %d", block.height); store.dispatch(nodeActions.setLastBlockID(block.height)); break; } } } } /** Queues a command to re-fetch an address's balance. The response will be * handled in {@link handleMessage}. This is automatically throttled to * execute on the leading edge of 500ms (REFRESH_THROTTLE_MS). */ refreshBalance(address: string) { if (this.refreshThrottles[address]) { // Use the existing throttled function if it exists this.refreshThrottles[address](address); } else { // Create and cache a new throttle function for this address const throttled = throttle( this._refreshBalance.bind(this), REFRESH_THROTTLE_MS, { leading: true, trailing: false } ); this.refreshThrottles[address] = throttled; throttled(address); } } private _refreshBalance(address: string) { debug("refreshing balance of %s", address); this.ws?.sendPacked({ type: "address", id: this.messageID++, address }); } /** Re-syncs balances for all the wallets, just in case. */ refreshBalances() { debug("refreshing all balances"); const { wallets } = this; if (!wallets) return; for (const id in wallets) { this.refreshBalance(wallets[id].address); } } /** Subscribe to a Krist WS event. */ subscribe(event: WSSubscriptionLevel) { this.ws?.sendPacked({ type: "subscribe", event, id: this.messageID++ }); } } export function WebsocketService(): JSX.Element | null { const { wallets } = useSelector((s: RootState) => s.wallets, shallowEqual); const syncNode = api.useSyncNode(); const [connection, setConnection] = useState<WebsocketConnection | undefined>(); // On first render, or if the sync node changes, create the websocket // connection useEffect(() => { // Don't reconnect if we already have a connection and the sync node hasn't // changed (prevents infinite loops) if (connection && connection.syncNode === syncNode) return; // Close any existing connections if (connection) connection.forceClose(); // Connect to the Krist websocket server setConnection(new WebsocketConnection(syncNode)); // On unmount, force close the existing connection return () => { if (connection) connection.forceClose(); }; }, [syncNode, connection]); // If the wallets change, let the websocket service know so that it can keep // track of events related to any new wallets useEffect(() => { if (connection) connection.setWallets(wallets); }, [wallets, connection]); return null; }