Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions internal/job/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,10 @@ type DeleteFailureTask struct {
HostType string `json:"host_type"`
Description string `json:"description"`
}

// SyncPeerRequest defines the request parameters for sync peer task.
//
// A sync peers job is dispatched as a group of tasks. Every task in the group
// carries the group identity plus its own position, so that the receiver can
// answer with only its share of the hosts. The validate tags make struct
// validation reject requests that would otherwise lead to a division by zero
// or an out-of-range slice when the hosts are partitioned.
type SyncPeerRequest struct {
	// GroupUUID is the UUID of the task group this task belongs to.
	GroupUUID string `json:"group_uuid" validate:"required"`

	// GroupTaskCount is the zero-based index of this task within its group.
	// It must be smaller than TotalGroupTask.
	GroupTaskCount int `json:"group_task_count" validate:"gte=0,ltfield=TotalGroupTask"`

	// TotalGroupTask is the total number of tasks in the group. It is used as
	// the divisor when splitting hosts between tasks, so it must be at least 1.
	TotalGroupTask int `json:"total_group_task" validate:"gte=1"`
}
10 changes: 7 additions & 3 deletions manager/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,9 @@ type SyncPeersConfig struct {

// BatchSize is the batch size used when operating on the gorm database.
BatchSize int `yaml:"batchSize" mapstructure:"batchSize"`

// SyncBatchSize is the batch size used when getting host info from the scheduler.
SyncBatchSize int `yaml:"syncBatchSize" mapstructure:"syncBatchSize"`
}

type PreheatTLSClientConfig struct {
Expand Down Expand Up @@ -446,9 +449,10 @@ func New() *Config {
TLS: PreheatTLSClientConfig{},
},
SyncPeers: SyncPeersConfig{
Interval: DefaultJobSyncPeersInterval,
Timeout: DefaultJobSyncPeersTimeout,
BatchSize: DefaultJobSyncPeersBatchSize,
Interval: DefaultJobSyncPeersInterval,
Timeout: DefaultJobSyncPeersTimeout,
BatchSize: DefaultJobSyncPeersDatabaseBatchSize,
SyncBatchSize: DefaultJobSyncPeersBatchSize,
},
},
Metrics: MetricsConfig{
Expand Down
6 changes: 4 additions & 2 deletions manager/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,10 @@ const (
// DefaultClusterJobRateLimit is default rate limit(requests per second) for job Open API by cluster.
DefaultClusterJobRateLimit uint32 = 10

// DefaultJobSyncPeersBatchSize is the default batch size for syncing all peers information from the scheduler and
// operating on the database.
// DefaultJobSyncPeersDatabaseBatchSize is the default batch size for operating on the database.
DefaultJobSyncPeersDatabaseBatchSize = 1

// DefaultJobSyncPeersBatchSize is the default batch size for syncing all peers information from the scheduler.
DefaultJobSyncPeersBatchSize = 500
)

Expand Down
50 changes: 39 additions & 11 deletions manager/job/sync_peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"errors"
"fmt"
"reflect"
"sync"
"time"

Expand Down Expand Up @@ -161,25 +162,52 @@ func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Schedu
return nil, err
}

// Initialize task signature.
task := &machineryv1tasks.Signature{
UUID: fmt.Sprintf("task_%s", uuid.New().String()),
Name: internaljob.SyncPeersJob,
RoutingKey: queue.String(),
var tasks []*machineryv1tasks.Signature
for i := 0; i < s.config.Job.SyncPeers.SyncBatchSize; i++ {
// Initialize task signature.
task := &machineryv1tasks.Signature{
UUID: fmt.Sprintf("task_%s", uuid.New().String()),
Name: internaljob.SyncPeersJob,
RoutingKey: queue.String(),
}
tasks = append(tasks, task)
}
if len(tasks) <= 0 {
return []*resource.Host{}, nil
}
group, err := machineryv1tasks.NewGroup(tasks...)
if err != nil {
return nil, err
}
for i, signature := range group.Tasks {
// Set signature args.
args, err := internaljob.MarshalRequest(internaljob.SyncPeerRequest{
GroupUUID: group.GroupUUID,
GroupTaskCount: i,
TotalGroupTask: len(group.Tasks),
})
if err != nil {
logger.Errorf("[sync-peers] sync peer task marshal request: %v, error: %v", args, err)
return nil, err
}
signature.Args = args
}

// Send sync peer task to worker.
logger.Infof("[sync-peers] create sync peers in queue %v, task: %#v", queue, task)
asyncResult, err := s.job.Server.SendTaskWithContext(ctx, task)
logger.Infof("[sync-peers] create sync peers in queue %v, tasksNumber %d, taskUUID %v", queue, len(tasks), tasks[0].UUID)
asyncResults, err := s.job.Server.SendGroupWithContext(ctx, group, 0)
if err != nil {
logger.Errorf("[sync-peers] create sync peers in queue %v failed", queue, err)
return nil, err
}

// Get sync peer task result.
results, err := asyncResult.GetWithTimeout(s.config.Job.SyncPeers.Timeout, DefaultTaskPollingInterval)
if err != nil {
return nil, err
results := make([]reflect.Value, 0, len(asyncResults))
for _, asyncResult := range asyncResults {
result, err := asyncResult.GetWithTimeout(s.config.Job.SyncPeers.Timeout, DefaultTaskPollingInterval)
if err != nil {
return nil, err
}
results = append(results, result...)
}

// Unmarshal sync peer task result.
Expand Down
41 changes: 39 additions & 2 deletions scheduler/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"io"
"net"
"slices"
"sort"
"strconv"
"sync"

Expand Down Expand Up @@ -860,7 +861,18 @@ func (j *job) selectPeers(ips []string, count *uint32, percentage *uint32, log *
}

// syncPeers is a job to sync peers.
func (j *job) syncPeers() (string, error) {
func (j *job) syncPeers(_ context.Context, data string) (string, error) {
req := &internaljob.SyncPeerRequest{}
if err := internaljob.UnmarshalRequest(data, req); err != nil {
logger.Errorf("[sync-peers] unmarshal request err: %s, request body: %s", err.Error(), data)
return "", err
}

if err := validator.New().Struct(req); err != nil {
logger.Errorf("[sync-peers] sync peer task %s validate failed: %s", req.GroupUUID, err.Error())
return "", err
}

hosts := make([]*resource.Host, 0, j.resource.HostManager().Len())
j.resource.HostManager().Range(func(key, value any) bool {
host, ok := value.(*resource.Host)
Expand All @@ -873,7 +885,32 @@ func (j *job) syncPeers() (string, error) {
return true
})

return internaljob.MarshalResponse(hosts)
// Sort hosts by ID.
sort.Slice(hosts, func(i, j int) bool {
return hosts[i].ID < hosts[j].ID
})

// Changes to the hosts data between task requests within the same group are ignored.
// TODO: Could this cause problems? Perhaps we should create a snapshot of the hosts for each group to ensure that the host information is completely accurate.
// Split hosts by task count.
total := len(hosts)
groupSize := total / req.TotalGroupTask
remainder := total % req.TotalGroupTask

var start int
if req.GroupTaskCount < remainder {
start = req.GroupTaskCount * (groupSize + 1)
} else {
start = remainder*(groupSize+1) + (req.GroupTaskCount-remainder)*groupSize
}
end := start + groupSize
if req.GroupTaskCount < remainder {
end++
}

taskHosts := hosts[start:end]

return internaljob.MarshalResponse(taskHosts)
}

// getTask is a job to get task.
Expand Down
Loading