1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.fourthline.cling.transport.impl;
17
18 import org.fourthline.cling.model.message.Connection;
19 import org.fourthline.cling.transport.Router;
20 import org.fourthline.cling.transport.spi.InitializationException;
21 import org.fourthline.cling.transport.spi.StreamServer;
22
23 import javax.servlet.AsyncContext;
24 import javax.servlet.AsyncEvent;
25 import javax.servlet.AsyncListener;
26 import javax.servlet.Servlet;
27 import javax.servlet.ServletException;
28 import javax.servlet.http.HttpServlet;
29 import javax.servlet.http.HttpServletRequest;
30 import javax.servlet.http.HttpServletResponse;
31
32 import java.io.IOException;
33 import java.net.InetAddress;
34 import java.net.UnknownHostException;
35 import java.util.logging.Level;
36 import java.util.logging.Logger;
37
38
39
40
41
42
43 public class AsyncServletStreamServerImpl implements StreamServer<AsyncServletStreamServerConfigurationImpl> {
44
45 final private static Logger log = Logger.getLogger(StreamServer.class.getName());
46
47 final protected AsyncServletStreamServerConfigurationImpl configuration;
48 protected int localPort;
49 protected String hostAddress;
50
51 public AsyncServletStreamServerImpl(AsyncServletStreamServerConfigurationImpl configuration) {
52 this.configuration = configuration;
53 }
54
55 public AsyncServletStreamServerConfigurationImpl getConfiguration() {
56 return configuration;
57 }
58
59 synchronized public void init(InetAddress bindAddress, final Router router) throws InitializationException {
60 try {
61 if (log.isLoggable(Level.FINE))
62 log.fine("Setting executor service on servlet container adapter");
63 getConfiguration().getServletContainerAdapter().setExecutorService(
64 router.getConfiguration().getStreamServerExecutorService()
65 );
66
67 if (log.isLoggable(Level.FINE))
68 log.fine("Adding connector: " + bindAddress + ":" + getConfiguration().getListenPort());
69 hostAddress = bindAddress.getHostAddress();
70 localPort = getConfiguration().getServletContainerAdapter().addConnector(
71 hostAddress,
72 getConfiguration().getListenPort()
73 );
74
75 String contextPath = router.getConfiguration().getNamespace().getBasePath().getPath();
76 getConfiguration().getServletContainerAdapter().registerServlet(contextPath, createServlet(router));
77
78 } catch (Exception ex) {
79 throw new InitializationException("Could not initialize " + getClass().getSimpleName() + ": " + ex.toString(), ex);
80 }
81 }
82
83 synchronized public int getPort() {
84 return this.localPort;
85 }
86
87 synchronized public void stop() {
88 getConfiguration().getServletContainerAdapter().removeConnector(hostAddress, localPort);
89 }
90
91 public void run() {
92 getConfiguration().getServletContainerAdapter().startIfNotRunning();
93 }
94
95 private int mCounter = 0;
96
97 protected Servlet createServlet(final Router router) {
98 return new HttpServlet() {
99 @Override
100 protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
101
102 final long startTime = System.currentTimeMillis();
103 final int counter = mCounter++;
104 if (log.isLoggable(Level.FINE))
105 log.fine(String.format("HttpServlet.service(): id: %3d, request URI: %s", counter, req.getRequestURI()));
106
107 AsyncContext async = req.startAsync();
108 async.setTimeout(getConfiguration().getAsyncTimeoutSeconds()*1000);
109
110 async.addListener(new AsyncListener() {
111
112 @Override
113 public void onTimeout(AsyncEvent arg0) throws IOException {
114 long duration = System.currentTimeMillis() - startTime;
115 if (log.isLoggable(Level.FINE))
116 log.fine(String.format("AsyncListener.onTimeout(): id: %3d, duration: %,4d, request: %s", counter, duration, arg0.getSuppliedRequest()));
117 }
118
119
120 @Override
121 public void onStartAsync(AsyncEvent arg0) throws IOException {
122 if (log.isLoggable(Level.FINE))
123 log.fine(String.format("AsyncListener.onStartAsync(): id: %3d, request: %s", counter, arg0.getSuppliedRequest()));
124 }
125
126
127 @Override
128 public void onError(AsyncEvent arg0) throws IOException {
129 long duration = System.currentTimeMillis() - startTime;
130 if (log.isLoggable(Level.FINE))
131 log.fine(String.format("AsyncListener.onError(): id: %3d, duration: %,4d, response: %s", counter, duration, arg0.getSuppliedResponse()));
132 }
133
134
135 @Override
136 public void onComplete(AsyncEvent arg0) throws IOException {
137 long duration = System.currentTimeMillis() - startTime;
138 if (log.isLoggable(Level.FINE))
139 log.fine(String.format("AsyncListener.onComplete(): id: %3d, duration: %,4d, response: %s", counter, duration, arg0.getSuppliedResponse()));
140 }
141
142 });
143
144 AsyncServletUpnpStream stream =
145 new AsyncServletUpnpStream(router.getProtocolFactory(), async, req) {
146 @Override
147 protected Connection createConnection() {
148 return new AsyncServletConnection(getRequest());
149 }
150 };
151
152 router.received(stream);
153 }
154 };
155 }
156
157
158
159
160
161
162
163
164 protected boolean isConnectionOpen(HttpServletRequest request) {
165 return true;
166 }
167
168 protected class AsyncServletConnection implements Connection {
169
170 protected HttpServletRequest request;
171
172 public AsyncServletConnection(HttpServletRequest request) {
173 this.request = request;
174 }
175
176 public HttpServletRequest getRequest() {
177 return request;
178 }
179
180 @Override
181 public boolean isOpen() {
182 return AsyncServletStreamServerImpl.this.isConnectionOpen(getRequest());
183 }
184
185 @Override
186 public InetAddress getRemoteAddress() {
187 try {
188 return InetAddress.getByName(getRequest().getRemoteAddr());
189 } catch (UnknownHostException ex) {
190 throw new RuntimeException(ex);
191 }
192 }
193
194 @Override
195 public InetAddress getLocalAddress() {
196 try {
197 return InetAddress.getByName(getRequest().getLocalAddr());
198 } catch (UnknownHostException ex) {
199 throw new RuntimeException(ex);
200 }
201 }
202 }
203 }