XRootD
Loading...
Searching...
No Matches
XrdCmsPrepare.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d C m s P r e p a r e . c c */
4/* */
5/* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <fcntl.h>
32#include <cstdlib>
33#include <unistd.h>
34#include <sys/types.h>
35#include <sys/stat.h>
36
39#include "XrdCms/XrdCmsTrace.hh"
40#include "XrdFrc/XrdFrcProxy.hh"
41#include "XrdNet/XrdNetMsg.hh"
42#include "XrdOss/XrdOss.hh"
43#include "XrdOuc/XrdOucEnv.hh"
44#include "XrdOuc/XrdOucMsubs.hh"
45#include "XrdOuc/XrdOucTList.hh"
46#include "XrdSys/XrdSysError.hh"
47
48using namespace XrdCms;
49
50/******************************************************************************/
51/* S t a t i c O b j e c t s */
52/******************************************************************************/
53
55
56/******************************************************************************/
57/* G l o b a l s & E x t e r n a l F u n c t i o n s */
58/******************************************************************************/
59
60// This function is applied to all prepare queue entries. It checks if the file
61// in online and if so, returns a -1 to delete the entry from the queue. O/W
62// it returns a zero which keeps the entry in the queue. The key is the LFN.
63//
64int XrdCmsScrubScan(const char *key, char *cip, void *xargp)
65{
66 struct stat buf;
67
68// Use oss interface to determine whether the file exists or not
69//
70 return (Config.ossFS->Stat(key, &buf, XRDOSS_resonly) ? 0 : -1);
71}
72
73/******************************************************************************/
74/* C o n s t r u c t o r */
75/******************************************************************************/
76
77XrdCmsPrepare::XrdCmsPrepare() : XrdJob("Prep cache scrubber"),
78 prepSched(&Say)
79{prepif = 0;
80 preppid = 0;
81 resetcnt = scrub2rst = 3;
82 scrubtime= 20*60;
83 NumFiles = 0;
84 lastemsg = time(0);
85 Relay = 0; // This will be initialized via Init()!
86 PrepFrm = 0;
87 prepOK = 0;
88 N2N = 0;
89}
90
91/******************************************************************************/
92/* A d d */
93/******************************************************************************/
94
96{
97 char *pdata[XrdOucMsubs::maxElem+2], prtybuff[8], *pP=prtybuff;
98 int rc, pdlen[XrdOucMsubs::maxElem + 2];
99
100// Check if we are using the built-in mechanism
101//
102 if (PrepFrm)
103 {rc = PrepFrm->Add('+',pargs.path, pargs.opaque,pargs.Ident,pargs.reqid,
104 pargs.notify,pargs.mode,atoi(pargs.prty));
105 if (rc) Say.Emsg("Add", rc, "prepare", pargs.path);
106 else {PTMutex.Lock();
107 if (!PTable.Add(pargs.path, 0, 0, Hash_data_is_key)) NumFiles++;
108 PTMutex.UnLock();
109 }
110 return rc == 0;
111 }
112
113// Restart the scheduler if need be
114//
115 PTMutex.Lock();
116 if (!prepif || !prepSched.isAlive())
117 {Say.Emsg("Add","No prepare manager; prepare",pargs.reqid,"ignored.");
118 PTMutex.UnLock();
119 return 0;
120 }
121
122// Write out the header line
123//
124 if (!prepMsg)
125 {*pP++ = pargs.prty[0]; *pP = '\0';
126 pdata[0] = (char *)"+ "; pdlen[0] = 2;
127 pdata[1] = pargs.reqid; pdlen[1] = strlen(pargs.reqid);
128 pdata[2] = (char *)" "; pdlen[2] = 1;
129 pdata[3] = pargs.notify; pdlen[3] = strlen(pargs.notify);
130 pdata[4] = (char *)" "; pdlen[4] = 1;
131 pdata[5] = prtybuff; pdlen[5] = strlen(prtybuff);
132 pdata[6] = (char *)" "; pdlen[6] = 1;
133 pdata[7] = pargs.mode; pdlen[7] = strlen(pargs.mode);
134 pdata[8] = (char *)" "; pdlen[8] = 1;
135 pdata[9] = pargs.path; pdlen[9] = strlen(pargs.path);
136 pdata[10] = (char *)"\n"; pdlen[10] = 1;
137 pdata[11]= 0; pdlen[11]= 0;
138 if (!(rc = prepSched.Put((const char **)pdata, (const int *)pdlen)))
139 if (!PTable.Add(pargs.path, 0, 0, Hash_data_is_key)) NumFiles++;
140 } else {
141 int Oflag = (index(pargs.mode, (int)'w') ? O_RDWR : 0);
142 mode_t Prty = atoi(pargs.prty);
143 XrdOucEnv Env(pargs.opaque);
144 XrdOucMsubsInfo Info(pargs.Ident, &Env, N2N, pargs.path,
145 pargs.notify, Prty, Oflag, pargs.mode, pargs.reqid);
146 int k = prepMsg->Subs(Info, pdata, pdlen);
147 pdata[k] = (char *)"\n"; pdlen[k++] = 1;
148 pdata[k] = 0; pdlen[k] = 0;
149 if (!(rc = prepSched.Put((const char **)pdata, (const int *)pdlen)))
150 if (!PTable.Add(pargs.path, 0, 0, Hash_data_is_key)) NumFiles++;
151 }
152
153// All done
154//
155 PTMutex.UnLock();
156 return rc == 0;
157}
158
159/******************************************************************************/
160/* D e l */
161/******************************************************************************/
162
163int XrdCmsPrepare::Del(char *reqid)
164{
165 char *pdata[4];
166 int rc, pdlen[4];
167
168// Use our built-in mechanism if so wanted
169//
170 if (PrepFrm)
171 {if ((rc = PrepFrm->Del('-', reqid)))
172 Say.Emsg("Del", rc, "unprepare", reqid);
173 return rc == 0;
174 }
175
176// Restart the scheduler if need be
177//
178 PTMutex.Lock();
179 if (!prepif || !prepSched.isAlive())
180 {Say.Emsg("Del","No prepare manager; unprepare",reqid,"ignored.");
181 PTMutex.UnLock();
182 return 0;
183 }
184
185// Write out the delete request
186//
187 pdata[0] = (char *)"- ";
188 pdlen[0] = 2;
189 pdata[1] = reqid;
190 pdlen[1] = strlen(reqid);
191 pdata[2] = (char *)"\n";
192 pdlen[2] = 1;
193 pdata[3] = (char *)0;
194 pdlen[3] = 0;
195 rc = prepSched.Put((const char **)pdata, (const int *)pdlen);
196 PTMutex.UnLock();
197 return rc == 0;
198}
199
200/******************************************************************************/
201/* D o I t */
202/******************************************************************************/
203
205{
206// Simply scrub the cache
207//
208 Scrub();
209 Sched->Schedule((XrdJob *)this,scrubtime+time(0));
210}
211
212/******************************************************************************/
213/* E x i s t s */
214/******************************************************************************/
215
217{
218 int Found;
219
220// Lock the hash table
221//
222 PTMutex.Lock();
223
224// Look up the entry
225//
226 Found = (NumFiles ? PTable.Find(path) != 0 : 0);
227
228// All done
229//
230 PTMutex.UnLock();
231 return Found;
232}
233
234/******************************************************************************/
235/* G o n e */
236/******************************************************************************/
237
238void XrdCmsPrepare::Gone(char *path)
239{
240
241// Lock the hash table
242//
243 PTMutex.Lock();
244
245// Delete the entry
246//
247 if (NumFiles > 0 && PTable.Del(path) == 0) NumFiles--;
248
249// All done
250//
251 PTMutex.UnLock();
252}
253
254/******************************************************************************/
255/* I n f o r m */
256/******************************************************************************/
257
258void XrdCmsPrepare::Inform(const char *cmd, XrdCmsPrepArgs *pargs)
259{
260 EPNAME("Inform")
261 struct iovec Msg[8];
262 char *mdest, *minfo;
263
264// See if requestor wants a response
265//
266 if (!index(pargs->mode, (int)'n')
267 || strncmp("udp://", pargs->notify, 6)
268 || !Relay)
269 {DEBUG(pargs->Ident <<' ' <<cmd <<' ' <<pargs->reqid <<" not sent to "
270 <<pargs->notify);
271 return;
272 }
273
274// Extract out destination and argument
275//
276 mdest = pargs->notify+6;
277 if ((minfo = index(mdest, (int)'/')))
278 {*minfo = '\0'; minfo++;}
279 if (!minfo || !*minfo) minfo = (char *)"*";
280 DEBUG("Sending " <<mdest <<": " <<cmd <<' '<<pargs->reqid <<' ' <<minfo);
281
282// Create message to be sent
283//
284 Msg[0].iov_base = (char *)cmd; Msg[0].iov_len = strlen(cmd);
285 Msg[1].iov_base = (char *)" "; Msg[1].iov_len = 1;
286 Msg[2].iov_base = pargs->reqid; Msg[2].iov_len = strlen(pargs->reqid);
287 Msg[3].iov_base = (char *)" "; Msg[3].iov_len = 1;
288 Msg[4].iov_base = minfo; Msg[4].iov_len = strlen(minfo);
289 Msg[5].iov_base = (char *)" "; Msg[5].iov_len = 1;
290 Msg[6].iov_base = pargs->path; Msg[6].iov_len = (pargs->pathlen)-1;
291 Msg[7].iov_base = (char *)"\n"; Msg[7].iov_len = 1;
292
293// Send the message and return
294//
295 Relay->Send(Msg, 8, mdest);
296}
297
298/******************************************************************************/
299/* I n i t */
300/******************************************************************************/
301
303{
304// Obtain a msg object. We need to do it outside of a global constructor!
305//
306 Relay = new XrdNetMsg(&Say);
307}
308
309/******************************************************************************/
310/* P r e p a r e */
311/******************************************************************************/
312
314{
315 EPNAME("Prepare");
316 int rc;
317
318// Check if this file is not online, prepare it
319//
320 if (!(rc = isOnline(pargs->path)))
321 {DEBUG("Preparing " <<pargs->reqid <<' ' <<pargs->notify <<' '
322 <<pargs->prty <<' ' <<pargs->mode <<' ' <<pargs->path);
323 if (!Config.DiskSS) Say.Emsg("Prepare","staging disallowed; ignoring prep",
324 pargs->Ident, pargs->reqid);
325 else Add(*pargs);
326 return;
327 }
328
329// If the file is really online, inform the requestor
330//
331 if (rc > 0) Inform("avail", pargs);
332}
333
334/******************************************************************************/
335/* R e s e t */
336/******************************************************************************/
337
338void XrdCmsPrepare::Reset(const char *iName, const char *aPath, int aMode)
339{
340 EPNAME("Reset");
341 char baseAP[1024], *Slash;
342
343// This is a call from the configurator. No need to do anything if we have
344// no interface to initialize.
345//
346 if (!prepif) return;
347
348// If this is a built-in mechanism, then allocate the prepare interface
349// and initialize it. This is a one-time thing and it better work right away.
350// In any case, do a standard reset.
351//
352 if (!*prepif)
353 {PrepFrm = new XrdFrcProxy(Say.logger(), iName);
354 DEBUG("Initializing internal FRM prepare interface.");
355 strcpy(baseAP, aPath); baseAP[strlen(baseAP)-1] = '\0';
356 if ((Slash = rindex(baseAP, '/'))) *Slash = '\0';
357 if (!(prepOK = PrepFrm->Init(XrdFrcProxy::opStg, baseAP, aMode)))
358 {Say.Emsg("Reset", "Built-in prepare init failed; prepare disabled.");
359 return;
360 }
361 }
362
363// Reset the interface and schedule a scrub
364//
365 Reset();
366 if (scrubtime) Sched->Schedule((XrdJob *)this,scrubtime+time(0));
367
368}
369
370/******************************************************************************/
371/* s e t P a r m s */
372/******************************************************************************/
373
374int XrdCmsPrepare::setParms(int rcnt, int stime, int deco)
375{if (rcnt > 0) resetcnt = scrub2rst = rcnt;
376 if (stime > 0) scrubtime = stime;
377 doEcho = deco;
378 return 0;
379}
380
381int XrdCmsPrepare::setParms(const char *ifpgm, char *ifmsg)
382{if (ifpgm)
383 {const char *Slash = rindex(ifpgm, '/');
384 if (prepif) free(prepif);
385 if (Slash && !strcmp(Slash+1, "frm_xfragent")) ifpgm = "";
386 prepif = strdup(ifpgm);
387 }
388 if (ifmsg)
389 {if (prepMsg) delete prepMsg;
390 prepMsg = new XrdOucMsubs(&Say);
391 if (!(prepMsg->Parse("prepmsg", ifmsg)))
392 {delete prepMsg; prepMsg = 0; return 1;}
393 }
394 return 0;
395}
396
397/******************************************************************************/
398/* P r i v a t e M e t h o d s */
399/******************************************************************************/
400/******************************************************************************/
401/* i s O n l i n e */
402/******************************************************************************/
403
404int XrdCmsPrepare::isOnline(char *path)
405{
406 static const int Sopts = XRDOSS_resonly | XRDOSS_updtatm;
407 struct stat buf;
408
409// Issue the stat() via oss plugin. If it indicates the file is not there is
410// still might be logically here because it's in a staging queue.
411//
412 if (Config.ossFS->Stat(path, &buf, Sopts))
413 {if (Config.DiskSS && Exists(path)) return -1;
414 else return 0;
415 }
416 return 1;
417}
418
419/******************************************************************************/
420/* R e s e t */
421/******************************************************************************/
422
423void XrdCmsPrepare::Reset() // Must be called with PTMutex locked!
424{
425 char *lp, *pdata[] = {(char *)"?\n", 0};
426 int pdlen[] = {2, 0};
427
428// Hanlde via built-in mechanism
429//
430 if (PrepFrm)
431 {XrdFrcProxy::Queues State(XrdFrcProxy::opStg);
432 char Buff[1024];
433 if (prepOK)
434 {PTable.Purge(); NumFiles = 0;
435 while(PrepFrm->List(State, Buff, sizeof(Buff)))
436 {PTable.Add(Buff, 0, 0, Hash_data_is_key); NumFiles++;
437 if (doEcho) Say.Emsg("Reset","Prepare pending for",Buff);
438 }
439 }
440 return;
441 }
442
443// Check if we really have an interface to reset
444//
445 if (!prepif)
446 {Say.Emsg("Reset", "Prepare program not specified; prepare disabled.");
447 return;
448 }
449
450// Do it the slow external way
451//
452 if (!prepSched.isAlive() && !startIF()) return;
453 if (prepSched.Put((const char **)pdata, (const int *)pdlen))
454 {Say.Emsg("Prepare", prepSched.LastError(), "write to", prepif);
455 prepSched.Drain(); prepOK = 0;
456 }
457 else {PTable.Purge(); NumFiles = 0;
458 while((lp = prepSched.GetLine()) && *lp)
459 {PTable.Add(lp, 0, 0, Hash_data_is_key); NumFiles++;
460 if (doEcho) Say.Emsg("Reset","Prepare pending for",lp);
461 }
462 }
463}
464
465/******************************************************************************/
466/* S c r u b */
467/******************************************************************************/
468
469void XrdCmsPrepare::Scrub()
470{
471 PTMutex.Lock();
472 if (scrub2rst <= 0)
473 {Reset();
474 scrub2rst = resetcnt;
475 }
476 else {PTable.Apply(XrdCmsScrubScan, (void *)0);
477 scrub2rst--;
478 }
479 if (!PrepFrm && !prepSched.isAlive()) startIF();
480 PTMutex.UnLock();
481}
482
483/******************************************************************************/
484/* s t a r t I F */
485/******************************************************************************/
486
487int XrdCmsPrepare::startIF() // Must be called with PTMutex locked!
488{
489 EPNAME("startIF")
490
491// If we are using a local interface then there is nothing to start.
492//
493 if (PrepFrm) return prepOK;
494
495// Complain if there is no external prepare program
496//
497 if (!prepif)
498 {Say.Emsg("startIF","Prepare program not specified; prepare disabled.");
499 return (prepOK = 0);
500 }
501
502// Setup the external program
503//
504 DEBUG("Prepare: Starting " <<prepif);
505 if (prepSched.Exec(prepif, 1))
506 {time_t eNow = time(0);
507 prepOK = 0;
508 if ((eNow - lastemsg) >= 60)
509 {lastemsg = eNow;
510 Say.Emsg("Prepare", prepSched.LastError(), "start", prepif);
511 }
512 } else prepOK = 1;
513
514// All done
515//
516 return prepOK;
517}
#define DEBUG(x)
#define EPNAME(x)
int XrdCmsScrubScan(const char *key, char *cip, void *xargp)
#define XRDOSS_resonly
Definition XrdOss.hh:486
#define XRDOSS_updtatm
Definition XrdOss.hh:487
@ Hash_data_is_key
Definition XrdOucHash.hh:52
#define stat(a, b)
Definition XrdPosix.hh:101
bool Exists
if(ec< 0) ec
void Prepare(XrdCmsPrepArgs *pargs)
int setParms(int rcnt, int stime, int deco=0)
void Gone(char *path)
int Exists(char *path)
void Inform(const char *cmd, XrdCmsPrepArgs *pargs)
int Add(XrdCmsPrepArgs &pargs)
int Del(char *reqid)
void Reset(const char *iName, const char *aPath, int aMode)
static const int opStg
XrdJob(const char *desc="")
Definition XrdJob.hh:51
static const int maxElem
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdScheduler * Sched
XrdSysError Say
XrdCmsPrepare PrepQ
XrdCmsConfig Config