podman

Форк
0
234 строки · 5.7 Кб
1
// Copyright (c) 2012-2020 Ugorji Nwoke. All rights reserved.
2
// Use of this source code is governed by a MIT license found in the LICENSE file.
3

4
package codec
5

6
import (
7
	"bufio"
8
	"errors"
9
	"io"
10
	"net/rpc"
11
)
12

13
var (
	// errRpcIsClosed is reported by ready once the codec has been closed
	// and the close itself recorded no error.
	errRpcIsClosed = errors.New("rpc - connection has been closed")
	// errRpcNoConn is reported by ready when the codec has no closer.
	errRpcNoConn = errors.New("rpc - no connection")

	// rpcSpaceArr is the single space written after each encoded value
	// when the handle is a JSON handle.
	rpcSpaceArr = [...]byte{' '}
)
19

20
// Rpc provides a rpc Server or Client Codec for rpc communication.
21
type Rpc interface {
22
	ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec
23
	ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec
24
}
25

26
// RPCOptions holds the options that apply only to rpc functionality.
type RPCOptions struct {
	// RPCNoBuffer controls whether reads and writes are buffered during RPC calls.
	//
	// Set RPCNoBuffer=true to turn that buffering off.
	// Buffering can still take place if a buffered connection is passed in,
	// or if buffering is configured on the handle.
	RPCNoBuffer bool
}
35

36
// rpcCodec defines the struct members and common methods.
37
type rpcCodec struct {
38
	c io.Closer
39
	r io.Reader
40
	w io.Writer
41
	f ioFlusher
42

43
	dec *Decoder
44
	enc *Encoder
45
	h   Handle
46

47
	cls atomicClsErr
48
}
49

50
// newRPCCodec returns an rpcCodec that uses conn as its reader, its writer
// and its closer, with h as the handle.
func newRPCCodec(conn io.ReadWriteCloser, h Handle) rpcCodec {
	var (
		r io.Reader = conn
		w io.Writer = conn
		c io.Closer = conn
	)
	return newRPCCodec2(r, w, c, h)
}
53

54
func newRPCCodec2(r io.Reader, w io.Writer, c io.Closer, h Handle) rpcCodec {
55
	bh := h.getBasicHandle()
56
	// if the writer can flush, ensure we leverage it, else
57
	// we may hang waiting on read if write isn't flushed.
58
	// var f ioFlusher
59
	f, ok := w.(ioFlusher)
60
	if !bh.RPCNoBuffer {
61
		if bh.WriterBufferSize <= 0 {
62
			if !ok { // a flusher means there's already a buffer
63
				bw := bufio.NewWriter(w)
64
				f, w = bw, bw
65
			}
66
		}
67
		if bh.ReaderBufferSize <= 0 {
68
			if _, ok = w.(ioBuffered); !ok {
69
				r = bufio.NewReader(r)
70
			}
71
		}
72
	}
73
	return rpcCodec{
74
		c:   c,
75
		w:   w,
76
		r:   r,
77
		f:   f,
78
		h:   h,
79
		enc: NewEncoder(w, h),
80
		dec: NewDecoder(r, h),
81
	}
82
}
83

84
// write encodes each value in obj, in order, and then flushes the
// flusher if the codec has one.
//
// It returns the error from ready if the codec is not usable, else the
// first encode or write error. The flush still runs when an encode
// fails; its error is reported only if nothing failed before it.
func (c *rpcCodec) write(obj ...interface{}) (err error) {
	if err = c.ready(); err != nil {
		return err
	}
	if c.f != nil {
		defer func() {
			// keep the earlier error if there was one
			if ferr := c.f.Flush(); err == nil && ferr != nil {
				err = ferr
			}
		}()
	}

	for i := range obj {
		if err = c.enc.Encode(obj[i]); err != nil {
			return err
		}
		if !c.h.isJson() {
			continue
		}
		// defensive: always write a space after each encoding, because
		// a number followed directly by the next value would lead to
		// invalid output.
		if _, err = c.w.Write(rpcSpaceArr[:]); err != nil {
			return err
		}
	}
	return nil
}
115

116
// read decodes the next value into obj.
//
// If obj is nil, the next value is read and discarded instead.
// It returns the error from ready if the codec is not usable.
func (c *rpcCodec) read(obj interface{}) (err error) {
	if err = c.ready(); err != nil {
		return err
	}
	if obj == nil {
		return c.dec.swallowErr()
	}
	return c.dec.Decode(obj)
}
129

130
// Close closes the underlying closer, unless the codec is already marked
// as closed, and records the outcome.
//
// It returns nil if the codec has no closer; otherwise it returns the
// error recorded for the close, which a later call returns again.
func (c *rpcCodec) Close() (err error) {
	if c.c == nil {
		return nil
	}
	state := c.cls.load()
	if state.closed {
		return state.err
	}
	state.err = c.c.Close()
	state.closed = true
	c.cls.store(state)
	return state.err
}
142

143
// ready reports whether the codec can be used for reading and writing.
//
// It returns errRpcNoConn if the codec has no closer. If the codec has
// been closed, it returns the error recorded by Close, or errRpcIsClosed
// if that error is nil. Otherwise it returns nil.
func (c *rpcCodec) ready() (err error) {
	if c.c == nil {
		return errRpcNoConn
	}
	state := c.cls.load()
	switch {
	case !state.closed:
		return nil
	case state.err != nil:
		return state.err
	default:
		return errRpcIsClosed
	}
}
156

157
// ReadResponseBody decodes the next value into body, or reads and
// discards it if body is nil.
func (c *rpcCodec) ReadResponseBody(body interface{}) error {
	err := c.read(body)
	return err
}
160

161
// -------------------------------------
162

163
type goRpcCodec struct {
164
	rpcCodec
165
}
166

167
// WriteRequest writes the request header r followed by body.
func (c *goRpcCodec) WriteRequest(r *rpc.Request, body interface{}) error {
	return c.rpcCodec.write(r, body)
}
170

171
// WriteResponse writes the response header r followed by body.
func (c *goRpcCodec) WriteResponse(r *rpc.Response, body interface{}) error {
	return c.rpcCodec.write(r, body)
}
174

175
// ReadResponseHeader decodes the next value into the response header r.
func (c *goRpcCodec) ReadResponseHeader(r *rpc.Response) error {
	return c.rpcCodec.read(r)
}
178

179
// ReadRequestHeader decodes the next value into the request header r.
func (c *goRpcCodec) ReadRequestHeader(r *rpc.Request) error {
	return c.rpcCodec.read(r)
}
182

183
// ReadRequestBody decodes the next value into body, or reads and
// discards it if body is nil.
func (c *goRpcCodec) ReadRequestBody(body interface{}) error {
	return c.rpcCodec.read(body)
}
186

187
// -------------------------------------
188

189
// goRpc implements Rpc using the communication protocol
// defined in the net/rpc package.
type goRpc struct{}
192

193
// GoRpc implements Rpc using the communication protocol defined in net/rpc package.
194
//
195
// Note: network connection (from net.Dial, of type io.ReadWriteCloser) is not buffered.
196
//
197
// For performance, you should configure WriterBufferSize and ReaderBufferSize on the handle.
198
// This ensures we use an adequate buffer during reading and writing.
199
// If not configured, we will internally initialize and use a buffer during reads and writes.
200
// This can be turned off via the RPCNoBuffer option on the Handle.
201
//
202
//	var handle codec.JsonHandle
203
//	handle.RPCNoBuffer = true // turns off attempt by rpc module to initialize a buffer
204
//
205
// Example 1: one way of configuring buffering explicitly:
206
//
207
//	var handle codec.JsonHandle // codec handle
208
//	handle.ReaderBufferSize = 1024
209
//	handle.WriterBufferSize = 1024
210
//	var conn io.ReadWriteCloser // connection got from a socket
211
//	var serverCodec = GoRpc.ServerCodec(conn, handle)
212
//	var clientCodec = GoRpc.ClientCodec(conn, handle)
213
//
214
// Example 2: you can also explicitly create a buffered connection yourself,
215
// and not worry about configuring the buffer sizes in the Handle.
216
//
217
//	var handle codec.Handle     // codec handle
218
//	var conn io.ReadWriteCloser // connection got from a socket
219
//	var bufconn = struct {      // bufconn here is a buffered io.ReadWriteCloser
220
//	    io.Closer
221
//	    *bufio.Reader
222
//	    *bufio.Writer
223
//	}{conn, bufio.NewReader(conn), bufio.NewWriter(conn)}
224
//	var serverCodec = GoRpc.ServerCodec(bufconn, handle)
225
//	var clientCodec = GoRpc.ClientCodec(bufconn, handle)
226
var GoRpc goRpc
227

228
// ServerCodec returns a rpc.ServerCodec that reads from, writes to and
// closes conn, encoding and decoding with h.
func (x goRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec {
	sc := goRpcCodec{rpcCodec: newRPCCodec(conn, h)}
	return &sc
}
231

232
// ClientCodec returns a rpc.ClientCodec that reads from, writes to and
// closes conn, encoding and decoding with h.
func (x goRpc) ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec {
	cc := goRpcCodec{rpcCodec: newRPCCodec(conn, h)}
	return &cc
}
235

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.