source: PinConnection/Protocols/CommProtocol1.pas

Last change on this file was 575, checked in by chronos, 5 months ago
  • Modified: Remove U prefix from unit names of PinConnection package.
File size: 16.8 KB
Line 
1unit CommProtocol1;
2
3interface
4
5uses
6 Classes, SysUtils, VarBlockSerializer, syncobjs, CommPin, Threading,
7 DebugLog, StreamHelper, StopWatch, SpecializedList, Common, DateUtils;
8
9type
10 ECommResponseCodeError = class(Exception);
11 ECommTimeout = class(Exception);
12 ECommError = class(Exception);
13 ENotActive = class(Exception);
14
15 TResponseError = (rcNone, rcCommandNotSupported, rcSequenceOutOfRange,
16 rcEWrongParameters, rcVarIntDecode, rcDropped);
17 TMessageType = (mtNone, mtRequest, mtResponse);
18
19 TCommProtocol = class;
20
21 { TDeviceProtocolSession }
22
23 TDeviceProtocolSession = class
24 private
25 RepeatCounter: integer;
26 ReceiveEvent: TSimpleEvent;
27 Request: TStreamHelper;
28 ResponseParameters: TVarBlockIndexed;
29 TransmitTime: TDateTime;
30 public
31 Lock: TCriticalSection;
32 Enabled: Boolean;
33 SequenceNumber: Integer;
34 ResponseCode: Integer;
35 CommandError: Integer;
36 RaiseError: Boolean;
37 Timeouted: Boolean;
38 CommandIndex: TListInteger;
39 Latency: TDateTime;
40 constructor Create;
41 destructor Destroy; override;
42 end;
43
44 { TDeviceProtocolSessionList }
45
46 TDeviceProtocolSessionList = class(TListObject)
47 private
48 function GetSequenceNumber: Integer;
49 public
50 SequenceNumber: integer;
51 MaxSequenceNumber: integer;
52 MaxSessionCount: integer;
53 Parent: TCommProtocol;
54 Lock: TCriticalSection;
55 procedure Add(Session: TDeviceProtocolSession);
56 function GetBySequence(Sequence: integer): TDeviceProtocolSession;
57 procedure Remove(Session: TDeviceProtocolSession);
58 constructor Create;
59 destructor Destroy; override;
60 procedure Assign(Source: TListObject);
61 end;
62
63 TAfterRequest = procedure(Command: TListInteger; Parameters: TVarBlockIndexed;
64 Result: TVarBlockIndexed; var ResponseError: TResponseError;
65 var CommandError: integer) of object;
66
67 { TRetransmitCheckThread }
68
69 TRetransmitCheckThread = class(TTermThread)
70 public
71 Parent: TCommProtocol;
72 CheckPeriod: Integer;
73 procedure Execute; override;
74 end;
75
76 { TRemoteBuffer }
77
78 TRemoteBuffer = class
79 Lock: TCriticalSection;
80 Size: Integer;
81 Used: Integer;
82 procedure Allocate(AValue: Integer);
83 procedure Release(AValue: Integer);
84 procedure Assign(Source: TRemoteBuffer);
85 constructor Create;
86 destructor Destroy; override;
87 end;
88
89 { TCommProtocol }
90
91 TCommProtocol = class
92 private
93 FActive: Boolean;
94 FOnAfterRequest: TAfterRequest;
95 FOnCommand: TAfterRequest;
96 FOnDebugLog: TDebugLogAddEvent;
97 OnAfterRequest: TAfterRequest;
98 RetransmitThread: TRetransmitCheckThread;
99 procedure HandleRequest(Stream: TStream);
100 procedure SetActive(const AValue: Boolean);
101 public
102 RetransmitTimeout: TDateTime;
103 RetransmitRepeatCount: integer;
104 RetransmitTotalCount: integer;
105 RemoteBuffer: TRemoteBuffer;
106 WrongSequenceCount: integer;
107 Sessions: TDeviceProtocolSessionList;
108 Pin: TCommPin;
109 LastCommandResponseTime: TDateTime;
110 LastLatency: TDateTime;
111 Lock: TCriticalSection;
112 procedure DataReceive(Sender: TCommPin; Stream: TStream); virtual;
113 procedure SendCommand(Command: array of integer;
114 ResponseParameters: TVarBlockIndexed = nil;
115 RequestParameters: TVarBlockIndexed = nil; ARaiseError: boolean = True);
116 constructor Create; virtual;
117 destructor Destroy; override;
118 procedure Assign(Source: TCommProtocol); virtual;
119 property OnAfterRequest: TAfterRequest read FOnAfterRequest write FOnAfterRequest;
120 property OnCommand: TAfterRequest read FOnCommand write FOnCommand;
121 property OnDebugLog: TDebugLogAddEvent read FOnDebugLog write FOnDebugLog;
122 property Active: Boolean read FActive write SetActive;
123 end;
124
125resourcestring
126 SRemoteBufferInconsistency = 'Remote buffer inconsistency';
127 SResponseError = 'Command %0:s response error %1:s';
128 SResponseTimeout = 'Response timeout';
129 SWrongSequenceCount = 'Receive wrong sequence number %d';
130 SDeviceProtocol = 'Device protocol';
131 SProtocolDecodeError = 'Data decode error';
132 SProtocolNotActive = 'Device protocol not active';
133
134
135implementation
136
137{ TRemoteBuffer }
138
139procedure TRemoteBuffer.Allocate(AValue: Integer);
140begin
141 try
142 Lock.Acquire;
143
144 // Wait for free remote buffer
145 while (Used + AValue) > Size do
146 try
147 Lock.Release;
148 Sleep(1);
149 finally
150 Lock.Acquire;
151 end;
152 Used := Used + AValue;
153 finally
154 Lock.Release;
155 end;
156end;
157
158procedure TRemoteBuffer.Release(AValue: Integer);
159begin
160 try
161 Lock.Acquire;
162 Used := Used - AValue;
163 if Used < 0 then
164 raise Exception.Create(SRemoteBufferInconsistency);
165 finally
166 Lock.Release;
167 end;
168end;
169
170procedure TRemoteBuffer.Assign(Source: TRemoteBuffer);
171begin
172 Used := Source.Used;
173 Size := Source.Size;
174end;
175
176constructor TRemoteBuffer.Create;
177begin
178 Lock := TCriticalSection.Create;
179 Size := 127;
180 Used := 0;
181end;
182
183destructor TRemoteBuffer.Destroy;
184begin
185 Lock.Free;
186 inherited Destroy;
187end;
188
189
190procedure TCommProtocol.DataReceive(Sender: TCommPin; Stream: TStream);
191var
192 ResponseSequenceNumber: Integer;
193 Session: TDeviceProtocolSession;
194 MessageType: Integer;
195 Request: TVarBlockIndexed;
196 TempStream: TMemoryStream;
197begin
198 try
199 TempStream := TMemoryStream.Create;
200 Request := TVarBlockIndexed.Create;
201 Request.Enclose := False;
202 with Request do
203 try
204 Stream.Position := 0;
205 ReadFromStream(Stream);
206 if TestIndex(0) then
207 MessageType := ReadVarUInt(0);
208 if MessageType = Integer(mtResponse) then begin
209 if TestIndex(1) then begin
210 ResponseSequenceNumber := ReadVarUInt(1);
211 try
212 Sessions.Lock.Acquire;
213 Session := Sessions.GetBySequence(ResponseSequenceNumber);
214 if Assigned(Session) then begin
215 with Session do try
216 Session.Lock.Acquire;
217 if TestIndex(2) and Assigned(ResponseParameters) then begin
218 ReadVarStream(2, TempStream);
219 ResponseParameters.Enclose := False;
220 ResponseParameters.ReadFromStream(TempStream);
221 end;
222 if TestIndex(3) then ResponseCode := ReadVarUInt(3)
223 else ResponseCode := 0;
224 if TestIndex(4) then CommandError := ReadVarUInt(4)
225 else CommandError := 0;
226 Latency := Now - TransmitTime;
227 Enabled := False;
228 ReceiveEvent.SetEvent;
229 finally
230 Session.Lock.Release;
231 end;
232 end else begin
233 Inc(WrongSequenceCount);
234 if Assigned(FOnDebugLog) then
235 FOnDebugLog(SDeviceProtocol, Format(SWrongSequenceCount, [ResponseSequenceNumber]));
236 end;
237 finally
238 Sessions.Lock.Release;
239 end;
240 end;
241 end else
242 if MessageType = Integer(mtRequest) then HandleRequest(Stream);
243 except
244 on EReadError do begin
245 if Assigned(FOnDebugLog) then
246 FOnDebugLog(SDeviceProtocol, SProtocolDecodeError);
247 end;
248 end;
249 finally
250 TempStream.Free;
251 Request.Free;
252 end;
253end;
254
255procedure TCommProtocol.HandleRequest(Stream: TStream);
256var
257 ResponseCode: TResponseError;
258 Response: TVarBlockIndexed;
259 ResponseData: TVarBlockIndexed;
260 RequestData: TVarBlockIndexed;
261 CommandIndex: TListInteger;
262 CommandStream: TVarBlockIndexed;
263 SequenceNumber: integer;
264 CommandError: integer;
265 MessageType: integer;
266 Command: TVarBlockIndexed;
267 TempStream: TMemoryStream;
268begin
269 try
270 TempStream := TMemoryStream.Create;
271 Command := TVarBlockIndexed.Create;
272 CommandIndex := TListInteger.Create;
273 Response := TVarBlockIndexed.Create;
274 ResponseData := TVarBlockIndexed.Create;
275 CommandStream := TVarBlockIndexed.Create;
276 RequestData := TVarBlockIndexed.Create;
277 ResponseCode := rcNone;
278 Command.Enclose := False;
279 Command.ReadFromStream(Stream);
280 with Command do begin
281 if TestIndex(0) then
282 MessageType := ReadVarUInt(0);
283 if TestIndex(1) then
284 SequenceNumber := ReadVarUInt(1);
285 if TestIndex(2) then
286 ReadVarUIntArray(2, CommandIndex);
287 if TestIndex(3) then
288 ReadVarIndexedBlock(3, RequestData);
289 if Assigned(FOnCommand) then
290 FOnCommand(CommandIndex, RequestData, ResponseData, ResponseCode, CommandError)
291 else
292 ResponseCode := rcCommandNotSupported;
293 end;
294 with Response do begin
295 Enclose := False;
296 WriteVarUInt(0, Integer(mtResponse));
297 WriteVarUInt(1, Integer(SequenceNumber));
298 if ResponseData.Items.Count > 0 then
299 WriteVarIndexedBlock(2, ResponseData);
300 WriteVarUInt(3, Integer(ResponseCode));
301 WriteVarUInt(4, Integer(CommandError));
302 WriteToStream(TempStream);
303 if ResponseCode <> rcDropped then
304 Pin.Send(TempStream);
305 end;
306
307 if Assigned(FOnAfterRequest) then
308 FOnAfterRequest(CommandIndex, RequestData, ResponseData,
309 ResponseCode, CommandError);
310 finally
311 TempStream.Free;
312 RequestData.Free;
313 CommandStream.Free;
314 ResponseData.Free;
315 Response.Free;
316 Command.Free;
317 CommandIndex.Free;
318 end;
319end;
320
321procedure TCommProtocol.SetActive(const AValue: Boolean);
322begin
323 if FActive = AValue then Exit;
324 FActive := AValue;
325 if AValue then begin
326 RetransmitThread := TRetransmitCheckThread.Create(True);
327 with RetransmitThread do begin
328 CheckPeriod := 100; // ms
329 Parent := Self;
330 FreeOnTerminate := False;
331 Name := 'CommProtocol';
332 Start;
333 end;
334 end else begin
335 // Wait for empty session list
336 try
337 Sessions.Lock.Acquire;
338 while Sessions.Count > 0 do
339 try
340 Sessions.Lock.Release;
341 Sleep(1);
342 finally
343 Sessions.Lock.Acquire;
344 end;
345 finally
346 Sessions.Lock.Release;
347 end;
348 FreeAndNil(RetransmitThread);
349 end;
350end;
351
352procedure TCommProtocol.SendCommand(Command: array of integer;
353 ResponseParameters: TVarBlockIndexed = nil; RequestParameters: TVarBlockIndexed = nil;
354 ARaiseError: boolean = True);
355var
356 Session: TDeviceProtocolSession;
357 NewRequest: TVarBlockIndexed;
358begin
359 if FActive then begin
360 try
361 //Lock.Acquire;
362 Session := TDeviceProtocolSession.Create;
363 Sessions.Add(Session);
364 NewRequest := TVarBlockIndexed.Create;
365
366 Session.ResponseParameters := ResponseParameters;
367 with Session do begin
368 try
369 Lock.Acquire;
370 CommandIndex.Clear;
371 CommandIndex.AddArray(Command);
372 with NewRequest do begin
373 Enclose := False;
374 WriteVarUInt(0, Integer(mtRequest));
375 WriteVarUInt(1, SequenceNumber);
376 WriteVarUIntArray(2, CommandIndex);
377 if Assigned(RequestParameters) then
378 WriteVarIndexedBlock(3, RequestParameters);
379 end;
380 RaiseError := ARaiseError;
381 NewRequest.WriteToStream(Request);
382
383 RemoteBuffer.Allocate(Request.Size);
384
385 //StopWatch.Start;
386 TransmitTime := Now;
387 Pin.Send(Request);
388 Enabled := True;
389 finally
390 Lock.Release;
391 end;
392 try
393 while ReceiveEvent.WaitFor(1) = wrTimeout do begin
394 if Timeouted then
395 raise ECommTimeout.Create(SResponseTimeout);
396 end;
397 if ResponseCode <> Integer(rcNone) then begin
398 if Assigned(FOnDebugLog) then
399 FOnDebugLog(SDeviceProtocol, Format(SResponseError, [CommandIndex.Implode('.', IntToStr), IntToStr(ResponseCode)]));
400 raise ECommResponseCodeError.Create(Format(SResponseError, [CommandIndex.Implode('.', IntToStr), IntToStr(ResponseCode)]));
401 end;
402 LastCommandResponseTime := Now;
403 LastLatency := Latency;
404 finally
405 RemoteBuffer.Release(Session.Request.Size);
406 Sessions.Remove(Session);
407 end;
408 end;
409 finally
410 NewRequest.Free;
411 //Lock.Free;
412 end;
413 end else raise ENotActive.Create(SProtocolNotActive);
414end;
415
416constructor TCommProtocol.Create;
417begin
418 RemoteBuffer := TRemoteBuffer.Create;
419 Lock := TCriticalSection.Create;
420 Pin := TCommPin.Create;
421 Pin.OnReceive := DataReceive;
422 Sessions := TDeviceProtocolSessionList.Create;
423 Sessions.Parent := Self;
424 RetransmitTimeout := 3 * OneSecond;
425 RetransmitRepeatCount := 3;
426 RetransmitTotalCount := 0;
427end;
428
429destructor TCommProtocol.Destroy;
430begin
431 Active := False;
432 Sessions.Free;
433 Pin.Free;
434 Lock.Free;
435 RemoteBuffer.Free;
436 inherited;
437end;
438
439procedure TCommProtocol.Assign(Source: TCommProtocol);
440begin
441 LastCommandResponseTime := Source.LastCommandResponseTime;
442 LastLatency := Source.LastLatency;
443 RemoteBuffer.Assign(Source.RemoteBuffer);
444 WrongSequenceCount := Source.WrongSequenceCount;
445 RetransmitTimeout := Source.RetransmitTimeout;
446 RetransmitRepeatCount := Source.RetransmitRepeatCount;
447 RetransmitTotalCount := Source.RetransmitTotalCount;
448 Pin.Connect(Source.Pin.RemotePin);
449 OnCommand := Source.OnCommand;
450 OnAfterRequest := Source.OnAfterRequest;
451 OnDebugLog := Source.OnDebugLog;
452 Sessions.Assign(Source.Sessions);
453 Active := Source.Active;
454end;
455
456{ TDeviceProtocolSession }
457
458constructor TDeviceProtocolSession.Create;
459begin
460 ResponseCode := 0;
461 Lock := TCriticalSection.Create;
462 ReceiveEvent := TSimpleEvent.Create;
463 //ReceiveEvent.ManualReset := True;
464 Request := TStreamHelper.Create;
465 ResponseParameters := nil;
466 CommandIndex := TListInteger.Create;
467 Latency := 0;
468 TransmitTime := 0;
469end;
470
471destructor TDeviceProtocolSession.Destroy;
472begin
473 CommandIndex.Free;
474 Request.Free;
475 ReceiveEvent.Free;
476 Lock.Free;
477 inherited Destroy;
478end;
479
480{ TDeviceProtocolSessionList }
481
482procedure TDeviceProtocolSessionList.Add(Session: TDeviceProtocolSession);
483begin
484 // Block if no free session available
485 try
486 Lock.Acquire;
487 Session.SequenceNumber := GetSequenceNumber;
488 while Count >= MaxSessionCount do
489 begin
490 try
491 Lock.Release;
492 Sleep(1);
493 finally
494 Lock.Acquire;
495 end;
496 end;
497 inherited Add(Session);
498 finally
499 Lock.Release;
500 end;
501end;
502
503function TDeviceProtocolSessionList.GetBySequence(Sequence: integer):
504TDeviceProtocolSession;
505var
506 I: integer;
507begin
508 I := 0;
509 while (I < Count) and (TDeviceProtocolSession(Items[I]).SequenceNumber <> Sequence) do
510 Inc(I);
511 if I < Count then
512 Result := TDeviceProtocolSession(Items[I])
513 else
514 Result := nil;
515end;
516
517procedure TDeviceProtocolSessionList.Remove(Session: TDeviceProtocolSession);
518begin
519 try
520 Lock.Acquire;
521 inherited Remove(TObject(Session));
522 finally
523 Lock.Release;
524 end;
525end;
526
527constructor TDeviceProtocolSessionList.Create;
528begin
529 inherited Create;
530 Lock := TCriticalSection.Create;
531 MaxSessionCount := 100;
532 MaxSequenceNumber := 127;
533end;
534
535destructor TDeviceProtocolSessionList.Destroy;
536begin
537 // Free session list before freeing Lock
538 // instead of freeing in inherited Destroy in TListObject
539 try
540// Lock.Acquire;
541 Clear;
542 finally
543// Lock.Release;
544 end;
545
546 Lock.Free;
547 inherited;
548end;
549
550procedure TDeviceProtocolSessionList.Assign(Source: TListObject);
551begin
552 MaxSequenceNumber := TDeviceProtocolSessionList(Source).MaxSequenceNumber;
553 MaxSessionCount := TDeviceProtocolSessionList(Source).MaxSessionCount;
554 SequenceNumber := TDeviceProtocolSessionList(Source).SequenceNumber;
555
556 inherited;
557end;
558
559function TDeviceProtocolSessionList.GetSequenceNumber: Integer;
560begin
561 try
562 //Lock.Acquire;
563 Inc(SequenceNumber);
564 if SequenceNumber > MaxSequenceNumber then
565 SequenceNumber := 0;
566 Result := SequenceNumber;
567 finally
568 //Lock.Release;
569 end;
570end;
571
572{ TRetransmitCheckThread }
573
574procedure TRetransmitCheckThread.Execute;
575var
576 I: Integer;
577 C: Integer;
578 Session: TDeviceProtocolSession;
579begin
580 with Parent do
581 repeat
582 try
583 Parent.Sessions.Lock.Acquire;
584 I := 0;
585 while I < Sessions.Count do begin
586 Session := TDeviceProtocolSession(Sessions[I]);
587 with Session do
588 if Enabled then begin
589 try
590 Session.Lock.Acquire;
591 if (TransmitTime > 0) and (Now > (TransmitTime + RetransmitTimeout)) then begin
592 if RepeatCounter < RetransmitRepeatCount then begin
593 Pin.Send(Request);
594 TransmitTime := Now;
595 Inc(RepeatCounter);
596 Inc(RetransmitTotalCount);
597 end else
598 Timeouted := True;
599 end;
600 finally
601 Session.Lock.Release;
602 end;
603 end;
604 Inc(I);
605 end;
606 finally
607 Parent.Sessions.Lock.Release;
608 end;
609
610 if not Terminated then
611 Sleep(CheckPeriod);
612 until Terminated;
613end;
614
615end.
616
Note: See TracBrowser for help on using the repository browser.