podman

Форк
0
413 строк · 10.1 Кб
1
// Copyright 2018 The go-libvirt Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//   http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package libvirt
16

17
import (
18
	"bytes"
19
	"errors"
20
	"fmt"
21
	"io"
22
	"reflect"
23
	"strings"
24
	"sync/atomic"
25

26
	"github.com/digitalocean/go-libvirt/internal/constants"
27
	"github.com/digitalocean/go-libvirt/internal/event"
28
	xdr "github.com/digitalocean/go-libvirt/internal/go-xdr/xdr2"
29
	"github.com/digitalocean/go-libvirt/socket"
30
)
31

32
// ErrUnsupported is returned if a procedure is not supported by libvirt
33
var ErrUnsupported = errors.New("unsupported procedure requested")
34

35
// internal rpc response
36
type response struct {
37
	Payload []byte
38
	Status  uint32
39
}
40

41
// Error reponse from libvirt
42
type Error struct {
43
	Code    uint32
44
	Message string
45
}
46

47
func (e Error) Error() string {
48
	return e.Message
49
}
50

51
// checkError is used to check whether an error is a libvirtError, and if it is,
52
// whether its error code matches the one passed in. It will return false if
53
// these conditions are not met.
54
func checkError(err error, expectedError ErrorNumber) bool {
55
	for err != nil {
56
		e, ok := err.(Error)
57
		if ok {
58
			return e.Code == uint32(expectedError)
59
		}
60
		err = errors.Unwrap(err)
61
	}
62
	return false
63
}
64

65
// IsNotFound detects libvirt's ERR_NO_DOMAIN.
66
func IsNotFound(err error) bool {
67
	return checkError(err, ErrNoDomain)
68
}
69

70
// callback sends RPC responses to respective callers.
71
func (l *Libvirt) callback(id int32, res response) {
72
	l.cmux.Lock()
73
	defer l.cmux.Unlock()
74

75
	c, ok := l.callbacks[id]
76
	if !ok {
77
		return
78
	}
79

80
	c <- res
81
}
82

83
// Route sends incoming packets to their listeners.
84
func (l *Libvirt) Route(h *socket.Header, buf []byte) {
85
	// Route events to their respective listener
86
	var event event.Event
87

88
	switch {
89
	case h.Program == constants.QEMUProgram && h.Procedure == constants.QEMUProcDomainMonitorEvent:
90
		event = &DomainEvent{}
91
	case h.Program == constants.Program && h.Procedure == constants.ProcDomainEventCallbackLifecycle:
92
		event = &DomainEventCallbackLifecycleMsg{}
93
	}
94

95
	if event != nil {
96
		err := eventDecoder(buf, event)
97
		if err != nil { // event was malformed, drop.
98
			return
99
		}
100

101
		l.stream(event)
102
		return
103
	}
104

105
	// send response to caller
106
	l.callback(h.Serial, response{Payload: buf, Status: h.Status})
107
}
108

109
// serial provides atomic access to the next sequential request serial number.
110
func (l *Libvirt) serial() int32 {
111
	return atomic.AddInt32(&l.s, 1)
112
}
113

114
// stream decodes and relays domain events to their respective listener.
115
func (l *Libvirt) stream(e event.Event) {
116
	l.emux.RLock()
117
	defer l.emux.RUnlock()
118

119
	q, ok := l.events[e.GetCallbackID()]
120
	if !ok {
121
		return
122
	}
123

124
	q.Push(e)
125
}
126

127
// addStream configures the routing for an event stream.
128
func (l *Libvirt) addStream(s *event.Stream) {
129
	l.emux.Lock()
130
	defer l.emux.Unlock()
131

132
	l.events[s.CallbackID] = s
133
}
134

135
// removeStream deletes an event stream. The caller should first notify libvirt
136
// to stop sending events for this stream. Subsequent calls to removeStream are
137
// idempotent and return nil.
138
func (l *Libvirt) removeStream(id int32) error {
139
	l.emux.Lock()
140
	defer l.emux.Unlock()
141

142
	// if the event is already removed, just return nil
143
	q, ok := l.events[id]
144
	if ok {
145
		delete(l.events, id)
146
		q.Shutdown()
147
	}
148

149
	return nil
150
}
151

152
// removeAllStreams deletes all event streams.  This is meant to be used to
153
// clean up only once the underlying connection to libvirt is disconnected and
154
// thus does not attempt to notify libvirt to stop sending events.
155
func (l *Libvirt) removeAllStreams() {
156
	l.emux.Lock()
157
	defer l.emux.Unlock()
158

159
	for _, ev := range l.events {
160
		ev.Shutdown()
161
		delete(l.events, ev.CallbackID)
162
	}
163
}
164

165
// register configures a method response callback
166
func (l *Libvirt) register(id int32, c chan response) {
167
	l.cmux.Lock()
168
	defer l.cmux.Unlock()
169

170
	l.callbacks[id] = c
171
}
172

173
// deregister destroys a method response callback. It is the responsibility of
174
// the caller to manage locking (l.cmux) during this call.
175
func (l *Libvirt) deregister(id int32) {
176
	_, ok := l.callbacks[id]
177
	if !ok {
178
		return
179
	}
180

181
	close(l.callbacks[id])
182
	delete(l.callbacks, id)
183
}
184

185
// deregisterAll closes all waiting callback channels. This is used to clean up
186
// if the connection to libvirt is lost. Callers waiting for responses will
187
// return an error when the response channel is closed, rather than just
188
// hanging.
189
func (l *Libvirt) deregisterAll() {
190
	l.cmux.Lock()
191
	defer l.cmux.Unlock()
192

193
	for id := range l.callbacks {
194
		l.deregister(id)
195
	}
196
}
197

198
// request performs a libvirt RPC request.
199
// returns response returned by server.
200
// if response is not OK, decodes error from it and returns it.
201
func (l *Libvirt) request(proc uint32, program uint32, payload []byte) (response, error) {
202
	return l.requestStream(proc, program, payload, nil, nil)
203
}
204

205
// requestStream performs a libvirt RPC request. The `out` and `in` parameters
206
// are optional, and should be nil when RPC endpoints don't return a stream.
207
func (l *Libvirt) requestStream(proc uint32, program uint32, payload []byte,
208
	out io.Reader, in io.Writer) (response, error) {
209
	serial := l.serial()
210
	c := make(chan response)
211

212
	l.register(serial, c)
213
	defer func() {
214
		l.cmux.Lock()
215
		defer l.cmux.Unlock()
216

217
		l.deregister(serial)
218
	}()
219

220
	err := l.socket.SendPacket(serial, proc, program, payload, socket.Call,
221
		socket.StatusOK)
222
	if err != nil {
223
		return response{}, err
224
	}
225

226
	resp, err := l.getResponse(c)
227
	if err != nil {
228
		return resp, err
229
	}
230

231
	if out != nil {
232
		abort := make(chan bool)
233
		outErr := make(chan error)
234
		go func() {
235
			outErr <- l.socket.SendStream(serial, proc, program, out, abort)
236
		}()
237

238
		// Even without incoming stream server sends confirmation once all data is received
239
		resp, err = l.processIncomingStream(c, in)
240
		if err != nil {
241
			abort <- true
242
			return resp, err
243
		}
244

245
		err = <-outErr
246
		if err != nil {
247
			return response{}, err
248
		}
249
	}
250

251
	switch in {
252
	case nil:
253
		return resp, nil
254
	default:
255
		return l.processIncomingStream(c, in)
256
	}
257
}
258

259
// processIncomingStream is called once we've successfully sent a request to
260
// libvirt. It writes the responses back to the stream passed by the caller
261
// until libvirt sends a packet with statusOK or an error.
262
func (l *Libvirt) processIncomingStream(c chan response, inStream io.Writer) (response, error) {
263
	for {
264
		resp, err := l.getResponse(c)
265
		if err != nil {
266
			return resp, err
267
		}
268

269
		// StatusOK indicates end of stream
270
		if resp.Status == socket.StatusOK {
271
			return resp, nil
272
		}
273

274
		// FIXME: this smells.
275
		// StatusError is handled in getResponse, so this must be StatusContinue
276
		// StatusContinue is only valid here for stream packets
277
		// libvirtd breaks protocol and returns StatusContinue with an
278
		// empty response Payload when the stream finishes
279
		if len(resp.Payload) == 0 {
280
			return resp, nil
281
		}
282
		if inStream != nil {
283
			_, err = inStream.Write(resp.Payload)
284
			if err != nil {
285
				return response{}, err
286
			}
287
		}
288
	}
289
}
290

291
func (l *Libvirt) getResponse(c chan response) (response, error) {
292
	resp := <-c
293
	if resp.Status == socket.StatusError {
294
		return resp, decodeError(resp.Payload)
295
	}
296

297
	return resp, nil
298
}
299

300
// encode XDR encodes the provided data.
301
func encode(data interface{}) ([]byte, error) {
302
	var buf bytes.Buffer
303
	_, err := xdr.Marshal(&buf, data)
304

305
	return buf.Bytes(), err
306
}
307

308
// decodeError extracts an error message from the provider buffer.
309
func decodeError(buf []byte) error {
310
	dec := xdr.NewDecoder(bytes.NewReader(buf))
311

312
	e := struct {
313
		Code     uint32
314
		DomainID uint32
315
		Padding  uint8
316
		Message  string
317
		Level    uint32
318
	}{}
319
	_, err := dec.Decode(&e)
320
	if err != nil {
321
		return err
322
	}
323

324
	if strings.Contains(e.Message, "unknown procedure") {
325
		return ErrUnsupported
326
	}
327

328
	// if libvirt returns ERR_OK, ignore the error
329
	if ErrorNumber(e.Code) == ErrOk {
330
		return nil
331
	}
332

333
	return Error{Code: uint32(e.Code), Message: e.Message}
334
}
335

336
// eventDecoder decodes an event from a xdr buffer.
337
func eventDecoder(buf []byte, e interface{}) error {
338
	dec := xdr.NewDecoder(bytes.NewReader(buf))
339
	_, err := dec.Decode(e)
340
	return err
341
}
342

343
type typedParamDecoder struct{}
344

345
// Decode decodes a TypedParam. These are part of the libvirt spec, and not xdr
346
// proper. TypedParams contain a name, which is called Field for some reason,
347
// and a Value, which itself has a "discriminant" - an integer enum encoding the
348
// actual type, and a value, the length of which varies based on the actual
349
// type.
350
func (tpd typedParamDecoder) Decode(d *xdr.Decoder, v reflect.Value) (int, error) {
351
	// Get the name of the typed param first
352
	name, n, err := d.DecodeString()
353
	if err != nil {
354
		return n, err
355
	}
356
	val, n2, err := tpd.decodeTypedParamValue(d)
357
	n += n2
358
	if err != nil {
359
		return n, err
360
	}
361
	tp := &TypedParam{Field: name, Value: *val}
362
	v.Set(reflect.ValueOf(*tp))
363

364
	return n, nil
365
}
366

367
// decodeTypedParamValue decodes the Value part of a TypedParam.
368
func (typedParamDecoder) decodeTypedParamValue(d *xdr.Decoder) (*TypedParamValue, int, error) {
369
	// All TypedParamValues begin with a uint32 discriminant that tells us what
370
	// type they are.
371
	discriminant, n, err := d.DecodeUint()
372
	if err != nil {
373
		return nil, n, err
374
	}
375
	var n2 int
376
	var tpv *TypedParamValue
377
	switch discriminant {
378
	case 1:
379
		var val int32
380
		n2, err = d.Decode(&val)
381
		tpv = &TypedParamValue{D: discriminant, I: val}
382
	case 2:
383
		var val uint32
384
		n2, err = d.Decode(&val)
385
		tpv = &TypedParamValue{D: discriminant, I: val}
386
	case 3:
387
		var val int64
388
		n2, err = d.Decode(&val)
389
		tpv = &TypedParamValue{D: discriminant, I: val}
390
	case 4:
391
		var val uint64
392
		n2, err = d.Decode(&val)
393
		tpv = &TypedParamValue{D: discriminant, I: val}
394
	case 5:
395
		var val float64
396
		n2, err = d.Decode(&val)
397
		tpv = &TypedParamValue{D: discriminant, I: val}
398
	case 6:
399
		var val int32
400
		n2, err = d.Decode(&val)
401
		tpv = &TypedParamValue{D: discriminant, I: val}
402
	case 7:
403
		var val string
404
		n2, err = d.Decode(&val)
405
		tpv = &TypedParamValue{D: discriminant, I: val}
406

407
	default:
408
		err = fmt.Errorf("invalid parameter type %v", discriminant)
409
	}
410
	n += n2
411

412
	return tpv, n, err
413
}
414

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.