go-clean-template

Форк
0
248 строк · 4.4 Кб
1
package client
2

3
import (
4
	"encoding/json"
5
	"errors"
6
	"fmt"
7
	"sync"
8
	"time"
9

10
	"github.com/google/uuid"
11
	"github.com/streadway/amqp"
12

13
	rmqrpc "github.com/evrone/go-clean-template/pkg/rabbitmq/rmq_rpc"
14
)
15

16
// ErrConnectionClosed -.
17
var ErrConnectionClosed = errors.New("rmq_rpc client - Client - RemoteCall - Connection closed")
18

19
const (
20
	_defaultWaitTime = 5 * time.Second
21
	_defaultAttempts = 10
22
	_defaultTimeout  = 2 * time.Second
23
)
24

25
// Message -.
26
type Message struct {
27
	Queue         string
28
	Priority      uint8
29
	ContentType   string
30
	Body          []byte
31
	ReplyTo       string
32
	CorrelationID string
33
}
34

35
type pendingCall struct {
36
	done   chan struct{}
37
	status string
38
	body   []byte
39
}
40

41
// Client -.
42
type Client struct {
43
	conn           *rmqrpc.Connection
44
	serverExchange string
45
	error          chan error
46
	stop           chan struct{}
47

48
	rw    sync.RWMutex
49
	calls map[string]*pendingCall
50

51
	timeout time.Duration
52
}
53

54
// New -.
55
func New(url, serverExchange, clientExchange string, opts ...Option) (*Client, error) {
56
	cfg := rmqrpc.Config{
57
		URL:      url,
58
		WaitTime: _defaultWaitTime,
59
		Attempts: _defaultAttempts,
60
	}
61

62
	c := &Client{
63
		conn:           rmqrpc.New(clientExchange, cfg),
64
		serverExchange: serverExchange,
65
		error:          make(chan error),
66
		stop:           make(chan struct{}),
67
		calls:          make(map[string]*pendingCall),
68
		timeout:        _defaultTimeout,
69
	}
70

71
	// Custom options
72
	for _, opt := range opts {
73
		opt(c)
74
	}
75

76
	err := c.conn.AttemptConnect()
77
	if err != nil {
78
		return nil, fmt.Errorf("rmq_rpc client - NewClient - c.conn.AttemptConnect: %w", err)
79
	}
80

81
	go c.consumer()
82

83
	return c, nil
84
}
85

86
func (c *Client) publish(corrID, handler string, request interface{}) error {
87
	var (
88
		requestBody []byte
89
		err         error
90
	)
91

92
	if request != nil {
93
		requestBody, err = json.Marshal(request)
94
		if err != nil {
95
			return err
96
		}
97
	}
98

99
	err = c.conn.Channel.Publish(c.serverExchange, "", false, false,
100
		amqp.Publishing{
101
			ContentType:   "application/json",
102
			CorrelationId: corrID,
103
			ReplyTo:       c.conn.ConsumerExchange,
104
			Type:          handler,
105
			Body:          requestBody,
106
		})
107
	if err != nil {
108
		return fmt.Errorf("c.Channel.Publish: %w", err)
109
	}
110

111
	return nil
112
}
113

114
// RemoteCall -.
115
func (c *Client) RemoteCall(handler string, request, response interface{}) error { //nolint:cyclop // complex func
116
	select {
117
	case <-c.stop:
118
		time.Sleep(c.timeout)
119
		select {
120
		case <-c.stop:
121
			return ErrConnectionClosed
122
		default:
123
		}
124
	default:
125
	}
126

127
	corrID := uuid.New().String()
128

129
	err := c.publish(corrID, handler, request)
130
	if err != nil {
131
		return fmt.Errorf("rmq_rpc client - Client - RemoteCall - c.publish: %w", err)
132
	}
133

134
	call := &pendingCall{done: make(chan struct{})}
135

136
	c.addCall(corrID, call)
137
	defer c.deleteCall(corrID)
138

139
	select {
140
	case <-time.After(c.timeout):
141
		return rmqrpc.ErrTimeout
142
	case <-call.done:
143
	}
144

145
	if call.status == rmqrpc.Success {
146
		err = json.Unmarshal(call.body, &response)
147
		if err != nil {
148
			return fmt.Errorf("rmq_rpc client - Client - RemoteCall - json.Unmarshal: %w", err)
149
		}
150

151
		return nil
152
	}
153

154
	if call.status == rmqrpc.ErrBadHandler.Error() {
155
		return rmqrpc.ErrBadHandler
156
	}
157

158
	if call.status == rmqrpc.ErrInternalServer.Error() {
159
		return rmqrpc.ErrInternalServer
160
	}
161

162
	return nil
163
}
164

165
func (c *Client) consumer() {
166
	for {
167
		select {
168
		case <-c.stop:
169
			return
170
		case d, opened := <-c.conn.Delivery:
171
			if !opened {
172
				c.reconnect()
173

174
				return
175
			}
176

177
			_ = d.Ack(false) //nolint:errcheck // don't need this
178

179
			c.getCall(&d)
180
		}
181
	}
182
}
183

184
func (c *Client) reconnect() {
185
	close(c.stop)
186

187
	err := c.conn.AttemptConnect()
188
	if err != nil {
189
		c.error <- err
190
		close(c.error)
191

192
		return
193
	}
194

195
	c.stop = make(chan struct{})
196

197
	go c.consumer()
198
}
199

200
func (c *Client) getCall(d *amqp.Delivery) {
201
	c.rw.RLock()
202
	call, ok := c.calls[d.CorrelationId]
203
	c.rw.RUnlock()
204

205
	if !ok {
206
		return
207
	}
208

209
	call.status = d.Type
210
	call.body = d.Body
211
	close(call.done)
212
}
213

214
func (c *Client) addCall(corrID string, call *pendingCall) {
215
	c.rw.Lock()
216
	c.calls[corrID] = call
217
	c.rw.Unlock()
218
}
219

220
func (c *Client) deleteCall(corrID string) {
221
	c.rw.Lock()
222
	delete(c.calls, corrID)
223
	c.rw.Unlock()
224
}
225

226
// Notify -.
227
func (c *Client) Notify() <-chan error {
228
	return c.error
229
}
230

231
// Shutdown -.
232
func (c *Client) Shutdown() error {
233
	select {
234
	case <-c.error:
235
		return nil
236
	default:
237
	}
238

239
	close(c.stop)
240
	time.Sleep(c.timeout)
241

242
	err := c.conn.Connection.Close()
243
	if err != nil {
244
		return fmt.Errorf("rmq_rpc client - Client - Shutdown - c.Connection.Close: %w", err)
245
	}
246

247
	return nil
248
}
249

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.