// Class-wide statics of XrdCmsClientMan; both start out null (0).
// Network is the object Hookup() calls Connect() on further down.
// NOTE(review): where either one is assigned is not visible in this excerpt.
54XrdInet *XrdCmsClientMan::Network = 0;
58const char *XrdCmsClientMan::ConfigFN = 0;
// Tail of a parameter list (cw, nr, rw, rd) followed by scattered body lines.
// NOTE(review): presumably the XrdCmsClientMan constructor; its header line
// and most of the body are not part of this excerpt -- confirm.
67 int cw,
int nr,
int rw,
int rd)
// Counter shared across all calls; it is post-incremented where manMask is
// computed below.
71 static int Instance = 0;
// HPfx becomes a strdup() copy of Host cut at the first '.', or of the whole
// Host when it contains no dot.  The dot is overwritten with NUL only for the
// duration of the copy and then restored, so Host itself ends up unchanged.
75 if ((dot = index(Host,
'.')))
76 {*dot =
'\0'; HPfx = strdup(Host); *dot =
'.';}
77 else HPfx = strdup(Host);
// Both timestamps start at the current time.
91 lastUpdt= lastTOut = time(0);
// Clamp dally into the range [3, 10].
98 if (dally < 3) dally = 3;
99 else if (dally > 10) dally = 10;
// Each call gets its own bit: the bit position is the current Instance value,
// which is then incremented.
// NOTE(review): no upper bound on Instance is visible here; shifting the int
// literal 1 by the width of int or more is undefined -- confirm the number of
// instances is limited elsewhere.  The increment is also unsynchronized --
// confirm this only ever runs on one thread.
104 manMask = 1<<Instance++;
// Teardown sequence: close the link, free the two C strings, and recycle the
// network buffer.  Every step is skipped when its pointer is null.
// NOTE(review): presumably the destructor body; the header is not visible.
// HPfx is allocated with strdup() earlier in this excerpt, which matches the
// free() here; how Host was allocated is not visible -- confirm it is
// malloc-family memory.
114 if (Link) Link->Close();
115 if (Host) free(Host);
116 if (HPfx) free(HPfx);
117 if (NetBuff) NetBuff->Recycle();
// Fragments that operate on Resp, which matches delayResp(XrdOucErrInfo &Resp).
// NOTE(review): the function header and the conditions guarding both brace
// blocks are not part of this excerpt -- confirm.
//
// Error path 1: log that this manager supplied an invalid "waitr" msgid and
// record EILSEQ with a protocol-error text on the caller's error object.
132 {
Say.Emsg(
"Manager", Host,
"supplied invalid waitr msgid");
133 Resp.
setErrInfo(EILSEQ,
"redirector protocol error");
// Error path 2: log ENOMEM for a response-object allocation, tagged with the
// user taken from Resp.
141 {
Say.Emsg(
"Manager",ENOMEM,
"allocate resp object for",Resp.
getErrUser());
// Purge the response queue whenever msgid is below maxMsgID.
// NOTE(review): the reason for this threshold is not visible here.
150 if (msgid < maxMsgID) RespQ.Purge();
// Fragments of two send paths: one for a single buffer (msg, mlen) and one for
// an iovec (iov, iovcnt, iotot).  Surrounding code is not in this excerpt.
//
// A zero mlen means "measure msg with strlen()".
173 if (!mlen) mlen = strlen(msg);
// In both sends below '>' binds tighter than '=', so allok receives the
// boolean (Send(...) > 0); the if-body (not shown) runs when the send did not
// return a positive value.
181 {
if (!(allok = Link->Send(msg, mlen) > 0))
207 {
if (!(allok = Link->Send(
iov, iovcnt, iotot) > 0))
// Tail of a response dispatch plus the disconnect handling that follows it.
// NOTE(review): the enclosing function and the preceding branches of this
// if/else chain are not visible in this excerpt.
//
// A response whose rrCode is kYR_status is handed to setStatus().
241 else if (Response.rrCode ==
kYR_status) setStatus();
// Drop the link (if any) and null the pointer so it is not closed twice.
248 if (Link) {Link->Close(); Link = 0;}
// Mark this manager as inactive and suspended.
249 Active = 0; Suspend = 1;
254 Say.Emsg(
"ClientMan",
"Disconnected from", Host);
// Fragments that use user, path and iMan, matching
// whatsUp(const char *user, const char *path, unsigned int iMan).
// NOTE(review): the header and several interior lines are missing here, so
// the brace nesting below cannot be fully checked from this excerpt.
280 {
// Active == RecvCnt means RecvCnt has not moved since Active was last synced
// to it (that sync happens in the else branch at the bottom of this block).
if (Active == RecvCnt)
281 {
// Once at least repWait has elapsed since lastTOut (time(0) difference, i.e.
// seconds), mark the manager inactive and suspended.  If a link exists and the
// caller's iMan still equals manInst, manInst is bumped and lClose is set.
if ((time(0)-lastTOut) >= repWait)
284 {Active = 0; Silent = 0; Suspend = 1;
285 if (Link && iMan == manInst)
287 manInst++; lClose =
true;
289 }
// Not timed out yet: when bit 0x02 of Silent is set and repWait is still below
// repWMax, lengthen repWait by one.  ('&' binds tighter than '&&'.)
else if (Silent & 0x02 && repWait < repWMax) repWait++;
291 }
// RecvCnt moved: resync Active, clear Silent and restart the timeout clock.
else {Active = RecvCnt; Silent = 0; lastTOut = time(0);}
// Delay = inQ * qTime, divided by 1000 rounding up, then clamped to
// [minDelay, maxDelay].  qTime is printed with an "ms" suffix elsewhere in
// this excerpt, so this is presumably a ms -> s conversion -- confirm.
298 theDelay = inQ * qTime;
301 theDelay = theDelay/1000 + (theDelay % 1000 ? 1 : 0);
302 if (theDelay < minDelay) theDelay = minDelay;
303 if (theDelay > maxDelay) theDelay = maxDelay;
307 TRACE(Redirect, user <<
" no resp from inst " <<iMan <<
" of "<<HPfx
309 <<
" inst " <<xMan <<(lClose ?
" closed" :
" steady")
310 <<
"; inQ " <<inQ <<
" delay " <<theDelay <<
" path=" <<path);
// Hookup: obtains a link to Host:Port through Network->Connect(), prepares the
// Data structure, recomputes repWait from Data.HoldTime and logs the
// connection.  Only scattered lines of the body appear in this excerpt; the
// return statements and the login exchange itself are not visible.
321int XrdCmsClientMan::Hookup()
326 char buff[256], hnBuff[264];
328 int rc, oldWait, tries = 12,
opts = 0;
// XRDHOST from the environment is formatted as "myHN=<value>" into hnBuff;
// snprintf bounds the write to sizeof(hnBuff).
// NOTE(review): the null check on hn is presumably on a line not shown.
338 const char *hn = getenv(
"XRDHOST");
340 {snprintf(hnBuff,
sizeof(hnBuff),
"myHN=%s", hn);
// Outer loop whose first step spins on Connect() until it yields a link.  The
// body of the inner while (between it and the else below) is not visible; the
// else branch resets opts to 0 and tries to 12.
347 do {
while(!(lp = Network->Connect(Host, Port,
opts)))
350 else {
opts = 0; tries = 12;}
// Start from an all-zero Data, then store this process's pid in HoldTime.
354 memset(&Data, 0,
sizeof(Data));
357 Data.
HoldTime =
static_cast<int>(getpid());
// oldWait = 20% of the current repWait (integer arithmetic), but at least 2.
382 if ((oldWait = (repWait*20/100)) < 2) oldWait = 2;
// A HoldTime above repWMax*1000, or a non-positive one, pins repWait to
// repWMax.
383 if (Data.
HoldTime > repWMax*1000) repWait = repWMax;
384 else if (Data.
HoldTime <= 0) repWait = repWMax;
// Divide repWait by 1000 rounding up, then clamp it to [oldWait, repWMax].
// NOTE(review): the line that seeds repWait from HoldTime before this
// division, and the else that presumably guards these three lines, are
// missing from the excerpt -- confirm against the full file.
386 repWait = (repWait/1000) + (repWait % 1000 ? 1 : 0);
387 if (repWait > repWMax) repWait = repWMax;
388 else if (repWait < oldWait) repWait = oldWait;
// buff is 256 bytes, ample for "v " followed by a decimal int.
396 sprintf(buff,
"v %d", Data.
Version);
// The message text depends on whether the manager is currently suspended.
397 Say.
Emsg(
"ClientMan", (Suspend ?
"Connected to suspended" :
"Connected to"),
399 DEBUG(Host <<
" qt=" <<qTime <<
"ms rw=" <<repWait);
// Receive: reads sizeof(Response) bytes into Response, then reads the payload
// whose length is taken from Response.datalen into NetBuff.  On the success
// path the return value is whatever RecvAll() returns for the payload; the
// remaining return paths are not part of this excerpt.
407int XrdCmsClientMan::Receive()
415 if (Link->RecvAll((
char *)&Response, sizeof(Response)) > 0)
416 {
// datalen is converted from network byte order with ntohs().
int dlen =
static_cast<int>(ntohs(Response.datalen));
418 DEBUG(Link->Name() <<
' ' <<dlen <<
" bytes on " <<Response.streamid);
// Payload larger than the current buffer: only a kYR_data response may grow
// the buffer via Resize(); any other rrCode, or a Resize() that reports
// failure, is logged as an excessive length.
420 if ((dlen > NetBuff->BuffSize())
421 && (Response.rrCode !=
kYR_data || !NetBuff->Resize(dlen)))
422 Say.
Emsg(
"ClientMan",
"Excessive msg length from", Host);
// Otherwise record the length and read exactly dlen bytes into the buffer.
423 else {NetBuff->SetLen(dlen);
424 return Link->RecvAll(NetBuff->Buffer(), dlen);
// relayResp: looks up the queued entry for Response.streamid via RespQ.Rem()
// (presumably removing it from the queue -- confirm) and passes the response
// header and NetBuff to it.  Lines between the fragments are not visible.
434void XrdCmsClientMan::relayResp()
// No entry for this stream id: only a debug line is emitted on the visible
// lines.
441 if (!(rp = RespQ.Rem(Response.streamid)))
442 {
DEBUG(Host <<
" replied to non-existent request; id=" <<Response.streamid);
// Entry found: deliver the reply, identifying the manager by HPfx.
448 rp->
Reply(HPfx, Response, NetBuff);
// chkStatus: while holding myData, sends an update request to the manager when
// at least 30 units of nowTime (presumably seconds -- confirm) have passed
// since lastUpdt.  Return statements and the nowTime/lastUpdt bookkeeping are
// not part of this excerpt.
459int XrdCmsClientMan::chkStatus()
// Static request template whose header carries the kYR_update code; it is
// sent below as raw bytes.
461 static CmsUpdateRequest Updt = {{0,
kYR_update, 0, 0}};
// NOTE(review): XrdSysMutexHelper presumably keeps myData locked for the rest
// of the scope -- confirm against its declaration.
462 XrdSysMutexHelper mdMon(myData);
470 if ((nowTime - lastUpdt) >= 30)
// Only an active manager is sent the request.
// NOTE(review): the result of Send() is discarded on this line and Link is
// not null-checked on the visible lines -- confirm Active implies a live Link.
472 if (Active) Link->Send((
char *)&Updt,
sizeof(Updt));
// setStatus: flips the Suspend flag according to a received status event and
// logs the change.  The code that selects between the two branches below and
// assigns Event is on lines not included in this excerpt.
482void XrdCmsClientMan::setStatus()
// State stays null unless Suspend actually changes; Event defaults to "?".
485 const char *State = 0, *Event =
"?";
// Suspend request: acts only if not already suspended.
491 if (!Suspend) {Suspend = 1; State =
"suspended";}
// Resume request: acts only if currently suspended.
495 if (Suspend) {Suspend = 0; State =
"resumed";}
499 DEBUG(Host <<
" sent " <<Event <<
" event");
// Emit a log message only when a real transition happened.
500 if (State)
Say.
Emsg(
"setStatus",
"Manager", Host, State);
// Bare signatures with no bodies and no terminating ';'.  Several match calls
// visible in the fragments above (Send, Reply, Close, getErrUser, setErrInfo,
// Emsg); the rest (Login, Alloc, Snooze, Max_Error_Len) are not called on any
// visible line.
// NOTE(review): these look like reference declarations gathered from other
// headers rather than part of this translation unit -- confirm.
int Send(unsigned int &iMan, char *msg, int mlen=0)
int delayResp(XrdOucErrInfo &Resp)
int whatsUp(const char *user, const char *path, unsigned int iMan)
XrdCmsClientMan(char *host, int port, int cw, int nr, int rw, int rd)
static int Reply(const char *Man, XrdCms::CmsRRHdr &hdr, XrdOucBuffer *buff)
static int Login(XrdLink *Link, XrdCms::CmsLoginData &Data, int timeout=-1)
void Reply(const char *Man, XrdCms::CmsRRHdr &rrhdr, XrdOucBuffer *netbuff)
static XrdCmsResp * Alloc(XrdOucErrInfo *erp, int msgid)
int Close(bool defer=false)
const char * getErrUser()
int setErrInfo(int code, const char *emsg)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static void Snooze(int seconds)
static const size_t Max_Error_Len