View Javadoc
1   /*
2    * Copyright (C) 2013 4th Line GmbH, Switzerland
3    *
4    * The contents of this file are subject to the terms of either the GNU
5    * Lesser General Public License Version 2 or later ("LGPL") or the
6    * Common Development and Distribution License Version 1 or later
7    * ("CDDL") (collectively, the "License"). You may not use this file
8    * except in compliance with the License. See LICENSE.txt for more
9    * information.
10   *
11   * This program is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
14   */
15  
16  package org.fourthline.cling.model.gena;
17  
18  import org.fourthline.cling.model.ServiceManager;
19  import org.fourthline.cling.model.UserConstants;
20  import org.fourthline.cling.model.message.header.SubscriptionIdHeader;
21  import org.fourthline.cling.model.meta.LocalService;
22  import org.fourthline.cling.model.meta.StateVariable;
23  import org.fourthline.cling.model.state.StateVariableValue;
24  import org.fourthline.cling.model.types.UnsignedIntegerFourBytes;
25  import org.seamless.util.Exceptions;
26  
27  import java.beans.PropertyChangeEvent;
28  import java.beans.PropertyChangeListener;
29  import java.net.URL;
30  import java.util.Collection;
31  import java.util.Date;
32  import java.util.HashMap;
33  import java.util.HashSet;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.Set;
37  import java.util.UUID;
38  import java.util.logging.Level;
39  import java.util.logging.Logger;
40  
41  /**
42   * An incoming subscription to a local service.
43   * <p>
44   * Uses the {@link org.fourthline.cling.model.ServiceManager} to read the initial state of
45   * the {@link org.fourthline.cling.model.meta.LocalService} on instantation. Typically, the
46   * {@link #registerOnService()} method is called next, and from this point forward all
47   * {@link org.fourthline.cling.model.ServiceManager#EVENTED_STATE_VARIABLES} property change
48   * events are detected by this subscription. After moderation of state variable values
49   * (frequency and range of changes), the {@link #eventReceived()} method is called.
50   * Delivery of the event message to the subscriber is not part of this class, but the
51   * implementor of {@link #eventReceived()}.
52   * </p>
53   *
54   * @author Christian Bauer
55   */
56  public abstract class LocalGENASubscription extends GENASubscription<LocalService> implements PropertyChangeListener {
57  
58      private static Logger log = Logger.getLogger(LocalGENASubscription.class.getName());
59  
60      final List<URL> callbackURLs;
61  
62      // Moderation history
63      final Map<String, Long> lastSentTimestamp = new HashMap<>();
64      final Map<String, Long> lastSentNumericValue = new HashMap<>();
65  
66      protected LocalGENASubscription(LocalService service, List<URL> callbackURLs) throws Exception {
67          super(service);
68          this.callbackURLs = callbackURLs;
69      }
70  
71      public LocalGENASubscription(LocalService service,
72                                   Integer requestedDurationSeconds, List<URL> callbackURLs) throws Exception {
73          super(service);
74  
75          setSubscriptionDuration(requestedDurationSeconds);
76  
77          log.fine("Reading initial state of local service at subscription time");
78          long currentTime = new Date().getTime();
79          this.currentValues.clear();
80  
81          Collection<StateVariableValue> values = getService().getManager().getCurrentState();
82  
83          log.finer("Got evented state variable values: " + values.size());
84  
85          for (StateVariableValue value : values) {
86              this.currentValues.put(value.getStateVariable().getName(), value);
87  
88              if (log.isLoggable(Level.FINEST)) {
89                  log.finer("Read state variable value '" + value.getStateVariable().getName() + "': " + value.toString());
90              }
91  
92              // Preserve "last sent" state for future moderation
93              lastSentTimestamp.put(value.getStateVariable().getName(), currentTime);
94              if (value.getStateVariable().isModeratedNumericType()) {
95                  lastSentNumericValue.put(value.getStateVariable().getName(), Long.valueOf(value.toString()));
96              }
97          }
98  
99          this.subscriptionId = SubscriptionIdHeader.PREFIX + UUID.randomUUID();
100         this.currentSequence = new UnsignedIntegerFourBytes(0);
101         this.callbackURLs = callbackURLs;
102     }
103 
104     synchronized public List<URL> getCallbackURLs() {
105         return callbackURLs;
106     }
107 
108     /**
109      * Adds a property change listener on the {@link org.fourthline.cling.model.ServiceManager}.
110      */
111     synchronized public void registerOnService() {
112         getService().getManager().getPropertyChangeSupport().addPropertyChangeListener(this);
113     }
114 
115     synchronized public void establish() {
116         established();
117     }
118 
119     /**
120      * Removes a property change listener on the {@link org.fourthline.cling.model.ServiceManager}.
121      */
122     synchronized public void end(CancelReason reason) {
123         try {
124             getService().getManager().getPropertyChangeSupport().removePropertyChangeListener(this);
125         } catch (Exception ex) {
126             log.warning("Removal of local service property change listener failed: " + Exceptions.unwrap(ex));
127         }
128         ended(reason);
129     }
130 
131     /**
132      * Moderates {@link org.fourthline.cling.model.ServiceManager#EVENTED_STATE_VARIABLES} events and state variable
133      * values, calls {@link #eventReceived()}.
134      */
135     synchronized public void propertyChange(PropertyChangeEvent e) {
136         if (!e.getPropertyName().equals(ServiceManager.EVENTED_STATE_VARIABLES)) return;
137 
138         log.fine("Eventing triggered, getting state for subscription: " + getSubscriptionId());
139 
140         long currentTime = new Date().getTime();
141 
142         Collection<StateVariableValue> newValues = (Collection) e.getNewValue();
143         Set<String> excludedVariables = moderateStateVariables(currentTime, newValues);
144 
145         currentValues.clear();
146         for (StateVariableValue newValue : newValues) {
147             String name = newValue.getStateVariable().getName();
148             if (!excludedVariables.contains(name)) {
149                 log.fine("Adding state variable value to current values of event: " + newValue.getStateVariable() + " = " + newValue);
150                 currentValues.put(newValue.getStateVariable().getName(), newValue);
151 
152                 // Preserve "last sent" state for future moderation
153                 lastSentTimestamp.put(name, currentTime);
154                 if (newValue.getStateVariable().isModeratedNumericType()) {
155                     lastSentNumericValue.put(name, Long.valueOf(newValue.toString()));
156                 }
157             }
158         }
159 
160         if (currentValues.size() > 0) {
161             log.fine("Propagating new state variable values to subscription: " + this);
162             // TODO: I'm not happy with this design, this dispatches to a separate thread which _then_
163             // is supposed to lock and read the values off this instance. That obviously doesn't work
164             // so it's currently a hack in SendingEvent.java
165             eventReceived();
166         } else {
167             log.fine("No state variable values for event (all moderated out?), not triggering event");
168         }
169     }
170 
171     /**
172      * Checks whether a state variable is moderated, and if this change is within the maximum rate and range limits.
173      *
174      * @param currentTime The current unix time.
175      * @param values The state variable values to moderate.
176      * @return A collection of state variable values that although they might have changed, are excluded from the event.
177      */
178     synchronized protected Set<String> moderateStateVariables(long currentTime, Collection<StateVariableValue> values) {
179 
180         Set<String> excludedVariables = new HashSet<>();
181 
182         // Moderate event variables that have a maximum rate or minimum delta
183         for (StateVariableValue stateVariableValue : values) {
184 
185             StateVariable stateVariable = stateVariableValue.getStateVariable();
186             String stateVariableName = stateVariableValue.getStateVariable().getName();
187 
188             if (stateVariable.getEventDetails().getEventMaximumRateMilliseconds() == 0 &&
189                     stateVariable.getEventDetails().getEventMinimumDelta() == 0) {
190                 log.finer("Variable is not moderated: " + stateVariable);
191                 continue;
192             }
193 
194             // That should actually never happen, because we always "send" it as the initial state/event
195             if (!lastSentTimestamp.containsKey(stateVariableName)) {
196                 log.finer("Variable is moderated but was never sent before: " + stateVariable);
197                 continue;
198             }
199 
200             if (stateVariable.getEventDetails().getEventMaximumRateMilliseconds() > 0) {
201                 long timestampLastSent = lastSentTimestamp.get(stateVariableName);
202                 long timestampNextSend = timestampLastSent + (stateVariable.getEventDetails().getEventMaximumRateMilliseconds());
203                 if (currentTime <= timestampNextSend) {
204                     log.finer("Excluding state variable with maximum rate: " + stateVariable);
205                     excludedVariables.add(stateVariableName);
206                     continue;
207                 }
208             }
209 
210             if (stateVariable.isModeratedNumericType() && lastSentNumericValue.get(stateVariableName) != null) {
211 
212                 long oldValue = Long.valueOf(lastSentNumericValue.get(stateVariableName));
213                 long newValue = Long.valueOf(stateVariableValue.toString());
214                 long minDelta = stateVariable.getEventDetails().getEventMinimumDelta();
215 
216                 if (newValue > oldValue && newValue - oldValue < minDelta) {
217                     log.finer("Excluding state variable with minimum delta: " + stateVariable);
218                     excludedVariables.add(stateVariableName);
219                     continue;
220                 }
221 
222                 if (newValue < oldValue && oldValue - newValue < minDelta) {
223                     log.finer("Excluding state variable with minimum delta: " + stateVariable);
224                     excludedVariables.add(stateVariableName);
225                 }
226             }
227 
228         }
229         return excludedVariables;
230     }
231 
232     synchronized public void incrementSequence() {
233         this.currentSequence.increment(true);
234     }
235 
236     /**
237      * @param requestedDurationSeconds If <code>null</code> defaults to
238      *                                 {@link org.fourthline.cling.model.UserConstants#DEFAULT_SUBSCRIPTION_DURATION_SECONDS}
239      */
240     synchronized public void setSubscriptionDuration(Integer requestedDurationSeconds) {
241         this.requestedDurationSeconds =
242                 requestedDurationSeconds == null
243                         ? UserConstants.DEFAULT_SUBSCRIPTION_DURATION_SECONDS
244                         : requestedDurationSeconds;
245 
246         setActualSubscriptionDurationSeconds(this.requestedDurationSeconds);
247     }
248 
249     public abstract void ended(CancelReason reason);
250 
251 }