33{-# LANGUAGE KindSignatures #-}
44{-# LANGUAGE NamedFieldPuns #-}
55{-# LANGUAGE ScopedTypeVariables #-}
6+ {-# LANGUAGE BangPatterns #-}
67
78module Ouroboros.Network.Protocol.ObjectDiffusion.Direct (directPipelined ) where
89
@@ -12,8 +13,25 @@ import Network.TypedProtocol.Proofs (Queue (..), enqueue)
1213
1314import Ouroboros.Network.Protocol.ObjectDiffusion.Inbound
1415import Ouroboros.Network.Protocol.ObjectDiffusion.Outbound
15- import Ouroboros.Network.Protocol.ObjectDiffusion.Type (BlockingReplyList (.. ),
16- SingBlockingStyle (.. ))
16+ import Ouroboros.Network.Protocol.ObjectDiffusion.Type (
17+ BlockingReplyList (.. ),
18+ SingBlockingStyle (.. ),
19+ NumObjectIdsReq (.. ),
20+ NumObjectIdsAck (.. ),
21+ )
22+
23+ import Control.Exception (assert )
24+ import Data.List.NonEmpty (NonEmpty (.. ))
25+ import Data.Sequence.Strict (StrictSeq )
26+ import Data.Map.Strict (Map )
27+ import Data.Word (Word16 )
28+ import Control.Tracer (Tracer , traceWith )
29+ import qualified Data.Sequence.Strict as Seq
30+ import qualified Data.Map.Strict as Map
31+ import qualified Data.Foldable as Foldable
32+ import Control.Monad (when )
33+ import Data.Set (Set )
34+ import qualified Data.Set as Set
1735
1836
1937directPipelined
@@ -64,3 +82,297 @@ directPipelined (ObjectDiffusionOutbound mOutbound)
6482 directSender q inbound' outbound
6583
6684 directSender EmptyQ (SendMsgDone v) _outbound = pure v
85+
86+ --
87+ -- Client implementation
88+ --
89+
90+ data TraceObjectDiffusionDirect objectId object =
91+ EventRecvMsgRequestObjectIds
92+ (StrictSeq objectId )
93+ (Map objectId object )
94+ [object ]
95+ NumObjectIdsAck
96+ NumObjectIdsReq
97+ | EventRecvMsgRequestObjects
98+ (StrictSeq objectId )
99+ (Map objectId object )
100+ [object ]
101+ [objectId ]
102+ deriving Show
103+
104+
105+ objectDiffusionClient
106+ :: forall objectId object m .
107+ (Ord objectId , Show objectId , Monad m )
108+ => Tracer m (TraceObjectDiffusionDirect objectId object )
109+ -> (object -> objectId )
110+ -> Word16 -- ^ Maximum number of unacknowledged object IDs allowed
111+ -> [object ]
112+ -> ObjectDiffusionOutbound objectId object m ()
113+ objectDiffusionClient tracer objectId maxUnacked =
114+ ObjectDiffusionOutbound . pure . client Seq. empty Map. empty
115+ where
116+ client :: StrictSeq objectId
117+ -> Map objectId object
118+ -> [object ]
119+ -> OutboundStIdle objectId object m ()
120+ client ! unackedSeq ! unackedMap remainingObjects =
121+ assert invariant
122+ OutboundStIdle {
123+ recvMsgRequestObjectIds,
124+ recvMsgRequestObjects,
125+ recvMsgDone
126+ }
127+ where
128+ invariant =
129+ Map. isSubmapOfBy
130+ (\ _ _ -> True )
131+ unackedMap
132+ (Map. fromList [ (x, () ) | x <- Foldable. toList unackedSeq ])
133+
134+ recvMsgRequestObjectIds :: forall blocking .
135+ SingBlockingStyle blocking
136+ -> NumObjectIdsAck
137+ -> NumObjectIdsReq
138+ -> m (OutboundStObjectIds blocking objectId object m () )
139+ recvMsgRequestObjectIds blocking ackNo reqNo = do
140+ traceWith tracer (EventRecvMsgRequestObjectIds unackedSeq unackedMap
141+ remainingObjects ackNo reqNo)
142+
143+ when (ackNo > fromIntegral (Seq. length unackedSeq)) $
144+ error $ " objectDiffusionClient.recvMsgRequestObjectIds: "
145+ <> " peer acknowledged more object IDs than possible"
146+
147+ when ( fromIntegral (Seq. length unackedSeq)
148+ - getNumObjectIdsAck ackNo
149+ + getNumObjectIdsReq reqNo
150+ > maxUnacked) $
151+ error $ " objectDiffusionClient.recvMsgRequestObjectIds: "
152+ <> " peer requested more object IDs than permitted"
153+
154+ let unackedSeq' = Seq. drop (fromIntegral ackNo) unackedSeq
155+ unackedMap' = Foldable. foldl' (flip Map. delete) unackedMap
156+ (Seq. take (fromIntegral ackNo) unackedSeq)
157+
158+ case blocking of
159+ SingBlocking | not (Seq. null unackedSeq')
160+ -> error $ " objectDiffusionClient.recvMsgRequestObjectIds: "
161+ <> " peer made a blocking request for more object IDs when "
162+ <> " there are still unacknowledged object IDs."
163+ _ -> return ()
164+
165+ -- This example is eager, it always provides as many as asked for,
166+ -- up to the number remaining available.
167+ let unackedExtra = take (fromIntegral reqNo) remainingObjects
168+ unackedSeq'' = unackedSeq'
169+ <> Seq. fromList (map objectId unackedExtra)
170+ unackedMap'' = unackedMap'
171+ <> Map. fromList [ (objectId obj, obj)
172+ | obj <- unackedExtra ]
173+ remainingObjects' = drop (fromIntegral reqNo) remainingObjects
174+
175+ return $! case (blocking, unackedExtra) of
176+ (SingBlocking , [] ) ->
177+ error " TODO"
178+
179+ (SingBlocking , obj: objs) ->
180+ SendMsgReplyObjectIds
181+ (BlockingReply (fmap objectId (obj :| objs)))
182+ (client unackedSeq'' unackedMap'' remainingObjects')
183+
184+ (SingNonBlocking , objs) ->
185+ SendMsgReplyObjectIds
186+ (NonBlockingReply (fmap objectId objs))
187+ (client unackedSeq'' unackedMap'' remainingObjects')
188+
189+ recvMsgRequestObjects :: [objectId ]
190+ -> m (OutboundStObjects objectId object m () )
191+ recvMsgRequestObjects objectIds = do
192+ traceWith tracer (EventRecvMsgRequestObjects unackedSeq unackedMap
193+ remainingObjects objectIds)
194+ case [ oid | oid <- objectIds, oid `Map.notMember` unackedMap ] of
195+ [] -> pure (SendMsgReplyObjects objects client')
196+ where
197+ objects = fmap (unackedMap Map. ! ) objectIds
198+ client' = client unackedSeq unackedMap' remainingObjects
199+ unackedMap' = foldr Map. delete unackedMap objectIds
200+ -- Here we remove from the map, while the seq stays unchanged.
201+ -- This enforces that each object can be requested at most once.
202+
203+ missing -> error $ " txSubmissionClientConst.recvMsgRequestTxs: "
204+ <> " requested missing TxIds: " ++ show missing
205+
206+ recvMsgDone :: m ()
207+ recvMsgDone = pure ()
208+
209+
210+ --
211+ -- Server implementation
212+ --
213+
214+ data ServerState objectId object = ServerState {
215+ requestedObjectIdsInFlight :: NumObjectIdsReq ,
216+ unacknowledgedObjectIds :: StrictSeq objectId ,
217+ availableObjectIds :: Set objectId ,
218+ bufferedObjects :: Map objectId (Maybe object ),
219+ numObjectsToAcknowledge :: NumObjectIdsAck
220+ }
221+ deriving Show
222+
223+ initialServerState :: ServerState txid tx
224+ initialServerState = ServerState 0 Seq. empty Set. empty Map. empty 0
225+
226+ data TraceEventServer txid tx =
227+ EventRequestTxIdsBlocking
228+ (ServerState txid tx )
229+ NumObjectIdsAck
230+ NumObjectIdsReq
231+ | EventRequestTxIdsPipelined
232+ (ServerState txid tx )
233+ NumObjectIdsAck
234+ NumObjectIdsReq
235+ | EventRequestTxsPipelined
236+ (ServerState txid tx )
237+ [txid ]
238+
239+
240+ objectDiffusionServer
241+ :: forall objectId object m .
242+ (Ord objectId , Monad m )
243+ => Tracer m (TraceEventServer objectId object )
244+ -> (object -> objectId )
245+ -> Word16 -- ^ Maximum number of unacknowledged object IDs allowed
246+ -> Word16 -- ^ Maximum number of object IDs to request in any one go
247+ -> Word16 -- ^ Maximum number of objects to request in any one go
248+ -> ObjectDiffusionInboundPipelined objectId object m [object ]
249+ objectDiffusionServer tracer txId maxUnackedTxIds maxTxIdsReq maxTxsReq =
250+ ObjectDiffusionInboundPipelined (serverIdle [] Zero initialServerState)
251+ where
252+ serverIdle :: forall (n :: N ).
253+ [object ]
254+ -> Nat n
255+ -> ServerState objectId object
256+ -> InboundStIdle Z objectId object m [object ]
257+ serverIdle accum Zero st =
258+ undefined
259+
260+ canRequestMoreObjects :: ServerState k tx -> Bool
261+ canRequestMoreObjects st =
262+ not (Set. null (availableObjectIds st))
263+
264+
265+ handleReply :: forall (n :: N ).
266+ [object ]
267+ -> Nat n
268+ -> ServerState objectId object
269+ -> Collect objectId object
270+ -> InboundStIdle n objectId object m [object ]
271+ handleReply accum n st (CollectObjectIds reqNo objectIds) =
272+ undefined
273+ -- -- Upon receiving a batch of new txids we extend our available set,
274+ -- -- and extended the unacknowledged sequence.
275+ -- serverIdle accum n st {
276+ -- requestedTxIdsInFlight = requestedTxIdsInFlight st - reqNo,
277+ -- unacknowledgedTxIds = unacknowledgedTxIds st
278+ -- <> Seq.fromList (map fst txids),
279+ -- availableTxids = availableTxids st
280+ -- <> Map.fromList txids
281+ -- }
282+ --
283+ -- handleReply accum n st (CollectTxs txids txs) =
284+ -- -- When we receive a batch of transactions, in general we get a subset of
285+ -- -- those that we asked for, with the remainder now deemed unnecessary.
286+ -- -- But we still have to acknowledge the txids we were given. This combined
287+ -- -- with the fact that we request txs out of order means our bufferedTxs
288+ -- -- has to track all the txids we asked for, even though not all have
289+ -- -- replies.
290+ -- --
291+ -- -- We have to update the unacknowledgedTxIds here eagerly and not delay it
292+ -- -- to serverReqTxs, otherwise we could end up blocking in serverIdle on
293+ -- -- more pipelined results rather than being able to move on.
294+ -- return $ serverIdle accum' n st {
295+ -- bufferedTxs = bufferedTxs'',
296+ -- unacknowledgedTxIds = unacknowledgedTxIds',
297+ -- numTxsToAcknowledge = numTxsToAcknowledge st
298+ -- + fromIntegral (Seq.length acknowledgedTxIds)
299+ -- }
300+ -- where
301+ -- txIdsRequestedWithTxsReceived :: [(txid, Maybe tx)]
302+ -- txIdsRequestedWithTxsReceived =
303+ -- [ (txid, mbTx)
304+ -- | let txsMap :: Map txid tx
305+ -- txsMap = Map.fromList [ (txId tx, tx) | tx <- txs ]
306+ -- , txid <- Map.keys txids
307+ -- , let !mbTx = Map.lookup txid txsMap
308+ -- ]
309+ --
310+ -- bufferedTxs' = bufferedTxs st
311+ -- <> Map.fromList txIdsRequestedWithTxsReceived
312+ --
313+ -- -- Check if having received more txs we can now confirm any (in strict
314+ -- -- order in the unacknowledgedTxIds sequence).
315+ -- (acknowledgedTxIds, unacknowledgedTxIds') =
316+ -- Seq.spanl (`Map.member` bufferedTxs') (unacknowledgedTxIds st)
317+ --
318+ -- -- If so we can add the acknowledged txs to our accumulating result
319+ -- accum' = accum
320+ -- ++ foldr (\txid r -> maybe r (:r) (bufferedTxs' Map.! txid)) []
321+ -- acknowledgedTxIds
322+ --
323+ -- -- And remove acknowledged txs from our buffer
324+ -- bufferedTxs'' = Foldable.foldl' (flip Map.delete) bufferedTxs' acknowledgedTxIds
325+ --
326+
327+ serverReqObjects :: forall (n :: N ).
328+ [object ]
329+ -> Nat n
330+ -> ServerState objectId object
331+ -> InboundStIdle n objectId object m [object ]
332+ serverReqObjects accum n st =
333+ undefined
334+ -- SendMsgRequestTxsPipelined
335+ -- txsToRequest
336+ -- (do traceWith tracer (EventRequestTxsPipelined st (Map.keys txsToRequest))
337+ -- pure $ serverReqTxIds accum (Succ n) st {
338+ -- availableTxids = availableTxids'
339+ -- })
340+ -- where
341+ -- -- This implementation is deliberately naive, we pick in an arbitrary
342+ -- -- order and up to a fixed limit. The real thing should take account of
343+ -- -- the expected transaction sizes, to pipeline well and keep within
344+ -- -- pipelining byte limits.
345+ -- (txsToRequest, availableTxids') =
346+ -- Map.splitAt (fromIntegral maxTxToRequest) (availableTxids st)
347+
348+
349+ serverReqObjectIds :: forall (n :: N ).
350+ [object ]
351+ -> Nat n
352+ -> ServerState objectId object
353+ -> InboundStIdle n objectId object m [object ]
354+ serverReqObjectIds accum n st =
355+ undefined
356+ -- | numTxIdsToRequest > 0
357+ -- = SendMsgRequestTxIdsPipelined
358+ -- (numTxsToAcknowledge st)
359+ -- numTxIdsToRequest
360+ -- (do traceWith tracer (EventRequestTxIdsPipelined st (numTxsToAcknowledge st) numTxIdsToRequest)
361+ -- pure $ serverIdle accum (Succ n) st {
362+ -- requestedTxIdsInFlight = requestedTxIdsInFlight st
363+ -- + numTxIdsToRequest,
364+ -- numTxsToAcknowledge = 0
365+ -- })
366+ --
367+ -- | otherwise
368+ -- = serverIdle accum n st
369+ -- where
370+ -- -- This definition is justified by the fact that the
371+ -- -- 'numTxsToAcknowledge' are not included in the 'unacknowledgedTxIds'.
372+ -- numTxIdsToRequest =
373+ -- NumTxIdsToReq $
374+ -- (maxUnacked
375+ -- - fromIntegral (Seq.length (unacknowledgedTxIds st))
376+ -- - getNumTxIdsToReq (requestedTxIdsInFlight st))
377+ -- `min` maxTxIdsToRequest
378+ --
0 commit comments