kraken
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14package dedup15
16import (17"sync"18"time"19
20"github.com/andres-erbsen/clock"21)
22
// TaskGCInterval is the interval passed to the Limiter's garbage-collection
// trap, i.e. how often expired tasks are swept out of the task map.
const TaskGCInterval = 60 * time.Second
// TaskRunner is the unit of work that a Limiter deduplicates.
//
// Run receives the caller's input and returns the produced output together
// with a ttl. The Limiter keeps serving that output for the same input until
// the ttl has elapsed.
type TaskRunner interface {
	Run(input interface{}) (output interface{}, ttl time.Duration)
}
30
// task holds the per-input state tracked by a Limiter: the input itself, the
// most recent output, and the bookkeeping needed to share one run among
// concurrent callers.
type task struct {
	input interface{}

	// cond.L guards every field below; cond is broadcast when a run ends.
	cond *sync.Cond
	// running is true while some caller is executing the runner for input.
	running bool
	// output is the result of the most recent completed run.
	output interface{}
	// expiresAt is the instant after which output is no longer served.
	expiresAt time.Time
}

// newTask returns a task for input with its own mutex-backed condition
// variable. All other fields start at their zero values, so the task has no
// output yet and a zero expiresAt.
func newTask(input interface{}) *task {
	var mu sync.Mutex
	return &task{
		input: input,
		cond:  sync.NewCond(&mu),
	}
}

// expired reports whether the task's expiry instant lies strictly before now.
func (tk *task) expired(now time.Time) bool {
	return tk.expiresAt.Before(now)
}
50
51// Limiter deduplicates the running of a common task within a given limit. Tasks
52// are deduplicated based on input equality.
53type Limiter struct {54sync.RWMutex55clk clock.Clock56runner TaskRunner
57tasks map[interface{}]*task58gc *IntervalTrap59}
60
61// NewLimiter creates a new Limiter for tasks. The limit is determined per task
62// via the TaskRunner.
63func NewLimiter(clk clock.Clock, runner TaskRunner) *Limiter {64l := &Limiter{65clk: clk,66runner: runner,67tasks: make(map[interface{}]*task),68}69l.gc = NewIntervalTrap(TaskGCInterval, clk, &limiterTaskGC{l})70return l71}
72
73// Run runs a task with input.
74func (l *Limiter) Run(input interface{}) interface{} {75l.gc.Trap()76
77l.RLock()78t, ok := l.tasks[input]79l.RUnlock()80if !ok {81// Slow path, must initialize task struct under global write lock.82l.Lock()83t, ok = l.tasks[input]84if !ok {85t = newTask(input)86l.tasks[input] = t87}88l.Unlock()89}90return l.getOutput(t)91}
92
93func (l *Limiter) getOutput(t *task) interface{} {94t.cond.L.Lock()95
96if !t.expired(l.clk.Now()) {97defer t.cond.L.Unlock()98return t.output99}100
101if t.running {102t.cond.Wait()103defer t.cond.L.Unlock()104return t.output105}106
107t.running = true108t.cond.L.Unlock()109
110output, ttl := l.runner.Run(t.input)111
112t.cond.L.Lock()113t.output = output114t.expiresAt = l.clk.Now().Add(ttl)115t.running = false116t.cond.L.Unlock()117
118t.cond.Broadcast()119
120return output121}
122
123type limiterTaskGC struct {124limiter *Limiter125}
126
127func (gc *limiterTaskGC) Run() {128gc.limiter.Lock()129defer gc.limiter.Unlock()130
131for input, t := range gc.limiter.tasks {132t.cond.L.Lock()133expired := t.expired(gc.limiter.clk.Now()) && !t.running134t.cond.L.Unlock()135if expired {136delete(gc.limiter.tasks, input)137}138}139}
140