source: PinConnection/UPacketBurst.pas

Last change on this file was 440, checked in by chronos, 12 years ago
  • Fixed: Thread safe access to serial port pin interface using lock.
File size: 3.8 KB
Line 
1unit UPacketBurst;
2
3{$mode Delphi}{$H+}
4
5interface
6
7uses
8 Classes, UCommPin, SyncObjs, UCommon, SysUtils, SpecializedList,
9 DateUtils, UBinarySerializer;
10
type
  // Forward declaration: the worker thread below needs a reference to its
  // owning node, whose full declaration follows later in this type section.
  TPacketBurst = class;

  { TPacketBurstSendThread }

  // Background worker started by TPacketBurst.SetActive. Its Execute method
  // waits on the owner's SendThreadEvent and forwards the owner's buffered
  // send data through PacketBurstPin.
  TPacketBurstSendThread = class(TThread)
    // Owning node; assigned by TPacketBurst.SetActive before the thread is
    // started.
    PacketBurst: TPacketBurst;
    procedure Execute; override;
  end;
18
{ TPacketBurst }

// Communication node with two pins. Packets received on PacketSinglePin are
// length-prefixed (one Word) and appended to SendStream; a background thread
// forwards the buffered bytes through PacketBurstPin. Data received on
// PacketBurstPin is appended to ReceiveStream, split back into individual
// packets and emitted on PacketSinglePin.
TPacketBurst = class(TCommNode)
private
  // Signalled by PacketSingleReceive once the buffered byte count exceeds
  // SendBurstSize; waited on by the send thread.
  SendThreadEvent: TEvent;
  // Worker created when the node is activated and freed when deactivated.
  SendThread: TPacketBurstSendThread;
  // Guards SendStream, which is touched both by PacketSingleReceive and by
  // the send thread.
  SendStreamLock: TCriticalSection;
  // Outgoing buffer of length-prefixed packets.
  SendStream: TBinarySerializer;
  // Incoming buffer used to reassemble packets from burst data.
  ReceiveStream: TBinarySerializer;
  procedure PacketSingleReceive(Sender: TCommPin; Stream: TListByte);
  procedure PacketBurstReceive(Sender: TCommPin; Stream: TListByte);
protected
  procedure SetActive(const AValue: Boolean); override;
public
  // Timeout handed to SendThreadEvent.WaitFor by the send thread; the
  // constructor initialises it to 1.
  SendPeriod: Integer;
  // Byte-count threshold: used to decide when the send thread is woken and
  // how the buffered data is split into bursts.
  SendBurstSize: Integer;
  // Pin carrying individual packets.
  PacketSinglePin: TCommPin;
  // Pin carrying aggregated burst data.
  PacketBurstPin: TCommPin;
  destructor Destroy; override;
  constructor Create(AOwner: TComponent); override;
end;
40
41implementation
42
{ TPacketBurst }
44
// Creates both pins, the wake-up event, the send-buffer lock and the two
// serializer buffers.
//
// The lock and the serializers were previously never instantiated anywhere in
// this unit although PacketSingleReceive, PacketBurstReceive and the send
// thread all dereference them, so the first received packet hit a nil
// reference.
constructor TPacketBurst.Create(AOwner: TComponent);
begin
  inherited;
  PacketSinglePin := TCommPin.Create;
  PacketSinglePin.OnReceive := PacketSingleReceive;
  PacketSinglePin.Node := Self;
  PacketBurstPin := TCommPin.Create;
  PacketBurstPin.OnReceive := PacketBurstReceive;
  PacketBurstPin.Node := Self;
  SendThreadEvent := TSimpleEvent.Create;
  SendStreamLock := TCriticalSection.Create;
  // NOTE(review): assumes TBinarySerializer exposes a writable List property
  // and an OwnsList flag that makes its destructor free the list - confirm
  // against UBinarySerializer before merging.
  SendStream := TBinarySerializer.Create;
  SendStream.List := TListByte.Create;
  SendStream.OwnsList := True;
  ReceiveStream := TBinarySerializer.Create;
  ReceiveStream.List := TListByte.Create;
  ReceiveStream.OwnsList := True;
  SendPeriod := 1;
end;

// Deactivates the node first (which frees the send thread) and only then
// releases the objects that thread uses.
destructor TPacketBurst.Destroy;
begin
  Active := False;
  // Free is nil-safe, so these calls are harmless even if construction was
  // interrupted part-way.
  ReceiveStream.Free;
  SendStream.Free;
  SendStreamLock.Free;
  SendThreadEvent.Free;
  PacketSinglePin.Free;
  PacketBurstPin.Free;
  inherited;
end;
66
// Handles burst data arriving on PacketBurstPin.
//
// The incoming bytes are appended to ReceiveStream, then every complete
// packet (one Word length followed by that many payload bytes - the framing
// written by PacketSingleReceive) is extracted and emitted on
// PacketSinglePin. Bytes belonging to an incomplete trailing packet stay in
// ReceiveStream until more data arrives.
//
// Sender: pin that delivered the data (unused).
// Stream: raw burst bytes.
procedure TPacketBurst.PacketBurstReceive(Sender: TCommPin; Stream: TListByte);
var
  PacketStream: TListByte;
  Size: Word;
  Consumed: Integer;
  Remaining: Integer;
begin
  // Create outside the try so a failing constructor cannot lead to Free being
  // called on an uninitialised variable.
  PacketStream := TListByte.Create;
  try
    // Append the new data at the end of the reassembly buffer.
    ReceiveStream.Position := ReceiveStream.List.Count;
    ReceiveStream.WriteList(Stream, 0, Stream.Count);

    // Consumed counts the bytes of fully delivered packets from the start of
    // the buffer.
    Consumed := 0;
    while (ReceiveStream.List.Count - Consumed) >= SizeOf(Word) do begin
      ReceiveStream.Position := Consumed;
      Size := ReceiveStream.ReadWord;
      // Stop when the payload has not been received completely yet; compare
      // against the unread remainder, not the whole buffer length.
      if Size > (ReceiveStream.List.Count - Consumed - SizeOf(Word)) then
        Break;
      PacketStream.Count := Size;
      ReceiveStream.ReadList(PacketStream, 0, Size);
      PacketSinglePin.Send(PacketStream);
      Inc(Consumed, SizeOf(Word) + Size);
    end;

    // Drop the delivered packets so they are not emitted again on the next
    // call, keeping only the unfinished tail.
    if Consumed > 0 then begin
      Remaining := ReceiveStream.List.Count - Consumed;
      PacketStream.Count := Remaining;
      if Remaining > 0 then begin
        ReceiveStream.Position := Consumed;
        ReceiveStream.ReadList(PacketStream, 0, Remaining);
      end;
      ReceiveStream.List.Count := 0;
      ReceiveStream.Position := 0;
      if Remaining > 0 then
        ReceiveStream.WriteList(PacketStream, 0, Remaining);
    end;
  finally
    PacketStream.Free;
  end;
end;
88
// Starts or stops the background send thread when the Active state changes.
// Assigning the current value again is a no-op (the inherited setter is not
// called in that case either).
procedure TPacketBurst.SetActive(const AValue: Boolean);
begin
  if AValue <> FActive then begin
    FActive := AValue;
    if not AValue then
      // Deactivation: dispose of the worker thread.
      FreeAndNil(SendThread)
    else begin
      // Activation: create the worker suspended so it can be configured
      // before it begins to run.
      SendThread := TPacketBurstSendThread.Create(True);
      SendThread.PacketBurst := Self;
      SendThread.FreeOnTerminate := False;
      //SendThread.Name := 'PacketBurst';
      SendThread.Start;
    end;
    inherited;
  end;
end;
104
// Handles a single packet arriving on PacketSinglePin.
//
// The packet is appended to SendStream as a Word length followed by the
// payload, under SendStreamLock. When the buffered byte count exceeds
// SendBurstSize the send thread is woken through SendThreadEvent.
//
// Sender: pin that delivered the packet (unused).
// Stream: packet payload.
// NOTE(review): the length is written as a Word, so payloads longer than
// 65535 bytes cannot be framed correctly - confirm callers never exceed that.
procedure TPacketBurst.PacketSingleReceive(Sender: TCommPin; Stream: TListByte);
var
  SignalEvent: Boolean;
begin
  // Acquire before entering the try block so Release only runs when the lock
  // is actually held.
  SendStreamLock.Acquire;
  try
    // The send thread moves Position while reading, so always reposition to
    // the end before appending (same approach as PacketBurstReceive).
    SendStream.Position := SendStream.List.Count;
    SendStream.WriteWord(Stream.Count);
    SendStream.WriteList(Stream, 0, Stream.Count);
    SignalEvent := SendStream.List.Count > SendBurstSize;
  finally
    SendStreamLock.Release;
  end;
  // Signal outside the lock so the woken thread does not immediately block.
  if SignalEvent then SendThreadEvent.SetEvent;
end;
119
120{ TPacketBurstSendThread }
121
// Thread body: repeatedly waits for either the owner's SendThreadEvent or the
// SendPeriod timeout, then flushes everything buffered in the owner's
// SendStream through PacketBurstPin, split into pieces of at most
// SendBurstSize bytes, and empties the buffer.
//
// Changes compared with the previous implementation:
//  * the event is reset after it fires (it is created as TSimpleEvent, which
//    stays signalled until reset, so the loop used to spin continuously after
//    the first signal);
//  * sent data is always removed from the buffer, so it is no longer sent
//    again on the next wake-up and the buffer no longer grows without bound;
//  * the chunk list is sized before ReadList, as PacketBurstReceive does;
//  * a buffer of exactly SendBurstSize bytes, or SendBurstSize <= 0, is now
//    flushed instead of being skipped or looping forever;
//  * the buffer is also flushed when the wait times out, so data below the
//    threshold is not left waiting indefinitely.
procedure TPacketBurstSendThread.Execute;
var
  Chunk: TListByte;
  Total: Integer;
  Offset: Integer;
  ChunkSize: Integer;
begin
  // Create outside the try so a failing constructor cannot lead to Free being
  // called on an uninitialised variable.
  Chunk := TListByte.Create;
  try
    repeat
      if PacketBurst.SendThreadEvent.WaitFor(PacketBurst.SendPeriod) = wrSignaled then
        // Reset before reading the buffer: a producer that appends afterwards
        // signals again, so no wake-up is lost.
        PacketBurst.SendThreadEvent.ResetEvent;

      PacketBurst.SendStreamLock.Acquire;
      try
        Total := PacketBurst.SendStream.List.Count;
        if Total > 0 then begin
          if (PacketBurst.SendBurstSize <= 0) or (Total <= PacketBurst.SendBurstSize) then
            // Everything fits into one burst (or no limit is configured).
            PacketBurst.PacketBurstPin.Send(PacketBurst.SendStream.List)
          else begin
            PacketBurst.SendStream.Position := 0;
            // Offset is tracked locally so the loop terminates regardless of
            // how ReadList advances the serializer position.
            Offset := 0;
            while Offset < Total do begin
              ChunkSize := Total - Offset;
              if ChunkSize > PacketBurst.SendBurstSize then
                ChunkSize := PacketBurst.SendBurstSize;
              Chunk.Count := ChunkSize;
              PacketBurst.SendStream.ReadList(Chunk, 0, ChunkSize);
              PacketBurst.PacketBurstPin.Send(Chunk);
              Inc(Offset, ChunkSize);
            end;
          end;
          // All buffered data has been handed over; start with an empty buffer.
          PacketBurst.SendStream.List.Count := 0;
          PacketBurst.SendStream.Position := 0;
        end;
      finally
        PacketBurst.SendStreamLock.Release;
      end;
    until Terminated;
  finally
    Chunk.Free;
  end;
end;
151
152end.
Note: See TracBrowser for help on using the repository browser.