XRootD
Loading...
Searching...
No Matches
XrdScheduler Class Reference

#include <XrdScheduler.hh>

+ Inheritance diagram for XrdScheduler:
+ Collaboration diagram for XrdScheduler:

Public Member Functions

 XrdScheduler (int minw=3, int maxw=128, int maxi=12)
 
 XrdScheduler (XrdSysError *eP, XrdOucTrace *tP, int minw=8, int maxw=8192, int maxi=780)
 
 XrdScheduler (XrdSysError *eP, XrdSysTrace *tP, int minw=8, int maxw=8192, int maxi=780)
 
 ~XrdScheduler ()
 
int Active ()
 
void Cancel (XrdJob *jp)
 
int canStick ()
 
void DoIt ()
 
pid_t Fork (const char *id)
 
void * Reaper ()
 
void Run ()
 
void Schedule (int num, XrdJob *jfirst, XrdJob *jlast)
 
void Schedule (XrdJob *jp)
 
void Schedule (XrdJob *jp, time_t atime)
 
void setParms (int minw, int maxw, int avlt, int maxi, int once=0)
 
void Start ()
 
int Stats (char *buff, int blen, int do_sync=0)
 
void TimeSched ()
 
- Public Member Functions inherited from XrdJob
 XrdJob (const char *desc="")
 
virtual ~XrdJob ()
 

Public Attributes

int max_QLength
 
int num_Jobs
 
int num_Limited
 
int num_TCreate
 
int num_TDestroy
 
- Public Attributes inherited from XrdJob
const char * Comment
 
XrdJobNextJob
 

Detailed Description

Definition at line 45 of file XrdScheduler.hh.

Constructor & Destructor Documentation

◆ XrdScheduler() [1/3]

XrdScheduler::XrdScheduler ( XrdSysError * eP,
XrdSysTrace * tP,
int minw = 8,
int maxw = 8192,
int maxi = 780 )

Definition at line 97 of file XrdScheduler.cc.

99 : XrdJob("underused thread monitor"),
100 XrdTraceOld(0), WorkAvail(0, "sched work")
101{
102 Boot(eP, tP, minw, maxw, maxi);
103}
XrdJob(const char *desc="")
Definition XrdJob.hh:51

References XrdJob::XrdJob().

+ Here is the call graph for this function:

◆ XrdScheduler() [2/3]

XrdScheduler::XrdScheduler ( XrdSysError * eP,
XrdOucTrace * tP,
int minw = 8,
int maxw = 8192,
int maxi = 780 )

Definition at line 108 of file XrdScheduler.cc.

110 : XrdJob("underused thread monitor"),
111 XrdTraceOld(tP), WorkAvail(0, "sched work")
112{
113
114// Invoke the main initialization function with a new style trace object
115//
116 Boot(eP, new XrdSysTrace("Xrd", eP->logger()), minw, maxw, maxi);
117}
XrdSysLogger * logger(XrdSysLogger *lp=0)

References XrdJob::XrdJob(), and XrdSysError::logger().

+ Here is the call graph for this function:

◆ XrdScheduler() [3/3]

XrdScheduler::XrdScheduler ( int minw = 3,
int maxw = 128,
int maxi = 12 )

Definition at line 123 of file XrdScheduler.cc.

124 : XrdJob("underused thread monitor"),
125 XrdTraceOld(0), WorkAvail(0, "sched work")
126{
127 XrdSysLogger *Logger;
128 int eFD;
129
130// Get a file descriptor mirroring standard error
131//
132#if ( defined(__linux__) || defined(__GNU__) ) && defined(F_DUPFD_CLOEXEC)
133 eFD = fcntl(STDERR_FILENO, F_DUPFD_CLOEXEC, 0);
134#else
135 eFD = dup(STDERR_FILENO);
136 fcntl(eFD, F_SETFD, FD_CLOEXEC);
137#endif
138
139// Now we need to get a logger object. We make this a real dumb one.
140//
141 Logger = new XrdSysLogger(eFD, 0);
142 XrdLog = new XrdSysError(Logger);
143
144// Now get a trace object
145//
146 XrdTrace = new XrdSysTrace("Xrd", Logger);
147 if (getenv("XRDDEBUG") != 0) XrdTrace->What = TRACE_SCHED;
148
149// Set remaining values. We do no use maximum possible threads here.
150//
151 Init(minw, maxw, maxi);
152}
static XrdSysLogger Logger
int fcntl(int fd, int cmd,...)
#define TRACE_SCHED
Definition XrdTrace.hh:42

References XrdJob::XrdJob(), fcntl(), Logger, and TRACE_SCHED.

+ Here is the call graph for this function:

◆ ~XrdScheduler()

XrdScheduler::~XrdScheduler ( )

Definition at line 218 of file XrdScheduler.cc.

219{
220}

Member Function Documentation

◆ Active()

int XrdScheduler::Active ( )
inline

Definition at line 49 of file XrdScheduler.hh.

49{return num_Workers - idl_Workers + num_JobsinQ;}

◆ Cancel()

void XrdScheduler::Cancel ( XrdJob * jp)

Definition at line 226 of file XrdScheduler.cc.

227{
228 XrdJob *p, *pp = 0;
229
230// Lock the queue
231//
232 TimerMutex.Lock();
233
234// Find the matching job, if any
235//
236 p = TimerQueue;
237 while(p && p != jp) {pp = p; p = p->NextJob;}
238
239// Delete the job element
240//
241 if (p)
242 {if (pp) pp->NextJob = p->NextJob;
243 else TimerQueue = p->NextJob;
244 TRACE(SCHED, "time event " <<jp->Comment <<" cancelled");
245 }
246
247// All done
248//
249 TimerMutex.UnLock();
250}
#define TRACE(act, x)
Definition XrdTrace.hh:63
XrdJob * NextJob
Definition XrdJob.hh:46
const char * Comment
Definition XrdJob.hh:47

References XrdJob::XrdJob(), XrdJob::Comment, XrdJob::NextJob, and TRACE.

Referenced by Schedule(), and setParms().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ canStick()

int XrdScheduler::canStick ( )
inline

Definition at line 53 of file XrdScheduler.hh.

53 {return num_Workers < stk_Workers
54 || (num_Workers-idl_Workers) < stk_Workers;}

◆ DoIt()

void XrdScheduler::DoIt ( )
virtual

Implements XrdJob.

Definition at line 256 of file XrdScheduler.cc.

257{
258 int num_kill, num_idle;
259
260// Now check if there are too many idle threads (kill them if there are)
261//
262 if (!num_JobsinQ)
263 {DispatchMutex.Lock(); num_idle = idl_Workers; DispatchMutex.UnLock();
264 num_kill = num_idle - min_Workers;
265 TRACE(SCHED, num_Workers <<" threads; " <<num_idle <<" idle");
266 if (num_kill > 0)
267 {if (num_kill > 1) num_kill = num_kill/2;
268 SchedMutex.Lock();
269 num_Layoffs = num_kill;
270 while(num_kill--) WorkAvail.Post();
271 SchedMutex.UnLock();
272 }
273 }
274
275// Check if we should reschedule ourselves
276//
277 if (max_Workidl > 0) Schedule((XrdJob *)this, max_Workidl+time(0));
278}
void Schedule(XrdJob *jp)

References XrdJob::XrdJob(), Schedule(), and TRACE.

+ Here is the call graph for this function:

◆ Fork()

pid_t XrdScheduler::Fork ( const char * id)

Definition at line 286 of file XrdScheduler.cc.

287{
288 static int retc, ReaperStarted = 0;
289 pthread_t tid;
290 pid_t pid;
291
292// Fork
293//
294 if ((pid = fork()) < 0)
295 {XrdLog->Emsg("Scheduler",errno,"fork to handle",id);
296 return pid;
297 }
298 if (!pid) return pid;
299
300// Obtain the status of the reaper thread.
301//
302 ReaperMutex.Lock();
303 firstPID = new XrdSchedulerPID(pid, firstPID);
304 retc = ReaperStarted;
305 ReaperStarted = 1;
306 ReaperMutex.UnLock();
307
308// Start the reaper thread if it has not started.
309//
310 if (!retc)
311 if ((retc = XrdSysThread::Run(&tid, XrdStartReaper, (void *)this,
312 0, "Process reaper")))
313 {XrdLog->Emsg("Scheduler", retc, "create reaper thread");
314 ReaperStarted = 0;
315 }
316
317 return pid;
318}
void * XrdStartReaper(void *carg)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)

References XrdSysThread::Run(), and XrdStartReaper().

+ Here is the call graph for this function:

◆ Reaper()

void * XrdScheduler::Reaper ( )

Definition at line 324 of file XrdScheduler.cc.

325{
326 int status;
327 pid_t pid;
328 XrdSchedulerPID *tp, *ptp, *xtp;
329#if defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_5)
330 struct timespec ts = { 1, 0 };
331#else
332 sigset_t Sset;
333 int signum;
334
335// Set up for signal handling. Note: main() must block this signal at start)
336//
337 sigemptyset(&Sset);
338 sigaddset(&Sset, SIGCHLD);
339#endif
340
341// Wait for all outstanding children
342//
343 do {ReaperMutex.Lock();
344 tp = firstPID; ptp = 0;
345 while(tp)
346 {do {pid = waitpid(tp->pid, &status, WNOHANG);}
347 while (pid < 0 && errno == EINTR);
348 if (pid > 0)
349 {if (TRACING(TRACE_SCHED)) traceExit(pid, status);
350 xtp = tp; tp = tp->next;
351 if (ptp) ptp->next = tp;
352 else firstPID = tp;
353 delete xtp;
354 } else {ptp = tp; tp = tp->next;}
355 }
356 ReaperMutex.UnLock();
357#if defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_5)
358 // Mac OS X sigwait() is broken on <= 10.4.
359 } while (nanosleep(&ts, 0) <= 0);
360#else
361 } while(sigwait(&Sset, &signum) >= 0);
362#endif
363 return (void *)0;
364}
#define TRACING(x)
Definition XrdTrace.hh:70
XrdSchedulerPID * next

References XrdSchedulerPID::next, XrdSchedulerPID::pid, TRACE_SCHED, and TRACING.

Referenced by XrdStartReaper().

+ Here is the caller graph for this function:

◆ Run()

void XrdScheduler::Run ( )

Definition at line 370 of file XrdScheduler.cc.

371{
372 int waiting;
373 XrdJob *jp;
374
375// Wait for work then do it (an endless task for a worker thread)
376//
377 do {do {DispatchMutex.Lock(); idl_Workers++;DispatchMutex.UnLock();
378 WorkAvail.Wait();
379 DispatchMutex.Lock();waiting = --idl_Workers;DispatchMutex.UnLock();
380 SchedMutex.Lock();
381 if ((jp = WorkFirst))
382 {if (!(WorkFirst = jp->NextJob)) WorkLast = 0;
383 if (num_JobsinQ) num_JobsinQ--;
384 else XrdLog->Emsg("Scheduler","Job queue count underflow!");
385 } else {
386 num_JobsinQ = 0;
387 if (num_Layoffs > 0)
388 {num_Layoffs--;
389 if (waiting)
390 {num_TDestroy++; num_Workers--;
391 TRACE(SCHED, "terminating thread; workers=" <<num_Workers);
392 SchedMutex.UnLock();
393 return;
394 }
395 }
396 }
397 SchedMutex.UnLock();
398 } while(!jp);
399
400 // Check if we should hire a new worker (we always want 1 idle thread)
401 // before running this job.
402 //
403 if (!waiting) hireWorker();
404 if (TRACING(TRACE_SCHED) && *(jp->Comment) != '.')
405 {TRACE(SCHED, "running " <<jp->Comment <<" inq=" <<num_JobsinQ);}
406 jp->DoIt();
407 } while(1);
408}
virtual void DoIt()=0

References XrdJob::XrdJob(), XrdJob::Comment, XrdJob::DoIt(), XrdJob::NextJob, num_TDestroy, TRACE, TRACE_SCHED, and TRACING.

Referenced by XrdStartWorking().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Schedule() [1/3]

void XrdScheduler::Schedule ( int num,
XrdJob * jfirst,
XrdJob * jlast )

Definition at line 445 of file XrdScheduler.cc.

446{
447
448// Lock down our data area
449//
450 SchedMutex.Lock();
451
452// Place the request list on the queue
453//
454 jlast->NextJob = 0;
455 if (WorkFirst)
456 {WorkLast->NextJob = jfirst;
457 WorkLast = jlast;
458 } else {
459 WorkFirst = jfirst;
460 WorkLast = jlast;
461 }
462
463// Calculate statistics
464//
465 num_Jobs += numjobs;
466 num_JobsinQ += numjobs;
467 if (num_JobsinQ > max_QLength) max_QLength = num_JobsinQ;
468
469// Indicate number of jobs to work on
470//
471 while(numjobs--) WorkAvail.Post();
472
473// Unlock the data area and return
474//
475 SchedMutex.UnLock();
476}

References XrdJob::XrdJob(), max_QLength, XrdJob::NextJob, and num_Jobs.

+ Here is the call graph for this function:

◆ Schedule() [2/3]

void XrdScheduler::Schedule ( XrdJob * jp)

Definition at line 414 of file XrdScheduler.cc.

415{
416// Lock down our data area
417//
418 SchedMutex.Lock();
419
420// Place the request on the queue and broadcast it
421//
422 jp->NextJob = 0;
423 if (WorkFirst)
424 {WorkLast->NextJob = jp;
425 WorkLast = jp;
426 } else {
427 WorkFirst = jp;
428 WorkLast = jp;
429 }
430 WorkAvail.Post();
431
432// Calculate statistics
433//
434 num_Jobs++;
435 num_JobsinQ++;
436 if (num_JobsinQ > max_QLength) max_QLength = num_JobsinQ;
437
438// Unlock the data area and return
439//
440 SchedMutex.UnLock();
441}

References XrdJob::XrdJob(), max_QLength, XrdJob::NextJob, and num_Jobs.

Referenced by XrdXrootdJob::XrdXrootdJob(), XrdXrootdPrepare::XrdXrootdPrepare(), XrdPollE::Disable(), XrdPollPoll::Disable(), DoIt(), XrdXrootdCallBack::Done(), mainAccept(), mainAdmin(), setParms(), XrdPollE::Start(), XrdPollPoll::Start(), Start(), and TimeSched().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Schedule() [3/3]

void XrdScheduler::Schedule ( XrdJob * jp,
time_t atime )

Definition at line 480 of file XrdScheduler.cc.

481{
482 XrdJob *pp = 0, *p;
483
484// Cancel this event, if scheduled
485//
486 Cancel(jp);
487
488// Lock the queue
489//
490 if (TRACING(TRACE_SCHED) && *(jp->Comment) != '.')
491 {TRACE(SCHED, "scheduling " <<jp->Comment <<" in " <<atime-time(0) <<" seconds");}
492 jp->SchedTime = atime;
493 TimerMutex.Lock();
494
495// Find the insertion point for the work element
496//
497 p = TimerQueue;
498 while(p && p->SchedTime <= atime) {pp = p; p = p->NextJob;}
499
500// Insert the job element
501//
502 jp->NextJob = p;
503 if (pp) pp->NextJob = jp;
504 else {TimerQueue = jp; TimerRings.Signal();}
505
506// All done
507//
508 TimerMutex.UnLock();
509}
void Cancel(XrdJob *jp)

References XrdJob::XrdJob(), Cancel(), XrdJob::Comment, XrdJob::NextJob, TRACE, TRACE_SCHED, and TRACING.

+ Here is the call graph for this function:

◆ setParms()

void XrdScheduler::setParms ( int minw,
int maxw,
int avlt,
int maxi,
int once = 0 )

Definition at line 515 of file XrdScheduler.cc.

516{
517 static int isSet = 0;
518
519// Lock the data area and check for 1-time set
520//
521 SchedMutex.Lock();
522 if (once && isSet) {SchedMutex.UnLock(); return;}
523 isSet = 1;
524
525// get a consistent view of all the values
526//
527 if (maxw <= 0) maxw = max_Workers;
528 if (minw < 0) minw = min_Workers;
529 if (minw > maxw) minw = maxw;
530 if (avlw < 0) avlw = maxw/4*3;
531 else if (avlw > maxw) avlw = maxw;
532
533// Set the values
534//
535 min_Workers = minw;
536 max_Workers = maxw;
537 stk_Workers = maxw - avlw;
538 if (maxi >=0) max_Workidl = maxi;
539
540// Unlock the data area
541//
542 SchedMutex.UnLock();
543
544// If we have an idle interval, schedule the idle check
545//
546 if (maxi > 0)
547 {Cancel((XrdJob *)this);
548 Schedule((XrdJob *)this, (time_t)maxi+time(0));
549 }
550
551// Debug the info
552//
553 TRACE(SCHED,"Set min_Workers=" <<min_Workers <<" max_Workers=" <<max_Workers);
554 TRACE(SCHED,"Set stk_Workers=" <<stk_Workers <<" max_Workidl=" <<max_Workidl);
555}

References XrdJob::XrdJob(), Cancel(), Schedule(), and TRACE.

+ Here is the call graph for this function:

◆ Start()

void XrdScheduler::Start ( )

Definition at line 561 of file XrdScheduler.cc.

562{
563 int retc, numw;
564 pthread_t tid;
565
566// Provide ABI compatibility for XrdOucTrace which is deprecated!
567//
568 if (getenv("XRDDEBUG") != 0) XrdTrace->What = TRACE_SCHED;
569 else if (XrdTraceOld) XrdTrace->What |= XrdTraceOld->What;
570
571// Start a time based scheduler
572//
573 if ((retc = XrdSysThread::Run(&tid, XrdStartTSched, (void *)this,
574 XRDSYSTHREAD_BIND, "Time scheduler")))
575 XrdLog->Emsg("Scheduler", retc, "create time scheduler thread");
576
577// If we an idle interval, schedule the idle check
578//
579 if (max_Workidl > 0) Schedule((XrdJob *)this, (time_t)max_Workidl+time(0));
580
581// Start 1/3 of the minimum number of threads
582//
583 if (!(numw = min_Workers/3)) numw = 2;
584 while(numw--) hireWorker(0);
585
586// Unlock the data area
587//
588 TRACE(SCHED, "Starting with " <<num_Workers <<" workers" );
589}
void * XrdStartTSched(void *carg)
#define XRDSYSTHREAD_BIND

References XrdJob::XrdJob(), XrdSysThread::Run(), Schedule(), TRACE, TRACE_SCHED, XrdStartTSched(), and XRDSYSTHREAD_BIND.

+ Here is the call graph for this function:

◆ Stats()

int XrdScheduler::Stats ( char * buff,
int blen,
int do_sync = 0 )

Definition at line 595 of file XrdScheduler.cc.

596{
597 int cnt_Jobs, cnt_JobsinQ, xam_QLength, cnt_Workers, cnt_idl;
598 int cnt_TCreate, cnt_TDestroy, cnt_Limited;
599 static char statfmt[] = "<stats id=\"sched\"><jobs>%d</jobs>"
600 "<inq>%d</inq><maxinq>%d</maxinq>"
601 "<threads>%d</threads><idle>%d</idle>"
602 "<tcr>%d</tcr><tde>%d</tde>"
603 "<tlimr>%d</tlimr></stats>";
604
605// If only length wanted, do so
606//
607 if (!buff) return sizeof(statfmt) + 16*8;
608
609// Get values protected by the Dispatch lock (avoid lock if no sync needed)
610//
611 if (do_sync) DispatchMutex.Lock();
612 cnt_idl = idl_Workers;
613 if (do_sync) DispatchMutex.UnLock();
614
615// Get values protected by the Scheduler lock (avoid lock if no sync needed)
616//
617 if (do_sync) SchedMutex.Lock();
618 cnt_Workers = num_Workers;
619 cnt_Jobs = num_Jobs;
620 cnt_JobsinQ = num_JobsinQ;
621 xam_QLength = max_QLength;
622 cnt_TCreate = num_TCreate;
623 cnt_TDestroy= num_TDestroy;
624 cnt_Limited = num_Limited;
625 if (do_sync) SchedMutex.UnLock();
626
627// Format the stats and return them
628//
629 return snprintf(buff, blen, statfmt, cnt_Jobs, cnt_JobsinQ, xam_QLength,
630 cnt_Workers, cnt_idl, cnt_TCreate, cnt_TDestroy,
631 cnt_Limited);
632}

References max_QLength, num_Jobs, num_Limited, num_TCreate, and num_TDestroy.

◆ TimeSched()

void XrdScheduler::TimeSched ( )

Definition at line 638 of file XrdScheduler.cc.

639{
640 XrdJob *jp;
641 int wtime;
642
643// Continuous loop until we find some work here
644//
645 do {TimerMutex.Lock();
646 if (TimerQueue) wtime = TimerQueue->SchedTime-time(0);
647 else wtime = 60*60;
648 if (wtime > 0)
649 {TimerMutex.UnLock();
650 TimerRings.Wait(wtime);
651 } else {
652 jp = TimerQueue;
653 TimerQueue = jp->NextJob;
654 Schedule(jp);
655 TimerMutex.UnLock();
656 }
657 } while(1);
658}

References XrdJob::XrdJob(), XrdJob::NextJob, and Schedule().

Referenced by XrdStartTSched().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

Member Data Documentation

◆ max_QLength

int XrdScheduler::max_QLength

Definition at line 81 of file XrdScheduler.hh.

Referenced by Schedule(), Schedule(), and Stats().

◆ num_Jobs

int XrdScheduler::num_Jobs

Definition at line 80 of file XrdScheduler.hh.

Referenced by Schedule(), Schedule(), and Stats().

◆ num_Limited

int XrdScheduler::num_Limited

Definition at line 82 of file XrdScheduler.hh.

Referenced by Stats().

◆ num_TCreate

int XrdScheduler::num_TCreate

Definition at line 78 of file XrdScheduler.hh.

Referenced by Stats().

◆ num_TDestroy

int XrdScheduler::num_TDestroy

Definition at line 79 of file XrdScheduler.hh.

Referenced by Run(), and Stats().


The documentation for this class was generated from the following files: