support callbacks on sync request calls in PullBox OperationRouter (and remove 2sec close wait hack)

too bad this loses the nice singleTimeCallbackOnCompleteServerOpSpecsPlay var ;)
This commit is contained in:
Friedrich W. H. Kossebau 2013-09-02 02:29:04 +02:00
parent 880101b221
commit 64e8ea689b

View File

@ -57,27 +57,31 @@ define("webodf/editor/server/pullbox/OperationRouter", [], function () {
"use strict"; "use strict";
var operationFactory, var operationFactory,
singleTimeCallbackOnCompleteServerOpSpecsPlay,
/**@type{function(!ops.Operation)}*/ /**@type{function(!ops.Operation)}*/
playbackFunction, playbackFunction,
/**@type{?{active:!boolean}}*/ syncOpsTimeout = null,
pullingTimeOutFlag = null,
/**@type{!boolean}*/ /**@type{!boolean}*/
triggerPushingOpsActivated = false, isInstantSyncRequested = false,
/**@type{!boolean}*/ /**@type{!boolean}*/
playUnplayedServerOpSpecsTriggered = false, isPushingOpsTriggered = false,
/**@type{!boolean}*/ /**@type{!boolean}*/
syncLock = false, isPlayingUnplayedServerOpSpecs = false,
/**@type{!boolean}*/
isSyncCallRunning = false,
/**@type{!boolean}*/ /**@type{!boolean}*/
hasUnresolvableConflict = false, hasUnresolvableConflict = false,
/**@type{!boolean}*/ /**@type{!boolean}*/
syncingBlocked = false, syncingBlocked = false,
/** @type {!string} id of latest op stack state known on the server */ /** @type {!string} id of latest op stack state known on the server */
lastServerSeq = "", lastServerSeq = "",
/** @type {!Array.<!Function>} sync request callbacks created since the last sync call to the server */
syncRequestCallbacksQueue = [],
/** @type {!Array.<!Object>} ops created since the last sync call to the server */ /** @type {!Array.<!Object>} ops created since the last sync call to the server */
unsyncedClientOpspecQueue = [], unsyncedClientOpspecQueue = [],
/** @type {!Array.<!Object>} ops created since the last sync call to the server */ /** @type {!Array.<!Object>} ops already received from the server but not yet applied */
unplayedServerOpspecQueue = [], unplayedServerOpspecQueue = [],
/** @type {!Array.<!Function>} sync request callbacks which should be called after the received ops have been applied server */
uncalledSyncRequestCallbacksQueue = [],
/** @type {!Array.<!function(!boolean):undefined>} ops created since the last sync call to the server */ /** @type {!Array.<!function(!boolean):undefined>} ops created since the last sync call to the server */
hasLocalUnsyncedOpsStateSubscribers = [], hasLocalUnsyncedOpsStateSubscribers = [],
/**@type{!boolean}*/ /**@type{!boolean}*/
@ -149,9 +153,9 @@ runtime.log("Merged: from "+opspecs.length+" to "+result.length+" specs");
* @return {undefined} * @return {undefined}
*/ */
function doPlayUnplayedServerOpSpecs() { function doPlayUnplayedServerOpSpecs() {
var opspec, op, startTime; var opspec, op, startTime, i;
playUnplayedServerOpSpecsTriggered = false; isPlayingUnplayedServerOpSpecs = false;
// take start time // take start time
startTime = (new Date()).getTime(); startTime = (new Date()).getTime();
@ -178,19 +182,19 @@ runtime.log("Merged: from "+opspecs.length+" to "+result.length+" specs");
// still unplayed opspecs? // still unplayed opspecs?
if (unplayedServerOpspecQueue.length > 0) { if (unplayedServerOpspecQueue.length > 0) {
// let other events be handled. then continue // let other events be handled. then continue
playUnplayedServerOpSpecsTriggered = true; isPlayingUnplayedServerOpSpecs = true;
runtime.getWindow().setTimeout(doPlayUnplayedServerOpSpecs, 1); runtime.getWindow().setTimeout(doPlayUnplayedServerOpSpecs, 1);
} else { } else {
// This is such a sad hack. But there is no other way for now to inject // finally call all the callbacks waiting for that sync!
// the callback after the initial replay. for (i = 0; i < uncalledSyncRequestCallbacksQueue.length; i += 1) {
if (singleTimeCallbackOnCompleteServerOpSpecsPlay) { uncalledSyncRequestCallbacksQueue[i]();
singleTimeCallbackOnCompleteServerOpSpecsPlay();
singleTimeCallbackOnCompleteServerOpSpecsPlay = null;
} }
uncalledSyncRequestCallbacksQueue = [];
} }
} }
if (playUnplayedServerOpSpecsTriggered) { if (isPlayingUnplayedServerOpSpecs) {
return; return;
} }
doPlayUnplayedServerOpSpecs(); doPlayUnplayedServerOpSpecs();
@ -198,11 +202,13 @@ runtime.log("Merged: from "+opspecs.length+" to "+result.length+" specs");
/** /**
* @param {Array.<!Object>} opspecs * @param {Array.<!Object>} opspecs
* @param {Array.<!Function>} callbacks
* @return {undefined} * @return {undefined}
*/ */
function receiveOpSpecsFromNetwork(opspecs) { function receiveOpSpecsFromNetwork(opspecs, callbacks) {
// append to existing unplayed // append to existing unplayed
unplayedServerOpspecQueue = unplayedServerOpspecQueue.concat(opspecs); unplayedServerOpspecQueue = unplayedServerOpspecQueue.concat(opspecs);
uncalledSyncRequestCallbacksQueue = uncalledSyncRequestCallbacksQueue.concat(callbacks);
} }
/** /**
@ -245,146 +251,154 @@ runtime.log("Merged: from "+opspecs.length+" to "+result.length+" specs");
* @return {undefined} * @return {undefined}
*/ */
function syncOps() { function syncOps() {
function triggerPullingOps() { var syncedClientOpspecs,
var flag = {active: true}; syncRequestCallbacksArray;
// provide flag globally
pullingTimeOutFlag = flag; if (isSyncCallRunning || hasUnresolvableConflict) {
runtime.getWindow().setTimeout(function() { return;
runtime.log("Pulling activated:" + flag.active); }
// remove our flag // TODO: hack, remove
pullingTimeOutFlag = null; if (syncingBlocked) {
if (flag.active) { return;
syncOps();
}
}, pullingIntervall);
} }
/** // no more timeout or instant pull request in any case
* @return {undefined} syncOpsTimeout = null;
*/ isInstantSyncRequested = false;
function doSyncOps() { // set lock
var syncedClientOpspecs; isSyncCallRunning = true;
if (syncLock || hasUnresolvableConflict) { // take specs from queue, if any
return; syncedClientOpspecs = unsyncedClientOpspecQueue;
unsyncedClientOpspecQueue = [];
syncRequestCallbacksArray = syncRequestCallbacksQueue;
syncRequestCallbacksQueue = [];
server.call({
command: 'sync_ops',
args: {
es_id: sessionId,
member_id: memberId,
seq_head: String(lastServerSeq),
client_ops: syncedClientOpspecs
} }
}, function(responseData) {
var response = /** @type{{result:string, head_seq:string, ops:Array.<!Object>}} */(runtime.fromJson(responseData));
// TODO: hack, remove // TODO: hack, remove
if (syncingBlocked) { if (syncingBlocked) {
return; return;
} }
syncLock = true; runtime.log("sync-ops reply: " + responseData);
// take specs from queue, if any // just new ops?
syncedClientOpspecs = unsyncedClientOpspecQueue; if (response.result === "new_ops") {
unsyncedClientOpspecQueue = []; if (response.ops.length > 0) {
// no new locally in the meantime?
server.call({ if (unsyncedClientOpspecQueue.length === 0) {
command: 'sync_ops', receiveOpSpecsFromNetwork(compressOpSpecs(response.ops), syncRequestCallbacksArray);
args: { } else {
es_id: sessionId, // transform server ops against new local ones and apply,
member_id: memberId, // transform and send new local ops to server
seq_head: String(lastServerSeq), runtime.log("meh, have new ops locally meanwhile, have to do transformations.");
client_ops: syncedClientOpspecs hasUnresolvableConflict = !handleOpsSyncConflict(compressOpSpecs(response.ops));
} syncRequestCallbacksQueue = syncRequestCallbacksArray.concat(syncRequestCallbacksQueue);
}, function(responseData) { }
var shouldRetryInstantly = false,
response = /** @type{{result:string, head_seq:string, ops:Array.<!Object>}} */(runtime.fromJson(responseData));
// TODO: hack, remove
if (syncingBlocked) {
return;
}
runtime.log("sync-ops reply: " + responseData);
// just new ops?
if (response.result === "new_ops") {
if (response.ops.length > 0) {
// no new locally in the meantime?
if (unsyncedClientOpspecQueue.length === 0) {
receiveOpSpecsFromNetwork(compressOpSpecs(response.ops));
} else {
// transform server ops against new local ones and apply,
// transform and send new local ops to server
runtime.log("meh, have new ops locally meanwhile, have to do transformations.");
hasUnresolvableConflict = !handleOpsSyncConflict(compressOpSpecs(response.ops));
}
// and note server state
lastServerSeq = response.head_seq;
}
} else if (response.result === "added") {
runtime.log("All added to server");
// note server state
lastServerSeq = response.head_seq;
updateHasLocalUnsyncedOpsState();
} else if (response.result === "conflict") {
// put the send ops back into the outgoing queue
unsyncedClientOpspecQueue = syncedClientOpspecs.concat(unsyncedClientOpspecQueue);
// transform server ops against new local ones and apply,
// transform and request new send new local ops to server
runtime.log("meh, server has new ops meanwhile, have to do transformations.");
hasUnresolvableConflict = !handleOpsSyncConflict(compressOpSpecs(response.ops));
// and note server state // and note server state
lastServerSeq = response.head_seq; lastServerSeq = response.head_seq;
// try again instantly
if (!hasUnresolvableConflict) {
shouldRetryInstantly = true;
}
} else {
runtime.assert(false, "Unexpected result on sync-ops call: "+response.result);
} }
} else if (response.result === "added") {
syncLock = false; runtime.log("All added to server");
receiveOpSpecsFromNetwork([], syncRequestCallbacksArray);
if (hasUnresolvableConflict) { // note server state
// TODO: offer option to reload session automatically? lastServerSeq = response.head_seq;
runtime.assert(false, updateHasLocalUnsyncedOpsState();
"Sorry to tell:\n" + } else if (response.result === "conflict") {
"we hit a pair of operations in a state which yet need to be supported for transformation against each other.\n" + // put the send ops back into the outgoing queue
"Client disconnected from session, no further editing accepted.\n\n" + unsyncedClientOpspecQueue = syncedClientOpspecs.concat(unsyncedClientOpspecQueue);
"Please reconnect manually for now."); syncRequestCallbacksQueue = syncRequestCallbacksArray.concat(syncRequestCallbacksQueue);
} else { // transform server ops against new local ones and apply,
if (shouldRetryInstantly) { // transform and request new send new local ops to server
doSyncOps(); runtime.log("meh, server has new ops meanwhile, have to do transformations.");
} else { hasUnresolvableConflict = !handleOpsSyncConflict(compressOpSpecs(response.ops));
runtime.log("Preparing next: " + (unsyncedClientOpspecQueue.length === 0)); // and note server state
// prepare next sync lastServerSeq = response.head_seq;
// nothing to push right now? // try again instantly
if (unsyncedClientOpspecQueue.length === 0) { if (!hasUnresolvableConflict) {
triggerPullingOps(); isInstantSyncRequested = true;
}
}
playUnplayedServerOpSpecs();
} }
}); } else {
} runtime.assert(false, "Unexpected result on sync-ops call: "+response.result);
doSyncOps(); }
// unlock
isSyncCallRunning = false;
if (hasUnresolvableConflict) {
// TODO: offer option to reload session automatically?
runtime.assert(false,
"Sorry to tell:\n" +
"we hit a pair of operations in a state which yet need to be supported for transformation against each other.\n" +
"Client disconnected from session, no further editing accepted.\n\n" +
"Please reconnect manually for now.");
} else {
// prepare next sync
if (isInstantSyncRequested) {
syncOps();
} else {
syncOpsTimeout = runtime.getWindow().setTimeout(function() {
syncOpsTimeout = null;
syncOps();
}, (unsyncedClientOpspecQueue.length === 0) ? pullingIntervall : pushingIntervall);
}
playUnplayedServerOpSpecs();
}
});
} }
function triggerPushingOps() { function triggerPushingOps() {
if (syncLock || triggerPushingOpsActivated) { if (isSyncCallRunning || isPushingOpsTriggered) {
return; return;
} }
triggerPushingOpsActivated = true; isPushingOpsTriggered = true;
// disable current pulling timeout // disable current pulling timeout
if (pullingTimeOutFlag) { if (syncOpsTimeout) {
pullingTimeOutFlag.active = false; runtime.clearTimeout(syncOpsTimeout);
syncOpsTimeout = null;
} }
// TODO: how stupid! if the pulling timeout was close to done, this will extend it
// solution: split pulling into two timeouts, with second as short as pushing,
// and only cancel the first half
runtime.getWindow().setTimeout(function() { runtime.getWindow().setTimeout(function() {
runtime.log("Pushing activated"); runtime.log("Pushing activated");
triggerPushingOpsActivated = false; isPushingOpsTriggered = false;
syncOps(); syncOps();
}, pushingIntervall); }, pushingIntervall);
} }
this.requestReplay = function (done_cb) { /**
singleTimeCallbackOnCompleteServerOpSpecsPlay = done_cb; * @param {!Funtion} cb
* @return {undefined}
*/
function requestInstantOpsSync(cb) {
syncRequestCallbacksQueue.push(cb);
// disable current pulling timeout
if (syncOpsTimeout) {
runtime.clearTimeout(syncOpsTimeout);
syncOpsTimeout = null;
}
syncOps(); syncOps();
}; };
this.requestReplay = function (done_cb) {
requestInstantOpsSync(done_cb);
};
/** /**
* Sets the factory to use to create operation instances from operation specs. * Sets the factory to use to create operation instances from operation specs.
* *
@ -450,23 +464,24 @@ runtime.log("Pushing activated");
* A callback is called on success. * A callback is called on success.
*/ */
this.close = function (cb) { this.close = function (cb) {
function writeSessionStateToFile() { function cbSuccess(fileData) {
function cbSuccess(fileData) { server.writeSessionStateToFile(sessionId, memberId, lastServerSeq, fileData, cb);
server.writeSessionStateToFile(sessionId, memberId, lastServerSeq, fileData, cb);
};
odfContainer.createByteArray(cbSuccess, cb);
} }
// TODO: hack, rather add callback to syncOps for success and properly close things function doClose() {
syncOps();
runtime.getWindow().setTimeout(function() {
syncingBlocked = true; syncingBlocked = true;
if (hasPushedModificationOps) { if (hasPushedModificationOps) {
writeSessionStateToFile(); odfContainer.createByteArray(cbSuccess, cb);
} else { } else {
cb(); cb();
} }
}, 2000); }
if (hasLocalUnsyncedOps) {
requestInstantOpsSync(doClose);
} else {
doClose();
}
}; };
this.getHasLocalUnsyncedOpsAndUpdates = function (subscriber) { this.getHasLocalUnsyncedOpsAndUpdates = function (subscriber) {