1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.fourthline.cling.transport.spi;
16
17 import org.fourthline.cling.model.message.StreamRequestMessage;
18 import org.fourthline.cling.model.message.StreamResponseMessage;
19 import org.seamless.util.Exceptions;
20
21 import java.util.concurrent.Callable;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import java.util.logging.Level;
27 import java.util.logging.Logger;
28
29
30
31
32
33
34 public abstract class AbstractStreamClient<C extends StreamClientConfiguration, REQUEST> implements StreamClient<C> {
35
36 final private static Logger log = Logger.getLogger(StreamClient.class.getName());
37
38 @Override
39 public StreamResponseMessage sendRequest(StreamRequestMessage requestMessage) throws InterruptedException {
40
41 if (log.isLoggable(Level.FINE))
42 log.fine("Preparing HTTP request: " + requestMessage);
43
44 REQUEST request = createRequest(requestMessage);
45 if (request == null)
46 return null;
47
48 Callable<StreamResponseMessage> callable = createCallable(requestMessage, request);
49
50
51 long start = System.currentTimeMillis();
52
53
54 Future<StreamResponseMessage> future =
55 getConfiguration().getRequestExecutorService().submit(callable);
56
57
58 try {
59 if (log.isLoggable(Level.FINE))
60 log.fine(
61 "Waiting " + getConfiguration().getTimeoutSeconds()
62 + " seconds for HTTP request to complete: " + requestMessage
63 );
64 StreamResponseMessage response =
65 future.get(getConfiguration().getTimeoutSeconds(), TimeUnit.SECONDS);
66
67
68 long elapsed = System.currentTimeMillis() - start;
69 if (log.isLoggable(Level.FINEST))
70 log.finest("Got HTTP response in " + elapsed + "ms: " + requestMessage);
71 if (getConfiguration().getLogWarningSeconds() > 0
72 && elapsed > getConfiguration().getLogWarningSeconds()*1000) {
73 log.warning("HTTP request took a long time (" + elapsed + "ms): " + requestMessage);
74 }
75
76 return response;
77
78 } catch (InterruptedException ex) {
79
80 if (log.isLoggable(Level.FINE))
81 log.fine("Interruption, aborting request: " + requestMessage);
82 abort(request);
83 throw new InterruptedException("HTTP request interrupted and aborted");
84
85 } catch (TimeoutException ex) {
86
87 log.info(
88 "Timeout of " + getConfiguration().getTimeoutSeconds()
89 + " seconds while waiting for HTTP request to complete, aborting: " + requestMessage
90 );
91 abort(request);
92 return null;
93
94 } catch (ExecutionException ex) {
95 Throwable cause = ex.getCause();
96 if (!logExecutionException(cause)) {
97 log.log(Level.WARNING, "HTTP request failed: " + requestMessage, Exceptions.unwrap(cause));
98 }
99 return null;
100 } finally {
101 onFinally(request);
102 }
103 }
104
105
106
107
108
109 abstract protected REQUEST createRequest(StreamRequestMessage requestMessage);
110
111
112
113
114 abstract protected Callable<StreamResponseMessage> createCallable(StreamRequestMessage requestMessage,
115 REQUEST request);
116
117
118
119
120 abstract protected void abort(REQUEST request);
121
122
123
124
125 abstract protected boolean logExecutionException(Throwable t);
126
127 protected void onFinally(REQUEST request) {
128
129 }
130
131 }