feat: local discovery (#1811)

* initialise the new package

* feat: implement LocalStorageDiscovery

* add: Startable to cspell

* add compliance test

* add: discovery tests

* rm: browser tests script

* address comments

* add type safety to getting peers from local

* only dispatch peer if it does not exist

* move ws ma extraction to utils

* chore: update package name to local-discovery

* fix: add compliance test with no external deps on service node

* use peer:identify instead of peer:update

* add: unit tests & remove sdk dependency

* move tests to self package

* update cspell + remove unrequired deps

* add types

* maintain in-memory peers for localstorage

* address comments

* chore: rename

* use name from options

* fix: saving peers

* rm: only
This commit is contained in:
Danish Arora 2024-02-16 20:06:27 +05:30 committed by GitHub
parent aabd907f6a
commit 199f6ab2ff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 1477 additions and 11 deletions

View File

@ -4,6 +4,7 @@
"language": "en",
"words": [
"abortable",
"Addrs",
"ahadns",
"Alives",
"asym",
@ -96,6 +97,7 @@
"secp",
"sharded",
"sscanf",
"Startable",
"staticnode",
"statusim",
"statusteam",
@ -125,7 +127,15 @@
"Привет",
"مرحبا"
],
"flagWords": ["pubSub: pubsub", "pubSubTopics: pubsubTopics", "pubSubTopic: pubsubTopic", "PubSub: Pubsub", "PubSubTopics: PubsubTopics", "PubSubTopic: PubsubTopic", "DefaultPubSubTopic: DefaultPubsubTopic"],
"flagWords": [
"pubSub: pubsub",
"pubSubTopics: pubsubTopics",
"pubSubTopic: pubsubTopic",
"PubSub: Pubsub",
"PubSubTopics: PubsubTopics",
"PubSubTopic: PubsubTopic",
"DefaultPubSubTopic: DefaultPubsubTopic"
],
"ignorePaths": [
"package.json",
"package-lock.json",

1
.gitignore vendored
View File

@ -13,3 +13,4 @@ test-results
playwright-report
example
allure-results
packages/local-discovery/mock_local_storage

963
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -12,6 +12,7 @@
"packages/relay",
"packages/peer-exchange",
"packages/dns-discovery",
"packages/local-discovery",
"packages/message-encryption",
"packages/sdk",
"packages/tests",

View File

@ -97,8 +97,11 @@ export class ConnectionManager
const peersDiscoveredByBootstrap: Peer[] = [];
const peersDiscoveredByPeerExchange: Peer[] = [];
const peersDiscoveredByLocal: Peer[] = [];
const peersConnectedByBootstrap: Peer[] = [];
const peersConnectedByPeerExchange: Peer[] = [];
const peersConnectedByLocal: Peer[] = [];
for (const peer of peersDiscovered) {
const tags = await this.getTagNamesForPeer(peer.id);
@ -107,6 +110,8 @@ export class ConnectionManager
peersDiscoveredByBootstrap.push(peer);
} else if (tags.includes(Tags.PEER_EXCHANGE)) {
peersDiscoveredByPeerExchange.push(peer);
} else if (tags.includes(Tags.LOCAL)) {
peersDiscoveredByLocal.push(peer);
}
}
@ -118,17 +123,21 @@ export class ConnectionManager
peersConnectedByBootstrap.push(peer);
} else if (tags.includes(Tags.PEER_EXCHANGE)) {
peersConnectedByPeerExchange.push(peer);
} else if (tags.includes(Tags.LOCAL)) {
peersConnectedByLocal.push(peer);
}
}
return {
DISCOVERED: {
[Tags.BOOTSTRAP]: peersDiscoveredByBootstrap,
[Tags.PEER_EXCHANGE]: peersDiscoveredByPeerExchange
[Tags.PEER_EXCHANGE]: peersDiscoveredByPeerExchange,
[Tags.LOCAL]: peersDiscoveredByLocal
},
CONNECTED: {
[Tags.BOOTSTRAP]: peersConnectedByBootstrap,
[Tags.PEER_EXCHANGE]: peersConnectedByPeerExchange
[Tags.PEER_EXCHANGE]: peersConnectedByPeerExchange,
[Tags.LOCAL]: peersConnectedByLocal
}
};
}

View File

@ -2,7 +2,8 @@ import type { Peer, PeerId, TypedEventEmitter } from "@libp2p/interface";
export enum Tags {
BOOTSTRAP = "bootstrap",
PEER_EXCHANGE = "peer-exchange"
PEER_EXCHANGE = "peer-exchange",
LOCAL = "local"
}
export interface ConnectionManagerOptions {
@ -40,10 +41,12 @@ export interface PeersByDiscoveryResult {
DISCOVERED: {
[Tags.BOOTSTRAP]: Peer[];
[Tags.PEER_EXCHANGE]: Peer[];
[Tags.LOCAL]: Peer[];
};
CONNECTED: {
[Tags.BOOTSTRAP]: Peer[];
[Tags.PEER_EXCHANGE]: Peer[];
[Tags.LOCAL]: Peer[];
};
}

View File

@ -16,3 +16,4 @@ export * from "./keep_alive_manager.js";
export * from "./dns_discovery.js";
export * from "./metadata.js";
export * from "./constants.js";
export * from "./local_storage.js";

View File

@ -0,0 +1,4 @@
export type LocalStoragePeerInfo = {
id: string;
address: string;
};

View File

@ -0,0 +1,6 @@
module.exports = {
parserOptions: {
tsconfigRootDir: __dirname,
project: "./tsconfig.dev.json"
}
};

View File

@ -0,0 +1,6 @@
{
"reporterEnabled": "spec, allure-mocha",
"allureMochaReporter": {
"outputDir": "allure-results"
}
}

View File

@ -0,0 +1,26 @@
const config = {
extension: ['ts'],
spec: 'src/**/*.spec.ts',
require: ['ts-node/register', 'isomorphic-fetch'],
loader: 'ts-node/esm',
'node-option': [
'experimental-specifier-resolution=node',
'loader=ts-node/esm'
],
exit: true
};
if (process.env.CI) {
console.log("Running tests in parallel");
config.parallel = true;
config.jobs = 6;
console.log("Activating allure reporting");
config.reporter = 'mocha-multi-reporters';
config.reporterOptions = {
configFile: '.mocha.reporters.json'
};
} else {
console.log("Running tests serially. To enable parallel execution update mocha config");
}
module.exports = config;

View File

View File

@ -0,0 +1,3 @@
const config = require("../../karma.conf.cjs");
module.exports = config;

View File

@ -0,0 +1,83 @@
{
"name": "@waku/local-discovery",
"version": "0.0.1",
"description": "Local Discovery keeps records of healthy peers in the browser's local storage for quicker bootstrapping.",
"types": "./dist/index.d.ts",
"module": "./dist/index.js",
"exports": {
".": {
"types": "./dist/index.d.ts",
"import": "./dist/index.js"
}
},
"type": "module",
"author": "Waku Team",
"homepage": "https://github.com/waku-org/js-waku/tree/master/packages/local-discovery#readme",
"repository": {
"type": "git",
"url": "https://github.com/waku-org/js-waku.git"
},
"bugs": {
"url": "https://github.com/waku-org/js-waku/issues"
},
"license": "MIT OR Apache-2.0",
"keywords": [
"waku",
"decentralized",
"secure",
"communication",
"web3",
"ethereum",
"dapps",
"privacy"
],
"scripts": {
"build": "run-s build:**",
"build:esm": "tsc",
"build:bundle": "rollup --config rollup.config.js",
"fix": "run-s fix:*",
"fix:lint": "eslint src *.js --fix",
"check": "run-s check:*",
"check:lint": "eslint src --ext .ts",
"check:spelling": "cspell \"{README.md,src/**/*.ts}\"",
"check:tsc": "tsc -p tsconfig.dev.json",
"prepublish": "npm run build",
"reset-hard": "git clean -dfx -e .idea && git reset --hard && npm i && npm run build",
"test": "NODE_ENV=test run-s test:*",
"test:node": "NODE_ENV=test TS_NODE_PROJECT=./tsconfig.dev.json mocha"
},
"engines": {
"node": ">=18"
},
"dependencies": {
"@libp2p/interface": "^1.1.2",
"@waku/interfaces": "^0.0.21",
"@waku/utils": "^0.0.14"
},
"devDependencies": {
"@rollup/plugin-commonjs": "^25.0.7",
"@rollup/plugin-json": "^6.0.0",
"@rollup/plugin-node-resolve": "^15.2.3",
"@types/chai": "^4.3.11",
"@types/node-localstorage": "^1.3.3",
"@waku/build-utils": "*",
"chai": "^4.3.10",
"chai-as-promised": "^7.1.1",
"cspell": "^7.3.2",
"mocha": "^10.2.0",
"node-localstorage": "^3.0.5",
"npm-run-all": "^4.1.5",
"rollup": "^4.9.5",
"sinon": "^17.0.1"
},
"files": [
"dist",
"bundle",
"src/**/*.ts",
"!**/*.spec.*",
"!**/*.json",
"CHANGELOG.md",
"LICENSE",
"README.md"
]
}

View File

@ -0,0 +1,24 @@
import commonjs from "@rollup/plugin-commonjs";
import json from "@rollup/plugin-json";
import { nodeResolve } from "@rollup/plugin-node-resolve";
import { extractExports } from "@waku/build-utils";
import * as packageJson from "./package.json" assert { type: "json" };
const input = extractExports(packageJson);
export default {
input,
output: {
dir: "bundle",
format: "esm"
},
plugins: [
commonjs(),
json(),
nodeResolve({
browser: true,
preferBuiltins: false
})
]
};

View File

@ -0,0 +1,160 @@
import { TypedEventEmitter } from "@libp2p/interface";
import {
CustomEvent,
IdentifyResult,
PeerDiscovery,
PeerDiscoveryEvents,
PeerInfo,
Startable
} from "@libp2p/interface";
import { createFromJSON } from "@libp2p/peer-id-factory";
import { multiaddr } from "@multiformats/multiaddr";
import {
type Libp2pComponents,
type LocalStoragePeerInfo,
Tags
} from "@waku/interfaces";
import { getWsMultiaddrFromMultiaddrs, Logger } from "@waku/utils";
const log = new Logger("peer-exchange-discovery");
type LocalStorageDiscoveryOptions = {
tagName?: string;
tagValue?: number;
tagTTL?: number;
};
export const DEFAULT_LOCAL_TAG_NAME = Tags.LOCAL;
const DEFAULT_LOCAL_TAG_VALUE = 50;
const DEFAULT_LOCAL_TAG_TTL = 100_000_000;
export class LocalStorageDiscovery
extends TypedEventEmitter<PeerDiscoveryEvents>
implements PeerDiscovery, Startable
{
private isStarted: boolean;
private peers: LocalStoragePeerInfo[] = [];
constructor(
private readonly components: Libp2pComponents,
private readonly options?: LocalStorageDiscoveryOptions
) {
super();
this.isStarted = false;
this.peers = this.getPeersFromLocalStorage();
}
get [Symbol.toStringTag](): string {
return "@waku/local-discovery";
}
async start(): Promise<void> {
if (this.isStarted) return;
log.info("Starting Local Storage Discovery");
this.components.events.addEventListener(
"peer:identify",
this.handleNewPeers
);
for (const { id: idStr, address } of this.peers) {
const peerId = await createFromJSON({ id: idStr });
if (await this.components.peerStore.has(peerId)) continue;
await this.components.peerStore.save(peerId, {
multiaddrs: [multiaddr(address)],
tags: {
[this.options?.tagName ?? DEFAULT_LOCAL_TAG_NAME]: {
value: this.options?.tagValue ?? DEFAULT_LOCAL_TAG_VALUE,
ttl: this.options?.tagTTL ?? DEFAULT_LOCAL_TAG_TTL
}
}
});
this.dispatchEvent(
new CustomEvent<PeerInfo>("peer", {
detail: {
id: peerId,
multiaddrs: [multiaddr(address)]
}
})
);
}
log.info(`Discovered ${this.peers.length} peers`);
this.isStarted = true;
}
stop(): void | Promise<void> {
if (!this.isStarted) return;
log.info("Stopping Local Storage Discovery");
this.components.events.removeEventListener(
"peer:identify",
this.handleNewPeers
);
this.isStarted = false;
this.savePeersToLocalStorage();
}
handleNewPeers = (event: CustomEvent<IdentifyResult>): void => {
const { peerId, listenAddrs } = event.detail;
const websocketMultiaddr = getWsMultiaddrFromMultiaddrs(listenAddrs);
const localStoragePeers = this.getPeersFromLocalStorage();
const existingPeerIndex = localStoragePeers.findIndex(
(_peer) => _peer.id === peerId.toString()
);
if (existingPeerIndex >= 0) {
localStoragePeers[existingPeerIndex].address =
websocketMultiaddr.toString();
} else {
localStoragePeers.push({
id: peerId.toString(),
address: websocketMultiaddr.toString()
});
}
this.peers = localStoragePeers;
this.savePeersToLocalStorage();
};
private getPeersFromLocalStorage(): LocalStoragePeerInfo[] {
try {
const storedPeersData = localStorage.getItem("waku:peers");
if (!storedPeersData) return [];
const peers = JSON.parse(storedPeersData);
return peers.filter(isValidStoredPeer);
} catch (error) {
log.error("Error parsing peers from local storage:", error);
return [];
}
}
private savePeersToLocalStorage(): void {
localStorage.setItem("waku:peers", JSON.stringify(this.peers));
}
}
function isValidStoredPeer(peer: any): peer is LocalStoragePeerInfo {
return (
peer &&
typeof peer === "object" &&
typeof peer.id === "string" &&
typeof peer.address === "string"
);
}
export function wakuLocalStorageDiscovery(): (
components: Libp2pComponents,
options?: LocalStorageDiscoveryOptions
) => LocalStorageDiscovery {
return (
components: Libp2pComponents,
options?: LocalStorageDiscoveryOptions
) => new LocalStorageDiscovery(components, options);
}

View File

@ -0,0 +1,147 @@
import type { IdentifyResult } from "@libp2p/interface";
import { TypedEventEmitter } from "@libp2p/interface";
import tests from "@libp2p/interface-compliance-tests/peer-discovery";
import { prefixLogger } from "@libp2p/logger";
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import { createFromJSON } from "@libp2p/peer-id-factory";
import { PersistentPeerStore } from "@libp2p/peer-store";
import { multiaddr } from "@multiformats/multiaddr";
import { Libp2pComponents } from "@waku/interfaces";
import { LocalStoragePeerInfo } from "@waku/interfaces";
import chai, { expect } from "chai";
import chaiAsPromised from "chai-as-promised";
import { MemoryDatastore } from "datastore-core/memory";
import { LocalStorage } from "node-localstorage";
import sinon from "sinon";
import { LocalStorageDiscovery } from "./index.js";
chai.use(chaiAsPromised);
global.localStorage = new LocalStorage("./mock_local_storage");
const mockPeers = [
{
id: "16Uiu2HAm4v86W3bmT1BiH6oSPzcsSr24iDQpSN5Qa992BCjjwgrD",
address:
"/ip4/127.0.0.1/tcp/8000/ws/p2p/16Uiu2HAm4v86W3bmT1BiH6oSPzcsSr24iDQpSN5Qa992BCjjwgrD"
},
{
id: "16Uiu2HAm4v86W3bmT1BiH6oSPzcsSr24iDQpSN5Qa992BCjjwgrE",
address:
"/ip4/127.0.0.1/tcp/8001/ws/p2p/16Uiu2HAm4v86W3bmT1BiH6oSPzcsSr24iDQpSN5Qa992BCjjwgrE"
}
];
async function setPeersInLocalStorage(
peers: LocalStoragePeerInfo[]
): Promise<void> {
localStorage.setItem("waku:peers", JSON.stringify(peers));
}
describe("Local Storage Discovery", function () {
this.timeout(25_000);
let components: Libp2pComponents;
beforeEach(async function () {
localStorage.clear();
components = {
peerStore: new PersistentPeerStore({
events: new TypedEventEmitter(),
peerId: await createSecp256k1PeerId(),
datastore: new MemoryDatastore(),
logger: prefixLogger("local_discovery.spec.ts")
}),
events: new TypedEventEmitter()
} as unknown as Libp2pComponents;
});
describe("Compliance Tests", function () {
beforeEach(async function () {
await setPeersInLocalStorage([mockPeers[0]]);
});
tests({
async setup() {
return new LocalStorageDiscovery(components);
},
async teardown() {}
});
});
describe("Unit Tests", function () {
let discovery: LocalStorageDiscovery;
beforeEach(async function () {
discovery = new LocalStorageDiscovery(components);
await setPeersInLocalStorage(mockPeers);
});
it("should load peers from local storage and dispatch events", async () => {
const dispatchEventSpy = sinon.spy(discovery, "dispatchEvent");
await discovery.start();
expect(dispatchEventSpy.calledWith(sinon.match.has("type", "peer"))).to.be
.true;
mockPeers.forEach((mockPeer) => {
expect(
dispatchEventSpy.calledWith(
sinon.match.hasNested("detail.id", mockPeer.id)
)
).to.be.true;
});
});
it("should update peers in local storage on 'peer:identify' event", async () => {
const newPeerIdentifyEvent = {
detail: {
peerId: await createFromJSON({
id: mockPeers[1].id
}),
listenAddrs: [multiaddr(mockPeers[1].address)]
}
} as CustomEvent<IdentifyResult>;
// Directly invoke handleNewPeers to simulate receiving an 'identify' event
discovery.handleNewPeers(newPeerIdentifyEvent);
const updatedPeers = JSON.parse(
localStorage.getItem("waku:peers") || "[]"
);
expect(updatedPeers).to.deep.include({
id: newPeerIdentifyEvent.detail.peerId.toString(),
address: newPeerIdentifyEvent.detail.listenAddrs[0].toString()
});
});
it("should handle corrupted local storage data gracefully", async () => {
localStorage.setItem("waku:peers", "not-a-valid-json");
try {
await discovery.start();
} catch (error) {
expect.fail(
"start() should not have thrown an error for corrupted local storage data"
);
}
});
it("should add and remove event listeners correctly", async () => {
const addEventListenerSpy = sinon.spy(
components.events,
"addEventListener"
);
const removeEventListenerSpy = sinon.spy(
components.events,
"removeEventListener"
);
await discovery.start();
expect(addEventListenerSpy.calledWith("peer:identify")).to.be.true;
await discovery.stop();
expect(removeEventListenerSpy.calledWith("peer:identify")).to.be.true;
});
});
});

View File

@ -0,0 +1,3 @@
{
"extends": "../../tsconfig.dev"
}

View File

@ -0,0 +1,10 @@
{
"extends": "../../tsconfig",
"compilerOptions": {
"outDir": "dist/",
"rootDir": "src",
"tsBuildInfoFile": "dist/.tsbuildinfo"
},
"include": ["src"],
"exclude": ["src/**/*.spec.ts", "src/test_utils"]
}

View File

@ -0,0 +1,4 @@
{
"extends": ["../../typedoc.base.json"],
"entryPoints": ["src/index.ts"]
}

View File

@ -69,6 +69,7 @@
"@waku/core": "0.0.26",
"@waku/dns-discovery": "0.0.20",
"@waku/interfaces": "0.0.21",
"@waku/local-discovery": "^0.0.1",
"@waku/peer-exchange": "^0.0.19",
"@waku/relay": "0.0.9",
"@waku/utils": "0.0.14",

View File

@ -28,6 +28,7 @@ import {
PubsubTopic,
type ShardInfo
} from "@waku/interfaces";
import { wakuLocalStorageDiscovery } from "@waku/local-discovery";
import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange";
import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay";
import { ensureShardingConfigured } from "@waku/utils";
@ -193,6 +194,7 @@ export function defaultPeerDiscoveries(
): ((components: Libp2pComponents) => PeerDiscovery)[] {
const discoveries = [
wakuDnsDiscovery([enrTree["PROD"]], DEFAULT_NODE_REQUIREMENTS),
wakuLocalStorageDiscovery(),
wakuPeerExchangeDiscovery(pubsubTopics)
];
return discoveries;

View File

@ -1,3 +1,4 @@
import type { Multiaddr } from "@multiformats/multiaddr";
export * from "./is_defined.js";
export * from "./random_subset.js";
export * from "./group_by.js";
@ -14,3 +15,15 @@ export function removeItemFromArray(arr: unknown[], value: unknown): unknown[] {
}
return arr;
}
export function getWsMultiaddrFromMultiaddrs(
addresses: Multiaddr[]
): Multiaddr {
const wsMultiaddr = addresses.find(
(addr) => addr.toString().includes("ws") || addr.toString().includes("wss")
);
if (!wsMultiaddr) {
throw new Error("No ws multiaddr found in the given addresses");
}
return wsMultiaddr;
}