diff --git a/README.rst b/README.rst index 62cf404..0c6f5a3 100644 --- a/README.rst +++ b/README.rst @@ -447,6 +447,10 @@ node.subscribeToValues(valueConsumer, fs, sampleRate) Subscribe to value changes on this node. On each value change valueConsumer function is called with value of the nodes value_type and UTC Unix timestamp in nanoseconds (nanoseconds from 01.01.1970). Timestamp refers to the time of value change in connected application on target controller. + If the client detects a time synchronization offset larger than 3 seconds, + incoming timestamps are automatically adjusted by that offset. The offset is + determined by sending three consecutive time-sync requests and using the + smallest offset reported. - Example diff --git a/index.js b/index.js index ba5dab8..9d97ba7 100644 --- a/index.js +++ b/index.js @@ -322,6 +322,7 @@ studio.protocol.BINARY_TYPE = "arraybuffer"; */ studio.internal = (function(proto) { var obj = {}; + var hostTimeOffsets = new Map(); obj.structure = { REMOVE: 0, @@ -715,6 +716,7 @@ studio.internal = (function(proto) { var appName = ""; var appId = undefined; var appUrl = composeUrl(url); + var host = new URL(appUrl).hostname; var socket = new WebSocket(appUrl); var handler = new proto.Handler(socket, notificationListener); var requests = []; @@ -725,6 +727,9 @@ studio.internal = (function(proto) { var onError; var onOpen; var reauthRequestPending = false; + var timeOffsetNs = hostTimeOffsets.get(host) || 0; + var timeSyncTimer = null; + var timeSyncSamples = []; socket.binaryType = proto.BINARY_TYPE; nodeMap.set(systemNode.id(), systemNode); handler.onContainer = handleIncomingContainer; @@ -747,7 +752,13 @@ studio.internal = (function(proto) { onMessage = function(evt) { handler.handle(evt.data); }; onError = function (ev) { console.log("Socket error: " + ev.data); }; - onOpen = function() { appConnection.resubscribe(systemNode); }; + onOpen = function() { + appConnection.resubscribe(systemNode); + startTimeSync(); + if (!timeSyncTimer) { + timeSyncTimer = setInterval(startTimeSync, 30000); + } + }; onClosed = function (event) { var reason; @@ -781,6 +792,7 @@ studio.internal = (function(proto) { reason = "Unknown reason"; console.log("Socket close: " + reason); + stopTimeSync(); if (autoConnect) { @@ -834,6 +846,55 @@ studio.internal = (function(proto) { requests = []; } + function makeCurrentTimeRequest() { + var msg = new proto.Container(); + msg.message_type = proto.ContainerType.eCurrentTimeRequest; + send(msg); + } + + function setTimeOffset(offset) { + timeOffsetNs = offset; + hostTimeOffsets.set(host, offset); + } + + function beginTimeSync() { + if (timeSyncSamples.length === 0) { + makeCurrentTimeRequest(); + } + } + + function parseCurrentTimeResponse(serverTime) { + var localNs = Date.now() * 1000000; + var offsetCandidate = serverTime - localNs; + timeSyncSamples.push(offsetCandidate); + if (timeSyncSamples.length < 3) { + makeCurrentTimeRequest(); + return; + } + var minOffset = Math.min.apply(null, timeSyncSamples); + timeSyncSamples = []; + if (Math.abs(minOffset) > 3000000000) { + setTimeOffset(minOffset); + } + } + + function startTimeSync() { + beginTimeSync(); + } + + function stopTimeSync() { + if (timeSyncTimer) { + clearInterval(timeSyncTimer); + timeSyncTimer = null; + } + } + + function applyTimeOffset(ts) { + if (ts === undefined || ts === null) + return ts; + return ts + (hostTimeOffsets.get(host) || 0); + } + this.makeStructureRequest = function(id) { var msg = new proto.Container(); msg.message_type = proto.ContainerType.eStructureRequest; @@ -990,7 +1051,7 @@ studio.internal = (function(proto) { var variantValue = protoResponse[i]; var node = nodeMap.get(variantValue.node_id); if (node) - node.receiveValue(proto.valueFromVariant(variantValue, node.info().value_type), variantValue.timestamp); + node.receiveValue(proto.valueFromVariant(variantValue, node.info().value_type), applyTimeOffset(variantValue.timestamp)); } } @@ -1014,7 +1075,7 @@ studio.internal = (function(proto) { sender: variantValue.sender, code: variantValue.code, status: variantValue.status, - timestamp: variantValue.timestamp, + timestamp: applyTimeOffset(variantValue.timestamp), data: variantValue.data }; node.receiveEvent(event); @@ -1068,6 +1129,7 @@ studio.internal = (function(proto) { parseEventResponse(protoContainer.event_response); break; case proto.ContainerType.eCurrentTimeResponse: + parseCurrentTimeResponse(protoContainer.current_time_response); break; case proto.ContainerType.eReauthResponse: parseReauthResponse(protoContainer.re_auth_response, metadata);