-
Notifications
You must be signed in to change notification settings - Fork 9
update_sliding_window #1307
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
update_sliding_window #1307
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,7 @@ module Kernel.Utils.SlidingWindowCounters | |
| getLatestRatio, | ||
| getCurrentWindowCount, | ||
| getCurrentWindowValues, | ||
| getCurrentWindowValuesWithTime, | ||
| makeSlidingWindowKey, | ||
| splitOnPeriodGranuality, | ||
| incrementPeriod, | ||
|
|
@@ -32,13 +33,17 @@ module Kernel.Utils.SlidingWindowCounters | |
| where | ||
|
|
||
| import qualified Control.Monad.Extra as CME | ||
| import qualified Data.Map.Strict as Map | ||
| import qualified Data.Set as Set | ||
| import qualified Data.Text as T | ||
| import Data.Time | ||
| import qualified EulerHS.Language as L | ||
| import EulerHS.Prelude hiding (id) | ||
| import qualified Kernel.Storage.Hedis as Redis | ||
| import Kernel.Types.SlidingWindowCounters | ||
| import Kernel.Types.Time (MonadClock) | ||
| import Kernel.Types.TryException | ||
| import Kernel.Utils.Time (measureDuration) | ||
|
|
||
| -- ========================== Helper functions ========================== | ||
|
|
||
|
|
@@ -104,18 +109,8 @@ makeQuickAccessWindowCountKey :: Text -> Text | |
| makeQuickAccessWindowCountKey = (<> "-sliding-window-result") | ||
|
|
||
| makeTimeBasedKey :: PeriodType -> Text -> UTCTime -> Text | ||
| makeTimeBasedKey periodType oldKey = do | ||
| (\periodString -> oldKey <> "-" <> periodString) | ||
| . T.pack | ||
| . formatTime | ||
| defaultTimeLocale | ||
| ( case periodType of | ||
| Minutes -> "%Y-%m-%d-%H-%M" | ||
| Hours -> "%Y-%m-%d-%H" | ||
| Days -> "%Y-%m-%d" | ||
| Months -> "%Y-%m" | ||
| Years -> "%Y" | ||
| ) | ||
| makeTimeBasedKey periodType oldKey utcTime = | ||
| oldKey <> "-" <> periodBucketId periodType utcTime | ||
|
|
||
| -- gives time in second | ||
| convertPeriodTypeToSeconds :: PeriodType -> Integer | ||
|
|
@@ -130,9 +125,6 @@ convertPeriodTypeToSeconds periodType = | |
| getkeysForLastPeriods :: SlidingWindowOptions -> UTCTime -> (UTCTime -> Text) -> [Text] | ||
| getkeysForLastPeriods swo utcTime keyModifier = map (makeSWKeyForTime swo utcTime keyModifier) [0 .. swo.period - 1] | ||
|
|
||
| getkeysUptoThisPeriod :: SlidingWindowOptions -> UTCTime -> (UTCTime -> Text) -> Integer -> [Text] | ||
| getkeysUptoThisPeriod swo utcTime keyModifier uptoPeriod = map (makeSWKeyForTime swo utcTime keyModifier) [0 .. uptoPeriod - 1] | ||
|
|
||
| makeSWKeyForTime :: SlidingWindowOptions -> UTCTime -> (UTCTime -> Text) -> Integer -> Text | ||
| makeSWKeyForTime SlidingWindowOptions {..} utcTime keyModifier periodMagnitude = | ||
| keyModifier | ||
|
|
@@ -143,6 +135,49 @@ makeSWKeyForTime SlidingWindowOptions {..} utcTime keyModifier periodMagnitude = | |
| getTimeUnit :: Integer | ||
| getTimeUnit = -1 * periodMagnitude * convertPeriodTypeToSeconds periodType | ||
|
|
||
| -- ==================== Read-side combined-cache helpers ==================== | ||
|
|
||
| mkCombinedKey :: Text -> Text | ||
| mkCombinedKey k = k <> "-sw-combined" | ||
|
|
||
| mkReadLockKey :: Text -> Text | ||
| mkReadLockKey k = "SW-READ-LOCK-" <> k | ||
|
|
||
| periodBucketId :: PeriodType -> UTCTime -> Text | ||
| periodBucketId periodType utcTime = | ||
| T.pack $ | ||
| formatTime defaultTimeLocale fmt utcTime | ||
| where | ||
| fmt = case periodType of | ||
| Minutes -> "%Y-%m-%d-%H-%M" | ||
| Hours -> "%Y-%m-%d-%H" | ||
| Days -> "%Y-%m-%d" | ||
| Months -> "%Y-%m" | ||
| Years -> "%Y" | ||
|
|
||
| combinedTtl :: SlidingWindowOptions -> Integer | ||
| combinedTtl SlidingWindowOptions {..} = | ||
| (period + 1) * convertPeriodTypeToSeconds periodType | ||
|
|
||
| decrementPeriod :: PeriodType -> UTCTime -> UTCTime | ||
| decrementPeriod periodType t@(UTCTime date time) = case periodType of | ||
| Minutes -> addUTCTime (-60) t | ||
| Hours -> addUTCTime (-3600) t | ||
| Days -> UTCTime (addDays (-1) date) time | ||
| Months -> UTCTime (addGregorianMonthsClip (-1) date) time | ||
| Years -> UTCTime (addGregorianYearsClip (-1) date) time | ||
|
|
||
| inWindowBucketTimes :: PeriodType -> UTCTime -> Integer -> [UTCTime] | ||
| inWindowBucketTimes pt now n = | ||
| take (fromInteger n) (iterate (decrementPeriod pt) now) | ||
|
|
||
| -- | All buckets covered by the window, head = current period (today), | ||
| -- oldest last. Pairs each bucket-id with its UTCTime so callers don't | ||
| -- have to re-derive the time from the id. | ||
| windowBuckets :: SlidingWindowOptions -> UTCTime -> [(UTCTime, Text)] | ||
| windowBuckets SlidingWindowOptions {..} now = | ||
| map (\t -> (t, periodBucketId periodType t)) (inWindowBucketTimes periodType now period) | ||
|
|
||
| -- ========================== Sliding Window Counters ========================== | ||
|
|
||
| incrementWindowCount :: | ||
|
|
@@ -208,10 +243,15 @@ incrementByValueImpl mbTimeStamp val getOutOfWindowKey getStoredResultKey getWin | |
| now <- L.runIO getCurrentTime | ||
| let utcTime = fromMaybe now mbTimeStamp | ||
| finalKey = getWindowKey periodType key utcTime | ||
| expirationTime = (period + 1) * convertPeriodTypeToSeconds periodType | ||
| bid = periodBucketId periodType utcTime | ||
| isToday = bid == periodBucketId periodType now | ||
| Redis.whenWithLockRedis (makeCachingLockKey key) 10 . void $ cacheTheCounts now val getOutOfWindowKey getStoredResultKey getWindowKey key swo | ||
| let expirationTime = (period + 1) * convertPeriodTypeToSeconds periodType | ||
| void $ Redis.incrby finalKey val | ||
| Redis.expire finalKey $ fromIntegral expirationTime | ||
| unless isToday $ | ||
| Redis.withWaitAndLockRedis (mkReadLockKey key) 10 5000 $ | ||
| Redis.hDel (mkCombinedKey key) [bid] | ||
|
|
||
| decrementWindowCount :: | ||
| ( L.MonadFlow m, | ||
|
|
@@ -253,10 +293,15 @@ decrementByValueImpl mbTimeStamp val getOutOfWindowKey getStoredResultKey getWin | |
| now <- L.runIO getCurrentTime | ||
| let utcTime = fromMaybe now mbTimeStamp | ||
| finalKey = getWindowKey periodType key utcTime | ||
| expirationTime = (period + 1) * convertPeriodTypeToSeconds periodType | ||
| bid = periodBucketId periodType utcTime | ||
| isToday = bid == periodBucketId periodType now | ||
| Redis.whenWithLockRedis (makeCachingLockKey key) 10 . void $ cacheTheCounts now val getOutOfWindowKey getStoredResultKey getWindowKey key swo | ||
| let expirationTime = (period + 1) * convertPeriodTypeToSeconds periodType | ||
| void $ Redis.decrby finalKey val | ||
| Redis.expire finalKey $ fromIntegral expirationTime | ||
| unless isToday $ | ||
| Redis.withWaitAndLockRedis (mkReadLockKey key) 10 5000 $ | ||
| Redis.hDel (mkCombinedKey key) [bid] | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
|
|
||
| -- the cached value would stay correct for current and current + 1 peroid in any given periodType so keeping the expiry as end of (current + 1) periodType from now | ||
| cacheTheCounts :: | ||
|
|
@@ -370,49 +415,164 @@ getNumDenFromCache postiveCaseKey totalCaseKey = | |
| totalCases <- MaybeT $ getCountsFromCache totalCaseKey | ||
| pure (positiveCases, totalCases) | ||
|
|
||
| -- | Sum of the current window. Implemented on top of getCurrentWindowValues | ||
| -- so it benefits from the combined-cache fast path automatically. | ||
| getCurrentWindowCount :: | ||
| ( L.MonadFlow m, | ||
| MonadClock m, | ||
| Redis.HedisFlow m r, | ||
| TryException m | ||
| ) => | ||
| Text -> | ||
| SlidingWindowOptions -> | ||
| m Integer | ||
| getCurrentWindowCount key swo = do | ||
| now <- L.runIO getCurrentTime | ||
| mbCountFromCache <- getCountsFromCache key | ||
| case mbCountFromCache of | ||
| Just count -> pure count | ||
| Nothing -> Redis.withWaitAndLockRedis (makeCachingLockKey key) 10 20 $ cacheTheCounts now 0 makeSWKeyForTime makeQuickAccessWindowCountKey makeSlidingWindowKey key swo | ||
| values <- getCurrentWindowValues key swo | ||
| pure $ sum (catMaybes (values :: [Maybe Integer])) | ||
|
|
||
| -- | Read the last `nPeriod` bucket values from the combined cache (with | ||
| -- today's value spliced in from the live per-period key). Index 0 is the | ||
| -- current period; index i is i periods ago. See module-top comment for the | ||
| -- cache semantics. | ||
| getCurrentWindowValuesUptoLast :: | ||
| ( L.MonadFlow m, | ||
| MonadClock m, | ||
| Redis.HedisFlow m r, | ||
| FromJSON a, | ||
| ToJSON a, | ||
| Num a, | ||
| TryException m | ||
| ) => | ||
| Integer -> | ||
| Text -> | ||
| SlidingWindowOptions -> | ||
| m [Maybe a] | ||
| getCurrentWindowValuesUptoLast nPeriod key swo = do | ||
| utcTime <- L.runIO getCurrentTime | ||
| let keysToFetch = getkeysUptoThisPeriod swo utcTime (makeSlidingWindowKey (periodType swo) key) nPeriod | ||
| mapM Redis.get keysToFetch | ||
| getCurrentWindowValuesUptoLast nPeriod key swo = | ||
| take (fromInteger nPeriod) . map snd <$> readWindowFromCombinedCache key swo | ||
|
|
||
| -- | Read all `period` bucket values from the combined cache. | ||
| getCurrentWindowValues :: | ||
| ( L.MonadFlow m, | ||
| MonadClock m, | ||
| Redis.HedisFlow m r, | ||
| FromJSON a, | ||
| ToJSON a, | ||
| Num a, | ||
| TryException m | ||
| ) => | ||
| Text -> | ||
| SlidingWindowOptions -> | ||
| m [Maybe a] | ||
| getCurrentWindowValues key swo = do | ||
| utcTime <- L.runIO getCurrentTime | ||
| let keysToFetch = getkeysForLastPeriods swo utcTime $ makeSlidingWindowKey (periodType swo) key | ||
| mapM Redis.get keysToFetch | ||
| getCurrentWindowValues key swo = map snd <$> readWindowFromCombinedCache key swo | ||
|
|
||
| -- | Same as 'getCurrentWindowValues' but each value is paired with the | ||
| -- 'UTCTime' of its bucket. Use this when the caller needs to label values | ||
| -- with their period (e.g. render dates on the frontend) so the values | ||
| -- don't have to be zipped against a separately-computed date list — the | ||
| -- two could drift across a midnight boundary or any ordering change. | ||
| getCurrentWindowValuesWithTime :: | ||
| ( L.MonadFlow m, | ||
| MonadClock m, | ||
| Redis.HedisFlow m r, | ||
| FromJSON a, | ||
| ToJSON a, | ||
| Num a, | ||
| TryException m | ||
| ) => | ||
| Text -> | ||
| SlidingWindowOptions -> | ||
| m [(UTCTime, Maybe a)] | ||
| getCurrentWindowValuesWithTime = readWindowFromCombinedCache | ||
|
|
||
| -- | Combined-cache read path: build the in-window bucket list, serve | ||
| -- historical buckets from the combined HASH (repairing it if stale), | ||
| -- splice today live from the per-period key, then return values paired | ||
| -- with the 'UTCTime' of each bucket (head = current period, oldest last). | ||
| readWindowFromCombinedCache :: | ||
| ( L.MonadFlow m, | ||
| MonadClock m, | ||
| Redis.HedisFlow m r, | ||
| FromJSON a, | ||
| ToJSON a, | ||
| Num a, | ||
| TryException m | ||
| ) => | ||
| Text -> | ||
| SlidingWindowOptions -> | ||
| m [(UTCTime, Maybe a)] | ||
| readWindowFromCombinedCache key swo@SlidingWindowOptions {..} = do | ||
| now <- L.runIO getCurrentTime | ||
| let buckets = windowBuckets swo now | ||
| (todayBid, historicalBuckets) = case buckets of | ||
| (_, b) : rest -> (b, rest) | ||
| [] -> (periodBucketId periodType now, []) -- defensive; period >= 1 in practice | ||
| L.logDebug @Text "SlidingWindowCounters" $ "[lock-before] key=" <> key | ||
| (cached, durationMs) <- | ||
| measureDuration $ | ||
| Redis.withWaitAndLockRedis (mkReadLockKey key) 10 5000 $ | ||
| repairCombinedCacheIfStale key swo historicalBuckets | ||
| L.logDebug @Text "SlidingWindowCounters" $ | ||
| "[lock-after] key=" <> key | ||
| <> " took_ms=" | ||
| <> T.pack (show durationMs) | ||
| mbToday <- Redis.get (makeSlidingWindowKey periodType key now) | ||
| let merged = maybe cached (\v -> Map.insert todayBid v cached) mbToday | ||
| pure [(t, Map.lookup bid merged) | (t, bid) <- buckets] | ||
|
|
||
| repairCombinedCacheIfStale :: | ||
| ( L.MonadFlow m, | ||
| Redis.HedisFlow m r, | ||
| FromJSON a, | ||
| ToJSON a, | ||
| Num a, | ||
| TryException m | ||
| ) => | ||
| Text -> | ||
| SlidingWindowOptions -> | ||
| [(UTCTime, Text)] -> | ||
| m (Map.Map Text a) | ||
| repairCombinedCacheIfStale key swo@SlidingWindowOptions {..} historicalBuckets = do | ||
| let combinedKey = mkCombinedKey key | ||
| historicalIds = map snd historicalBuckets | ||
| windowSet = Set.fromList historicalIds | ||
| ttl = combinedTtl swo | ||
| cur <- Map.fromList <$> Redis.hGetAll combinedKey | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is already fetched once in please reuse things.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No bro, if two threads try to do the same thing then it will be a redunduct process right? |
||
| let stale = filter (`Set.notMember` windowSet) (Map.keys cur) | ||
| fetched <- fmap catMaybes . forM historicalBuckets $ \(t, bid) -> | ||
| if Map.member bid cur | ||
| then pure Nothing | ||
| else bootstrapBucket key periodType ttl (t, bid) | ||
| unless (null stale) $ do | ||
| Redis.hDel combinedKey stale | ||
| Redis.expire combinedKey (fromInteger ttl) | ||
| let dropped = foldr Map.delete cur stale | ||
| added = foldr (uncurry Map.insert) dropped fetched | ||
| pure added | ||
|
|
||
| bootstrapBucket :: | ||
| ( L.MonadFlow m, | ||
| Redis.HedisFlow m r, | ||
| FromJSON a, | ||
| ToJSON a, | ||
| Num a, | ||
| TryException m | ||
| ) => | ||
| Text -> | ||
| PeriodType -> | ||
| Integer -> | ||
| (UTCTime, Text) -> | ||
| m (Maybe (Text, a)) | ||
| bootstrapBucket key periodType ttl (t, bid) = do | ||
| let combinedKey = mkCombinedKey key | ||
| oldKey = makeSlidingWindowKey periodType key t | ||
| mv <- Redis.get oldKey | ||
| case mv of | ||
| Just v -> do | ||
| Redis.hSetExp combinedKey bid v (fromInteger ttl) | ||
| pure $ Just (bid, v) | ||
| Nothing -> do | ||
| Redis.hSetExp combinedKey bid (0 :: Integer) (fromInteger ttl) | ||
| pure $ Just (bid, 0) | ||
|
|
||
| deleteCurrentWindowValues :: | ||
| ( L.MonadFlow m, | ||
|
|
@@ -426,3 +586,4 @@ deleteCurrentWindowValues key swo = do | |
| utcTime <- L.runIO getCurrentTime | ||
| let keysToDel = getkeysForLastPeriods swo utcTime $ makeSlidingWindowKey (periodType swo) key | ||
| mapM_ Redis.del keysToDel | ||
| Redis.del (mkCombinedKey key) | ||
Uh oh!
There was an error while loading. Please reload this page.