msrc
/
MultiService.go
231 строка · 6.0 Кб
1package msrc
2
3import (
4"context"
5"os"
6"os/signal"
7"strconv"
8"sync"
9
10"github.com/pkg/errors"
11"github.com/sirupsen/logrus"
12"gitlab.systems-fd.com/packages/golang/helpers/h"
13"golang.org/x/sync/errgroup"
14)
15
16// multiServiceServer имплементирует логику сервера для MultiServiceInterface.
17// Под капотом использует функционал signal.Notify для реализации правильной
18// остановки по полученному из системы сигналу завершения работы.
19type multiServiceServer struct {
20services map[string]ServiceInterface
21servicesComplete map[string]bool
22servicesCompleteMX sync.Mutex
23
24serviceStopFunc *h.SyncValueStruct[context.CancelFunc]
25servicesIsWorkMx *sync.Mutex
26isServicesRun *h.SyncValueStruct[bool]
27isShutdownInProgress *h.SyncValueStruct[bool]
28logger *logrus.Entry
29}
30
31// MultiService реализует конструктор сервиса MultiServiceInterface.
32// Данный конструктор принимает на вход карту сервисов, которые необходимо
33// запускать основному серверу. Данные сервисы будут запускаться при
34// помощи функции Run().
35//
36// Вторым аргументом на вход передается livenessPort, который является
37// не обязательным параметром. Данный параметр позволяет подключить к
38// сервису дополнительный функционал проверки Liveness Probe для
39// Kubernetes.
40func MultiService(
41services map[string]ServiceInterface,
42livenessPort ...uint16,
43) MultiServiceInterface {
44mainService := &multiServiceServer{
45services: map[string]ServiceInterface{},
46servicesComplete: map[string]bool{},
47servicesCompleteMX: sync.Mutex{},
48serviceStopFunc: h.SyncValue[context.CancelFunc](func() {}),
49servicesIsWorkMx: &sync.Mutex{},
50isServicesRun: h.SyncValue[bool](false),
51isShutdownInProgress: h.SyncValue[bool](false),
52logger: logrus.WithField("prefix", `MultiService`),
53}
54
55if 0 != len(livenessPort) {
56if livenessPort[0] > 0 {
57livenessService := newLivenessService(strconv.FormatUint(uint64(livenessPort[0]), 10), mainService)
58
59services[`LivenessService`] = livenessService
60}
61}
62
63mainService.services = services
64
65return mainService
66}
67
68// Run выполняет запуск основного сервиса
69func (m *multiServiceServer) Run() error {
70if m.isServicesRun.Get() {
71return nil
72}
73
74m.isServicesRun.Set(true)
75defer m.isServicesRun.Set(false)
76
77defer func() {
78m.logger.
79WithField("code", 20010).
80Info("All services completely stopped")
81}()
82
83m.servicesIsWorkMx.Lock()
84defer m.servicesIsWorkMx.Unlock()
85
86ctx, cancel := context.WithCancel(context.Background())
87defer cancel()
88
89m.serviceStopFunc.Set(cancel)
90
91osSignals := make(chan os.Signal, 1)
92signal.Notify(osSignals, signals...)
93
94defer signal.Stop(osSignals)
95defer close(osSignals)
96
97g, wCtx := errgroup.WithContext(ctx)
98
99// Регистрируем подписку на сигнал завершения работы от системы или
100// от общего рабочего контекста.
101go func() {
102select {
103case <-wCtx.Done():
104case <-osSignals:
105}
106
107m.Stop()
108}()
109
110m.servicesCompleteMX.Lock()
111for i := range m.services {
112serviceName := i
113service := m.services[i]
114m.servicesComplete[serviceName] = false
115
116g.Go(func() error {
117if h.IsCtxDone(wCtx) {
118return nil
119}
120
121err := service.Run()
122
123m.servicesCompleteMX.Lock()
124m.servicesComplete[serviceName] = true
125m.servicesCompleteMX.Unlock()
126
127// Если завершение пришло по функции Stop, то игнорируем ошибки.
128if h.IsCtxDone(wCtx) {
129return nil
130}
131
132if nil == err {
133m.logger.WithFields(logrus.Fields{
134"code": 20200,
135"service": serviceName,
136}).Info("Service complete")
137
138return nil
139}
140
141m.logger.
142WithError(err).
143WithField("code", 50000).
144WithField("service", serviceName).
145Error(`Response error from service`)
146
147return errors.Wrap(err, `[`+serviceName+`] response error from service`)
148})
149}
150m.servicesCompleteMX.Unlock()
151
152err := g.Wait()
153if err == context.Canceled {
154return nil
155}
156
157if nil != err {
158return errors.Wrap(err, `[MultiService] received error from service`)
159}
160
161return nil
162}
163
164// Stop выполняет остановку основного сервиса
165func (m *multiServiceServer) Stop() {
166if m.isShutdownInProgress.Get() {
167return
168}
169
170m.isShutdownInProgress.Set(true)
171defer m.isShutdownInProgress.Set(false)
172
173m.logger.
174WithField("code", 20004).
175Info("Initialized graceful shutdown")
176
177m.serviceStopFunc.Get()()
178
179var wg sync.WaitGroup
180wg.Add(len(m.services))
181
182for name, service := range m.services {
183go func(name string, service ServiceInterface, wg *sync.WaitGroup) {
184defer wg.Done()
185
186m.servicesCompleteMX.Lock()
187status := m.servicesComplete[name]
188m.servicesCompleteMX.Unlock()
189
190if status {
191return
192}
193
194m.logger.
195WithField("code", 20004).
196WithField("service", name).
197Debug(`Shutdown service`)
198
199service.GracefulShutdown()
200}(name, service, &wg)
201}
202
203wg.Wait()
204
205m.servicesIsWorkMx.Lock()
206defer m.servicesIsWorkMx.Unlock()
207
208m.logger.
209WithField("code", 20004).
210Info("Services gracefully shutdown")
211}
212
213// IsServicesRun проверяет, запущен ли сервер
214func (m *multiServiceServer) IsServicesRun() bool {
215status := true
216for _, service := range m.services {
217status = status && service.IsStarted()
218}
219
220return status
221}
222
223// IsServicesAlive проверяет, остановлен ли хоть один из сервисов
224func (m *multiServiceServer) IsServicesAlive() bool {
225status := true
226for _, service := range m.services {
227status = status && service.IsAlive()
228}
229
230return status
231}
232