Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,10 @@ node.subscribeToValues(valueConsumer, fs, sampleRate)
Subscribe to value changes on this node. On each value change valueConsumer function is called
with value of the nodes value_type and UTC Unix timestamp in nanoseconds (nanoseconds from 01.01.1970).
Timestamp refers to the time of value change in connected application on target controller.
If the client detects a time synchronization offset larger than 3 seconds,
incoming timestamps are automatically adjusted by that offset. The offset is
determined by sending three consecutive time-sync requests and using the
smallest offset reported.

- Example

Expand Down
68 changes: 65 additions & 3 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ studio.protocol.BINARY_TYPE = "arraybuffer";
*/
studio.internal = (function(proto) {
var obj = {};
var hostTimeOffsets = new Map();

obj.structure = {
REMOVE: 0,
Expand Down Expand Up @@ -715,6 +716,7 @@ studio.internal = (function(proto) {
var appName = "";
var appId = undefined;
var appUrl = composeUrl(url);
var host = new URL(appUrl).hostname;
var socket = new WebSocket(appUrl);
var handler = new proto.Handler(socket, notificationListener);
var requests = [];
Expand All @@ -725,6 +727,9 @@ studio.internal = (function(proto) {
var onError;
var onOpen;
var reauthRequestPending = false;
var timeOffsetNs = hostTimeOffsets.get(host) || 0;
var timeSyncTimer = null;
var timeSyncSamples = [];
socket.binaryType = proto.BINARY_TYPE;
nodeMap.set(systemNode.id(), systemNode);
handler.onContainer = handleIncomingContainer;
Expand All @@ -747,7 +752,13 @@ studio.internal = (function(proto) {

onMessage = function(evt) { handler.handle(evt.data); };
onError = function (ev) { console.log("Socket error: " + ev.data); };
onOpen = function() { appConnection.resubscribe(systemNode); };
onOpen = function() {
appConnection.resubscribe(systemNode);
startTimeSync();
if (!timeSyncTimer) {
timeSyncTimer = setInterval(startTimeSync, 30000);
}
};
onClosed = function (event) {
var reason;

Expand Down Expand Up @@ -781,6 +792,7 @@ studio.internal = (function(proto) {
reason = "Unknown reason";

console.log("Socket close: " + reason);
stopTimeSync();

if (autoConnect)
{
Expand Down Expand Up @@ -834,6 +846,55 @@ studio.internal = (function(proto) {
requests = [];
}

function makeCurrentTimeRequest() {
var msg = new proto.Container();
msg.message_type = proto.ContainerType.eCurrentTimeRequest;
send(msg);
}

function setTimeOffset(offset) {
timeOffsetNs = offset;
hostTimeOffsets.set(host, offset);
}

function beginTimeSync() {
if (timeSyncSamples.length === 0) {
makeCurrentTimeRequest();
}
}

function parseCurrentTimeResponse(serverTime) {
var localNs = Date.now() * 1000000;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here also 1E6

var offsetCandidate = serverTime - localNs;
timeSyncSamples.push(offsetCandidate);
if (timeSyncSamples.length < 3) {
makeCurrentTimeRequest();
return;
}
var minOffset = Math.min.apply(null, timeSyncSamples);
timeSyncSamples = [];
if (Math.abs(minOffset) > 3000000000) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer the 3E9 syntax for large numbers. It is very annoying to count the zeros

setTimeOffset(minOffset);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't there be an else clause to reset the offset for the case previously due to some network issue offset was very high but now is under 3 seconds?

}

function startTimeSync() {
beginTimeSync();
}

function stopTimeSync() {
if (timeSyncTimer) {
clearInterval(timeSyncTimer);
timeSyncTimer = null;
}
}

function applyTimeOffset(ts) {
if (ts === undefined || ts === null)
return ts;
return ts + (hostTimeOffsets.get(host) || 0);
}

this.makeStructureRequest = function(id) {
var msg = new proto.Container();
msg.message_type = proto.ContainerType.eStructureRequest;
Expand Down Expand Up @@ -990,7 +1051,7 @@ studio.internal = (function(proto) {
var variantValue = protoResponse[i];
var node = nodeMap.get(variantValue.node_id);
if (node)
node.receiveValue(proto.valueFromVariant(variantValue, node.info().value_type), variantValue.timestamp);
node.receiveValue(proto.valueFromVariant(variantValue, node.info().value_type), applyTimeOffset(variantValue.timestamp));
}
}

Expand All @@ -1014,7 +1075,7 @@ studio.internal = (function(proto) {
sender: variantValue.sender,
code: variantValue.code,
status: variantValue.status,
timestamp: variantValue.timestamp,
timestamp: applyTimeOffset(variantValue.timestamp),
data: variantValue.data
};
node.receiveEvent(event);
Expand Down Expand Up @@ -1068,6 +1129,7 @@ studio.internal = (function(proto) {
parseEventResponse(protoContainer.event_response);
break;
case proto.ContainerType.eCurrentTimeResponse:
parseCurrentTimeResponse(protoContainer.current_time_response);
break;
case proto.ContainerType.eReauthResponse:
parseReauthResponse(protoContainer.re_auth_response, metadata);
Expand Down