Dragonfly2

Форк
0
288 строк · 6.3 Кб
1
/*
 *     Copyright 2020 The Dragonfly Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16

17
package main
18

19
import (
20
	"context"
21
	"flag"
22
	"fmt"
23
	"io"
24
	"log"
25
	"net"
26
	"net/http"
27
	"net/url"
28
	"os"
29
	"os/signal"
30
	"sort"
31
	"sync"
32
	"syscall"
33
	"time"
34

35
	"github.com/go-echarts/statsview"
36
	"github.com/go-echarts/statsview/viewer"
37
	"github.com/montanaflynn/stats"
38

39
	"d7y.io/dragonfly/v2/client/config"
40
	"d7y.io/dragonfly/v2/pkg/net/ip"
41
	"d7y.io/dragonfly/v2/pkg/unit"
42
)
43

44
// Command-line options. They are populated by the flag registrations in init
// and read after flag.Parse has run in main.
var (
	// target is the URL every worker requests (-url).
	target string
	// output is the path of the per-request statistics file (-output).
	output string
	// proxy is an optional HTTP proxy URL applied to the default client (-proxy).
	proxy string
	// con is the number of concurrent worker goroutines (-connections).
	con int
	// duration is how long the test runs before the workers are cancelled
	// (-duration). It is a pointer because it is assigned from flag.Duration.
	duration *time.Duration
)
51

52
func init() {
53
	flag.StringVar(&target, "url", "", "target url for stress testing, example: http://localhost")
54
	flag.StringVar(&output, "output", "/tmp/statistics.txt", "all request statistics")
55
	flag.StringVar(&proxy, "proxy", "", "target proxy for downloading, example: http://127.0.0.1:65001")
56
	flag.IntVar(&con, "connections", 100, "concurrency count of connections")
57
	duration = flag.Duration("duration", 100*time.Second, "testing duration")
58
}
59

60
// Result records the outcome of a single HTTP request issued by a worker.
type Result struct {
	// StatusCode is the HTTP status code of the response.
	StatusCode int
	// StartTime is taken immediately before the request is issued.
	StartTime time.Time
	// EndTime is taken after the response body has been fully drained.
	EndTime time.Time
	// Cost is EndTime minus StartTime.
	Cost time.Duration
	// TaskID is the value of the Dragonfly task response header, if any.
	TaskID string
	// PeerID is the value of the Dragonfly peer response header, if any.
	PeerID string
	// Size is the number of body bytes read.
	Size int64
	// Message holds the error text when draining the body failed; empty otherwise.
	Message string
}
70

71
func main() {
72
	go debug()
73

74
	flag.Parse()
75

76
	var (
77
		wgProcess = &sync.WaitGroup{}
78
		wgCollect = &sync.WaitGroup{}
79
	)
80
	ctx, cancel := context.WithCancel(context.Background())
81
	resultCh := make(chan *Result, 1024)
82

83
	if proxy != "" {
84
		pu, err := url.Parse(proxy)
85
		if err != nil {
86
			panic(err)
87
		}
88
		http.DefaultClient.Transport = &http.Transport{
89
			Proxy: http.ProxyURL(pu),
90
			DialContext: (&net.Dialer{
91
				Timeout:   30 * time.Second,
92
				KeepAlive: 30 * time.Second,
93
			}).DialContext,
94
			ForceAttemptHTTP2:     true,
95
			MaxIdleConns:          100,
96
			IdleConnTimeout:       90 * time.Second,
97
			TLSHandshakeTimeout:   10 * time.Second,
98
			ExpectContinueTimeout: 1 * time.Second,
99
		}
100
	}
101

102
	wgCollect.Add(1)
103
	go collect(wgCollect, resultCh)
104

105
	for i := 0; i < con; i++ {
106
		wgProcess.Add(1)
107
		go process(ctx, wgProcess, resultCh)
108
	}
109

110
	signals := make(chan os.Signal, 1)
111
	signal.Notify(signals, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
112
	go forceExit(signals)
113

114
loop:
115
	for {
116
		select {
117
		case <-time.After(*duration):
118
			break loop
119
		case sig := <-signals:
120
			log.Printf("receive signal: %v", sig)
121
			break loop
122
		}
123
	}
124
	cancel()
125
	wgProcess.Wait()
126
	close(resultCh)
127
	wgCollect.Wait()
128
}
129

130
func debug() {
131
	debugAddr := fmt.Sprintf("%s:%d", ip.IPv4.String(), 18066)
132
	viewer.SetConfiguration(viewer.WithAddr(debugAddr))
133
	if err := statsview.New().Start(); err != nil {
134
		log.Println("stat view start failed", err)
135
	}
136
}
137

138
// forceExit counts the values received on signals and terminates the process
// with exit code 1 as soon as more than two have arrived. It never returns,
// so it is meant to run in its own goroutine.
func forceExit(signals chan os.Signal) {
	var count int
	for {
		<-signals
		count++
		if count > 2 {
			log.Printf("force exit")
			os.Exit(1)
		}
	}
}
151

152
func process(ctx context.Context, wg *sync.WaitGroup, result chan *Result) {
153
	defer wg.Done()
154
	for {
155
		select {
156
		case <-ctx.Done():
157
			return
158
		default:
159
		}
160

161
		start := time.Now()
162
		resp, err := http.Get(target)
163
		if err != nil {
164
			log.Printf("connect target error: %s", err)
165
			continue
166
		}
167
		var msg string
168
		n, err := io.Copy(io.Discard, resp.Body)
169
		if err != nil {
170
			msg = err.Error()
171
			log.Printf("discard data error: %s", err)
172
		}
173
		end := time.Now()
174
		result <- &Result{
175
			StatusCode: resp.StatusCode,
176
			StartTime:  start,
177
			EndTime:    end,
178
			Cost:       end.Sub(start),
179
			Size:       n,
180
			TaskID:     resp.Header.Get(config.HeaderDragonflyTask),
181
			PeerID:     resp.Header.Get(config.HeaderDragonflyPeer),
182
			Message:    msg,
183
		}
184
		resp.Body.Close()
185
	}
186
}
187

188
func collect(wg *sync.WaitGroup, resultCh chan *Result) {
189
	defer wg.Done()
190
	var results = make([]*Result, 0, 1000)
191
loop:
192
	for {
193
		select {
194
		case result, ok := <-resultCh:
195
			if !ok {
196
				break loop
197
			}
198
			results = append(results, result)
199
		}
200
	}
201

202
	printStatistics(results)
203
	saveToOutput(results)
204
}
205

206
// printStatistics prints the latency, status-code and throughput reports for
// results. It sorts results in place by ascending Cost first, so the caller's
// slice is reordered.
func printStatistics(results []*Result) {
	sort.Slice(results, func(i, j int) bool {
		return results[i].Cost < results[j].Cost
	})
	printLatency(results)
	printStatus(results)
	printThroughput(results)
}
214

215
// printStatus prints how many results were recorded for each HTTP status
// code, in ascending status-code order.
func printStatus(results []*Result) {
	status := make(map[int]int)
	for _, v := range results {
		status[v.StatusCode]++
	}

	// Map iteration order is random; sort the codes so that repeated runs
	// produce reports that can be compared line by line.
	codes := make([]int, 0, len(status))
	for code := range status {
		codes = append(codes, code)
	}
	sort.Ints(codes)

	fmt.Printf("HTTP codes\n")
	for _, code := range codes {
		fmt.Printf("\t%d\t %d\n", code, status[code])
	}
}
226

227
// printLatency prints the average, minimum and maximum latency and the
// 50/75/90/95/99th percentiles of all results whose status code is 200.
// When there is no such result it logs a notice and prints nothing.
// It panics if a percentile cannot be computed.
func printLatency(results []*Result) {
	dur := make([]int64, 0, len(results))
	for _, v := range results {
		if v.StatusCode == http.StatusOK {
			dur = append(dur, v.EndTime.Sub(v.StartTime).Nanoseconds())
		}
	}
	if len(dur) == 0 {
		log.Printf("empty result with 200 status")
		return
	}

	d := stats.LoadRawData(dur)

	// NOTE(review): errors are ignored here on the assumption that these
	// functions only fail for empty input, which is excluded above — confirm
	// against the stats package.
	minCost, _ := stats.Min(d)
	maxCost, _ := stats.Max(d)
	meanCost, _ := stats.Mean(d)
	fmt.Printf("Latency\n")
	fmt.Printf("\tavg\t %v\n", time.Duration(int64(meanCost)))
	fmt.Printf("\tmin\t %v\n", time.Duration(int64(minCost)))
	fmt.Printf("\tmax\t %v\n", time.Duration(int64(maxCost)))

	fmt.Printf("Latency Distribution\n")
	for _, p := range []float64{50, 75, 90, 95, 99} {
		percentile, err := stats.Percentile(d, p)
		if err != nil {
			panic(err)
		}
		fmt.Printf("\t%.0f%%\t%v\n", p, time.Duration(int64(percentile)))
	}
}
259

260
// printThroughput prints the average number of bytes and of requests per
// second, computed over the configured test duration.
//
// The rates use the configured duration, not the measured run time, so a run
// that was interrupted early by a signal is under-reported.
func printThroughput(results []*Result) {
	// Fractional seconds: integer division by whole seconds would be a
	// division by zero for durations below one second.
	secs := (*duration).Seconds()
	if secs <= 0 {
		log.Printf("skip throughput: non-positive test duration %v", *duration)
		return
	}

	var total int64
	for _, v := range results {
		total += v.Size
	}
	fmt.Printf("Throughput\t%v\n", unit.Bytes(int64(float64(total)/secs)))
	fmt.Printf("Request\t\t%d/s\n", int(float64(len(results))/secs))
}
268

269
func saveToOutput(results []*Result) {
270
	out, err := os.OpenFile(output, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
271
	if err != nil {
272
		panic(err)
273
	}
274
	defer out.Close()
275
	for _, v := range results {
276
		if v.TaskID == "" {
277
			v.TaskID = "unknown"
278
		}
279
		if v.PeerID == "" {
280
			v.PeerID = "unknown"
281
		}
282
		if _, err := out.WriteString(fmt.Sprintf("%s %s %d %v %d %d %s\n",
283
			v.TaskID, v.PeerID, v.StatusCode, v.Cost,
284
			v.StartTime.UnixNano()/100, v.EndTime.UnixNano()/100, v.Message)); err != nil {
285
			log.Panicln("write string failed", err)
286
		}
287
	}
288
}
289

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.