XRootD
Loading...
Searching...
No Matches
XrdPfcIOFileBlock.cc
Go to the documentation of this file.
1//----------------------------------------------------------------------------------
2// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3// Author: Alja Mrak-Tadel, Matevz Tadel, Brian Bockelman
4//----------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//----------------------------------------------------------------------------------
18
19#include "XrdPfcIOFileBlock.hh"
20#include "XrdPfc.hh"
21#include "XrdPfcStats.hh"
22#include "XrdPfcTrace.hh"
23
24#include "XrdOss/XrdOss.hh"
26#include "XrdSys/XrdSysError.hh"
27
28#include "XrdOuc/XrdOucEnv.hh"
29
30#include <cmath>
31#include <sstream>
32#include <cstdio>
33#include <iostream>
34#include <cassert>
35#include <fcntl.h>
36
37using namespace XrdPfc;
38
39//______________________________________________________________________________
41 IO(io, cache), m_localStat(0), m_info(cache.GetTrace(), false), m_info_file(0)
42{
44 GetBlockSizeFromPath();
45 initLocalStat();
46}
47
48//______________________________________________________________________________
50{
51 // called from Detach() if no sync is needed or
52 // from Cache's sync thread
53
54 TRACEIO(Debug, "deleting IOFileBlock");
55}
56
// TODO: Check whether m_mutex is needed at all. It is only used in ioActive() and
// DetachFinalize(), and in Read() for block selection -- see whether Prefetch Read
// requires the mutex to be held.
// It is probably needed in ioActive() and Read().
61
62//______________________________________________________________________________
64{
65 IO::Update(iocp);
66 {
67 XrdSysMutexHelper lock(&m_mutex);
68
69 for (std::map<int, File*>::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it)
70 {
71 // Need to update all File / block objects.
72 if (it->second) it->second->ioUpdated(this);
73 }
74 }
75}
76
77//______________________________________________________________________________
79{
80 // Called from XrdPosixFile when local connection is closed.
81
83
84 bool active = false;
85 {
86 XrdSysMutexHelper lock(&m_mutex);
87
88 for (std::map<int, File*>::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it)
89 {
90 // Need to initiate stop on all File / block objects.
91 if (it->second && it->second->ioActive(this))
92 {
93 active = true;
94 }
95 }
96 }
97
98 return active;
99}
100
101//______________________________________________________________________________
103{
104 // Effectively a destructor.
105
106 TRACEIO(Info, "DetachFinalize() " << this);
107
108 CloseInfoFile();
109 {
110 XrdSysMutexHelper lock(&m_mutex);
111 for (std::map<int, File*>::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it)
112 {
113 if (it->second)
114 {
115 it->second->RequestSyncOfDetachStats();
116 m_cache.ReleaseFile(it->second, this);
117 }
118 }
119 }
120
121 delete this;
122}
123
124//______________________________________________________________________________
125void IOFileBlock::CloseInfoFile()
126{
127 // write access statistics to info file and close it
128 // detach time is needed for file purge
129 if (m_info_file)
130 {
131 if (m_info.GetFileSize() > 0)
132 {
133 // We do not maintain access statistics for individual blocks.
134 Stats as;
135 m_info.WriteIOStatDetach(as);
136 }
137 m_info.Write(m_info_file, GetFilename().c_str());
138 m_info_file->Fsync();
139 m_info_file->Close();
140
141 delete m_info_file;
142 m_info_file = 0;
143 }
144}
145
146//______________________________________________________________________________
147void IOFileBlock::GetBlockSizeFromPath()
148{
149 const static std::string tag = "hdfsbsize=";
150
151 std::string path = GetInput()->Path();
152 size_t pos1 = path.find(tag);
153 size_t t = tag.length();
154
155 if (pos1 != path.npos)
156 {
157 pos1 += t;
158 size_t pos2 = path.find("&", pos1);
159 if (pos2 != path.npos )
160 {
161 std::string bs = path.substr(pos1, pos2 - pos1);
162 m_blocksize = atoi(bs.c_str());
163 }
164 else
165 {
166 m_blocksize = atoi(path.substr(pos1).c_str());
167 }
168
169 TRACEIO(Debug, "GetBlockSizeFromPath(), blocksize = " << m_blocksize );
170 }
171}
172
173//______________________________________________________________________________
174File* IOFileBlock::newBlockFile(long long off, int blocksize)
175{
176 // NOTE: Can return 0 if opening of a local file fails!
177
178 std::string fname = GetFilename();
179
180 std::stringstream ss;
181 ss << fname;
182 char offExt[64];
183 // filename like <origpath>___<size>_<offset>
184 sprintf(&offExt[0], "___%lld_%lld", m_blocksize, off);
185 ss << &offExt[0];
186 fname = ss.str();
187
188 TRACEIO(Debug, "FileBlock(), create XrdPfcFile ");
189
190 File *file = Cache::GetInstance().GetFile(fname, this, off, blocksize);
191 return file;
192}
193
194//______________________________________________________________________________
195int IOFileBlock::Fstat(struct stat &sbuff)
196{
197 // local stat is create in constructor. if file was on disk before
198 // attach that the only way stat was not successful is becuse there
199 // were info file read errors
200 if ( ! m_localStat) return -ENOENT;
201
202 memcpy(&sbuff, m_localStat, sizeof(struct stat));
203 return 0;
204}
205
206//______________________________________________________________________________
208{
209 if ( ! m_localStat) return -ENOENT;
210
211 return m_localStat->st_size;
212}
213
214//______________________________________________________________________________
215int IOFileBlock::initLocalStat()
216{
217 std::string path = GetFilename() + Info::s_infoExtension;
218
219 int res = -1;
220 struct stat tmpStat;
221 XrdOucEnv myEnv;
222
223 // try to read from existing file
224 if (m_cache.GetOss()->Stat(path.c_str(), &tmpStat) == XrdOssOK)
225 {
226 m_info_file = m_cache.GetOss()->newFile(m_cache.RefConfiguration().m_username.c_str());
227 if (m_info_file->Open(path.c_str(), O_RDWR, 0600, myEnv) == XrdOssOK)
228 {
229 if (m_info.Read(m_info_file, path.c_str()))
230 {
231 tmpStat.st_size = m_info.GetFileSize();
232 TRACEIO(Info, "initCachedStat successfully read size from existing info file = " << tmpStat.st_size);
233 res = 0;
234 }
235 else
236 {
237 // file exist but can't read it
238 TRACEIO(Debug, "initCachedStat info file is not complete");
239 }
240 }
241 }
242
243 // if there is no local info file, try to read from client and then save stat into a new *cinfo file
244 if (res)
245 {
246 if (m_info_file) { delete m_info_file; m_info_file = 0; }
247
248 res = GetInput()->Fstat(tmpStat);
249 TRACEIO(Debug, "initCachedStat get stat from client res= " << res << "size = " << tmpStat.st_size);
250 if (res == 0)
251 {
252 if (m_cache.GetOss()->Create(m_cache.RefConfiguration().m_username.c_str(), path.c_str(), 0600, myEnv, XRDOSS_mkpath) == XrdOssOK)
253 {
254 m_info_file = m_cache.GetOss()->newFile(m_cache.RefConfiguration().m_username.c_str());
255 if (m_info_file->Open(path.c_str(), O_RDWR, 0600, myEnv) == XrdOssOK)
256 {
257 // This is writing the top-level cinfo
258 // The info file is used to get file size on defer open
259 // don't initalize buffer, it does not hold useful information in this case
260 m_info.SetBufferSizeFileSizeAndCreationTime(m_cache.RefConfiguration().m_bufferSize, tmpStat.st_size);
261 // m_info.DisableDownloadStatus(); -- this stopped working a while back.
262 m_info.Write(m_info_file, path.c_str());
263 m_info_file->Fsync();
264 }
265 else
266 {
267 TRACEIO(Error, "initCachedStat can't open info file path");
268 }
269 }
270 else
271 {
272 TRACEIO(Error, "initCachedStat can't create info file path");
273 }
274 }
275 }
276
277 if (res == 0)
278 {
279 m_localStat = new struct stat;
280 memcpy(m_localStat, &tmpStat, sizeof(struct stat));
281 }
282
283 return res;
284}
285
286//______________________________________________________________________________
287int IOFileBlock::Read(char *buff, long long off, int size)
288{
289 // protect from reads over the file size
290
291 long long fileSize = FSize();
292
293 if (off >= fileSize)
294 return 0;
295 if (off < 0)
296 {
297 return -EINVAL;
298 }
299 if (off + size > fileSize)
300 size = fileSize - off;
301
302 long long off0 = off;
303 int idx_first = off0 / m_blocksize;
304 int idx_last = (off0 + size - 1) / m_blocksize;
305 int bytes_read = 0;
306 TRACEIO(Dump, "Read() "<< off << "@" << size << " block range ["<< idx_first << ", " << idx_last << "]");
307
308 for (int blockIdx = idx_first; blockIdx <= idx_last; ++blockIdx)
309 {
310 // locate block
311 File *fb;
312 m_mutex.Lock();
313 std::map<int, File*>::iterator it = m_blocks.find(blockIdx);
314 if (it != m_blocks.end())
315 {
316 fb = it->second;
317 }
318 else
319 {
320 size_t pbs = m_blocksize;
321 // check if this is last block
322 int lastIOFileBlock = (fileSize-1)/m_blocksize;
323 if (blockIdx == lastIOFileBlock )
324 {
325 pbs = fileSize - blockIdx*m_blocksize;
326 // TRACEIO(Dump, "Read() last block, change output file size to " << pbs);
327 }
328
329 // Note: File* can be 0 and stored as 0 if local open fails!
330 fb = newBlockFile(blockIdx*m_blocksize, pbs);
331 m_blocks.insert(std::make_pair(blockIdx, fb));
332 }
333 m_mutex.UnLock();
334
335 // edit size if read request is reaching more than a block
336 int readBlockSize = size;
337 if (idx_first != idx_last)
338 {
339 if (blockIdx == idx_first)
340 {
341 readBlockSize = (blockIdx + 1) * m_blocksize - off0;
342 TRACEIO(Dump, "Read partially till the end of the block");
343 }
344 else if (blockIdx == idx_last)
345 {
346 readBlockSize = (off0 + size) - blockIdx * m_blocksize;
347 TRACEIO(Dump, "Read partially till the end of the block");
348 }
349 else
350 {
351 readBlockSize = m_blocksize;
352 }
353 }
354
355 TRACEIO(Dump, "Read() block[ " << blockIdx << "] read-block-size[" << readBlockSize << "], offset[" << readBlockSize << "] off = " << off );
356
357 int retvalBlock;
358 if (fb != 0)
359 {
360 struct ZHandler : public ReadReqRH
361 { using ReadReqRH::ReadReqRH;
362 XrdSysCondVar m_cond {0};
363 int m_retval {0};
364
365 void Done(int result) override
366 { m_cond.Lock(); m_retval = result; m_cond.Signal(); m_cond.UnLock(); }
367 };
368
369 ReadReqRHCond rh(ObtainReadSid(), nullptr);
370
371 rh.m_cond.Lock();
372 retvalBlock = fb->Read(this, buff, off, readBlockSize, &rh);
373 if (retvalBlock == -EWOULDBLOCK)
374 {
375 rh.m_cond.Wait();
376 retvalBlock = rh.m_retval;
377 }
378 rh.m_cond.UnLock();
379 }
380 else
381 {
382 retvalBlock = GetInput()->Read(buff, off, readBlockSize);
383 }
384
385 TRACEIO(Dump, "Read() Block read returned " << retvalBlock);
386 if (retvalBlock == readBlockSize)
387 {
388 bytes_read += retvalBlock;
389 buff += retvalBlock;
390 off += retvalBlock;
391 }
392 else if (retvalBlock >= 0)
393 {
394 TRACEIO(Warning, "Read() incomplete read, missing bytes " << readBlockSize-retvalBlock);
395 return -EIO;
396 }
397 else
398 {
399 TRACEIO(Error, "Read() read error, retval" << retvalBlock);
400 return retvalBlock;
401 }
402 }
403
404 return bytes_read;
405}
#define XrdOssOK
Definition XrdOss.hh:50
#define XRDOSS_mkpath
Definition XrdOss.hh:466
#define TRACEIO(act, x)
#define stat(a, b)
Definition XrdPosix.hh:101
bool Debug
virtual int Fsync()
Definition XrdOss.hh:144
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:200
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual int Read(char *buff, long long offs, int rlen)=0
virtual int Fstat(struct stat &sbuff)
virtual const char * Path()=0
XrdOucCacheIO()
Construct and Destructor.
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition XrdPfc.hh:152
File * GetFile(const std::string &, IO *, long long off=0, long long filesize=0)
Definition XrdPfc.cc:389
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:204
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:132
XrdOss * GetOss() const
Definition XrdPfc.hh:268
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void Update(XrdOucCacheIO &iocp) override
long long FSize() override
int Fstat(struct stat &sbuff) override
int Read(char *Buffer, long long Offset, int Length) override
IOFileBlock(XrdOucCacheIO *io, Cache &cache)
bool ioActive() override
Abstract virtual method of XrdPfc::IO Called to check if destruction needs to be done in a separate t...
void DetachFinalize() override
Abstract virtual method of XrdPfc::IO Called to destruct the IO object after it is no longer used.
IO(XrdOucCacheIO *io, Cache &cache)
Definition XrdPfcIO.cc:7
XrdOucCacheIO * GetInput()
Definition XrdPfcIO.cc:31
Cache & m_cache
reference to Cache object
Definition XrdPfcIO.hh:50
friend class File
Definition XrdPfcIO.hh:80
XrdSysTrace * GetTrace()
Definition XrdPfcIO.hh:45
const char * RefreshLocation()
Definition XrdPfcIO.hh:55
void Update(XrdOucCacheIO &iocp) override
Definition XrdPfcIO.cc:17
unsigned short ObtainReadSid()
Definition XrdPfcIO.hh:57
std::string GetFilename()
Definition XrdPfcIO.cc:36
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:41
static const char * s_infoExtension
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
long long GetFileSize() const
Get file size.
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
void WriteIOStatDetach(Stats &s)
Write close time together with bytes missed, hits, and disk.
Statistics of cache utilisation by a File object.
long long m_hdfsbsize
used with m_hdfsmode, default 128MB
Definition XrdPfc.hh:115
std::string m_username
username passed to oss plugin
Definition XrdPfc.hh:87
ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb)
Definition XrdPfcFile.hh:56