View Javadoc
1   /*
2    * Copyright (C) 2013 4th Line GmbH, Switzerland
3    *
4    * The contents of this file are subject to the terms of either the GNU
5    * Lesser General Public License Version 2 or later ("LGPL") or the
6    * Common Development and Distribution License Version 1 or later
7    * ("CDDL") (collectively, the "License"). You may not use this file
8    * except in compliance with the License. See LICENSE.txt for more
9    * information.
10   *
11   * This program is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
14   */
15  
16  package org.fourthline.cling.transport.impl.jetty;
17  
18  import org.eclipse.jetty.client.ContentExchange;
19  import org.eclipse.jetty.client.HttpClient;
20  import org.eclipse.jetty.client.HttpExchange;
21  import org.eclipse.jetty.http.HttpFields;
22  import org.eclipse.jetty.http.HttpHeaders;
23  import org.eclipse.jetty.io.ByteArrayBuffer;
24  import org.eclipse.jetty.util.thread.ExecutorThreadPool;
25  import org.fourthline.cling.model.message.StreamRequestMessage;
26  import org.fourthline.cling.model.message.StreamResponseMessage;
27  import org.fourthline.cling.model.message.UpnpHeaders;
28  import org.fourthline.cling.model.message.UpnpMessage;
29  import org.fourthline.cling.model.message.UpnpRequest;
30  import org.fourthline.cling.model.message.UpnpResponse;
31  import org.fourthline.cling.model.message.header.ContentTypeHeader;
32  import org.fourthline.cling.model.message.header.UpnpHeader;
33  import org.fourthline.cling.transport.spi.AbstractStreamClient;
34  import org.fourthline.cling.transport.spi.InitializationException;
35  import org.fourthline.cling.transport.spi.StreamClient;
36  import org.seamless.util.Exceptions;
37  import org.seamless.util.MimeType;
38  
39  import java.io.UnsupportedEncodingException;
40  import java.util.List;
41  import java.util.Map;
42  import java.util.concurrent.Callable;
43  import java.util.logging.Level;
44  import java.util.logging.Logger;
45  
46  /**
47   * Implementation based on Jetty 8 client API.
48   * <p>
49   * This implementation works on Android; it depends on the
50   * <code>jetty-client</code> Maven module.
51   * </p>
52   *
53   * @author Christian Bauer
54   */
55  public class StreamClientImpl extends AbstractStreamClient<StreamClientConfigurationImpl, StreamClientImpl.HttpContentExchange> {
56  
57      final private static Logger log = Logger.getLogger(StreamClient.class.getName());
58  
59      final protected StreamClientConfigurationImpl configuration;
60      final protected HttpClient client;
61  
62      public StreamClientImpl(StreamClientConfigurationImpl configuration) throws InitializationException {
63          this.configuration = configuration;
64  
65          log.info("Starting Jetty HttpClient...");
66          client = new HttpClient();
67  
68          // Jetty client needs threads for its internal expiration routines, which we don't need but
69          // can't disable, so let's abuse the request executor service for this
70          client.setThreadPool(
71              new ExecutorThreadPool(getConfiguration().getRequestExecutorService()) {
72                  @Override
73                  protected void doStop() throws Exception {
74                      // Do nothing, don't shut down the Cling ExecutorService when Jetty stops!
75                  }
76              }
77          );
78  
79          // These are some safety settings, we should never run into these timeouts as we
80          // do our own expiration checking
81          client.setTimeout((configuration.getTimeoutSeconds()+5) * 1000);
82          client.setConnectTimeout((configuration.getTimeoutSeconds()+5) * 1000);
83  
84          client.setMaxRetries(configuration.getRequestRetryCount());
85  
86          try {
87              client.start();
88          } catch (Exception ex) {
89              throw new InitializationException(
90                  "Could not start Jetty HTTP client: " + ex, ex
91              );
92          }
93      }
94  
95      @Override
96      public StreamClientConfigurationImpl getConfiguration() {
97          return configuration;
98      }
99  
100     @Override
101     protected HttpContentExchange createRequest(StreamRequestMessage requestMessage) {
102         return new HttpContentExchange(getConfiguration(), client, requestMessage);
103     }
104 
105     @Override
106     protected Callable<StreamResponseMessage> createCallable(final StreamRequestMessage requestMessage,
107                                                              final HttpContentExchange exchange) {
108         return new Callable<StreamResponseMessage>() {
109             public StreamResponseMessage call() throws Exception {
110 
111                 if (log.isLoggable(Level.FINE))
112                     log.fine("Sending HTTP request: " + requestMessage);
113 
114                 client.send(exchange);
115                 int exchangeState = exchange.waitForDone();
116 
117                 if (exchangeState == HttpExchange.STATUS_COMPLETED) {
118                     try {
119                         return exchange.createResponse();
120                     } catch (Throwable t) {
121                         log.log(Level.WARNING, "Error reading response: " + requestMessage, Exceptions.unwrap(t));
122                         return null;
123                     }
124                 } else if (exchangeState == HttpExchange.STATUS_CANCELLED) {
125                     // That's ok, happens when we abort the exchange after timeout
126                     return null;
127                 } else if (exchangeState == HttpExchange.STATUS_EXCEPTED) {
128                     // The warnings of the "excepted" condition are logged in HttpContentExchange
129                     return null;
130                 } else {
131                     log.warning("Unhandled HTTP exchange status: " + exchangeState);
132                     return null;
133                 }
134             }
135         };
136     }
137 
138     @Override
139     protected void abort(HttpContentExchange exchange) {
140         exchange.cancel();
141     }
142 
143     @Override
144     protected boolean logExecutionException(Throwable t) {
145         return false;
146     }
147 
148     @Override
149     public void stop() {
150         try {
151             client.stop();
152         } catch (Exception ex) {
153             log.info("Error stopping HTTP client: " + ex);
154         }
155     }
156 
157     static public class HttpContentExchange extends ContentExchange {
158 
159         final protected StreamClientConfigurationImpl configuration;
160         final protected HttpClient client;
161         final protected StreamRequestMessage requestMessage;
162 
163         protected Throwable exception;
164 
165         public HttpContentExchange(StreamClientConfigurationImpl configuration,
166                                    HttpClient client,
167                                    StreamRequestMessage requestMessage) {
168             super(true);
169             this.configuration = configuration;
170             this.client = client;
171             this.requestMessage = requestMessage;
172             applyRequestURLMethod();
173             applyRequestHeaders();
174             applyRequestBody();
175         }
176 
177         @Override
178         protected void onConnectionFailed(Throwable t) {
179             log.log(Level.WARNING, "HTTP connection failed: " + requestMessage, Exceptions.unwrap(t));
180         }
181 
182         @Override
183         protected void onException(Throwable t) {
184             log.log(Level.WARNING, "HTTP request failed: " + requestMessage, Exceptions.unwrap(t));
185         }
186 
187         public StreamClientConfigurationImpl getConfiguration() {
188             return configuration;
189         }
190 
191         public StreamRequestMessage getRequestMessage() {
192             return requestMessage;
193         }
194 
195         protected void applyRequestURLMethod() {
196             final UpnpRequest requestOperation = getRequestMessage().getOperation();
197             if (log.isLoggable(Level.FINE))
198                 log.fine(
199                     "Preparing HTTP request message with method '"
200                         + requestOperation.getHttpMethodName()
201                         + "': " + getRequestMessage()
202                 );
203 
204             setURL(requestOperation.getURI().toString());
205             setMethod(requestOperation.getHttpMethodName());
206         }
207 
208         protected void applyRequestHeaders() {
209             // Headers
210             UpnpHeaders headers = getRequestMessage().getHeaders();
211             if (log.isLoggable(Level.FINE))
212                 log.fine("Writing headers on HttpContentExchange: " + headers.size());
213             // TODO Always add the Host header
214             // TODO: ? setRequestHeader(UpnpHeader.Type.HOST.getHttpName(), );
215             // Add the default user agent if not already set on the message
216             if (!headers.containsKey(UpnpHeader.Type.USER_AGENT)) {
217                 setRequestHeader(
218                     UpnpHeader.Type.USER_AGENT.getHttpName(),
219                     getConfiguration().getUserAgentValue(
220                         getRequestMessage().getUdaMajorVersion(),
221                         getRequestMessage().getUdaMinorVersion())
222                 );
223             }
224             for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
225                 for (String v : entry.getValue()) {
226                     String headerName = entry.getKey();
227                     if (log.isLoggable(Level.FINE))
228                         log.fine("Setting header '" + headerName + "': " + v);
229                     addRequestHeader(headerName, v);
230                 }
231             }
232         }
233 
234         protected void applyRequestBody() {
235             // Body
236             if (getRequestMessage().hasBody()) {
237                 if (getRequestMessage().getBodyType() == UpnpMessage.BodyType.STRING) {
238                     if (log.isLoggable(Level.FINE))
239                         log.fine("Writing textual request body: " + getRequestMessage());
240 
241                     MimeType contentType =
242                         getRequestMessage().getContentTypeHeader() != null
243                             ? getRequestMessage().getContentTypeHeader().getValue()
244                             : ContentTypeHeader.DEFAULT_CONTENT_TYPE_UTF8;
245 
246                     String charset =
247                         getRequestMessage().getContentTypeCharset() != null
248                             ? getRequestMessage().getContentTypeCharset()
249                             : "UTF-8";
250 
251                     setRequestContentType(contentType.toString());
252                     ByteArrayBuffer buffer;
253                     try {
254                         buffer = new ByteArrayBuffer(getRequestMessage().getBodyString(), charset);
255                     } catch (UnsupportedEncodingException ex) {
256                         throw new RuntimeException("Unsupported character encoding: " + charset, ex);
257                     }
258                     setRequestHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(buffer.length()));
259                     setRequestContent(buffer);
260 
261                 } else {
262                     if (log.isLoggable(Level.FINE))
263                         log.fine("Writing binary request body: " + getRequestMessage());
264 
265                     if (getRequestMessage().getContentTypeHeader() == null)
266                         throw new RuntimeException(
267                             "Missing content type header in request message: " + requestMessage
268                         );
269                     MimeType contentType = getRequestMessage().getContentTypeHeader().getValue();
270 
271                     setRequestContentType(contentType.toString());
272                     ByteArrayBuffer buffer;
273                     buffer = new ByteArrayBuffer(getRequestMessage().getBodyBytes());
274                     setRequestHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(buffer.length()));
275                     setRequestContent(buffer);
276                 }
277             }
278         }
279 
280         protected StreamResponseMessage createResponse() {
281             // Status
282             UpnpResponse responseOperation =
283                 new UpnpResponse(
284                     getResponseStatus(),
285                     UpnpResponse.Status.getByStatusCode(getResponseStatus()).getStatusMsg()
286                 );
287 
288             if (log.isLoggable(Level.FINE))
289                 log.fine("Received response: " + responseOperation);
290 
291             StreamResponseMessage responseMessage = new StreamResponseMessage(responseOperation);
292 
293             // Headers
294             UpnpHeaders headers = new UpnpHeaders();
295             HttpFields responseFields = getResponseFields();
296             for (String name : responseFields.getFieldNamesCollection()) {
297                 for (String value : responseFields.getValuesCollection(name)) {
298                     headers.add(name, value);
299                 }
300             }
301             responseMessage.setHeaders(headers);
302 
303             // Body
304             byte[] bytes = getResponseContentBytes();
305             if (bytes != null && bytes.length > 0 && responseMessage.isContentTypeMissingOrText()) {
306 
307                 if (log.isLoggable(Level.FINE))
308                     log.fine("Response contains textual entity body, converting then setting string on message");
309                 try {
310                     responseMessage.setBodyCharacters(bytes);
311                 } catch (UnsupportedEncodingException ex) {
312                     throw new RuntimeException("Unsupported character encoding: " + ex, ex);
313                 }
314 
315             } else if (bytes != null && bytes.length > 0) {
316 
317                 if (log.isLoggable(Level.FINE))
318                     log.fine("Response contains binary entity body, setting bytes on message");
319                 responseMessage.setBody(UpnpMessage.BodyType.BYTES, bytes);
320 
321             } else {
322                 if (log.isLoggable(Level.FINE))
323                     log.fine("Response did not contain entity body");
324             }
325 
326             if (log.isLoggable(Level.FINE))
327                 log.fine("Response message complete: " + responseMessage);
328             return responseMessage;
329         }
330     }
331 }
332 
333