jdk

Форк
0
/
waitBarrier_generic.cpp 
257 строк · 9.4 Кб
1
/*
2
 * Copyright (c) 2019, 2023, Oracle and/or its affiliates. All rights reserved.
3
 * Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
4
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5
 *
6
 * This code is free software; you can redistribute it and/or modify it
7
 * under the terms of the GNU General Public License version 2 only, as
8
 * published by the Free Software Foundation.
9
 *
10
 * This code is distributed in the hope that it will be useful, but WITHOUT
11
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13
 * version 2 for more details (a copy is included in the LICENSE file that
14
 * accompanied this code).
15
 *
16
 * You should have received a copy of the GNU General Public License version
17
 * 2 along with this work; if not, write to the Free Software Foundation,
18
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19
 *
20
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21
 * or visit www.oracle.com if you need additional information or have any
22
 * questions.
23
 *
24
 */
25

26
#include "precompiled.hpp"
27
#include "runtime/atomic.hpp"
28
#include "runtime/orderAccess.hpp"
29
#include "runtime/os.hpp"
30
#include "utilities/waitBarrier_generic.hpp"
31
#include "utilities/spinYield.hpp"
32

33
// Implements the striped semaphore wait barrier.
34
//
35
// To guarantee progress and safety, we need to make sure that a new barrier tag
36
// starts with the completely empty set of waiters and free semaphore. This
37
// requires either waiting for all threads to leave wait() for current barrier
38
// tag on disarm(), or waiting for all threads to leave the previous tag before
39
// reusing the semaphore in arm().
40
//
41
// When there are multiple threads, it is normal for some threads to take
42
// significant time to leave the barrier. Waiting for these threads introduces
43
// stalls on barrier reuse.
44
//
45
// If we wait on disarm(), this stall is nearly guaranteed to happen if some threads
46
// are de-scheduled by prior wait(). It would be especially bad if there are more
47
// waiting threads than CPUs: every thread would need to wake up and register itself
48
// as leaving, before we can unblock from disarm().
49
//
50
// If we wait on arm(), we can get lucky that most threads would be able to catch up,
51
// exit wait(), and so we arrive at arm() with the semaphore ready for reuse. However,
52
// that is still insufficient in practice.
53
//
54
// Therefore, this implementation goes a step further and implements the _striped_
55
// semaphores. We maintain several semaphores in cells. The barrier tags are assigned
56
// to cells in some simple manner. Most of the current uses have sequential barrier
57
// tags, so simple modulo works well. We then operate on a cell like we would operate
58
// on a single semaphore: we wait at arm() for all threads to catch up before reusing
59
// the cell. For the cost of maintaining just a few cells, we have enough window for
60
// threads to catch up.
61
//
62
// The correctness is guaranteed by using a single atomic state variable per cell,
63
// with updates always done with CASes:
64
//
65
//   [.......... barrier tag ..........][.......... waiters ..........]
66
//  63                                  31                            0
67
//
68
// Cell starts with zero tag and zero waiters. Arming the cell swings barrier tag from
69
// zero to some tag, while checking that no waiters have appeared. Disarming swings
70
// the barrier tag back from tag to zero. Every waiter registers itself by incrementing
71
// the "waiters", while checking that barrier tag is still the same. Every completing waiter
72
// decrements the "waiters". When all waiters complete, a cell ends up in initial state,
73
// ready to be armed again. This allows accurate tracking of how many signals
74
// to issue and does not race with disarm.
75
//
76
// The implementation uses the strongest (default) barriers for extra safety, even
77
// when not strictly required to do so for correctness. Extra barrier overhead is
78
// dominated by the actual wait/notify latency anyway.
79
//
80

81
void GenericWaitBarrier::arm(int barrier_tag) {
82
  assert(barrier_tag != 0, "Pre arm: Should be arming with armed value");
83
  assert(Atomic::load(&_barrier_tag) == 0,
84
         "Pre arm: Should not be already armed. Tag: %d",
85
         Atomic::load(&_barrier_tag));
86
  Atomic::release_store(&_barrier_tag, barrier_tag);
87

88
  Cell &cell = tag_to_cell(barrier_tag);
89
  cell.arm(barrier_tag);
90

91
  // API specifies arm() must provide a trailing fence.
92
  OrderAccess::fence();
93
}
94

95
void GenericWaitBarrier::disarm() {
96
  int barrier_tag = Atomic::load_acquire(&_barrier_tag);
97
  assert(barrier_tag != 0, "Pre disarm: Should be armed. Tag: %d", barrier_tag);
98
  Atomic::release_store(&_barrier_tag, 0);
99

100
  Cell &cell = tag_to_cell(barrier_tag);
101
  cell.disarm(barrier_tag);
102

103
  // API specifies disarm() must provide a trailing fence.
104
  OrderAccess::fence();
105
}
106

107
void GenericWaitBarrier::wait(int barrier_tag) {
108
  assert(barrier_tag != 0, "Pre wait: Should be waiting on armed value");
109

110
  Cell &cell = tag_to_cell(barrier_tag);
111
  cell.wait(barrier_tag);
112

113
  // API specifies wait() must provide a trailing fence.
114
  OrderAccess::fence();
115
}
116

117
// Arms this cell with requested_tag.
//
// Spins (with SpinYield backoff) until the cell reports zero waiters, i.e.
// every thread from the previous use of the cell has left, then swings the
// state from the observed value to (requested_tag, 0 waiters) with a CAS.
// A failed CAS is reported through fatal().
void GenericWaitBarrier::Cell::arm(int32_t requested_tag) {
  SpinYield spinner;

  // Re-read the state until no waiters from the previous tag remain.
  int64_t state = Atomic::load_acquire(&_state);
  for (;;) {
    assert(decode_tag(state) == 0,
           "Pre arm: Should not be armed. "
           "Tag: " INT32_FORMAT "; Waiters: " INT32_FORMAT,
           decode_tag(state), decode_waiters(state));
    if (decode_waiters(state) == 0) {
      break;
    }
    spinner.wait();
    state = Atomic::load_acquire(&_state);
  }

  // The cell was observed unarmed with zero waiters, so this CAS is
  // expected to succeed.
  const int64_t armed_state = encode(requested_tag, 0);
  const int64_t witnessed = Atomic::cmpxchg(&_state, state, armed_state);
  if (witnessed == state) {
    return;
  }

  fatal("Cannot arm the wait barrier. "
        "Tag: " INT32_FORMAT "; Waiters: " INT32_FORMAT,
        decode_tag(witnessed), decode_waiters(witnessed));
}
145

146
int GenericWaitBarrier::Cell::signal_if_needed(int max) {
147
  int signals = 0;
148
  while (true) {
149
    int cur = Atomic::load_acquire(&_outstanding_wakeups);
150
    if (cur == 0) {
151
      // All done, no more waiters.
152
      return 0;
153
    }
154
    assert(cur > 0, "Sanity");
155

156
    int prev = Atomic::cmpxchg(&_outstanding_wakeups, cur, cur - 1);
157
    if (prev != cur) {
158
      // Contention, return to caller for early return or backoff.
159
      return prev;
160
    }
161

162
    // Signal!
163
    _sem.signal();
164

165
    if (++signals >= max) {
166
      // Signalled requested number of times, break out.
167
      return prev;
168
    }
169
  }
170
}
171

172
// Disarms this cell: swings the tag from expected_tag to zero while keeping
// the waiter count, then makes sure one wakeup is issued per waiter that was
// registered at the moment of disarming.
void GenericWaitBarrier::Cell::disarm(int32_t expected_tag) {
  int32_t waiters;
  bool disarmed;

  // CAS loop: clear the tag, keep the waiter count observed in the same read.
  do {
    const int64_t state = Atomic::load_acquire(&_state);
    int32_t tag = decode_tag(state);
    waiters = decode_waiters(state);

    assert((tag == expected_tag) && (waiters >= 0),
           "Mid disarm: Should be armed with expected tag and have sane waiters. "
           "Tag: " INT32_FORMAT "; Waiters: " INT32_FORMAT,
           tag, waiters);

    disarmed = (Atomic::cmpxchg(&_state, state, encode(0, waiters)) == state);
  } while (!disarmed);

  // Publish the number of wakeups owed and keep signalling until none remain.
  // Woken waiters call signal_if_needed() too, so they may take a share of
  // the work; back off whenever the counter is still positive.
  if (waiters > 0) {
    Atomic::release_store(&_outstanding_wakeups, waiters);
    SpinYield spinner;
    while (signal_if_needed(INT_MAX) > 0) {
      spinner.wait();
    }
  }
  assert(Atomic::load(&_outstanding_wakeups) == 0, "Post disarm: Should not have outstanding wakeups");
}
203

204
// Blocks the caller on this cell while it is armed with expected_tag.
//
// Three phases:
//   1. Register: CAS-increment the waiter count, provided the cell still
//      carries expected_tag. If the tag differs, return without blocking.
//   2. Block on the semaphore until signalled.
//   3. Help wake up to two more waiters, then CAS-decrement the waiter count.
void GenericWaitBarrier::Cell::wait(int32_t expected_tag) {
  // Phase 1: register as a pending waiter.
  for (bool registered = false; !registered; ) {
    const int64_t state = Atomic::load_acquire(&_state);
    int32_t tag = decode_tag(state);
    if (tag != expected_tag) {
      // The cell no longer carries our tag: it was either disarmed, or
      // already re-armed with a different tag. Leave without modifying it.
      return;
    }
    int32_t waiters = decode_waiters(state);

    assert((tag == expected_tag) && (waiters >= 0 && waiters < INT32_MAX),
           "Before wait: Should be armed with expected tag and waiters are in range. "
           "Tag: " INT32_FORMAT "; Waiters: " INT32_FORMAT,
           tag, waiters);

    registered = (Atomic::cmpxchg(&_state, state, encode(tag, waiters + 1)) == state);
  }

  // Phase 2: sleep until a wakeup is delivered.
  _sem.wait();

  // Phase 3a: pass the wakeup on to up to two other waiters, so wakeups fan
  // out even when some threads lag. This has to happen *before* we report
  // ourselves as done: arm() waits for the waiter count to reach zero, so
  // while we are still counted the cell cannot be re-armed and these signals
  // cannot leak into another barrier tag.
  signal_if_needed(2);

  // Phase 3b: deregister as a completed waiter.
  bool deregistered;
  do {
    const int64_t state = Atomic::load_acquire(&_state);
    int32_t tag = decode_tag(state);
    int32_t waiters = decode_waiters(state);

    assert((tag == 0) && (waiters > 0),
           "After wait: Should be not armed and have non-complete waiters. "
           "Tag: " INT32_FORMAT "; Waiters: " INT32_FORMAT,
           tag, waiters);

    deregistered = (Atomic::cmpxchg(&_state, state, encode(tag, waiters - 1)) == state);
  } while (!deregistered);
}
258

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.