Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions internal/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ import (
"github.com/redis/go-redis/v9"

logger "d7y.io/dragonfly/v2/internal/dflog"
pkgredis "d7y.io/dragonfly/v2/pkg/redis"
)

var (
ErrRedisNotInitialized = errors.New("redis client not initialized")
)

type Config struct {
Expand All @@ -48,6 +53,7 @@ type Job struct {
Server *machinery.Server
Worker *machinery.Worker
Queue Queue
rdb redis.UniversalClient
}

func New(cfg *Config, queue Queue) (*Job, error) {
Expand Down Expand Up @@ -101,6 +107,27 @@ func (t *Job) LaunchWorker(consumerTag string, concurrency int) error {
return t.Worker.Launch()
}

// InitRdb initializes the redis client for job.
func (t *Job) InitRdb(cfg *Config) error {
if t.rdb != nil {
return nil
}
// Initialize redis client.
var rdb redis.UniversalClient
rdb, err := pkgredis.NewRedis(&redis.UniversalOptions{
Addrs: cfg.Addrs,
MasterName: cfg.MasterName,
Username: cfg.Username,
Password: cfg.Password,
DB: cfg.BackendDB,
})
if err != nil {
return err
}
t.rdb = rdb
return nil
}

type GroupJobState struct {
GroupUUID string
State string
Expand Down Expand Up @@ -150,6 +177,61 @@ func (t *Job) GetGroupJobState(groupID string) (*GroupJobState, error) {
}, nil
}

// SetTaskResults sets task results to redis by key.
func (t *Job) SetTaskResults(data []string, jobName string) (string, error) {
if t.rdb == nil {
return "", ErrRedisNotInitialized
}

if len(data) == 0 {
return "", nil
}

key := fmt.Sprintf("%s_results:%s:%d", jobName, t.Queue.String(), time.Now().Unix())

ctx := context.Background()

// save results to redis set
for _, val := range data {
if err := t.rdb.SAdd(ctx, key, val).Err(); err != nil {

@CooooolFrog CooooolFrog Mar 31, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the number of peers becomes excessively large, wouldn't this approach actually place a greater strain on redis?

With 100,000 Peers, this implies that the redis.sAdd() method must be invoked 100,000 times. Although this eliminates the issue of a single Key holding an excessively large volume of data, it introduces a new problem: executing redis.sAdd() 100,000 times incurs significant overhead—potentially resulting in a situation even worse than the original one.

Regardless of how redis.SAdd() is invoked, it alters only the write method, not the resulting data structure. Whether you write 100,000 entries in a single operation or perform 100,000 separate writes of one entry each, the outcome remains the same: a single Set containing 100,000 members.

In a separate PR(#4654), the implementation utilizes sharding to limit the number of Peers associated with any single Key. This method resolves the issue of excessive data storage under a single Key without incurring the overhead of repeated redis.sAdd() calls. I believe this implementation approach is superior; what are your thoughts?

To summarize, I think the correct approach is to attempt to shard the data using different keys, rather than writing data to the same key in small, frequent increments.

logger.Errorf("Failed to SAdd: %v", err)
return "", err
}
}

// set expire time for the results
if err := t.rdb.Expire(ctx, key, time.Duration(DefaultResultsExpireIn)*time.Second).Err(); err != nil {
logger.Errorf("Failed to set expire: %v", err)
return "", err
}

return key, nil
}

// GetTaskResults gets task results from redis by key.
// results is not sorted, caller should sort it if needed.
func (t *Job) GetTaskResults(key string) ([]string, error) {
if t.rdb == nil {
return nil, ErrRedisNotInitialized
}
var cursor uint64 = 0
var results []string
for {
items, nextCursor, err := t.rdb.SScan(context.Background(), key, cursor, "", 50).Result()
if err != nil {
break
}
for _, item := range items {
results = append(results, item)
}
cursor = nextCursor
if cursor == 0 {
break
}
}
return results, nil
}

func MarshalResponse(v any) (string, error) {
b, err := json.Marshal(v)
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions manager/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@ func New(cfg *config.Config, gdb *gorm.DB) (*Job, error) {
if err != nil {
return nil, err
}
err = j.InitRdb(&internaljob.Config{
Addrs: cfg.Database.Redis.Addrs,
MasterName: cfg.Database.Redis.MasterName,
Username: cfg.Database.Redis.Username,
Password: cfg.Database.Redis.Password,
BrokerDB: cfg.Database.Redis.BrokerDB,
BackendDB: cfg.Database.Redis.BackendDB,
})
if err != nil {
return nil, err
}

var certPool *x509.CertPool
if cfg.Job.Preheat.TLS != nil {
Expand Down
24 changes: 21 additions & 3 deletions manager/job/sync_peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,30 @@ func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Schedu
return nil, err
}

// Unmarshal sync peer task result.
var hosts []*resource.Host
if err := internaljob.UnmarshalResponse(results, &hosts); err != nil {
if len(results) == 0 || results[0].String() == "" {
logger.Infof("sync peers task %s result is empty", task.UUID)
return nil, fmt.Errorf("sync peers task result is empty")
}

res_key := results[0].String()

hostStringList, err := s.job.GetTaskResults(res_key)
if err != nil {
logger.Errorf("failed to get sync peer data: %v", err)
return nil, err
}

var hosts []*resource.Host

for _, item := range hostStringList {
var host resource.Host
if err := internaljob.UnmarshalRequest(item, &host); err != nil {
logger.Errorf("Unmarshal sync peer item fail. item: %s, err: %v", item, err)
continue
}
hosts = append(hosts, &host)
}

return hosts, nil
}

Expand Down
23 changes: 18 additions & 5 deletions scheduler/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ func New(cfg *config.Config, resource resource.Resource) (Job, error) {
logger.Errorf("register preheat job to local queue error: %s", err.Error())
return nil, err
}
err = localJob.InitRdb(redisConfig)
if err != nil {
logger.Errorf("initialize redis for local job error: %s", err.Error())
return nil, err
}

return t, nil
}
Expand Down Expand Up @@ -283,17 +288,25 @@ func (j *job) preheatV2(ctx context.Context, req *internaljob.PreheatRequest) er

// syncPeers is a job to sync peers.
func (j *job) syncPeers() (string, error) {
var hosts []*resource.Host
var hosts []string
j.resource.HostManager().Range(func(key, value any) bool {
host, ok := value.(*resource.Host)
if !ok {
logger.Errorf("invalid host %v %v", key, value)
return true
}

hosts = append(hosts, host)
json, err := internaljob.MarshalResponse(host)
if err != nil {
logger.Errorf("Marshal host fail. host: %s, err: %v", host, err)
return true
}
hosts = append(hosts, json)
return true
})

return internaljob.MarshalResponse(hosts)
key, errors := j.localJob.SetTaskResults(hosts, internaljob.SyncPeersJob)
if errors != nil {
logger.Errorf("Failed to set sync peers task results: %v", errors)
return "", errors
}
return key, nil
}
Loading