mosn

Форк
0
203 строки · 5.8 Кб
1
package main
2

3
import (
4
	"context"
5
	"encoding/json"
6
	"fmt"
7
	"io"
8
	"math/rand"
9
	"time"
10

11
	"google.golang.org/grpc"
12
	"google.golang.org/grpc/codes"
13
	echopb "google.golang.org/grpc/examples/features/proto/echo"
14
	hellopb "google.golang.org/grpc/examples/helloworld/helloworld"
15
	"google.golang.org/grpc/metadata"
16
	"google.golang.org/grpc/status"
17
	mgrpc "mosn.io/mosn/pkg/filter/network/grpc"
18
)
19

20
func init() {
21
	mgrpc.RegisterServerHandler("hello", NewHelloExampleGrpcServer)
22
	mgrpc.RegisterServerHandler("echo", NewEchoStreamGrpcServer)
23
}
24

25
// helloServer is used to implement helloworld.GreeterServer.
26
type helloServer struct {
27
	hellopb.UnimplementedGreeterServer
28
}
29

30
// SayHello implements helloworld.GreeterServer
31
func (s *helloServer) SayHello(ctx context.Context, in *hellopb.HelloRequest) (*hellopb.HelloReply, error) {
32
	return &hellopb.HelloReply{Message: "Hello " + in.GetName()}, nil
33
}
34

35
// NewHelloExampleGrpcServer builds a gRPC server from the supplied server
// options and registers the Greeter service on it. The raw JSON configuration
// argument is ignored. The returned error is always nil.
func NewHelloExampleGrpcServer(_ json.RawMessage, options ...grpc.ServerOption) (mgrpc.RegisteredServer, error) {
	srv := grpc.NewServer(options...)
	hellopb.RegisterGreeterServer(srv, new(helloServer))
	return srv, nil
}
40

41
const (
	// timestampFormat is the layout used for every "timestamp" metadata value
	// written by the echo server.
	timestampFormat = time.StampNano // "Jan _2 15:04:05.000000000"
	// streamingCount is the number of responses ServerStreamingEcho sends
	// for a single request.
	streamingCount = 10
)
45

46
// echoServer is used to implement echo.EchoServer
47
type echoServer struct {
48
	echopb.UnimplementedEchoServer
49
}
50

51
// UnaryEcho implements the unary RPC of echo.EchoServer.
//
// It prints every "timestamp" value found in the incoming metadata, sends a
// response header carrying "location" and "timestamp", and echoes the request
// message back. A "timestamp" trailer is set when the handler returns.
//
// It returns a DataLoss status when the context carries no incoming metadata,
// and the error reported by grpc.SendHeader when the header cannot be sent.
func (s *echoServer) UnaryEcho(ctx context.Context, in *echopb.EchoRequest) (*echopb.EchoResponse, error) {
	fmt.Printf("--- UnaryEcho ---\n")
	// Create trailer in defer to record function return time.
	defer func() {
		trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
		if err := grpc.SetTrailer(ctx, trailer); err != nil {
			// The handler is already returning, so the failure can only be reported.
			fmt.Printf("UnaryEcho: failed to set trailer: %v\n", err)
		}
	}()

	// Read metadata from client.
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return nil, status.Errorf(codes.DataLoss, "UnaryEcho: failed to get metadata")
	}
	if t, ok := md["timestamp"]; ok {
		fmt.Printf("timestamp from metadata:\n")
		for i, e := range t {
			fmt.Printf(" %d. %s\n", i, e)
		}
	}

	// Create and send header; surface a send failure instead of dropping it.
	header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)})
	if err := grpc.SendHeader(ctx, header); err != nil {
		return nil, err
	}

	fmt.Printf("request received: %v, sending echo\n", in)

	return &echopb.EchoResponse{Message: in.Message}, nil
}
79

80
// ServerStreamingEcho implements the server-streaming RPC of echo.EchoServer.
//
// It prints every "timestamp" value found in the incoming metadata, sends a
// response header carrying "location" and "timestamp", and then sends the
// request message back streamingCount times. A "timestamp" trailer is set when
// the handler returns.
//
// It returns a DataLoss status when the stream context carries no incoming
// metadata, and otherwise the first error reported by SendHeader or Send.
func (s *echoServer) ServerStreamingEcho(in *echopb.EchoRequest, stream echopb.Echo_ServerStreamingEchoServer) error {
	fmt.Printf("--- ServerStreamingEcho ---\n")
	// Create trailer in defer to record function return time.
	defer func() {
		trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
		stream.SetTrailer(trailer)
	}()

	// Read metadata from client.
	md, ok := metadata.FromIncomingContext(stream.Context())
	if !ok {
		return status.Errorf(codes.DataLoss, "ServerStreamingEcho: failed to get metadata")
	}
	if t, ok := md["timestamp"]; ok {
		fmt.Printf("timestamp from metadata:\n")
		for i, e := range t {
			fmt.Printf(" %d. %s\n", i, e)
		}
	}

	// Create and send header; surface a send failure instead of dropping it.
	header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)})
	if err := stream.SendHeader(header); err != nil {
		return err
	}

	fmt.Printf("request received: %v\n", in)

	// Send the same message back streamingCount times.
	for i := 0; i < streamingCount; i++ {
		fmt.Printf("echo message %v\n", in.Message)
		if err := stream.Send(&echopb.EchoResponse{Message: in.Message}); err != nil {
			return err
		}
	}
	return nil
}
116

117
// ClientStreamingEcho implements the client-streaming RPC of echo.EchoServer.
//
// It prints every "timestamp" value found in the incoming metadata, sends a
// response header carrying "location" and "timestamp", and then reads requests
// until the client closes its side of the stream. The message of the last
// request received is echoed back in the single response. A "timestamp"
// trailer is set when the handler returns.
//
// It returns a DataLoss status when the stream context carries no incoming
// metadata, and otherwise the first error reported by SendHeader, Recv or
// SendAndClose.
func (s *echoServer) ClientStreamingEcho(stream echopb.Echo_ClientStreamingEchoServer) error {
	fmt.Printf("--- ClientStreamingEcho ---\n")
	// Create trailer in defer to record function return time.
	defer func() {
		trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
		stream.SetTrailer(trailer)
	}()

	// Read metadata from client.
	md, ok := metadata.FromIncomingContext(stream.Context())
	if !ok {
		return status.Errorf(codes.DataLoss, "ClientStreamingEcho: failed to get metadata")
	}
	if t, ok := md["timestamp"]; ok {
		fmt.Printf("timestamp from metadata:\n")
		for i, e := range t {
			fmt.Printf(" %d. %s\n", i, e)
		}
	}

	// Create and send header; surface a send failure instead of dropping it.
	header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)})
	if err := stream.SendHeader(header); err != nil {
		return err
	}

	// Read requests, remembering the most recent message, until the client is done.
	var message string
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			fmt.Printf("echo last received message\n")
			return stream.SendAndClose(&echopb.EchoResponse{Message: message})
		}
		// Check the error before touching in: on failure in is nil and
		// reading in.Message would panic.
		if err != nil {
			return err
		}
		message = in.Message
		fmt.Printf("request received: %v, building echo\n", in)
	}
}
156

157
// BidirectionalStreamingEcho implements the bidirectional-streaming RPC of
// echo.EchoServer.
//
// It prints every "timestamp" value found in the incoming metadata, sends a
// response header carrying "location" and "timestamp", and then echoes each
// received request back as a response until the client closes its side of the
// stream. A "timestamp" trailer is set when the handler returns.
//
// It returns a DataLoss status when the stream context carries no incoming
// metadata, and otherwise the first error reported by SendHeader, Recv or Send.
func (s *echoServer) BidirectionalStreamingEcho(stream echopb.Echo_BidirectionalStreamingEchoServer) error {
	fmt.Printf("--- BidirectionalStreamingEcho ---\n")
	// Create trailer in defer to record function return time.
	defer func() {
		trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
		stream.SetTrailer(trailer)
	}()

	// Read metadata from client.
	md, ok := metadata.FromIncomingContext(stream.Context())
	if !ok {
		return status.Errorf(codes.DataLoss, "BidirectionalStreamingEcho: failed to get metadata")
	}

	if t, ok := md["timestamp"]; ok {
		fmt.Printf("timestamp from metadata:\n")
		for i, e := range t {
			fmt.Printf(" %d. %s\n", i, e)
		}
	}

	// Create and send header; surface a send failure instead of dropping it.
	header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)})
	if err := stream.SendHeader(header); err != nil {
		return err
	}

	// Read requests and send responses.
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			// The client finished sending; end the RPC successfully.
			return nil
		}
		if err != nil {
			return err
		}
		fmt.Printf("request received %v, sending echo\n", in)
		if err := stream.Send(&echopb.EchoResponse{Message: in.Message}); err != nil {
			return err
		}
	}
}
197

198
// NewEchoStreamGrpcServer builds a gRPC server from the supplied server
// options and registers the Echo service on it. The raw JSON configuration
// argument is ignored. The returned error is always nil.
func NewEchoStreamGrpcServer(_ json.RawMessage, options ...grpc.ServerOption) (mgrpc.RegisteredServer, error) {
	// Seed the global math/rand source from the current time.
	// NOTE(review): rand.Seed is deprecated since Go 1.20 and nothing in the
	// visible file draws random numbers; confirm before removing, since this
	// call is what keeps the math/rand import in use.
	seed := time.Now().UnixNano()
	rand.Seed(seed)

	srv := grpc.NewServer(options...)
	echopb.RegisterEchoServer(srv, new(echoServer))
	return srv, nil
}
204

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.