source: MicroThreading/UMicroThreading.pas

Last change on this file was 170, checked in by george, 13 years ago
  • Added: Support for logging critical section lifetime.
  • Fixed: Wrong MicroThread registraction in WaitFor internal list.
File size: 34.8 KB
Line 
1// Date: 2010-02-07
2
3(*
4Not implemented yet
5- Stack limit checking
6- measurement of cpu usage by micro threads
7- wait for multiple objects
8- micro thread priority
9*)
10
11unit UMicroThreading;
12
13{$mode Delphi}{$H+}
14{$asmmode intel}
15
16interface
17
18uses
19 {$IFDEF UNIX}{$IFDEF UseCThreads}
20 cthreads,
21 {$ENDIF}{$ENDIF}
22 Classes, ExtCtrls, SysUtils, Contnrs, SyncObjs, DateUtils, Dialogs, Forms,
23 UPlatform, UMicroThreadList, UThreadEx;
24
25const
26 DefaultStackSize = $8000;
27
28resourcestring
29 SStackOverflow = 'Microthread %d stack error. Pointer %s , range < %s ; %s >';
30 SNilThreadReference = 'Can''t release nil thread.';
31 SManagerMicroThreadRunning = 'Manager already have running microthread';
32 SManagerReferenceLost = 'Reference to manager lost';
33 SCantDetermineThreadID = 'Can''t determine thread for id %d';
34 SNotInMicroThread = 'Not in microthread';
35 SReleaseNotAcquiredLock = 'Release on not acquired lock';
36 SMethodNotAssigned = 'Method for microthread not assigned';
37 SCriticalSectionDecrement = 'Critical section counter decremented to negative number';
38
39
40type
41 EMicroThreadError = class(Exception);
42
43 TMicroThread = class;
44 TMicroThreadScheduler = class;
45 TMicroThreadManager = class;
46
47 TMicroThreadState = (tsNone, tsWaiting, tsRunning, tsBlocked, tsSuspended);
48 TMicroThreadBlockState = (tbsNone, tbsSleeping, tbsWaitFor, tbsTerminating,
49 tbsTerminated, tbsCriticalSection);
50
51 { TMicroThreadCriticalSection }
52
53 TMicroThreadCriticalSection = class
54 private
55 FId: Integer;
56 FMicroThreads: TObjectList;
57 //Lock: TCriticalSection;
58 FCounter: Integer;
59 public
60 procedure Acquire;
61 procedure Release;
62 constructor Create;
63 destructor Destroy; override;
64 property Id: Integer read FId;
65 end;
66
67 { TMicroThreadEvent }
68
69 TMicroThreadEvent = class
70 private
71 FAutoReset: Boolean;
72 FSignaled: Boolean;
73 FMicroThreads: TObjectList;
74 FMicroThreadsLock: TCriticalSection;
75 public
76 procedure SetEvent;
77 procedure ResetEvent;
78 function WaitFor(Duration: TDateTime): TWaitResult;
79 constructor Create;
80 destructor Destroy; override;
81 property Signaled: Boolean read FSignaled;
82 property AutoReset: Boolean read FAutoReset write FAutoReset;
83 end;
84
85 { TMicroThread }
86
87 TMicroThread = class
88 private
89 FFreeOnTerminate: Boolean;
90 FExecutionStartTime: TDateTime;
91 FExecutionEndTime: TDateTime;
92 FExecutionTime: TDateTime;
93 FExecutionCount: Integer;
94 FStack: Pointer;
95 FStackPointer: Pointer; // Register SP
96 FStackSize: Integer;
97 FBasePointer: Pointer; // Register BP
98 FExceptObjectStack: PExceptObject;
99 FExceptAddrStack: PExceptAddr;
100 FExecuted: Boolean; // At first go through Execute method, then switch context
101 FBlockState: TMicroThreadBlockState;
102 FBlockTime: TDateTime;
103 FState: TMicroThreadState;
104 FStatePending: TMicroThreadState;
105 FScheduler: TMicroThreadScheduler;
106 FManager: TMicroThreadManager;
107 FId: Integer;
108 procedure CallExecute;
109 function GetStackUsed: Integer;
110 function GetTerminated: Boolean;
111 procedure SetManager(const AValue: TMicroThreadManager);
112 procedure SetScheduler(const AValue: TMicroThreadScheduler);
113 procedure CheckStack;
114 public
115 Name: string;
116 Priority: Integer;
117 Completion: Single; // Can be used for progress information usually in range <0, 1>
118 procedure Execute; virtual;
119
120 procedure Yield;
121 procedure MTSleep(Duration: TDateTime); // No conflicting name to global Sleep procedure
122 procedure WaitFor;
123 procedure Terminate;
124 procedure Start;
125 procedure Resume;
126 procedure Suspend;
127 procedure Synchronize(AMethod: TThreadMethod);
128
129 constructor Create(CreateSuspended: Boolean;
130 const StackSize: SizeUInt = DefaultStackSize);
131 destructor Destroy; override;
132 property Id: Integer read FId;
133 property State: TMicroThreadState read FState;
134 property BlockState: TMicroThreadBlockState read FBlockState;
135 property ExecutionTime: TDateTime read FExecutionTime;
136 property ExecutionCount: Integer read FExecutionCount;
137 property FreeOnTerminate: Boolean read FFreeOnTerminate
138 write FFreeOnTerminate;
139 property Terminated: Boolean read GetTerminated;
140 property Scheduler: TMicroThreadScheduler read FScheduler
141 write SetScheduler;
142 property Manager: TMicroThreadManager read FManager write SetManager;
143 property StackUsed: Integer read GetStackUsed;
144 property BasePointer: Pointer read FBasePointer;
145 end;
146
147 TMicroThreadMethod = procedure(MicroThread: TMicroThread) of object;
148
149 { TMicroThreadSimple }
150
151 TMicroThreadSimple = class(TMicroThread)
152 Method: TProcedureOfObject;
153 procedure Execute; override;
154 end;
155
156 TMicroThreadThreadState = (ttsReady, ttsRunning, ttsTerminated);
157
158 { TMicroThreadThread }
159
160 TMicroThreadThread = class(TThreadEx)
161 Manager: TMicroThreadManager;
162 State: TMicroThreadThreadState;
163 procedure Execute; override;
164 constructor Create(CreateSuspended: Boolean;
165 const StackSize: SizeUInt = DefaultStackSize);
166 destructor Destroy; override;
167 end;
168
169 { TMicroThreadManager }
170
171 TMicroThreadManager = class
172 private
173 FStack: Pointer;
174 FStackSize: Pointer;
175 FStackPointer: Pointer;
176 FBasePointer: Pointer;
177 FExceptObjectStack: PExceptObject;
178 FExceptAddrStack: PExceptAddr;
179 FExecuteCount: Integer;
180 FExecutedCount: Integer;
181 FCurrentMicroThread: TMicroThread;
182 FScheduler: TMicroThreadScheduler;
183 FThread: TMicroThreadThread;
184 FId: Integer;
185 FLoopDuration: TDateTime;
186 FLoopStart: TDateTime;
187 procedure SetCurrentMicroThread(const AValue: TMicroThread);
188 function Execute(Count: Integer): Integer;
189 property CurrentMicroThread: TMicroThread read FCurrentMicroThread
190 write SetCurrentMicroThread;
191 public
192 procedure Yield;
193 procedure Synchronize(AMethod: TThreadMethod);
194 constructor Create;
195 destructor Destroy; override;
196 property Scheduler: TMicroThreadScheduler read FScheduler;
197 property LoopDuration: TDateTime read FLoopDuration;
198 function GetCurrentMicroThreadId: Integer;
199 end;
200
201 TMicroThreadSchedulerState = (ssStopped, ssRunning, ssTerminating);
202
203 { TMicroThreadScheduler }
204
205 TMicroThreadScheduler = class
206 private
207 FActive: Boolean;
208 FThreadPool: TObjectList;
209 FThreadPoolLock: TCriticalSection;
210 FThreadPoolSize: Integer;
211 FRoundRobinIndex: Integer;
212 FMicroThreadLastId: Integer;
213 FCriticalSectionLastId: Integer;
214 FMainThreadTerminated: Boolean;
215 FMicroThreads: TObjectList; // TList<TMicroThread>
216 FMainThreadManager: TMicroThreadManager;
217 FMicroThreadsLock: TCriticalSection;
218 FState: TMicroThreadSchedulerState;
219 FUseMainThread: Boolean;
220 FMainThreadStarter: TTimer;
221 FEvents: TObjectList;
222 FMainThreadOutsideStart: TDateTime;
223 FMainThreadOutsideDuration: TDateTime;
224 function GetMicroThreadCount: Integer;
225 function GetThreadPoolCount: Integer;
226 function GetThreadPoolSize: Integer;
227 procedure SetActive(const AValue: Boolean);
228 procedure SetThreadPoolSize(const AValue: Integer);
229 procedure GetNextMicroThread(Manager: TMicroThreadManager);
230 procedure ReleaseMicroThread(MicroThread: TMicroThread);
231 procedure SetUseMainThread(const AValue: Boolean);
232 procedure Start;
233 procedure Stop;
234 procedure UpdateThreadPoolSize;
235 procedure MainThreadStart(Sender: TObject);
236 procedure MainThreadTick(Data: PtrInt);
237 function GetCriticalSectionId: Integer;
238 public
239 BurstCount: Integer;
240 function Add(MicroThread: TMicroThread): Integer;
241 function AddMethod(Method: TProcedureOfObject;
242 WaitForFinish: Boolean = True; ThreadName: string = ''): Integer;
243 procedure Remove(MicroThread: TMicroThread; Free: Boolean = True);
244 constructor Create;
245 destructor Destroy; override;
246 property ThreadPool: TObjectList read FThreadPool;
247 property ThreadPoolLock: TCriticalSection read FThreadPoolLock;
248 property ThreadPoolSize: Integer read GetThreadPoolSize
249 write SetThreadPoolSize;
250 property ThreadPoolCount: Integer read GetThreadPoolCount;
251 property MicroThreads: TObjectList read FMicroThreads;
252 property MicroThreadsLock: TCriticalSection read FMicroThreadsLock;
253 property MicroThreadCount: Integer read GetMicroThreadCount;
254 property MainThreadManager: TMicroThreadManager read FMainThreadManager;
255 property Active: Boolean read FActive write SetActive;
256 property UseMainThread: Boolean read FUseMainThread write SetUseMainThread;
257 property MainThreadOutsideDuration: TDateTime read FMainThreadOutsideDuration;
258 end;
259
260 TMicroThreadList = class(TComponent)
261 private
262 public
263 Form: TMicroThreadListForm;
264 constructor Create(AOwner: TComponent); override;
265 end;
266
267 TMicroThreadExceptionEvent = procedure(Sender: TObject; E: Exception) of object;
268
269var
270 MainScheduler: TMicroThreadScheduler;
271 ExceptionHandler: TMicroThreadExceptionEvent;
272
273const
274 MicroThreadStateText: array[TMicroThreadState] of string = ('None', 'Waiting',
275 'Running', 'Blocked', 'Suspended');
276 MicroThreadBlockStateText: array[TMicroThreadBlockState] of string = ('None',
277 'Sleeping', 'WaitFor', 'Terminating', 'Terminated', 'CriticalSection');
278 MicroThreadThreadStateText: array[TMicroThreadThreadState] of string = (
279 'Ready', 'Running', 'Terminated');
280
281function GetCurrentMicroThread: TMicroThread;
282procedure MTSleep(Duration: TDateTime);
283procedure MTSynchronize(Method: TThreadMethod);
284procedure Log(Text: string);
285procedure Register;
286
287const
288 LogFileName: string = 'Log.txt';
289 LogEnabled: Boolean = False;
290
291implementation
292
293//var
294// StaticManagers: TObjectList; // TList<TMicroThreadManager>;
295// StaticManager: TMicroThreadManager;
296// StaticMicroThread: TMicroThread;
297
298procedure Register;
299begin
300 RegisterComponents('MicroThreading', [TMicroThreadList]);
301end;
302
303function GetMicroThreadId: Integer;
304var
305 MT: TMicroThread;
306begin
307 MT := GetCurrentMicroThread;
308 if Assigned(MT) then Result := MT.Id
309 else Result := -1;
310end;
311
312function GetCurrentMicroThread: TMicroThread;
313var
314 Thread: TThread;
315begin
316 with MainScheduler do
317 try
318 FMicroThreadsLock.Acquire;
319 if MainThreadID = ThreadID then Result := MainThreadManager.CurrentMicroThread
320 else begin
321 Thread := TThreadEx.CurrentThread;
322 if Assigned(Thread) then
323 Result := TMicroThreadThread(Thread).Manager.CurrentMicroThread
324 else Result := nil;
325 end;
326 finally
327 FMicroThreadsLock.Release;
328 end;
329end;
330
331procedure MTSleep(Duration: TDateTime);
332var
333 MT: TMicroThread;
334begin
335 MT := GetCurrentMicroThread;
336 if Assigned(MT) then MT.MTSleep(Duration)
337 else raise EMicroThreadError.Create(SNotInMicroThread);
338end;
339
340procedure MTSynchronize(Method: TThreadMethod);
341var
342 Thread: TThread;
343begin
344 if GetCurrentThreadId <> MainThreadID then begin
345 Thread := TThreadEx.CurrentThread;
346 if Assigned(Thread) then TThread.Synchronize(Thread, Method)
347 else raise EMicroThreadError.Create(Format(SCantDetermineThreadID, [GetCurrentThreadId]));
348 end else Method;
349end;
350
351var
352 LogLock: TCriticalSection;
353
354procedure Log(Text: string);
355var
356 LogFile: TextFile;
357begin
358 if LogEnabled then
359 try
360 LogLock.Acquire;
361 AssignFile(LogFile, LogFileName);
362 if FileExists(LogFileName) then Append(LogFile)
363 else Rewrite(LogFile);
364 WriteLn(LogFile, Text);
365 CloseFile(LogFile);
366 finally
367 LogLock.Release;
368 end;
369end;
370
371{ TMicroThreadCriticalSection }
372
373procedure TMicroThreadCriticalSection.Acquire;
374var
375 MT: TMicroThread;
376 Event: TMicroThreadEvent;
377begin
378 MT := GetCurrentMicroThread;
379 if Assigned(MT) then
380 try
381 MainScheduler.FMicroThreadsLock.Acquire;
382 {$IFDEF DebugCriticalSection}
383 Log('CriticalSection(' + IntToStr(FId) + ') Acquire start, MicroThread: ' + IntToStr(MT.Id) + '(' + MT.Name + ')');
384 {$ENDIF}
385 //Lock.Acquire;
386 Inc(FCounter);
387 {$IFDEF DebugCriticalSection}
388 Log('CriticalSection(' + IntToStr(FId) + ') Acquire Counter: ' + IntToStr(FCounter));
389 {$ENDIF}
390 if FCounter > 1 then begin
391 FMicroThreads.Add(MT);
392 MT.FBlockState := tbsCriticalSection;
393 MT.FStatePending := tsBlocked;
394 try
395 //Lock.Release;
396 MainScheduler.FMicroThreadsLock.Release;
397 MT.Yield;
398 finally
399 MainScheduler.FMicroThreadsLock.Acquire;
400 //Lock.Acquire;
401 end;
402 end;
403 finally
404 {$IFDEF DebugCriticalSection}
405 Log('CriticalSection(' + IntToStr(FId) + ') Acquire end: Id:' + IntToStr(MT.Id) + ' Name:' + MT.Name);
406 {$ENDIF}
407 //Lock.Release;
408 MainScheduler.FMicroThreadsLock.Release;
409 end else
410 raise EMicroThreadError.Create(SNotInMicroThread);
411end;
412
413procedure TMicroThreadCriticalSection.Release;
414var
415 MT: TMicroThread;
416begin
417 try
418 MainScheduler.FMicroThreadsLock.Acquire;
419 {$IFDEF DebugCriticalSection}
420 MT := GetCurrentMicroThread;
421 if Assigned(MT) then
422 Log('CriticalSection(' + IntToStr(FId) + ') Release start: Id:' + IntToStr(MT.Id) + ' Name:' + MT.Name)
423 else Log('CriticalSection(' + IntToStr(FId) + ') Release start: no microthread');
424 {$ENDIF}
425 //Lock.Acquire;
426 Dec(FCounter);
427 {$IFDEF DebugCriticalSection}
428 Log('CriticalSection(' + IntToStr(FId) + ') Release Counter: ' + IntToStr(FCounter));
429 {$ENDIF}
430 if FMicroThreads.Count > 0 then begin
431 // Release one waiting micro thread
432 TMicroThread(FMicroThreads[0]).FState := tsWaiting;
433 TMicroThread(FMicroThreads[0]).FStatePending := tsNone;
434 FMicroThreads.Delete(0);
435 end;
436 if FCounter < 0 then
437 raise EMicroThreadError.Create(SCriticalSectionDecrement);
438 finally
439 {$IFDEF DebugCriticalSection}
440 Log('CriticalSection(' + IntToStr(FId) + ') Release end: Id:' + IntToStr(MT.Id) + ' Name:' + MT.Name);
441 {$ENDIF}
442 //Lock.Release;
443 MainScheduler.FMicroThreadsLock.Release;
444 end;
445end;
446
447constructor TMicroThreadCriticalSection.Create;
448begin
449 //Lock := TCriticalSection.Create;
450 FMicroThreads := TObjectList.Create;
451 FMicroThreads.OwnsObjects := False;
452 FId := MainScheduler.GetCriticalSectionId;
453end;
454
455destructor TMicroThreadCriticalSection.Destroy;
456begin
457 try
458 MainScheduler.FMicroThreadsLock.Acquire;
459 //Lock.Acquire;
460
461 while FMicroThreads.Count > 0 do begin
462 // Release one waiting micro thread and lower counter
463 TMicroThread(FMicroThreads[0]).FState := tsWaiting;
464 FMicroThreads.Delete(0);
465 end;
466 finally
467 //Lock.Release;
468 MainScheduler.FMicroThreadsLock.Release;
469 end;
470 FMicroThreads.Free;
471 //Lock.Free;
472 inherited Destroy;
473end;
474
475{ TMicroThreadList }
476
477constructor TMicroThreadList.Create(AOwner: TComponent);
478begin
479 inherited;
480 Form := TMicroThreadListForm.Create(Self);
481end;
482
483{ TMicroThreadMethod }
484
485procedure TMicroThreadEvent.SetEvent;
486var
487 I: Integer;
488begin
489 try
490 MainScheduler.FMicroThreadsLock.Acquire;
491 for I := 0 to FMicroThreads.Count - 1 do
492 with TMicroThread(FMicroThreads[I]) do begin
493 if (FState = tsBlocked) and (FBlockState = tbsWaitFor) then begin
494 FState := tsWaiting;
495 FBlockTime := 0; // Set signaled state using block time variable
496 end;
497 end;
498 if not FAutoReset then FSignaled := True;
499 finally
500 MainScheduler.FMicroThreadsLock.Release;
501 end;
502end;
503
504procedure TMicroThreadEvent.ResetEvent;
505begin
506 FSignaled := False;
507end;
508
509function TMicroThreadEvent.WaitFor(Duration: TDateTime): TWaitResult;
510var
511 MT: TMicroThread;
512begin
513 MT := GetCurrentMicroThread;
514 if Assigned(MT) then begin
515 try
516 FMicroThreadsLock.Acquire;
517 if Signaled then begin
518 Result := wrSignaled;
519 Exit;
520 end;
521 FMicroThreads.Add(MT);
522 MT.FBlockTime := NowPrecise + Duration;
523 MT.FBlockState := tbsWaitFor;
524 MT.FStatePending := tsBlocked;
525 finally
526 FMicroThreadsLock.Release;
527 end;
528 MT.Yield;
529 if (MT.FBlockTime <> 0) and (MT.FBlockTime < NowPrecise) then
530 Result := wrTimeout else Result := wrSignaled;
531
532 try
533 FMicroThreadsLock.Acquire;
534 FMicroThreads.Remove(Self);
535 finally
536 FMicroThreadsLock.Release;
537 end
538 end else
539 raise EMicroThreadError.Create(SNotInMicroThread);
540end;
541
542constructor TMicroThreadEvent.Create;
543begin
544 FAutoReset := True;
545 FMicroThreads := TObjectList.Create;
546 FMicroThreads.OwnsObjects := False;
547 FMicroThreadsLock := TCriticalSection.Create;
548 MainScheduler.FEvents.Add(Self);
549end;
550
551destructor TMicroThreadEvent.Destroy;
552begin
553 try
554 MainScheduler.FEvents.OwnsObjects := False;
555 MainScheduler.FEvents.Delete(MainScheduler.FEvents.IndexOf(Self));
556 finally
557 MainScheduler.FEvents.OwnsObjects := True;
558 end;
559 FMicroThreadsLock.Free;
560 FMicroThreads.Free;
561 inherited Destroy;
562end;
563
564{ TMicroThreadManager }
565
566procedure TMicroThreadManager.SetCurrentMicroThread(const AValue: TMicroThread
567 );
568begin
569 if FCurrentMicroThread = AValue then Exit;
570 if Assigned(FCurrentMicroThread) then
571 FCurrentMicroThread.FManager := nil;
572 FCurrentMicroThread := AValue;
573 if Assigned(FCurrentMicroThread) then
574 FCurrentMicroThread.FManager := Self;
575end;
576
577function TMicroThreadManager.Execute(Count: Integer): Integer;
578begin
579 //FLoopStart := NowPrecise;
580 FStack := StackBottom;
581 FStackSize := StackBottom + StackLength;
582 FExecuteCount := Count;
583 FExecutedCount := 0;
584 Yield;
585 Result := FExecutedCount;
586 //FLoopDuration := NowPrecise - FLoopStart;
587end;
588
589procedure TMicroThreadManager.Yield;
590begin
591 if Assigned(FCurrentMicroThread) then begin
592 FCurrentMicroThread.FExecutionEndTime := NowPrecise;
593 FCurrentMicroThread.FExecutionTime := FCurrentMicroThread.FExecutionTime +
594 (FCurrentMicroThread.FExecutionEndTime - FCurrentMicroThread.FExecutionStartTime);
595
596 FCurrentMicroThread.FExceptObjectStack := GetExceptionObjectStack;
597 FCurrentMicroThread.FExceptAddrStack := GetExceptionAddrStack;
598 asm
599 // Store microthread stack
600 mov ecx, Self
601 mov eax, [ecx].TMicroThreadManager.FCurrentMicroThread
602 mov edx, esp
603 mov ebx, ebp
604 mov [eax].TMicroThread.FStackPointer, edx
605 mov [eax].TMicroThread.FBasePointer, ebx
606
607 // Restore manager stack
608 mov edx, [ecx].TMicroThreadManager.FStackPointer
609 mov ebx, [ecx].TMicroThreadManager.FBasePointer
610 mov esp, edx
611 mov ebp, ebx
612 end;
613 SetExceptionObjectStack(FExceptObjectStack);
614 SetExceptionAddrStack(FExceptAddrStack);
615 FCurrentMicroThread.CheckStack;
616 FScheduler.ReleaseMicroThread(FCurrentMicroThread);
617 end;
618
619 if FExecutedCount < FExecuteCount then begin
620 FScheduler.GetNextMicroThread(Self);
621 if Assigned(FCurrentMicroThread) then begin
622 Inc(FExecutedCount);
623 FCurrentMicroThread.FExecutionStartTime := NowPrecise;
624 FExceptObjectStack := GetExceptionObjectStack;
625 FExceptAddrStack := GetExceptionAddrStack;
626 asm
627 // Store manager stack
628 mov eax, Self
629 mov edx, esp
630 mov ebx, ebp
631 mov [eax].TMicroThreadManager.FStackPointer, edx
632 mov [eax].TMicroThreadManager.FBasePointer, ebx
633 end;
634 if not FCurrentMicroThread.FExecuted then begin
635 // First time micro thread execution
636 FCurrentMicroThread.FExecuted := True;
637 SetExceptionObjectStack(FCurrentMicroThread.FExceptObjectStack);
638 SetExceptionAddrStack(FCurrentMicroThread.FExceptAddrStack);
639 asm
640 // Restore microthread stack
641 mov ecx, Self
642 mov eax, [ecx].TMicroThreadManager.FCurrentMicroThread
643 mov edx, [eax].TMicroThread.FStackPointer
644 mov ebx, [eax].TMicroThread.FBasePointer
645 mov esp, edx
646 mov ebp, ebx
647 // We want to call virtual method Execute
648 // but methods can be called only statically from assembler
649 // Then static method CallExecute is calling virtual method Execute
650 call TMicroThread.CallExecute
651
652 // Restore manager stack
653 // ecx register is set by CallExecute to running micro thread
654 mov eax, [ecx].TMicroThread.FManager
655 mov edx, [eax].TMicroThreadManager.FStackPointer
656 mov ebx, [eax].TMicroThreadManager.FBasePointer
657 mov esp, edx
658 mov ebp, ebx
659 end;
660 SetExceptionObjectStack(FExceptObjectStack);
661 SetExceptionAddrStack(FExceptAddrStack);
662 FCurrentMicroThread.CheckStack;
663 FCurrentMicroThread.FExecutionEndTime := NowPrecise;
664 FCurrentMicroThread.FExecutionTime := FCurrentMicroThread.FExecutionTime +
665 (FCurrentMicroThread.FExecutionEndTime - FCurrentMicroThread.FExecutionStartTime);
666 FCurrentMicroThread.FStatePending := tsBlocked;
667 FCurrentMicroThread.FBlockState := tbsTerminated;
668 if FCurrentMicroThread.FFreeOnTerminate then begin
669 // Microthread is finished, remove it from queue
670 with FScheduler do
671 try
672 FMicroThreadsLock.Acquire;
673 FMicroThreads.Delete(FMicroThreads.IndexOf(FCurrentMicroThread));
674 FCurrentMicroThread := nil;
675 finally
676 FMicroThreadsLock.Release;
677 end;
678 end else begin
679 FScheduler.ReleaseMicroThread(FCurrentMicroThread);
680 end;
681 //FCurrentMicroThread.FManager := nil;
682 //FScheduler.ReleaseMicroThread(FCurrentMicroThread);
683 //FCurrentMicroThread := nil;
684 end else
685 begin
686 // Regular selected microthread execution
687 FCurrentMicroThread.CheckStack;
688 SetExceptionObjectStack(FCurrentMicroThread.FExceptObjectStack);
689 SetExceptionAddrStack(FCurrentMicroThread.FExceptAddrStack);
690 asm
691 // Restore microthread stack
692 mov ecx, Self
693 mov eax, [ecx].TMicroThreadManager.FCurrentMicroThread
694 mov edx, [eax].TMicroThread.FStackPointer
695 mov ebx, [eax].TMicroThread.FBasePointer
696 mov esp, edx
697 mov ebp, ebx
698 end;
699 end;
700 end;
701 end;
702end;
703
704procedure TMicroThreadManager.Synchronize(AMethod: TThreadMethod);
705begin
706 if Assigned(FThread) then
707 FThread.Synchronize(FThread, AMethod)
708 else AMethod;
709end;
710
711constructor TMicroThreadManager.Create;
712begin
713 FCurrentMicroThread := nil;
714 FThread := nil;
715end;
716
717destructor TMicroThreadManager.Destroy;
718begin
719 inherited Destroy;
720end;
721
722function TMicroThreadManager.GetCurrentMicroThreadId: Integer;
723begin
724 try
725 FScheduler.FMicroThreadsLock.Acquire;
726 if Assigned(FCurrentMicroThread) then
727 Result := FCurrentMicroThread.Id
728 else Result := 0;
729 finally
730 FScheduler.FMicroThreadsLock.Release;
731 end;
732end;
733
734{ TMicroThreadThread }
735
736procedure TMicroThreadThread.Execute;
737var
738 ExecutedCount: Integer;
739begin
740 try
741 repeat
742 State := ttsRunning;
743 ExecutedCount := Manager.Execute(MainScheduler.BurstCount);
744 State := ttsReady;
745 if ExecutedCount = 0 then Sleep(1);
746 until Terminated;
747 except
748 on E: Exception do
749 if Assigned(ExceptionHandler) then ExceptionHandler(Self, E);
750 end;
751end;
752
753constructor TMicroThreadThread.Create(CreateSuspended: Boolean;
754 const StackSize: SizeUInt);
755begin
756 inherited;
757 State := ttsReady;
758 Manager := TMicroThreadManager.Create;
759end;
760
761destructor TMicroThreadThread.Destroy;
762begin
763 Manager.Free;
764 inherited Destroy;
765end;
766
767{ TMicroThreadSimple }
768
769procedure TMicroThreadSimple.Execute;
770begin
771 inherited Execute;
772 if Assigned(Method) then Method
773 else raise EMicroThreadError.Create(SMethodNotAssigned);
774end;
775
776{ TMicroThread }
777
778procedure TMicroThread.CallExecute;
779begin
780 try
781 Execute;
782 except
783 on E: Exception do
784 if Assigned(ExceptionHandler) then
785 if GetCurrentThreadId = MainThreadID then ExceptionHandler(Self, E)
786 else ExceptionHandler(TThreadEx.CurrentThread, E);
787 end;
788 asm
789 mov ecx, Self
790 end;
791end;
792
793function TMicroThread.GetStackUsed: Integer;
794begin
795 Result := FStack + FStackSize - FStackPointer;
796end;
797
798function TMicroThread.GetTerminated: Boolean;
799begin
800 Result := (FState = tsBlocked) and (FBlockState = tbsTerminated);
801end;
802
803procedure TMicroThread.SetManager(const AValue: TMicroThreadManager);
804begin
805 if FManager = AValue then Exit;
806 if Assigned(FManager) then FManager.CurrentMicroThread := nil;
807 FManager := AValue;
808 if Assigned(FManager) then FManager.CurrentMicroThread := Self;
809end;
810
811procedure TMicroThread.SetScheduler(const AValue: TMicroThreadScheduler);
812begin
813 FScheduler := AValue;
814end;
815
816procedure TMicroThread.CheckStack;
817begin
818 if not ((FStackPointer > FStack) and (FStackPointer < (FStack + FStackSize)))
819 then raise EStackOverflow.Create(Format(SStackOverflow,
820 [FId, IntToHex(Integer(FStackPointer), 8), IntToHex(Integer(FStack), 8),
821 IntToHex(Integer(FStack + FStackSize), 8)]));
822end;
823
824procedure TMicroThread.Execute;
825begin
826
827end;
828
829procedure TMicroThread.Yield;
830begin
831 if not Assigned(FManager) then
832 raise EMicroThreadError.Create(SManagerReferenceLost);
833 if FStatePending = tsNone then
834 FStatePending := tsWaiting;
835 FManager.Yield;
836end;
837
838procedure TMicroThread.WaitFor;
839begin
840 if GetMicroThreadId <> -1 then begin
841 // Called from another microthread
842 while not ((FState = tsBlocked) and (FBlockState = tbsTerminated)) do begin
843 MTSleep(1 * OneMillisecond);
844 end;
845 end else begin
846 // Called directly from main thread
847 while not ((FState = tsBlocked) and (FBlockState = tbsTerminated)) do begin
848 Sleep(1);
849 Application.ProcessMessages;
850 end;
851 end;
852end;
853
854procedure TMicroThread.MTSleep(Duration: TDateTime);
855begin
856 FBlockTime := NowPrecise + Duration;
857 FBlockState := tbsSleeping;
858 FStatePending := tsBlocked;
859 Yield;
860end;
861
862constructor TMicroThread.Create(CreateSuspended: Boolean;
863 const StackSize: SizeUInt = DefaultStackSize);
864begin
865 // Setup stack
866 FStackSize := StackSize;
867 FStack := GetMem(FStackSize);
868 FBasePointer := 0; // FStack + FStackSize - SizeOf(Pointer);
869 FStackPointer := FStack + FStackSize - 2 * SizeOf(Pointer);
870 FillChar(FStackPointer^, 2 * SizeOf(Pointer), 0);
871
872 FExecutionTime := 0;
873 FState := tsWaiting;
874 FStatePending := tsNone;
875 if CreateSuspended then begin
876 FState := tsSuspended;
877 end;
878 FFreeOnTerminate := True;
879 MainScheduler.Add(Self);
880end;
881
882procedure TMicroThread.Terminate;
883begin
884 FBlockState := tbsTerminated;
885 FStatePending := tsBlocked;
886end;
887
888procedure TMicroThread.Start;
889begin
890 FState := tsWaiting;
891end;
892
893destructor TMicroThread.Destroy;
894begin
895 MainScheduler.Remove(Self, False);
896 //Terminate;
897 //WaitFor;
898 FreeMem(FStack);
899 inherited Destroy;
900end;
901
902procedure TMicroThread.Resume;
903begin
904 if FState = tsSuspended then
905 FStatePending := tsWaiting;
906end;
907
908procedure TMicroThread.Suspend;
909var
910 MT: TMicroThread;
911begin
912 FStatePending := tsSuspended;
913 MT := GetCurrentMicroThread;
914 if Assigned(MT) then Yield;
915end;
916
917procedure TMicroThread.Synchronize(AMethod: TThreadMethod);
918begin
919 FManager.Synchronize(AMethod);
920end;
921
922
923{ TMicroThreadScheduler }
924
925function TMicroThreadScheduler.Add(MicroThread: TMicroThread): Integer;
926begin
927 try
928 FMicroThreadsLock.Acquire;
929 Inc(FMicroThreadLastId);
930 MicroThread.FId := FMicroThreadLastId;
931 MicroThread.FScheduler := Self;
932 Result := FMicroThreads.Add(MicroThread);
933 finally
934 FMicroThreadsLock.Release;
935 end;
936end;
937
938function TMicroThreadScheduler.AddMethod(Method: TProcedureOfObject;
939 WaitForFinish: Boolean = True; ThreadName: string = ''): Integer;
940var
941 NewMicroThread: TMicroThreadSimple;
942 CurrentMT: TMicroThread;
943begin
944 try
945 NewMicroThread := TMicroThreadSimple.Create(True);
946 NewMicroThread.Name := ThreadName;
947 NewMicroThread.Method := Method;
948 NewMicroThread.FScheduler := Self;
949 NewMicroThread.FreeOnTerminate := not WaitForFinish;
950 NewMicroThread.Start;
951 if WaitForFinish then begin
952 CurrentMT := GetCurrentMicroThread;
953 try
954 FMicroThreadsLock.Acquire;
955 while not ((NewMicroThread.FState = tsBlocked) and
956 (NewMicroThread.FBlockState = tbsTerminated)) do begin
957 try
958 FMicroThreadsLock.Release;
959 if Assigned(CurrentMT) then CurrentMT.MTSleep(1 * OneMillisecond)
960 else begin
961 Sleep(1);
962 Application.ProcessMessages;
963 end;
964 finally
965 FMicroThreadsLock.Acquire;
966 end;
967 end;
968 finally
969 FMicroThreadsLock.Release;
970 end;
971 end;
972 finally
973 if WaitForFinish then NewMicroThread.Free;
974 end;
975end;
976
977procedure TMicroThreadScheduler.Remove(MicroThread: TMicroThread;
978 Free: Boolean = True);
979begin
980 try
981 FMicroThreadsLock.Acquire;
982 if not Free then FMicroThreads.OwnsObjects := False;
983 FMicroThreads.Remove(MicroThread);
984 FMicroThreads.OwnsObjects := True;
985 finally
986 FMicroThreadsLock.Release;
987 end;
988end;
989
990constructor TMicroThreadScheduler.Create;
991begin
992 FEvents := TObjectList.Create;
993 FMainThreadStarter := TTimer.Create(nil);
994 FMainThreadStarter.Enabled := False;
995 FMainThreadStarter.Interval := 1;
996 FMainThreadStarter.OnTimer := MainThreadStart;
997 FMainThreadTerminated := True;
998 FMicroThreadsLock := TCriticalSection.Create;
999 FMicroThreads := TObjectList.Create;
1000 FThreadPool := TObjectList.Create;
1001 FThreadPoolLock := TCriticalSection.Create;
1002 FRoundRobinIndex := -1;
1003 FMainThreadManager := TMicroThreadManager.Create;
1004 FMainThreadManager.FScheduler := Self;
1005 UseMainThread := False;
1006 BurstCount := 50;
1007end;
1008
1009destructor TMicroThreadScheduler.Destroy;
1010begin
1011 Active := False;
1012 FMainThreadStarter.Free;
1013 FMainThreadManager.Free;
1014 FThreadPool.Free;
1015 FThreadPoolLock.Free;
1016 FMicroThreads.Free;
1017 FMicroThreadsLock.Free;
1018 FEvents.Free;
1019 inherited Destroy;
1020end;
1021
1022procedure TMicroThreadScheduler.Start;
1023begin
1024 UpdateThreadPoolSize;
1025 FState := ssRunning;
1026 if FUseMainThread then
1027 FMainThreadStarter.Enabled := True;
1028end;
1029
1030procedure TMicroThreadScheduler.Stop;
1031var
1032 I: Integer;
1033begin
1034 FState := ssTerminating;
1035 // Wait for all thread managers to finish
1036 try
1037 FThreadPoolLock.Acquire;
1038 for I := 0 to FThreadPool.Count - 1 do begin
1039 TMicroThreadThread(FThreadPool[I]).Terminate;
1040 end;
1041 for I := 0 to FThreadPool.Count - 1 do begin
1042 TMicroThreadThread(FThreadPool[I]).WaitFor;
1043 end;
1044 FThreadPool.Clear;
1045 finally
1046 FThreadPoolLock.Release;
1047 end;
1048
1049 repeat
1050 Application.ProcessMessages;
1051 Sleep(1);
1052 until FMainThreadTerminated or (not FUseMainThread);
1053 FState := ssStopped;
1054end;
1055
1056procedure TMicroThreadScheduler.UpdateThreadPoolSize;
1057var
1058 NewThread: TMicroThreadThread;
1059begin
1060 try
1061 FThreadPoolLock.Acquire;
1062 if FThreadPoolSize > FThreadPool.Count then begin
1063 FThreadPool.Capacity := FThreadPoolSize;
1064 while FThreadPool.Count < FThreadPoolSize do begin
1065 NewThread := TMicroThreadThread.Create(True);
1066 NewThread.Manager.FScheduler := Self;
1067 NewThread.Manager.FId := FThreadPool.Count + 1;
1068 NewThread.Manager.FThread := NewThread;
1069 //NewThread.OnTerminate := PoolThreadTerminated;
1070 NewThread.FreeOnTerminate := False;
1071 ThreadPool.Add(NewThread);
1072 NewThread.Resume;
1073 end;
1074 end else begin
1075 while FThreadPool.Count > FThreadPoolSize do begin
1076 TMicroThreadThread(FThreadPool[FThreadPool.Count - 1]).Terminate;
1077 TMicroThreadThread(FThreadPool[FThreadPool.Count - 1]).WaitFor;
1078 FThreadPool.Delete(FThreadPool.Count - 1);
1079 end;
1080 end;
1081 finally
1082 FThreadPoolLock.Release;
1083 end;
1084end;
1085
1086procedure TMicroThreadScheduler.MainThreadStart(Sender: TObject);
1087begin
1088 FMainThreadStarter.Enabled := False;
1089 FMainThreadTerminated := False;
1090 Application.QueueAsyncCall(MainThreadTick, 0);
1091end;
1092
1093procedure TMicroThreadScheduler.MainThreadTick(Data: PtrInt);
1094var
1095 Executed: Integer;
1096 StartTime: TDateTime;
1097 Duration: TDateTime;
1098begin
1099// try
1100 FMainThreadOutsideDuration := NowPrecise - FMainThreadOutsideStart;
1101 BurstCount := 1;
1102 Duration := 50 * OneMillisecond;
1103 StartTime := Now;
1104 Executed := -1;
1105 while (Executed <> 0) and ((Now - StartTime) < Duration) do begin
1106 Executed := FMainThreadManager.Execute(BurstCount);
1107 end;
1108 //if Executed = 0 then Sleep(1);
1109 // If not terminated then queue next tick else terminate
1110 if (FState = ssRunning) and FUseMainThread then
1111 Application.QueueAsyncCall(MainThreadTick, 0)
1112 else FMainThreadTerminated := True;
1113// except
1114// FMainThreadTerminated := True;
1115// raise;
1116// end;
1117 FMainThreadOutsideStart := NowPrecise;
1118end;
1119
1120function TMicroThreadScheduler.GetCriticalSectionId: Integer;
1121begin
1122 Inc(FCriticalSectionLastId);
1123 Result := FCriticalSectionLastId;
1124end;
1125
1126procedure TMicroThreadScheduler.GetNextMicroThread(Manager: TMicroThreadManager);
1127var
1128 I: Integer;
1129 CurrentTime: TDateTime;
1130 Selected: TMicroThread;
1131begin
1132 try
1133 FMicroThreadsLock.Acquire;
1134 CurrentTime := NowPrecise;
1135 I := 0;
1136 Selected := nil;
1137 Inc(FRoundRobinIndex);
1138 if FRoundRobinIndex >= FMicroThreads.Count then
1139 FRoundRobinIndex := 0;
1140 while (I < FMicroThreads.Count) do
1141 with TMicroThread(FMicroThreads[FRoundRobinIndex]) do begin
1142 if (FState = tsWaiting) then Break
1143 else
1144 if (FState = tsBlocked) then begin
1145 // Wakeup sleeping threads
1146 if (FBlockState = tbsSleeping) and
1147 (FBlockTime < CurrentTime) then begin
1148 FState := tsWaiting;
1149 FBlockState := tbsNone;
1150 Break;
1151 end
1152 else
1153 // Unblock event waiting threads
1154 if (FBlockState = tbsWaitFor) and
1155 (FBlockTime < CurrentTime) then begin
1156 FState := tsWaiting;
1157 FBlockState := tbsNone;
1158 Break;
1159 end;
1160 end;
1161 // Go to next thread
1162 Inc(I);
1163 FRoundRobinIndex := (FRoundRobinIndex + 1) mod FMicroThreads.Count;
1164 end;
1165 if I < FMicroThreads.Count then begin
1166// if Assigned(Manager.FCurrentMicroThread) then
1167// raise EMicroThreadError.Create(SManagerMicroThreadRunning);
1168 Selected := TMicroThread(FMicroThreads[FRoundRobinIndex]);
1169 Selected.FState := tsRunning;
1170 Inc(Selected.FExecutionCount);
1171 end;
1172 Manager.CurrentMicroThread := Selected;
1173 finally
1174 FMicroThreadsLock.Release;
1175 end;
1176end;
1177
1178procedure TMicroThreadScheduler.ReleaseMicroThread(MicroThread: TMicroThread);
1179begin
1180// if not Assigned(MicroThread) then
1181// raise EMicroThreadError.Create(SNilThreadReference);
1182 try
1183 FMicroThreadsLock.Acquire;
1184 if MicroThread.FStatePending <> tsNone then begin
1185 MicroThread.FState := MicroThread.FStatePending;
1186 MicroThread.FStatePending := tsNone;
1187 end;
1188 MicroThread.Manager := nil;
1189 finally
1190 FMicroThreadsLock.Release;
1191 end;
1192end;
1193
1194procedure TMicroThreadScheduler.SetUseMainThread(const AValue: Boolean);
1195begin
1196 if FUseMainThread = AValue then Exit;
1197 FUseMainThread := AValue;
1198 if FState = ssRunning then begin
1199 if AValue then FMainThreadStarter.Enabled := True;
1200 end;
1201end;
1202
1203function TMicroThreadScheduler.GetMicroThreadCount: Integer;
1204begin
1205 try
1206 FMicroThreadsLock.Acquire;
1207 Result := FMicroThreads.Count;
1208 finally
1209 FMicroThreadsLock.Release;
1210 end;
1211end;
1212
1213function TMicroThreadScheduler.GetThreadPoolCount: Integer;
1214begin
1215 try
1216 FThreadPoolLock.Acquire;
1217 Result := FThreadPool.Count;
1218 finally
1219 FThreadPoolLock.Release;
1220 end;
1221end;
1222
1223function TMicroThreadScheduler.GetThreadPoolSize: Integer;
1224begin
1225 Result := FThreadPoolSize;
1226end;
1227
1228procedure TMicroThreadScheduler.SetActive(const AValue: Boolean);
1229begin
1230 if FActive = AValue then Exit;
1231 FActive := AValue;
1232 if AValue then Start
1233 else Stop;
1234end;
1235
1236procedure TMicroThreadScheduler.SetThreadPoolSize(const AValue: Integer);
1237begin
1238 FThreadPoolSize := AValue;
1239 if FState = ssRunning then
1240 UpdateThreadPoolSize;
1241end;
1242
1243initialization
1244
1245DeleteFile(LogFileName);
1246LogLock := TCriticalSection.Create;
1247MainScheduler := TMicroThreadScheduler.Create;
1248
1249finalization
1250
1251MainScheduler.Free;
1252LogLock.Free;
1253
1254end.
1255
Note: See TracBrowser for help on using the repository browser.