wandb
1package server_test
2
3import (
4"testing"
5
6"github.com/wandb/wandb/core/pkg/service"
7)
8
// data is the compact, test-friendly description of a record. It is used
// both to describe records fed into the handler and to summarise records
// read back from it (see makeOutput).
type data struct {
	// items maps each history key to its JSON-encoded value.
	items map[string]string
	// step is the history step number.
	step int64
	// flush is the requested flush action on input; on output it is true
	// when the record is a FLUSH_PARTIAL_HISTORY defer request.
	flush bool
	// stepNil and flushNil, when true, make makePartialHistoryRecord leave
	// the request's Step and Action fields unset, respectively.
	stepNil, flushNil bool
}
16
17func makeFlushRecord() *service.Record {
18record := &service.Record{
19RecordType: &service.Record_Request{
20Request: &service.Request{
21RequestType: &service.Request_Defer{
22Defer: &service.DeferRequest{
23State: service.DeferRequest_FLUSH_PARTIAL_HISTORY,
24},
25},
26},
27},
28}
29return record
30}
31
32func makePartialHistoryRecord(d data) *service.Record {
33items := []*service.HistoryItem{}
34for k, v := range d.items {
35items = append(items, &service.HistoryItem{
36Key: k,
37ValueJson: v,
38})
39}
40partialHistoryRequest := &service.PartialHistoryRequest{
41Item: items,
42}
43if !d.stepNil {
44partialHistoryRequest.Step = &service.HistoryStep{Num: d.step}
45}
46if !d.flushNil {
47partialHistoryRequest.Action = &service.HistoryAction{Flush: d.flush}
48}
49record := &service.Record{
50RecordType: &service.Record_Request{
51Request: &service.Request{
52RequestType: &service.Request_PartialHistory{
53PartialHistory: partialHistoryRequest,
54},
55},
56},
57Control: &service.Control{
58MailboxSlot: "junk",
59},
60}
61return record
62}
63
64func makeHistoryRecord(d data) *service.Record {
65items := []*service.HistoryItem{}
66for k, v := range d.items {
67items = append(items, &service.HistoryItem{
68Key: k,
69ValueJson: v,
70})
71}
72history := &service.HistoryRecord{
73Item: items,
74Step: &service.HistoryStep{Num: d.step},
75}
76record := &service.Record{
77RecordType: &service.Record_History{
78History: history,
79},
80Control: &service.Control{
81MailboxSlot: "junk",
82},
83}
84return record
85}
86
87func makeOutput(record *service.Record) data {
88switch x := record.GetRecordType().(type) {
89case *service.Record_History:
90history := x.History
91if history == nil {
92return data{}
93}
94items := map[string]string{}
95for _, item := range history.Item {
96// if strings.HasPrefix(item.Key, "_") {
97// continue
98// }
99items[item.Key] = item.ValueJson
100}
101return data{
102items: items,
103step: history.Step.Num,
104}
105case *service.Record_Request:
106state := x.Request.GetDefer().GetState()
107if state != service.DeferRequest_FLUSH_PARTIAL_HISTORY {
108return data{}
109}
110return data{
111flush: true,
112}
113default:
114return data{}
115}
116}
117
118type testCase struct {
119name string
120input []data
121expected []data
122}
123
124func TestHandlePartialHistory(t *testing.T) {
125testCases := []testCase{
126{
127name: "NoFlushIncreaseStepFlush",
128input: []data{
129{
130items: map[string]string{
131"key1": "1",
132"key2": "2",
133},
134step: 0,
135flush: false,
136},
137{
138items: map[string]string{
139"key2": "3",
140},
141step: 1,
142flush: true,
143},
144},
145expected: []data{
146{
147items: map[string]string{
148"key1": "1",
149"key2": "2",
150},
151step: 0,
152},
153{
154items: map[string]string{
155"key2": "3",
156},
157step: 1,
158},
159{
160flush: true,
161},
162},
163},
164{
165name: "NoFlushNoIncreaseStepFlush",
166input: []data{
167{
168items: map[string]string{
169"key1": "1",
170"key2": "2",
171},
172step: 0,
173flush: false,
174},
175{
176items: map[string]string{
177"key2": "3",
178},
179step: 0,
180flush: true,
181},
182},
183expected: []data{
184{
185items: map[string]string{
186"key1": "1",
187"key2": "3",
188},
189step: 0,
190},
191{
192flush: true,
193},
194},
195},
196{
197name: "FlushIncreaseStepFlush",
198input: []data{
199{
200items: map[string]string{
201"key1": "1",
202"key2": "2",
203},
204step: 0,
205flush: true,
206},
207{
208items: map[string]string{
209"key2": "3",
210},
211step: 1,
212flush: true,
213},
214},
215expected: []data{
216{
217items: map[string]string{
218"key1": "1",
219"key2": "2",
220},
221step: 0,
222},
223{
224items: map[string]string{
225"key2": "3",
226},
227step: 1,
228},
229{
230flush: true,
231},
232},
233},
234{
235name: "FlushNoIncreaseStepFlush",
236input: []data{
237{
238items: map[string]string{
239"key1": "1",
240"key2": "2",
241},
242step: 0,
243flush: true,
244},
245{
246items: map[string]string{
247"key2": "3",
248},
249step: 0,
250flush: true,
251},
252},
253expected: []data{
254{
255items: map[string]string{
256"key1": "1",
257"key2": "2",
258},
259step: 0,
260},
261{
262flush: true,
263},
264},
265},
266{
267name: "FlushIncreaseStepNoFlush",
268input: []data{
269{
270items: map[string]string{
271"key1": "1",
272"key2": "2",
273},
274step: 0,
275flush: true,
276},
277{
278items: map[string]string{
279"key2": "3",
280},
281step: 1,
282flush: false,
283},
284},
285expected: []data{
286{
287items: map[string]string{
288"key1": "1",
289"key2": "2",
290},
291step: 0,
292},
293{
294items: map[string]string{
295"key2": "3",
296},
297step: 1,
298},
299{
300flush: true,
301},
302},
303},
304{
305name: "FlushNoIncreaseStepNoFlush",
306input: []data{
307{
308items: map[string]string{
309"key1": "1",
310"key2": "2",
311},
312step: 0,
313flush: true,
314},
315{
316items: map[string]string{
317"key2": "3",
318},
319step: 0,
320flush: false,
321},
322},
323expected: []data{
324{
325items: map[string]string{
326"key1": "1",
327"key2": "2",
328},
329step: 0,
330},
331{
332flush: true,
333},
334},
335},
336{
337name: "NoFlushIncreaseStepNoFlush",
338input: []data{
339{
340items: map[string]string{
341"key1": "1",
342"key2": "2",
343},
344step: 0,
345flush: false,
346},
347{
348items: map[string]string{
349"key2": "3",
350},
351step: 1,
352flush: false,
353},
354},
355expected: []data{
356{
357items: map[string]string{
358"key1": "1",
359"key2": "2",
360},
361step: 0,
362},
363{
364items: map[string]string{
365"key2": "3",
366},
367step: 1,
368},
369{
370flush: true,
371},
372},
373},
374{
375name: "NoFlushNoIncreaseStepNoFlush",
376input: []data{
377{
378items: map[string]string{
379"key1": "1",
380"key2": "2",
381},
382step: 0,
383flush: false,
384},
385{
386items: map[string]string{
387"key2": "3",
388},
389step: 0,
390flush: false,
391},
392},
393expected: []data{
394{
395items: map[string]string{
396"key1": "1",
397"key2": "3",
398},
399step: 0,
400},
401{
402flush: true,
403},
404},
405},
406{
407name: "NilStepNilFlushNilStepNilFlush",
408input: []data{
409{
410items: map[string]string{
411"key1": "1",
412"key2": "2",
413},
414stepNil: true,
415flushNil: true,
416},
417{
418items: map[string]string{
419"key2": "3",
420},
421stepNil: true,
422flushNil: true,
423},
424},
425expected: []data{
426{
427items: map[string]string{
428"key1": "1",
429"key2": "2",
430},
431step: 0,
432},
433{
434items: map[string]string{
435"key2": "3",
436},
437step: 1,
438},
439{
440flush: true,
441},
442},
443},
444{
445name: "NilStepNilFlushNilStepNoFlush",
446input: []data{
447{
448items: map[string]string{
449"key1": "1",
450"key2": "2",
451},
452stepNil: true,
453flushNil: true,
454},
455{
456items: map[string]string{
457"key1": "2",
458"key2": "3",
459},
460stepNil: true,
461flush: false,
462},
463},
464expected: []data{
465{
466items: map[string]string{
467"key1": "1",
468"key2": "2",
469},
470step: 0,
471},
472{
473items: map[string]string{
474"key1": "2",
475"key2": "3",
476},
477step: 1,
478},
479{
480flush: true,
481},
482},
483},
484{
485name: "NilStepNilFlushNilStepFlush",
486input: []data{
487{
488items: map[string]string{
489"key1": "1",
490},
491stepNil: true,
492flushNil: true,
493},
494{
495items: map[string]string{
496"key1": "2",
497},
498stepNil: true,
499flush: true,
500},
501},
502expected: []data{
503{
504items: map[string]string{
505"key1": "1",
506},
507step: 0,
508},
509{
510items: map[string]string{
511"key1": "2",
512},
513step: 1,
514},
515{
516flush: true,
517},
518},
519},
520{
521name: "StepNoFlushNilStepNilFlush",
522input: []data{
523{
524items: map[string]string{
525"key1": "1",
526},
527step: 1,
528flush: false,
529},
530{
531items: map[string]string{
532"key1": "2",
533},
534stepNil: true,
535flushNil: true,
536},
537},
538expected: []data{
539{
540items: map[string]string{
541"key1": "2",
542},
543step: 1,
544},
545{
546flush: true,
547},
548},
549},
550{
551name: "StepFlushNilStepNilFlush",
552input: []data{
553{
554items: map[string]string{
555"key1": "1",
556},
557step: 1,
558flush: true,
559},
560{
561items: map[string]string{
562"key1": "2",
563},
564stepNil: true,
565flushNil: true,
566},
567},
568expected: []data{
569{
570items: map[string]string{
571"key1": "1",
572},
573step: 1,
574},
575{
576items: map[string]string{
577"key1": "2",
578},
579step: 2,
580},
581{
582flush: true,
583},
584},
585},
586{
587name: "StepNilFlushNilStepNilFlush",
588input: []data{
589{
590items: map[string]string{
591"key1": "1",
592},
593step: 1,
594flushNil: true,
595},
596{
597items: map[string]string{
598"key1": "2",
599},
600stepNil: true,
601flushNil: true,
602},
603},
604expected: []data{
605{
606items: map[string]string{
607"key1": "2",
608},
609step: 1,
610},
611{
612flush: true,
613},
614},
615},
616{
617name: "NilStepFlushNilStepNilFlush",
618input: []data{
619{
620items: map[string]string{
621"key1": "1",
622},
623stepNil: true,
624flush: true,
625},
626{
627items: map[string]string{
628"key1": "2",
629},
630stepNil: true,
631flushNil: true,
632},
633},
634expected: []data{
635{
636items: map[string]string{
637"key1": "1",
638},
639step: 0,
640},
641{
642items: map[string]string{
643"key1": "2",
644},
645step: 1,
646},
647{
648flush: true,
649},
650},
651},
652{
653name: "NilStepNoFlushNilStepNilFlush",
654input: []data{
655{
656items: map[string]string{
657"key1": "1",
658},
659stepNil: true,
660flush: false,
661},
662{
663items: map[string]string{
664"key1": "2",
665},
666stepNil: true,
667flushNil: true,
668},
669},
670expected: []data{
671{
672items: map[string]string{
673"key1": "2",
674},
675step: 0,
676},
677{
678flush: true,
679},
680},
681},
682}
683
684for _, tc := range testCases {
685t.Run(tc.name, func(t *testing.T) {
686inChan, loopbackChan := makeInboundChannels()
687fwdChan, outChan := makeOutboundChannels()
688
689makeHandler(inChan, loopbackChan, fwdChan, outChan, false)
690
691for _, d := range tc.input {
692record := makePartialHistoryRecord(d)
693inChan <- record
694}
695
696inChan <- makeFlushRecord()
697
698for _, d := range tc.expected {
699record := <-fwdChan
700actual := makeOutput(record)
701if actual.step != d.step {
702t.Errorf("expected step %v, got %v", d.step, actual.step)
703}
704for k, v := range d.items {
705if actual.items[k] != v {
706t.Errorf("expected %v, got %v", v, actual.items[k])
707}
708}
709if d.flush != actual.flush {
710t.Errorf("expected %v, got %v", d.flush, d.flush)
711}
712}
713},
714)
715}
716}
717
718func TestHandleHistory(t *testing.T) {
719testCases := []testCase{
720{
721name: "IncreaseStep",
722input: []data{
723{
724items: map[string]string{
725"key1": "1",
726"key2": "2",
727},
728step: 0,
729},
730{
731items: map[string]string{
732"key2": "3",
733},
734step: 1,
735},
736},
737expected: []data{
738{
739items: map[string]string{
740"key1": "1",
741"key2": "2",
742"_step": "0",
743"_runtime": "0.000000",
744},
745step: 0,
746},
747{
748items: map[string]string{
749"key2": "3",
750"_step": "1",
751"_runtime": "0.000000",
752},
753step: 1,
754},
755{
756flush: true,
757},
758},
759},
760// { // TODO: mock out time
761// name: "Timestamp",
762// input: []data{
763// {
764// items: map[string]string{
765// "key1": "1",
766// "key2": "2",
767// "_timestamp": "1.257894e+09",
768// },
769// step: 0,
770// },
771// },
772// expected: []data{
773// {
774// items: map[string]string{
775// "key1": "1",
776// "key2": "2",
777// "_runtime": "63393490800.000000",
778// "_step": "0",
779// },
780// step: 0,
781// },
782// {
783// flush: true,
784// },
785// },
786// },
787}
788for _, tc := range testCases {
789t.Run(tc.name, func(t *testing.T) {
790inChan, loopbackChan := makeInboundChannels()
791fwdChan, outChan := makeOutboundChannels()
792
793makeHandler(inChan, loopbackChan, fwdChan, outChan, false)
794
795for _, d := range tc.input {
796record := makeHistoryRecord(d)
797inChan <- record
798}
799
800inChan <- makeFlushRecord()
801
802for _, d := range tc.expected {
803record := <-fwdChan
804actual := makeOutput(record)
805if actual.step != d.step {
806t.Errorf("expected step %v, got %v", d.step, actual.step)
807}
808for k, v := range actual.items {
809if d.items[k] != v {
810t.Errorf("expected %v, got %v", v, actual.items[k])
811}
812}
813if d.flush != actual.flush {
814t.Errorf("expected %v, got %v", d.flush, d.flush)
815}
816}
817})
818}
819
820}
821