msrc

Форк
0
/
MultiService.go 
231 строка · 6.0 Кб
1
package msrc
2

3
import (
4
	"context"
5
	"os"
6
	"os/signal"
7
	"strconv"
8
	"sync"
9

10
	"github.com/pkg/errors"
11
	"github.com/sirupsen/logrus"
12
	"gitlab.systems-fd.com/packages/golang/helpers/h"
13
	"golang.org/x/sync/errgroup"
14
)
15

16
// multiServiceServer имплементирует логику сервера для MultiServiceInterface.
17
// Под капотом использует функционал signal.Notify для реализации правильной
18
// остановки по полученному из системы сигналу завершения работы.
19
type multiServiceServer struct {
20
	services           map[string]ServiceInterface
21
	servicesComplete   map[string]bool
22
	servicesCompleteMX sync.Mutex
23

24
	serviceStopFunc      *h.SyncValueStruct[context.CancelFunc]
25
	servicesIsWorkMx     *sync.Mutex
26
	isServicesRun        *h.SyncValueStruct[bool]
27
	isShutdownInProgress *h.SyncValueStruct[bool]
28
	logger               *logrus.Entry
29
}
30

31
// MultiService реализует конструктор сервиса MultiServiceInterface.
32
// Данный конструктор принимает на вход карту сервисов, которые необходимо
33
// запускать основному серверу. Данные сервисы будут запускаться при
34
// помощи функции Run().
35
//
36
// Вторым аргументом на вход передается livenessPort, который является
37
// не обязательным параметром. Данный параметр позволяет подключить к
38
// сервису дополнительный функционал проверки Liveness Probe для
39
// Kubernetes.
40
func MultiService(
41
	services map[string]ServiceInterface,
42
	livenessPort ...uint16,
43
) MultiServiceInterface {
44
	mainService := &multiServiceServer{
45
		services:             map[string]ServiceInterface{},
46
		servicesComplete:     map[string]bool{},
47
		servicesCompleteMX:   sync.Mutex{},
48
		serviceStopFunc:      h.SyncValue[context.CancelFunc](func() {}),
49
		servicesIsWorkMx:     &sync.Mutex{},
50
		isServicesRun:        h.SyncValue[bool](false),
51
		isShutdownInProgress: h.SyncValue[bool](false),
52
		logger:               logrus.WithField("prefix", `MultiService`),
53
	}
54

55
	if 0 != len(livenessPort) {
56
		if livenessPort[0] > 0 {
57
			livenessService := newLivenessService(strconv.FormatUint(uint64(livenessPort[0]), 10), mainService)
58

59
			services[`LivenessService`] = livenessService
60
		}
61
	}
62

63
	mainService.services = services
64

65
	return mainService
66
}
67

68
// Run выполняет запуск основного сервиса
69
func (m *multiServiceServer) Run() error {
70
	if m.isServicesRun.Get() {
71
		return nil
72
	}
73

74
	m.isServicesRun.Set(true)
75
	defer m.isServicesRun.Set(false)
76

77
	defer func() {
78
		m.logger.
79
			WithField("code", 20010).
80
			Info("All services completely stopped")
81
	}()
82

83
	m.servicesIsWorkMx.Lock()
84
	defer m.servicesIsWorkMx.Unlock()
85

86
	ctx, cancel := context.WithCancel(context.Background())
87
	defer cancel()
88

89
	m.serviceStopFunc.Set(cancel)
90

91
	osSignals := make(chan os.Signal, 1)
92
	signal.Notify(osSignals, signals...)
93

94
	defer signal.Stop(osSignals)
95
	defer close(osSignals)
96

97
	g, wCtx := errgroup.WithContext(ctx)
98

99
	// Регистрируем подписку на сигнал завершения работы от системы или
100
	// от общего рабочего контекста.
101
	go func() {
102
		select {
103
		case <-wCtx.Done():
104
		case <-osSignals:
105
		}
106

107
		m.Stop()
108
	}()
109

110
	m.servicesCompleteMX.Lock()
111
	for i := range m.services {
112
		serviceName := i
113
		service := m.services[i]
114
		m.servicesComplete[serviceName] = false
115

116
		g.Go(func() error {
117
			if h.IsCtxDone(wCtx) {
118
				return nil
119
			}
120

121
			err := service.Run()
122

123
			m.servicesCompleteMX.Lock()
124
			m.servicesComplete[serviceName] = true
125
			m.servicesCompleteMX.Unlock()
126

127
			// Если завершение пришло по функции Stop, то игнорируем ошибки.
128
			if h.IsCtxDone(wCtx) {
129
				return nil
130
			}
131

132
			if nil == err {
133
				m.logger.WithFields(logrus.Fields{
134
					"code":    20200,
135
					"service": serviceName,
136
				}).Info("Service complete")
137

138
				return nil
139
			}
140

141
			m.logger.
142
				WithError(err).
143
				WithField("code", 50000).
144
				WithField("service", serviceName).
145
				Error(`Response error from service`)
146

147
			return errors.Wrap(err, `[`+serviceName+`] response error from service`)
148
		})
149
	}
150
	m.servicesCompleteMX.Unlock()
151

152
	err := g.Wait()
153
	if err == context.Canceled {
154
		return nil
155
	}
156

157
	if nil != err {
158
		return errors.Wrap(err, `[MultiService] received error from service`)
159
	}
160

161
	return nil
162
}
163

164
// Stop выполняет остановку основного сервиса
165
func (m *multiServiceServer) Stop() {
166
	if m.isShutdownInProgress.Get() {
167
		return
168
	}
169

170
	m.isShutdownInProgress.Set(true)
171
	defer m.isShutdownInProgress.Set(false)
172

173
	m.logger.
174
		WithField("code", 20004).
175
		Info("Initialized graceful shutdown")
176

177
	m.serviceStopFunc.Get()()
178

179
	var wg sync.WaitGroup
180
	wg.Add(len(m.services))
181

182
	for name, service := range m.services {
183
		go func(name string, service ServiceInterface, wg *sync.WaitGroup) {
184
			defer wg.Done()
185

186
			m.servicesCompleteMX.Lock()
187
			status := m.servicesComplete[name]
188
			m.servicesCompleteMX.Unlock()
189

190
			if status {
191
				return
192
			}
193

194
			m.logger.
195
				WithField("code", 20004).
196
				WithField("service", name).
197
				Debug(`Shutdown service`)
198

199
			service.GracefulShutdown()
200
		}(name, service, &wg)
201
	}
202

203
	wg.Wait()
204

205
	m.servicesIsWorkMx.Lock()
206
	defer m.servicesIsWorkMx.Unlock()
207

208
	m.logger.
209
		WithField("code", 20004).
210
		Info("Services gracefully shutdown")
211
}
212

213
// IsServicesRun проверяет, запущен ли сервер
214
func (m *multiServiceServer) IsServicesRun() bool {
215
	status := true
216
	for _, service := range m.services {
217
		status = status && service.IsStarted()
218
	}
219

220
	return status
221
}
222

223
// IsServicesAlive проверяет, остановлен ли хоть один из сервисов
224
func (m *multiServiceServer) IsServicesAlive() bool {
225
	status := true
226
	for _, service := range m.services {
227
		status = status && service.IsAlive()
228
	}
229

230
	return status
231
}
232

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.