View Javadoc
1   /*
2    * Copyright (C) 2013 4th Line GmbH, Switzerland
3    *
4    * The contents of this file are subject to the terms of either the GNU
5    * Lesser General Public License Version 2 or later ("LGPL") or the
6    * Common Development and Distribution License Version 1 or later
7    * ("CDDL") (collectively, the "License"). You may not use this file
8    * except in compliance with the License. See LICENSE.txt for more
9    * information.
10   *
11   * This program is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
14   */
15  
16  package org.fourthline.cling.test.gena;
17  
import org.fourthline.cling.UpnpService;
import org.fourthline.cling.controlpoint.SubscriptionCallback;
import org.fourthline.cling.mock.MockRouter;
import org.fourthline.cling.mock.MockUpnpService;
import org.fourthline.cling.model.UnsupportedDataException;
import org.fourthline.cling.model.gena.CancelReason;
import org.fourthline.cling.model.gena.GENASubscription;
import org.fourthline.cling.model.gena.RemoteGENASubscription;
import org.fourthline.cling.model.message.StreamRequestMessage;
import org.fourthline.cling.model.message.StreamResponseMessage;
import org.fourthline.cling.model.message.UpnpRequest;
import org.fourthline.cling.model.message.UpnpResponse;
import org.fourthline.cling.model.message.gena.IncomingEventRequestMessage;
import org.fourthline.cling.model.message.gena.OutgoingEventRequestMessage;
import org.fourthline.cling.model.message.header.CallbackHeader;
import org.fourthline.cling.model.message.header.SubscriptionIdHeader;
import org.fourthline.cling.model.message.header.TimeoutHeader;
import org.fourthline.cling.model.message.header.UpnpHeader;
import org.fourthline.cling.model.meta.RemoteDevice;
import org.fourthline.cling.model.meta.RemoteService;
import org.fourthline.cling.model.state.StateVariableValue;
import org.fourthline.cling.model.types.UnsignedIntegerFourBytes;
import org.fourthline.cling.protocol.ProtocolCreationException;
import org.fourthline.cling.test.data.SampleData;
import org.fourthline.cling.transport.RouterException;
import org.seamless.util.URIUtil;
import org.testng.annotations.Test;

import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
53  
54  public class OutgoingSubscriptionLifecycleTest {
55  
56      @Test
57      public void subscriptionLifecycle() throws Exception {
58  
59          MockUpnpService upnpService = new MockUpnpService() {
60              @Override
61              protected MockRouter createRouter() {
62                  return new MockRouter(getConfiguration(), getProtocolFactory()) {
63                  @Override
64                  public StreamResponseMessage[] getStreamResponseMessages() {
65  
66                      return new StreamResponseMessage[]{
67                              createSubscribeResponseMessage(),
68                              createUnsubscribeResponseMessage()
69  
70                      };
71                  }
72                  };
73              }
74          };
75  
76          final List<Boolean> testAssertions = new ArrayList<>();
77  
78          // Register remote device and its service
79          RemoteDevice device = SampleData.createRemoteDevice();
80          upnpService.getRegistry().addDevice(device);
81  
82          RemoteService service = SampleData.getFirstService(device);
83  
84          SubscriptionCallback callback = new SubscriptionCallback(service) {
85  
86              @Override
87              protected void failed(GENASubscription subscription,
88                                    UpnpResponse responseStatus,
89                                    Exception exception,
90                                    String defaultMsg) {
91                  testAssertions.add(false);
92              }
93  
94              @Override
95              public void established(GENASubscription subscription) {
96                  testAssertions.add(true);
97              }
98  
99              @Override
100             public void ended(GENASubscription subscription, CancelReason reason, UpnpResponse responseStatus) {
101                 assert reason == null;
102                 assertEquals(responseStatus.getStatusCode(), UpnpResponse.Status.OK.getStatusCode());
103                 testAssertions.add(true);
104             }
105 
106             public void eventReceived(GENASubscription subscription) {
107                 assertEquals(subscription.getCurrentValues().get("Status").toString(), "0");
108                 assertEquals(subscription.getCurrentValues().get("Target").toString(), "1");
109                 testAssertions.add(true);
110             }
111 
112             public void eventsMissed(GENASubscription subscription, int numberOfMissedEvents) {
113                 testAssertions.add(false);
114             }
115 
116         };
117 
118         upnpService.getControlPoint().execute(callback);
119 
120         // Subscription process OK?
121         for (Boolean testAssertion : testAssertions) {
122             assert testAssertion;
123         }
124 
125         // Simulate received event
126         upnpService.getProtocolFactory().createReceivingSync(
127                 createEventRequestMessage(upnpService, callback)
128         ).run();
129 
130         assertEquals(callback.getSubscription().getCurrentSequence().getValue(), Long.valueOf(0));
131         assertEquals(callback.getSubscription().getSubscriptionId(), "uuid:1234");
132         assertEquals(callback.getSubscription().getActualDurationSeconds(), 180);
133 
134         List<URL> callbackURLs = ((RemoteGENASubscription) callback.getSubscription())
135                 .getEventCallbackURLs(upnpService.getRouter().getActiveStreamServers(null), upnpService.getConfiguration().getNamespace());
136 
137         callback.end();
138 
139         assert callback.getSubscription() == null;
140 
141         assertEquals(testAssertions.size(), 3);
142         for (Boolean testAssertion : testAssertions) {
143             assert testAssertion;
144         }
145 
146         List<StreamRequestMessage> sentMessages = upnpService.getRouter().getSentStreamRequestMessages();
147         assertEquals(sentMessages.size(), 2);
148         assertEquals(
149                 (sentMessages.get(0).getOperation()).getMethod(),
150                 UpnpRequest.Method.SUBSCRIBE
151         );
152         assertEquals(
153             sentMessages.get(0).getHeaders().getFirstHeader(UpnpHeader.Type.TIMEOUT, TimeoutHeader.class).getValue(),
154                 Integer.valueOf(1800)
155         );
156 
157         assertEquals(callbackURLs.size(), 1);
158         assertEquals(
159             sentMessages.get(0).getHeaders().getFirstHeader(UpnpHeader.Type.CALLBACK, CallbackHeader.class).getValue().get(0),
160                 callbackURLs.get(0)
161         );
162 
163         assertEquals(
164                 (sentMessages.get(1).getOperation()).getMethod(),
165                 UpnpRequest.Method.UNSUBSCRIBE
166         );
167         assertEquals(
168             sentMessages.get(1).getHeaders().getFirstHeader(UpnpHeader.Type.SID, SubscriptionIdHeader.class).getValue(),
169                 "uuid:1234"
170         );
171     }
172 
173     @Test
174     public void subscriptionLifecycleNotifyBeforeResponse() throws Exception {
175 
176         final RemoteDevice device = SampleData.createRemoteDevice();
177         final RemoteService service = SampleData.getFirstService(device);
178 
179         final StreamResponseMessage subscribeResponseMessage = createSubscribeResponseMessage();
180         final Semaphore subscribeResponseSemaphore = new Semaphore(0);
181 
182         final MockUpnpService upnpService = new MockUpnpService() {
183             @Override
184             protected MockRouter createRouter() {
185                 return new MockRouter(getConfiguration(), getProtocolFactory()) {
186                     @Override
187                     public StreamResponseMessage getStreamResponseMessage(StreamRequestMessage request) {
188                         try {
189                             // bloc subscription response until the first notification
190                             subscribeResponseSemaphore.acquire();
191                         } catch (InterruptedException e) {
192                             throw new RuntimeException(e);
193                         }
194                         return subscribeResponseMessage;
195                     }
196                 };
197             }
198         };
199 
200         final List<Boolean> testAssertions = new ArrayList<>();
201         final List<Boolean> notificationCalled = new ArrayList<>();
202 
203         // Register remote device and its service
204         upnpService.getRegistry().addDevice(device);
205 
206         // send subscription request
207         final SubscriptionCallback callback = new SubscriptionCallback(service) {
208 
209             @Override
210             protected void failed(GENASubscription subscription,
211                                   UpnpResponse responseStatus,
212                                   Exception exception,
213                                   String defaultMsg) {
214                 testAssertions.add(false);
215             }
216 
217             @Override
218             public void established(GENASubscription subscription) {
219                 testAssertions.add(true);
220             }
221 
222             @Override
223             public void ended(GENASubscription subscription, CancelReason reason, UpnpResponse responseStatus) {
224             }
225 
226             public void eventReceived(GENASubscription subscription) {
227                 assertEquals(subscription.getCurrentValues().get("Status").toString(), "0");
228                 assertEquals(subscription.getCurrentValues().get("Target").toString(), "1");
229                 testAssertions.add(true);
230                 notificationCalled.add(true);
231             }
232 
233             public void eventsMissed(GENASubscription subscription, int numberOfMissedEvents) {
234                 testAssertions.add(false);
235             }
236 
237         };
238 
239         // send subscription request is a separate thread
240         Thread subscribeThread = new Thread(new Runnable() {
241             @Override
242             public void run() {
243                 upnpService.getControlPoint().execute(callback);
244             }
245         });
246         subscribeThread.start();
247 
248         // generate notification in a separate thread
249         // use a dummy GENASubscription for that to have a valid subscriptionId
250        final GENASubscription subscription = new RemoteGENASubscription(service, 180) {
251             @Override
252             public void established() {
253             }
254             @Override
255             public void eventReceived() {
256             }
257             @Override
258             public void invalidMessage(UnsupportedDataException ex) {
259             }
260             @Override
261             public void failed(UpnpResponse responseStatus) {
262             }
263             @Override
264             public void ended(CancelReason reason, UpnpResponse responseStatus) {
265 
266             }
267             @Override
268             public void eventsMissed(int numberOfMissedEvents) {
269             }
270         };
271         subscription.setSubscriptionId("uuid:1234");
272 
273         Thread notifyThread = new Thread(new Runnable() {
274             @Override
275             public void run() {
276                 // Simulate received event before the subscription response
277                 try {
278                     upnpService.getProtocolFactory().createReceivingSync(
279                             createEventRequestMessage(upnpService, service, subscription)
280                     ).run();
281                 } catch (ProtocolCreationException e) {
282                     testAssertions.add(false);
283                 }
284 
285             }
286         });
287         notifyThread.start();
288 
289         // give time to process notification
290         Thread.sleep(1000);
291 
292         // notification should not have been received before the subscribe response
293         assertEquals(0, notificationCalled.size());
294 
295         // unlock subscription response
296         subscribeResponseSemaphore.release();
297 
298         subscribeThread.join();
299         notifyThread.join();
300 
301         // notification should have been received after the subscribe response
302         assertEquals(1, notificationCalled.size());
303 
304         for (Boolean testAssertion : testAssertions) {
305             assert testAssertion;
306         }
307     }
308 
309 
310 
311     protected StreamResponseMessage createSubscribeResponseMessage() {
312         StreamResponseMessage msg = new StreamResponseMessage(new UpnpResponse(UpnpResponse.Status.OK));
313         msg.getHeaders().add(
314                 UpnpHeader.Type.SID, new SubscriptionIdHeader("uuid:1234")
315         );
316         msg.getHeaders().add(
317                 UpnpHeader.Type.TIMEOUT, new TimeoutHeader(180)
318         );
319         return msg;
320     }
321 
322     protected StreamResponseMessage createUnsubscribeResponseMessage() {
323         return new StreamResponseMessage(new UpnpResponse(UpnpResponse.Status.OK));
324     }
325 
326     protected IncomingEventRequestMessage createEventRequestMessage(final UpnpService upnpService, final SubscriptionCallback callback) {
327 
328         List<StateVariableValue> values = new ArrayList<>();
329         values.add(
330                 new StateVariableValue(callback.getService().getStateVariable("Status"), false)
331         );
332         values.add(
333                 new StateVariableValue(callback.getService().getStateVariable("Target"), true)
334         );
335 
336         OutgoingEventRequestMessage outgoing = new OutgoingEventRequestMessage(
337                 callback.getSubscription(),
338                 URIUtil.toURL(URI.create("http://10.0.0.123/this/is/ignored/anyway")),
339                 new UnsignedIntegerFourBytes(0),
340                 values
341         );
342         outgoing.getOperation().setUri(
343                 upnpService.getConfiguration().getNamespace().getEventCallbackPath(callback.getService())
344         );
345 
346         upnpService.getConfiguration().getGenaEventProcessor().writeBody(outgoing);
347 
348         return new IncomingEventRequestMessage(outgoing, ((RemoteGENASubscription)callback.getSubscription()).getService());
349     }
350 
351 
352     protected IncomingEventRequestMessage createEventRequestMessage(final UpnpService upnpService, final RemoteService service, final GENASubscription subscription) {
353 
354         List<StateVariableValue> values = new ArrayList<>();
355         values.add(
356                 new StateVariableValue(service.getStateVariable("Status"), false)
357         );
358         values.add(
359                 new StateVariableValue(service.getStateVariable("Target"), true)
360         );
361 
362         OutgoingEventRequestMessage outgoing = new OutgoingEventRequestMessage(
363                 subscription,
364                 URIUtil.toURL(URI.create("http://10.0.0.123/this/is/ignored/anyway")),
365                 new UnsignedIntegerFourBytes(0),
366                 values
367         );
368         outgoing.getOperation().setUri(
369                 upnpService.getConfiguration().getNamespace().getEventCallbackPath(service)
370         );
371 
372         upnpService.getConfiguration().getGenaEventProcessor().writeBody(outgoing);
373 
374         return new IncomingEventRequestMessage(outgoing, service);
375     }
376 
377  }