Ignore:
Timestamp:
Jan 26, 2011, 2:16:19 PM (13 years ago)
Author:
george
Message:
  • Modified: Not completed thread safe support.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • MicroThreading/UMicroThreading.pas

    r147 r148  
    105105    FTerminated: Boolean;
    106106    FTempPointer: Pointer;
     107    FCurrentMicroThread: TMicroThread;
     108    FScheduler: TMicroThreadScheduler;
    107109    function Execute(Count: Integer): Integer;
    108110  public
    109     Scheduler: TMicroThreadScheduler;
    110     CurrentMicroThread: TMicroThread;
    111111    procedure Yield;
    112112    constructor Create;
    113113    destructor Destroy; override;
    114   end;
     114    property Scheduler: TMicroThreadScheduler read FScheduler;
     115    property CurrentMicroThread: TMicroThread read FCurrentMicroThread;
     116  end;
     117
     118  TMicroThreadSchedulerState = (ssStopped, ssRunning, ssTerminating);
    115119
    116120  { TMicroThreadScheduler }
     
    118122  TMicroThreadScheduler = class
    119123  private
    120     ThreadPool: TThreadPool;
    121     RoundRobinIndex: Integer;
     124    FActive: Boolean;
     125    FThreadPool: TThreadPool;
     126    FThreadPoolLock: TCriticalSection;
     127    FThreadPoolSize: Integer;
     128    FRoundRobinIndex: Integer;
    122129    FLastId: Integer;
    123130    FFrequency: Int64;
     131    FTerminate: Boolean;
    124132    FTerminated: Boolean;
     133    FMicroThreads: TObjectList; // TList<TMicroThread>
     134    FMainThreadManager: TMicroThreadManager;
     135    FMicroThreadsLock: TCriticalSection;
     136    FState: TMicroThreadSchedulerState;
    125137    function GetMicroThreadCount: Integer;
    126138    function GetThreadPoolSize: Integer;
     139    procedure SetActive(const AValue: Boolean);
    127140    procedure SetThreadPoolSize(const AValue: Integer);
    128141    function GetNextMicroThread: TMicroThread;
     142    procedure WaitFor;
     143    procedure Start;
     144    procedure Stop;
     145    function ThreadPoolTerminated: Boolean;
    129146  public
    130     MainThreadManager: TMicroThreadManager;
    131     MicroThreads: TObjectList; // TList<TMicroThread>
    132     Lock: TCriticalSection;
    133147    function GetNow: TDateTime;
    134148    function Add(MicroThread: TMicroThread): Integer;
    135149    function AddMethod(Method: TMicroThreadEvent): Integer;
     150    function GetCPUCoreCount: Integer;
    136151    constructor Create;
    137152    destructor Destroy; override;
    138     procedure Start;
    139     procedure Stop;
    140153    property MicroThreadCount: Integer read GetMicroThreadCount;
    141154    property ThreadPoolSize: Integer read GetThreadPoolSize
    142155      write SetThreadPoolSize;
     156    property MicroThreads: TObjectList read FMicroThreads;
     157    property MicroThreadsLock: TCriticalSection read FMicroThreadsLock;
     158    property MainThreadManager: TMicroThreadManager read FMainThreadManager;
     159    property Active: Boolean read FActive write SetActive;
    143160  end;
    144161
     
    167184  with MainScheduler do begin
    168185    try
    169       Lock.Acquire;
     186      FMicroThreadsLock.Acquire;
    170187      I := 0;
    171       while (I < MicroThreads.Count) and
    172         not ((CurrentStack >= TMicroThread(MicroThreads[I]).FStack) and
    173         (CurrentStack <= (TMicroThread(MicroThreads[I]).FStack +
    174         TMicroThread(MicroThreads[I]).FStackSize))) do Inc(I);
    175       if I < MicroThreads.Count then begin
    176         Result := TMicroThread(MicroThreads[I]).FId;
     188      while (I < FMicroThreads.Count) and
     189        not ((CurrentStack >= TMicroThread(FMicroThreads[I]).FStack) and
     190        (CurrentStack <= (TMicroThread(FMicroThreads[I]).FStack +
     191        TMicroThread(FMicroThreads[I]).FStackSize))) do Inc(I);
     192      if I < FMicroThreads.Count then begin
     193        Result := TMicroThread(FMicroThreads[I]).FId;
    177194      end else Result := -1;
    178195    finally
    179       Lock.Release;
     196      FMicroThreadsLock.Release;
    180197    end;
    181198  end;
     
    199216  CurrentTime: TDateTime;
    200217begin
    201   CurrentTime := Scheduler.GetNow;
    202   if Assigned(CurrentMicroThread) then begin
    203     CurrentMicroThread.FExecutionEndTime := CurrentTime;
    204     CurrentMicroThread.FExecutionTime := CurrentMicroThread.FExecutionTime +
    205       (CurrentMicroThread.FExecutionEndTime - CurrentMicroThread.FExecutionStartTime);
    206     if CurrentMicroThread.FState = tsRunning then
    207       CurrentMicroThread.FState := tsWaiting;
    208     StaticMicroThread := CurrentMicroThread;
     218  CurrentTime := FScheduler.GetNow;
     219  if Assigned(FCurrentMicroThread) then begin
     220    FCurrentMicroThread.FExecutionEndTime := CurrentTime;
     221    FCurrentMicroThread.FExecutionTime := FCurrentMicroThread.FExecutionTime +
     222      (FCurrentMicroThread.FExecutionEndTime - FCurrentMicroThread.FExecutionStartTime);
     223    if FCurrentMicroThread.FState = tsRunning then
     224      FCurrentMicroThread.FState := tsWaiting;
     225    StaticMicroThread := FCurrentMicroThread;
    209226    asm
    210227      // Store microthread stack
     
    215232      mov [eax].TMicroThread.FBasePointer, edx
    216233    end;
    217     StaticManager := CurrentMicroThread.FManager;
     234    StaticManager := FCurrentMicroThread.FManager;
    218235    asm
    219       // Restore scheduler stack
     236      // Restore FScheduler stack
    220237      mov eax, StaticManager  // Self is invalid before BP restore
    221238      mov edx, [eax].TMicroThreadManager.FStackPointer
     
    224241      mov ebp, edx
    225242    end;
    226     CurrentMicroThread.FManager := nil;
    227     CurrentMicroThread := nil;
    228   end;
    229 
    230   CurrentMicroThread := Scheduler.GetNextMicroThread;
    231 
    232   if Assigned(CurrentMicroThread) and (FExecutedCount < FExecuteCount) then begin
    233     CurrentMicroThread.FManager := Self;
     243    FCurrentMicroThread.FManager := nil;
     244    FCurrentMicroThread := nil;
     245  end;
     246
     247  FCurrentMicroThread := FScheduler.GetNextMicroThread;
     248
     249  if Assigned(FCurrentMicroThread) and (FExecutedCount < FExecuteCount) then begin
     250    FCurrentMicroThread.FManager := Self;
    234251    Inc(FExecutedCount);
    235252    asm
    236       // Store scheduler stack
     253      // Store FScheduler stack
    237254      mov eax, Self
    238255      mov edx, esp
     
    241258      mov [eax].TMicroThreadManager.FBasePointer, edx
    242259    end;
    243     if not CurrentMicroThread.FExecuted then begin
    244       CurrentMicroThread.FExecuted := True;
    245       CurrentMicroThread.FState := tsRunning;
    246       CurrentMicroThread.FExecutionStartTime := CurrentTime;
    247       StaticMicroThread := CurrentMicroThread;
     260    if not FCurrentMicroThread.FExecuted then begin
     261      FCurrentMicroThread.FExecuted := True;
     262      FCurrentMicroThread.FState := tsRunning;
     263      FCurrentMicroThread.FExecutionStartTime := CurrentTime;
     264      StaticMicroThread := FCurrentMicroThread;
    248265      asm
    249266        // Restore microthread stack
     
    260277      end;
    261278      //FSelected.Method(FSelected);
    262       StaticManager := CurrentMicroThread.FManager;
     279      StaticManager := FCurrentMicroThread.FManager;
    263280      asm
    264         // Restore scheduler stack
     281        // Restore FScheduler stack
    265282        mov eax, StaticManager // Self is invalid before BP restore
    266283        mov edx, [eax].TMicroThreadManager.FStackPointer
     
    269286        mov ebp, edx
    270287      end;
    271       CurrentMicroThread.FManager := nil;
    272       CurrentMicroThread.FExecutionEndTime := CurrentTime;
    273       CurrentMicroThread.FExecutionTime := CurrentMicroThread.FExecutionTime +
    274        (CurrentMicroThread.FExecutionEndTime - CurrentMicroThread.FExecutionStartTime);
    275       CurrentMicroThread.FFinished := True;
    276       if CurrentMicroThread.FFreeOnTerminate then begin
     288      FCurrentMicroThread.FManager := nil;
     289      FCurrentMicroThread.FExecutionEndTime := CurrentTime;
     290      FCurrentMicroThread.FExecutionTime := FCurrentMicroThread.FExecutionTime +
     291       (FCurrentMicroThread.FExecutionEndTime - FCurrentMicroThread.FExecutionStartTime);
     292      FCurrentMicroThread.FFinished := True;
     293      if FCurrentMicroThread.FFreeOnTerminate then begin
    277294        // Microthread is finished, remove it from queue
    278         with Scheduler do
     295        with FScheduler do
    279296        try
    280           Lock.Acquire;
    281           MicroThreads.Delete(MicroThreads.IndexOf(CurrentMicroThread));
     297          FMicroThreadsLock.Acquire;
     298          FMicroThreads.Delete(FMicroThreads.IndexOf(FCurrentMicroThread));
    282299        finally
    283           Lock.Release;
     300          FMicroThreadsLock.Release;
    284301        end;
    285302      end;
    286       CurrentMicroThread := nil;
     303      FCurrentMicroThread := nil;
    287304    end else
    288     if CurrentMicroThread.State = tsWaiting then begin
     305    if FCurrentMicroThread.State = tsWaiting then begin
    289306      // Execute selected thread
    290       CurrentMicroThread.FState := tsRunning;
    291       CurrentMicroThread.FExecutionStartTime := CurrentTime;
    292       FTempPointer := CurrentMicroThread.FStackPointer;
     307      FCurrentMicroThread.FState := tsRunning;
     308      FCurrentMicroThread.FExecutionStartTime := CurrentTime;
     309      FTempPointer := FCurrentMicroThread.FStackPointer;
    293310      asm
    294311        // Restore microthread stack
     
    297314        mov esp, edx
    298315      end;
    299       FTempPointer := CurrentMicroThread.FBasePointer;
     316      FTempPointer := FCurrentMicroThread.FBasePointer;
    300317      asm
    301318        mov eax, Self
     
    305322    end;
    306323  end else begin
    307     CurrentMicroThread := nil;
     324    FCurrentMicroThread := nil;
    308325  end;
    309326end;
     
    311328constructor TMicroThreadManager.Create;
    312329begin
    313   CurrentMicroThread := nil;
     330  FCurrentMicroThread := nil;
    314331end;
    315332
     
    469486  MicroThread.FScheduler := Self;
    470487  MicroThread.FId := FLastId;
    471   Result := MicroThreads.Add(MicroThread);
     488  Result := FMicroThreads.Add(MicroThread);
    472489end;
    473490
     
    482499end;
    483500
     501function TMicroThreadScheduler.GetCPUCoreCount: Integer;
     502var
     503  SystemInfo: _SYSTEM_INFO;
     504begin
     505  GetSystemInfo(SystemInfo);
     506  Result := SystemInfo.dwNumberOfProcessors;
     507end;
     508
    484509constructor TMicroThreadScheduler.Create;
    485510begin
    486   Lock := TCriticalSection.Create;
    487   MicroThreads := TObjectList.Create;
    488   ThreadPool := TThreadPool.Create;
     511  FTerminated := True;
     512  FMicroThreadsLock := TCriticalSection.Create;
     513  FMicroThreads := TObjectList.Create;
     514  FThreadPool := TThreadPool.Create;
     515  FThreadPoolLock := TCriticalSection.Create;
    489516  {$IFDEF Windows}
    490517  QueryPerformanceFrequency(FFrequency);
    491518  {$ENDIF}
    492   RoundRobinIndex := -1;
    493   MainThreadManager := TMicroThreadManager.Create;
    494   MainThreadManager.Scheduler := Self;
     519  FRoundRobinIndex := -1;
     520  FMainThreadManager := TMicroThreadManager.Create;
     521  FMainThreadManager.FScheduler := Self;
    495522end;
    496523
    497524destructor TMicroThreadScheduler.Destroy;
    498525begin
    499   MainThreadManager.Free;
    500   FTerminated := True;
    501   ThreadPool.Free;
    502   MicroThreads.Free;
    503   Lock.Free;
     526  Active := False;
     527  FMainThreadManager.Free;
     528  FThreadPool.Free;
     529  FMicroThreads.Free;
     530  FMicroThreadsLock.Free;
    504531  inherited Destroy;
    505532end;
     
    511538begin
    512539  FTerminated := False;
    513   for I := 0 to ThreadPool.Count - 1 do
    514     TMicroThreadSchedulerPoolThread(ThreadPool[I]).Start;
     540  FTerminate := False;
     541  for I := 0 to FThreadPool.Count - 1 do
     542    TMicroThreadSchedulerPoolThread(FThreadPool[I]).Start;
    515543  repeat
    516     Executed := MainThreadManager.Execute(10);
     544    Executed := FMainThreadManager.Execute(10);
    517545    Application.ProcessMessages;
    518546    if Executed = 0 then Sleep(1);
    519   until FTerminated;
     547  until FTerminate;
     548  FTerminated := True;
    520549end;
    521550
     
    524553  I: Integer;
    525554begin
    526   for I := 0 to ThreadPool.Count - 1 do
    527     TMicroThreadSchedulerPoolThread(ThreadPool[I]).Terminate;
    528   FTerminated := True;
     555  try
     556    FThreadPoolLock.Acquire;
     557    for I := 0 to FThreadPool.Count - 1 do begin
     558      TMicroThreadSchedulerPoolThread(FThreadPool[I]).Terminate;
     559    end;
     560  finally
     561    FThreadPoolLock.Release;
     562  end;
     563  FTerminate := True;
     564
     565  // Wait for all thread managers to finish
     566  repeat
     567    Application.ProcessMessages;
     568    Sleep(1);
     569  until FTerminated and (ThreadPoolSize = 0);
     570end;
     571
     572function TMicroThreadScheduler.ThreadPoolTerminated: Boolean;
     573var
     574  I: Integer;
     575begin
     576  try
     577    FThreadPoolLock.Acquire;
     578    I := 0;
     579    while (I < FThreadPool.Count) and
     580      (TMicroThreadSchedulerPoolThread(FThreadPool[I]).Terminated do
     581  finally
     582    FThreadPoolLock.Release;
     583  end;
    529584end;
    530585
     
    537592  Result := nil;
    538593  try
    539     Lock.Acquire;
     594    FMicroThreadsLock.Acquire;
    540595    I := 0;
    541     Inc(RoundRobinIndex);
    542     if RoundRobinIndex >= MicroThreads.Count then
    543       RoundRobinIndex := 0;
    544     while (I < MicroThreads.Count) and
    545      (TMicroThread(MicroThreads[RoundRobinIndex]).State <> tsWaiting) do begin
     596    Inc(FRoundRobinIndex);
     597    if FRoundRobinIndex >= FMicroThreads.Count then
     598      FRoundRobinIndex := 0;
     599    while (I < FMicroThreads.Count) and
     600     (TMicroThread(FMicroThreads[FRoundRobinIndex]).State <> tsWaiting) do begin
    546601      // WakeUp sleeping threads
    547       if (TMicroThread(MicroThreads[RoundRobinIndex]).FState = tsSleeping) and
    548         (TMicroThread(MicroThreads[RoundRobinIndex]).FWakeupTime < CurrentTime) then
    549           TMicroThread(MicroThreads[RoundRobinIndex]).FState := tsWaiting else
     602      if (TMicroThread(FMicroThreads[FRoundRobinIndex]).FState = tsSleeping) and
     603        (TMicroThread(FMicroThreads[FRoundRobinIndex]).FWakeupTime < CurrentTime) then
     604          TMicroThread(FMicroThreads[FRoundRobinIndex]).FState := tsWaiting else
    550605      begin
    551606        // Go to next thread
    552607        Inc(I);
    553         Inc(RoundRobinIndex);
    554         if RoundRobinIndex >= MicroThreads.Count then
    555           RoundRobinIndex := 0;
     608        Inc(FRoundRobinIndex);
     609        if FRoundRobinIndex >= FMicroThreads.Count then
     610          FRoundRobinIndex := 0;
    556611      end;
    557612    end;
    558     if I < MicroThreads.Count then begin
    559       Result := TMicroThread(MicroThreads[RoundRobinIndex]);
     613    if I < FMicroThreads.Count then begin
     614      Result := TMicroThread(FMicroThreads[FRoundRobinIndex]);
    560615    end;
    561616  finally
    562     Lock.Release;
     617    FMicroThreadsLock.Release;
    563618  end;
    564619end;
     
    567622begin
    568623  try
    569     Lock.Acquire;
    570     Result := MicroThreads.Count;
     624    FMicroThreadsLock.Acquire;
     625    Result := FMicroThreads.Count;
    571626  finally
    572     Lock.Release;
     627    FMicroThreadsLock.Release;
    573628  end;
    574629end;
     
    576631function TMicroThreadScheduler.GetThreadPoolSize: Integer;
    577632begin
    578   Result := ThreadPool.Count;
     633  Result := FThreadPoolSize;
     634end;
     635
     636procedure TMicroThreadScheduler.SetActive(const AValue: Boolean);
     637begin
     638  if FActive = AValue then Exit;
     639  FActive := AValue;
     640  if AValue then Start
     641    else Stop;
    579642end;
    580643
     
    584647  NewThread: TMicroThreadSchedulerPoolThread;
    585648begin
    586   if AValue > ThreadPool.Count then begin
    587     ThreadPool.Capacity := AValue;
    588     while ThreadPool.Count < AValue do begin
    589       NewThread := TMicroThreadSchedulerPoolThread.Create(True);
    590       NewThread.Manager.Scheduler := Self;
    591       ThreadPool.Add(NewThread);
    592     end;
    593   end else
    594   ThreadPool.Count := AValue;
     649  FThreadPoolSize := AValue;
     650  if FState = ssRunning then
     651    SetThreadPoolCount
    595652end;
    596653
Note: See TracChangeset for help on using the changeset viewer.