9
"github.com/ergo-services/ergo/etf"
10
"github.com/ergo-services/ergo/gen"
11
"github.com/ergo-services/ergo/lib"
15
syncReplyChannels = &sync.Pool{
16
New: func() interface{} {
17
return make(chan syncReplyMessage, 2)
22
type syncReplyMessage struct {
33
behavior gen.ProcessBehavior
34
env map[gen.EnvKey]interface{}
37
groupLeader gen.Process
40
mailBox chan gen.ProcessMailboxMessage
41
gracefulExit chan gen.ProcessGracefulExitRequest
42
direct chan gen.ProcessDirectMessage
44
context context.Context
45
kill context.CancelFunc
48
replyMutex sync.RWMutex
49
reply map[etf.Ref]chan syncReplyMessage
52
compression Compression
54
fallback gen.ProcessFallback
57
type processOptions struct {
62
type processExitFunc func(from etf.Pid, reason string) error
65
func (p *process) Self() etf.Pid {
70
func (p *process) Name() string {
75
func (p *process) RegisterName(name string) error {
76
if p.behavior == nil {
77
return lib.ErrProcessTerminated
79
return p.registerName(name, p.self)
83
func (p *process) UnregisterName(name string) error {
84
if p.behavior == nil {
85
return lib.ErrProcessTerminated
87
prc := p.ProcessByName(name)
89
return lib.ErrNameUnknown
91
if prc.Self() != p.self {
92
return lib.ErrNameOwner
94
return p.unregisterName(name)
98
func (p *process) Kill() {
99
if p.behavior == nil {
106
func (p *process) Exit(reason string) error {
107
if p.behavior == nil {
108
return lib.ErrProcessTerminated
110
return p.exit(p.self, reason)
114
func (p *process) Context() context.Context {
119
func (p *process) Parent() gen.Process {
127
func (p *process) GroupLeader() gen.Process {
128
if p.groupLeader == nil {
135
func (p *process) Links() []etf.Pid {
136
return p.processLinks(p.self)
140
func (p *process) Monitors() []etf.Pid {
141
return p.processMonitors(p.self)
145
func (p *process) MonitorsByName() []gen.ProcessID {
146
return p.processMonitorsByName(p.self)
150
func (p *process) MonitoredBy() []etf.Pid {
151
return p.processMonitoredBy(p.self)
155
func (p *process) Aliases() []etf.Alias {
160
func (p *process) Info() gen.ProcessInfo {
162
if p.behavior == nil {
164
return gen.ProcessInfo{}
168
if p.groupLeader != nil {
169
gl = p.groupLeader.Self()
174
monitors := p.Monitors()
175
monitorsByName := p.MonitorsByName()
176
monitoredBy := p.MonitoredBy()
177
return gen.ProcessInfo{
183
MonitorsByName: monitorsByName,
184
MonitoredBy: monitoredBy,
187
MessageQueueLen: len(p.mailBox),
188
TrapExit: p.trapExit,
189
Compression: p.compression.Enable,
194
func (p *process) Send(to interface{}, message etf.Term) error {
196
if p.behavior == nil {
198
return lib.ErrProcessTerminated
202
switch receiver := to.(type) {
204
return p.RouteSend(p.self, receiver, message)
206
return p.RouteSendReg(p.self, gen.ProcessID{Name: receiver, Node: string(p.self.Node)}, message)
208
return p.RouteSendReg(p.self, gen.ProcessID{Name: string(receiver), Node: string(p.self.Node)}, message)
210
return p.RouteSendReg(p.self, receiver, message)
212
return p.RouteSendAlias(p.self, receiver, message)
214
return fmt.Errorf("Unknown receiver type")
218
func (p *process) SendAfter(to interface{}, message etf.Term, after time.Duration) gen.CancelFunc {
220
timer := time.AfterFunc(after, func() { p.Send(to, message) })
225
func (p *process) CreateAlias() (etf.Alias, error) {
227
if p.behavior == nil {
229
return etf.Alias{}, lib.ErrProcessTerminated
236
func (p *process) DeleteAlias(alias etf.Alias) error {
238
if p.behavior == nil {
240
return lib.ErrProcessTerminated
243
return p.deleteAlias(p, alias)
247
func (p *process) ListEnv() map[gen.EnvKey]interface{} {
251
env := make(map[gen.EnvKey]interface{})
253
if p.groupLeader != nil {
254
for key, value := range p.groupLeader.ListEnv() {
259
for key, value := range p.parent.ListEnv() {
263
for key, value := range p.env {
271
func (p *process) SetEnv(name gen.EnvKey, value interface{}) {
283
func (p *process) Env(name gen.EnvKey) interface{} {
287
if value, ok := p.env[name]; ok {
291
if p.groupLeader != nil {
292
return p.groupLeader.Env(name)
299
func (p *process) Wait() {
306
func (p *process) WaitWithTimeout(d time.Duration) error {
311
timer := time.NewTimer(d)
316
return lib.ErrTimeout
317
case <-p.context.Done():
323
func (p *process) Link(with etf.Pid) error {
325
if p.behavior == nil {
327
return lib.ErrProcessTerminated
330
return p.RouteLink(p.self, with)
334
func (p *process) Unlink(with etf.Pid) error {
336
if p.behavior == nil {
338
return lib.ErrProcessTerminated
341
return p.RouteUnlink(p.self, with)
345
func (p *process) IsAlive() bool {
348
if p.behavior == nil {
351
return p.context.Err() == nil
355
func (p *process) NodeName() string {
356
return p.coreNodeName()
360
func (p *process) NodeStop() {
365
func (p *process) NodeUptime() int64 {
366
return p.coreUptime()
370
func (p *process) Children() ([]etf.Pid, error) {
371
c, err := p.Direct(gen.MessageDirectChildren{})
373
return []etf.Pid{}, err
375
children, correct := c.([]etf.Pid)
376
if correct == false {
377
return []etf.Pid{}, err
383
func (p *process) SetTrapExit(trap bool) {
388
func (p *process) TrapExit() bool {
393
func (p *process) SetCompression(enable bool) {
394
p.compression.Enable = enable
398
func (p *process) Compression() bool {
399
return p.compression.Enable
403
func (p *process) CompressionLevel() int {
404
return p.compression.Level
407
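// SetCompressionLevel stores level as the compression level of this process.
// The level is first checked against the range 1..9.
// NOTE(review): out-of-range values are presumably rejected with a false result - confirm.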
408
func (p *process) SetCompressionLevel(level int) bool {
409
if level < 1 || level > 9 {
412
p.compression.Level = level
416
// CompressionThreshold returns the compression threshold configured for this process.
417
func (p *process) CompressionThreshold() int {
418
return p.compression.Threshold
421
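// SetCompressionThreshold stores threshold as the compression threshold of this process.
// The value is first compared against DefaultCompressionThreshold.
// NOTE(review): smaller values are presumably rejected with a false result - confirm.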
422
func (p *process) SetCompressionThreshold(threshold int) bool {
423
if threshold < DefaultCompressionThreshold {
426
p.compression.Threshold = threshold
431
func (p *process) Behavior() gen.ProcessBehavior {
435
if p.behavior == nil {
442
func (p *process) Direct(request interface{}) (interface{}, error) {
443
return p.DirectWithTimeout(request, gen.DefaultCallTimeout)
447
func (p *process) DirectWithTimeout(request interface{}, timeout int) (interface{}, error) {
449
timeout = gen.DefaultCallTimeout
452
direct := gen.ProcessDirectMessage{
457
if err := p.PutSyncRequest(direct.Ref); err != nil {
463
case p.direct <- direct:
465
p.CancelSyncRequest(direct.Ref)
466
return nil, lib.ErrProcessBusy
469
return p.WaitSyncReply(direct.Ref, timeout)
472
func (p *process) RegisterEvent(event gen.Event, messages ...gen.EventMessage) error {
473
return p.registerEvent(p.self, event, messages)
476
func (p *process) UnregisterEvent(event gen.Event) error {
477
return p.unregisterEvent(p.self, event)
480
func (p *process) MonitorEvent(event gen.Event) error {
481
return p.monitorEvent(p.self, event)
484
func (p *process) DemonitorEvent(event gen.Event) error {
485
return p.demonitorEvent(p.self, event)
488
func (p *process) SendEventMessage(event gen.Event, message gen.EventMessage) error {
489
return p.sendEvent(p.self, event, message)
493
func (p *process) MonitorNode(name string) etf.Ref {
495
p.monitorNode(p.self, name, ref)
500
func (p *process) DemonitorNode(ref etf.Ref) bool {
501
return p.demonitorNode(ref)
505
func (p *process) MonitorProcess(process interface{}) etf.Ref {
507
switch mp := process.(type) {
509
p.RouteMonitor(p.self, mp, ref)
512
p.RouteMonitorReg(p.self, mp, ref)
515
p.RouteMonitorReg(p.self, gen.ProcessID{Name: mp, Node: string(p.self.Node)}, ref)
518
p.RouteMonitorReg(p.self, gen.ProcessID{Name: string(mp), Node: string(p.self.Node)}, ref)
522
// Create a fake gen.ProcessID. The monitor will send MessageDown with "noproc" as the reason.
523
p.RouteMonitorReg(p.self, gen.ProcessID{Node: string(p.self.Node)}, ref)
528
func (p *process) DemonitorProcess(ref etf.Ref) bool {
529
if err := p.RouteDemonitor(p.self, ref); err != nil {
535
// RemoteSpawn makes a request to spawn a new process on a remote node,
// using gen.DefaultCallTimeout as the timeout.
536
func (p *process) RemoteSpawn(node string, object string, opts gen.RemoteSpawnOptions, args ...etf.Term) (etf.Pid, error) {
537
return p.RemoteSpawnWithTimeout(gen.DefaultCallTimeout, node, object, opts, args...)
540
// RemoteSpawnWithTimeout makes a request to spawn a new process on a remote node
// and waits for the reply for at most the given timeout (in seconds).
541
func (p *process) RemoteSpawnWithTimeout(timeout int, node string, object string, opts gen.RemoteSpawnOptions, args ...etf.Term) (etf.Pid, error) {
543
p.PutSyncRequest(ref)
544
request := gen.RemoteSpawnRequest{
549
if err := p.RouteSpawnRequest(node, object, request, args...); err != nil {
550
p.CancelSyncRequest(ref)
551
return etf.Pid{}, err
554
reply, err := p.WaitSyncReply(ref, timeout)
556
return etf.Pid{}, err
559
// Result of the operation. If Result is a process identifier,
560
// the operation succeeded and the process identifier is the
561
// identifier of the newly created process. If Result is an atom,
562
// the operation failed and the atom identifies failure reason.
563
switch r := reply.(type) {
565
m := etf.Ref{} // empty reference
566
if opts.Monitor != m {
567
p.RouteMonitor(p.self, r, opts.Monitor)
570
p.RouteLink(p.self, r)
575
case lib.ErrTaken.Error():
576
return etf.Pid{}, lib.ErrTaken
577
case lib.ErrBehaviorUnknown.Error():
578
return etf.Pid{}, lib.ErrBehaviorUnknown
580
return etf.Pid{}, fmt.Errorf(string(r))
583
return etf.Pid{}, fmt.Errorf("unknown result: %#v", reply)
587
func (p *process) Spawn(name string, opts gen.ProcessOptions, behavior gen.ProcessBehavior, args ...etf.Term) (gen.Process, error) {
588
options := processOptions{
589
ProcessOptions: opts,
592
return p.spawn(name, options, behavior, args...)
596
func (p *process) PutSyncRequest(ref etf.Ref) error {
597
var preply map[etf.Ref]chan syncReplyMessage
603
return lib.ErrProcessTerminated
606
reply := syncReplyChannels.Get().(chan syncReplyMessage)
609
p.replyMutex.Unlock()
614
func (p *process) PutSyncReply(ref etf.Ref, reply etf.Term, err error) error {
615
var preply map[etf.Ref]chan syncReplyMessage
621
return lib.ErrProcessTerminated
625
rep, ok := preply[ref]
626
defer p.replyMutex.RUnlock()
629
// No process is waiting for this reference.
630
return lib.ErrReferenceUnknown
633
case rep <- syncReplyMessage{value: reply, err: err}:
639
func (p *process) CancelSyncRequest(ref etf.Ref) {
640
var preply map[etf.Ref]chan syncReplyMessage
651
p.replyMutex.Unlock()
655
func (p *process) WaitSyncReply(ref etf.Ref, timeout int) (etf.Term, error) {
656
var preply map[etf.Ref]chan syncReplyMessage
662
return nil, lib.ErrProcessTerminated
666
reply, wait_for_reply := preply[ref]
667
p.replyMutex.RUnlock()
669
if wait_for_reply == false {
670
return nil, fmt.Errorf("Unknown request")
673
defer func(ref etf.Ref) {
676
p.replyMutex.Unlock()
679
timer := lib.TakeTimer()
680
defer lib.ReleaseTimer(timer)
681
timer.Reset(time.Second * time.Duration(timeout))
686
// Return the reply channel to the pool so it can be reused.
687
syncReplyChannels.Put(reply)
688
return m.value, m.err
690
return nil, lib.ErrTimeout
691
case <-p.context.Done():
692
return nil, lib.ErrProcessTerminated
699
func (p *process) ProcessChannels() gen.ProcessChannels {
700
return gen.ProcessChannels{
703
GracefulExit: p.gracefulExit,