go-clean-template

Форк
0
162 строки · 2.9 Кб
1
package server
2

3
import (
4
	"encoding/json"
5
	"fmt"
6
	"time"
7

8
	"github.com/streadway/amqp"
9

10
	"github.com/evrone/go-clean-template/pkg/logger"
11
	rmqrpc "github.com/evrone/go-clean-template/pkg/rabbitmq/rmq_rpc"
12
)
13

14
const (
15
	_defaultWaitTime = 5 * time.Second
16
	_defaultAttempts = 10
17
	_defaultTimeout  = 2 * time.Second
18
)
19

20
// CallHandler -.
21
type CallHandler func(*amqp.Delivery) (interface{}, error)
22

23
// Server -.
24
type Server struct {
25
	conn   *rmqrpc.Connection
26
	error  chan error
27
	stop   chan struct{}
28
	router map[string]CallHandler
29

30
	timeout time.Duration
31

32
	logger logger.Interface
33
}
34

35
// New -.
36
func New(url, serverExchange string, router map[string]CallHandler, l logger.Interface, opts ...Option) (*Server, error) {
37
	cfg := rmqrpc.Config{
38
		URL:      url,
39
		WaitTime: _defaultWaitTime,
40
		Attempts: _defaultAttempts,
41
	}
42

43
	s := &Server{
44
		conn:    rmqrpc.New(serverExchange, cfg),
45
		error:   make(chan error),
46
		stop:    make(chan struct{}),
47
		router:  router,
48
		timeout: _defaultTimeout,
49
		logger:  l,
50
	}
51

52
	// Custom options
53
	for _, opt := range opts {
54
		opt(s)
55
	}
56

57
	err := s.conn.AttemptConnect()
58
	if err != nil {
59
		return nil, fmt.Errorf("rmq_rpc server - NewServer - s.conn.AttemptConnect: %w", err)
60
	}
61

62
	go s.consumer()
63

64
	return s, nil
65
}
66

67
func (s *Server) consumer() {
68
	for {
69
		select {
70
		case <-s.stop:
71
			return
72
		case d, opened := <-s.conn.Delivery:
73
			if !opened {
74
				s.reconnect()
75

76
				return
77
			}
78

79
			_ = d.Ack(false) //nolint:errcheck // don't need this
80

81
			s.serveCall(&d)
82
		}
83
	}
84
}
85

86
func (s *Server) serveCall(d *amqp.Delivery) {
87
	callHandler, ok := s.router[d.Type]
88
	if !ok {
89
		s.publish(d, nil, rmqrpc.ErrBadHandler.Error())
90

91
		return
92
	}
93

94
	response, err := callHandler(d)
95
	if err != nil {
96
		s.publish(d, nil, rmqrpc.ErrInternalServer.Error())
97

98
		s.logger.Error(err, "rmq_rpc server - Server - serveCall - callHandler")
99

100
		return
101
	}
102

103
	body, err := json.Marshal(response)
104
	if err != nil {
105
		s.logger.Error(err, "rmq_rpc server - Server - serveCall - json.Marshal")
106
	}
107

108
	s.publish(d, body, rmqrpc.Success)
109
}
110

111
func (s *Server) publish(d *amqp.Delivery, body []byte, status string) {
112
	err := s.conn.Channel.Publish(d.ReplyTo, "", false, false,
113
		amqp.Publishing{
114
			ContentType:   "application/json",
115
			CorrelationId: d.CorrelationId,
116
			Type:          status,
117
			Body:          body,
118
		})
119
	if err != nil {
120
		s.logger.Error(err, "rmq_rpc server - Server - publish - s.conn.Channel.Publish")
121
	}
122
}
123

124
func (s *Server) reconnect() {
125
	close(s.stop)
126

127
	err := s.conn.AttemptConnect()
128
	if err != nil {
129
		s.error <- err
130
		close(s.error)
131

132
		return
133
	}
134

135
	s.stop = make(chan struct{})
136

137
	go s.consumer()
138
}
139

140
// Notify -.
141
func (s *Server) Notify() <-chan error {
142
	return s.error
143
}
144

145
// Shutdown -.
146
func (s *Server) Shutdown() error {
147
	select {
148
	case <-s.error:
149
		return nil
150
	default:
151
	}
152

153
	close(s.stop)
154
	time.Sleep(s.timeout)
155

156
	err := s.conn.Connection.Close()
157
	if err != nil {
158
		return fmt.Errorf("rmq_rpc server - Server - Shutdown - s.Connection.Close: %w", err)
159
	}
160

161
	return nil
162
}
163

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.