package server

import (
	"context"
	"fmt"
	"io"
	"maps"
	"net/url"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/hashicorp/go-retryablehttp"
	"github.com/segmentio/encoding/json"

	"github.com/Khan/genqlient/graphql"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/wrapperspb"

	"github.com/wandb/wandb/core/internal/api"
	"github.com/wandb/wandb/core/internal/clients"
	"github.com/wandb/wandb/core/internal/debounce"
	"github.com/wandb/wandb/core/internal/filetransfer"
	"github.com/wandb/wandb/core/internal/gql"
	"github.com/wandb/wandb/core/internal/runconfig"
	"github.com/wandb/wandb/core/internal/shared"
	"github.com/wandb/wandb/core/internal/version"
	"github.com/wandb/wandb/core/pkg/artifacts"
	fs "github.com/wandb/wandb/core/pkg/filestream"
	"github.com/wandb/wandb/core/pkg/launch"
	"github.com/wandb/wandb/core/pkg/observability"
	"github.com/wandb/wandb/core/pkg/service"
	"github.com/wandb/wandb/core/pkg/utils"
)

const (
	// RFC3339Micro is a modified version of time.RFC3339Nano with microsecond precision
	RFC3339Micro             = "2006-01-02T15:04:05.000000Z07:00"
	configDebouncerRateLimit = 1 / 30.0 // todo: audit rate limit
	configDebouncerBurstSize = 1        // todo: audit burst size
)

type SenderOption func(*Sender)

func WithSenderFwdChannel(fwd chan *service.Record) SenderOption {
	return func(s *Sender) {
		s.fwdChan = fwd
	}
}

func WithSenderOutChannel(out chan *service.Result) SenderOption {
	return func(s *Sender) {
		s.outChan = out
	}
}

// Sender is the sender for a stream. It handles incoming records and sends them
// to the server and/or forwards them to the dispatcher/handler.
type Sender struct {
	// ctx is the context for the sender
	ctx context.Context

	// cancel is the cancel function for the sender
	cancel context.CancelFunc

	// logger is the logger for the sender
	logger *observability.CoreLogger

	// settings is the settings for the sender
	settings *service.Settings

	// fwdChan is the channel for loopback messages (messages from the sender to the handler)
	fwdChan chan *service.Record

	// outChan is the channel for dispatcher messages
	outChan chan *service.Result

	// graphqlClient is the graphql client
	graphqlClient graphql.Client

	// fileStream is the file stream
	fileStream *fs.FileStream

	// fileTransferManager is the file uploader/downloader
	fileTransferManager *filetransfer.FileTransferManager

	// RunRecord is the run record
	// TODO: remove this and use properly updated settings
	//       + a flag indicating whether the run has started
	RunRecord *service.RunRecord

	// resumeState is the resume state
	resumeState *ResumeState

	telemetry *service.TelemetryRecord

	metricSender *MetricSender

	configDebouncer *debounce.Debouncer

	// Keep track of summary which is being updated incrementally
	summaryMap map[string]*service.SummaryItem

	// Keep track of config which is being updated incrementally
	runConfig *runconfig.RunConfig

	// Info about the (local) server we are talking to
	serverInfo *gql.ServerInfoServerInfo

	// Keep track of exit record to pass to file stream when the time comes
	exitRecord *service.Record

	syncService *SyncService

	store *Store

	jobBuilder *launch.JobBuilder

	wgFileTransfer sync.WaitGroup
}

// NewSender creates a new Sender with the given settings
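//
// A minimal usage sketch (the channel wiring and buffer sizes shown here are
// assumptions; in practice the stream constructs the sender and connects these
// channels):
//
//	fwd := make(chan *service.Record, 64)
//	out := make(chan *service.Result, 64)
//	sender := NewSender(ctx, cancel, logger, settings,
//		WithSenderFwdChannel(fwd),
//		WithSenderOutChannel(out),
//	)
//	go sender.Do(records) // records is the stream's incoming record channel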
func NewSender(
	ctx context.Context,
	cancel context.CancelFunc,
	logger *observability.CoreLogger,
	settings *service.Settings,
	opts ...SenderOption,
) *Sender {

	sender := &Sender{
		ctx:            ctx,
		cancel:         cancel,
		settings:       settings,
		logger:         logger,
		summaryMap:     make(map[string]*service.SummaryItem),
		runConfig:      runconfig.New(),
		telemetry:      &service.TelemetryRecord{CoreVersion: version.Version},
		wgFileTransfer: sync.WaitGroup{},
	}
	if !settings.GetXOffline().GetValue() {
		baseURL, err := url.Parse(settings.GetBaseUrl().GetValue())
		if err != nil {
			logger.CaptureFatalAndPanic("sender: failed to parse base URL", err)
		}
		backend := api.New(api.BackendOptions{
			BaseURL: baseURL,
			Logger:  logger.Logger,
			APIKey:  settings.GetApiKey().GetValue(),
		})

		graphqlHeaders := map[string]string{
			"X-WANDB-USERNAME":   settings.GetUsername().GetValue(),
			"X-WANDB-USER-EMAIL": settings.GetEmail().GetValue(),
		}
		maps.Copy(graphqlHeaders, settings.GetXExtraHttpHeaders().GetValue())

		graphqlClient := backend.NewClient(api.ClientOptions{
			RetryPolicy:     clients.CheckRetry,
			RetryMax:        int(settings.GetXGraphqlRetryMax().GetValue()),
			RetryWaitMin:    clients.SecondsToDuration(settings.GetXGraphqlRetryWaitMinSeconds().GetValue()),
			RetryWaitMax:    clients.SecondsToDuration(settings.GetXGraphqlRetryWaitMaxSeconds().GetValue()),
			NonRetryTimeout: clients.SecondsToDuration(settings.GetXGraphqlTimeoutSeconds().GetValue()),
			ExtraHeaders:    graphqlHeaders,
		})
		url := fmt.Sprintf("%s/graphql", settings.GetBaseUrl().GetValue())
		sender.graphqlClient = graphql.NewClient(url, graphqlClient)

		fileStreamHeaders := map[string]string{}
		if settings.GetXShared().GetValue() {
			fileStreamHeaders["X-WANDB-USE-ASYNC-FILESTREAM"] = "true"
		}

		fileStreamRetryClient := backend.NewClient(api.ClientOptions{
			RetryMax:        int(settings.GetXFileStreamRetryMax().GetValue()),
			RetryWaitMin:    clients.SecondsToDuration(settings.GetXFileStreamRetryWaitMinSeconds().GetValue()),
			RetryWaitMax:    clients.SecondsToDuration(settings.GetXFileStreamRetryWaitMaxSeconds().GetValue()),
			NonRetryTimeout: clients.SecondsToDuration(settings.GetXFileStreamTimeoutSeconds().GetValue()),
			ExtraHeaders:    fileStreamHeaders,
		})

		sender.fileStream = fs.NewFileStream(
			fs.WithSettings(settings),
			fs.WithLogger(logger),
			fs.WithAPIClient(fileStreamRetryClient),
			fs.WithClientId(shared.ShortID(32)),
		)

		fileTransferRetryClient := retryablehttp.NewClient()
		fileTransferRetryClient.Logger = logger
		fileTransferRetryClient.CheckRetry = clients.CheckRetry
		fileTransferRetryClient.RetryMax = int(settings.GetXFileTransferRetryMax().GetValue())
		fileTransferRetryClient.RetryWaitMin = clients.SecondsToDuration(settings.GetXFileTransferRetryWaitMinSeconds().GetValue())
		fileTransferRetryClient.RetryWaitMax = clients.SecondsToDuration(settings.GetXFileTransferRetryWaitMaxSeconds().GetValue())
		fileTransferRetryClient.HTTPClient.Timeout = clients.SecondsToDuration(settings.GetXFileTransferTimeoutSeconds().GetValue())
		fileTransferRetryClient.Backoff = clients.ExponentialBackoffWithJitter

		defaultFileTransfer := filetransfer.NewDefaultFileTransfer(
			logger,
			fileTransferRetryClient,
		)
		sender.fileTransferManager = filetransfer.NewFileTransferManager(
			filetransfer.WithLogger(logger),
			filetransfer.WithSettings(settings),
			filetransfer.WithFileTransfer(defaultFileTransfer),
			filetransfer.WithFSCChan(sender.fileStream.GetInputChan()),
		)

		sender.getServerInfo()

		if !settings.GetDisableJobCreation().GetValue() {
			sender.jobBuilder = launch.NewJobBuilder(settings, logger)
		}
	}
	sender.configDebouncer = debounce.NewDebouncer(
		configDebouncerRateLimit,
		configDebouncerBurstSize,
		logger,
	)

	for _, opt := range opts {
		opt(sender)
	}

	return sender
}

// Do reads records from the input channel and sends them until the channel is closed.
func (s *Sender) Do(inChan <-chan *service.Record) {
	defer s.logger.Reraise()
	s.logger.Info("sender: started", "stream_id", s.settings.RunId)

	for record := range inChan {
		s.sendRecord(record)
		// TODO: reevaluate the logic here
		s.configDebouncer.Debounce(s.upsertConfig)
	}
	s.Close()
	s.logger.Info("sender: closed", "stream_id", s.settings.RunId)
}

func (s *Sender) Close() {
	// sender is done processing data, close our dispatch channel
	close(s.outChan)
}

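// GetOutboundChannel returns the channel on which results are dispatched.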
func (s *Sender) GetOutboundChannel() chan *service.Result {
	return s.outChan
}

func (s *Sender) SetGraphqlClient(client graphql.Client) {
	s.graphqlClient = client
}

func (s *Sender) SendRecord(record *service.Record) {
	// this is currently only used for testing
	s.sendRecord(record)
}

// sendRecord dispatches a record to the appropriate send method based on its type
func (s *Sender) sendRecord(record *service.Record) {
	s.logger.Debug("sender: sendRecord", "record", record, "stream_id", s.settings.RunId)
	switch x := record.RecordType.(type) {
	case *service.Record_Run:
		s.sendRun(record, x.Run)
	case *service.Record_Footer:
	case *service.Record_Header:
	case *service.Record_Final:
	case *service.Record_Exit:
		s.sendExit(record, x.Exit)
	case *service.Record_Alert:
		s.sendAlert(record, x.Alert)
	case *service.Record_Metric:
		s.sendMetric(record, x.Metric)
	case *service.Record_Files:
		s.sendFiles(record, x.Files)
	case *service.Record_History:
		s.sendHistory(record, x.History)
	case *service.Record_Summary:
		s.sendSummary(record, x.Summary)
	case *service.Record_Config:
		s.sendConfig(record, x.Config)
	case *service.Record_Stats:
		s.sendSystemMetrics(record, x.Stats)
	case *service.Record_OutputRaw:
		s.sendOutputRaw(record, x.OutputRaw)
	case *service.Record_Telemetry:
		s.sendTelemetry(record, x.Telemetry)
	case *service.Record_Preempting:
		s.sendPreempting(record)
	case *service.Record_Request:
		s.sendRequest(record, x.Request)
	case *service.Record_LinkArtifact:
		s.sendLinkArtifact(record)
	case *service.Record_UseArtifact:
		s.sendUseArtifact(record)
	case *service.Record_Artifact:
		s.sendArtifact(record, x.Artifact)
	case nil:
		err := fmt.Errorf("sender: sendRecord: nil RecordType")
		s.logger.CaptureFatalAndPanic("sender: sendRecord: nil RecordType", err)
	default:
		err := fmt.Errorf("sender: sendRecord: unexpected type %T", x)
		s.logger.CaptureFatalAndPanic("sender: sendRecord: unexpected type", err)
	}
}

// sendRequest dispatches a request to the appropriate send method based on its type
func (s *Sender) sendRequest(record *service.Record, request *service.Request) {

	switch x := request.RequestType.(type) {
	case *service.Request_RunStart:
		s.sendRunStart(x.RunStart)
	case *service.Request_NetworkStatus:
		s.sendNetworkStatusRequest(x.NetworkStatus)
	case *service.Request_Defer:
		s.sendDefer(x.Defer)
	case *service.Request_LogArtifact:
		s.sendLogArtifact(record, x.LogArtifact)
	case *service.Request_PollExit:
	case *service.Request_ServerInfo:
		s.sendServerInfo(record, x.ServerInfo)
	case *service.Request_DownloadArtifact:
		s.sendDownloadArtifact(record, x.DownloadArtifact)
	case *service.Request_Sync:
		s.sendSync(record, x.Sync)
	case *service.Request_SenderRead:
		s.sendSenderRead(record, x.SenderRead)
	case *service.Request_Cancel:
		// TODO: audit this
	case nil:
		err := fmt.Errorf("sender: sendRequest: nil RequestType")
		s.logger.CaptureFatalAndPanic("sender: sendRequest: nil RequestType", err)
	default:
		err := fmt.Errorf("sender: sendRequest: unexpected type %T", x)
		s.logger.CaptureFatalAndPanic("sender: sendRequest: unexpected type", err)
	}
}

// updateSettings updates the settings from the run record upon run start,
// using the information returned by the server.
func (s *Sender) updateSettings() {
	if s.settings == nil || s.RunRecord == nil {
		return
	}

	if s.settings.XStartTime == nil && s.RunRecord.StartTime != nil {
		startTime := float64(s.RunRecord.StartTime.Seconds) + float64(s.RunRecord.StartTime.Nanos)/1e9
		s.settings.XStartTime = &wrapperspb.DoubleValue{Value: startTime}
	}

	// TODO: verify that this is the correct update logic
	if s.RunRecord.GetEntity() != "" {
		s.settings.Entity = &wrapperspb.StringValue{Value: s.RunRecord.Entity}
	}
	if s.RunRecord.GetProject() != "" && s.settings.Project == nil {
		s.settings.Project = &wrapperspb.StringValue{Value: s.RunRecord.Project}
	}
	if s.RunRecord.GetDisplayName() != "" && s.settings.RunName == nil {
		s.settings.RunName = &wrapperspb.StringValue{Value: s.RunRecord.DisplayName}
	}
}

// sendRunStart starts up all the resources for a run
func (s *Sender) sendRunStart(_ *service.RunStartRequest) {
	fsPath := fmt.Sprintf(
		"files/%s/%s/%s/file_stream",
		s.RunRecord.Entity,
		s.RunRecord.Project,
		s.RunRecord.RunId,
	)

	fs.WithPath(fsPath)(s.fileStream)
	fs.WithOffsets(s.resumeState.GetFileStreamOffset())(s.fileStream)

	s.updateSettings()
	s.fileStream.Start()
	s.fileTransferManager.Start()
}

func (s *Sender) sendNetworkStatusRequest(_ *service.NetworkStatusRequest) {
}

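// sendJobFlush builds a job artifact from the current run config and summary
// and saves it to the server.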
func (s *Sender) sendJobFlush() {
	if s.jobBuilder == nil {
		return
	}
	input := s.runConfig.Tree()
	output := make(map[string]interface{})

	var out interface{}
	for k, v := range s.summaryMap {
		bytes := []byte(v.GetValueJson())
		err := json.Unmarshal(bytes, &out)
		if err != nil {
			s.logger.Error("sender: sendDefer: failed to unmarshal summary", "error", err)
			return
		}
		output[k] = out
	}

	artifact, err := s.jobBuilder.Build(input, output)
	if err != nil {
		s.logger.Error("sender: sendDefer: failed to build job artifact", "error", err)
		return
	}
	if artifact == nil {
		s.logger.Info("sender: sendDefer: no job artifact to save")
		return
	}
	saver := artifacts.NewArtifactSaver(
		s.ctx, s.graphqlClient, s.fileTransferManager, artifact, 0, "",
	)
	if _, err = saver.Save(s.fwdChan); err != nil {
		s.logger.Error("sender: sendDefer: failed to save job artifact", "error", err)
	}
}

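// sendDefer drives the defer state machine: it performs the flush work for the
// current state, advances the state, and forwards the request back through the
// loopback channel until the END state cancels the stream.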
func (s *Sender) sendDefer(request *service.DeferRequest) {
	switch request.State {
	case service.DeferRequest_BEGIN:
		request.State++
		s.sendRequestDefer(request)
	case service.DeferRequest_FLUSH_RUN:
		request.State++
		s.sendRequestDefer(request)
	case service.DeferRequest_FLUSH_STATS:
		request.State++
		s.sendRequestDefer(request)
	case service.DeferRequest_FLUSH_PARTIAL_HISTORY:
		request.State++
		s.sendRequestDefer(request)
	case service.DeferRequest_FLUSH_TB:
		request.State++
		s.sendRequestDefer(request)
	case service.DeferRequest_FLUSH_SUM:
		request.State++
		s.sendRequestDefer(request)
	case service.DeferRequest_FLUSH_DEBOUNCER:
		s.configDebouncer.Flush(s.upsertConfig)
		s.writeAndSendConfigFile()
		request.State++
		s.sendRequestDefer(request)
	case service.DeferRequest_FLUSH_OUTPUT:
		request.State++
		s.sendRequestDefer(request)
	case service.DeferRequest_FLUSH_JOB:
		s.sendJobFlush()
		request.State++
		s.sendRequestDefer(request)
	case service.DeferRequest_FLUSH_DIR:
		request.State++
		s.sendRequestDefer(request)
	case service.DeferRequest_FLUSH_FP:
		s.wgFileTransfer.Wait()
		s.fileTransferManager.Close()
		request.State++
		s.sendRequestDefer(request)
	case service.DeferRequest_JOIN_FP:
		request.State++
		s.sendRequestDefer(request)
	case service.DeferRequest_FLUSH_FS:
		s.fileStream.Close()
		request.State++
		s.sendRequestDefer(request)
	case service.DeferRequest_FLUSH_FINAL:
		request.State++
		s.sendRequestDefer(request)
	case service.DeferRequest_END:
		request.State++
		s.syncService.Flush()
		s.respondExit(s.exitRecord)
		// cancel tells the stream to close the loopback channel
		s.cancel()
	default:
		err := fmt.Errorf("sender: sendDefer: unexpected state %v", request.State)
		s.logger.CaptureFatalAndPanic("sender: sendDefer: unexpected state", err)
	}
}

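// sendRequestDefer forwards a defer request to the handler via the loopback channel.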
func (s *Sender) sendRequestDefer(request *service.DeferRequest) {
	rec := &service.Record{
		RecordType: &service.Record_Request{Request: &service.Request{
			RequestType: &service.Request_Defer{Defer: request},
		}},
		Control: &service.Control{AlwaysSend: true},
	}
	s.fwdChan <- rec
}

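// sendTelemetry merges the telemetry record into the sender's telemetry and
// triggers a config update.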
func (s *Sender) sendTelemetry(_ *service.Record, telemetry *service.TelemetryRecord) {
	proto.Merge(s.telemetry, telemetry)
	s.updateConfigPrivate()
	// TODO(perf): improve when debounce config is added, for now this sends all the time
	s.sendConfig(nil, nil /*configRecord*/)
}

func (s *Sender) sendPreempting(record *service.Record) {
	s.fileStream.StreamRecord(record)
}

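// sendLinkArtifact links an artifact via the ArtifactLinker and sends back a result.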
func (s *Sender) sendLinkArtifact(record *service.Record) {
	linker := artifacts.ArtifactLinker{
		Ctx:           s.ctx,
		Logger:        s.logger,
		LinkArtifact:  record.GetLinkArtifact(),
		GraphqlClient: s.graphqlClient,
	}
	err := linker.Link()
	if err != nil {
		s.logger.CaptureFatalAndPanic("sender: sendLinkArtifact: link failure", err)
	}

	result := &service.Result{
		Control: record.Control,
		Uuid:    record.Uuid,
	}
	s.outChan <- result
}

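// sendUseArtifact forwards a use-artifact record to the job builder.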
func (s *Sender) sendUseArtifact(record *service.Record) {
	if s.jobBuilder == nil {
		s.logger.Warn("sender: sendUseArtifact: job builder disabled, skipping")
		return
	}
	s.jobBuilder.HandleUseArtifactRecord(record)
}

// Applies the change record to the run configuration.
func (s *Sender) updateConfig(configRecord *service.ConfigRecord) {
	s.runConfig.ApplyChangeRecord(configRecord, func(err error) {
		s.logger.CaptureError("Error updating run config", err)
	})
}

// Inserts W&B-internal information into the run configuration.
//
// Uses the sender's telemetry and any defined metrics.
func (s *Sender) updateConfigPrivate() {
	metrics := []map[int]interface{}(nil)
	if s.metricSender != nil {
		metrics = s.metricSender.configMetrics
	}

	s.runConfig.AddTelemetryAndMetrics(s.telemetry, metrics)
}

// Serializes the run configuration to send to the backend.
func (s *Sender) serializeConfig(format runconfig.ConfigFormat) string {
	serializedConfig, err := s.runConfig.Serialize(format)

	if err != nil {
		err = fmt.Errorf("failed to marshal config: %s", err)
		s.logger.CaptureFatalAndPanic("sender: sendRun: ", err)
	}

	return string(serializedConfig)
}

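// sendRunResult sends a run update result to the dispatcher.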
func (s *Sender) sendRunResult(record *service.Record, runResult *service.RunUpdateResult) {
	result := &service.Result{
		ResultType: &service.Result_RunResult{
			RunResult: runResult,
		},
		Control: record.Control,
		Uuid:    record.Uuid,
	}
	s.outChan <- result
}

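// checkAndUpdateResumeState queries the server for the run's resume status and
// updates the resume state; it reports an error to the client if resume was
// requested but the status could not be fetched or applied.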
func (s *Sender) checkAndUpdateResumeState(record *service.Record) error {
	if s.graphqlClient == nil {
		return nil
	}
	// There was no resume status set, so we don't need to do anything
	if s.settings.GetResume().GetValue() == "" {
		return nil
	}

	// init resume state if it doesn't exist
	s.resumeState = NewResumeState(s.logger, s.settings.GetResume().GetValue())
	run := s.RunRecord
	// If we couldn't get the resume status, we should fail if resume is set
	data, err := gql.RunResumeStatus(s.ctx, s.graphqlClient, &run.Project, utils.NilIfZero(run.Entity), run.RunId)
	if err != nil {
		err = fmt.Errorf("failed to get run resume status: %s", err)
		s.logger.Error("sender:", "error", err)
		result := &service.RunUpdateResult{
			Error: &service.ErrorInfo{
				Message: err.Error(),
				Code:    service.ErrorInfo_COMMUNICATION,
			}}
		s.sendRunResult(record, result)
		return err
	}

	if result, err := s.resumeState.Update(
		data,
		s.RunRecord,
		s.runConfig,
	); err != nil {
		s.sendRunResult(record, result)
		return err
	}

	return nil
}

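// sendRun upserts the run on the server, updates the local run record from the
// server response, and replies to the client if a response was requested.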
func (s *Sender) sendRun(record *service.Record, run *service.RunRecord) {
	if s.graphqlClient != nil {
		// The first run record sent by the client is encoded incorrectly,
		// causing it to overwrite the entire "_wandb" config key rather than
		// just the necessary part ("_wandb/code_path"). This can overwrite
		// the config from a resumed run, so we have to do this first.
		//
		// Logically, it would make more sense to instead start with the
		// resumed config and apply updates on top of it.
		s.updateConfig(run.Config)
		proto.Merge(s.telemetry, run.Telemetry)
		s.updateConfigPrivate()

		if s.RunRecord == nil {
			var ok bool
			s.RunRecord, ok = proto.Clone(run).(*service.RunRecord)
			if !ok {
				err := fmt.Errorf("failed to clone RunRecord")
				s.logger.CaptureFatalAndPanic("sender: sendRun: ", err)
			}

			if err := s.checkAndUpdateResumeState(record); err != nil {
				s.logger.Error("sender: sendRun: failed to checkAndUpdateResumeState", "error", err)
				return
			}
		}

		config := s.serializeConfig(runconfig.FormatJson)

		var tags []string
		tags = append(tags, run.Tags...)

		var commit, repo string
		git := run.GetGit()
		if git != nil {
			commit = git.GetCommit()
			repo = git.GetRemoteUrl()
		}

		program := s.settings.GetProgram().GetValue()
		// derive a new context from the parent context with an additional value
		// that passes the retry policy to the graphql client
		ctx := context.WithValue(s.ctx, clients.CtxRetryPolicyKey, clients.UpsertBucketRetryPolicy)
		data, err := gql.UpsertBucket(
			ctx,                              // ctx
			s.graphqlClient,                  // client
			nil,                              // id
			&run.RunId,                       // name
			utils.NilIfZero(run.Project),     // project
			utils.NilIfZero(run.Entity),      // entity
			utils.NilIfZero(run.RunGroup),    // groupName
			nil,                              // description
			utils.NilIfZero(run.DisplayName), // displayName
			utils.NilIfZero(run.Notes),       // notes
			utils.NilIfZero(commit),          // commit
			&config,                          // config
			utils.NilIfZero(run.Host),        // host
			nil,                              // debug
			utils.NilIfZero(program),         // program
			utils.NilIfZero(repo),            // repo
			utils.NilIfZero(run.JobType),     // jobType
			nil,                              // state
			utils.NilIfZero(run.SweepId),     // sweep
			tags,                             // tags []string,
			nil,                              // summaryMetrics
		)
		if err != nil {
			err = fmt.Errorf("failed to upsert bucket: %s", err)
			s.logger.Error("sender: sendRun:", "error", err)
			// TODO(run update): handle error communication back to the client
			fmt.Println("ERROR: failed to upsert bucket", err.Error())
			// TODO(sync): make this more robust in case of a failed UpsertBucket request.
			//  Need to inform the sync service that this op failed.
			if record.GetControl().GetReqResp() || record.GetControl().GetMailboxSlot() != "" {
				result := &service.Result{
					ResultType: &service.Result_RunResult{
						RunResult: &service.RunUpdateResult{
							Error: &service.ErrorInfo{
								Message: err.Error(),
								Code:    service.ErrorInfo_COMMUNICATION,
							},
						},
					},
					Control: record.Control,
					Uuid:    record.Uuid,
				}
				s.outChan <- result
			}
			return
		}

		bucket := data.GetUpsertBucket().GetBucket()
		project := bucket.GetProject()
		entity := project.GetEntity()
		s.RunRecord.StorageId = bucket.GetId()
		// s.RunRecord.RunId = bucket.GetName()
		s.RunRecord.DisplayName = utils.ZeroIfNil(bucket.GetDisplayName())
		s.RunRecord.Project = project.GetName()
		s.RunRecord.Entity = entity.GetName()
		s.RunRecord.SweepId = utils.ZeroIfNil(bucket.GetSweepName())
	}

	if record.GetControl().GetReqResp() || record.GetControl().GetMailboxSlot() != "" {
		runResult := s.RunRecord
		if runResult == nil {
			runResult = run
		}
		result := &service.Result{
			ResultType: &service.Result_RunResult{
				RunResult: &service.RunUpdateResult{Run: runResult},
			},
			Control: record.Control,
			Uuid:    record.Uuid,
		}
		s.outChan <- result
	}
}

// sendHistory sends a history record to the file stream,
// which will then send it to the server
func (s *Sender) sendHistory(record *service.Record, _ *service.HistoryRecord) {
	s.fileStream.StreamRecord(record)
}

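// sendSummary updates the in-memory summary and streams the full summary to the file stream.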
func (s *Sender) sendSummary(_ *service.Record, summary *service.SummaryRecord) {
	// TODO(network): buffer summary sending for network efficiency until we can send only updates
	// TODO(compat): handle deletes, nested keys
	// TODO(compat): write summary file

	// track each key in the in-memory summary store
	// TODO(memory): avoid keeping summary for all distinct keys
	for _, item := range summary.Update {
		s.summaryMap[item.Key] = item
	}

	// build list of summary items from the map
	var summaryItems []*service.SummaryItem
	for _, v := range s.summaryMap {
		summaryItems = append(summaryItems, v)
	}

	// build a full summary record to send
	record := &service.Record{
		RecordType: &service.Record_Summary{
			Summary: &service.SummaryRecord{
				Update: summaryItems,
			},
		},
	}

	s.fileStream.StreamRecord(record)
}

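// upsertConfig uploads the current run config to the server via an UpsertBucket mutation.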
func (s *Sender) upsertConfig() {
	if s.graphqlClient == nil {
		return
	}
	config := s.serializeConfig(runconfig.FormatJson)

	ctx := context.WithValue(s.ctx, clients.CtxRetryPolicyKey, clients.UpsertBucketRetryPolicy)
	_, err := gql.UpsertBucket(
		ctx,                                  // ctx
		s.graphqlClient,                      // client
		nil,                                  // id
		&s.RunRecord.RunId,                   // name
		utils.NilIfZero(s.RunRecord.Project), // project
		utils.NilIfZero(s.RunRecord.Entity),  // entity
		nil,                                  // groupName
		nil,                                  // description
		nil,                                  // displayName
		nil,                                  // notes
		nil,                                  // commit
		&config,                              // config
		nil,                                  // host
		nil,                                  // debug
		nil,                                  // program
		nil,                                  // repo
		nil,                                  // jobType
		nil,                                  // state
		nil,                                  // sweep
		nil,                                  // tags []string,
		nil,                                  // summaryMetrics
	)
	if err != nil {
		s.logger.Error("sender: sendConfig:", "error", err)
	}
}

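// writeAndSendConfigFile writes the run config to the config file in the run's
// files directory and forwards a files record so that it gets uploaded.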
func (s *Sender) writeAndSendConfigFile() {
	if s.settings.GetXSync().GetValue() {
		// if sync is enabled, we don't need to do all this
		return
	}

	config := s.serializeConfig(runconfig.FormatYaml)
	configFile := filepath.Join(s.settings.GetFilesDir().GetValue(), ConfigFileName)
	if err := os.WriteFile(configFile, []byte(config), 0644); err != nil {
		s.logger.Error("sender: writeAndSendConfigFile: failed to write config file", "error", err)
	}

	record := &service.Record{
		RecordType: &service.Record_Files{
			Files: &service.FilesRecord{
				Files: []*service.FilesItem{
					{
						Path: ConfigFileName,
						Type: service.FilesItem_WANDB,
					},
				},
			},
		},
	}
	s.fwdChan <- record
}

// sendConfig updates the in-memory config and marks it to be sent to the server
// via a debounced upsertBucket mutation
func (s *Sender) sendConfig(_ *service.Record, configRecord *service.ConfigRecord) {
	if configRecord != nil {
		s.updateConfig(configRecord)
	}
	s.configDebouncer.SetNeedsDebounce()
}

// sendSystemMetrics sends a system metrics record via the file stream
func (s *Sender) sendSystemMetrics(record *service.Record, _ *service.StatsRecord) {
	s.fileStream.StreamRecord(record)
}

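// sendOutputRaw appends console output to the local output file and streams it
// to the file stream.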
func (s *Sender) sendOutputRaw(record *service.Record, _ *service.OutputRawRecord) {
	// TODO: match logic handling of lines to the one in the python version
	// - handle carriage returns (for tqdm-like progress bars)
	// - handle caching multiple (non-new lines) and sending them in one chunk
	// - handle lines longer than ~60_000 characters

	// copy the record to avoid mutating the original
	recordCopy := proto.Clone(record).(*service.Record)
	outputRaw := recordCopy.GetOutputRaw()

	// ignore empty "new lines"
	if outputRaw.Line == "\n" {
		return
	}

	outputFile := filepath.Join(s.settings.GetFilesDir().GetValue(), OutputFileName)
	// append the line to the local output file (best-effort; streaming continues even if this fails)
	f, err := os.OpenFile(outputFile, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
	if err != nil {
		s.logger.Error("sender: sendOutputRaw: failed to open output file", "error", err)
	} else {
		defer func() {
			if err := f.Close(); err != nil {
				s.logger.Error("sender: sendOutputRaw: failed to close output file", "error", err)
			}
		}()
		if _, err := f.WriteString(outputRaw.Line + "\n"); err != nil {
			s.logger.Error("sender: sendOutputRaw: failed to write to output file", "error", err)
		}
	}

	// generate a timestamp compatible with python's ISO format (microseconds, without the trailing Z)
	t := strings.TrimSuffix(time.Now().UTC().Format(RFC3339Micro), "Z")
	outputRaw.Line = fmt.Sprintf("%s %s", t, outputRaw.Line)
	if outputRaw.OutputType == service.OutputRawRecord_STDERR {
		outputRaw.Line = fmt.Sprintf("ERROR %s", outputRaw.Line)
	}
	s.fileStream.StreamRecord(recordCopy)
}

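// sendAlert sends an alert to the server via the NotifyScriptableRunAlert mutation.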
func (s *Sender) sendAlert(_ *service.Record, alert *service.AlertRecord) {
	if s.graphqlClient == nil {
		return
	}

	if s.RunRecord == nil {
		err := fmt.Errorf("sender: sendAlert: RunRecord not set")
		s.logger.CaptureFatalAndPanic("sender received error", err)
	}
	// TODO: handle invalid alert levels
	severity := gql.AlertSeverity(alert.Level)

	data, err := gql.NotifyScriptableRunAlert(
		s.ctx,
		s.graphqlClient,
		s.RunRecord.Entity,
		s.RunRecord.Project,
		s.RunRecord.RunId,
		alert.Title,
		alert.Text,
		&severity,
		&alert.WaitDuration,
	)
	if err != nil {
		err = fmt.Errorf("sender: sendAlert: failed to notify scriptable run alert: %s", err)
		s.logger.CaptureError("sender received error", err)
	} else {
		s.logger.Info("sender: sendAlert: notified scriptable run alert", "data", data)
	}

}

// respondExit is called at the end of the defer state machine
func (s *Sender) respondExit(record *service.Record) {
	if record == nil || s.settings.GetXSync().GetValue() {
		return
	}
	if record.Control.ReqResp || record.Control.MailboxSlot != "" {
		result := &service.Result{
			ResultType: &service.Result_ExitResult{ExitResult: &service.RunExitResult{}},
			Control:    record.Control,
			Uuid:       record.Uuid,
		}
		s.outChan <- result
	}
}

// sendExit sends an exit record to the server and triggers the shutdown of the stream
func (s *Sender) sendExit(record *service.Record, _ *service.RunExitRecord) {
	// response is done by respondExit() and called when defer state machine is complete
	s.exitRecord = record

	s.fileStream.StreamRecord(record)

	// send a defer request to the handler to indicate that the user requested to finish the stream
	// and the defer state machine can kick in triggering the shutdown process
	request := &service.Request{RequestType: &service.Request_Defer{
		Defer: &service.DeferRequest{State: service.DeferRequest_BEGIN}},
	}
	if record.Control == nil {
		record.Control = &service.Control{AlwaysSend: true}
	}

	rec := &service.Record{
		RecordType: &service.Record_Request{Request: request},
		Control:    record.Control,
		Uuid:       record.Uuid,
	}
	s.fwdChan <- rec
}

// sendMetric sends a metrics record to the file stream,
// which will then send it to the server
func (s *Sender) sendMetric(record *service.Record, metric *service.MetricRecord) {
	if s.metricSender == nil {
		s.metricSender = NewMetricSender()
	}

	if metric.GetGlobName() != "" {
		s.logger.Warn("sender: sendMetric: glob name is not supported in the backend", "globName", metric.GetGlobName())
		return
	}

	s.encodeMetricHints(record, metric)
	s.updateConfigPrivate()
	s.sendConfig(nil, nil /*configRecord*/)
}

// sendFiles iterates over the files in the FilesRecord and uploads them to the server
func (s *Sender) sendFiles(_ *service.Record, filesRecord *service.FilesRecord) {
	files := filesRecord.GetFiles()
	for _, file := range files {
		if strings.HasPrefix(file.GetPath(), "media") {
			file.Type = service.FilesItem_MEDIA
		}
		s.wgFileTransfer.Add(1)
		go func(file *service.FilesItem) {
			s.sendFile(file)
			s.wgFileTransfer.Done()
		}(file)
	}
}

// sendFile sends a file to the server
// TODO: improve this to handle multiple files and send them in one request
func (s *Sender) sendFile(file *service.FilesItem) {
	if s.graphqlClient == nil || s.fileTransferManager == nil {
		return
	}

	if s.RunRecord == nil {
		err := fmt.Errorf("sender: sendFile: RunRecord not set")
		s.logger.CaptureFatalAndPanic("sender received error", err)
	}

	fullPath := filepath.Join(s.settings.GetFilesDir().GetValue(), file.GetPath())
	if _, err := os.Stat(fullPath); os.IsNotExist(err) {
		s.logger.Warn("sender: sendFile: file does not exist", "path", fullPath)
		return
	}

	data, err := gql.CreateRunFiles(
		s.ctx,
		s.graphqlClient,
		s.RunRecord.Entity,
		s.RunRecord.Project,
		s.RunRecord.RunId,
		[]string{file.GetPath()},
	)
	if err != nil {
		err = fmt.Errorf("sender: sendFile: failed to get upload urls: %s", err)
		s.logger.CaptureError("sender received error", err)
		return
	}
	headers := data.GetCreateRunFiles().GetUploadHeaders()
	for _, f := range data.GetCreateRunFiles().GetFiles() {
		fullPath := filepath.Join(s.settings.GetFilesDir().GetValue(), f.Name)
		task := &filetransfer.Task{
			Type:    filetransfer.UploadTask,
			Path:    fullPath,
			Name:    f.Name,
			Url:     *f.UploadUrl,
			Headers: headers,
		}

		task.SetProgressCallback(
			func(processed, total int) {
				if processed == 0 {
					return
				}
				record := &service.Record{
					RecordType: &service.Record_Request{
						Request: &service.Request{
							RequestType: &service.Request_FileTransferInfo{
								FileTransferInfo: &service.FileTransferInfoRequest{
									Type:      service.FileTransferInfoRequest_Upload,
									Path:      fullPath,
									Size:      int64(total),
									Processed: int64(processed),
								},
							},
						},
					},
				}
				s.fwdChan <- record
			},
		)
		task.SetCompletionCallback(
			func(t *filetransfer.Task) {
				s.fileTransferManager.FileStreamCallback(t)
				fileCounts := &service.FileCounts{}
				switch file.GetType() {
				case service.FilesItem_MEDIA:
					fileCounts.MediaCount = 1
				case service.FilesItem_OTHER:
					fileCounts.OtherCount = 1
				case service.FilesItem_WANDB:
					fileCounts.WandbCount = 1
				}

				record := &service.Record{
					RecordType: &service.Record_Request{
						Request: &service.Request{
							RequestType: &service.Request_FileTransferInfo{
								FileTransferInfo: &service.FileTransferInfoRequest{
									Type:       service.FileTransferInfoRequest_Upload,
									Path:       fullPath,
									Size:       t.Size,
									Processed:  t.Size,
									FileCounts: fileCounts,
								},
							},
						},
					},
				}
				s.fwdChan <- record
			},
		)
		s.fileTransferManager.AddTask(task)
	}
}

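// sendArtifact saves an artifact record to the server.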
func (s *Sender) sendArtifact(record *service.Record, msg *service.ArtifactRecord) {
	saver := artifacts.NewArtifactSaver(
		s.ctx, s.graphqlClient, s.fileTransferManager, msg, 0, "",
	)
	artifactID, err := saver.Save(s.fwdChan)
	if err != nil {
		err = fmt.Errorf("sender: sendArtifact: failed to log artifact ID: %s; error: %s", artifactID, err)
		s.logger.Error("sender: sendArtifact:", "error", err)
		return
	}
}

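// sendLogArtifact saves an artifact and responds with its ID or an error message.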
func (s *Sender) sendLogArtifact(record *service.Record, msg *service.LogArtifactRequest) {
	var response service.LogArtifactResponse
	saver := artifacts.NewArtifactSaver(
		s.ctx, s.graphqlClient, s.fileTransferManager, msg.Artifact, msg.HistoryStep, msg.StagingDir,
	)
	artifactID, err := saver.Save(s.fwdChan)
	if err != nil {
		response.ErrorMessage = err.Error()
	} else {
		response.ArtifactId = artifactID
	}

	result := &service.Result{
		ResultType: &service.Result_Response{
			Response: &service.Response{
				ResponseType: &service.Response_LogArtifactResponse{
					LogArtifactResponse: &response,
				},
			},
		},
		Control: record.Control,
		Uuid:    record.Uuid,
	}
	s.jobBuilder.HandleLogArtifactResult(&response, msg.Artifact)
	s.outChan <- result
}

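// sendDownloadArtifact downloads an artifact and responds with the result.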
func (s *Sender) sendDownloadArtifact(record *service.Record, msg *service.DownloadArtifactRequest) {
	// TODO: this should be handled by a separate service startup mechanism
	s.fileTransferManager.Start()

	var response service.DownloadArtifactResponse
	downloader := artifacts.NewArtifactDownloader(s.ctx, s.graphqlClient, s.fileTransferManager, msg.ArtifactId, msg.DownloadRoot, &msg.AllowMissingReferences)
	err := downloader.Download()
	if err != nil {
		s.logger.CaptureError("senderError: downloadArtifact: failed to download artifact: %v", err)
		response.ErrorMessage = err.Error()
	}

	result := &service.Result{
		ResultType: &service.Result_Response{
			Response: &service.Response{
				ResponseType: &service.Response_DownloadArtifactResponse{
					DownloadArtifactResponse: &response,
				},
			},
		},
		Control: record.Control,
		Uuid:    record.Uuid,
	}
	s.outChan <- result
}

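// sendSync starts the sync service and forwards a sender-read request so that
// records are replayed from the sync file.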
func (s *Sender) sendSync(record *service.Record, request *service.SyncRequest) {

	s.syncService = NewSyncService(s.ctx,
		WithSyncServiceLogger(s.logger),
		WithSyncServiceSenderFunc(s.sendRecord),
		WithSyncServiceOverwrite(request.GetOverwrite()),
		WithSyncServiceSkip(request.GetSkip()),
		WithSyncServiceFlushCallback(func(err error) {
			var errorInfo *service.ErrorInfo
			if err != nil {
				errorInfo = &service.ErrorInfo{
					Message: err.Error(),
					Code:    service.ErrorInfo_UNKNOWN,
				}
			}

			var url string
			if s.RunRecord != nil {
				baseUrl := s.settings.GetBaseUrl().GetValue()
				baseUrl = strings.Replace(baseUrl, "api.", "", 1)
				url = fmt.Sprintf("%s/%s/%s/runs/%s", baseUrl, s.RunRecord.Entity, s.RunRecord.Project, s.RunRecord.RunId)
			}
			result := &service.Result{
				ResultType: &service.Result_Response{
					Response: &service.Response{
						ResponseType: &service.Response_SyncResponse{
							SyncResponse: &service.SyncResponse{
								Url:   url,
								Error: errorInfo,
							},
						},
					},
				},
				Control: record.Control,
				Uuid:    record.Uuid,
			}
			s.outChan <- result
		}),
	)
	s.syncService.Start()

	rec := &service.Record{
		RecordType: &service.Record_Request{
			Request: &service.Request{
				RequestType: &service.Request_SenderRead{
					SenderRead: &service.SenderReadRequest{
						StartOffset: request.GetStartOffset(),
						FinalOffset: request.GetFinalOffset(),
					},
				},
			},
		},
		Control: record.Control,
		Uuid:    record.Uuid,
	}
	s.fwdChan <- rec
}

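// sendSenderRead reads records back from the store and either hands them to the
// sync service or re-sends them.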
func (s *Sender) sendSenderRead(record *service.Record, request *service.SenderReadRequest) {
	if s.store == nil {
		store := NewStore(s.ctx, s.settings.GetSyncFile().GetValue(), s.logger)
		err := store.Open(os.O_RDONLY)
		if err != nil {
			s.logger.CaptureError("sender: sendSenderRead: failed to create store", err)
			return
		}
		s.store = store
	}
	// TODO:
	// 1. seek to startOffset
	//
	// if err := s.store.reader.SeekRecord(request.GetStartOffset()); err != nil {
	// 	s.logger.CaptureError("sender: sendSenderRead: failed to seek record", err)
	// 	return
	// }
	// 2. read records until finalOffset
	//
	for {
		record, err := s.store.Read()
		if s.settings.GetXSync().GetValue() {
			s.syncService.SyncRecord(record, err)
		} else if record != nil {
			s.sendRecord(record)
		}
		if err == io.EOF {
			return
		}
		if err != nil {
			s.logger.CaptureError("sender: sendSenderRead: failed to read record", err)
			return
		}
	}
}

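// getServerInfo queries the server for its info and caches it on the sender.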
func (s *Sender) getServerInfo() {
	if s.graphqlClient == nil {
		return
	}

	data, err := gql.ServerInfo(s.ctx, s.graphqlClient)
	if err != nil {
		err = fmt.Errorf("sender: getServerInfo: failed to get server info: %s", err)
		s.logger.CaptureError("sender received error", err)
		return
	}
	s.serverInfo = data.GetServerInfo()

	s.logger.Info("sender: getServerInfo: got server info", "serverInfo", s.serverInfo)
}

// TODO: this function is for deciding which GraphQL query/mutation versions to use
// func (s *Sender) getServerVersion() string {
// 	if s.serverInfo == nil {
// 		return ""
// 	}
// 	return s.serverInfo.GetLatestLocalVersionInfo().GetVersionOnThisInstanceString()
// }

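// sendServerInfo responds with the cached server info (latest local version and
// out-of-date flag).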
func (s *Sender) sendServerInfo(record *service.Record, _ *service.ServerInfoRequest) {

	localInfo := &service.LocalInfo{}
	if s.serverInfo != nil && s.serverInfo.GetLatestLocalVersionInfo() != nil {
		localInfo = &service.LocalInfo{
			Version:   s.serverInfo.GetLatestLocalVersionInfo().GetLatestVersionString(),
			OutOfDate: s.serverInfo.GetLatestLocalVersionInfo().GetOutOfDate(),
		}
	}

	result := &service.Result{
		ResultType: &service.Result_Response{
			Response: &service.Response{
				ResponseType: &service.Response_ServerInfoResponse{
					ServerInfoResponse: &service.ServerInfoResponse{
						LocalInfo: localInfo,
					},
				},
			},
		},
		Control: record.Control,
		Uuid:    record.Uuid,
	}
	s.outChan <- result
}