Rename gossipsub to wakurelaysub

This commit is contained in:
Richard Ramos 2021-03-24 09:42:37 -04:00
parent 7213a9e9c6
commit 874e7c124d
No known key found for this signature in database
GPG Key ID: 80D4B01265FDFE8F
26 changed files with 144 additions and 141 deletions

View File

@ -107,12 +107,12 @@ If you want to trace using a remote peer, you can do so using the `traced` daemo
For instance, to capture the trace as a json file, you can use the following option:
```go
pubsub.NewGossipSub(..., pubsub.NewEventTracer(pubsub.NewJSONTracer("/path/to/trace.json")))
pubsub.NewWakuRelaySub(..., pubsub.NewEventTracer(pubsub.NewJSONTracer("/path/to/trace.json")))
```
To capture the trace as a protobuf, you can use the following option:
```go
pubsub.NewGossipSub(..., pubsub.NewEventTracer(pubsub.NewPBTracer("/path/to/trace.pb")))
pubsub.NewWakuRelaySub(..., pubsub.NewEventTracer(pubsub.NewPBTracer("/path/to/trace.pb")))
```
Finally, to use the remote tracer, you can use the following incantations:
@ -128,7 +128,7 @@ if err != nil {
panic(err)
}
ps, err := pubsub.NewGossipSub(..., pubsub.WithEventTracer(tracer))
ps, err := pubsub.NewWakuRelaySub(..., pubsub.WithEventTracer(tracer))
```
## Contribute

View File

@ -8,7 +8,7 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
pb "github.com/status-im/go-wakurelay-pubsub/pb"
"github.com/libp2p/go-msgio/protoio"

View File

@ -4,7 +4,7 @@ import (
"testing"
compat_pb "github.com/libp2p/go-libp2p-pubsub/compat"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
pb "github.com/status-im/go-wakurelay-pubsub/pb"
)
func TestMultitopicMessageCompatibility(t *testing.T) {

View File

@ -295,7 +295,7 @@ func TestGossipSubDiscoveryAfterBootstrap(t *testing.T) {
func waitUntilGossipsubMeshCount(ps *PubSub, topic string, count int) {
done := false
doneCh := make(chan bool, 1)
rt := ps.rt.(*GossipSubRouter)
rt := ps.rt.(*WakuRelaySubRouter)
for !done {
ps.eval <- func() {
doneCh <- len(rt.mesh[topic]) == count

2
doc.go
View File

@ -7,7 +7,7 @@
//
// - NewFloodSub creates an instance that uses the floodsub routing algorithm.
//
// - NewGossipSub creates an instance that uses the gossipsub routing algorithm.
// - NewWakuRelaySub creates an instance that uses the gossipsub routing algorithm.
//
// - NewRandomSub creates an instance that uses the randomsub routing algorithm.
//

View File

@ -13,7 +13,7 @@ import (
"testing"
"time"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
pb "github.com/status-im/go-wakurelay-pubsub/pb"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"

View File

@ -33,7 +33,7 @@ func newGossipTracer() *gossipTracer {
}
}
func (gt *gossipTracer) Start(gs *GossipSubRouter) {
func (gt *gossipTracer) Start(gs *WakuRelaySubRouter) {
if gt == nil {
return
}

View File

@ -4,7 +4,7 @@ import (
"testing"
"time"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
pb "github.com/status-im/go-wakurelay-pubsub/pb"
"github.com/libp2p/go-libp2p-core/peer"
)

View File

@ -7,7 +7,7 @@ import (
"sort"
"time"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
pb "github.com/status-im/go-wakurelay-pubsub/pb"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
@ -26,6 +26,11 @@ const (
// See the spec for details about how v1.1.0 compares to v1.0.0:
// https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md
GossipSubID_v11 = protocol.ID("/meshsub/1.1.0")
// WakuRelayID_v200b2 is the protocol ID for version 2.0.0-beta2 of the WakuRelay protocol.
// See the spec for details about how v1.1.0 compares to v1.0.0:
// https://specs.vac.dev/specs/waku/v2/waku-relay
WakuRelayID_v200b2 = protocol.ID("/vac/waku/relay/2.0.0-beta2")
)
var (
@ -163,22 +168,21 @@ var (
GossipSubIWantFollowupTime = 3 * time.Second
)
// NewGossipSub returns a new PubSub object using GossipSubRouter as the router.
func NewGossipSub(ctx context.Context, h host.Host, customProtocols []protocol.ID, opts ...Option) (*PubSub, error) {
rt := &GossipSubRouter{
customProtocols: customProtocols,
peers: make(map[peer.ID]protocol.ID),
mesh: make(map[string]map[peer.ID]struct{}),
fanout: make(map[string]map[peer.ID]struct{}),
lastpub: make(map[string]int64),
gossip: make(map[peer.ID][]*pb.ControlIHave),
control: make(map[peer.ID]*pb.ControlMessage),
backoff: make(map[string]map[peer.ID]time.Time),
peerhave: make(map[peer.ID]int),
iasked: make(map[peer.ID]int),
outbound: make(map[peer.ID]bool),
connect: make(chan connectInfo, GossipSubMaxPendingConnections),
mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength),
// NewWakuRelaySub returns a new PubSub object using WakuRelaySubRouter as the router.
func NewWakuRelaySub(ctx context.Context, h host.Host, customProtocols []protocol.ID, opts ...Option) (*PubSub, error) {
rt := &WakuRelaySubRouter{
peers: make(map[peer.ID]protocol.ID),
mesh: make(map[string]map[peer.ID]struct{}),
fanout: make(map[string]map[peer.ID]struct{}),
lastpub: make(map[string]int64),
gossip: make(map[peer.ID][]*pb.ControlIHave),
control: make(map[peer.ID]*pb.ControlMessage),
backoff: make(map[string]map[peer.ID]time.Time),
peerhave: make(map[peer.ID]int),
iasked: make(map[peer.ID]int),
outbound: make(map[peer.ID]bool),
connect: make(chan connectInfo, GossipSubMaxPendingConnections),
mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength),
// these are configured per router to allow variation in tests
D: GossipSubD,
@ -205,9 +209,9 @@ func NewGossipSub(ctx context.Context, h host.Host, customProtocols []protocol.I
// WithPeerScore is a gossipsub router option that enables peer scoring.
func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Option {
return func(ps *PubSub) error {
gs, ok := ps.rt.(*GossipSubRouter)
gs, ok := ps.rt.(*WakuRelaySubRouter)
if !ok {
return fmt.Errorf("pubsub router is not gossipsub")
return fmt.Errorf("pubsub router is not wakurelaysub")
}
// sanity check: validate the score parameters
@ -251,9 +255,9 @@ func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Opt
// to publishThreshold
func WithFloodPublish(floodPublish bool) Option {
return func(ps *PubSub) error {
gs, ok := ps.rt.(*GossipSubRouter)
gs, ok := ps.rt.(*WakuRelaySubRouter)
if !ok {
return fmt.Errorf("pubsub router is not gossipsub")
return fmt.Errorf("pubsub router is not wakurelaysub")
}
gs.floodPublish = floodPublish
@ -267,9 +271,9 @@ func WithFloodPublish(floodPublish bool) Option {
// used for bootstrapping.
func WithPeerExchange(doPX bool) Option {
return func(ps *PubSub) error {
gs, ok := ps.rt.(*GossipSubRouter)
gs, ok := ps.rt.(*WakuRelaySubRouter)
if !ok {
return fmt.Errorf("pubsub router is not gossipsub")
return fmt.Errorf("pubsub router is not wakurelaysub")
}
gs.doPX = doPX
@ -285,9 +289,9 @@ func WithPeerExchange(doPX bool) Option {
// symmetrically configured at both ends.
func WithDirectPeers(pis []peer.AddrInfo) Option {
return func(ps *PubSub) error {
gs, ok := ps.rt.(*GossipSubRouter)
gs, ok := ps.rt.(*WakuRelaySubRouter)
if !ok {
return fmt.Errorf("pubsub router is not gossipsub")
return fmt.Errorf("pubsub router is not wakurelaysub")
}
direct := make(map[peer.ID]struct{})
@ -312,37 +316,36 @@ func WithDirectPeers(pis []peer.AddrInfo) Option {
// 1s by default. The default value for direct connect ticks is 300.
func WithDirectConnectTicks(t uint64) Option {
return func(ps *PubSub) error {
gs, ok := ps.rt.(*GossipSubRouter)
gs, ok := ps.rt.(*WakuRelaySubRouter)
if !ok {
return fmt.Errorf("pubsub router is not gossipsub")
return fmt.Errorf("pubsub router is not wakurelaysub")
}
gs.directConnectTicks = t
return nil
}
}
// GossipSubRouter is a router that implements the gossipsub protocol.
// WakuRelaySubRouter is a router that implements the gossipsub protocol.
// For each topic we have joined, we maintain an overlay through which
// messages flow; this is the mesh map.
// For each topic we publish to without joining, we maintain a list of peers
// to use for injecting our messages in the overlay with stable routes; this
// is the fanout map. Fanout peer lists are expired if we don't publish any
// messages to their topic for GossipSubFanoutTTL.
type GossipSubRouter struct {
p *PubSub
customProtocols []protocol.ID
peers map[peer.ID]protocol.ID // peer protocols
direct map[peer.ID]struct{} // direct peers
mesh map[string]map[peer.ID]struct{} // topic meshes
fanout map[string]map[peer.ID]struct{} // topic fanout
lastpub map[string]int64 // last publish time for fanout topics
gossip map[peer.ID][]*pb.ControlIHave // pending gossip
control map[peer.ID]*pb.ControlMessage // pending control messages
peerhave map[peer.ID]int // number of IHAVEs received from peer in the last heartbeat
iasked map[peer.ID]int // number of messages we have asked from peer in the last heartbeat
outbound map[peer.ID]bool // connection direction cache, marks peers with outbound connections
backoff map[string]map[peer.ID]time.Time // prune backoff
connect chan connectInfo // px connection requests
type WakuRelaySubRouter struct {
p *PubSub
peers map[peer.ID]protocol.ID // peer protocols
direct map[peer.ID]struct{} // direct peers
mesh map[string]map[peer.ID]struct{} // topic meshes
fanout map[string]map[peer.ID]struct{} // topic fanout
lastpub map[string]int64 // last publish time for fanout topics
gossip map[peer.ID][]*pb.ControlIHave // pending gossip
control map[peer.ID]*pb.ControlMessage // pending control messages
peerhave map[peer.ID]int // number of IHAVEs received from peer in the last heartbeat
iasked map[peer.ID]int // number of messages we have asked from peer in the last heartbeat
outbound map[peer.ID]bool // connection direction cache, marks peers with outbound connections
backoff map[string]map[peer.ID]time.Time // prune backoff
connect chan connectInfo // px connection requests
mcache *MessageCache
tracer *pubsubTracer
@ -400,11 +403,11 @@ type connectInfo struct {
spr *record.Envelope
}
func (gs *GossipSubRouter) Protocols() []protocol.ID {
return append([]protocol.ID{GossipSubID_v11, GossipSubID_v10, FloodSubID}, gs.customProtocols...)
func (gs *WakuRelaySubRouter) Protocols() []protocol.ID {
return []protocol.ID{WakuRelayID_v200b2, GossipSubID_v11, GossipSubID_v10, FloodSubID}
}
func (gs *GossipSubRouter) Attach(p *PubSub) {
func (gs *WakuRelaySubRouter) Attach(p *PubSub) {
gs.p = p
gs.tracer = p.tracer
@ -441,7 +444,7 @@ func (gs *GossipSubRouter) Attach(p *PubSub) {
}
}
func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
func (gs *WakuRelaySubRouter) AddPeer(p peer.ID, proto protocol.ID) {
log.Debugf("PEERUP: Add new peer %s using %s", p, proto)
gs.tracer.AddPeer(p, proto)
gs.peers[p] = proto
@ -464,7 +467,7 @@ loop:
gs.outbound[p] = outbound
}
func (gs *GossipSubRouter) RemovePeer(p peer.ID) {
func (gs *WakuRelaySubRouter) RemovePeer(p peer.ID) {
log.Debugf("PEERDOWN: Remove disconnected peer %s", p)
gs.tracer.RemovePeer(p)
delete(gs.peers, p)
@ -479,7 +482,7 @@ func (gs *GossipSubRouter) RemovePeer(p peer.ID) {
delete(gs.outbound, p)
}
func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool {
func (gs *WakuRelaySubRouter) EnoughPeers(topic string, suggested int) bool {
// check all peers in the topic
tmap, ok := gs.p.topics[topic]
if !ok {
@ -508,7 +511,7 @@ func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool {
return false
}
func (gs *GossipSubRouter) AcceptFrom(p peer.ID) AcceptStatus {
func (gs *WakuRelaySubRouter) AcceptFrom(p peer.ID) AcceptStatus {
_, direct := gs.direct[p]
if direct {
return AcceptAll
@ -521,7 +524,7 @@ func (gs *GossipSubRouter) AcceptFrom(p peer.ID) AcceptStatus {
return gs.gate.AcceptFrom(p)
}
func (gs *GossipSubRouter) HandleRPC(rpc *RPC) {
func (gs *WakuRelaySubRouter) HandleRPC(rpc *RPC) {
ctl := rpc.GetControl()
if ctl == nil {
return
@ -540,7 +543,7 @@ func (gs *GossipSubRouter) HandleRPC(rpc *RPC) {
gs.sendRPC(rpc.from, out)
}
func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb.ControlIWant {
func (gs *WakuRelaySubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb.ControlIWant {
// we ignore IHAVE gossip from any peer whose score is below the gossip threshold
score := gs.score.Score(p)
if score < gs.gossipThreshold {
@ -604,7 +607,7 @@ func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb.
return []*pb.ControlIWant{&pb.ControlIWant{MessageIDs: iwantlst}}
}
func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb.Message {
func (gs *WakuRelaySubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb.Message {
// we don't respond to IWANT requests from any peer whose score is below the gossip threshold
score := gs.score.Score(p)
if score < gs.gossipThreshold {
@ -643,7 +646,7 @@ func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb.
return msgs
}
func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.ControlPrune {
func (gs *WakuRelaySubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.ControlPrune {
var prune []string
doPX := gs.doPX
@ -736,7 +739,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
return cprune
}
func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
func (gs *WakuRelaySubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
score := gs.score.Score(p)
for _, prune := range ctl.GetPrune() {
@ -770,11 +773,11 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
}
}
func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string) {
func (gs *WakuRelaySubRouter) addBackoff(p peer.ID, topic string) {
gs.doAddBackoff(p, topic, GossipSubPruneBackoff)
}
func (gs *GossipSubRouter) doAddBackoff(p peer.ID, topic string, interval time.Duration) {
func (gs *WakuRelaySubRouter) doAddBackoff(p peer.ID, topic string, interval time.Duration) {
backoff, ok := gs.backoff[topic]
if !ok {
backoff = make(map[peer.ID]time.Time)
@ -786,7 +789,7 @@ func (gs *GossipSubRouter) doAddBackoff(p peer.ID, topic string, interval time.D
}
}
func (gs *GossipSubRouter) pxConnect(peers []*pb.PeerInfo) {
func (gs *WakuRelaySubRouter) pxConnect(peers []*pb.PeerInfo) {
if len(peers) > GossipSubPrunePeers {
shufflePeerInfo(peers)
peers = peers[:GossipSubPrunePeers]
@ -839,7 +842,7 @@ func (gs *GossipSubRouter) pxConnect(peers []*pb.PeerInfo) {
}
}
func (gs *GossipSubRouter) connector() {
func (gs *WakuRelaySubRouter) connector() {
for {
select {
case ci := <-gs.connect:
@ -869,7 +872,7 @@ func (gs *GossipSubRouter) connector() {
}
}
func (gs *GossipSubRouter) Publish(msg *Message) {
func (gs *WakuRelaySubRouter) Publish(msg *Message) {
gs.mcache.Put(msg.Message)
from := msg.ReceivedFrom
@ -941,7 +944,7 @@ func (gs *GossipSubRouter) Publish(msg *Message) {
}
}
func (gs *GossipSubRouter) Join(topic string) {
func (gs *WakuRelaySubRouter) Join(topic string) {
gmap, ok := gs.mesh[topic]
if ok {
return
@ -992,7 +995,7 @@ func (gs *GossipSubRouter) Join(topic string) {
}
}
func (gs *GossipSubRouter) Leave(topic string) {
func (gs *WakuRelaySubRouter) Leave(topic string) {
gmap, ok := gs.mesh[topic]
if !ok {
return
@ -1010,19 +1013,19 @@ func (gs *GossipSubRouter) Leave(topic string) {
}
}
func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) {
func (gs *WakuRelaySubRouter) sendGraft(p peer.ID, topic string) {
graft := []*pb.ControlGraft{&pb.ControlGraft{TopicID: &topic}}
out := rpcWithControl(nil, nil, nil, graft, nil)
gs.sendRPC(p, out)
}
func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string) {
func (gs *WakuRelaySubRouter) sendPrune(p peer.ID, topic string) {
prune := []*pb.ControlPrune{gs.makePrune(p, topic, gs.doPX)}
out := rpcWithControl(nil, nil, nil, nil, prune)
gs.sendRPC(p, out)
}
func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) {
func (gs *WakuRelaySubRouter) sendRPC(p peer.ID, out *RPC) {
// do we own the RPC?
own := false
@ -1069,7 +1072,7 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) {
}
}
func (gs *GossipSubRouter) doDropRPC(rpc *RPC, p peer.ID, reason string) {
func (gs *WakuRelaySubRouter) doDropRPC(rpc *RPC, p peer.ID, reason string) {
log.Debugf("dropping message to peer %s: %s", p.Pretty(), reason)
gs.tracer.DropRPC(rpc, p)
// push control messages that need to be retried
@ -1079,7 +1082,7 @@ func (gs *GossipSubRouter) doDropRPC(rpc *RPC, p peer.ID, reason string) {
}
}
func (gs *GossipSubRouter) doSendRPC(rpc *RPC, p peer.ID, mch chan *RPC) {
func (gs *WakuRelaySubRouter) doSendRPC(rpc *RPC, p peer.ID, mch chan *RPC) {
select {
case mch <- rpc:
gs.tracer.SendRPC(rpc, p)
@ -1204,7 +1207,7 @@ func fragmentMessageIds(msgIds []string, limit int) [][]string {
return out
}
func (gs *GossipSubRouter) heartbeatTimer() {
func (gs *WakuRelaySubRouter) heartbeatTimer() {
time.Sleep(GossipSubHeartbeatInitialDelay)
select {
case gs.p.eval <- gs.heartbeat:
@ -1229,7 +1232,7 @@ func (gs *GossipSubRouter) heartbeatTimer() {
}
}
func (gs *GossipSubRouter) heartbeat() {
func (gs *WakuRelaySubRouter) heartbeat() {
defer log.EventBegin(gs.p.ctx, "heartbeat").Done()
gs.heartbeatTicks++
@ -1484,7 +1487,7 @@ func (gs *GossipSubRouter) heartbeat() {
gs.mcache.Shift()
}
func (gs *GossipSubRouter) clearIHaveCounters() {
func (gs *WakuRelaySubRouter) clearIHaveCounters() {
if len(gs.peerhave) > 0 {
// throw away the old map and make a new one
gs.peerhave = make(map[peer.ID]int)
@ -1496,14 +1499,14 @@ func (gs *GossipSubRouter) clearIHaveCounters() {
}
}
func (gs *GossipSubRouter) applyIwantPenalties() {
func (gs *WakuRelaySubRouter) applyIwantPenalties() {
for p, count := range gs.gossipTracer.GetBrokenPromises() {
log.Infof("peer %s didn't follow up in %d IWANT requests; adding penalty", p, count)
gs.score.AddPenalty(p, count)
}
}
func (gs *GossipSubRouter) clearBackoff() {
func (gs *WakuRelaySubRouter) clearBackoff() {
// we only clear once every 15 ticks to avoid iterating over the map(s) too much
if gs.heartbeatTicks%15 != 0 {
return
@ -1524,7 +1527,7 @@ func (gs *GossipSubRouter) clearBackoff() {
}
}
func (gs *GossipSubRouter) directConnect() {
func (gs *WakuRelaySubRouter) directConnect() {
// we donly do this every some ticks to allow pending connections to complete and account
// for restarts/downtime
if gs.heartbeatTicks%gs.directConnectTicks != 0 {
@ -1548,7 +1551,7 @@ func (gs *GossipSubRouter) directConnect() {
}
}
func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string, noPX map[peer.ID]bool) {
func (gs *WakuRelaySubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string, noPX map[peer.ID]bool) {
for p, topics := range tograft {
graft := make([]*pb.ControlGraft, 0, len(topics))
for _, topic := range topics {
@ -1588,7 +1591,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string,
// emitGossip emits IHAVE gossip advertising items in the message cache window
// of this topic.
func (gs *GossipSubRouter) emitGossip(topic string, exclude map[peer.ID]struct{}) {
func (gs *WakuRelaySubRouter) emitGossip(topic string, exclude map[peer.ID]struct{}) {
mids := gs.mcache.GetGossipIDs(topic)
if len(mids) == 0 {
return
@ -1611,7 +1614,7 @@ func (gs *GossipSubRouter) emitGossip(topic string, exclude map[peer.ID]struct{}
for p := range gs.p.topics[topic] {
_, inExclude := exclude[p]
_, direct := gs.direct[p]
if !inExclude && !direct && (gs.peers[p] == GossipSubID_v10 || gs.peers[p] == GossipSubID_v11) && gs.score.Score(p) >= gs.gossipThreshold {
if !inExclude && !direct && (gs.peers[p] == GossipSubID_v10 || gs.peers[p] == GossipSubID_v11 || gs.peers[p] == WakuRelayID_v200b2) && gs.score.Score(p) >= gs.gossipThreshold {
peers = append(peers, p)
}
}
@ -1644,7 +1647,7 @@ func (gs *GossipSubRouter) emitGossip(topic string, exclude map[peer.ID]struct{}
}
}
func (gs *GossipSubRouter) flush() {
func (gs *WakuRelaySubRouter) flush() {
// send gossip first, which will also piggyback pending control
for p, ihave := range gs.gossip {
delete(gs.gossip, p)
@ -1660,13 +1663,13 @@ func (gs *GossipSubRouter) flush() {
}
}
func (gs *GossipSubRouter) enqueueGossip(p peer.ID, ihave *pb.ControlIHave) {
func (gs *WakuRelaySubRouter) enqueueGossip(p peer.ID, ihave *pb.ControlIHave) {
gossip := gs.gossip[p]
gossip = append(gossip, ihave)
gs.gossip[p] = gossip
}
func (gs *GossipSubRouter) piggybackGossip(p peer.ID, out *RPC, ihave []*pb.ControlIHave) {
func (gs *WakuRelaySubRouter) piggybackGossip(p peer.ID, out *RPC, ihave []*pb.ControlIHave) {
ctl := out.GetControl()
if ctl == nil {
ctl = &pb.ControlMessage{}
@ -1676,7 +1679,7 @@ func (gs *GossipSubRouter) piggybackGossip(p peer.ID, out *RPC, ihave []*pb.Cont
ctl.Ihave = ihave
}
func (gs *GossipSubRouter) pushControl(p peer.ID, ctl *pb.ControlMessage) {
func (gs *WakuRelaySubRouter) pushControl(p peer.ID, ctl *pb.ControlMessage) {
// remove IHAVE/IWANT from control message, gossip is not retried
ctl.Ihave = nil
ctl.Iwant = nil
@ -1685,7 +1688,7 @@ func (gs *GossipSubRouter) pushControl(p peer.ID, ctl *pb.ControlMessage) {
}
}
func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.ControlMessage) {
func (gs *WakuRelaySubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.ControlMessage) {
// check control message for staleness first
var tograft []*pb.ControlGraft
var toprune []*pb.ControlPrune
@ -1733,7 +1736,7 @@ func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.Control
}
}
func (gs *GossipSubRouter) makePrune(p peer.ID, topic string, doPX bool) *pb.ControlPrune {
func (gs *WakuRelaySubRouter) makePrune(p peer.ID, topic string, doPX bool) *pb.ControlPrune {
if gs.peers[p] == GossipSubID_v10 {
// GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
return &pb.ControlPrune{TopicID: &topic}
@ -1771,7 +1774,7 @@ func (gs *GossipSubRouter) makePrune(p peer.ID, topic string, doPX bool) *pb.Con
return &pb.ControlPrune{TopicID: &topic, Peers: px, Backoff: &backoff}
}
func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID) bool) []peer.ID {
func (gs *WakuRelaySubRouter) getPeers(topic string, count int, filter func(peer.ID) bool) []peer.ID {
tmap, ok := gs.p.topics[topic]
if !ok {
return nil
@ -1779,7 +1782,7 @@ func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID
peers := make([]peer.ID, 0, len(tmap))
for p := range tmap {
if (gs.peers[p] == GossipSubID_v10 || gs.peers[p] == GossipSubID_v11) && filter(p) {
if (gs.peers[p] == GossipSubID_v10 || gs.peers[p] == GossipSubID_v11 || gs.peers[p] == WakuRelayID_v200b2) && filter(p) {
peers = append(peers, p)
}
}

View File

@ -13,7 +13,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
pb "github.com/status-im/go-wakurelay-pubsub/pb"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-msgio/protoio"
@ -31,7 +31,7 @@ func TestGossipsubAttackSpamIWANT(t *testing.T) {
attacker := hosts[1]
// Set up gossipsub on the legit host
ps, err := NewGossipSub(ctx, legit)
ps, err := NewWakuRelaySub(ctx, legit)
if err != nil {
t.Fatal(err)
}
@ -148,7 +148,7 @@ func TestGossipsubAttackSpamIHAVE(t *testing.T) {
attacker := hosts[1]
// Set up gossipsub on the legit host
ps, err := NewGossipSub(ctx, legit,
ps, err := NewWakuRelaySub(ctx, legit,
WithPeerScore(
&PeerScoreParams{
AppSpecificScore: func(peer.ID) float64 { return 0 },
@ -222,7 +222,7 @@ func TestGossipsubAttackSpamIHAVE(t *testing.T) {
firstBatchCount := iwc
// the score should still be 0 because we haven't broken any promises yet
score := ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
score := ps.rt.(*WakuRelaySubRouter).score.Score(attacker.ID())
if score != 0 {
t.Fatalf("Expected 0 score, but got %f", score)
}
@ -250,7 +250,7 @@ func TestGossipsubAttackSpamIHAVE(t *testing.T) {
time.Sleep(GossipSubIWantFollowupTime)
// The score should now be negative because of broken promises
score = ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
score = ps.rt.(*WakuRelaySubRouter).score.Score(attacker.ID())
if score >= 0 {
t.Fatalf("Expected negative score, but got %f", score)
}
@ -281,7 +281,7 @@ func TestGossipsubAttackGRAFTNonExistentTopic(t *testing.T) {
attacker := hosts[1]
// Set up gossipsub on the legit host
ps, err := NewGossipSub(ctx, legit)
ps, err := NewWakuRelaySub(ctx, legit)
if err != nil {
t.Fatal(err)
}
@ -365,7 +365,7 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) {
attacker := hosts[1]
// Set up gossipsub on the legit host
ps, err := NewGossipSub(ctx, legit,
ps, err := NewWakuRelaySub(ctx, legit,
WithPeerScore(
&PeerScoreParams{
AppSpecificScore: func(peer.ID) float64 { return 0 },
@ -461,7 +461,7 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) {
t.Fatalf("Expected %d PRUNE messages but got %d", 1, pc)
}
score1 := ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
score1 := ps.rt.(*WakuRelaySubRouter).score.Score(attacker.ID())
if score1 >= 0 {
t.Fatalf("Expected negative score, but got %f", score1)
}
@ -480,7 +480,7 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) {
t.Fatalf("Expected %d PRUNE messages but got %d", 2, pc)
}
score2 := ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
score2 := ps.rt.(*WakuRelaySubRouter).score.Score(attacker.ID())
if score2 >= score1 {
t.Fatalf("Expected score below %f, but got %f", score1, score2)
}
@ -497,7 +497,7 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) {
t.Fatalf("Expected %d PRUNE messages but got %d", 3, pc)
}
score3 := ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
score3 := ps.rt.(*WakuRelaySubRouter).score.Score(attacker.ID())
if score3 >= score2 {
t.Fatalf("Expected score below %f, but got %f", score2, score3)
}
@ -524,7 +524,7 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) {
// make sure we are _not_ in the mesh
res := make(chan bool)
ps.eval <- func() {
mesh := ps.rt.(*GossipSubRouter).mesh[mytopic]
mesh := ps.rt.(*WakuRelaySubRouter).mesh[mytopic]
_, inMesh := mesh[attacker.ID()]
res <- inMesh
}
@ -609,7 +609,7 @@ func TestGossipsubAttackInvalidMessageSpam(t *testing.T) {
// Set up gossipsub on the legit host
tracer := &gsAttackInvalidMsgTracer{}
ps, err := NewGossipSub(ctx, legit,
ps, err := NewWakuRelaySub(ctx, legit,
WithEventTracer(tracer),
WithPeerScore(params, thresholds),
)
@ -618,7 +618,7 @@ func TestGossipsubAttackInvalidMessageSpam(t *testing.T) {
}
attackerScore := func() float64 {
return ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
return ps.rt.(*WakuRelaySubRouter).score.Score(attacker.ID())
}
// Subscribe to mytopic on the legit host

View File

@ -10,7 +10,7 @@ import (
"testing"
"time"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
pb "github.com/status-im/go-wakurelay-pubsub/pb"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
@ -25,7 +25,7 @@ import (
)
func getGossipsub(ctx context.Context, h host.Host, opts ...Option) *PubSub {
ps, err := NewGossipSub(ctx, h, opts...)
ps, err := NewWakuRelaySub(ctx, h, opts...)
if err != nil {
panic(err)
}
@ -317,7 +317,7 @@ func TestGossipsubFanoutExpiry(t *testing.T) {
}
psubs[0].eval <- func() {
if len(psubs[0].rt.(*GossipSubRouter).fanout) == 0 {
if len(psubs[0].rt.(*WakuRelaySubRouter).fanout) == 0 {
t.Fatal("owner has no fanout")
}
}
@ -326,7 +326,7 @@ func TestGossipsubFanoutExpiry(t *testing.T) {
time.Sleep(time.Second * 2)
psubs[0].eval <- func() {
if len(psubs[0].rt.(*GossipSubRouter).fanout) > 0 {
if len(psubs[0].rt.(*WakuRelaySubRouter).fanout) > 0 {
t.Fatal("fanout hasn't expired")
}
}
@ -966,7 +966,7 @@ func TestGossipsubStarTopology(t *testing.T) {
// configure the center of the star with a very low D
psubs[0].eval <- func() {
gs := psubs[0].rt.(*GossipSubRouter)
gs := psubs[0].rt.(*WakuRelaySubRouter)
gs.D = 0
gs.Dlo = 0
gs.Dhi = 0
@ -1050,7 +1050,7 @@ func TestGossipsubStarTopologyWithSignedPeerRecords(t *testing.T) {
// configure the center of the star with a very low D
psubs[0].eval <- func() {
gs := psubs[0].rt.(*GossipSubRouter)
gs := psubs[0].rt.(*WakuRelaySubRouter)
gs.D = 0
gs.Dlo = 0
gs.Dhi = 0
@ -1223,7 +1223,7 @@ func TestGossipsubDirectPeersFanout(t *testing.T) {
// verify that h0 is in the fanout of h2, but not h1 who is a direct peer
result := make(chan bool, 2)
psubs[2].eval <- func() {
rt := psubs[2].rt.(*GossipSubRouter)
rt := psubs[2].rt.(*WakuRelaySubRouter)
fanout := rt.fanout["test"]
_, ok := fanout[h[0].ID()]
result <- ok
@ -1250,7 +1250,7 @@ func TestGossipsubDirectPeersFanout(t *testing.T) {
time.Sleep(2 * time.Second)
psubs[2].eval <- func() {
rt := psubs[2].rt.(*GossipSubRouter)
rt := psubs[2].rt.(*WakuRelaySubRouter)
mesh := rt.mesh["test"]
_, ok := mesh[h[0].ID()]
result <- ok
@ -1501,7 +1501,7 @@ func TestGossipsubScoreValidatorEx(t *testing.T) {
// a negative score (its message got rejected)
res := make(chan float64, 1)
psubs[0].eval <- func() {
res <- psubs[0].rt.(*GossipSubRouter).score.Score(hosts[1].ID())
res <- psubs[0].rt.(*WakuRelaySubRouter).score.Score(hosts[1].ID())
}
score := <-res
if score != 0 {
@ -1509,7 +1509,7 @@ func TestGossipsubScoreValidatorEx(t *testing.T) {
}
psubs[0].eval <- func() {
res <- psubs[0].rt.(*GossipSubRouter).score.Score(hosts[2].ID())
res <- psubs[0].rt.(*WakuRelaySubRouter).score.Score(hosts[2].ID())
}
score = <-res
if score >= 0 {
@ -1530,7 +1530,7 @@ func TestGossipsubPiggybackControl(t *testing.T) {
res := make(chan *RPC, 1)
ps.eval <- func() {
gs := ps.rt.(*GossipSubRouter)
gs := ps.rt.(*WakuRelaySubRouter)
test1 := "test1"
test2 := "test2"
test3 := "test3"
@ -1585,8 +1585,8 @@ func TestGossipsubMultipleGraftTopics(t *testing.T) {
secondPeer := hosts[1].ID()
p2Sub := psubs[1]
p1Router := psubs[0].rt.(*GossipSubRouter)
p2Router := psubs[1].rt.(*GossipSubRouter)
p1Router := psubs[0].rt.(*WakuRelaySubRouter)
p2Router := psubs[1].rt.(*WakuRelaySubRouter)
finChan := make(chan struct{})
@ -1721,7 +1721,7 @@ func TestGossipsubOpportunisticGrafting(t *testing.T) {
res := make(chan int, 1)
for _, ps := range psubs {
ps.eval <- func() {
gs := ps.rt.(*GossipSubRouter)
gs := ps.rt.(*WakuRelaySubRouter)
count := 0
for _, h := range hosts[:10] {
_, ok := gs.mesh["test"][h.ID()]

View File

@ -3,7 +3,7 @@ package pubsub
import (
"fmt"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
pb "github.com/status-im/go-wakurelay-pubsub/pb"
"github.com/libp2p/go-libp2p-core/peer"
)

View File

@ -5,7 +5,7 @@ import (
"fmt"
"testing"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
pb "github.com/status-im/go-wakurelay-pubsub/pb"
)
func TestMessageCache(t *testing.T) {

View File

@ -163,9 +163,9 @@ type peerGaterStats struct {
// interval.
func WithPeerGater(params *PeerGaterParams) Option {
return func(ps *PubSub) error {
gs, ok := ps.rt.(*GossipSubRouter)
gs, ok := ps.rt.(*WakuRelaySubRouter)
if !ok {
return fmt.Errorf("pubsub router is not gossipsub")
return fmt.Errorf("pubsub router is not wakurelaysub")
}
err := params.validate()

View File

@ -10,7 +10,7 @@ import (
"sync/atomic"
"time"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
pb "github.com/status-im/go-wakurelay-pubsub/pb"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/discovery"

View File

@ -146,9 +146,9 @@ type TopicScoreSnapshot struct {
// This option must be passed _after_ the WithPeerScore option.
func WithPeerScoreInspect(inspect interface{}, period time.Duration) Option {
return func(ps *PubSub) error {
gs, ok := ps.rt.(*GossipSubRouter)
gs, ok := ps.rt.(*WakuRelaySubRouter)
if !ok {
return fmt.Errorf("pubsub router is not gossipsub")
return fmt.Errorf("pubsub router is not wakurelaysub")
}
if gs.score == nil {
@ -232,7 +232,7 @@ func (ps *peerScore) SetTopicScoreParams(topic string, p *TopicScoreParams) erro
}
// router interface
func (ps *peerScore) Start(gs *GossipSubRouter) {
func (ps *peerScore) Start(gs *WakuRelaySubRouter) {
if ps == nil {
return
}

View File

@ -3,7 +3,7 @@ package pubsub
import (
"fmt"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
pb "github.com/status-im/go-wakurelay-pubsub/pb"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"

View File

@ -3,7 +3,7 @@ package pubsub
import (
"testing"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
pb "github.com/status-im/go-wakurelay-pubsub/pb"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"

View File

@ -4,7 +4,7 @@ import (
"errors"
"regexp"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
pb "github.com/status-im/go-wakurelay-pubsub/pb"
"github.com/libp2p/go-libp2p-core/peer"
)

View File

@ -6,7 +6,7 @@ import (
"testing"
"time"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
pb "github.com/status-im/go-wakurelay-pubsub/pb"
"github.com/libp2p/go-libp2p-core/peer"
)

View File

@ -69,7 +69,7 @@ func newTagTracer(cmgr connmgr.ConnManager) *tagTracer {
}
}
func (t *tagTracer) Start(gs *GossipSubRouter) {
func (t *tagTracer) Start(gs *WakuRelaySubRouter) {
if t == nil {
return
}

View File

@ -10,7 +10,7 @@ import (
connmgri "github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/peer"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
pb "github.com/status-im/go-wakurelay-pubsub/pb"
)
func TestTagTracerMeshTags(t *testing.T) {

View File

@ -6,7 +6,7 @@ import (
"fmt"
"sync"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
pb "github.com/status-im/go-wakurelay-pubsub/pb"
"github.com/libp2p/go-libp2p-core/peer"
)
@ -48,9 +48,9 @@ func (t *Topic) SetScoreParams(p *TopicScoreParams) error {
result := make(chan error, 1)
update := func() {
gs, ok := t.p.rt.(*GossipSubRouter)
gs, ok := t.p.rt.(*WakuRelaySubRouter)
if !ok {
result <- fmt.Errorf("pubsub router is not gossipsub")
result <- fmt.Errorf("pubsub router is not wakurelaysub")
return
}

View File

@ -6,7 +6,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
pb "github.com/status-im/go-wakurelay-pubsub/pb"
)
// Generic event tracer interface

View File

@ -11,7 +11,7 @@ import (
"testing"
"time"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
pb "github.com/status-im/go-wakurelay-pubsub/pb"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

View File

@ -9,7 +9,7 @@ import (
"sync"
"time"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
pb "github.com/status-im/go-wakurelay-pubsub/pb"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"