diff --git a/js/editor/server/pullbox/OperationRouter.js b/js/editor/server/pullbox/OperationRouter.js index 9fc13a70..7db2a53a 100644 --- a/js/editor/server/pullbox/OperationRouter.js +++ b/js/editor/server/pullbox/OperationRouter.js @@ -57,27 +57,31 @@ define("webodf/editor/server/pullbox/OperationRouter", [], function () { "use strict"; var operationFactory, - singleTimeCallbackOnCompleteServerOpSpecsPlay, /**@type{function(!ops.Operation)}*/ playbackFunction, - /**@type{?{active:!boolean}}*/ - pullingTimeOutFlag = null, + syncOpsTimeout = null, /**@type{!boolean}*/ - triggerPushingOpsActivated = false, + isInstantSyncRequested = false, /**@type{!boolean}*/ - playUnplayedServerOpSpecsTriggered = false, + isPushingOpsTriggered = false, /**@type{!boolean}*/ - syncLock = false, + isPlayingUnplayedServerOpSpecs = false, + /**@type{!boolean}*/ + isSyncCallRunning = false, /**@type{!boolean}*/ hasUnresolvableConflict = false, /**@type{!boolean}*/ syncingBlocked = false, /** @type {!string} id of latest op stack state known on the server */ lastServerSeq = "", + /** @type {!Array.} sync request callbacks created since the last sync call to the server */ + syncRequestCallbacksQueue = [], /** @type {!Array.} ops created since the last sync call to the server */ unsyncedClientOpspecQueue = [], - /** @type {!Array.} ops created since the last sync call to the server */ + /** @type {!Array.} ops already received from the server but not yet applied */ unplayedServerOpspecQueue = [], + /** @type {!Array.} sync request callbacks which should be called after the received ops have been applied server */ + uncalledSyncRequestCallbacksQueue = [], /** @type {!Array.} ops created since the last sync call to the server */ hasLocalUnsyncedOpsStateSubscribers = [], /**@type{!boolean}*/ @@ -149,9 +153,9 @@ runtime.log("Merged: from "+opspecs.length+" to "+result.length+" specs"); * @return {undefined} */ function doPlayUnplayedServerOpSpecs() { - var opspec, op, startTime; + var opspec, op, startTime, i; - playUnplayedServerOpSpecsTriggered = false; + isPlayingUnplayedServerOpSpecs = false; // take start time startTime = (new Date()).getTime(); @@ -178,19 +182,19 @@ runtime.log("Merged: from "+opspecs.length+" to "+result.length+" specs"); // still unplayed opspecs? if (unplayedServerOpspecQueue.length > 0) { // let other events be handled. then continue - playUnplayedServerOpSpecsTriggered = true; + isPlayingUnplayedServerOpSpecs = true; runtime.getWindow().setTimeout(doPlayUnplayedServerOpSpecs, 1); } else { - // This is such a sad hack. But there is no other way for now to inject - // the callback after the initial replay. - if (singleTimeCallbackOnCompleteServerOpSpecsPlay) { - singleTimeCallbackOnCompleteServerOpSpecsPlay(); - singleTimeCallbackOnCompleteServerOpSpecsPlay = null; + // finally call all the callbacks waiting for that sync! + for (i = 0; i < uncalledSyncRequestCallbacksQueue.length; i += 1) { + uncalledSyncRequestCallbacksQueue[i](); } + + uncalledSyncRequestCallbacksQueue = []; } } - if (playUnplayedServerOpSpecsTriggered) { + if (isPlayingUnplayedServerOpSpecs) { return; } doPlayUnplayedServerOpSpecs(); @@ -198,11 +202,13 @@ runtime.log("Merged: from "+opspecs.length+" to "+result.length+" specs"); /** * @param {Array.} opspecs + * @param {Array.} callbacks * @return {undefined} */ - function receiveOpSpecsFromNetwork(opspecs) { + function receiveOpSpecsFromNetwork(opspecs, callbacks) { // append to existing unplayed unplayedServerOpspecQueue = unplayedServerOpspecQueue.concat(opspecs); + uncalledSyncRequestCallbacksQueue = uncalledSyncRequestCallbacksQueue.concat(callbacks); } /** @@ -245,146 +251,154 @@ runtime.log("Merged: from "+opspecs.length+" to "+result.length+" specs"); * @return {undefined} */ function syncOps() { - function triggerPullingOps() { - var flag = {active: true}; - // provide flag globally - pullingTimeOutFlag = flag; - runtime.getWindow().setTimeout(function() { -runtime.log("Pulling activated:" + flag.active); - // remove our flag - pullingTimeOutFlag = null; - if (flag.active) { - syncOps(); - } - }, pullingIntervall); + var syncedClientOpspecs, + syncRequestCallbacksArray; + + if (isSyncCallRunning || hasUnresolvableConflict) { + return; + } + // TODO: hack, remove + if (syncingBlocked) { + return; } - /** - * @return {undefined} - */ - function doSyncOps() { - var syncedClientOpspecs; + // no more timeout or instant pull request in any case + syncOpsTimeout = null; + isInstantSyncRequested = false; + // set lock + isSyncCallRunning = true; - if (syncLock || hasUnresolvableConflict) { - return; + // take specs from queue, if any + syncedClientOpspecs = unsyncedClientOpspecQueue; + unsyncedClientOpspecQueue = []; + syncRequestCallbacksArray = syncRequestCallbacksQueue; + syncRequestCallbacksQueue = []; + + server.call({ + command: 'sync_ops', + args: { + es_id: sessionId, + member_id: memberId, + seq_head: String(lastServerSeq), + client_ops: syncedClientOpspecs } + }, function(responseData) { + var response = /** @type{{result:string, head_seq:string, ops:Array.}} */(runtime.fromJson(responseData)); + // TODO: hack, remove if (syncingBlocked) { return; } - syncLock = true; + runtime.log("sync-ops reply: " + responseData); - // take specs from queue, if any - syncedClientOpspecs = unsyncedClientOpspecQueue; - unsyncedClientOpspecQueue = []; - - server.call({ - command: 'sync_ops', - args: { - es_id: sessionId, - member_id: memberId, - seq_head: String(lastServerSeq), - client_ops: syncedClientOpspecs - } - }, function(responseData) { - var shouldRetryInstantly = false, - response = /** @type{{result:string, head_seq:string, ops:Array.}} */(runtime.fromJson(responseData)); - - // TODO: hack, remove - if (syncingBlocked) { - return; - } - - runtime.log("sync-ops reply: " + responseData); - - // just new ops? - if (response.result === "new_ops") { - if (response.ops.length > 0) { - // no new locally in the meantime? - if (unsyncedClientOpspecQueue.length === 0) { - receiveOpSpecsFromNetwork(compressOpSpecs(response.ops)); - } else { - // transform server ops against new local ones and apply, - // transform and send new local ops to server - runtime.log("meh, have new ops locally meanwhile, have to do transformations."); - hasUnresolvableConflict = !handleOpsSyncConflict(compressOpSpecs(response.ops)); - } - // and note server state - lastServerSeq = response.head_seq; - } - } else if (response.result === "added") { - runtime.log("All added to server"); - // note server state - lastServerSeq = response.head_seq; - updateHasLocalUnsyncedOpsState(); - } else if (response.result === "conflict") { - // put the send ops back into the outgoing queue - unsyncedClientOpspecQueue = syncedClientOpspecs.concat(unsyncedClientOpspecQueue); - // transform server ops against new local ones and apply, - // transform and request new send new local ops to server - runtime.log("meh, server has new ops meanwhile, have to do transformations."); - hasUnresolvableConflict = !handleOpsSyncConflict(compressOpSpecs(response.ops)); + // just new ops? + if (response.result === "new_ops") { + if (response.ops.length > 0) { + // no new locally in the meantime? + if (unsyncedClientOpspecQueue.length === 0) { + receiveOpSpecsFromNetwork(compressOpSpecs(response.ops), syncRequestCallbacksArray); + } else { + // transform server ops against new local ones and apply, + // transform and send new local ops to server + runtime.log("meh, have new ops locally meanwhile, have to do transformations."); + hasUnresolvableConflict = !handleOpsSyncConflict(compressOpSpecs(response.ops)); + syncRequestCallbacksQueue = syncRequestCallbacksArray.concat(syncRequestCallbacksQueue); + } // and note server state lastServerSeq = response.head_seq; - // try again instantly - if (!hasUnresolvableConflict) { - shouldRetryInstantly = true; - } - } else { - runtime.assert(false, "Unexpected result on sync-ops call: "+response.result); } - - syncLock = false; - - if (hasUnresolvableConflict) { - // TODO: offer option to reload session automatically? - runtime.assert(false, - "Sorry to tell:\n" + - "we hit a pair of operations in a state which yet need to be supported for transformation against each other.\n" + - "Client disconnected from session, no further editing accepted.\n\n" + - "Please reconnect manually for now."); - } else { - if (shouldRetryInstantly) { - doSyncOps(); - } else { -runtime.log("Preparing next: " + (unsyncedClientOpspecQueue.length === 0)); - // prepare next sync - // nothing to push right now? - if (unsyncedClientOpspecQueue.length === 0) { - triggerPullingOps(); - } - } - playUnplayedServerOpSpecs(); + } else if (response.result === "added") { + runtime.log("All added to server"); + receiveOpSpecsFromNetwork([], syncRequestCallbacksArray); + // note server state + lastServerSeq = response.head_seq; + updateHasLocalUnsyncedOpsState(); + } else if (response.result === "conflict") { + // put the send ops back into the outgoing queue + unsyncedClientOpspecQueue = syncedClientOpspecs.concat(unsyncedClientOpspecQueue); + syncRequestCallbacksQueue = syncRequestCallbacksArray.concat(syncRequestCallbacksQueue); + // transform server ops against new local ones and apply, + // transform and request new send new local ops to server + runtime.log("meh, server has new ops meanwhile, have to do transformations."); + hasUnresolvableConflict = !handleOpsSyncConflict(compressOpSpecs(response.ops)); + // and note server state + lastServerSeq = response.head_seq; + // try again instantly + if (!hasUnresolvableConflict) { + isInstantSyncRequested = true; } - }); - } - doSyncOps(); + } else { + runtime.assert(false, "Unexpected result on sync-ops call: "+response.result); + } + + // unlock + isSyncCallRunning = false; + + if (hasUnresolvableConflict) { + // TODO: offer option to reload session automatically? + runtime.assert(false, + "Sorry to tell:\n" + + "we hit a pair of operations in a state which yet need to be supported for transformation against each other.\n" + + "Client disconnected from session, no further editing accepted.\n\n" + + "Please reconnect manually for now."); + } else { + // prepare next sync + if (isInstantSyncRequested) { + syncOps(); + } else { + syncOpsTimeout = runtime.getWindow().setTimeout(function() { + syncOpsTimeout = null; + syncOps(); + }, (unsyncedClientOpspecQueue.length === 0) ? pullingIntervall : pushingIntervall); + } + playUnplayedServerOpSpecs(); + } + }); } function triggerPushingOps() { - if (syncLock || triggerPushingOpsActivated) { + if (isSyncCallRunning || isPushingOpsTriggered) { return; } - triggerPushingOpsActivated = true; + isPushingOpsTriggered = true; // disable current pulling timeout - if (pullingTimeOutFlag) { - pullingTimeOutFlag.active = false; + if (syncOpsTimeout) { + runtime.clearTimeout(syncOpsTimeout); + syncOpsTimeout = null; } + // TODO: how stupid! if the pulling timeout was close to done, this will extend it + // solution: split pulling into two timeouts, with second as short as pushing, + // and only cancel the first half runtime.getWindow().setTimeout(function() { runtime.log("Pushing activated"); - triggerPushingOpsActivated = false; + isPushingOpsTriggered = false; syncOps(); }, pushingIntervall); } - this.requestReplay = function (done_cb) { - singleTimeCallbackOnCompleteServerOpSpecsPlay = done_cb; + /** + * @param {!Funtion} cb + * @return {undefined} + */ + function requestInstantOpsSync(cb) { + syncRequestCallbacksQueue.push(cb); + + // disable current pulling timeout + if (syncOpsTimeout) { + runtime.clearTimeout(syncOpsTimeout); + syncOpsTimeout = null; + } + syncOps(); }; + this.requestReplay = function (done_cb) { + requestInstantOpsSync(done_cb); + }; + /** * Sets the factory to use to create operation instances from operation specs. * @@ -450,23 +464,24 @@ runtime.log("Pushing activated"); * A callback is called on success. */ this.close = function (cb) { - function writeSessionStateToFile() { - function cbSuccess(fileData) { - server.writeSessionStateToFile(sessionId, memberId, lastServerSeq, fileData, cb); - }; - odfContainer.createByteArray(cbSuccess, cb); + function cbSuccess(fileData) { + server.writeSessionStateToFile(sessionId, memberId, lastServerSeq, fileData, cb); } - // TODO: hack, rather add callback to syncOps for success and properly close things - syncOps(); - runtime.getWindow().setTimeout(function() { + function doClose() { syncingBlocked = true; if (hasPushedModificationOps) { - writeSessionStateToFile(); + odfContainer.createByteArray(cbSuccess, cb); } else { cb(); } - }, 2000); + } + + if (hasLocalUnsyncedOps) { + requestInstantOpsSync(doClose); + } else { + doClose(); + } }; this.getHasLocalUnsyncedOpsAndUpdates = function (subscriber) {