mosn

Форк
0
/
stream_test.go 
261 строка · 7.8 Кб
1
//go:build MOSNTest
2
// +build MOSNTest
3

4
package grpc
5

6
import (
7
	"context"
8
	"errors"
9
	"fmt"
10
	"io"
11
	"testing"
12
	"time"
13

14
	"google.golang.org/grpc"
15
	pb "google.golang.org/grpc/examples/features/proto/echo"
16
	"google.golang.org/grpc/metadata"
17
	"mosn.io/mosn/pkg/log"
18
	. "mosn.io/mosn/test/framework"
19
	"mosn.io/mosn/test/lib"
20
)
21

22
const (
23
	timestampFormat = time.StampNano // "Jan _2 15:04:05.000"
24
	streamingCount  = 10
25
)
26

27
func TestStreamGrpc(t *testing.T) {
28
	Scenario(t, "streaming grpc server is mosn networkfilter", func() {
29
		_, _ = lib.InitMosn(ConfigStreamGrpcFilter) // no servers need
30
		Case("server streaming", func() {
31
			conn, err := grpc.Dial("127.0.0.1:2045", grpc.WithInsecure(), grpc.WithBlock())
32
			Verify(err, Equal, nil)
33
			defer conn.Close()
34
			c := pb.NewEchoClient(conn)
35
			message := "this is server streaming metadata"
36
			Verify(serverStreamingWithMetadata(c, message), Equal, nil)
37
		})
38
		Case("client streaming", func() {
39
			conn, err := grpc.Dial("127.0.0.1:2045", grpc.WithInsecure(), grpc.WithBlock())
40
			Verify(err, Equal, nil)
41
			defer conn.Close()
42
			c := pb.NewEchoClient(conn)
43
			message := "this is client streaming metadata"
44
			Verify(clientStreamWithMetadata(c, message), Equal, nil)
45
		})
46
		Case("bidrectional", func() {
47
			conn, err := grpc.Dial("127.0.0.1:2045", grpc.WithInsecure(), grpc.WithBlock())
48
			Verify(err, Equal, nil)
49
			defer conn.Close()
50
			c := pb.NewEchoClient(conn)
51
			message := "this is bidrectional metadata"
52
			Verify(bidirectionalWithMetadata(c, message), Equal, nil)
53
		})
54
	})
55
}
56

57
// stream grpc client is implemented with reference to grpc/examples/features/proto/echo
58
func serverStreamingWithMetadata(c pb.EchoClient, message string) error {
59
	// Create metadata and context.
60
	md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
61
	ctx := metadata.NewOutgoingContext(context.Background(), md)
62
	// Make RPC using the context with the metadata.
63
	stream, err := c.ServerStreamingEcho(ctx, &pb.EchoRequest{Message: message})
64
	if err != nil {
65
		log.DefaultLogger.Errorf("failed to call ServerStreamingEcho: %v", err)
66
		return err
67
	}
68
	// Read the header when the header arrives.
69
	header, err := stream.Header()
70
	if err != nil {
71
		log.DefaultLogger.Errorf("failed to get header from stream: %v", err)
72
		return err
73
	}
74
	//  Read metadata from server's header.
75
	if _, ok := header["timestamp"]; !ok {
76
		log.DefaultLogger.Errorf("timestamp expected but doesn't exist in header")
77
		return errors.New("timestamp expected but doesn't exist in header")
78
	}
79
	if _, ok := header["location"]; !ok {
80
		log.DefaultLogger.Errorf("location expected but doesn't exist in header")
81
		return errors.New("location expected but doesn't exist in header")
82
	}
83
	// Read all the responses.
84
	var rpcStatus error
85
	respCount := 0
86
	for {
87
		r, err := stream.Recv()
88
		if err != nil {
89
			rpcStatus = err
90
			break
91
		}
92
		if r.Message != message {
93
			rpcStatus = fmt.Errorf("unexpected response message: %s", r.Message)
94
			break
95
		}
96
		respCount++
97
	}
98
	if rpcStatus != io.EOF || respCount != streamingCount {
99
		log.DefaultLogger.Errorf("failed to finish server streaming: %v, response count: %d", rpcStatus, respCount)
100
		return errors.New("unexpected response")
101
	}
102
	// Read the trailer after the RPC is finished.
103
	trailer := stream.Trailer()
104
	if _, ok := trailer["timestamp"]; !ok {
105
		log.DefaultLogger.Errorf("timestamp expected but doesn't exist in trailer")
106
		return errors.New("timestamp expected but doesn't exist in trailer")
107
	}
108
	return nil
109
}
110

111
func clientStreamWithMetadata(c pb.EchoClient, message string) error {
112
	// Create metadata and context.
113
	md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
114
	ctx := metadata.NewOutgoingContext(context.Background(), md)
115

116
	// Make RPC using the context with the metadata.
117
	stream, err := c.ClientStreamingEcho(ctx)
118
	if err != nil {
119
		log.DefaultLogger.Errorf("failed to get header from stream: %v", err)
120
		return err
121
	}
122
	// Read the header when the header arrives.
123
	header, err := stream.Header()
124
	if err != nil {
125
		log.DefaultLogger.Errorf("failed to get header from stream: %v", err)
126
		return err
127
	}
128
	//  Read metadata from server's header.
129
	if _, ok := header["timestamp"]; !ok {
130
		log.DefaultLogger.Errorf("timestamp expected but doesn't exist in header")
131
		return errors.New("timestamp expected but doesn't exist in header")
132
	}
133
	if _, ok := header["location"]; !ok {
134
		log.DefaultLogger.Errorf("location expected but doesn't exist in header")
135
		return errors.New("location expected but doesn't exist in header")
136
	}
137
	// Send all requests to the server.
138
	for i := 0; i < streamingCount; i++ {
139
		if err := stream.Send(&pb.EchoRequest{Message: message}); err != nil {
140
			log.DefaultLogger.Errorf("failed to send streaming: %v", err)
141
			return err
142
		}
143
	}
144
	// Read the response.
145
	r, err := stream.CloseAndRecv()
146
	if err != nil {
147
		log.DefaultLogger.Errorf("failed to CloseAndRecv: %v", err)
148
		return err
149
	}
150
	if r.Message != message {
151
		return fmt.Errorf("unexpected response message: %s", r.Message)
152
	}
153
	// Read the trailer after the RPC is finished.
154
	trailer := stream.Trailer()
155
	if _, ok := trailer["timestamp"]; !ok {
156
		log.DefaultLogger.Errorf("timestamp expected but doesn't exist in trailer")
157
		return errors.New("timestamp expected but doesn't exist in trailer")
158
	}
159
	return nil
160
}
161

162
func bidirectionalWithMetadata(c pb.EchoClient, message string) error {
163
	// Create metadata and context.
164
	md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
165
	ctx := metadata.NewOutgoingContext(context.Background(), md)
166

167
	// Make RPC using the context with the metadata.
168
	stream, err := c.BidirectionalStreamingEcho(ctx)
169
	if err != nil {
170
		log.DefaultLogger.Errorf("failed to call BidirectionalStreamingEcho: %v", err)
171
		return err
172
	}
173
	errCh := make(chan error)
174
	go func() {
175
		// Read the header when the header arrives.
176
		header, err := stream.Header()
177
		if err != nil {
178
			log.DefaultLogger.Errorf("failed to get header from stream: %v", err)
179
			errCh <- err
180
			return
181
		}
182
		//  Read metadata from server's header.
183
		if _, ok := header["timestamp"]; !ok {
184
			log.DefaultLogger.Errorf("timestamp expected but doesn't exist in header")
185
			errCh <- errors.New("timestamp expected but doesn't exist in header")
186
			return
187
		}
188
		if _, ok := header["location"]; !ok {
189
			log.DefaultLogger.Errorf("location expected but doesn't exist in header")
190
			errCh <- errors.New("location expected but doesn't exist in header")
191
			return
192
		}
193
		// Send all requests to the server.
194
		for i := 0; i < streamingCount; i++ {
195
			if err := stream.Send(&pb.EchoRequest{Message: message}); err != nil {
196
				log.DefaultLogger.Errorf("failed to send streaming: %v", err)
197
				errCh <- err
198
				return
199
			}
200
		}
201
		stream.CloseSend()
202
	}()
203
	// Read all the responses.
204
	var rpcStatus error
205
	respCount := 0
206
RESP:
207
	for {
208
		select {
209
		case err := <-errCh:
210
			rpcStatus = err
211
			break RESP
212
		default:
213
			r, err := stream.Recv()
214
			if err != nil {
215
				rpcStatus = err
216
				break RESP
217
			}
218
			if r.Message != message {
219
				rpcStatus = fmt.Errorf("unexpected response message: %s", r.Message)
220
				break
221
			}
222
			respCount++
223
		}
224
	}
225
	if rpcStatus != io.EOF || respCount != streamingCount {
226
		log.DefaultLogger.Errorf("failed to finish server streaming: %v, response count: %d", rpcStatus, respCount)
227
		return errors.New("unexpected response")
228
	}
229
	// Read the trailer after the RPC is finished.
230
	trailer := stream.Trailer()
231
	if _, ok := trailer["timestamp"]; !ok {
232
		log.DefaultLogger.Errorf("timestamp expected but doesn't exist in trailer")
233
		return errors.New("timestamp expected but doesn't exist in trailer")
234
	}
235
	return nil
236
}
237

238
const ConfigStreamGrpcFilter = `{
239
	"servers":[
240
		{
241
			"default_log_path":"stdout",
242
			"default_log_level":"ERROR",
243
			"listeners":[
244
				{
245
					"address":"127.0.0.1:2045",
246
					"bind_port": true,
247
					"filter_chains": [{
248
						"filters": [
249
							{
250
								"type":"grpc",
251
								"config": {
252
									"server_name":"echo"
253
								}
254
							}
255
						]
256
					}]
257
				}
258
			]
259
		}
260
	]
261
}`
262

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.