/*
 * Copyright (C) 2013 4th Line GmbH, Switzerland
 *
 * The contents of this file are subject to the terms of either the GNU
 * Lesser General Public License Version 2 or later ("LGPL") or the
 * Common Development and Distribution License Version 1 or later
 * ("CDDL") (collectively, the "License"). You may not use this file
 * except in compliance with the License. See LICENSE.txt for more
 * information.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 */
15  
16  package org.fourthline.cling.transport;
17  
18  import org.fourthline.cling.UpnpServiceConfiguration;
19  import org.fourthline.cling.model.NetworkAddress;
20  import org.fourthline.cling.model.message.IncomingDatagramMessage;
21  import org.fourthline.cling.model.message.OutgoingDatagramMessage;
22  import org.fourthline.cling.model.message.StreamRequestMessage;
23  import org.fourthline.cling.model.message.StreamResponseMessage;
24  import org.fourthline.cling.protocol.ProtocolCreationException;
25  import org.fourthline.cling.protocol.ProtocolFactory;
26  import org.fourthline.cling.protocol.ReceivingAsync;
27  import org.fourthline.cling.transport.spi.DatagramIO;
28  import org.fourthline.cling.transport.spi.InitializationException;
29  import org.fourthline.cling.transport.spi.MulticastReceiver;
30  import org.fourthline.cling.transport.spi.NetworkAddressFactory;
31  import org.fourthline.cling.transport.spi.NoNetworkException;
32  import org.fourthline.cling.transport.spi.StreamClient;
33  import org.fourthline.cling.transport.spi.StreamServer;
34  import org.fourthline.cling.transport.spi.UpnpStream;
35  import org.seamless.util.Exceptions;
36  
37  import javax.enterprise.context.ApplicationScoped;
38  import javax.enterprise.event.Observes;
39  import javax.enterprise.inject.Default;
40  import javax.inject.Inject;
41  import java.net.BindException;
42  import java.net.DatagramPacket;
43  import java.net.InetAddress;
44  import java.net.NetworkInterface;
45  import java.util.ArrayList;
46  import java.util.Collections;
47  import java.util.HashMap;
48  import java.util.Iterator;
49  import java.util.List;
50  import java.util.Map;
51  import java.util.concurrent.TimeUnit;
52  import java.util.concurrent.locks.Lock;
53  import java.util.concurrent.locks.ReentrantReadWriteLock;
54  import java.util.logging.Level;
55  import java.util.logging.Logger;
56  
57  /**
58   * Default implementation of network message router.
59   * <p>
60   * Initializes and starts listening for data on the network when enabled.
61   * </p>
62   *
63   * @author Christian Bauer
64   */
65  @ApplicationScoped
66  public class RouterImpl implements Router {
67  
68      private static Logger log = Logger.getLogger(Router.class.getName());
69  
70      protected UpnpServiceConfiguration configuration;
71      protected ProtocolFactory protocolFactory;
72  
73      protected volatile boolean enabled;
74      protected ReentrantReadWriteLock routerLock = new ReentrantReadWriteLock(true);
75      protected Lock readLock = routerLock.readLock();
76      protected Lock writeLock = routerLock.writeLock();
77  
78      // These are created/destroyed when the router is enabled/disabled
79      protected NetworkAddressFactory networkAddressFactory;
80      protected StreamClient streamClient;
81      protected final Map<NetworkInterface, MulticastReceiver> multicastReceivers = new HashMap<>();
82      protected final Map<InetAddress, DatagramIO> datagramIOs = new HashMap<>();
83      protected final Map<InetAddress, StreamServer> streamServers = new HashMap<>();
84  
85      protected RouterImpl() {
86      }
87  
88      /**
89       * @param configuration   The configuration used by this router.
90       * @param protocolFactory The protocol factory used by this router.
91       */
92      @Inject
93      public RouterImpl(UpnpServiceConfiguration configuration, ProtocolFactory protocolFactory) {
94          log.info("Creating Router: " + getClass().getName());
95          this.configuration = configuration;
96          this.protocolFactory = protocolFactory;
97      }
98  
99      public boolean enable(@Observes @Default EnableRouter event) throws RouterException {
100         return enable();
101     }
102 
103     public boolean disable(@Observes @Default DisableRouter event) throws RouterException {
104         return disable();
105     }
106 
107     public UpnpServiceConfiguration getConfiguration() {
108         return configuration;
109     }
110 
111     public ProtocolFactory getProtocolFactory() {
112         return protocolFactory;
113     }
114 
115     /**
116      * Initializes listening services: First an instance of {@link org.fourthline.cling.transport.spi.MulticastReceiver}
117      * is bound to each network interface. Then an instance of {@link org.fourthline.cling.transport.spi.DatagramIO} and
118      * {@link org.fourthline.cling.transport.spi.StreamServer} is bound to each bind address returned by the network
119      * address factory, respectively. There is only one instance of
120      * {@link org.fourthline.cling.transport.spi.StreamClient} created and managed by this router.
121      */
122     @Override
123     public boolean enable() throws RouterException {
124         lock(writeLock);
125         try {
126             if (!enabled) {
127                 try {
128                     log.fine("Starting networking services...");
129                     networkAddressFactory = getConfiguration().createNetworkAddressFactory();
130 
131                     startInterfaceBasedTransports(networkAddressFactory.getNetworkInterfaces());
132                     startAddressBasedTransports(networkAddressFactory.getBindAddresses());
133 
134                     // The transports possibly removed some unusable network interfaces/addresses
135                     if (!networkAddressFactory.hasUsableNetwork()) {
136                         throw new NoNetworkException(
137                             "No usable network interface and/or addresses available, check the log for errors."
138                         );
139                     }
140 
141                     // Start the HTTP client last, we don't even have to try if there is no network
142                     streamClient = getConfiguration().createStreamClient();
143 
144                     enabled = true;
145                     return true;
146                 } catch (InitializationException ex) {
147                     handleStartFailure(ex);
148                 }
149             }
150             return false;
151         } finally {
152             unlock(writeLock);
153         }
154     }
155 
156     @Override
157     public boolean disable() throws RouterException {
158         lock(writeLock);
159         try {
160             if (enabled) {
161                 log.fine("Disabling network services...");
162 
163                 if (streamClient != null) {
164                     log.fine("Stopping stream client connection management/pool");
165                     streamClient.stop();
166                     streamClient = null;
167                 }
168 
169                 for (Map.Entry<InetAddress, StreamServer> entry : streamServers.entrySet()) {
170                     log.fine("Stopping stream server on address: " + entry.getKey());
171                     entry.getValue().stop();
172                 }
173                 streamServers.clear();
174 
175                 for (Map.Entry<NetworkInterface, MulticastReceiver> entry : multicastReceivers.entrySet()) {
176                     log.fine("Stopping multicast receiver on interface: " + entry.getKey().getDisplayName());
177                     entry.getValue().stop();
178                 }
179                 multicastReceivers.clear();
180 
181                 for (Map.Entry<InetAddress, DatagramIO> entry : datagramIOs.entrySet()) {
182                     log.fine("Stopping datagram I/O on address: " + entry.getKey());
183                     entry.getValue().stop();
184                 }
185                 datagramIOs.clear();
186 
187                 networkAddressFactory = null;
188                 enabled = false;
189                 return true;
190             }
191             return false;
192         } finally {
193             unlock(writeLock);
194         }
195     }
196 
197     @Override
198     public void shutdown() throws RouterException {
199         disable();
200     }
201 
202     @Override
203     public boolean isEnabled() {
204         return enabled;
205     }
206 
207     @Override
208     public void handleStartFailure(InitializationException ex) throws InitializationException {
209         if (ex instanceof NoNetworkException) {
210             log.info("Unable to initialize network router, no network found.");
211         } else {
212             log.severe("Unable to initialize network router: " + ex);
213             log.severe("Cause: " + Exceptions.unwrap(ex));
214         }
215     }
216 
217     public List<NetworkAddress> getActiveStreamServers(InetAddress preferredAddress) throws RouterException {
218         lock(readLock);
219         try {
220             if (enabled && streamServers.size() > 0) {
221                 List<NetworkAddress> streamServerAddresses = new ArrayList<>();
222 
223                 StreamServer preferredServer;
224                 if (preferredAddress != null &&
225                     (preferredServer = streamServers.get(preferredAddress)) != null) {
226                     streamServerAddresses.add(
227                         new NetworkAddress(
228                             preferredAddress,
229                             preferredServer.getPort(),
230                             networkAddressFactory.getHardwareAddress(preferredAddress)
231 
232                         )
233                     );
234                     return streamServerAddresses;
235                 }
236 
237                 for (Map.Entry<InetAddress, StreamServer> entry : streamServers.entrySet()) {
238                     byte[] hardwareAddress = networkAddressFactory.getHardwareAddress(entry.getKey());
239                     streamServerAddresses.add(
240                         new NetworkAddress(entry.getKey(), entry.getValue().getPort(), hardwareAddress)
241                     );
242                 }
243                 return streamServerAddresses;
244             } else {
245                 return Collections.EMPTY_LIST;
246             }
247         } finally {
248             unlock(readLock);
249         }
250     }
251 
252     /**
253      * Obtains the asynchronous protocol {@code Executor} and runs the protocol created
254      * by the {@link org.fourthline.cling.protocol.ProtocolFactory} for the given message.
255      * <p>
256      * If the factory doesn't create a protocol, the message is dropped immediately without
257      * creating another thread or consuming further resources. This means we can filter the
258      * datagrams in the protocol factory and e.g. completely disable discovery or only
259      * allow notification message from some known services we'd like to work with.
260      * </p>
261      *
262      * @param msg The received datagram message.
263      */
264     public void received(IncomingDatagramMessage msg) {
265         if (!enabled) {
266             log.fine("Router disabled, ignoring incoming message: " + msg);
267             return;
268         }
269         try {
270             ReceivingAsync protocol = getProtocolFactory().createReceivingAsync(msg);
271             if (protocol == null) {
272                 if (log.isLoggable(Level.FINEST))
273                     log.finest("No protocol, ignoring received message: " + msg);
274                 return;
275             }
276             if (log.isLoggable(Level.FINE))
277                 log.fine("Received asynchronous message: " + msg);
278             getConfiguration().getAsyncProtocolExecutor().execute(protocol);
279         } catch (ProtocolCreationException ex) {
280             log.warning("Handling received datagram failed - " + Exceptions.unwrap(ex).toString());
281         }
282     }
283 
284     /**
285      * Obtains the synchronous protocol {@code Executor} and runs the
286      * {@link org.fourthline.cling.transport.spi.UpnpStream} directly.
287      *
288      * @param stream The received {@link org.fourthline.cling.transport.spi.UpnpStream}.
289      */
290     public void received(UpnpStream stream) {
291         if (!enabled) {
292             log.fine("Router disabled, ignoring incoming: " + stream);
293             return;
294         }
295         log.fine("Received synchronous stream: " + stream);
296         getConfiguration().getSyncProtocolExecutorService().execute(stream);
297     }
298 
299     /**
300      * Sends the UDP datagram on all bound {@link org.fourthline.cling.transport.spi.DatagramIO}s.
301      *
302      * @param msg The UDP datagram message to send.
303      */
304     public void send(OutgoingDatagramMessage msg) throws RouterException {
305         lock(readLock);
306         try {
307             if (enabled) {
308                 for (DatagramIO datagramIO : datagramIOs.values()) {
309                     datagramIO.send(msg);
310                 }
311             } else {
312                 log.fine("Router disabled, not sending datagram: " + msg);
313             }
314         } finally {
315             unlock(readLock);
316         }
317     }
318 
319     /**
320      * Sends the TCP stream request with the {@link org.fourthline.cling.transport.spi.StreamClient}.
321      *
322      * @param msg The TCP (HTTP) stream message to send.
323      * @return The return value of the {@link org.fourthline.cling.transport.spi.StreamClient#sendRequest(StreamRequestMessage)}
324      *         method or <code>null</code> if no <code>StreamClient</code> is available.
325      */
326     public StreamResponseMessage send(StreamRequestMessage msg) throws RouterException {
327         lock(readLock);
328         try {
329             if (enabled) {
330                 if (streamClient == null) {
331                     log.fine("No StreamClient available, not sending: " + msg);
332                     return null;
333                 }
334                 log.fine("Sending via TCP unicast stream: " + msg);
335                 try {
336                     return streamClient.sendRequest(msg);
337                 } catch (InterruptedException ex) {
338                     throw new RouterException("Sending stream request was interrupted", ex);
339                 }
340             } else {
341                 log.fine("Router disabled, not sending stream request: " + msg);
342                 return null;
343             }
344         } finally {
345             unlock(readLock);
346         }
347     }
348 
349     /**
350      * Sends the given bytes as a broadcast on all bound {@link org.fourthline.cling.transport.spi.DatagramIO}s,
351      * using source port 9.
352      * <p>
353      * TODO: Support source port parameter
354      * </p>
355      *
356      * @param bytes The byte payload of the UDP datagram.
357      */
358     public void broadcast(byte[] bytes) throws RouterException {
359         lock(readLock);
360         try {
361             if (enabled) {
362                 for (Map.Entry<InetAddress, DatagramIO> entry : datagramIOs.entrySet()) {
363                     InetAddress broadcast = networkAddressFactory.getBroadcastAddress(entry.getKey());
364                     if (broadcast != null) {
365                         log.fine("Sending UDP datagram to broadcast address: " + broadcast.getHostAddress());
366                         DatagramPacket packet = new DatagramPacket(bytes, bytes.length, broadcast, 9);
367                         entry.getValue().send(packet);
368                     }
369                 }
370             } else {
371                 log.fine("Router disabled, not broadcasting bytes: " + bytes.length);
372             }
373         } finally {
374             unlock(readLock);
375         }
376     }
377 
378     protected void startInterfaceBasedTransports(Iterator<NetworkInterface> interfaces) throws InitializationException {
379         while (interfaces.hasNext()) {
380             NetworkInterface networkInterface = interfaces.next();
381 
382             // We only have the MulticastReceiver as an interface-based transport
383             MulticastReceiver multicastReceiver = getConfiguration().createMulticastReceiver(networkAddressFactory);
384             if (multicastReceiver == null) {
385                 log.info("Configuration did not create a MulticastReceiver for: " + networkInterface);
386             } else {
387                 try {
388                     if (log.isLoggable(Level.FINE))
389                         log.fine("Init multicast receiver on interface: " + networkInterface.getDisplayName());
390                     multicastReceiver.init(
391                         networkInterface,
392                         this,
393                         networkAddressFactory,
394                         getConfiguration().getDatagramProcessor()
395                     );
396 
397                     multicastReceivers.put(networkInterface, multicastReceiver);
398                 } catch (InitializationException ex) {
399                     /* TODO: What are some recoverable exceptions for this?
400                     log.warning(
401                         "Ignoring network interface '"
402                             + networkInterface.getDisplayName()
403                             + "' init failure of MulticastReceiver: " + ex.toString());
404                     if (log.isLoggable(Level.FINE))
405                         log.log(Level.FINE, "Initialization exception root cause", Exceptions.unwrap(ex));
406                     log.warning("Removing unusable interface " + interface);
407                     it.remove();
408                     continue; // Don't need to try anything else on this interface
409                     */
410                     throw ex;
411                 }
412             }
413         }
414 
415         for (Map.Entry<NetworkInterface, MulticastReceiver> entry : multicastReceivers.entrySet()) {
416             if (log.isLoggable(Level.FINE))
417                 log.fine("Starting multicast receiver on interface: " + entry.getKey().getDisplayName());
418             getConfiguration().getMulticastReceiverExecutor().execute(entry.getValue());
419         }
420     }
421 
422     protected void startAddressBasedTransports(Iterator<InetAddress> addresses) throws InitializationException {
423         while (addresses.hasNext()) {
424             InetAddress address = addresses.next();
425 
426             // HTTP servers
427             StreamServer streamServer = getConfiguration().createStreamServer(networkAddressFactory);
428             if (streamServer == null) {
429                 log.info("Configuration did not create a StreamServer for: " + address);
430             } else {
431                 try {
432                     if (log.isLoggable(Level.FINE))
433                         log.fine("Init stream server on address: " + address);
434                     streamServer.init(address, this);
435                     streamServers.put(address, streamServer);
436                 } catch (InitializationException ex) {
437                     // Try to recover
438                     Throwable cause = Exceptions.unwrap(ex);
439                     if (cause instanceof BindException) {
440                         log.warning("Failed to init StreamServer: " + cause);
441                         if (log.isLoggable(Level.FINE))
442                             log.log(Level.FINE, "Initialization exception root cause", cause);
443                         log.warning("Removing unusable address: " + address);
444                         addresses.remove();
445                         continue; // Don't try anything else with this address
446                     }
447                     throw ex;
448                 }
449             }
450 
451             // Datagram I/O
452             DatagramIO datagramIO = getConfiguration().createDatagramIO(networkAddressFactory);
453             if (datagramIO == null) {
454                 log.info("Configuration did not create a StreamServer for: " + address);
455             } else {
456                 try {
457                     if (log.isLoggable(Level.FINE))
458                         log.fine("Init datagram I/O on address: " + address);
459                     datagramIO.init(address, this, getConfiguration().getDatagramProcessor());
460                     datagramIOs.put(address, datagramIO);
461                 } catch (InitializationException ex) {
462                     /* TODO: What are some recoverable exceptions for this?
463                     Throwable cause = Exceptions.unwrap(ex);
464                     if (cause instanceof BindException) {
465                         log.warning("Failed to init datagram I/O: " + cause);
466                         if (log.isLoggable(Level.FINE))
467                             log.log(Level.FINE, "Initialization exception root cause", cause);
468                         log.warning("Removing unusable address: " + address);
469                         addresses.remove();
470                         continue; // Don't try anything else with this address
471                     }
472                     */
473                     throw ex;
474                 }
475             }
476         }
477 
478         for (Map.Entry<InetAddress, StreamServer> entry : streamServers.entrySet()) {
479             if (log.isLoggable(Level.FINE))
480                 log.fine("Starting stream server on address: " + entry.getKey());
481             getConfiguration().getStreamServerExecutorService().execute(entry.getValue());
482         }
483 
484         for (Map.Entry<InetAddress, DatagramIO> entry : datagramIOs.entrySet()) {
485             if (log.isLoggable(Level.FINE))
486                 log.fine("Starting datagram I/O on address: " + entry.getKey());
487             getConfiguration().getDatagramIOExecutor().execute(entry.getValue());
488         }
489     }
490 
491     protected void lock(Lock lock, int timeoutMilliseconds) throws RouterException {
492         try {
493             log.finest("Trying to obtain lock with timeout milliseconds '" + timeoutMilliseconds + "': " + lock.getClass().getSimpleName());
494             if (lock.tryLock(timeoutMilliseconds, TimeUnit.MILLISECONDS)) {
495                 log.finest("Acquired router lock: " + lock.getClass().getSimpleName());
496             } else {
497                 throw new RouterException(
498                     "Router wasn't available exclusively after waiting " + timeoutMilliseconds + "ms, lock failed: "
499                         + lock.getClass().getSimpleName()
500                 );
501             }
502         } catch (InterruptedException ex) {
503             throw new RouterException(
504                 "Interruption while waiting for exclusive access: " + lock.getClass().getSimpleName(), ex
505             );
506         }
507     }
508 
509     protected void lock(Lock lock) throws RouterException {
510         lock(lock, getLockTimeoutMillis());
511     }
512 
513     protected void unlock(Lock lock) {
514         log.finest("Releasing router lock: " + lock.getClass().getSimpleName());
515         lock.unlock();
516     }
517 
518     /**
519      * @return Defaults to 6 seconds, should be longer than it takes the router to be enabled/disabled.
520      */
521     protected int getLockTimeoutMillis() {
522         return 6000;
523     }
524 
525 }