6
"github.com/wandb/wandb/core/pkg/observability"
7
"github.com/wandb/wandb/core/pkg/service"
10
// Responder is the interface the Dispatcher stores per ID and hands
// responses to. Any type with a matching Respond method satisfies it.
type Responder interface {
11
// Respond receives a *service.ServerResponse. Within this file it is
// called by Dispatcher.handleRespond with the response that wraps a result.
Respond(response *service.ServerResponse)
14
// ResponderEntry is the element type accepted by Dispatcher.AddResponders,
// which reads its ID and Responder fields to register the responder under
// that ID.
// NOTE(review): the field declarations are not visible in this listing —
// confirm their exact types against the full file.
type ResponderEntry struct {
19
// Dispatcher keeps a set of Responders keyed by string ID and passes each
// result it handles to the Responder registered under the result's
// connection ID.
type Dispatcher struct {
20
// responders maps an ID to its Responder. AddResponders writes to it and
// handleRespond looks entries up in it.
responders map[string]Responder
21
// logger receives the debug, warning and fatal messages emitted by the
// Dispatcher's methods.
logger *observability.CoreLogger
24
// AddResponders adds the given responders to the stream's dispatcher.
//
// Each entry is stored under entry.ID, but only when that ID is not already
// present in the map, so an existing registration is never overwritten.
// A "Responder already exists" warning is logged through d.logger.
// NOTE(review): the line that opens the warning branch is not shown in this
// listing; it is presumably the else of the presence check — confirm.
25
func (d *Dispatcher) AddResponders(entries ...ResponderEntry) {
26
// Create the map on first use: assigning into a nil map would panic.
if d.responders == nil {
27
d.responders = make(map[string]Responder)
29
for _, entry := range entries {
30
responderId := entry.ID
31
// Register only IDs that are not present yet.
if _, ok := d.responders[responderId]; !ok {
32
d.responders[responderId] = entry.Responder
34
// Report the ID that was passed in more than once.
d.logger.CaptureWarn("Responder already exists", "responder", responderId)
39
// handleRespond wraps result in a service.ServerResponse and passes it to the
// Responder registered under the result's connection ID.
//
// The connection ID is read via result.GetControl().GetConnectionId(). When
// no Responder is registered under that ID, an error is built and handed to
// d.logger.CaptureFatalAndPanic.
// NOTE(review): closing braces and branch lines are missing from this
// listing, so the exact control flow must be confirmed against the full file.
func (d *Dispatcher) handleRespond(result *service.Result) {
40
// The connection ID selects which registered Responder gets the result.
responderId := result.GetControl().GetConnectionId()
41
d.logger.Debug("dispatch: got result", "result", result)
42
// An empty connection ID means the result names no responder.
if responderId == "" {
43
d.logger.Debug("dispatch: got result with no connection id", "result", result)
// NOTE(review): no return is visible here. Confirm the full file stops
// before the lookup below; otherwise an empty ID reaches the
// "no responder found" path.
46
// Wrap the result as the ResultCommunicate form of ServerResponse.
response := &service.ServerResponse{
47
ServerResponseType: &service.ServerResponse_ResultCommunicate{
48
ResultCommunicate: result,
51
// Deliver the response when a Responder is registered under this ID.
if responder, ok := d.responders[responderId]; ok {
52
responder.Respond(response)
54
// NOTE(review): presumably the not-found branch (its opening line is not
// shown). Judging by its name, CaptureFatalAndPanic logs and then
// panics — confirm.
err := fmt.Errorf("dispatch: no responder found: %s", responderId)
55
d.logger.CaptureFatalAndPanic("dispatch: no responder found", err)
59
// NewDispatcher takes a logger and returns a *Dispatcher.
// NOTE(review): only part of the constructor body is visible in this
// listing. It presumably also stores logger on the returned value —
// confirm against the full file.
func NewDispatcher(logger *observability.CoreLogger) *Dispatcher {
62
// Start with an empty, non-nil responders map.
responders: make(map[string]Responder),