From 8f6584ab4ebb4d36084e360910ae207fc18e76d3 Mon Sep 17 00:00:00 2001 From: Niraj Nair Date: Fri, 5 Jun 2026 20:03:46 +0530 Subject: [PATCH] fix(engine): add scheduled user session cleanup via retention controller --- CHANGELOG.md | 26 +++ .../controllers/retention/controller.go | 105 +++++---- .../controllers/retention/user_session.go | 20 ++ pkg/repository/sqlcv1/users.sql | 18 ++ pkg/repository/sqlcv1/users.sql.go | 24 ++ pkg/repository/user_session.go | 24 ++ pkg/repository/user_session_test.go | 217 ++++++++++++++++++ 7 files changed, 393 insertions(+), 41 deletions(-) create mode 100644 internal/services/controllers/retention/user_session.go create mode 100644 pkg/repository/user_session_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index b0f1f9e672..3a8eba7d95 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,29 @@ +## [Unreleased] + +### Upgrade Notes + +* This update includes an additional cleanup job for the `UserSession` table to correctly remove expired/invalid user sessions from the table -- addressing unbounded growth. However, we recommend running the following query against your Postgres instance to ensure a bulk-cleanup: + +```sql +BEGIN; + +LOCK TABLE "UserSession" IN ACCESS EXCLUSIVE MODE; + +CREATE TABLE tmp_UserSession AS +SELECT * +FROM "UserSession" +WHERE + ("userId" IS NOT NULL AND "expiresAt" >= NOW()) +OR + ("userId" IS NULL AND "createdAt" >= NOW() - INTERVAL '24 hours'); + +DROP TABLE "UserSession"; + +ALTER TABLE tmp_UserSession RENAME TO "UserSession"; + +COMMIT; +``` + ## [0.89.0] - 2026-06-09 Hatchet v0.89.0 introduces a range of updates to the platform, consisting largely of performance improvements and bug fixes to the engine, alongside several user-experience changes to the dashboard. diff --git a/internal/services/controllers/retention/controller.go b/internal/services/controllers/retention/controller.go index aead8c8158..488c553c88 100644 --- a/internal/services/controllers/retention/controller.go +++ b/internal/services/controllers/retention/controller.go @@ -21,30 +21,32 @@ type RetentionController interface { } type RetentionControllerImpl struct { - l *zerolog.Logger - repo v1.Repository - dv datautils.DataDecoderValidator - s gocron.Scheduler - tenantAlerter *alerting.TenantAlertManager - a *hatcheterrors.Wrapped - p *partition.Partition - dataRetention bool - workerRetention bool - queueRetention bool + l *zerolog.Logger + repo v1.Repository + dv datautils.DataDecoderValidator + s gocron.Scheduler + tenantAlerter *alerting.TenantAlertManager + a *hatcheterrors.Wrapped + p *partition.Partition + dataRetention bool + workerRetention bool + queueRetention bool + userSessionRetention bool } type RetentionControllerOpt func(*RetentionControllerOpts) type RetentionControllerOpts struct { - l *zerolog.Logger - repo v1.Repository - dv datautils.DataDecoderValidator - ta *alerting.TenantAlertManager - alerter hatcheterrors.Alerter - p *partition.Partition - dataRetention bool - workerRetention bool - queueRetention bool + l *zerolog.Logger + repo v1.Repository + dv datautils.DataDecoderValidator + ta *alerting.TenantAlertManager + alerter hatcheterrors.Alerter + p *partition.Partition + dataRetention bool + workerRetention bool + queueRetention bool + userSessionRetention bool } func defaultRetentionControllerOpts() *RetentionControllerOpts { @@ -52,12 +54,13 @@ func defaultRetentionControllerOpts() *RetentionControllerOpts { alerter := hatcheterrors.NoOpAlerter{} return &RetentionControllerOpts{ - l: &logger, - dv: datautils.NewDataDecoderValidator(), - alerter: alerter, - dataRetention: true, - queueRetention: true, - workerRetention: false, + l: &logger, + dv: datautils.NewDataDecoderValidator(), + alerter: alerter, + dataRetention: true, + queueRetention: true, + workerRetention: false, + userSessionRetention: true, } } @@ -141,28 +144,35 @@ func New(fs ...RetentionControllerOpt) (*RetentionControllerImpl, error) { a.WithData(map[string]interface{}{"service": "retention-controller"}) return &RetentionControllerImpl{ - l: opts.l, - repo: opts.repo, - dv: opts.dv, - s: s, - tenantAlerter: opts.ta, - a: a, - p: opts.p, - dataRetention: opts.dataRetention, - workerRetention: opts.workerRetention, - queueRetention: opts.queueRetention, + l: opts.l, + repo: opts.repo, + dv: opts.dv, + s: s, + tenantAlerter: opts.ta, + a: a, + p: opts.p, + dataRetention: opts.dataRetention, + workerRetention: opts.workerRetention, + queueRetention: opts.queueRetention, + userSessionRetention: opts.userSessionRetention, }, nil } func (rc *RetentionControllerImpl) Start() (func() error, error) { rc.l.Debug().Msg("starting retention controller") - ctx, cancel := context.WithCancel(context.Background()) + var err error + ctx, cancel := context.WithCancelCause(context.Background()) + defer func() { + if err != nil { + cancel(err) + } + }() if rc.queueRetention { queueInterval := time.Second * 60 - _, err := rc.s.NewJob( + _, err = rc.s.NewJob( gocron.DurationJob(queueInterval), gocron.NewTask( rc.runDeleteMessageQueueItems(ctx), @@ -170,7 +180,6 @@ func (rc *RetentionControllerImpl) Start() (func() error, error) { ) if err != nil { - cancel() return nil, fmt.Errorf("could not set up runDeleteMessageQueueItems: %w", err) } } @@ -178,7 +187,7 @@ func (rc *RetentionControllerImpl) Start() (func() error, error) { if rc.workerRetention { workerInterval := 24 * time.Hour - _, err := rc.s.NewJob( + _, err = rc.s.NewJob( gocron.DurationJob(workerInterval), gocron.NewTask( rc.runCleanupOldWorkers(ctx), @@ -187,15 +196,29 @@ func (rc *RetentionControllerImpl) Start() (func() error, error) { ) if err != nil { - cancel() return nil, fmt.Errorf("could not set up runCleanupOldWorkers: %w", err) } } + if rc.userSessionRetention { + userSessionInterval := 1 * time.Hour + + _, err = rc.s.NewJob( + gocron.DurationJob(userSessionInterval), + gocron.NewTask( + rc.runCleanupUserSessions(ctx), + ), + ) + + if err != nil { + return nil, fmt.Errorf("could not set up runCleanupUserSessions: %w", err) + } + } + rc.s.Start() cleanup := func() error { - cancel() + cancel(nil) if err := rc.s.Shutdown(); err != nil { return fmt.Errorf("could not shutdown scheduler: %w", err) diff --git a/internal/services/controllers/retention/user_session.go b/internal/services/controllers/retention/user_session.go new file mode 100644 index 0000000000..332f1aa681 --- /dev/null +++ b/internal/services/controllers/retention/user_session.go @@ -0,0 +1,20 @@ +package retention + +import ( + "context" + "time" +) + +func (rc *RetentionControllerImpl) runCleanupUserSessions(ctx context.Context) func() { + return func() { + rc.l.Debug().Ctx(ctx).Msg("retention controller: cleaning up user sessions") + + ctx, cancel := context.WithTimeout(ctx, time.Second*20) + defer cancel() + + err := rc.repo.UserSession().CleanupUserSessions(ctx) + if err != nil { + rc.l.Err(err).Ctx(ctx).Msg("user sessions cleanup failed") + } + } +} diff --git a/pkg/repository/sqlcv1/users.sql b/pkg/repository/sqlcv1/users.sql index 6b6ee8708c..ac95a0411e 100644 --- a/pkg/repository/sqlcv1/users.sql +++ b/pkg/repository/sqlcv1/users.sql @@ -154,3 +154,21 @@ DELETE FROM WHERE "id" = @id::uuid RETURNING *; + +-- name: CleanupUserSessions :execresult +WITH sessions_to_delete AS ( + SELECT "id" + FROM "UserSession" + WHERE + "expiresAt" < NOW() + OR ( + "userId" IS NULL + AND "createdAt" < NOW() - INTERVAL '24 hours' + ) + ORDER BY "createdAt" ASC + LIMIT @batchSize::int + FOR UPDATE SKIP LOCKED +) +DELETE FROM "UserSession" AS us +USING sessions_to_delete AS s +WHERE us."id" = s."id"; diff --git a/pkg/repository/sqlcv1/users.sql.go b/pkg/repository/sqlcv1/users.sql.go index ecb2939773..0e117ebe93 100644 --- a/pkg/repository/sqlcv1/users.sql.go +++ b/pkg/repository/sqlcv1/users.sql.go @@ -9,9 +9,33 @@ import ( "context" "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgtype" ) +const cleanupUserSessions = `-- name: CleanupUserSessions :execresult +WITH sessions_to_delete AS ( + SELECT "id" + FROM "UserSession" + WHERE + "expiresAt" < NOW() + OR ( + "userId" IS NULL + AND "createdAt" < NOW() - INTERVAL '24 hours' + ) + ORDER BY "createdAt" ASC + LIMIT $1::int + FOR UPDATE SKIP LOCKED +) +DELETE FROM "UserSession" AS us +USING sessions_to_delete AS s +WHERE us."id" = s."id" +` + +func (q *Queries) CleanupUserSessions(ctx context.Context, db DBTX, batchsize int32) (pgconn.CommandTag, error) { + return db.Exec(ctx, cleanupUserSessions, batchsize) +} + const createUser = `-- name: CreateUser :one INSERT INTO "User" ( "id", diff --git a/pkg/repository/user_session.go b/pkg/repository/user_session.go index fcd1dd431e..5ff8742b9e 100644 --- a/pkg/repository/user_session.go +++ b/pkg/repository/user_session.go @@ -2,9 +2,11 @@ package repository import ( "context" + "errors" "time" "github.com/google/uuid" + "github.com/jackc/pgx/v5" "github.com/hatchet-dev/hatchet/pkg/repository/sqlchelpers" "github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1" @@ -37,6 +39,8 @@ type UserSessionRepository interface { Update(ctx context.Context, sessionId uuid.UUID, opts *UpdateSessionOpts) (*sqlcv1.UserSession, error) Delete(ctx context.Context, sessionId uuid.UUID) (*sqlcv1.UserSession, error) GetById(ctx context.Context, sessionId uuid.UUID) (*sqlcv1.UserSession, error) + + CleanupUserSessions(ctx context.Context) error } type userSessionRepository struct { @@ -165,3 +169,23 @@ func (r *userSessionRepository) GetById(ctx context.Context, sessionId uuid.UUID sessionId, ) } + +func (r *userSessionRepository) CleanupUserSessions(ctx context.Context) error { + const batchSize int32 = 1000 + + for { + result, err := r.queries.CleanupUserSessions(ctx, r.pool, batchSize) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return nil + } + return err + } + + if result.RowsAffected() < int64(batchSize) { + break + } + } + + return nil +} diff --git a/pkg/repository/user_session_test.go b/pkg/repository/user_session_test.go new file mode 100644 index 0000000000..dbd31442a0 --- /dev/null +++ b/pkg/repository/user_session_test.go @@ -0,0 +1,217 @@ +//go:build !e2e && !load && !rampup && !integration + +package repository + +import ( + "context" + "testing" + "time" + + "github.com/google/uuid" + "github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func createUserSessionRepository(pool *pgxpool.Pool) *userSessionRepository { + logger := zerolog.Nop() + shared := &sharedRepository{ + pool: pool, + ddlPool: pool, + l: &logger, + queries: sqlcv1.New(), + } + return &userSessionRepository{ + sharedRepository: shared, + } +} + +func createTestUser(t *testing.T, pool *pgxpool.Pool) uuid.UUID { + t.Helper() + userId := uuid.New() + _, err := pool.Exec(ctx(t), ` + INSERT INTO "User" ("id", "email", "emailVerified", "name", "createdAt", "updatedAt") + VALUES ($1, $2, false, $3, NOW(), NOW()) + `, userId, userId.String()+"@test.com", "Test User") + require.NoError(t, err) + return userId +} + +func sessionExists(t *testing.T, pool *pgxpool.Pool, sessionId uuid.UUID) bool { + t.Helper() + var exists bool + err := pool.QueryRow(ctx(t), `SELECT EXISTS(SELECT 1 FROM "UserSession" WHERE "id" = $1)`, sessionId).Scan(&exists) + require.NoError(t, err) + return exists +} + +func countExistingSessions(t *testing.T, pool *pgxpool.Pool, sessionIds []uuid.UUID) int { + t.Helper() + if len(sessionIds) == 0 { + return 0 + } + + var count int + err := pool.QueryRow(ctx(t), `SELECT COUNT(*) FROM "UserSession" WHERE "id" = ANY($1::uuid[])`, sessionIds).Scan(&count) + require.NoError(t, err) + + return count +} + +func TestCleanupUserSessions(t *testing.T) { + pool, cleanup := setupPostgresWithMigration(t) + defer cleanup() + + testUserId := createTestUser(t, pool) + repo := createUserSessionRepository(pool) + + err := repo.CleanupUserSessions(ctx(t)) + require.NoError(t, err) + + testCases := []struct { + name string + expiresAt time.Time + userId *uuid.UUID + ageHours int + shouldDelete bool + }{ + { + name: "expired-session-with-user", + expiresAt: time.Now().UTC().Add(-1 * time.Hour), + userId: &testUserId, + ageHours: 0, + shouldDelete: true, + }, + { + name: "just-expired-session", + expiresAt: time.Now().UTC().Add(-1 * time.Second), + userId: &testUserId, + ageHours: 0, + shouldDelete: true, + }, + { + name: "unauthenticated-old-session", + expiresAt: time.Now().UTC().Add(48 * time.Hour), + userId: nil, + ageHours: 25, + shouldDelete: true, + }, + { + name: "valid-session-with-user", + expiresAt: time.Now().UTC().Add(24 * time.Hour), + userId: &testUserId, + ageHours: 0, + shouldDelete: false, + }, + { + name: "unauthenticated-recent-session", + expiresAt: time.Now().UTC().Add(48 * time.Hour), + userId: nil, + ageHours: 1, + shouldDelete: false, + }, + { + name: "session-expires-in-1-second", + expiresAt: time.Now().UTC().Add(1 * time.Second), + userId: &testUserId, + ageHours: 0, + shouldDelete: false, + }, + } + + sessionIds := make(map[string]uuid.UUID) + for _, tc := range testCases { + sessionId := uuid.New() + sessionIds[tc.name] = sessionId + + var err error + if tc.ageHours > 0 { + _, err = pool.Exec(ctx(t), ` + INSERT INTO "UserSession" ("id", "expiresAt", "userId", "data", "createdAt", "updatedAt") + VALUES ($1, $2 AT TIME ZONE 'UTC', $3, '{}', NOW() - INTERVAL '1 hour' * $4, NOW() - INTERVAL '1 hour' * $4) + `, sessionId, tc.expiresAt, tc.userId, tc.ageHours) + } else { + _, err = pool.Exec(ctx(t), ` + INSERT INTO "UserSession" ("id", "expiresAt", "userId", "data", "createdAt", "updatedAt") + VALUES ($1, $2 AT TIME ZONE 'UTC', $3, '{}', NOW(), NOW()) + `, sessionId, tc.expiresAt, tc.userId) + } + require.NoError(t, err, "failed to insert session: %s", tc.name) + } + + err = repo.CleanupUserSessions(ctx(t)) + require.NoError(t, err) + + for _, tc := range testCases { + sessionId := sessionIds[tc.name] + exists := sessionExists(t, pool, sessionId) + + if tc.shouldDelete { + assert.False(t, exists, "session '%s' should be deleted from DB", tc.name) + } else { + assert.True(t, exists, "session '%s' should exist in DB", tc.name) + } + } +} + +func TestCleanupUserSessions_MultiBatch(t *testing.T) { + pool, cleanup := setupPostgresWithMigration(t) + defer cleanup() + + testUserId := createTestUser(t, pool) + repo := createUserSessionRepository(pool) + + const totalSessions = 1500 + sessionIds := make([]uuid.UUID, totalSessions) + + for i := 0; i < totalSessions; i++ { + sessionIds[i] = uuid.New() + _, err := pool.Exec(ctx(t), ` + INSERT INTO "UserSession" ("id", "expiresAt", "userId", "data", "createdAt", "updatedAt") + VALUES ($1, $2 AT TIME ZONE 'UTC', $3, '{}', NOW(), NOW()) + `, sessionIds[i], time.Now().UTC().Add(-1*time.Hour), testUserId) + require.NoError(t, err) + } + + err := repo.CleanupUserSessions(ctx(t)) + require.NoError(t, err) + + existingCount := countExistingSessions(t, pool, sessionIds) + assert.Equal(t, 0, existingCount, "all %d sessions should be deleted", totalSessions) +} + +func TestCleanupUserSessions_ExactBatchBoundary(t *testing.T) { + pool, cleanup := setupPostgresWithMigration(t) + defer cleanup() + + testUserId := createTestUser(t, pool) + repo := createUserSessionRepository(pool) + + const totalSessions = 1000 + sessionIds := make([]uuid.UUID, totalSessions) + + for i := 0; i < totalSessions; i++ { + sessionIds[i] = uuid.New() + _, err := pool.Exec(ctx(t), ` + INSERT INTO "UserSession" ("id", "expiresAt", "userId", "data", "createdAt", "updatedAt") + VALUES ($1, $2 AT TIME ZONE 'UTC', $3, '{}', NOW(), NOW()) + `, sessionIds[i], time.Now().UTC().Add(-1*time.Hour), testUserId) + require.NoError(t, err) + } + + err := repo.CleanupUserSessions(ctx(t)) + require.NoError(t, err) + + existingCount := countExistingSessions(t, pool, sessionIds) + assert.Equal(t, 0, existingCount, "all %d sessions should be deleted", totalSessions) +} + +func ctx(t *testing.T) context.Context { + t.Helper() + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + t.Cleanup(cancel) + return ctx +}