1 | /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
|
---|
2 | /* ***** BEGIN LICENSE BLOCK *****
|
---|
3 | * Version: MPL 1.1/GPL 2.0/LGPL 2.1
|
---|
4 | *
|
---|
5 | * The contents of this file are subject to the Mozilla Public License Version
|
---|
6 | * 1.1 (the "License"); you may not use this file except in compliance with
|
---|
7 | * the License. You may obtain a copy of the License at
|
---|
8 | * http://www.mozilla.org/MPL/
|
---|
9 | *
|
---|
10 | * Software distributed under the License is distributed on an "AS IS" basis,
|
---|
11 | * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
|
---|
12 | * for the specific language governing rights and limitations under the
|
---|
13 | * License.
|
---|
14 | *
|
---|
15 | * The Original Code is the Netscape Portable Runtime (NSPR).
|
---|
16 | *
|
---|
17 | * The Initial Developer of the Original Code is
|
---|
18 | * Netscape Communications Corporation.
|
---|
19 | * Portions created by the Initial Developer are Copyright (C) 1998-2000
|
---|
20 | * the Initial Developer. All Rights Reserved.
|
---|
21 | *
|
---|
22 | * Contributor(s):
|
---|
23 | *
|
---|
24 | * Alternatively, the contents of this file may be used under the terms of
|
---|
25 | * either the GNU General Public License Version 2 or later (the "GPL"), or
|
---|
26 | * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
|
---|
27 | * in which case the provisions of the GPL or the LGPL are applicable instead
|
---|
28 | * of those above. If you wish to allow use of your version of this file only
|
---|
29 | * under the terms of either the GPL or the LGPL, and not to allow others to
|
---|
30 | * use your version of this file under the terms of the MPL, indicate your
|
---|
31 | * decision by deleting the provisions above and replace them with the notice
|
---|
32 | * and other provisions required by the GPL or the LGPL. If you do not delete
|
---|
33 | * the provisions above, a recipient may use your version of this file under
|
---|
34 | * the terms of any one of the MPL, the GPL or the LGPL.
|
---|
35 | *
|
---|
36 | * ***** END LICENSE BLOCK ***** */
|
---|
37 |
|
---|
#include "prio.h"
#include "prprf.h"
#include "prlog.h"
#include "prmem.h"
#include "pratom.h"
#include "prlock.h"
#include "prmwait.h"
#include "prclist.h"
#include "prerror.h"
#include "prinrval.h"
#include "prnetdb.h"
#include "prthread.h"

#include "plstr.h"
#include "plerror.h"
#include "plgetopt.h"

#include <errno.h>
#include <stdlib.h>
#include <string.h>
56 |
|
---|
57 | typedef struct Shared
|
---|
58 | {
|
---|
59 | const char *title;
|
---|
60 | PRLock *list_lock;
|
---|
61 | PRWaitGroup *group;
|
---|
62 | PRIntervalTime timeout;
|
---|
63 | } Shared;
|
---|
64 |
|
---|
/*
** Logging levels, in increasing order of output.  Messages in this file
** are guarded by comparisons of the form `verbosity > <level>`.
*/
typedef enum Verbosity
{
    silent,
    quiet,
    chatty,
    noisy
} Verbosity;
66 |
|
---|
67 | static PRFileDesc *debug = NULL;
|
---|
68 | static PRInt32 desc_allocated = 0;
|
---|
69 | static PRUint16 default_port = 12273;
|
---|
70 | static enum Verbosity verbosity = quiet;
|
---|
71 | static PRInt32 ops_required = 1000, ops_done = 0;
|
---|
72 | static PRThreadScope thread_scope = PR_LOCAL_THREAD;
|
---|
73 | static PRIntn client_threads = 20, worker_threads = 2, wait_objects = 50;
|
---|
74 |
|
---|
/*
** MW_ASSERT(expr): assertion used throughout this test.
**
** DEBUG builds: when expr is false, print the current NSPR error to the
** debug stream (if one is open) and then call PR_Assert with the
** stringified expression, file and line.
**
** Other builds: the macro expands to nothing, so expr is not evaluated.
*/
#if !defined(DEBUG)
#define MW_ASSERT(_expr)
#else
static void _MW_Assert(const char *s, const char *file, PRIntn ln)
{
    if (debug != NULL)
    {
        PL_FPrintError(debug, NULL);
    }
    PR_Assert(s, file, ln);
}  /* _MW_Assert */
#define MW_ASSERT(_expr) \
    ((_expr)?((void)0):_MW_Assert(# _expr,__FILE__,__LINE__))
#endif
86 |
|
---|
87 | static void PrintRecvDesc(PRRecvWait *desc, const char *msg)
|
---|
88 | {
|
---|
89 | const char *tag[] = {
|
---|
90 | "PR_MW_INTERRUPT", "PR_MW_TIMEOUT",
|
---|
91 | "PR_MW_FAILURE", "PR_MW_SUCCESS", "PR_MW_PENDING"};
|
---|
92 | PR_fprintf(
|
---|
93 | debug, "%s: PRRecvWait(@0x%x): {fd: 0x%x, outcome: %s, tmo: %u}\n",
|
---|
94 | msg, desc, desc->fd, tag[desc->outcome + 3], desc->timeout);
|
---|
95 | } /* PrintRecvDesc */
|
---|
96 |
|
---|
97 | static Shared *MakeShared(const char *title)
|
---|
98 | {
|
---|
99 | Shared *shared = PR_NEWZAP(Shared);
|
---|
100 | shared->group = PR_CreateWaitGroup(1);
|
---|
101 | shared->timeout = PR_SecondsToInterval(1);
|
---|
102 | shared->list_lock = PR_NewLock();
|
---|
103 | shared->title = title;
|
---|
104 | return shared;
|
---|
105 | } /* MakeShared */
|
---|
106 |
|
---|
107 | static void DestroyShared(Shared *shared)
|
---|
108 | {
|
---|
109 | PRStatus rv;
|
---|
110 | if (verbosity > quiet)
|
---|
111 | PR_fprintf(debug, "%s: destroying group\n", shared->title);
|
---|
112 | rv = PR_DestroyWaitGroup(shared->group);
|
---|
113 | MW_ASSERT(PR_SUCCESS == rv);
|
---|
114 | PR_DestroyLock(shared->list_lock);
|
---|
115 | PR_DELETE(shared);
|
---|
116 | } /* DestroyShared */
|
---|
117 |
|
---|
118 | static PRRecvWait *CreateRecvWait(PRFileDesc *fd, PRIntervalTime timeout)
|
---|
119 | {
|
---|
120 | PRRecvWait *desc_out = PR_NEWZAP(PRRecvWait);
|
---|
121 | MW_ASSERT(NULL != desc_out);
|
---|
122 |
|
---|
123 | MW_ASSERT(NULL != fd);
|
---|
124 | desc_out->fd = fd;
|
---|
125 | desc_out->timeout = timeout;
|
---|
126 | desc_out->buffer.length = 120;
|
---|
127 | desc_out->buffer.start = PR_CALLOC(120);
|
---|
128 |
|
---|
129 | PR_AtomicIncrement(&desc_allocated);
|
---|
130 |
|
---|
131 | if (verbosity > chatty)
|
---|
132 | PrintRecvDesc(desc_out, "Allocated");
|
---|
133 | return desc_out;
|
---|
134 | } /* CreateRecvWait */
|
---|
135 |
|
---|
136 | static void DestroyRecvWait(PRRecvWait *desc_out)
|
---|
137 | {
|
---|
138 | if (verbosity > chatty)
|
---|
139 | PrintRecvDesc(desc_out, "Destroying");
|
---|
140 | PR_Close(desc_out->fd);
|
---|
141 | if (NULL != desc_out->buffer.start)
|
---|
142 | PR_DELETE(desc_out->buffer.start);
|
---|
143 | PR_Free(desc_out);
|
---|
144 | (void)PR_AtomicDecrement(&desc_allocated);
|
---|
145 | } /* DestroyRecvWait */
|
---|
146 |
|
---|
147 | static void CancelGroup(Shared *shared)
|
---|
148 | {
|
---|
149 | PRRecvWait *desc_out;
|
---|
150 |
|
---|
151 | if (verbosity > quiet)
|
---|
152 | PR_fprintf(debug, "%s Reclaiming wait descriptors\n", shared->title);
|
---|
153 |
|
---|
154 | do
|
---|
155 | {
|
---|
156 | desc_out = PR_CancelWaitGroup(shared->group);
|
---|
157 | if (NULL != desc_out) DestroyRecvWait(desc_out);
|
---|
158 | } while (NULL != desc_out);
|
---|
159 |
|
---|
160 | MW_ASSERT(0 == desc_allocated);
|
---|
161 | MW_ASSERT(PR_GROUP_EMPTY_ERROR == PR_GetError());
|
---|
162 | } /* CancelGroup */
|
---|
163 |
|
---|
164 | static void PR_CALLBACK ClientThread(void* arg)
|
---|
165 | {
|
---|
166 | PRStatus rv;
|
---|
167 | PRInt32 bytes;
|
---|
168 | PRIntn empty_flags = 0;
|
---|
169 | PRNetAddr server_address;
|
---|
170 | unsigned char buffer[100];
|
---|
171 | Shared *shared = (Shared*)arg;
|
---|
172 | PRFileDesc *server = PR_NewTCPSocket();
|
---|
173 | if ((NULL == server)
|
---|
174 | && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) return;
|
---|
175 | MW_ASSERT(NULL != server);
|
---|
176 |
|
---|
177 | if (verbosity > chatty)
|
---|
178 | PR_fprintf(debug, "%s: Server socket @0x%x\n", shared->title, server);
|
---|
179 |
|
---|
180 | /* Initialize the buffer so that Purify won't complain */
|
---|
181 | memset(buffer, 0, sizeof(buffer));
|
---|
182 |
|
---|
183 | rv = PR_InitializeNetAddr(PR_IpAddrLoopback, default_port, &server_address);
|
---|
184 | MW_ASSERT(PR_SUCCESS == rv);
|
---|
185 |
|
---|
186 | if (verbosity > quiet)
|
---|
187 | PR_fprintf(debug, "%s: Client opening connection\n", shared->title);
|
---|
188 | rv = PR_Connect(server, &server_address, PR_INTERVAL_NO_TIMEOUT);
|
---|
189 |
|
---|
190 | if (PR_FAILURE == rv)
|
---|
191 | {
|
---|
192 | if (verbosity > silent) PL_FPrintError(debug, "Client connect failed");
|
---|
193 | return;
|
---|
194 | }
|
---|
195 |
|
---|
196 | while (ops_done < ops_required)
|
---|
197 | {
|
---|
198 | bytes = PR_Send(
|
---|
199 | server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT);
|
---|
200 | if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break;
|
---|
201 | MW_ASSERT(sizeof(buffer) == bytes);
|
---|
202 | if (verbosity > chatty)
|
---|
203 | PR_fprintf(
|
---|
204 | debug, "%s: Client sent %d bytes\n",
|
---|
205 | shared->title, sizeof(buffer));
|
---|
206 | bytes = PR_Recv(
|
---|
207 | server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT);
|
---|
208 | if (verbosity > chatty)
|
---|
209 | PR_fprintf(
|
---|
210 | debug, "%s: Client received %d bytes\n",
|
---|
211 | shared->title, sizeof(buffer));
|
---|
212 | if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break;
|
---|
213 | MW_ASSERT(sizeof(buffer) == bytes);
|
---|
214 | PR_Sleep(shared->timeout);
|
---|
215 | }
|
---|
216 | rv = PR_Close(server);
|
---|
217 | MW_ASSERT(PR_SUCCESS == rv);
|
---|
218 |
|
---|
219 | } /* ClientThread */
|
---|
220 |
|
---|
221 | static void OneInThenCancelled(Shared *shared)
|
---|
222 | {
|
---|
223 | PRStatus rv;
|
---|
224 | PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait);
|
---|
225 |
|
---|
226 | shared->timeout = PR_INTERVAL_NO_TIMEOUT;
|
---|
227 |
|
---|
228 | desc_in->fd = PR_NewTCPSocket();
|
---|
229 | desc_in->timeout = shared->timeout;
|
---|
230 |
|
---|
231 | if (verbosity > chatty) PrintRecvDesc(desc_in, "Adding desc");
|
---|
232 |
|
---|
233 | rv = PR_AddWaitFileDesc(shared->group, desc_in);
|
---|
234 | MW_ASSERT(PR_SUCCESS == rv);
|
---|
235 |
|
---|
236 | if (verbosity > chatty) PrintRecvDesc(desc_in, "Cancelling");
|
---|
237 | rv = PR_CancelWaitFileDesc(shared->group, desc_in);
|
---|
238 | MW_ASSERT(PR_SUCCESS == rv);
|
---|
239 |
|
---|
240 | desc_out = PR_WaitRecvReady(shared->group);
|
---|
241 | MW_ASSERT(desc_out == desc_in);
|
---|
242 | MW_ASSERT(PR_MW_INTERRUPT == desc_out->outcome);
|
---|
243 | MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
|
---|
244 | if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready");
|
---|
245 |
|
---|
246 | rv = PR_Close(desc_in->fd);
|
---|
247 | MW_ASSERT(PR_SUCCESS == rv);
|
---|
248 |
|
---|
249 | if (verbosity > quiet)
|
---|
250 | PR_fprintf(debug, "%s: destroying group\n", shared->title);
|
---|
251 |
|
---|
252 | PR_DELETE(desc_in);
|
---|
253 | } /* OneInThenCancelled */
|
---|
254 |
|
---|
255 | static void OneOpOneThread(Shared *shared)
|
---|
256 | {
|
---|
257 | PRStatus rv;
|
---|
258 | PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait);
|
---|
259 |
|
---|
260 | desc_in->fd = PR_NewTCPSocket();
|
---|
261 | desc_in->timeout = shared->timeout;
|
---|
262 |
|
---|
263 | if (verbosity > chatty) PrintRecvDesc(desc_in, "Adding desc");
|
---|
264 |
|
---|
265 | rv = PR_AddWaitFileDesc(shared->group, desc_in);
|
---|
266 | MW_ASSERT(PR_SUCCESS == rv);
|
---|
267 | desc_out = PR_WaitRecvReady(shared->group);
|
---|
268 | MW_ASSERT(desc_out == desc_in);
|
---|
269 | MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
|
---|
270 | MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
|
---|
271 | if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready");
|
---|
272 |
|
---|
273 | rv = PR_Close(desc_in->fd);
|
---|
274 | MW_ASSERT(PR_SUCCESS == rv);
|
---|
275 |
|
---|
276 | PR_DELETE(desc_in);
|
---|
277 | } /* OneOpOneThread */
|
---|
278 |
|
---|
279 | static void ManyOpOneThread(Shared *shared)
|
---|
280 | {
|
---|
281 | PRStatus rv;
|
---|
282 | PRIntn index;
|
---|
283 | PRRecvWait *desc_in;
|
---|
284 | PRRecvWait *desc_out;
|
---|
285 |
|
---|
286 | if (verbosity > quiet)
|
---|
287 | PR_fprintf(debug, "%s: adding %d descs\n", shared->title, wait_objects);
|
---|
288 |
|
---|
289 | for (index = 0; index < wait_objects; ++index)
|
---|
290 | {
|
---|
291 | desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout);
|
---|
292 |
|
---|
293 | rv = PR_AddWaitFileDesc(shared->group, desc_in);
|
---|
294 | MW_ASSERT(PR_SUCCESS == rv);
|
---|
295 | }
|
---|
296 |
|
---|
297 | while (ops_done < ops_required)
|
---|
298 | {
|
---|
299 | desc_out = PR_WaitRecvReady(shared->group);
|
---|
300 | MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
|
---|
301 | MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
|
---|
302 | if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready/readding");
|
---|
303 | rv = PR_AddWaitFileDesc(shared->group, desc_out);
|
---|
304 | MW_ASSERT(PR_SUCCESS == rv);
|
---|
305 | (void)PR_AtomicIncrement(&ops_done);
|
---|
306 | }
|
---|
307 |
|
---|
308 | CancelGroup(shared);
|
---|
309 | } /* ManyOpOneThread */
|
---|
310 |
|
---|
311 | static void PR_CALLBACK SomeOpsThread(void *arg)
|
---|
312 | {
|
---|
313 | PRRecvWait *desc_out;
|
---|
314 | PRStatus rv = PR_SUCCESS;
|
---|
315 | Shared *shared = (Shared*)arg;
|
---|
316 | do /* until interrupted */
|
---|
317 | {
|
---|
318 | desc_out = PR_WaitRecvReady(shared->group);
|
---|
319 | if (NULL == desc_out)
|
---|
320 | {
|
---|
321 | MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
|
---|
322 | if (verbosity > quiet) PR_fprintf(debug, "Aborted\n");
|
---|
323 | break;
|
---|
324 | }
|
---|
325 | MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
|
---|
326 | MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
|
---|
327 | if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready");
|
---|
328 |
|
---|
329 | if (verbosity > chatty) PrintRecvDesc(desc_out, "Re-Adding");
|
---|
330 | desc_out->timeout = shared->timeout;
|
---|
331 | rv = PR_AddWaitFileDesc(shared->group, desc_out);
|
---|
332 | PR_AtomicIncrement(&ops_done);
|
---|
333 | if (ops_done > ops_required) break;
|
---|
334 | } while (PR_SUCCESS == rv);
|
---|
335 | MW_ASSERT(PR_SUCCESS == rv);
|
---|
336 | } /* SomeOpsThread */
|
---|
337 |
|
---|
338 | static void SomeOpsSomeThreads(Shared *shared)
|
---|
339 | {
|
---|
340 | PRStatus rv;
|
---|
341 | PRThread **thread;
|
---|
342 | PRIntn index;
|
---|
343 | PRRecvWait *desc_in;
|
---|
344 |
|
---|
345 | thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads);
|
---|
346 |
|
---|
347 | /* Create some threads */
|
---|
348 |
|
---|
349 | if (verbosity > quiet)
|
---|
350 | PR_fprintf(debug, "%s: creating threads\n", shared->title);
|
---|
351 | for (index = 0; index < worker_threads; ++index)
|
---|
352 | {
|
---|
353 | thread[index] = PR_CreateThread(
|
---|
354 | PR_USER_THREAD, SomeOpsThread, shared,
|
---|
355 | PR_PRIORITY_HIGH, thread_scope,
|
---|
356 | PR_JOINABLE_THREAD, 16 * 1024);
|
---|
357 | }
|
---|
358 |
|
---|
359 | /* then create some operations */
|
---|
360 | if (verbosity > quiet)
|
---|
361 | PR_fprintf(debug, "%s: creating desc\n", shared->title);
|
---|
362 | for (index = 0; index < wait_objects; ++index)
|
---|
363 | {
|
---|
364 | desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout);
|
---|
365 | rv = PR_AddWaitFileDesc(shared->group, desc_in);
|
---|
366 | MW_ASSERT(PR_SUCCESS == rv);
|
---|
367 | }
|
---|
368 |
|
---|
369 | if (verbosity > quiet)
|
---|
370 | PR_fprintf(debug, "%s: sleeping\n", shared->title);
|
---|
371 | while (ops_done < ops_required) PR_Sleep(shared->timeout);
|
---|
372 |
|
---|
373 | if (verbosity > quiet)
|
---|
374 | PR_fprintf(debug, "%s: interrupting/joining threads\n", shared->title);
|
---|
375 | for (index = 0; index < worker_threads; ++index)
|
---|
376 | {
|
---|
377 | rv = PR_Interrupt(thread[index]);
|
---|
378 | MW_ASSERT(PR_SUCCESS == rv);
|
---|
379 | rv = PR_JoinThread(thread[index]);
|
---|
380 | MW_ASSERT(PR_SUCCESS == rv);
|
---|
381 | }
|
---|
382 | PR_DELETE(thread);
|
---|
383 |
|
---|
384 | CancelGroup(shared);
|
---|
385 | } /* SomeOpsSomeThreads */
|
---|
386 |
|
---|
387 | static PRStatus ServiceRequest(Shared *shared, PRRecvWait *desc)
|
---|
388 | {
|
---|
389 | PRInt32 bytes_out;
|
---|
390 |
|
---|
391 | if (verbosity > chatty)
|
---|
392 | PR_fprintf(
|
---|
393 | debug, "%s: Service received %d bytes\n",
|
---|
394 | shared->title, desc->bytesRecv);
|
---|
395 |
|
---|
396 | if (0 == desc->bytesRecv) goto quitting;
|
---|
397 | if ((-1 == desc->bytesRecv)
|
---|
398 | && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted;
|
---|
399 |
|
---|
400 | bytes_out = PR_Send(
|
---|
401 | desc->fd, desc->buffer.start, desc->bytesRecv, 0, shared->timeout);
|
---|
402 | if (verbosity > chatty)
|
---|
403 | PR_fprintf(
|
---|
404 | debug, "%s: Service sent %d bytes\n",
|
---|
405 | shared->title, bytes_out);
|
---|
406 |
|
---|
407 | if ((-1 == bytes_out)
|
---|
408 | && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted;
|
---|
409 | MW_ASSERT(bytes_out == desc->bytesRecv);
|
---|
410 |
|
---|
411 | return PR_SUCCESS;
|
---|
412 |
|
---|
413 | aborted:
|
---|
414 | quitting:
|
---|
415 | return PR_FAILURE;
|
---|
416 | } /* ServiceRequest */
|
---|
417 |
|
---|
418 | static void PR_CALLBACK ServiceThread(void *arg)
|
---|
419 | {
|
---|
420 | PRStatus rv = PR_SUCCESS;
|
---|
421 | PRRecvWait *desc_out = NULL;
|
---|
422 | Shared *shared = (Shared*)arg;
|
---|
423 | do /* until interrupted */
|
---|
424 | {
|
---|
425 | if (NULL != desc_out)
|
---|
426 | {
|
---|
427 | desc_out->timeout = PR_INTERVAL_NO_TIMEOUT;
|
---|
428 | if (verbosity > chatty)
|
---|
429 | PrintRecvDesc(desc_out, "Service re-adding");
|
---|
430 | rv = PR_AddWaitFileDesc(shared->group, desc_out);
|
---|
431 | MW_ASSERT(PR_SUCCESS == rv);
|
---|
432 | }
|
---|
433 |
|
---|
434 | desc_out = PR_WaitRecvReady(shared->group);
|
---|
435 | if (NULL == desc_out)
|
---|
436 | {
|
---|
437 | MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
|
---|
438 | break;
|
---|
439 | }
|
---|
440 |
|
---|
441 | switch (desc_out->outcome)
|
---|
442 | {
|
---|
443 | case PR_MW_SUCCESS:
|
---|
444 | {
|
---|
445 | PR_AtomicIncrement(&ops_done);
|
---|
446 | if (verbosity > chatty)
|
---|
447 | PrintRecvDesc(desc_out, "Service ready");
|
---|
448 | rv = ServiceRequest(shared, desc_out);
|
---|
449 | break;
|
---|
450 | }
|
---|
451 | case PR_MW_INTERRUPT:
|
---|
452 | MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
|
---|
453 | rv = PR_FAILURE; /* if interrupted, then exit */
|
---|
454 | break;
|
---|
455 | case PR_MW_TIMEOUT:
|
---|
456 | MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
|
---|
457 | case PR_MW_FAILURE:
|
---|
458 | if (verbosity > silent)
|
---|
459 | PL_FPrintError(debug, "RecvReady failure");
|
---|
460 | break;
|
---|
461 | default:
|
---|
462 | break;
|
---|
463 | }
|
---|
464 | } while (PR_SUCCESS == rv);
|
---|
465 |
|
---|
466 | if (NULL != desc_out) DestroyRecvWait(desc_out);
|
---|
467 |
|
---|
468 | } /* ServiceThread */
|
---|
469 |
|
---|
470 | static void PR_CALLBACK EnumerationThread(void *arg)
|
---|
471 | {
|
---|
472 | PRStatus rv;
|
---|
473 | PRIntn count;
|
---|
474 | PRRecvWait *desc;
|
---|
475 | Shared *shared = (Shared*)arg;
|
---|
476 | PRIntervalTime five_seconds = PR_SecondsToInterval(5);
|
---|
477 | PRMWaitEnumerator *enumerator = PR_CreateMWaitEnumerator(shared->group);
|
---|
478 | MW_ASSERT(NULL != enumerator);
|
---|
479 |
|
---|
480 | while (PR_SUCCESS == PR_Sleep(five_seconds))
|
---|
481 | {
|
---|
482 | count = 0;
|
---|
483 | desc = NULL;
|
---|
484 | while (NULL != (desc = PR_EnumerateWaitGroup(enumerator, desc)))
|
---|
485 | {
|
---|
486 | if (verbosity > chatty) PrintRecvDesc(desc, shared->title);
|
---|
487 | count += 1;
|
---|
488 | }
|
---|
489 | if (verbosity > silent)
|
---|
490 | PR_fprintf(debug,
|
---|
491 | "%s Enumerated %d objects\n", shared->title, count);
|
---|
492 | }
|
---|
493 |
|
---|
494 | MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
|
---|
495 |
|
---|
496 |
|
---|
497 | rv = PR_DestroyMWaitEnumerator(enumerator);
|
---|
498 | MW_ASSERT(PR_SUCCESS == rv);
|
---|
499 | } /* EnumerationThread */
|
---|
500 |
|
---|
501 | static void PR_CALLBACK ServerThread(void *arg)
|
---|
502 | {
|
---|
503 | PRStatus rv;
|
---|
504 | PRIntn index;
|
---|
505 | PRRecvWait *desc_in;
|
---|
506 | PRThread **worker_thread;
|
---|
507 | Shared *shared = (Shared*)arg;
|
---|
508 | PRFileDesc *listener, *service;
|
---|
509 | PRNetAddr server_address, client_address;
|
---|
510 |
|
---|
511 | worker_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads);
|
---|
512 | if (verbosity > quiet)
|
---|
513 | PR_fprintf(debug, "%s: Server creating worker_threads\n", shared->title);
|
---|
514 | for (index = 0; index < worker_threads; ++index)
|
---|
515 | {
|
---|
516 | worker_thread[index] = PR_CreateThread(
|
---|
517 | PR_USER_THREAD, ServiceThread, shared,
|
---|
518 | PR_PRIORITY_HIGH, thread_scope,
|
---|
519 | PR_JOINABLE_THREAD, 16 * 1024);
|
---|
520 | }
|
---|
521 |
|
---|
522 | rv = PR_InitializeNetAddr(PR_IpAddrAny, default_port, &server_address);
|
---|
523 | MW_ASSERT(PR_SUCCESS == rv);
|
---|
524 |
|
---|
525 | listener = PR_NewTCPSocket(); MW_ASSERT(NULL != listener);
|
---|
526 | if (verbosity > chatty)
|
---|
527 | PR_fprintf(
|
---|
528 | debug, "%s: Server listener socket @0x%x\n",
|
---|
529 | shared->title, listener);
|
---|
530 | rv = PR_Bind(listener, &server_address); MW_ASSERT(PR_SUCCESS == rv);
|
---|
531 | rv = PR_Listen(listener, 10); MW_ASSERT(PR_SUCCESS == rv);
|
---|
532 | while (ops_done < ops_required)
|
---|
533 | {
|
---|
534 | if (verbosity > quiet)
|
---|
535 | PR_fprintf(debug, "%s: Server accepting connection\n", shared->title);
|
---|
536 | service = PR_Accept(listener, &client_address, PR_INTERVAL_NO_TIMEOUT);
|
---|
537 | if (NULL == service)
|
---|
538 | {
|
---|
539 | if (PR_PENDING_INTERRUPT_ERROR == PR_GetError()) break;
|
---|
540 | PL_PrintError("Accept failed");
|
---|
541 | MW_ASSERT(!"Accept failed");
|
---|
542 | }
|
---|
543 | else
|
---|
544 | {
|
---|
545 | desc_in = CreateRecvWait(service, shared->timeout);
|
---|
546 | desc_in->timeout = PR_INTERVAL_NO_TIMEOUT;
|
---|
547 | if (verbosity > chatty)
|
---|
548 | PrintRecvDesc(desc_in, "Service adding");
|
---|
549 | rv = PR_AddWaitFileDesc(shared->group, desc_in);
|
---|
550 | MW_ASSERT(PR_SUCCESS == rv);
|
---|
551 | }
|
---|
552 | }
|
---|
553 |
|
---|
554 | if (verbosity > quiet)
|
---|
555 | PR_fprintf(debug, "%s: Server interrupting worker_threads\n", shared->title);
|
---|
556 | for (index = 0; index < worker_threads; ++index)
|
---|
557 | {
|
---|
558 | rv = PR_Interrupt(worker_thread[index]);
|
---|
559 | MW_ASSERT(PR_SUCCESS == rv);
|
---|
560 | rv = PR_JoinThread(worker_thread[index]);
|
---|
561 | MW_ASSERT(PR_SUCCESS == rv);
|
---|
562 | }
|
---|
563 | PR_DELETE(worker_thread);
|
---|
564 |
|
---|
565 | PR_Close(listener);
|
---|
566 |
|
---|
567 | CancelGroup(shared);
|
---|
568 |
|
---|
569 | } /* ServerThread */
|
---|
570 |
|
---|
571 | static void RealOneGroupIO(Shared *shared)
|
---|
572 | {
|
---|
573 | /*
|
---|
574 | ** Create a server that listens for connections and then services
|
---|
575 | ** requests that come in over those connections. The server never
|
---|
576 | ** deletes a connection and assumes a basic RPC model of operation.
|
---|
577 | **
|
---|
578 | ** Use worker_threads threads to service how every many open ports
|
---|
579 | ** there might be.
|
---|
580 | **
|
---|
581 | ** Oh, ya. Almost forget. Create (some) clients as well.
|
---|
582 | */
|
---|
583 | PRStatus rv;
|
---|
584 | PRIntn index;
|
---|
585 | PRThread *server_thread, *enumeration_thread, **client_thread;
|
---|
586 |
|
---|
587 | if (verbosity > quiet)
|
---|
588 | PR_fprintf(debug, "%s: creating server_thread\n", shared->title);
|
---|
589 |
|
---|
590 | server_thread = PR_CreateThread(
|
---|
591 | PR_USER_THREAD, ServerThread, shared,
|
---|
592 | PR_PRIORITY_HIGH, thread_scope,
|
---|
593 | PR_JOINABLE_THREAD, 16 * 1024);
|
---|
594 |
|
---|
595 | if (verbosity > quiet)
|
---|
596 | PR_fprintf(debug, "%s: creating enumeration_thread\n", shared->title);
|
---|
597 |
|
---|
598 | enumeration_thread = PR_CreateThread(
|
---|
599 | PR_USER_THREAD, EnumerationThread, shared,
|
---|
600 | PR_PRIORITY_HIGH, thread_scope,
|
---|
601 | PR_JOINABLE_THREAD, 16 * 1024);
|
---|
602 |
|
---|
603 | if (verbosity > quiet)
|
---|
604 | PR_fprintf(debug, "%s: snoozing before creating clients\n", shared->title);
|
---|
605 | PR_Sleep(5 * shared->timeout);
|
---|
606 |
|
---|
607 | if (verbosity > quiet)
|
---|
608 | PR_fprintf(debug, "%s: creating client_threads\n", shared->title);
|
---|
609 | client_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * client_threads);
|
---|
610 | for (index = 0; index < client_threads; ++index)
|
---|
611 | {
|
---|
612 | client_thread[index] = PR_CreateThread(
|
---|
613 | PR_USER_THREAD, ClientThread, shared,
|
---|
614 | PR_PRIORITY_NORMAL, thread_scope,
|
---|
615 | PR_JOINABLE_THREAD, 16 * 1024);
|
---|
616 | }
|
---|
617 |
|
---|
618 | while (ops_done < ops_required) PR_Sleep(shared->timeout);
|
---|
619 |
|
---|
620 | if (verbosity > quiet)
|
---|
621 | PR_fprintf(debug, "%s: interrupting/joining client_threads\n", shared->title);
|
---|
622 | for (index = 0; index < client_threads; ++index)
|
---|
623 | {
|
---|
624 | rv = PR_Interrupt(client_thread[index]);
|
---|
625 | MW_ASSERT(PR_SUCCESS == rv);
|
---|
626 | rv = PR_JoinThread(client_thread[index]);
|
---|
627 | MW_ASSERT(PR_SUCCESS == rv);
|
---|
628 | }
|
---|
629 | PR_DELETE(client_thread);
|
---|
630 |
|
---|
631 | if (verbosity > quiet)
|
---|
632 | PR_fprintf(debug, "%s: interrupting/joining enumeration_thread\n", shared->title);
|
---|
633 | rv = PR_Interrupt(enumeration_thread);
|
---|
634 | MW_ASSERT(PR_SUCCESS == rv);
|
---|
635 | rv = PR_JoinThread(enumeration_thread);
|
---|
636 | MW_ASSERT(PR_SUCCESS == rv);
|
---|
637 |
|
---|
638 | if (verbosity > quiet)
|
---|
639 | PR_fprintf(debug, "%s: interrupting/joining server_thread\n", shared->title);
|
---|
640 | rv = PR_Interrupt(server_thread);
|
---|
641 | MW_ASSERT(PR_SUCCESS == rv);
|
---|
642 | rv = PR_JoinThread(server_thread);
|
---|
643 | MW_ASSERT(PR_SUCCESS == rv);
|
---|
644 | } /* RealOneGroupIO */
|
---|
645 |
|
---|
646 | static void RunThisOne(
|
---|
647 | void (*func)(Shared*), const char *name, const char *test_name)
|
---|
648 | {
|
---|
649 | Shared *shared;
|
---|
650 | if ((NULL == test_name) || (0 == PL_strcmp(name, test_name)))
|
---|
651 | {
|
---|
652 | if (verbosity > silent)
|
---|
653 | PR_fprintf(debug, "%s()\n", name);
|
---|
654 | shared = MakeShared(name);
|
---|
655 | ops_done = 0;
|
---|
656 | func(shared); /* run the test */
|
---|
657 | MW_ASSERT(0 == desc_allocated);
|
---|
658 | DestroyShared(shared);
|
---|
659 | }
|
---|
660 | } /* RunThisOne */
|
---|
661 |
|
---|
662 | static Verbosity ChangeVerbosity(Verbosity verbosity, PRIntn delta)
|
---|
663 | {
|
---|
664 | PRIntn verbage = (PRIntn)verbosity;
|
---|
665 | return (Verbosity)(verbage += delta);
|
---|
666 | } /* ChangeVerbosity */
|
---|
667 |
|
---|
668 | PRIntn main(PRIntn argc, char **argv)
|
---|
669 | {
|
---|
670 | PLOptStatus os;
|
---|
671 | const char *test_name = NULL;
|
---|
672 | PLOptState *opt = PL_CreateOptState(argc, argv, "dqGc:o:p:t:w:");
|
---|
673 |
|
---|
674 | while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
|
---|
675 | {
|
---|
676 | if (PL_OPT_BAD == os) continue;
|
---|
677 | switch (opt->option)
|
---|
678 | {
|
---|
679 | case 0:
|
---|
680 | test_name = opt->value;
|
---|
681 | break;
|
---|
682 | case 'd': /* debug mode */
|
---|
683 | if (verbosity < noisy)
|
---|
684 | verbosity = ChangeVerbosity(verbosity, 1);
|
---|
685 | break;
|
---|
686 | case 'q': /* debug mode */
|
---|
687 | if (verbosity > silent)
|
---|
688 | verbosity = ChangeVerbosity(verbosity, -1);
|
---|
689 | break;
|
---|
690 | case 'G': /* use global threads */
|
---|
691 | thread_scope = PR_GLOBAL_THREAD;
|
---|
692 | break;
|
---|
693 | case 'c': /* number of client threads */
|
---|
694 | client_threads = atoi(opt->value);
|
---|
695 | break;
|
---|
696 | case 'o': /* operations to compelete */
|
---|
697 | ops_required = atoi(opt->value);
|
---|
698 | break;
|
---|
699 | case 'p': /* default port */
|
---|
700 | default_port = atoi(opt->value);
|
---|
701 | break;
|
---|
702 | case 't': /* number of threads waiting */
|
---|
703 | worker_threads = atoi(opt->value);
|
---|
704 | break;
|
---|
705 | case 'w': /* number of wait objects */
|
---|
706 | wait_objects = atoi(opt->value);
|
---|
707 | break;
|
---|
708 | default:
|
---|
709 | break;
|
---|
710 | }
|
---|
711 | }
|
---|
712 | PL_DestroyOptState(opt);
|
---|
713 |
|
---|
714 | if (verbosity > 0)
|
---|
715 | debug = PR_GetSpecialFD(PR_StandardError);
|
---|
716 |
|
---|
717 | RunThisOne(OneInThenCancelled, "OneInThenCancelled", test_name);
|
---|
718 | RunThisOne(OneOpOneThread, "OneOpOneThread", test_name);
|
---|
719 | RunThisOne(ManyOpOneThread, "ManyOpOneThread", test_name);
|
---|
720 | RunThisOne(SomeOpsSomeThreads, "SomeOpsSomeThreads", test_name);
|
---|
721 | RunThisOne(RealOneGroupIO, "RealOneGroupIO", test_name);
|
---|
722 | return 0;
|
---|
723 | } /* main */
|
---|
724 |
|
---|
725 | /* multwait.c */
|
---|