Better async IMO

This commit is contained in:
Tanguy 2022-09-05 10:32:26 +02:00
parent 9b855a176c
commit a4263867cc
No known key found for this signature in database
GPG Key ID: 7DD8EC6B6CE6C45E
2 changed files with 28 additions and 55 deletions

View File

@ -96,7 +96,7 @@ proc nonCancellableSend(
ws: WSSession,
data: seq[byte] = @[],
opcode: Opcode): Future[void]
{.async, raises: [Defect, WSClosedError].} =
{.async.} =
## Send a frame
##
@ -127,77 +127,50 @@ proc nonCancellableSend(
proc doSend(
ws: WSSession,
data: seq[byte] = @[],
opcode: Opcode,
fut: Future[void]): Future[void]
{.raises: [Defect, WSClosedError].} =
if fut.cancelled:
let sendFut = newFuture[void]("doSend")
sendFut.cancel()
return sendFut
opcode: Opcode
): Future[void] =
let
retFut = newFuture[void]("doSend")
sendFut = ws.nonCancellableSend(data, opcode)
let sendFut = ws.nonCancellableSend(data, opcode)
proc handleSend {.async.} =
try:
await sendFut
retFut.complete()
except CatchableError as exc:
retFut.fail(exc)
proc handleSent(future: pointer) =
if fut.finished:
return
if sendFut.failed:
fut.fail(sendFut.error)
elif sendFut.cancelled:
fut.cancel()
else:
fut.complete()
asyncSpawn handleSend()
retFut
sendFut.addCallback(handleSent)
sendFut
proc continueSending(ws: WSSession) {.gcsafe.} =
proc sendLoop(ws: WSSession) {.gcsafe, async.} =
while ws.sendQueue.len > 0:
let
task = ws.sendQueue.popFirst()
fut = task.fut
sendFut =
try:
ws.doSend(task.data, task.opcode, fut)
except WSClosedError as exc:
fut.fail(exc)
continue
let task = ws.sendQueue.popFirst()
ws.sendFut = sendFut
proc handleSent(future: pointer) =
ws.sendFut = nil
ws.continueSending()
ws.sendFut.addCallback(handleSent)
break
try:
await ws.doSend(task.data, task.opcode)
task.fut.complete()
except CatchableError as exc:
task.fut.fail(exc)
proc send*(
ws: WSSession,
data: seq[byte] = @[],
opcode: Opcode): Future[void]
{.raises: [Defect, WSClosedError].} =
let fut = newFuture[void]("send")
opcode: Opcode): Future[void] =
if opcode.isControl:
# Control frames (see Section 5.5) MAY be injected in the middle of
# a fragmented message. Control frames themselves MUST NOT be
# fragmented.
# See RFC 6455 Section 5.4 Fragmentation
discard ws.doSend(data, opcode, fut)
return fut
return ws.doSend(data, opcode)
if ws.sendFut != nil:
ws.sendQueue.addLast (data: data, opcode: opcode, fut: fut)
return fut
let fut = newFuture[void]("send")
ws.sendFut = ws.doSend(data, opcode, fut)
ws.sendQueue.addLast (data: data, opcode: opcode, fut: fut)
proc handleSent(future: pointer) =
ws.sendFut = nil
ws.continueSending()
if isNil(ws.sendLoop) or ws.sendLoop.finished:
ws.sendLoop = sendLoop(ws)
ws.sendFut.addCallback(handleSent)
fut
proc send*(

View File

@ -106,7 +106,7 @@ type
# fragments of another message unless an extension has been
# negotiated that can interpret the interleaving.
# See RFC 6455 Section 5.4 Fragmentation
sendFut*: Future[void]
sendLoop*: Future[void]
sendQueue*: Deque[
tuple[data: seq[byte], opcode: Opcode, fut: Future[void]]]