Ignore:
Timestamp:
Feb 8, 2011, 11:02:02 AM (13 years ago)
Author:
george
Message:
Location:
PinConnection
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • PinConnection

    • Property svn:ignore
      •  

        old new  
        11lib
         2backup
  • PinConnection/UCommProtocol.pas

    r129 r163  
    66
    77uses
    8   Classes, SysUtils, UVarBlockSerializer, syncobjs, UCommPin,
    9   UDebugLog, UStreamHelper, StopWatch, SpecializedList, UCommon;
     8  Classes, SysUtils, UVarBlockSerializer, syncobjs, UCommPin, UMicroThreading,
     9  UDebugLog, UStreamHelper, StopWatch, SpecializedList, UCommon, UPlatform,
     10  DateUtils;
    1011
    1112type
     
    2526  private
    2627    RepeatCounter: integer;
    27     ReceiveEvent: TEvent;
     28    ReceiveEvent: TMicroThreadEvent;
    2829    Request: TStreamHelper;
    2930    ResponseParameters: TVarBlockIndexed;
    3031    TransmitTime: TDateTime;
    31     StopWatch: TStopWatch;
    3232  public
    33     Lock: TCriticalSection;
     33    Lock: TMicroThreadCriticalSection;
    3434    SequenceNumber: Integer;
    3535    ResponseCode: Integer;
     
    3838    Timeouted: Boolean;
    3939    CommandIndex: TListInteger;
    40     Latency: Double;
     40    Latency: TDateTime;
    4141    constructor Create;
    4242    destructor Destroy; override;
     
    4848    SequenceNumber: integer;
    4949    Parent: TCommProtocol;
    50     Lock: TCriticalSection;
    51     function GetFree: TDeviceProtocolSession;
     50    Lock: TMicroThreadCriticalSection;
     51    procedure Add(Session: TDeviceProtocolSession);
    5252    function GetBySequence(Sequence: integer): TDeviceProtocolSession;
    5353    procedure Remove(Session: TDeviceProtocolSession);
    5454    constructor Create;
    5555    destructor Destroy; override;
     56    function GetSequenceNumber: Integer;
    5657  end;
    5758
     
    6263  { TRetransmitCheckThread }
    6364
    64   TRetransmitCheckThread = class(TThread)
     65  TRetransmitCheckThread = class(TMicroThread)
    6566  public
    6667    Parent: TCommProtocol;
     
    9495    Pin: TCommPin;
    9596    LastCommandResponseTime: TDateTime;
    96     LastLatency: Double;
     97    LastLatency: TDateTime;
    9798    procedure SendCommand(Command: array of integer;
    9899      ResponseParameters: TVarBlockIndexed = nil;
     
    149150                if TestIndex(4) then CommandError := ReadVarUInt(4)
    150151                  else CommandError := 0;
    151                 StopWatch.Stop;
    152                 Latency := StopWatch.ElapsedMiliseconds;
     152                Latency := NowPrecise - TransmitTime;
    153153                ReceiveEvent.SetEvent;
    154154              finally
     
    254254      Parent := Self;
    255255      FreeOnTerminate := False;
     256      Name := 'CommProtocol';
    256257      Start;
    257258    end;
     
    268269  NewRequest: TVarBlockIndexed;
    269270begin
    270   Session := Sessions.GetFree;
    271271  try
     272    Session := TDeviceProtocolSession.Create;
     273    Sessions.Add(Session);
    272274    NewRequest := TVarBlockIndexed.Create;
    273275
     
    276278      try
    277279        Session.Lock.Acquire;
     280        Session.SequenceNumber := Sessions.GetSequenceNumber;
    278281        CommandIndex.Clear;
    279282        CommandIndex.AddArray(Command);
     
    290293
    291294        // Wait for free remote buffer
    292         repeat
    293           Sleep(1);
    294         until (RemoteBufferUsed + Request.Size) <= RemoteBufferSize;
    295 
    296         StopWatch.Start;
    297         TransmitTime := Now;
     295        while (RemoteBufferUsed + Request.Size) > RemoteBufferSize do
     296          MTSleep(1 * OneMillisecond);
     297
     298        //StopWatch.Start;
     299        TransmitTime := NowPrecise;
    298300        Pin.Send(Request);
    299301      finally
     
    301303      end;
    302304      try
    303         RemoteBufferUsed := RemoteBufferUsed + Request.Size;
    304         while ReceiveEvent.WaitFor(10) = wrTimeout do begin
     305        try
     306          Sessions.Lock.Acquire;
     307          RemoteBufferUsed := RemoteBufferUsed + Request.Size;
     308        finally
     309          Sessions.Lock.Release;
     310        end;
     311        while MTWaitForEvent(ReceiveEvent, 10 * OneMillisecond) = wrTimeout do begin
    305312          if Timeouted then
    306313            raise ECommTimeout.Create(SResponseTimeout);
     
    311318          raise ECommResponseCodeError.Create(Format(SResponseError, [CommandIndex.Implode('.', IntToStr), IntToStr(ResponseCode)]));
    312319        end;
    313         LastCommandResponseTime := Now;
     320        LastCommandResponseTime := NowPrecise;
    314321        LastLatency := Latency;
    315322      finally
    316         RemoteBufferUsed := RemoteBufferUsed - Session.Request.Size;
    317         if RemoteBufferUsed < 0 then RemoteBufferUsed := 0;
     323        try
     324          Sessions.Lock.Acquire;
     325          RemoteBufferUsed := RemoteBufferUsed - Session.Request.Size;
     326          if RemoteBufferUsed < 0 then RemoteBufferUsed := 0;
     327        finally
     328          Sessions.Lock.Release;
     329        end;
    318330        Sessions.Remove(Session);
    319331      end;
     
    332344  MaxSessionCount := 10;
    333345  MaxSequenceNumber := 127;
    334   RetransmitTimeout := 1 / 24 / 3600 * 1;
     346  RetransmitTimeout := 2 * OneSecond;
    335347  RetransmitRepeatCount := 3;
    336348  RetransmitTotalCount := 0;
     
    350362constructor TDeviceProtocolSession.Create;
    351363begin
    352   StopWatch := TStopWatch.Create;
    353364  ResponseCode := 0;
    354   Lock := TCriticalSection.Create;
    355   ReceiveEvent := TEvent.Create(nil, False, False, '');
     365  Lock := TMicroThreadCriticalSection.Create;
     366  ReceiveEvent := TMicroThreadEvent.Create;
     367  ReceiveEvent.AutoReset := False;
    356368  Request := TStreamHelper.Create;
    357369  ResponseParameters := nil;
    358370  CommandIndex := TListInteger.Create;
    359371  Latency := 0;
     372  TransmitTime := 0;
    360373end;
    361374
     
    366379  ReceiveEvent.Free;
    367380  Lock.Free;
    368   StopWatch.Free;
    369381  inherited Destroy;
    370382end;
     
    372384{ TDeviceProtocolSessionList }
    373385
    374 function TDeviceProtocolSessionList.GetFree: TDeviceProtocolSession;
     386procedure TDeviceProtocolSessionList.Add(Session: TDeviceProtocolSession);
    375387begin
    376388  // Block if no free session available
    377   Lock.Acquire;
    378389  try
     390    Lock.Acquire;
    379391    while Count >= Parent.MaxSessionCount do
    380392    begin
    381       Lock.Release;
    382       Sleep(1);
    383       Lock.Acquire;
    384     end;
    385     Result := TDeviceProtocolSession.Create;
    386     Result.SequenceNumber := SequenceNumber;
    387     Inc(SequenceNumber);
    388     if SequenceNumber > Parent.MaxSequenceNumber then
    389       SequenceNumber := 0;
    390     Add(Result);
     393      try
     394        Lock.Release;
     395        MTSleep(1 * OneMillisecond);
     396      finally
     397        Lock.Acquire;
     398      end;
     399    end;
     400    inherited Add(Session);
    391401  finally
    392402    Lock.Release;
     
    421431begin
    422432  inherited Create;
    423   Lock := TCriticalSection.Create;
     433  Lock := TMicroThreadCriticalSection.Create;
    424434end;
    425435
     
    428438  // Free session list before freeing Lock
    429439  // instead of freeing in inherited Destroy in TListObject
    430   Lock.Acquire;
    431   Clear;
    432   Lock.Release;
     440  try
     441    Lock.Acquire;
     442    Clear;
     443  finally
     444    Lock.Release;
     445  end;
    433446
    434447  Lock.Free;
     
    436449end;
    437450
     451function TDeviceProtocolSessionList.GetSequenceNumber: Integer;
     452begin
     453  try
     454    Lock.Acquire;
     455    Inc(SequenceNumber);
     456    if SequenceNumber > Parent.MaxSequenceNumber then
     457      SequenceNumber := 0;
     458    Result := SequenceNumber;
     459  finally
     460    Lock.Release;
     461  end;
     462end;
     463
    438464{ TRetransmitCheckThread }
    439465
    440466procedure TRetransmitCheckThread.Execute;
    441467var
    442   I: integer;
    443 begin
    444   with Parent do repeat
    445       try
    446         try
    447           Parent.Sessions.Lock.Acquire;
    448           for I := 0 to Sessions.Count - 1 do begin
    449           with TDeviceProtocolSession(Sessions[I]) do begin
    450               try
    451                 Lock.Acquire;
    452                 if Now > (TransmitTime + RetransmitTimeout) then begin
    453                   if RepeatCounter < RetransmitRepeatCount then begin
    454                     Pin.Send(Request);
    455                     StopWatch.Start;
    456                     TransmitTime := Now;
    457                     Inc(RepeatCounter);
    458                     Inc(RetransmitTotalCount);
    459                   end else
    460                     Timeouted := True;
    461                 end;
    462               finally
    463                 Lock.Release;
    464               end;
     468  I: Integer;
     469  C: Integer;
     470  Session: TDeviceProtocolSession;
     471begin
     472  with Parent do
     473  repeat
     474    try
     475      Parent.Sessions.Lock.Acquire;
     476      I := 0;
     477      while I < Sessions.Count do begin
     478        Session := TDeviceProtocolSession(Sessions[I]);
     479        with TDeviceProtocolSession(Sessions[I]) do begin
     480          try
     481            Session.Lock.Acquire;
     482            if (TransmitTime > 0) and (NowPrecise > (TransmitTime + RetransmitTimeout)) then begin
     483              if RepeatCounter < RetransmitRepeatCount then begin
     484                Pin.Send(Request);
     485                TransmitTime := NowPrecise;
     486                Inc(RepeatCounter);
     487                Inc(RetransmitTotalCount);
     488              end else
     489                Timeouted := True;
    465490            end;
     491          finally
     492            Session.Lock.Release;
    466493          end;
    467         finally
    468           Parent.Sessions.Lock.Release;
    469         end;
    470 
    471       if not Terminated then
    472         Sleep(CheckPeriod);
    473       except
    474         on E: Exception do begin
    475           if Assigned(ExceptionHandler) then ExceptionHandler(Self, E);
    476         end;
     494        end;
     495        Inc(I);
    477496      end;
    478     until Terminated;
     497    finally
     498      Parent.Sessions.Lock.Release;
     499    end;
     500
     501    if not Terminated then
     502      MTSleep(CheckPeriod * OneMillisecond);
     503  until Terminated;
    479504end;
    480505
Note: See TracChangeset for help on using the changeset viewer.