import React, { useMemo, useEffect } from "react"; import { useSelector, shallowEqual, useDispatch } from "react-redux"; import { AppDispatch } from "../../App"; import { RootState } from "../../store"; import { WalletMap } from "../../store/reducers/WalletsReducer"; import * as wsActions from "../../store/actions/WebsocketActions"; import packageJson from "../../../package.json"; import { APIResponse, KristAddress, KristTransaction, WSConnectionState, WSIncomingMessage, WSSubscriptionLevel } from "../../krist/api/types"; import { findWalletByAddress, syncWalletUpdate } from "../../krist/wallets/Wallet"; import WebSocketAsPromised from "websocket-as-promised"; import Debug from "debug"; const debug = Debug("kristweb:ws"); const DEFAULT_CONNECT_DEBOUNCE = 1000; const MAX_DEBOUNCE = 360000; class WebsocketConnection { private wallets?: WalletMap; private ws?: WebSocketAsPromised; private reconnectionTimer?: number; private messageID = 1; private connectDebounce = DEFAULT_CONNECT_DEBOUNCE; constructor(private dispatch: AppDispatch) { debug("WS component init"); this.attemptConnect(); } setWallets(wallets: WalletMap): void { this.wallets = wallets; } private async connect() { debug("attempting connection to server..."); this.setConnectionState("disconnected"); const syncNode = packageJson.defaultSyncNode; // TODO: support alt nodes // Get a websocket token const res = await fetch(syncNode + "/ws/start", { method: "POST" }); if (!res.ok || res.status !== 200) throw new Error("ws.errorToken"); const data: APIResponse<{ url: string }> = await res.json(); if (!data.ok || data.error) throw new Error("ws.errorToken"); this.setConnectionState("connecting"); // Connect to the websocket server this.ws = new WebSocketAsPromised(data.url, { packMessage: data => JSON.stringify(data), unpackMessage: data => JSON.parse(data.toString()) }); this.ws.onUnpackedMessage.addListener(this.handleMessage.bind(this)); this.ws.onClose.addListener(this.handleClose.bind(this)); this.messageID = 1; await this.ws.open(); this.connectDebounce = DEFAULT_CONNECT_DEBOUNCE; } async attemptConnect() { try { await this.connect(); } catch (err) { this.handleDisconnect(err); } } private handleDisconnect(err?: Error | number) { if (this.reconnectionTimer) window.clearTimeout(this.reconnectionTimer); // TODO: show errors to the user? this.setConnectionState("disconnected"); debug("failed to connect to server, reconnecting in %d ms", this.connectDebounce, err); this.reconnectionTimer = window.setTimeout(() => { this.connectDebounce = Math.min(this.connectDebounce * 2, MAX_DEBOUNCE); this.attemptConnect(); }, this.connectDebounce); } handleClose(event: { reason: number }) { this.handleDisconnect(event.reason); } private setConnectionState(state: WSConnectionState) { this.dispatch(wsActions.setConnectionState(state)); } handleMessage(data: WSIncomingMessage) { if (!this.ws || !this.ws.isOpened || this.ws.isClosed) return; if (data.ok === false || data.error) debug("message ERR: %d %s", data.id || -1, data.error); if (data.type === "hello") { // Initial connection debug("connected"); this.setConnectionState("connected"); // Subscribe to all the events this.subscribe("transactions"); this.subscribe("blocks"); this.subscribe("names"); this.subscribe("motd"); // Re-sync all balances just in case this.refreshBalances(); } else if (data.address && this.wallets) { // Probably a response to `refreshBalance` const address: KristAddress = data.address; const wallet = findWalletByAddress(this.wallets, address.address); if (!wallet) return; debug("syncing %s to %s (balance: %d)", address.address, wallet.id, address.balance); syncWalletUpdate(this.dispatch, wallet, address); } else if (data.type === "event" && data.event && this.wallets) { // Handle events switch (data.event) { case "transaction": { // If we receive a transaction relevant to any of our wallets, refresh // the balances. const transaction = data.transaction as KristTransaction; debug("transaction from %s to %s", transaction.from || "null", transaction.to || "null"); const fromWallet = findWalletByAddress(this.wallets, transaction.from); if (fromWallet) this.refreshBalance(fromWallet.address); const toWallet = findWalletByAddress(this.wallets, transaction.to); if (toWallet) this.refreshBalance(toWallet.address); break; } } } } /** Queues a command to re-fetch an address's balance. The response will be * handled in {@link handleMessage}. */ refreshBalance(address: string) { debug("refreshing balance of %s", address); this.ws?.sendPacked({ type: "address", id: this.messageID++, address }); } /** Re-syncs balances for all the wallets, just in case. */ refreshBalances() { debug("refreshing all balances"); const { wallets } = this; if (!wallets) return; for (const id in wallets) { this.refreshBalance(wallets[id].address); } } /** Subscribe to a Krist WS event. */ subscribe(event: WSSubscriptionLevel) { this.ws?.sendPacked({ type: "subscribe", event, id: this.messageID++ }); } } export function WebsocketService(): JSX.Element | null { const { wallets } = useSelector((s: RootState) => s.wallets, shallowEqual); const dispatch = useDispatch(); const connection = useMemo(() => new WebsocketConnection(dispatch), []); useEffect(() => { connection.setWallets(wallets); }, [wallets]); return null; }