wandb

Форк
0
/
writer.go 
156 строк · 3.7 Кб
1
package server
2

3
import (
4
	"context"
5
	"os"
6
	"sync"
7

8
	"github.com/wandb/wandb/core/pkg/observability"
9
	"github.com/wandb/wandb/core/pkg/service"
10
)
11

12
type WriterOption func(*Writer)
13

14
func WithWriterFwdChannel(fwd chan *service.Record) WriterOption {
15
	return func(w *Writer) {
16
		w.fwdChan = fwd
17
	}
18
}
19

20
func WithWriterSettings(settings *service.Settings) WriterOption {
21
	return func(w *Writer) {
22
		w.settings = settings
23
	}
24
}
25

26
// Writer is responsible for writing messages to the append-only log.
27
// It receives messages from the handler, processes them,
28
// if the message is to be persisted it writes them to the log.
29
// It also sends the messages to the sender.
30
type Writer struct {
31
	// ctx is the context for the writer
32
	ctx context.Context
33

34
	// settings is the settings for the writer
35
	settings *service.Settings
36

37
	// logger is the logger for the writer
38
	logger *observability.CoreLogger
39

40
	// fwdChan is the channel for forwarding messages to the sender
41
	fwdChan chan *service.Record
42

43
	// storeChan is the channel for messages to be stored
44
	storeChan chan *service.Record
45

46
	// store is the store for the writer
47
	store *Store
48

49
	// recordNum is the running count of stored records
50
	recordNum int64
51

52
	// wg is the wait group for the writer
53
	wg sync.WaitGroup
54
}
55

56
// NewWriter returns a new Writer
57
func NewWriter(ctx context.Context, logger *observability.CoreLogger, opts ...WriterOption) *Writer {
58
	w := &Writer{
59
		ctx:    ctx,
60
		logger: logger,
61
		wg:     sync.WaitGroup{},
62
	}
63
	for _, opt := range opts {
64
		opt(w)
65
	}
66
	return w
67
}
68

69
func (w *Writer) startStore() {
70
	if w.settings.GetXSync().GetValue() {
71
		// do not set up store if we are syncing an offline run
72
		return
73
	}
74

75
	w.storeChan = make(chan *service.Record, BufferSize*8)
76

77
	var err error
78
	w.store = NewStore(w.ctx, w.settings.GetSyncFile().GetValue(), w.logger)
79
	err = w.store.Open(os.O_WRONLY)
80
	if err != nil {
81
		w.logger.CaptureFatalAndPanic("writer: error creating store", err)
82
	}
83

84
	w.wg.Add(1)
85
	go func() {
86
		for record := range w.storeChan {
87
			if err = w.store.Write(record); err != nil {
88
				w.logger.Error("writer: error storing record", "error", err)
89
			}
90
		}
91

92
		if err = w.store.Close(); err != nil {
93
			w.logger.CaptureError("writer: error closing store", err)
94
		}
95
		w.wg.Done()
96
	}()
97
}
98

99
// do is the main loop of the writer to process incoming messages
100
func (w *Writer) Do(inChan <-chan *service.Record) {
101
	defer w.logger.Reraise()
102
	w.logger.Info("writer: started", "stream_id", w.settings.RunId)
103

104
	w.startStore()
105

106
	for record := range inChan {
107
		w.handleRecord(record)
108
	}
109
	w.Close()
110
	w.wg.Wait()
111
}
112

113
// Close closes the writer and all its resources
114
// which includes the store
115
func (w *Writer) Close() {
116
	close(w.fwdChan)
117
	if w.storeChan != nil {
118
		close(w.storeChan)
119
	}
120
	w.logger.Info("writer: closed", "stream_id", w.settings.RunId)
121
}
122

123
// handleRecord Writing messages to the append-only log,
124
// and passing them to the sender.
125
// We ensure that the messages are written to the log
126
// before they are sent to the server.
127
func (w *Writer) handleRecord(record *service.Record) {
128
	w.logger.Debug("write: got a message", "record", record, "stream_id", w.settings.RunId)
129
	switch record.RecordType.(type) {
130
	case *service.Record_Request:
131
		w.sendRecord(record)
132
	case nil:
133
		w.logger.Error("nil record type")
134
	default:
135
		w.sendRecord(record)
136
		w.storeRecord(record)
137
	}
138
}
139

140
// storeRecord stores the record in the append-only log
141
func (w *Writer) storeRecord(record *service.Record) {
142
	if record.GetControl().GetLocal() {
143
		return
144
	}
145
	w.recordNum += 1
146
	record.Num = w.recordNum
147
	w.storeChan <- record
148
}
149

150
func (w *Writer) sendRecord(record *service.Record) {
151
	// TODO: redo it so it only uses control
152
	if w.settings.GetXOffline().GetValue() && !record.GetControl().GetAlwaysSend() {
153
		return
154
	}
155
	w.fwdChan <- record
156
}
157

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.