Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 191 additions & 30 deletions lib/mobility-core/src/Kernel/Utils/SlidingWindowCounters.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ module Kernel.Utils.SlidingWindowCounters
getLatestRatio,
getCurrentWindowCount,
getCurrentWindowValues,
getCurrentWindowValuesWithTime,
makeSlidingWindowKey,
splitOnPeriodGranuality,
incrementPeriod,
Expand All @@ -32,13 +33,17 @@ module Kernel.Utils.SlidingWindowCounters
where

import qualified Control.Monad.Extra as CME
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import qualified Data.Text as T
import Data.Time
import qualified EulerHS.Language as L
import EulerHS.Prelude hiding (id)
import qualified Kernel.Storage.Hedis as Redis
import Kernel.Types.SlidingWindowCounters
import Kernel.Types.Time (MonadClock)
import Kernel.Types.TryException
import Kernel.Utils.Time (measureDuration)

-- ========================== Helper functions ==========================

Expand Down Expand Up @@ -104,18 +109,8 @@ makeQuickAccessWindowCountKey :: Text -> Text
makeQuickAccessWindowCountKey = (<> "-sliding-window-result")

makeTimeBasedKey :: PeriodType -> Text -> UTCTime -> Text
makeTimeBasedKey periodType oldKey = do
(\periodString -> oldKey <> "-" <> periodString)
. T.pack
. formatTime
defaultTimeLocale
( case periodType of
Minutes -> "%Y-%m-%d-%H-%M"
Hours -> "%Y-%m-%d-%H"
Days -> "%Y-%m-%d"
Months -> "%Y-%m"
Years -> "%Y"
)
makeTimeBasedKey periodType oldKey utcTime =
oldKey <> "-" <> periodBucketId periodType utcTime

-- gives time in second
convertPeriodTypeToSeconds :: PeriodType -> Integer
Expand All @@ -130,9 +125,6 @@ convertPeriodTypeToSeconds periodType =
getkeysForLastPeriods :: SlidingWindowOptions -> UTCTime -> (UTCTime -> Text) -> [Text]
getkeysForLastPeriods swo utcTime keyModifier = map (makeSWKeyForTime swo utcTime keyModifier) [0 .. swo.period - 1]

getkeysUptoThisPeriod :: SlidingWindowOptions -> UTCTime -> (UTCTime -> Text) -> Integer -> [Text]
getkeysUptoThisPeriod swo utcTime keyModifier uptoPeriod = map (makeSWKeyForTime swo utcTime keyModifier) [0 .. uptoPeriod - 1]

makeSWKeyForTime :: SlidingWindowOptions -> UTCTime -> (UTCTime -> Text) -> Integer -> Text
makeSWKeyForTime SlidingWindowOptions {..} utcTime keyModifier periodMagnitude =
keyModifier
Expand All @@ -143,6 +135,49 @@ makeSWKeyForTime SlidingWindowOptions {..} utcTime keyModifier periodMagnitude =
getTimeUnit :: Integer
getTimeUnit = -1 * periodMagnitude * convertPeriodTypeToSeconds periodType

-- ==================== Read-side combined-cache helpers ====================

mkCombinedKey :: Text -> Text
mkCombinedKey k = k <> "-sw-combined"

mkReadLockKey :: Text -> Text
mkReadLockKey k = "SW-READ-LOCK-" <> k

periodBucketId :: PeriodType -> UTCTime -> Text
periodBucketId periodType utcTime =
T.pack $
formatTime defaultTimeLocale fmt utcTime
where
fmt = case periodType of
Minutes -> "%Y-%m-%d-%H-%M"
Hours -> "%Y-%m-%d-%H"
Days -> "%Y-%m-%d"
Months -> "%Y-%m"
Years -> "%Y"

combinedTtl :: SlidingWindowOptions -> Integer
combinedTtl SlidingWindowOptions {..} =
(period + 1) * convertPeriodTypeToSeconds periodType

decrementPeriod :: PeriodType -> UTCTime -> UTCTime
decrementPeriod periodType t@(UTCTime date time) = case periodType of
Minutes -> addUTCTime (-60) t
Hours -> addUTCTime (-3600) t
Days -> UTCTime (addDays (-1) date) time
Months -> UTCTime (addGregorianMonthsClip (-1) date) time
Years -> UTCTime (addGregorianYearsClip (-1) date) time

inWindowBucketTimes :: PeriodType -> UTCTime -> Integer -> [UTCTime]
inWindowBucketTimes pt now n =
take (fromInteger n) (iterate (decrementPeriod pt) now)

-- | All buckets covered by the window, head = current period (today),
-- oldest last. Pairs each bucket-id with its UTCTime so callers don't
-- have to re-derive the time from the id.
windowBuckets :: SlidingWindowOptions -> UTCTime -> [(UTCTime, Text)]
windowBuckets SlidingWindowOptions {..} now =
map (\t -> (t, periodBucketId periodType t)) (inWindowBucketTimes periodType now period)

-- ========================== Sliding Window Counters ==========================

incrementWindowCount ::
Expand Down Expand Up @@ -208,10 +243,15 @@ incrementByValueImpl mbTimeStamp val getOutOfWindowKey getStoredResultKey getWin
now <- L.runIO getCurrentTime
let utcTime = fromMaybe now mbTimeStamp
finalKey = getWindowKey periodType key utcTime
expirationTime = (period + 1) * convertPeriodTypeToSeconds periodType
bid = periodBucketId periodType utcTime
isToday = bid == periodBucketId periodType now
Redis.whenWithLockRedis (makeCachingLockKey key) 10 . void $ cacheTheCounts now val getOutOfWindowKey getStoredResultKey getWindowKey key swo
let expirationTime = (period + 1) * convertPeriodTypeToSeconds periodType
void $ Redis.incrby finalKey val
Redis.expire finalKey $ fromIntegral expirationTime
unless isToday $
Redis.withWaitAndLockRedis (mkReadLockKey key) 10 5000 $
Redis.hDel (mkCombinedKey key) [bid]
Comment thread
coderabbitai[bot] marked this conversation as resolved.

decrementWindowCount ::
( L.MonadFlow m,
Expand Down Expand Up @@ -253,10 +293,15 @@ decrementByValueImpl mbTimeStamp val getOutOfWindowKey getStoredResultKey getWin
now <- L.runIO getCurrentTime
let utcTime = fromMaybe now mbTimeStamp
finalKey = getWindowKey periodType key utcTime
expirationTime = (period + 1) * convertPeriodTypeToSeconds periodType
bid = periodBucketId periodType utcTime
isToday = bid == periodBucketId periodType now
Redis.whenWithLockRedis (makeCachingLockKey key) 10 . void $ cacheTheCounts now val getOutOfWindowKey getStoredResultKey getWindowKey key swo
let expirationTime = (period + 1) * convertPeriodTypeToSeconds periodType
void $ Redis.decrby finalKey val
Redis.expire finalKey $ fromIntegral expirationTime
unless isToday $
Redis.withWaitAndLockRedis (mkReadLockKey key) 10 5000 $
Redis.hDel (mkCombinedKey key) [bid]
Comment thread
coderabbitai[bot] marked this conversation as resolved.

-- the cached value would stay correct for current and current + 1 peroid in any given periodType so keeping the expiry as end of (current + 1) periodType from now
cacheTheCounts ::
Expand Down Expand Up @@ -370,49 +415,164 @@ getNumDenFromCache postiveCaseKey totalCaseKey =
totalCases <- MaybeT $ getCountsFromCache totalCaseKey
pure (positiveCases, totalCases)

-- | Sum of the current window. Implemented on top of getCurrentWindowValues
-- so it benefits from the combined-cache fast path automatically.
getCurrentWindowCount ::
( L.MonadFlow m,
MonadClock m,
Redis.HedisFlow m r,
TryException m
) =>
Text ->
SlidingWindowOptions ->
m Integer
getCurrentWindowCount key swo = do
now <- L.runIO getCurrentTime
mbCountFromCache <- getCountsFromCache key
case mbCountFromCache of
Just count -> pure count
Nothing -> Redis.withWaitAndLockRedis (makeCachingLockKey key) 10 20 $ cacheTheCounts now 0 makeSWKeyForTime makeQuickAccessWindowCountKey makeSlidingWindowKey key swo
values <- getCurrentWindowValues key swo
pure $ sum (catMaybes (values :: [Maybe Integer]))

-- | Read the last `nPeriod` bucket values from the combined cache (with
-- today's value spliced in from the live per-period key). Index 0 is the
-- current period; index i is i periods ago. See module-top comment for the
-- cache semantics.
getCurrentWindowValuesUptoLast ::
( L.MonadFlow m,
MonadClock m,
Redis.HedisFlow m r,
FromJSON a,
ToJSON a,
Num a,
TryException m
) =>
Integer ->
Text ->
SlidingWindowOptions ->
m [Maybe a]
getCurrentWindowValuesUptoLast nPeriod key swo = do
utcTime <- L.runIO getCurrentTime
let keysToFetch = getkeysUptoThisPeriod swo utcTime (makeSlidingWindowKey (periodType swo) key) nPeriod
mapM Redis.get keysToFetch
getCurrentWindowValuesUptoLast nPeriod key swo =
take (fromInteger nPeriod) . map snd <$> readWindowFromCombinedCache key swo

-- | Read all `period` bucket values from the combined cache.
getCurrentWindowValues ::
( L.MonadFlow m,
MonadClock m,
Redis.HedisFlow m r,
FromJSON a,
ToJSON a,
Num a,
TryException m
) =>
Text ->
SlidingWindowOptions ->
m [Maybe a]
getCurrentWindowValues key swo = do
utcTime <- L.runIO getCurrentTime
let keysToFetch = getkeysForLastPeriods swo utcTime $ makeSlidingWindowKey (periodType swo) key
mapM Redis.get keysToFetch
getCurrentWindowValues key swo = map snd <$> readWindowFromCombinedCache key swo

-- | Same as 'getCurrentWindowValues' but each value is paired with the
-- 'UTCTime' of its bucket. Use this when the caller needs to label values
-- with their period (e.g. render dates on the frontend) so the values
-- don't have to be zipped against a separately-computed date list — the
-- two could drift across a midnight boundary or any ordering change.
getCurrentWindowValuesWithTime ::
( L.MonadFlow m,
MonadClock m,
Redis.HedisFlow m r,
FromJSON a,
ToJSON a,
Num a,
TryException m
) =>
Text ->
SlidingWindowOptions ->
m [(UTCTime, Maybe a)]
getCurrentWindowValuesWithTime = readWindowFromCombinedCache

-- | Combined-cache read path: build the in-window bucket list, serve
-- historical buckets from the combined HASH (repairing it if stale),
-- splice today live from the per-period key, then return values paired
-- with the 'UTCTime' of each bucket (head = current period, oldest last).
readWindowFromCombinedCache ::
( L.MonadFlow m,
MonadClock m,
Redis.HedisFlow m r,
FromJSON a,
ToJSON a,
Num a,
TryException m
) =>
Text ->
SlidingWindowOptions ->
m [(UTCTime, Maybe a)]
readWindowFromCombinedCache key swo@SlidingWindowOptions {..} = do
now <- L.runIO getCurrentTime
let buckets = windowBuckets swo now
(todayBid, historicalBuckets) = case buckets of
(_, b) : rest -> (b, rest)
[] -> (periodBucketId periodType now, []) -- defensive; period >= 1 in practice
L.logDebug @Text "SlidingWindowCounters" $ "[lock-before] key=" <> key
(cached, durationMs) <-
measureDuration $
Redis.withWaitAndLockRedis (mkReadLockKey key) 10 5000 $
repairCombinedCacheIfStale key swo historicalBuckets
L.logDebug @Text "SlidingWindowCounters" $
"[lock-after] key=" <> key
<> " took_ms="
<> T.pack (show durationMs)
mbToday <- Redis.get (makeSlidingWindowKey periodType key now)
let merged = maybe cached (\v -> Map.insert todayBid v cached) mbToday
pure [(t, Map.lookup bid merged) | (t, bid) <- buckets]

repairCombinedCacheIfStale ::
( L.MonadFlow m,
Redis.HedisFlow m r,
FromJSON a,
ToJSON a,
Num a,
TryException m
) =>
Text ->
SlidingWindowOptions ->
[(UTCTime, Text)] ->
m (Map.Map Text a)
repairCombinedCacheIfStale key swo@SlidingWindowOptions {..} historicalBuckets = do
let combinedKey = mkCombinedKey key
historicalIds = map snd historicalBuckets
windowSet = Set.fromList historicalIds
ttl = combinedTtl swo
cur <- Map.fromList <$> Redis.hGetAll combinedKey
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is already fetched once in repairCombinedCacheIfStale at line 521. also you already know the missing keys already, use that instead of whole historicalBuckets in this function.

please reuse things.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No bro, if two threads try to do the same thing then it will be a redunduct process right?
we have kept the lock after checking.
Okay bro I will move the lock to repairCombinedCacheIfStale function only

let stale = filter (`Set.notMember` windowSet) (Map.keys cur)
fetched <- fmap catMaybes . forM historicalBuckets $ \(t, bid) ->
if Map.member bid cur
then pure Nothing
else bootstrapBucket key periodType ttl (t, bid)
unless (null stale) $ do
Redis.hDel combinedKey stale
Redis.expire combinedKey (fromInteger ttl)
let dropped = foldr Map.delete cur stale
added = foldr (uncurry Map.insert) dropped fetched
pure added

bootstrapBucket ::
( L.MonadFlow m,
Redis.HedisFlow m r,
FromJSON a,
ToJSON a,
Num a,
TryException m
) =>
Text ->
PeriodType ->
Integer ->
(UTCTime, Text) ->
m (Maybe (Text, a))
bootstrapBucket key periodType ttl (t, bid) = do
let combinedKey = mkCombinedKey key
oldKey = makeSlidingWindowKey periodType key t
mv <- Redis.get oldKey
case mv of
Just v -> do
Redis.hSetExp combinedKey bid v (fromInteger ttl)
pure $ Just (bid, v)
Nothing -> do
Redis.hSetExp combinedKey bid (0 :: Integer) (fromInteger ttl)
pure $ Just (bid, 0)

deleteCurrentWindowValues ::
( L.MonadFlow m,
Expand All @@ -426,3 +586,4 @@ deleteCurrentWindowValues key swo = do
utcTime <- L.runIO getCurrentTime
let keysToDel = getkeysForLastPeriods swo utcTime $ makeSlidingWindowKey (periodType swo) key
mapM_ Redis.del keysToDel
Redis.del (mkCombinedKey key)
Loading