source: PinConnection/PacketBurst.pas

Last change on this file was 575, checked in by chronos, 5 months ago
  • Modified: Remove U prefix from unit names of PinConnection package.
File size: 3.8 KB
Line 
1unit PacketBurst;
2
3interface
4
5uses
6 Classes, CommPin, SyncObjs, Common, SysUtils, SpecializedList,
7 DateUtils, BinarySerializer;
8
9type
10 TPacketBurst = class;
11
{ Background thread of a TPacketBurst node. Its Execute method waits on the
  owner's send event and forwards buffered data through the owner's burst pin. }
TPacketBurstSendThread = class(TThread)
public
  // Node served by this thread; assigned by TPacketBurst.SetActive before
  // the thread is started.
  PacketBurst: TPacketBurst;
  // Thread body, see the implementation section.
  procedure Execute; override;
end;
16
17 { TPacketBurst }
18
{ Communication node with two pins. Packets received on PacketSinglePin are
  length-prefixed and appended to an outgoing buffer that the send thread
  forwards through PacketBurstPin. Data received on PacketBurstPin is split
  back into packets which are sent out through PacketSinglePin. }
TPacketBurst = class(TCommNode)
private
  // Worker thread, created and destroyed by SetActive.
  SendThread: TPacketBurstSendThread;
  // Set by PacketSingleReceive once the outgoing buffer exceeds SendBurstSize.
  SendThreadEvent: TEvent;
  // Guards SendStream, which is used by both the caller and the send thread.
  SendStreamLock: TCriticalSection;
  // Outgoing buffer of length-prefixed packets.
  SendStream: TBinarySerializer;
  // Incoming buffer of burst data that is parsed back into packets.
  ReceiveStream: TBinarySerializer;
  procedure PacketBurstReceive(Sender: TCommPin; Stream: TListByte);
  procedure PacketSingleReceive(Sender: TCommPin; Stream: TListByte);
protected
  procedure SetActive(const AValue: Boolean); override;
public
  // Pin carrying individual packets.
  PacketSinglePin: TCommPin;
  // Pin carrying concatenated (burst) data.
  PacketBurstPin: TCommPin;
  // Buffer size threshold, in bytes, above which the send thread is signalled.
  SendBurstSize: Integer;
  // Timeout passed to the send thread's event wait; the constructor sets 1.
  SendPeriod: Integer;
  constructor Create(AOwner: TComponent); override;
  destructor Destroy; override;
end;
38
39
40implementation
41
42{ TPacketBurst }
43
{ Creates a serializer backed by its own empty byte list. }
function CreateBufferSerializer: TBinarySerializer;
begin
  Result := TBinarySerializer.Create;
  // NOTE(review): assumes TBinarySerializer.Create does not allocate its List
  // and that List is assignable and OwnsList hands the list's lifetime to the
  // serializer - confirm against the BinarySerializer unit.
  if not Assigned(Result.List) then begin
    Result.List := TListByte.Create;
    Result.OwnsList := True;
  end;
end;

{ Creates both pins, the send event, the buffer lock and the send/receive
  buffers. The node starts inactive; the send thread is created by SetActive. }
constructor TPacketBurst.Create(AOwner: TComponent);
begin
  inherited;
  PacketSinglePin := TCommPin.Create;
  PacketSinglePin.OnReceive := PacketSingleReceive;
  PacketSinglePin.Node := Self;
  PacketBurstPin := TCommPin.Create;
  PacketBurstPin.OnReceive := PacketBurstReceive;
  PacketBurstPin.Node := Self;
  SendThreadEvent := TSimpleEvent.Create;
  // The lock and both buffers are used by the receive handlers and by the
  // send thread, so they have to exist before any data can arrive.
  SendStreamLock := TCriticalSection.Create;
  SendStream := CreateBufferSerializer;
  ReceiveStream := CreateBufferSerializer;
  SendPeriod := 1;
end;

{ Deactivates the node first, which destroys the send thread, and only then
  releases the objects that thread uses. }
destructor TPacketBurst.Destroy;
begin
  Active := False;
  FreeAndNil(SendThreadEvent);
  FreeAndNil(PacketSinglePin);
  FreeAndNil(PacketBurstPin);
  FreeAndNil(SendStream);
  FreeAndNil(ReceiveStream);
  FreeAndNil(SendStreamLock);
  inherited;
end;
65
{ Handles data arriving on PacketBurstPin.
  The data is appended to ReceiveStream, every complete length-prefixed packet
  is extracted and sent through PacketSinglePin, and an incomplete trailing
  packet is kept in ReceiveStream until more data arrives.
  Sender: pin that delivered the data.
  Stream: received bytes; may end in the middle of a packet. }
procedure TPacketBurst.PacketBurstReceive(Sender: TCommPin; Stream: TListByte);
var
  PacketStream: TListByte;
  Size: Word;
  HeaderPosition: Integer;
  Remaining: Integer;
begin
  // Create before entering try so that finally never frees an unassigned variable.
  PacketStream := TListByte.Create;
  try
    // Append the new data behind whatever is left from previous calls.
    ReceiveStream.Position := ReceiveStream.List.Count;
    ReceiveStream.WriteList(Stream, 0, Stream.Count);
    ReceiveStream.Position := 0;

    // Extract packets as long as a whole size header is available.
    while (ReceiveStream.List.Count - ReceiveStream.Position) >= SizeOf(Word) do begin
      HeaderPosition := ReceiveStream.Position;
      Size := ReceiveStream.ReadWord;
      if (ReceiveStream.List.Count - ReceiveStream.Position) < Size then begin
        // Payload not complete yet: step back so the header is parsed again
        // once the rest of the packet has been received.
        ReceiveStream.Position := HeaderPosition;
        Break;
      end;
      PacketStream.Count := Size;
      if Size > 0 then ReceiveStream.ReadList(PacketStream, 0, Size);
      PacketSinglePin.Send(PacketStream);
    end;

    // Drop the consumed bytes so packets are not delivered again on the next
    // call; only the unread tail is written back.
    Remaining := ReceiveStream.List.Count - ReceiveStream.Position;
    PacketStream.Count := Remaining;
    if Remaining > 0 then ReceiveStream.ReadList(PacketStream, 0, Remaining);
    ReceiveStream.List.Count := 0;
    ReceiveStream.Position := 0;
    if Remaining > 0 then ReceiveStream.WriteList(PacketStream, 0, Remaining);
  finally
    PacketStream.Free;
  end;
end;
87
{ Switches the node on or off. Activating creates and starts the send thread,
  deactivating destroys the thread object. Setting the current value again
  does nothing. }
procedure TPacketBurst.SetActive(const AValue: Boolean);
begin
  if AValue <> FActive then begin
    FActive := AValue;
    if not AValue then
      FreeAndNil(SendThread)
    else begin
      // Create suspended so the thread cannot run before it knows its owner.
      SendThread := TPacketBurstSendThread.Create(True);
      SendThread.PacketBurst := Self;
      SendThread.FreeOnTerminate := False;
      SendThread.Start;
    end;
    inherited;
  end;
end;
103
{ Handles a packet arriving on PacketSinglePin.
  The packet is appended to SendStream as a Word length followed by the
  payload. Once the buffer holds more than SendBurstSize bytes the send
  thread is signalled.
  Sender: pin that delivered the packet.
  Stream: packet payload.
  NOTE(review): the length is written as a Word, so payloads longer than
  65535 bytes cannot be represented - confirm callers never exceed that. }
procedure TPacketBurst.PacketSingleReceive(Sender: TCommPin; Stream: TListByte);
var
  SignalEvent: Boolean;
begin
  // Acquire outside of try: Release must only run when the lock is held.
  SendStreamLock.Acquire;
  try
    // The send thread moves Position while reading, so always append at the
    // end instead of writing over data that has not been sent yet.
    SendStream.Position := SendStream.List.Count;
    SendStream.WriteWord(Stream.Count);
    SendStream.WriteList(Stream, 0, Stream.Count);
    SignalEvent := SendStream.List.Count > SendBurstSize;
  finally
    SendStreamLock.Release;
  end;
  if SignalEvent then SendThreadEvent.SetEvent;
end;
118
119{ TPacketBurstSendThread }
120
{ Thread body. Waits up to SendPeriod for the owner's send event and then
  forwards buffered data through PacketBurstPin:
  - full chunks of SendBurstSize bytes are sent on every wake-up,
  - a shorter remainder is sent when the wait timed out (or when
    SendBurstSize is not positive), otherwise it stays buffered so that it
    can be combined with data arriving later.
  Data that has been sent is removed from the buffer. The loop ends once the
  thread is terminated. }
procedure TPacketBurstSendThread.Execute;
var
  Burst: TListByte;
  WaitResult: TWaitResult;
  Pending: Integer;
begin
  // Create before entering try so that finally never frees an unassigned variable.
  Burst := TListByte.Create;
  try
    repeat
      WaitResult := PacketBurst.SendThreadEvent.WaitFor(PacketBurst.SendPeriod);
      // The event is created as TSimpleEvent and is never reset elsewhere;
      // without this reset every following WaitFor would return immediately.
      if WaitResult = wrSignaled then PacketBurst.SendThreadEvent.ResetEvent;

      if (WaitResult = wrSignaled) or (WaitResult = wrTimeout) then begin
        // Acquire outside of try: Release must only run when the lock is held.
        PacketBurst.SendStreamLock.Acquire;
        try
          PacketBurst.SendStream.Position := 0;

          // Send full-size bursts. A non-positive burst size would never
          // advance Position, so chunking is skipped in that case.
          if PacketBurst.SendBurstSize > 0 then
            while (PacketBurst.SendStream.List.Count - PacketBurst.SendStream.Position) >=
              PacketBurst.SendBurstSize do begin
              // Size the target list first, as PacketBurstReceive does.
              Burst.Count := PacketBurst.SendBurstSize;
              PacketBurst.SendStream.ReadList(Burst, 0, PacketBurst.SendBurstSize);
              PacketBurst.PacketBurstPin.Send(Burst);
            end;

          // Take the unsent remainder out and empty the buffer so that data
          // which was already sent is not sent again on the next wake-up.
          Pending := PacketBurst.SendStream.List.Count - PacketBurst.SendStream.Position;
          Burst.Count := Pending;
          if Pending > 0 then PacketBurst.SendStream.ReadList(Burst, 0, Pending);
          PacketBurst.SendStream.List.Count := 0;
          PacketBurst.SendStream.Position := 0;

          if Pending > 0 then begin
            if (WaitResult = wrTimeout) or (PacketBurst.SendBurstSize <= 0) then
              // Period elapsed: flush what is there instead of holding it back.
              PacketBurst.PacketBurstPin.Send(Burst)
            else
              // Woken by the size threshold: keep the remainder buffered.
              PacketBurst.SendStream.WriteList(Burst, 0, Pending);
          end;
        finally
          PacketBurst.SendStreamLock.Release;
        end;
      end;
    until Terminated;
  finally
    Burst.Free;
  end;
end;
150
151end.
Note: See TracBrowser for help on using the repository browser.