Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions services/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ ipnet = "2"
criterion = { version = "0.5", features = ["html_reports"] }
testcontainers = { version = "0.23", features = ["tokio"] }
testcontainers-modules = { version = "0.11", features = ["redis", "tokio"] }
wiremock = "0.6"

[[bench]]
name = "api_key_auth"
Expand Down
171 changes: 143 additions & 28 deletions services/api/src/email/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use validator::ValidateEmail;
use crate::cache::RedisCache;
use crate::config::Config;
use crate::email::templates::EmailTemplateEngine;
use crate::metrics::Metrics;

/// Configuration for email idempotency deduplication.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -87,6 +88,8 @@ pub struct EmailService {
client: reqwest::Client,
cache: Option<RedisCache>,
pub idempotency: IdempotencyConfig,
metrics: Option<Metrics>,
sendgrid_base_url: String,
}

impl EmailService {
Expand All @@ -98,6 +101,15 @@ impl EmailService {
config: Config,
cache: Option<RedisCache>,
idempotency: IdempotencyConfig,
) -> Result<Self> {
Self::with_cache_and_metrics(config, cache, idempotency, None)
}

pub fn with_cache_and_metrics(
config: Config,
cache: Option<RedisCache>,
idempotency: IdempotencyConfig,
metrics: Option<Metrics>,
) -> Result<Self> {
let template_engine = EmailTemplateEngine::new()?;
let client = reqwest::Client::builder()
Expand All @@ -110,9 +122,17 @@ impl EmailService {
client,
cache,
idempotency,
metrics,
sendgrid_base_url: "https://api.sendgrid.com".to_string(),
})
}

/// Test-only builder step that points the service at a different SendGrid base URL,
/// for example a local mock server.
///
/// The request URL is built as `{base_url}/v3/mail/send`, so any trailing `/` on
/// `base_url` is stripped here; otherwise the resulting path would start with `//`
/// and would not match a mock registered for `/v3/mail/send`.
///
/// Accepts anything convertible into a `String` (`String`, `&str`, ...).
#[cfg(test)]
#[must_use]
pub fn with_base_url(mut self, base_url: impl Into<String>) -> Self {
    let mut base_url = base_url.into();
    // Trim in place so no second allocation is needed.
    let trimmed_len = base_url.trim_end_matches('/').len();
    base_url.truncate(trimmed_len);
    self.sendgrid_base_url = base_url;
    self
}

/// Send an email using SendGrid
pub async fn send_email(
&self,
Expand Down Expand Up @@ -218,38 +238,78 @@ impl EmailService {
}
});

// Send via SendGrid
let response = self
.client
.post("https://api.sendgrid.com/v3/mail/send")
.bearer_auth(api_key)
.json(&payload)
.send()
.await
.context("Failed to send email via SendGrid")?;
// Send via SendGrid with retry (max 3 attempts, exp backoff + jitter)
const MAX_ATTEMPTS: u32 = 3;
let mut last_error = String::new();

for attempt in 0..MAX_ATTEMPTS {
let response = self
.client
.post(format!("{}/v3/mail/send", self.sendgrid_base_url))
.bearer_auth(api_key)
.json(&payload)
.send()
.await
.context("Failed to send email via SendGrid")?;

if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
anyhow::bail!("SendGrid API error {}: {}", status, body);
}

// Extract message ID from response headers
let message_id = response
.headers()
.get("x-message-id")
.and_then(|v| v.to_str().ok())
.unwrap_or("unknown")
.to_string();

tracing::info!(
"Email sent successfully to {} using template {} (message_id: {})",
recipient,
template_name,
message_id
);
if status.is_success() {
let message_id = response
.headers()
.get("x-message-id")
.and_then(|v| v.to_str().ok())
.unwrap_or("unknown")
.to_string();

Ok(message_id)
tracing::info!(
"Email sent successfully to {} using template {} (message_id: {})",
recipient,
template_name,
message_id
);
return Ok(message_id);
}

let should_retry = status.as_u16() == 429 || status.is_server_error();
let retry_after_header = response
.headers()
.get("retry-after")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u64>().ok());

if !should_retry || attempt + 1 == MAX_ATTEMPTS {
last_error = format!("SendGrid API error {}", status);
break;
}

let reason = if status.as_u16() == 429 { "rate_limited" } else { "server_error" };
if let Some(m) = &self.metrics {
m.observe_sendgrid_retry(reason);
}

// Respect Retry-After (seconds) if present, else exp backoff + jitter
let delay_ms: u64 = if let Some(secs) = retry_after_header {
secs * 1_000
} else {
let jitter = (std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.subsec_millis() % 100) as u64;
(1u64 << attempt) * 100 + jitter
};

tracing::warn!(
attempt = attempt + 1,
delay_ms,
reason,
"SendGrid transient error {}, retrying",
status
);
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
}

anyhow::bail!(last_error);
}

/// Preview email without sending (for testing/development)
Expand Down Expand Up @@ -399,6 +459,61 @@ mod tests {
);
}

/// Drives the retry path against a local mock of the SendGrid endpoint: the first
/// two POSTs are answered with 429 and the next one with 202, so the send must
/// succeed and the retry counter must show two `rate_limited` retries.
#[tokio::test]
async fn retry_succeeds_after_two_429s() {
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    let server = MockServer::start().await;

    // Mounted first and capped at two responses: the initial two calls get a 429.
    let rate_limited = ResponseTemplate::new(429);
    Mock::given(method("POST"))
        .and(path("/v3/mail/send"))
        .respond_with(rate_limited)
        .up_to_n_times(2)
        .mount(&server)
        .await;

    // Mounted second: the third call gets a 202 carrying the message id header.
    let accepted = ResponseTemplate::new(202).insert_header("x-message-id", "test-msg-id");
    Mock::given(method("POST"))
        .and(path("/v3/mail/send"))
        .respond_with(accepted)
        .mount(&server)
        .await;

    // Start from the environment-derived config and fill in the fields set by the test.
    let mut config = Config::from_env();
    config.sendgrid_api_key = Some(String::from("test-key"));
    config.from_email = Some(String::from("from@example.com"));

    // Keep a handle on the metrics so the counter can be inspected after the send.
    let metrics = crate::metrics::Metrics::new().unwrap();
    let service = EmailService::with_cache_and_metrics(
        config,
        None,
        IdempotencyConfig::default(),
        Some(metrics.clone()),
    )
    .unwrap()
    .with_base_url(server.uri());

    let template_data =
        serde_json::json!({"confirm_url": "https://example.com/confirm?token=abc"});
    let outcome = service
        .send_email("user@example.com", "newsletter_confirmation", &template_data)
        .await;

    assert!(outcome.is_ok(), "expected success after retries, got: {:?}", outcome);
    assert_eq!(outcome.unwrap(), "test-msg-id");

    // One retry is recorded per 429, so the rendered counter must read exactly 2.
    let expected_sample = r#"sendgrid_retries_total{reason="rate_limited"} 2"#;
    let rendered = metrics.render().unwrap();
    assert!(
        rendered.contains(expected_sample),
        "expected 2 rate_limited retries in metrics:\n{rendered}"
    );
}

#[test]
fn valid_address_passes() {
assert!(sanitize_email("user@example.com").is_ok());
Expand Down
15 changes: 15 additions & 0 deletions services/api/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub struct Metrics {
db_pool_connections_idle: IntGaugeVec,
db_pool_acquire_duration: HistogramVec,
rate_limit_rejections: IntCounterVec,
sendgrid_retries: IntCounterVec,
}

impl Metrics {
Expand Down Expand Up @@ -120,6 +121,12 @@ impl Metrics {
)
.context("rate_limit_rejections metric")?;

let sendgrid_retries = IntCounterVec::new(
prometheus::Opts::new("sendgrid_retries_total", "SendGrid send retries by reason"),
&["reason"],
)
.context("sendgrid_retries metric")?;

registry.register(Box::new(cache_hits.clone()))?;
registry.register(Box::new(cache_misses.clone()))?;
registry.register(Box::new(invalidations.clone()))?;
Expand All @@ -132,6 +139,7 @@ impl Metrics {
registry.register(Box::new(db_pool_connections_idle.clone()))?;
registry.register(Box::new(db_pool_acquire_duration.clone()))?;
registry.register(Box::new(rate_limit_rejections.clone()))?;
registry.register(Box::new(sendgrid_retries.clone()))?;

Ok(Self {
registry,
Expand All @@ -147,6 +155,7 @@ impl Metrics {
db_pool_connections_idle,
db_pool_acquire_duration,
rate_limit_rejections,
sendgrid_retries,
})
}

Expand Down Expand Up @@ -226,6 +235,12 @@ impl Metrics {
.inc();
}

/// Records one SendGrid retry by incrementing the `sendgrid_retries` counter.
///
/// `reason` is used as the counter's label value; callers are expected to pass
/// "rate_limited" (429) or "server_error" (5xx).
pub fn observe_sendgrid_retry(&self, reason: &str) {
    let label_values = [reason];
    let counter = self.sendgrid_retries.with_label_values(&label_values);
    counter.inc();
}

pub fn render(&self) -> anyhow::Result<String> {
let mut buffer = vec![];
let encoder = TextEncoder::new();
Expand Down
Loading
Loading