diff --git a/config.nims b/config.nims
index 69f9485d4..e7b14c439 100644
--- a/config.nims
+++ b/config.nims
@@ -7,7 +7,6 @@ if dirExists("nimbledeps/pkgs2"):
 switch("warning", "CaseTransition:off")
 switch("warning", "ObservableStores:off")
 switch("warning", "LockLevel:off")
---define:chronosStrictException
 --styleCheck:usages
 switch("warningAsError", "UseBase:on")
 --styleCheck:error
diff --git a/libp2p/multiaddress.nim b/libp2p/multiaddress.nim
index e1ba09a0a..c0b9f8741 100644
--- a/libp2p/multiaddress.nim
+++ b/libp2p/multiaddress.nim
@@ -13,8 +13,8 @@
 {.push public.}
 
 import pkg/chronos, chronicles
-import std/[nativesockets, hashes]
-import tables, strutils, sets, stew/shims/net
+import std/[nativesockets, net, hashes]
+import tables, strutils, sets
 import multicodec, multihash, multibase, transcoder, vbuffer, peerid,
        protobuf/minprotobuf, errors, utility
 import stew/[base58, base32, endians2, results]
diff --git a/libp2p/multistream.nim b/libp2p/multistream.nim
index 1c7449d6f..51931041a 100644
--- a/libp2p/multistream.nim
+++ b/libp2p/multistream.nim
@@ -266,6 +266,7 @@ proc addHandler*[E](
 
 proc start*(m: MultistreamSelect) {.async: (raises: [CancelledError]).} =
   # Nim 1.6.18: Using `mapIt` results in a seq of `.Raising([])`
+  # TODO https://github.com/nim-lang/Nim/issues/23445
   var futs = newSeqOfCap[Future[void].Raising([CancelledError])](m.handlers.len)
   for it in m.handlers:
     futs.add it.protocol.start()
@@ -278,7 +279,7 @@ proc start*(m: MultistreamSelect) {.async: (raises: [CancelledError]).} =
     doAssert m.handlers.len == futs.len, "Handlers modified while starting"
     for i, fut in futs:
       if not fut.finished:
-        pending.add noCancel fut.cancelAndWait()
+        pending.add fut.cancelAndWait()
       elif fut.completed:
         pending.add m.handlers[i].protocol.stop()
       else:
@@ -286,7 +287,6 @@ proc start*(m: MultistreamSelect) {.async: (raises: [CancelledError]).} =
     await noCancel allFutures(pending)
     raise exc
 
-
 proc stop*(m: MultistreamSelect) {.async: (raises: []).} =
   # Nim 1.6.18: Using `mapIt` results in a seq of `.Raising([CancelledError])`
   var futs = newSeqOfCap[Future[void].Raising([])](m.handlers.len)
diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim
index 76641fc64..7b6de15c6 100644
--- a/libp2p/protocols/pubsub/gossipsub/behavior.nim
+++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim
@@ -30,7 +30,7 @@ declareGauge(libp2p_gossipsub_healthy_peers_topics, "number of topics in mesh wi
 declareCounter(libp2p_gossipsub_above_dhigh_condition, "number of above dhigh pruning branches ran", labels = ["topic"])
 declareGauge(libp2p_gossipsub_received_iwants, "received iwants", labels = ["kind"])
 
-proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) {.raises: [].} =
+proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) =
   g.withPeerStats(p.peerId) do (stats: var PeerStats):
     var info = stats.topicInfos.getOrDefault(topic)
     info.graftTime = Moment.now()
@@ -46,7 +46,7 @@ proc pruned*(g: GossipSub,
              p: PubSubPeer,
              topic: string,
              setBackoff: bool = true,
-             backoff = none(Duration)) {.raises: [].} =
+             backoff = none(Duration)) =
   if setBackoff:
     let
       backoffDuration = backoff.get(g.parameters.pruneBackoff)
@@ -70,7 +70,7 @@ proc pruned*(g: GossipSub,
 
     trace "pruned", peer=p, topic
 
-proc handleBackingOff*(t: var BackoffTable, topic: string) {.raises: [].} =
+proc handleBackingOff*(t: var BackoffTable, topic: string) =
   let now = Moment.now()
   var expired = toSeq(t.getOrDefault(topic).pairs())
   expired.keepIf do (pair: tuple[peer: PeerId, expire: Moment]) -> bool:
@@ -79,7 +79,7 @@ proc handleBackingOff*(t: var BackoffTable, topic: string) {.raises: [].} =
     t.withValue(topic, v):
       v[].del(peer)
 
-proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] {.raises: [].} =
+proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] =
   if not g.parameters.enablePX:
     return @[]
   var peers = g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()).toSeq()
@@ -100,7 +100,7 @@ proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] {.raises:
 
 proc handleGraft*(g: GossipSub,
                   peer: PubSubPeer,
-                  grafts: seq[ControlGraft]): seq[ControlPrune] = # {.raises: [Defect].} TODO chronicles exception on windows
+                  grafts: seq[ControlGraft]): seq[ControlPrune] =
   var prunes: seq[ControlPrune]
   for graft in grafts:
     let topic = graft.topicID
@@ -205,7 +205,7 @@ proc getPeers(prune: ControlPrune, peer: PubSubPeer): seq[(PeerId, Option[PeerRe
 
   routingRecords
 
-proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.raises: [].} =
+proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
   for prune in prunes:
     let topic = prune.topicID
 
@@ -239,7 +239,7 @@ proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.r
 
 proc handleIHave*(g: GossipSub,
                   peer: PubSubPeer,
-                  ihaves: seq[ControlIHave]): ControlIWant {.raises: [].} =
+                  ihaves: seq[ControlIHave]): ControlIWant =
   var res: ControlIWant
   if peer.score < g.parameters.gossipThreshold:
     trace "ihave: ignoring low score peer", peer, score = peer.score
@@ -273,7 +273,7 @@ proc handleIDontWant*(g: GossipSub,
 
 proc handleIWant*(g: GossipSub,
                   peer: PubSubPeer,
-                  iwants: seq[ControlIWant]): seq[Message] {.raises: [].} =
+                  iwants: seq[ControlIWant]): seq[Message] =
   var
     messages: seq[Message]
     invalidRequests = 0
@@ -299,7 +299,7 @@ proc handleIWant*(g: GossipSub,
       messages.add(msg)
   return messages
 
-proc commitMetrics(metrics: var MeshMetrics) {.raises: [].} =
+proc commitMetrics(metrics: var MeshMetrics) =
   libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics)
   libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics)
   libp2p_gossipsub_under_dout_topics.set(metrics.underDoutTopics)
@@ -308,7 +308,7 @@ proc commitMetrics(metrics: var MeshMetrics) {.raises: [].} =
   libp2p_gossipsub_peers_per_topic_fanout.set(metrics.otherPeersPerTopicFanout, labelValues = ["other"])
   libp2p_gossipsub_peers_per_topic_mesh.set(metrics.otherPeersPerTopicMesh, labelValues = ["other"])
 
-proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) {.raises: [].} =
+proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) =
   logScope:
     topic
     mesh = g.mesh.peers(topic)
@@ -538,7 +538,7 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
         backoff: g.parameters.pruneBackoff.seconds.uint64)])))
     g.broadcast(prunes, prune, isHighPriority = true)
 
-proc dropFanoutPeers*(g: GossipSub) {.raises: [].} =
+proc dropFanoutPeers*(g: GossipSub) =
   # drop peers that we haven't published to in
   # GossipSubFanoutTTL seconds
   let now = Moment.now()
@@ -551,7 +551,7 @@ proc dropFanoutPeers*(g: GossipSub) {.raises: [].} =
   for topic in drops:
     g.lastFanoutPubSub.del topic
 
-proc replenishFanout*(g: GossipSub, topic: string) {.raises: [].} =
+proc replenishFanout*(g: GossipSub, topic: string) =
   ## get fanout peers for a topic
   logScope: topic
   trace "about to replenish fanout"
@@ -567,7 +567,7 @@ proc replenishFanout*(g: GossipSub, topic: string) {.raises: [].} =
 
   trace "fanout replenished with peers", peers = g.fanout.peers(topic)
 
-proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises: [].} =
+proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] =
   ## gossip iHave messages to peers
   ##
 
@@ -620,17 +620,16 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises:
     g.rng.shuffle(allPeers)
     allPeers.setLen(target)
 
-    let msgIdsAsSet = ihave.messageIDs.toHashSet()
-
     for peer in allPeers:
       control.mgetOrPut(peer, ControlMessage()).ihave.add(ihave)
-      peer.sentIHaves[^1].incl(msgIdsAsSet)
+      for msgId in ihave.messageIDs:
+        peer.sentIHaves[^1].incl(msgId)
 
   libp2p_gossipsub_cache_window_size.set(cacheWindowSize.int64)
 
   return control
 
-proc onHeartbeat(g: GossipSub) {.raises: [].} =
+proc onHeartbeat(g: GossipSub) =
   # reset IWANT budget
   # reset IHAVE cap
   block:
@@ -694,8 +693,6 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} =
 
   g.mcache.shift() # shift the cache
 
-# {.pop.} # raises []
-
 proc heartbeat*(g: GossipSub) {.async.} =
   heartbeat "GossipSub", g.parameters.heartbeatInterval:
     trace "running heartbeat", instance = cast[int](g)
diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim
index 7cac93d93..edee4d2bc 100644
--- a/libp2p/protocols/pubsub/gossipsub/scoring.nim
+++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim
@@ -87,8 +87,6 @@ proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 =
     else:
       0.0
 
-{.pop.}
-
 proc disconnectPeer*(g: GossipSub, peer: PubSubPeer) {.async.} =
   try:
     await g.switch.disconnect(peer.peerId)
diff --git a/libp2p/protocols/pubsub/mcache.nim b/libp2p/protocols/pubsub/mcache.nim
index da0d92fb5..f18ac1cb7 100644
--- a/libp2p/protocols/pubsub/mcache.nim
+++ b/libp2p/protocols/pubsub/mcache.nim
@@ -9,50 +9,57 @@
 
 {.push raises: [].}
 
-import std/[sets, tables, options]
+import std/[sets, tables]
 import rpc/[messages]
+import results
 
-export sets, tables, messages, options
+export sets, tables, messages, results
 
 type
   CacheEntry* = object
-    mid*: MessageId
+    msgId*: MessageId
     topic*: string
 
   MCache* = object of RootObj
     msgs*: Table[MessageId, Message]
     history*: seq[seq[CacheEntry]]
+    pos*: int
     windowSize*: Natural
 
-func get*(c: MCache, mid: MessageId): Option[Message] =
-  if mid in c.msgs:
-    try: some(c.msgs[mid])
+func get*(c: MCache, msgId: MessageId): Opt[Message] =
+  if msgId in c.msgs:
+    try: Opt.some(c.msgs[msgId])
     except KeyError: raiseAssert "checked"
   else:
-    none(Message)
+    Opt.none(Message)
 
-func contains*(c: MCache, mid: MessageId): bool =
-  mid in c.msgs
+func contains*(c: MCache, msgId: MessageId): bool =
+  msgId in c.msgs
 
 func put*(c: var MCache, msgId: MessageId, msg: Message) =
   if not c.msgs.hasKeyOrPut(msgId, msg):
     # Only add cache entry if the message was not already in the cache
-    c.history[0].add(CacheEntry(mid: msgId, topic: msg.topic))
+    c.history[c.pos].add(CacheEntry(msgId: msgId, topic: msg.topic))
 
 func window*(c: MCache, topic: string): HashSet[MessageId] =
   let
     len = min(c.windowSize, c.history.len)
 
   for i in 0..
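
Side note (not part of the patch): a minimal sketch of how the reworked `MCache` API reads after this change, with `get` returning `Opt[Message]` from `results` instead of `Option[Message]` from `std/options`. The `MCache.init(window, history)` call and the demo values below are illustrative assumptions, not taken from the diff.

```nim
# Minimal sketch; assumes MCache.init(window, history) is unchanged by this patch.
import libp2p/protocols/pubsub/mcache  # re-exports results, sets, tables, messages

var cache = MCache.init(3, 5)          # hypothetical window/history sizes
let
  msgId: MessageId = @[1'u8, 2, 3]     # MessageId is a byte sequence
  msg = Message(topic: "demo")         # only the topic field matters for the cache entry

cache.put(msgId, msg)

# `get` now returns Opt[Message]; Opt.some/Opt.none replace some()/none().
let cached = cache.get(msgId)
if cached.isSome:
  echo "cached topic: ", cached.get().topic
```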