From 719a4cc72aa912bd8170ad1062322647a15b354c Mon Sep 17 00:00:00 2001 From: anu19981 Date: Thu, 30 Apr 2026 17:15:31 +0530 Subject: [PATCH 01/11] Sk/aarokya prod (#1255) * aarokya-integ * aarokya-headerchanges --- lib/mobility-core/mobility-core.cabal | 6 +++ .../External/PartnerSdk/Aarokya/Flow.hs | 32 +++++++++++++ .../External/PartnerSdk/Aarokya/Types.hs | 28 ++++++++++++ .../Kernel/External/PartnerSdk/Interface.hs | 22 +++++++++ .../External/PartnerSdk/Interface/Aarokya.hs | 45 +++++++++++++++++++ .../External/PartnerSdk/Interface/Types.hs | 26 +++++++++++ .../src/Kernel/External/PartnerSdk/Types.hs | 22 +++++++++ 7 files changed, 181 insertions(+) create mode 100644 lib/mobility-core/src/Kernel/External/PartnerSdk/Aarokya/Flow.hs create mode 100644 lib/mobility-core/src/Kernel/External/PartnerSdk/Aarokya/Types.hs create mode 100644 lib/mobility-core/src/Kernel/External/PartnerSdk/Interface.hs create mode 100644 lib/mobility-core/src/Kernel/External/PartnerSdk/Interface/Aarokya.hs create mode 100644 lib/mobility-core/src/Kernel/External/PartnerSdk/Interface/Types.hs create mode 100644 lib/mobility-core/src/Kernel/External/PartnerSdk/Types.hs diff --git a/lib/mobility-core/mobility-core.cabal b/lib/mobility-core/mobility-core.cabal index bae8825ef..31c544b47 100644 --- a/lib/mobility-core/mobility-core.cabal +++ b/lib/mobility-core/mobility-core.cabal @@ -155,6 +155,12 @@ library Kernel.External.Notification.PayTM.Client Kernel.External.Notification.PayTM.Types Kernel.External.Notification.Types + Kernel.External.PartnerSdk.Aarokya.Flow + Kernel.External.PartnerSdk.Aarokya.Types + Kernel.External.PartnerSdk.Interface + Kernel.External.PartnerSdk.Interface.Aarokya + Kernel.External.PartnerSdk.Interface.Types + Kernel.External.PartnerSdk.Types Kernel.External.Payment.Interface Kernel.External.Payment.Interface.Events.Types Kernel.External.Payment.Interface.Juspay diff --git a/lib/mobility-core/src/Kernel/External/PartnerSdk/Aarokya/Flow.hs b/lib/mobility-core/src/Kernel/External/PartnerSdk/Aarokya/Flow.hs new file mode 100644 index 000000000..afc93e492 --- /dev/null +++ b/lib/mobility-core/src/Kernel/External/PartnerSdk/Aarokya/Flow.hs @@ -0,0 +1,32 @@ +module Kernel.External.PartnerSdk.Aarokya.Flow where + +import EulerHS.Types as Euler +import Kernel.External.PartnerSdk.Aarokya.Types +import Kernel.Prelude +import qualified Kernel.Tools.Metrics.CoreMetrics as Metrics +import Kernel.Types.Error +import Kernel.Utils.Common +import Servant hiding (throwError) + +type GenerateTokenAPI = + "auth" + :> "token" + :> Header "Authorization" Text + :> ReqBody '[JSON] AarokyaTokenRequest + :> Post '[JSON] AarokyaTokenResponse + +generateToken :: + (Metrics.CoreMetrics m, MonadFlow m, HasRequestId r, MonadReader r m) => + BaseUrl -> + Text -> + AarokyaTokenRequest -> + m AarokyaTokenResponse +generateToken url basicToken request = do + let proxy = Proxy @GenerateTokenAPI + eulerClient = Euler.client proxy (Just ("Basic " <> basicToken)) request + callAarokyaAPI url eulerClient "aarokya-generate-token" proxy + +callAarokyaAPI :: (MonadFlow m, HasRequestId r, MonadReader r m) => CallAPI' m r api res res +callAarokyaAPI url eulerClient description proxy = do + callAPI url eulerClient description proxy + >>= fromEitherM (\err -> InternalError $ "Failed to call " <> description <> " API: " <> show err) diff --git a/lib/mobility-core/src/Kernel/External/PartnerSdk/Aarokya/Types.hs b/lib/mobility-core/src/Kernel/External/PartnerSdk/Aarokya/Types.hs new file mode 100644 index 000000000..e62387cb4 --- /dev/null +++ b/lib/mobility-core/src/Kernel/External/PartnerSdk/Aarokya/Types.hs @@ -0,0 +1,28 @@ +module Kernel.External.PartnerSdk.Aarokya.Types where + +import Kernel.External.Encryption +import Kernel.Prelude + +data AarokyaIdProof = AarokyaIdProof + { proof_type :: Text, + number :: Text + } + deriving (Show, Eq, Generic, ToJSON, FromJSON) + +data AarokyaTokenRequest = AarokyaTokenRequest + { phone_country_code :: Text, + phone_number :: Text, + id_proof :: AarokyaIdProof + } + deriving (Show, Eq, Generic, ToJSON, FromJSON) + +newtype AarokyaTokenResponse = AarokyaTokenResponse + { access_token :: Text + } + deriving (Show, Eq, Generic, ToJSON, FromJSON) + +data AarokyaSdkConfig = AarokyaSdkConfig + { url :: BaseUrl, + basicToken :: EncryptedField 'AsEncrypted Text + } + deriving (Show, Eq, Generic, ToJSON, FromJSON) diff --git a/lib/mobility-core/src/Kernel/External/PartnerSdk/Interface.hs b/lib/mobility-core/src/Kernel/External/PartnerSdk/Interface.hs new file mode 100644 index 000000000..ee97b04d7 --- /dev/null +++ b/lib/mobility-core/src/Kernel/External/PartnerSdk/Interface.hs @@ -0,0 +1,22 @@ +module Kernel.External.PartnerSdk.Interface + ( module Reexport, + module Kernel.External.PartnerSdk.Interface, + ) +where + +import qualified Kernel.External.PartnerSdk.Interface.Aarokya as Aarokya +import Kernel.External.PartnerSdk.Interface.Types +import Kernel.External.PartnerSdk.Types as Reexport +import Kernel.Tools.Metrics.CoreMetrics (CoreMetrics) +import Kernel.Utils.Common + +generateToken :: + ( EncFlow m r, + CoreMetrics m, + HasRequestId r + ) => + PartnerSdkConfig -> + GenerateTokenReq -> + m GenerateTokenResp +generateToken serviceConfig req = case serviceConfig of + AarokyaPartnerSdkConfig cfg -> Aarokya.generateToken cfg req diff --git a/lib/mobility-core/src/Kernel/External/PartnerSdk/Interface/Aarokya.hs b/lib/mobility-core/src/Kernel/External/PartnerSdk/Interface/Aarokya.hs new file mode 100644 index 000000000..fa01f5743 --- /dev/null +++ b/lib/mobility-core/src/Kernel/External/PartnerSdk/Interface/Aarokya.hs @@ -0,0 +1,45 @@ +module Kernel.External.PartnerSdk.Interface.Aarokya where + +import Kernel.External.Encryption +import qualified Kernel.External.PartnerSdk.Aarokya.Flow as Aarokya +import qualified Kernel.External.PartnerSdk.Aarokya.Types as AarokyaTypes +import Kernel.External.PartnerSdk.Interface.Types +import Kernel.Prelude +import qualified Kernel.Tools.Metrics.CoreMetrics as Metrics +import Kernel.Utils.Common + +generateToken :: + ( Metrics.CoreMetrics m, + EncFlow m r, + HasRequestId r, + MonadReader r m + ) => + AarokyaTypes.AarokyaSdkConfig -> + GenerateTokenReq -> + m GenerateTokenResp +generateToken config req = do + basicToken <- decrypt config.basicToken + let aarokyaReq = toAarokyaTokenRequest req + resp <- Aarokya.generateToken config.url basicToken aarokyaReq + pure $ fromAarokyaTokenResponse resp + +toAarokyaTokenRequest :: GenerateTokenReq -> AarokyaTypes.AarokyaTokenRequest +toAarokyaTokenRequest req = + AarokyaTypes.AarokyaTokenRequest + { phone_country_code = req.phoneCountryCode, + phone_number = req.phoneNumber, + id_proof = toAarokyaIdProof req.idProof + } + +toAarokyaIdProof :: IdProof -> AarokyaTypes.AarokyaIdProof +toAarokyaIdProof p = + AarokyaTypes.AarokyaIdProof + { proof_type = p.proofType, + number = p.number + } + +fromAarokyaTokenResponse :: AarokyaTypes.AarokyaTokenResponse -> GenerateTokenResp +fromAarokyaTokenResponse resp = + GenerateTokenResp + { accessToken = resp.access_token + } diff --git a/lib/mobility-core/src/Kernel/External/PartnerSdk/Interface/Types.hs b/lib/mobility-core/src/Kernel/External/PartnerSdk/Interface/Types.hs new file mode 100644 index 000000000..0c6d21b68 --- /dev/null +++ b/lib/mobility-core/src/Kernel/External/PartnerSdk/Interface/Types.hs @@ -0,0 +1,26 @@ +module Kernel.External.PartnerSdk.Interface.Types where + +import qualified Kernel.External.PartnerSdk.Aarokya.Types as Aarokya +import Kernel.Prelude + +data IdProof = IdProof + { proofType :: Text, + number :: Text + } + deriving (Show, Eq, Generic, ToJSON, FromJSON) + +data GenerateTokenReq = GenerateTokenReq + { phoneCountryCode :: Text, + phoneNumber :: Text, + idProof :: IdProof + } + deriving (Show, Eq, Generic, ToJSON, FromJSON) + +newtype GenerateTokenResp = GenerateTokenResp + { accessToken :: Text + } + deriving (Show, Eq, Generic, ToJSON, FromJSON) + +data PartnerSdkConfig + = AarokyaPartnerSdkConfig Aarokya.AarokyaSdkConfig + deriving (Show, Eq, Generic, ToJSON, FromJSON) diff --git a/lib/mobility-core/src/Kernel/External/PartnerSdk/Types.hs b/lib/mobility-core/src/Kernel/External/PartnerSdk/Types.hs new file mode 100644 index 000000000..93435c1a6 --- /dev/null +++ b/lib/mobility-core/src/Kernel/External/PartnerSdk/Types.hs @@ -0,0 +1,22 @@ +{-# LANGUAGE TemplateHaskell #-} + +module Kernel.External.PartnerSdk.Types where + +import Data.Aeson.Types +import EulerHS.Prelude +import Kernel.Beam.Lib.UtilsTH (mkBeamInstancesForEnumAndList) +import Kernel.Storage.Esqueleto (derivePersistField) + +data PartnerSdkService = Aarokya + deriving (Show, Read, Eq, Ord, Generic) + +$(mkBeamInstancesForEnumAndList ''PartnerSdkService) +derivePersistField "PartnerSdkService" + +instance FromJSON PartnerSdkService where + parseJSON (String "Aarokya") = pure Aarokya + parseJSON (String _) = parseFail "Expected \"Aarokya\"" + parseJSON e = typeMismatch "String" e + +instance ToJSON PartnerSdkService where + toJSON = String . show From 3c02fa9d2cdaf6a65c539231bf4a8e1d570274ff Mon Sep 17 00:00:00 2001 From: Vijay Gupta Date: Fri, 24 Apr 2026 02:48:35 +0530 Subject: [PATCH 02/11] Changed log level from error to warning when primary result is not found in runInMultiCloudRedisMaybeResult function --- lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs b/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs index 46c17e9c8..115889527 100644 --- a/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs +++ b/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs @@ -176,7 +176,7 @@ runInMultiCloudRedisMaybeResult action = do case primaryResult of Just _ -> pure primaryResult -- Primary has result, return immediately Nothing -> do - logError $ "SECONDARY_CLUSTER: Primary returned Nothing, trying secondary" + logWarning $ "SECONDARY_CLUSTER: Primary returned Nothing, trying secondary" -- Primary returned Nothing, try secondary secondaryResult <- withTryCatch "runInMultiCloudRedisMaybeResult" $ From b624d3a72946981c463e6d80810dc4e225e8b7bf Mon Sep 17 00:00:00 2001 From: Akhilesh Singh Bhadauriya Date: Tue, 5 May 2026 18:55:21 +0530 Subject: [PATCH 03/11] feat: add cluster-aware bulk Redis operations for sharded commands --- .../src/Kernel/Storage/Hedis/Queries.hs | 91 +++++++++++++++++++ 1 file changed, 91 insertions(+) diff --git a/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs b/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs index 115889527..19b692960 100644 --- a/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs +++ b/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs @@ -17,6 +17,9 @@ module Kernel.Storage.Hedis.Queries (module Reexport, module Kernel.Storage.Hedi import qualified Data.Aeson as Ae import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as BSL +import Data.Either (partitionEithers) +import Data.Hashable (Hashable, hash) +import qualified Data.Map.Strict as Map import Data.String.Conversions import Data.Text hiding (concatMap, map, null) import qualified Data.Text as T @@ -1187,3 +1190,91 @@ sAdd key members = withLogTag "Redis" $ do \err -> withLogTag "CLUSTER" $ logTagInfo "FAILED_TO_SADD" (show err) + +-- ============================================================================= +-- Cluster-aware bulk Redis. Groups items by their cluster hash slot +-- (CRC16 → 0..16383) so every command targeting a single shard runs inside +-- ONE 'runRedis' block: hedis pipelines automatically inside a Redis monad +-- action, so N commands cost one TCP round-trip per shard. Shards run in +-- parallel via awaitableFork+await (same pattern as Domain.Utils.mapConcurrently), +-- so total wallclock is (slowest shard), not (sum of all shards). +-- +-- Same prefix semantics as the rest of the wrapper: keys go through +-- 'buildKey' (which reads 'hedisEnv.keyModifier'), so callers wrapping in +-- 'withCrossAppRedis' get raw keys; without it, the configured app prefix +-- applies. Either way, writes via these helpers are interchangeable with +-- reads via 'get' / 'setExp' / etc. +-- +-- Two flavours: +-- +-- 1. 'bulkShardedRedis' — per-item action. Use when each command operates on +-- one key (GET / SET / SETEX / SETNX / DEL / EXPIRE / TTL / EXISTS / INCR +-- / DECR / HGET / HSET / SADD / ZADD / LPUSH / RPUSH / etc.). Action +-- receives the original item plus the already-prefixed key bytes — same +-- shape as 'runWithPrefix'\'s action. +-- +-- 2. 'bulkShardedRedisBatch' — per-shard batch action receiving the bucket +-- of items for one slot. Use for multi-key single commands — every key +-- in a bucket already shares a slot by construction, so cluster's +-- same-slot rule for these commands is satisfied: MGET / MSET / MSETNX / +-- SUNION / SINTER / SDIFF / DEL [k1,k2..] / EXISTS [k1,k2..] / TOUCH / +-- UNLINK / ZUNIONSTORE / etc. Returns per-shard results concatenated; +-- pair items with results inside the action ('zip' inputs with reply) +-- if you need to align them back to input order. +-- +-- Out of scope: MULTI/EXEC transactions — use 'runHedisTransaction' instead. +-- +-- Standalone (non-clustered) Redis: 'keyToSlot' still buckets keys but every +-- bucket resolves to the same connection, so grouping is harmless overhead. +-- ============================================================================= + +-- | Produce a Redis-cluster hash-tag segment "{shard-N}" that buckets the input +-- into one of 'shards' synthetic slots. Embedding it anywhere in a Redis key +-- forces all keys sharing a bucket onto the same cluster slot (Redis hashes +-- the first '{...}' segment), which is the precondition for the bulk +-- helpers below to actually pipeline a batch. +-- +-- Tuning: 8–32 is the sweet spot for batches of 5–50 on 3–6 cluster shards. +-- Smaller (1–4) risks write hotspots; larger (256+) erodes the pipelining +-- win because per-batch slot count approaches batch size. +shardHashTag :: Hashable a => Int -> a -> Text +shardHashTag shards x = + let n = max 1 shards + in "{shard-" <> show (hash x `mod` n) <> "}" + +bulkShardedRedisBatch :: + (HedisFlow m env, TryException m, Forkable m, L.MonadFlow m) => + -- | extracts the raw (un-prefixed) routing key from each item + (a -> Text) -> + -- | per-shard action — invoked once per slot inside 'runRedis'; receives + -- (item, prefixed-key-bytes) pairs that all share a slot. Same-slot rule + -- for MGET/MSET/etc. is satisfied by construction. + ([(a, BS.ByteString)] -> Redis (Either Reply [b])) -> + [a] -> + m [b] +bulkShardedRedisBatch _ _ [] = pure [] +bulkShardedRedisBatch keyOf shardOp items = do + pairs <- forM items $ \x -> do + keyBs <- buildKey (keyOf x) + pure (x, keyBs) + let groups = Map.elems $ Kernel.Prelude.foldl' insertItem Map.empty pairs + Kernel.Prelude.concat <$> traverse runChunk (chunksOf clusterMGetForkLimit groups) + where + insertItem acc pair@(_, keyBs) = + let slot = fromIntegral (Hedis.keyToSlot keyBs) :: Int + in Map.insertWith (++) slot [pair] acc + runShard shardPairs = do + con <- asks (.hedisClusterEnv.hedisConnection) + reply <- liftIO $ Hedis.runRedis con (shardOp shardPairs) + Error.fromEitherM (HedisReplyError . show) reply + runChunk chunk = do + awaitables <- mapM (awaitableFork "bulkShardedRedisBatch" . runShard) chunk + results <- mapM (L.await Nothing) awaitables + case partitionEithers results of + ([], successes) -> pure (Kernel.Prelude.concat successes) + (err : _, _) -> + Error.throwError $ HedisReplyError $ "bulkShardedRedisBatch shard fork failed: " <> show err + chunksOf k = DL.unfoldr step + where + step [] = Nothing + step xs = Just (DL.splitAt k xs) From 05c0439d90340544a4af332e3991234fc09bbb34 Mon Sep 17 00:00:00 2001 From: Akhilesh Singh Bhadauriya Date: Tue, 5 May 2026 20:45:14 +0530 Subject: [PATCH 04/11] feat: bulkShardedRedisBatch wraps existing m-action helpers per shard --- .../src/Kernel/Storage/Hedis/Queries.hs | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs b/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs index 19b692960..96e454052 100644 --- a/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs +++ b/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs @@ -1242,31 +1242,33 @@ shardHashTag shards x = let n = max 1 shards in "{shard-" <> show (hash x `mod` n) <> "}" +-- | Per-call key cap inside a single shard. A shard's items are split into +-- batches of this size and the caller's action runs once per batch, so a +-- single MGET/MSET never balloons past this many keys. +bulkShardBatchSize :: Int +bulkShardBatchSize = 100 + bulkShardedRedisBatch :: (HedisFlow m env, TryException m, Forkable m, L.MonadFlow m) => -- | extracts the raw (un-prefixed) routing key from each item (a -> Text) -> - -- | per-shard action — invoked once per slot inside 'runRedis'; receives - -- (item, prefixed-key-bytes) pairs that all share a slot. Same-slot rule - -- for MGET/MSET/etc. is satisfied by construction. - ([(a, BS.ByteString)] -> Redis (Either Reply [b])) -> + -- | per-shard action — invoked once per slot. All items in the input list + -- share a cluster slot, so the caller should use a multi-key wrapper + -- ('mGetCluster', 'mSet', etc.) to get one pipelined round-trip per shard. + ([a] -> m [b]) -> [a] -> m [b] bulkShardedRedisBatch _ _ [] = pure [] bulkShardedRedisBatch keyOf shardOp items = do - pairs <- forM items $ \x -> do + slotted <- forM items $ \x -> do keyBs <- buildKey (keyOf x) - pure (x, keyBs) - let groups = Map.elems $ Kernel.Prelude.foldl' insertItem Map.empty pairs + let slot = fromIntegral (Hedis.keyToSlot keyBs) :: Int + pure (slot, x) + let groups = Map.elems $ Kernel.Prelude.foldl' insertItem Map.empty slotted Kernel.Prelude.concat <$> traverse runChunk (chunksOf clusterMGetForkLimit groups) where - insertItem acc pair@(_, keyBs) = - let slot = fromIntegral (Hedis.keyToSlot keyBs) :: Int - in Map.insertWith (++) slot [pair] acc - runShard shardPairs = do - con <- asks (.hedisClusterEnv.hedisConnection) - reply <- liftIO $ Hedis.runRedis con (shardOp shardPairs) - Error.fromEitherM (HedisReplyError . show) reply + insertItem acc (slot, x) = Map.insertWith (++) slot [x] acc + runShard shardItems = Kernel.Prelude.concat <$> mapM shardOp (chunksOf bulkShardBatchSize shardItems) runChunk chunk = do awaitables <- mapM (awaitableFork "bulkShardedRedisBatch" . runShard) chunk results <- mapM (L.await Nothing) awaitables From 4c5be11bfe2041473a9d1813011f219bb062e89b Mon Sep 17 00:00:00 2001 From: Akhilesh Singh Bhadauriya Date: Tue, 5 May 2026 20:45:14 +0530 Subject: [PATCH 05/11] feat: add Lua-based setExpMany for atomic bulk SET-with-TTL --- .../src/Kernel/Storage/Hedis/Queries.hs | 33 ++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs b/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs index 96e454052..47752ddcf 100644 --- a/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs +++ b/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs @@ -19,14 +19,16 @@ import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as BSL import Data.Either (partitionEithers) import Data.Hashable (Hashable, hash) +import qualified Data.List as DL import qualified Data.Map.Strict as Map import Data.String.Conversions -import Data.Text hiding (concatMap, map, null) +import Data.Text hiding (chunksOf, concatMap, map, null) import qualified Data.Text as T import qualified Data.Text as Text import Database.Redis as Reexport (GeoBy (..), GeoFrom (..), Queued, Redis, RedisTx, Reply, TxResult (..)) import qualified Database.Redis as Hedis import qualified Database.Redis.Cluster as Cluster +import qualified EulerHS.Language as L import EulerHS.Prelude (whenLeft) import GHC.Records.Extra import Kernel.Beam.Connection.EnvVars (getRunInMasterCloudRedisCell, getRunInMasterLTSRedisCell) @@ -1128,6 +1130,11 @@ geoSearch key from by = withLogTag "Redis" $ do pure [] -- Return an empty list if there was an error Right items -> pure items +-- | Hard cap on concurrent forks so a request with keys spanning many slots +-- can't unbounded-fan-out into a fork storm. +clusterMGetForkLimit :: Int +clusterMGetForkLimit = 32 + geoSearchDecoded :: (FromJSON a, HedisFlow m env, TryException m) => Text -> @@ -1280,3 +1287,27 @@ bulkShardedRedisBatch keyOf shardOp items = do where step [] = Nothing step xs = Just (DL.splitAt k xs) + +-- | Bulk SET-with-TTL via a single Lua script — one Redis command per call, +-- atomic on the server side. Designed to be passed as the per-shard action to +-- 'bulkShardedRedisBatch': all input pairs MUST hash to the same cluster slot +-- (use 'shardHashTag' to bucket), otherwise cluster will return a CROSSSLOT +-- error. Reply errors propagate via 'runHedis'. +-- Use it for less number of keys per call (up to ~100) to get the pipelining and atomicity benefits; +setExpMany :: + (HedisFlow m env, TryException m, ToJSON a) => + ExpirationTime -> + [(Text, a)] -> + m () +setExpMany _ [] = pure () +setExpMany expirationTime pairs = withTimeRedis "RedisCluster" "setExpMany" . withLogTag "Redis" $ do + prefKeys <- mapM (buildKey . fst) pairs + let vals = map (BSL.toStrict . Ae.encode . snd) pairs + ttlArg = cs (show (toInteger expirationTime) :: String) :: BS.ByteString + script = + "local ttl = tonumber(ARGV[1]) " + <> "for i = 1, #KEYS do " + <> "redis.call('SET', KEYS[i], ARGV[i + 1], 'EX', ttl) " + <> "end " + <> "return 'OK'" + void . runHedis $ (Hedis.eval script prefKeys (ttlArg : vals) :: Redis (Either Reply Reply)) From 05be3f5c7ed464a54c0cbaec48f8792497b855b5 Mon Sep 17 00:00:00 2001 From: Vijay Gupta Date: Wed, 29 Apr 2026 17:21:13 +0530 Subject: [PATCH 06/11] feat: Add support for asynchronous cluster MGET operations and new environment variable retrieval function - Introduced `getClusterMGetAsyncEnabled` to fetch the environment variable for enabling async MGET. - Enhanced `mGetClusterRaw` to utilize the new async feature, allowing for improved performance with concurrent requests. - Added internal logic to manage concurrent forks and limit the number of simultaneous operations. --- .../src/Kernel/Beam/Connection/EnvVars.hs | 5 + .../src/Kernel/Storage/Hedis/Queries.hs | 171 +++++++++++++++++- 2 files changed, 174 insertions(+), 2 deletions(-) diff --git a/lib/mobility-core/src/Kernel/Beam/Connection/EnvVars.hs b/lib/mobility-core/src/Kernel/Beam/Connection/EnvVars.hs index f57fc4035..347837bca 100644 --- a/lib/mobility-core/src/Kernel/Beam/Connection/EnvVars.hs +++ b/lib/mobility-core/src/Kernel/Beam/Connection/EnvVars.hs @@ -70,3 +70,8 @@ getRunInMasterLTSRedisCell :: IO Bool getRunInMasterLTSRedisCell = do envVal <- lookupEnv "RUN_IN_MASTER_LTS_REDIS_CELL" pure (fromMaybe False (readMaybe =<< envVal)) + +getClusterMGetAsyncEnabled :: IO Bool +getClusterMGetAsyncEnabled = do + envVal <- lookupEnv "CLUSTER_MGET_ASYNC_ENABLED" + pure (fromMaybe False (readMaybe =<< envVal)) diff --git a/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs b/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs index 47752ddcf..f9bcc4fd6 100644 --- a/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs +++ b/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE BangPatterns #-} + {- Copyright 2022-23, Juspay India Pvt Ltd @@ -19,19 +21,23 @@ import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as BSL import Data.Either (partitionEithers) import Data.Hashable (Hashable, hash) +import qualified Data.IntMap.Strict as IntMap import qualified Data.List as DL +import qualified Data.List.NonEmpty as NE import qualified Data.Map.Strict as Map import Data.String.Conversions -import Data.Text hiding (chunksOf, concatMap, map, null) +import Data.Text hiding (any, chunksOf, concat, concatMap, length, map, null, replicate, zip) import qualified Data.Text as T import qualified Data.Text as Text +import qualified Data.Vector as V +import Database.Redis (keyToSlot) import Database.Redis as Reexport (GeoBy (..), GeoFrom (..), Queued, Redis, RedisTx, Reply, TxResult (..)) import qualified Database.Redis as Hedis import qualified Database.Redis.Cluster as Cluster import qualified EulerHS.Language as L import EulerHS.Prelude (whenLeft) import GHC.Records.Extra -import Kernel.Beam.Connection.EnvVars (getRunInMasterCloudRedisCell, getRunInMasterLTSRedisCell) +import Kernel.Beam.Connection.EnvVars (getClusterMGetAsyncEnabled, getRunInMasterCloudRedisCell, getRunInMasterLTSRedisCell) import Kernel.Prelude import Kernel.Storage.Hedis.Config import Kernel.Storage.Hedis.Error @@ -365,6 +371,167 @@ safeGet :: (FromJSON a, HedisFlow m env, TryException m) => Text -> m (Maybe a) safeGet key = get' key (del key) +-- | Internal: cluster MGET returning one Vector slot per input key, aligned +-- by index. Hits → @Just bs@; misses, Redis errors, fork failures all → @Nothing@. +-- Groups by hash slot (Cluster requires single-slot MGETs); when async is +-- enabled, runs forks in capped chunks to bound parallelism. +mGetClusterRaw :: + forall m env. + (HedisFlow m env, TryException m, L.MonadFlow m, Forkable m) => + [Text] -> + m (V.Vector (Maybe BS.ByteString)) +mGetClusterRaw [] = pure V.empty +mGetClusterRaw keys = withLogTag "CLUSTER" $ do + let !nKeys = length keys + prefKeys <- mapM buildKey keys + let !groups = + IntMap.elems $ + IntMap.fromListWith + (<>) + [(fromEnum (keyToSlot pk), NE.singleton (i, pk)) | (i, pk) <- zip [0 ..] prefKeys] + asyncEnabled <- liftIO getClusterMGetAsyncEnabled + results <- + if asyncEnabled && length groups > 1 + then concat <$> traverse runChunk (chunksOf clusterMGetForkLimit groups) + else traverse runGroup groups + pure $! V.replicate nKeys Nothing V.// concat results + where + runGroup :: NonEmpty (Int, BS.ByteString) -> m [(Int, Maybe BS.ByteString)] + runGroup grp = do + let (idxs, prefKs) = NE.unzip grp + missForGroup = map (,Nothing) (NE.toList idxs) + result <- + withTimeRedis "RedisCluster" "mget" $ + withTryCatch "mGetCluster" $ + runHedisEither $ Hedis.mget (NE.toList prefKs) + case result of + Left exc -> do + logTagError "ERROR_WHILE_MGET" $ "Cluster MGET threw: " <> show exc + pure missForGroup + Right (Left reply) -> do + logTagError "ERROR_WHILE_MGET" $ "Cluster MGET failed: " <> show reply + pure missForGroup + Right (Right listBS) -> + -- Defensive: tolerate a malformed Redis reply that disagrees on length. + let !padded = DL.take (NE.length grp) (listBS <> DL.repeat Nothing) + in pure $ DL.zip (NE.toList idxs) padded + + runChunk :: [NonEmpty (Int, BS.ByteString)] -> m [[(Int, Maybe BS.ByteString)]] + runChunk chunk = do + awaitables <- forM (DL.zip [0 :: Int ..] chunk) $ \(j, grp) -> + (grp,) <$> awaitableFork ("mGetCluster:" <> show j) (runGroup grp) + forM awaitables $ \(grp, aw) -> do + res <- L.await Nothing aw + case res of + Right xs -> pure xs + Left err -> do + logTagError "ERROR_WHILE_MGET_FORK" $ "Concurrent fork failed: " <> show err + pure $ map ((,Nothing) . fst) (NE.toList grp) + + chunksOf :: Int -> [a] -> [[a]] + chunksOf k = DL.unfoldr step + where + step [] = Nothing + step xs = Just (DL.splitAt k xs) + +-- | Cluster MGET returning just the decoded values that were found. No +-- order guarantee on the output. Decode failures are logged and dropped. +mGetCluster :: + (FromJSON a, HedisFlow m env, TryException m, L.MonadFlow m, Forkable m) => + [Text] -> + m [a] +mGetCluster keys = do + raw <- mGetClusterRaw keys + catMaybes . V.toList <$> V.mapM decodeBytesLogging raw + +-- | Cluster MGET returning (key, value) pairs in input-key order. Decode +-- failures are logged (with the offending key) and dropped; the key is +-- left in Redis for the writer to fix or overwrite. +mGetClusterWithKeys :: + (FromJSON a, HedisFlow m env, TryException m, L.MonadFlow m, Forkable m) => + [Text] -> + m [(Text, a)] +mGetClusterWithKeys keys = do + raw <- mGetClusterRaw keys + catMaybes . V.toList + <$> V.zipWithM decodeKeyedPair (V.fromList keys) raw + +-- | Internal: standalone MGET returning a Vector aligned by input index. +-- Hits → @Just bs@; misses and Redis errors → @Nothing@. +mGetStandaloneRaw :: + (HedisFlow m env, TryException m) => + [Text] -> + m (V.Vector (Maybe BS.ByteString)) +mGetStandaloneRaw [] = pure V.empty +mGetStandaloneRaw keys = withLogTag "STANDALONE" $ do + let !nKeys = length keys + prefKeys <- mapM buildKey keys + result <- + withTimeRedis "RedisStandalone" "mget" $ + withTryCatch "mGetStandalone" $ + runHedisEither' $ Hedis.mget prefKeys + case result of + Left exc -> do + logTagError "ERROR_WHILE_MGET" $ "Standalone MGET threw: " <> show exc + pure $ V.replicate nKeys Nothing + Right (Left reply) -> do + logTagError "ERROR_WHILE_MGET" $ "Standalone MGET failed: " <> show reply + pure $ V.replicate nKeys Nothing + Right (Right listBS) -> + pure $ V.fromListN nKeys (DL.take nKeys (listBS <> DL.repeat Nothing)) + +-- | Standalone MGET returning just the decoded values that were found. +mGetStandalone :: (FromJSON a, HedisFlow m env, TryException m) => [Text] -> m [a] +mGetStandalone keys = do + raw <- mGetStandaloneRaw keys + catMaybes . V.toList <$> V.mapM decodeBytesLogging raw + +-- | Standalone MGET returning (key, value) pairs in input-key order. Decode +-- failures are logged (with the offending key) and dropped. +mGetStandaloneWithKeys :: + (FromJSON a, HedisFlow m env, TryException m) => + [Text] -> + m [(Text, a)] +mGetStandaloneWithKeys keys = do + raw <- mGetStandaloneRaw keys + catMaybes . V.toList + <$> V.zipWithM decodeKeyedPair (V.fromList keys) raw + +-- | Decode raw bytes; on failure log and return Nothing. Used by the +-- values-only paths where the key isn't tracked. +decodeBytesLogging :: + (FromJSON a, HedisFlow m env, TryException m) => + Maybe BS.ByteString -> + m (Maybe a) +decodeBytesLogging Nothing = pure Nothing +decodeBytesLogging (Just bs) = case Ae.eitherDecode (BSL.fromStrict bs) of + Right v -> pure (Just v) + Left e -> do + logTagError "REDIS" $ "MGet decode failure: " <> cs e + pure Nothing + +-- | Decode bytes paired with their originating key. On bad JSON, log with the +-- key for traceability and drop the entry — does NOT delete the key from Redis. +decodeKeyedPair :: + (FromJSON a, HedisFlow m env, TryException m) => + Text -> + Maybe BS.ByteString -> + m (Maybe (Text, a)) +decodeKeyedPair _ Nothing = pure Nothing +decodeKeyedPair k (Just bs) = case Ae.eitherDecode (BSL.fromStrict bs) of + Right v -> pure (Just (k, v)) + Left e -> do + logTagError "REDIS" $ "MGet decode failure for key " <> k <> ": " <> cs e + pure Nothing + +decodeMGetResult :: + (FromJSON a, HedisFlow m env, TryException m) => + Text -> + Maybe BS.ByteString -> + m (Maybe a) +decodeMGetResult _ Nothing = pure Nothing +decodeMGetResult key (Just bs) = decodeJSONWithErrorHandler key bs (del key) + set :: (ToJSON a, HedisFlow m env, TryException m) => Text -> a -> m () set key val = withLogTag "Redis" $ do From b17a355cb0aecfc097a5c82114f5d82169cd9731 Mon Sep 17 00:00:00 2001 From: dushyant-2002 Date: Wed, 29 Apr 2026 17:10:48 +0530 Subject: [PATCH 07/11] added-udf-9-for-members-discount udf-9-for-members-discount-fix --- .../src/Kernel/External/Payment/Interface/Juspay.hs | 8 +++++--- .../src/Kernel/External/Payment/Interface/Types.hs | 6 +++++- .../src/Kernel/External/Payment/Juspay/Types/Offer.hs | 1 + 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/lib/mobility-core/src/Kernel/External/Payment/Interface/Juspay.hs b/lib/mobility-core/src/Kernel/External/Payment/Interface/Juspay.hs index c1fa4ed0d..e2334bd28 100644 --- a/lib/mobility-core/src/Kernel/External/Payment/Interface/Juspay.hs +++ b/lib/mobility-core/src/Kernel/External/Payment/Interface/Juspay.hs @@ -777,15 +777,15 @@ offerList config mRoutingId req = do mkOfferListReq :: OfferListReq -> Juspay.OfferListReq mkOfferListReq OfferListReq {..} = Juspay.OfferListReq - { order = mkOfferOrder order planId registrationDate dutyDate paymentMode numOfRides offerListingMetric, + { order = mkOfferOrder order planId registrationDate dutyDate paymentMode numOfRides offerListingMetric membershipStatus, payment_method_info = [], customer = mkOfferCustomer <$> customer, offer_code = Nothing } -mkOfferOrder :: OfferOrder -> Text -> UTCTime -> UTCTime -> Text -> Int -> Maybe UDF6 -> Juspay.OfferOrder +mkOfferOrder :: OfferOrder -> Text -> UTCTime -> UTCTime -> Text -> Int -> Maybe UDF6 -> Maybe UDF9 -> Juspay.OfferOrder ---- add duty day and payment mode respectively in holes ---- -mkOfferOrder OfferOrder {..} planId registrationDate dutyDate paymentMode numOfRides offerListingMetric = +mkOfferOrder OfferOrder {..} planId registrationDate dutyDate paymentMode numOfRides offerListingMetric membershipStatus = Juspay.OfferOrder { order_id = orderId, amount = show amount, @@ -798,6 +798,7 @@ mkOfferOrder OfferOrder {..} planId registrationDate dutyDate paymentMode numOfR let strNumRides = show numOfRides if strNumRides == "-1" then "DEFAULT" else strNumRides, udf6 = parseUDF6 <$> offerListingMetric, + udf9 = parseUDF9 <$> membershipStatus, basket = decodeUtf8 . A.encode <$> basket } where @@ -805,6 +806,7 @@ mkOfferOrder OfferOrder {..} planId registrationDate dutyDate paymentMode numOfR case offerListingMetric' of LIST_BASED_ON_DATE listingDates -> pack $ formatTime defaultTimeLocale "%d_%m_%y" listingDates _ -> show offerListingMetric' + parseUDF9 (MembershipStatus isMember) = if isMember then "TRUE" else "FALSE" mkOfferCustomer :: OfferCustomer -> Juspay.OfferCustomer mkOfferCustomer OfferCustomer {..} = Juspay.OfferCustomer {id = customerId, email, mobile} diff --git a/lib/mobility-core/src/Kernel/External/Payment/Interface/Types.hs b/lib/mobility-core/src/Kernel/External/Payment/Interface/Types.hs index 2fbb94230..96709fd18 100644 --- a/lib/mobility-core/src/Kernel/External/Payment/Interface/Types.hs +++ b/lib/mobility-core/src/Kernel/External/Payment/Interface/Types.hs @@ -469,12 +469,16 @@ data OfferListReq = OfferListReq dutyDate :: UTCTime, paymentMode :: Text, numOfRides :: Int, - offerListingMetric :: Maybe UDF6 + offerListingMetric :: Maybe UDF6, + membershipStatus :: Maybe UDF9 } data UDF6 = IS_VISIBLE | IS_APPLICABLE | LIST_BASED_ON_DATE UTCTime deriving stock (Show, Eq, Generic, Read) +data UDF9 = MembershipStatus Bool + deriving stock (Show, Eq, Generic, Read) + data OfferOrder = OfferOrder { orderId :: Maybe Text, amount :: HighPrecMoney, diff --git a/lib/mobility-core/src/Kernel/External/Payment/Juspay/Types/Offer.hs b/lib/mobility-core/src/Kernel/External/Payment/Juspay/Types/Offer.hs index 208614d95..537a39405 100644 --- a/lib/mobility-core/src/Kernel/External/Payment/Juspay/Types/Offer.hs +++ b/lib/mobility-core/src/Kernel/External/Payment/Juspay/Types/Offer.hs @@ -43,6 +43,7 @@ data OfferOrder = OfferOrder udf4 :: Text, udf5 :: Text, udf6 :: Maybe Text, + udf9 :: Maybe Text, basket :: Maybe Text } deriving stock (Show, Generic) From bfd943517d1537c448698e1f2a69adf92e50fb77 Mon Sep 17 00:00:00 2001 From: khuzema786 Date: Sun, 10 May 2026 14:37:13 +0530 Subject: [PATCH 08/11] Publish for Notification should happen after xAdd --- lib/mobility-core/src/Kernel/External/Notification/GRPC/Flow.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/mobility-core/src/Kernel/External/Notification/GRPC/Flow.hs b/lib/mobility-core/src/Kernel/External/Notification/GRPC/Flow.hs index 85c4c93d5..65ae55aa9 100644 --- a/lib/mobility-core/src/Kernel/External/Notification/GRPC/Flow.hs +++ b/lib/mobility-core/src/Kernel/External/Notification/GRPC/Flow.hs @@ -44,8 +44,8 @@ notifyPerson cfg notificationData = Hedis.runInMasterCloudRedisCell $ do [startUuid, midOneUuid, _, _] -> T.intercalate "-" [startUuid, midOneUuid] _ -> notificationData.streamId let object = NotificationMessage notificationStreamId now - _ <- Hedis.withCrossAppRedis $ Hedis.publish "active-notification" object void $ Hedis.withCrossAppRedis $ Hedis.xAddExp ("N" <> notificationStreamId <> "{" <> (show shardId) <> "}") "*" (buildFieldValue notificationData now) cfg.streamExpirationTime + void $ Hedis.withCrossAppRedis $ Hedis.publish "active-notification" object where buildFieldValue notifData createdAt = [ ("entity.id", TE.encodeUtf8 notifData.entityId), From 95eb4de469817725617bdce9b3d1a3ae708593be Mon Sep 17 00:00:00 2001 From: Vijay Gupta Date: Thu, 7 May 2026 17:02:57 +0530 Subject: [PATCH 09/11] feat: Add MasterCloudForward integration and test executable - Introduced `MasterCloudForward` module for AWS-egress forwarding functionality. - Added `master-cloud-forward-itest` executable for integration testing. - Updated cabal and package.yaml files to include new dependencies and configurations. - Implemented tests to validate the forwarding behavior against a real-world API. --- lib/mobility-core/mobility-core.cabal | 158 ++++++++++ lib/mobility-core/package.yaml | 10 + .../src/Kernel/External/MasterCloudForward.hs | 290 ++++++++++++++++++ lib/mobility-core/test-integration/Main.hs | 223 ++++++++++++++ 4 files changed, 681 insertions(+) create mode 100644 lib/mobility-core/src/Kernel/External/MasterCloudForward.hs create mode 100644 lib/mobility-core/test-integration/Main.hs diff --git a/lib/mobility-core/mobility-core.cabal b/lib/mobility-core/mobility-core.cabal index 31c544b47..6efc09db4 100644 --- a/lib/mobility-core/mobility-core.cabal +++ b/lib/mobility-core/mobility-core.cabal @@ -132,6 +132,7 @@ library Kernel.External.Maps.OSRM.RoadsClient Kernel.External.Maps.Types Kernel.External.Maps.Utils + Kernel.External.MasterCloudForward Kernel.External.MultiModal Kernel.External.MultiModal.Interface Kernel.External.MultiModal.Interface.Google @@ -737,6 +738,163 @@ library , xmlbf default-language: Haskell2010 +executable master-cloud-forward-itest + main-is: Main.hs + other-modules: + Paths_mobility_core + hs-source-dirs: + test-integration + default-extensions: + ConstraintKinds + DataKinds + DefaultSignatures + DeriveAnyClass + DeriveFunctor + DeriveGeneric + DuplicateRecordFields + ExplicitNamespaces + FlexibleContexts + FlexibleInstances + FunctionalDependencies + GADTs + LambdaCase + MultiParamTypeClasses + MultiWayIf + NamedFieldPuns + NoImplicitPrelude + OverloadedLabels + OverloadedStrings + PatternSynonyms + PolyKinds + RankNTypes + RecordWildCards + ScopedTypeVariables + TupleSections + TypeApplications + TypeFamilies + TypeOperators + ViewPatterns + BlockArguments + TypeSynonymInstances + UndecidableInstances + ghc-options: -fwrite-ide-info -hiedir=.hie -Wall -Wcompat -Widentities -fhide-source-paths -Werror -fplugin=RecordDotPreprocessor -Wwarn=ambiguous-fields + build-depends: + aeson + , aeson-casing + , async + , base >=4.7 && <5 + , base64 + , base64-bytestring + , beam-core + , beam-mysql + , beam-postgres + , bimap + , bytestring + , case-insensitive + , casing + , cassava + , cereal + , clickhouse-haskell + , clock + , concurrency + , containers + , cryptonite + , data-default-class + , deriving-aeson + , dhall + , directory + , double-conversion + , either + , esqueleto + , euler-hs + , exceptions + , extra + , fast-logger + , filepath + , fmt + , formatting + , generic-lens + , geojson + , hashable + , hedis + , hex-text + , hspec + , http-api-data + , http-client + , http-client-tls + , http-media + , http-types + , hw-kafka-client + , insert-ordered-containers + , jwt + , kleene + , lattices + , lens + , memory + , mobility-core + , monad-logger + , morpheus-graphql-client + , mtl + , network + , openapi3 + , parsec + , passetto-client + , persistent + , persistent-postgresql + , postgresql-migration + , postgresql-simple + , process + , prometheus-client + , prometheus-metrics-ghc + , prometheus-proc + , random + , random-strings + , record-dot-preprocessor + , record-hasfield + , regex-compat + , resource-pool + , safe + , safe-exceptions + , safe-money + , scientific + , sequelize + , servant + , servant-client + , servant-client-core + , servant-multipart + , servant-multipart-api + , servant-multipart-client + , servant-openapi3 + , servant-server + , singletons-th + , slack-web + , split + , stm + , string-conversions + , tasty + , tasty-hunit + , template-haskell + , text + , text-conversions + , time + , tinylog + , transformers + , universum + , unix + , unliftio + , unliftio-core + , unordered-containers + , uuid + , vector + , wai + , wai-app-static + , wai-middleware-prometheus + , warp + , xml-conduit + , xml-types + , xmlbf + default-language: Haskell2010 + test-suite mobility-core-tests type: exitcode-stdio-1.0 main-is: Main.hs diff --git a/lib/mobility-core/package.yaml b/lib/mobility-core/package.yaml index 49930bd41..7a15cd566 100644 --- a/lib/mobility-core/package.yaml +++ b/lib/mobility-core/package.yaml @@ -187,3 +187,13 @@ tests: - test/src dependencies: - mobility-core + +executables: + master-cloud-forward-itest: + main: Main.hs + source-dirs: + - test-integration + dependencies: + - mobility-core + - async + - http-client-tls diff --git a/lib/mobility-core/src/Kernel/External/MasterCloudForward.hs b/lib/mobility-core/src/Kernel/External/MasterCloudForward.hs new file mode 100644 index 000000000..4f120eace --- /dev/null +++ b/lib/mobility-core/src/Kernel/External/MasterCloudForward.hs @@ -0,0 +1,290 @@ +{- + Copyright 2022-23, Juspay India Pvt Ltd + + This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License + + as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is + + distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + + FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero + + General Public License along with this program. If not, see . +-} +{-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE TemplateHaskell #-} + +-- | Generic AWS-egress forwarder. Raw HTTP passthrough — no JSON envelope +-- on the wire. GCP-side @runThroughMasterCloud@ rewrites the request URL +-- and adds two headers; AWS-side @forwardEgressApp@ reads those headers +-- and replays the request from the whitelisted IP. +module Kernel.External.MasterCloudForward + ( MasterCloudProxyConfig (..), + emptyMasterCloudProxyConfig, + HasMasterCloudForwarder (..), + ForwardError (..), + ForwardAPI, + forwardAPI, + runThroughMasterCloud, + getRunApiInMasterCloud, + forwardEgressApp, + ) +where + +import qualified Control.Exception as Exc +import qualified Data.Aeson as A +import qualified Data.ByteString.Builder as BSB +import qualified Data.ByteString.Lazy as LBS +import qualified Data.CaseInsensitive as CI +import qualified Data.Sequence as Seq +import qualified Data.Text as T +import qualified Data.Text.Encoding as TE +import qualified Data.Text.Encoding.Error as TEE +import qualified EulerHS.Language as L +import EulerHS.Prelude (Free (..)) +import qualified EulerHS.Types as ET +import Kernel.Prelude +import qualified Kernel.Tools.Metrics.CoreMetrics as Metrics +import Kernel.Types.Common +import Kernel.Types.Error.BaseError +import Kernel.Types.Error.BaseError.HTTPError +import Kernel.Utils.Dhall (FromDhall) +import Kernel.Utils.Logging +import Kernel.Utils.Servant.BaseUrl (showBaseUrlText) +import Kernel.Utils.Servant.Client (HasRequestId, defaultHttpManager) +import qualified Network.HTTP.Client as Http +import qualified Network.HTTP.Types as HTTP +import qualified Network.HTTP.Types.URI as URI +import qualified Network.Wai as Wai +import Servant +import Servant.Client.Core (ClientError) +import qualified Servant.Client.Core as SCC +import qualified Servant.Client.Free as SCF +import qualified Servant.Server as SS +import System.Environment (lookupEnv) + +-- Same record on both sides. GCP populates @masterUrl@ + @masterSecret@; +-- AWS populates @masterSecret@ only. Unused fields stay 'Nothing'. +data MasterCloudProxyConfig = MasterCloudProxyConfig + { masterUrl :: Maybe BaseUrl, + masterSecret :: Maybe Text + } + deriving (Generic, Show, FromJSON, ToJSON, FromDhall) + +emptyMasterCloudProxyConfig :: MasterCloudProxyConfig +emptyMasterCloudProxyConfig = MasterCloudProxyConfig Nothing Nothing + +class HasMasterCloudForwarder env where + masterCloudProxyConfig :: env -> MasterCloudProxyConfig + +data ForwardError + = ForwardNotConfigured + | ForwardMissingSecret + | ForwardBadSecret + | ForwardMissingDestination + | ForwardInvalidUrl Text + | ForwardUpstreamFailure Text + deriving (Eq, Generic, Show, IsBecknAPIError, FromJSON, ToJSON) + +instanceExceptionWithParent 'HTTPException ''ForwardError + +instance IsBaseError ForwardError where + toMessage = \case + ForwardNotConfigured -> Just "Forwarder secret not configured on this service." + ForwardMissingSecret -> Just "Missing X-Forwarder-Secret header." + ForwardBadSecret -> Just "X-Forwarder-Secret does not match." + ForwardMissingDestination -> Just "Missing X-Forward-Destination header." + ForwardInvalidUrl u -> Just $ "Invalid X-Forward-Destination URL: " <> u + ForwardUpstreamFailure msg -> Just $ "Upstream forwarder failure: " <> msg + +instance IsHTTPError ForwardError where + toErrorCode = \case + ForwardNotConfigured -> "FORWARDER_NOT_CONFIGURED" + ForwardMissingSecret -> "FORWARDER_MISSING_SECRET" + ForwardBadSecret -> "FORWARDER_BAD_SECRET" + ForwardMissingDestination -> "FORWARDER_MISSING_DESTINATION" + ForwardInvalidUrl _ -> "FORWARDER_INVALID_URL" + ForwardUpstreamFailure _ -> "FORWARDER_UPSTREAM_FAILURE" + + toHttpCode = \case + ForwardNotConfigured -> E401 + ForwardMissingSecret -> E401 + ForwardBadSecret -> E401 + ForwardMissingDestination -> E400 + ForwardInvalidUrl _ -> E400 + ForwardUpstreamFailure _ -> E500 + +instance IsAPIError ForwardError + +type ForwardAPI = + "forward-egress" + :> Header "X-Forwarder-Secret" Text + :> Header "X-Forward-Destination" Text + :> Raw + +forwardAPI :: Proxy ForwardAPI +forwardAPI = Proxy + +-- True iff @RUN_API_IN_MASTER_CLOUD=True@ in env. +getRunApiInMasterCloud :: IO Bool +getRunApiInMasterCloud = do + envVal <- lookupEnv "RUN_API_IN_MASTER_CLOUD" + pure (fromMaybe False (readMaybe =<< envVal)) + +-- Drop-in replacement for @callAPI@. Triple-gated: env on + masterUrl set + +-- masterSecret set → forwarded. Anything else → direct call. +runThroughMasterCloud :: + ( HasMasterCloudForwarder r, + MonadReader r m, + MonadFlow m, + Log m, + Metrics.CoreMetrics m, + HasRequestId r + ) => + BaseUrl -> + ET.EulerClient a -> + Text -> + m (Either ClientError a) +runThroughMasterCloud origBaseUrl eClient desc = do + shouldForward <- liftIO getRunApiInMasterCloud + cfg <- asks masterCloudProxyConfig + case (shouldForward, cfg.masterUrl, cfg.masterSecret) of + (True, Just fwdUrl, Just secret) -> do + logDebug $ "MASTER_CLOUD_FORWARD: forwarding " <> desc <> " via " <> showBaseUrlText fwdUrl + interpretWithForwarder origBaseUrl fwdUrl secret eClient + _ -> do + logDebug $ "MASTER_CLOUD_FORWARD: direct call for " <> desc + L.callAPI' (Just defaultHttpManager) origBaseUrl eClient + +-- Walk the EulerClient free monad. Each RunRequest gets its path rewritten +-- to /forward-egress and two headers appended; the rewritten request is +-- dispatched against the AWS forwarder and the upstream response is fed +-- back to the continuation. +interpretWithForwarder :: + ( MonadFlow m, + Log m, + Metrics.CoreMetrics m, + MonadReader r m, + HasRequestId r + ) => + BaseUrl -> + BaseUrl -> + Text -> + ET.EulerClient a -> + m (Either ClientError a) +interpretWithForwarder origBaseUrl fwdUrl secret (ET.EulerClient freeClient) = + go freeClient + where + go (Pure a) = pure (Right a) + go (Free (SCF.Throw e)) = pure (Left e) + go (Free (SCF.RunRequest req cont)) = do + let pathBs = LBS.toStrict (BSB.toLazyByteString (SCC.requestPath req)) + queryBs = URI.renderQuery True (toList (SCC.requestQueryString req)) + basePart = TE.encodeUtf8 (showBaseUrlText origBaseUrl) + originalFullUrl = TE.decodeUtf8With TEE.lenientDecode (basePart <> pathBs <> queryBs) + let extraHeaders = + SCC.requestHeaders req + Seq.|> (CI.mk "X-Forwarder-Secret", TE.encodeUtf8 secret) + Seq.|> (CI.mk "X-Forward-Destination", TE.encodeUtf8 originalFullUrl) + r1 = req {SCC.requestPath = BSB.byteString "/forward-egress"} + r2 = r1 {SCC.requestQueryString = mempty} + newReq = r2 {SCC.requestHeaders = extraHeaders} + let onestep = ET.EulerClient (Free (SCF.RunRequest newReq pure)) + result <- L.callAPI' (Just defaultHttpManager) fwdUrl onestep + case result of + Left err -> pure (Left err) + Right response -> go (cont response) + +-- AWS-side WAI handler. Validates secret, then replays the incoming request +-- to @X-Forward-Destination@ and proxies the response back. +forwardEgressApp :: + MasterCloudProxyConfig -> + Http.Manager -> + Wai.Application +forwardEgressApp cfg mgr req sendResp = + case validateForwardRequest cfg req of + Left err -> sendResp (forwardErrorResponse err) + Right destReq -> proxyRequest destReq req sendResp mgr + +-- Verify @X-Forwarder-Secret@ matches and parse @X-Forward-Destination@. +-- Returns the prepared upstream request, or a typed 'ForwardError'. +validateForwardRequest :: + MasterCloudProxyConfig -> + Wai.Request -> + Either ForwardError Http.Request +validateForwardRequest cfg req = do + let hdrs = Wai.requestHeaders req + look n = TE.decodeUtf8With TEE.lenientDecode <$> lookup n hdrs + expected <- cfg.masterSecret `orFail` ForwardNotConfigured + got <- look "X-Forwarder-Secret" `orFail` ForwardMissingSecret + when (got /= expected) $ Left ForwardBadSecret + dest <- look "X-Forward-Destination" `orFail` ForwardMissingDestination + Http.parseRequest (T.unpack dest) `orFail` ForwardInvalidUrl dest + +orFail :: Maybe a -> e -> Either e a +orFail (Just a) _ = Right a +orFail Nothing e = Left e + +-- Render a typed 'ForwardError' as a JSON @{ errorCode, errorMessage }@ +-- response with the status code from 'toHttpCode'. Same envelope as the rest +-- of the Kernel error machinery. +forwardErrorResponse :: ForwardError -> Wai.Response +forwardErrorResponse err = + Wai.responseLBS + (httpCodeToStatus (toHttpCode err)) + [(CI.mk "Content-Type", "application/json")] + ( A.encode $ + A.object + [ "errorCode" A..= toErrorCode err, + "errorMessage" A..= fromMaybe "" (toMessage err) + ] + ) + +-- 'HttpCode' lives in the Kernel error stack; @http-types@ uses 'HTTP.Status'. +-- 'toServerError' bridges them via Servant's @ServerError@ which carries an +-- 'Int' code. +httpCodeToStatus :: HttpCode -> HTTP.Status +httpCodeToStatus c = + let se = toServerError c + in HTTP.mkStatus (SS.errHTTPCode se) (TE.encodeUtf8 (T.pack (SS.errReasonPhrase se))) + +proxyRequest :: + Http.Request -> + Wai.Request -> + (Wai.Response -> IO Wai.ResponseReceived) -> + Http.Manager -> + IO Wai.ResponseReceived +proxyRequest destReq0 incoming sendResp mgr = do + bodyBytes <- Wai.strictRequestBody incoming + let destReq = buildDestRequest destReq0 incoming bodyBytes + result <- Exc.try @Exc.SomeException (Http.httpLbs destReq mgr) + sendResp (toWaiResponse result) + +-- Copy method/body/headers from the incoming WAI request onto the parsed +-- destination http-client request, dropping hop-by-hop headers and the two +-- forwarder-internal headers. +buildDestRequest :: Http.Request -> Wai.Request -> LBS.ByteString -> Http.Request +buildDestRequest base incoming body = + let h = filter notHopByHop (Wai.requestHeaders incoming) + r1 = base {Http.method = Wai.requestMethod incoming} + r2 = r1 {Http.requestBody = Http.RequestBodyLBS body} + in r2 {Http.requestHeaders = h} + where + notHopByHop (n, _) = + n /= "Host" + && n /= "Content-Length" + && n /= "X-Forwarder-Secret" + && n /= "X-Forward-Destination" + +-- Translate an http-client outcome into a WAI response. http-client.httpLbs +-- auto-decompresses gzip, so we strip Content-Encoding / Content-Length / +-- Transfer-Encoding from the upstream headers to keep the body consistent. +toWaiResponse :: Either Exc.SomeException (Http.Response LBS.ByteString) -> Wai.Response +toWaiResponse = \case + Left e -> + forwardErrorResponse (ForwardUpstreamFailure (T.pack (show e))) + Right resp -> + let safeHeaders = filter (not . isStrippable . fst) (Http.responseHeaders resp) + in Wai.responseLBS (Http.responseStatus resp) safeHeaders (Http.responseBody resp) + where + isStrippable n = n == "Content-Encoding" || n == "Content-Length" || n == "Transfer-Encoding" diff --git a/lib/mobility-core/test-integration/Main.hs b/lib/mobility-core/test-integration/Main.hs new file mode 100644 index 000000000..2d2d8b916 --- /dev/null +++ b/lib/mobility-core/test-integration/Main.hs @@ -0,0 +1,223 @@ +{- + Copyright 2022-23, Juspay India Pvt Ltd + + This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License + + as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is + + distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + + FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero + + General Public License along with this program. If not, see . +-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TypeApplications #-} +{-# LANGUAGE TypeOperators #-} + +-- Real-world parity test: GET https://jsonplaceholder.typicode.com/todos/1 +-- via plain `callAPI` and via `runThroughMasterCloud` (through a local +-- forwarder), then assert both decoded `Todo` values are identical. +module Main (main) where + +import qualified Control.Concurrent.Async as Async +import qualified Control.Exception as Exc +import qualified Data.HashMap.Strict as HM +import Data.IORef +import qualified Data.Text as T +import qualified EulerHS.Runtime as R +import qualified EulerHS.Types as ET +import Kernel.External.MasterCloudForward + ( HasMasterCloudForwarder (..), + MasterCloudProxyConfig (..), + forwardEgressApp, + runThroughMasterCloud, + ) +import Kernel.Prelude +import qualified Kernel.Streaming.Kafka.Producer.Types as Kafka +import qualified Kernel.Tools.Metrics.CoreMetrics.Types as Metrics +import qualified Kernel.Types.Flow as KFlow +import Kernel.Types.Logging (LogLevel (..), LoggerConfig (..)) +import qualified Kernel.Utils.IOLogging as IOLogging +import Kernel.Utils.Servant.Client (callAPI) +import qualified Network.HTTP.Client as Http +import qualified Network.HTTP.Client.TLS as HttpTLS +import qualified Network.Wai.Handler.Warp as Warp +import Servant +import qualified Servant.Client as SC +import System.Environment (setEnv) +import System.Exit (ExitCode (..), exitWith) +import qualified Prelude as P + +-- Mirrors the real /todos/{id} response shape. +data Todo = Todo + { userId :: Int, + id :: Int, + title :: Text, + completed :: Bool + } + deriving stock (Generic, Eq, Show) + deriving anyclass (FromJSON, ToJSON) + +type TodoAPI = "todos" :> Capture "id" Int :> Get '[JSON] Todo + +todoAPI :: Proxy TodoAPI +todoAPI = Proxy + +getTodoClient :: Int -> ET.EulerClient Todo +getTodoClient = ET.client todoAPI + +-- Minimal AppEnv carrying the forwarder config plus the fields +-- MonadFlow / HasARTFlow / HasCoreMetrics demand on r. +data TestEnv = TestEnv + { teProxyConfig :: MasterCloudProxyConfig, + requestId :: Maybe Text, + sessionId :: Maybe Text, + loggerEnv :: IOLogging.LoggerEnv, + shouldLogRequestId :: Bool, + kafkaProducerForART :: Maybe Kafka.KafkaProducerTools, + coreMetrics :: Metrics.CoreMetricsContainer, + version :: Metrics.DeploymentVersion, + url :: Maybe Text + } + deriving (Generic) + +instance HasMasterCloudForwarder TestEnv where + masterCloudProxyConfig = teProxyConfig + +forwarderPort :: Int +forwarderPort = 9101 + +mkBaseUrl :: SC.Scheme -> P.String -> Int -> P.String -> BaseUrl +mkBaseUrl scheme host port path = + BaseUrl + { baseUrlScheme = scheme, + baseUrlHost = host, + baseUrlPort = port, + baseUrlPath = path + } + +silentLoggerConfig :: LoggerConfig +silentLoggerConfig = + LoggerConfig + { level = ERROR, + logToFile = False, + logFilePath = "/tmp/master-cloud-forward-itest.log", + logToConsole = False, + logRawSql = False, + prettyPrinting = False + } + +bootstrapEnv :: + IOLogging.LoggerEnv -> + Metrics.CoreMetricsContainer -> + TestEnv +bootstrapEnv logger metrics = + TestEnv + { teProxyConfig = + MasterCloudProxyConfig + { masterUrl = Just (mkBaseUrl SC.Http "localhost" forwarderPort ""), + masterSecret = Just "itest-secret" + }, + requestId = Just "itest-req", + sessionId = Just "itest-sess", + loggerEnv = logger, + shouldLogRequestId = False, + kafkaProducerForART = Nothing, + coreMetrics = metrics, + version = Metrics.DeploymentVersion "itest-0.0.0", + url = Nothing + } + +-- Wraps forwardEgressApp with a hit counter so we can assert traffic actually +-- traversed it. +countingForwarderApp :: + IORef Int -> + MasterCloudProxyConfig -> + Http.Manager -> + Application +countingForwarderApp hits cfg mgr req sendResp = do + modifyIORef' hits (+ 1) + forwardEgressApp cfg mgr req sendResp + +main :: IO () +main = do + P.putStrLn "[itest] starting jsonplaceholder forwarder parity test" + + -- TLS-aware Manager: used by both the local forwarder (to reach typicode) + -- AND the EulerHS FlowRuntime (for both the direct callAPI path and the + -- GCP→forwarder hop). + mgr <- Http.newManager HttpTLS.tlsManagerSettings + + forwarderHits <- newIORef (0 :: Int) + let fwdCfg = + MasterCloudProxyConfig + { masterUrl = Nothing, + masterSecret = Just "itest-secret" + } + + forwarderAsync <- + Async.async . Warp.run forwarderPort $ + countingForwarderApp forwarderHits fwdCfg mgr + + -- Let Warp bind. + liftIO $ threadDelay 1000000 + + result <- Exc.try @Exc.SomeException $ + IOLogging.withLoggerEnv silentLoggerConfig (Just "itest") $ \logger -> do + coreMx <- Metrics.registerCoreMetricsContainer + R.withFlowRuntime Nothing $ \flowRt0 -> do + let flowRt = flowRt0 {R._httpClientManagers = HM.insert "default" mgr (R._httpClientManagers flowRt0)} + env = bootstrapEnv logger coreMx + todoBaseUrl = mkBaseUrl SC.Https "jsonplaceholder.typicode.com" 443 "" + + -- Path 1: plain callAPI — direct to typicode, baseline. + rDirect <- + KFlow.runFlowR flowRt env $ + callAPI todoBaseUrl (getTodoClient 1) "getTodo-direct" todoAPI + + -- Path 2: runThroughMasterCloud — env on, forwarder configured → + -- request goes through localhost forwarder which replays to typicode. + setEnv "RUN_API_IN_MASTER_CLOUD" "True" + rForwarded <- + KFlow.runFlowR flowRt env $ + runThroughMasterCloud todoBaseUrl (getTodoClient 1) "getTodo-forwarded" + + -- Compare. + case (rDirect, rForwarded) of + (Right todoDirect, Right todoFwd) -> do + P.putStrLn $ "[itest] direct : " <> show todoDirect + P.putStrLn $ "[itest] forwarded : " <> show todoFwd + when (todoDirect /= todoFwd) $ + fail' ("mismatch: direct=" <> show todoDirect <> " forwarded=" <> show todoFwd) + fHits <- readIORef forwarderHits + when (fHits /= 1) $ + fail' ("forwarder hits=" <> show fHits <> ", expected 1") + P.putStrLn "PASS — direct and forwarded responses are byte-for-byte identical" + P.putStrLn $ " forwarder hit count: " <> show fHits + (Left e, _) -> + fail' ("direct callAPI failed: " <> show e) + (_, Left e) -> + fail' ("forwarded call failed: " <> show e) + + Async.cancel forwarderAsync + case result of + Left e -> do + P.putStrLn $ "[itest] EXCEPTION: " <> P.show e + exitWith (ExitFailure 1) + Right _ -> exitWith ExitSuccess + where + fail' reason = do + P.putStrLn $ T.unpack ("FAIL: " <> T.pack reason) + exitWith (ExitFailure 1) From eb8cc0fbd8f6e360e78f89b8aa4ebfa59a3ba370 Mon Sep 17 00:00:00 2001 From: Vijay Gupta Date: Tue, 12 May 2026 23:26:33 +0530 Subject: [PATCH 10/11] Enhance MasterCloudForward with logging capabilities - Integrated IO logging into the forwardEgressApp and proxyRequest functions. - Added error logging for validation failures and upstream request errors. - Ensured log messages are consistent with the application's logging framework. --- .../src/Kernel/External/MasterCloudForward.hs | 30 +++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/lib/mobility-core/src/Kernel/External/MasterCloudForward.hs b/lib/mobility-core/src/Kernel/External/MasterCloudForward.hs index 4f120eace..bb911e6c5 100644 --- a/lib/mobility-core/src/Kernel/External/MasterCloudForward.hs +++ b/lib/mobility-core/src/Kernel/External/MasterCloudForward.hs @@ -49,6 +49,7 @@ import Kernel.Types.Common import Kernel.Types.Error.BaseError import Kernel.Types.Error.BaseError.HTTPError import Kernel.Utils.Dhall (FromDhall) +import qualified Kernel.Utils.IOLogging as IOLog import Kernel.Utils.Logging import Kernel.Utils.Servant.BaseUrl (showBaseUrlText) import Kernel.Utils.Servant.Client (HasRequestId, defaultHttpManager) @@ -196,15 +197,20 @@ interpretWithForwarder origBaseUrl fwdUrl secret (ET.EulerClient freeClient) = Right response -> go (cont response) -- AWS-side WAI handler. Validates secret, then replays the incoming request --- to @X-Forward-Destination@ and proxies the response back. +-- to @X-Forward-Destination@ and proxies the response back. The 'LoggerEnv' +-- is the same one held on @AppEnv.loggerEnv@; log lines land in the standard +-- Kibana index alongside @logInfo@/@logError@ output from the rest of the app. forwardEgressApp :: + IOLog.LoggerEnv -> MasterCloudProxyConfig -> Http.Manager -> Wai.Application -forwardEgressApp cfg mgr req sendResp = +forwardEgressApp logEnv cfg mgr req sendResp = case validateForwardRequest cfg req of - Left err -> sendResp (forwardErrorResponse err) - Right destReq -> proxyRequest destReq req sendResp mgr + Left err -> do + IOLog.logOutputIO logEnv ERROR ("forward-egress validation failed: " <> T.pack (show err)) Nothing Nothing + sendResp (forwardErrorResponse err) + Right destReq -> proxyRequest logEnv destReq req sendResp mgr -- Verify @X-Forwarder-Secret@ matches and parse @X-Forward-Destination@. -- Returns the prepared upstream request, or a typed 'ForwardError'. @@ -249,15 +255,29 @@ httpCodeToStatus c = in HTTP.mkStatus (SS.errHTTPCode se) (TE.encodeUtf8 (T.pack (SS.errReasonPhrase se))) proxyRequest :: + IOLog.LoggerEnv -> Http.Request -> Wai.Request -> (Wai.Response -> IO Wai.ResponseReceived) -> Http.Manager -> IO Wai.ResponseReceived -proxyRequest destReq0 incoming sendResp mgr = do +proxyRequest logEnv destReq0 incoming sendResp mgr = do bodyBytes <- Wai.strictRequestBody incoming let destReq = buildDestRequest destReq0 incoming bodyBytes result <- Exc.try @Exc.SomeException (Http.httpLbs destReq mgr) + case result of + Left e -> do + let decode = TE.decodeUtf8With TEE.lenientDecode + destUrl = + fromMaybe (decode (Http.host destReq <> Http.path destReq <> Http.queryString destReq)) $ + decode <$> lookup "X-Forward-Destination" (Wai.requestHeaders incoming) + IOLog.logOutputIO + logEnv + ERROR + ("forward-egress upstream FAILED " <> destUrl <> " err=" <> T.pack (show e)) + Nothing + Nothing + Right _ -> pure () sendResp (toWaiResponse result) -- Copy method/body/headers from the incoming WAI request onto the parsed From 7cd18ac8e0b1812cc4b8b88c948e44ad55a9ee9a Mon Sep 17 00:00:00 2001 From: Vijay Gupta Date: Wed, 13 May 2026 00:36:10 +0530 Subject: [PATCH 11/11] Refactor ForwardAPI to exclude headers and update countingForwarderApp with logging - Removed declaration of @X-Forwarder-Secret@ and @X-Forward-Destination@ headers from ForwardAPI to prevent double parsing. - Updated countingForwarderApp to accept an IOLogging.LoggerEnv parameter for enhanced logging during request handling. --- .../src/Kernel/External/MasterCloudForward.hs | 6 +++-- lib/mobility-core/test-integration/Main.hs | 22 ++++++++++--------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/lib/mobility-core/src/Kernel/External/MasterCloudForward.hs b/lib/mobility-core/src/Kernel/External/MasterCloudForward.hs index bb911e6c5..c63972f06 100644 --- a/lib/mobility-core/src/Kernel/External/MasterCloudForward.hs +++ b/lib/mobility-core/src/Kernel/External/MasterCloudForward.hs @@ -117,10 +117,12 @@ instance IsHTTPError ForwardError where instance IsAPIError ForwardError +-- The @X-Forwarder-Secret@ and @X-Forward-Destination@ headers are +-- intentionally not declared here: validation happens inside the WAI +-- 'forwardEgressApp', and declaring them on the Servant API caused them +-- to be parsed twice per request. type ForwardAPI = "forward-egress" - :> Header "X-Forwarder-Secret" Text - :> Header "X-Forward-Destination" Text :> Raw forwardAPI :: Proxy ForwardAPI diff --git a/lib/mobility-core/test-integration/Main.hs b/lib/mobility-core/test-integration/Main.hs index 2d2d8b916..5c1a809e1 100644 --- a/lib/mobility-core/test-integration/Main.hs +++ b/lib/mobility-core/test-integration/Main.hs @@ -143,13 +143,14 @@ bootstrapEnv logger metrics = -- Wraps forwardEgressApp with a hit counter so we can assert traffic actually -- traversed it. countingForwarderApp :: + IOLogging.LoggerEnv -> IORef Int -> MasterCloudProxyConfig -> Http.Manager -> Application -countingForwarderApp hits cfg mgr req sendResp = do +countingForwarderApp logEnv hits cfg mgr req sendResp = do modifyIORef' hits (+ 1) - forwardEgressApp cfg mgr req sendResp + forwardEgressApp logEnv cfg mgr req sendResp main :: IO () main = do @@ -167,15 +168,15 @@ main = do masterSecret = Just "itest-secret" } - forwarderAsync <- - Async.async . Warp.run forwarderPort $ - countingForwarderApp forwarderHits fwdCfg mgr - - -- Let Warp bind. - liftIO $ threadDelay 1000000 - result <- Exc.try @Exc.SomeException $ IOLogging.withLoggerEnv silentLoggerConfig (Just "itest") $ \logger -> do + forwarderAsync <- + Async.async . Warp.run forwarderPort $ + countingForwarderApp logger forwarderHits fwdCfg mgr + + -- Let Warp bind. + liftIO $ threadDelay 1000000 + coreMx <- Metrics.registerCoreMetricsContainer R.withFlowRuntime Nothing $ \flowRt0 -> do let flowRt = flowRt0 {R._httpClientManagers = HM.insert "default" mgr (R._httpClientManagers flowRt0)} @@ -211,7 +212,8 @@ main = do (_, Left e) -> fail' ("forwarded call failed: " <> show e) - Async.cancel forwarderAsync + Async.cancel forwarderAsync + case result of Left e -> do P.putStrLn $ "[itest] EXCEPTION: " <> P.show e