1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.fourthline.cling.transport;
17
18 import org.fourthline.cling.UpnpServiceConfiguration;
19 import org.fourthline.cling.model.NetworkAddress;
20 import org.fourthline.cling.model.message.IncomingDatagramMessage;
21 import org.fourthline.cling.model.message.OutgoingDatagramMessage;
22 import org.fourthline.cling.model.message.StreamRequestMessage;
23 import org.fourthline.cling.model.message.StreamResponseMessage;
24 import org.fourthline.cling.protocol.ProtocolCreationException;
25 import org.fourthline.cling.protocol.ProtocolFactory;
26 import org.fourthline.cling.protocol.ReceivingAsync;
27 import org.fourthline.cling.transport.spi.DatagramIO;
28 import org.fourthline.cling.transport.spi.InitializationException;
29 import org.fourthline.cling.transport.spi.MulticastReceiver;
30 import org.fourthline.cling.transport.spi.NetworkAddressFactory;
31 import org.fourthline.cling.transport.spi.NoNetworkException;
32 import org.fourthline.cling.transport.spi.StreamClient;
33 import org.fourthline.cling.transport.spi.StreamServer;
34 import org.fourthline.cling.transport.spi.UpnpStream;
35 import org.seamless.util.Exceptions;
36
37 import javax.enterprise.context.ApplicationScoped;
38 import javax.enterprise.event.Observes;
39 import javax.enterprise.inject.Default;
40 import javax.inject.Inject;
41 import java.net.BindException;
42 import java.net.DatagramPacket;
43 import java.net.InetAddress;
44 import java.net.NetworkInterface;
45 import java.util.ArrayList;
46 import java.util.Collections;
47 import java.util.HashMap;
48 import java.util.Iterator;
49 import java.util.List;
50 import java.util.Map;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.locks.Lock;
53 import java.util.concurrent.locks.ReentrantReadWriteLock;
54 import java.util.logging.Level;
55 import java.util.logging.Logger;
56
57
58
59
60
61
62
63
64
65 @ApplicationScoped
66 public class RouterImpl implements Router {
67
68 private static Logger log = Logger.getLogger(Router.class.getName());
69
70 protected UpnpServiceConfiguration configuration;
71 protected ProtocolFactory protocolFactory;
72
73 protected volatile boolean enabled;
74 protected ReentrantReadWriteLock routerLock = new ReentrantReadWriteLock(true);
75 protected Lock readLock = routerLock.readLock();
76 protected Lock writeLock = routerLock.writeLock();
77
78
79 protected NetworkAddressFactory networkAddressFactory;
80 protected StreamClient streamClient;
81 protected final Map<NetworkInterface, MulticastReceiver> multicastReceivers = new HashMap<>();
82 protected final Map<InetAddress, DatagramIO> datagramIOs = new HashMap<>();
83 protected final Map<InetAddress, StreamServer> streamServers = new HashMap<>();
84
85 protected RouterImpl() {
86 }
87
88
89
90
91
92 @Inject
93 public RouterImpl(UpnpServiceConfiguration configuration, ProtocolFactory protocolFactory) {
94 log.info("Creating Router: " + getClass().getName());
95 this.configuration = configuration;
96 this.protocolFactory = protocolFactory;
97 }
98
99 public boolean enable(@Observes @Default EnableRouter event) throws RouterException {
100 return enable();
101 }
102
103 public boolean disable(@Observes @Default DisableRouter event) throws RouterException {
104 return disable();
105 }
106
107 public UpnpServiceConfiguration getConfiguration() {
108 return configuration;
109 }
110
111 public ProtocolFactory getProtocolFactory() {
112 return protocolFactory;
113 }
114
115
116
117
118
119
120
121
122 @Override
123 public boolean enable() throws RouterException {
124 lock(writeLock);
125 try {
126 if (!enabled) {
127 try {
128 log.fine("Starting networking services...");
129 networkAddressFactory = getConfiguration().createNetworkAddressFactory();
130
131 startInterfaceBasedTransports(networkAddressFactory.getNetworkInterfaces());
132 startAddressBasedTransports(networkAddressFactory.getBindAddresses());
133
134
135 if (!networkAddressFactory.hasUsableNetwork()) {
136 throw new NoNetworkException(
137 "No usable network interface and/or addresses available, check the log for errors."
138 );
139 }
140
141
142 streamClient = getConfiguration().createStreamClient();
143
144 enabled = true;
145 return true;
146 } catch (InitializationException ex) {
147 handleStartFailure(ex);
148 }
149 }
150 return false;
151 } finally {
152 unlock(writeLock);
153 }
154 }
155
156 @Override
157 public boolean disable() throws RouterException {
158 lock(writeLock);
159 try {
160 if (enabled) {
161 log.fine("Disabling network services...");
162
163 if (streamClient != null) {
164 log.fine("Stopping stream client connection management/pool");
165 streamClient.stop();
166 streamClient = null;
167 }
168
169 for (Map.Entry<InetAddress, StreamServer> entry : streamServers.entrySet()) {
170 log.fine("Stopping stream server on address: " + entry.getKey());
171 entry.getValue().stop();
172 }
173 streamServers.clear();
174
175 for (Map.Entry<NetworkInterface, MulticastReceiver> entry : multicastReceivers.entrySet()) {
176 log.fine("Stopping multicast receiver on interface: " + entry.getKey().getDisplayName());
177 entry.getValue().stop();
178 }
179 multicastReceivers.clear();
180
181 for (Map.Entry<InetAddress, DatagramIO> entry : datagramIOs.entrySet()) {
182 log.fine("Stopping datagram I/O on address: " + entry.getKey());
183 entry.getValue().stop();
184 }
185 datagramIOs.clear();
186
187 networkAddressFactory = null;
188 enabled = false;
189 return true;
190 }
191 return false;
192 } finally {
193 unlock(writeLock);
194 }
195 }
196
197 @Override
198 public void shutdown() throws RouterException {
199 disable();
200 }
201
202 @Override
203 public boolean isEnabled() {
204 return enabled;
205 }
206
207 @Override
208 public void handleStartFailure(InitializationException ex) throws InitializationException {
209 if (ex instanceof NoNetworkException) {
210 log.info("Unable to initialize network router, no network found.");
211 } else {
212 log.severe("Unable to initialize network router: " + ex);
213 log.severe("Cause: " + Exceptions.unwrap(ex));
214 }
215 }
216
217 public List<NetworkAddress> getActiveStreamServers(InetAddress preferredAddress) throws RouterException {
218 lock(readLock);
219 try {
220 if (enabled && streamServers.size() > 0) {
221 List<NetworkAddress> streamServerAddresses = new ArrayList<>();
222
223 StreamServer preferredServer;
224 if (preferredAddress != null &&
225 (preferredServer = streamServers.get(preferredAddress)) != null) {
226 streamServerAddresses.add(
227 new NetworkAddress(
228 preferredAddress,
229 preferredServer.getPort(),
230 networkAddressFactory.getHardwareAddress(preferredAddress)
231
232 )
233 );
234 return streamServerAddresses;
235 }
236
237 for (Map.Entry<InetAddress, StreamServer> entry : streamServers.entrySet()) {
238 byte[] hardwareAddress = networkAddressFactory.getHardwareAddress(entry.getKey());
239 streamServerAddresses.add(
240 new NetworkAddress(entry.getKey(), entry.getValue().getPort(), hardwareAddress)
241 );
242 }
243 return streamServerAddresses;
244 } else {
245 return Collections.EMPTY_LIST;
246 }
247 } finally {
248 unlock(readLock);
249 }
250 }
251
252
253
254
255
256
257
258
259
260
261
262
263
264 public void received(IncomingDatagramMessage msg) {
265 if (!enabled) {
266 log.fine("Router disabled, ignoring incoming message: " + msg);
267 return;
268 }
269 try {
270 ReceivingAsync protocol = getProtocolFactory().createReceivingAsync(msg);
271 if (protocol == null) {
272 if (log.isLoggable(Level.FINEST))
273 log.finest("No protocol, ignoring received message: " + msg);
274 return;
275 }
276 if (log.isLoggable(Level.FINE))
277 log.fine("Received asynchronous message: " + msg);
278 getConfiguration().getAsyncProtocolExecutor().execute(protocol);
279 } catch (ProtocolCreationException ex) {
280 log.warning("Handling received datagram failed - " + Exceptions.unwrap(ex).toString());
281 }
282 }
283
284
285
286
287
288
289
290 public void received(UpnpStream stream) {
291 if (!enabled) {
292 log.fine("Router disabled, ignoring incoming: " + stream);
293 return;
294 }
295 log.fine("Received synchronous stream: " + stream);
296 getConfiguration().getSyncProtocolExecutorService().execute(stream);
297 }
298
299
300
301
302
303
304 public void send(OutgoingDatagramMessage msg) throws RouterException {
305 lock(readLock);
306 try {
307 if (enabled) {
308 for (DatagramIO datagramIO : datagramIOs.values()) {
309 datagramIO.send(msg);
310 }
311 } else {
312 log.fine("Router disabled, not sending datagram: " + msg);
313 }
314 } finally {
315 unlock(readLock);
316 }
317 }
318
319
320
321
322
323
324
325
326 public StreamResponseMessage send(StreamRequestMessage msg) throws RouterException {
327 lock(readLock);
328 try {
329 if (enabled) {
330 if (streamClient == null) {
331 log.fine("No StreamClient available, not sending: " + msg);
332 return null;
333 }
334 log.fine("Sending via TCP unicast stream: " + msg);
335 try {
336 return streamClient.sendRequest(msg);
337 } catch (InterruptedException ex) {
338 throw new RouterException("Sending stream request was interrupted", ex);
339 }
340 } else {
341 log.fine("Router disabled, not sending stream request: " + msg);
342 return null;
343 }
344 } finally {
345 unlock(readLock);
346 }
347 }
348
349
350
351
352
353
354
355
356
357
358 public void broadcast(byte[] bytes) throws RouterException {
359 lock(readLock);
360 try {
361 if (enabled) {
362 for (Map.Entry<InetAddress, DatagramIO> entry : datagramIOs.entrySet()) {
363 InetAddress broadcast = networkAddressFactory.getBroadcastAddress(entry.getKey());
364 if (broadcast != null) {
365 log.fine("Sending UDP datagram to broadcast address: " + broadcast.getHostAddress());
366 DatagramPacket packet = new DatagramPacket(bytes, bytes.length, broadcast, 9);
367 entry.getValue().send(packet);
368 }
369 }
370 } else {
371 log.fine("Router disabled, not broadcasting bytes: " + bytes.length);
372 }
373 } finally {
374 unlock(readLock);
375 }
376 }
377
378 protected void startInterfaceBasedTransports(Iterator<NetworkInterface> interfaces) throws InitializationException {
379 while (interfaces.hasNext()) {
380 NetworkInterface networkInterface = interfaces.next();
381
382
383 MulticastReceiver multicastReceiver = getConfiguration().createMulticastReceiver(networkAddressFactory);
384 if (multicastReceiver == null) {
385 log.info("Configuration did not create a MulticastReceiver for: " + networkInterface);
386 } else {
387 try {
388 if (log.isLoggable(Level.FINE))
389 log.fine("Init multicast receiver on interface: " + networkInterface.getDisplayName());
390 multicastReceiver.init(
391 networkInterface,
392 this,
393 networkAddressFactory,
394 getConfiguration().getDatagramProcessor()
395 );
396
397 multicastReceivers.put(networkInterface, multicastReceiver);
398 } catch (InitializationException ex) {
399
400
401
402
403
404
405
406
407
408
409
410 throw ex;
411 }
412 }
413 }
414
415 for (Map.Entry<NetworkInterface, MulticastReceiver> entry : multicastReceivers.entrySet()) {
416 if (log.isLoggable(Level.FINE))
417 log.fine("Starting multicast receiver on interface: " + entry.getKey().getDisplayName());
418 getConfiguration().getMulticastReceiverExecutor().execute(entry.getValue());
419 }
420 }
421
422 protected void startAddressBasedTransports(Iterator<InetAddress> addresses) throws InitializationException {
423 while (addresses.hasNext()) {
424 InetAddress address = addresses.next();
425
426
427 StreamServer streamServer = getConfiguration().createStreamServer(networkAddressFactory);
428 if (streamServer == null) {
429 log.info("Configuration did not create a StreamServer for: " + address);
430 } else {
431 try {
432 if (log.isLoggable(Level.FINE))
433 log.fine("Init stream server on address: " + address);
434 streamServer.init(address, this);
435 streamServers.put(address, streamServer);
436 } catch (InitializationException ex) {
437
438 Throwable cause = Exceptions.unwrap(ex);
439 if (cause instanceof BindException) {
440 log.warning("Failed to init StreamServer: " + cause);
441 if (log.isLoggable(Level.FINE))
442 log.log(Level.FINE, "Initialization exception root cause", cause);
443 log.warning("Removing unusable address: " + address);
444 addresses.remove();
445 continue;
446 }
447 throw ex;
448 }
449 }
450
451
452 DatagramIO datagramIO = getConfiguration().createDatagramIO(networkAddressFactory);
453 if (datagramIO == null) {
454 log.info("Configuration did not create a StreamServer for: " + address);
455 } else {
456 try {
457 if (log.isLoggable(Level.FINE))
458 log.fine("Init datagram I/O on address: " + address);
459 datagramIO.init(address, this, getConfiguration().getDatagramProcessor());
460 datagramIOs.put(address, datagramIO);
461 } catch (InitializationException ex) {
462
463
464
465
466
467
468
469
470
471
472
473 throw ex;
474 }
475 }
476 }
477
478 for (Map.Entry<InetAddress, StreamServer> entry : streamServers.entrySet()) {
479 if (log.isLoggable(Level.FINE))
480 log.fine("Starting stream server on address: " + entry.getKey());
481 getConfiguration().getStreamServerExecutorService().execute(entry.getValue());
482 }
483
484 for (Map.Entry<InetAddress, DatagramIO> entry : datagramIOs.entrySet()) {
485 if (log.isLoggable(Level.FINE))
486 log.fine("Starting datagram I/O on address: " + entry.getKey());
487 getConfiguration().getDatagramIOExecutor().execute(entry.getValue());
488 }
489 }
490
491 protected void lock(Lock lock, int timeoutMilliseconds) throws RouterException {
492 try {
493 log.finest("Trying to obtain lock with timeout milliseconds '" + timeoutMilliseconds + "': " + lock.getClass().getSimpleName());
494 if (lock.tryLock(timeoutMilliseconds, TimeUnit.MILLISECONDS)) {
495 log.finest("Acquired router lock: " + lock.getClass().getSimpleName());
496 } else {
497 throw new RouterException(
498 "Router wasn't available exclusively after waiting " + timeoutMilliseconds + "ms, lock failed: "
499 + lock.getClass().getSimpleName()
500 );
501 }
502 } catch (InterruptedException ex) {
503 throw new RouterException(
504 "Interruption while waiting for exclusive access: " + lock.getClass().getSimpleName(), ex
505 );
506 }
507 }
508
509 protected void lock(Lock lock) throws RouterException {
510 lock(lock, getLockTimeoutMillis());
511 }
512
513 protected void unlock(Lock lock) {
514 log.finest("Releasing router lock: " + lock.getClass().getSimpleName());
515 lock.unlock();
516 }
517
518
519
520
521 protected int getLockTimeoutMillis() {
522 return 6000;
523 }
524
525 }