Changeset 163 for PinConnection/UCommProtocol.pas
- Timestamp:
- Feb 8, 2011, 11:02:02 AM (13 years ago)
- Location:
- PinConnection
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
PinConnection
- Property svn:ignore
-
old new 1 1 lib 2 backup
-
- Property svn:ignore
-
PinConnection/UCommProtocol.pas
r129 r163 6 6 7 7 uses 8 Classes, SysUtils, UVarBlockSerializer, syncobjs, UCommPin, 9 UDebugLog, UStreamHelper, StopWatch, SpecializedList, UCommon; 8 Classes, SysUtils, UVarBlockSerializer, syncobjs, UCommPin, UMicroThreading, 9 UDebugLog, UStreamHelper, StopWatch, SpecializedList, UCommon, UPlatform, 10 DateUtils; 10 11 11 12 type … … 25 26 private 26 27 RepeatCounter: integer; 27 ReceiveEvent: T Event;28 ReceiveEvent: TMicroThreadEvent; 28 29 Request: TStreamHelper; 29 30 ResponseParameters: TVarBlockIndexed; 30 31 TransmitTime: TDateTime; 31 StopWatch: TStopWatch;32 32 public 33 Lock: T CriticalSection;33 Lock: TMicroThreadCriticalSection; 34 34 SequenceNumber: Integer; 35 35 ResponseCode: Integer; … … 38 38 Timeouted: Boolean; 39 39 CommandIndex: TListInteger; 40 Latency: Double;40 Latency: TDateTime; 41 41 constructor Create; 42 42 destructor Destroy; override; … … 48 48 SequenceNumber: integer; 49 49 Parent: TCommProtocol; 50 Lock: T CriticalSection;51 function GetFree: TDeviceProtocolSession;50 Lock: TMicroThreadCriticalSection; 51 procedure Add(Session: TDeviceProtocolSession); 52 52 function GetBySequence(Sequence: integer): TDeviceProtocolSession; 53 53 procedure Remove(Session: TDeviceProtocolSession); 54 54 constructor Create; 55 55 destructor Destroy; override; 56 function GetSequenceNumber: Integer; 56 57 end; 57 58 … … 62 63 { TRetransmitCheckThread } 63 64 64 TRetransmitCheckThread = class(T Thread)65 TRetransmitCheckThread = class(TMicroThread) 65 66 public 66 67 Parent: TCommProtocol; … … 94 95 Pin: TCommPin; 95 96 LastCommandResponseTime: TDateTime; 96 LastLatency: Double;97 LastLatency: TDateTime; 97 98 procedure SendCommand(Command: array of integer; 98 99 ResponseParameters: TVarBlockIndexed = nil; … … 149 150 if TestIndex(4) then CommandError := ReadVarUInt(4) 150 151 else CommandError := 0; 151 StopWatch.Stop; 152 Latency := StopWatch.ElapsedMiliseconds; 152 Latency := NowPrecise - TransmitTime; 153 153 ReceiveEvent.SetEvent; 154 154 finally … … 254 254 Parent := Self; 255 255 FreeOnTerminate := False; 256 Name := 'CommProtocol'; 256 257 Start; 257 258 end; … … 268 269 NewRequest: TVarBlockIndexed; 269 270 begin 270 Session := Sessions.GetFree;271 271 try 272 Session := TDeviceProtocolSession.Create; 273 Sessions.Add(Session); 272 274 NewRequest := TVarBlockIndexed.Create; 273 275 … … 276 278 try 277 279 Session.Lock.Acquire; 280 Session.SequenceNumber := Sessions.GetSequenceNumber; 278 281 CommandIndex.Clear; 279 282 CommandIndex.AddArray(Command); … … 290 293 291 294 // Wait for free remote buffer 292 repeat 293 Sleep(1); 294 until (RemoteBufferUsed + Request.Size) <= RemoteBufferSize; 295 296 StopWatch.Start; 297 TransmitTime := Now; 295 while (RemoteBufferUsed + Request.Size) > RemoteBufferSize do 296 MTSleep(1 * OneMillisecond); 297 298 //StopWatch.Start; 299 TransmitTime := NowPrecise; 298 300 Pin.Send(Request); 299 301 finally … … 301 303 end; 302 304 try 303 RemoteBufferUsed := RemoteBufferUsed + Request.Size; 304 while ReceiveEvent.WaitFor(10) = wrTimeout do begin 305 try 306 Sessions.Lock.Acquire; 307 RemoteBufferUsed := RemoteBufferUsed + Request.Size; 308 finally 309 Sessions.Lock.Release; 310 end; 311 while MTWaitForEvent(ReceiveEvent, 10 * OneMillisecond) = wrTimeout do begin 305 312 if Timeouted then 306 313 raise ECommTimeout.Create(SResponseTimeout); … … 311 318 raise ECommResponseCodeError.Create(Format(SResponseError, [CommandIndex.Implode('.', IntToStr), IntToStr(ResponseCode)])); 312 319 end; 313 LastCommandResponseTime := Now ;320 LastCommandResponseTime := NowPrecise; 314 321 LastLatency := Latency; 315 322 finally 316 RemoteBufferUsed := RemoteBufferUsed - Session.Request.Size; 317 if RemoteBufferUsed < 0 then RemoteBufferUsed := 0; 323 try 324 Sessions.Lock.Acquire; 325 RemoteBufferUsed := RemoteBufferUsed - Session.Request.Size; 326 if RemoteBufferUsed < 0 then RemoteBufferUsed := 0; 327 finally 328 Sessions.Lock.Release; 329 end; 318 330 Sessions.Remove(Session); 319 331 end; … … 332 344 MaxSessionCount := 10; 333 345 MaxSequenceNumber := 127; 334 RetransmitTimeout := 1 / 24 / 3600 * 1;346 RetransmitTimeout := 2 * OneSecond; 335 347 RetransmitRepeatCount := 3; 336 348 RetransmitTotalCount := 0; … … 350 362 constructor TDeviceProtocolSession.Create; 351 363 begin 352 StopWatch := TStopWatch.Create;353 364 ResponseCode := 0; 354 Lock := TCriticalSection.Create; 355 ReceiveEvent := TEvent.Create(nil, False, False, ''); 365 Lock := TMicroThreadCriticalSection.Create; 366 ReceiveEvent := TMicroThreadEvent.Create; 367 ReceiveEvent.AutoReset := False; 356 368 Request := TStreamHelper.Create; 357 369 ResponseParameters := nil; 358 370 CommandIndex := TListInteger.Create; 359 371 Latency := 0; 372 TransmitTime := 0; 360 373 end; 361 374 … … 366 379 ReceiveEvent.Free; 367 380 Lock.Free; 368 StopWatch.Free;369 381 inherited Destroy; 370 382 end; … … 372 384 { TDeviceProtocolSessionList } 373 385 374 function TDeviceProtocolSessionList.GetFree: TDeviceProtocolSession;386 procedure TDeviceProtocolSessionList.Add(Session: TDeviceProtocolSession); 375 387 begin 376 388 // Block if no free session available 377 Lock.Acquire;378 389 try 390 Lock.Acquire; 379 391 while Count >= Parent.MaxSessionCount do 380 392 begin 381 Lock.Release; 382 Sleep(1); 383 Lock.Acquire; 384 end; 385 Result := TDeviceProtocolSession.Create; 386 Result.SequenceNumber := SequenceNumber; 387 Inc(SequenceNumber); 388 if SequenceNumber > Parent.MaxSequenceNumber then 389 SequenceNumber := 0; 390 Add(Result); 393 try 394 Lock.Release; 395 MTSleep(1 * OneMillisecond); 396 finally 397 Lock.Acquire; 398 end; 399 end; 400 inherited Add(Session); 391 401 finally 392 402 Lock.Release; … … 421 431 begin 422 432 inherited Create; 423 Lock := T CriticalSection.Create;433 Lock := TMicroThreadCriticalSection.Create; 424 434 end; 425 435 … … 428 438 // Free session list before freeing Lock 429 439 // instead of freeing in inherited Destroy in TListObject 430 Lock.Acquire; 431 Clear; 432 Lock.Release; 440 try 441 Lock.Acquire; 442 Clear; 443 finally 444 Lock.Release; 445 end; 433 446 434 447 Lock.Free; … … 436 449 end; 437 450 451 function TDeviceProtocolSessionList.GetSequenceNumber: Integer; 452 begin 453 try 454 Lock.Acquire; 455 Inc(SequenceNumber); 456 if SequenceNumber > Parent.MaxSequenceNumber then 457 SequenceNumber := 0; 458 Result := SequenceNumber; 459 finally 460 Lock.Release; 461 end; 462 end; 463 438 464 { TRetransmitCheckThread } 439 465 440 466 procedure TRetransmitCheckThread.Execute; 441 467 var 442 I: integer; 443 begin 444 with Parent do repeat 445 try 446 try 447 Parent.Sessions.Lock.Acquire; 448 for I := 0 to Sessions.Count - 1 do begin 449 with TDeviceProtocolSession(Sessions[I]) do begin 450 try 451 Lock.Acquire; 452 if Now > (TransmitTime + RetransmitTimeout) then begin 453 if RepeatCounter < RetransmitRepeatCount then begin 454 Pin.Send(Request); 455 StopWatch.Start; 456 TransmitTime := Now; 457 Inc(RepeatCounter); 458 Inc(RetransmitTotalCount); 459 end else 460 Timeouted := True; 461 end; 462 finally 463 Lock.Release; 464 end; 468 I: Integer; 469 C: Integer; 470 Session: TDeviceProtocolSession; 471 begin 472 with Parent do 473 repeat 474 try 475 Parent.Sessions.Lock.Acquire; 476 I := 0; 477 while I < Sessions.Count do begin 478 Session := TDeviceProtocolSession(Sessions[I]); 479 with TDeviceProtocolSession(Sessions[I]) do begin 480 try 481 Session.Lock.Acquire; 482 if (TransmitTime > 0) and (NowPrecise > (TransmitTime + RetransmitTimeout)) then begin 483 if RepeatCounter < RetransmitRepeatCount then begin 484 Pin.Send(Request); 485 TransmitTime := NowPrecise; 486 Inc(RepeatCounter); 487 Inc(RetransmitTotalCount); 488 end else 489 Timeouted := True; 465 490 end; 491 finally 492 Session.Lock.Release; 466 493 end; 467 finally 468 Parent.Sessions.Lock.Release; 469 end; 470 471 if not Terminated then 472 Sleep(CheckPeriod); 473 except 474 on E: Exception do begin 475 if Assigned(ExceptionHandler) then ExceptionHandler(Self, E); 476 end; 494 end; 495 Inc(I); 477 496 end; 478 until Terminated; 497 finally 498 Parent.Sessions.Lock.Release; 499 end; 500 501 if not Terminated then 502 MTSleep(CheckPeriod * OneMillisecond); 503 until Terminated; 479 504 end; 480 505
Note:
See TracChangeset
for help on using the changeset viewer.