Dragonfly2
288 строк · 6.3 Кб
/*
 * Copyright 2020 The Dragonfly Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package main
18
19import (
20"context"
21"flag"
22"fmt"
23"io"
24"log"
25"net"
26"net/http"
27"net/url"
28"os"
29"os/signal"
30"sort"
31"sync"
32"syscall"
33"time"
34
35"github.com/go-echarts/statsview"
36"github.com/go-echarts/statsview/viewer"
37"github.com/montanaflynn/stats"
38
39"d7y.io/dragonfly/v2/client/config"
40"d7y.io/dragonfly/v2/pkg/net/ip"
41"d7y.io/dragonfly/v2/pkg/unit"
42)
43
// Command-line options. They hold zero values until init registers the flags
// and carry the user's choices once flag.Parse has run.
var (
	// con is the number of concurrent worker connections (-connections).
	con int
	// duration is how long the stress test runs (-duration).
	duration *time.Duration

	// target is the URL requested by every worker (-url).
	target string
	// proxy is an optional HTTP proxy URL used for the requests (-proxy).
	proxy string

	// output is the path of the per-request statistics file (-output).
	output string
)
51
52func init() {
53flag.StringVar(&target, "url", "", "target url for stress testing, example: http://localhost")
54flag.StringVar(&output, "output", "/tmp/statistics.txt", "all request statistics")
55flag.StringVar(&proxy, "proxy", "", "target proxy for downloading, example: http://127.0.0.1:65001")
56flag.IntVar(&con, "connections", 100, "concurrency count of connections")
57duration = flag.Duration("duration", 100*time.Second, "testing duration")
58}
59
// Result is the record kept for a single request of the stress test.
type Result struct {
	// StatusCode is the HTTP status code of the response.
	StatusCode int
	// StartTime and EndTime bracket the request: taken just before it is
	// issued and just after the body has been consumed.
	StartTime, EndTime time.Time
	// Cost is the elapsed time between StartTime and EndTime.
	Cost time.Duration
	// TaskID and PeerID are identifiers taken from the response headers.
	TaskID, PeerID string
	// Size is the number of body bytes read.
	Size int64
	// Message carries the error text when reading the body failed.
	Message string
}
70
71func main() {
72go debug()
73
74flag.Parse()
75
76var (
77wgProcess = &sync.WaitGroup{}
78wgCollect = &sync.WaitGroup{}
79)
80ctx, cancel := context.WithCancel(context.Background())
81resultCh := make(chan *Result, 1024)
82
83if proxy != "" {
84pu, err := url.Parse(proxy)
85if err != nil {
86panic(err)
87}
88http.DefaultClient.Transport = &http.Transport{
89Proxy: http.ProxyURL(pu),
90DialContext: (&net.Dialer{
91Timeout: 30 * time.Second,
92KeepAlive: 30 * time.Second,
93}).DialContext,
94ForceAttemptHTTP2: true,
95MaxIdleConns: 100,
96IdleConnTimeout: 90 * time.Second,
97TLSHandshakeTimeout: 10 * time.Second,
98ExpectContinueTimeout: 1 * time.Second,
99}
100}
101
102wgCollect.Add(1)
103go collect(wgCollect, resultCh)
104
105for i := 0; i < con; i++ {
106wgProcess.Add(1)
107go process(ctx, wgProcess, resultCh)
108}
109
110signals := make(chan os.Signal, 1)
111signal.Notify(signals, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
112go forceExit(signals)
113
114loop:
115for {
116select {
117case <-time.After(*duration):
118break loop
119case sig := <-signals:
120log.Printf("receive signal: %v", sig)
121break loop
122}
123}
124cancel()
125wgProcess.Wait()
126close(resultCh)
127wgCollect.Wait()
128}
129
130func debug() {
131debugAddr := fmt.Sprintf("%s:%d", ip.IPv4.String(), 18066)
132viewer.SetConfiguration(viewer.WithAddr(debugAddr))
133if err := statsview.New().Start(); err != nil {
134log.Println("stat view start failed", err)
135}
136}
137
// forceExit never returns normally: it counts the values received from
// signals and terminates the process with exit status 1 once more than two
// have arrived.
func forceExit(signals chan os.Signal) {
	const tolerated = 2

	for received := 1; ; received++ {
		<-signals
		if received > tolerated {
			log.Printf("force exit")
			os.Exit(1)
		}
	}
}
151
152func process(ctx context.Context, wg *sync.WaitGroup, result chan *Result) {
153defer wg.Done()
154for {
155select {
156case <-ctx.Done():
157return
158default:
159}
160
161start := time.Now()
162resp, err := http.Get(target)
163if err != nil {
164log.Printf("connect target error: %s", err)
165continue
166}
167var msg string
168n, err := io.Copy(io.Discard, resp.Body)
169if err != nil {
170msg = err.Error()
171log.Printf("discard data error: %s", err)
172}
173end := time.Now()
174result <- &Result{
175StatusCode: resp.StatusCode,
176StartTime: start,
177EndTime: end,
178Cost: end.Sub(start),
179Size: n,
180TaskID: resp.Header.Get(config.HeaderDragonflyTask),
181PeerID: resp.Header.Get(config.HeaderDragonflyPeer),
182Message: msg,
183}
184resp.Body.Close()
185}
186}
187
188func collect(wg *sync.WaitGroup, resultCh chan *Result) {
189defer wg.Done()
190var results = make([]*Result, 0, 1000)
191loop:
192for {
193select {
194case result, ok := <-resultCh:
195if !ok {
196break loop
197}
198results = append(results, result)
199}
200}
201
202printStatistics(results)
203saveToOutput(results)
204}
205
206func printStatistics(results []*Result) {
207sort.Slice(results, func(i, j int) bool {
208return results[i].Cost < results[j].Cost
209})
210printLatency(results)
211printStatus(results)
212printThroughput(results)
213}
214
215func printStatus(results []*Result) {
216var status = make(map[int]int)
217for _, v := range results {
218status[v.StatusCode]++
219}
220
221fmt.Printf("HTTP codes\n")
222for code, count := range status {
223fmt.Printf("\t%d\t %d\n", code, count)
224}
225}
226
227func printLatency(results []*Result) {
228var dur []int64
229
230for _, v := range results {
231if v.StatusCode == 200 {
232dur = append(dur, v.EndTime.Sub(v.StartTime).Nanoseconds())
233}
234}
235if len(dur) == 0 {
236log.Printf("empty result with 200 status")
237return
238}
239
240d := stats.LoadRawData(dur)
241
242min, _ := stats.Min(d)
243max, _ := stats.Max(d)
244mean, _ := stats.Mean(d)
245fmt.Printf("Latency\n")
246fmt.Printf("\tavg\t %v\n", time.Duration(int64(mean)))
247fmt.Printf("\tmin\t %v\n", time.Duration(int64(min)))
248fmt.Printf("\tmax\t %v\n", time.Duration(int64(max)))
249
250fmt.Printf("Latency Distribution\n")
251for _, p := range []float64{50, 75, 90, 95, 99} {
252percentile, err := stats.Percentile(d, p)
253if err != nil {
254panic(err)
255}
256fmt.Printf("\t%.0f%%\t%v\n", p, time.Duration(int64(percentile)))
257}
258}
259
260func printThroughput(results []*Result) {
261var total int64
262for _, v := range results {
263total += v.Size
264}
265fmt.Printf("Throughput\t%v\n", unit.Bytes(total/int64(*duration/time.Second)))
266fmt.Printf("Request\t\t%d/s\n", len(results)/int(*duration/time.Second))
267}
268
269func saveToOutput(results []*Result) {
270out, err := os.OpenFile(output, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
271if err != nil {
272panic(err)
273}
274defer out.Close()
275for _, v := range results {
276if v.TaskID == "" {
277v.TaskID = "unknown"
278}
279if v.PeerID == "" {
280v.PeerID = "unknown"
281}
282if _, err := out.WriteString(fmt.Sprintf("%s %s %d %v %d %d %s\n",
283v.TaskID, v.PeerID, v.StatusCode, v.Cost,
284v.StartTime.UnixNano()/100, v.EndTime.UnixNano()/100, v.Message)); err != nil {
285log.Panicln("write string failed", err)
286}
287}
288}
289