source: PinConnection/Protocols/UCommProtocol1.pas

Last change on this file was 482, checked in by chronos, 8 years ago
  • Moved: Application specific protocols moved to Protocols subdirectory as code example.
File size: 16.8 KB
Line 
1unit UCommProtocol1;
2
3{$mode delphi}
4
5interface
6
7uses
8 Classes, SysUtils, UVarBlockSerializer, syncobjs, UCommPin, UThreading,
9 UDebugLog, UStreamHelper, StopWatch, SpecializedList, UCommon,
10 DateUtils;
11
12type
// Raised by TCommProtocol.SendCommand when the response code of a
// command differs from rcNone.
ECommResponseCodeError = class(Exception);
// Raised by TCommProtocol.SendCommand when the session was flagged as
// timed out while waiting for the response.
ECommTimeout = class(Exception);
// General communication error; not raised inside this unit.
ECommError = class(Exception);
// Raised by TCommProtocol.SendCommand when the protocol is not active.
ENotActive = class(Exception);
17
// Response codes carried in field 3 of a response message. The ordinal
// value is what gets serialized, so the order of the items must not change.
//   rcNone                - no error
//   rcCommandNotSupported - used when no OnCommand handler is assigned
//   rcDropped             - the response is built but not sent back
TResponseError = (
  rcNone,
  rcCommandNotSupported,
  rcSequenceOutOfRange,
  rcEWrongParameters,
  rcVarIntDecode,
  rcDropped
);

// Message kind stored in field 0 of every message (serialized as ordinal).
TMessageType = (
  mtNone,
  mtRequest,
  mtResponse
);

// Forward declaration, needed by the session list and the retransmit thread.
TCommProtocol = class;
23
24 { TDeviceProtocolSession }
25
// State of one outstanding request, from transmission until the response
// has been received or the request was given up.
TDeviceProtocolSession = class
private
  // Number of retransmissions done so far by the retransmit thread
  RepeatCounter: integer;
  // Signalled by TCommProtocol.DataReceive when the response arrived
  ReceiveEvent: TSimpleEvent;
  // Serialized request; sent once and sent again on every retransmission
  Request: TStreamHelper;
  // Caller supplied block filled from response field 2; may be nil and is
  // not freed by the session
  ResponseParameters: TVarBlockIndexed;
  // Time of the last (re)transmission of Request
  TransmitTime: TDateTime;
public
  // Guards the session state
  Lock: TCriticalSection;
  // True after the request was sent, False again once a response arrived;
  // only enabled sessions are examined by the retransmit thread
  Enabled: Boolean;
  // Number used to pair a response with this session
  SequenceNumber: Integer;
  // Response field 3, 0 if the field is missing
  ResponseCode: Integer;
  // Response field 4, 0 if the field is missing
  CommandError: Integer;
  // Copy of ARaiseError given to SendCommand; not read inside this unit
  RaiseError: Boolean;
  // Set by the retransmit thread after all retransmissions were used up
  Timeouted: Boolean;
  // Command index written to request field 2
  CommandIndex: TListInteger;
  // Time between last transmission and reception of the response
  Latency: TDateTime;
  constructor Create;
  destructor Destroy; override;
end;
46
47 { TDeviceProtocolSessionList }
48
// List of outstanding sessions with sequence number management.
TDeviceProtocolSessionList = class(TListObject)
private
  // Advances SequenceNumber (wrapping to 0 above MaxSequenceNumber) and
  // returns the new value; does no locking on its own
  function GetSequenceNumber: Integer;
public
  // Last sequence number handed out
  SequenceNumber: integer;
  // Highest sequence number used before wrapping to 0 (127 after Create)
  MaxSequenceNumber: integer;
  // Add blocks while the list holds this many sessions (100 after Create)
  MaxSessionCount: integer;
  // Protocol instance this list belongs to
  Parent: TCommProtocol;
  // Guards the list; Add and Remove take it themselves, GetBySequence
  // does not
  Lock: TCriticalSection;
  procedure Add(Session: TDeviceProtocolSession);
  function GetBySequence(Sequence: integer): TDeviceProtocolSession;
  procedure Remove(Session: TDeviceProtocolSession);
  constructor Create;
  destructor Destroy; override;
  procedure Assign(Source: TListObject);
end;
65
// Handler type shared by TCommProtocol.OnCommand and OnAfterRequest.
//   Command       - command index decoded from request field 2
//   Parameters    - request data decoded from request field 3
//   Result        - response data, written to response field 2 if not empty
//   ResponseError - written to response field 3
//   CommandError  - written to response field 4
TAfterRequest = procedure(
  Command: TListInteger;
  Parameters: TVarBlockIndexed;
  Result: TVarBlockIndexed;
  var ResponseError: TResponseError;
  var CommandError: integer
) of object;
69
70 { TRetransmitCheckThread }
71
// Background thread which periodically walks Parent.Sessions and sends
// requests again when their response is overdue.
TRetransmitCheckThread = class(TTermThread)
public
  // Protocol instance whose sessions are checked
  Parent: TCommProtocol;
  // Pause between two passes over the session list, passed to Sleep (ms)
  CheckPeriod: Integer;
  procedure Execute; override;
end;
78
79 { TRemoteBuffer }
80
// Counter of space reserved in the receive buffer of the remote side.
TRemoteBuffer = class
public
  // Guards Used inside Allocate and Release
  Lock: TCriticalSection;
  // Total capacity (127 after Create)
  Size: Integer;
  // Amount currently reserved
  Used: Integer;
  // Blocks until AValue fits into the free space, then reserves it
  procedure Allocate(AValue: Integer);
  // Returns AValue to the free space; raises if Used would drop below zero
  procedure Release(AValue: Integer);
  // Copies Used and Size from Source
  procedure Assign(Source: TRemoteBuffer);
  constructor Create;
  destructor Destroy; override;
end;
91
92 { TCommProtocol }
93
// Request/response protocol endpoint working over a TCommPin.
//
// Outgoing commands are sent with SendCommand, which blocks until the
// matching response arrives or the request times out. Incoming requests are
// passed to OnCommand and answered automatically.
TCommProtocol = class
private
  FActive: Boolean;
  FOnAfterRequest: TAfterRequest;
  FOnCommand: TAfterRequest;
  FOnDebugLog: TDebugLogAddEvent;
  // Note: the former private field "OnAfterRequest" was removed. It had the
  // same name as the public property below (a duplicate identifier) and the
  // property is backed by FOnAfterRequest.
  // Exists only while Active is True
  RetransmitThread: TRetransmitCheckThread;
  // Decodes an incoming request, calls OnCommand and sends the response
  procedure HandleRequest(Stream: TStream);
  // Starts the retransmit thread, or waits for all sessions to finish and
  // frees the thread
  procedure SetActive(const AValue: Boolean);
public
  // Time without response after which a request is sent again
  // (3 seconds after Create)
  RetransmitTimeout: TDateTime;
  // Maximum number of retransmissions of one request (3 after Create)
  RetransmitRepeatCount: integer;
  // Number of retransmissions done so far over all sessions
  RetransmitTotalCount: integer;
  // Accounting of buffer space reserved on the remote side
  RemoteBuffer: TRemoteBuffer;
  // Number of responses received with a sequence number of no known session
  WrongSequenceCount: integer;
  // Outstanding requests
  Sessions: TDeviceProtocolSessionList;
  // Pin used for sending; its OnReceive is bound to DataReceive
  Pin: TCommPin;
  // Time when the last command was answered successfully
  LastCommandResponseTime: TDateTime;
  // Latency of the last successfully answered command
  LastLatency: TDateTime;
  // Created and freed with the object; its uses in SendCommand are
  // commented out
  Lock: TCriticalSection;
  // Receive handler of Pin; dispatches responses to waiting sessions and
  // requests to HandleRequest
  procedure DataReceive(Sender: TCommPin; Stream: TStream); virtual;
  // Sends a command and waits for its response.
  // Raises ENotActive, ECommTimeout or ECommResponseCodeError.
  procedure SendCommand(Command: array of integer;
    ResponseParameters: TVarBlockIndexed = nil;
    RequestParameters: TVarBlockIndexed = nil; ARaiseError: boolean = True);
  constructor Create; virtual;
  destructor Destroy; override;
  procedure Assign(Source: TCommProtocol); virtual;
  // Called after an incoming request was handled and answered
  property OnAfterRequest: TAfterRequest read FOnAfterRequest write FOnAfterRequest;
  // Called to execute an incoming request
  property OnCommand: TAfterRequest read FOnCommand write FOnCommand;
  // Called with diagnostic messages
  property OnDebugLog: TDebugLogAddEvent read FOnDebugLog write FOnDebugLog;
  property Active: Boolean read FActive write SetActive;
end;
127
resourcestring
  // Raised by TRemoteBuffer.Release when Used drops below zero
  SRemoteBufferInconsistency = 'Remote buffer inconsistency';
  // %0:s = command index joined by '.', %1:s = response code
  SResponseError = 'Command %0:s response error %1:s';
  // Message of ECommTimeout
  SResponseTimeout = 'Response timeout';
  // %d = sequence number of the unexpected response
  SWrongSequenceCount = 'Receive wrong sequence number %d';
  // Group name passed as first argument to OnDebugLog
  SDeviceProtocol = 'Device protocol';
  // Logged when decoding of received data raises EReadError
  SProtocolDecodeError = 'Data decode error';
  // Message of ENotActive
  SProtocolNotActive = 'Device protocol not active';
136
137implementation
138
139{ TRemoteBuffer }
140
{ Reserves AValue units of the remote buffer.

  Blocks, polling once per millisecond, until Used + AValue fits into Size.
  The lock is released while sleeping so that Release can make progress.

  NOTE(review): if AValue alone is larger than Size (and Used is not
  negative) the wait can never finish - confirm callers never request more
  than Size. }
procedure TRemoteBuffer.Allocate(AValue: Integer);
begin
  // Acquire before entering the try block: if Acquire itself fails, the
  // finally part must not release a lock this thread does not own.
  Lock.Acquire;
  try
    // Wait for free remote buffer
    while (Used + AValue) > Size do begin
      Lock.Release;
      try
        Sleep(1);
      finally
        // Always re-enter the lock, the outer finally releases it again
        Lock.Acquire;
      end;
    end;
    Used := Used + AValue;
  finally
    Lock.Release;
  end;
end;
159
{ Returns AValue units to the remote buffer.

  Raises Exception with SRemoteBufferInconsistency if Used becomes negative.
  Used keeps the decremented value in that case. }
procedure TRemoteBuffer.Release(AValue: Integer);
begin
  // Acquire before entering the try block: if Acquire itself fails, the
  // finally part must not release a lock this thread does not own.
  Lock.Acquire;
  try
    Used := Used - AValue;
    if Used < 0 then
      raise Exception.Create(SRemoteBufferInconsistency);
  finally
    Lock.Release;
  end;
end;
171
{ Copies the capacity and the reserved amount from Source.
  Neither lock is taken while copying. }
procedure TRemoteBuffer.Assign(Source: TRemoteBuffer);
begin
  Size := Source.Size;
  Used := Source.Used;
end;
177
{ Creates an empty buffer counter with a capacity of 127. }
constructor TRemoteBuffer.Create;
begin
  Used := 0;
  Size := 127;
  Lock := TCriticalSection.Create;
end;
184
{ Frees the lock and destroys the object. }
destructor TRemoteBuffer.Destroy;
begin
  Lock.Free;
  inherited;
end;
190
191
{ Receive handler bound to Pin.OnReceive.

  Decodes the message type from field 0. A response (mtResponse) is paired
  with the waiting session by the sequence number in field 1; the session
  gets its response parameters (field 2), response code (field 3) and command
  error (field 4), and its ReceiveEvent is set. A response with an unknown
  sequence number increments WrongSequenceCount and is logged. A request
  (mtRequest) is passed on to HandleRequest. Messages without field 0 are
  ignored.

  EReadError raised while decoding is logged through OnDebugLog and
  swallowed; other exceptions propagate to the caller. }
procedure TCommProtocol.DataReceive(Sender: TCommPin; Stream: TStream);
var
  ResponseSequenceNumber: Integer;
  Session: TDeviceProtocolSession;
  MessageType: Integer;
  Request: TVarBlockIndexed;
  TempStream: TMemoryStream;
begin
  // Defined value for messages which carry no field 0; previously the
  // variable was compared while still uninitialized in that case.
  MessageType := Integer(mtNone);
  // Preset to nil so the finally part is safe even if a constructor raises
  TempStream := nil;
  Request := nil;
  try
    TempStream := TMemoryStream.Create;
    Request := TVarBlockIndexed.Create;
    Request.Enclose := False;
    // NOTE(review): identifiers inside this "with" resolve against Request
    // first; the scoping is intentionally left exactly as it was.
    with Request do
    try
      Stream.Position := 0;
      ReadFromStream(Stream);
      if TestIndex(0) then
        MessageType := ReadVarUInt(0);
      if MessageType = Integer(mtResponse) then begin
        if TestIndex(1) then begin
          ResponseSequenceNumber := ReadVarUInt(1);
          // Acquire before try so a failed Acquire is not followed by Release
          Sessions.Lock.Acquire;
          try
            Session := Sessions.GetBySequence(ResponseSequenceNumber);
            if Assigned(Session) then begin
              Session.Lock.Acquire;
              with Session do try
                if TestIndex(2) and Assigned(ResponseParameters) then begin
                  ReadVarStream(2, TempStream);
                  ResponseParameters.Enclose := False;
                  ResponseParameters.ReadFromStream(TempStream);
                end;
                if TestIndex(3) then ResponseCode := ReadVarUInt(3)
                  else ResponseCode := 0;
                if TestIndex(4) then CommandError := ReadVarUInt(4)
                  else CommandError := 0;
                Latency := Now - TransmitTime;
                // Stop retransmissions and wake up SendCommand
                Enabled := False;
                ReceiveEvent.SetEvent;
              finally
                Session.Lock.Release;
              end;
            end else begin
              Inc(WrongSequenceCount);
              if Assigned(FOnDebugLog) then
                FOnDebugLog(SDeviceProtocol, Format(SWrongSequenceCount, [ResponseSequenceNumber]));
            end;
          finally
            Sessions.Lock.Release;
          end;
        end;
      end else
      if MessageType = Integer(mtRequest) then HandleRequest(Stream);
    except
      on EReadError do begin
        if Assigned(FOnDebugLog) then
          FOnDebugLog(SDeviceProtocol, SProtocolDecodeError);
      end;
    end;
  finally
    TempStream.Free;
    Request.Free;
  end;
end;
256
{ Handles one incoming request message.

  Decodes sequence number (field 1), command index (field 2) and request
  data (field 3) from Stream, lets OnCommand execute the command and sends a
  response with the same sequence number through Pin. Without an OnCommand
  handler the response code is rcCommandNotSupported. With response code
  rcDropped the response is built but not sent. OnAfterRequest is called at
  the end in every case. }
procedure TCommProtocol.HandleRequest(Stream: TStream);
var
  ResponseCode: TResponseError;
  Response: TVarBlockIndexed;
  ResponseData: TVarBlockIndexed;
  RequestData: TVarBlockIndexed;
  CommandIndex: TListInteger;
  SequenceNumber: integer;
  CommandError: integer;
  MessageType: integer;
  Command: TVarBlockIndexed;
  TempStream: TMemoryStream;
begin
  // Preset to nil so the finally part is safe even if a constructor raises
  TempStream := nil;
  Command := nil;
  CommandIndex := nil;
  Response := nil;
  ResponseData := nil;
  RequestData := nil;
  // Defined values for everything written to the response; previously
  // SequenceNumber and CommandError were serialized uninitialized when
  // field 1 was missing or no OnCommand handler was assigned.
  ResponseCode := rcNone;
  SequenceNumber := 0;
  CommandError := 0;
  try
    TempStream := TMemoryStream.Create;
    Command := TVarBlockIndexed.Create;
    CommandIndex := TListInteger.Create;
    Response := TVarBlockIndexed.Create;
    ResponseData := TVarBlockIndexed.Create;
    RequestData := TVarBlockIndexed.Create;
    Command.Enclose := False;
    Command.ReadFromStream(Stream);
    with Command do begin
      // The message type is decoded but not evaluated here
      if TestIndex(0) then
        MessageType := ReadVarUInt(0);
      if TestIndex(1) then
        SequenceNumber := ReadVarUInt(1);
      if TestIndex(2) then
        ReadVarUIntArray(2, CommandIndex);
      if TestIndex(3) then
        ReadVarIndexedBlock(3, RequestData);
      if Assigned(FOnCommand) then
        FOnCommand(CommandIndex, RequestData, ResponseData, ResponseCode, CommandError)
      else
        ResponseCode := rcCommandNotSupported;
    end;
    with Response do begin
      Enclose := False;
      WriteVarUInt(0, Integer(mtResponse));
      WriteVarUInt(1, Integer(SequenceNumber));
      // Response data is optional
      if ResponseData.Items.Count > 0 then
        WriteVarIndexedBlock(2, ResponseData);
      WriteVarUInt(3, Integer(ResponseCode));
      WriteVarUInt(4, Integer(CommandError));
      WriteToStream(TempStream);
      if ResponseCode <> rcDropped then
        Pin.Send(TempStream);
    end;

    if Assigned(FOnAfterRequest) then
      FOnAfterRequest(CommandIndex, RequestData, ResponseData,
        ResponseCode, CommandError);
  finally
    TempStream.Free;
    RequestData.Free;
    ResponseData.Free;
    Response.Free;
    Command.Free;
    CommandIndex.Free;
  end;
end;
322
{ Setter of the Active property.

  Activation creates and starts the retransmit thread with a check period of
  100 ms. Deactivation first waits (polling once per millisecond) until the
  session list is empty and then frees the thread. Setting the current value
  again does nothing. }
procedure TCommProtocol.SetActive(const AValue: Boolean);
begin
  if FActive = AValue then Exit;
  FActive := AValue;
  if AValue then begin
    RetransmitThread := TRetransmitCheckThread.Create(True);
    with RetransmitThread do begin
      CheckPeriod := 100; // ms
      Parent := Self;
      FreeOnTerminate := False;
      Name := 'CommProtocol';
      Start;
    end;
  end else begin
    // Wait for empty session list.
    // Acquire before try so a failed Acquire is not followed by Release.
    Sessions.Lock.Acquire;
    try
      while Sessions.Count > 0 do begin
        Sessions.Lock.Release;
        try
          Sleep(1);
        finally
          // Always re-enter the lock, the outer finally releases it again
          Sessions.Lock.Acquire;
        end;
      end;
    finally
      Sessions.Lock.Release;
    end;
    FreeAndNil(RetransmitThread);
  end;
end;
353
{ Sends a command and blocks until its response arrived.

  Command            - command index, written to request field 2
  ResponseParameters - optional block which receives response field 2
  RequestParameters  - optional block written to request field 3
  ARaiseError        - stored in the session.
                       NOTE(review): the flag is not evaluated in this unit,
                       a non-zero response code always raises - confirm
                       whether False was meant to suppress the exception.

  Raises ENotActive if the protocol is not active, ECommTimeout if the
  session was flagged as timed out and ECommResponseCodeError if the
  response code differs from rcNone.

  The session is removed from the list and the reserved remote buffer space
  is released on every exit path, including failures while building or
  sending the request. }
procedure TCommProtocol.SendCommand(Command: array of integer;
  ResponseParameters: TVarBlockIndexed = nil; RequestParameters: TVarBlockIndexed = nil;
  ARaiseError: boolean = True);
var
  Session: TDeviceProtocolSession;
  NewRequest: TVarBlockIndexed;
  BufferAllocated: Boolean;
begin
  if not FActive then
    raise ENotActive.Create(SProtocolNotActive);

  // Preset so the finally parts are safe when an early step raises
  NewRequest := nil;
  BufferAllocated := False;
  try
    Session := TDeviceProtocolSession.Create;
    Sessions.Add(Session);
    // From here on the session is registered and has to be removed again
    // whatever happens, otherwise deactivation would wait for it forever.
    try
      NewRequest := TVarBlockIndexed.Create;

      Session.ResponseParameters := ResponseParameters;
      with Session do begin
        // Acquire before try so a failed Acquire is not followed by Release
        Lock.Acquire;
        try
          CommandIndex.Clear;
          CommandIndex.AddArray(Command);
          with NewRequest do begin
            Enclose := False;
            WriteVarUInt(0, Integer(mtRequest));
            WriteVarUInt(1, SequenceNumber);
            WriteVarUIntArray(2, CommandIndex);
            if Assigned(RequestParameters) then
              WriteVarIndexedBlock(3, RequestParameters);
          end;
          RaiseError := ARaiseError;
          NewRequest.WriteToStream(Request);

          RemoteBuffer.Allocate(Request.Size);
          // Release only what was really reserved
          BufferAllocated := True;

          TransmitTime := Now;
          Pin.Send(Request);
          // Enables retransmit checking for this session
          Enabled := True;
        finally
          Lock.Release;
        end;

        // Poll the event so the timeout flag set by the retransmit thread
        // is noticed
        while ReceiveEvent.WaitFor(1) = wrTimeout do begin
          if Timeouted then
            raise ECommTimeout.Create(SResponseTimeout);
        end;
        if ResponseCode <> Integer(rcNone) then begin
          if Assigned(FOnDebugLog) then
            FOnDebugLog(SDeviceProtocol, Format(SResponseError, [CommandIndex.Implode('.', IntToStr), IntToStr(ResponseCode)]));
          raise ECommResponseCodeError.Create(Format(SResponseError, [CommandIndex.Implode('.', IntToStr), IntToStr(ResponseCode)]));
        end;
        LastCommandResponseTime := Now;
        LastLatency := Latency;
      end;
    finally
      try
        if BufferAllocated then
          RemoteBuffer.Release(Session.Request.Size);
      finally
        // Remove the session even if releasing the buffer raised
        Sessions.Remove(Session);
      end;
    end;
  finally
    NewRequest.Free;
  end;
end;
417
{ Creates an inactive protocol object with its own pin, session list and
  remote buffer counter. Retransmission defaults: timeout 3 seconds,
  3 repeats. }
constructor TCommProtocol.Create;
begin
  // Retransmission defaults
  RetransmitTotalCount := 0;
  RetransmitRepeatCount := 3;
  RetransmitTimeout := 3 * OneSecond;

  // Owned helper objects
  Lock := TCriticalSection.Create;
  RemoteBuffer := TRemoteBuffer.Create;
  Sessions := TDeviceProtocolSessionList.Create;
  Sessions.Parent := Self;

  // Received data is delivered to DataReceive
  Pin := TCommPin.Create;
  Pin.OnReceive := DataReceive;
end;
430
{ Deactivates the protocol (which waits for outstanding sessions and frees
  the retransmit thread) and frees all owned objects. }
destructor TCommProtocol.Destroy;
begin
  SetActive(False);
  Sessions.Free;
  Pin.Free;
  Lock.Free;
  RemoteBuffer.Free;
  inherited Destroy;
end;
440
{ Copies statistics, retransmission settings, buffer accounting, event
  handlers and sessions from Source, connects Pin to the remote pin of
  Source.Pin and finally takes over the Active state. }
procedure TCommProtocol.Assign(Source: TCommProtocol);
begin
  // Plain values
  RetransmitTotalCount := Source.RetransmitTotalCount;
  RetransmitRepeatCount := Source.RetransmitRepeatCount;
  RetransmitTimeout := Source.RetransmitTimeout;
  WrongSequenceCount := Source.WrongSequenceCount;
  LastLatency := Source.LastLatency;
  LastCommandResponseTime := Source.LastCommandResponseTime;

  // Owned objects and connections
  RemoteBuffer.Assign(Source.RemoteBuffer);
  Pin.Connect(Source.Pin.RemotePin);

  // Event handlers
  OnCommand := Source.OnCommand;
  OnAfterRequest := Source.OnAfterRequest;
  OnDebugLog := Source.OnDebugLog;

  Sessions.Assign(Source.Sessions);

  // Last, because it may start or stop the retransmit thread
  Active := Source.Active;
end;
457
458{ TDeviceProtocolSession }
459
{ Creates a session with its own lock, receive event, request stream and
  command index list. No response parameter block is attached. }
constructor TDeviceProtocolSession.Create;
begin
  // Plain values
  TransmitTime := 0;
  Latency := 0;
  ResponseCode := 0;
  ResponseParameters := nil;

  // Owned objects
  CommandIndex := TListInteger.Create;
  Request := TStreamHelper.Create;
  ReceiveEvent := TSimpleEvent.Create;
  //ReceiveEvent.ManualReset := True;
  Lock := TCriticalSection.Create;
end;
472
{ Frees the objects created in the constructor. ResponseParameters is left
  alone here. }
destructor TDeviceProtocolSession.Destroy;
begin
  CommandIndex.Free;
  Request.Free;
  ReceiveEvent.Free;
  Lock.Free;
  inherited;
end;
481
482{ TDeviceProtocolSessionList }
483
{ Adds Session to the list and gives it a sequence number.

  Blocks (polling once per millisecond) while the list already holds
  MaxSessionCount sessions. The sequence number is chosen after a slot
  became free, and numbers still held by a session in the list are skipped,
  so that a response cannot be paired with the wrong session after the
  counter wrapped around. If every number in 0..MaxSequenceNumber is in use
  the last candidate is taken anyway. }
procedure TDeviceProtocolSessionList.Add(Session: TDeviceProtocolSession);
var
  Candidate: Integer;
  Attempts: Integer;
begin
  // Acquire before try so a failed Acquire is not followed by Release
  Lock.Acquire;
  try
    // Block if no free session available
    while Count >= MaxSessionCount do begin
      Lock.Release;
      try
        Sleep(1);
      finally
        // Always re-enter the lock, the outer finally releases it again
        Lock.Acquire;
      end;
    end;

    // Take the next sequence number which is not in use. The number of
    // attempts is limited to the amount of distinct numbers so the loop
    // always ends.
    Attempts := 0;
    repeat
      Candidate := GetSequenceNumber;
      Inc(Attempts);
    until (GetBySequence(Candidate) = nil) or (Attempts > MaxSequenceNumber);
    Session.SequenceNumber := Candidate;

    inherited Add(Session);
  finally
    Lock.Release;
  end;
end;
504
{ Returns the first session in the list whose SequenceNumber equals
  Sequence, or nil if there is none. Does no locking on its own. }
function TDeviceProtocolSessionList.GetBySequence(Sequence: integer):
  TDeviceProtocolSession;
var
  Index: integer;
  Candidate: TDeviceProtocolSession;
begin
  Result := nil;
  for Index := 0 to Count - 1 do begin
    Candidate := TDeviceProtocolSession(Items[Index]);
    if Candidate.SequenceNumber = Sequence then begin
      Result := Candidate;
      Break;
    end;
  end;
end;
518
{ Removes Session from the list while holding the list lock. }
procedure TDeviceProtocolSessionList.Remove(Session: TDeviceProtocolSession);
begin
  // Acquire before entering the try block: if Acquire itself fails, the
  // finally part must not release a lock this thread does not own.
  Lock.Acquire;
  try
    inherited Remove(TObject(Session));
  finally
    Lock.Release;
  end;
end;
528
{ Creates an empty list limited to 100 sessions with sequence numbers
  running from 0 to 127. }
constructor TDeviceProtocolSessionList.Create;
begin
  inherited Create;
  MaxSequenceNumber := 127;
  MaxSessionCount := 100;
  Lock := TCriticalSection.Create;
end;
536
{ Empties the list, then frees the lock and destroys the list. }
destructor TDeviceProtocolSessionList.Destroy;
begin
  // Free session list before freeing Lock
  // instead of freeing in inherited Destroy in TListObject.
  // The list lock is not taken here.
  Clear;

  Lock.Free;
  inherited;
end;
551
{ Copies the sequence number settings from Source, which has to be a
  TDeviceProtocolSessionList (it is cast without a type check), and then
  calls the inherited implementation. }
procedure TDeviceProtocolSessionList.Assign(Source: TListObject);
var
  SourceList: TDeviceProtocolSessionList;
begin
  SourceList := TDeviceProtocolSessionList(Source);
  SequenceNumber := SourceList.SequenceNumber;
  MaxSessionCount := SourceList.MaxSessionCount;
  MaxSequenceNumber := SourceList.MaxSequenceNumber;

  inherited;
end;
560
{ Advances the sequence counter by one, wraps it to 0 after
  MaxSequenceNumber and returns the new value. No lock is taken here. }
function TDeviceProtocolSessionList.GetSequenceNumber: Integer;
begin
  Inc(SequenceNumber);
  if SequenceNumber > MaxSequenceNumber then
    SequenceNumber := 0;
  Result := SequenceNumber;
end;
573
574{ TRetransmitCheckThread }
575
{ Thread body: until terminated, walks all sessions of Parent every
  CheckPeriod milliseconds.

  An enabled session whose last transmission is older than
  Parent.RetransmitTimeout is sent again as long as its RepeatCounter is
  below Parent.RetransmitRepeatCount; after that it is flagged as Timeouted.
  The session list lock is held during the whole pass, the session lock
  while one session is examined. }
procedure TRetransmitCheckThread.Execute;
var
  I: Integer;
  Session: TDeviceProtocolSession;
begin
  repeat
    // Acquire before try so a failed Acquire is not followed by Release
    Parent.Sessions.Lock.Acquire;
    try
      I := 0;
      while I < Parent.Sessions.Count do begin
        Session := TDeviceProtocolSession(Parent.Sessions[I]);
        if Session.Enabled then begin
          Session.Lock.Acquire;
          try
            // TransmitTime = 0 means the request was not sent yet
            if (Session.TransmitTime > 0) and
              (Now > (Session.TransmitTime + Parent.RetransmitTimeout)) then begin
              if Session.RepeatCounter < Parent.RetransmitRepeatCount then begin
                Parent.Pin.Send(Session.Request);
                Session.TransmitTime := Now;
                Inc(Session.RepeatCounter);
                Inc(Parent.RetransmitTotalCount);
              end else
                // Noticed by the polling loop in TCommProtocol.SendCommand
                Session.Timeouted := True;
            end;
          finally
            Session.Lock.Release;
          end;
        end;
        Inc(I);
      end;
    finally
      Parent.Sessions.Lock.Release;
    end;

    if not Terminated then
      Sleep(CheckPeriod);
  until Terminated;
end;
616
617end.
618
Note: See TracBrowser for help on using the repository browser.