14
"google.golang.org/grpc"
15
pb "google.golang.org/grpc/examples/features/proto/echo"
16
"google.golang.org/grpc/metadata"
17
"mosn.io/mosn/pkg/log"
18
. "mosn.io/mosn/test/framework"
19
"mosn.io/mosn/test/lib"
23
timestampFormat = time.StampNano // "Jan _2 15:04:05.000000000"
27
// TestStreamGrpc runs one scenario with three cases (server streaming, client
// streaming, bidirectional). Each case dials 127.0.0.1:2045, builds an echo
// client on the connection and verifies that the matching helper returns nil.
// NOTE(review): this view of the function has gaps (closing braces and some
// statements are not visible); the comments cover only the visible lines.
func TestStreamGrpc(t *testing.T) {
28
Scenario(t, "streaming grpc server is mosn networkfilter", func() {
29
// Both return values of lib.InitMosn are deliberately discarded.
_, _ = lib.InitMosn(ConfigStreamGrpcFilter) // no servers are needed
30
Case("server streaming", func() {
31
// WithBlock makes Dial wait until the connection is up, so a failure to
// connect surfaces in err; WithInsecure disables transport security.
conn, err := grpc.Dial("127.0.0.1:2045", grpc.WithInsecure(), grpc.WithBlock())
32
Verify(err, Equal, nil)
34
c := pb.NewEchoClient(conn)
35
message := "this is server streaming metadata"
36
// The helper returns nil only when all of its checks pass.
Verify(serverStreamingWithMetadata(c, message), Equal, nil)
38
Case("client streaming", func() {
39
// Same dial options as the server streaming case.
conn, err := grpc.Dial("127.0.0.1:2045", grpc.WithInsecure(), grpc.WithBlock())
40
Verify(err, Equal, nil)
42
c := pb.NewEchoClient(conn)
43
message := "this is client streaming metadata"
44
Verify(clientStreamWithMetadata(c, message), Equal, nil)
46
Case("bidrectional", func() {
47
// Same dial options as the server streaming case.
conn, err := grpc.Dial("127.0.0.1:2045", grpc.WithInsecure(), grpc.WithBlock())
48
Verify(err, Equal, nil)
50
c := pb.NewEchoClient(conn)
51
message := "this is bidrectional metadata"
52
Verify(bidirectionalWithMetadata(c, message), Equal, nil)
57
// The streaming gRPC client helpers below are implemented with reference to grpc/examples/features/proto/echo.
58
// serverStreamingWithMetadata calls ServerStreamingEcho with a "timestamp"
// metadata pair attached to the outgoing context and checks what comes back:
// the response header must contain "timestamp" and "location", each received
// message is compared with message, the stream must finish with io.EOF after
// streamingCount responses, and the trailer must contain "timestamp".
// Failures are logged through log.DefaultLogger and reported as an error.
// NOTE(review): several lines of this function (error guards, the receive
// loop header, closing braces, the final return) are not visible in this view.
func serverStreamingWithMetadata(c pb.EchoClient, message string) error {
59
// Create metadata and context.
60
md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
61
ctx := metadata.NewOutgoingContext(context.Background(), md)
62
// Make RPC using the context with the metadata.
63
stream, err := c.ServerStreamingEcho(ctx, &pb.EchoRequest{Message: message})
65
log.DefaultLogger.Errorf("failed to call ServerStreamingEcho: %v", err)
68
// Read the header when the header arrives.
69
header, err := stream.Header()
71
log.DefaultLogger.Errorf("failed to get header from stream: %v", err)
74
// Read metadata from server's header.
75
// Only the presence of the key is checked; its value is ignored.
if _, ok := header["timestamp"]; !ok {
76
log.DefaultLogger.Errorf("timestamp expected but doesn't exist in header")
77
return errors.New("timestamp expected but doesn't exist in header")
79
// Only the presence of the key is checked; its value is ignored.
if _, ok := header["location"]; !ok {
80
log.DefaultLogger.Errorf("location expected but doesn't exist in header")
81
return errors.New("location expected but doesn't exist in header")
83
// Read all the responses.
87
r, err := stream.Recv()
92
// Each response is expected to carry the request message unchanged.
if r.Message != message {
93
rpcStatus = fmt.Errorf("unexpected response message: %s", r.Message)
98
// Success requires both a terminal io.EOF and exactly streamingCount responses.
if rpcStatus != io.EOF || respCount != streamingCount {
99
log.DefaultLogger.Errorf("failed to finish server streaming: %v, response count: %d", rpcStatus, respCount)
100
return errors.New("unexpected response")
102
// Read the trailer after the RPC is finished.
103
trailer := stream.Trailer()
104
if _, ok := trailer["timestamp"]; !ok {
105
log.DefaultLogger.Errorf("timestamp expected but doesn't exist in trailer")
106
return errors.New("timestamp expected but doesn't exist in trailer")
111
// clientStreamWithMetadata opens a ClientStreamingEcho stream with a
// "timestamp" metadata pair attached to the outgoing context. It checks that
// the response header contains "timestamp" and "location", sends message
// streamingCount times, closes the stream, compares the single response with
// message, and checks that the trailer contains "timestamp".
// Failures are logged through log.DefaultLogger and reported as an error.
// NOTE(review): several lines of this function (error guards, closing braces,
// the final return) are not visible in this view.
func clientStreamWithMetadata(c pb.EchoClient, message string) error {
112
// Create metadata and context.
113
md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
114
ctx := metadata.NewOutgoingContext(context.Background(), md)
116
// Make RPC using the context with the metadata.
117
stream, err := c.ClientStreamingEcho(ctx)
119
// Name the RPC in this message so a failure to open the stream can be told
// apart from the header failure logged below.
log.DefaultLogger.Errorf("failed to call ClientStreamingEcho: %v", err)
122
// Read the header when the header arrives.
123
header, err := stream.Header()
125
log.DefaultLogger.Errorf("failed to get header from stream: %v", err)
128
// Read metadata from server's header.
129
// Only the presence of the key is checked; its value is ignored.
if _, ok := header["timestamp"]; !ok {
130
log.DefaultLogger.Errorf("timestamp expected but doesn't exist in header")
131
return errors.New("timestamp expected but doesn't exist in header")
133
// Only the presence of the key is checked; its value is ignored.
if _, ok := header["location"]; !ok {
134
log.DefaultLogger.Errorf("location expected but doesn't exist in header")
135
return errors.New("location expected but doesn't exist in header")
137
// Send all requests to the server.
138
for i := 0; i < streamingCount; i++ {
139
if err := stream.Send(&pb.EchoRequest{Message: message}); err != nil {
140
log.DefaultLogger.Errorf("failed to send streaming: %v", err)
144
// Read the response.
145
r, err := stream.CloseAndRecv()
147
log.DefaultLogger.Errorf("failed to CloseAndRecv: %v", err)
150
// The single response is expected to carry the request message unchanged.
if r.Message != message {
151
return fmt.Errorf("unexpected response message: %s", r.Message)
153
// Read the trailer after the RPC is finished.
154
trailer := stream.Trailer()
155
if _, ok := trailer["timestamp"]; !ok {
156
log.DefaultLogger.Errorf("timestamp expected but doesn't exist in trailer")
157
return errors.New("timestamp expected but doesn't exist in trailer")
162
// bidirectionalWithMetadata opens a BidirectionalStreamingEcho stream with a
// "timestamp" metadata pair attached to the outgoing context. It checks that
// the response header contains "timestamp" and "location", sends message
// streamingCount times, compares every received message with message,
// requires the stream to finish with io.EOF after streamingCount responses,
// and checks that the trailer contains "timestamp".
// Failures are logged through log.DefaultLogger and reported as an error.
// NOTE(review): several lines of this function (error guards, goroutine and
// loop headers, closing braces, the final return) are not visible in this
// view.
func bidirectionalWithMetadata(c pb.EchoClient, message string) error {
163
// Create metadata and context.
164
md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
165
ctx := metadata.NewOutgoingContext(context.Background(), md)
167
// Make RPC using the context with the metadata.
168
stream, err := c.BidirectionalStreamingEcho(ctx)
170
log.DefaultLogger.Errorf("failed to call BidirectionalStreamingEcho: %v", err)
173
// Header problems below are reported on this unbuffered channel instead of
// being returned directly.
// NOTE(review): presumably the header/send section runs in a goroutine
// that is not visible here — confirm the channel always has a receiver.
errCh := make(chan error)
175
// Read the header when the header arrives.
176
header, err := stream.Header()
178
log.DefaultLogger.Errorf("failed to get header from stream: %v", err)
182
// Read metadata from server's header.
183
// Only the presence of the key is checked; its value is ignored.
if _, ok := header["timestamp"]; !ok {
184
log.DefaultLogger.Errorf("timestamp expected but doesn't exist in header")
185
errCh <- errors.New("timestamp expected but doesn't exist in header")
188
// Only the presence of the key is checked; its value is ignored.
if _, ok := header["location"]; !ok {
189
log.DefaultLogger.Errorf("location expected but doesn't exist in header")
190
errCh <- errors.New("location expected but doesn't exist in header")
193
// Send all requests to the server.
194
for i := 0; i < streamingCount; i++ {
195
if err := stream.Send(&pb.EchoRequest{Message: message}); err != nil {
196
log.DefaultLogger.Errorf("failed to send streaming: %v", err)
203
// Read all the responses.
213
r, err := stream.Recv()
218
// Each response is expected to carry the request message unchanged.
if r.Message != message {
219
rpcStatus = fmt.Errorf("unexpected response message: %s", r.Message)
225
// Success requires both a terminal io.EOF and exactly streamingCount responses.
if rpcStatus != io.EOF || respCount != streamingCount {
226
// The message names bidirectional streaming so it is not confused with
// the server streaming helper's log line.
log.DefaultLogger.Errorf("failed to finish bidirectional streaming: %v, response count: %d", rpcStatus, respCount)
227
return errors.New("unexpected response")
229
// Read the trailer after the RPC is finished.
230
trailer := stream.Trailer()
231
if _, ok := trailer["timestamp"]; !ok {
232
log.DefaultLogger.Errorf("timestamp expected but doesn't exist in trailer")
233
return errors.New("timestamp expected but doesn't exist in trailer")
238
const ConfigStreamGrpcFilter = `{
241
"default_log_path":"stdout",
242
"default_log_level":"ERROR",
245
"address":"127.0.0.1:2045",