source: PinConnection/CommThread.pas

Last change on this file was 575, checked in by chronos, 5 months ago
  • Modified: Remove U prefix from unit names of PinConnection package.
File size: 4.7 KB
Line 
1unit CommThread;
2
3interface
4
5uses
6 Classes, SysUtils, blcksock, CommPin, SyncObjs, Common,
7 DateUtils, Threading, SpecializedList, BinarySerializer;
8
type
  // Forward declaration: the receive thread keeps a reference to its owner.
  TCommThread = class;

  // Callback signature taking a memory stream. Only referenced by a
  // commented-out field in TCommThread within this unit.
  TReceiveDataEvent = procedure(Stream: TMemoryStream) of object;

  { TCommThreadReceiveThread }

  // Worker thread of TCommThread. Its Execute loop polls the parent's
  // "data available" and "status" events and forwards what it finds to
  // Parent.Pin.
  TCommThreadReceiveThread = class(TTermThread)
  public
    // Owning node; has to be assigned before the thread is started because
    // Execute dereferences it.
    Parent: TCommThread;
    // Thread-local copy of the parent's input buffer used while sending.
    Stream: TBinarySerializer;
    constructor Create(CreateSuspended: Boolean;
      const StackSize: SizeUInt = DefaultStackSize);
    destructor Destroy; override;
    procedure Execute; override;
  end;
25
{ TCommThread }

  // Communication node with two pins that decouples the Ext side from the
  // Pin side through a background thread:
  //  - data and status arriving on Pin are passed straight to Ext while the
  //    node is active;
  //  - data and status arriving on Ext are stored under a lock and handed
  //    to Pin later by the receive thread.
  TCommThread = class(TCommNode)
  private
    //FOnReceiveData: TReceiveDataEvent;
    // Background worker; exists only while the node is active.
    FReceiveThread: TCommThreadReceiveThread;
    // Bytes received on Ext that were not yet forwarded to Pin.
    FInputBuffer: TBinarySerializer;
    // Guards FInputBuffer and FStatusValue.
    FInputBufferLock: TCriticalSection;
    // Signalled when FInputBuffer received new data.
    FDataAvailable: TEvent;
    // Signalled when FStatusValue was updated.
    FStatusEvent: TEvent;
    // Last status value reported on Ext.
    FStatusValue: Integer;
    procedure PinReceiveData(Sender: TCommPin; Stream: TListByte);
    procedure PinSetStatus(Sender: TCommPin; Status: Integer);
    procedure ExtReceiveData(Sender: TCommPin; Stream: TListByte);
    procedure ExtSetStatus(Sender: TCommPin; AStatus: Integer);
  protected
    procedure SetActive(const AValue: Boolean); override;
  public
    Ext: TCommPin;
    Pin: TCommPin;
    constructor Create(AOwner: TComponent); override;
    destructor Destroy; override;
  end;
49
50
51implementation
52
53{ TCommThread }
54
{ Handler for data received on Pin: passes the bytes on to Ext.
  Nothing is sent while the node is inactive. }
procedure TCommThread.PinReceiveData(Sender: TCommPin; Stream: TListByte);
begin
  if not FActive then
    Exit;
  Ext.Send(Stream);
end;
59
{ Handler for a status change on Pin: mirrors the value to Ext.Status.
  Nothing is changed while the node is inactive. }
procedure TCommThread.PinSetStatus(Sender: TCommPin; Status: Integer);
begin
  if not FActive then
    Exit;
  Ext.Status := Status;
end;
64
{ Handler for data received on Ext: appends the bytes to the input buffer
  and signals the receive thread that data is available.
  Sender is not used; Stream holds the received bytes. }
procedure TCommThread.ExtReceiveData(Sender: TCommPin; Stream: TListByte);
begin
  // Acquire before entering try: the finally part must only release a lock
  // that was actually taken.
  FInputBufferLock.Acquire;
  try
    FInputBuffer.WriteList(Stream, 0, Stream.Count);
    FDataAvailable.SetEvent;
  finally
    FInputBufferLock.Release;
  end;
end;
75
{ Handler for a status change on Ext: stores the new value and signals the
  receive thread, which applies it to Pin.Status.
  Sender is not used; AStatus is the status value to store. }
procedure TCommThread.ExtSetStatus(Sender: TCommPin; AStatus: Integer);
begin
  // Acquire before entering try: the finally part must only release a lock
  // that was actually taken.
  FInputBufferLock.Acquire;
  try
    FStatusValue := AStatus;
    FStatusEvent.SetEvent;
  finally
    FInputBufferLock.Release;
  end;
end;
86
{ Switches the node on or off. Activating creates and starts the receive
  thread; deactivating frees it. A call with the current state is ignored. }
procedure TCommThread.SetActive(const AValue: Boolean);
begin
  if AValue = FActive then
    Exit;
  FActive := AValue;

  if not AValue then
    FreeAndNil(FReceiveThread)
  else begin
    // Created suspended so that Parent is set before Execute begins.
    FReceiveThread := TCommThreadReceiveThread.Create(True);
    with FReceiveThread do begin
      FreeOnTerminate := False;
      Parent := Self;
      Name := 'CommThread';
      Start;
    end;
  end;
  inherited;
end;
103
{ Creates the node: allocates the input buffer with its lock and events,
  and creates both pins with their handlers attached. }
constructor TCommThread.Create(AOwner: TComponent);
begin
  inherited Create(AOwner);

  // Input buffer shared with the receive thread
  FInputBuffer := TBinarySerializer.Create;
  FInputBuffer.List := TListByte.Create;
  FInputBuffer.OwnsList := True;
  FInputBufferLock := TCriticalSection.Create;

  // Ext side: incoming data and status are queued for the receive thread
  Ext := TCommPin.Create;
  Ext.OnReceive := ExtReceiveData;
  Ext.OnSetSatus := ExtSetStatus;
  Ext.Node := Self;

  // Pin side: incoming data and status are forwarded directly to Ext
  Pin := TCommPin.Create;
  Pin.OnReceive := PinReceiveData;
  Pin.OnSetSatus := PinSetStatus;
  Pin.Node := Self;

  // Signals polled by the receive thread
  FDataAvailable := TSimpleEvent.Create;
  FStatusEvent := TSimpleEvent.Create;
end;
122
{ Deactivates the node (which frees the receive thread) and then releases
  the buffer, lock, pins and events. }
destructor TCommThread.Destroy;
begin
  Active := False;
  if Assigned(FInputBufferLock) then begin
    // Free the buffer under the lock, but release the lock again before
    // destroying it: destroying a critical section that is still held is
    // undefined behaviour on the underlying OS primitives.
    FInputBufferLock.Acquire;
    try
      FreeAndNil(FInputBuffer);
    finally
      FInputBufferLock.Release;
    end;
    FreeAndNil(FInputBufferLock);
  end else
    // The lock was never created (constructor failed early).
    FreeAndNil(FInputBuffer);
  FreeAndNil(Ext);
  FreeAndNil(Pin);
  FreeAndNil(FStatusEvent);
  FreeAndNil(FDataAvailable);
  inherited;
end;
135
136{ TCommThreadReceiveThread }
137
{ Thread body. Until the thread is terminated it repeatedly:
   - checks the parent's data event; if set, copies the parent's input
     buffer into the local Stream under the lock, clears the buffer and
     sends the copy through Parent.Pin;
   - checks the parent's status event; if set, reads the stored status
     under the lock and assigns it to Parent.Pin.Status;
   - sleeps 1 ms when neither event was signalled.
  Parent must be assigned before the thread is started. }
procedure TCommThreadReceiveThread.Execute;
var
  TempStatus: Integer;
  DoSleep: Boolean;
begin
  // Members of Parent are qualified explicitly (instead of "with Parent do")
  // so that Stream and Terminated always refer to this thread.
  repeat
    DoSleep := True;

    // Check if new data arrived
    if Parent.FDataAvailable.WaitFor(0) = wrSignaled then begin
      DoSleep := False;
      // Acquire before try so that Release only runs when the lock is held.
      Parent.FInputBufferLock.Acquire;
      try
        Stream.List.Assign(Parent.FInputBuffer.List);
        Parent.FDataAvailable.ResetEvent;
        Parent.FInputBuffer.Clear;
      finally
        Parent.FInputBufferLock.Release;
      end;
      // Send the private copy after the lock has been released.
      Parent.Pin.Send(Stream.List);
    end;

    // Check if state changed
    if Parent.FStatusEvent.WaitFor(0) = wrSignaled then begin
      DoSleep := False;
      Parent.FInputBufferLock.Acquire;
      try
        TempStatus := Parent.FStatusValue;
        Parent.FStatusEvent.ResetEvent;
      finally
        Parent.FInputBufferLock.Release;
      end;
      Parent.Pin.Status := TempStatus;
    end;

    // Nothing to do in this round: yield briefly instead of spinning.
    if not Terminated and DoSleep then
      Sleep(1);
  until Terminated;
end;
177
{ Creates the thread and its private Stream serializer, which owns a newly
  created byte list. }
constructor TCommThreadReceiveThread.Create(CreateSuspended: Boolean;
  const StackSize: SizeUInt);
begin
  inherited Create(CreateSuspended, StackSize);
  Stream := TBinarySerializer.Create;
  // The serializer owns its list and frees it together with itself.
  Stream.List := TListByte.Create;
  Stream.OwnsList := True;
end;
186
{ Stops the thread and releases its private Stream. }
destructor TCommThreadReceiveThread.Destroy;
begin
  // Run the inherited destructor first: it is what stops and joins the
  // thread (NOTE(review): relies on TThread.Destroy semantics being kept by
  // TTermThread - confirm). Execute uses Stream on every pass, so Stream
  // must stay alive until the thread has finished. The instance memory is
  // only released after this outermost destructor returns, so the field is
  // still valid here.
  inherited;
  FreeAndNil(Stream);
end;
192
193end.
194
Note: See TracBrowser for help on using the repository browser.