8
"github.com/wandb/wandb/core/pkg/observability"
9
"github.com/wandb/wandb/core/pkg/service"
12
type WriterOption func(*Writer)
14
func WithWriterFwdChannel(fwd chan *service.Record) WriterOption {
15
return func(w *Writer) {
20
func WithWriterSettings(settings *service.Settings) WriterOption {
21
return func(w *Writer) {
26
// Writer is responsible for writing messages to the append-only log.
27
// It receives messages from the handler, processes them,
28
// and, if a message is to be persisted, writes it to the log.
29
// It also sends the messages to the sender.
31
// ctx is the context for the writer
34
// settings is the settings for the writer
35
settings *service.Settings
37
// logger is the logger for the writer
38
logger *observability.CoreLogger
40
// fwdChan is the channel for forwarding messages to the sender
41
fwdChan chan *service.Record
43
// storeChan is the channel for messages to be stored
44
storeChan chan *service.Record
46
// store is the store for the writer
49
// recordNum is the running count of stored records
52
// wg is the wait group for the writer
56
// NewWriter returns a new Writer
57
func NewWriter(ctx context.Context, logger *observability.CoreLogger, opts ...WriterOption) *Writer {
63
for _, opt := range opts {
69
func (w *Writer) startStore() {
70
if w.settings.GetXSync().GetValue() {
71
// do not set up store if we are syncing an offline run
75
w.storeChan = make(chan *service.Record, BufferSize*8)
78
w.store = NewStore(w.ctx, w.settings.GetSyncFile().GetValue(), w.logger)
79
err = w.store.Open(os.O_WRONLY)
81
w.logger.CaptureFatalAndPanic("writer: error creating store", err)
86
for record := range w.storeChan {
87
if err = w.store.Write(record); err != nil {
88
w.logger.Error("writer: error storing record", "error", err)
92
if err = w.store.Close(); err != nil {
93
w.logger.CaptureError("writer: error closing store", err)
99
// Do is the main loop of the writer: it processes each record received on inChan.
100
func (w *Writer) Do(inChan <-chan *service.Record) {
101
defer w.logger.Reraise()
102
w.logger.Info("writer: started", "stream_id", w.settings.RunId)
106
for record := range inChan {
107
w.handleRecord(record)
113
// Close closes the writer and all its resources
114
// including the store.
115
func (w *Writer) Close() {
117
if w.storeChan != nil {
120
w.logger.Info("writer: closed", "stream_id", w.settings.RunId)
123
// handleRecord writes messages to the append-only log
124
// and passes them to the sender.
125
// We ensure that the messages are written to the log
126
// before they are sent to the server.
127
func (w *Writer) handleRecord(record *service.Record) {
128
w.logger.Debug("write: got a message", "record", record, "stream_id", w.settings.RunId)
129
switch record.RecordType.(type) {
130
case *service.Record_Request:
133
w.logger.Error("nil record type")
136
w.storeRecord(record)
140
// storeRecord stores the record in the append-only log
141
func (w *Writer) storeRecord(record *service.Record) {
142
if record.GetControl().GetLocal() {
146
record.Num = w.recordNum
147
w.storeChan <- record
150
func (w *Writer) sendRecord(record *service.Record) {
151
// TODO: redo it so it only uses control
152
if w.settings.GetXOffline().GetValue() && !record.GetControl().GetAlwaysSend() {