chore(lightpush)!: move protocol implementation to `@waku/sdk` (1/n) (#1964)

* chore: decouple `Filter` between `core` and `sdk`
moves `SubscriptionManager` to `sdk` side

* chore: update package dependencies
also update peer deps in sdk

* chore: update imports

* chore: update tests

* chore(side-change): update lightpush

* chore: update size-limit import

* chore(sdk): update dependencies
This commit is contained in:
Danish Arora 2024-04-19 17:20:34 +05:30 committed by GitHub
parent e5e8cd5e17
commit 5fb100602b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 541 additions and 522 deletions

View File

@ -44,7 +44,7 @@ module.exports = [
},
{
name: "Waku Filter",
path: "packages/core/bundle/index.js",
path: "packages/sdk/bundle/index.js",
import: "{ wakuFilter }",
},
{

72
package-lock.json generated
View File

@ -36494,15 +36494,14 @@
},
"packages/core": {
"name": "@waku/core",
"version": "0.0.27",
"version": "0.0.28",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@libp2p/ping": "^1.0.12",
"@waku/enr": "^0.0.21",
"@waku/interfaces": "0.0.22",
"@waku/message-hash": "^0.1.11",
"@waku/enr": "^0.0.22",
"@waku/interfaces": "0.0.23",
"@waku/proto": "0.0.6",
"@waku/utils": "0.0.15",
"@waku/utils": "0.0.16",
"debug": "^4.3.4",
"it-all": "^3.0.4",
"it-length-prefixed": "^9.0.4",
@ -36538,7 +36537,6 @@
"@multiformats/multiaddr": "^12.0.0",
"@waku/enr": "^0.0.21",
"@waku/interfaces": "0.0.22",
"@waku/message-hash": "^0.1.11",
"@waku/proto": "0.0.6",
"@waku/utils": "0.0.15",
"libp2p": "^1.1.2"
@ -36557,11 +36555,11 @@
"version": "0.0.1",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@waku/core": "0.0.27",
"@waku/enr": "0.0.21",
"@waku/interfaces": "0.0.22",
"@waku/core": "0.0.28",
"@waku/enr": "0.0.22",
"@waku/interfaces": "0.0.23",
"@waku/proto": "^0.0.6",
"@waku/utils": "0.0.15",
"@waku/utils": "0.0.16",
"debug": "^4.3.4",
"dns-query": "^0.11.2",
"hi-base32": "^0.5.1",
@ -36608,7 +36606,7 @@
},
"packages/enr": {
"name": "@waku/enr",
"version": "0.0.21",
"version": "0.0.22",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@ethersproject/rlp": "^5.7.0",
@ -36616,7 +36614,7 @@
"@libp2p/peer-id": "^4.0.4",
"@multiformats/multiaddr": "^12.0.0",
"@noble/secp256k1": "^1.7.1",
"@waku/utils": "0.0.15",
"@waku/utils": "0.0.16",
"debug": "^4.3.4",
"js-sha3": "^0.9.2"
},
@ -36628,7 +36626,7 @@
"@types/chai": "^4.3.11",
"@types/mocha": "^10.0.6",
"@waku/build-utils": "*",
"@waku/interfaces": "0.0.22",
"@waku/interfaces": "0.0.23",
"chai": "^4.3.10",
"cspell": "^8.6.1",
"fast-check": "^3.15.1",
@ -36657,7 +36655,7 @@
},
"packages/interfaces": {
"name": "@waku/interfaces",
"version": "0.0.22",
"version": "0.0.23",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@waku/proto": "^0.0.6"
@ -36675,14 +36673,14 @@
},
"packages/message-encryption": {
"name": "@waku/message-encryption",
"version": "0.0.25",
"version": "0.0.26",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@noble/secp256k1": "^1.7.1",
"@waku/core": "0.0.27",
"@waku/interfaces": "0.0.22",
"@waku/core": "0.0.28",
"@waku/interfaces": "0.0.23",
"@waku/proto": "0.0.6",
"@waku/utils": "0.0.15",
"@waku/utils": "0.0.16",
"debug": "^4.3.4",
"js-sha3": "^0.9.2",
"uint8arrays": "^5.0.1"
@ -36719,11 +36717,11 @@
},
"packages/message-hash": {
"name": "@waku/message-hash",
"version": "0.1.11",
"version": "0.1.12",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@noble/hashes": "^1.3.2",
"@waku/utils": "0.0.15"
"@waku/utils": "0.0.16"
},
"devDependencies": {
"@rollup/plugin-commonjs": "^25.0.7",
@ -36733,7 +36731,7 @@
"@types/debug": "^4.1.12",
"@types/mocha": "^10.0.6",
"@waku/build-utils": "*",
"@waku/interfaces": "0.0.22",
"@waku/interfaces": "0.0.23",
"chai": "^4.3.10",
"cspell": "^8.6.1",
"fast-check": "^3.15.1",
@ -36801,15 +36799,15 @@
},
"packages/relay": {
"name": "@waku/relay",
"version": "0.0.10",
"version": "0.0.11",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@chainsafe/libp2p-gossipsub": "^12.0.0",
"@noble/hashes": "^1.3.2",
"@waku/core": "0.0.27",
"@waku/interfaces": "0.0.22",
"@waku/core": "0.0.28",
"@waku/interfaces": "0.0.23",
"@waku/proto": "0.0.6",
"@waku/utils": "0.0.15",
"@waku/utils": "0.0.16",
"chai": "^4.3.10",
"debug": "^4.3.4",
"fast-check": "^3.15.1"
@ -36842,7 +36840,7 @@
},
"packages/sdk": {
"name": "@waku/sdk",
"version": "0.0.23",
"version": "0.0.24",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@chainsafe/libp2p-noise": "^14.1.0",
@ -36852,11 +36850,12 @@
"@libp2p/ping": "^1.0.12",
"@libp2p/websockets": "^8.0.11",
"@noble/hashes": "^1.3.3",
"@waku/core": "0.0.27",
"@waku/core": "0.0.28",
"@waku/discovery": "0.0.1",
"@waku/interfaces": "0.0.22",
"@waku/relay": "0.0.10",
"@waku/utils": "0.0.15",
"@waku/interfaces": "0.0.23",
"@waku/proto": "^0.0.6",
"@waku/relay": "0.0.11",
"@waku/utils": "0.0.16",
"libp2p": "^1.1.2"
},
"devDependencies": {
@ -36876,10 +36875,11 @@
},
"peerDependencies": {
"@libp2p/bootstrap": "^10",
"@waku/core": "0.0.27",
"@waku/interfaces": "0.0.22",
"@waku/relay": "0.0.10",
"@waku/utils": "0.0.15"
"@waku/core": "0.0.28",
"@waku/interfaces": "0.0.23",
"@waku/message-hash": "^0.1.12",
"@waku/relay": "0.0.11",
"@waku/utils": "0.0.16"
},
"peerDependenciesMeta": {
"@libp2p/bootstrap": {
@ -36940,11 +36940,11 @@
},
"packages/utils": {
"name": "@waku/utils",
"version": "0.0.15",
"version": "0.0.16",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@noble/hashes": "^1.3.2",
"@waku/interfaces": "0.0.22",
"@waku/interfaces": "0.0.23",
"chai": "^4.3.10",
"debug": "^4.3.4",
"uint8arrays": "^5.0.1"

View File

@ -75,7 +75,6 @@
"@libp2p/ping": "^1.0.12",
"@waku/enr": "^0.0.22",
"@waku/interfaces": "0.0.23",
"@waku/message-hash": "^0.1.12",
"@waku/proto": "0.0.6",
"@waku/utils": "0.0.16",
"debug": "^4.3.4",
@ -111,7 +110,6 @@
"libp2p": "^1.1.2",
"@waku/enr": "^0.0.21",
"@waku/interfaces": "0.0.22",
"@waku/message-hash": "^0.1.11",
"@waku/proto": "0.0.6",
"@waku/utils": "0.0.15"
},

View File

@ -7,7 +7,7 @@ export type {
export * as message from "./lib/message/index.js";
export * as waku_filter from "./lib/filter/index.js";
export { wakuFilter, FilterCodecs } from "./lib/filter/index.js";
export { FilterCore, FilterCodecs } from "./lib/filter/index.js";
export * as waku_light_push from "./lib/light_push/index.js";
export { LightPushCodec, LightPushCore } from "./lib/light_push/index.js";

View File

@ -1,30 +1,13 @@
import { Stream } from "@libp2p/interface";
import type { Peer } from "@libp2p/interface";
import type { IncomingStreamData } from "@libp2p/interface-internal";
import type {
Callback,
ContentTopic,
IAsyncIterator,
IDecodedMessage,
IDecoder,
IFilter,
IProtoMessage,
IReceiver,
IBaseProtocolCore,
Libp2p,
ProtocolCreateOptions,
PubsubTopic,
SingleShardInfo,
Unsubscribe
PubsubTopic
} from "@waku/interfaces";
import { DefaultPubsubTopic } from "@waku/interfaces";
import { messageHashStr } from "@waku/message-hash";
import { WakuMessage } from "@waku/proto";
import {
ensurePubsubTopicIsConfigured,
groupByContentTopic,
singleShardInfoToPubsubTopic,
toAsyncIterator
} from "@waku/utils";
import { Logger } from "@waku/utils";
import all from "it-all";
import * as lp from "it-length-prefixed";
@ -40,329 +23,20 @@ import {
const log = new Logger("filter:v2");
type SubscriptionCallback<T extends IDecodedMessage> = {
decoders: IDecoder<T>[];
callback: Callback<T>;
};
export const FilterCodecs = {
SUBSCRIBE: "/vac/waku/filter-subscribe/2.0.0-beta1",
PUSH: "/vac/waku/filter-push/2.0.0-beta1"
};
/**
* A subscription object refers to a subscription to a given pubsub topic.
*/
class Subscription {
readonly peers: Peer[];
private readonly pubsubTopic: PubsubTopic;
private newStream: (peer: Peer) => Promise<Stream>;
readonly receivedMessagesHashStr: string[] = [];
private subscriptionCallbacks: Map<
ContentTopic,
SubscriptionCallback<IDecodedMessage>
>;
export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
constructor(
pubsubTopic: PubsubTopic,
remotePeers: Peer[],
newStream: (peer: Peer) => Promise<Stream>
private handleIncomingMessage: (
pubsubTopic: PubsubTopic,
wakuMessage: WakuMessage
) => Promise<void>,
libp2p: Libp2p,
options?: ProtocolCreateOptions
) {
this.peers = remotePeers;
this.pubsubTopic = pubsubTopic;
this.newStream = newStream;
this.subscriptionCallbacks = new Map();
}
async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): Promise<void> {
const decodersArray = Array.isArray(decoders) ? decoders : [decoders];
// check that all decoders are configured for the same pubsub topic as this subscription
decodersArray.forEach((decoder) => {
if (decoder.pubsubTopic !== this.pubsubTopic) {
throw new Error(
`Pubsub topic not configured: decoder is configured for pubsub topic ${decoder.pubsubTopic} but this subscription is for pubsub topic ${this.pubsubTopic}. Please create a new Subscription for the different pubsub topic.`
);
}
});
const decodersGroupedByCT = groupByContentTopic(decodersArray);
const contentTopics = Array.from(decodersGroupedByCT.keys());
const promises = this.peers.map(async (peer) => {
const stream = await this.newStream(peer);
const request = FilterSubscribeRpc.createSubscribeRequest(
this.pubsubTopic,
contentTopics
);
try {
const res = await pipe(
[request.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
if (!res || !res.length) {
throw Error(
`No response received for request ${request.requestId}: ${res}`
);
}
const { statusCode, requestId, statusDesc } =
FilterSubscribeResponse.decode(res[0].slice());
if (statusCode < 200 || statusCode >= 300) {
throw new Error(
`Filter subscribe request ${requestId} failed with status code ${statusCode}: ${statusDesc}`
);
}
log.info(
"Subscribed to peer ",
peer.id.toString(),
"for content topics",
contentTopics
);
} catch (e) {
throw new Error(
"Error subscribing to peer: " +
peer.id.toString() +
" for content topics: " +
contentTopics +
": " +
e
);
}
});
const results = await Promise.allSettled(promises);
this.handleErrors(results, "subscribe");
// Save the callback functions by content topics so they
// can easily be removed (reciprocally replaced) if `unsubscribe` (reciprocally `subscribe`)
// is called for those content topics
decodersGroupedByCT.forEach((decoders, contentTopic) => {
// Cast the type because a given `subscriptionCallbacks` map may hold
// Decoder that decode to different implementations of `IDecodedMessage`
const subscriptionCallback = {
decoders,
callback
} as unknown as SubscriptionCallback<IDecodedMessage>;
// The callback and decoder may override previous values, this is on
// purpose as the user may call `subscribe` to refresh the subscription
this.subscriptionCallbacks.set(contentTopic, subscriptionCallback);
});
}
async unsubscribe(contentTopics: ContentTopic[]): Promise<void> {
const promises = this.peers.map(async (peer) => {
const stream = await this.newStream(peer);
const unsubscribeRequest = FilterSubscribeRpc.createUnsubscribeRequest(
this.pubsubTopic,
contentTopics
);
try {
await pipe([unsubscribeRequest.encode()], lp.encode, stream.sink);
} catch (error) {
throw new Error("Error unsubscribing: " + error);
}
contentTopics.forEach((contentTopic: string) => {
this.subscriptionCallbacks.delete(contentTopic);
});
});
const results = await Promise.allSettled(promises);
this.handleErrors(results, "unsubscribe");
}
async ping(): Promise<void> {
const promises = this.peers.map(async (peer) => {
const stream = await this.newStream(peer);
const request = FilterSubscribeRpc.createSubscriberPingRequest();
try {
const res = await pipe(
[request.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
if (!res || !res.length) {
throw Error(
`No response received for request ${request.requestId}: ${res}`
);
}
const { statusCode, requestId, statusDesc } =
FilterSubscribeResponse.decode(res[0].slice());
if (statusCode < 200 || statusCode >= 300) {
throw new Error(
`Filter ping request ${requestId} failed with status code ${statusCode}: ${statusDesc}`
);
}
log.info(`Ping successful for peer ${peer.id.toString()}`);
} catch (error) {
log.error("Error pinging: ", error);
throw error; // Rethrow the actual error instead of wrapping it
}
});
const results = await Promise.allSettled(promises);
this.handleErrors(results, "ping");
}
async unsubscribeAll(): Promise<void> {
const promises = this.peers.map(async (peer) => {
const stream = await this.newStream(peer);
const request = FilterSubscribeRpc.createUnsubscribeAllRequest(
this.pubsubTopic
);
try {
const res = await pipe(
[request.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
if (!res || !res.length) {
throw Error(
`No response received for request ${request.requestId}: ${res}`
);
}
const { statusCode, requestId, statusDesc } =
FilterSubscribeResponse.decode(res[0].slice());
if (statusCode < 200 || statusCode >= 300) {
throw new Error(
`Filter unsubscribe all request ${requestId} failed with status code ${statusCode}: ${statusDesc}`
);
}
this.subscriptionCallbacks.clear();
log.info(
`Unsubscribed from all content topics for pubsub topic ${this.pubsubTopic}`
);
} catch (error) {
throw new Error(
"Error unsubscribing from all content topics: " + error
);
}
});
const results = await Promise.allSettled(promises);
this.handleErrors(results, "unsubscribeAll");
}
async processMessage(message: WakuMessage): Promise<void> {
const hashedMessageStr = messageHashStr(
this.pubsubTopic,
message as IProtoMessage
);
if (this.receivedMessagesHashStr.includes(hashedMessageStr)) {
log.info("Message already received, skipping");
return;
}
this.receivedMessagesHashStr.push(hashedMessageStr);
const { contentTopic } = message;
const subscriptionCallback = this.subscriptionCallbacks.get(contentTopic);
if (!subscriptionCallback) {
log.error("No subscription callback available for ", contentTopic);
return;
}
log.info(
"Processing message with content topic ",
contentTopic,
" on pubsub topic ",
this.pubsubTopic
);
await pushMessage(subscriptionCallback, this.pubsubTopic, message);
}
// Filter out only the rejected promises and extract & handle their reasons
private handleErrors(
results: PromiseSettledResult<void>[],
type: "ping" | "subscribe" | "unsubscribe" | "unsubscribeAll"
): void {
const errors = results
.filter(
(result): result is PromiseRejectedResult =>
result.status === "rejected"
)
.map((rejectedResult) => rejectedResult.reason);
if (errors.length === this.peers.length) {
const errorCounts = new Map<string, number>();
// TODO: streamline error logging with https://github.com/orgs/waku-org/projects/2/views/1?pane=issue&itemId=42849952
errors.forEach((error) => {
const message = error instanceof Error ? error.message : String(error);
errorCounts.set(message, (errorCounts.get(message) || 0) + 1);
});
const uniqueErrorMessages = Array.from(
errorCounts,
([message, count]) => `${message} (occurred ${count} times)`
).join(", ");
throw new Error(`Error ${type} all peers: ${uniqueErrorMessages}`);
} else if (errors.length > 0) {
// TODO: handle renewing faulty peers with new peers (https://github.com/waku-org/js-waku/issues/1463)
log.warn(
`Some ${type} failed. These will be refreshed with new peers`,
errors
);
} else {
log.info(`${type} successful for all peers`);
}
}
}
const DEFAULT_NUM_PEERS = 3;
class Filter extends BaseProtocol implements IReceiver {
private activeSubscriptions = new Map<string, Subscription>();
private getActiveSubscription(
pubsubTopic: PubsubTopic
): Subscription | undefined {
return this.activeSubscriptions.get(pubsubTopic);
}
private setActiveSubscription(
pubsubTopic: PubsubTopic,
subscription: Subscription
): Subscription {
this.activeSubscriptions.set(pubsubTopic, subscription);
return subscription;
}
//TODO: Remove when FilterCore and FilterSDK are introduced
private readonly numPeersToUse: number;
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(
FilterCodecs.SUBSCRIBE,
libp2p.components,
@ -371,92 +45,9 @@ class Filter extends BaseProtocol implements IReceiver {
options
);
this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS;
libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => {
log.error("Failed to register ", FilterCodecs.PUSH, e);
});
this.activeSubscriptions = new Map();
}
/**
* Creates a new subscription to the given pubsub topic.
* The subscription is made to multiple peers for decentralization.
* @param pubsubTopicShardInfo The pubsub topic to subscribe to.
* @returns The subscription object.
*/
async createSubscription(
pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic
): Promise<Subscription> {
const pubsubTopic =
typeof pubsubTopicShardInfo == "string"
? pubsubTopicShardInfo
: singleShardInfoToPubsubTopic(pubsubTopicShardInfo);
ensurePubsubTopicIsConfigured(pubsubTopic, this.pubsubTopics);
const peers = await this.getPeers({
maxBootstrapPeers: 1,
numPeers: this.numPeersToUse
});
if (peers.length === 0) {
throw new Error("No peer found to initiate subscription.");
}
log.info(
`Creating filter subscription with ${peers.length} peers: `,
peers.map((peer) => peer.id.toString())
);
const subscription =
this.getActiveSubscription(pubsubTopic) ??
this.setActiveSubscription(
pubsubTopic,
new Subscription(pubsubTopic, peers, this.getStream.bind(this))
);
return subscription;
}
public toSubscriptionIterator<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
): Promise<IAsyncIterator<T>> {
return toAsyncIterator(this, decoders);
}
/**
* This method is used to satisfy the `IReceiver` interface.
*
* @hidden
*
* @param decoders The decoders to use for the subscription.
* @param callback The callback function to use for the subscription.
* @param opts Optional protocol options for the subscription.
*
* @returns A Promise that resolves to a function that unsubscribes from the subscription.
*
* @remarks
* This method should not be used directly.
* Instead, use `createSubscription` to create a new subscription.
*/
async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): Promise<Unsubscribe> {
const subscription = await this.createSubscription();
await subscription.subscribe(decoders, callback);
const contentTopics = Array.from(
groupByContentTopic(
Array.isArray(decoders) ? decoders : [decoders]
).keys()
);
return async () => {
await subscription.unsubscribe(contentTopics);
};
}
private onRequest(streamData: IncomingStreamData): void {
@ -480,16 +71,7 @@ class Filter extends BaseProtocol implements IReceiver {
return;
}
const subscription = this.getActiveSubscription(pubsubTopic);
if (!subscription) {
log.error(
`No subscription locally registered for topic ${pubsubTopic}`
);
return;
}
await subscription.processMessage(wakuMessage);
await this.handleIncomingMessage(pubsubTopic, wakuMessage);
}
}).then(
() => {
@ -503,38 +85,118 @@ class Filter extends BaseProtocol implements IReceiver {
log.error("Error decoding message", e);
}
}
}
export function wakuFilter(
init: ProtocolCreateOptions = { pubsubTopics: [] }
): (libp2p: Libp2p) => IFilter {
return (libp2p: Libp2p) => new Filter(libp2p, init);
}
async subscribe(
pubsubTopic: PubsubTopic,
peer: Peer,
contentTopics: ContentTopic[]
): Promise<void> {
const stream = await this.getStream(peer);
async function pushMessage<T extends IDecodedMessage>(
subscriptionCallback: SubscriptionCallback<T>,
pubsubTopic: PubsubTopic,
message: WakuMessage
): Promise<void> {
const { decoders, callback } = subscriptionCallback;
const { contentTopic } = message;
if (!contentTopic) {
log.warn("Message has no content topic, skipping");
return;
}
try {
const decodePromises = decoders.map((dec) =>
dec
.fromProtoObj(pubsubTopic, message as IProtoMessage)
.then((decoded) => decoded || Promise.reject("Decoding failed"))
const request = FilterSubscribeRpc.createSubscribeRequest(
pubsubTopic,
contentTopics
);
const decodedMessage = await Promise.any(decodePromises);
const res = await pipe(
[request.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
await callback(decodedMessage);
} catch (e) {
log.error("Error decoding message", e);
if (!res || !res.length) {
throw Error(
`No response received for request ${request.requestId}: ${res}`
);
}
const { statusCode, requestId, statusDesc } =
FilterSubscribeResponse.decode(res[0].slice());
if (statusCode < 200 || statusCode >= 300) {
throw new Error(
`Filter subscribe request ${requestId} failed with status code ${statusCode}: ${statusDesc}`
);
}
}
async unsubscribe(
pubsubTopic: PubsubTopic,
peer: Peer,
contentTopics: ContentTopic[]
): Promise<void> {
const stream = await this.getStream(peer);
const unsubscribeRequest = FilterSubscribeRpc.createUnsubscribeRequest(
pubsubTopic,
contentTopics
);
await pipe([unsubscribeRequest.encode()], lp.encode, stream.sink);
}
async unsubscribeAll(pubsubTopic: PubsubTopic, peer: Peer): Promise<void> {
const stream = await this.getStream(peer);
const request = FilterSubscribeRpc.createUnsubscribeAllRequest(pubsubTopic);
const res = await pipe(
[request.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
if (!res || !res.length) {
throw Error(
`No response received for request ${request.requestId}: ${res}`
);
}
const { statusCode, requestId, statusDesc } =
FilterSubscribeResponse.decode(res[0].slice());
if (statusCode < 200 || statusCode >= 300) {
throw new Error(
`Filter unsubscribe all request ${requestId} failed with status code ${statusCode}: ${statusDesc}`
);
}
}
async ping(peer: Peer): Promise<void> {
const stream = await this.getStream(peer);
const request = FilterSubscribeRpc.createSubscriberPingRequest();
try {
const res = await pipe(
[request.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
if (!res || !res.length) {
throw Error(
`No response received for request ${request.requestId}: ${res}`
);
}
const { statusCode, requestId, statusDesc } =
FilterSubscribeResponse.decode(res[0].slice());
if (statusCode < 200 || statusCode >= 300) {
throw new Error(
`Filter ping request ${requestId} failed with status code ${statusCode}: ${statusDesc}`
);
}
log.info(`Ping successful for peer ${peer.id.toString()}`);
} catch (error) {
log.error("Error pinging: ", error);
throw error; // Rethrow the actual error instead of wrapping it
}
}
}

View File

@ -70,7 +70,7 @@ export async function waitForRemotePeer(
if (!waku.filter)
throw new Error("Cannot wait for Filter peer: protocol not mounted");
promises.push(
waitForConnectedPeer(waku.filter, waku.libp2p.services.metadata)
waitForConnectedPeer(waku.filter.protocol, waku.libp2p.services.metadata)
);
}

View File

@ -2,7 +2,11 @@ import type { PeerId } from "@libp2p/interface";
import type { IDecodedMessage, IDecoder, SingleShardInfo } from "./message.js";
import type { ContentTopic, PubsubTopic } from "./misc.js";
import type { Callback, IBaseProtocolCore } from "./protocols.js";
import type {
Callback,
IBaseProtocolCore,
IBaseProtocolSDK
} from "./protocols.js";
import type { IReceiver } from "./receiver.js";
export type ContentFilter = {
@ -22,8 +26,10 @@ export interface IFilterSubscription {
unsubscribeAll(): Promise<void>;
}
export type IFilter = IReceiver &
IBaseProtocolCore & {
export type IFilter = IReceiver & IBaseProtocolCore;
export type IFilterSDK = IReceiver &
IBaseProtocolSDK & { protocol: IBaseProtocolCore } & {
createSubscription(
pubsubTopicShardInfo?: SingleShardInfo | PubsubTopic,
peerId?: PeerId

View File

@ -2,7 +2,7 @@ import type { PeerId, Stream } from "@libp2p/interface";
import type { Multiaddr } from "@multiformats/multiaddr";
import { IConnectionManager } from "./connection_manager.js";
import type { IFilter } from "./filter.js";
import type { IFilterSDK } from "./filter.js";
import type { Libp2p } from "./libp2p.js";
import type { ILightPushSDK } from "./light_push.js";
import { Protocols } from "./protocols.js";
@ -13,7 +13,7 @@ export interface Waku {
libp2p: Libp2p;
relay?: IRelay;
store?: IStoreSDK;
filter?: IFilter;
filter?: IFilterSDK;
lightPush?: ILightPushSDK;
connectionManager: IConnectionManager;
@ -32,7 +32,7 @@ export interface Waku {
export interface LightNode extends Waku {
relay: undefined;
store: IStoreSDK;
filter: IFilter;
filter: IFilterSDK;
lightPush: ILightPushSDK;
}
@ -46,6 +46,6 @@ export interface RelayNode extends Waku {
export interface FullNode extends Waku {
relay: IRelay;
store: IStoreSDK;
filter: IFilter;
filter: IFilterSDK;
lightPush: ILightPushSDK;
}

View File

@ -71,6 +71,7 @@
"@waku/core": "0.0.28",
"@waku/discovery": "0.0.1",
"@waku/interfaces": "0.0.23",
"@waku/proto": "^0.0.6",
"@waku/relay": "0.0.11",
"@waku/utils": "0.0.16",
"libp2p": "^1.1.2"
@ -89,10 +90,11 @@
},
"peerDependencies": {
"@libp2p/bootstrap": "^10",
"@waku/core": "0.0.27",
"@waku/interfaces": "0.0.22",
"@waku/relay": "0.0.10",
"@waku/utils": "0.0.15"
"@waku/core": "0.0.28",
"@waku/interfaces": "0.0.23",
"@waku/message-hash": "^0.1.12",
"@waku/relay": "0.0.11",
"@waku/utils": "0.0.16"
},
"peerDependenciesMeta": {
"@waku/interfaces": {

View File

@ -13,6 +13,7 @@ export * from "./waku.js";
export { createLightNode, createNode } from "./light-node/index.js";
export { wakuLightPush } from "./protocols/light_push.js";
export { wakuFilter } from "./protocols/filter.js";
export { wakuStore } from "./protocols/store.js";
export * as waku from "@waku/core";

View File

@ -1,6 +1,6 @@
import { wakuFilter } from "@waku/core";
import { type Libp2pComponents, type LightNode } from "@waku/interfaces";
import { wakuFilter } from "../protocols/filter.js";
import { wakuLightPush } from "../protocols/light_push.js";
import { wakuStore } from "../protocols/store.js";
import { createLibp2pAndUpdateOptions } from "../utils/libp2p.js";

View File

@ -1,4 +1,4 @@
import { IBaseProtocolSDK } from "..";
import { IBaseProtocolSDK } from "@waku/interfaces";
interface Options {
numPeersToUse?: number;

View File

@ -0,0 +1,351 @@
import type { Peer } from "@libp2p/interface";
import { FilterCore } from "@waku/core";
import {
type Callback,
ContentTopic,
DefaultPubsubTopic,
type IAsyncIterator,
type IDecodedMessage,
type IDecoder,
type IFilterSDK,
IProtoMessage,
type Libp2p,
type ProtocolCreateOptions,
type PubsubTopic,
type SingleShardInfo,
type Unsubscribe
} from "@waku/interfaces";
import { messageHashStr } from "@waku/message-hash";
import { WakuMessage } from "@waku/proto";
import {
ensurePubsubTopicIsConfigured,
groupByContentTopic,
Logger,
singleShardInfoToPubsubTopic,
toAsyncIterator
} from "@waku/utils";
import { BaseProtocolSDK } from "./base_protocol";
type SubscriptionCallback<T extends IDecodedMessage> = {
decoders: IDecoder<T>[];
callback: Callback<T>;
};
const log = new Logger("sdk:filter");
export class SubscriptionManager {
private readonly pubsubTopic: PubsubTopic;
readonly peers: Peer[];
readonly receivedMessagesHashStr: string[] = [];
private subscriptionCallbacks: Map<
ContentTopic,
SubscriptionCallback<IDecodedMessage>
>;
constructor(
pubsubTopic: PubsubTopic,
remotePeers: Peer[],
private protocol: FilterCore
) {
this.peers = remotePeers;
this.pubsubTopic = pubsubTopic;
this.subscriptionCallbacks = new Map();
}
async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): Promise<void> {
const decodersArray = Array.isArray(decoders) ? decoders : [decoders];
// check that all decoders are configured for the same pubsub topic as this subscription
decodersArray.forEach((decoder) => {
if (decoder.pubsubTopic !== this.pubsubTopic) {
throw new Error(
`Pubsub topic not configured: decoder is configured for pubsub topic ${decoder.pubsubTopic} but this subscription is for pubsub topic ${this.pubsubTopic}. Please create a new Subscription for the different pubsub topic.`
);
}
});
const decodersGroupedByCT = groupByContentTopic(decodersArray);
const contentTopics = Array.from(decodersGroupedByCT.keys());
const promises = this.peers.map(async (peer) => {
await this.protocol.subscribe(this.pubsubTopic, peer, contentTopics);
});
const results = await Promise.allSettled(promises);
this.handleErrors(results, "subscribe");
// Save the callback functions by content topics so they
// can easily be removed (reciprocally replaced) if `unsubscribe` (reciprocally `subscribe`)
// is called for those content topics
decodersGroupedByCT.forEach((decoders, contentTopic) => {
// Cast the type because a given `subscriptionCallbacks` map may hold
// Decoder that decode to different implementations of `IDecodedMessage`
const subscriptionCallback = {
decoders,
callback
} as unknown as SubscriptionCallback<IDecodedMessage>;
// The callback and decoder may override previous values, this is on
// purpose as the user may call `subscribe` to refresh the subscription
this.subscriptionCallbacks.set(contentTopic, subscriptionCallback);
});
}
async unsubscribe(contentTopics: ContentTopic[]): Promise<void> {
const promises = this.peers.map(async (peer) => {
await this.protocol.unsubscribe(this.pubsubTopic, peer, contentTopics);
contentTopics.forEach((contentTopic: string) => {
this.subscriptionCallbacks.delete(contentTopic);
});
});
const results = await Promise.allSettled(promises);
this.handleErrors(results, "unsubscribe");
}
async ping(): Promise<void> {
const promises = this.peers.map(async (peer) => {
await this.protocol.ping(peer);
});
const results = await Promise.allSettled(promises);
this.handleErrors(results, "ping");
}
async unsubscribeAll(): Promise<void> {
const promises = this.peers.map(async (peer) => {
await this.protocol.unsubscribeAll(this.pubsubTopic, peer);
});
const results = await Promise.allSettled(promises);
this.subscriptionCallbacks.clear();
this.handleErrors(results, "unsubscribeAll");
}
async processIncomingMessage(message: WakuMessage): Promise<void> {
const hashedMessageStr = messageHashStr(
this.pubsubTopic,
message as IProtoMessage
);
if (this.receivedMessagesHashStr.includes(hashedMessageStr)) {
log.info("Message already received, skipping");
return;
}
this.receivedMessagesHashStr.push(hashedMessageStr);
const { contentTopic } = message;
const subscriptionCallback = this.subscriptionCallbacks.get(contentTopic);
if (!subscriptionCallback) {
log.error("No subscription callback available for ", contentTopic);
return;
}
log.info(
"Processing message with content topic ",
contentTopic,
" on pubsub topic ",
this.pubsubTopic
);
await pushMessage(subscriptionCallback, this.pubsubTopic, message);
}
// Filter out only the rejected promises and extract & handle their reasons
private handleErrors(
results: PromiseSettledResult<void>[],
type: "ping" | "subscribe" | "unsubscribe" | "unsubscribeAll"
): void {
const errors = results
.filter(
(result): result is PromiseRejectedResult =>
result.status === "rejected"
)
.map((rejectedResult) => rejectedResult.reason);
if (errors.length === this.peers.length) {
const errorCounts = new Map<string, number>();
// TODO: streamline error logging with https://github.com/orgs/waku-org/projects/2/views/1?pane=issue&itemId=42849952
errors.forEach((error) => {
const message = error instanceof Error ? error.message : String(error);
errorCounts.set(message, (errorCounts.get(message) || 0) + 1);
});
const uniqueErrorMessages = Array.from(
errorCounts,
([message, count]) => `${message} (occurred ${count} times)`
).join(", ");
throw new Error(`Error ${type} all peers: ${uniqueErrorMessages}`);
} else if (errors.length > 0) {
// TODO: handle renewing faulty peers with new peers (https://github.com/waku-org/js-waku/issues/1463)
log.warn(
`Some ${type} failed. These will be refreshed with new peers`,
errors
);
} else {
log.info(`${type} successful for all peers`);
}
}
}
class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
public readonly protocol: FilterCore;
private activeSubscriptions = new Map<string, SubscriptionManager>();
private async handleIncomingMessage(
pubsubTopic: PubsubTopic,
wakuMessage: WakuMessage
): Promise<void> {
const subscription = this.getActiveSubscription(pubsubTopic);
if (!subscription) {
log.error(`No subscription locally registered for topic ${pubsubTopic}`);
return;
}
await subscription.processIncomingMessage(wakuMessage);
}
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super({ numPeersToUse: options?.numPeersToUse });
this.protocol = new FilterCore(
this.handleIncomingMessage.bind(this),
libp2p,
options
);
this.activeSubscriptions = new Map();
}
//TODO: move to SubscriptionManager
private getActiveSubscription(
pubsubTopic: PubsubTopic
): SubscriptionManager | undefined {
return this.activeSubscriptions.get(pubsubTopic);
}
private setActiveSubscription(
pubsubTopic: PubsubTopic,
subscription: SubscriptionManager
): SubscriptionManager {
this.activeSubscriptions.set(pubsubTopic, subscription);
return subscription;
}
/**
* Creates a new subscription to the given pubsub topic.
* The subscription is made to multiple peers for decentralization.
* @param pubsubTopicShardInfo The pubsub topic to subscribe to.
* @returns The subscription object.
*/
async createSubscription(
pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic
): Promise<SubscriptionManager> {
const pubsubTopic =
typeof pubsubTopicShardInfo == "string"
? pubsubTopicShardInfo
: singleShardInfoToPubsubTopic(pubsubTopicShardInfo);
ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics);
const peers = await this.protocol.getPeers();
if (peers.length === 0) {
throw new Error("No peer found to initiate subscription.");
}
log.info(
`Creating filter subscription with ${peers.length} peers: `,
peers.map((peer) => peer.id.toString())
);
const subscription =
this.getActiveSubscription(pubsubTopic) ??
this.setActiveSubscription(
pubsubTopic,
new SubscriptionManager(pubsubTopic, peers, this.protocol)
);
return subscription;
}
//TODO: remove this dependency on IReceiver
/**
* This method is used to satisfy the `IReceiver` interface.
*
* @hidden
*
* @param decoders The decoders to use for the subscription.
* @param callback The callback function to use for the subscription.
* @param opts Optional protocol options for the subscription.
*
* @returns A Promise that resolves to a function that unsubscribes from the subscription.
*
* @remarks
* This method should not be used directly.
* Instead, use `createSubscription` to create a new subscription.
*/
async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): Promise<Unsubscribe> {
const subscription = await this.createSubscription();
await subscription.subscribe(decoders, callback);
const contentTopics = Array.from(
groupByContentTopic(
Array.isArray(decoders) ? decoders : [decoders]
).keys()
);
return async () => {
await subscription.unsubscribe(contentTopics);
};
}
public toSubscriptionIterator<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
): Promise<IAsyncIterator<T>> {
return toAsyncIterator(this, decoders);
}
}
export function wakuFilter(
init: ProtocolCreateOptions
): (libp2p: Libp2p) => IFilterSDK {
return (libp2p: Libp2p) => new FilterSDK(libp2p, init);
}
async function pushMessage<T extends IDecodedMessage>(
subscriptionCallback: SubscriptionCallback<T>,
pubsubTopic: PubsubTopic,
message: WakuMessage
): Promise<void> {
const { decoders, callback } = subscriptionCallback;
const { contentTopic } = message;
if (!contentTopic) {
log.warn("Message has no content topic, skipping");
return;
}
try {
const decodePromises = decoders.map((dec) =>
dec
.fromProtoObj(pubsubTopic, message as IProtoMessage)
.then((decoded) => decoded || Promise.reject("Decoding failed"))
);
const decodedMessage = await Promise.any(decodePromises);
await callback(decodedMessage);
} catch (e) {
log.error("Error decoding message", e);
}
}

View File

@ -14,14 +14,13 @@ import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils";
import { BaseProtocolSDK } from "./base_protocol.js";
const DEFAULT_NUM_PEERS = 3;
const log = new Logger("sdk:light-push");
export class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
public readonly protocol: LightPushCore;
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super({ numPeersToUse: options?.numPeersToUse ?? DEFAULT_NUM_PEERS });
super({ numPeersToUse: options?.numPeersToUse });
this.protocol = new LightPushCore(libp2p, options);
}

View File

@ -1,7 +1,7 @@
import { wakuFilter } from "@waku/core";
import { type FullNode, type RelayNode } from "@waku/interfaces";
import { RelayCreateOptions, wakuRelay } from "@waku/relay";
import { wakuFilter } from "../protocols/filter.js";
import { wakuLightPush } from "../protocols/light_push.js";
import { wakuStore } from "../protocols/store.js";
import { createLibp2pAndUpdateOptions } from "../utils/libp2p.js";

View File

@ -4,7 +4,7 @@ import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr";
import { ConnectionManager, DecodedMessage } from "@waku/core";
import type {
Callback,
IFilter,
IFilterSDK,
IFilterSubscription,
ILightPushSDK,
IRelay,
@ -56,7 +56,7 @@ export class WakuNode implements Waku {
public libp2p: Libp2p;
public relay?: IRelay;
public store?: IStoreSDK;
public filter?: IFilter;
public filter?: IFilterSDK;
public lightPush?: ILightPushSDK;
public connectionManager: ConnectionManager;
public readonly pubsubTopics: PubsubTopic[];
@ -66,7 +66,7 @@ export class WakuNode implements Waku {
libp2p: Libp2p,
store?: (libp2p: Libp2p) => IStoreSDK,
lightPush?: (libp2p: Libp2p) => ILightPushSDK,
filter?: (libp2p: Libp2p) => IFilter,
filter?: (libp2p: Libp2p) => IFilterSDK,
relay?: (libp2p: Libp2p) => IRelay
) {
if (options.pubsubTopics.length == 0) {
@ -166,7 +166,7 @@ export class WakuNode implements Waku {
}
if (_protocols.includes(Protocols.Filter)) {
if (this.filter) {
codecs.push(this.filter.multicodec);
codecs.push(this.filter.protocol.multicodec);
} else {
log.error(
"Filter codec not included in dial codec: protocol not mounted locally"

View File

@ -1,4 +1,4 @@
import { wakuFilter } from "@waku/core";
import { wakuFilter } from "@waku/sdk";
import {
bytesToUtf8,
createEncoder,

View File

@ -200,7 +200,7 @@ describe("Wait for remote peer", function () {
await waku2.dial(multiAddrWithId);
await waitForRemotePeer(waku2, [Protocols.Filter]);
const peers = (await waku2.filter.connectedPeers()).map((peer) =>
const peers = (await waku2.filter.protocol.connectedPeers()).map((peer) =>
peer.id.toString()
);
@ -232,8 +232,8 @@ describe("Wait for remote peer", function () {
Protocols.LightPush
]);
const filterPeers = (await waku2.filter.connectedPeers()).map((peer) =>
peer.id.toString()
const filterPeers = (await waku2.filter.protocol.connectedPeers()).map(
(peer) => peer.id.toString()
);
const storePeers = (await waku2.store.protocol.connectedPeers()).map(
(peer) => peer.id.toString()