1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.fourthline.cling.test.gena;
17
import org.fourthline.cling.UpnpService;
import org.fourthline.cling.controlpoint.SubscriptionCallback;
import org.fourthline.cling.mock.MockRouter;
import org.fourthline.cling.mock.MockUpnpService;
import org.fourthline.cling.model.UnsupportedDataException;
import org.fourthline.cling.model.gena.CancelReason;
import org.fourthline.cling.model.gena.GENASubscription;
import org.fourthline.cling.model.gena.RemoteGENASubscription;
import org.fourthline.cling.model.message.StreamRequestMessage;
import org.fourthline.cling.model.message.StreamResponseMessage;
import org.fourthline.cling.model.message.UpnpRequest;
import org.fourthline.cling.model.message.UpnpResponse;
import org.fourthline.cling.model.message.gena.IncomingEventRequestMessage;
import org.fourthline.cling.model.message.gena.OutgoingEventRequestMessage;
import org.fourthline.cling.model.message.header.CallbackHeader;
import org.fourthline.cling.model.message.header.SubscriptionIdHeader;
import org.fourthline.cling.model.message.header.TimeoutHeader;
import org.fourthline.cling.model.message.header.UpnpHeader;
import org.fourthline.cling.model.meta.RemoteDevice;
import org.fourthline.cling.model.meta.RemoteService;
import org.fourthline.cling.model.state.StateVariableValue;
import org.fourthline.cling.model.types.UnsignedIntegerFourBytes;
import org.fourthline.cling.protocol.ProtocolCreationException;
import org.fourthline.cling.test.data.SampleData;
import org.fourthline.cling.transport.RouterException;
import org.seamless.util.URIUtil;
import org.testng.annotations.Test;

import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
53
54 public class OutgoingSubscriptionLifecycleTest {
55
56 @Test
57 public void subscriptionLifecycle() throws Exception {
58
59 MockUpnpService upnpService = new MockUpnpService() {
60 @Override
61 protected MockRouter createRouter() {
62 return new MockRouter(getConfiguration(), getProtocolFactory()) {
63 @Override
64 public StreamResponseMessage[] getStreamResponseMessages() {
65
66 return new StreamResponseMessage[]{
67 createSubscribeResponseMessage(),
68 createUnsubscribeResponseMessage()
69
70 };
71 }
72 };
73 }
74 };
75
76 final List<Boolean> testAssertions = new ArrayList<>();
77
78
79 RemoteDevice device = SampleData.createRemoteDevice();
80 upnpService.getRegistry().addDevice(device);
81
82 RemoteService service = SampleData.getFirstService(device);
83
84 SubscriptionCallback callback = new SubscriptionCallback(service) {
85
86 @Override
87 protected void failed(GENASubscription subscription,
88 UpnpResponse responseStatus,
89 Exception exception,
90 String defaultMsg) {
91 testAssertions.add(false);
92 }
93
94 @Override
95 public void established(GENASubscription subscription) {
96 testAssertions.add(true);
97 }
98
99 @Override
100 public void ended(GENASubscription subscription, CancelReason reason, UpnpResponse responseStatus) {
101 assert reason == null;
102 assertEquals(responseStatus.getStatusCode(), UpnpResponse.Status.OK.getStatusCode());
103 testAssertions.add(true);
104 }
105
106 public void eventReceived(GENASubscription subscription) {
107 assertEquals(subscription.getCurrentValues().get("Status").toString(), "0");
108 assertEquals(subscription.getCurrentValues().get("Target").toString(), "1");
109 testAssertions.add(true);
110 }
111
112 public void eventsMissed(GENASubscription subscription, int numberOfMissedEvents) {
113 testAssertions.add(false);
114 }
115
116 };
117
118 upnpService.getControlPoint().execute(callback);
119
120
121 for (Boolean testAssertion : testAssertions) {
122 assert testAssertion;
123 }
124
125
126 upnpService.getProtocolFactory().createReceivingSync(
127 createEventRequestMessage(upnpService, callback)
128 ).run();
129
130 assertEquals(callback.getSubscription().getCurrentSequence().getValue(), Long.valueOf(0));
131 assertEquals(callback.getSubscription().getSubscriptionId(), "uuid:1234");
132 assertEquals(callback.getSubscription().getActualDurationSeconds(), 180);
133
134 List<URL> callbackURLs = ((RemoteGENASubscription) callback.getSubscription())
135 .getEventCallbackURLs(upnpService.getRouter().getActiveStreamServers(null), upnpService.getConfiguration().getNamespace());
136
137 callback.end();
138
139 assert callback.getSubscription() == null;
140
141 assertEquals(testAssertions.size(), 3);
142 for (Boolean testAssertion : testAssertions) {
143 assert testAssertion;
144 }
145
146 List<StreamRequestMessage> sentMessages = upnpService.getRouter().getSentStreamRequestMessages();
147 assertEquals(sentMessages.size(), 2);
148 assertEquals(
149 (sentMessages.get(0).getOperation()).getMethod(),
150 UpnpRequest.Method.SUBSCRIBE
151 );
152 assertEquals(
153 sentMessages.get(0).getHeaders().getFirstHeader(UpnpHeader.Type.TIMEOUT, TimeoutHeader.class).getValue(),
154 Integer.valueOf(1800)
155 );
156
157 assertEquals(callbackURLs.size(), 1);
158 assertEquals(
159 sentMessages.get(0).getHeaders().getFirstHeader(UpnpHeader.Type.CALLBACK, CallbackHeader.class).getValue().get(0),
160 callbackURLs.get(0)
161 );
162
163 assertEquals(
164 (sentMessages.get(1).getOperation()).getMethod(),
165 UpnpRequest.Method.UNSUBSCRIBE
166 );
167 assertEquals(
168 sentMessages.get(1).getHeaders().getFirstHeader(UpnpHeader.Type.SID, SubscriptionIdHeader.class).getValue(),
169 "uuid:1234"
170 );
171 }
172
173 @Test
174 public void subscriptionLifecycleNotifyBeforeResponse() throws Exception {
175
176 final RemoteDevice device = SampleData.createRemoteDevice();
177 final RemoteService service = SampleData.getFirstService(device);
178
179 final StreamResponseMessage subscribeResponseMessage = createSubscribeResponseMessage();
180 final Semaphore subscribeResponseSemaphore = new Semaphore(0);
181
182 final MockUpnpService upnpService = new MockUpnpService() {
183 @Override
184 protected MockRouter createRouter() {
185 return new MockRouter(getConfiguration(), getProtocolFactory()) {
186 @Override
187 public StreamResponseMessage getStreamResponseMessage(StreamRequestMessage request) {
188 try {
189
190 subscribeResponseSemaphore.acquire();
191 } catch (InterruptedException e) {
192 throw new RuntimeException(e);
193 }
194 return subscribeResponseMessage;
195 }
196 };
197 }
198 };
199
200 final List<Boolean> testAssertions = new ArrayList<>();
201 final List<Boolean> notificationCalled = new ArrayList<>();
202
203
204 upnpService.getRegistry().addDevice(device);
205
206
207 final SubscriptionCallback callback = new SubscriptionCallback(service) {
208
209 @Override
210 protected void failed(GENASubscription subscription,
211 UpnpResponse responseStatus,
212 Exception exception,
213 String defaultMsg) {
214 testAssertions.add(false);
215 }
216
217 @Override
218 public void established(GENASubscription subscription) {
219 testAssertions.add(true);
220 }
221
222 @Override
223 public void ended(GENASubscription subscription, CancelReason reason, UpnpResponse responseStatus) {
224 }
225
226 public void eventReceived(GENASubscription subscription) {
227 assertEquals(subscription.getCurrentValues().get("Status").toString(), "0");
228 assertEquals(subscription.getCurrentValues().get("Target").toString(), "1");
229 testAssertions.add(true);
230 notificationCalled.add(true);
231 }
232
233 public void eventsMissed(GENASubscription subscription, int numberOfMissedEvents) {
234 testAssertions.add(false);
235 }
236
237 };
238
239
240 Thread subscribeThread = new Thread(new Runnable() {
241 @Override
242 public void run() {
243 upnpService.getControlPoint().execute(callback);
244 }
245 });
246 subscribeThread.start();
247
248
249
250 final GENASubscription subscription = new RemoteGENASubscription(service, 180) {
251 @Override
252 public void established() {
253 }
254 @Override
255 public void eventReceived() {
256 }
257 @Override
258 public void invalidMessage(UnsupportedDataException ex) {
259 }
260 @Override
261 public void failed(UpnpResponse responseStatus) {
262 }
263 @Override
264 public void ended(CancelReason reason, UpnpResponse responseStatus) {
265
266 }
267 @Override
268 public void eventsMissed(int numberOfMissedEvents) {
269 }
270 };
271 subscription.setSubscriptionId("uuid:1234");
272
273 Thread notifyThread = new Thread(new Runnable() {
274 @Override
275 public void run() {
276
277 try {
278 upnpService.getProtocolFactory().createReceivingSync(
279 createEventRequestMessage(upnpService, service, subscription)
280 ).run();
281 } catch (ProtocolCreationException e) {
282 testAssertions.add(false);
283 }
284
285 }
286 });
287 notifyThread.start();
288
289
290 Thread.sleep(1000);
291
292
293 assertEquals(0, notificationCalled.size());
294
295
296 subscribeResponseSemaphore.release();
297
298 subscribeThread.join();
299 notifyThread.join();
300
301
302 assertEquals(1, notificationCalled.size());
303
304 for (Boolean testAssertion : testAssertions) {
305 assert testAssertion;
306 }
307 }
308
309
310
311 protected StreamResponseMessage createSubscribeResponseMessage() {
312 StreamResponseMessage msg = new StreamResponseMessage(new UpnpResponse(UpnpResponse.Status.OK));
313 msg.getHeaders().add(
314 UpnpHeader.Type.SID, new SubscriptionIdHeader("uuid:1234")
315 );
316 msg.getHeaders().add(
317 UpnpHeader.Type.TIMEOUT, new TimeoutHeader(180)
318 );
319 return msg;
320 }
321
322 protected StreamResponseMessage createUnsubscribeResponseMessage() {
323 return new StreamResponseMessage(new UpnpResponse(UpnpResponse.Status.OK));
324 }
325
326 protected IncomingEventRequestMessage createEventRequestMessage(final UpnpService upnpService, final SubscriptionCallback callback) {
327
328 List<StateVariableValue> values = new ArrayList<>();
329 values.add(
330 new StateVariableValue(callback.getService().getStateVariable("Status"), false)
331 );
332 values.add(
333 new StateVariableValue(callback.getService().getStateVariable("Target"), true)
334 );
335
336 OutgoingEventRequestMessage outgoing = new OutgoingEventRequestMessage(
337 callback.getSubscription(),
338 URIUtil.toURL(URI.create("http://10.0.0.123/this/is/ignored/anyway")),
339 new UnsignedIntegerFourBytes(0),
340 values
341 );
342 outgoing.getOperation().setUri(
343 upnpService.getConfiguration().getNamespace().getEventCallbackPath(callback.getService())
344 );
345
346 upnpService.getConfiguration().getGenaEventProcessor().writeBody(outgoing);
347
348 return new IncomingEventRequestMessage(outgoing, ((RemoteGENASubscription)callback.getSubscription()).getService());
349 }
350
351
352 protected IncomingEventRequestMessage createEventRequestMessage(final UpnpService upnpService, final RemoteService service, final GENASubscription subscription) {
353
354 List<StateVariableValue> values = new ArrayList<>();
355 values.add(
356 new StateVariableValue(service.getStateVariable("Status"), false)
357 );
358 values.add(
359 new StateVariableValue(service.getStateVariable("Target"), true)
360 );
361
362 OutgoingEventRequestMessage outgoing = new OutgoingEventRequestMessage(
363 subscription,
364 URIUtil.toURL(URI.create("http://10.0.0.123/this/is/ignored/anyway")),
365 new UnsignedIntegerFourBytes(0),
366 values
367 );
368 outgoing.getOperation().setUri(
369 upnpService.getConfiguration().getNamespace().getEventCallbackPath(service)
370 );
371
372 upnpService.getConfiguration().getGenaEventProcessor().writeBody(outgoing);
373
374 return new IncomingEventRequestMessage(outgoing, service);
375 }
376
377 }