answer `RequestManager` queries from disk if possible (#6109)

When the beacon node restarts, orphaned blocks remain in the database,
but only the canonical chain as selected by fork choice is loaded on
startup. When a new block is then discovered that builds on top of an
orphaned block, the orphaned block is re-downloaded via the sync/request
manager even though it is already present on disk. Such queries can be
answered from the local database, speeding up discovery of alternate forks.
Etan Kissling, 2024-03-21 18:37:31 +01:00 (committed by GitHub)
parent 9256db2265
commit 6f466894ab
4 changed files with 141 additions and 45 deletions
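
In a nutshell: before the request manager asks peers for a missing block or
blob, it now first consults an optional loader callback that reads the local
database, and only falls back to the network for items that are genuinely
absent. A minimal sketch of that disk-first pattern (hypothetical names, not
the actual nimbus-eth2 API):

# Sketch only: `loadFromDisk` stands in for the new BlockLoaderFn callback.
proc resolveMissing(missing: seq[string],
                    loadFromDisk: proc(root: string): bool): seq[string] =
  ## Returns only the roots that still require a network request;
  ## roots found on disk are handled locally and dropped from the list.
  for root in missing:
    if not loadFromDisk(root):
      result.add root  # not on disk: fall back to a BeaconBlocksByRoot request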

View File

@@ -310,13 +310,14 @@ proc addBlobless*(
   quarantine.missing.del(signedBlock.root)
   true
 
-func popBlobless*(quarantine: var Quarantine, root: Eth2Digest):
-    Opt[deneb.SignedBeaconBlock] =
+func popBlobless*(
+    quarantine: var Quarantine,
+    root: Eth2Digest): Opt[ForkedSignedBeaconBlock] =
   var blck: deneb.SignedBeaconBlock
   if quarantine.blobless.pop(root, blck):
-    Opt.some(blck)
+    Opt.some(ForkedSignedBeaconBlock.init(blck))
   else:
-    Opt.none(deneb.SignedBeaconBlock)
+    Opt.none(ForkedSignedBeaconBlock)
 
 iterator peekBlobless*(quarantine: var Quarantine): deneb.SignedBeaconBlock =
   for k, v in quarantine.blobless.mpairs():

View File

@@ -303,15 +303,17 @@ proc processBlobSidecar*(
   let block_root = hash_tree_root(block_header)
   if (let o = self.quarantine[].popBlobless(block_root); o.isSome):
     let blobless = o.unsafeGet()
-    if self.blobQuarantine[].hasBlobs(blobless):
-      self.blockProcessor[].enqueueBlock(
-        MsgSource.gossip,
-        ForkedSignedBeaconBlock.init(blobless),
-        Opt.some(self.blobQuarantine[].popBlobs(block_root, blobless)))
-    else:
-      discard self.quarantine[].addBlobless(self.dag.finalizedHead.slot,
-                                            blobless)
+    withBlck(blobless):
+      when consensusFork >= ConsensusFork.Deneb:
+        if self.blobQuarantine[].hasBlobs(forkyBlck):
+          self.blockProcessor[].enqueueBlock(
+            MsgSource.gossip, blobless,
+            Opt.some(self.blobQuarantine[].popBlobs(block_root, forkyBlck)))
+        else:
+          discard self.quarantine[].addBlobless(
+            self.dag.finalizedHead.slot, forkyBlck)
+      else:
+        raiseAssert "Could not have been added as blobless"
 
   blob_sidecars_received.inc()
   blob_sidecar_delay.observe(delay.toFloatSeconds())
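
For readers unfamiliar with nimbus-eth2's `withBlck`: it dispatches on the
block's fork, binding `forkyBlck` and a static `consensusFork` in each branch,
which is what lets `when consensusFork >= ConsensusFork.Deneb` rule out
pre-Deneb blocks at compile time. A self-contained sketch of the same dispatch
idea with a plain object variant (illustrative types, not the project's
definitions):

type
  Fork = enum Capella, Deneb
  ForkedBlock = object
    case kind: Fork
    of Capella: capellaRoot: string
    of Deneb: denebRoot: string

proc process(blck: ForkedBlock) =
  case blck.kind
  of Deneb:
    echo "check blob availability for ", blck.denebRoot  # Deneb+: may need blobs
  of Capella:
    # Pre-Deneb blocks have no blobs, so they can never be queued as blobless.
    doAssert false, "could not have been added as blobless"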

View File

@@ -419,6 +419,16 @@ proc initFullNode(
         await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
                                         Opt.none(BlobSidecars),
                                         maybeFinalized = maybeFinalized)
+    rmanBlockLoader = proc(
+        blockRoot: Eth2Digest): Opt[ForkedTrustedSignedBeaconBlock] =
+      dag.getForkedBlock(blockRoot)
+    rmanBlobLoader = proc(
+        blobId: BlobIdentifier): Opt[ref BlobSidecar] =
+      var blob_sidecar = BlobSidecar.new()
+      if dag.db.getBlobSidecar(blobId.block_root, blobId.index, blob_sidecar[]):
+        Opt.some blob_sidecar
+      else:
+        Opt.none(ref BlobSidecar)
 
     processor = Eth2Processor.new(
       config.doppelgangerDetection,

@@ -444,7 +454,8 @@ proc initFullNode(
     requestManager = RequestManager.init(
       node.network, dag.cfg.DENEB_FORK_EPOCH, getBeaconTime,
       (proc(): bool = syncManager.inProgress),
-      quarantine, blobQuarantine, rmanBlockVerifier)
+      quarantine, blobQuarantine, rmanBlockVerifier,
+      rmanBlockLoader, rmanBlobLoader)
 
   if node.config.lightClientDataServe:
     proc scheduleSendingLightClientUpdates(slot: Slot) =

View File

@@ -37,10 +37,19 @@ const
   POLL_INTERVAL = 1.seconds
 
 type
-  BlockVerifierFn* =
-    proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool):
-      Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).}
-  InhibitFn* = proc: bool {.gcsafe, raises:[].}
+  BlockVerifierFn* = proc(
+      signedBlock: ForkedSignedBeaconBlock,
+      maybeFinalized: bool
+  ): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).}
+  BlockLoaderFn* = proc(
+      blockRoot: Eth2Digest
+  ): Opt[ForkedTrustedSignedBeaconBlock] {.gcsafe, raises: [].}
+  BlobLoaderFn* = proc(
+      blobId: BlobIdentifier): Opt[ref BlobSidecar] {.gcsafe, raises: [].}
+  InhibitFn* = proc: bool {.gcsafe, raises: [].}
 
   RequestManager* = object
     network*: Eth2Node
@@ -49,6 +58,8 @@ type
     quarantine: ref Quarantine
     blobQuarantine: ref BlobQuarantine
     blockVerifier: BlockVerifierFn
+    blockLoader: BlockLoaderFn
+    blobLoader: BlobLoaderFn
     blockLoopFuture: Future[void].Raising([CancelledError])
     blobLoopFuture: Future[void].Raising([CancelledError])
@@ -64,7 +75,9 @@ proc init*(T: type RequestManager, network: Eth2Node,
            inhibit: InhibitFn,
            quarantine: ref Quarantine,
            blobQuarantine: ref BlobQuarantine,
-           blockVerifier: BlockVerifierFn): RequestManager =
+           blockVerifier: BlockVerifierFn,
+           blockLoader: BlockLoaderFn = nil,
+           blobLoader: BlobLoaderFn = nil): RequestManager =
   RequestManager(
     network: network,
     getBeaconTime: getBeaconTime,
@@ -72,7 +85,8 @@ proc init*(T: type RequestManager, network: Eth2Node,
     quarantine: quarantine,
     blobQuarantine: blobQuarantine,
     blockVerifier: blockVerifier,
-  )
+    blockLoader: blockLoader,
+    blobLoader: blobLoader)
 
 proc checkResponse(roots: openArray[Eth2Digest],
                    blocks: openArray[ref ForkedSignedBeaconBlock]): bool =
@@ -96,12 +110,13 @@ proc checkResponse(idList: seq[BlobIdentifier],
     let block_root = hash_tree_root(blob.signed_block_header.message)
     var found = false
     for id in idList:
-      if id.block_root == block_root and
-         id.index == blob.index:
-          found = true
-          break
+      if id.block_root == block_root and id.index == blob.index:
+        found = true
+        break
     if not found:
-        return false
+      return false
+    blob[].verify_blob_sidecar_inclusion_proof().isOkOr:
+      return false
   true
 
 proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async: (raises: [CancelledError]).} =
@@ -204,10 +219,10 @@ proc fetchBlobsFromNetwork(self: RequestManager,
           curRoot = block_root
         if (let o = self.quarantine[].popBlobless(curRoot); o.isSome):
           let b = o.unsafeGet()
-          discard await self.blockVerifier(ForkedSignedBeaconBlock.init(b), false)
+          discard await self.blockVerifier(b, false)
           # TODO:
-          # If appropriate, return a VerifierError.InvalidBlob from verification,
-          # check for it here, and penalize the peer accordingly.
+          # If appropriate, return a VerifierError.InvalidBlob from
+          # verification, check for it here, and penalize the peer accordingly
   else:
     debug "Blobs by root request failed",
           peer = peer, blobs = shortLog(idList), err = blobs.error()
@@ -217,7 +232,8 @@ proc fetchBlobsFromNetwork(self: RequestManager,
     if not(isNil(peer)):
       self.network.peerPool.release(peer)
 
-proc requestManagerBlockLoop(rman: RequestManager) {.async: (raises: [CancelledError]).} =
+proc requestManagerBlockLoop(
+    rman: RequestManager) {.async: (raises: [CancelledError]).} =
   while true:
     # TODO This polling could be replaced with an AsyncEvent that is fired
     # from the quarantine when there's work to do
@@ -226,24 +242,50 @@ proc requestManagerBlockLoop(
     if rman.inhibit():
       continue
 
-    let blocks = mapIt(rman.quarantine[].checkMissing(
-      SYNC_MAX_REQUESTED_BLOCKS), it.root)
-    if blocks.len == 0:
+    let missingBlockRoots =
+      rman.quarantine[].checkMissing(SYNC_MAX_REQUESTED_BLOCKS).mapIt(it.root)
+    if missingBlockRoots.len == 0:
       continue
-    debug "Requesting detected missing blocks", blocks = shortLog(blocks)
+
+    var blockRoots: seq[Eth2Digest]
+    if rman.blockLoader == nil:
+      blockRoots = missingBlockRoots
+    else:
+      var verifiers:
+        seq[Future[Result[void, VerifierError]].Raising([CancelledError])]
+      for blockRoot in missingBlockRoots:
+        let blck = rman.blockLoader(blockRoot).valueOr:
+          blockRoots.add blockRoot
+          continue
+        debug "Loaded orphaned block from storage", blockRoot
+        verifiers.add rman.blockVerifier(
+          blck.asSigned(), maybeFinalized = false)
+      try:
+        await allFutures(verifiers)
+      except CancelledError as exc:
+        var futs = newSeqOfCap[Future[void].Raising([])](verifiers.len)
+        for verifier in verifiers:
+          futs.add verifier.cancelAndWait()
+        await noCancel allFutures(futs)
+        raise exc
+      if blockRoots.len == 0:
+        continue
+
+    debug "Requesting detected missing blocks", blocks = shortLog(blockRoots)
     let start = SyncMoment.now(0)
 
-    var workers: array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])]
+    var workers:
+      array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])]
 
     for i in 0 ..< PARALLEL_REQUESTS:
-      workers[i] = rman.requestBlocksByRoot(blocks)
+      workers[i] = rman.requestBlocksByRoot(blockRoots)
 
     await allFutures(workers)
 
-    let finish = SyncMoment.now(uint64(len(blocks)))
+    let finish = SyncMoment.now(uint64(len(blockRoots)))
 
-    debug "Request manager block tick", blocks = shortLog(blocks),
+    debug "Request manager block tick", blocks = shortLog(blockRoots),
           sync_speed = speed(start, finish)
 
 proc getMissingBlobs(rman: RequestManager): seq[BlobIdentifier] =
@@ -280,7 +322,8 @@ proc getMissingBlobs(rman: RequestManager): seq[BlobIdentifier] =
       rman.quarantine[].removeBlobless(blobless)
   fetches
 
-proc requestManagerBlobLoop(rman: RequestManager) {.async: (raises: [CancelledError]).} =
+proc requestManagerBlobLoop(
+    rman: RequestManager) {.async: (raises: [CancelledError]).} =
   while true:
     # TODO This polling could be replaced with an AsyncEvent that is fired
     # from the quarantine when there's work to do
@@ -288,19 +331,58 @@ proc requestManagerBlobLoop(
     if rman.inhibit():
       continue
 
-    let fetches = rman.getMissingBlobs()
-    if fetches.len > 0:
-      debug "Requesting detected missing blobs", blobs = shortLog(fetches)
+    let missingBlobIds = rman.getMissingBlobs()
+    if missingBlobIds.len == 0:
+      continue
+
+    var blobIds: seq[BlobIdentifier]
+    if rman.blobLoader == nil:
+      blobIds = missingBlobIds
+    else:
+      var
+        blockRoots: seq[Eth2Digest]
+        curRoot: Eth2Digest
+      for blobId in missingBlobIds:
+        if blobId.block_root != curRoot:
+          curRoot = blobId.block_root
+          blockRoots.add curRoot
+        let blob_sidecar = rman.blobLoader(blobId).valueOr:
+          blobIds.add blobId
+          if blockRoots.len > 0 and blockRoots[^1] == curRoot:
+            # A blob is missing, remove from list of fully available blocks
+            discard blockRoots.pop()
+          continue
+        debug "Loaded orphaned blob from storage", blobId
+        rman.blobQuarantine[].put(blob_sidecar)
+      var verifiers = newSeqOfCap[
+        Future[Result[void, VerifierError]]
+          .Raising([CancelledError])](blockRoots.len)
+      for blockRoot in blockRoots:
+        let blck = rman.quarantine[].popBlobless(blockRoot).valueOr:
+          continue
+        verifiers.add rman.blockVerifier(blck, maybeFinalized = false)
+      try:
+        await allFutures(verifiers)
+      except CancelledError as exc:
+        var futs = newSeqOfCap[Future[void].Raising([])](verifiers.len)
+        for verifier in verifiers:
+          futs.add verifier.cancelAndWait()
+        await noCancel allFutures(futs)
+        raise exc
+
+    if blobIds.len > 0:
+      debug "Requesting detected missing blobs", blobs = shortLog(blobIds)
       let start = SyncMoment.now(0)
 
-      var workers: array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])]
+      var workers:
+        array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])]
 
       for i in 0 ..< PARALLEL_REQUESTS:
-        workers[i] = rman.fetchBlobsFromNetwork(fetches)
+        workers[i] = rman.fetchBlobsFromNetwork(blobIds)
 
       await allFutures(workers)
 
-      let finish = SyncMoment.now(uint64(len(fetches)))
+      let finish = SyncMoment.now(uint64(len(blobIds)))
 
       debug "Request manager blob tick",
-            blobs_count = len(fetches),
+            blobs_count = len(blobIds),
             sync_speed = speed(start, finish)
 
 proc start*(rman: var RequestManager) =
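
A closing note on the subtlest part of the new blob loop: `blockRoots` tracks
blocks whose blobs were all found locally. Each root is added optimistically
when its first blob ID is seen and evicted again on the first local miss, so
only fully available blocks are handed to the verifier. A standalone sketch of
that bookkeeping (simplified types, not project code):

type BlobId = tuple[blockRoot: string, index: int]

proc splitLocal(ids: seq[BlobId],  # assumed grouped by block root, as above
                haveLocally: proc(id: BlobId): bool):
    tuple[fullyAvailable: seq[string], stillMissing: seq[BlobId]] =
  var curRoot = ""
  for id in ids:
    if id.blockRoot != curRoot:
      curRoot = id.blockRoot
      result.fullyAvailable.add curRoot      # optimistic: assume all blobs on disk
    if not haveLocally(id):
      result.stillMissing.add id             # must be fetched from peers
      if result.fullyAvailable.len > 0 and result.fullyAvailable[^1] == curRoot:
        discard result.fullyAvailable.pop()  # first miss evicts the whole block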