Dragonfly2
277 строк · 8.3 Кб
/*
 * Copyright 2020 The Dragonfly Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package service18
19import (20"context"21"errors"22"fmt"23
24machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"25
26logger "d7y.io/dragonfly/v2/internal/dflog"27"d7y.io/dragonfly/v2/manager/models"28"d7y.io/dragonfly/v2/manager/types"29"d7y.io/dragonfly/v2/pkg/retry"30"d7y.io/dragonfly/v2/pkg/slices"31"d7y.io/dragonfly/v2/pkg/structure"32)
33
34func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheatJobRequest) (*models.Job, error) {35candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs)36if err != nil {37return nil, err38}39
40groupJobState, err := s.job.CreatePreheat(ctx, candidateSchedulers, json.Args)41if err != nil {42return nil, err43}44
45var candidateSchedulerClusters []models.SchedulerCluster46for _, candidateScheduler := range candidateSchedulers {47candidateSchedulerClusters = append(candidateSchedulerClusters, candidateScheduler.SchedulerCluster)48}49
50args, err := structure.StructToMap(json.Args)51if err != nil {52return nil, err53}54
55job := models.Job{56TaskID: groupJobState.GroupUUID,57BIO: json.BIO,58Type: json.Type,59State: groupJobState.State,60Args: args,61UserID: json.UserID,62SchedulerClusters: candidateSchedulerClusters,63}64
65if err := s.db.WithContext(ctx).Create(&job).Error; err != nil {66return nil, err67}68
69go s.pollingJob(context.Background(), job.ID, job.TaskID)70
71return &job, nil72}
73
74func (s *service) findCandidateSchedulers(ctx context.Context, schedulerClusterIDs []uint) ([]models.Scheduler, error) {75var candidateSchedulers []models.Scheduler76if len(schedulerClusterIDs) != 0 {77// Find the scheduler clusters by request.78for _, schedulerClusterID := range schedulerClusterIDs {79schedulerCluster := models.SchedulerCluster{}80if err := s.db.WithContext(ctx).First(&schedulerCluster, schedulerClusterID).Error; err != nil {81return nil, err82}83
84var schedulers []models.Scheduler85if err := s.db.WithContext(ctx).Preload("SchedulerCluster").Find(&schedulers, models.Scheduler{86SchedulerClusterID: schedulerCluster.ID,87State: models.SchedulerStateActive,88}).Error; err != nil {89return nil, err90}91
92// Scan the schedulers to find the first scheduler that supports preheat.93for _, scheduler := range schedulers {94if slices.Contains(scheduler.Features, types.SchedulerFeaturePreheat) {95candidateSchedulers = append(candidateSchedulers, scheduler)96break97}98}99}100} else {101// Find all of the scheduler clusters that has active schedulers.102var candidateSchedulerClusters []models.SchedulerCluster103if err := s.db.WithContext(ctx).Find(&candidateSchedulerClusters).Error; err != nil {104return nil, err105}106
107for _, candidateSchedulerCluster := range candidateSchedulerClusters {108var schedulers []models.Scheduler109if err := s.db.WithContext(ctx).Preload("SchedulerCluster").Find(&schedulers, models.Scheduler{110SchedulerClusterID: candidateSchedulerCluster.ID,111State: models.SchedulerStateActive,112}).Error; err != nil {113continue114}115
116// Scan the schedulers to find the first scheduler that supports preheat.117for _, scheduler := range schedulers {118if slices.Contains(scheduler.Features, types.SchedulerFeaturePreheat) {119candidateSchedulers = append(candidateSchedulers, scheduler)120break121}122}123}124}125
126if len(candidateSchedulers) == 0 {127return nil, errors.New("candidate schedulers not found")128}129
130return candidateSchedulers, nil131}
132
133func (s *service) pollingJob(ctx context.Context, id uint, groupID string) {134var (135job models.Job136log = logger.WithGroupAndJobID(groupID, fmt.Sprint(id))137)138if _, _, err := retry.Run(ctx, 5, 10, 480, func() (any, bool, error) {139groupJob, err := s.job.GetGroupJobState(groupID)140if err != nil {141log.Errorf("polling group failed: %s", err.Error())142return nil, false, err143}144
145result, err := structure.StructToMap(groupJob)146if err != nil {147log.Errorf("polling group failed: %s", err.Error())148return nil, false, err149}150
151if err := s.db.WithContext(ctx).First(&job, id).Updates(models.Job{152State: groupJob.State,153Result: result,154}).Error; err != nil {155log.Errorf("polling group failed: %s", err.Error())156return nil, true, err157}158
159switch job.State {160case machineryv1tasks.StateSuccess:161log.Info("polling group succeeded")162return nil, true, nil163case machineryv1tasks.StateFailure:164log.Error("polling group failed")165return nil, true, nil166default:167msg := fmt.Sprintf("polling job state is %s", job.State)168log.Info(msg)169return nil, false, errors.New(msg)170}171}); err != nil {172log.Errorf("polling group failed: %s", err.Error())173}174
175// Polling timeout and failed.176if job.State != machineryv1tasks.StateSuccess && job.State != machineryv1tasks.StateFailure {177job := models.Job{}178if err := s.db.WithContext(ctx).First(&job, id).Updates(models.Job{179State: machineryv1tasks.StateFailure,180}).Error; err != nil {181log.Errorf("polling group failed: %s", err.Error())182}183log.Error("polling group timeout")184}185}
186
187func (s *service) DestroyJob(ctx context.Context, id uint) error {188job := models.Job{}189if err := s.db.WithContext(ctx).First(&job, id).Error; err != nil {190return err191}192
193if err := s.db.WithContext(ctx).Unscoped().Delete(&models.Job{}, id).Error; err != nil {194return err195}196
197return nil198}
199
200func (s *service) UpdateJob(ctx context.Context, id uint, json types.UpdateJobRequest) (*models.Job, error) {201job := models.Job{}202if err := s.db.WithContext(ctx).Preload("SeedPeerClusters").Preload("SchedulerClusters").First(&job, id).Updates(models.Job{203BIO: json.BIO,204UserID: json.UserID,205}).Error; err != nil {206return nil, err207}208
209return &job, nil210}
211
212func (s *service) GetJob(ctx context.Context, id uint) (*models.Job, error) {213job := models.Job{}214if err := s.db.WithContext(ctx).Preload("SeedPeerClusters").Preload("SchedulerClusters").First(&job, id).Error; err != nil {215return nil, err216}217
218return &job, nil219}
220
221func (s *service) GetJobs(ctx context.Context, q types.GetJobsQuery) ([]models.Job, int64, error) {222var count int64223var jobs []models.Job224if err := s.db.WithContext(ctx).Scopes(models.Paginate(q.Page, q.PerPage)).Where(&models.Job{225Type: q.Type,226State: q.State,227UserID: q.UserID,228}).Order("created_at DESC").Find(&jobs).Limit(-1).Offset(-1).Count(&count).Error; err != nil {229return nil, 0, err230}231
232return jobs, count, nil233}
234
235func (s *service) AddJobToSchedulerClusters(ctx context.Context, id, schedulerClusterIDs []uint) error {236job := models.Job{}237if err := s.db.WithContext(ctx).First(&job, id).Error; err != nil {238return err239}240
241var schedulerClusters []*models.SchedulerCluster242for _, schedulerClusterID := range schedulerClusterIDs {243schedulerCluster := models.SchedulerCluster{}244if err := s.db.WithContext(ctx).First(&schedulerCluster, schedulerClusterID).Error; err != nil {245return err246}247schedulerClusters = append(schedulerClusters, &schedulerCluster)248}249
250if err := s.db.WithContext(ctx).Model(&job).Association("SchedulerClusters").Append(schedulerClusters); err != nil {251return err252}253
254return nil255}
256
257func (s *service) AddJobToSeedPeerClusters(ctx context.Context, id, seedPeerClusterIDs []uint) error {258job := models.Job{}259if err := s.db.WithContext(ctx).First(&job, id).Error; err != nil {260return err261}262
263var seedPeerClusters []*models.SeedPeerCluster264for _, seedPeerClusterID := range seedPeerClusterIDs {265seedPeerCluster := models.SeedPeerCluster{}266if err := s.db.WithContext(ctx).First(&seedPeerCluster, seedPeerClusterID).Error; err != nil {267return err268}269seedPeerClusters = append(seedPeerClusters, &seedPeerCluster)270}271
272if err := s.db.WithContext(ctx).Model(&job).Association("SeedPeerClusters").Append(seedPeerClusters); err != nil {273return err274}275
276return nil277}
278