From 9045b432ebe9fa72a01cc6fb4825ea36aad39fc9 Mon Sep 17 00:00:00 2001 From: LiYi Date: Wed, 18 Mar 2026 10:25:38 +0800 Subject: [PATCH] Merge commit '2edddcd8efd100ca69a7083110ef7ef53199efca' into feat_redis_bigkey_s Signed-off-by: LiYi Solve redis big key problem --- internal/job/job.go | 82 +++++++++++++++++++++++++++++++++++++++ manager/job/job.go | 11 ++++++ manager/job/sync_peers.go | 24 ++++++++++-- scheduler/job/job.go | 23 ++++++++--- 4 files changed, 132 insertions(+), 8 deletions(-) diff --git a/internal/job/job.go b/internal/job/job.go index abcc219187d..1368d1c0233 100644 --- a/internal/job/job.go +++ b/internal/job/job.go @@ -33,6 +33,11 @@ import ( "github.com/redis/go-redis/v9" logger "d7y.io/dragonfly/v2/internal/dflog" + pkgredis "d7y.io/dragonfly/v2/pkg/redis" +) + +var ( + ErrRedisNotInitialized = errors.New("redis client not initialized") ) type Config struct { @@ -48,6 +53,7 @@ type Job struct { Server *machinery.Server Worker *machinery.Worker Queue Queue + rdb redis.UniversalClient } func New(cfg *Config, queue Queue) (*Job, error) { @@ -101,6 +107,27 @@ func (t *Job) LaunchWorker(consumerTag string, concurrency int) error { return t.Worker.Launch() } +// InitRdb initializes the redis client for job. +func (t *Job) InitRdb(cfg *Config) error { + if t.rdb != nil { + return nil + } + // Initialize redis client. + var rdb redis.UniversalClient + rdb, err := pkgredis.NewRedis(&redis.UniversalOptions{ + Addrs: cfg.Addrs, + MasterName: cfg.MasterName, + Username: cfg.Username, + Password: cfg.Password, + DB: cfg.BackendDB, + }) + if err != nil { + return err + } + t.rdb = rdb + return nil +} + type GroupJobState struct { GroupUUID string State string @@ -150,6 +177,61 @@ func (t *Job) GetGroupJobState(groupID string) (*GroupJobState, error) { }, nil } +// SetTaskResults sets task results to redis by key. +func (t *Job) SetTaskResults(data []string, jobName string) (string, error) { + if t.rdb == nil { + return "", ErrRedisNotInitialized + } + + if len(data) == 0 { + return "", nil + } + + key := fmt.Sprintf("%s_results:%s:%d", jobName, t.Queue.String(), time.Now().Unix()) + + ctx := context.Background() + + // save results to redis set + for _, val := range data { + if err := t.rdb.SAdd(ctx, key, val).Err(); err != nil { + logger.Errorf("Failed to SAdd: %v", err) + return "", err + } + } + + // set expire time for the results + if err := t.rdb.Expire(ctx, key, time.Duration(DefaultResultsExpireIn)*time.Second).Err(); err != nil { + logger.Errorf("Failed to set expire: %v", err) + return "", err + } + + return key, nil +} + +// GetTaskResults gets task results from redis by key. +// results is not sorted, caller should sort it if needed. +func (t *Job) GetTaskResults(key string) ([]string, error) { + if t.rdb == nil { + return nil, ErrRedisNotInitialized + } + var cursor uint64 = 0 + var results []string + for { + items, nextCursor, err := t.rdb.SScan(context.Background(), key, cursor, "", 50).Result() + if err != nil { + break + } + for _, item := range items { + results = append(results, item) + } + cursor = nextCursor + if cursor == 0 { + break + } + } + return results, nil +} + func MarshalResponse(v any) (string, error) { b, err := json.Marshal(v) if err != nil { diff --git a/manager/job/job.go b/manager/job/job.go index bac8c7a6e86..0b91fef58cd 100644 --- a/manager/job/job.go +++ b/manager/job/job.go @@ -55,6 +55,17 @@ func New(cfg *config.Config, gdb *gorm.DB) (*Job, error) { if err != nil { return nil, err } + err = j.InitRdb(&internaljob.Config{ + Addrs: cfg.Database.Redis.Addrs, + MasterName: cfg.Database.Redis.MasterName, + Username: cfg.Database.Redis.Username, + Password: cfg.Database.Redis.Password, + BrokerDB: cfg.Database.Redis.BrokerDB, + BackendDB: cfg.Database.Redis.BackendDB, + }) + if err != nil { + return nil, err + } var certPool *x509.CertPool if cfg.Job.Preheat.TLS != nil { diff --git a/manager/job/sync_peers.go b/manager/job/sync_peers.go index e3dc5c4fafd..ad8dc05a871 100644 --- a/manager/job/sync_peers.go +++ b/manager/job/sync_peers.go @@ -163,12 +163,30 @@ func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Schedu return nil, err } - // Unmarshal sync peer task result. - var hosts []*resource.Host - if err := internaljob.UnmarshalResponse(results, &hosts); err != nil { + if len(results) == 0 || results[0].String() == "" { + logger.Infof("sync peers task %s result is empty", task.UUID) + return nil, fmt.Errorf("sync peers task result is empty") + } + + res_key := results[0].String() + + hostStringList, err := s.job.GetTaskResults(res_key) + if err != nil { + logger.Errorf("failed to get sync peer data: %v", err) return nil, err } + var hosts []*resource.Host + + for _, item := range hostStringList { + var host resource.Host + if err := internaljob.UnmarshalRequest(item, &host); err != nil { + logger.Errorf("Unmarshal sync peer item fail. item: %s, err: %v", item, err) + continue + } + hosts = append(hosts, &host) + } + return hosts, nil } diff --git a/scheduler/job/job.go b/scheduler/job/job.go index e497cfdbf0b..7a7f3a09aa8 100644 --- a/scheduler/job/job.go +++ b/scheduler/job/job.go @@ -117,6 +117,11 @@ func New(cfg *config.Config, resource resource.Resource) (Job, error) { logger.Errorf("register preheat job to local queue error: %s", err.Error()) return nil, err } + err = localJob.InitRdb(redisConfig) + if err != nil { + logger.Errorf("initialize redis for local job error: %s", err.Error()) + return nil, err + } return t, nil } @@ -283,17 +288,25 @@ func (j *job) preheatV2(ctx context.Context, req *internaljob.PreheatRequest) er // syncPeers is a job to sync peers. func (j *job) syncPeers() (string, error) { - var hosts []*resource.Host + var hosts []string j.resource.HostManager().Range(func(key, value any) bool { host, ok := value.(*resource.Host) if !ok { logger.Errorf("invalid host %v %v", key, value) return true } - - hosts = append(hosts, host) + json, err := internaljob.MarshalResponse(host) + if err != nil { + logger.Errorf("Marshal host fail. host: %s, err: %v", host, err) + return true + } + hosts = append(hosts, json) return true }) - - return internaljob.MarshalResponse(hosts) + key, errors := j.localJob.SetTaskResults(hosts, internaljob.SyncPeersJob) + if errors != nil { + logger.Errorf("Failed to set sync peers task results: %v", errors) + return "", errors + } + return key, nil }