jdk

Форк
0
/
test_nonblockingQueue.cpp 
284 строки · 8.6 Кб
1
/*
2
 * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved.
3
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
 *
5
 * This code is free software; you can redistribute it and/or modify it
6
 * under the terms of the GNU General Public License version 2 only, as
7
 * published by the Free Software Foundation.
8
 *
9
 * This code is distributed in the hope that it will be useful, but WITHOUT
10
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12
 * version 2 for more details (a copy is included in the LICENSE file that
13
 * accompanied this code).
14
 *
15
 * You should have received a copy of the GNU General Public License version
16
 * 2 along with this work; if not, write to the Free Software Foundation,
17
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
 *
19
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20
 * or visit www.oracle.com if you need additional information or have any
21
 * questions.
22
 */
23

24
#include "precompiled.hpp"
25
#include "memory/allocation.inline.hpp"
26
#include "runtime/atomic.hpp"
27
#include "utilities/globalDefinitions.hpp"
28
#include "utilities/nonblockingQueue.inline.hpp"
29
#include "utilities/pair.hpp"
30
#include "threadHelper.inline.hpp"
31
#include "unittest.hpp"
32

33
#include <new>
34

35
class NonblockingQueueTestElement {
36
  typedef NonblockingQueueTestElement Element;
37

38
  Element* volatile _entry;
39
  Element* volatile _entry1;
40
  size_t _id;
41

42
  static Element* volatile* entry_ptr(Element& e) { return &e._entry; }
43
  static Element* volatile* entry1_ptr(Element& e) { return &e._entry1; }
44

45
public:
46
  using TestQueue = NonblockingQueue<Element, &entry_ptr>;
47
  using TestQueue1 = NonblockingQueue<Element, &entry1_ptr>;
48

49
  NonblockingQueueTestElement(size_t id = 0) : _entry(), _entry1(), _id(id) {}
50
  size_t id() const { return _id; }
51
  void set_id(size_t value) { _id = value; }
52
  Element* next() { return _entry; }
53
  Element* next1() { return _entry1; }
54
};
55

56
typedef NonblockingQueueTestElement Element;
57
typedef Element::TestQueue TestQueue;
58
typedef Element::TestQueue1 TestQueue1;
59

60
static void initialize(Element* elements, size_t size, TestQueue* queue) {
61
  for (size_t i = 0; i < size; ++i) {
62
    elements[i].set_id(i);
63
  }
64
  ASSERT_TRUE(queue->empty());
65
  ASSERT_EQ(0u, queue->length());
66
  ASSERT_TRUE(queue->is_end(queue->first()));
67
  ASSERT_TRUE(queue->pop() == nullptr);
68

69
  for (size_t id = 0; id < size; ++id) {
70
    ASSERT_EQ(id, queue->length());
71
    Element* e = &elements[id];
72
    ASSERT_EQ(id, e->id());
73
    queue->push(*e);
74
    ASSERT_FALSE(queue->empty());
75
    // first() is always the oldest element.
76
    ASSERT_EQ(&elements[0], queue->first());
77
  }
78
}
79

80
class NonblockingQueueTestBasics : public ::testing::Test {
81
public:
82
  NonblockingQueueTestBasics();
83

84
  static const size_t nelements = 10;
85
  Element elements[nelements];
86
  TestQueue queue;
87
};
88

89
const size_t NonblockingQueueTestBasics::nelements;
90

91
NonblockingQueueTestBasics::NonblockingQueueTestBasics() : queue() {
92
  initialize(elements, nelements, &queue);
93
}
94

95
TEST_F(NonblockingQueueTestBasics, pop) {
96
  for (size_t i = 0; i < nelements; ++i) {
97
    ASSERT_FALSE(queue.empty());
98
    ASSERT_EQ(nelements - i, queue.length());
99
    Element* e = queue.pop();
100
    ASSERT_TRUE(e != nullptr);
101
    ASSERT_EQ(&elements[i], e);
102
    ASSERT_EQ(i, e->id());
103
  }
104
  ASSERT_TRUE(queue.empty());
105
  ASSERT_EQ(0u, queue.length());
106
  ASSERT_TRUE(queue.pop() == nullptr);
107
}
108

109
TEST_F(NonblockingQueueTestBasics, append) {
110
  TestQueue other_queue;
111
  ASSERT_TRUE(other_queue.empty());
112
  ASSERT_EQ(0u, other_queue.length());
113
  ASSERT_TRUE(other_queue.is_end(other_queue.first()));
114
  ASSERT_TRUE(other_queue.pop() == nullptr);
115

116
  Pair<Element*, Element*> pair = queue.take_all();
117
  other_queue.append(*pair.first, *pair.second);
118
  ASSERT_EQ(nelements, other_queue.length());
119
  ASSERT_TRUE(queue.empty());
120
  ASSERT_EQ(0u, queue.length());
121
  ASSERT_TRUE(queue.is_end(queue.first()));
122
  ASSERT_TRUE(queue.pop() == nullptr);
123

124
  for (size_t i = 0; i < nelements; ++i) {
125
    ASSERT_EQ(nelements - i, other_queue.length());
126
    Element* e = other_queue.pop();
127
    ASSERT_TRUE(e != nullptr);
128
    ASSERT_EQ(&elements[i], e);
129
    ASSERT_EQ(i, e->id());
130
  }
131
  ASSERT_EQ(0u, other_queue.length());
132
  ASSERT_TRUE(other_queue.pop() == nullptr);
133
}
134

135
TEST_F(NonblockingQueueTestBasics, two_queues) {
136
  TestQueue1 queue1;
137
  ASSERT_TRUE(queue1.pop() == nullptr);
138

139
  for (size_t id = 0; id < nelements; ++id) {
140
    queue1.push(elements[id]);
141
  }
142
  ASSERT_EQ(nelements, queue1.length());
143
  Element* e0 = queue.first();
144
  Element* e1 = queue1.first();
145
  ASSERT_TRUE(e0 != nullptr);
146
  ASSERT_TRUE(e1 != nullptr);
147
  ASSERT_FALSE(queue.is_end(e0));
148
  ASSERT_FALSE(queue1.is_end(e1));
149
  while (!queue.is_end(e0) && !queue1.is_end(e1)) {
150
    ASSERT_EQ(e0, e1);
151
    e0 = e0->next();
152
    e1 = e1->next1();
153
  }
154
  ASSERT_TRUE(queue.is_end(e0));
155
  ASSERT_TRUE(queue1.is_end(e1));
156

157
  for (size_t i = 0; i < nelements; ++i) {
158
    ASSERT_EQ(nelements - i, queue.length());
159
    ASSERT_EQ(nelements - i, queue1.length());
160

161
    Element* e = queue.pop();
162
    ASSERT_TRUE(e != nullptr);
163
    ASSERT_EQ(&elements[i], e);
164
    ASSERT_EQ(i, e->id());
165

166
    Element* e1 = queue1.pop();
167
    ASSERT_TRUE(e1 != nullptr);
168
    ASSERT_EQ(&elements[i], e1);
169
    ASSERT_EQ(i, e1->id());
170

171
    ASSERT_EQ(e, e1);
172
  }
173
  ASSERT_EQ(0u, queue.length());
174
  ASSERT_EQ(0u, queue1.length());
175
  ASSERT_TRUE(queue.pop() == nullptr);
176
  ASSERT_TRUE(queue1.pop() == nullptr);
177
}
178

179
class NonblockingQueueTestThread : public JavaTestThread {
180
  uint _id;
181
  TestQueue* _from;
182
  TestQueue* _to;
183
  volatile size_t* _processed;
184
  size_t _process_limit;
185
  size_t _local_processed;
186
  volatile bool _ready;
187

188
public:
189
  NonblockingQueueTestThread(Semaphore* post,
190
                             uint id,
191
                             TestQueue* from,
192
                             TestQueue* to,
193
                             volatile size_t* processed,
194
                             size_t process_limit) :
195
    JavaTestThread(post),
196
    _id(id),
197
    _from(from),
198
    _to(to),
199
    _processed(processed),
200
    _process_limit(process_limit),
201
    _local_processed(0),
202
    _ready(false)
203
  {}
204

205
  virtual void main_run() {
206
    Atomic::release_store_fence(&_ready, true);
207
    while (true) {
208
      Element* e = _from->pop();
209
      if (e != nullptr) {
210
        _to->push(*e);
211
        Atomic::inc(_processed);
212
        ++_local_processed;
213
      } else if (Atomic::load_acquire(_processed) == _process_limit) {
214
        tty->print_cr("thread %u processed " SIZE_FORMAT, _id, _local_processed);
215
        return;
216
      }
217
    }
218
  }
219

220
  bool ready() const { return Atomic::load_acquire(&_ready); }
221
};
222

223
TEST_VM(NonblockingQueueTest, stress) {
224
  Semaphore post;
225
  TestQueue initial_queue;
226
  TestQueue start_queue;
227
  TestQueue middle_queue;
228
  TestQueue final_queue;
229
  volatile size_t stage1_processed = 0;
230
  volatile size_t stage2_processed = 0;
231

232
  const size_t nelements = 10000;
233
  Element* elements = NEW_C_HEAP_ARRAY(Element, nelements, mtOther);
234
  for (size_t id = 0; id < nelements; ++id) {
235
    ::new (&elements[id]) Element(id);
236
    initial_queue.push(elements[id]);
237
  }
238
  ASSERT_EQ(nelements, initial_queue.length());
239

240
  // - stage1 threads pop from start_queue and push to middle_queue.
241
  // - stage2 threads pop from middle_queue and push to final_queue.
242
  // - all threads in a stage count the number of elements processed in
243
  //   their corresponding stageN_processed counter.
244

245
  const uint stage1_threads = 2;
246
  const uint stage2_threads = 2;
247
  const uint nthreads = stage1_threads + stage2_threads;
248
  NonblockingQueueTestThread* threads[nthreads] = {};
249

250
  for (uint i = 0; i < ARRAY_SIZE(threads); ++i) {
251
    TestQueue* from = &start_queue;
252
    TestQueue* to = &middle_queue;
253
    volatile size_t* processed = &stage1_processed;
254
    if (i >= stage1_threads) {
255
      from = &middle_queue;
256
      to = &final_queue;
257
      processed = &stage2_processed;
258
    }
259
    threads[i] =
260
      new NonblockingQueueTestThread(&post, i, from, to, processed, nelements);
261
    threads[i]->doit();
262
    while (!threads[i]->ready()) {} // Wait until ready to start test.
263
  }
264

265
  // Transfer elements to start_queue to start test.
266
  Pair<Element*, Element*> pair = initial_queue.take_all();
267
  start_queue.append(*pair.first, *pair.second);
268

269
  // Wait for all threads to complete.
270
  for (uint i = 0; i < nthreads; ++i) {
271
    post.wait();
272
  }
273

274
  // Verify expected state.
275
  ASSERT_EQ(nelements, stage1_processed);
276
  ASSERT_EQ(nelements, stage2_processed);
277
  ASSERT_EQ(0u, initial_queue.length());
278
  ASSERT_EQ(0u, start_queue.length());
279
  ASSERT_EQ(0u, middle_queue.length());
280
  ASSERT_EQ(nelements, final_queue.length());
281
  while (final_queue.pop() != nullptr) {}
282

283
  FREE_C_HEAP_ARRAY(Element, elements);
284
}
285

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.