Dragonfly2

Форк
0
277 строк · 8.3 Кб
1
/*
2
 *     Copyright 2020 The Dragonfly Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package service
18

19
import (
20
	"context"
21
	"errors"
22
	"fmt"
23

24
	machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
25

26
	logger "d7y.io/dragonfly/v2/internal/dflog"
27
	"d7y.io/dragonfly/v2/manager/models"
28
	"d7y.io/dragonfly/v2/manager/types"
29
	"d7y.io/dragonfly/v2/pkg/retry"
30
	"d7y.io/dragonfly/v2/pkg/slices"
31
	"d7y.io/dragonfly/v2/pkg/structure"
32
)
33

34
func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheatJobRequest) (*models.Job, error) {
35
	candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs)
36
	if err != nil {
37
		return nil, err
38
	}
39

40
	groupJobState, err := s.job.CreatePreheat(ctx, candidateSchedulers, json.Args)
41
	if err != nil {
42
		return nil, err
43
	}
44

45
	var candidateSchedulerClusters []models.SchedulerCluster
46
	for _, candidateScheduler := range candidateSchedulers {
47
		candidateSchedulerClusters = append(candidateSchedulerClusters, candidateScheduler.SchedulerCluster)
48
	}
49

50
	args, err := structure.StructToMap(json.Args)
51
	if err != nil {
52
		return nil, err
53
	}
54

55
	job := models.Job{
56
		TaskID:            groupJobState.GroupUUID,
57
		BIO:               json.BIO,
58
		Type:              json.Type,
59
		State:             groupJobState.State,
60
		Args:              args,
61
		UserID:            json.UserID,
62
		SchedulerClusters: candidateSchedulerClusters,
63
	}
64

65
	if err := s.db.WithContext(ctx).Create(&job).Error; err != nil {
66
		return nil, err
67
	}
68

69
	go s.pollingJob(context.Background(), job.ID, job.TaskID)
70

71
	return &job, nil
72
}
73

74
func (s *service) findCandidateSchedulers(ctx context.Context, schedulerClusterIDs []uint) ([]models.Scheduler, error) {
75
	var candidateSchedulers []models.Scheduler
76
	if len(schedulerClusterIDs) != 0 {
77
		// Find the scheduler clusters by request.
78
		for _, schedulerClusterID := range schedulerClusterIDs {
79
			schedulerCluster := models.SchedulerCluster{}
80
			if err := s.db.WithContext(ctx).First(&schedulerCluster, schedulerClusterID).Error; err != nil {
81
				return nil, err
82
			}
83

84
			var schedulers []models.Scheduler
85
			if err := s.db.WithContext(ctx).Preload("SchedulerCluster").Find(&schedulers, models.Scheduler{
86
				SchedulerClusterID: schedulerCluster.ID,
87
				State:              models.SchedulerStateActive,
88
			}).Error; err != nil {
89
				return nil, err
90
			}
91

92
			// Scan the schedulers to find the first scheduler that supports preheat.
93
			for _, scheduler := range schedulers {
94
				if slices.Contains(scheduler.Features, types.SchedulerFeaturePreheat) {
95
					candidateSchedulers = append(candidateSchedulers, scheduler)
96
					break
97
				}
98
			}
99
		}
100
	} else {
101
		// Find all of the scheduler clusters that has active schedulers.
102
		var candidateSchedulerClusters []models.SchedulerCluster
103
		if err := s.db.WithContext(ctx).Find(&candidateSchedulerClusters).Error; err != nil {
104
			return nil, err
105
		}
106

107
		for _, candidateSchedulerCluster := range candidateSchedulerClusters {
108
			var schedulers []models.Scheduler
109
			if err := s.db.WithContext(ctx).Preload("SchedulerCluster").Find(&schedulers, models.Scheduler{
110
				SchedulerClusterID: candidateSchedulerCluster.ID,
111
				State:              models.SchedulerStateActive,
112
			}).Error; err != nil {
113
				continue
114
			}
115

116
			// Scan the schedulers to find the first scheduler that supports preheat.
117
			for _, scheduler := range schedulers {
118
				if slices.Contains(scheduler.Features, types.SchedulerFeaturePreheat) {
119
					candidateSchedulers = append(candidateSchedulers, scheduler)
120
					break
121
				}
122
			}
123
		}
124
	}
125

126
	if len(candidateSchedulers) == 0 {
127
		return nil, errors.New("candidate schedulers not found")
128
	}
129

130
	return candidateSchedulers, nil
131
}
132

133
func (s *service) pollingJob(ctx context.Context, id uint, groupID string) {
134
	var (
135
		job models.Job
136
		log = logger.WithGroupAndJobID(groupID, fmt.Sprint(id))
137
	)
138
	if _, _, err := retry.Run(ctx, 5, 10, 480, func() (any, bool, error) {
139
		groupJob, err := s.job.GetGroupJobState(groupID)
140
		if err != nil {
141
			log.Errorf("polling group failed: %s", err.Error())
142
			return nil, false, err
143
		}
144

145
		result, err := structure.StructToMap(groupJob)
146
		if err != nil {
147
			log.Errorf("polling group failed: %s", err.Error())
148
			return nil, false, err
149
		}
150

151
		if err := s.db.WithContext(ctx).First(&job, id).Updates(models.Job{
152
			State:  groupJob.State,
153
			Result: result,
154
		}).Error; err != nil {
155
			log.Errorf("polling group failed: %s", err.Error())
156
			return nil, true, err
157
		}
158

159
		switch job.State {
160
		case machineryv1tasks.StateSuccess:
161
			log.Info("polling group succeeded")
162
			return nil, true, nil
163
		case machineryv1tasks.StateFailure:
164
			log.Error("polling group failed")
165
			return nil, true, nil
166
		default:
167
			msg := fmt.Sprintf("polling job state is %s", job.State)
168
			log.Info(msg)
169
			return nil, false, errors.New(msg)
170
		}
171
	}); err != nil {
172
		log.Errorf("polling group failed: %s", err.Error())
173
	}
174

175
	// Polling timeout and failed.
176
	if job.State != machineryv1tasks.StateSuccess && job.State != machineryv1tasks.StateFailure {
177
		job := models.Job{}
178
		if err := s.db.WithContext(ctx).First(&job, id).Updates(models.Job{
179
			State: machineryv1tasks.StateFailure,
180
		}).Error; err != nil {
181
			log.Errorf("polling group failed: %s", err.Error())
182
		}
183
		log.Error("polling group timeout")
184
	}
185
}
186

187
// DestroyJob permanently deletes the job with the given id.
//
// The job is loaded with First beforehand, so a missing id surfaces as First's
// error instead of a silent no-op delete. Unscoped() makes GORM issue a hard
// delete even if the model uses soft deletion.
func (s *service) DestroyJob(ctx context.Context, id uint) error {
	var existing models.Job
	if err := s.db.WithContext(ctx).First(&existing, id).Error; err != nil {
		return err
	}

	return s.db.WithContext(ctx).Unscoped().Delete(&models.Job{}, id).Error
}
199

200
func (s *service) UpdateJob(ctx context.Context, id uint, json types.UpdateJobRequest) (*models.Job, error) {
201
	job := models.Job{}
202
	if err := s.db.WithContext(ctx).Preload("SeedPeerClusters").Preload("SchedulerClusters").First(&job, id).Updates(models.Job{
203
		BIO:    json.BIO,
204
		UserID: json.UserID,
205
	}).Error; err != nil {
206
		return nil, err
207
	}
208

209
	return &job, nil
210
}
211

212
// GetJob loads the job with the given id together with its SeedPeerClusters
// and SchedulerClusters associations.
func (s *service) GetJob(ctx context.Context, id uint) (*models.Job, error) {
	var job models.Job
	query := s.db.WithContext(ctx).Preload("SeedPeerClusters").Preload("SchedulerClusters")
	if err := query.First(&job, id).Error; err != nil {
		return nil, err
	}

	return &job, nil
}
220

221
func (s *service) GetJobs(ctx context.Context, q types.GetJobsQuery) ([]models.Job, int64, error) {
222
	var count int64
223
	var jobs []models.Job
224
	if err := s.db.WithContext(ctx).Scopes(models.Paginate(q.Page, q.PerPage)).Where(&models.Job{
225
		Type:   q.Type,
226
		State:  q.State,
227
		UserID: q.UserID,
228
	}).Order("created_at DESC").Find(&jobs).Limit(-1).Offset(-1).Count(&count).Error; err != nil {
229
		return nil, 0, err
230
	}
231

232
	return jobs, count, nil
233
}
234

235
// AddJobToSchedulerClusters appends each scheduler cluster in
// schedulerClusterIDs to the SchedulerClusters association of a job.
//
// id is typed []uint because of the grouped `id, schedulerClusterIDs []uint`
// parameter list. NOTE(review): this looks unintended, but it is kept for
// interface compatibility. The job is loaded with First, which returns the
// matching job with the lowest primary key when id holds several values.
//
// An empty id is rejected. GORM builds no primary-key condition from an empty
// slice, so First would otherwise select the first job in the table. Every
// listed cluster must exist; the first lookup error is returned before anything
// is appended.
func (s *service) AddJobToSchedulerClusters(ctx context.Context, id, schedulerClusterIDs []uint) error {
	if len(id) == 0 {
		return errors.New("job id is required")
	}

	job := models.Job{}
	if err := s.db.WithContext(ctx).First(&job, id).Error; err != nil {
		return err
	}

	var schedulerClusters []*models.SchedulerCluster
	for _, schedulerClusterID := range schedulerClusterIDs {
		schedulerCluster := models.SchedulerCluster{}
		if err := s.db.WithContext(ctx).First(&schedulerCluster, schedulerClusterID).Error; err != nil {
			return err
		}
		schedulerClusters = append(schedulerClusters, &schedulerCluster)
	}

	if err := s.db.WithContext(ctx).Model(&job).Association("SchedulerClusters").Append(schedulerClusters); err != nil {
		return err
	}

	return nil
}
256

257
// AddJobToSeedPeerClusters appends each seed peer cluster in seedPeerClusterIDs
// to the SeedPeerClusters association of a job.
//
// id is typed []uint because of the grouped `id, seedPeerClusterIDs []uint`
// parameter list. NOTE(review): this looks unintended, but it is kept for
// interface compatibility. The job is loaded with First, which returns the
// matching job with the lowest primary key when id holds several values.
//
// An empty id is rejected. GORM builds no primary-key condition from an empty
// slice, so First would otherwise select the first job in the table. Every
// listed cluster must exist; the first lookup error is returned before anything
// is appended.
func (s *service) AddJobToSeedPeerClusters(ctx context.Context, id, seedPeerClusterIDs []uint) error {
	if len(id) == 0 {
		return errors.New("job id is required")
	}

	job := models.Job{}
	if err := s.db.WithContext(ctx).First(&job, id).Error; err != nil {
		return err
	}

	var seedPeerClusters []*models.SeedPeerCluster
	for _, seedPeerClusterID := range seedPeerClusterIDs {
		seedPeerCluster := models.SeedPeerCluster{}
		if err := s.db.WithContext(ctx).First(&seedPeerCluster, seedPeerClusterID).Error; err != nil {
			return err
		}
		seedPeerClusters = append(seedPeerClusters, &seedPeerCluster)
	}

	if err := s.db.WithContext(ctx).Model(&job).Association("SeedPeerClusters").Append(seedPeerClusters); err != nil {
		return err
	}

	return nil
}
278

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.