Dart Documentationclean_sync.clientSubscription

Subscription class

class Subscription {
 // constructor arguments:
 String collectionName;
 DataSet collection;
 Connection _connection;
 // author field is not used anymore; we are keeping it in the DB mainly for debugging
 // and logging purposes
 String _author;
 IdGenerator _idGenerator;
 Function _handleData = handleData;
 Function _handleDiff = handleDiff;
 // Used for testing and debugging. If true, data (instead of diff) is
 // requested periodically.
 bool _forceDataRequesting = false;
 Map args = {};
 // Maps _id of a document to Future, that completes when server response
 // to document's update is completed
 Map<String, Future> _sentItems = {};
 // reflects changes to this.collection, that were not already sent to the server
 ChangeSet _modifiedItems = new ChangeSet();
 // flag used to prevent subscription to have multiple get_diff requests 'pending'.
 // This is mainly solved by clean_ajax itself; however, following is still possible:
 // 1. send_diff
 // 2. response obtained, response listener notified, end
 // 3. send_diff
 // 4. response listener process diff requested in step 1.
 // clearly, send_diff in step 3 can and should be avoided.
 bool requestLock = false;
 // this is another approach to obtain functionality formerly provided by clean_data
 // authors; when applying changes obtained from server, use this flag to
 // prevent detection and re-sending of these changes to the server
 bool updateLock = false;
 // all changes with version < _version MUST be already applied by this subscription.
 // Some of the later changes may also be applied; this happens, when collection
 // applies user change, but is not synced to the very last version at that moment.
 num _version = 0;

 // version exposed only for testing and debugging
 get version => _version;

 String toString() => 'Subscription(${_author}, ver: ${_version})';
 Completer _initialSync = new Completer();
 List<StreamSubscription> _subscriptions = [];
 StreamController _errorStreamController;
 Stream get errorStream {
   if (!_initialSync.isCompleted) throw new StateError("Initial sync not complete yet!");
   return _errorStreamController.stream;
 }

 /// Completes after first request to get data is answered and handled.
 Future get initialSync => _initialSync.future;

 Subscription.config(this.collectionName, this.collection, this._connection,
     this._author, this._idGenerator, this._handleData, this._handleDiff,
     this._forceDataRequesting, [this.args]);

 Subscription(this.collectionName, this._connection, this._author,
     this._idGenerator, [this.args]) {
   collection = new DataSet();
   collection.addIndex(['_id']);
   _errorStreamController = new StreamController.broadcast();
   start();
 }


 /**
  * Waits for initialSync of all provided subscriptions.
  */
 static Future wait(List<Subscription> subscriptions) {
   return Future.wait(
       subscriptions.map((subscription) => subscription.initialSync));
 }

 // TODO rename to something private-like
 void setupListeners() {
   _subscriptions.add(collection.onBeforeAdd.listen((data) {
     // if data["_id"] is null, it was added by this client and _id should be
     // assigned
     if(data["_id"] == null) {
       data["_id"] = _idGenerator.next();
     }
   }));

   var change = new ChangeSet();

   sendRequest(dynamic elem){
       Future result = _connection.send((){
         assert(_modifiedItems.changedItems.containsKey(elem));
         var req;
         if (_modifiedItems.addedItems.contains(elem)) {
           req = new ClientRequest("sync", {
             "action" : "add",
             "collection" : collectionName,
             "data" : elem,
             'args': args,
             "author" : _author
           });
         }
         if (_modifiedItems.strictlyChanged.containsKey(elem)) {
           req = new ClientRequest("sync", {
             "action" : "change",
             "collection" : collectionName,
             'args': args,
             "_id": elem["_id"],
             "change" : elem,
             "author" : _author
           });
         }
         if (_modifiedItems.removedItems.contains(elem)) {
           req = new ClientRequest("sync", {
             "action" : "remove",
             "collection" : collectionName,
             'args': args,
             "_id" : elem["_id"],
             "author" : _author
           });
         }
         _modifiedItems.changedItems.remove(elem);
         return req;
       }).then((result){
         if (result is Map)
           if (result['error'] != null)
             _errorStreamController.add(result['error']);
         return result;
       }).catchError((e){
         if(e is! CancelError) throw e;
       });

       var id = elem['_id'];
       _sentItems[id] = result;
       result.then((nextVersion){
         // if the request that already finished was last request modifying
         // current field, mark the field as free
         if (_sentItems[id] == result) {
           _sentItems.remove(id);
           // if there are some more changes, sent them
           if (_modifiedItems.changedItems.containsKey(elem)){
             sendRequest(elem);
           };
         }
       });
   };

   _subscriptions.add(collection.onChangeSync.listen((event) {
     if (!this.updateLock) {
       ChangeSet change = event['change'];
       _modifiedItems.mergeIn(change);
       for (var key in change.changedItems.keys) {
         if (!_sentItems.containsKey(key['_id'])) {
           sendRequest(key);
         }
       }
     }
   }));
 }

 _createDataRequest() => new ClientRequest("sync", {
   "action" : "get_data",
   "collection" : collectionName,
   'args': args
 });

 _createDiffRequest() {
   if (requestLock || _sentItems.isNotEmpty) {
     return null;
   } else {
     requestLock = true;
     return new ClientRequest("sync", {
     "action" : "get_diff",
     "collection" : collectionName,
     'args': args,
     "version" : _version
     });
   }
 }

 // TODO rename to something private-like
 void setupDataRequesting() {
   // request initial data
   _connection.send(_createDataRequest).then((response) {
     if (response['error'] != null) {
       if (!_initialSync.isCompleted) _initialSync.completeError(new DatabaseAccessError(response['error']));
       else _errorStreamController.add(new DatabaseAccessError(response['error']));
       return;
     }
     _version = response['version'];
     _handleData(response['data'], this, _author);

     logger.info("Got initial data, synced to version ${_version}");

     // TODO remove the check? (restart/dispose should to sth about initialSynd)
     if (!_initialSync.isCompleted) _initialSync.complete();

     var subscription = _connection
       .sendPeriodically(_forceDataRequesting ?
           _createDataRequest : _createDiffRequest)
       .listen((response) {
         requestLock = false;
         // id data and version was sent, diff is set to null
         if (response['error'] != null) {
           throw new Exception(response['error']);
         }
         if(response['diff'] == null) {
           _version = response['version'];
           _handleData(response['data'], this, _author);
         } else {
           if(!response['diff'].isEmpty) {
             _version = max(_version, _handleDiff(response['diff'], this, _author));
           } else {
               if (response.containsKey('version'))
                  _version = response['version'];
           }
         }
       }, onError: (e){
         if (e is! CancelError)throw e;
       });
     _subscriptions.add(subscription);
   });
 }

 void start() {
   setupListeners();
   setupDataRequesting();
 }

 Future dispose() {
   return Future.forEach(_subscriptions, (sub) => sub.cancel());
 }

 Future close() {
   return dispose()
     .then((_) =>
       Future.wait(_sentItems.values))
     .then((_) =>
        new Future.delayed(new Duration(milliseconds: 100), (){
         collection.dispose();
   }));
 }

 Future restart([Map args]) {
   this.args = args;
   return dispose().then((_) {
     start();
   });
 }

 Stream onClose() {

 }
}

Static Methods

Future wait(List<Subscription> subscriptions) #

Waits for initialSync of all provided subscriptions.

static Future wait(List<Subscription> subscriptions) {
 return Future.wait(
     subscriptions.map((subscription) => subscription.initialSync));
}

Constructors

new Subscription(String collectionName, Connection _connection, String _author, IdGenerator _idGenerator, [Map args]) #

Creates a new Object instance.

Object instances have no meaningful state, and are only useful through their identity. An Object instance is equal to itself only.

docs inherited from Object
Subscription(this.collectionName, this._connection, this._author,
   this._idGenerator, [this.args]) {
 collection = new DataSet();
 collection.addIndex(['_id']);
 _errorStreamController = new StreamController.broadcast();
 start();
}

new Subscription.config(String collectionName, DataSet collection, Connection _connection, String _author, IdGenerator _idGenerator, Function _handleData, Function _handleDiff, bool _forceDataRequesting, [Map args]) #

Subscription.config(this.collectionName, this.collection, this._connection,
   this._author, this._idGenerator, this._handleData, this._handleDiff,
   this._forceDataRequesting, [this.args]);

Properties

Map args #

Map args = {}

DataSet collection #

DataSet collection

String collectionName #

String collectionName

final Stream errorStream #

Stream get errorStream {
 if (!_initialSync.isCompleted) throw new StateError("Initial sync not complete yet!");
 return _errorStreamController.stream;
}

final Future initialSync #

Completes after first request to get data is answered and handled.

Future get initialSync => _initialSync.future;

bool requestLock #

bool requestLock = false

bool updateLock #

bool updateLock = false

final version #

get version => _version;

Methods

Future close() #

Future close() {
 return dispose()
   .then((_) =>
     Future.wait(_sentItems.values))
   .then((_) =>
      new Future.delayed(new Duration(milliseconds: 100), (){
       collection.dispose();
 }));
}

Future dispose() #

Future dispose() {
 return Future.forEach(_subscriptions, (sub) => sub.cancel());
}

Stream onClose() #

Stream onClose() {

}

Future restart([Map args]) #

Future restart([Map args]) {
 this.args = args;
 return dispose().then((_) {
   start();
 });
}

void setupDataRequesting() #

void setupDataRequesting() {
 // request initial data
 _connection.send(_createDataRequest).then((response) {
   if (response['error'] != null) {
     if (!_initialSync.isCompleted) _initialSync.completeError(new DatabaseAccessError(response['error']));
     else _errorStreamController.add(new DatabaseAccessError(response['error']));
     return;
   }
   _version = response['version'];
   _handleData(response['data'], this, _author);

   logger.info("Got initial data, synced to version ${_version}");

   // TODO remove the check? (restart/dispose should to sth about initialSynd)
   if (!_initialSync.isCompleted) _initialSync.complete();

   var subscription = _connection
     .sendPeriodically(_forceDataRequesting ?
         _createDataRequest : _createDiffRequest)
     .listen((response) {
       requestLock = false;
       // id data and version was sent, diff is set to null
       if (response['error'] != null) {
         throw new Exception(response['error']);
       }
       if(response['diff'] == null) {
         _version = response['version'];
         _handleData(response['data'], this, _author);
       } else {
         if(!response['diff'].isEmpty) {
           _version = max(_version, _handleDiff(response['diff'], this, _author));
         } else {
             if (response.containsKey('version'))
                _version = response['version'];
         }
       }
     }, onError: (e){
       if (e is! CancelError)throw e;
     });
   _subscriptions.add(subscription);
 });
}

void setupListeners() #

void setupListeners() {
 _subscriptions.add(collection.onBeforeAdd.listen((data) {
   // if data["_id"] is null, it was added by this client and _id should be
   // assigned
   if(data["_id"] == null) {
     data["_id"] = _idGenerator.next();
   }
 }));

 var change = new ChangeSet();

 sendRequest(dynamic elem){
     Future result = _connection.send((){
       assert(_modifiedItems.changedItems.containsKey(elem));
       var req;
       if (_modifiedItems.addedItems.contains(elem)) {
         req = new ClientRequest("sync", {
           "action" : "add",
           "collection" : collectionName,
           "data" : elem,
           'args': args,
           "author" : _author
         });
       }
       if (_modifiedItems.strictlyChanged.containsKey(elem)) {
         req = new ClientRequest("sync", {
           "action" : "change",
           "collection" : collectionName,
           'args': args,
           "_id": elem["_id"],
           "change" : elem,
           "author" : _author
         });
       }
       if (_modifiedItems.removedItems.contains(elem)) {
         req = new ClientRequest("sync", {
           "action" : "remove",
           "collection" : collectionName,
           'args': args,
           "_id" : elem["_id"],
           "author" : _author
         });
       }
       _modifiedItems.changedItems.remove(elem);
       return req;
     }).then((result){
       if (result is Map)
         if (result['error'] != null)
           _errorStreamController.add(result['error']);
       return result;
     }).catchError((e){
       if(e is! CancelError) throw e;
     });

     var id = elem['_id'];
     _sentItems[id] = result;
     result.then((nextVersion){
       // if the request that already finished was last request modifying
       // current field, mark the field as free
       if (_sentItems[id] == result) {
         _sentItems.remove(id);
         // if there are some more changes, sent them
         if (_modifiedItems.changedItems.containsKey(elem)){
           sendRequest(elem);
         };
       }
     });
 };

 _subscriptions.add(collection.onChangeSync.listen((event) {
   if (!this.updateLock) {
     ChangeSet change = event['change'];
     _modifiedItems.mergeIn(change);
     for (var key in change.changedItems.keys) {
       if (!_sentItems.containsKey(key['_id'])) {
         sendRequest(key);
       }
     }
   }
 }));
}

void start() #

void start() {
 setupListeners();
 setupDataRequesting();
}

String toString() #

Returns a string representation of this object.

docs inherited from Object
String toString() => 'Subscription(${_author}, ver: ${_version})';