Changeset 163 for PinConnection


Ignore:
Timestamp:
Feb 8, 2011, 11:02:02 AM (13 years ago)
Author:
george
Message:
Location:
PinConnection
Files:
8 edited

Legend:

Unmodified
Added
Removed
  • PinConnection

    • Property svn:ignore
      •  

        old new  
        11lib
         2backup
  • PinConnection/PinConnection.lpk

    r123 r163  
    66    <Author Value="Chronos"/>
    77    <CompilerOptions>
    8       <Version Value="9"/>
     8      <Version Value="10"/>
    99      <PathDelim Value="\"/>
    1010      <SearchPaths>
     
    1717    <Description Value="System for class bidirectional communication interconnection."/>
    1818    <License Value="GNU/GPL"/>
    19     <Version Minor="1"/>
     19    <Version Minor="2"/>
    2020    <Files Count="9">
    2121      <Item1>
     
    5757    </Files>
    5858    <Type Value="RunAndDesignTime"/>
    59     <RequiredPkgs Count="4">
     59    <RequiredPkgs Count="5">
    6060      <Item1>
    61         <PackageName Value="synapse"/>
     61        <PackageName Value="MicroThreading"/>
    6262      </Item1>
    6363      <Item2>
    64         <PackageName Value="Common"/>
     64        <PackageName Value="synapse"/>
    6565      </Item2>
    6666      <Item3>
    67         <PackageName Value="CoolStreaming"/>
     67        <PackageName Value="Common"/>
    6868      </Item3>
    6969      <Item4>
     70        <PackageName Value="CoolStreaming"/>
     71      </Item4>
     72      <Item5>
    7073        <PackageName Value="FCL"/>
    7174        <MinVersion Major="1" Valid="True"/>
    72       </Item4>
     75      </Item5>
    7376    </RequiredPkgs>
    7477    <UsageOptions>
  • PinConnection/UCommProtocol.pas

    r129 r163  
    66
    77uses
    8   Classes, SysUtils, UVarBlockSerializer, syncobjs, UCommPin,
    9   UDebugLog, UStreamHelper, StopWatch, SpecializedList, UCommon;
     8  Classes, SysUtils, UVarBlockSerializer, syncobjs, UCommPin, UMicroThreading,
     9  UDebugLog, UStreamHelper, StopWatch, SpecializedList, UCommon, UPlatform,
     10  DateUtils;
    1011
    1112type
     
    2526  private
    2627    RepeatCounter: integer;
    27     ReceiveEvent: TEvent;
     28    ReceiveEvent: TMicroThreadEvent;
    2829    Request: TStreamHelper;
    2930    ResponseParameters: TVarBlockIndexed;
    3031    TransmitTime: TDateTime;
    31     StopWatch: TStopWatch;
    3232  public
    33     Lock: TCriticalSection;
     33    Lock: TMicroThreadCriticalSection;
    3434    SequenceNumber: Integer;
    3535    ResponseCode: Integer;
     
    3838    Timeouted: Boolean;
    3939    CommandIndex: TListInteger;
    40     Latency: Double;
     40    Latency: TDateTime;
    4141    constructor Create;
    4242    destructor Destroy; override;
     
    4848    SequenceNumber: integer;
    4949    Parent: TCommProtocol;
    50     Lock: TCriticalSection;
    51     function GetFree: TDeviceProtocolSession;
     50    Lock: TMicroThreadCriticalSection;
     51    procedure Add(Session: TDeviceProtocolSession);
    5252    function GetBySequence(Sequence: integer): TDeviceProtocolSession;
    5353    procedure Remove(Session: TDeviceProtocolSession);
    5454    constructor Create;
    5555    destructor Destroy; override;
     56    function GetSequenceNumber: Integer;
    5657  end;
    5758
     
    6263  { TRetransmitCheckThread }
    6364
    64   TRetransmitCheckThread = class(TThread)
     65  TRetransmitCheckThread = class(TMicroThread)
    6566  public
    6667    Parent: TCommProtocol;
     
    9495    Pin: TCommPin;
    9596    LastCommandResponseTime: TDateTime;
    96     LastLatency: Double;
     97    LastLatency: TDateTime;
    9798    procedure SendCommand(Command: array of integer;
    9899      ResponseParameters: TVarBlockIndexed = nil;
     
    149150                if TestIndex(4) then CommandError := ReadVarUInt(4)
    150151                  else CommandError := 0;
    151                 StopWatch.Stop;
    152                 Latency := StopWatch.ElapsedMiliseconds;
     152                Latency := NowPrecise - TransmitTime;
    153153                ReceiveEvent.SetEvent;
    154154              finally
     
    254254      Parent := Self;
    255255      FreeOnTerminate := False;
     256      Name := 'CommProtocol';
    256257      Start;
    257258    end;
     
    268269  NewRequest: TVarBlockIndexed;
    269270begin
    270   Session := Sessions.GetFree;
    271271  try
     272    Session := TDeviceProtocolSession.Create;
     273    Sessions.Add(Session);
    272274    NewRequest := TVarBlockIndexed.Create;
    273275
     
    276278      try
    277279        Session.Lock.Acquire;
     280        Session.SequenceNumber := Sessions.GetSequenceNumber;
    278281        CommandIndex.Clear;
    279282        CommandIndex.AddArray(Command);
     
    290293
    291294        // Wait for free remote buffer
    292         repeat
    293           Sleep(1);
    294         until (RemoteBufferUsed + Request.Size) <= RemoteBufferSize;
    295 
    296         StopWatch.Start;
    297         TransmitTime := Now;
     295        while (RemoteBufferUsed + Request.Size) > RemoteBufferSize do
     296          MTSleep(1 * OneMillisecond);
     297
     298        //StopWatch.Start;
     299        TransmitTime := NowPrecise;
    298300        Pin.Send(Request);
    299301      finally
     
    301303      end;
    302304      try
    303         RemoteBufferUsed := RemoteBufferUsed + Request.Size;
    304         while ReceiveEvent.WaitFor(10) = wrTimeout do begin
     305        try
     306          Sessions.Lock.Acquire;
     307          RemoteBufferUsed := RemoteBufferUsed + Request.Size;
     308        finally
     309          Sessions.Lock.Release;
     310        end;
     311        while MTWaitForEvent(ReceiveEvent, 10 * OneMillisecond) = wrTimeout do begin
    305312          if Timeouted then
    306313            raise ECommTimeout.Create(SResponseTimeout);
     
    311318          raise ECommResponseCodeError.Create(Format(SResponseError, [CommandIndex.Implode('.', IntToStr), IntToStr(ResponseCode)]));
    312319        end;
    313         LastCommandResponseTime := Now;
     320        LastCommandResponseTime := NowPrecise;
    314321        LastLatency := Latency;
    315322      finally
    316         RemoteBufferUsed := RemoteBufferUsed - Session.Request.Size;
    317         if RemoteBufferUsed < 0 then RemoteBufferUsed := 0;
     323        try
     324          Sessions.Lock.Acquire;
     325          RemoteBufferUsed := RemoteBufferUsed - Session.Request.Size;
     326          if RemoteBufferUsed < 0 then RemoteBufferUsed := 0;
     327        finally
     328          Sessions.Lock.Release;
     329        end;
    318330        Sessions.Remove(Session);
    319331      end;
     
    332344  MaxSessionCount := 10;
    333345  MaxSequenceNumber := 127;
    334   RetransmitTimeout := 1 / 24 / 3600 * 1;
     346  RetransmitTimeout := 2 * OneSecond;
    335347  RetransmitRepeatCount := 3;
    336348  RetransmitTotalCount := 0;
     
    350362constructor TDeviceProtocolSession.Create;
    351363begin
    352   StopWatch := TStopWatch.Create;
    353364  ResponseCode := 0;
    354   Lock := TCriticalSection.Create;
    355   ReceiveEvent := TEvent.Create(nil, False, False, '');
     365  Lock := TMicroThreadCriticalSection.Create;
     366  ReceiveEvent := TMicroThreadEvent.Create;
     367  ReceiveEvent.AutoReset := False;
    356368  Request := TStreamHelper.Create;
    357369  ResponseParameters := nil;
    358370  CommandIndex := TListInteger.Create;
    359371  Latency := 0;
     372  TransmitTime := 0;
    360373end;
    361374
     
    366379  ReceiveEvent.Free;
    367380  Lock.Free;
    368   StopWatch.Free;
    369381  inherited Destroy;
    370382end;
     
    372384{ TDeviceProtocolSessionList }
    373385
    374 function TDeviceProtocolSessionList.GetFree: TDeviceProtocolSession;
     386procedure TDeviceProtocolSessionList.Add(Session: TDeviceProtocolSession);
    375387begin
    376388  // Block if no free session available
    377   Lock.Acquire;
    378389  try
     390    Lock.Acquire;
    379391    while Count >= Parent.MaxSessionCount do
    380392    begin
    381       Lock.Release;
    382       Sleep(1);
    383       Lock.Acquire;
    384     end;
    385     Result := TDeviceProtocolSession.Create;
    386     Result.SequenceNumber := SequenceNumber;
    387     Inc(SequenceNumber);
    388     if SequenceNumber > Parent.MaxSequenceNumber then
    389       SequenceNumber := 0;
    390     Add(Result);
     393      try
     394        Lock.Release;
     395        MTSleep(1 * OneMillisecond);
     396      finally
     397        Lock.Acquire;
     398      end;
     399    end;
     400    inherited Add(Session);
    391401  finally
    392402    Lock.Release;
     
    421431begin
    422432  inherited Create;
    423   Lock := TCriticalSection.Create;
     433  Lock := TMicroThreadCriticalSection.Create;
    424434end;
    425435
     
    428438  // Free session list before freeing Lock
    429439  // instead of freeing in inherited Destroy in TListObject
    430   Lock.Acquire;
    431   Clear;
    432   Lock.Release;
     440  try
     441    Lock.Acquire;
     442    Clear;
     443  finally
     444    Lock.Release;
     445  end;
    433446
    434447  Lock.Free;
     
    436449end;
    437450
     451function TDeviceProtocolSessionList.GetSequenceNumber: Integer;
     452begin
     453  try
     454    Lock.Acquire;
     455    Inc(SequenceNumber);
     456    if SequenceNumber > Parent.MaxSequenceNumber then
     457      SequenceNumber := 0;
     458    Result := SequenceNumber;
     459  finally
     460    Lock.Release;
     461  end;
     462end;
     463
    438464{ TRetransmitCheckThread }
    439465
    440466procedure TRetransmitCheckThread.Execute;
    441467var
    442   I: integer;
    443 begin
    444   with Parent do repeat
    445       try
    446         try
    447           Parent.Sessions.Lock.Acquire;
    448           for I := 0 to Sessions.Count - 1 do begin
    449           with TDeviceProtocolSession(Sessions[I]) do begin
    450               try
    451                 Lock.Acquire;
    452                 if Now > (TransmitTime + RetransmitTimeout) then begin
    453                   if RepeatCounter < RetransmitRepeatCount then begin
    454                     Pin.Send(Request);
    455                     StopWatch.Start;
    456                     TransmitTime := Now;
    457                     Inc(RepeatCounter);
    458                     Inc(RetransmitTotalCount);
    459                   end else
    460                     Timeouted := True;
    461                 end;
    462               finally
    463                 Lock.Release;
    464               end;
     468  I: Integer;
     469  C: Integer;
     470  Session: TDeviceProtocolSession;
     471begin
     472  with Parent do
     473  repeat
     474    try
     475      Parent.Sessions.Lock.Acquire;
     476      I := 0;
     477      while I < Sessions.Count do begin
     478        Session := TDeviceProtocolSession(Sessions[I]);
     479        with TDeviceProtocolSession(Sessions[I]) do begin
     480          try
     481            Session.Lock.Acquire;
     482            if (TransmitTime > 0) and (NowPrecise > (TransmitTime + RetransmitTimeout)) then begin
     483              if RepeatCounter < RetransmitRepeatCount then begin
     484                Pin.Send(Request);
     485                TransmitTime := NowPrecise;
     486                Inc(RepeatCounter);
     487                Inc(RetransmitTotalCount);
     488              end else
     489                Timeouted := True;
    465490            end;
     491          finally
     492            Session.Lock.Release;
    466493          end;
    467         finally
    468           Parent.Sessions.Lock.Release;
    469         end;
    470 
    471       if not Terminated then
    472         Sleep(CheckPeriod);
    473       except
    474         on E: Exception do begin
    475           if Assigned(ExceptionHandler) then ExceptionHandler(Self, E);
    476         end;
     494        end;
     495        Inc(I);
    477496      end;
    478     until Terminated;
     497    finally
     498      Parent.Sessions.Lock.Release;
     499    end;
     500
     501    if not Terminated then
     502      MTSleep(CheckPeriod * OneMillisecond);
     503  until Terminated;
    479504end;
    480505
  • PinConnection/UCommSerialPort.pas

    r119 r163  
    66
    77uses
    8   Classes, USerialPort, UCommPin, SysUtils;
     8  Classes, USerialPort, UCommPin, SysUtils, UMicroThreading, DateUtils,
     9  SyncObjs;
    910
    1011type
     
    1415    procedure ReceiveData(Stream: TMemoryStream);
    1516  public
     17    Lock: TMicroThreadCriticalSection;
    1618    Pin: TCommPin;
    1719    destructor Destroy; override;
     
    3335begin
    3436  inherited;
     37  Lock := TMicroThreadCriticalSection.Create;
    3538  Pin := TCommPin.Create;
    3639  Pin.OnReceive := Receive;
     
    4245  OnReceiveData := nil;
    4346  Pin.Free;
     47  Lock.Free;
    4448  inherited;
    4549end;
     
    4953  Stream.Position := 0;
    5054  repeat
    51     SendStreamRaw(Stream);
    52     Sleep(1);
     55    try
     56      Lock.Acquire;
     57      SendStreamRaw(Stream);
     58    finally
     59      Lock.Release;
     60    end;
     61    MTSleep(1 * OneMillisecond);
    5362  until Stream.Position = Stream.Size;
    5463end;
  • PinConnection/UCommSocket.pas

    r129 r163  
    66
    77uses
    8   Classes, SysUtils, blcksock, UCommPin, UCommon;
     8  Classes, SysUtils, blcksock, UCommPin, UCommon, UMicroThreading,
     9  DateUtils;
    910
    1011type
     
    1516  { TCommSocketReceiveThread }
    1617
    17   TCommSocketReceiveThread = class(TThread)
     18  TCommSocketReceiveThread = class(TMicroThread)
    1819  public
    1920    Parent: TCommSocket;
     
    9394  InBufferUsed := 0;
    9495  with Parent do repeat
    95     try
    96       if InBufferUsed = 0 then Sleep(1);
     96      if InBufferUsed = 0 then MTSleep(1 * OneMillisecond)
     97        else Yield;
    9798      if Assigned(Socket) then
    9899      with Socket do
    99       if CanRead(100) then begin
     100      if CanRead(0) then begin
    100101        InBufferUsed := WaitingData;
    101102        if InBufferUsed > 0 then begin
     
    109110        end else InBufferUsed := 0;
    110111      end else InBufferUsed := 0;
    111     except
    112       on E: Exception do
    113         if Assigned(ExceptionHandler) then ExceptionHandler(Self, E);
    114     end;
    115112  until Terminated;
    116113end;
  • PinConnection/UCommThread.pas

    r129 r163  
    66
    77uses
    8   Classes, SysUtils, blcksock, UCommPin, SyncObjs, UStreamHelper, UCommon;
     8  Classes, SysUtils, blcksock, UCommPin, SyncObjs, UStreamHelper, UCommon,
     9  UMicroThreading, DateUtils;
    910
    1011type
     
    1516  { TCommThreadReceiveThread }
    1617
    17   TCommThreadReceiveThread = class(TThread)
     18  TCommThreadReceiveThread = class(TMicroThread)
    1819  public
    1920    Parent: TCommThread;
     
    3334    FReceiveThread: TCommThreadReceiveThread;
    3435    FInputBuffer: TMemoryStream;
    35     FInputBufferLock: TCriticalSection;
    36     FDataAvailable: TEvent;
     36    FInputBufferLock: TMicroThreadCriticalSection;
     37    FDataAvailable: TMicroThreadEvent;
    3738    procedure ReceiveData(Sender: TCommPin; Stream: TStream);
    3839    procedure ExtReceiveData(Sender: TCommPin; Stream: TStream);
     
    9192  inherited Create;
    9293  FInputBuffer := TMemoryStream.Create;
    93   FInputBufferLock := TCriticalSection.Create;
     94  FInputBufferLock := TMicroThreadCriticalSection.Create;
    9495  Ext := TCommPin.Create;
    9596  Ext.OnReceive := ExtReceiveData;
    9697  Pin := TCommPin.Create;
    9798  Pin.OnReceive := ReceiveData;
    98   FDataAvailable := TEvent.Create(nil, False, False, '');
     99  FDataAvailable := TMicroThreadEvent.Create;
    99100end;
    100101
     
    119120    StreamHelper := TStreamHelper.Create(Stream);
    120121  with Parent do repeat
    121     try
    122       if FDataAvailable.WaitFor(1) = wrSignaled then try
     122      if FDataAvailable.WaitFor(1 * OneMillisecond) = wrSignaled then try
    123123        FInputBufferLock.Acquire;
    124124        Stream.Size := 0;
     
    129129        FInputBufferLock.Release;
    130130      end;
    131     except
    132       on E: Exception do
    133         if Assigned(ExceptionHandler) then ExceptionHandler(Self, E);
    134     end;
     131      Yield;
    135132  until Terminated;
    136133
  • PinConnection/UPacketBurst.pas

    r129 r163  
    66
    77uses
    8   Classes, UCommPin, SyncObjs, UStreamHelper, UCommon, SysUtils;
     8  Classes, UCommPin, SyncObjs, UStreamHelper, UCommon, SysUtils,
     9  UMicroThreading, DateUtils;
    910
    1011type
    1112  TPacketBurst = class;
    1213
    13   TPacketBurstSendThread = class(TThread)
     14  TPacketBurstSendThread = class(TMicroThread)
    1415    PacketBurst: TPacketBurst;
    1516    procedure Execute; override;
    1617  end;
    1718
     19  { TPacketBurst }
     20
    1821  TPacketBurst = class
    1922  private
    20     SendThreadEvent: TEvent;
     23    FActive: Boolean;
     24    SendThreadEvent: TMicroThreadEvent;
    2125    SendThread: TPacketBurstSendThread;
    2226    SendStreamLock: TCriticalSection;
     
    2529    procedure PacketSingleReceive(Sender: TCommPin; Stream: TStream);
    2630    procedure PacketBurstReceive(Sender: TCommPin; Stream: TStream);
     31    procedure SetActive(const AValue: Boolean);
    2732  public
    28     SendPeriod: Integer; // ms
     33    SendPeriod: TDateTime;
    2934    SendBurstSize: Integer;
    3035    PacketSinglePin: TCommPin;
     
    3237    destructor Destroy; override;
    3338    constructor Create;
     39    property Active: Boolean read FActive write SetActive;
    3440  end;
    3541
     
    4450  PacketBurstPin := TCommPin.Create;
    4551  PacketBurstPin.OnReceive := PacketBurstReceive;
    46   SendThread := TPacketBurstSendThread.Create(True);
    47   SendThread.PacketBurst := Self;
    48   SendThread.Start;
     52  SendThread := TMicroThreadEvent.Create;
     53  SendPeriod := OneMillisecond;
    4954end;
    5055
    5156destructor TPacketBurst.Destroy;
    5257begin
    53   SendThread.Free;
     58  Active := False;
     59  SendThreadEvent.Free;
    5460  PacketSinglePin.Free;
    5561  PacketBurstPin.Free;
     
    7985end;
    8086
     87procedure TPacketBurst.SetActive(const AValue: Boolean);
     88begin
     89  if FActive = AValue then Exit;
     90  FActive := AValue;
     91  if AValue then begin
     92    SendThread := TPacketBurstSendThread.Create(True);
     93    SendThread.FreeOnTerminate := False;
     94    SendThread.PacketBurst := Self;
     95    SendThread.Name := 'PacketBurst';
     96    SendThread.Start;
     97  end else begin
     98    FreeAndNil(SendThread);
     99  end;
     100end;
     101
    81102procedure TPacketBurst.PacketSingleReceive(Sender: TCommPin; Stream: TStream);
    82103var
     
    101122  Stream: TStreamHelper;
    102123begin
    103   inherited;
    104   try
    105124  try
    106125    Stream := TStreamHelper.Create;
     
    127146    Stream.Free;
    128147  end;
    129   except
    130     on E: Exception do
    131       if Assigned(ExceptionHandler) then ExceptionHandler(Self, E);
    132   end;
    133148end;
    134149
  • PinConnection/USerialPort.pas

    r129 r163  
    66
    77uses
    8   Classes, SysUtils, SynaSer, StdCtrls, Dialogs, UCommon;
     8  Classes, SysUtils, SynaSer, StdCtrls, Dialogs, UCommon, UMicroThreading,
     9  DateUtils;
    910
    1011type
     
    2223  { TSerialPortReceiveThread }
    2324
    24   TSerialPortReceiveThread = class(TThread)
     25  TSerialPortReceiveThread = class(TMicroThread)
    2526  public
    2627    Parent: TSerialPort;
     
    154155  FReceiveThread.FreeOnTerminate := False;
    155156  FReceiveThread.Parent := Self;
     157  FReceiveThread.Name := 'SerialPort';
    156158  FReceiveThread.Start;
    157159end;
     
    290292  InBufferUsed := 0;
    291293  with Parent do repeat
    292     try
    293       if InBufferUsed = 0 then Sleep(1);
     294      if InBufferUsed = 0 then MTSleep(1 * OneMillisecond)
     295        else Yield;
    294296      if Active then begin
    295297        InBufferUsed := WaitingData;
     
    305307        end else InBufferUsed := 0;
    306308      end else InBufferUsed := 0;
    307     except
    308       on E: Exception do
    309         if Assigned(ExceptionHandler) then ExceptionHandler(Self, E);
    310     end;
    311309  until Terminated;
    312310end;
Note: See TracChangeset for help on using the changeset viewer.