chore(store)!: move protocol implementation opinions to `@waku/sdk` (#1913)

* refactor the Store protocol into Core and SDK, simplify `queryGenerator()`

* update imports & types

* chore: move `@noble/hashes` to `@waku/sdk`

* chore: update tests

* chore: update size-limit import path

* fix: cursor tests, use `Cursor` type from `proto.Index` instead of redefining

* export `wakuStore` from the SDK

* fix: imports

* chore: use a specific version for the package

* chore: handle error for peer access

* use `type` instead of `interface`

* chore: remove comment

* add TODO

* chore!: remove deprecated function definition

* chore: improve logging
Danish Arora · 2024-04-01 16:47:47 +05:30 · committed by GitHub
parent 0a8382e7a2
commit bf42c8f53a
18 changed files with 469 additions and 392 deletions
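
For consumers, the breaking part of this change is the import surface: `wakuStore` and `createCursor` leave `@waku/core`, and cursor creation becomes a method on the store object the SDK mounts. A minimal migration sketch (hedged; the `message` value is a placeholder, not part of this diff):

```ts
// Before: import { wakuStore, createCursor } from "@waku/core";
// After this commit, the SDK mounts the store for you.
import { createLightNode } from "@waku/sdk";
import type { IDecodedMessage } from "@waku/interfaces";

declare const message: IDecodedMessage; // placeholder: a previously retrieved message

const waku = await createLightNode();
// createCursor is now synchronous and lives on the SDK store object:
const cursor = waku.store?.createCursor(message);
```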

View File

@@ -54,7 +54,7 @@ module.exports = [
},
{
name: "History retrieval protocols",
path: "packages/core/bundle/index.js",
path: "packages/sdk/bundle/index.js",
import: "{ wakuStore }",
},
{

package-lock.json (generated)
View File

@@ -6,10 +6,10 @@
"": {
"name": "@waku/root",
"workspaces": [
"packages/proto",
"packages/interfaces",
"packages/utils",
"packages/message-hash",
"packages/proto",
"packages/enr",
"packages/core",
"packages/relay",
@@ -2709,14 +2709,14 @@
"license": "MIT"
},
"node_modules/@libp2p/bootstrap": {
"version": "10.0.11",
"resolved": "https://registry.npmjs.org/@libp2p/bootstrap/-/bootstrap-10.0.11.tgz",
"integrity": "sha512-uFqfMFtCDLIFUNOOvBFUzcSSkJx9y428jYzxpyLoWv0XH4pd3gaHcPgEvK9ZddhNysg1BDslivsFw6ZyE3Tvsg==",
"version": "10.0.16",
"resolved": "https://registry.npmjs.org/@libp2p/bootstrap/-/bootstrap-10.0.16.tgz",
"integrity": "sha512-ZFuq5OtQfdeZVjfWrJpW/OuPVOuAflu1nzq9g6/UiVfSvBaZtwe8hcMCQDXv21V8fCVsd703sblzkBwBYi17rQ==",
"dependencies": {
"@libp2p/interface": "^1.1.1",
"@libp2p/peer-id": "^4.0.4",
"@libp2p/interface": "^1.1.4",
"@libp2p/peer-id": "^4.0.7",
"@multiformats/mafmt": "^12.1.6",
"@multiformats/multiaddr": "^12.1.10"
"@multiformats/multiaddr": "^12.1.14"
}
},
"node_modules/@libp2p/crypto": {
@@ -27834,7 +27834,6 @@
"license": "MIT OR Apache-2.0",
"dependencies": {
"@libp2p/ping": "^1.0.12",
"@noble/hashes": "^1.3.2",
"@waku/enr": "^0.0.21",
"@waku/interfaces": "0.0.22",
"@waku/message-hash": "^0.1.11",
@@ -28038,6 +28037,9 @@
"name": "@waku/interfaces",
"version": "0.0.22",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@waku/proto": "^0.0.6"
},
"devDependencies": {
"@chainsafe/libp2p-gossipsub": "^12.0.0",
"@multiformats/multiaddr": "^12.0.0",
@@ -28345,11 +28347,12 @@
"license": "MIT OR Apache-2.0",
"dependencies": {
"@chainsafe/libp2p-noise": "^14.1.0",
"@libp2p/bootstrap": "^10.0.11",
"@libp2p/bootstrap": "^10.0.16",
"@libp2p/identify": "^1.0.11",
"@libp2p/mplex": "^10.0.12",
"@libp2p/ping": "^1.0.12",
"@libp2p/websockets": "^8.0.11",
"@noble/hashes": "^1.3.3",
"@waku/core": "0.0.27",
"@waku/discovery": "0.0.1",
"@waku/dns-discovery": "0.0.21",
@@ -30306,14 +30309,14 @@
"version": "2.0.4"
},
"@libp2p/bootstrap": {
"version": "10.0.11",
"resolved": "https://registry.npmjs.org/@libp2p/bootstrap/-/bootstrap-10.0.11.tgz",
"integrity": "sha512-uFqfMFtCDLIFUNOOvBFUzcSSkJx9y428jYzxpyLoWv0XH4pd3gaHcPgEvK9ZddhNysg1BDslivsFw6ZyE3Tvsg==",
"version": "10.0.16",
"resolved": "https://registry.npmjs.org/@libp2p/bootstrap/-/bootstrap-10.0.16.tgz",
"integrity": "sha512-ZFuq5OtQfdeZVjfWrJpW/OuPVOuAflu1nzq9g6/UiVfSvBaZtwe8hcMCQDXv21V8fCVsd703sblzkBwBYi17rQ==",
"requires": {
"@libp2p/interface": "^1.1.1",
"@libp2p/peer-id": "^4.0.4",
"@libp2p/interface": "^1.1.4",
"@libp2p/peer-id": "^4.0.7",
"@multiformats/mafmt": "^12.1.6",
"@multiformats/multiaddr": "^12.1.10"
"@multiformats/multiaddr": "^12.1.14"
}
},
"@libp2p/crypto": {
@@ -32245,7 +32248,6 @@
"requires": {
"@libp2p/ping": "^1.0.12",
"@multiformats/multiaddr": "^12.0.0",
"@noble/hashes": "^1.3.2",
"@rollup/plugin-commonjs": "^25.0.7",
"@rollup/plugin-json": "^6.0.0",
"@rollup/plugin-node-resolve": "^15.2.3",
@@ -32394,6 +32396,7 @@
"requires": {
"@chainsafe/libp2p-gossipsub": "^12.0.0",
"@multiformats/multiaddr": "^12.0.0",
"@waku/proto": "^0.0.6",
"cspell": "^8.6.0",
"libp2p": "^1.1.2",
"npm-run-all": "^4.1.5"
@@ -32515,11 +32518,12 @@
"requires": {
"@chainsafe/libp2p-gossipsub": "^12.0.0",
"@chainsafe/libp2p-noise": "^14.1.0",
"@libp2p/bootstrap": "^10.0.11",
"@libp2p/bootstrap": "^10.0.16",
"@libp2p/identify": "^1.0.11",
"@libp2p/mplex": "^10.0.12",
"@libp2p/ping": "^1.0.12",
"@libp2p/websockets": "^8.0.11",
"@noble/hashes": "^1.3.3",
"@rollup/plugin-commonjs": "^25.0.7",
"@rollup/plugin-json": "^6.0.0",
"@rollup/plugin-node-resolve": "^15.2.3",

View File

@@ -3,10 +3,10 @@
"private": true,
"type": "module",
"workspaces": [
"packages/proto",
"packages/interfaces",
"packages/utils",
"packages/message-hash",
"packages/proto",
"packages/enr",
"packages/core",
"packages/relay",

View File

@@ -73,7 +73,6 @@
},
"dependencies": {
"@libp2p/ping": "^1.0.12",
"@noble/hashes": "^1.3.2",
"@waku/enr": "^0.0.21",
"@waku/interfaces": "0.0.22",
"@waku/message-hash": "^0.1.11",

View File

@@ -13,8 +13,9 @@ export * as waku_light_push from "./lib/light_push/index.js";
export { LightPushCodec, LightPushCore } from "./lib/light_push/index.js";
export * as waku_store from "./lib/store/index.js";
export { StoreCore } from "./lib/store/index.js";
export { PageDirection, wakuStore, createCursor } from "./lib/store/index.js";
export { PageDirection } from "./lib/store/index.js";
export { waitForRemotePeer } from "./lib/wait_for_remote_peer.js";

View File

@@ -1,17 +1,14 @@
import type { Stream } from "@libp2p/interface";
import { sha256 } from "@noble/hashes/sha256";
import type { Peer } from "@libp2p/interface";
import {
Cursor,
IDecodedMessage,
IDecoder,
IStore,
IStoreCore,
Libp2p,
ProtocolCreateOptions
} from "@waku/interfaces";
import { proto_store as proto } from "@waku/proto";
import { ensurePubsubTopicIsConfigured, isDefined } from "@waku/utils";
import { Logger } from "@waku/utils";
import { concat, utf8ToBytes } from "@waku/utils/bytes";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
@@ -28,9 +25,7 @@ const log = new Logger("store");
export const StoreCodec = "/vac/waku/store/2.0.0-beta4";
export const DefaultPageSize = 10;
export { PageDirection };
export { PageDirection, Params };
export interface TimeFilter {
startTime: Date;
@@ -72,361 +67,104 @@ export interface QueryOptions {
*
* The Waku Store protocol can be used to retrieved historical messages.
*/
class Store extends BaseProtocol implements IStore {
private readonly NUM_PEERS_PROTOCOL = 1;
export class StoreCore extends BaseProtocol implements IStoreCore {
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(StoreCodec, libp2p.components, log, options!.pubsubTopics!, options);
}
/**
* Processes messages based on the provided callback and options.
* @private
*/
private async processMessages<T extends IDecodedMessage>(
messages: Promise<T | undefined>[],
callback: (message: T) => Promise<void | boolean> | boolean | void,
options?: QueryOptions
): Promise<boolean> {
let abort = false;
const messagesOrUndef: Array<T | undefined> = await Promise.all(messages);
let processedMessages: Array<T> = messagesOrUndef.filter(isDefined);
if (this.shouldReverseOrder(options)) {
processedMessages = processedMessages.reverse();
async *queryPerPage<T extends IDecodedMessage>(
queryOpts: Params,
decoders: Map<string, IDecoder<T>>,
peer: Peer
): AsyncGenerator<Promise<T | undefined>[]> {
if (
queryOpts.contentTopics.toString() !==
Array.from(decoders.keys()).toString()
) {
throw new Error(
"Internal error, the decoders should match the query's content topics"
);
}
await Promise.all(
processedMessages.map(async (msg) => {
if (msg && !abort) {
abort = Boolean(await callback(msg));
}
})
);
let currentCursor = queryOpts.cursor;
while (true) {
queryOpts.cursor = currentCursor;
return abort;
}
const historyRpcQuery = HistoryRpc.createQuery(queryOpts);
/**
* Determines whether to reverse the order of messages based on the provided options.
*
* Messages in pages are ordered from oldest (first) to most recent (last).
* https://github.com/vacp2p/rfc/issues/533
*
* @private
*/
private shouldReverseOrder(options?: QueryOptions): boolean {
return (
typeof options?.pageDirection === "undefined" ||
options?.pageDirection === PageDirection.BACKWARD
);
}
const stream = await this.getStream(peer);
/**
* @deprecated Use `queryWithOrderedCallback` instead
**/
queryOrderedCallback = this.queryWithOrderedCallback;
const res = await pipe(
[historyRpcQuery.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
/**
* Do a query to a Waku Store to retrieve historical/missed messages.
*
* The callback function takes a `WakuMessage` in input,
* messages are processed in order:
* - oldest to latest if `options.pageDirection` == { @link PageDirection.FORWARD }
* - latest to oldest if `options.pageDirection` == { @link PageDirection.BACKWARD }
*
* The ordering may affect performance.
* The ordering depends on the behavior of the remote store node.
* If strong ordering is needed, you may need to handle this at application level
* and set your own timestamps too (the WakuMessage timestamps are not certified).
*
* @throws If not able to reach a Waku Store peer to query,
* or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed.
*/
async queryWithOrderedCallback<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: (message: T) => Promise<void | boolean> | boolean | void,
options?: QueryOptions
): Promise<void> {
for await (const promises of this.queryGenerator(decoders, options)) {
if (await this.processMessages(promises, callback, options)) break;
}
}
/**
* Do a query to a Waku Store to retrieve historical/missed messages.
* The callback function takes a `Promise<WakuMessage>` in input,
* useful if messages need to be decrypted and performance matters.
*
* The order of the messages passed to the callback is as follows:
* - within a page, messages are expected to be ordered from oldest to most recent
* - pages direction depends on { @link QueryOptions.pageDirection }
*
* Do note that the resolution of the `Promise<WakuMessage | undefined` may
* break the order as it may rely on the browser decryption API, which in turn,
* may have a different speed depending on the type of decryption.
*
* @throws If not able to reach a Waku Store peer to query,
* or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed.
*/
async queryWithPromiseCallback<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: (
message: Promise<T | undefined>
) => Promise<void | boolean> | boolean | void,
options?: QueryOptions
): Promise<void> {
let abort = false;
for await (const page of this.queryGenerator(decoders, options)) {
const _promises = page.map(async (msgPromise) => {
if (abort) return;
abort = Boolean(await callback(msgPromise));
const bytes = new Uint8ArrayList();
res.forEach((chunk) => {
bytes.append(chunk);
});
await Promise.all(_promises);
if (abort) break;
}
}
const reply = historyRpcQuery.decode(bytes);
/**
* Do a query to a Waku Store to retrieve historical/missed messages.
*
* This is a generator, useful if you want most control on how messages
* are processed.
*
* The order of the messages returned by the remote Waku node SHOULD BE
* as follows:
* - within a page, messages SHOULD be ordered from oldest to most recent
* - pages direction depends on { @link QueryOptions.pageDirection }
* @throws If not able to reach a Waku Store peer to query,
* or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed.
*
* This API only supports querying a single pubsub topic at a time.
* If multiple decoders are provided, they must all have the same pubsub topic.
* @throws If multiple decoders with different pubsub topics are provided.
* @throws If no decoders are provided.
* @throws If no decoders are found for the provided pubsub topic.
*/
async *queryGenerator<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
options?: QueryOptions
): AsyncGenerator<Promise<T | undefined>[]> {
if (decoders.length === 0) {
throw new Error("No decoders provided");
}
if (!reply.response) {
log.warn("Stopping pagination due to store `response` field missing");
break;
}
let startTime, endTime;
const response = reply.response as proto.HistoryResponse;
if (options?.timeFilter) {
startTime = options.timeFilter.startTime;
endTime = options.timeFilter.endTime;
}
if (response.error && response.error !== HistoryError.NONE) {
throw "History response contains an Error: " + response.error;
}
// convert array to set to remove duplicates
const uniquePubsubTopicsInQuery = Array.from(
new Set(decoders.map((decoder) => decoder.pubsubTopic))
);
// If multiple pubsub topics are provided, throw an error
if (uniquePubsubTopicsInQuery.length > 1) {
throw new Error(
"API does not support querying multiple pubsub topics at once"
);
}
// we can be certain that there is only one pubsub topic in the query
const pubsubTopicForQuery = uniquePubsubTopicsInQuery[0];
ensurePubsubTopicIsConfigured(pubsubTopicForQuery, this.pubsubTopics);
// check that the pubsubTopic from the Cursor and Decoder match
if (
options?.cursor?.pubsubTopic &&
options.cursor.pubsubTopic !== pubsubTopicForQuery
) {
throw new Error(
`Cursor pubsub topic (${options?.cursor?.pubsubTopic}) does not match decoder pubsub topic (${pubsubTopicForQuery})`
);
}
const decodersAsMap = new Map();
decoders.forEach((dec) => {
if (decodersAsMap.has(dec.contentTopic)) {
throw new Error(
"API does not support different decoder per content topic"
if (!response.messages || !response.messages.length) {
log.warn(
"Stopping pagination due to store `response.messages` field missing or empty"
);
break;
}
decodersAsMap.set(dec.contentTopic, dec);
});
const contentTopics = decoders
.filter((decoder) => decoder.pubsubTopic === pubsubTopicForQuery)
.map((dec) => dec.contentTopic);
log.error(`${response.messages.length} messages retrieved from store`);
if (contentTopics.length === 0) {
throw new Error("No decoders found for topic " + pubsubTopicForQuery);
}
const queryOpts = Object.assign(
{
pubsubTopic: pubsubTopicForQuery,
pageDirection: PageDirection.BACKWARD,
pageSize: DefaultPageSize
},
options,
{ contentTopics, startTime, endTime }
);
const peer = (
await this.getPeers({
numPeers: this.NUM_PEERS_PROTOCOL,
maxBootstrapPeers: 1
})
)[0];
for await (const messages of paginate<T>(
this.getStream.bind(this, peer),
queryOpts,
decodersAsMap,
options?.cursor
)) {
yield messages;
}
}
}
async function* paginate<T extends IDecodedMessage>(
streamFactory: () => Promise<Stream>,
queryOpts: Params,
decoders: Map<string, IDecoder<T>>,
cursor?: Cursor
): AsyncGenerator<Promise<T | undefined>[]> {
if (
queryOpts.contentTopics.toString() !==
Array.from(decoders.keys()).toString()
) {
throw new Error(
"Internal error, the decoders should match the query's content topics"
);
}
let currentCursor = cursor;
while (true) {
queryOpts.cursor = currentCursor;
const historyRpcQuery = HistoryRpc.createQuery(queryOpts);
log.info(
"Querying store peer",
`for (${queryOpts.pubsubTopic})`,
queryOpts.contentTopics
);
const stream = await streamFactory();
const res = await pipe(
[historyRpcQuery.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
const bytes = new Uint8ArrayList();
res.forEach((chunk) => {
bytes.append(chunk);
});
const reply = historyRpcQuery.decode(bytes);
if (!reply.response) {
log.warn("Stopping pagination due to store `response` field missing");
break;
}
const response = reply.response as proto.HistoryResponse;
if (response.error && response.error !== HistoryError.NONE) {
throw "History response contains an Error: " + response.error;
}
if (!response.messages || !response.messages.length) {
log.warn(
"Stopping pagination due to store `response.messages` field missing or empty"
);
break;
}
log.error(`${response.messages.length} messages retrieved from store`);
yield response.messages.map((protoMsg) => {
const contentTopic = protoMsg.contentTopic;
if (typeof contentTopic !== "undefined") {
const decoder = decoders.get(contentTopic);
if (decoder) {
return decoder.fromProtoObj(
queryOpts.pubsubTopic,
toProtoMessage(protoMsg)
);
yield response.messages.map((protoMsg) => {
const contentTopic = protoMsg.contentTopic;
if (typeof contentTopic !== "undefined") {
const decoder = decoders.get(contentTopic);
if (decoder) {
return decoder.fromProtoObj(
queryOpts.pubsubTopic,
toProtoMessage(protoMsg)
);
}
}
return Promise.resolve(undefined);
});
const nextCursor = response.pagingInfo?.cursor;
if (typeof nextCursor === "undefined") {
// If the server does not return cursor then there is an issue,
// Need to abort, or we end up in an infinite loop
log.warn(
"Stopping pagination due to `response.pagingInfo.cursor` missing from store response"
);
break;
}
return Promise.resolve(undefined);
});
const nextCursor = response.pagingInfo?.cursor;
if (typeof nextCursor === "undefined") {
// If the server does not return cursor then there is an issue,
// Need to abort, or we end up in an infinite loop
log.warn(
"Stopping pagination due to `response.pagingInfo.cursor` missing from store response"
);
break;
}
currentCursor = nextCursor;
currentCursor = nextCursor;
const responsePageSize = response.pagingInfo?.pageSize;
const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize;
if (
// Response page size smaller than query, meaning this is the last page
responsePageSize &&
queryPageSize &&
responsePageSize < queryPageSize
) {
break;
const responsePageSize = response.pagingInfo?.pageSize;
const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize;
if (
// Response page size smaller than query, meaning this is the last page
responsePageSize &&
queryPageSize &&
responsePageSize < queryPageSize
) {
break;
}
}
}
}
export async function createCursor(message: IDecodedMessage): Promise<Cursor> {
if (
!message ||
!message.timestamp ||
!message.payload ||
!message.contentTopic
) {
throw new Error("Message is missing required fields");
}
const contentTopicBytes = utf8ToBytes(message.contentTopic);
const digest = sha256(concat([contentTopicBytes, message.payload]));
const messageTime = BigInt(message.timestamp.getTime()) * BigInt(1000000);
return {
digest,
pubsubTopic: message.pubsubTopic,
senderTime: messageTime,
receiverTime: messageTime
};
}
export function wakuStore(
init: Partial<ProtocolCreateOptions> = {}
): (libp2p: Libp2p) => IStore {
return (libp2p: Libp2p) => new Store(libp2p, init);
}
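
To summarize the loop above: `queryPerPage` keeps issuing history RPC queries until one of three exit conditions fires. A distilled sketch of that control flow (illustrative names, not the actual implementation; `fetchPage` stands in for the libp2p stream round-trip):

```ts
// Illustrative distillation of the pagination loop in queryPerPage.
type Page = {
  messages?: unknown[];
  pagingInfo?: { cursor?: unknown; pageSize?: number };
};

async function* paginateSketch(
  fetchPage: (cursor?: unknown) => Promise<Page>,
  requestedPageSize: number
): AsyncGenerator<unknown[]> {
  let cursor: unknown = undefined;
  while (true) {
    const response = await fetchPage(cursor);
    // Exit 1: response or messages missing/empty -- nothing left to page through.
    if (!response.messages?.length) break;
    yield response.messages;
    // Exit 2: no cursor returned -- abort to avoid an infinite loop.
    if (response.pagingInfo?.cursor === undefined) break;
    cursor = response.pagingInfo.cursor;
    // Exit 3: a short page (fewer messages than requested) is the last page.
    const returnedPageSize = response.pagingInfo.pageSize;
    if (returnedPageSize && returnedPageSize < requestedPageSize) break;
  }
}
```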

View File

@@ -51,7 +51,7 @@ export async function waitForRemotePeer(
if (!waku.store)
throw new Error("Cannot wait for Store peer: protocol not mounted");
promises.push(
waitForConnectedPeer(waku.store, waku.libp2p.services.metadata)
waitForConnectedPeer(waku.store.protocol, waku.libp2p.services.metadata)
);
}
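
Caller-side code is unaffected by this hunk; only the internal peer check moves to the core protocol object. Usage stays as exercised in the tests at the end of this diff (sketch; `waku` is a placeholder node):

```ts
import { waitForRemotePeer } from "@waku/core";
import { Protocols, type LightNode } from "@waku/interfaces";

declare const waku: LightNode; // placeholder: a node created via createLightNode

// Unchanged from the caller's perspective; the helper now inspects
// waku.store.protocol internally instead of waku.store.
await waitForRemotePeer(waku, [Protocols.Store]);
```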

View File

@@ -62,5 +62,8 @@
"CHANGELOG.md",
"LICENSE",
"README.md"
]
],
"dependencies": {
"@waku/proto": "^0.0.6"
}
}

View File

@@ -1,5 +1,7 @@
import { proto_store as proto } from "@waku/proto";
import type { IDecodedMessage, IDecoder } from "./message.js";
import type { IBaseProtocolCore } from "./protocols.js";
import type { IBaseProtocolCore, IBaseProtocolSDK } from "./protocols.js";
export enum PageDirection {
BACKWARD = "backward",
@@ -42,10 +44,19 @@ export type StoreQueryOptions = {
* Cursor as an index to start a query from. Must be generated from a Waku
* Message.
*/
cursor?: Cursor;
cursor?: proto.Index;
};
export interface IStore extends IBaseProtocolCore {
export type IStoreCore = IBaseProtocolCore;
export type IStoreSDK = IBaseProtocolSDK & {
protocol: IBaseProtocolCore;
createCursor(message: IDecodedMessage): Cursor;
queryGenerator: <T extends IDecodedMessage>(
decoders: IDecoder<T>[],
options?: StoreQueryOptions
) => AsyncGenerator<Promise<T | undefined>[]>;
queryWithOrderedCallback: <T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: (message: T) => Promise<void | boolean> | boolean | void,
@@ -58,8 +69,4 @@ export interface IStore extends IBaseProtocolCore {
) => Promise<void | boolean> | boolean | void,
options?: StoreQueryOptions
) => Promise<void>;
queryGenerator: <T extends IDecodedMessage>(
decoders: IDecoder<T>[],
options?: StoreQueryOptions
) => AsyncGenerator<Promise<T | undefined>[]>;
}
};
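
The practical effect of splitting `IStore` into `IStoreCore` and `IStoreSDK`: queries and cursor creation stay on the SDK surface, while wire-level details are reached through `protocol`. A type-level sketch (hedged; `multicodec` comes from the base protocol interface, as used in the `WakuNode` hunk further down):

```ts
import type { IDecodedMessage, IDecoder, IStoreSDK } from "@waku/interfaces";

// Wire-level detail via the core protocol object:
function storeCodec(store: IStoreSDK): string {
  return store.protocol.multicodec;
}

// Queries via the SDK surface:
async function firstPage<T extends IDecodedMessage>(
  store: IStoreSDK,
  decoder: IDecoder<T>
): Promise<Array<T | undefined>> {
  for await (const page of store.queryGenerator([decoder])) {
    return Promise.all(page); // each page is an array of message promises
  }
  return [];
}
```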

View File

@@ -7,12 +7,12 @@ import type { Libp2p } from "./libp2p.js";
import type { ILightPushSDK } from "./light_push.js";
import { Protocols } from "./protocols.js";
import type { IRelay } from "./relay.js";
import type { IStore } from "./store.js";
import type { IStoreSDK } from "./store.js";
export interface Waku {
libp2p: Libp2p;
relay?: IRelay;
store?: IStore;
store?: IStoreSDK;
filter?: IFilter;
lightPush?: ILightPushSDK;
@@ -31,7 +31,7 @@ export interface Waku {
export interface LightNode extends Waku {
relay: undefined;
store: IStore;
store: IStoreSDK;
filter: IFilter;
lightPush: ILightPushSDK;
}
@@ -45,7 +45,7 @@ export interface RelayNode extends Waku {
export interface FullNode extends Waku {
relay: IRelay;
store: IStore;
store: IStoreSDK;
filter: IFilter;
lightPush: ILightPushSDK;
}

View File

@@ -62,20 +62,21 @@
},
"dependencies": {
"@chainsafe/libp2p-noise": "^14.1.0",
"@libp2p/bootstrap": "^10.0.16",
"@libp2p/identify": "^1.0.11",
"@libp2p/mplex": "^10.0.12",
"@libp2p/ping": "^1.0.12",
"@libp2p/websockets": "^8.0.11",
"@waku/discovery": "0.0.1",
"@noble/hashes": "^1.3.3",
"@waku/core": "0.0.27",
"@waku/discovery": "0.0.1",
"@waku/dns-discovery": "0.0.21",
"@waku/interfaces": "0.0.22",
"@waku/local-peer-cache-discovery": "^1.0.0",
"@waku/peer-exchange": "^0.0.20",
"@waku/relay": "0.0.10",
"@waku/utils": "0.0.15",
"libp2p": "^1.1.2",
"@libp2p/bootstrap": "^10.0.11"
"libp2p": "^1.1.2"
},
"devDependencies": {
"@chainsafe/libp2p-gossipsub": "^12.0.0",

View File

@@ -13,6 +13,7 @@ export * from "./waku.js";
export { createLightNode, createNode } from "./light-node/index.js";
export { wakuLightPush } from "./protocols/light_push.js";
export { wakuStore } from "./protocols/store.js";
export * as waku from "@waku/core";
export * as utils from "@waku/utils";

View File

@@ -1,7 +1,8 @@
import { wakuFilter, wakuStore } from "@waku/core";
import { wakuFilter } from "@waku/core";
import { type Libp2pComponents, type LightNode } from "@waku/interfaces";
import { wakuLightPush } from "../protocols/light_push.js";
import { wakuStore } from "../protocols/store.js";
import { createLibp2pAndUpdateOptions } from "../utils/libp2p.js";
import { CreateWakuNodeOptions, WakuNode, WakuOptions } from "../waku.js";

View File

@@ -0,0 +1,321 @@
import { sha256 } from "@noble/hashes/sha256";
import { StoreCore, waku_store } from "@waku/core";
import {
Cursor,
IDecodedMessage,
IDecoder,
IStoreSDK,
type Libp2p,
PageDirection,
type ProtocolCreateOptions
} from "@waku/interfaces";
import { ensurePubsubTopicIsConfigured, isDefined, Logger } from "@waku/utils";
import { concat } from "@waku/utils/bytes";
import { utf8ToBytes } from "../index.js";
import { BaseProtocolSDK } from "./base_protocol.js";
export const DefaultPageSize = 10;
const DEFAULT_NUM_PEERS = 1;
const log = new Logger("waku:store:protocol");
export class StoreSDK extends BaseProtocolSDK implements IStoreSDK {
public readonly protocol: StoreCore;
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
// TODO: options.numPeersToUse is disregarded: https://github.com/waku-org/js-waku/issues/1685
super({ numPeersToUse: DEFAULT_NUM_PEERS });
this.protocol = new StoreCore(libp2p, options);
}
/**
* Do a query to a Waku Store to retrieve historical/missed messages.
*
* This is a generator, useful if you want most control on how messages
* are processed.
*
* The order of the messages returned by the remote Waku node SHOULD BE
* as follows:
* - within a page, messages SHOULD be ordered from oldest to most recent
* - pages direction depends on { @link QueryOptions.pageDirection }
* @throws If not able to reach a Waku Store peer to query,
* or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed.
*
* This API only supports querying a single pubsub topic at a time.
* If multiple decoders are provided, they must all have the same pubsub topic.
* @throws If multiple decoders with different pubsub topics are provided.
* @throws If no decoders are provided.
* @throws If no decoders are found for the provided pubsub topic.
*/
async *queryGenerator<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
options?: waku_store.QueryOptions
): AsyncGenerator<Promise<T | undefined>[]> {
const { pubsubTopic, contentTopics, decodersAsMap } =
this.validateDecodersAndPubsubTopic(decoders, options);
const queryOpts = this.constructOptions(
pubsubTopic,
contentTopics,
options
);
const peer = (
await this.protocol.getPeers({
numPeers: this.numPeers,
maxBootstrapPeers: 1
})
)[0];
if (!peer) throw new Error("No peers available to query");
const responseGenerator = this.protocol.queryPerPage(
queryOpts,
decodersAsMap,
peer
);
for await (const messages of responseGenerator) {
yield messages;
}
}
/**
* Do a query to a Waku Store to retrieve historical/missed messages.
*
* The callback function takes a `WakuMessage` in input,
* messages are processed in order:
* - oldest to latest if `options.pageDirection` == { @link PageDirection.FORWARD }
* - latest to oldest if `options.pageDirection` == { @link PageDirection.BACKWARD }
*
* The ordering may affect performance.
* The ordering depends on the behavior of the remote store node.
* If strong ordering is needed, you may need to handle this at application level
* and set your own timestamps too (the WakuMessage timestamps are not certified).
*
* @throws If not able to reach a Waku Store peer to query,
* or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed.
*/
async queryWithOrderedCallback<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: (message: T) => Promise<void | boolean> | boolean | void,
options?: waku_store.QueryOptions
): Promise<void> {
for await (const promises of this.queryGenerator(decoders, options)) {
if (await this.processMessages(promises, callback, options)) break;
}
}
/**
* Do a query to a Waku Store to retrieve historical/missed messages.
* The callback function takes a `Promise<WakuMessage>` in input,
* useful if messages need to be decrypted and performance matters.
*
* The order of the messages passed to the callback is as follows:
* - within a page, messages are expected to be ordered from oldest to most recent
* - pages direction depends on { @link QueryOptions.pageDirection }
*
* Do note that the resolution of the `Promise<WakuMessage | undefined` may
* break the order as it may rely on the browser decryption API, which in turn,
* may have a different speed depending on the type of decryption.
*
* @throws If not able to reach a Waku Store peer to query,
* or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed.
*/
async queryWithPromiseCallback<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: (
message: Promise<T | undefined>
) => Promise<void | boolean> | boolean | void,
options?: waku_store.QueryOptions
): Promise<void> {
let abort = false;
for await (const page of this.queryGenerator(decoders, options)) {
const _promises = page.map(async (msgPromise) => {
if (abort) return;
abort = Boolean(await callback(msgPromise));
});
await Promise.all(_promises);
if (abort) break;
}
}
createCursor(message: IDecodedMessage): Cursor {
if (
!message ||
!message.timestamp ||
!message.payload ||
!message.contentTopic
) {
throw new Error("Message is missing required fields");
}
const contentTopicBytes = utf8ToBytes(message.contentTopic);
const digest = sha256(concat([contentTopicBytes, message.payload]));
const messageTime = BigInt(message.timestamp.getTime()) * BigInt(1000000);
return {
digest,
pubsubTopic: message.pubsubTopic,
senderTime: messageTime,
receiverTime: messageTime
};
}
private validateDecodersAndPubsubTopic<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
options?: waku_store.QueryOptions
): {
pubsubTopic: string;
contentTopics: string[];
decodersAsMap: Map<string, IDecoder<T>>;
} {
if (decoders.length === 0) {
throw new Error("No decoders provided");
}
// convert array to set to remove duplicates
const uniquePubsubTopicsInQuery = Array.from(
new Set(decoders.map((decoder) => decoder.pubsubTopic))
);
// If multiple pubsub topics are provided, throw an error
if (uniquePubsubTopicsInQuery.length > 1) {
throw new Error(
"API does not support querying multiple pubsub topics at once"
);
}
// we can be certain that there is only one pubsub topic in the query
const pubsubTopicForQuery = uniquePubsubTopicsInQuery[0];
ensurePubsubTopicIsConfigured(
pubsubTopicForQuery,
this.protocol.pubsubTopics
);
// check that the pubsubTopic from the Cursor and Decoder match
if (
options?.cursor?.pubsubTopic &&
options.cursor.pubsubTopic !== pubsubTopicForQuery
) {
throw new Error(
`Cursor pubsub topic (${options?.cursor?.pubsubTopic}) does not match decoder pubsub topic (${pubsubTopicForQuery})`
);
}
const decodersAsMap = new Map();
decoders.forEach((dec) => {
if (decodersAsMap.has(dec.contentTopic)) {
throw new Error(
"API does not support different decoder per content topic"
);
}
decodersAsMap.set(dec.contentTopic, dec);
});
const contentTopics = decoders
.filter((decoder) => decoder.pubsubTopic === pubsubTopicForQuery)
.map((dec) => dec.contentTopic);
if (contentTopics.length === 0) {
throw new Error("No decoders found for topic " + pubsubTopicForQuery);
}
return {
pubsubTopic: pubsubTopicForQuery,
contentTopics,
decodersAsMap
};
}
private constructOptions(
pubsubTopic: string,
contentTopics: string[],
options: waku_store.QueryOptions = {}
): waku_store.Params {
let startTime, endTime;
if (options?.timeFilter) {
startTime = options.timeFilter.startTime;
endTime = options.timeFilter.endTime;
}
if (!startTime) {
log.warn("No start time provided");
}
if (!endTime) {
log.warn("No end time provided");
}
const queryOpts = Object.assign(
{
pubsubTopic: pubsubTopic,
pageDirection: PageDirection.BACKWARD,
pageSize: DefaultPageSize
},
options,
{ contentTopics, startTime, endTime }
);
return queryOpts;
}
/**
* Processes messages based on the provided callback and options.
* @private
*/
private async processMessages<T extends IDecodedMessage>(
messages: Promise<T | undefined>[],
callback: (message: T) => Promise<void | boolean> | boolean | void,
options?: waku_store.QueryOptions
): Promise<boolean> {
let abort = false;
const messagesOrUndef: Array<T | undefined> = await Promise.all(messages);
let processedMessages: Array<T> = messagesOrUndef.filter(isDefined);
if (this.shouldReverseOrder(options)) {
processedMessages = processedMessages.reverse();
}
await Promise.all(
processedMessages.map(async (msg) => {
if (msg && !abort) {
abort = Boolean(await callback(msg));
}
})
);
return abort;
}
/**
* Determines whether to reverse the order of messages based on the provided options.
*
* Messages in pages are ordered from oldest (first) to most recent (last).
* https://github.com/vacp2p/rfc/issues/533
*
* @private
*/
private shouldReverseOrder(options?: waku_store.QueryOptions): boolean {
return (
typeof options?.pageDirection === "undefined" ||
options?.pageDirection === PageDirection.BACKWARD
);
}
}
export function wakuStore(
init: Partial<ProtocolCreateOptions> = {}
): (libp2p: Libp2p) => IStoreSDK {
return (libp2p: Libp2p) => new StoreSDK(libp2p, init);
}
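
Putting the new SDK surface together, a hedged end-to-end sketch. `createDecoder`, the `defaultBootstrap` option, and the content topic are assumptions for illustration, not part of this diff; `waitForRemotePeer` is exported from `@waku/core` per the hunk above:

```ts
import { createLightNode } from "@waku/sdk";
import { createDecoder, waitForRemotePeer } from "@waku/core"; // createDecoder assumed
import { Protocols } from "@waku/interfaces";

const waku = await createLightNode({ defaultBootstrap: true }); // option assumed
await waitForRemotePeer(waku, [Protocols.Store]);

const decoder = createDecoder("/my-app/1/history/proto"); // placeholder topic

// Pages arrive latest-first by default (PageDirection.BACKWARD);
// return true from the callback to abort the query early.
await waku.store?.queryWithOrderedCallback([decoder], (message) => {
  console.log(message.timestamp, message.payload);
});
```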

View File

@@ -1,8 +1,9 @@
import { wakuFilter, wakuStore } from "@waku/core";
import { wakuFilter } from "@waku/core";
import { type FullNode, type RelayNode } from "@waku/interfaces";
import { RelayCreateOptions, wakuRelay } from "@waku/relay";
import { wakuLightPush } from "../protocols/light_push.js";
import { wakuStore } from "../protocols/store.js";
import { createLibp2pAndUpdateOptions } from "../utils/libp2p.js";
import { CreateWakuNodeOptions, WakuNode, WakuOptions } from "../waku.js";

View File

@@ -8,7 +8,7 @@ import type {
IFilterSubscription,
ILightPushSDK,
IRelay,
IStore,
IStoreSDK,
Libp2p,
LightNode,
ProtocolCreateOptions,
@@ -55,7 +55,7 @@ export type CreateWakuNodeOptions = ProtocolCreateOptions &
export class WakuNode implements Waku {
public libp2p: Libp2p;
public relay?: IRelay;
public store?: IStore;
public store?: IStoreSDK;
public filter?: IFilter;
public lightPush?: ILightPushSDK;
public connectionManager: ConnectionManager;
@@ -64,7 +64,7 @@ export class WakuNode implements Waku {
constructor(
options: WakuOptions,
libp2p: Libp2p,
store?: (libp2p: Libp2p) => IStore,
store?: (libp2p: Libp2p) => IStoreSDK,
lightPush?: (libp2p: Libp2p) => ILightPushSDK,
filter?: (libp2p: Libp2p) => IFilter,
relay?: (libp2p: Libp2p) => IRelay
@@ -148,7 +148,7 @@
}
if (_protocols.includes(Protocols.Store)) {
if (this.store) {
codecs.push(this.store.multicodec);
codecs.push(this.store.protocol.multicodec);
} else {
log.error(
"Store codec not included in dial codec: protocol not mounted locally"

View File

@@ -1,4 +1,4 @@
import { createCursor, DecodedMessage } from "@waku/core";
import { DecodedMessage } from "@waku/core";
import type { LightNode } from "@waku/interfaces";
import { DefaultPubsubTopic } from "@waku/interfaces";
import { bytesToUtf8 } from "@waku/utils/bytes";
@@ -63,7 +63,7 @@ describe("Waku Store, cursor", function () {
}
// create cursor to extract messages after the cursorIndex
const cursor = await createCursor(messages[cursorIndex]);
const cursor = waku.store.createCursor(messages[cursorIndex]);
const messagesAfterCursor: DecodedMessage[] = [];
for await (const page of waku.store.queryGenerator([TestDecoder], {
@@ -108,7 +108,7 @@ describe("Waku Store, cursor", function () {
}
// create cursor to extract messages after the cursorIndex
const cursor = await createCursor(messages[5]);
const cursor = waku.store.createCursor(messages[5]);
// query node2 with the cursor from node1
const messagesAfterCursor: DecodedMessage[] = [];
@@ -142,7 +142,7 @@ describe("Waku Store, cursor", function () {
messages.push(msg as DecodedMessage);
}
}
const cursor = await createCursor(messages[5]);
const cursor = waku.store.createCursor(messages[5]);
// setting a wrong digest
cursor.digest = new Uint8Array([]);
@@ -185,7 +185,7 @@
}
}
messages[5].pubsubTopic = customShardedPubsubTopic1;
const cursor = await createCursor(messages[5]);
const cursor = waku.store.createCursor(messages[5]);
try {
for await (const page of waku.store.queryGenerator([TestDecoder], {

View File

@@ -115,7 +115,7 @@ describe("Wait for remote peer", function () {
await delay(1000);
await waitForRemotePeer(waku2, [Protocols.Store]);
const peers = (await waku2.store.connectedPeers()).map((peer) =>
const peers = (await waku2.store.protocol.connectedPeers()).map((peer) =>
peer.id.toString()
);
const nimPeerId = multiAddrWithId.getPeerId();
@@ -144,7 +144,7 @@
await waku2.dial(multiAddrWithId);
await waitPromise;
const peers = (await waku2.store.connectedPeers()).map((peer) =>
const peers = (await waku2.store.protocol.connectedPeers()).map((peer) =>
peer.id.toString()
);
@@ -235,8 +235,8 @@
const filterPeers = (await waku2.filter.connectedPeers()).map((peer) =>
peer.id.toString()
);
const storePeers = (await waku2.store.connectedPeers()).map((peer) =>
peer.id.toString()
const storePeers = (await waku2.store.protocol.connectedPeers()).map(
(peer) => peer.id.toString()
);
const lightPushPeers = (
await waku2.lightPush.protocol.connectedPeers()