Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,10 @@ node.subscribeToValues(valueConsumer, fs, sampleRate)
Subscribe to value changes on this node. On each value change valueConsumer function is called
with value of the nodes value_type and UTC Unix timestamp in nanoseconds (nanoseconds from 01.01.1970).
Timestamp refers to the time of value change in connected application on target controller.
If the client detects a time synchronization offset larger than 3 seconds,
incoming timestamps are automatically adjusted by that offset. The offset is
determined by sending three consecutive time-sync requests and using the
smallest offset reported.

- Example

Expand Down
63 changes: 60 additions & 3 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,9 @@ studio.internal = (function(proto) {
var onError;
var onOpen;
var reauthRequestPending = false;
var timeOffsetNs = 0;
var timeSyncTimer = null;
var timeSyncSamples = [];
socket.binaryType = proto.BINARY_TYPE;
nodeMap.set(systemNode.id(), systemNode);
handler.onContainer = handleIncomingContainer;
Expand All @@ -747,7 +750,10 @@ studio.internal = (function(proto) {

onMessage = function(evt) { handler.handle(evt.data); };
onError = function (ev) { console.log("Socket error: " + ev.data); };
onOpen = function() { appConnection.resubscribe(systemNode); };
onOpen = function() {
appConnection.resubscribe(systemNode);
startTimeSync();
};
onClosed = function (event) {
var reason;

Expand Down Expand Up @@ -781,6 +787,8 @@ studio.internal = (function(proto) {
reason = "Unknown reason";

console.log("Socket close: " + reason);
stopTimeSync();
timeOffsetNs = 0;

if (autoConnect)
{
Expand Down Expand Up @@ -834,6 +842,53 @@ studio.internal = (function(proto) {
requests = [];
}

function makeCurrentTimeRequest() {
var msg = new proto.Container();
msg.message_type = proto.ContainerType.eCurrentTimeRequest;
send(msg);
}

function beginTimeSync() {
if (timeSyncSamples.length === 0) {
makeCurrentTimeRequest();
}
}

function parseCurrentTimeResponse(serverTime) {
var localNs = Date.now() * 1000000;
var offsetCandidate = serverTime - localNs;
timeSyncSamples.push(offsetCandidate);
if (timeSyncSamples.length < 3) {
makeCurrentTimeRequest();
return;
}
var minOffset = Math.min.apply(null, timeSyncSamples);
timeSyncSamples = [];
if (Math.abs(minOffset) > 3000000000) {
timeOffsetNs = minOffset;
}
}

function startTimeSync() {
if (!timeSyncTimer) {
beginTimeSync();
timeSyncTimer = setInterval(beginTimeSync, 30000);
}
}

function stopTimeSync() {
if (timeSyncTimer) {
clearInterval(timeSyncTimer);
timeSyncTimer = null;
}
}

function applyTimeOffset(ts) {
if (ts === undefined || ts === null)
return ts;
return ts + timeOffsetNs;
}

this.makeStructureRequest = function(id) {
var msg = new proto.Container();
msg.message_type = proto.ContainerType.eStructureRequest;
Expand Down Expand Up @@ -990,7 +1045,7 @@ studio.internal = (function(proto) {
var variantValue = protoResponse[i];
var node = nodeMap.get(variantValue.node_id);
if (node)
node.receiveValue(proto.valueFromVariant(variantValue, node.info().value_type), variantValue.timestamp);
node.receiveValue(proto.valueFromVariant(variantValue, node.info().value_type), applyTimeOffset(variantValue.timestamp));
}
}

Expand All @@ -1014,7 +1069,7 @@ studio.internal = (function(proto) {
sender: variantValue.sender,
code: variantValue.code,
status: variantValue.status,
timestamp: variantValue.timestamp,
timestamp: applyTimeOffset(variantValue.timestamp),
data: variantValue.data
};
node.receiveEvent(event);
Expand Down Expand Up @@ -1054,6 +1109,7 @@ studio.internal = (function(proto) {
}

function handleIncomingContainer(protoContainer, metadata) {
startTimeSync();
switch(protoContainer.message_type){
case proto.ContainerType.eStructureResponse:
parseStructureResponse(protoContainer.structure_response);
Expand All @@ -1068,6 +1124,7 @@ studio.internal = (function(proto) {
parseEventResponse(protoContainer.event_response);
break;
case proto.ContainerType.eCurrentTimeResponse:
parseCurrentTimeResponse(protoContainer.current_time_response);
break;
case proto.ContainerType.eReauthResponse:
parseReauthResponse(protoContainer.re_auth_response, metadata);
Expand Down