12
"github.com/wandb/wandb/core/internal/settings"
13
"github.com/wandb/wandb/core/internal/shared"
14
"github.com/wandb/wandb/core/internal/version"
15
"github.com/wandb/wandb/core/internal/watcher"
16
"github.com/wandb/wandb/core/pkg/monitor"
17
"github.com/wandb/wandb/core/pkg/observability"
18
"github.com/wandb/wandb/core/pkg/service"
22
internalConnectionId = "internal"
36
cancel context.CancelFunc
39
logger *observability.CoreLogger
45
settings *settings.Settings
57
inChan chan *service.Record
60
loopBackChan chan *service.Record
63
outChan chan *service.ServerResponse
66
dispatcher *Dispatcher
69
// streamLogger builds the CoreLogger used by a stream.
//
// It tries to symlink the path stored in defaultLoggerPath (when one is set
// and os.Stat does not report it as missing) to "debug-core.log" inside
// settings.LogDir, opens settings.InternalLogFile in append mode as a log
// sink, and emits JSON records through slog. The level is Info unless the
// WANDB_CORE_DEBUG environment variable is non-empty, in which case it is
// Debug.
func streamLogger(settings *settings.Settings) *observability.CoreLogger {
71
// Destination of the symlink inside the run's log directory.
targetPath := filepath.Join(settings.LogDir, "debug-core.log")
72
if path := defaultLoggerPath.Load(); path != nil {
75
if _, err := os.Stat(path); !os.IsNotExist(err) {
76
err := os.Symlink(path, targetPath)
78
slog.Error("error creating symlink", "error", err)
83
// Sinks the logger writes to; the internal log file is appended below.
var writers []io.Writer
84
name := settings.InternalLogFile
85
file, err := os.OpenFile(name, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
87
slog.Error(fmt.Sprintf("error opening log file: %s", err))
89
writers = append(writers, file)
91
writer := io.MultiWriter(writers...)
94
// Default to Info; any non-empty WANDB_CORE_DEBUG switches to Debug.
level := slog.LevelInfo
95
if os.Getenv("WANDB_CORE_DEBUG") != "" {
96
level = slog.LevelDebug
99
opts := &slog.HandlerOptions{
104
logger := observability.NewCoreLogger(
105
slog.New(slog.NewJSONHandler(writer, opts)),
106
observability.WithTags(observability.Tags{}),
107
observability.WithCaptureMessage(observability.CaptureMessage),
108
observability.WithCaptureException(observability.CaptureException),
110
logger.Info("using version", "core version", version.Version)
111
// NOTE(review): this message appears to be logged whether or not the
// symlink above was actually created — confirm against the full function.
logger.Info("created symlink", "path", targetPath)
112
// Run-identifying tags taken from the settings.
tags := observability.Tags{
113
"run_id": settings.RunID,
114
"run_url": settings.RunURL,
115
"project": settings.Project,
116
"entity": settings.Entity,
124
// NewStream constructs a Stream for the given settings.
//
// It derives a cancellable context from ctx, creates the stream logger and
// the in/loop-back/out channels (each buffered to BufferSize), and wires up
// a file watcher, the handler, the writer, the sender and the dispatcher.
// NOTE(review): how streamId is used is not visible in this excerpt.
func NewStream(ctx context.Context, settings *settings.Settings, streamId string) *Stream {
125
ctx, cancel := context.WithCancel(ctx)
129
logger: streamLogger(settings),
130
wg: sync.WaitGroup{},
132
inChan: make(chan *service.Record, BufferSize),
133
loopBackChan: make(chan *service.Record, BufferSize),
134
outChan: make(chan *service.ServerResponse, BufferSize),
137
// A single watcher is shared by the files handler, the TB handler and the
// handler itself.
watcher := watcher.New(watcher.WithLogger(s.logger))
138
s.handler = NewHandler(s.ctx, s.logger,
139
WithHandlerSettings(s.settings.Proto),
140
WithHandlerFwdChannel(make(chan *service.Record, BufferSize)),
141
WithHandlerOutChannel(make(chan *service.Result, BufferSize)),
142
WithHandlerSystemMonitor(monitor.NewSystemMonitor(s.logger, s.settings.Proto, s.loopBackChan)),
143
WithHandlerFileHandler(NewFilesHandler(watcher, s.logger, s.settings.Proto)),
144
WithHandlerTBHandler(NewTBHandler(watcher, s.logger, s.settings.Proto, s.loopBackChan)),
145
WithHandlerFilesInfoHandler(NewFilesInfoHandler()),
146
WithHandlerSummaryHandler(NewSummaryHandler(s.logger)),
147
WithHandlerMetricHandler(NewMetricHandler()),
148
WithHandlerWatcher(watcher),
151
s.writer = NewWriter(s.ctx, s.logger,
152
WithWriterSettings(s.settings.Proto),
153
WithWriterFwdChannel(make(chan *service.Record, BufferSize)),
156
// The sender's forward channel is loopBackChan, the same channel handed to
// the system monitor and the TB handler above.
s.sender = NewSender(s.ctx, s.cancel, s.logger, s.settings.Proto,
157
WithSenderFwdChannel(s.loopBackChan),
158
WithSenderOutChannel(make(chan *service.Result, BufferSize)),
161
s.dispatcher = NewDispatcher(s.logger)
163
s.logger.Info("created new stream", "id", s.settings.Proto.RunId)
168
// AddResponders registers the given responder entries with the stream's
// dispatcher.
func (s *Stream) AddResponders(entries ...ResponderEntry) {
169
s.dispatcher.AddResponders(entries...)
175
// Start wires the stream's processing stages together and starts them.
//
// Visible wiring: one goroutine per input channel ranges over inChan and
// loopBackChan; the handler consumes fwdChan, the writer consumes the
// handler's forward channel, and the sender consumes the writer's forward
// channel. Results from the handler's and the sender's out channels are
// passed to the dispatcher.
// NOTE(review): whether the Do calls run in their own goroutines and how the
// WaitGroups are awaited is not visible in this excerpt.
func (s *Stream) Start() {
178
// Channel feeding the handler.
fwdChan := make(chan *service.Record, BufferSize)
181
wg := sync.WaitGroup{}
182
for _, ch := range []chan *service.Record{s.inChan, s.loopBackChan} {
184
go func(ch chan *service.Record) {
185
for record := range ch {
199
s.handler.Do(fwdChan)
206
s.writer.Do(s.handler.fwdChan)
213
s.sender.Do(s.writer.fwdChan)
220
wg := sync.WaitGroup{}
221
for _, ch := range []chan *service.Result{s.handler.outChan, s.sender.outChan} {
223
go func(ch chan *service.Result) {
224
for result := range ch {
225
s.dispatcher.handleRespond(result)
234
s.logger.Debug("starting stream", "id", s.settings.Proto.RunId)
238
// HandleRecord accepts a record for processing by the stream and logs it at
// debug level.
// NOTE(review): the step that hands rec to the pipeline (presumably a send on
// inChan) is not visible in this excerpt — confirm.
func (s *Stream) HandleRecord(rec *service.Record) {
239
s.logger.Debug("handling record", "record", rec)
243
// GetRun returns the run record as reported by the stream's handler.
func (s *Stream) GetRun() *service.RunRecord {
244
return s.handler.GetRun()
249
// Close shuts the stream down; the visible step closes loopBackChan.
// NOTE(review): any other channels closed or goroutines awaited here are not
// visible in this excerpt.
func (s *Stream) Close() {
252
close(s.loopBackChan)
258
// Respond receives a server response addressed to the stream itself; the
// stream registers itself as a responder under internalConnectionId in
// FinishAndClose.
// NOTE(review): the body is not visible in this excerpt (presumably it
// forwards resp to outChan) — confirm.
func (s *Stream) Respond(resp *service.ServerResponse) {
262
// FinishAndClose finishes the run and shuts the stream down.
//
// It registers the stream as the responder for internalConnectionId. When
// the XSync setting is not true it builds an exit record whose control block
// is marked AlwaysSend and ReqResp and is addressed to internalConnectionId,
// and passes it to HandleRecord. It finally logs that the stream was closed.
// NOTE(review): where exitCode is stored (presumably in the RunExitRecord)
// and the close/footer steps are not visible in this excerpt.
func (s *Stream) FinishAndClose(exitCode int32) {
263
s.AddResponders(ResponderEntry{s, internalConnectionId})
265
if !s.settings.Proto.GetXSync().GetValue() {
267
record := &service.Record{
268
RecordType: &service.Record_Exit{
269
Exit: &service.RunExitRecord{
272
Control: &service.Control{AlwaysSend: true, ConnectionId: internalConnectionId, ReqResp: true},
275
s.HandleRecord(record)
283
s.logger.Info("closed stream", "id", s.settings.Proto.RunId)
286
// PrintFooter prints the run footer by calling shared.PrintHeadFoot with the
// run, the proto settings and true.
// NOTE(review): run is assigned on a line not visible in this excerpt
// (presumably from s.GetRun()) — confirm.
func (s *Stream) PrintFooter() {
288
shared.PrintHeadFoot(run, s.settings.Proto, true)