XRootD
Loading...
Searching...
No Matches
XrdClReplay.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2021 by European Organization for Nuclear Research (CERN)
3// Author: Michal Simon <michal.simon@cern.ch>
4// Co-Author: Andreas-Joachim Peters <andreas.joachim.peters@cern.ch>
5//------------------------------------------------------------------------------
6// This file is part of the XRootD software suite.
7//
8// XRootD is free software: you can redistribute it and/or modify
9// it under the terms of the GNU Lesser General Public License as published by
10// the Free Software Foundation, either version 3 of the License, or
11// (at your option) any later version.
12//
13// XRootD is distributed in the hope that it will be useful,
14// but WITHOUT ANY WARRANTY; without even the implied warranty of
15// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16// GNU General Public License for more details.
17//
18// You should have received a copy of the GNU Lesser General Public License
19// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
20//
21// In applying this licence, CERN does not waive the privileges and immunities
22// granted to it by virtue of its status as an Intergovernmental Organization
23// or submit itself to any jurisdiction.
24//------------------------------------------------------------------------------
25
27#include "XrdCl/XrdClUtils.hh"
30#include "XrdClAction.hh"
31#include "XrdClActionMetrics.hh"
32#include "XrdClReplayArgs.hh"
33#include <fstream>
34#include <vector>
35#include <tuple>
36#include <unordered_map>
37#include <chrono>
38#include <iostream>
39#include <thread>
40#include <iomanip>
41#include <atomic>
42#include <stdarg.h>
43#include <getopt.h>
44#include <map>
45#include <vector>
46#include <numeric>
47#include <mutex>
48#include <condition_variable>
49
50namespace XrdCl
51{
52
53//------------------------------------------------------------------------------
55//------------------------------------------------------------------------------
57{
58 public:
59
60 //--------------------------------------------------------------------------
62 //--------------------------------------------------------------------------
64 {
65 static BufferPool instance;
66 return instance;
67 }
68
69 //--------------------------------------------------------------------------
72 //--------------------------------------------------------------------------
73 std::shared_ptr<std::vector<char>> Allocate( size_t length )
74 {
75 std::unique_lock<std::mutex> lck( mtx );
76 cv.wait( lck, [this, length]{ return available >= length; } );
77 available -= length;
78 BufferDeleter del;
79 std::shared_ptr<std::vector<char>> buffer( new std::vector<char>( length, 'A' ), del );
80 return buffer;
81 }
82
83 private:
84
85 //--------------------------------------------------------------------------
87 //--------------------------------------------------------------------------
88 void Reclaim( size_t length )
89 {
90 std::unique_lock<std::mutex> lck(mtx);
91 available += length;
92 cv.notify_all();
93 }
94
95 //--------------------------------------------------------------------------
98 //--------------------------------------------------------------------------
99 struct BufferDeleter
100 {
101 void operator()( std::vector<char> *buff )
102 {
103 BufferPool::Instance().Reclaim( buff->size() );
104 delete buff;
105 }
106 };
107
108 static const size_t KB = 1024;
109 static const size_t MB = 1024 * KB;
110 static const size_t GB = 1024 * MB;
111
112 //--------------------------------------------------------------------------
115 //--------------------------------------------------------------------------
116 BufferPool() : mtx(), cv()
117 {
118 const char *maxsize = getenv( "XRD_MAXBUFFERSIZE" );
119 if( maxsize )
120 {
121 size_t len = strlen( maxsize );
122 size_t pos;
123 available = std::stoul( maxsize, &pos );
124 std::string sufix( len != pos ? maxsize + len - 2 : "" );
125 std::transform( sufix.begin(), sufix.end(), sufix.begin(), ::toupper );
126 if( !sufix.empty() )
127 {
128 if( sufix == "KB" )
129 available *= KB;
130 else if( sufix == "MB" )
131 available *= MB;
132 else if( sufix == "GB" )
133 available *= GB;
134 }
135 return;
136 }
137 available = std::numeric_limits<size_t>::max();
138 }
139
140 BufferPool( const BufferPool& ) = delete;
141 BufferPool( BufferPool&& ) = delete;
142
143 BufferPool& operator=( const BufferPool& ) = delete;
144 BufferPool& operator=( BufferPool& ) = delete;
145
146
147 size_t available;
148 std::mutex mtx;
149 std::condition_variable cv;
150};
151
152//------------------------------------------------------------------------------
154//------------------------------------------------------------------------------
156{
157 public:
158 //--------------------------------------------------------------------------
160 //--------------------------------------------------------------------------
162 : start(clock_t::now())
163 {
164 }
165
166 //--------------------------------------------------------------------------
168 //--------------------------------------------------------------------------
169 void reset() { start = clock_t::now(); }
170
171 //--------------------------------------------------------------------------
173 //--------------------------------------------------------------------------
174 double elapsed() const
175 {
176 return (1.0
177 * (std::chrono::duration_cast<std::chrono::nanoseconds>(clock_t::now() - start).count())
178 / 1000000000.0);
179 }
180
181 private:
182 using clock_t = std::chrono::high_resolution_clock;
183 std::chrono::time_point<clock_t> start; //< registered start time
184};
185
186//------------------------------------------------------------------------------
189//------------------------------------------------------------------------------
191{
192 public:
193 //------------------------------------------------------------------------
196 //------------------------------------------------------------------------
198 : sem(sem)
199 {
200 }
201
202 //------------------------------------------------------------------------
204 //------------------------------------------------------------------------
205 ~barrier_t() { sem.Post(); }
206
207 inline XrdSysSemaphore& get() { return sem; }
208
209 private:
210 XrdSysSemaphore& sem; //< the semaphore to be posted
211};
212
213//------------------------------------------------------------------------------
215//------------------------------------------------------------------------------
216bool AssureFile(const std::string& url, uint64_t size, bool viatruncate, bool verify)
217{
220 uint16_t timeout = 60;
221
222 {
223 // deal with existing files
224 auto file = std::make_unique<XrdCl::File>(false);
225 XRootDStatus status = file->Open(url, flags, mode, timeout);
226 if (status.IsOK())
227 {
228 StatInfo* statinfo;
229 // file exists already, verify the size
230 status = file->Stat(false, statinfo, timeout);
231 if (status.IsOK())
232 {
233 if (statinfo->GetSize() < size)
234 {
235 std::cerr
236 << "Error: file size is not sufficient, but I won't touch the file - aborting ...";
237 return false;
238 }
239 else
240 {
241 std::cout << "# ---> info: file exists and has sufficient size" << std::endl;
242 return true;
243 }
244 }
245 }
246 }
247
248 if (verify)
249 {
250 std::cerr << "Verify: file is missing or inaccessible: " << url << std::endl;
251 return false;
252 }
253
254 {
255 // deal with non-existing file
258 auto file = std::make_unique<XrdCl::File>(false);
259 XRootDStatus status = file->Open(url, wflags, wmode, timeout);
260 if (status.IsOK())
261 {
262 if (viatruncate)
263 {
264 // create a file via truncation
265 status = file->Truncate(size, timeout);
266 if (!status.IsOK())
267 {
268 std::cerr << "Error: " << status.ToString() << " - empty file might be left behind!"
269 << std::endl;
270 return false;
271 }
272 return true;
273 }
274 else
275 {
276 // create a file via writes
277 using buffer_t = std::vector<uint64_t>; //< data buffer
278 buffer_t buffer(32768);
279 size_t nbytes = 0;
280
281 while (nbytes < size)
282 {
283 size_t towrite = size - nbytes;
284 if (towrite > (buffer.size() * sizeof(uint64_t)))
285 towrite = buffer.size() * sizeof(uint64_t);
286 for (size_t i = 0; i < buffer.size(); ++i)
287 {
288 // we write the offset in this buffer
289 buffer[i] = nbytes / sizeof(uint64_t) + i;
290 }
291 status = file->Write(nbytes, towrite, buffer.data(), timeout);
292 if (!status.IsOK())
293 {
294 std::cerr << "Error: " << status.ToString() << " - failed to write file at offset "
295 << nbytes << " - incomplete file might be left behind!" << std::endl;
296 return false;
297 }
298 nbytes += towrite;
299 }
300 }
301 return true;
302 }
303 else
304 {
305 std::cerr << "Error: " << status.ToString() << " - failed to create file!" << std::endl;
306 }
307 }
308 return false;
309}
310
311//------------------------------------------------------------------------------
313//------------------------------------------------------------------------------
315{
316 using buffer_t = std::shared_ptr<std::vector<char>>; //< data buffer
317
318 public:
319 //--------------------------------------------------------------------------
327 //--------------------------------------------------------------------------
329 const std::string& action,
330 const std::string& args,
331 const std::string& orgststr,
332 const std::string& resp,
333 const double& duration)
334 : file(file)
335 , action(action)
336 , args(args)
337 , orgststr(orgststr)
338 , nominalduration(duration)
339 {
340 }
341
342 //--------------------------------------------------------------------------
345 //--------------------------------------------------------------------------
346 void Execute(std::shared_ptr<barrier_t>& ending,
347 std::shared_ptr<barrier_t>& closing,
348 ActionMetrics& metric,
349 bool simulate)
350 {
351 if (action == "Open") // open action
352 {
353 std::string url;
354 OpenFlags::Flags flags;
355 Access::Mode mode;
356 uint16_t timeout;
357 std::tie(url, flags, mode, timeout) = GetOpenArgs();
358
359 std::string lmetric;
360 if ((flags & OpenFlags::Update) || (flags & OpenFlags::Write))
361 {
362 metric.ios["OpenW::n"]++;
363 }
364 else
365 {
366 metric.ios["OpenR::n"]++;
367 }
368
369 metric.ios["Open::n"]++;
370
371 mytimer_t timer;
372
373 if (!simulate)
374 WaitFor(Open(file, url, flags, mode, timeout) >>
375 [this, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s) mutable
376 {
377 metric.addIos("Open", "e", HandleStatus(s, orgststr, "Open"));
378 metric.addDelays("Open", "tmeas", timer.elapsed());
379 ending.reset();
380 closing.reset();
381 });
382 else
383 {
384 ending.reset();
385 closing.reset();
386 }
387 }
388 else if (action == "Close") // close action
389 {
390 uint16_t timeout = GetCloseArgs();
391 mytimer_t timer;
392
393 if (closing)
394 {
395 auto& sem = closing->get();
396 closing.reset();
397 sem.Wait();
398 }
399
400 metric.ios["Close::n"]++;
401
402 if (!simulate)
403 Async(Close(file, timeout) >>
404 [this, orgststr{ orgststr }, ending, timer, &metric](XRootDStatus& s) mutable
405 {
406 metric.addIos("Close", "e", HandleStatus(s, orgststr, "Close"));
407 metric.addDelays("Close", "tmeas", timer.elapsed());
408 ending.reset();
409 });
410 else
411 {
412 ending.reset();
413 }
414 }
415 else if (action == "Stat") // stat action
416 {
417 bool force;
418 uint16_t timeout;
419 std::tie(force, timeout) = GetStatArgs();
420 metric.ios["Stat::n"]++;
421 mytimer_t timer;
422
423 if (!simulate)
424 Async(Stat(file, force, timeout) >>
425 [this, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s, StatInfo& r) mutable
426 {
427 metric.addIos("Stat", "e", HandleStatus(s, orgststr, "Stat"));
428 metric.addDelays("Stat", "tmeas", timer.elapsed());
429 ending.reset();
430 closing.reset();
431 });
432 else
433 {
434 ending.reset();
435 closing.reset();
436 }
437 }
438 else if (action == "Read") // read action
439 {
440 uint64_t offset;
441 buffer_t buffer;
442 uint16_t timeout;
443 std::tie(offset, buffer, timeout) = GetReadArgs();
444 metric.ios["Read::n"]++;
445 metric.ios["Read::b"] += buffer->size();
446 if ((offset + buffer->size()) > metric.ios["Read::o"])
447 metric.ios["Read::o"] = offset + buffer->size();
448
449 mytimer_t timer;
450 if (!simulate)
451 Async(Read(file, offset, buffer->size(), buffer->data(), timeout) >>
452 [buffer, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s,
453 ChunkInfo& r) mutable
454 {
455 metric.addIos("Read", "e", HandleStatus(s, orgststr, "Read"));
456 metric.addDelays("Read", "tmeas", timer.elapsed());
457 buffer.reset();
458 ending.reset();
459 closing.reset();
460 });
461 else
462 {
463 buffer.reset();
464 ending.reset();
465 closing.reset();
466 }
467 }
468 else if (action == "PgRead") // pgread action
469 {
470 uint64_t offset;
471 buffer_t buffer;
472 uint16_t timeout;
473 std::tie(offset, buffer, timeout) = GetPgReadArgs();
474 metric.ios["PgRead::n"]++;
475 metric.ios["PgRead::b"] += buffer->size();
476 if ((offset + buffer->size()) > metric.ios["Read::o"])
477 metric.ios["Read::o"] = offset + buffer->size();
478 mytimer_t timer;
479 if (!simulate)
480 Async(PgRead(file, offset, buffer->size(), buffer->data(), timeout) >>
481 [buffer, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s,
482 PageInfo& r) mutable
483 {
484 metric.addIos("PgRead", "e", HandleStatus(s, orgststr, "PgRead"));
485 metric.addDelays("PgRead", "tmeas", timer.elapsed());
486 buffer.reset();
487 ending.reset();
488 closing.reset();
489 });
490 else
491 {
492 buffer.reset();
493 ending.reset();
494 closing.reset();
495 }
496 }
497 else if (action == "Write") // write action
498 {
499 uint64_t offset;
500 buffer_t buffer;
501 uint16_t timeout;
502 std::tie(offset, buffer, timeout) = GetWriteArgs();
503 metric.ios["Write::n"]++;
504 metric.ios["Write::b"] += buffer->size();
505 if ((offset + buffer->size()) > metric.ios["Write::o"])
506 metric.ios["Write::o"] = offset + buffer->size();
507 mytimer_t timer;
508
509 if (!simulate)
510 Async(
511 Write(file, offset, buffer->size(), buffer->data(), timeout) >>
512 [buffer, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s) mutable
513 {
514 metric.addIos("Write", "e", HandleStatus(s, orgststr, "Write"));
515 metric.addDelays("Write", "tmeas", timer.elapsed());
516 buffer.reset();
517 ending.reset();
518 closing.reset();
519 });
520 else
521 {
522 buffer.reset();
523 ending.reset();
524 closing.reset();
525 }
526 }
527 else if (action == "PgWrite") // pgwrite action
528 {
529 uint64_t offset;
530 buffer_t buffer;
531 uint16_t timeout;
532 std::tie(offset, buffer, timeout) = GetPgWriteArgs();
533 metric.ios["PgWrite::n"]++;
534 metric.ios["PgWrite::b"] += buffer->size();
535 if ((offset + buffer->size()) > metric.ios["Write::o"])
536 metric.ios["Write::o"] = offset + buffer->size();
537 mytimer_t timer;
538 if (!simulate)
539 Async(
540 PgWrite(file, offset, buffer->size(), buffer->data(), timeout) >>
541 [buffer, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s) mutable
542 {
543 metric.addIos("PgWrite", "e", HandleStatus(s, orgststr, "PgWrite"));
544 metric.addDelays("PgWrite", "tmeas", timer.elapsed());
545 buffer.reset();
546 ending.reset();
547 closing.reset();
548 });
549 else
550 {
551 buffer.reset();
552 ending.reset();
553 closing.reset();
554 }
555 }
556 else if (action == "Sync") // sync action
557 {
558 uint16_t timeout = GetSyncArgs();
559 metric.ios["Sync::n"]++;
560 mytimer_t timer;
561 if (!simulate)
562 Async(Sync(file, timeout) >>
563 [this, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s) mutable
564 {
565 metric.addIos("Sync", "e", HandleStatus(s, orgststr, "Sync"));
566 metric.addDelays("Sync", "tmeas", timer.elapsed());
567 ending.reset();
568 closing.reset();
569 });
570 else
571 {
572 ending.reset();
573 closing.reset();
574 }
575 }
576 else if (action == "Truncate") // truncate action
577 {
578 uint64_t size;
579 uint16_t timeout;
580 std::tie(size, timeout) = GetTruncateArgs();
581 metric.ios["Truncate::n"]++;
582 if (size > metric.ios["Truncate::o"])
583 metric.ios["Truncate::o"] = size;
584
585 mytimer_t timer;
586 if (!simulate)
587 Async(Truncate(file, size, timeout) >>
588 [this, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s) mutable
589 {
590 metric.addIos("Truncate", "e", HandleStatus(s, orgststr, "Truncate"));
591 metric.addDelays("Truncate", "tmeas", timer.elapsed());
592 ending.reset();
593 closing.reset();
594 });
595 else
596 {
597 ending.reset();
598 closing.reset();
599 }
600 }
601 else if (action == "VectorRead") // vector read action
602 {
603 ChunkList chunks;
604 uint16_t timeout;
605 std::vector<buffer_t> buffers;
606 std::tie(chunks, timeout, buffers) = GetVectorReadArgs();
607 metric.ios["VectorRead::n"]++;
608 for (auto& ch : chunks)
609 {
610 metric.ios["VectorRead::b"] += ch.GetLength();
611 if ((ch.GetOffset() + ch.GetLength()) > metric.ios["Read::o"])
612 metric.ios["Read::o"] = ch.GetOffset() + ch.GetLength();
613 }
614
615 mytimer_t timer;
616 if (!simulate)
617 Async(
618 VectorRead(file, chunks, timeout) >>
619 [this, orgststr{ orgststr }, buffers, ending, closing, timer, &metric](XRootDStatus& s, VectorReadInfo& r) mutable
620 {
621 metric.addIos("VectorRead", "e", HandleStatus(s, orgststr, "VectorRead"));
622 metric.addDelays("VectorRead", "tmeas", timer.elapsed());
623 buffers.clear();
624 ending.reset();
625 closing.reset();
626 });
627 else
628 {
629 buffers.clear();
630 ending.reset();
631 closing.reset();
632 }
633 }
634 else if (action == "VectorWrite") // vector write
635 {
636 ChunkList chunks;
637 uint16_t timeout;
638 std::vector<buffer_t> buffers;
639 std::tie(chunks, timeout, buffers) = GetVectorWriteArgs();
640 metric.ios["VectorWrite::n"]++;
641 for (auto& ch : chunks)
642 {
643 metric.ios["VectorWrite::b"] += ch.GetLength();
644 if ((ch.GetOffset() + ch.GetLength()) > metric.ios["Write::o"])
645 metric.ios["Write::o"] = ch.GetOffset() + ch.GetLength();
646 }
647 mytimer_t timer;
648 if (!simulate)
649 Async(VectorWrite(file, chunks, timeout) >>
650 [this, orgststr{ orgststr }, buffers, ending, closing, timer, &metric](XRootDStatus& s) mutable
651 {
652 metric.addIos("VectorWrite", "e", HandleStatus(s, orgststr, "VectorWrite"));
653 metric.addDelays("VectorWrite", "tmeas", timer.elapsed());
654 buffers.clear();
655 ending.reset();
656 closing.reset();
657 });
658 else
659 {
660 buffers.clear();
661 ending.reset();
662 closing.reset();
663 }
664 }
665 else
666 {
667 DefaultEnv::GetLog()->Warning(AppMsg, "Cannot replyt %s action.", action.c_str());
668 }
669 }
670
671 //--------------------------------------------------------------------------
673 //--------------------------------------------------------------------------
674 double NominalDuration() const { return nominalduration; }
675
676 //--------------------------------------------------------------------------
678 //--------------------------------------------------------------------------
679 std::string Name() const { return action; }
680
681 private:
682 //--------------------------------------------------------------------------
684 //--------------------------------------------------------------------------
685 static bool HandleStatus(XRootDStatus& response, const std::string& orgstr, const std::string where="unknown")
686 {
687 std::string rspstr = response.ToString();
688 rspstr.erase(remove(rspstr.begin(), rspstr.end(), ' '), rspstr.end());
689
690 if (rspstr != orgstr)
691 {
693 "We were expecting status: %s, but "
694 "received: %s from: %s",
695 orgstr.c_str(),
696 rspstr.c_str(),
697 where.c_str());
698 return true;
699 }
700 else
701 {
702 return false;
703 }
704 }
705
706 //--------------------------------------------------------------------------
708 //--------------------------------------------------------------------------
709 std::tuple<std::string, OpenFlags::Flags, Access::Mode, uint16_t> GetOpenArgs()
710 {
711 std::vector<std::string> tokens;
712 Utils::splitString(tokens, args, ";");
713 if (tokens.size() != 4)
714 throw std::invalid_argument("Failed to parse open arguments.");
715 std::string url = tokens[0];
716 OpenFlags::Flags flags = static_cast<OpenFlags::Flags>(std::stoul(tokens[1]));
717 Access::Mode mode = static_cast<Access::Mode>(std::stoul(tokens[2]));
718 uint16_t timeout = static_cast<uint16_t>(std::stoul(tokens[3]));
719 return std::make_tuple(url, flags, mode, timeout);
720 }
721
722 //--------------------------------------------------------------------------
724 //--------------------------------------------------------------------------
725 uint16_t GetCloseArgs() { return static_cast<uint16_t>(std::stoul(args)); }
726
727 std::tuple<bool, uint16_t> GetStatArgs()
728 {
729 std::vector<std::string> tokens;
730 Utils::splitString(tokens, args, ";");
731 if (tokens.size() != 2)
732 throw std::invalid_argument("Failed to parse stat arguments.");
733 bool force = (tokens[0] == "true");
734 uint16_t timeout = static_cast<uint16_t>(std::stoul(tokens[1]));
735 return std::make_tuple(force, timeout);
736 }
737
738 //--------------------------------------------------------------------------
740 //--------------------------------------------------------------------------
741 std::tuple<uint64_t, buffer_t, uint16_t> GetReadArgs()
742 {
743 std::vector<std::string> tokens;
744 Utils::splitString(tokens, args, ";");
745 if (tokens.size() != 3)
746 throw std::invalid_argument("Failed to parse read arguments.");
747 uint64_t offset = std::stoull(tokens[0]);
748 uint32_t length = std::stoul(tokens[1]);
749 auto buffer = BufferPool::Instance().Allocate( length );
750 uint16_t timeout = static_cast<uint16_t>(std::stoul(tokens[2]));
751 return std::make_tuple(offset, buffer, timeout);
752 }
753
754 //--------------------------------------------------------------------------
756 //--------------------------------------------------------------------------
757 inline std::tuple<uint64_t, buffer_t, uint16_t> GetPgReadArgs() { return GetReadArgs(); }
758
759 //--------------------------------------------------------------------------
761 //--------------------------------------------------------------------------
762 inline std::tuple<uint64_t, buffer_t, uint16_t> GetWriteArgs() { return GetReadArgs(); }
763
764 //--------------------------------------------------------------------------
766 //--------------------------------------------------------------------------
767 inline std::tuple<uint64_t, buffer_t, uint16_t> GetPgWriteArgs() { return GetReadArgs(); }
768
769 //--------------------------------------------------------------------------
771 //--------------------------------------------------------------------------
772 uint16_t GetSyncArgs() { return static_cast<uint16_t>(std::stoul(args)); }
773
774 //--------------------------------------------------------------------------
776 //--------------------------------------------------------------------------
777 std::tuple<uint64_t, uint16_t> GetTruncateArgs()
778 {
779 std::vector<std::string> tokens;
780 Utils::splitString(tokens, args, ";");
781 if (tokens.size() != 2)
782 throw std::invalid_argument("Failed to parse truncate arguments.");
783 uint64_t size = std::stoull(tokens[0]);
784 uint16_t timeout = static_cast<uint16_t>(std::stoul(tokens[1]));
785 return std::make_tuple(size, timeout);
786 }
787
788 //--------------------------------------------------------------------------
790 //--------------------------------------------------------------------------
791 std::tuple<ChunkList, uint16_t, std::vector<buffer_t>> GetVectorReadArgs()
792 {
793 std::vector<std::string> tokens;
794 Utils::splitString(tokens, args, ";");
795 ChunkList chunks;
796 chunks.reserve( tokens.size() - 1 );
797 std::vector<buffer_t> buffers;
798 buffers.reserve( tokens.size() - 1 );
799 for (size_t i = 0; i < tokens.size() - 1; i += 2)
800 {
801 uint64_t offset = std::stoull(tokens[i]);
802 uint32_t length = std::stoul(tokens[i + 1]);
803 auto buffer = BufferPool::Instance().Allocate( length );
804 chunks.emplace_back(offset, length, buffer->data());
805 buffers.emplace_back( std::move( buffer ) );
806 }
807 uint16_t timeout = static_cast<uint16_t>(std::stoul(tokens.back()));
808 return std::make_tuple(std::move(chunks), timeout, std::move(buffers));
809 }
810
811 //--------------------------------------------------------------------------
813 //--------------------------------------------------------------------------
814 inline std::tuple<ChunkList, uint16_t, std::vector<buffer_t>> GetVectorWriteArgs() { return GetVectorReadArgs(); }
815
816 File& file; //< the file object
817 const std::string action; //< the action to be executed
818 const std::string args; //< arguments for the operation
819 std::string orgststr; //< the original response status of the action
820 double nominalduration; //< the original duration of execution
821};
822
823//------------------------------------------------------------------------------
825//------------------------------------------------------------------------------
826std::vector<std::string> ToColumns( const std::string &row )
827{
828 std::vector<std::string> columns;
829 size_t quotecnt = 0;
830 size_t pos = 0;
831 //----------------------------------------------------------------------------
833 //----------------------------------------------------------------------------
834 while( pos != std::string::npos && pos < row.size() )
835 {
836 if( row[pos] == '"' ) // we are handling a quoted column
837 {
838 if( quotecnt > 0 ) // this is a closing quote
839 {
840 if( pos + 1 < row.size() && row[pos + 1] != ',' ) // if it is not the last character in the row it should be followed by a comma
841 throw std::runtime_error( "Parsing error: missing comma" );
842 --quotecnt; // strip the quote
843 ++pos; // move to the comma or end of row
844 continue;
845 }
846 else // this is a opening quote
847 {
848 ++quotecnt;
849 auto b = std::next( row.begin(), pos + 1 ); // iterator to the beginning of our column
850 size_t posend = row.find( "\",", pos + 1 ); // position of the cursor to the end of our column
851 if( posend == std::string::npos && row[row.size() - 1] == '"' )
852 posend = row.size() - 1;
853 else if( posend == std::string::npos )
854 throw std::runtime_error( "Parsing error: missing closing quote" );
855 auto e = std::next( row.begin(), posend ); // iterator to the end of our column
856 columns.emplace_back( b, e ); // add the column to the result
857 pos = posend; // move to the next column
858 continue;
859 }
860 }
861 else if( row[pos] == ',' ) // we are handling a column separator
862 {
863 if( pos + 1 < row.size() && row[pos + 1] == '"' ) // check if the column is quoted
864 {
865 ++pos; // if yes we will handle this with the logic reserved for quoted columns
866 continue;
867 }
868 auto b = std::next( row.begin(), pos + 1 ); // iterator to the beginning of our column
869 size_t posend = row.find( ',', pos + 1 ); // position of the cursor to the end of our column
870 if( posend == std::string::npos )
871 posend = row.size();
872 auto e = std::next( row.begin(), posend ); // iterator to the end of our column
873 columns.emplace_back( b, e ); // add the column to the result
874 pos = posend; // move to the next column
875 continue;
876 }
877 else if( pos == 0 ) // we are handling the 1st column if not quoted
878 {
879 size_t posend = row.find( ',', pos + 1 ); // position of the cursor to the end of our column
880 if( posend == std::string::npos )
881 posend = row.size();
882 auto end = std::next( row.begin(), posend ); // iterator to the end of our column
883 columns.emplace_back( row.begin(), end ); // add the column to the result
884 pos = posend; // move to the next column
885 continue;
886 }
887 else
888 {
889 throw std::runtime_error( "Parsing error: invalid input file." );
890 }
891 }
892 return columns;
893}
894
895//------------------------------------------------------------------------------
897//------------------------------------------------------------------------------
898using action_list = std::multimap<double, ActionExecutor>;
899
900//------------------------------------------------------------------------------
903//------------------------------------------------------------------------------
904std::unordered_map<File*, action_list> ParseInput(const std::string& path,
905 double& t0,
906 double& t1,
907 std::unordered_map<File*, std::string>& filenames,
908 std::unordered_map<File*, double>& synchronicity,
909 std::unordered_map<File*, size_t>& responseerrors,
910 const std::vector<std::string>& option_regex)
911{
912 std::unordered_map<File*, action_list> result;
913 std::unique_ptr<std::ifstream> fin( path.empty() ? nullptr : new std::ifstream( path, std::ifstream::in ) );
914 std::istream &input = path.empty() ? std::cin : *fin;
915 std::string line;
916 std::unordered_map<uint64_t, File*> files;
917 std::unordered_map<uint64_t, double> last_stop;
918 std::unordered_map<uint64_t, double> overlaps;
919 std::unordered_map<uint64_t, double> overlaps_cnt;
920
921 t0 = 10e99;
922 t1 = 0;
923 while (input.good())
924 {
925 std::getline(input, line);
926 if (line.empty())
927 continue;
928 std::vector<std::string> tokens = ToColumns( line );
929 if (tokens.size() == 6)
930 tokens.emplace_back();
931 if (tokens.size() != 7)
932 {
933 throw std::invalid_argument("Invalid input file format.");
934 }
935
936 uint64_t id = std::stoull(tokens[0]); // file object ID
937 std::string action = tokens[1]; // action name (e.g. Open)
938 double start = std::stod(tokens[2]); // start time
939 std::string args = tokens[3]; // operation arguments
940 double stop = std::stod(tokens[4]); // stop time
941 std::string status = tokens[5]; // operation status
942 std::string resp = tokens[6]; // server response
943
944 if (option_regex.size())
945 {
946 for (auto& v : option_regex)
947 {
948 std::vector<std::string> tokens;
949 Utils::splitString(tokens, v, ":=");
950 std::regex src(tokens[0]);
951 if (tokens.size() != 2)
952 {
953 std::cerr
954 << "Error: invalid regex for argument replacement - must be format like <oldstring>:=<newstring>"
955 << std::endl;
956 exit(EINVAL);
957 }
958 else
959 {
960 // write the results to an output iterator
961 args = std::regex_replace(args, src, tokens[1]);
962 }
963 }
964 }
965
966 if (start < t0)
967 t0 = start;
968 if (stop > t1)
969 t1 = stop;
970
971 if (!files.count(id))
972 {
973 files[id] = new File(false);
974 files[id]->SetProperty("BundledClose", "true");
975 filenames[files[id]] = args;
976 filenames[files[id]].erase(args.find(";"));
977 overlaps[id] = 0;
978 overlaps_cnt[id] = 0;
979 last_stop[id] = stop;
980 }
981 else
982 {
983 overlaps_cnt[id]++;
984 if (start > last_stop[id])
985 {
986 overlaps[id]++;
987 }
988 last_stop[id] = stop;
989 }
990
991 last_stop[id] = stop;
992 double nominal_duration = stop - start;
993
994 if (status != "[SUCCESS]")
995 {
996 responseerrors[files[id]]++;
997 }
998 else
999 {
1000 result[files[id]].emplace(
1001 start, ActionExecutor(*files[id], action, args, status, resp, nominal_duration));
1002 }
1003 }
1004
1005 for (auto& it : overlaps)
1006 {
1007 // compute the synchronicity of requests
1008 synchronicity[files[it.first]] = 100.0 * (it.second / overlaps_cnt[it.first]);
1009 }
1010 return result;
1011}
1012
1013//------------------------------------------------------------------------------
1019//------------------------------------------------------------------------------
1020std::thread ExecuteActions(std::unique_ptr<File> file,
1021 action_list&& actions,
1022 double t0,
1023 double speed,
1024 ActionMetrics& metric,
1025 bool simulate)
1026{
1027 std::thread t(
1028 [file{ std::move(file) },
1029 actions{ std::move(actions) },
1030 t0,
1031 &metric,
1032 simulate,
1033 &speed]() mutable
1034 {
1035 XrdSysSemaphore endsem(0);
1036 XrdSysSemaphore closesem(0);
1037 auto ending = std::make_shared<barrier_t>(endsem);
1038 auto closing = std::make_shared<barrier_t>(closesem);
1039
1040 for (auto& p : actions)
1041 {
1042 auto& action = p.second;
1043
1044 auto tdelay = t0 ? ((p.first + t0) - XrdCl::Action::timeNow()) : 0;
1045 if (tdelay > 0)
1046 {
1047 tdelay /= speed;
1048 metric.delays[action.Name() + "::tloss"] += tdelay;
1049 std::this_thread::sleep_for(std::chrono::milliseconds((int) (tdelay * 1000)));
1050 }
1051 else
1052 {
1053 metric.delays[action.Name() + "::tgain"] += tdelay;
1054 }
1055
1056 mytimer_t timer;
1057 action.Execute(ending, closing, metric, simulate);
1058 metric.addDelays(action.Name(), "tnomi", action.NominalDuration());
1059 metric.addDelays(action.Name(), "texec", timer.elapsed());
1060 }
1061 ending.reset();
1062 closing.reset();
1063 endsem.Wait();
1064 file->GetProperty("LastURL", metric.url);
1065 file.reset();
1066 });
1067 return t;
1068}
1069
1070}
1071
1072void usage()
1073{
1074 std::cerr
1075 << "usage: xrdreplay [-p|--print] [-c|--create-data] [t|--truncate-data] [-l|--long] [-s|--summary] [-h|--help] [-r|--replace <arg>:=<newarg>] [-f|--suppress] <recordfilename>\n"
1076 << std::endl;
1077 std::cerr << " -h | --help : show this help" << std::endl;
1078 std::cerr
1079 << " -f | --suppress : force to run all IO with all successful result status - suppress all others"
1080 << std::endl;
1081 std::cerr
1082 << " - by default the player won't run with an unsuccessfully recorded IO"
1083 << std::endl;
1084 std::cerr << std::endl;
1085 std::cerr
1086 << " -p | --print : print only mode - shows all the IO for the given replay file without actually running any IO"
1087 << std::endl;
1088 std::cerr
1089 << " -s | --summary : print summary - shows all the aggregated IO counter summed for all files"
1090 << std::endl;
1091 std::cerr
1092 << " -l | --long : print long - show all file IO counter for each individual file"
1093 << std::endl;
1094 std::cerr
1095 << " -r | --replace <a>:=<b> : replace in the argument list the string <a> with <b> "
1096 << std::endl;
1097 std::cerr
1098 << " - option is usable several times e.g. to change storage prefixes or filenames"
1099 << std::endl;
1100 std::cerr << std::endl;
1101 std::cerr
1102 << "example: ... --replace file:://localhost:=root://xrootd.eu/ : redirect local file to remote"
1103 << std::endl;
1104 std::cerr << std::endl;
1105 exit(-1);
1106}
1107
1108int main(int argc, char** argv)
1109{
1110 XrdCl::ReplayArgs opt(argc, argv);
1111 int rc = 0;
1112
1113 try
1114 {
1115 double t0 = 0;
1116 double t1 = 0;
1117 std::unordered_map<XrdCl::File*, std::string> filenames;
1118 std::unordered_map<XrdCl::File*, double> synchronicity;
1119 std::unordered_map<XrdCl::File*, size_t> responseerrors;
1120 auto actions = XrdCl::ParseInput(opt.path(),
1121 t0,
1122 t1,
1123 filenames,
1124 synchronicity,
1125 responseerrors,
1126 opt.regex()); // parse the input file
1127 std::vector<std::thread> threads;
1128 std::unordered_map<XrdCl::File*, XrdCl::ActionMetrics> metrics;
1129 threads.reserve(actions.size());
1130 double toffset = XrdCl::Action::timeNow() - t0;
1131 XrdCl::mytimer_t timer;
1132 XrdCl::ActionMetrics summetric;
1133 bool sampling_error = false;
1134
1135 for (auto& action : actions)
1136 {
1137 metrics[action.first].fname = filenames[action.first];
1138 metrics[action.first].synchronicity = synchronicity[action.first];
1139 metrics[action.first].errors = responseerrors[action.first];
1140 if (metrics[action.first].errors)
1141 {
1142 sampling_error = true;
1143 }
1144 }
1145
1146 if (sampling_error)
1147 {
1148 std::cerr << "Warning: IO file contains unsuccessful samples!" << std::endl;
1149 if (!opt.suppress_error())
1150 {
1151 std::cerr << "... run with [-f] or [--suppress] option to suppress unsuccessful IO events!"
1152 << std::endl;
1153 exit(-1);
1154 }
1155 }
1156
1157
1158 if (opt.print())
1159 toffset = 0; // indicate not to follow timing
1160
1161 for (auto& action : actions)
1162 {
1163 // execute list of actions against file object
1164 threads.emplace_back(ExecuteActions(std::unique_ptr<XrdCl::File>(action.first),
1165 std::move(action.second),
1166 toffset,
1167 opt.speed(),
1168 metrics[action.first],
1169 opt.print()));
1170 }
1171
1172 for (auto& t : threads) // wait until we are done
1173 t.join();
1174
1175 if (opt.json())
1176 {
1177 std::cout << "{" << std::endl;
1178 if (opt.longformat())
1179 std::cout << " \"metrics\": [" << std::endl;
1180 }
1181
1182 for (auto& metric : metrics)
1183 {
1184 if (opt.longformat())
1185 {
1186 std::cout << metric.second.Dump(opt.json());
1187 }
1188 summetric.add(metric.second);
1189 }
1190
1191 if (opt.summary())
1192 std::cout << summetric.Dump(opt.json());
1193
1194 if (opt.json())
1195 {
1196 if (opt.longformat())
1197 std::cout << " ]," << std::endl;
1198 }
1199
1200 double tbench = timer.elapsed();
1201
1202 if (opt.json())
1203 {
1204 {
1205 std::cout << " \"iosummary\": { " << std::endl;
1206 if (!opt.print())
1207 {
1208 std::cout << " \"player::runtime\": " << tbench << "," << std::endl;
1209 }
1210 std::cout << " \"player::speed\": " << opt.speed() << "," << std::endl;
1211 std::cout << " \"sampled::runtime\": " << t1 - t0 << "," << std::endl;
1212 std::cout << " \"volume::totalread\": " << summetric.getBytesRead() << "," << std::endl;
1213 std::cout << " \"volume::totalwrite\": " << summetric.getBytesWritten() << ","
1214 << std::endl;
1215 std::cout << " \"volume::read\": " << summetric.ios["Read::b"] << "," << std::endl;
1216 std::cout << " \"volume::write\": " << summetric.ios["Write::b"] << "," << std::endl;
1217 std::cout << " \"volume::pgread\": " << summetric.ios["PgRead::b"] << "," << std::endl;
1218 std::cout << " \"volume::pgwrite\": " << summetric.ios["PgWrite::b"] << "," << std::endl;
1219 std::cout << " \"volume::vectorread\": " << summetric.ios["VectorRead::b"] << ","
1220 << std::endl;
1221 std::cout << " \"volume::vectorwrite\": " << summetric.ios["VectorWrite::b"] << ","
1222 << std::endl;
1223 std::cout << " \"iops::read\": " << summetric.ios["Read::n"] << "," << std::endl;
1224 std::cout << " \"iops::write\": " << summetric.ios["Write::n"] << "," << std::endl;
1225 std::cout << " \"iops::pgread\": " << summetric.ios["PgRead::n"] << "," << std::endl;
1226 std::cout << " \"iops::pgwrite\": " << summetric.ios["PgRead::n"] << "," << std::endl;
1227 std::cout << " \"iops::vectorread\": " << summetric.ios["VectorRead::n"] << ","
1228 << std::endl;
1229 std::cout << " \"iops::vectorwrite\": " << summetric.ios["VectorRead::n"] << ","
1230 << std::endl;
1231 std::cout << " \"files::read\": " << summetric.ios["OpenR::n"] << "," << std::endl;
1232 std::cout << " \"files::write\": " << summetric.ios["OpenW::n"] << "," << std::endl;
1233 std::cout << " \"datasetsize::read\": " << summetric.ios["Read::o"] << "," << std::endl;
1234 std::cout << " \"datasetsize::write\": " << summetric.ios["Write::o"] << "," << std::endl;
1235 if (!opt.print())
1236 {
1237 std::cout << " \"bandwidth::mb::read\": "
1238 << summetric.getBytesRead() / tbench / 1000000.0 << "," << std::endl;
1239 std::cout << " \"bandwdith::mb::write\": "
1240 << summetric.getBytesWritten() / tbench / 1000000.0 << "," << std::endl;
1241 std::cout << " \"performancemark\": " << (100.0 * (t1 - t0) / tbench) << ","
1242 << std::endl;
1243 std::cout << " \"gain::read\":"
1244 << (100.0 * summetric.delays["Read::tnomi"] / summetric.delays["Read::tmeas"])
1245 << "," << std::endl;
1246 std::cout << " \"gain::write\":"
1247 << (100.0 * summetric.delays["Write::tnomi"] / summetric.delays["Write::tmeas"])
1248 << std::endl;
1249 }
1250 std::cout << " \"synchronicity::read\":"
1251 << summetric.aggregated_synchronicity.ReadSynchronicity() << "," << std::endl;
1252 std::cout << " \"synchronicity::write\":"
1253 << summetric.aggregated_synchronicity.WriteSynchronicity() << "," << std::endl;
1254 std::cout << " \"response::error:\":" << summetric.ios["All::e"] << std::endl;
1255 std::cout << " }" << std::endl;
1256 std::cout << "}" << std::endl;
1257 }
1258 }
1259 else
1260 {
1261 std::cout << "# =============================================" << std::endl;
1262 if (!opt.print())
1263 std::cout << "# IO Summary" << std::endl;
1264 else
1265 std::cout << "# IO Summary (print mode)" << std::endl;
1266 std::cout << "# =============================================" << std::endl;
1267 if (!opt.print())
1268 {
1269 std::cout << "# Total Runtime : " << std::fixed << tbench << " s" << std::endl;
1270 }
1271 std::cout << "# Sampled Runtime : " << std::fixed << t1 - t0 << " s" << std::endl;
1272 std::cout << "# Playback Speed : " << std::fixed << std::setprecision(2) << opt.speed()
1273 << std::endl;
1274 std::cout << "# IO Volume (R) : " << std::fixed
1276 << " [ std:" << XrdCl::ActionMetrics::humanreadable(summetric.ios["Read::b"])
1277 << " vec:" << XrdCl::ActionMetrics::humanreadable(summetric.ios["VectorRead::b"])
1278 << " page:" << XrdCl::ActionMetrics::humanreadable(summetric.ios["PgRead::b"])
1279 << " ] " << std::endl;
1280 std::cout << "# IO Volume (W) : " << std::fixed
1282 << " [ std:" << XrdCl::ActionMetrics::humanreadable(summetric.ios["Write::b"])
1283 << " vec:" << XrdCl::ActionMetrics::humanreadable(summetric.ios["VectorWrite::b"])
1284 << " page:" << XrdCl::ActionMetrics::humanreadable(summetric.ios["PgWrite::b"])
1285 << " ] " << std::endl;
1286 std::cout << "# IOPS (R) : " << std::fixed << summetric.getIopsRead()
1287 << " [ std:" << summetric.ios["Read::n"]
1288 << " vec:" << summetric.ios["VectorRead::n"]
1289 << " page:" << summetric.ios["PgRead::n"] << " ] " << std::endl;
1290 std::cout << "# IOPS (W) : " << std::fixed << summetric.getIopsWrite()
1291 << " [ std:" << summetric.ios["Write::n"]
1292 << " vec:" << summetric.ios["VectorWrite::n"]
1293 << " page:" << summetric.ios["PgWrite::n"] << " ] " << std::endl;
1294 std::cout << "# Files (R) : " << std::fixed << summetric.ios["OpenR::n"] << std::endl;
1295 std::cout << "# Files (W) : " << std::fixed << summetric.ios["OpenW::n"] << std::endl;
1296 std::cout << "# Datasize (R) : " << std::fixed
1297 << XrdCl::ActionMetrics::humanreadable(summetric.ios["Read::o"]) << std::endl;
1298 std::cout << "# Datasize (W) : " << std::fixed
1299 << XrdCl::ActionMetrics::humanreadable(summetric.ios["Write::o"]) << std::endl;
1300 if (!opt.print())
1301 {
1302 std::cout << "# IO BW (R) : " << std::fixed << std::setprecision(2)
1303 << summetric.getBytesRead() / tbench / 1000000.0 << " MB/s" << std::endl;
1304 std::cout << "# IO BW (W) : " << std::fixed << std::setprecision(2)
1305 << summetric.getBytesRead() / tbench / 1000000.0 << " MB/s" << std::endl;
1306 }
1307 std::cout << "# ---------------------------------------------" << std::endl;
1308 std::cout << "# Quality Estimation" << std::endl;
1309 std::cout << "# ---------------------------------------------" << std::endl;
1310 if (!opt.print())
1311 {
1312 std::cout << "# Performance Mark : " << std::fixed << std::setprecision(2)
1313 << (100.0 * (t1 - t0) / tbench) << "%" << std::endl;
1314 std::cout << "# Gain Mark(R) : " << std::fixed << std::setprecision(2)
1315 << (100.0 * summetric.delays["Read::tnomi"] / summetric.delays["Read::tmeas"])
1316 << "%" << std::endl;
1317 std::cout << "# Gain Mark(W) : " << std::fixed << std::setprecision(2)
1318 << (100.0 * summetric.delays["Write::tnomi"] / summetric.delays["Write::tmeas"])
1319 << "%" << std::endl;
1320 }
1321 std::cout << "# Synchronicity(R) : " << std::fixed << std::setprecision(2)
1322 << summetric.aggregated_synchronicity.ReadSynchronicity() << "%" << std::endl;
1323 std::cout << "# Synchronicity(W) : " << std::fixed << std::setprecision(2)
1324 << summetric.aggregated_synchronicity.WriteSynchronicity() << "%" << std::endl;
1325 if (!opt.print())
1326 {
1327 std::cout << "# ---------------------------------------------" << std::endl;
1328 std::cout << "# Response Errors : " << std::fixed << summetric.ios["All::e"] << std::endl;
1329 std::cout << "# =============================================" << std::endl;
1330 if (summetric.ios["All::e"])
1331 {
1332 std::cerr << "Error: replay job failed with IO errors!" << std::endl;
1333 rc = -5;
1334 }
1335 }
1336 if (opt.create() || opt.verify())
1337 {
1338 std::cout << "# ---------------------------------------------" << std::endl;
1339 if (opt.create())
1340 {
1341 std::cout << "# Creating Dataset ..." << std::endl;
1342 }
1343 else
1344 {
1345 std::cout << "# Verifying Dataset ..." << std::endl;
1346 }
1347 uint64_t created_sofar = 0;
1348 for (auto& metric : metrics)
1349 {
1350 if (metric.second.getBytesRead() && !metric.second.getBytesWritten())
1351 {
1352 std::cout << "# ............................................." << std::endl;
1353 std::cout << "# file: " << metric.second.fname << std::endl;
1354 std::cout << "# size: "
1355 << XrdCl::ActionMetrics::humanreadable(metric.second.ios["Read::o"]) << " [ "
1356 << XrdCl::ActionMetrics::humanreadable(created_sofar) << " out of "
1357 << XrdCl::ActionMetrics::humanreadable(summetric.ios["Read::o"]) << " ] "
1358 << std::setprecision(2) << " ( "
1359 << 100.0 * created_sofar / summetric.ios["Read::o"] << "% )" << std::endl;
1360 if (!XrdCl::AssureFile(
1361 metric.second.fname, metric.second.ios["Read::o"], opt.truncate(), opt.verify()))
1362 {
1363 if (opt.verify())
1364 {
1365 rc = -5;
1366 }
1367 else
1368 {
1369 std::cerr << "Error: failed to assure that file " << metric.second.fname
1370 << " is stored with a size of "
1371 << XrdCl::ActionMetrics::humanreadable(metric.second.ios["Read::o"])
1372 << " !!!";
1373 rc = -5;
1374 }
1375 }
1376 }
1377 }
1378 }
1379 }
1380 }
1381 catch (const std::invalid_argument& ex)
1382 {
1383 std::cout << ex.what() << std::endl; // print parsing errors
1384 return 1;
1385 }
1386
1387 return rc;
1388}
struct stat Stat
Definition XrdCks.cc:49
void usage()
int main(int argc, char **argv)
XrdOucString File
Executes an action registered in the csv file.
double NominalDuration() const
Get nominal duration variable.
void Execute(std::shared_ptr< barrier_t > &ending, std::shared_ptr< barrier_t > &closing, ActionMetrics &metric, bool simulate)
std::string Name() const
Get aciton name.
ActionExecutor(File &file, const std::string &action, const std::string &args, const std::string &orgststr, const std::string &resp, const double &duration)
Buffer pool - to limit memory consumption.
std::shared_ptr< std::vector< char > > Allocate(size_t length)
static BufferPool & Instance()
Single instance access.
static Log * GetLog()
Get default log.
A file.
Definition XrdClFile.hh:46
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition XrdClLog.cc:248
Args parse for XrdClReplay.
std::string & path()
std::vector< std::string > & regex()
Object stat info.
uint64_t GetSize() const
Get size (in bytes)
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition XrdClUtils.hh:56
~barrier_t()
Destructor.
XrdSysSemaphore & get()
barrier_t(XrdSysSemaphore &sem)
Timer helper class.
double elapsed() const
mytimer_t()
Constructor (record start time)
void reset()
Reset the start time.
VectorWriteImpl< false > VectorWrite(Ctx< File > file, Arg< ChunkList > chunks, uint16_t timeout=0)
Factory for creating VectorWriteImpl objects.
ReadImpl< false > Read(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating ReadImpl objects.
std::unordered_map< File *, action_list > ParseInput(const std::string &path, double &t0, double &t1, std::unordered_map< File *, std::string > &filenames, std::unordered_map< File *, double > &synchronicity, std::unordered_map< File *, size_t > &responseerrors, const std::vector< std::string > &option_regex)
const uint64_t AppMsg
SyncImpl< false > Sync(Ctx< File > file, uint16_t timeout=0)
Factory for creating SyncImpl objects.
CloseImpl< false > Close(Ctx< File > file, uint16_t timeout=0)
Factory for creating CloseImpl objects.
XRootDStatus WaitFor(Pipeline pipeline, uint16_t timeout=0)
TruncateImpl< false > Truncate(Ctx< File > file, Arg< uint64_t > size, uint16_t timeout)
std::multimap< double, ActionExecutor > action_list
List of actions: start time - action.
PgReadImpl< false > PgRead(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating PgReadImpl objects.
WriteImpl< false > Write(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< const void * > buffer, uint16_t timeout=0)
Factory for creating WriteImpl objects.
std::vector< std::string > ToColumns(const std::string &row)
Split a row into columns.
PgWriteImpl< false > PgWrite(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, Arg< std::vector< uint32_t > > cksums, uint16_t timeout=0)
Factory for creating PgReadImpl objects.
std::vector< ChunkInfo > ChunkList
List of chunks.
VectorReadImpl< false > VectorRead(Ctx< File > file, Arg< ChunkList > chunks, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating VectorReadImpl objects.
std::thread ExecuteActions(std::unique_ptr< File > file, action_list &&actions, double t0, double speed, ActionMetrics &metric, bool simulate)
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating ReadImpl objects.
std::future< XRootDStatus > Async(Pipeline pipeline, uint16_t timeout=0)
bool AssureFile(const std::string &url, uint64_t size, bool viatruncate, bool verify)
AssureFile creates input data files on the fly if required.
std::vector< char > buffer_t
@ UR
owner readable
@ UW
owner writable
@ UX
owner executable/browsable
Metrics struct storing all timing and IO information of an action.
synchronicity_t aggregated_synchronicity
void add(const ActionMetrics &other)
std::map< std::string, uint64_t > ios
void addIos(const std::string &action, const std::string &field, double value)
std::map< std::string, double > delays
void addDelays(const std::string &action, const std::string &field, double value)
std::string Dump(bool json) const
static std::string humanreadable(uint64_t insize)
static double timeNow()
Get curretn unix time in ns precision as a double.
Describe a data chunk for vector read.
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
@ Write
Open only for writing.
@ Update
Open for reading and writing.
bool IsOK() const
We're fine.
std::string ToString() const
Create a string representation.