ergo

Форк
0
/
process.go 
705 строк · 14.0 Кб
1
package node
2

3
import (
4
	"context"
5
	"fmt"
6
	"sync"
7
	"time"
8

9
	"github.com/ergo-services/ergo/etf"
10
	"github.com/ergo-services/ergo/gen"
11
	"github.com/ergo-services/ergo/lib"
12
)
13

14
var (
15
	syncReplyChannels = &sync.Pool{
16
		New: func() interface{} {
17
			return make(chan syncReplyMessage, 2)
18
		},
19
	}
20
)
21

22
type syncReplyMessage struct {
23
	value etf.Term
24
	err   error
25
}
26

27
type process struct {
28
	coreInternal
29
	sync.RWMutex
30

31
	name     string
32
	self     etf.Pid
33
	behavior gen.ProcessBehavior
34
	env      map[gen.EnvKey]interface{}
35

36
	parent      *process
37
	groupLeader gen.Process
38
	aliases     []etf.Alias
39

40
	mailBox      chan gen.ProcessMailboxMessage
41
	gracefulExit chan gen.ProcessGracefulExitRequest
42
	direct       chan gen.ProcessDirectMessage
43

44
	context context.Context
45
	kill    context.CancelFunc
46
	exit    processExitFunc
47

48
	replyMutex sync.RWMutex
49
	reply      map[etf.Ref]chan syncReplyMessage
50

51
	trapExit    bool
52
	compression Compression
53

54
	fallback gen.ProcessFallback
55
}
56

57
type processOptions struct {
58
	gen.ProcessOptions
59
	parent *process
60
}
61

62
// processExitFunc is the signature of the exit hook stored in process.exit.
// Exit calls it with the pid of the process itself as 'from' and the
// requested reason.
type processExitFunc func(from etf.Pid, reason string) error
63

64
// Self
65
func (p *process) Self() etf.Pid {
66
	return p.self
67
}
68

69
// Name
70
func (p *process) Name() string {
71
	return p.name
72
}
73

74
// RegisterName
75
func (p *process) RegisterName(name string) error {
76
	if p.behavior == nil {
77
		return lib.ErrProcessTerminated
78
	}
79
	return p.registerName(name, p.self)
80
}
81

82
// UnregisterName
83
func (p *process) UnregisterName(name string) error {
84
	if p.behavior == nil {
85
		return lib.ErrProcessTerminated
86
	}
87
	prc := p.ProcessByName(name)
88
	if prc == nil {
89
		return lib.ErrNameUnknown
90
	}
91
	if prc.Self() != p.self {
92
		return lib.ErrNameOwner
93
	}
94
	return p.unregisterName(name)
95
}
96

97
// Kill
98
func (p *process) Kill() {
99
	if p.behavior == nil {
100
		return
101
	}
102
	p.kill()
103
}
104

105
// Exit
106
func (p *process) Exit(reason string) error {
107
	if p.behavior == nil {
108
		return lib.ErrProcessTerminated
109
	}
110
	return p.exit(p.self, reason)
111
}
112

113
// Context
114
func (p *process) Context() context.Context {
115
	return p.context
116
}
117

118
// Parent
119
func (p *process) Parent() gen.Process {
120
	if p.parent == nil {
121
		return nil
122
	}
123
	return p.parent
124
}
125

126
// GroupLeader
127
func (p *process) GroupLeader() gen.Process {
128
	if p.groupLeader == nil {
129
		return nil
130
	}
131
	return p.groupLeader
132
}
133

134
// Links
135
func (p *process) Links() []etf.Pid {
136
	return p.processLinks(p.self)
137
}
138

139
// Monitors
140
func (p *process) Monitors() []etf.Pid {
141
	return p.processMonitors(p.self)
142
}
143

144
// MonitorsByName
145
func (p *process) MonitorsByName() []gen.ProcessID {
146
	return p.processMonitorsByName(p.self)
147
}
148

149
// MonitoredBy
150
func (p *process) MonitoredBy() []etf.Pid {
151
	return p.processMonitoredBy(p.self)
152
}
153

154
// Aliases
155
func (p *process) Aliases() []etf.Alias {
156
	return p.aliases
157
}
158

159
// Info
160
func (p *process) Info() gen.ProcessInfo {
161
	p.RLock()
162
	if p.behavior == nil {
163
		p.RUnlock()
164
		return gen.ProcessInfo{}
165
	}
166

167
	gl := p.self
168
	if p.groupLeader != nil {
169
		gl = p.groupLeader.Self()
170
	}
171
	p.RUnlock()
172

173
	links := p.Links()
174
	monitors := p.Monitors()
175
	monitorsByName := p.MonitorsByName()
176
	monitoredBy := p.MonitoredBy()
177
	return gen.ProcessInfo{
178
		PID:             p.self,
179
		Name:            p.name,
180
		GroupLeader:     gl,
181
		Links:           links,
182
		Monitors:        monitors,
183
		MonitorsByName:  monitorsByName,
184
		MonitoredBy:     monitoredBy,
185
		Aliases:         p.aliases,
186
		Status:          "running",
187
		MessageQueueLen: len(p.mailBox),
188
		TrapExit:        p.trapExit,
189
		Compression:     p.compression.Enable,
190
	}
191
}
192

193
// Send
194
func (p *process) Send(to interface{}, message etf.Term) error {
195
	p.RLock()
196
	if p.behavior == nil {
197
		p.RUnlock()
198
		return lib.ErrProcessTerminated
199
	}
200
	p.RUnlock()
201

202
	switch receiver := to.(type) {
203
	case etf.Pid:
204
		return p.RouteSend(p.self, receiver, message)
205
	case string:
206
		return p.RouteSendReg(p.self, gen.ProcessID{Name: receiver, Node: string(p.self.Node)}, message)
207
	case etf.Atom:
208
		return p.RouteSendReg(p.self, gen.ProcessID{Name: string(receiver), Node: string(p.self.Node)}, message)
209
	case gen.ProcessID:
210
		return p.RouteSendReg(p.self, receiver, message)
211
	case etf.Alias:
212
		return p.RouteSendAlias(p.self, receiver, message)
213
	}
214
	return fmt.Errorf("Unknown receiver type")
215
}
216

217
// SendAfter
218
func (p *process) SendAfter(to interface{}, message etf.Term, after time.Duration) gen.CancelFunc {
219

220
	timer := time.AfterFunc(after, func() { p.Send(to, message) })
221
	return timer.Stop
222
}
223

224
// CreateAlias
225
func (p *process) CreateAlias() (etf.Alias, error) {
226
	p.RLock()
227
	if p.behavior == nil {
228
		p.RUnlock()
229
		return etf.Alias{}, lib.ErrProcessTerminated
230
	}
231
	p.RUnlock()
232
	return p.newAlias(p)
233
}
234

235
// DeleteAlias
236
func (p *process) DeleteAlias(alias etf.Alias) error {
237
	p.RLock()
238
	if p.behavior == nil {
239
		p.RUnlock()
240
		return lib.ErrProcessTerminated
241
	}
242
	p.RUnlock()
243
	return p.deleteAlias(p, alias)
244
}
245

246
// ListEnv
247
func (p *process) ListEnv() map[gen.EnvKey]interface{} {
248
	p.RLock()
249
	defer p.RUnlock()
250

251
	env := make(map[gen.EnvKey]interface{})
252

253
	if p.groupLeader != nil {
254
		for key, value := range p.groupLeader.ListEnv() {
255
			env[key] = value
256
		}
257
	}
258
	if p.parent != nil {
259
		for key, value := range p.parent.ListEnv() {
260
			env[key] = value
261
		}
262
	}
263
	for key, value := range p.env {
264
		env[key] = value
265
	}
266

267
	return env
268
}
269

270
// SetEnv
271
func (p *process) SetEnv(name gen.EnvKey, value interface{}) {
272
	p.Lock()
273
	defer p.Unlock()
274

275
	if value == nil {
276
		delete(p.env, name)
277
		return
278
	}
279
	p.env[name] = value
280
}
281

282
// Env
283
func (p *process) Env(name gen.EnvKey) interface{} {
284
	p.RLock()
285
	defer p.RUnlock()
286

287
	if value, ok := p.env[name]; ok {
288
		return value
289
	}
290

291
	if p.groupLeader != nil {
292
		return p.groupLeader.Env(name)
293
	}
294

295
	return nil
296
}
297

298
// Wait
299
func (p *process) Wait() {
300
	if p.IsAlive() {
301
		<-p.context.Done()
302
	}
303
}
304

305
// WaitWithTimeout
306
func (p *process) WaitWithTimeout(d time.Duration) error {
307
	if !p.IsAlive() {
308
		return nil
309
	}
310

311
	timer := time.NewTimer(d)
312
	defer timer.Stop()
313

314
	select {
315
	case <-timer.C:
316
		return lib.ErrTimeout
317
	case <-p.context.Done():
318
		return nil
319
	}
320
}
321

322
// Link
323
func (p *process) Link(with etf.Pid) error {
324
	p.RLock()
325
	if p.behavior == nil {
326
		p.RUnlock()
327
		return lib.ErrProcessTerminated
328
	}
329
	p.RUnlock()
330
	return p.RouteLink(p.self, with)
331
}
332

333
// Unlink
334
func (p *process) Unlink(with etf.Pid) error {
335
	p.RLock()
336
	if p.behavior == nil {
337
		p.RUnlock()
338
		return lib.ErrProcessTerminated
339
	}
340
	p.RUnlock()
341
	return p.RouteUnlink(p.self, with)
342
}
343

344
// IsAlive
345
func (p *process) IsAlive() bool {
346
	p.RLock()
347
	defer p.RUnlock()
348
	if p.behavior == nil {
349
		return false
350
	}
351
	return p.context.Err() == nil
352
}
353

354
// NodeName
355
func (p *process) NodeName() string {
356
	return p.coreNodeName()
357
}
358

359
// NodeStop
360
func (p *process) NodeStop() {
361
	p.coreStop()
362
}
363

364
// NodeUptime
365
func (p *process) NodeUptime() int64 {
366
	return p.coreUptime()
367
}
368

369
// Children
370
func (p *process) Children() ([]etf.Pid, error) {
371
	c, err := p.Direct(gen.MessageDirectChildren{})
372
	if err != nil {
373
		return []etf.Pid{}, err
374
	}
375
	children, correct := c.([]etf.Pid)
376
	if correct == false {
377
		return []etf.Pid{}, err
378
	}
379
	return children, nil
380
}
381

382
// SetTrapExit
383
func (p *process) SetTrapExit(trap bool) {
384
	p.trapExit = trap
385
}
386

387
// TrapExit
388
func (p *process) TrapExit() bool {
389
	return p.trapExit
390
}
391

392
// SetCompression
393
func (p *process) SetCompression(enable bool) {
394
	p.compression.Enable = enable
395
}
396

397
// Compression
398
func (p *process) Compression() bool {
399
	return p.compression.Enable
400
}
401

402
// CompressionLevel
403
func (p *process) CompressionLevel() int {
404
	return p.compression.Level
405
}
406

407
// SetCompressionLevel
408
func (p *process) SetCompressionLevel(level int) bool {
409
	if level < 1 || level > 9 {
410
		return false
411
	}
412
	p.compression.Level = level
413
	return true
414
}
415

416
// CompressionThreshold
417
func (p *process) CompressionThreshold() int {
418
	return p.compression.Threshold
419
}
420

421
// SetCompressionThreshold
422
func (p *process) SetCompressionThreshold(threshold int) bool {
423
	if threshold < DefaultCompressionThreshold {
424
		return false
425
	}
426
	p.compression.Threshold = threshold
427
	return true
428
}
429

430
// Behavior
431
func (p *process) Behavior() gen.ProcessBehavior {
432
	p.RLock()
433
	defer p.RUnlock()
434

435
	if p.behavior == nil {
436
		return nil
437
	}
438
	return p.behavior
439
}
440

441
// Direct
442
func (p *process) Direct(request interface{}) (interface{}, error) {
443
	return p.DirectWithTimeout(request, gen.DefaultCallTimeout)
444
}
445

446
// DirectWithTimeout
447
func (p *process) DirectWithTimeout(request interface{}, timeout int) (interface{}, error) {
448
	if timeout < 1 {
449
		timeout = gen.DefaultCallTimeout
450
	}
451

452
	direct := gen.ProcessDirectMessage{
453
		Ref:     p.MakeRef(),
454
		Message: request,
455
	}
456

457
	if err := p.PutSyncRequest(direct.Ref); err != nil {
458
		return nil, err
459
	}
460

461
	// sending request
462
	select {
463
	case p.direct <- direct:
464
	default:
465
		p.CancelSyncRequest(direct.Ref)
466
		return nil, lib.ErrProcessBusy
467
	}
468

469
	return p.WaitSyncReply(direct.Ref, timeout)
470
}
471

472
// RegisterEvent asks the core to register the event, with the given message
// values, on behalf of this process.
func (p *process) RegisterEvent(event gen.Event, messages ...gen.EventMessage) error {
	owner := p.self
	return p.registerEvent(owner, event, messages)
}
475

476
// UnregisterEvent asks the core to unregister the event on behalf of this
// process.
func (p *process) UnregisterEvent(event gen.Event) error {
	owner := p.self
	return p.unregisterEvent(owner, event)
}
479

480
// MonitorEvent asks the core to start monitoring the event on behalf of this
// process.
func (p *process) MonitorEvent(event gen.Event) error {
	subscriber := p.self
	return p.monitorEvent(subscriber, event)
}
483

484
// DemonitorEvent asks the core to stop monitoring the event on behalf of
// this process.
func (p *process) DemonitorEvent(event gen.Event) error {
	subscriber := p.self
	return p.demonitorEvent(subscriber, event)
}
487

488
// SendEventMessage passes the message for the given event to the core, with
// this process as the sender.
func (p *process) SendEventMessage(event gen.Event, message gen.EventMessage) error {
	sender := p.self
	return p.sendEvent(sender, event, message)
}
491

492
// MonitorNode
493
func (p *process) MonitorNode(name string) etf.Ref {
494
	ref := p.MakeRef()
495
	p.monitorNode(p.self, name, ref)
496
	return ref
497
}
498

499
// DemonitorNode
500
func (p *process) DemonitorNode(ref etf.Ref) bool {
501
	return p.demonitorNode(ref)
502
}
503

504
// MonitorProcess
505
func (p *process) MonitorProcess(process interface{}) etf.Ref {
506
	ref := p.MakeRef()
507
	switch mp := process.(type) {
508
	case etf.Pid:
509
		p.RouteMonitor(p.self, mp, ref)
510
		return ref
511
	case gen.ProcessID:
512
		p.RouteMonitorReg(p.self, mp, ref)
513
		return ref
514
	case string:
515
		p.RouteMonitorReg(p.self, gen.ProcessID{Name: mp, Node: string(p.self.Node)}, ref)
516
		return ref
517
	case etf.Atom:
518
		p.RouteMonitorReg(p.self, gen.ProcessID{Name: string(mp), Node: string(p.self.Node)}, ref)
519
		return ref
520
	}
521

522
	// create fake gen.ProcessID. Monitor will send MessageDown with "noproc" as a reason
523
	p.RouteMonitorReg(p.self, gen.ProcessID{Node: string(p.self.Node)}, ref)
524
	return ref
525
}
526

527
// DemonitorProcess
528
func (p *process) DemonitorProcess(ref etf.Ref) bool {
529
	if err := p.RouteDemonitor(p.self, ref); err != nil {
530
		return false
531
	}
532
	return true
533
}
534

535
// RemoteSpawn makes request to spawn new process on a remote node
536
func (p *process) RemoteSpawn(node string, object string, opts gen.RemoteSpawnOptions, args ...etf.Term) (etf.Pid, error) {
537
	return p.RemoteSpawnWithTimeout(gen.DefaultCallTimeout, node, object, opts, args...)
538
}
539

540
// RemoteSpawnWithTimeout makes request to spawn new process on a remote node with given timeout
541
func (p *process) RemoteSpawnWithTimeout(timeout int, node string, object string, opts gen.RemoteSpawnOptions, args ...etf.Term) (etf.Pid, error) {
542
	ref := p.MakeRef()
543
	p.PutSyncRequest(ref)
544
	request := gen.RemoteSpawnRequest{
545
		From:    p.self,
546
		Ref:     ref,
547
		Options: opts,
548
	}
549
	if err := p.RouteSpawnRequest(node, object, request, args...); err != nil {
550
		p.CancelSyncRequest(ref)
551
		return etf.Pid{}, err
552
	}
553

554
	reply, err := p.WaitSyncReply(ref, timeout)
555
	if err != nil {
556
		return etf.Pid{}, err
557
	}
558

559
	// Result of the operation. If Result is a process identifier,
560
	// the operation succeeded and the process identifier is the
561
	// identifier of the newly created process. If Result is an atom,
562
	// the operation failed and the atom identifies failure reason.
563
	switch r := reply.(type) {
564
	case etf.Pid:
565
		m := etf.Ref{} // empty reference
566
		if opts.Monitor != m {
567
			p.RouteMonitor(p.self, r, opts.Monitor)
568
		}
569
		if opts.Link {
570
			p.RouteLink(p.self, r)
571
		}
572
		return r, nil
573
	case etf.Atom:
574
		switch string(r) {
575
		case lib.ErrTaken.Error():
576
			return etf.Pid{}, lib.ErrTaken
577
		case lib.ErrBehaviorUnknown.Error():
578
			return etf.Pid{}, lib.ErrBehaviorUnknown
579
		}
580
		return etf.Pid{}, fmt.Errorf(string(r))
581
	}
582

583
	return etf.Pid{}, fmt.Errorf("unknown result: %#v", reply)
584
}
585

586
// Spawn
587
func (p *process) Spawn(name string, opts gen.ProcessOptions, behavior gen.ProcessBehavior, args ...etf.Term) (gen.Process, error) {
588
	options := processOptions{
589
		ProcessOptions: opts,
590
		parent:         p,
591
	}
592
	return p.spawn(name, options, behavior, args...)
593
}
594

595
// PutSyncRequest
596
func (p *process) PutSyncRequest(ref etf.Ref) error {
597
	var preply map[etf.Ref]chan syncReplyMessage
598
	p.RLock()
599
	preply = p.reply
600
	p.RUnlock()
601

602
	if preply == nil {
603
		return lib.ErrProcessTerminated
604
	}
605

606
	reply := syncReplyChannels.Get().(chan syncReplyMessage)
607
	p.replyMutex.Lock()
608
	preply[ref] = reply
609
	p.replyMutex.Unlock()
610
	return nil
611
}
612

613
// PutSyncReply
614
func (p *process) PutSyncReply(ref etf.Ref, reply etf.Term, err error) error {
615
	var preply map[etf.Ref]chan syncReplyMessage
616
	p.RLock()
617
	preply = p.reply
618
	p.RUnlock()
619

620
	if preply == nil {
621
		return lib.ErrProcessTerminated
622
	}
623

624
	p.replyMutex.RLock()
625
	rep, ok := preply[ref]
626
	defer p.replyMutex.RUnlock()
627

628
	if !ok {
629
		// no process waiting for it
630
		return lib.ErrReferenceUnknown
631
	}
632
	select {
633
	case rep <- syncReplyMessage{value: reply, err: err}:
634
	}
635
	return nil
636
}
637

638
// CancelSyncRequest
639
func (p *process) CancelSyncRequest(ref etf.Ref) {
640
	var preply map[etf.Ref]chan syncReplyMessage
641
	p.RLock()
642
	preply = p.reply
643
	p.RUnlock()
644

645
	if preply == nil {
646
		return
647
	}
648

649
	p.replyMutex.Lock()
650
	delete(preply, ref)
651
	p.replyMutex.Unlock()
652
}
653

654
// WaitSyncReply
655
func (p *process) WaitSyncReply(ref etf.Ref, timeout int) (etf.Term, error) {
656
	var preply map[etf.Ref]chan syncReplyMessage
657
	p.RLock()
658
	preply = p.reply
659
	p.RUnlock()
660

661
	if preply == nil {
662
		return nil, lib.ErrProcessTerminated
663
	}
664

665
	p.replyMutex.RLock()
666
	reply, wait_for_reply := preply[ref]
667
	p.replyMutex.RUnlock()
668

669
	if wait_for_reply == false {
670
		return nil, fmt.Errorf("Unknown request")
671
	}
672

673
	defer func(ref etf.Ref) {
674
		p.replyMutex.Lock()
675
		delete(preply, ref)
676
		p.replyMutex.Unlock()
677
	}(ref)
678

679
	timer := lib.TakeTimer()
680
	defer lib.ReleaseTimer(timer)
681
	timer.Reset(time.Second * time.Duration(timeout))
682

683
	for {
684
		select {
685
		case m := <-reply:
686
			// get back 'reply' struct to the pool
687
			syncReplyChannels.Put(reply)
688
			return m.value, m.err
689
		case <-timer.C:
690
			return nil, lib.ErrTimeout
691
		case <-p.context.Done():
692
			return nil, lib.ErrProcessTerminated
693
		}
694
	}
695

696
}
697

698
// ProcessChannels
699
func (p *process) ProcessChannels() gen.ProcessChannels {
700
	return gen.ProcessChannels{
701
		Mailbox:      p.mailBox,
702
		Direct:       p.direct,
703
		GracefulExit: p.gracefulExit,
704
	}
705
}
706

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.