podman
570 строк · 14.2 Кб
1package containers
2
3import (
4"bytes"
5"context"
6"encoding/binary"
7"errors"
8"fmt"
9"io"
10"net"
11"net/http"
12"net/url"
13"os"
14"reflect"
15"strconv"
16"time"
17
18"github.com/containers/common/pkg/detach"
19"github.com/containers/podman/v5/libpod/define"
20"github.com/containers/podman/v5/pkg/bindings"
21"github.com/moby/term"
22"github.com/sirupsen/logrus"
23terminal "golang.org/x/term"
24)
25
// CloseWriter is implemented by connections that can shut down only their
// writing side. The attach code type-asserts the hijacked connection against
// it so that end-of-input can be signalled without closing the whole
// connection.
type CloseWriter interface {
	// CloseWrite closes the write half of the connection.
	CloseWrite() error
}
31
32// Attach attaches to a running container
33func Attach(ctx context.Context, nameOrID string, stdin io.Reader, stdout io.Writer, stderr io.Writer, attachReady chan bool, options *AttachOptions) error {
34if options == nil {
35options = new(AttachOptions)
36}
37isSet := struct {
38stdin bool
39stdout bool
40stderr bool
41}{
42stdin: !(stdin == nil || reflect.ValueOf(stdin).IsNil()),
43stdout: !(stdout == nil || reflect.ValueOf(stdout).IsNil()),
44stderr: !(stderr == nil || reflect.ValueOf(stderr).IsNil()),
45}
46// Ensure golang can determine that interfaces are "really" nil
47if !isSet.stdin {
48stdin = (io.Reader)(nil)
49}
50if !isSet.stdout {
51stdout = (io.Writer)(nil)
52}
53if !isSet.stderr {
54stderr = (io.Writer)(nil)
55}
56
57conn, err := bindings.GetClient(ctx)
58if err != nil {
59return err
60}
61
62// Do we need to wire in stdin?
63ctnr, err := Inspect(ctx, nameOrID, new(InspectOptions).WithSize(false))
64if err != nil {
65return err
66}
67
68params, err := options.ToParams()
69if err != nil {
70return err
71}
72detachKeysInBytes := []byte{}
73if options.Changed("DetachKeys") {
74params.Add("detachKeys", options.GetDetachKeys())
75
76detachKeysInBytes, err = term.ToBytes(options.GetDetachKeys())
77if err != nil {
78return fmt.Errorf("invalid detach keys: %w", err)
79}
80}
81if isSet.stdin {
82params.Add("stdin", "true")
83}
84if isSet.stdout {
85params.Add("stdout", "true")
86}
87if isSet.stderr {
88params.Add("stderr", "true")
89}
90
91// Unless all requirements are met, don't use "stdin" is a terminal
92file, ok := stdin.(*os.File)
93outFile, outOk := stdout.(*os.File)
94needTTY := ok && outOk && terminal.IsTerminal(int(file.Fd())) && ctnr.Config.Tty
95if needTTY {
96state, err := setRawTerminal(file)
97if err != nil {
98return err
99}
100defer func() {
101if err := terminal.Restore(int(file.Fd()), state); err != nil {
102logrus.Errorf("Unable to restore terminal: %q", err)
103}
104logrus.SetFormatter(&logrus.TextFormatter{})
105}()
106}
107
108headers := make(http.Header)
109headers.Add("Connection", "Upgrade")
110headers.Add("Upgrade", "tcp")
111
112var socket net.Conn
113socketSet := false
114dialContext := conn.Client.Transport.(*http.Transport).DialContext
115t := &http.Transport{
116DialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
117c, err := dialContext(ctx, network, address)
118if err != nil {
119return nil, err
120}
121if !socketSet {
122socket = c
123socketSet = true
124}
125return c, err
126},
127IdleConnTimeout: time.Duration(0),
128}
129conn.Client.Transport = t
130response, err := conn.DoRequest(ctx, nil, http.MethodPost, "/containers/%s/attach", params, headers, nameOrID)
131if err != nil {
132return err
133}
134
135if !(response.IsSuccess() || response.IsInformational()) {
136defer response.Body.Close()
137return response.Process(nil)
138}
139
140if needTTY {
141winChange := make(chan os.Signal, 1)
142winCtx, winCancel := context.WithCancel(ctx)
143defer winCancel()
144notifyWinChange(winCtx, winChange, file, outFile)
145attachHandleResize(ctx, winCtx, winChange, false, nameOrID, file, outFile)
146}
147
148// If we are attaching around a start, we need to "signal"
149// back that we are in fact attached so that started does
150// not execute before we can attach.
151if attachReady != nil {
152attachReady <- true
153}
154
155stdoutChan := make(chan error)
156stdinChan := make(chan error, 1) // stdin channel should not block
157
158if isSet.stdin {
159go func() {
160logrus.Debugf("Copying STDIN to socket")
161
162_, err := detach.Copy(socket, stdin, detachKeysInBytes)
163if err != nil && err != define.ErrDetach {
164logrus.Errorf("Failed to write input to service: %v", err)
165}
166if err == nil {
167if closeWrite, ok := socket.(CloseWriter); ok {
168if err := closeWrite.CloseWrite(); err != nil {
169logrus.Warnf("Failed to close STDIN for writing: %v", err)
170}
171}
172}
173stdinChan <- err
174}()
175}
176
177buffer := make([]byte, 1024)
178if ctnr.Config.Tty {
179go func() {
180logrus.Debugf("Copying STDOUT of container in terminal mode")
181
182if !isSet.stdout {
183stdoutChan <- fmt.Errorf("container %q requires stdout to be set", ctnr.ID)
184}
185// If not multiplex'ed, read from server and write to stdout
186_, err := io.Copy(stdout, socket)
187
188stdoutChan <- err
189}()
190
191for {
192select {
193case err := <-stdoutChan:
194if err != nil {
195return err
196}
197
198return nil
199case err := <-stdinChan:
200if err != nil {
201return err
202}
203
204return nil
205}
206}
207} else {
208logrus.Debugf("Copying standard streams of container %q in non-terminal mode", ctnr.ID)
209for {
210// Read multiplexed channels and write to appropriate stream
211fd, l, err := DemuxHeader(socket, buffer)
212if err != nil {
213if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
214return nil
215}
216return err
217}
218frame, err := DemuxFrame(socket, buffer, l)
219if err != nil {
220return err
221}
222
223switch {
224case fd == 0:
225if isSet.stdout {
226if _, err := stdout.Write(frame[0:l]); err != nil {
227return err
228}
229}
230case fd == 1:
231if isSet.stdout {
232if _, err := stdout.Write(frame[0:l]); err != nil {
233return err
234}
235}
236case fd == 2:
237if isSet.stderr {
238if _, err := stderr.Write(frame[0:l]); err != nil {
239return err
240}
241}
242case fd == 3:
243return fmt.Errorf("from service from stream: %s", frame)
244default:
245return fmt.Errorf("unrecognized channel '%d' in header, 0-3 supported", fd)
246}
247}
248}
249}
250
251// DemuxHeader reads header for stream from server multiplexed stdin/stdout/stderr/2nd error channel
252func DemuxHeader(r io.Reader, buffer []byte) (fd, sz int, err error) {
253n, err := io.ReadFull(r, buffer[0:8])
254if err != nil {
255return
256}
257if n < 8 {
258err = io.ErrUnexpectedEOF
259return
260}
261
262fd = int(buffer[0])
263if fd < 0 || fd > 3 {
264err = fmt.Errorf(`channel "%d" found, 0-3 supported: %w`, fd, ErrLostSync)
265return
266}
267
268sz = int(binary.BigEndian.Uint32(buffer[4:8]))
269return
270}
271
// DemuxFrame reads the length-byte payload of one frame of the multiplexed
// stdin/stdout/stderr/error stream coming from the server.
//
// buffer is reused for the payload when it is large enough; otherwise a new
// slice of exactly length bytes is allocated. The returned frame is only valid
// until buffer is reused.
//
// A negative length is rejected with an error instead of panicking. Read
// errors from io.ReadFull are returned unchanged (io.ErrUnexpectedEOF for a
// truncated payload), in which case frame is nil.
func DemuxFrame(r io.Reader, buffer []byte, length int) (frame []byte, err error) {
	if length < 0 {
		return nil, fmt.Errorf("invalid frame length %d", length)
	}
	if len(buffer) < length {
		buffer = make([]byte, length)
	}

	frame = buffer[:length]
	// io.ReadFull only returns a nil error when the payload was read in full.
	if _, err := io.ReadFull(r, frame); err != nil {
		return nil, err
	}
	return frame, nil
}
289
290// ResizeContainerTTY sets container's TTY height and width in characters
291func ResizeContainerTTY(ctx context.Context, nameOrID string, options *ResizeTTYOptions) error {
292if options == nil {
293options = new(ResizeTTYOptions)
294}
295return resizeTTY(ctx, bindings.JoinURL("containers", nameOrID, "resize"), options.Height, options.Width)
296}
297
298// ResizeExecTTY sets session's TTY height and width in characters
299func ResizeExecTTY(ctx context.Context, sessionID string, options *ResizeExecTTYOptions) error {
300if options == nil {
301options = new(ResizeExecTTYOptions)
302}
303return resizeTTY(ctx, bindings.JoinURL("exec", sessionID, "resize"), options.Height, options.Width)
304}
305
306// resizeTTY set size of TTY of container
307func resizeTTY(ctx context.Context, endpoint string, height *int, width *int) error {
308conn, err := bindings.GetClient(ctx)
309if err != nil {
310return err
311}
312
313params := url.Values{}
314if height != nil {
315params.Set("h", strconv.Itoa(*height))
316}
317if width != nil {
318params.Set("w", strconv.Itoa(*width))
319}
320params.Set("running", "true")
321rsp, err := conn.DoRequest(ctx, nil, http.MethodPost, endpoint, params, nil)
322if err != nil {
323return err
324}
325defer rsp.Body.Close()
326
327return rsp.Process(nil)
328}
329
330type rawFormatter struct {
331logrus.TextFormatter
332}
333
334func (f *rawFormatter) Format(entry *logrus.Entry) ([]byte, error) {
335buffer, err := f.TextFormatter.Format(entry)
336if err != nil {
337return buffer, err
338}
339return append(buffer, '\r'), nil
340}
341
342// This is intended to not be run as a goroutine, handling resizing for a container
343// or exec session. It will call resize once and then starts a goroutine which calls resize on winChange
344func attachHandleResize(ctx, winCtx context.Context, winChange chan os.Signal, isExec bool, id string, file *os.File, outFile *os.File) {
345resize := func() {
346w, h, err := getTermSize(file, outFile)
347if err != nil {
348logrus.Warnf("Failed to obtain TTY size: %v", err)
349}
350
351var resizeErr error
352if isExec {
353resizeErr = ResizeExecTTY(ctx, id, new(ResizeExecTTYOptions).WithHeight(h).WithWidth(w))
354} else {
355resizeErr = ResizeContainerTTY(ctx, id, new(ResizeTTYOptions).WithHeight(h).WithWidth(w))
356}
357if resizeErr != nil {
358logrus.Debugf("Failed to resize TTY: %v", resizeErr)
359}
360}
361
362resize()
363
364go func() {
365for {
366select {
367case <-winCtx.Done():
368return
369case <-winChange:
370resize()
371}
372}
373}()
374}
375
376// Configure the given terminal for raw mode
377func setRawTerminal(file *os.File) (*terminal.State, error) {
378state, err := makeRawTerm(file)
379if err != nil {
380return nil, err
381}
382
383logrus.SetFormatter(&rawFormatter{})
384
385return state, err
386}
387
388// ExecStartAndAttach starts and attaches to a given exec session.
389func ExecStartAndAttach(ctx context.Context, sessionID string, options *ExecStartAndAttachOptions) error {
390if options == nil {
391options = new(ExecStartAndAttachOptions)
392}
393conn, err := bindings.GetClient(ctx)
394if err != nil {
395return err
396}
397
398// TODO: Make this configurable (can't use streams' InputStream as it's
399// buffered)
400terminalFile := os.Stdin
401terminalOutFile := os.Stdout
402
403logrus.Debugf("Starting & Attaching to exec session ID %q", sessionID)
404
405// We need to inspect the exec session first to determine whether to use
406// -t.
407resp, err := conn.DoRequest(ctx, nil, http.MethodGet, "/exec/%s/json", nil, nil, sessionID)
408if err != nil {
409return err
410}
411defer resp.Body.Close()
412
413respStruct := new(define.InspectExecSession)
414if err := resp.Process(respStruct); err != nil {
415return err
416}
417isTerm := true
418if respStruct.ProcessConfig != nil {
419isTerm = respStruct.ProcessConfig.Tty
420}
421
422// If we are in TTY mode, we need to set raw mode for the terminal.
423// TODO: Share all of this with Attach() for containers.
424needTTY := terminalFile != nil && terminal.IsTerminal(int(terminalFile.Fd())) && isTerm
425
426body := struct {
427Detach bool `json:"Detach"`
428TTY bool `json:"Tty"`
429Height uint16 `json:"h"`
430Width uint16 `json:"w"`
431}{
432Detach: false,
433TTY: needTTY,
434}
435
436if needTTY {
437state, err := setRawTerminal(terminalFile)
438if err != nil {
439return err
440}
441defer func() {
442if err := terminal.Restore(int(terminalFile.Fd()), state); err != nil {
443logrus.Errorf("Unable to restore terminal: %q", err)
444}
445logrus.SetFormatter(&logrus.TextFormatter{})
446}()
447w, h, err := getTermSize(terminalFile, terminalOutFile)
448if err != nil {
449logrus.Warnf("Failed to obtain TTY size: %v", err)
450}
451body.Width = uint16(w)
452body.Height = uint16(h)
453}
454
455bodyJSON, err := json.Marshal(body)
456if err != nil {
457return err
458}
459
460var socket net.Conn
461socketSet := false
462dialContext := conn.Client.Transport.(*http.Transport).DialContext
463t := &http.Transport{
464DialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
465c, err := dialContext(ctx, network, address)
466if err != nil {
467return nil, err
468}
469if !socketSet {
470socket = c
471socketSet = true
472}
473return c, err
474},
475IdleConnTimeout: time.Duration(0),
476}
477conn.Client.Transport = t
478response, err := conn.DoRequest(ctx, bytes.NewReader(bodyJSON), http.MethodPost, "/exec/%s/start", nil, nil, sessionID)
479if err != nil {
480return err
481}
482defer response.Body.Close()
483
484if !(response.IsSuccess() || response.IsInformational()) {
485return response.Process(nil)
486}
487
488if needTTY {
489winChange := make(chan os.Signal, 1)
490winCtx, winCancel := context.WithCancel(ctx)
491defer winCancel()
492
493notifyWinChange(winCtx, winChange, terminalFile, terminalOutFile)
494attachHandleResize(ctx, winCtx, winChange, true, sessionID, terminalFile, terminalOutFile)
495}
496
497if options.GetAttachInput() {
498go func() {
499logrus.Debugf("Copying STDIN to socket")
500_, err := detach.Copy(socket, options.InputStream, []byte{})
501if err != nil {
502logrus.Errorf("Failed to write input to service: %v", err)
503}
504
505if closeWrite, ok := socket.(CloseWriter); ok {
506logrus.Debugf("Closing STDIN")
507if err := closeWrite.CloseWrite(); err != nil {
508logrus.Warnf("Failed to close STDIN for writing: %v", err)
509}
510}
511}()
512}
513
514buffer := make([]byte, 1024)
515if isTerm {
516logrus.Debugf("Handling terminal attach to exec")
517if !options.GetAttachOutput() {
518return fmt.Errorf("exec session %s has a terminal and must have STDOUT enabled", sessionID)
519}
520// If not multiplex'ed, read from server and write to stdout
521_, err := detach.Copy(options.GetOutputStream(), socket, []byte{})
522if err != nil {
523return err
524}
525} else {
526logrus.Debugf("Handling non-terminal attach to exec")
527for {
528// Read multiplexed channels and write to appropriate stream
529fd, l, err := DemuxHeader(socket, buffer)
530if err != nil {
531if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
532return nil
533}
534return err
535}
536frame, err := DemuxFrame(socket, buffer, l)
537if err != nil {
538return err
539}
540
541switch {
542case fd == 0:
543if options.GetAttachInput() {
544// Write STDIN to STDOUT (echoing characters
545// typed by another attach session)
546if _, err := options.GetOutputStream().Write(frame[0:l]); err != nil {
547return err
548}
549}
550case fd == 1:
551if options.GetAttachOutput() {
552if _, err := options.GetOutputStream().Write(frame[0:l]); err != nil {
553return err
554}
555}
556case fd == 2:
557if options.GetAttachError() {
558if _, err := options.GetErrorStream().Write(frame[0:l]); err != nil {
559return err
560}
561}
562case fd == 3:
563return fmt.Errorf("from service from stream: %s", frame)
564default:
565return fmt.Errorf("unrecognized channel '%d' in header, 0-3 supported", fd)
566}
567}
568}
569return nil
570}
571