istio
244 строки · 7.2 Кб
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package kubectlcmd
16
17import (
18"bytes"
19"context"
20"fmt"
21"os/exec"
22"strings"
23"sync"
24"time"
25
26"istio.io/istio/operator/pkg/util"
27"istio.io/istio/pkg/kube"
28"istio.io/istio/pkg/log"
29"istio.io/istio/pkg/util/sets"
30"istio.io/istio/tools/bug-report/pkg/common"
31)
32
// Tunables for the kubectl runner.
const (
	// defaultActiveRequestLimit is the number of in-flight requests the
	// runner allows when no positive limit is requested.
	defaultActiveRequestLimit = 32

	// reportInterval is the period between progress reports that list the
	// tasks which are still running.
	reportInterval = time.Second * 30
)
40
41type Runner struct {
42Client kube.CLIClient
43
44// Used to limit the number of concurrent tasks.
45taskSem chan struct{}
46
47// runningTasks tracks the in-flight fetch operations for user feedback.
48runningTasks sets.String
49runningTasksMu sync.RWMutex
50
51// runningTasksTicker is the report interval for running tasks.
52runningTasksTicker *time.Ticker
53}
54
55func NewRunner(activeRqLimit int) *Runner {
56if activeRqLimit <= 0 {
57activeRqLimit = defaultActiveRequestLimit
58}
59return &Runner{
60taskSem: make(chan struct{}, activeRqLimit),
61runningTasks: sets.New[string](),
62runningTasksMu: sync.RWMutex{},
63runningTasksTicker: time.NewTicker(reportInterval),
64}
65}
66
67func (r *Runner) SetClient(client kube.CLIClient) {
68r.Client = client
69}
70
71func (r *Runner) ReportRunningTasks() {
72go func() {
73time.Sleep(reportInterval)
74for range r.runningTasksTicker.C {
75r.printRunningTasks()
76}
77}()
78}
79
// Options holds the settings applied to a single Run invocation.
type Options struct {
	// Kubeconfig is the path to the kubeconfig file; passed to kubectl as
	// --kubeconfig when non-empty.
	Kubeconfig string
	// Context is the name of the kubeconfig context to use; passed to kubectl
	// as --context when non-empty.
	Context string

	// Namespace is the Kubernetes namespace for the command; passed to
	// kubectl as -n when non-empty.
	Namespace string

	// DryRun makes Run log the command it would execute instead of running it.
	DryRun bool
	// WaitTimeout is a maximum duration to wait.
	// NOTE(review): Run in this file does not read this field; confirm
	// whether any other code uses it.
	WaitTimeout time.Duration

	// Output is the kubectl output mode; passed as -o when non-empty.
	Output string

	// ExtraArgs are additional arguments appended after all other arguments
	// of the kubectl command line.
	ExtraArgs []string
}
102
103// Logs returns the logs for the given namespace/pod/container.
104func (r *Runner) Logs(namespace, pod, container string, previous, dryRun bool) (string, error) {
105if dryRun {
106return fmt.Sprintf("Dry run: would be running client.PodLogs(%s, %s, %s)", pod, namespace, container), nil
107}
108// ignore cancellation errors since this is subject to global timeout.
109task := fmt.Sprintf("PodLogs %s/%s/%s", namespace, pod, container)
110r.addRunningTask(task)
111defer r.removeRunningTask(task)
112return r.Client.PodLogs(context.TODO(), pod, namespace, container, previous)
113}
114
115// EnvoyGet sends a GET request for the URL in the Envoy container in the given namespace/pod and returns the result.
116func (r *Runner) EnvoyGet(namespace, pod, url string, dryRun bool) (string, error) {
117if dryRun {
118return fmt.Sprintf("Dry run: would be running client.EnvoyDo(%s, %s, %s)", pod, namespace, url), nil
119}
120task := fmt.Sprintf("ProxyGet %s/%s:%s", namespace, pod, url)
121r.addRunningTask(task)
122defer r.removeRunningTask(task)
123out, err := r.Client.EnvoyDo(context.TODO(), pod, namespace, "GET", url)
124return string(out), err
125}
126
127// Cat runs the cat command for the given path in the given namespace/pod/container.
128func (r *Runner) Cat(namespace, pod, container, path string, dryRun bool) (string, error) {
129cmdStr := "cat " + path
130if dryRun {
131return fmt.Sprintf("Dry run: would be running podExec %s/%s/%s:%s", pod, namespace, container, cmdStr), nil
132}
133task := fmt.Sprintf("PodExec %s/%s/%s:%s", namespace, pod, container, cmdStr)
134r.addRunningTask(task)
135defer r.removeRunningTask(task)
136stdout, stderr, err := r.Client.PodExec(pod, namespace, container, cmdStr)
137if err != nil {
138return "", fmt.Errorf("podExec error: %s\n\nstderr:\n%s\n\nstdout:\n%s",
139err, util.ConsolidateLog(stderr), stdout)
140}
141return stdout, nil
142}
143
144// Exec runs exec for the given command in the given namespace/pod/container.
145func (r *Runner) Exec(namespace, pod, container, cmdStr string, dryRun bool) (string, error) {
146if dryRun {
147return fmt.Sprintf("Dry run: would be running podExec %s/%s/%s:%s", pod, namespace, container, cmdStr), nil
148}
149task := fmt.Sprintf("PodExec %s/%s/%s:%s", namespace, pod, container, cmdStr)
150r.addRunningTask(task)
151defer r.removeRunningTask(task)
152stdout, stderr, err := r.Client.PodExec(pod, namespace, container, cmdStr)
153if err != nil {
154return "", fmt.Errorf("podExec error: %s\n\nstderr:\n%s\n\nstdout:\n%s",
155err, util.ConsolidateLog(stderr), stdout)
156}
157return stdout, nil
158}
159
160// RunCmd runs the given command in kubectl, adding -n namespace if namespace is not empty.
161func (r *Runner) RunCmd(command, namespace, kubeConfig, kubeContext string, dryRun bool) (string, error) {
162return r.Run(strings.Split(command, " "),
163&Options{
164Namespace: namespace,
165DryRun: dryRun,
166Kubeconfig: kubeConfig,
167Context: kubeContext,
168})
169}
170
171// Run runs the kubectl command by specifying subcommands in subcmds with opts.
172func (r *Runner) Run(subcmds []string, opts *Options) (string, error) {
173args := subcmds
174if opts.Kubeconfig != "" {
175args = append(args, "--kubeconfig", opts.Kubeconfig)
176}
177if opts.Context != "" {
178args = append(args, "--context", opts.Context)
179}
180if opts.Namespace != "" {
181args = append(args, "-n", opts.Namespace)
182}
183if opts.Output != "" {
184args = append(args, "-o", opts.Output)
185}
186args = append(args, opts.ExtraArgs...)
187
188cmd := exec.Command("kubectl", args...)
189var stdout, stderr bytes.Buffer
190cmd.Stdout = &stdout
191cmd.Stderr = &stderr
192
193cmdStr := strings.Join(args, " ")
194
195if opts.DryRun {
196log.Infof("dry run mode: would be running this cmd:\nkubectl %s\n", cmdStr)
197return "", nil
198}
199
200task := fmt.Sprintf("kubectl %s", cmdStr)
201r.addRunningTask(task)
202defer r.removeRunningTask(task)
203if err := cmd.Run(); err != nil {
204return "", fmt.Errorf("kubectl error: %s\n\nstderr:\n%s\n\nstdout:\n%s",
205err, util.ConsolidateLog(stderr.String()), stdout.String())
206}
207
208return stdout.String(), nil
209}
210
211func (r *Runner) printRunningTasks() {
212r.runningTasksMu.RLock()
213defer r.runningTasksMu.RUnlock()
214if r.runningTasks.IsEmpty() {
215return
216}
217common.LogAndPrintf("The following fetches are still running: \n")
218for t := range r.runningTasks {
219common.LogAndPrintf(" %s\n", t)
220}
221common.LogAndPrintf("\n")
222}
223
224func (r *Runner) addRunningTask(task string) {
225// Limit the concurrency of running tasks.
226r.taskSem <- struct{}{}
227
228r.runningTasksMu.Lock()
229defer r.runningTasksMu.Unlock()
230log.Infof("STARTING %s", task)
231r.runningTasks.Insert(task)
232}
233
234func (r *Runner) removeRunningTask(task string) {
235defer func() {
236// Free up a slot for another running task.
237<-r.taskSem
238}()
239
240r.runningTasksMu.Lock()
241defer r.runningTasksMu.Unlock()
242log.Infof("COMPLETED %s", task)
243r.runningTasks.Delete(task)
244}
245