1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.fourthline.cling.protocol.sync;
17
18 import org.fourthline.cling.UpnpService;
19 import org.fourthline.cling.model.gena.CancelReason;
20 import org.fourthline.cling.model.gena.LocalGENASubscription;
21 import org.fourthline.cling.model.message.StreamRequestMessage;
22 import org.fourthline.cling.model.message.StreamResponseMessage;
23 import org.fourthline.cling.model.message.UpnpResponse;
24 import org.fourthline.cling.model.message.gena.IncomingSubscribeRequestMessage;
25 import org.fourthline.cling.model.message.gena.OutgoingSubscribeResponseMessage;
26 import org.fourthline.cling.model.meta.LocalService;
27 import org.fourthline.cling.model.resource.ServiceEventSubscriptionResource;
28 import org.fourthline.cling.protocol.ReceivingSync;
29 import org.fourthline.cling.transport.RouterException;
30 import org.seamless.util.Exceptions;
31
32 import java.net.URL;
33 import java.util.List;
34 import java.util.logging.Logger;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56 public class ReceivingSubscribe extends ReceivingSync<StreamRequestMessage, OutgoingSubscribeResponseMessage> {
57
58 final private static Logger log = Logger.getLogger(ReceivingSubscribe.class.getName());
59
60 protected LocalGENASubscription subscription;
61
62 public ReceivingSubscribe(UpnpService upnpService, StreamRequestMessage inputMessage) {
63 super(upnpService, inputMessage);
64 }
65
66 protected OutgoingSubscribeResponseMessage executeSync() throws RouterException {
67
68 ServiceEventSubscriptionResource resource =
69 getUpnpService().getRegistry().getResource(
70 ServiceEventSubscriptionResource.class,
71 getInputMessage().getUri()
72 );
73
74 if (resource == null) {
75 log.fine("No local resource found: " + getInputMessage());
76 return null;
77 }
78
79 log.fine("Found local event subscription matching relative request URI: " + getInputMessage().getUri());
80
81 IncomingSubscribeRequestMessage requestMessage =
82 new IncomingSubscribeRequestMessage(getInputMessage(), resource.getModel());
83
84
85 if (requestMessage.getSubscriptionId() != null &&
86 (requestMessage.hasNotificationHeader() || requestMessage.getCallbackURLs() != null)) {
87 log.fine("Subscription ID and NT or Callback in subscribe request: " + getInputMessage());
88 return new OutgoingSubscribeResponseMessage(UpnpResponse.Status.BAD_REQUEST);
89 }
90
91 if (requestMessage.getSubscriptionId() != null) {
92 return processRenewal(resource.getModel(), requestMessage);
93 } else if (requestMessage.hasNotificationHeader() && requestMessage.getCallbackURLs() != null){
94 return processNewSubscription(resource.getModel(), requestMessage);
95 } else {
96 log.fine("No subscription ID, no NT or Callback, neither subscription or renewal: " + getInputMessage());
97 return new OutgoingSubscribeResponseMessage(UpnpResponse.Status.PRECONDITION_FAILED);
98 }
99
100 }
101
102 protected OutgoingSubscribeResponseMessage processRenewal(LocalService service,
103 IncomingSubscribeRequestMessage requestMessage) {
104
105 subscription = getUpnpService().getRegistry().getLocalSubscription(requestMessage.getSubscriptionId());
106
107
108 if (subscription == null) {
109 log.fine("Invalid subscription ID for renewal request: " + getInputMessage());
110 return new OutgoingSubscribeResponseMessage(UpnpResponse.Status.PRECONDITION_FAILED);
111 }
112
113 log.fine("Renewing subscription: " + subscription);
114 subscription.setSubscriptionDuration(requestMessage.getRequestedTimeoutSeconds());
115 if (getUpnpService().getRegistry().updateLocalSubscription(subscription)) {
116 return new OutgoingSubscribeResponseMessage(subscription);
117 } else {
118 log.fine("Subscription went away before it could be renewed: " + getInputMessage());
119 return new OutgoingSubscribeResponseMessage(UpnpResponse.Status.PRECONDITION_FAILED);
120 }
121 }
122
123 protected OutgoingSubscribeResponseMessage processNewSubscription(LocalService service,
124 IncomingSubscribeRequestMessage requestMessage) {
125 List<URL> callbackURLs = requestMessage.getCallbackURLs();
126
127
128 if (callbackURLs == null || callbackURLs.size() == 0) {
129 log.fine("Missing or invalid Callback URLs in subscribe request: " + getInputMessage());
130 return new OutgoingSubscribeResponseMessage(UpnpResponse.Status.PRECONDITION_FAILED);
131 }
132
133 if (!requestMessage.hasNotificationHeader()) {
134 log.fine("Missing or invalid NT header in subscribe request: " + getInputMessage());
135 return new OutgoingSubscribeResponseMessage(UpnpResponse.Status.PRECONDITION_FAILED);
136 }
137
138 Integer timeoutSeconds;
139 if(getUpnpService().getConfiguration().isReceivedSubscriptionTimeoutIgnored()) {
140 timeoutSeconds = null;
141 } else {
142 timeoutSeconds = requestMessage.getRequestedTimeoutSeconds();
143 }
144
145 try {
146 subscription = new LocalGENASubscription(service, timeoutSeconds, callbackURLs) {
147 public void established() {
148 }
149
150 public void ended(CancelReason reason) {
151 }
152
153 public void eventReceived() {
154
155 getUpnpService().getConfiguration().getSyncProtocolExecutorService().execute(
156 getUpnpService().getProtocolFactory().createSendingEvent(this)
157 );
158 }
159 };
160 } catch (Exception ex) {
161 log.warning("Couldn't create local subscription to service: " + Exceptions.unwrap(ex));
162 return new OutgoingSubscribeResponseMessage(UpnpResponse.Status.INTERNAL_SERVER_ERROR);
163 }
164
165 log.fine("Adding subscription to registry: " + subscription);
166 getUpnpService().getRegistry().addLocalSubscription(subscription);
167
168 log.fine("Returning subscription response, waiting to send initial event");
169 return new OutgoingSubscribeResponseMessage(subscription);
170 }
171
172 @Override
173 public void responseSent(StreamResponseMessage responseMessage) {
174 if (subscription == null) return;
175 if (responseMessage != null
176 && !responseMessage.getOperation().isFailed()
177 && subscription.getCurrentSequence().getValue() == 0) {
178
179
180
181
182
183
184 log.fine("Establishing subscription");
185 subscription.registerOnService();
186 subscription.establish();
187
188 log.fine("Response to subscription sent successfully, now sending initial event asynchronously");
189 getUpnpService().getConfiguration().getAsyncProtocolExecutor().execute(
190 getUpnpService().getProtocolFactory().createSendingEvent(subscription)
191 );
192
193 } else if (subscription.getCurrentSequence().getValue() == 0) {
194 log.fine("Subscription request's response aborted, not sending initial event");
195 if (responseMessage == null) {
196 log.fine("Reason: No response at all from subscriber");
197 } else {
198 log.fine("Reason: " + responseMessage.getOperation());
199 }
200 log.fine("Removing subscription from registry: " + subscription);
201 getUpnpService().getRegistry().removeLocalSubscription(subscription);
202 }
203 }
204
205 @Override
206 public void responseException(Throwable t) {
207 if (subscription == null) return;
208 log.fine("Response could not be send to subscriber, removing local GENA subscription: " + subscription);
209 getUpnpService().getRegistry().removeLocalSubscription(subscription);
210 }
211 }