51 using std::this_thread::sleep_for;
54 Time t100us =
Time{FSecs{1, 10
'000}}; 55 Time t200us = t100us + t100us; 56 Time t500us = t200us + t200us + t100us; 57 Time t1ms = Time{1,0}; 59 const uint TYPICAL_TIME_FOR_ONE_SCHEDULE_us = 3; 65 /*************************************************************************//** 66 * @test Scheduler component integration test: use the service API for 67 * state control and to add Jobs and watch processing patterns. 68 * @see SchedulerActivity_test 69 * @see SchedulerInvocation_test 70 * @see SchedulerCommutator_test 71 * @see SchedulerLoadControl_test 73 class SchedulerService_test : public Test 95 Scheduler scheduler{bFlow, watch}; 96 CHECK (scheduler.empty()); 98 auto task = onetimeCrunch(4ms); 99 CHECK (1 == task.remainingInvocations()); 102 , InvocationInstanceID() 105 scheduler.defineSchedule(job) 109 CHECK (not scheduler.empty()); 111 sleep_for (3ms); // not invoked yet 112 CHECK (1 == task.remainingInvocations()); 115 CHECK (0 == task.remainingInvocations()); 116 } // task has been invoked 129 postNewTask (Scheduler& scheduler, Activity& chain, Time start) 131 ActivationEvent actEvent{chain, start, start + Time{50,0}}; // add dummy deadline +50ms 132 scheduler.layer2_.postChain (actEvent, scheduler.layer1_); 142 BlockFlowAlloc bFlow; 143 EngineObserver watch; 144 Scheduler scheduler{bFlow, watch}; 145 CHECK (scheduler.empty()); 147 Activity dummy{Activity::FEED}; 148 auto postIt = [&] { postNewTask (scheduler, dummy, RealClock::now()+t200us); }; 151 CHECK (not scheduler.empty());// repeated »tick« task enlisted.... 154 CHECK (not scheduler.empty()); 156 scheduler.terminateProcessing(); 157 CHECK (scheduler.empty()); 162 CHECK (not scheduler.empty()); 163 //... and just walk away => scheduler unwinds cleanly from destructor 164 }// Note: BlockFlow and WorkForce unwinding is covered in dedicated tests 187 BlockFlowAlloc bFlow; 188 EngineObserver watch; 189 Scheduler scheduler{bFlow, watch}; 190 CHECK (scheduler.empty()); 192 // use a single FEED as content 193 Activity dummy{Activity::FEED}; 195 auto anchor = RealClock::now(); 196 auto offset = [&](Time when =RealClock::now()){ return _raw(when) - _raw(anchor); }; 198 auto createLoad = [&](Offset start, uint cnt) 199 { // use internal API (this test is declared as friend) 200 for (uint i=0; i<cnt; ++i) // flood the queue 201 postNewTask (scheduler, dummy, anchor + start + TimeValue{i}); 205 auto LOAD_PEAK_DURATION_us = 2000; 206 auto fatPackage = LOAD_PEAK_DURATION_us/TYPICAL_TIME_FOR_ONE_SCHEDULE_us; 208 createLoad (Offset{Time{ 5,0}}, fatPackage); 209 createLoad (Offset{Time{15,0}}, fatPackage); 212 cout << "Timing : start-up required.."<<offset()<<" µs"<<endl; 214 // now watch change of load and look out for two peaks.... 223 _Fmt row{"%6d | Load: %5.3f Head:%5d Lag:%6d\n"}; 225 while (not scheduler.empty()) // should fall empty at end 228 double load = scheduler.getLoadIndicator(); 239 peak1_max = max (load, peak1_max); 243 peak1_dur = offset() - peak1_s; 254 peak2_max = max (load, peak2_max); 258 peak2_dur = offset() - peak2_s; 262 cout << row % offset() % load 263 % offset(scheduler.layer1_.headTime()) 264 % scheduler.loadControl_.averageLag(); 266 uint done = offset(); 268 //--------Summary-Table------------------------------ 269 _Fmt peak{"\nPeak %d ....... %5d +%dµs %34tmax=%3.1f"}; 270 cout << "-------+-------------+----------+----------" 272 << peak % 1 % peak1_s % peak1_dur % peak1_max 273 << peak % 2 % peak2_s % peak2_dur % peak2_max 274 << "\nTick ....... "<<done 278 CHECK (peak1_s > 5000); // first peak was scheduled at 5ms 279 CHECK (peak1_s < 10000); 280 CHECK (peak2_s > 15000); // second peak was scheduled at 15ms 281 CHECK (peak2_s < 20000); 282 CHECK (peak1_max > 2.0); 283 CHECK (peak2_max > 2.0); 285 CHECK (done > 50000); // »Tick« period is 50ms 286 // and this tick should determine end of timeline 288 cout << "\nwaiting for shutdown of WorkForce"; 289 while (scheduler.workForce_.size() > 0) 292 cout << "." << std::flush; 294 uint shutdown = offset(); 295 cout << "\nShutdown after "<<shutdown / 1.0e6<<"sec"<<endl; 296 CHECK (shutdown > 2.0e6); 332 BlockFlowAlloc bFlow; 333 EngineObserver watch; 334 Scheduler scheduler{bFlow, watch}; 336 ActivityDetector detector; 337 Activity& probe = detector.buildActivationProbe ("testProbe"); 344 auto post = [&](Time start) 345 { // this test class is declared friend to get a backdoor into Scheduler internals... 346 scheduler.layer2_.acquireGoomingToken(); 347 postNewTask (scheduler, probe, start); 350 auto pullWork = [&] { 351 delay_us = lib::test::benchmarkTime([&]{ res = scheduler.doWork(); }); 352 slip_us = _raw(detector.invokeTime(probe)) - _raw(start); 353 cout << "res:"<<res<<" delay="<<delay_us<<"µs slip="<<slip_us<<"µs"<<endl; 357 auto wasClose = [](TimeValue a, TimeValue b) 358 { // 500µs are considered "close" 359 return Duration{Offset{a,b}} < Duration{FSecs{1,2000}}; 361 auto wasInvoked = [&](Time start) 363 Time invoked = detector.invokeTime (probe); 364 return invoked >= start 365 and wasClose (invoked, start); 369 cout << "pullWork() on empty queue..."<<endl; 370 pullWork(); // Call the work-Function on empty Scheduler queue 371 CHECK (activity::WAIT == res); // the result instructs this thread to go to sleep immediately 374 cout << "Due at pullWork()..."<<endl; 375 TimeVar now = RealClock::now(); 376 start = now + t100us; // Set a schedule 100ms ahead of "now" 378 CHECK (not scheduler.empty()); // was enqueued 379 CHECK (not wasInvoked(start)); // ...but not activated yet 381 sleep_for (100us); // wait beyond the planned start point (typically waits ~150µs or more) 383 CHECK (wasInvoked(start)); 384 CHECK (slip_us < 300); // Note: typically there is a slip of 100..200µs, because sleep waits longer 385 CHECK (scheduler.empty()); // The scheduler is empty now and this thread will go to sleep, 386 CHECK (delay_us < 20200); // however the sleep-cycle is first re-shuffled by a wait between 0 ... 20ms 387 CHECK (activity::PASS == res); // this thread is instructed to check back once 389 CHECK (activity::WAIT == res); // ...yet since the queue is still empty, it is sent immediately to sleep 390 CHECK (delay_us < 40); 393 cout << "next some time ahead => up-front delay"<<endl; 394 now = RealClock::now(); 395 start = now + t500us; // Set a schedule significantly into the future... 397 CHECK (not scheduler.empty()); 399 pullWork(); // ...and invoke the work-Function immediately "now" 400 CHECK (activity::PASS == res); // Result: this thread was kept in sleep in the work-Function 401 CHECK (not wasInvoked(start)); // but the next dispatch did not happen yet; we are instructed to re-invoke immediately 402 CHECK (delay_us > 500); // this proves that there was a delay to wait for the next schedule 403 CHECK (delay_us < 1000); 404 pullWork(); // if we now re-invoke the work-Function as instructed... 405 CHECK (wasInvoked(start)); // then the next schedule is already slightly overdue and immediately invoked 406 CHECK (scheduler.empty()); // the queue is empty and thus this thread will be sent to sleep 407 CHECK (delay_us < 20200); // but beforehand the sleep-cycle is re-shuffled by a wait between 0 ... 20ms 408 CHECK (slip_us < 300); 409 CHECK (activity::PASS == res); // instruction to check back once 411 CHECK (activity::WAIT == res); // but next call will send this thread to sleep right away 412 CHECK (delay_us < 40); 415 cout << "follow-up with some distance => follow-up delay"<<endl; 416 now = RealClock::now(); 417 start = now + t100us; 418 post (start); // This time the schedule is set to be "soon" 419 post (start+t1ms); // But another schedule is placed 1ms behind 420 sleep_for (100us); // wait for "soon" to pass... 422 CHECK (wasInvoked(start)); // Result: the first invocation happened immediately 423 CHECK (slip_us < 300); 424 CHECK (delay_us > 900); // yet this thread was afterwards kept in sleep to await the next task; 425 CHECK (activity::PASS == res); // returns instruction to re-invoke immediately 426 CHECK (not scheduler.empty()); // since there is still work in the queue 428 start += t1ms; // (just re-adjust the reference point to calculate slip_us) 429 pullWork(); // re-invoke immediately as instructed 430 CHECK (wasInvoked(start)); // Result: also the next Activity has been dispatched 431 CHECK (slip_us < 400); // not much slip 432 CHECK (delay_us < 20200); // ...and the post-delay is used to re-shuffle the sleep cycle as usual 433 CHECK (activity::PASS == res); // since queue is empty, we will call back once... 434 CHECK (scheduler.empty()); 436 CHECK (activity::WAIT == res); // and then go to sleep. 439 cout << "already tended-next => re-target capacity"<<endl; 440 now = RealClock::now(); 441 start = now + t500us; // Set the next schedule with some distance... 444 // Access scheduler internals (as friend) 445 CHECK (start == scheduler.layer1_.headTime()); // next schedule indeed appears as next-head 446 CHECK (not scheduler.loadControl_.tendedNext(start)); // but this next time was not yet marked as "tended" 448 scheduler.loadControl_.tendNext(start); // manipulate scheduler to mark next-head as "tended" 449 CHECK ( scheduler.loadControl_.tendedNext(start)); 451 CHECK (start == scheduler.layer1_.headTime()); // other state still the same 452 CHECK (not scheduler.empty()); 455 CHECK (not wasInvoked(start)); // since next-head was marked as "tended"... 456 CHECK (not scheduler.empty()); // ...this thread is not used to dispatch it 457 CHECK (delay_us < 6000); // rather it is re-focussed as free capacity within WORK_HORIZON 477 BlockFlowAlloc bFlow; 478 EngineObserver watch; 479 Scheduler scheduler{bFlow, watch}; 481 // prevent scale-up of the Scheuler's
WorkForce 489 Job testJob{detector.buildMockJob(
"testJob", nominal, 1337)};
491 CHECK (scheduler.empty());
494 scheduler.defineSchedule(testJob)
499 CHECK (not scheduler.empty());
502 scheduler.layer2_.maybeFeed(scheduler.layer1_);
505 auto entry = scheduler.layer1_.peekHead();
506 auto now = RealClock::now();
512 CHECK (
entry.startTime() - now < _uTicks( 400us));
513 CHECK (
entry.deathTime() - now < _uTicks(2400us));
514 CHECK (
entry.manifestation == 0);
515 CHECK (
entry.isCompulsory ==
false);
522 CHECK (activity::PASS == scheduler.doWork());
524 CHECK (detector.verifySeqIncrement(1)
525 .beforeInvocation(
"testJob").
arg(
"7.007", 1337));
554 cout <<
_Fmt{
"Test-Load: Nodes: %d Levels: %d ∅Node/Level: %3.1f Forks: %d Joins: %d"}
564 size_t expectedHash = testLoad.getHash();
568 auto LOAD_BASE = 500us;
569 testLoad.performGraphSynchronously(LOAD_BASE);
570 CHECK (testLoad.getHash() == expectedHash);
572 double referenceTime = testLoad.calcRuntimeReference(LOAD_BASE);
573 cout <<
"refTime(singleThr): "<<referenceTime/1000<<
"ms"<<endl;
581 double performanceTime =
582 testLoad.setupSchedule(scheduler)
583 .withLoadTimeBase(LOAD_BASE)
584 .withJobDeadline(30ms)
587 cout <<
"runTime(Scheduler): "<<performanceTime/1000<<
"ms"<<endl;
590 CHECK (testLoad.getHash() == expectedHash);
593 CHECK (performanceTime < 2*referenceTime);
const StatKey STAT_NODE
all nodes
signal start of some processing and transition grooming mode ⟼ *work mode
#define TRANSIENTLY(_OO_)
Macro to simplify capturing assignments.
AnyPair entry(Query< TY > const &query, typename WrapReturn< TY >::Wrapper &obj)
helper to simplify creating mock table entries, wrapped correctly
TestChainLoad && buildTopology()
Use current configuration and seed to (re)build Node connectivity.
Generate synthetic computation load for Scheduler performance tests.
Functions to perform (multithreaded) timing measurement on a given functor.
A Generator for synthetic Render Jobs for Scheduler load testing.
A front-end for using printf-style formatting.
Lumiera's internal time value datatype.
post a message providing a chain of further time-bound Activities
Abstract Base Class for all testcases.
»Scheduler-Service« : coordinate render activities.
Service for coordination and dispatch of render activities.
Diagnostic context to record and evaluate activations within the Scheduler.
#define MARK_TEST_FUN
Macro to mark the current test function in STDOUT.
Simple test class runner.
Tiny helper functions and shortcuts to be used everywhere Consider this header to be effectively incl...
TestChainLoad && configureShape_chain_loadBursts()
preconfigured topology: single graph with massive »load bursts«
const StatKey STAT_JOIN
joining node
uint incrementSeq()
increment the internal invocation sequence number
probe window + count-down; activate next Activity, else re-schedule
Diagnostic setup to instrument and observe Activity activations.
Statistic computeGraphStatistics()
Operator on TestChainLoad to evaluate current graph connectivity.
Test helper to perform temporary manipulations within a test scope.
dispatch a JobFunctor into a worker thread
const StatKey STAT_FORK
forking node
static size_t COMPUTATION_CAPACITY
Nominal »full size« of a pool of concurrent workers.
Individual frame rendering task, forwarding to a closure.
a family of time value like entities and their relationships.
Pool of worker threads for rendering.
ActivityMatch & arg(ARGS const &...args)
qualifier: additionally match the function arguments
Vault-Layer implementation namespace root.
Collector and aggregator for performance data.