1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.fourthline.cling.model.gena;
17
18 import org.fourthline.cling.model.ServiceManager;
19 import org.fourthline.cling.model.UserConstants;
20 import org.fourthline.cling.model.message.header.SubscriptionIdHeader;
21 import org.fourthline.cling.model.meta.LocalService;
22 import org.fourthline.cling.model.meta.StateVariable;
23 import org.fourthline.cling.model.state.StateVariableValue;
24 import org.fourthline.cling.model.types.UnsignedIntegerFourBytes;
25 import org.seamless.util.Exceptions;
26
27 import java.beans.PropertyChangeEvent;
28 import java.beans.PropertyChangeListener;
29 import java.net.URL;
30 import java.util.Collection;
31 import java.util.Date;
32 import java.util.HashMap;
33 import java.util.HashSet;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Set;
37 import java.util.UUID;
38 import java.util.logging.Level;
39 import java.util.logging.Logger;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56 public abstract class LocalGENASubscription extends GENASubscription<LocalService> implements PropertyChangeListener {
57
58 private static Logger log = Logger.getLogger(LocalGENASubscription.class.getName());
59
60 final List<URL> callbackURLs;
61
62
63 final Map<String, Long> lastSentTimestamp = new HashMap<>();
64 final Map<String, Long> lastSentNumericValue = new HashMap<>();
65
66 protected LocalGENASubscription(LocalService service, List<URL> callbackURLs) throws Exception {
67 super(service);
68 this.callbackURLs = callbackURLs;
69 }
70
71 public LocalGENASubscription(LocalService service,
72 Integer requestedDurationSeconds, List<URL> callbackURLs) throws Exception {
73 super(service);
74
75 setSubscriptionDuration(requestedDurationSeconds);
76
77 log.fine("Reading initial state of local service at subscription time");
78 long currentTime = new Date().getTime();
79 this.currentValues.clear();
80
81 Collection<StateVariableValue> values = getService().getManager().getCurrentState();
82
83 log.finer("Got evented state variable values: " + values.size());
84
85 for (StateVariableValue value : values) {
86 this.currentValues.put(value.getStateVariable().getName(), value);
87
88 if (log.isLoggable(Level.FINEST)) {
89 log.finer("Read state variable value '" + value.getStateVariable().getName() + "': " + value.toString());
90 }
91
92
93 lastSentTimestamp.put(value.getStateVariable().getName(), currentTime);
94 if (value.getStateVariable().isModeratedNumericType()) {
95 lastSentNumericValue.put(value.getStateVariable().getName(), Long.valueOf(value.toString()));
96 }
97 }
98
99 this.subscriptionId = SubscriptionIdHeader.PREFIX + UUID.randomUUID();
100 this.currentSequence = new UnsignedIntegerFourBytes(0);
101 this.callbackURLs = callbackURLs;
102 }
103
104 synchronized public List<URL> getCallbackURLs() {
105 return callbackURLs;
106 }
107
108
109
110
111 synchronized public void registerOnService() {
112 getService().getManager().getPropertyChangeSupport().addPropertyChangeListener(this);
113 }
114
115 synchronized public void establish() {
116 established();
117 }
118
119
120
121
122 synchronized public void end(CancelReason reason) {
123 try {
124 getService().getManager().getPropertyChangeSupport().removePropertyChangeListener(this);
125 } catch (Exception ex) {
126 log.warning("Removal of local service property change listener failed: " + Exceptions.unwrap(ex));
127 }
128 ended(reason);
129 }
130
131
132
133
134
135 synchronized public void propertyChange(PropertyChangeEvent e) {
136 if (!e.getPropertyName().equals(ServiceManager.EVENTED_STATE_VARIABLES)) return;
137
138 log.fine("Eventing triggered, getting state for subscription: " + getSubscriptionId());
139
140 long currentTime = new Date().getTime();
141
142 Collection<StateVariableValue> newValues = (Collection) e.getNewValue();
143 Set<String> excludedVariables = moderateStateVariables(currentTime, newValues);
144
145 currentValues.clear();
146 for (StateVariableValue newValue : newValues) {
147 String name = newValue.getStateVariable().getName();
148 if (!excludedVariables.contains(name)) {
149 log.fine("Adding state variable value to current values of event: " + newValue.getStateVariable() + " = " + newValue);
150 currentValues.put(newValue.getStateVariable().getName(), newValue);
151
152
153 lastSentTimestamp.put(name, currentTime);
154 if (newValue.getStateVariable().isModeratedNumericType()) {
155 lastSentNumericValue.put(name, Long.valueOf(newValue.toString()));
156 }
157 }
158 }
159
160 if (currentValues.size() > 0) {
161 log.fine("Propagating new state variable values to subscription: " + this);
162
163
164
165 eventReceived();
166 } else {
167 log.fine("No state variable values for event (all moderated out?), not triggering event");
168 }
169 }
170
171
172
173
174
175
176
177
178 synchronized protected Set<String> moderateStateVariables(long currentTime, Collection<StateVariableValue> values) {
179
180 Set<String> excludedVariables = new HashSet<>();
181
182
183 for (StateVariableValue stateVariableValue : values) {
184
185 StateVariable stateVariable = stateVariableValue.getStateVariable();
186 String stateVariableName = stateVariableValue.getStateVariable().getName();
187
188 if (stateVariable.getEventDetails().getEventMaximumRateMilliseconds() == 0 &&
189 stateVariable.getEventDetails().getEventMinimumDelta() == 0) {
190 log.finer("Variable is not moderated: " + stateVariable);
191 continue;
192 }
193
194
195 if (!lastSentTimestamp.containsKey(stateVariableName)) {
196 log.finer("Variable is moderated but was never sent before: " + stateVariable);
197 continue;
198 }
199
200 if (stateVariable.getEventDetails().getEventMaximumRateMilliseconds() > 0) {
201 long timestampLastSent = lastSentTimestamp.get(stateVariableName);
202 long timestampNextSend = timestampLastSent + (stateVariable.getEventDetails().getEventMaximumRateMilliseconds());
203 if (currentTime <= timestampNextSend) {
204 log.finer("Excluding state variable with maximum rate: " + stateVariable);
205 excludedVariables.add(stateVariableName);
206 continue;
207 }
208 }
209
210 if (stateVariable.isModeratedNumericType() && lastSentNumericValue.get(stateVariableName) != null) {
211
212 long oldValue = Long.valueOf(lastSentNumericValue.get(stateVariableName));
213 long newValue = Long.valueOf(stateVariableValue.toString());
214 long minDelta = stateVariable.getEventDetails().getEventMinimumDelta();
215
216 if (newValue > oldValue && newValue - oldValue < minDelta) {
217 log.finer("Excluding state variable with minimum delta: " + stateVariable);
218 excludedVariables.add(stateVariableName);
219 continue;
220 }
221
222 if (newValue < oldValue && oldValue - newValue < minDelta) {
223 log.finer("Excluding state variable with minimum delta: " + stateVariable);
224 excludedVariables.add(stateVariableName);
225 }
226 }
227
228 }
229 return excludedVariables;
230 }
231
232 synchronized public void incrementSequence() {
233 this.currentSequence.increment(true);
234 }
235
236
237
238
239
240 synchronized public void setSubscriptionDuration(Integer requestedDurationSeconds) {
241 this.requestedDurationSeconds =
242 requestedDurationSeconds == null
243 ? UserConstants.DEFAULT_SUBSCRIPTION_DURATION_SECONDS
244 : requestedDurationSeconds;
245
246 setActualSubscriptionDurationSeconds(this.requestedDurationSeconds);
247 }
248
249 public abstract void ended(CancelReason reason);
250
251 }