Subscription class
class Subscription { // constructor arguments: String collectionName; DataSet collection; Connection _connection; // author field is not used anymore; we are keeping it in the DB mainly for debugging // and logging purposes String _author; IdGenerator _idGenerator; Function _handleData = handleData; Function _handleDiff = handleDiff; // Used for testing and debugging. If true, data (instead of diff) is // requested periodically. bool _forceDataRequesting = false; Map args = {}; // Maps _id of a document to Future, that completes when server response // to document's update is completed Map<String, Future> _sentItems = {}; // reflects changes to this.collection, that were not already sent to the server ChangeSet _modifiedItems = new ChangeSet(); // flag used to prevent subscription to have multiple get_diff requests 'pending'. // This is mainly solved by clean_ajax itself; however, following is still possible: // 1. send_diff // 2. response obtained, response listener notified, end // 3. send_diff // 4. response listener process diff requested in step 1. // clearly, send_diff in step 3 can and should be avoided. bool requestLock = false; // this is another approach to obtain functionality formerly provided by clean_data // authors; when applying changes obtained from server, use this flag to // prevent detection and re-sending of these changes to the server bool updateLock = false; // all changes with version < _version MUST be already applied by this subscription. // Some of the later changes may also be applied; this happens, when collection // applies user change, but is not synced to the very last version at that moment. num _version = 0; // version exposed only for testing and debugging get version => _version; String toString() => 'Subscription(${_author}, ver: ${_version})'; Completer _initialSync = new Completer(); List<StreamSubscription> _subscriptions = []; StreamController _errorStreamController; Stream get errorStream { if (!_initialSync.isCompleted) throw new StateError("Initial sync not complete yet!"); return _errorStreamController.stream; } /// Completes after first request to get data is answered and handled. Future get initialSync => _initialSync.future; Subscription.config(this.collectionName, this.collection, this._connection, this._author, this._idGenerator, this._handleData, this._handleDiff, this._forceDataRequesting, [this.args]); Subscription(this.collectionName, this._connection, this._author, this._idGenerator, [this.args]) { collection = new DataSet(); collection.addIndex(['_id']); _errorStreamController = new StreamController.broadcast(); start(); } /** * Waits for initialSync of all provided subscriptions. */ static Future wait(List<Subscription> subscriptions) { return Future.wait( subscriptions.map((subscription) => subscription.initialSync)); } // TODO rename to something private-like void setupListeners() { _subscriptions.add(collection.onBeforeAdd.listen((data) { // if data["_id"] is null, it was added by this client and _id should be // assigned if(data["_id"] == null) { data["_id"] = _idGenerator.next(); } })); var change = new ChangeSet(); sendRequest(dynamic elem){ Future result = _connection.send((){ assert(_modifiedItems.changedItems.containsKey(elem)); var req; if (_modifiedItems.addedItems.contains(elem)) { req = new ClientRequest("sync", { "action" : "add", "collection" : collectionName, "data" : elem, 'args': args, "author" : _author }); } if (_modifiedItems.strictlyChanged.containsKey(elem)) { req = new ClientRequest("sync", { "action" : "change", "collection" : collectionName, 'args': args, "_id": elem["_id"], "change" : elem, "author" : _author }); } if (_modifiedItems.removedItems.contains(elem)) { req = new ClientRequest("sync", { "action" : "remove", "collection" : collectionName, 'args': args, "_id" : elem["_id"], "author" : _author }); } _modifiedItems.changedItems.remove(elem); return req; }).then((result){ if (result is Map) if (result['error'] != null) _errorStreamController.add(result['error']); return result; }).catchError((e){ if(e is! CancelError) throw e; }); var id = elem['_id']; _sentItems[id] = result; result.then((nextVersion){ // if the request that already finished was last request modifying // current field, mark the field as free if (_sentItems[id] == result) { _sentItems.remove(id); // if there are some more changes, sent them if (_modifiedItems.changedItems.containsKey(elem)){ sendRequest(elem); }; } }); }; _subscriptions.add(collection.onChangeSync.listen((event) { if (!this.updateLock) { ChangeSet change = event['change']; _modifiedItems.mergeIn(change); for (var key in change.changedItems.keys) { if (!_sentItems.containsKey(key['_id'])) { sendRequest(key); } } } })); } _createDataRequest() => new ClientRequest("sync", { "action" : "get_data", "collection" : collectionName, 'args': args }); _createDiffRequest() { if (requestLock || _sentItems.isNotEmpty) { return null; } else { requestLock = true; return new ClientRequest("sync", { "action" : "get_diff", "collection" : collectionName, 'args': args, "version" : _version }); } } // TODO rename to something private-like void setupDataRequesting() { // request initial data _connection.send(_createDataRequest).then((response) { if (response['error'] != null) { if (!_initialSync.isCompleted) _initialSync.completeError(new DatabaseAccessError(response['error'])); else _errorStreamController.add(new DatabaseAccessError(response['error'])); return; } _version = response['version']; _handleData(response['data'], this, _author); logger.info("Got initial data, synced to version ${_version}"); // TODO remove the check? (restart/dispose should to sth about initialSynd) if (!_initialSync.isCompleted) _initialSync.complete(); var subscription = _connection .sendPeriodically(_forceDataRequesting ? _createDataRequest : _createDiffRequest) .listen((response) { requestLock = false; // id data and version was sent, diff is set to null if (response['error'] != null) { throw new Exception(response['error']); } if(response['diff'] == null) { _version = response['version']; _handleData(response['data'], this, _author); } else { if(!response['diff'].isEmpty) { _version = max(_version, _handleDiff(response['diff'], this, _author)); } else { if (response.containsKey('version')) _version = response['version']; } } }, onError: (e){ if (e is! CancelError)throw e; }); _subscriptions.add(subscription); }); } void start() { setupListeners(); setupDataRequesting(); } Future dispose() { return Future.forEach(_subscriptions, (sub) => sub.cancel()); } Future close() { return dispose() .then((_) => Future.wait(_sentItems.values)) .then((_) => new Future.delayed(new Duration(milliseconds: 100), (){ collection.dispose(); })); } Future restart([Map args]) { this.args = args; return dispose().then((_) { start(); }); } Stream onClose() { } }
Static Methods
Future wait(List<Subscription> subscriptions) #
Waits for initialSync of all provided subscriptions.
static Future wait(List<Subscription> subscriptions) { return Future.wait( subscriptions.map((subscription) => subscription.initialSync)); }
Constructors
new Subscription(String collectionName, Connection _connection, String _author, IdGenerator _idGenerator, [Map args]) #
Creates a new Object instance.
Object instances have no meaningful state, and are only useful through their identity. An Object instance is equal to itself only.
docs inherited from Object
Subscription(this.collectionName, this._connection, this._author, this._idGenerator, [this.args]) { collection = new DataSet(); collection.addIndex(['_id']); _errorStreamController = new StreamController.broadcast(); start(); }
new Subscription.config(String collectionName, DataSet collection, Connection _connection, String _author, IdGenerator _idGenerator, Function _handleData, Function _handleDiff, bool _forceDataRequesting, [Map args]) #
Subscription.config(this.collectionName, this.collection, this._connection, this._author, this._idGenerator, this._handleData, this._handleDiff, this._forceDataRequesting, [this.args]);
Properties
final Stream errorStream #
Stream get errorStream { if (!_initialSync.isCompleted) throw new StateError("Initial sync not complete yet!"); return _errorStreamController.stream; }
final Future initialSync #
Completes after first request to get data is answered and handled.
Future get initialSync => _initialSync.future;
final version #
get version => _version;
Methods
Future close() #
Future close() { return dispose() .then((_) => Future.wait(_sentItems.values)) .then((_) => new Future.delayed(new Duration(milliseconds: 100), (){ collection.dispose(); })); }
Future dispose() #
Future dispose() { return Future.forEach(_subscriptions, (sub) => sub.cancel()); }
Future restart([Map args]) #
Future restart([Map args]) { this.args = args; return dispose().then((_) { start(); }); }
void setupDataRequesting() #
void setupDataRequesting() { // request initial data _connection.send(_createDataRequest).then((response) { if (response['error'] != null) { if (!_initialSync.isCompleted) _initialSync.completeError(new DatabaseAccessError(response['error'])); else _errorStreamController.add(new DatabaseAccessError(response['error'])); return; } _version = response['version']; _handleData(response['data'], this, _author); logger.info("Got initial data, synced to version ${_version}"); // TODO remove the check? (restart/dispose should to sth about initialSynd) if (!_initialSync.isCompleted) _initialSync.complete(); var subscription = _connection .sendPeriodically(_forceDataRequesting ? _createDataRequest : _createDiffRequest) .listen((response) { requestLock = false; // id data and version was sent, diff is set to null if (response['error'] != null) { throw new Exception(response['error']); } if(response['diff'] == null) { _version = response['version']; _handleData(response['data'], this, _author); } else { if(!response['diff'].isEmpty) { _version = max(_version, _handleDiff(response['diff'], this, _author)); } else { if (response.containsKey('version')) _version = response['version']; } } }, onError: (e){ if (e is! CancelError)throw e; }); _subscriptions.add(subscription); }); }
void setupListeners() #
void setupListeners() { _subscriptions.add(collection.onBeforeAdd.listen((data) { // if data["_id"] is null, it was added by this client and _id should be // assigned if(data["_id"] == null) { data["_id"] = _idGenerator.next(); } })); var change = new ChangeSet(); sendRequest(dynamic elem){ Future result = _connection.send((){ assert(_modifiedItems.changedItems.containsKey(elem)); var req; if (_modifiedItems.addedItems.contains(elem)) { req = new ClientRequest("sync", { "action" : "add", "collection" : collectionName, "data" : elem, 'args': args, "author" : _author }); } if (_modifiedItems.strictlyChanged.containsKey(elem)) { req = new ClientRequest("sync", { "action" : "change", "collection" : collectionName, 'args': args, "_id": elem["_id"], "change" : elem, "author" : _author }); } if (_modifiedItems.removedItems.contains(elem)) { req = new ClientRequest("sync", { "action" : "remove", "collection" : collectionName, 'args': args, "_id" : elem["_id"], "author" : _author }); } _modifiedItems.changedItems.remove(elem); return req; }).then((result){ if (result is Map) if (result['error'] != null) _errorStreamController.add(result['error']); return result; }).catchError((e){ if(e is! CancelError) throw e; }); var id = elem['_id']; _sentItems[id] = result; result.then((nextVersion){ // if the request that already finished was last request modifying // current field, mark the field as free if (_sentItems[id] == result) { _sentItems.remove(id); // if there are some more changes, sent them if (_modifiedItems.changedItems.containsKey(elem)){ sendRequest(elem); }; } }); }; _subscriptions.add(collection.onChangeSync.listen((event) { if (!this.updateLock) { ChangeSet change = event['change']; _modifiedItems.mergeIn(change); for (var key in change.changedItems.keys) { if (!_sentItems.containsKey(key['_id'])) { sendRequest(key); } } } })); }
void start() #
void start() { setupListeners(); setupDataRequesting(); }