VirtualBox

source: vbox/trunk/src/libs/xpcom18a4/nsprpub/pr/tests/multiwait.c@ 55761

最後變更 在這個檔案從55761是 1,由 vboxsync 提交於 55 年 前

import

  • 屬性 svn:eol-style 設為 native
  • 屬性 svn:keywords 設為 Author Date Id Revision
檔案大小: 22.8 KB
 
1/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2/* ***** BEGIN LICENSE BLOCK *****
3 * Version: MPL 1.1/GPL 2.0/LGPL 2.1
4 *
5 * The contents of this file are subject to the Mozilla Public License Version
6 * 1.1 (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 * http://www.mozilla.org/MPL/
9 *
10 * Software distributed under the License is distributed on an "AS IS" basis,
11 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
12 * for the specific language governing rights and limitations under the
13 * License.
14 *
15 * The Original Code is the Netscape Portable Runtime (NSPR).
16 *
17 * The Initial Developer of the Original Code is
18 * Netscape Communications Corporation.
19 * Portions created by the Initial Developer are Copyright (C) 1998-2000
20 * the Initial Developer. All Rights Reserved.
21 *
22 * Contributor(s):
23 *
24 * Alternatively, the contents of this file may be used under the terms of
25 * either the GNU General Public License Version 2 or later (the "GPL"), or
26 * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
27 * in which case the provisions of the GPL or the LGPL are applicable instead
28 * of those above. If you wish to allow use of your version of this file only
29 * under the terms of either the GPL or the LGPL, and not to allow others to
30 * use your version of this file under the terms of the MPL, indicate your
31 * decision by deleting the provisions above and replace them with the notice
32 * and other provisions required by the GPL or the LGPL. If you do not delete
33 * the provisions above, a recipient may use your version of this file under
34 * the terms of any one of the MPL, the GPL or the LGPL.
35 *
36 * ***** END LICENSE BLOCK ***** */
37
38#include "prio.h"
39#include "prprf.h"
40#include "prlog.h"
41#include "prmem.h"
42#include "pratom.h"
43#include "prlock.h"
44#include "prmwait.h"
45#include "prclist.h"
46#include "prerror.h"
47#include "prinrval.h"
48#include "prnetdb.h"
49#include "prthread.h"
50
51#include "plstr.h"
52#include "plerror.h"
53#include "plgetopt.h"
54
55#include <string.h>
56
57typedef struct Shared
58{
59 const char *title;
60 PRLock *list_lock;
61 PRWaitGroup *group;
62 PRIntervalTime timeout;
63} Shared;
64
/* Diagnostic output levels, in increasing order of talkativeness. */
typedef enum Verbosity
{
    silent,
    quiet,
    chatty,
    noisy
} Verbosity;
66
67static PRFileDesc *debug = NULL;
68static PRInt32 desc_allocated = 0;
69static PRUint16 default_port = 12273;
70static enum Verbosity verbosity = quiet;
71static PRInt32 ops_required = 1000, ops_done = 0;
72static PRThreadScope thread_scope = PR_LOCAL_THREAD;
73static PRIntn client_threads = 20, worker_threads = 2, wait_objects = 50;
74
/*
** MW_ASSERT(expr) -- assertion used throughout this test.
**
** Without DEBUG the macro expands to nothing, so 'expr' is not evaluated
** at all; never put side effects inside it.  With DEBUG a failed assertion
** first prints the pending error to the debug stream (when one is set) and
** then hands off to PR_Assert().
*/
#if !defined(DEBUG)
#define MW_ASSERT(_expr)
#else
static void _MW_Assert(const char *s, const char *file, PRIntn ln)
{
    if (NULL != debug)
    {
        PL_FPrintError(debug, NULL);
    }
    PR_Assert(s, file, ln);
}  /* _MW_Assert */
#define MW_ASSERT(_expr) \
    ((_expr) ? ((void)0) : _MW_Assert(# _expr, __FILE__, __LINE__))
#endif
86
87static void PrintRecvDesc(PRRecvWait *desc, const char *msg)
88{
89 const char *tag[] = {
90 "PR_MW_INTERRUPT", "PR_MW_TIMEOUT",
91 "PR_MW_FAILURE", "PR_MW_SUCCESS", "PR_MW_PENDING"};
92 PR_fprintf(
93 debug, "%s: PRRecvWait(@0x%x): {fd: 0x%x, outcome: %s, tmo: %u}\n",
94 msg, desc, desc->fd, tag[desc->outcome + 3], desc->timeout);
95} /* PrintRecvDesc */
96
97static Shared *MakeShared(const char *title)
98{
99 Shared *shared = PR_NEWZAP(Shared);
100 shared->group = PR_CreateWaitGroup(1);
101 shared->timeout = PR_SecondsToInterval(1);
102 shared->list_lock = PR_NewLock();
103 shared->title = title;
104 return shared;
105} /* MakeShared */
106
107static void DestroyShared(Shared *shared)
108{
109 PRStatus rv;
110 if (verbosity > quiet)
111 PR_fprintf(debug, "%s: destroying group\n", shared->title);
112 rv = PR_DestroyWaitGroup(shared->group);
113 MW_ASSERT(PR_SUCCESS == rv);
114 PR_DestroyLock(shared->list_lock);
115 PR_DELETE(shared);
116} /* DestroyShared */
117
118static PRRecvWait *CreateRecvWait(PRFileDesc *fd, PRIntervalTime timeout)
119{
120 PRRecvWait *desc_out = PR_NEWZAP(PRRecvWait);
121 MW_ASSERT(NULL != desc_out);
122
123 MW_ASSERT(NULL != fd);
124 desc_out->fd = fd;
125 desc_out->timeout = timeout;
126 desc_out->buffer.length = 120;
127 desc_out->buffer.start = PR_CALLOC(120);
128
129 PR_AtomicIncrement(&desc_allocated);
130
131 if (verbosity > chatty)
132 PrintRecvDesc(desc_out, "Allocated");
133 return desc_out;
134} /* CreateRecvWait */
135
136static void DestroyRecvWait(PRRecvWait *desc_out)
137{
138 if (verbosity > chatty)
139 PrintRecvDesc(desc_out, "Destroying");
140 PR_Close(desc_out->fd);
141 if (NULL != desc_out->buffer.start)
142 PR_DELETE(desc_out->buffer.start);
143 PR_Free(desc_out);
144 (void)PR_AtomicDecrement(&desc_allocated);
145} /* DestroyRecvWait */
146
147static void CancelGroup(Shared *shared)
148{
149 PRRecvWait *desc_out;
150
151 if (verbosity > quiet)
152 PR_fprintf(debug, "%s Reclaiming wait descriptors\n", shared->title);
153
154 do
155 {
156 desc_out = PR_CancelWaitGroup(shared->group);
157 if (NULL != desc_out) DestroyRecvWait(desc_out);
158 } while (NULL != desc_out);
159
160 MW_ASSERT(0 == desc_allocated);
161 MW_ASSERT(PR_GROUP_EMPTY_ERROR == PR_GetError());
162} /* CancelGroup */
163
164static void PR_CALLBACK ClientThread(void* arg)
165{
166 PRStatus rv;
167 PRInt32 bytes;
168 PRIntn empty_flags = 0;
169 PRNetAddr server_address;
170 unsigned char buffer[100];
171 Shared *shared = (Shared*)arg;
172 PRFileDesc *server = PR_NewTCPSocket();
173 if ((NULL == server)
174 && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) return;
175 MW_ASSERT(NULL != server);
176
177 if (verbosity > chatty)
178 PR_fprintf(debug, "%s: Server socket @0x%x\n", shared->title, server);
179
180 /* Initialize the buffer so that Purify won't complain */
181 memset(buffer, 0, sizeof(buffer));
182
183 rv = PR_InitializeNetAddr(PR_IpAddrLoopback, default_port, &server_address);
184 MW_ASSERT(PR_SUCCESS == rv);
185
186 if (verbosity > quiet)
187 PR_fprintf(debug, "%s: Client opening connection\n", shared->title);
188 rv = PR_Connect(server, &server_address, PR_INTERVAL_NO_TIMEOUT);
189
190 if (PR_FAILURE == rv)
191 {
192 if (verbosity > silent) PL_FPrintError(debug, "Client connect failed");
193 return;
194 }
195
196 while (ops_done < ops_required)
197 {
198 bytes = PR_Send(
199 server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT);
200 if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break;
201 MW_ASSERT(sizeof(buffer) == bytes);
202 if (verbosity > chatty)
203 PR_fprintf(
204 debug, "%s: Client sent %d bytes\n",
205 shared->title, sizeof(buffer));
206 bytes = PR_Recv(
207 server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT);
208 if (verbosity > chatty)
209 PR_fprintf(
210 debug, "%s: Client received %d bytes\n",
211 shared->title, sizeof(buffer));
212 if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break;
213 MW_ASSERT(sizeof(buffer) == bytes);
214 PR_Sleep(shared->timeout);
215 }
216 rv = PR_Close(server);
217 MW_ASSERT(PR_SUCCESS == rv);
218
219} /* ClientThread */
220
221static void OneInThenCancelled(Shared *shared)
222{
223 PRStatus rv;
224 PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait);
225
226 shared->timeout = PR_INTERVAL_NO_TIMEOUT;
227
228 desc_in->fd = PR_NewTCPSocket();
229 desc_in->timeout = shared->timeout;
230
231 if (verbosity > chatty) PrintRecvDesc(desc_in, "Adding desc");
232
233 rv = PR_AddWaitFileDesc(shared->group, desc_in);
234 MW_ASSERT(PR_SUCCESS == rv);
235
236 if (verbosity > chatty) PrintRecvDesc(desc_in, "Cancelling");
237 rv = PR_CancelWaitFileDesc(shared->group, desc_in);
238 MW_ASSERT(PR_SUCCESS == rv);
239
240 desc_out = PR_WaitRecvReady(shared->group);
241 MW_ASSERT(desc_out == desc_in);
242 MW_ASSERT(PR_MW_INTERRUPT == desc_out->outcome);
243 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
244 if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready");
245
246 rv = PR_Close(desc_in->fd);
247 MW_ASSERT(PR_SUCCESS == rv);
248
249 if (verbosity > quiet)
250 PR_fprintf(debug, "%s: destroying group\n", shared->title);
251
252 PR_DELETE(desc_in);
253} /* OneInThenCancelled */
254
255static void OneOpOneThread(Shared *shared)
256{
257 PRStatus rv;
258 PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait);
259
260 desc_in->fd = PR_NewTCPSocket();
261 desc_in->timeout = shared->timeout;
262
263 if (verbosity > chatty) PrintRecvDesc(desc_in, "Adding desc");
264
265 rv = PR_AddWaitFileDesc(shared->group, desc_in);
266 MW_ASSERT(PR_SUCCESS == rv);
267 desc_out = PR_WaitRecvReady(shared->group);
268 MW_ASSERT(desc_out == desc_in);
269 MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
270 MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
271 if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready");
272
273 rv = PR_Close(desc_in->fd);
274 MW_ASSERT(PR_SUCCESS == rv);
275
276 PR_DELETE(desc_in);
277} /* OneOpOneThread */
278
279static void ManyOpOneThread(Shared *shared)
280{
281 PRStatus rv;
282 PRIntn index;
283 PRRecvWait *desc_in;
284 PRRecvWait *desc_out;
285
286 if (verbosity > quiet)
287 PR_fprintf(debug, "%s: adding %d descs\n", shared->title, wait_objects);
288
289 for (index = 0; index < wait_objects; ++index)
290 {
291 desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout);
292
293 rv = PR_AddWaitFileDesc(shared->group, desc_in);
294 MW_ASSERT(PR_SUCCESS == rv);
295 }
296
297 while (ops_done < ops_required)
298 {
299 desc_out = PR_WaitRecvReady(shared->group);
300 MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
301 MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
302 if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready/readding");
303 rv = PR_AddWaitFileDesc(shared->group, desc_out);
304 MW_ASSERT(PR_SUCCESS == rv);
305 (void)PR_AtomicIncrement(&ops_done);
306 }
307
308 CancelGroup(shared);
309} /* ManyOpOneThread */
310
311static void PR_CALLBACK SomeOpsThread(void *arg)
312{
313 PRRecvWait *desc_out;
314 PRStatus rv = PR_SUCCESS;
315 Shared *shared = (Shared*)arg;
316 do /* until interrupted */
317 {
318 desc_out = PR_WaitRecvReady(shared->group);
319 if (NULL == desc_out)
320 {
321 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
322 if (verbosity > quiet) PR_fprintf(debug, "Aborted\n");
323 break;
324 }
325 MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
326 MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
327 if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready");
328
329 if (verbosity > chatty) PrintRecvDesc(desc_out, "Re-Adding");
330 desc_out->timeout = shared->timeout;
331 rv = PR_AddWaitFileDesc(shared->group, desc_out);
332 PR_AtomicIncrement(&ops_done);
333 if (ops_done > ops_required) break;
334 } while (PR_SUCCESS == rv);
335 MW_ASSERT(PR_SUCCESS == rv);
336} /* SomeOpsThread */
337
338static void SomeOpsSomeThreads(Shared *shared)
339{
340 PRStatus rv;
341 PRThread **thread;
342 PRIntn index;
343 PRRecvWait *desc_in;
344
345 thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads);
346
347 /* Create some threads */
348
349 if (verbosity > quiet)
350 PR_fprintf(debug, "%s: creating threads\n", shared->title);
351 for (index = 0; index < worker_threads; ++index)
352 {
353 thread[index] = PR_CreateThread(
354 PR_USER_THREAD, SomeOpsThread, shared,
355 PR_PRIORITY_HIGH, thread_scope,
356 PR_JOINABLE_THREAD, 16 * 1024);
357 }
358
359 /* then create some operations */
360 if (verbosity > quiet)
361 PR_fprintf(debug, "%s: creating desc\n", shared->title);
362 for (index = 0; index < wait_objects; ++index)
363 {
364 desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout);
365 rv = PR_AddWaitFileDesc(shared->group, desc_in);
366 MW_ASSERT(PR_SUCCESS == rv);
367 }
368
369 if (verbosity > quiet)
370 PR_fprintf(debug, "%s: sleeping\n", shared->title);
371 while (ops_done < ops_required) PR_Sleep(shared->timeout);
372
373 if (verbosity > quiet)
374 PR_fprintf(debug, "%s: interrupting/joining threads\n", shared->title);
375 for (index = 0; index < worker_threads; ++index)
376 {
377 rv = PR_Interrupt(thread[index]);
378 MW_ASSERT(PR_SUCCESS == rv);
379 rv = PR_JoinThread(thread[index]);
380 MW_ASSERT(PR_SUCCESS == rv);
381 }
382 PR_DELETE(thread);
383
384 CancelGroup(shared);
385} /* SomeOpsSomeThreads */
386
387static PRStatus ServiceRequest(Shared *shared, PRRecvWait *desc)
388{
389 PRInt32 bytes_out;
390
391 if (verbosity > chatty)
392 PR_fprintf(
393 debug, "%s: Service received %d bytes\n",
394 shared->title, desc->bytesRecv);
395
396 if (0 == desc->bytesRecv) goto quitting;
397 if ((-1 == desc->bytesRecv)
398 && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted;
399
400 bytes_out = PR_Send(
401 desc->fd, desc->buffer.start, desc->bytesRecv, 0, shared->timeout);
402 if (verbosity > chatty)
403 PR_fprintf(
404 debug, "%s: Service sent %d bytes\n",
405 shared->title, bytes_out);
406
407 if ((-1 == bytes_out)
408 && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted;
409 MW_ASSERT(bytes_out == desc->bytesRecv);
410
411 return PR_SUCCESS;
412
413aborted:
414quitting:
415 return PR_FAILURE;
416} /* ServiceRequest */
417
418static void PR_CALLBACK ServiceThread(void *arg)
419{
420 PRStatus rv = PR_SUCCESS;
421 PRRecvWait *desc_out = NULL;
422 Shared *shared = (Shared*)arg;
423 do /* until interrupted */
424 {
425 if (NULL != desc_out)
426 {
427 desc_out->timeout = PR_INTERVAL_NO_TIMEOUT;
428 if (verbosity > chatty)
429 PrintRecvDesc(desc_out, "Service re-adding");
430 rv = PR_AddWaitFileDesc(shared->group, desc_out);
431 MW_ASSERT(PR_SUCCESS == rv);
432 }
433
434 desc_out = PR_WaitRecvReady(shared->group);
435 if (NULL == desc_out)
436 {
437 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
438 break;
439 }
440
441 switch (desc_out->outcome)
442 {
443 case PR_MW_SUCCESS:
444 {
445 PR_AtomicIncrement(&ops_done);
446 if (verbosity > chatty)
447 PrintRecvDesc(desc_out, "Service ready");
448 rv = ServiceRequest(shared, desc_out);
449 break;
450 }
451 case PR_MW_INTERRUPT:
452 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
453 rv = PR_FAILURE; /* if interrupted, then exit */
454 break;
455 case PR_MW_TIMEOUT:
456 MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
457 case PR_MW_FAILURE:
458 if (verbosity > silent)
459 PL_FPrintError(debug, "RecvReady failure");
460 break;
461 default:
462 break;
463 }
464 } while (PR_SUCCESS == rv);
465
466 if (NULL != desc_out) DestroyRecvWait(desc_out);
467
468} /* ServiceThread */
469
470static void PR_CALLBACK EnumerationThread(void *arg)
471{
472 PRStatus rv;
473 PRIntn count;
474 PRRecvWait *desc;
475 Shared *shared = (Shared*)arg;
476 PRIntervalTime five_seconds = PR_SecondsToInterval(5);
477 PRMWaitEnumerator *enumerator = PR_CreateMWaitEnumerator(shared->group);
478 MW_ASSERT(NULL != enumerator);
479
480 while (PR_SUCCESS == PR_Sleep(five_seconds))
481 {
482 count = 0;
483 desc = NULL;
484 while (NULL != (desc = PR_EnumerateWaitGroup(enumerator, desc)))
485 {
486 if (verbosity > chatty) PrintRecvDesc(desc, shared->title);
487 count += 1;
488 }
489 if (verbosity > silent)
490 PR_fprintf(debug,
491 "%s Enumerated %d objects\n", shared->title, count);
492 }
493
494 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
495
496
497 rv = PR_DestroyMWaitEnumerator(enumerator);
498 MW_ASSERT(PR_SUCCESS == rv);
499} /* EnumerationThread */
500
501static void PR_CALLBACK ServerThread(void *arg)
502{
503 PRStatus rv;
504 PRIntn index;
505 PRRecvWait *desc_in;
506 PRThread **worker_thread;
507 Shared *shared = (Shared*)arg;
508 PRFileDesc *listener, *service;
509 PRNetAddr server_address, client_address;
510
511 worker_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads);
512 if (verbosity > quiet)
513 PR_fprintf(debug, "%s: Server creating worker_threads\n", shared->title);
514 for (index = 0; index < worker_threads; ++index)
515 {
516 worker_thread[index] = PR_CreateThread(
517 PR_USER_THREAD, ServiceThread, shared,
518 PR_PRIORITY_HIGH, thread_scope,
519 PR_JOINABLE_THREAD, 16 * 1024);
520 }
521
522 rv = PR_InitializeNetAddr(PR_IpAddrAny, default_port, &server_address);
523 MW_ASSERT(PR_SUCCESS == rv);
524
525 listener = PR_NewTCPSocket(); MW_ASSERT(NULL != listener);
526 if (verbosity > chatty)
527 PR_fprintf(
528 debug, "%s: Server listener socket @0x%x\n",
529 shared->title, listener);
530 rv = PR_Bind(listener, &server_address); MW_ASSERT(PR_SUCCESS == rv);
531 rv = PR_Listen(listener, 10); MW_ASSERT(PR_SUCCESS == rv);
532 while (ops_done < ops_required)
533 {
534 if (verbosity > quiet)
535 PR_fprintf(debug, "%s: Server accepting connection\n", shared->title);
536 service = PR_Accept(listener, &client_address, PR_INTERVAL_NO_TIMEOUT);
537 if (NULL == service)
538 {
539 if (PR_PENDING_INTERRUPT_ERROR == PR_GetError()) break;
540 PL_PrintError("Accept failed");
541 MW_ASSERT(!"Accept failed");
542 }
543 else
544 {
545 desc_in = CreateRecvWait(service, shared->timeout);
546 desc_in->timeout = PR_INTERVAL_NO_TIMEOUT;
547 if (verbosity > chatty)
548 PrintRecvDesc(desc_in, "Service adding");
549 rv = PR_AddWaitFileDesc(shared->group, desc_in);
550 MW_ASSERT(PR_SUCCESS == rv);
551 }
552 }
553
554 if (verbosity > quiet)
555 PR_fprintf(debug, "%s: Server interrupting worker_threads\n", shared->title);
556 for (index = 0; index < worker_threads; ++index)
557 {
558 rv = PR_Interrupt(worker_thread[index]);
559 MW_ASSERT(PR_SUCCESS == rv);
560 rv = PR_JoinThread(worker_thread[index]);
561 MW_ASSERT(PR_SUCCESS == rv);
562 }
563 PR_DELETE(worker_thread);
564
565 PR_Close(listener);
566
567 CancelGroup(shared);
568
569} /* ServerThread */
570
571static void RealOneGroupIO(Shared *shared)
572{
573 /*
574 ** Create a server that listens for connections and then services
575 ** requests that come in over those connections. The server never
576 ** deletes a connection and assumes a basic RPC model of operation.
577 **
578 ** Use worker_threads threads to service how every many open ports
579 ** there might be.
580 **
581 ** Oh, ya. Almost forget. Create (some) clients as well.
582 */
583 PRStatus rv;
584 PRIntn index;
585 PRThread *server_thread, *enumeration_thread, **client_thread;
586
587 if (verbosity > quiet)
588 PR_fprintf(debug, "%s: creating server_thread\n", shared->title);
589
590 server_thread = PR_CreateThread(
591 PR_USER_THREAD, ServerThread, shared,
592 PR_PRIORITY_HIGH, thread_scope,
593 PR_JOINABLE_THREAD, 16 * 1024);
594
595 if (verbosity > quiet)
596 PR_fprintf(debug, "%s: creating enumeration_thread\n", shared->title);
597
598 enumeration_thread = PR_CreateThread(
599 PR_USER_THREAD, EnumerationThread, shared,
600 PR_PRIORITY_HIGH, thread_scope,
601 PR_JOINABLE_THREAD, 16 * 1024);
602
603 if (verbosity > quiet)
604 PR_fprintf(debug, "%s: snoozing before creating clients\n", shared->title);
605 PR_Sleep(5 * shared->timeout);
606
607 if (verbosity > quiet)
608 PR_fprintf(debug, "%s: creating client_threads\n", shared->title);
609 client_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * client_threads);
610 for (index = 0; index < client_threads; ++index)
611 {
612 client_thread[index] = PR_CreateThread(
613 PR_USER_THREAD, ClientThread, shared,
614 PR_PRIORITY_NORMAL, thread_scope,
615 PR_JOINABLE_THREAD, 16 * 1024);
616 }
617
618 while (ops_done < ops_required) PR_Sleep(shared->timeout);
619
620 if (verbosity > quiet)
621 PR_fprintf(debug, "%s: interrupting/joining client_threads\n", shared->title);
622 for (index = 0; index < client_threads; ++index)
623 {
624 rv = PR_Interrupt(client_thread[index]);
625 MW_ASSERT(PR_SUCCESS == rv);
626 rv = PR_JoinThread(client_thread[index]);
627 MW_ASSERT(PR_SUCCESS == rv);
628 }
629 PR_DELETE(client_thread);
630
631 if (verbosity > quiet)
632 PR_fprintf(debug, "%s: interrupting/joining enumeration_thread\n", shared->title);
633 rv = PR_Interrupt(enumeration_thread);
634 MW_ASSERT(PR_SUCCESS == rv);
635 rv = PR_JoinThread(enumeration_thread);
636 MW_ASSERT(PR_SUCCESS == rv);
637
638 if (verbosity > quiet)
639 PR_fprintf(debug, "%s: interrupting/joining server_thread\n", shared->title);
640 rv = PR_Interrupt(server_thread);
641 MW_ASSERT(PR_SUCCESS == rv);
642 rv = PR_JoinThread(server_thread);
643 MW_ASSERT(PR_SUCCESS == rv);
644} /* RealOneGroupIO */
645
646static void RunThisOne(
647 void (*func)(Shared*), const char *name, const char *test_name)
648{
649 Shared *shared;
650 if ((NULL == test_name) || (0 == PL_strcmp(name, test_name)))
651 {
652 if (verbosity > silent)
653 PR_fprintf(debug, "%s()\n", name);
654 shared = MakeShared(name);
655 ops_done = 0;
656 func(shared); /* run the test */
657 MW_ASSERT(0 == desc_allocated);
658 DestroyShared(shared);
659 }
660} /* RunThisOne */
661
662static Verbosity ChangeVerbosity(Verbosity verbosity, PRIntn delta)
663{
664 PRIntn verbage = (PRIntn)verbosity;
665 return (Verbosity)(verbage += delta);
666} /* ChangeVerbosity */
667
668PRIntn main(PRIntn argc, char **argv)
669{
670 PLOptStatus os;
671 const char *test_name = NULL;
672 PLOptState *opt = PL_CreateOptState(argc, argv, "dqGc:o:p:t:w:");
673
674 while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
675 {
676 if (PL_OPT_BAD == os) continue;
677 switch (opt->option)
678 {
679 case 0:
680 test_name = opt->value;
681 break;
682 case 'd': /* debug mode */
683 if (verbosity < noisy)
684 verbosity = ChangeVerbosity(verbosity, 1);
685 break;
686 case 'q': /* debug mode */
687 if (verbosity > silent)
688 verbosity = ChangeVerbosity(verbosity, -1);
689 break;
690 case 'G': /* use global threads */
691 thread_scope = PR_GLOBAL_THREAD;
692 break;
693 case 'c': /* number of client threads */
694 client_threads = atoi(opt->value);
695 break;
696 case 'o': /* operations to compelete */
697 ops_required = atoi(opt->value);
698 break;
699 case 'p': /* default port */
700 default_port = atoi(opt->value);
701 break;
702 case 't': /* number of threads waiting */
703 worker_threads = atoi(opt->value);
704 break;
705 case 'w': /* number of wait objects */
706 wait_objects = atoi(opt->value);
707 break;
708 default:
709 break;
710 }
711 }
712 PL_DestroyOptState(opt);
713
714 if (verbosity > 0)
715 debug = PR_GetSpecialFD(PR_StandardError);
716
717 RunThisOne(OneInThenCancelled, "OneInThenCancelled", test_name);
718 RunThisOne(OneOpOneThread, "OneOpOneThread", test_name);
719 RunThisOne(ManyOpOneThread, "ManyOpOneThread", test_name);
720 RunThisOne(SomeOpsSomeThreads, "SomeOpsSomeThreads", test_name);
721 RunThisOne(RealOneGroupIO, "RealOneGroupIO", test_name);
722 return 0;
723} /* main */
724
/* multiwait.c */
注意: 瀏覽 TracBrowser 來幫助您使用儲存庫瀏覽器

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette