wandb

Форк
0
/
stream.go 
289 строк · 8.1 Кб
1
package server
2

3
import (
4
	"context"
5
	"fmt"
6
	"io"
7
	"log/slog"
8
	"os"
9
	"path/filepath"
10
	"sync"
11

12
	"github.com/wandb/wandb/core/internal/settings"
13
	"github.com/wandb/wandb/core/internal/shared"
14
	"github.com/wandb/wandb/core/internal/version"
15
	"github.com/wandb/wandb/core/internal/watcher"
16
	"github.com/wandb/wandb/core/pkg/monitor"
17
	"github.com/wandb/wandb/core/pkg/observability"
18
	"github.com/wandb/wandb/core/pkg/service"
19
)
20

21
// internalConnectionId is the connection ID under which the stream registers
// itself as a responder (see FinishAndClose), so that responses to records it
// submits internally are routed back to the stream rather than to a client.
const internalConnectionId = "internal"
24

25
// Stream is a collection of components that work together to handle incoming
26
// data for a W&B run, store it locally, and send it to a W&B server.
27
// Stream.handler receives incoming data from the client and dispatches it to
28
// Stream.writer, which writes it to a local file. Stream.writer then sends the
29
// data to Stream.sender, which sends it to the W&B server. Stream.dispatcher
30
// handles dispatching responses to the appropriate client responders.
31
type Stream struct {
32
	// ctx is the context for the stream
33
	ctx context.Context
34

35
	// cancel is the cancel function for the stream
36
	cancel context.CancelFunc
37

38
	// logger is the logger for the stream
39
	logger *observability.CoreLogger
40

41
	// wg is the WaitGroup for the stream
42
	wg sync.WaitGroup
43

44
	// settings is the settings for the stream
45
	settings *settings.Settings
46

47
	// handler is the handler for the stream
48
	handler *Handler
49

50
	// writer is the writer for the stream
51
	writer *Writer
52

53
	// sender is the sender for the stream
54
	sender *Sender
55

56
	// inChan is the channel for incoming messages
57
	inChan chan *service.Record
58

59
	// loopBackChan is the channel for internal loopback messages
60
	loopBackChan chan *service.Record
61

62
	// internal responses from teardown path typically
63
	outChan chan *service.ServerResponse
64

65
	// dispatcher is the dispatcher for the stream
66
	dispatcher *Dispatcher
67
}
68

69
func streamLogger(settings *settings.Settings) *observability.CoreLogger {
70
	// TODO: when we add session concept re-do this to use user provided path
71
	targetPath := filepath.Join(settings.LogDir, "debug-core.log")
72
	if path := defaultLoggerPath.Load(); path != nil {
73
		path := path.(string)
74
		// check path exists
75
		if _, err := os.Stat(path); !os.IsNotExist(err) {
76
			err := os.Symlink(path, targetPath)
77
			if err != nil {
78
				slog.Error("error creating symlink", "error", err)
79
			}
80
		}
81
	}
82

83
	var writers []io.Writer
84
	name := settings.InternalLogFile
85
	file, err := os.OpenFile(name, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
86
	if err != nil {
87
		slog.Error(fmt.Sprintf("error opening log file: %s", err))
88
	} else {
89
		writers = append(writers, file)
90
	}
91
	writer := io.MultiWriter(writers...)
92

93
	// TODO: add a log level to the settings
94
	level := slog.LevelInfo
95
	if os.Getenv("WANDB_CORE_DEBUG") != "" {
96
		level = slog.LevelDebug
97
	}
98

99
	opts := &slog.HandlerOptions{
100
		Level: level,
101
		// AddSource: true,
102
	}
103

104
	logger := observability.NewCoreLogger(
105
		slog.New(slog.NewJSONHandler(writer, opts)),
106
		observability.WithTags(observability.Tags{}),
107
		observability.WithCaptureMessage(observability.CaptureMessage),
108
		observability.WithCaptureException(observability.CaptureException),
109
	)
110
	logger.Info("using version", "core version", version.Version)
111
	logger.Info("created symlink", "path", targetPath)
112
	tags := observability.Tags{
113
		"run_id":  settings.RunID,
114
		"run_url": settings.RunURL,
115
		"project": settings.Project,
116
		"entity":  settings.Entity,
117
	}
118
	logger.SetTags(tags)
119

120
	return logger
121
}
122

123
// NewStream creates a new stream with the given settings and responders.
124
func NewStream(ctx context.Context, settings *settings.Settings, streamId string) *Stream {
125
	ctx, cancel := context.WithCancel(ctx)
126
	s := &Stream{
127
		ctx:          ctx,
128
		cancel:       cancel,
129
		logger:       streamLogger(settings),
130
		wg:           sync.WaitGroup{},
131
		settings:     settings,
132
		inChan:       make(chan *service.Record, BufferSize),
133
		loopBackChan: make(chan *service.Record, BufferSize),
134
		outChan:      make(chan *service.ServerResponse, BufferSize),
135
	}
136

137
	watcher := watcher.New(watcher.WithLogger(s.logger))
138
	s.handler = NewHandler(s.ctx, s.logger,
139
		WithHandlerSettings(s.settings.Proto),
140
		WithHandlerFwdChannel(make(chan *service.Record, BufferSize)),
141
		WithHandlerOutChannel(make(chan *service.Result, BufferSize)),
142
		WithHandlerSystemMonitor(monitor.NewSystemMonitor(s.logger, s.settings.Proto, s.loopBackChan)),
143
		WithHandlerFileHandler(NewFilesHandler(watcher, s.logger, s.settings.Proto)),
144
		WithHandlerTBHandler(NewTBHandler(watcher, s.logger, s.settings.Proto, s.loopBackChan)),
145
		WithHandlerFilesInfoHandler(NewFilesInfoHandler()),
146
		WithHandlerSummaryHandler(NewSummaryHandler(s.logger)),
147
		WithHandlerMetricHandler(NewMetricHandler()),
148
		WithHandlerWatcher(watcher),
149
	)
150

151
	s.writer = NewWriter(s.ctx, s.logger,
152
		WithWriterSettings(s.settings.Proto),
153
		WithWriterFwdChannel(make(chan *service.Record, BufferSize)),
154
	)
155

156
	s.sender = NewSender(s.ctx, s.cancel, s.logger, s.settings.Proto,
157
		WithSenderFwdChannel(s.loopBackChan),
158
		WithSenderOutChannel(make(chan *service.Result, BufferSize)),
159
	)
160

161
	s.dispatcher = NewDispatcher(s.logger)
162

163
	s.logger.Info("created new stream", "id", s.settings.Proto.RunId)
164
	return s
165
}
166

167
// AddResponders adds the given responders to the stream's dispatcher.
168
func (s *Stream) AddResponders(entries ...ResponderEntry) {
169
	s.dispatcher.AddResponders(entries...)
170
}
171

172
// Start starts the stream's handler, writer, sender, and dispatcher.
173
// We use Stream's wait group to ensure that all of these components are cleanly
174
// finalized and closed when the stream is closed in Stream.Close().
175
func (s *Stream) Start() {
176

177
	// forward records from the inChan and loopBackChan to the handler
178
	fwdChan := make(chan *service.Record, BufferSize)
179
	s.wg.Add(1)
180
	go func() {
181
		wg := sync.WaitGroup{}
182
		for _, ch := range []chan *service.Record{s.inChan, s.loopBackChan} {
183
			wg.Add(1)
184
			go func(ch chan *service.Record) {
185
				for record := range ch {
186
					fwdChan <- record
187
				}
188
				wg.Done()
189
			}(ch)
190
		}
191
		wg.Wait()
192
		close(fwdChan)
193
		s.wg.Done()
194
	}()
195

196
	// handle the client requests with the handler
197
	s.wg.Add(1)
198
	go func() {
199
		s.handler.Do(fwdChan)
200
		s.wg.Done()
201
	}()
202

203
	// write the data to a transaction log
204
	s.wg.Add(1)
205
	go func() {
206
		s.writer.Do(s.handler.fwdChan)
207
		s.wg.Done()
208
	}()
209

210
	// send the data to the server
211
	s.wg.Add(1)
212
	go func() {
213
		s.sender.Do(s.writer.fwdChan)
214
		s.wg.Done()
215
	}()
216

217
	// handle dispatching between components
218
	s.wg.Add(1)
219
	go func() {
220
		wg := sync.WaitGroup{}
221
		for _, ch := range []chan *service.Result{s.handler.outChan, s.sender.outChan} {
222
			wg.Add(1)
223
			go func(ch chan *service.Result) {
224
				for result := range ch {
225
					s.dispatcher.handleRespond(result)
226
				}
227
				wg.Done()
228
			}(ch)
229
		}
230
		wg.Wait()
231
		close(s.outChan)
232
		s.wg.Done()
233
	}()
234
	s.logger.Debug("starting stream", "id", s.settings.Proto.RunId)
235
}
236

237
// HandleRecord handles the given record by sending it to the stream's handler.
238
func (s *Stream) HandleRecord(rec *service.Record) {
239
	s.logger.Debug("handling record", "record", rec)
240
	s.inChan <- rec
241
}
242

243
// GetRun returns the run record as reported by the stream's handler.
func (s *Stream) GetRun() *service.RunRecord {
	h := s.handler
	return h.GetRun()
}
246

247
// Close Gracefully wait for handler, writer, sender, dispatcher to shut down cleanly
248
// assumes an exit record has already been sent
249
func (s *Stream) Close() {
250
	// wait for the context to be canceled in the defer state machine in the sender
251
	<-s.ctx.Done()
252
	close(s.loopBackChan)
253
	close(s.inChan)
254
	s.wg.Wait()
255
}
256

257
// Respond Handle internal responses like from the finish and close path
258
func (s *Stream) Respond(resp *service.ServerResponse) {
259
	s.outChan <- resp
260
}
261

262
func (s *Stream) FinishAndClose(exitCode int32) {
263
	s.AddResponders(ResponderEntry{s, internalConnectionId})
264

265
	if !s.settings.Proto.GetXSync().GetValue() {
266
		// send exit record to handler
267
		record := &service.Record{
268
			RecordType: &service.Record_Exit{
269
				Exit: &service.RunExitRecord{
270
					ExitCode: exitCode,
271
				}},
272
			Control: &service.Control{AlwaysSend: true, ConnectionId: internalConnectionId, ReqResp: true},
273
		}
274

275
		s.HandleRecord(record)
276
		// TODO(beta): process the response so we can formulate a more correct footer
277
		<-s.outChan
278
	}
279

280
	s.Close()
281

282
	s.PrintFooter()
283
	s.logger.Info("closed stream", "id", s.settings.Proto.RunId)
284
}
285

286
// PrintFooter prints the run footer from the handler's run record and the
// stream's settings.
func (s *Stream) PrintFooter() {
	shared.PrintHeadFoot(s.GetRun(), s.settings.Proto, true)
}
290

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.