podman
254 строки · 6.7 Кб
1//go:build !windows
2
3package notifyproxy
4
5import (
6"context"
7"errors"
8"fmt"
9"io"
10"net"
11"os"
12"strings"
13"syscall"
14"time"
15
16"github.com/containers/podman/v5/libpod/define"
17"github.com/coreos/go-systemd/v22/daemon"
18"github.com/sirupsen/logrus"
19"golang.org/x/sys/unix"
20)
21
const (
	// All constants below are defined by systemd.

	// _notifyRcvbufSize is the read-buffer size requested for the proxy's
	// listening socket.
	_notifyRcvbufSize = 8 * 1024 * 1024
	// _notifyBufferMax is the size of the buffer a single notify message
	// is read into.
	_notifyBufferMax = 4096
	// _notifyFdMax scales the out-of-band buffer: room is reserved for
	// this many control messages of one file descriptor each.
	_notifyFdMax = 768
	// _notifyBarrierMsg is the message line that marks a barrier request.
	_notifyBarrierMsg = "BARRIER=1"
	// _notifyRdyMsg is the message line that signals readiness.
	_notifyRdyMsg = daemon.SdNotifyReady
)
30
// SendMessage writes message as one datagram to the unixgram socket at
// socketPath. When socketPath is empty, the value of the NOTIFY_SOCKET
// environment variable is used instead; if that is empty or unset as well,
// nothing is sent and nil is returned. Errors from dialing the socket or
// writing to it are returned unchanged.
func SendMessage(socketPath string, message string) error {
	target := socketPath
	if target == "" {
		target = os.Getenv("NOTIFY_SOCKET")
	}
	if target == "" {
		// No destination at all: silently do nothing.
		return nil
	}

	conn, err := net.DialUnix("unixgram", nil, &net.UnixAddr{Name: target, Net: "unixgram"})
	if err != nil {
		return err
	}
	defer conn.Close()

	if _, err := conn.Write([]byte(message)); err != nil {
		return err
	}
	return nil
}
54
// NotifyProxy can be used to proxy notify messages. It owns a unixgram
// socket on which it listens for messages and exposes the outcome (READY
// received or an error) through Wait.
type NotifyProxy struct {
	// connection is the unixgram socket the proxy reads messages from.
	connection *net.UnixConn
	// socketPath is the filesystem path connection is bound to.
	socketPath string
	// container, when set, has its state checked while waiting for READY.
	container Container // optional

	// Channels for synchronizing the goroutine waiting for the READY
	// message and the one checking if the optional container is still
	// running.
	errorChan chan error
	readyChan chan bool
}
67
68// New creates a NotifyProxy that starts listening immediately. The specified
69// temp directory can be left empty.
70func New(tmpDir string) (*NotifyProxy, error) {
71tempFile, err := os.CreateTemp(tmpDir, "-podman-notify-proxy.sock")
72if err != nil {
73return nil, err
74}
75defer tempFile.Close()
76
77socketPath := tempFile.Name()
78if err := syscall.Unlink(socketPath); err != nil { // Unlink the socket so we can bind it
79return nil, err
80}
81
82socketAddr := &net.UnixAddr{
83Name: socketPath,
84Net: "unixgram",
85}
86conn, err := net.ListenUnixgram(socketAddr.Net, socketAddr)
87if err != nil {
88return nil, err
89}
90
91if err := conn.SetReadBuffer(_notifyRcvbufSize); err != nil {
92return nil, fmt.Errorf("setting read buffer: %w", err)
93}
94
95errorChan := make(chan error, 1)
96readyChan := make(chan bool, 1)
97
98proxy := &NotifyProxy{
99connection: conn,
100socketPath: socketPath,
101errorChan: errorChan,
102readyChan: readyChan,
103}
104
105// Start waiting for the READY message in the background. This way,
106// the proxy can be created prior to starting the container and
107// circumvents a race condition on writing/reading on the socket.
108proxy.listen()
109
110return proxy, nil
111}
112
113// listen waits for the READY message in the background, and process file
114// descriptors and barriers send over the NOTIFY_SOCKET. The goroutine returns
115// when the socket is closed.
116func (p *NotifyProxy) listen() {
117go func() {
118// See https://github.com/containers/podman/issues/16515 for a description of the protocol.
119fdSize := unix.CmsgSpace(4)
120buffer := make([]byte, _notifyBufferMax)
121oob := make([]byte, _notifyFdMax*fdSize)
122sBuilder := strings.Builder{}
123for {
124n, oobn, flags, _, err := p.connection.ReadMsgUnix(buffer, oob)
125if err != nil {
126if !errors.Is(err, io.EOF) {
127p.errorChan <- err
128return
129}
130logrus.Errorf("Error reading unix message on socket %q: %v", p.socketPath, err)
131continue
132}
133
134if n > _notifyBufferMax || oobn > _notifyFdMax*fdSize {
135logrus.Errorf("Ignoring unix message on socket %q: incorrect number of bytes read (n=%d, oobn=%d)", p.socketPath, n, oobn)
136continue
137}
138
139if flags&unix.MSG_CTRUNC != 0 {
140logrus.Errorf("Ignoring unix message on socket %q: message truncated", p.socketPath)
141continue
142}
143
144sBuilder.Reset()
145sBuilder.Write(buffer[:n])
146var isBarrier, isReady bool
147
148for _, line := range strings.Split(sBuilder.String(), "\n") {
149switch line {
150case _notifyRdyMsg:
151isReady = true
152case _notifyBarrierMsg:
153isBarrier = true
154}
155}
156
157if isBarrier {
158scms, err := unix.ParseSocketControlMessage(oob)
159if err != nil {
160logrus.Errorf("parsing control message on socket %q: %v", p.socketPath, err)
161}
162for _, scm := range scms {
163fds, err := unix.ParseUnixRights(&scm)
164if err != nil {
165logrus.Errorf("parsing unix rights of control message on socket %q: %v", p.socketPath, err)
166continue
167}
168for _, fd := range fds {
169if err := unix.Close(fd); err != nil {
170logrus.Errorf("closing fd passed on socket %q: %v", fd, err)
171continue
172}
173}
174}
175continue
176}
177
178if isReady {
179p.readyChan <- true
180}
181}
182}()
183}
184
// SocketPath returns the path of the socket the proxy is listening on.
// The value is the one stored in the proxy when it was created.
func (p *NotifyProxy) SocketPath() string {
	return p.socketPath
}
189
190// Close closes the listener and removes the socket.
191func (p *NotifyProxy) Close() error {
192defer os.Remove(p.socketPath)
193return p.connection.Close()
194}
195
// AddContainer associates a container with the proxy. Once a container is
// set, Wait also checks that container's state while waiting for READY.
func (p *NotifyProxy) AddContainer(container Container) {
	p.container = container
}
200
// ErrNoReadyMessage is returned when we are waiting for the READY message of a
// container that is not in the running state anymore. Wait wraps it together
// with the container ID, so callers should compare using errors.Is.
var ErrNoReadyMessage = errors.New("container stopped running before READY message was received")
204
// Container avoids a circular dependency among this package and libpod.
// It lists the only two methods the proxy calls on a container.
type Container interface {
	// State reports the container's current status, or an error.
	State() (define.ContainerStatus, error)
	// ID returns the container's identifier; it is included in the error
	// returned when the container is no longer running.
	ID() string
}
210
211// Wait waits until receiving the `READY` notify message. Note that the
212// this function must only be executed inside a systemd service which will kill
213// the process after a given timeout. If the (optional) container stopped
214// running before the `READY` is received, the waiting gets canceled and
215// ErrNoReadyMessage is returned.
216func (p *NotifyProxy) Wait() error {
217// If the proxy has a container we need to watch it as it may exit
218// without sending a READY message. The goroutine below returns when
219// the container exits OR when the function returns (see deferred the
220// cancel()) in which case we either we've either received the READY
221// message or encountered an error reading from the socket.
222if p.container != nil {
223// Create a cancellable context to make sure the goroutine
224// below terminates on function return.
225ctx, cancel := context.WithCancel(context.Background())
226defer cancel()
227go func() {
228for {
229select {
230case <-ctx.Done():
231return
232case <-time.After(time.Second):
233state, err := p.container.State()
234if err != nil {
235p.errorChan <- err
236return
237}
238if state != define.ContainerStateRunning {
239p.errorChan <- fmt.Errorf("%w: %s", ErrNoReadyMessage, p.container.ID())
240return
241}
242}
243}
244}()
245}
246
247// Wait for the ready/error channel.
248select {
249case <-p.readyChan:
250return nil
251case err := <-p.errorChan:
252return err
253}
254}
255