diff --git a/go.mod b/go.mod index b3e11c03f..1c30ec092 100644 --- a/go.mod +++ b/go.mod @@ -82,7 +82,7 @@ require ( github.com/cenkalti/backoff/v4 v4.2.1 github.com/gorilla/sessions v1.2.1 github.com/ipfs/go-log/v2 v2.5.1 - github.com/jellydator/ttlcache/v3 v3.1.0 + github.com/jellydator/ttlcache/v3 v3.2.0 github.com/jmoiron/sqlx v1.3.5 github.com/ladydascalie/currency v1.6.0 github.com/meirf/gopart v0.0.0-20180520194036-37e9492a85a8 diff --git a/go.sum b/go.sum index 9b774534a..466cb1c7a 100644 --- a/go.sum +++ b/go.sum @@ -1222,8 +1222,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk= github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= -github.com/jellydator/ttlcache/v3 v3.1.0 h1:0gPFG0IHHP6xyUyXq+JaD8fwkDCqgqwohXNJBcYE71g= -github.com/jellydator/ttlcache/v3 v3.1.0/go.mod h1:hi7MGFdMAwZna5n2tuvh63DvFLzVKySzCVW6+0gA2n4= +github.com/jellydator/ttlcache/v3 v3.2.0 h1:6lqVJ8X3ZaUwvzENqPAobDsXNExfUJd61u++uW8a3LE= +github.com/jellydator/ttlcache/v3 v3.2.0/go.mod h1:hi7MGFdMAwZna5n2tuvh63DvFLzVKySzCVW6+0gA2n4= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4= diff --git a/vendor/github.com/jellydator/ttlcache/v3/cache.go b/vendor/github.com/jellydator/ttlcache/v3/cache.go index 93943da22..1ad3afbec 100644 --- a/vendor/github.com/jellydator/ttlcache/v3/cache.go +++ b/vendor/github.com/jellydator/ttlcache/v3/cache.go @@ -148,6 +148,10 @@ func (c *Cache[K, V]) set(key K, value V, ttl time.Duration) *Item[K, V] { c.evict(EvictionReasonCapacityReached, c.items.lru.Back()) } + if ttl == PreviousOrDefaultTTL { + ttl = c.options.ttl + } + // create a new item item := newItem(key, value, ttl, c.options.enableVersionTracking) elem = c.items.lru.PushFront(item) @@ -478,6 +482,13 @@ func (c *Cache[K, V]) Items() map[K]*Item[K, V] { // Range stops the iteration. func (c *Cache[K, V]) Range(fn func(item *Item[K, V]) bool) { c.items.mu.RLock() + + // Check if cache is empty + if c.items.lru.Len() == 0 { + c.items.mu.RUnlock() + return + } + for item := c.items.lru.Front(); item != c.items.lru.Back().Next(); item = item.Next() { i := item.Value.(*Item[K, V]) c.items.mu.RUnlock() diff --git a/vendor/github.com/jellydator/ttlcache/v3/item.go b/vendor/github.com/jellydator/ttlcache/v3/item.go index 72568e07e..c3c26cf6b 100644 --- a/vendor/github.com/jellydator/ttlcache/v3/item.go +++ b/vendor/github.com/jellydator/ttlcache/v3/item.go @@ -9,6 +9,10 @@ const ( // NoTTL indicates that an item should never expire. NoTTL time.Duration = -1 + // PreviousOrDefaultTTL indicates that existing TTL of item should be used + // default TTL will be used as fallback if item doesn't exist + PreviousOrDefaultTTL time.Duration = -2 + // DefaultTTL indicates that the default TTL value of the cache // instance should be used. DefaultTTL time.Duration = 0 @@ -58,17 +62,23 @@ func (item *Item[K, V]) update(value V, ttl time.Duration) { defer item.mu.Unlock() item.value = value + + // update version if enabled + if item.version > -1 { + item.version++ + } + + // no need to update ttl or expiry in this case + if ttl == PreviousOrDefaultTTL { + return + } + item.ttl = ttl // reset expiration timestamp because the new TTL may be // 0 or below item.expiresAt = time.Time{} item.touchUnsafe() - - // update version if enabled - if item.version > -1 { - item.version++ - } } // touch updates the item's expiration timestamp. diff --git a/vendor/modules.txt b/vendor/modules.txt index 69bb273f2..d25103fce 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -436,7 +436,7 @@ github.com/jackpal/go-nat-pmp # github.com/jbenet/go-temp-err-catcher v0.1.0 ## explicit; go 1.13 github.com/jbenet/go-temp-err-catcher -# github.com/jellydator/ttlcache/v3 v3.1.0 +# github.com/jellydator/ttlcache/v3 v3.2.0 ## explicit; go 1.18 github.com/jellydator/ttlcache/v3 # github.com/jinzhu/copier v0.0.0-20190924061706-b57f9002281a diff --git a/wakuv2/waku.go b/wakuv2/waku.go index d540b8e7b..60fc25fa6 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -32,6 +32,7 @@ import ( "sync" "time" + "github.com/jellydator/ttlcache/v3" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/protocol/identify" "github.com/multiformats/go-multiaddr" @@ -78,6 +79,7 @@ const messageQueueLimit = 1024 const requestTimeout = 30 * time.Second const bootnodesQueryBackoffMs = 200 const bootnodesMaxRetries = 7 +const cacheTTL = 20 * time.Minute type ITelemetryClient interface { PushReceivedEnvelope(*protocol.Envelope) @@ -101,9 +103,9 @@ type Waku struct { symKeys map[string][]byte // Symmetric key storage keyMu sync.RWMutex // Mutex associated with key stores - envelopes map[gethcommon.Hash]*common.ReceivedMessage // Pool of envelopes currently tracked by this node - expirations map[uint32]mapset.Set // Message expiration pool - poolMu sync.RWMutex // Mutex to sync the message and expiration pools + envelopeCache *ttlcache.Cache[gethcommon.Hash, *common.ReceivedMessage] // Pool of envelopes currently tracked by this node + expirations map[uint32]mapset.Set // Message expiration pool + poolMu sync.RWMutex // Mutex to sync the message and expiration pools bandwidthCounter *metrics.BandwidthCounter @@ -155,6 +157,12 @@ func (w *Waku) SetStatusTelemetryClient(client ITelemetryClient) { w.statusTelemetryClient = client } +func newTTLCache() *ttlcache.Cache[gethcommon.Hash, *common.ReceivedMessage] { + cache := ttlcache.New[gethcommon.Hash, *common.ReceivedMessage](ttlcache.WithTTL[gethcommon.Hash, *common.ReceivedMessage](cacheTTL)) + go cache.Start() + return cache +} + // New creates a WakuV2 client ready to communicate through the LibP2P network. func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *sql.DB, ts *timesource.NTPTimeSource, onHistoricMessagesRequestFailed func([]byte, peer.ID, error), onPeerStats func(types.ConnStatus)) (*Waku, error) { var err error @@ -183,7 +191,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s cfg: cfg, privateKeys: make(map[string]*ecdsa.PrivateKey), symKeys: make(map[string][]byte), - envelopes: make(map[gethcommon.Hash]*common.ReceivedMessage), + envelopeCache: newTTLCache(), expirations: make(map[uint32]mapset.Set), msgQueue: make(chan *common.ReceivedMessage, messageQueueLimit), sendQueue: make(chan *protocol.Envelope, 1000), @@ -1035,7 +1043,7 @@ func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage) ([]byte, error) { w.sendQueue <- envelope w.poolMu.Lock() - _, alreadyCached := w.envelopes[gethcommon.BytesToHash(envelope.Hash())] + alreadyCached := w.envelopeCache.Has(gethcommon.BytesToHash(envelope.Hash())) w.poolMu.Unlock() if !alreadyCached { recvMessage := common.NewReceivedMessage(envelope, common.RelayedMessageType) @@ -1280,6 +1288,8 @@ func (w *Waku) setupRelaySubscriptions() error { func (w *Waku) Stop() error { w.cancel() + w.envelopeCache.Stop() + err := w.identifyService.Close() if err != nil { return err @@ -1344,21 +1354,19 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag // addEnvelope adds an envelope to the envelope map, used for sending func (w *Waku) addEnvelope(envelope *common.ReceivedMessage) { - hash := envelope.Hash() - w.poolMu.Lock() - w.envelopes[hash] = envelope + w.envelopeCache.Set(envelope.Hash(), envelope, ttlcache.DefaultTTL) w.poolMu.Unlock() } func (w *Waku) add(recvMessage *common.ReceivedMessage, processImmediately bool) (bool, error) { common.EnvelopesReceivedCounter.Inc() - hash := recvMessage.Hash() - w.poolMu.Lock() - envelope, alreadyCached := w.envelopes[hash] + envelope := w.envelopeCache.Get(recvMessage.Hash()) + alreadyCached := envelope != nil w.poolMu.Unlock() + if !alreadyCached { recvMessage.Processed.Store(false) w.addEnvelope(recvMessage) @@ -1375,7 +1383,7 @@ func (w *Waku) add(recvMessage *common.ReceivedMessage, processImmediately bool) common.EnvelopesSizeMeter.Observe(float64(len(recvMessage.Envelope.Message().Payload))) } - if !alreadyCached || !envelope.Processed.Load() { + if !alreadyCached || !envelope.Value().Processed.Load() { if processImmediately { logger.Debug("immediately processing envelope") w.processReceivedMessage(recvMessage) @@ -1444,24 +1452,18 @@ func (w *Waku) processReceivedMessage(e *common.ReceivedMessage) { }) } -// Envelopes retrieves all the messages currently pooled by the node. -func (w *Waku) Envelopes() []*common.ReceivedMessage { - w.poolMu.RLock() - defer w.poolMu.RUnlock() - - all := make([]*common.ReceivedMessage, 0, len(w.envelopes)) - for _, envelope := range w.envelopes { - all = append(all, envelope) - } - return all -} - // GetEnvelope retrieves an envelope from the message queue by its hash. // It returns nil if the envelope can not be found. func (w *Waku) GetEnvelope(hash gethcommon.Hash) *common.ReceivedMessage { w.poolMu.RLock() defer w.poolMu.RUnlock() - return w.envelopes[hash] + + envelope := w.envelopeCache.Get(hash) + if envelope == nil { + return nil + } + + return envelope.Value() } // isEnvelopeCached checks if envelope with specific hash has already been received and cached. @@ -1469,14 +1471,15 @@ func (w *Waku) IsEnvelopeCached(hash gethcommon.Hash) bool { w.poolMu.Lock() defer w.poolMu.Unlock() - _, exist := w.envelopes[hash] - return exist + return w.envelopeCache.Has(hash) } func (w *Waku) ClearEnvelopesCache() { w.poolMu.Lock() defer w.poolMu.Unlock() - w.envelopes = make(map[gethcommon.Hash]*common.ReceivedMessage) + + w.envelopeCache.Stop() + w.envelopeCache = newTTLCache() } func (w *Waku) PeerCount() int {