istio

Форк
0
244 строки · 7.2 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package kubectlcmd
16

17
import (
18
	"bytes"
19
	"context"
20
	"fmt"
21
	"os/exec"
22
	"strings"
23
	"sync"
24
	"time"
25

26
	"istio.io/istio/operator/pkg/util"
27
	"istio.io/istio/pkg/kube"
28
	"istio.io/istio/pkg/log"
29
	"istio.io/istio/pkg/util/sets"
30
	"istio.io/istio/tools/bug-report/pkg/common"
31
)
32

33
const (
34
	// The default number of in-flight requests allowed for the runner.
35
	defaultActiveRequestLimit = 32
36

37
	// reportInterval controls how frequently to output progress reports on running tasks.
38
	reportInterval = 30 * time.Second
39
)
40

41
type Runner struct {
42
	Client kube.CLIClient
43

44
	// Used to limit the number of concurrent tasks.
45
	taskSem chan struct{}
46

47
	// runningTasks tracks the in-flight fetch operations for user feedback.
48
	runningTasks   sets.String
49
	runningTasksMu sync.RWMutex
50

51
	// runningTasksTicker is the report interval for running tasks.
52
	runningTasksTicker *time.Ticker
53
}
54

55
func NewRunner(activeRqLimit int) *Runner {
56
	if activeRqLimit <= 0 {
57
		activeRqLimit = defaultActiveRequestLimit
58
	}
59
	return &Runner{
60
		taskSem:            make(chan struct{}, activeRqLimit),
61
		runningTasks:       sets.New[string](),
62
		runningTasksMu:     sync.RWMutex{},
63
		runningTasksTicker: time.NewTicker(reportInterval),
64
	}
65
}
66

67
func (r *Runner) SetClient(client kube.CLIClient) {
68
	r.Client = client
69
}
70

71
func (r *Runner) ReportRunningTasks() {
72
	go func() {
73
		time.Sleep(reportInterval)
74
		for range r.runningTasksTicker.C {
75
			r.printRunningTasks()
76
		}
77
	}()
78
}
79

80
// Options contains the Run options.
81
type Options struct {
82
	// Path to the kubeconfig file.
83
	Kubeconfig string
84
	// ComponentName of the kubeconfig context to use.
85
	Context string
86

87
	// namespace - k8s namespace for Run command
88
	Namespace string
89

90
	// DryRun performs all steps but only logs the Run command without running it.
91
	DryRun bool
92
	// Maximum amount of time to wait for resources to be ready after install when Wait=true.
93
	WaitTimeout time.Duration
94

95
	// output - output mode for Run i.e. --output.
96
	Output string
97

98
	// extraArgs - more args to be added to the Run command, which are appended to
99
	// the end of the Run command.
100
	ExtraArgs []string
101
}
102

103
// Logs returns the logs for the given namespace/pod/container.
104
func (r *Runner) Logs(namespace, pod, container string, previous, dryRun bool) (string, error) {
105
	if dryRun {
106
		return fmt.Sprintf("Dry run: would be running client.PodLogs(%s, %s, %s)", pod, namespace, container), nil
107
	}
108
	// ignore cancellation errors since this is subject to global timeout.
109
	task := fmt.Sprintf("PodLogs %s/%s/%s", namespace, pod, container)
110
	r.addRunningTask(task)
111
	defer r.removeRunningTask(task)
112
	return r.Client.PodLogs(context.TODO(), pod, namespace, container, previous)
113
}
114

115
// EnvoyGet sends a GET request for the URL in the Envoy container in the given namespace/pod and returns the result.
116
func (r *Runner) EnvoyGet(namespace, pod, url string, dryRun bool) (string, error) {
117
	if dryRun {
118
		return fmt.Sprintf("Dry run: would be running client.EnvoyDo(%s, %s, %s)", pod, namespace, url), nil
119
	}
120
	task := fmt.Sprintf("ProxyGet %s/%s:%s", namespace, pod, url)
121
	r.addRunningTask(task)
122
	defer r.removeRunningTask(task)
123
	out, err := r.Client.EnvoyDo(context.TODO(), pod, namespace, "GET", url)
124
	return string(out), err
125
}
126

127
// Cat runs the cat command for the given path in the given namespace/pod/container.
128
func (r *Runner) Cat(namespace, pod, container, path string, dryRun bool) (string, error) {
129
	cmdStr := "cat " + path
130
	if dryRun {
131
		return fmt.Sprintf("Dry run: would be running podExec %s/%s/%s:%s", pod, namespace, container, cmdStr), nil
132
	}
133
	task := fmt.Sprintf("PodExec %s/%s/%s:%s", namespace, pod, container, cmdStr)
134
	r.addRunningTask(task)
135
	defer r.removeRunningTask(task)
136
	stdout, stderr, err := r.Client.PodExec(pod, namespace, container, cmdStr)
137
	if err != nil {
138
		return "", fmt.Errorf("podExec error: %s\n\nstderr:\n%s\n\nstdout:\n%s",
139
			err, util.ConsolidateLog(stderr), stdout)
140
	}
141
	return stdout, nil
142
}
143

144
// Exec runs exec for the given command in the given namespace/pod/container.
145
func (r *Runner) Exec(namespace, pod, container, cmdStr string, dryRun bool) (string, error) {
146
	if dryRun {
147
		return fmt.Sprintf("Dry run: would be running podExec %s/%s/%s:%s", pod, namespace, container, cmdStr), nil
148
	}
149
	task := fmt.Sprintf("PodExec %s/%s/%s:%s", namespace, pod, container, cmdStr)
150
	r.addRunningTask(task)
151
	defer r.removeRunningTask(task)
152
	stdout, stderr, err := r.Client.PodExec(pod, namespace, container, cmdStr)
153
	if err != nil {
154
		return "", fmt.Errorf("podExec error: %s\n\nstderr:\n%s\n\nstdout:\n%s",
155
			err, util.ConsolidateLog(stderr), stdout)
156
	}
157
	return stdout, nil
158
}
159

160
// RunCmd runs the given command in kubectl, adding -n namespace if namespace is not empty.
161
func (r *Runner) RunCmd(command, namespace, kubeConfig, kubeContext string, dryRun bool) (string, error) {
162
	return r.Run(strings.Split(command, " "),
163
		&Options{
164
			Namespace:  namespace,
165
			DryRun:     dryRun,
166
			Kubeconfig: kubeConfig,
167
			Context:    kubeContext,
168
		})
169
}
170

171
// Run runs the kubectl command by specifying subcommands in subcmds with opts.
172
func (r *Runner) Run(subcmds []string, opts *Options) (string, error) {
173
	args := subcmds
174
	if opts.Kubeconfig != "" {
175
		args = append(args, "--kubeconfig", opts.Kubeconfig)
176
	}
177
	if opts.Context != "" {
178
		args = append(args, "--context", opts.Context)
179
	}
180
	if opts.Namespace != "" {
181
		args = append(args, "-n", opts.Namespace)
182
	}
183
	if opts.Output != "" {
184
		args = append(args, "-o", opts.Output)
185
	}
186
	args = append(args, opts.ExtraArgs...)
187

188
	cmd := exec.Command("kubectl", args...)
189
	var stdout, stderr bytes.Buffer
190
	cmd.Stdout = &stdout
191
	cmd.Stderr = &stderr
192

193
	cmdStr := strings.Join(args, " ")
194

195
	if opts.DryRun {
196
		log.Infof("dry run mode: would be running this cmd:\nkubectl %s\n", cmdStr)
197
		return "", nil
198
	}
199

200
	task := fmt.Sprintf("kubectl %s", cmdStr)
201
	r.addRunningTask(task)
202
	defer r.removeRunningTask(task)
203
	if err := cmd.Run(); err != nil {
204
		return "", fmt.Errorf("kubectl error: %s\n\nstderr:\n%s\n\nstdout:\n%s",
205
			err, util.ConsolidateLog(stderr.String()), stdout.String())
206
	}
207

208
	return stdout.String(), nil
209
}
210

211
func (r *Runner) printRunningTasks() {
212
	r.runningTasksMu.RLock()
213
	defer r.runningTasksMu.RUnlock()
214
	if r.runningTasks.IsEmpty() {
215
		return
216
	}
217
	common.LogAndPrintf("The following fetches are still running: \n")
218
	for t := range r.runningTasks {
219
		common.LogAndPrintf("  %s\n", t)
220
	}
221
	common.LogAndPrintf("\n")
222
}
223

224
func (r *Runner) addRunningTask(task string) {
225
	// Limit the concurrency of running tasks.
226
	r.taskSem <- struct{}{}
227

228
	r.runningTasksMu.Lock()
229
	defer r.runningTasksMu.Unlock()
230
	log.Infof("STARTING %s", task)
231
	r.runningTasks.Insert(task)
232
}
233

234
func (r *Runner) removeRunningTask(task string) {
235
	defer func() {
236
		// Free up a slot for another running task.
237
		<-r.taskSem
238
	}()
239

240
	r.runningTasksMu.Lock()
241
	defer r.runningTasksMu.Unlock()
242
	log.Infof("COMPLETED %s", task)
243
	r.runningTasks.Delete(task)
244
}
245

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.