Changeset 163
- Timestamp:
- Feb 8, 2011, 11:02:02 AM (14 years ago)
- Location:
- PinConnection
- Files:
-
- 8 edited
Legend:
- Unmodified
- Added
- Removed
-
PinConnection
- Property svn:ignore
-
old new 1 1 lib 2 backup
-
- Property svn:ignore
-
PinConnection/PinConnection.lpk
r123 r163 6 6 <Author Value="Chronos"/> 7 7 <CompilerOptions> 8 <Version Value=" 9"/>8 <Version Value="10"/> 9 9 <PathDelim Value="\"/> 10 10 <SearchPaths> … … 17 17 <Description Value="System for class bidirectional communication interconnection."/> 18 18 <License Value="GNU/GPL"/> 19 <Version Minor=" 1"/>19 <Version Minor="2"/> 20 20 <Files Count="9"> 21 21 <Item1> … … 57 57 </Files> 58 58 <Type Value="RunAndDesignTime"/> 59 <RequiredPkgs Count=" 4">59 <RequiredPkgs Count="5"> 60 60 <Item1> 61 <PackageName Value=" synapse"/>61 <PackageName Value="MicroThreading"/> 62 62 </Item1> 63 63 <Item2> 64 <PackageName Value=" Common"/>64 <PackageName Value="synapse"/> 65 65 </Item2> 66 66 <Item3> 67 <PackageName Value="Co olStreaming"/>67 <PackageName Value="Common"/> 68 68 </Item3> 69 69 <Item4> 70 <PackageName Value="CoolStreaming"/> 71 </Item4> 72 <Item5> 70 73 <PackageName Value="FCL"/> 71 74 <MinVersion Major="1" Valid="True"/> 72 </Item 4>75 </Item5> 73 76 </RequiredPkgs> 74 77 <UsageOptions> -
PinConnection/UCommProtocol.pas
r129 r163 6 6 7 7 uses 8 Classes, SysUtils, UVarBlockSerializer, syncobjs, UCommPin, 9 UDebugLog, UStreamHelper, StopWatch, SpecializedList, UCommon; 8 Classes, SysUtils, UVarBlockSerializer, syncobjs, UCommPin, UMicroThreading, 9 UDebugLog, UStreamHelper, StopWatch, SpecializedList, UCommon, UPlatform, 10 DateUtils; 10 11 11 12 type … … 25 26 private 26 27 RepeatCounter: integer; 27 ReceiveEvent: T Event;28 ReceiveEvent: TMicroThreadEvent; 28 29 Request: TStreamHelper; 29 30 ResponseParameters: TVarBlockIndexed; 30 31 TransmitTime: TDateTime; 31 StopWatch: TStopWatch;32 32 public 33 Lock: T CriticalSection;33 Lock: TMicroThreadCriticalSection; 34 34 SequenceNumber: Integer; 35 35 ResponseCode: Integer; … … 38 38 Timeouted: Boolean; 39 39 CommandIndex: TListInteger; 40 Latency: Double;40 Latency: TDateTime; 41 41 constructor Create; 42 42 destructor Destroy; override; … … 48 48 SequenceNumber: integer; 49 49 Parent: TCommProtocol; 50 Lock: T CriticalSection;51 function GetFree: TDeviceProtocolSession;50 Lock: TMicroThreadCriticalSection; 51 procedure Add(Session: TDeviceProtocolSession); 52 52 function GetBySequence(Sequence: integer): TDeviceProtocolSession; 53 53 procedure Remove(Session: TDeviceProtocolSession); 54 54 constructor Create; 55 55 destructor Destroy; override; 56 function GetSequenceNumber: Integer; 56 57 end; 57 58 … … 62 63 { TRetransmitCheckThread } 63 64 64 TRetransmitCheckThread = class(T Thread)65 TRetransmitCheckThread = class(TMicroThread) 65 66 public 66 67 Parent: TCommProtocol; … … 94 95 Pin: TCommPin; 95 96 LastCommandResponseTime: TDateTime; 96 LastLatency: Double;97 LastLatency: TDateTime; 97 98 procedure SendCommand(Command: array of integer; 98 99 ResponseParameters: TVarBlockIndexed = nil; … … 149 150 if TestIndex(4) then CommandError := ReadVarUInt(4) 150 151 else CommandError := 0; 151 StopWatch.Stop; 152 Latency := StopWatch.ElapsedMiliseconds; 152 Latency := NowPrecise - TransmitTime; 153 153 ReceiveEvent.SetEvent; 154 154 finally … … 254 254 Parent := Self; 255 255 FreeOnTerminate := False; 256 Name := 'CommProtocol'; 256 257 Start; 257 258 end; … … 268 269 NewRequest: TVarBlockIndexed; 269 270 begin 270 Session := Sessions.GetFree;271 271 try 272 Session := TDeviceProtocolSession.Create; 273 Sessions.Add(Session); 272 274 NewRequest := TVarBlockIndexed.Create; 273 275 … … 276 278 try 277 279 Session.Lock.Acquire; 280 Session.SequenceNumber := Sessions.GetSequenceNumber; 278 281 CommandIndex.Clear; 279 282 CommandIndex.AddArray(Command); … … 290 293 291 294 // Wait for free remote buffer 292 repeat 293 Sleep(1); 294 until (RemoteBufferUsed + Request.Size) <= RemoteBufferSize; 295 296 StopWatch.Start; 297 TransmitTime := Now; 295 while (RemoteBufferUsed + Request.Size) > RemoteBufferSize do 296 MTSleep(1 * OneMillisecond); 297 298 //StopWatch.Start; 299 TransmitTime := NowPrecise; 298 300 Pin.Send(Request); 299 301 finally … … 301 303 end; 302 304 try 303 RemoteBufferUsed := RemoteBufferUsed + Request.Size; 304 while ReceiveEvent.WaitFor(10) = wrTimeout do begin 305 try 306 Sessions.Lock.Acquire; 307 RemoteBufferUsed := RemoteBufferUsed + Request.Size; 308 finally 309 Sessions.Lock.Release; 310 end; 311 while MTWaitForEvent(ReceiveEvent, 10 * OneMillisecond) = wrTimeout do begin 305 312 if Timeouted then 306 313 raise ECommTimeout.Create(SResponseTimeout); … … 311 318 raise ECommResponseCodeError.Create(Format(SResponseError, [CommandIndex.Implode('.', IntToStr), IntToStr(ResponseCode)])); 312 319 end; 313 LastCommandResponseTime := Now ;320 LastCommandResponseTime := NowPrecise; 314 321 LastLatency := Latency; 315 322 finally 316 RemoteBufferUsed := RemoteBufferUsed - Session.Request.Size; 317 if RemoteBufferUsed < 0 then RemoteBufferUsed := 0; 323 try 324 Sessions.Lock.Acquire; 325 RemoteBufferUsed := RemoteBufferUsed - Session.Request.Size; 326 if RemoteBufferUsed < 0 then RemoteBufferUsed := 0; 327 finally 328 Sessions.Lock.Release; 329 end; 318 330 Sessions.Remove(Session); 319 331 end; … … 332 344 MaxSessionCount := 10; 333 345 MaxSequenceNumber := 127; 334 RetransmitTimeout := 1 / 24 / 3600 * 1;346 RetransmitTimeout := 2 * OneSecond; 335 347 RetransmitRepeatCount := 3; 336 348 RetransmitTotalCount := 0; … … 350 362 constructor TDeviceProtocolSession.Create; 351 363 begin 352 StopWatch := TStopWatch.Create;353 364 ResponseCode := 0; 354 Lock := TCriticalSection.Create; 355 ReceiveEvent := TEvent.Create(nil, False, False, ''); 365 Lock := TMicroThreadCriticalSection.Create; 366 ReceiveEvent := TMicroThreadEvent.Create; 367 ReceiveEvent.AutoReset := False; 356 368 Request := TStreamHelper.Create; 357 369 ResponseParameters := nil; 358 370 CommandIndex := TListInteger.Create; 359 371 Latency := 0; 372 TransmitTime := 0; 360 373 end; 361 374 … … 366 379 ReceiveEvent.Free; 367 380 Lock.Free; 368 StopWatch.Free;369 381 inherited Destroy; 370 382 end; … … 372 384 { TDeviceProtocolSessionList } 373 385 374 function TDeviceProtocolSessionList.GetFree: TDeviceProtocolSession;386 procedure TDeviceProtocolSessionList.Add(Session: TDeviceProtocolSession); 375 387 begin 376 388 // Block if no free session available 377 Lock.Acquire;378 389 try 390 Lock.Acquire; 379 391 while Count >= Parent.MaxSessionCount do 380 392 begin 381 Lock.Release; 382 Sleep(1); 383 Lock.Acquire; 384 end; 385 Result := TDeviceProtocolSession.Create; 386 Result.SequenceNumber := SequenceNumber; 387 Inc(SequenceNumber); 388 if SequenceNumber > Parent.MaxSequenceNumber then 389 SequenceNumber := 0; 390 Add(Result); 393 try 394 Lock.Release; 395 MTSleep(1 * OneMillisecond); 396 finally 397 Lock.Acquire; 398 end; 399 end; 400 inherited Add(Session); 391 401 finally 392 402 Lock.Release; … … 421 431 begin 422 432 inherited Create; 423 Lock := T CriticalSection.Create;433 Lock := TMicroThreadCriticalSection.Create; 424 434 end; 425 435 … … 428 438 // Free session list before freeing Lock 429 439 // instead of freeing in inherited Destroy in TListObject 430 Lock.Acquire; 431 Clear; 432 Lock.Release; 440 try 441 Lock.Acquire; 442 Clear; 443 finally 444 Lock.Release; 445 end; 433 446 434 447 Lock.Free; … … 436 449 end; 437 450 451 function TDeviceProtocolSessionList.GetSequenceNumber: Integer; 452 begin 453 try 454 Lock.Acquire; 455 Inc(SequenceNumber); 456 if SequenceNumber > Parent.MaxSequenceNumber then 457 SequenceNumber := 0; 458 Result := SequenceNumber; 459 finally 460 Lock.Release; 461 end; 462 end; 463 438 464 { TRetransmitCheckThread } 439 465 440 466 procedure TRetransmitCheckThread.Execute; 441 467 var 442 I: integer; 443 begin 444 with Parent do repeat 445 try 446 try 447 Parent.Sessions.Lock.Acquire; 448 for I := 0 to Sessions.Count - 1 do begin 449 with TDeviceProtocolSession(Sessions[I]) do begin 450 try 451 Lock.Acquire; 452 if Now > (TransmitTime + RetransmitTimeout) then begin 453 if RepeatCounter < RetransmitRepeatCount then begin 454 Pin.Send(Request); 455 StopWatch.Start; 456 TransmitTime := Now; 457 Inc(RepeatCounter); 458 Inc(RetransmitTotalCount); 459 end else 460 Timeouted := True; 461 end; 462 finally 463 Lock.Release; 464 end; 468 I: Integer; 469 C: Integer; 470 Session: TDeviceProtocolSession; 471 begin 472 with Parent do 473 repeat 474 try 475 Parent.Sessions.Lock.Acquire; 476 I := 0; 477 while I < Sessions.Count do begin 478 Session := TDeviceProtocolSession(Sessions[I]); 479 with TDeviceProtocolSession(Sessions[I]) do begin 480 try 481 Session.Lock.Acquire; 482 if (TransmitTime > 0) and (NowPrecise > (TransmitTime + RetransmitTimeout)) then begin 483 if RepeatCounter < RetransmitRepeatCount then begin 484 Pin.Send(Request); 485 TransmitTime := NowPrecise; 486 Inc(RepeatCounter); 487 Inc(RetransmitTotalCount); 488 end else 489 Timeouted := True; 465 490 end; 491 finally 492 Session.Lock.Release; 466 493 end; 467 finally 468 Parent.Sessions.Lock.Release; 469 end; 470 471 if not Terminated then 472 Sleep(CheckPeriod); 473 except 474 on E: Exception do begin 475 if Assigned(ExceptionHandler) then ExceptionHandler(Self, E); 476 end; 494 end; 495 Inc(I); 477 496 end; 478 until Terminated; 497 finally 498 Parent.Sessions.Lock.Release; 499 end; 500 501 if not Terminated then 502 MTSleep(CheckPeriod * OneMillisecond); 503 until Terminated; 479 504 end; 480 505 -
PinConnection/UCommSerialPort.pas
r119 r163 6 6 7 7 uses 8 Classes, USerialPort, UCommPin, SysUtils; 8 Classes, USerialPort, UCommPin, SysUtils, UMicroThreading, DateUtils, 9 SyncObjs; 9 10 10 11 type … … 14 15 procedure ReceiveData(Stream: TMemoryStream); 15 16 public 17 Lock: TMicroThreadCriticalSection; 16 18 Pin: TCommPin; 17 19 destructor Destroy; override; … … 33 35 begin 34 36 inherited; 37 Lock := TMicroThreadCriticalSection.Create; 35 38 Pin := TCommPin.Create; 36 39 Pin.OnReceive := Receive; … … 42 45 OnReceiveData := nil; 43 46 Pin.Free; 47 Lock.Free; 44 48 inherited; 45 49 end; … … 49 53 Stream.Position := 0; 50 54 repeat 51 SendStreamRaw(Stream); 52 Sleep(1); 55 try 56 Lock.Acquire; 57 SendStreamRaw(Stream); 58 finally 59 Lock.Release; 60 end; 61 MTSleep(1 * OneMillisecond); 53 62 until Stream.Position = Stream.Size; 54 63 end; -
PinConnection/UCommSocket.pas
r129 r163 6 6 7 7 uses 8 Classes, SysUtils, blcksock, UCommPin, UCommon; 8 Classes, SysUtils, blcksock, UCommPin, UCommon, UMicroThreading, 9 DateUtils; 9 10 10 11 type … … 15 16 { TCommSocketReceiveThread } 16 17 17 TCommSocketReceiveThread = class(T Thread)18 TCommSocketReceiveThread = class(TMicroThread) 18 19 public 19 20 Parent: TCommSocket; … … 93 94 InBufferUsed := 0; 94 95 with Parent do repeat 95 try96 if InBufferUsed = 0 then Sleep(1);96 if InBufferUsed = 0 then MTSleep(1 * OneMillisecond) 97 else Yield; 97 98 if Assigned(Socket) then 98 99 with Socket do 99 if CanRead( 100) then begin100 if CanRead(0) then begin 100 101 InBufferUsed := WaitingData; 101 102 if InBufferUsed > 0 then begin … … 109 110 end else InBufferUsed := 0; 110 111 end else InBufferUsed := 0; 111 except112 on E: Exception do113 if Assigned(ExceptionHandler) then ExceptionHandler(Self, E);114 end;115 112 until Terminated; 116 113 end; -
PinConnection/UCommThread.pas
r129 r163 6 6 7 7 uses 8 Classes, SysUtils, blcksock, UCommPin, SyncObjs, UStreamHelper, UCommon; 8 Classes, SysUtils, blcksock, UCommPin, SyncObjs, UStreamHelper, UCommon, 9 UMicroThreading, DateUtils; 9 10 10 11 type … … 15 16 { TCommThreadReceiveThread } 16 17 17 TCommThreadReceiveThread = class(T Thread)18 TCommThreadReceiveThread = class(TMicroThread) 18 19 public 19 20 Parent: TCommThread; … … 33 34 FReceiveThread: TCommThreadReceiveThread; 34 35 FInputBuffer: TMemoryStream; 35 FInputBufferLock: T CriticalSection;36 FDataAvailable: T Event;36 FInputBufferLock: TMicroThreadCriticalSection; 37 FDataAvailable: TMicroThreadEvent; 37 38 procedure ReceiveData(Sender: TCommPin; Stream: TStream); 38 39 procedure ExtReceiveData(Sender: TCommPin; Stream: TStream); … … 91 92 inherited Create; 92 93 FInputBuffer := TMemoryStream.Create; 93 FInputBufferLock := T CriticalSection.Create;94 FInputBufferLock := TMicroThreadCriticalSection.Create; 94 95 Ext := TCommPin.Create; 95 96 Ext.OnReceive := ExtReceiveData; 96 97 Pin := TCommPin.Create; 97 98 Pin.OnReceive := ReceiveData; 98 FDataAvailable := T Event.Create(nil, False, False, '');99 FDataAvailable := TMicroThreadEvent.Create; 99 100 end; 100 101 … … 119 120 StreamHelper := TStreamHelper.Create(Stream); 120 121 with Parent do repeat 121 try 122 if FDataAvailable.WaitFor(1) = wrSignaled then try 122 if FDataAvailable.WaitFor(1 * OneMillisecond) = wrSignaled then try 123 123 FInputBufferLock.Acquire; 124 124 Stream.Size := 0; … … 129 129 FInputBufferLock.Release; 130 130 end; 131 except 132 on E: Exception do 133 if Assigned(ExceptionHandler) then ExceptionHandler(Self, E); 134 end; 131 Yield; 135 132 until Terminated; 136 133 -
PinConnection/UPacketBurst.pas
r129 r163 6 6 7 7 uses 8 Classes, UCommPin, SyncObjs, UStreamHelper, UCommon, SysUtils; 8 Classes, UCommPin, SyncObjs, UStreamHelper, UCommon, SysUtils, 9 UMicroThreading, DateUtils; 9 10 10 11 type 11 12 TPacketBurst = class; 12 13 13 TPacketBurstSendThread = class(T Thread)14 TPacketBurstSendThread = class(TMicroThread) 14 15 PacketBurst: TPacketBurst; 15 16 procedure Execute; override; 16 17 end; 17 18 19 { TPacketBurst } 20 18 21 TPacketBurst = class 19 22 private 20 SendThreadEvent: TEvent; 23 FActive: Boolean; 24 SendThreadEvent: TMicroThreadEvent; 21 25 SendThread: TPacketBurstSendThread; 22 26 SendStreamLock: TCriticalSection; … … 25 29 procedure PacketSingleReceive(Sender: TCommPin; Stream: TStream); 26 30 procedure PacketBurstReceive(Sender: TCommPin; Stream: TStream); 31 procedure SetActive(const AValue: Boolean); 27 32 public 28 SendPeriod: Integer; // ms33 SendPeriod: TDateTime; 29 34 SendBurstSize: Integer; 30 35 PacketSinglePin: TCommPin; … … 32 37 destructor Destroy; override; 33 38 constructor Create; 39 property Active: Boolean read FActive write SetActive; 34 40 end; 35 41 … … 44 50 PacketBurstPin := TCommPin.Create; 45 51 PacketBurstPin.OnReceive := PacketBurstReceive; 46 SendThread := TPacketBurstSendThread.Create(True); 47 SendThread.PacketBurst := Self; 48 SendThread.Start; 52 SendThread := TMicroThreadEvent.Create; 53 SendPeriod := OneMillisecond; 49 54 end; 50 55 51 56 destructor TPacketBurst.Destroy; 52 57 begin 53 SendThread.Free; 58 Active := False; 59 SendThreadEvent.Free; 54 60 PacketSinglePin.Free; 55 61 PacketBurstPin.Free; … … 79 85 end; 80 86 87 procedure TPacketBurst.SetActive(const AValue: Boolean); 88 begin 89 if FActive = AValue then Exit; 90 FActive := AValue; 91 if AValue then begin 92 SendThread := TPacketBurstSendThread.Create(True); 93 SendThread.FreeOnTerminate := False; 94 SendThread.PacketBurst := Self; 95 SendThread.Name := 'PacketBurst'; 96 SendThread.Start; 97 end else begin 98 FreeAndNil(SendThread); 99 end; 100 end; 101 81 102 procedure TPacketBurst.PacketSingleReceive(Sender: TCommPin; Stream: TStream); 82 103 var … … 101 122 Stream: TStreamHelper; 102 123 begin 103 inherited;104 try105 124 try 106 125 Stream := TStreamHelper.Create; … … 127 146 Stream.Free; 128 147 end; 129 except130 on E: Exception do131 if Assigned(ExceptionHandler) then ExceptionHandler(Self, E);132 end;133 148 end; 134 149 -
PinConnection/USerialPort.pas
r129 r163 6 6 7 7 uses 8 Classes, SysUtils, SynaSer, StdCtrls, Dialogs, UCommon; 8 Classes, SysUtils, SynaSer, StdCtrls, Dialogs, UCommon, UMicroThreading, 9 DateUtils; 9 10 10 11 type … … 22 23 { TSerialPortReceiveThread } 23 24 24 TSerialPortReceiveThread = class(T Thread)25 TSerialPortReceiveThread = class(TMicroThread) 25 26 public 26 27 Parent: TSerialPort; … … 154 155 FReceiveThread.FreeOnTerminate := False; 155 156 FReceiveThread.Parent := Self; 157 FReceiveThread.Name := 'SerialPort'; 156 158 FReceiveThread.Start; 157 159 end; … … 290 292 InBufferUsed := 0; 291 293 with Parent do repeat 292 try293 if InBufferUsed = 0 then Sleep(1);294 if InBufferUsed = 0 then MTSleep(1 * OneMillisecond) 295 else Yield; 294 296 if Active then begin 295 297 InBufferUsed := WaitingData; … … 305 307 end else InBufferUsed := 0; 306 308 end else InBufferUsed := 0; 307 except308 on E: Exception do309 if Assigned(ExceptionHandler) then ExceptionHandler(Self, E);310 end;311 309 until Terminated; 312 310 end;
Note:
See TracChangeset
for help on using the changeset viewer.