1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.fourthline.cling.transport.impl.jetty;
17
18 import org.eclipse.jetty.client.ContentExchange;
19 import org.eclipse.jetty.client.HttpClient;
20 import org.eclipse.jetty.client.HttpExchange;
21 import org.eclipse.jetty.http.HttpFields;
22 import org.eclipse.jetty.http.HttpHeaders;
23 import org.eclipse.jetty.io.ByteArrayBuffer;
24 import org.eclipse.jetty.util.thread.ExecutorThreadPool;
25 import org.fourthline.cling.model.message.StreamRequestMessage;
26 import org.fourthline.cling.model.message.StreamResponseMessage;
27 import org.fourthline.cling.model.message.UpnpHeaders;
28 import org.fourthline.cling.model.message.UpnpMessage;
29 import org.fourthline.cling.model.message.UpnpRequest;
30 import org.fourthline.cling.model.message.UpnpResponse;
31 import org.fourthline.cling.model.message.header.ContentTypeHeader;
32 import org.fourthline.cling.model.message.header.UpnpHeader;
33 import org.fourthline.cling.transport.spi.AbstractStreamClient;
34 import org.fourthline.cling.transport.spi.InitializationException;
35 import org.fourthline.cling.transport.spi.StreamClient;
36 import org.seamless.util.Exceptions;
37 import org.seamless.util.MimeType;
38
39 import java.io.UnsupportedEncodingException;
40 import java.util.List;
41 import java.util.Map;
42 import java.util.concurrent.Callable;
43 import java.util.logging.Level;
44 import java.util.logging.Logger;
45
46
47
48
49
50
51
52
53
54
55 public class StreamClientImpl extends AbstractStreamClient<StreamClientConfigurationImpl, StreamClientImpl.HttpContentExchange> {
56
57 final private static Logger log = Logger.getLogger(StreamClient.class.getName());
58
59 final protected StreamClientConfigurationImpl configuration;
60 final protected HttpClient client;
61
62 public StreamClientImpl(StreamClientConfigurationImpl configuration) throws InitializationException {
63 this.configuration = configuration;
64
65 log.info("Starting Jetty HttpClient...");
66 client = new HttpClient();
67
68
69
70 client.setThreadPool(
71 new ExecutorThreadPool(getConfiguration().getRequestExecutorService()) {
72 @Override
73 protected void doStop() throws Exception {
74
75 }
76 }
77 );
78
79
80
81 client.setTimeout((configuration.getTimeoutSeconds()+5) * 1000);
82 client.setConnectTimeout((configuration.getTimeoutSeconds()+5) * 1000);
83
84 client.setMaxRetries(configuration.getRequestRetryCount());
85
86 try {
87 client.start();
88 } catch (Exception ex) {
89 throw new InitializationException(
90 "Could not start Jetty HTTP client: " + ex, ex
91 );
92 }
93 }
94
95 @Override
96 public StreamClientConfigurationImpl getConfiguration() {
97 return configuration;
98 }
99
100 @Override
101 protected HttpContentExchange createRequest(StreamRequestMessage requestMessage) {
102 return new HttpContentExchange(getConfiguration(), client, requestMessage);
103 }
104
105 @Override
106 protected Callable<StreamResponseMessage> createCallable(final StreamRequestMessage requestMessage,
107 final HttpContentExchange exchange) {
108 return new Callable<StreamResponseMessage>() {
109 public StreamResponseMessage call() throws Exception {
110
111 if (log.isLoggable(Level.FINE))
112 log.fine("Sending HTTP request: " + requestMessage);
113
114 client.send(exchange);
115 int exchangeState = exchange.waitForDone();
116
117 if (exchangeState == HttpExchange.STATUS_COMPLETED) {
118 try {
119 return exchange.createResponse();
120 } catch (Throwable t) {
121 log.log(Level.WARNING, "Error reading response: " + requestMessage, Exceptions.unwrap(t));
122 return null;
123 }
124 } else if (exchangeState == HttpExchange.STATUS_CANCELLED) {
125
126 return null;
127 } else if (exchangeState == HttpExchange.STATUS_EXCEPTED) {
128
129 return null;
130 } else {
131 log.warning("Unhandled HTTP exchange status: " + exchangeState);
132 return null;
133 }
134 }
135 };
136 }
137
138 @Override
139 protected void abort(HttpContentExchange exchange) {
140 exchange.cancel();
141 }
142
143 @Override
144 protected boolean logExecutionException(Throwable t) {
145 return false;
146 }
147
148 @Override
149 public void stop() {
150 try {
151 client.stop();
152 } catch (Exception ex) {
153 log.info("Error stopping HTTP client: " + ex);
154 }
155 }
156
157 static public class HttpContentExchange extends ContentExchange {
158
159 final protected StreamClientConfigurationImpl configuration;
160 final protected HttpClient client;
161 final protected StreamRequestMessage requestMessage;
162
163 protected Throwable exception;
164
165 public HttpContentExchange(StreamClientConfigurationImpl configuration,
166 HttpClient client,
167 StreamRequestMessage requestMessage) {
168 super(true);
169 this.configuration = configuration;
170 this.client = client;
171 this.requestMessage = requestMessage;
172 applyRequestURLMethod();
173 applyRequestHeaders();
174 applyRequestBody();
175 }
176
177 @Override
178 protected void onConnectionFailed(Throwable t) {
179 log.log(Level.WARNING, "HTTP connection failed: " + requestMessage, Exceptions.unwrap(t));
180 }
181
182 @Override
183 protected void onException(Throwable t) {
184 log.log(Level.WARNING, "HTTP request failed: " + requestMessage, Exceptions.unwrap(t));
185 }
186
187 public StreamClientConfigurationImpl getConfiguration() {
188 return configuration;
189 }
190
191 public StreamRequestMessage getRequestMessage() {
192 return requestMessage;
193 }
194
195 protected void applyRequestURLMethod() {
196 final UpnpRequest requestOperation = getRequestMessage().getOperation();
197 if (log.isLoggable(Level.FINE))
198 log.fine(
199 "Preparing HTTP request message with method '"
200 + requestOperation.getHttpMethodName()
201 + "': " + getRequestMessage()
202 );
203
204 setURL(requestOperation.getURI().toString());
205 setMethod(requestOperation.getHttpMethodName());
206 }
207
208 protected void applyRequestHeaders() {
209
210 UpnpHeaders headers = getRequestMessage().getHeaders();
211 if (log.isLoggable(Level.FINE))
212 log.fine("Writing headers on HttpContentExchange: " + headers.size());
213
214
215
216 if (!headers.containsKey(UpnpHeader.Type.USER_AGENT)) {
217 setRequestHeader(
218 UpnpHeader.Type.USER_AGENT.getHttpName(),
219 getConfiguration().getUserAgentValue(
220 getRequestMessage().getUdaMajorVersion(),
221 getRequestMessage().getUdaMinorVersion())
222 );
223 }
224 for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
225 for (String v : entry.getValue()) {
226 String headerName = entry.getKey();
227 if (log.isLoggable(Level.FINE))
228 log.fine("Setting header '" + headerName + "': " + v);
229 addRequestHeader(headerName, v);
230 }
231 }
232 }
233
234 protected void applyRequestBody() {
235
236 if (getRequestMessage().hasBody()) {
237 if (getRequestMessage().getBodyType() == UpnpMessage.BodyType.STRING) {
238 if (log.isLoggable(Level.FINE))
239 log.fine("Writing textual request body: " + getRequestMessage());
240
241 MimeType contentType =
242 getRequestMessage().getContentTypeHeader() != null
243 ? getRequestMessage().getContentTypeHeader().getValue()
244 : ContentTypeHeader.DEFAULT_CONTENT_TYPE_UTF8;
245
246 String charset =
247 getRequestMessage().getContentTypeCharset() != null
248 ? getRequestMessage().getContentTypeCharset()
249 : "UTF-8";
250
251 setRequestContentType(contentType.toString());
252 ByteArrayBuffer buffer;
253 try {
254 buffer = new ByteArrayBuffer(getRequestMessage().getBodyString(), charset);
255 } catch (UnsupportedEncodingException ex) {
256 throw new RuntimeException("Unsupported character encoding: " + charset, ex);
257 }
258 setRequestHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(buffer.length()));
259 setRequestContent(buffer);
260
261 } else {
262 if (log.isLoggable(Level.FINE))
263 log.fine("Writing binary request body: " + getRequestMessage());
264
265 if (getRequestMessage().getContentTypeHeader() == null)
266 throw new RuntimeException(
267 "Missing content type header in request message: " + requestMessage
268 );
269 MimeType contentType = getRequestMessage().getContentTypeHeader().getValue();
270
271 setRequestContentType(contentType.toString());
272 ByteArrayBuffer buffer;
273 buffer = new ByteArrayBuffer(getRequestMessage().getBodyBytes());
274 setRequestHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(buffer.length()));
275 setRequestContent(buffer);
276 }
277 }
278 }
279
280 protected StreamResponseMessage createResponse() {
281
282 UpnpResponse responseOperation =
283 new UpnpResponse(
284 getResponseStatus(),
285 UpnpResponse.Status.getByStatusCode(getResponseStatus()).getStatusMsg()
286 );
287
288 if (log.isLoggable(Level.FINE))
289 log.fine("Received response: " + responseOperation);
290
291 StreamResponseMessage responseMessage = new StreamResponseMessage(responseOperation);
292
293
294 UpnpHeaders headers = new UpnpHeaders();
295 HttpFields responseFields = getResponseFields();
296 for (String name : responseFields.getFieldNamesCollection()) {
297 for (String value : responseFields.getValuesCollection(name)) {
298 headers.add(name, value);
299 }
300 }
301 responseMessage.setHeaders(headers);
302
303
304 byte[] bytes = getResponseContentBytes();
305 if (bytes != null && bytes.length > 0 && responseMessage.isContentTypeMissingOrText()) {
306
307 if (log.isLoggable(Level.FINE))
308 log.fine("Response contains textual entity body, converting then setting string on message");
309 try {
310 responseMessage.setBodyCharacters(bytes);
311 } catch (UnsupportedEncodingException ex) {
312 throw new RuntimeException("Unsupported character encoding: " + ex, ex);
313 }
314
315 } else if (bytes != null && bytes.length > 0) {
316
317 if (log.isLoggable(Level.FINE))
318 log.fine("Response contains binary entity body, setting bytes on message");
319 responseMessage.setBody(UpnpMessage.BodyType.BYTES, bytes);
320
321 } else {
322 if (log.isLoggable(Level.FINE))
323 log.fine("Response did not contain entity body");
324 }
325
326 if (log.isLoggable(Level.FINE))
327 log.fine("Response message complete: " + responseMessage);
328 return responseMessage;
329 }
330 }
331 }
332
333