diff --git a/lib/mobility-core/src/Kernel/Utils/SlidingWindowCounters.hs b/lib/mobility-core/src/Kernel/Utils/SlidingWindowCounters.hs index 3b1fbfede..7b5235e2c 100644 --- a/lib/mobility-core/src/Kernel/Utils/SlidingWindowCounters.hs +++ b/lib/mobility-core/src/Kernel/Utils/SlidingWindowCounters.hs @@ -23,6 +23,7 @@ module Kernel.Utils.SlidingWindowCounters getLatestRatio, getCurrentWindowCount, getCurrentWindowValues, + getCurrentWindowValuesWithTime, makeSlidingWindowKey, splitOnPeriodGranuality, incrementPeriod, @@ -32,13 +33,17 @@ module Kernel.Utils.SlidingWindowCounters where import qualified Control.Monad.Extra as CME +import qualified Data.Map.Strict as Map +import qualified Data.Set as Set import qualified Data.Text as T import Data.Time import qualified EulerHS.Language as L import EulerHS.Prelude hiding (id) import qualified Kernel.Storage.Hedis as Redis import Kernel.Types.SlidingWindowCounters +import Kernel.Types.Time (MonadClock) import Kernel.Types.TryException +import Kernel.Utils.Time (measureDuration) -- ========================== Helper functions ========================== @@ -104,18 +109,8 @@ makeQuickAccessWindowCountKey :: Text -> Text makeQuickAccessWindowCountKey = (<> "-sliding-window-result") makeTimeBasedKey :: PeriodType -> Text -> UTCTime -> Text -makeTimeBasedKey periodType oldKey = do - (\periodString -> oldKey <> "-" <> periodString) - . T.pack - . formatTime - defaultTimeLocale - ( case periodType of - Minutes -> "%Y-%m-%d-%H-%M" - Hours -> "%Y-%m-%d-%H" - Days -> "%Y-%m-%d" - Months -> "%Y-%m" - Years -> "%Y" - ) +makeTimeBasedKey periodType oldKey utcTime = + oldKey <> "-" <> periodBucketId periodType utcTime -- gives time in second convertPeriodTypeToSeconds :: PeriodType -> Integer @@ -130,9 +125,6 @@ convertPeriodTypeToSeconds periodType = getkeysForLastPeriods :: SlidingWindowOptions -> UTCTime -> (UTCTime -> Text) -> [Text] getkeysForLastPeriods swo utcTime keyModifier = map (makeSWKeyForTime swo utcTime keyModifier) [0 .. swo.period - 1] -getkeysUptoThisPeriod :: SlidingWindowOptions -> UTCTime -> (UTCTime -> Text) -> Integer -> [Text] -getkeysUptoThisPeriod swo utcTime keyModifier uptoPeriod = map (makeSWKeyForTime swo utcTime keyModifier) [0 .. uptoPeriod - 1] - makeSWKeyForTime :: SlidingWindowOptions -> UTCTime -> (UTCTime -> Text) -> Integer -> Text makeSWKeyForTime SlidingWindowOptions {..} utcTime keyModifier periodMagnitude = keyModifier @@ -143,6 +135,49 @@ makeSWKeyForTime SlidingWindowOptions {..} utcTime keyModifier periodMagnitude = getTimeUnit :: Integer getTimeUnit = -1 * periodMagnitude * convertPeriodTypeToSeconds periodType +-- ==================== Read-side combined-cache helpers ==================== + +mkCombinedKey :: Text -> Text +mkCombinedKey k = k <> "-sw-combined" + +mkReadLockKey :: Text -> Text +mkReadLockKey k = "SW-READ-LOCK-" <> k + +periodBucketId :: PeriodType -> UTCTime -> Text +periodBucketId periodType utcTime = + T.pack $ + formatTime defaultTimeLocale fmt utcTime + where + fmt = case periodType of + Minutes -> "%Y-%m-%d-%H-%M" + Hours -> "%Y-%m-%d-%H" + Days -> "%Y-%m-%d" + Months -> "%Y-%m" + Years -> "%Y" + +combinedTtl :: SlidingWindowOptions -> Integer +combinedTtl SlidingWindowOptions {..} = + (period + 1) * convertPeriodTypeToSeconds periodType + +decrementPeriod :: PeriodType -> UTCTime -> UTCTime +decrementPeriod periodType t@(UTCTime date time) = case periodType of + Minutes -> addUTCTime (-60) t + Hours -> addUTCTime (-3600) t + Days -> UTCTime (addDays (-1) date) time + Months -> UTCTime (addGregorianMonthsClip (-1) date) time + Years -> UTCTime (addGregorianYearsClip (-1) date) time + +inWindowBucketTimes :: PeriodType -> UTCTime -> Integer -> [UTCTime] +inWindowBucketTimes pt now n = + take (fromInteger n) (iterate (decrementPeriod pt) now) + +-- | All buckets covered by the window, head = current period (today), +-- oldest last. Pairs each bucket-id with its UTCTime so callers don't +-- have to re-derive the time from the id. +windowBuckets :: SlidingWindowOptions -> UTCTime -> [(UTCTime, Text)] +windowBuckets SlidingWindowOptions {..} now = + map (\t -> (t, periodBucketId periodType t)) (inWindowBucketTimes periodType now period) + -- ========================== Sliding Window Counters ========================== incrementWindowCount :: @@ -208,10 +243,15 @@ incrementByValueImpl mbTimeStamp val getOutOfWindowKey getStoredResultKey getWin now <- L.runIO getCurrentTime let utcTime = fromMaybe now mbTimeStamp finalKey = getWindowKey periodType key utcTime + expirationTime = (period + 1) * convertPeriodTypeToSeconds periodType + bid = periodBucketId periodType utcTime + isToday = bid == periodBucketId periodType now Redis.whenWithLockRedis (makeCachingLockKey key) 10 . void $ cacheTheCounts now val getOutOfWindowKey getStoredResultKey getWindowKey key swo - let expirationTime = (period + 1) * convertPeriodTypeToSeconds periodType void $ Redis.incrby finalKey val Redis.expire finalKey $ fromIntegral expirationTime + unless isToday $ + Redis.withWaitAndLockRedis (mkReadLockKey key) 10 5000 $ + Redis.hDel (mkCombinedKey key) [bid] decrementWindowCount :: ( L.MonadFlow m, @@ -253,10 +293,15 @@ decrementByValueImpl mbTimeStamp val getOutOfWindowKey getStoredResultKey getWin now <- L.runIO getCurrentTime let utcTime = fromMaybe now mbTimeStamp finalKey = getWindowKey periodType key utcTime + expirationTime = (period + 1) * convertPeriodTypeToSeconds periodType + bid = periodBucketId periodType utcTime + isToday = bid == periodBucketId periodType now Redis.whenWithLockRedis (makeCachingLockKey key) 10 . void $ cacheTheCounts now val getOutOfWindowKey getStoredResultKey getWindowKey key swo - let expirationTime = (period + 1) * convertPeriodTypeToSeconds periodType void $ Redis.decrby finalKey val Redis.expire finalKey $ fromIntegral expirationTime + unless isToday $ + Redis.withWaitAndLockRedis (mkReadLockKey key) 10 5000 $ + Redis.hDel (mkCombinedKey key) [bid] -- the cached value would stay correct for current and current + 1 peroid in any given periodType so keeping the expiry as end of (current + 1) periodType from now cacheTheCounts :: @@ -370,8 +415,11 @@ getNumDenFromCache postiveCaseKey totalCaseKey = totalCases <- MaybeT $ getCountsFromCache totalCaseKey pure (positiveCases, totalCases) +-- | Sum of the current window. Implemented on top of getCurrentWindowValues +-- so it benefits from the combined-cache fast path automatically. getCurrentWindowCount :: ( L.MonadFlow m, + MonadClock m, Redis.HedisFlow m r, TryException m ) => @@ -379,40 +427,152 @@ getCurrentWindowCount :: SlidingWindowOptions -> m Integer getCurrentWindowCount key swo = do - now <- L.runIO getCurrentTime - mbCountFromCache <- getCountsFromCache key - case mbCountFromCache of - Just count -> pure count - Nothing -> Redis.withWaitAndLockRedis (makeCachingLockKey key) 10 20 $ cacheTheCounts now 0 makeSWKeyForTime makeQuickAccessWindowCountKey makeSlidingWindowKey key swo + values <- getCurrentWindowValues key swo + pure $ sum (catMaybes (values :: [Maybe Integer])) +-- | Read the last `nPeriod` bucket values from the combined cache (with +-- today's value spliced in from the live per-period key). Index 0 is the +-- current period; index i is i periods ago. See module-top comment for the +-- cache semantics. getCurrentWindowValuesUptoLast :: ( L.MonadFlow m, + MonadClock m, Redis.HedisFlow m r, FromJSON a, + ToJSON a, + Num a, TryException m ) => Integer -> Text -> SlidingWindowOptions -> m [Maybe a] -getCurrentWindowValuesUptoLast nPeriod key swo = do - utcTime <- L.runIO getCurrentTime - let keysToFetch = getkeysUptoThisPeriod swo utcTime (makeSlidingWindowKey (periodType swo) key) nPeriod - mapM Redis.get keysToFetch +getCurrentWindowValuesUptoLast nPeriod key swo = + take (fromInteger nPeriod) . map snd <$> readWindowFromCombinedCache key swo +-- | Read all `period` bucket values from the combined cache. getCurrentWindowValues :: ( L.MonadFlow m, + MonadClock m, Redis.HedisFlow m r, FromJSON a, + ToJSON a, + Num a, TryException m ) => Text -> SlidingWindowOptions -> m [Maybe a] -getCurrentWindowValues key swo = do - utcTime <- L.runIO getCurrentTime - let keysToFetch = getkeysForLastPeriods swo utcTime $ makeSlidingWindowKey (periodType swo) key - mapM Redis.get keysToFetch +getCurrentWindowValues key swo = map snd <$> readWindowFromCombinedCache key swo + +-- | Same as 'getCurrentWindowValues' but each value is paired with the +-- 'UTCTime' of its bucket. Use this when the caller needs to label values +-- with their period (e.g. render dates on the frontend) so the values +-- don't have to be zipped against a separately-computed date list — the +-- two could drift across a midnight boundary or any ordering change. +getCurrentWindowValuesWithTime :: + ( L.MonadFlow m, + MonadClock m, + Redis.HedisFlow m r, + FromJSON a, + ToJSON a, + Num a, + TryException m + ) => + Text -> + SlidingWindowOptions -> + m [(UTCTime, Maybe a)] +getCurrentWindowValuesWithTime = readWindowFromCombinedCache + +-- | Combined-cache read path: build the in-window bucket list, serve +-- historical buckets from the combined HASH (repairing it if stale), +-- splice today live from the per-period key, then return values paired +-- with the 'UTCTime' of each bucket (head = current period, oldest last). +readWindowFromCombinedCache :: + ( L.MonadFlow m, + MonadClock m, + Redis.HedisFlow m r, + FromJSON a, + ToJSON a, + Num a, + TryException m + ) => + Text -> + SlidingWindowOptions -> + m [(UTCTime, Maybe a)] +readWindowFromCombinedCache key swo@SlidingWindowOptions {..} = do + now <- L.runIO getCurrentTime + let buckets = windowBuckets swo now + (todayBid, historicalBuckets) = case buckets of + (_, b) : rest -> (b, rest) + [] -> (periodBucketId periodType now, []) -- defensive; period >= 1 in practice + L.logDebug @Text "SlidingWindowCounters" $ "[lock-before] key=" <> key + (cached, durationMs) <- + measureDuration $ + Redis.withWaitAndLockRedis (mkReadLockKey key) 10 5000 $ + repairCombinedCacheIfStale key swo historicalBuckets + L.logDebug @Text "SlidingWindowCounters" $ + "[lock-after] key=" <> key + <> " took_ms=" + <> T.pack (show durationMs) + mbToday <- Redis.get (makeSlidingWindowKey periodType key now) + let merged = maybe cached (\v -> Map.insert todayBid v cached) mbToday + pure [(t, Map.lookup bid merged) | (t, bid) <- buckets] + +repairCombinedCacheIfStale :: + ( L.MonadFlow m, + Redis.HedisFlow m r, + FromJSON a, + ToJSON a, + Num a, + TryException m + ) => + Text -> + SlidingWindowOptions -> + [(UTCTime, Text)] -> + m (Map.Map Text a) +repairCombinedCacheIfStale key swo@SlidingWindowOptions {..} historicalBuckets = do + let combinedKey = mkCombinedKey key + historicalIds = map snd historicalBuckets + windowSet = Set.fromList historicalIds + ttl = combinedTtl swo + cur <- Map.fromList <$> Redis.hGetAll combinedKey + let stale = filter (`Set.notMember` windowSet) (Map.keys cur) + fetched <- fmap catMaybes . forM historicalBuckets $ \(t, bid) -> + if Map.member bid cur + then pure Nothing + else bootstrapBucket key periodType ttl (t, bid) + unless (null stale) $ do + Redis.hDel combinedKey stale + Redis.expire combinedKey (fromInteger ttl) + let dropped = foldr Map.delete cur stale + added = foldr (uncurry Map.insert) dropped fetched + pure added + +bootstrapBucket :: + ( L.MonadFlow m, + Redis.HedisFlow m r, + FromJSON a, + ToJSON a, + Num a, + TryException m + ) => + Text -> + PeriodType -> + Integer -> + (UTCTime, Text) -> + m (Maybe (Text, a)) +bootstrapBucket key periodType ttl (t, bid) = do + let combinedKey = mkCombinedKey key + oldKey = makeSlidingWindowKey periodType key t + mv <- Redis.get oldKey + case mv of + Just v -> do + Redis.hSetExp combinedKey bid v (fromInteger ttl) + pure $ Just (bid, v) + Nothing -> do + Redis.hSetExp combinedKey bid (0 :: Integer) (fromInteger ttl) + pure $ Just (bid, 0) deleteCurrentWindowValues :: ( L.MonadFlow m, @@ -426,3 +586,4 @@ deleteCurrentWindowValues key swo = do utcTime <- L.runIO getCurrentTime let keysToDel = getkeysForLastPeriods swo utcTime $ makeSlidingWindowKey (periodType swo) key mapM_ Redis.del keysToDel + Redis.del (mkCombinedKey key)