1 /*
2 *Copyright (C) 2018 Laurent Tréguier
3 *
4 *This file is part of DLS.
5 *
6 *DLS is free software: you can redistribute it and/or modify
7 *it under the terms of the GNU General Public License as published by
8 *the Free Software Foundation, either version 3 of the License, or
9 *(at your option) any later version.
10 *
11 *DLS is distributed in the hope that it will be useful,
12 *but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 *GNU General Public License for more details.
15 *
16 *You should have received a copy of the GNU General Public License
17 *along with DLS. If not, see <http://www.gnu.org/licenses/>.
18 *
19 */
20
21 module dls.util.communicator;
22
23 import std.stdio : File;
24
25 // Socket can't be used with a shared aliasing.
26 // However, the communicator's methods are all called either from the main
27 // thread, or from inside a synchronized block, so __gshared is ok.
28 private __gshared Communicator _communicator;
29 private __gshared File _stdin;
30 private __gshared File _stdout;
31
32 shared static this()
33 {
34 import std.stdio : stdin, stdout;
35
36 _stdin = stdin;
37 _stdout = stdout;
38
39 version (Windows)
40 {
41 stdin = File("NUL", "rb");
42 stdout = File("NUL", "wb");
43 }
44 else version (Posix)
45 {
46 stdin = File("/dev/null", "rb");
47 stdout = File("/dev/null", "wb");
48 }
49 }
50
51 shared static ~this()
52 {
53 if (_communicator !is null)
54 {
55 destroy(communicator);
56 }
57 }
58
59 @property Communicator communicator()
60 {
61 return _communicator;
62 }
63
64 @property void communicator(Communicator c)
65 {
66 assert(_communicator is null);
67 _communicator = c;
68 }
69
70 interface Communicator
71 {
72 bool hasData();
73 bool hasPendingData();
74 char[] read(const size_t size);
75 void write(const char[] data);
76 void flush();
77 }
78
79 class StdioCommunicator : Communicator
80 {
81 import std.parallelism : Task, TaskPool;
82
83 private bool _checkPending;
84 private TaskPool _pool;
85 private Task!(readChar)* _background;
86
87 static char readChar()
88 {
89 static char[1] buffer;
90 auto result = _stdin.rawRead(buffer);
91
92 if (result.length > 0)
93 {
94 return result[0];
95 }
96
97 throw new Exception("No input data");
98 }
99
100 this(bool checkPendingData)
101 {
102 _checkPending = checkPendingData;
103
104 if (checkPendingData)
105 {
106 _pool = new TaskPool(1);
107 _pool.isDaemon = true;
108 startBackground();
109 }
110 }
111
112 ~this()
113 {
114 if (_checkPending)
115 {
116 _pool.stop();
117 }
118 }
119
120 bool hasData()
121 {
122 return _stdin.isOpen && !_stdin.eof;
123 }
124
125 bool hasPendingData()
126 {
127 try
128 {
129 return _checkPending && _background.done;
130 }
131 catch (Exception e)
132 {
133 return false;
134 }
135 }
136
137 char[] read(const size_t size)
138 {
139 if (size == 0)
140 {
141 return [];
142 }
143
144 static char[] buffer;
145 buffer.length = size;
146
147 if (!_checkPending)
148 {
149 return _stdin.rawRead(buffer);
150 }
151
152 scope(exit)
153 {
154 startBackground();
155 }
156
157 try
158 {
159 buffer[0] = _background.yieldForce();
160 }
161 catch (Exception e)
162 {
163 return hasData() ? _stdin.rawRead(buffer) : [];
164 }
165
166 if (size > 1)
167 {
168 buffer = buffer[0 .. _stdin.rawRead(buffer[1 .. $]).length + 1];
169 }
170
171 return buffer;
172 }
173
174 void write(const char[] data)
175 {
176 _stdout.rawWrite(data);
177 }
178
179 void flush()
180 {
181 _stdout.flush();
182 }
183
184 private void startBackground()
185 {
186 import std.parallelism : task;
187
188 if (_checkPending && hasData())
189 {
190 _background = task!readChar;
191 _pool.put(_background);
192 }
193 }
194 }
195
196 class SocketCommunicator : Communicator
197 {
198 import std.socket : Socket;
199
200 private Socket _socket;
201
202 this(ushort port)
203 {
204 import std.socket : AddressInfo, InternetAddress, SocketOption,
205 SocketOptionLevel, TcpSocket;
206
207 _socket = new TcpSocket(new InternetAddress("localhost", port));
208 _socket.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
209 }
210
211 bool hasData()
212 {
213 synchronized (_socket)
214 {
215 return _socket.isAlive;
216 }
217 }
218
219 bool hasPendingData()
220 {
221 import std.socket : SocketFlags;
222
223 static char[1] buffer;
224 ptrdiff_t result;
225
226 synchronized (_socket)
227 {
228 _socket.blocking = false;
229 result = _socket.receive(buffer, SocketFlags.PEEK);
230 _socket.blocking = true;
231 }
232
233 return result != Socket.ERROR && result > 0;
234 }
235
236 char[] read(const size_t size)
237 {
238 static char[] buffer;
239 buffer.length = size;
240 ptrdiff_t totalBytesReceived;
241 ptrdiff_t bytesReceived;
242
243 do
244 {
245 synchronized (_socket)
246 {
247 bytesReceived = _socket.receive(buffer);
248 }
249
250 if (bytesReceived != Socket.ERROR)
251 {
252 totalBytesReceived += bytesReceived;
253 }
254 else if (bytesReceived == 0)
255 {
256 buffer.length = totalBytesReceived;
257 break;
258 }
259 }
260 while (bytesReceived == Socket.ERROR || totalBytesReceived < size);
261
262 return buffer;
263 }
264
265 void write(const char[] data)
266 {
267 ptrdiff_t totalBytesSent;
268 ptrdiff_t bytesSent;
269
270 do
271 {
272 synchronized (_socket)
273 {
274 bytesSent = _socket.send(data[totalBytesSent .. $]);
275 }
276
277 if (bytesSent != Socket.ERROR)
278 {
279 totalBytesSent += bytesSent;
280 }
281 }
282 while (bytesSent == Socket.ERROR || totalBytesSent < data.length);
283 }
284
285 void flush()
286 {
287 }
288 }