Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,29 @@
## [Unreleased]

### Upgrade Notes

* This update includes an additional cleanup job for the `UserSession` table to correctly remove expired/invalid user sessions from the table -- addressing unbounded growth. However, we recommend running the following query against your Postgres instance to ensure a bulk-cleanup:

```sql

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mrkaye97 Can you give this a look-over? Seems fine to me but would like to make sure!

BEGIN;

LOCK TABLE "UserSession" IN ACCESS EXCLUSIVE MODE;

CREATE TABLE tmp_UserSession AS
SELECT *
FROM "UserSession"
WHERE
("userId" IS NOT NULL AND "expiresAt" >= NOW())
OR
("userId" IS NULL AND "createdAt" >= NOW() - INTERVAL '24 hours');

DROP TABLE "UserSession";

ALTER TABLE tmp_UserSession RENAME TO "UserSession";

COMMIT;
```

## [0.89.0] - 2026-06-09

Hatchet v0.89.0 introduces a range of updates to the platform, consisting largely of performance improvements and bug fixes to the engine, alongside several user-experience changes to the dashboard.
Expand Down
105 changes: 64 additions & 41 deletions internal/services/controllers/retention/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,43 +21,46 @@ type RetentionController interface {
}

type RetentionControllerImpl struct {
l *zerolog.Logger
repo v1.Repository
dv datautils.DataDecoderValidator
s gocron.Scheduler
tenantAlerter *alerting.TenantAlertManager
a *hatcheterrors.Wrapped
p *partition.Partition
dataRetention bool
workerRetention bool
queueRetention bool
l *zerolog.Logger
repo v1.Repository
dv datautils.DataDecoderValidator
s gocron.Scheduler
tenantAlerter *alerting.TenantAlertManager
a *hatcheterrors.Wrapped
p *partition.Partition
dataRetention bool
workerRetention bool
queueRetention bool
userSessionRetention bool
}

type RetentionControllerOpt func(*RetentionControllerOpts)

type RetentionControllerOpts struct {
l *zerolog.Logger
repo v1.Repository
dv datautils.DataDecoderValidator
ta *alerting.TenantAlertManager
alerter hatcheterrors.Alerter
p *partition.Partition
dataRetention bool
workerRetention bool
queueRetention bool
l *zerolog.Logger
repo v1.Repository
dv datautils.DataDecoderValidator
ta *alerting.TenantAlertManager
alerter hatcheterrors.Alerter
p *partition.Partition
dataRetention bool
workerRetention bool
queueRetention bool
userSessionRetention bool
}

func defaultRetentionControllerOpts() *RetentionControllerOpts {
logger := logger.NewDefaultLogger("retention-controller")
alerter := hatcheterrors.NoOpAlerter{}

return &RetentionControllerOpts{
l: &logger,
dv: datautils.NewDataDecoderValidator(),
alerter: alerter,
dataRetention: true,
queueRetention: true,
workerRetention: false,
l: &logger,
dv: datautils.NewDataDecoderValidator(),
alerter: alerter,
dataRetention: true,
queueRetention: true,
workerRetention: false,
userSessionRetention: true,
}
}

Expand Down Expand Up @@ -141,44 +144,50 @@ func New(fs ...RetentionControllerOpt) (*RetentionControllerImpl, error) {
a.WithData(map[string]interface{}{"service": "retention-controller"})

return &RetentionControllerImpl{
l: opts.l,
repo: opts.repo,
dv: opts.dv,
s: s,
tenantAlerter: opts.ta,
a: a,
p: opts.p,
dataRetention: opts.dataRetention,
workerRetention: opts.workerRetention,
queueRetention: opts.queueRetention,
l: opts.l,
repo: opts.repo,
dv: opts.dv,
s: s,
tenantAlerter: opts.ta,
a: a,
p: opts.p,
dataRetention: opts.dataRetention,
workerRetention: opts.workerRetention,
queueRetention: opts.queueRetention,
userSessionRetention: opts.userSessionRetention,
}, nil
}

func (rc *RetentionControllerImpl) Start() (func() error, error) {
rc.l.Debug().Msg("starting retention controller")

ctx, cancel := context.WithCancel(context.Background())
var err error
ctx, cancel := context.WithCancelCause(context.Background())

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

praise: Beautiful 😎

defer func() {
if err != nil {
cancel(err)
}
}()

if rc.queueRetention {
queueInterval := time.Second * 60

_, err := rc.s.NewJob(
_, err = rc.s.NewJob(
gocron.DurationJob(queueInterval),
gocron.NewTask(
rc.runDeleteMessageQueueItems(ctx),
),
)

if err != nil {
cancel()
return nil, fmt.Errorf("could not set up runDeleteMessageQueueItems: %w", err)
}
}

if rc.workerRetention {
workerInterval := 24 * time.Hour

_, err := rc.s.NewJob(
_, err = rc.s.NewJob(
gocron.DurationJob(workerInterval),
gocron.NewTask(
rc.runCleanupOldWorkers(ctx),
Expand All @@ -187,15 +196,29 @@ func (rc *RetentionControllerImpl) Start() (func() error, error) {
)

if err != nil {
cancel()
return nil, fmt.Errorf("could not set up runCleanupOldWorkers: %w", err)
}
}

if rc.userSessionRetention {
userSessionInterval := 1 * time.Hour

_, err = rc.s.NewJob(
gocron.DurationJob(userSessionInterval),
gocron.NewTask(
rc.runCleanupUserSessions(ctx),
),
)

if err != nil {
return nil, fmt.Errorf("could not set up runCleanupUserSessions: %w", err)
}
}

rc.s.Start()

cleanup := func() error {
cancel()
cancel(nil)

if err := rc.s.Shutdown(); err != nil {
return fmt.Errorf("could not shutdown scheduler: %w", err)
Expand Down
20 changes: 20 additions & 0 deletions internal/services/controllers/retention/user_session.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package retention

import (
"context"
"time"
)

func (rc *RetentionControllerImpl) runCleanupUserSessions(ctx context.Context) func() {
return func() {
rc.l.Debug().Ctx(ctx).Msg("retention controller: cleaning up user sessions")

ctx, cancel := context.WithTimeout(ctx, time.Second*20)
defer cancel()

err := rc.repo.UserSession().CleanupUserSessions(ctx)
if err != nil {
rc.l.Err(err).Ctx(ctx).Msg("user sessions cleanup failed")
}
}
}
18 changes: 18 additions & 0 deletions pkg/repository/sqlcv1/users.sql
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,21 @@ DELETE FROM
WHERE
"id" = @id::uuid
RETURNING *;

-- name: CleanupUserSessions :execresult
WITH sessions_to_delete AS (
SELECT "id"
FROM "UserSession"
WHERE
"expiresAt" < NOW()
OR (
"userId" IS NULL
AND "createdAt" < NOW() - INTERVAL '24 hours'
)
ORDER BY "createdAt" ASC
LIMIT @batchSize::int
FOR UPDATE SKIP LOCKED
)
DELETE FROM "UserSession" AS us
USING sessions_to_delete AS s
WHERE us."id" = s."id";
24 changes: 24 additions & 0 deletions pkg/repository/sqlcv1/users.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions pkg/repository/user_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package repository

import (
"context"
"errors"
"time"

"github.com/google/uuid"
"github.com/jackc/pgx/v5"

"github.com/hatchet-dev/hatchet/pkg/repository/sqlchelpers"
"github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1"
Expand Down Expand Up @@ -37,6 +39,8 @@ type UserSessionRepository interface {
Update(ctx context.Context, sessionId uuid.UUID, opts *UpdateSessionOpts) (*sqlcv1.UserSession, error)
Delete(ctx context.Context, sessionId uuid.UUID) (*sqlcv1.UserSession, error)
GetById(ctx context.Context, sessionId uuid.UUID) (*sqlcv1.UserSession, error)

CleanupUserSessions(ctx context.Context) error
}

type userSessionRepository struct {
Expand Down Expand Up @@ -165,3 +169,23 @@ func (r *userSessionRepository) GetById(ctx context.Context, sessionId uuid.UUID
sessionId,
)
}

func (r *userSessionRepository) CleanupUserSessions(ctx context.Context) error {
const batchSize int32 = 1000

for {
result, err := r.queries.CleanupUserSessions(ctx, r.pool, batchSize)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil
}
return err
}
Comment on lines +178 to +183

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If no sessions have expired (and no data is found), we're going to be returning a false positive pgx.ErrNoRows type -- which would be logged as an error etc.

Think we should be guarding against this with a errors.Is(err, pgx.ErrNoRows) check that returns a nil error if true.


if result.RowsAffected() < int64(batchSize) {
break
}
}

return nil
}
Loading