From 9b88cc418ac0b3845204c7524a858182595b677a Mon Sep 17 00:00:00 2001 From: Wu-Hui Date: Thu, 29 Aug 2019 15:17:47 -0400 Subject: [PATCH 1/7] Ported and delegated --- Firestore/Source/Core/FSTSyncEngine.mm | 564 +----------------- .../src/firebase/firestore/core/sync_engine.h | 288 +++++++++ .../firebase/firestore/core/sync_engine.mm | 498 ++++++++++++++++ 3 files changed, 808 insertions(+), 542 deletions(-) create mode 100644 Firestore/core/src/firebase/firestore/core/sync_engine.h create mode 100644 Firestore/core/src/firebase/firestore/core/sync_engine.mm diff --git a/Firestore/Source/Core/FSTSyncEngine.mm b/Firestore/Source/Core/FSTSyncEngine.mm index 134800a9e65..40be821b359 100644 --- a/Firestore/Source/Core/FSTSyncEngine.mm +++ b/Firestore/Source/Core/FSTSyncEngine.mm @@ -28,6 +28,7 @@ #include "Firestore/core/include/firebase/firestore/firestore_errors.h" #include "Firestore/core/src/firebase/firestore/auth/user.h" +#include "Firestore/core/src/firebase/firestore/core/sync_engine.h" #include "Firestore/core/src/firebase/firestore/core/target_id_generator.h" #include "Firestore/core/src/firebase/firestore/core/transaction.h" #include "Firestore/core/src/firebase/firestore/core/transaction_runner.h" @@ -56,6 +57,7 @@ using firebase::firestore::auth::User; using firebase::firestore::core::LimboDocumentChange; using firebase::firestore::core::Query; +using firebase::firestore::core::SyncEngine; using firebase::firestore::core::SyncEngineCallback; using firebase::firestore::core::TargetIdGenerator; using firebase::firestore::core::Transaction; @@ -94,103 +96,6 @@ NS_ASSUME_NONNULL_BEGIN -// Limbo documents don't use persistence, and are eagerly GC'd. So, listens for them don't need -// real sequence numbers. -static const ListenSequenceNumber kIrrelevantSequenceNumber = -1; - -#pragma mark - FSTQueryView - -/** - * FSTQueryView contains all of the info that FSTSyncEngine needs to track for a particular - * query and view. - */ -@interface FSTQueryView : NSObject - -- (instancetype)initWithQuery:(Query)query - targetID:(TargetId)targetID - resumeToken:(ByteString)resumeToken - view:(core::View)view NS_DESIGNATED_INITIALIZER; - -- (instancetype)init NS_UNAVAILABLE; - -/** The query itself. */ -- (const Query &)query; - -/** The targetID created by the client that is used in the watch stream to identify this query. */ -@property(nonatomic, assign, readonly) TargetId targetID; - -/** - * An identifier from the datastore backend that indicates the last state of the results that - * was received. This can be used to indicate where to continue receiving new doc changes for the - * query. - */ -- (const ByteString &)resumeToken; - -/** - * The view is responsible for computing the final merged truth of what docs are in the query. - * It gets notified of local and remote changes, and applies the query filters and limits to - * determine the most correct possible results. - */ -- (core::View *)view; - -@end - -@implementation FSTQueryView { - Query _query; - ByteString _resumeToken; - util::DelayedConstructor _view; -} - -- (instancetype)initWithQuery:(Query)query - targetID:(TargetId)targetID - resumeToken:(ByteString)resumeToken - view:(View)view { - if (self = [super init]) { - _query = std::move(query); - _targetID = targetID; - _resumeToken = std::move(resumeToken); - _view.Init(std::move(view)); - } - return self; -} - -- (const Query &)query { - return _query; -} - -- (const ByteString &)resumeToken { - return _resumeToken; -} - -- (View *)view { - return &*_view; -} - -@end - -#pragma mark - LimboResolution - -/** Tracks a limbo resolution. */ -class LimboResolution { - public: - LimboResolution() { - } - - explicit LimboResolution(const DocumentKey &key) : key{key} { - } - - DocumentKey key; - - /** - * Set to true once we've received a document. This is used in remoteKeysForTarget and - * ultimately used by `WatchChangeAggregator` to decide whether it needs to manufacture a delete - * event for the target once the target is CURRENT. - */ - bool document_received = false; -}; - -#pragma mark - FSTSyncEngine - @interface FSTSyncEngine () /** The local store, used to persist mutations and cached documents. */ @@ -199,175 +104,41 @@ @interface FSTSyncEngine () @end @implementation FSTSyncEngine { - /** The remote store for sending writes, watches, etc. to the backend. */ - RemoteStore *_remoteStore; - - /** - * A callback to be notified when queries being listened to produce new view snapshots or errors. - */ - SyncEngineCallback *_callback; - - /** Used for creating the TargetId for the listens used to resolve limbo documents. */ - TargetIdGenerator _targetIdGenerator; - - /** Stores user completion blocks, indexed by user and BatchId. */ - std::unordered_map *, HashUser> - _mutationCompletionBlocks; - - /** Stores user callbacks waiting for pending writes to be acknowledged. */ - std::unordered_map> _pendingWritesCallbacks; - - /** FSTQueryViews for all active queries, indexed by query. */ - std::unordered_map _queryViewsByQuery; - - /** FSTQueryViews for all active queries, indexed by target ID. */ - std::unordered_map _queryViewsByTarget; - - /** - * When a document is in limbo, we create a special listen to resolve it. This maps the - * DocumentKey of each limbo document to the TargetId of the listen resolving it. - */ - std::map _limboTargetsByKey; - - /** - * Basically the inverse of limboTargetsByKey, a map of target ID to a LimboResolution (which - * includes the DocumentKey as well as whether we've received a document for the target). - */ - std::map _limboResolutionsByTarget; - - User _currentUser; - - /** Used to track any documents that are currently in limbo. */ - ReferenceSet _limboDocumentRefs; + std::unique_ptr _syncEngine; } - (instancetype)initWithLocalStore:(FSTLocalStore *)localStore remoteStore:(RemoteStore *)remoteStore initialUser:(const User &)initialUser { if (self = [super init]) { - _localStore = localStore; - _remoteStore = remoteStore; - - _targetIdGenerator = TargetIdGenerator::SyncEngineTargetIdGenerator(); - _currentUser = initialUser; + _syncEngine = absl::make_unique(localStore, remoteStore, initialUser); } return self; } - (void)setCallback:(SyncEngineCallback *)callback { - _callback = callback; + _syncEngine->SetCallback(callback); } - (TargetId)listenToQuery:(Query)query { - [self assertCallbackExistsForSelector:_cmd]; - HARD_ASSERT(_queryViewsByQuery.find(query) == _queryViewsByQuery.end(), - "We already listen to query: %s", query.ToString()); - - QueryData queryData = [self.localStore allocateQuery:query]; - ViewSnapshot viewSnapshot = [self initializeViewAndComputeSnapshotForQueryData:queryData]; - _callback->OnViewSnapshots({viewSnapshot}); - - _remoteStore->Listen(queryData); - return queryData.target_id(); -} - -- (ViewSnapshot)initializeViewAndComputeSnapshotForQueryData:(const QueryData &)queryData { - DocumentMap docs = [self.localStore executeQuery:queryData.query()]; - DocumentKeySet remoteKeys = [self.localStore remoteDocumentKeysForTarget:queryData.target_id()]; - - View view(queryData.query(), std::move(remoteKeys)); - ViewDocumentChanges viewDocChanges = view.ComputeDocumentChanges(docs.underlying_map()); - ViewChange viewChange = view.ApplyChanges(viewDocChanges); - HARD_ASSERT(viewChange.limbo_changes().empty(), - "View returned limbo docs before target ack from the server."); - - FSTQueryView *queryView = [[FSTQueryView alloc] initWithQuery:queryData.query() - targetID:queryData.target_id() - resumeToken:queryData.resume_token() - view:view]; - _queryViewsByQuery[queryData.query()] = queryView; - _queryViewsByTarget[queryData.target_id()] = queryView; - - HARD_ASSERT(viewChange.snapshot().has_value(), - "applyChangesToDocuments for new view should always return a snapshot"); - return viewChange.snapshot().value(); + return _syncEngine->Listen(query); } - (void)stopListeningToQuery:(const Query &)query { - [self assertCallbackExistsForSelector:_cmd]; - - FSTQueryView *queryView = _queryViewsByQuery[query]; - HARD_ASSERT(queryView, "Trying to stop listening to a query not found"); - - [self.localStore releaseQuery:query]; - _remoteStore->StopListening(queryView.targetID); - [self removeAndCleanupQuery:queryView]; + _syncEngine->StopListening(query); } - (void)writeMutations:(std::vector &&)mutations completion:(FSTVoidErrorBlock)completion { - [self assertCallbackExistsForSelector:_cmd]; - - LocalWriteResult result = [self.localStore locallyWriteMutations:std::move(mutations)]; - [self addMutationCompletionBlock:completion batchID:result.batch_id()]; - - [self emitNewSnapshotsAndNotifyLocalStoreWithChanges:result.changes() remoteEvent:absl::nullopt]; - _remoteStore->FillWritePipeline(); -} - -- (void)registerPendingWritesCallback:(StatusCallback)callback { - if (!_remoteStore->CanUseNetwork()) { - LOG_DEBUG("The network is disabled. The task returned by 'awaitPendingWrites()' will not " - "complete until the network is enabled."); - } - - int largestPendingBatchId = [self.localStore getHighestUnacknowledgedBatchId]; - - if (largestPendingBatchId == kBatchIdUnknown) { - // Trigger the callback right away if there is no pending writes at the moment. - callback(Status::OK()); - return; - } - - auto it = _pendingWritesCallbacks.find(largestPendingBatchId); - if (it != _pendingWritesCallbacks.end()) { - it->second.push_back(std::move(callback)); - } else { - _pendingWritesCallbacks.emplace(largestPendingBatchId, - std::vector({std::move(callback)})); - } -} - -/** Triggers callbacks waiting for this batch id to get acknowledged by server, if there are any. */ -- (void)triggerPendingWriteCallbacksWithBatchId:(int)batchId { - auto it = _pendingWritesCallbacks.find(batchId); - if (it != _pendingWritesCallbacks.end()) { - for (const auto &callback : it->second) { - callback(Status::OK()); + _syncEngine->WriteMutations(std::move(mutations), [completion](Status status) { + if (completion) { + completion(status.ToNSError()); } - - _pendingWritesCallbacks.erase(it); - } + }); } -- (void)failOutstandingPendingWritesAwaitingCallbacks:(absl::string_view)errorMessage { - for (const auto &entry : _pendingWritesCallbacks) { - for (const auto &callback : entry.second) { - callback(Status(Error::Cancelled, errorMessage)); - } - } - - _pendingWritesCallbacks.clear(); -} - -- (void)addMutationCompletionBlock:(FSTVoidErrorBlock)completion batchID:(BatchId)batchID { - NSMutableDictionary *completionBlocks = - _mutationCompletionBlocks[_currentUser]; - if (!completionBlocks) { - completionBlocks = [NSMutableDictionary dictionary]; - _mutationCompletionBlocks[_currentUser] = completionBlocks; - } - [completionBlocks setObject:completion forKey:@(batchID)]; +- (void)registerPendingWritesCallback:(StatusCallback)callback { + _syncEngine->RegisterPendingWritesCallback(std::move(callback)); } /** @@ -387,330 +158,39 @@ - (void)transactionWithRetries:(int)retries workerQueue:(const std::shared_ptr &)workerQueue updateCallback:(core::TransactionUpdateCallback)updateCallback resultCallback:(core::TransactionResultCallback)resultCallback { - workerQueue->VerifyIsCurrentQueue(); - HARD_ASSERT(retries >= 0, "Got negative number of retries for transaction"); - - // Allocate a shared_ptr so that the TransactionRunner can outlive this frame. - auto runner = std::make_shared(workerQueue, _remoteStore, updateCallback, - resultCallback); - runner->Run(); + _syncEngine->Transaction(retries, workerQueue, updateCallback, resultCallback); } - (void)applyRemoteEvent:(const RemoteEvent &)remoteEvent { - [self assertCallbackExistsForSelector:_cmd]; - - // Update `receivedDocument` as appropriate for any limbo targets. - for (const auto &entry : remoteEvent.target_changes()) { - TargetId targetID = entry.first; - const TargetChange &change = entry.second; - const auto iter = _limboResolutionsByTarget.find(targetID); - if (iter != _limboResolutionsByTarget.end()) { - LimboResolution &limboResolution = iter->second; - // Since this is a limbo resolution lookup, it's for a single document and it could be - // added, modified, or removed, but not a combination. - HARD_ASSERT(change.added_documents().size() + change.modified_documents().size() + - change.removed_documents().size() <= - 1, - "Limbo resolution for single document contains multiple changes."); - - if (change.added_documents().size() > 0) { - limboResolution.document_received = true; - } else if (change.modified_documents().size() > 0) { - HARD_ASSERT(limboResolution.document_received, - "Received change for limbo target document without add."); - } else if (change.removed_documents().size() > 0) { - HARD_ASSERT(limboResolution.document_received, - "Received remove for limbo target document without add."); - limboResolution.document_received = false; - } else { - // This was probably just a CURRENT targetChange or similar. - } - } - } - - MaybeDocumentMap changes = [self.localStore applyRemoteEvent:remoteEvent]; - [self emitNewSnapshotsAndNotifyLocalStoreWithChanges:changes remoteEvent:remoteEvent]; + _syncEngine->HandleRemoteEvent(remoteEvent); } - (void)applyChangedOnlineState:(OnlineState)onlineState { - [self assertCallbackExistsForSelector:_cmd]; - - std::vector newViewSnapshots; - for (const auto &entry : _queryViewsByQuery) { - FSTQueryView *queryView = entry.second; - ViewChange viewChange = queryView.view->ApplyOnlineStateChange(onlineState); - HARD_ASSERT(viewChange.limbo_changes().empty(), - "OnlineState should not affect limbo documents."); - if (viewChange.snapshot().has_value()) { - newViewSnapshots.push_back(*std::move(viewChange).snapshot()); - } - } - - _callback->OnViewSnapshots(std::move(newViewSnapshots)); - _callback->HandleOnlineStateChange(onlineState); + _syncEngine->HandleOnlineStateChange(onlineState); } - (void)rejectListenWithTargetID:(const TargetId)targetID error:(NSError *)error { - [self assertCallbackExistsForSelector:_cmd]; - - const auto iter = _limboResolutionsByTarget.find(targetID); - if (iter != _limboResolutionsByTarget.end()) { - const DocumentKey limboKey = iter->second.key; - // Since this query failed, we won't want to manually unlisten to it. - // So go ahead and remove it from bookkeeping. - _limboTargetsByKey.erase(limboKey); - _limboResolutionsByTarget.erase(targetID); - - // TODO(dimond): Retry on transient errors? - - // It's a limbo doc. Create a synthetic event saying it was deleted. This is kind of a hack. - // Ideally, we would have a method in the local store to purge a document. However, it would - // be tricky to keep all of the local store's invariants with another method. - NoDocument doc(limboKey, SnapshotVersion::None(), /* has_committed_mutations= */ false); - DocumentKeySet limboDocuments = DocumentKeySet{limboKey}; - RemoteEvent event{SnapshotVersion::None(), /*target_changes=*/{}, /*target_mismatches=*/{}, - /*document_updates=*/{{limboKey, doc}}, std::move(limboDocuments)}; - [self applyRemoteEvent:event]; - } else { - auto found = _queryViewsByTarget.find(targetID); - HARD_ASSERT(found != _queryViewsByTarget.end(), "Unknown targetId: %s", targetID); - FSTQueryView *queryView = found->second; - const Query &query = queryView.query; - [self.localStore releaseQuery:query]; - [self removeAndCleanupQuery:queryView]; - if ([self errorIsInteresting:error]) { - LOG_WARN("Listen for query at %s failed: %s", query.path().CanonicalString(), - error.localizedDescription); - } - _callback->OnError(query, Status::FromNSError(error)); - } + _syncEngine->HandleRejectedListen(targetID, Status::FromNSError(error)); } - (void)applySuccessfulWriteWithResult:(const MutationBatchResult &)batchResult { - [self assertCallbackExistsForSelector:_cmd]; - - // The local store may or may not be able to apply the write result and raise events immediately - // (depending on whether the watcher is caught up), so we raise user callbacks first so that they - // consistently happen before listen events. - [self processUserCallbacksForBatchID:batchResult.batch().batch_id() error:nil]; - - [self triggerPendingWriteCallbacksWithBatchId:batchResult.batch().batch_id()]; - - MaybeDocumentMap changes = [self.localStore acknowledgeBatchWithResult:batchResult]; - [self emitNewSnapshotsAndNotifyLocalStoreWithChanges:changes remoteEvent:absl::nullopt]; + _syncEngine->HandleSuccessfulWrite(batchResult); } - (void)rejectFailedWriteWithBatchID:(BatchId)batchID error:(NSError *)error { - [self assertCallbackExistsForSelector:_cmd]; - MaybeDocumentMap changes = [self.localStore rejectBatchID:batchID]; - - if (!changes.empty() && [self errorIsInteresting:error]) { - const DocumentKey &minKey = changes.min()->first; - LOG_WARN("Write at %s failed: %s", minKey.ToString(), error.localizedDescription); - } - - // The local store may or may not be able to apply the write result and raise events immediately - // (depending on whether the watcher is caught up), so we raise user callbacks first so that they - // consistently happen before listen events. - [self processUserCallbacksForBatchID:batchID error:error]; - - [self triggerPendingWriteCallbacksWithBatchId:batchID]; - - [self emitNewSnapshotsAndNotifyLocalStoreWithChanges:changes remoteEvent:absl::nullopt]; -} - -- (void)processUserCallbacksForBatchID:(BatchId)batchID error:(NSError *_Nullable)error { - NSMutableDictionary *completionBlocks = - _mutationCompletionBlocks[_currentUser]; - - // NOTE: Mutations restored from persistence won't have completion blocks, so it's okay for - // this (or the completion below) to be nil. - if (completionBlocks) { - NSNumber *boxedBatchID = @(batchID); - FSTVoidErrorBlock completion = completionBlocks[boxedBatchID]; - if (completion) { - completion(error); - [completionBlocks removeObjectForKey:boxedBatchID]; - } - } -} - -- (void)assertCallbackExistsForSelector:(SEL)methodSelector { - HARD_ASSERT(_callback, "Tried to call '%s' before callback was registered.", - NSStringFromSelector(methodSelector)); -} - -- (void)removeAndCleanupQuery:(FSTQueryView *)queryView { - _queryViewsByQuery.erase(queryView.query); - _queryViewsByTarget.erase(queryView.targetID); - - DocumentKeySet limboKeys = _limboDocumentRefs.ReferencedKeys(queryView.targetID); - _limboDocumentRefs.RemoveReferences(queryView.targetID); - for (const DocumentKey &key : limboKeys) { - if (!_limboDocumentRefs.ContainsKey(key)) { - // We removed the last reference for this key. - [self removeLimboTargetForKey:key]; - } - } -} - -/** - * Computes a new snapshot from the changes and calls the registered callback with the new snapshot. - */ -- (void)emitNewSnapshotsAndNotifyLocalStoreWithChanges:(const MaybeDocumentMap &)changes - remoteEvent:(const absl::optional &) - maybeRemoteEvent { - std::vector newSnapshots; - std::vector documentChangesInAllViews; - - for (const auto &entry : _queryViewsByQuery) { - FSTQueryView *queryView = entry.second; - const View &view = *queryView.view; - ViewDocumentChanges viewDocChanges = view.ComputeDocumentChanges(changes); - if (viewDocChanges.needs_refill()) { - // The query has a limit and some docs were removed/updated, so we need to re-run the - // query against the local store to make sure we didn't lose any good docs that had been - // past the limit. - DocumentMap docs = [self.localStore executeQuery:queryView.query]; - viewDocChanges = view.ComputeDocumentChanges(docs.underlying_map(), viewDocChanges); - } - - absl::optional targetChange; - if (maybeRemoteEvent.has_value()) { - const RemoteEvent &remoteEvent = maybeRemoteEvent.value(); - auto it = remoteEvent.target_changes().find(queryView.targetID); - if (it != remoteEvent.target_changes().end()) { - targetChange = it->second; - } - } - ViewChange viewChange = queryView.view->ApplyChanges(viewDocChanges, targetChange); - - [self updateTrackedLimboDocumentsWithChanges:viewChange.limbo_changes() - targetID:queryView.targetID]; - - if (viewChange.snapshot().has_value()) { - newSnapshots.push_back(*viewChange.snapshot()); - LocalViewChanges docChanges = - LocalViewChanges::FromViewSnapshot(*viewChange.snapshot(), queryView.targetID); - documentChangesInAllViews.push_back(std::move(docChanges)); - } - } - - _callback->OnViewSnapshots(std::move(newSnapshots)); - [self.localStore notifyLocalViewChanges:documentChangesInAllViews]; -} - -/** Updates the limbo document state for the given targetID. */ -- (void)updateTrackedLimboDocumentsWithChanges: - (const std::vector &)limboChanges - targetID:(TargetId)targetID { - for (const LimboDocumentChange &limboChange : limboChanges) { - switch (limboChange.type()) { - case LimboDocumentChange::Type::Added: - _limboDocumentRefs.AddReference(limboChange.key(), targetID); - [self trackLimboChange:limboChange]; - break; - - case LimboDocumentChange::Type::Removed: - LOG_DEBUG("Document no longer in limbo: %s", limboChange.key().ToString()); - _limboDocumentRefs.RemoveReference(limboChange.key(), targetID); - if (!_limboDocumentRefs.ContainsKey(limboChange.key())) { - // We removed the last reference for this key - [self removeLimboTargetForKey:limboChange.key()]; - } - break; - - default: - HARD_FAIL("Unknown limbo change type: %s", limboChange.type()); - } - } -} - -- (void)trackLimboChange:(const LimboDocumentChange &)limboChange { - const DocumentKey &key = limboChange.key(); - - if (_limboTargetsByKey.find(key) == _limboTargetsByKey.end()) { - LOG_DEBUG("New document in limbo: %s", key.ToString()); - TargetId limboTargetID = _targetIdGenerator.NextId(); - Query query(key.path()); - QueryData queryData(std::move(query), limboTargetID, kIrrelevantSequenceNumber, - QueryPurpose::LimboResolution); - _limboResolutionsByTarget.emplace(limboTargetID, LimboResolution{key}); - _remoteStore->Listen(queryData); - _limboTargetsByKey[key] = limboTargetID; - } -} - -- (void)removeLimboTargetForKey:(const DocumentKey &)key { - const auto iter = _limboTargetsByKey.find(key); - if (iter == _limboTargetsByKey.end()) { - // This target already got removed, because the query failed. - return; - } - TargetId limboTargetID = iter->second; - _remoteStore->StopListening(limboTargetID); - _limboTargetsByKey.erase(key); - _limboResolutionsByTarget.erase(limboTargetID); + _syncEngine->HandleRejectedWrite(batchID, Status::FromNSError(error)); } // Used for testing - (std::map)currentLimboDocuments { - // Return defensive copy - return _limboTargetsByKey; + return _syncEngine->GetCurrentLimboDocuments(); } - (void)credentialDidChangeWithUser:(const firebase::firestore::auth::User &)user { - BOOL userChanged = (_currentUser != user); - _currentUser = user; - - if (userChanged) { - // Fails callbacks waiting for pending writes requested by previous user. - [self failOutstandingPendingWritesAwaitingCallbacks: - "'waitForPendingWrites' callback is cancelled due to a user change."]; - // Notify local store and emit any resulting events from swapping out the mutation queue. - MaybeDocumentMap changes = [self.localStore userDidChange:user]; - [self emitNewSnapshotsAndNotifyLocalStoreWithChanges:changes remoteEvent:absl::nullopt]; - } - - // Notify remote store so it can restart its streams. - _remoteStore->HandleCredentialChange(); + _syncEngine->HandleCredentialChange(user); } - - (DocumentKeySet)remoteKeysForTarget:(TargetId)targetId { - const auto iter = _limboResolutionsByTarget.find(targetId); - if (iter != _limboResolutionsByTarget.end() && iter->second.document_received) { - return DocumentKeySet{iter->second.key}; - } else { - auto found = _queryViewsByTarget.find(targetId); - FSTQueryView *queryView = found != _queryViewsByTarget.end() ? found->second : nil; - return queryView ? queryView.view->synced_documents() : DocumentKeySet{}; - } -} - -/** - * Decides if the error likely represents a developer mistake such as forgetting to create an index - * or permission denied. Used to decide whether an error is worth automatically logging as a - * warning. - */ -- (BOOL)errorIsInteresting:(NSError *)error { - if (error.domain == FIRFirestoreErrorDomain) { - if (error.code == FIRFirestoreErrorCodeFailedPrecondition && - [error.localizedDescription containsString:@"requires an index"]) { - return YES; - } else if (error.code == FIRFirestoreErrorCodePermissionDenied) { - return YES; - } - } - - return NO; -} - -- (BOOL)isRetryableTransactionError:(const Status &)error { - // In transactions, the backend will fail outdated reads with FAILED_PRECONDITION and - // non-matching document versions with ABORTED. These errors should be retried. - Error code = error.code(); - return code == Error::Aborted || code == Error::FailedPrecondition || - !Datastore::IsPermanentError(error); + return _syncEngine->GetRemoteKeys(targetId); } @end diff --git a/Firestore/core/src/firebase/firestore/core/sync_engine.h b/Firestore/core/src/firebase/firestore/core/sync_engine.h new file mode 100644 index 00000000000..de1c4523e21 --- /dev/null +++ b/Firestore/core/src/firebase/firestore/core/sync_engine.h @@ -0,0 +1,288 @@ +/* + * Copyright 2019 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_CORE_SYNC_ENGINE_H_ +#define FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_CORE_SYNC_ENGINE_H_ + +#include +#include +#include +#include +#include +#include + +#import "Firestore/Source/Local/FSTLocalStore.h" + +#include "Firestore/core/src/firebase/firestore/core/query.h" +#include "Firestore/core/src/firebase/firestore/core/sync_engine_callback.h" +#include "Firestore/core/src/firebase/firestore/core/target_id_generator.h" +#include "Firestore/core/src/firebase/firestore/core/view.h" +#include "Firestore/core/src/firebase/firestore/core/view_snapshot.h" +#include "Firestore/core/src/firebase/firestore/local/query_data.h" +#include "Firestore/core/src/firebase/firestore/local/reference_set.h" +#include "Firestore/core/src/firebase/firestore/model/document_key_set.h" +#include "Firestore/core/src/firebase/firestore/model/maybe_document.h" +#include "Firestore/core/src/firebase/firestore/remote/remote_store.h" +#include "Firestore/core/src/firebase/firestore/util/delayed_constructor.h" +#include "Firestore/core/src/firebase/firestore/util/status.h" + +namespace firebase { +namespace firestore { +namespace core { + +/** + * SyncEngine is the central controller in the client SDK architecture. It is + * the glue code between the EventManager, LocalStore, and RemoteStore. Some of + * SyncEngine's responsibilities include: + * 1. Coordinating client requests and remote events between the EventManager + * and the local and remote data stores. + * 2. Managing a View object for each query, providing the unified view between + * the local and remote data stores. + * 3. Notifying the RemoteStore when the LocalStore has new mutations in its + * queue that need sending to the backend. + * + * The SyncEngine’s methods should only ever be called by methods running on our + * own worker queue. + */ +class SyncEngine { + public: + SyncEngine(FSTLocalStore* local_store, + remote::RemoteStore* remote_store, + const auth::User initial_user); + + void SetCallback(SyncEngineCallback* callback) { + sync_engine_callback_ = callback; + } + + /** + * Initiates a new listen. The FSTLocalStore will be queried for initial data + * and the listen will be sent to the `RemoteStore` to get remote data. The + * registered SyncEngineCallback will be notified of resulting view + * snapshots and/or listen errors. + * + * @return the target ID assigned to the query. + */ + model::TargetId Listen(Query query); + + /** Stops listening to a query previously listened to via `Listen`. */ + void StopListening(const Query& query); + + /** + * Initiates the write of local mutation batch which involves adding the + * writes to the mutation queue, notifying the remote store about new + * mutations, and raising events for any changes this write caused. The + * provided callback will be called once the write has been acked or + * rejected by the backend (or failed locally for any other reason). + */ + void WriteMutations(std::vector&& mutations, + util::StatusCallback callback); + + /** + * Registers a user callback that is called when all pending mutations at the + * moment of calling are acknowledged . + */ + void RegisterPendingWritesCallback(util::StatusCallback callback); + + /** + * Runs the given transaction block up to retries times and then calls + * completion. + * + * @param retries The number of times to try before giving up. + * @param worker_queue The queue to dispatch sync engine calls to. + * @param update_callback The callback to call to execute the user's + * transaction. + * @param result_callback The callback to call when the transaction is + * finished or failed. + */ + void Transaction(int retries, + const std::shared_ptr& worker_queue, + core::TransactionUpdateCallback update_callback, + core::TransactionResultCallback result_callback); + + void HandleCredentialChange(const auth::User& user); + + // Implements `RemoteStoreCallback` + void HandleRemoteEvent(const remote::RemoteEvent& remote_event); + void HandleRejectedListen(model::TargetId target_id, util::Status error); + void HandleSuccessfulWrite(const model::MutationBatchResult& batch_result); + void HandleRejectedWrite(firebase::firestore::model::BatchId batchID, + util::Status error); + void HandleOnlineStateChange(model::OnlineState online_state); + model::DocumentKeySet GetRemoteKeys(model::TargetId targetId); + + // For tests only + std::map GetCurrentLimboDocuments() { + // Return defensive copy + return limbo_targets_by_key_; + } + + private: + /** + * QueryView contains all of the info that SyncEngine needs to track for a + * particular query and view. + */ + class QueryView { + public: + QueryView(Query query, + model::TargetId target_id, + nanopb::ByteString resume_token, + View view) + : query_(std::move(query)), + target_id_(target_id), + resume_token_(std::move(resume_token)), + view_(std::move(view)) { + } + + const Query& query() { + return query_; + } + + /** + * The target ID created by the client that is used in the watch stream to + * identify this query. + */ + model::TargetId target_id() { + return target_id_; + } + + /** + * An identifier from the datastore backend that indicates the last state of + * the results that was received. This can be used to indicate where to + * continue receiving new doc changes for the query. + */ + const nanopb::ByteString resume_token() { + return resume_token_; + } + + /** + * The view is responsible for computing the final merged truth of what docs + * are in the query. It gets notified of local and remote changes, and + * applies the query filters and limits to determine the most correct + * possible results. + */ + View* view() { + return &view_; + } + + private: + Query query_; + model::TargetId target_id_; + nanopb::ByteString resume_token_; + View view_; + }; + + /** Tracks a limbo resolution. */ + class LimboResolution { + public: + LimboResolution() { + } + + explicit LimboResolution(const model::DocumentKey& key) : key{key} { + } + + model::DocumentKey key; + + /** + * Set to true once we've received a document. This is used in + * remoteKeysForTarget and ultimately used by `WatchChangeAggregator` to + * decide whether it needs to manufacture a delete event for the target once + * the target is CURRENT. + */ + bool document_received = false; + }; + + void AssertCallbackExists(std::string source); + + ViewSnapshot InitializeViewAndComputeSnapshot( + const local::QueryData& query_data); + + void RemoveAndCleanupQuery(const std::shared_ptr& query_view); + + void RemoveLimboTarget(const model::DocumentKey& key); + + void EmitNewSnapshotsAndNotifyLocalStore( + const model::MaybeDocumentMap& changes, + const absl::optional& maybe_remote_event); + + /** Updates the limbo document state for the given targetID. */ + void UpdateTrackedLimboDocuments( + const std::vector& limbo_changes, + model::TargetId target_id); + + void TrackLimboChange(const LimboDocumentChange& limbo_change); + + void NotifyUser(model::BatchId batch_id, util::Status status); + + /** Triggers callbacks waiting for this batch id to get acknowledged by + * server, if there are any. */ + void TriggerPendingWriteCallbacks(model::BatchId batch_id); + void FailOutstandingPendingWriteCallbacks(absl::string_view message); + + bool ErrorIsInteresting(util::Status error); + + /** The local store, used to persist mutations and cached documents. */ + FSTLocalStore* local_store_; + + /** The remote store for sending writes, watches, etc. to the backend. */ + remote::RemoteStore* remote_store_; + + auth::User current_user_; + SyncEngineCallback* sync_engine_callback_; + + /** Used for creating the TargetId for the listens used to resolve limbo + * documents. */ + TargetIdGenerator target_id_generator_; + + /** Stores user completion blocks, indexed by user and BatchId. */ + std::unordered_map, + auth::HashUser> + mutation_callbacks_; + + /** Stores user callbacks waiting for pending writes to be acknowledged. */ + std::unordered_map> + pending_writes_callbacks_; + + /** QueryViews for all active queries, indexed by query. */ + std::unordered_map> query_views_by_query_; + + /** QueryViews for all active queries, indexed by target ID. */ + std::unordered_map> + query_views_by_target_; + + /** + * When a document is in limbo, we create a special listen to resolve it. This + * maps the DocumentKey of each limbo document to the TargetId of the listen + * resolving it. + */ + std::map limbo_targets_by_key_; + + /** + * Basically the inverse of limbo_targets_by_key_, a map of target ID to a + * LimboResolution (which includes the DocumentKey as well as whether we've + * received a document for the target). + */ + std::map limbo_resolutions_by_target_; + + /** Used to track any documents that are currently in limbo. */ + local::ReferenceSet limbo_document_refs_; +}; + +} // namespace core +} // namespace firestore +} // namespace firebase + +#endif // FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_CORE_SYNC_ENGINE_H_ diff --git a/Firestore/core/src/firebase/firestore/core/sync_engine.mm b/Firestore/core/src/firebase/firestore/core/sync_engine.mm new file mode 100644 index 00000000000..41190b812e1 --- /dev/null +++ b/Firestore/core/src/firebase/firestore/core/sync_engine.mm @@ -0,0 +1,498 @@ +/* + * Copyright 2019 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "Firestore/core/src/firebase/firestore/core/sync_engine.h" + +#include "Firestore/core/include/firebase/firestore/firestore_errors.h" +#include "Firestore/core/src/firebase/firestore/core/transaction.h" +#include "Firestore/core/src/firebase/firestore/core/transaction_runner.h" +#include "Firestore/core/src/firebase/firestore/local/query_data.h" +#include "Firestore/core/src/firebase/firestore/model/document_key.h" +#include "Firestore/core/src/firebase/firestore/model/document_key_set.h" +#include "Firestore/core/src/firebase/firestore/model/document_map.h" +#include "Firestore/core/src/firebase/firestore/model/document_set.h" +#include "Firestore/core/src/firebase/firestore/model/no_document.h" +#include "Firestore/core/src/firebase/firestore/util/async_queue.h" +#include "Firestore/core/src/firebase/firestore/util/log.h" +#include "Firestore/core/src/firebase/firestore/util/status.h" + +using auth::User; +using local::LocalViewChanges; +using local::LocalWriteResult; +using local::QueryData; +using local::QueryPurpose; +using model::BatchId; +using model::DocumentKey; +using model::DocumentKeySet; +using model::DocumentMap; +using model::kBatchIdUnknown; +using model::ListenSequenceNumber; +using model::MaybeDocumentMap; +using model::NoDocument; +using model::SnapshotVersion; +using model::TargetId; +using remote::RemoteEvent; +using remote::TargetChange; +// using util::AsyncQueue; +// using util::StatusCallback; + +namespace firebase { +namespace firestore { +namespace core { + +// Limbo documents don't use persistence, and are eagerly GC'd. So, listens for +// them don't need real sequence numbers. +static const ListenSequenceNumber kIrrelevantSequenceNumber = -1; + +SyncEngine::SyncEngine(FSTLocalStore* local_store, + remote::RemoteStore* remote_store, + const auth::User initial_user) + : local_store_(local_store), + remote_store_(remote_store), + current_user_(initial_user), + target_id_generator_(TargetIdGenerator::SyncEngineTargetIdGenerator()) { +} + +void SyncEngine::AssertCallbackExists(std::string source) { + HARD_ASSERT(sync_engine_callback_, + "Tried to call '%s' before callback was registered.", source); +} + +TargetId SyncEngine::Listen(Query query) { + AssertCallbackExists("Listen"); + HARD_ASSERT(query_views_by_query_.find(query) == query_views_by_query_.end(), + "We already listen to query: %s", query.ToString()); + + QueryData queryData = [local_store_ allocateQuery:query]; + ViewSnapshot viewSnapshot = InitializeViewAndComputeSnapshot(queryData); + sync_engine_callback_->OnViewSnapshots({viewSnapshot}); + + remote_store_->Listen(queryData); + return queryData.target_id(); +} + +ViewSnapshot SyncEngine::InitializeViewAndComputeSnapshot( + const local::QueryData& query_data) { + DocumentMap docs = [local_store_ executeQuery:query_data.query()]; + DocumentKeySet remote_keys = + [local_store_ remoteDocumentKeysForTarget:query_data.target_id()]; + + View view(query_data.query(), std::move(remote_keys)); + ViewDocumentChanges view_doc_changes = + view.ComputeDocumentChanges(docs.underlying_map()); + ViewChange view_change = view.ApplyChanges(view_doc_changes); + HARD_ASSERT(view_change.limbo_changes().empty(), + "View returned limbo docs before target ack from the server."); + + auto query_view = + std::make_shared(query_data.query(), query_data.target_id(), + query_data.resume_token(), view); + query_views_by_query_[query_data.query()] = query_view; + query_views_by_target_[query_data.target_id()] = query_view; + + HARD_ASSERT( + view_change.snapshot().has_value(), + "ApplyChanges to documents for new view should always return a snapshot"); + return view_change.snapshot().value(); +} + +void SyncEngine::StopListening(const Query& query) { + AssertCallbackExists("StopListening"); + const std::shared_ptr& query_view = query_views_by_query_[query]; + HARD_ASSERT(query_view, "Trying to stop listening to a query not found"); + + [local_store_ releaseQuery:query]; + remote_store_->StopListening(query_view->target_id()); + RemoveAndCleanupQuery(query_view); +} + +void SyncEngine::RemoveAndCleanupQuery( + const std::shared_ptr& query_view) { + query_views_by_query_.erase(query_view->query()); + query_views_by_target_.erase(query_view->target_id()); + + DocumentKeySet limbo_keys = + limbo_document_refs_.ReferencedKeys(query_view->target_id()); + limbo_document_refs_.RemoveReferences(query_view->target_id()); + for (const DocumentKey& key : limbo_keys) { + if (!limbo_document_refs_.ContainsKey(key)) { + // We removed the last reference for this key. + RemoveLimboTarget(key); + } + } +} + +void SyncEngine::WriteMutations(std::vector&& mutations, + util::StatusCallback callback) { + AssertCallbackExists("WriteMutations"); + + LocalWriteResult result = + [local_store_ locallyWriteMutations:std::move(mutations)]; + mutation_callbacks_[current_user_].insert( + std::make_pair(result.batch_id(), callback)); + + EmitNewSnapshotsAndNotifyLocalStore(result.changes(), absl::nullopt); + remote_store_->FillWritePipeline(); +} + +void SyncEngine::RegisterPendingWritesCallback(util::StatusCallback callback) { + if (!remote_store_->CanUseNetwork()) { + LOG_DEBUG("The network is disabled. The task returned by " + "'waitForPendingWrites()' will not " + "complete until the network is enabled."); + } + + int largest_pending_batch_id = [local_store_ getHighestUnacknowledgedBatchId]; + + if (largest_pending_batch_id == kBatchIdUnknown) { + // Trigger the callback right away if there is no pending writes at the + // moment. + callback(util::Status::OK()); + return; + } + + pending_writes_callbacks_[largest_pending_batch_id].push_back( + std::move(callback)); +} + +void SyncEngine::Transaction( + int retries, + const std::shared_ptr& worker_queue, + TransactionUpdateCallback update_callback, + TransactionResultCallback result_callback) { + worker_queue->VerifyIsCurrentQueue(); + HARD_ASSERT(retries >= 0, "Got negative number of retries for transaction"); + + // Allocate a shared_ptr so that the TransactionRunner can outlive this frame. + auto runner = std::make_shared( + worker_queue, remote_store_, update_callback, result_callback); + runner->Run(); +} + +void SyncEngine::HandleCredentialChange(const auth::User& user) { + bool user_changed = (current_user_ != user); + current_user_ = user; + + if (user_changed) { + // Fails callbacks waiting for pending writes requested by previous user. + FailOutstandingPendingWriteCallbacks( + "'waitForPendingWrites' callback is cancelled due to a user change."); + // Notify local store and emit any resulting events from swapping out the + // mutation queue. + MaybeDocumentMap changes = [local_store_ userDidChange:user]; + EmitNewSnapshotsAndNotifyLocalStore(changes, absl::nullopt); + } + + // Notify remote store so it can restart its streams. + remote_store_->HandleCredentialChange(); +} + +void SyncEngine::HandleRemoteEvent(const RemoteEvent& remote_event) { + AssertCallbackExists("HandleRemoteEvent"); + + // Update received document as appropriate for any limbo targets. + for (const auto& entry : remote_event.target_changes()) { + TargetId target_id = entry.first; + const TargetChange& change = entry.second; + const auto iter = limbo_resolutions_by_target_.find(target_id); + if (iter != limbo_resolutions_by_target_.end()) { + LimboResolution& limbo_resolution = iter->second; + // Since this is a limbo resolution lookup, it's for a single document and + // it could be added, modified, or removed, but not a combination. + HARD_ASSERT( + change.added_documents().size() + change.modified_documents().size() + + change.removed_documents().size() <= + 1, + "Limbo resolution for single document contains multiple changes."); + + if (change.added_documents().size() > 0) { + limbo_resolution.document_received = true; + } else if (change.modified_documents().size() > 0) { + HARD_ASSERT(limbo_resolution.document_received, + "Received change for limbo target document without add."); + } else if (change.removed_documents().size() > 0) { + HARD_ASSERT(limbo_resolution.document_received, + "Received remove for limbo target document without add."); + limbo_resolution.document_received = false; + } else { + // This was probably just a CURRENT target change or similar. + } + } + } + + MaybeDocumentMap changes = [local_store_ applyRemoteEvent:remote_event]; + EmitNewSnapshotsAndNotifyLocalStore(changes, remote_event); +} + +void SyncEngine::HandleRejectedListen(TargetId target_id, util::Status error) { + AssertCallbackExists("HandleRejectedListen"); + + const auto iter = limbo_resolutions_by_target_.find(target_id); + if (iter != limbo_resolutions_by_target_.end()) { + const DocumentKey limbo_key = iter->second.key; + // Since this query failed, we won't want to manually unlisten to it. + // So go ahead and remove it from bookkeeping. + limbo_targets_by_key_.erase(limbo_key); + limbo_resolutions_by_target_.erase(target_id); + + // TODO(dimond): Retry on transient errors? + + // It's a limbo doc. Create a synthetic event saying it was deleted. This is + // kind of a hack. Ideally, we would have a method in the local store to + // purge a document. However, it would be tricky to keep all of the local + // store's invariants with another method. + NoDocument doc(limbo_key, SnapshotVersion::None(), + /* has_committed_mutations= */ false); + DocumentKeySet limbo_documents = DocumentKeySet{limbo_key}; + RemoteEvent event{SnapshotVersion::None(), /*target_changes=*/{}, + /*target_mismatches=*/{}, + /*document_updates=*/{{limbo_key, doc}}, + std::move(limbo_documents)}; + HandleRemoteEvent(event); + } else { + auto found = query_views_by_target_.find(target_id); + HARD_ASSERT(found != query_views_by_target_.end(), "Unknown target id: %s", + target_id); + auto query_view = found->second; + const Query& query = query_view->query(); + [local_store_ releaseQuery:query]; + RemoveAndCleanupQuery(query_view); + if (ErrorIsInteresting(error)) { + LOG_WARN("Listen for query at %s failed: %s", + query.path().CanonicalString(), error.error_message()); + } + sync_engine_callback_->OnError(query, error); + } +} + +void SyncEngine::HandleSuccessfulWrite( + const model::MutationBatchResult& batch_result) { + AssertCallbackExists("HandleSuccessfulWrite"); + + // The local store may or may not be able to apply the write result and raise + // events immediately (depending on whether the watcher is caught up), so we + // raise user callbacks first so that they consistently happen before listen + // events. + NotifyUser(batch_result.batch().batch_id(), util::Status::OK()); + + TriggerPendingWriteCallbacks(batch_result.batch().batch_id()); + + MaybeDocumentMap changes = + [local_store_ acknowledgeBatchWithResult:batch_result]; + EmitNewSnapshotsAndNotifyLocalStore(changes, absl::nullopt); +} + +void SyncEngine::HandleRejectedWrite( + firebase::firestore::model::BatchId batch_id, util::Status error) { + AssertCallbackExists("HandleRejectedWrite"); + MaybeDocumentMap changes = [local_store_ rejectBatchID:batch_id]; + + if (!changes.empty() && ErrorIsInteresting(error)) { + const DocumentKey& min_key = changes.min()->first; + LOG_WARN("Write at %s failed: %s", min_key.ToString(), + error.error_message()); + } + + // The local store may or may not be able to apply the write result and raise + // events immediately (depending on whether the watcher is caught up), so we + // raise user callbacks first so that they consistently happen before listen + // events. + NotifyUser(batch_id, error); + + TriggerPendingWriteCallbacks(batch_id); + + EmitNewSnapshotsAndNotifyLocalStore(changes, absl::nullopt); +} + +void SyncEngine::HandleOnlineStateChange(model::OnlineState online_state) { + AssertCallbackExists("HandleOnlineStateChange"); + + std::vector new_view_snapshot; + for (const auto& entry : query_views_by_query_) { + auto& query_view = entry.second; + ViewChange view_change = + query_view->view()->ApplyOnlineStateChange(online_state); + HARD_ASSERT(view_change.limbo_changes().empty(), + "OnlineState should not affect limbo documents."); + if (view_change.snapshot().has_value()) { + new_view_snapshot.push_back(*std::move(view_change).snapshot()); + } + } + + sync_engine_callback_->OnViewSnapshots(std::move(new_view_snapshot)); + sync_engine_callback_->HandleOnlineStateChange(online_state); +} + +DocumentKeySet SyncEngine::GetRemoteKeys(TargetId target_id) { + const auto iter = limbo_resolutions_by_target_.find(target_id); + if (iter != limbo_resolutions_by_target_.end() && + iter->second.document_received) { + return DocumentKeySet{iter->second.key}; + } else { + auto found = query_views_by_target_.find(target_id); + if (found != query_views_by_target_.end() && found->second) { + return found->second->view()->synced_documents(); + } + return DocumentKeySet{}; + } +} + +void SyncEngine::NotifyUser(BatchId batch_id, util::Status status) { + auto it = mutation_callbacks_.find(current_user_); + + // NOTE: Mutations restored from persistence won't have callbacks, so + // it's okay for this (or the callback below) to not exist. + if (it != mutation_callbacks_.end()) { + std::unordered_map& callbacks = it->second; + auto callback_it = callbacks.find(batch_id); + if (callback_it != callbacks.end()) { + callback_it->second(status); + callbacks.erase(callback_it); + } + } +} + +void SyncEngine::TriggerPendingWriteCallbacks(BatchId batch_id) { + auto it = pending_writes_callbacks_.find(batch_id); + if (it != pending_writes_callbacks_.end()) { + for (const auto& callback : it->second) { + callback(util::Status::OK()); + } + + pending_writes_callbacks_.erase(it); + } +} + +void SyncEngine::FailOutstandingPendingWriteCallbacks( + absl::string_view message) { + for (const auto& entry : pending_writes_callbacks_) { + for (const auto& callback : entry.second) { + callback(util::Status(Error::Cancelled, message)); + } + } + + pending_writes_callbacks_.clear(); +} + +bool SyncEngine::ErrorIsInteresting(util::Status error) { + bool missing_index = + (error.code() == Error::FailedPrecondition && + error.error_message().find("requires an index") != std::string::npos); + bool no_permission = (error.code() == Error::PermissionDenied); + return missing_index || no_permission; +} + +void SyncEngine::EmitNewSnapshotsAndNotifyLocalStore( + const MaybeDocumentMap& changes, + const absl::optional& maybe_remote_event) { + std::vector new_snapshots; + std::vector document_changes_in_all_views; + + for (const auto& entry : query_views_by_query_) { + auto& query_view = entry.second; + const View& view = *query_view->view(); + ViewDocumentChanges view_doc_changes = view.ComputeDocumentChanges(changes); + if (view_doc_changes.needs_refill()) { + // The query has a limit and some docs were removed/updated, so we need to + // re-run the query against the local store to make sure we didn't lose + // any good docs that had been past the limit. + DocumentMap docs = [local_store_ executeQuery:query_view->query()]; + view_doc_changes = + view.ComputeDocumentChanges(docs.underlying_map(), view_doc_changes); + } + + absl::optional target_changes; + if (maybe_remote_event.has_value()) { + const RemoteEvent& remote_event = maybe_remote_event.value(); + auto it = remote_event.target_changes().find(query_view->target_id()); + if (it != remote_event.target_changes().end()) { + target_changes = it->second; + } + } + ViewChange view_change = + query_view->view()->ApplyChanges(view_doc_changes, target_changes); + + UpdateTrackedLimboDocuments(view_change.limbo_changes(), + query_view->target_id()); + + if (view_change.snapshot().has_value()) { + new_snapshots.push_back(*view_change.snapshot()); + LocalViewChanges doc_changes = LocalViewChanges::FromViewSnapshot( + *view_change.snapshot(), query_view->target_id()); + document_changes_in_all_views.push_back(std::move(doc_changes)); + } + } + + sync_engine_callback_->OnViewSnapshots(std::move(new_snapshots)); + [local_store_ notifyLocalViewChanges:document_changes_in_all_views]; +} + +void SyncEngine::UpdateTrackedLimboDocuments( + const std::vector& limbo_changes, TargetId target_id) { + for (const LimboDocumentChange& limbo_change : limbo_changes) { + switch (limbo_change.type()) { + case LimboDocumentChange::Type::Added: + limbo_document_refs_.AddReference(limbo_change.key(), target_id); + TrackLimboChange(limbo_change); + break; + + case LimboDocumentChange::Type::Removed: + LOG_DEBUG("Document no longer in limbo: %s", + limbo_change.key().ToString()); + limbo_document_refs_.RemoveReference(limbo_change.key(), target_id); + if (!limbo_document_refs_.ContainsKey(limbo_change.key())) { + // We removed the last reference for this key + RemoveLimboTarget(limbo_change.key()); + } + break; + + default: + HARD_FAIL("Unknown limbo change type: %s", limbo_change.type()); + } + } +} + +void SyncEngine::TrackLimboChange(const LimboDocumentChange& limbo_change) { + const DocumentKey& key = limbo_change.key(); + + if (limbo_targets_by_key_.find(key) == limbo_targets_by_key_.end()) { + LOG_DEBUG("New document in limbo: %s", key.ToString()); + TargetId limbo_target_id = target_id_generator_.NextId(); + Query query(key.path()); + QueryData query_data(std::move(query), limbo_target_id, + kIrrelevantSequenceNumber, + QueryPurpose::LimboResolution); + limbo_resolutions_by_target_.emplace(limbo_target_id, LimboResolution{key}); + remote_store_->Listen(query_data); + limbo_targets_by_key_[key] = limbo_target_id; + } +} + +void SyncEngine::RemoveLimboTarget(const DocumentKey& key) { + const auto iter = limbo_targets_by_key_.find(key); + if (iter == limbo_targets_by_key_.end()) { + // This target already got removed, because the query failed. + return; + } + TargetId limbo_target_id = iter->second; + remote_store_->StopListening(limbo_target_id); + limbo_targets_by_key_.erase(key); + limbo_resolutions_by_target_.erase(limbo_target_id); +} + +} // namespace core +} // namespace firestore +} // namespace firebase From b6c3e3162b0759f8d20cbb69481bb91530bcf3a0 Mon Sep 17 00:00:00 2001 From: Wu-Hui Date: Thu, 29 Aug 2019 15:26:47 -0400 Subject: [PATCH 2/7] using util --- .../firebase/firestore/core/sync_engine.mm | 43 +++++++++---------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/Firestore/core/src/firebase/firestore/core/sync_engine.mm b/Firestore/core/src/firebase/firestore/core/sync_engine.mm index 41190b812e1..2f22112f724 100644 --- a/Firestore/core/src/firebase/firestore/core/sync_engine.mm +++ b/Firestore/core/src/firebase/firestore/core/sync_engine.mm @@ -29,6 +29,9 @@ #include "Firestore/core/src/firebase/firestore/util/log.h" #include "Firestore/core/src/firebase/firestore/util/status.h" +namespace firebase { +namespace firestore { +namespace core { using auth::User; using local::LocalViewChanges; using local::LocalWriteResult; @@ -46,12 +49,9 @@ using model::TargetId; using remote::RemoteEvent; using remote::TargetChange; -// using util::AsyncQueue; -// using util::StatusCallback; - -namespace firebase { -namespace firestore { -namespace core { +using util::AsyncQueue; +using util::Status; +using util::StatusCallback; // Limbo documents don't use persistence, and are eagerly GC'd. So, listens for // them don't need real sequence numbers. @@ -136,7 +136,7 @@ } void SyncEngine::WriteMutations(std::vector&& mutations, - util::StatusCallback callback) { + StatusCallback callback) { AssertCallbackExists("WriteMutations"); LocalWriteResult result = @@ -148,7 +148,7 @@ remote_store_->FillWritePipeline(); } -void SyncEngine::RegisterPendingWritesCallback(util::StatusCallback callback) { +void SyncEngine::RegisterPendingWritesCallback(StatusCallback callback) { if (!remote_store_->CanUseNetwork()) { LOG_DEBUG("The network is disabled. The task returned by " "'waitForPendingWrites()' will not " @@ -160,7 +160,7 @@ if (largest_pending_batch_id == kBatchIdUnknown) { // Trigger the callback right away if there is no pending writes at the // moment. - callback(util::Status::OK()); + callback(Status::OK()); return; } @@ -168,11 +168,10 @@ std::move(callback)); } -void SyncEngine::Transaction( - int retries, - const std::shared_ptr& worker_queue, - TransactionUpdateCallback update_callback, - TransactionResultCallback result_callback) { +void SyncEngine::Transaction(int retries, + const std::shared_ptr& worker_queue, + TransactionUpdateCallback update_callback, + TransactionResultCallback result_callback) { worker_queue->VerifyIsCurrentQueue(); HARD_ASSERT(retries >= 0, "Got negative number of retries for transaction"); @@ -237,7 +236,7 @@ EmitNewSnapshotsAndNotifyLocalStore(changes, remote_event); } -void SyncEngine::HandleRejectedListen(TargetId target_id, util::Status error) { +void SyncEngine::HandleRejectedListen(TargetId target_id, Status error) { AssertCallbackExists("HandleRejectedListen"); const auto iter = limbo_resolutions_by_target_.find(target_id); @@ -286,7 +285,7 @@ NoDocument doc(limbo_key, SnapshotVersion::None(), // events immediately (depending on whether the watcher is caught up), so we // raise user callbacks first so that they consistently happen before listen // events. - NotifyUser(batch_result.batch().batch_id(), util::Status::OK()); + NotifyUser(batch_result.batch().batch_id(), Status::OK()); TriggerPendingWriteCallbacks(batch_result.batch().batch_id()); @@ -296,7 +295,7 @@ NoDocument doc(limbo_key, SnapshotVersion::None(), } void SyncEngine::HandleRejectedWrite( - firebase::firestore::model::BatchId batch_id, util::Status error) { + firebase::firestore::model::BatchId batch_id, Status error) { AssertCallbackExists("HandleRejectedWrite"); MaybeDocumentMap changes = [local_store_ rejectBatchID:batch_id]; @@ -350,13 +349,13 @@ NoDocument doc(limbo_key, SnapshotVersion::None(), } } -void SyncEngine::NotifyUser(BatchId batch_id, util::Status status) { +void SyncEngine::NotifyUser(BatchId batch_id, Status status) { auto it = mutation_callbacks_.find(current_user_); // NOTE: Mutations restored from persistence won't have callbacks, so // it's okay for this (or the callback below) to not exist. if (it != mutation_callbacks_.end()) { - std::unordered_map& callbacks = it->second; + std::unordered_map& callbacks = it->second; auto callback_it = callbacks.find(batch_id); if (callback_it != callbacks.end()) { callback_it->second(status); @@ -369,7 +368,7 @@ NoDocument doc(limbo_key, SnapshotVersion::None(), auto it = pending_writes_callbacks_.find(batch_id); if (it != pending_writes_callbacks_.end()) { for (const auto& callback : it->second) { - callback(util::Status::OK()); + callback(Status::OK()); } pending_writes_callbacks_.erase(it); @@ -380,14 +379,14 @@ NoDocument doc(limbo_key, SnapshotVersion::None(), absl::string_view message) { for (const auto& entry : pending_writes_callbacks_) { for (const auto& callback : entry.second) { - callback(util::Status(Error::Cancelled, message)); + callback(Status(Error::Cancelled, message)); } } pending_writes_callbacks_.clear(); } -bool SyncEngine::ErrorIsInteresting(util::Status error) { +bool SyncEngine::ErrorIsInteresting(Status error) { bool missing_index = (error.code() == Error::FailedPrecondition && error.error_message().find("requires an index") != std::string::npos); From 4336ece946a9d27e5504db96937b316a1bc62889 Mon Sep 17 00:00:00 2001 From: Wu-Hui Date: Thu, 29 Aug 2019 23:31:32 -0400 Subject: [PATCH 3/7] use shared_ptr by value --- .../core/src/firebase/firestore/core/sync_engine.h | 12 ++++++++---- .../core/src/firebase/firestore/core/sync_engine.mm | 6 +++--- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/Firestore/core/src/firebase/firestore/core/sync_engine.h b/Firestore/core/src/firebase/firestore/core/sync_engine.h index de1c4523e21..3d3bfd705e0 100644 --- a/Firestore/core/src/firebase/firestore/core/sync_engine.h +++ b/Firestore/core/src/firebase/firestore/core/sync_engine.h @@ -226,8 +226,10 @@ class SyncEngine { void NotifyUser(model::BatchId batch_id, util::Status status); - /** Triggers callbacks waiting for this batch id to get acknowledged by - * server, if there are any. */ + /** + * Triggers callbacks waiting for this batch id to get acknowledged by + * server, if there are any. + */ void TriggerPendingWriteCallbacks(model::BatchId batch_id); void FailOutstandingPendingWriteCallbacks(absl::string_view message); @@ -242,8 +244,10 @@ class SyncEngine { auth::User current_user_; SyncEngineCallback* sync_engine_callback_; - /** Used for creating the TargetId for the listens used to resolve limbo - * documents. */ + /** + * Used for creating the TargetId for the listens used to resolve limbo + * documents. + */ TargetIdGenerator target_id_generator_; /** Stores user completion blocks, indexed by user and BatchId. */ diff --git a/Firestore/core/src/firebase/firestore/core/sync_engine.mm b/Firestore/core/src/firebase/firestore/core/sync_engine.mm index 2f22112f724..58f1f2f645d 100644 --- a/Firestore/core/src/firebase/firestore/core/sync_engine.mm +++ b/Firestore/core/src/firebase/firestore/core/sync_engine.mm @@ -111,7 +111,7 @@ void SyncEngine::StopListening(const Query& query) { AssertCallbackExists("StopListening"); - const std::shared_ptr& query_view = query_views_by_query_[query]; + auto query_view = query_views_by_query_[query]; HARD_ASSERT(query_view, "Trying to stop listening to a query not found"); [local_store_ releaseQuery:query]; @@ -321,7 +321,7 @@ NoDocument doc(limbo_key, SnapshotVersion::None(), std::vector new_view_snapshot; for (const auto& entry : query_views_by_query_) { - auto& query_view = entry.second; + auto query_view = entry.second; ViewChange view_change = query_view->view()->ApplyOnlineStateChange(online_state); HARD_ASSERT(view_change.limbo_changes().empty(), @@ -401,7 +401,7 @@ NoDocument doc(limbo_key, SnapshotVersion::None(), std::vector document_changes_in_all_views; for (const auto& entry : query_views_by_query_) { - auto& query_view = entry.second; + auto query_view = entry.second; const View& view = *query_view->view(); ViewDocumentChanges view_doc_changes = view.ComputeDocumentChanges(changes); if (view_doc_changes.needs_refill()) { From 8885c2454c3a6fa2d7f9daf7b7f0aa6b693101db Mon Sep 17 00:00:00 2001 From: Wu-Hui Date: Thu, 29 Aug 2019 23:50:34 -0400 Subject: [PATCH 4/7] reformat --- Firestore/core/src/firebase/firestore/core/sync_engine.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Firestore/core/src/firebase/firestore/core/sync_engine.h b/Firestore/core/src/firebase/firestore/core/sync_engine.h index 3d3bfd705e0..8edd86dc295 100644 --- a/Firestore/core/src/firebase/firestore/core/sync_engine.h +++ b/Firestore/core/src/firebase/firestore/core/sync_engine.h @@ -244,7 +244,7 @@ class SyncEngine { auth::User current_user_; SyncEngineCallback* sync_engine_callback_; - /** + /** * Used for creating the TargetId for the listens used to resolve limbo * documents. */ From 4e83273becc393c250c34a42c6351829ca11600f Mon Sep 17 00:00:00 2001 From: Wu-Hui Date: Sat, 31 Aug 2019 22:00:52 -0400 Subject: [PATCH 5/7] addressing comments #1 --- Firestore/Source/Core/FSTSyncEngine.mm | 7 +- .../src/firebase/firestore/core/sync_engine.h | 41 +++-- .../firebase/firestore/core/sync_engine.mm | 171 ++++++++++-------- 3 files changed, 121 insertions(+), 98 deletions(-) diff --git a/Firestore/Source/Core/FSTSyncEngine.mm b/Firestore/Source/Core/FSTSyncEngine.mm index 40be821b359..b39c14e9b63 100644 --- a/Firestore/Source/Core/FSTSyncEngine.mm +++ b/Firestore/Source/Core/FSTSyncEngine.mm @@ -158,11 +158,12 @@ - (void)transactionWithRetries:(int)retries workerQueue:(const std::shared_ptr &)workerQueue updateCallback:(core::TransactionUpdateCallback)updateCallback resultCallback:(core::TransactionResultCallback)resultCallback { - _syncEngine->Transaction(retries, workerQueue, updateCallback, resultCallback); + _syncEngine->Transaction(retries, workerQueue, std::move(updateCallback), + std::move(resultCallback)); } - (void)applyRemoteEvent:(const RemoteEvent &)remoteEvent { - _syncEngine->HandleRemoteEvent(remoteEvent); + _syncEngine->ApplyRemoteEvent(remoteEvent); } - (void)applyChangedOnlineState:(OnlineState)onlineState { @@ -186,7 +187,7 @@ - (void)rejectFailedWriteWithBatchID:(BatchId)batchID error:(NSError *)error { return _syncEngine->GetCurrentLimboDocuments(); } -- (void)credentialDidChangeWithUser:(const firebase::firestore::auth::User &)user { +- (void)credentialDidChangeWithUser:(const User &)user { _syncEngine->HandleCredentialChange(user); } - (DocumentKeySet)remoteKeysForTarget:(TargetId)targetId { diff --git a/Firestore/core/src/firebase/firestore/core/sync_engine.h b/Firestore/core/src/firebase/firestore/core/sync_engine.h index 8edd86dc295..049188238c5 100644 --- a/Firestore/core/src/firebase/firestore/core/sync_engine.h +++ b/Firestore/core/src/firebase/firestore/core/sync_engine.h @@ -17,6 +17,10 @@ #ifndef FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_CORE_SYNC_ENGINE_H_ #define FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_CORE_SYNC_ENGINE_H_ +#if !defined(__OBJC__) +#error "This header only supports Objective-C++" +#endif // !defined(__OBJC__) + #include #include #include @@ -36,7 +40,6 @@ #include "Firestore/core/src/firebase/firestore/model/document_key_set.h" #include "Firestore/core/src/firebase/firestore/model/maybe_document.h" #include "Firestore/core/src/firebase/firestore/remote/remote_store.h" -#include "Firestore/core/src/firebase/firestore/util/delayed_constructor.h" #include "Firestore/core/src/firebase/firestore/util/status.h" namespace firebase { @@ -61,14 +64,14 @@ class SyncEngine { public: SyncEngine(FSTLocalStore* local_store, remote::RemoteStore* remote_store, - const auth::User initial_user); + const auth::User& initial_user); void SetCallback(SyncEngineCallback* callback) { sync_engine_callback_ = callback; } /** - * Initiates a new listen. The FSTLocalStore will be queried for initial data + * Initiates a new listen. The LocalStore will be queried for initial data * and the listen will be sent to the `RemoteStore` to get remote data. The * registered SyncEngineCallback will be notified of resulting view * snapshots and/or listen errors. @@ -115,16 +118,17 @@ class SyncEngine { void HandleCredentialChange(const auth::User& user); // Implements `RemoteStoreCallback` - void HandleRemoteEvent(const remote::RemoteEvent& remote_event); + void ApplyRemoteEvent(const remote::RemoteEvent& remote_event); void HandleRejectedListen(model::TargetId target_id, util::Status error); void HandleSuccessfulWrite(const model::MutationBatchResult& batch_result); void HandleRejectedWrite(firebase::firestore::model::BatchId batchID, util::Status error); void HandleOnlineStateChange(model::OnlineState online_state); - model::DocumentKeySet GetRemoteKeys(model::TargetId targetId); + model::DocumentKeySet GetRemoteKeys(model::TargetId targetId) const; // For tests only - std::map GetCurrentLimboDocuments() { + std::map GetCurrentLimboDocuments() + const { // Return defensive copy return limbo_targets_by_key_; } @@ -146,7 +150,7 @@ class SyncEngine { view_(std::move(view)) { } - const Query& query() { + const Query& query() const { return query_; } @@ -154,7 +158,7 @@ class SyncEngine { * The target ID created by the client that is used in the watch stream to * identify this query. */ - model::TargetId target_id() { + model::TargetId target_id() const { return target_id_; } @@ -163,7 +167,7 @@ class SyncEngine { * the results that was received. This can be used to indicate where to * continue receiving new doc changes for the query. */ - const nanopb::ByteString resume_token() { + const nanopb::ByteString resume_token() const { return resume_token_; } @@ -173,8 +177,8 @@ class SyncEngine { * applies the query filters and limits to determine the most correct * possible results. */ - View* view() { - return &view_; + View& view() { + return view_; } private: @@ -187,8 +191,7 @@ class SyncEngine { /** Tracks a limbo resolution. */ class LimboResolution { public: - LimboResolution() { - } + LimboResolution() = default; explicit LimboResolution(const model::DocumentKey& key) : key{key} { } @@ -204,7 +207,7 @@ class SyncEngine { bool document_received = false; }; - void AssertCallbackExists(std::string source); + void AssertCallbackExists(absl::string_view source); ViewSnapshot InitializeViewAndComputeSnapshot( const local::QueryData& query_data); @@ -217,7 +220,7 @@ class SyncEngine { const model::MaybeDocumentMap& changes, const absl::optional& maybe_remote_event); - /** Updates the limbo document state for the given targetID. */ + /** Updates the limbo document state for the given target_id. */ void UpdateTrackedLimboDocuments( const std::vector& limbo_changes, model::TargetId target_id); @@ -233,16 +236,14 @@ class SyncEngine { void TriggerPendingWriteCallbacks(model::BatchId batch_id); void FailOutstandingPendingWriteCallbacks(absl::string_view message); - bool ErrorIsInteresting(util::Status error); - /** The local store, used to persist mutations and cached documents. */ FSTLocalStore* local_store_; /** The remote store for sending writes, watches, etc. to the backend. */ - remote::RemoteStore* remote_store_; + remote::RemoteStore* remote_store_ = nullptr; auth::User current_user_; - SyncEngineCallback* sync_engine_callback_; + SyncEngineCallback* sync_engine_callback_ = nullptr; /** * Used for creating the TargetId for the listens used to resolve limbo @@ -250,7 +251,7 @@ class SyncEngine { */ TargetIdGenerator target_id_generator_; - /** Stores user completion blocks, indexed by user and BatchId. */ + /** Stores user completion blocks, indexed by User and BatchId. */ std::unordered_map, auth::HashUser> diff --git a/Firestore/core/src/firebase/firestore/core/sync_engine.mm b/Firestore/core/src/firebase/firestore/core/sync_engine.mm index 58f1f2f645d..05b05cde68d 100644 --- a/Firestore/core/src/firebase/firestore/core/sync_engine.mm +++ b/Firestore/core/src/firebase/firestore/core/sync_engine.mm @@ -29,9 +29,29 @@ #include "Firestore/core/src/firebase/firestore/util/log.h" #include "Firestore/core/src/firebase/firestore/util/status.h" +namespace { + +using firebase::firestore::Error; +using firebase::firestore::model::ListenSequenceNumber; +using firebase::firestore::util::Status; + +// Limbo documents don't use persistence, and are eagerly GC'd. So, listens for +// them don't need real sequence numbers. +const ListenSequenceNumber kIrrelevantSequenceNumber = -1; + +bool ErrorIsInteresting(const Status& error) { + bool missing_index = + (error.code() == Error::FailedPrecondition && + error.error_message().find("requires an index") != std::string::npos); + bool no_permission = (error.code() == Error::PermissionDenied); + return missing_index || no_permission; +} + +} // namespace namespace firebase { namespace firestore { namespace core { + using auth::User; using local::LocalViewChanges; using local::LocalWriteResult; @@ -53,20 +73,16 @@ using util::Status; using util::StatusCallback; -// Limbo documents don't use persistence, and are eagerly GC'd. So, listens for -// them don't need real sequence numbers. -static const ListenSequenceNumber kIrrelevantSequenceNumber = -1; - SyncEngine::SyncEngine(FSTLocalStore* local_store, remote::RemoteStore* remote_store, - const auth::User initial_user) + const auth::User& initial_user) : local_store_(local_store), remote_store_(remote_store), current_user_(initial_user), target_id_generator_(TargetIdGenerator::SyncEngineTargetIdGenerator()) { } -void SyncEngine::AssertCallbackExists(std::string source) { +void SyncEngine::AssertCallbackExists(absl::string_view source) { HARD_ASSERT(sync_engine_callback_, "Tried to call '%s' before callback was registered.", source); } @@ -76,12 +92,16 @@ HARD_ASSERT(query_views_by_query_.find(query) == query_views_by_query_.end(), "We already listen to query: %s", query.ToString()); - QueryData queryData = [local_store_ allocateQuery:query]; - ViewSnapshot viewSnapshot = InitializeViewAndComputeSnapshot(queryData); - sync_engine_callback_->OnViewSnapshots({viewSnapshot}); + QueryData query_data = [local_store_ allocateQuery:query]; + ViewSnapshot view_snapshot = InitializeViewAndComputeSnapshot(query_data); + std::vector snapshots; + // Not using the `std::initializer_list` constructor to avoid extra copies. + snapshots.push_back(std::move(view_snapshot)); + sync_engine_callback_->OnViewSnapshots(std::move(snapshots)); - remote_store_->Listen(queryData); - return queryData.target_id(); + // TODO(wuandy): move `query_data` into `Listen`. + remote_store_->Listen(query_data); + return query_data.target_id(); } ViewSnapshot SyncEngine::InitializeViewAndComputeSnapshot( @@ -99,7 +119,7 @@ auto query_view = std::make_shared(query_data.query(), query_data.target_id(), - query_data.resume_token(), view); + query_data.resume_token(), std::move(view)); query_views_by_query_[query_data.query()] = query_view; query_views_by_target_[query_data.target_id()] = query_view; @@ -142,7 +162,7 @@ LocalWriteResult result = [local_store_ locallyWriteMutations:std::move(mutations)]; mutation_callbacks_[current_user_].insert( - std::make_pair(result.batch_id(), callback)); + std::make_pair(result.batch_id(), std::move(callback))); EmitNewSnapshotsAndNotifyLocalStore(result.changes(), absl::nullopt); remote_store_->FillWritePipeline(); @@ -176,8 +196,9 @@ HARD_ASSERT(retries >= 0, "Got negative number of retries for transaction"); // Allocate a shared_ptr so that the TransactionRunner can outlive this frame. - auto runner = std::make_shared( - worker_queue, remote_store_, update_callback, result_callback); + auto runner = std::make_shared(worker_queue, remote_store_, + std::move(update_callback), + std::move(result_callback)); runner->Run(); } @@ -199,36 +220,37 @@ remote_store_->HandleCredentialChange(); } -void SyncEngine::HandleRemoteEvent(const RemoteEvent& remote_event) { +void SyncEngine::ApplyRemoteEvent(const RemoteEvent& remote_event) { AssertCallbackExists("HandleRemoteEvent"); // Update received document as appropriate for any limbo targets. for (const auto& entry : remote_event.target_changes()) { TargetId target_id = entry.first; const TargetChange& change = entry.second; - const auto iter = limbo_resolutions_by_target_.find(target_id); - if (iter != limbo_resolutions_by_target_.end()) { - LimboResolution& limbo_resolution = iter->second; - // Since this is a limbo resolution lookup, it's for a single document and - // it could be added, modified, or removed, but not a combination. - HARD_ASSERT( - change.added_documents().size() + change.modified_documents().size() + - change.removed_documents().size() <= - 1, - "Limbo resolution for single document contains multiple changes."); - - if (change.added_documents().size() > 0) { - limbo_resolution.document_received = true; - } else if (change.modified_documents().size() > 0) { - HARD_ASSERT(limbo_resolution.document_received, - "Received change for limbo target document without add."); - } else if (change.removed_documents().size() > 0) { - HARD_ASSERT(limbo_resolution.document_received, - "Received remove for limbo target document without add."); - limbo_resolution.document_received = false; - } else { - // This was probably just a CURRENT target change or similar. - } + auto it = limbo_resolutions_by_target_.find(target_id); + if (it == limbo_resolutions_by_target_.end()) { + continue; + } + LimboResolution& limbo_resolution = it->second; + // Since this is a limbo resolution lookup, it's for a single document and + // it could be added, modified, or removed, but not a combination. + HARD_ASSERT( + change.added_documents().size() + change.modified_documents().size() + + change.removed_documents().size() <= + 1, + "Limbo resolution for single document contains multiple changes."); + + if (!change.added_documents().empty()) { + limbo_resolution.document_received = true; + } else if (!change.modified_documents().empty()) { + HARD_ASSERT(limbo_resolution.document_received, + "Received change for limbo target document without add."); + } else if (!change.removed_documents().empty()) { + HARD_ASSERT(limbo_resolution.document_received, + "Received remove for limbo target document without add."); + limbo_resolution.document_received = false; + } else { + // This was probably just a CURRENT target change or similar. } } @@ -239,9 +261,9 @@ void SyncEngine::HandleRejectedListen(TargetId target_id, Status error) { AssertCallbackExists("HandleRejectedListen"); - const auto iter = limbo_resolutions_by_target_.find(target_id); - if (iter != limbo_resolutions_by_target_.end()) { - const DocumentKey limbo_key = iter->second.key; + auto it = limbo_resolutions_by_target_.find(target_id); + if (it != limbo_resolutions_by_target_.end()) { + DocumentKey limbo_key = it->second.key; // Since this query failed, we won't want to manually unlisten to it. // So go ahead and remove it from bookkeeping. limbo_targets_by_key_.erase(limbo_key); @@ -260,7 +282,8 @@ NoDocument doc(limbo_key, SnapshotVersion::None(), /*target_mismatches=*/{}, /*document_updates=*/{{limbo_key, doc}}, std::move(limbo_documents)}; - HandleRemoteEvent(event); + ApplyRemoteEvent(event); + } else { auto found = query_views_by_target_.find(target_id); HARD_ASSERT(found != query_views_by_target_.end(), "Unknown target id: %s", @@ -273,7 +296,7 @@ NoDocument doc(limbo_key, SnapshotVersion::None(), LOG_WARN("Listen for query at %s failed: %s", query.path().CanonicalString(), error.error_message()); } - sync_engine_callback_->OnError(query, error); + sync_engine_callback_->OnError(query, std::move(error)); } } @@ -297,6 +320,7 @@ NoDocument doc(limbo_key, SnapshotVersion::None(), void SyncEngine::HandleRejectedWrite( firebase::firestore::model::BatchId batch_id, Status error) { AssertCallbackExists("HandleRejectedWrite"); + MaybeDocumentMap changes = [local_store_ rejectBatchID:batch_id]; if (!changes.empty() && ErrorIsInteresting(error)) { @@ -309,7 +333,7 @@ NoDocument doc(limbo_key, SnapshotVersion::None(), // events immediately (depending on whether the watcher is caught up), so we // raise user callbacks first so that they consistently happen before listen // events. - NotifyUser(batch_id, error); + NotifyUser(batch_id, std::move(error)); TriggerPendingWriteCallbacks(batch_id); @@ -321,9 +345,9 @@ NoDocument doc(limbo_key, SnapshotVersion::None(), std::vector new_view_snapshot; for (const auto& entry : query_views_by_query_) { - auto query_view = entry.second; + const auto& query_view = entry.second; ViewChange view_change = - query_view->view()->ApplyOnlineStateChange(online_state); + query_view->view().ApplyOnlineStateChange(online_state); HARD_ASSERT(view_change.limbo_changes().empty(), "OnlineState should not affect limbo documents."); if (view_change.snapshot().has_value()) { @@ -335,15 +359,15 @@ NoDocument doc(limbo_key, SnapshotVersion::None(), sync_engine_callback_->HandleOnlineStateChange(online_state); } -DocumentKeySet SyncEngine::GetRemoteKeys(TargetId target_id) { - const auto iter = limbo_resolutions_by_target_.find(target_id); - if (iter != limbo_resolutions_by_target_.end() && - iter->second.document_received) { - return DocumentKeySet{iter->second.key}; +DocumentKeySet SyncEngine::GetRemoteKeys(TargetId target_id) const { + auto it = limbo_resolutions_by_target_.find(target_id); + if (it != limbo_resolutions_by_target_.end() && + it->second.document_received) { + return DocumentKeySet{it->second.key}; } else { auto found = query_views_by_target_.find(target_id); - if (found != query_views_by_target_.end() && found->second) { - return found->second->view()->synced_documents(); + if (found != query_views_by_target_.end()) { + return found->second->view().synced_documents(); } return DocumentKeySet{}; } @@ -354,13 +378,15 @@ NoDocument doc(limbo_key, SnapshotVersion::None(), // NOTE: Mutations restored from persistence won't have callbacks, so // it's okay for this (or the callback below) to not exist. - if (it != mutation_callbacks_.end()) { - std::unordered_map& callbacks = it->second; - auto callback_it = callbacks.find(batch_id); - if (callback_it != callbacks.end()) { - callback_it->second(status); - callbacks.erase(callback_it); - } + if (it == mutation_callbacks_.end()) { + return; + } + + std::unordered_map& callbacks = it->second; + auto callback_it = callbacks.find(batch_id); + if (callback_it != callbacks.end()) { + callback_it->second(std::move(status)); + callbacks.erase(callback_it); } } @@ -386,14 +412,6 @@ NoDocument doc(limbo_key, SnapshotVersion::None(), pending_writes_callbacks_.clear(); } -bool SyncEngine::ErrorIsInteresting(Status error) { - bool missing_index = - (error.code() == Error::FailedPrecondition && - error.error_message().find("requires an index") != std::string::npos); - bool no_permission = (error.code() == Error::PermissionDenied); - return missing_index || no_permission; -} - void SyncEngine::EmitNewSnapshotsAndNotifyLocalStore( const MaybeDocumentMap& changes, const absl::optional& maybe_remote_event) { @@ -401,8 +419,8 @@ NoDocument doc(limbo_key, SnapshotVersion::None(), std::vector document_changes_in_all_views; for (const auto& entry : query_views_by_query_) { - auto query_view = entry.second; - const View& view = *query_view->view(); + const auto& query_view = entry.second; + const View& view = query_view->view(); ViewDocumentChanges view_doc_changes = view.ComputeDocumentChanges(changes); if (view_doc_changes.needs_refill()) { // The query has a limit and some docs were removed/updated, so we need to @@ -422,7 +440,7 @@ NoDocument doc(limbo_key, SnapshotVersion::None(), } } ViewChange view_change = - query_view->view()->ApplyChanges(view_doc_changes, target_changes); + query_view->view().ApplyChanges(view_doc_changes, target_changes); UpdateTrackedLimboDocuments(view_change.limbo_changes(), query_view->target_id()); @@ -469,24 +487,27 @@ NoDocument doc(limbo_key, SnapshotVersion::None(), if (limbo_targets_by_key_.find(key) == limbo_targets_by_key_.end()) { LOG_DEBUG("New document in limbo: %s", key.ToString()); + TargetId limbo_target_id = target_id_generator_.NextId(); Query query(key.path()); QueryData query_data(std::move(query), limbo_target_id, kIrrelevantSequenceNumber, QueryPurpose::LimboResolution); limbo_resolutions_by_target_.emplace(limbo_target_id, LimboResolution{key}); + // TODO(wuandy): move `query_data` into `Listen`. remote_store_->Listen(query_data); limbo_targets_by_key_[key] = limbo_target_id; } } void SyncEngine::RemoveLimboTarget(const DocumentKey& key) { - const auto iter = limbo_targets_by_key_.find(key); - if (iter == limbo_targets_by_key_.end()) { + auto it = limbo_targets_by_key_.find(key); + if (it == limbo_targets_by_key_.end()) { // This target already got removed, because the query failed. return; } - TargetId limbo_target_id = iter->second; + + TargetId limbo_target_id = it->second; remote_store_->StopListening(limbo_target_id); limbo_targets_by_key_.erase(key); limbo_resolutions_by_target_.erase(limbo_target_id); From 116091da3afece54651cc4e66d9e47ff816d66cf Mon Sep 17 00:00:00 2001 From: Wu-Hui Date: Wed, 4 Sep 2019 10:24:29 -0400 Subject: [PATCH 6/7] addressing comments #2 --- .../src/firebase/firestore/core/sync_engine.h | 5 +++- .../firebase/firestore/core/sync_engine.mm | 27 +++++++++++-------- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/Firestore/core/src/firebase/firestore/core/sync_engine.h b/Firestore/core/src/firebase/firestore/core/sync_engine.h index 049188238c5..61969506e63 100644 --- a/Firestore/core/src/firebase/firestore/core/sync_engine.h +++ b/Firestore/core/src/firebase/firestore/core/sync_engine.h @@ -41,6 +41,7 @@ #include "Firestore/core/src/firebase/firestore/model/maybe_document.h" #include "Firestore/core/src/firebase/firestore/remote/remote_store.h" #include "Firestore/core/src/firebase/firestore/util/status.h" +#include "absl/strings/string_view.h" namespace firebase { namespace firestore { @@ -167,7 +168,7 @@ class SyncEngine { * the results that was received. This can be used to indicate where to * continue receiving new doc changes for the query. */ - const nanopb::ByteString resume_token() const { + const nanopb::ByteString& resume_token() const { return resume_token_; } @@ -261,6 +262,8 @@ class SyncEngine { std::unordered_map> pending_writes_callbacks_; + // Shared pointers are used to avoid creating and storing two copies of the + // same `QueryView` and for consistency with other platforms. /** QueryViews for all active queries, indexed by query. */ std::unordered_map> query_views_by_query_; diff --git a/Firestore/core/src/firebase/firestore/core/sync_engine.mm b/Firestore/core/src/firebase/firestore/core/sync_engine.mm index 05b05cde68d..4d03ccd958b 100644 --- a/Firestore/core/src/firebase/firestore/core/sync_engine.mm +++ b/Firestore/core/src/firebase/firestore/core/sync_engine.mm @@ -29,11 +29,15 @@ #include "Firestore/core/src/firebase/firestore/util/log.h" #include "Firestore/core/src/firebase/firestore/util/status.h" +namespace firebase { +namespace firestore { +namespace core { + namespace { -using firebase::firestore::Error; -using firebase::firestore::model::ListenSequenceNumber; -using firebase::firestore::util::Status; +using firestore::Error; +using model::ListenSequenceNumber; +using util::Status; // Limbo documents don't use persistence, and are eagerly GC'd. So, listens for // them don't need real sequence numbers. @@ -48,9 +52,6 @@ bool ErrorIsInteresting(const Status& error) { } } // namespace -namespace firebase { -namespace firestore { -namespace core { using auth::User; using local::LocalViewChanges; @@ -89,6 +90,7 @@ bool ErrorIsInteresting(const Status& error) { TargetId SyncEngine::Listen(Query query) { AssertCallbackExists("Listen"); + HARD_ASSERT(query_views_by_query_.find(query) == query_views_by_query_.end(), "We already listen to query: %s", query.ToString()); @@ -131,6 +133,7 @@ bool ErrorIsInteresting(const Status& error) { void SyncEngine::StopListening(const Query& query) { AssertCallbackExists("StopListening"); + auto query_view = query_views_by_query_[query]; HARD_ASSERT(query_view, "Trying to stop listening to a query not found"); @@ -231,13 +234,15 @@ bool ErrorIsInteresting(const Status& error) { if (it == limbo_resolutions_by_target_.end()) { continue; } + LimboResolution& limbo_resolution = it->second; // Since this is a limbo resolution lookup, it's for a single document and // it could be added, modified, or removed, but not a combination. + auto changed_documents_count = change.added_documents().size() + + change.modified_documents().size() + + change.removed_documents().size(); HARD_ASSERT( - change.added_documents().size() + change.modified_documents().size() + - change.removed_documents().size() <= - 1, + changed_documents_count <= 1, "Limbo resolution for single document contains multiple changes."); if (!change.added_documents().empty()) { @@ -420,7 +425,7 @@ NoDocument doc(limbo_key, SnapshotVersion::None(), for (const auto& entry : query_views_by_query_) { const auto& query_view = entry.second; - const View& view = query_view->view(); + View& view = query_view->view(); ViewDocumentChanges view_doc_changes = view.ComputeDocumentChanges(changes); if (view_doc_changes.needs_refill()) { // The query has a limit and some docs were removed/updated, so we need to @@ -440,7 +445,7 @@ NoDocument doc(limbo_key, SnapshotVersion::None(), } } ViewChange view_change = - query_view->view().ApplyChanges(view_doc_changes, target_changes); + view.ApplyChanges(view_doc_changes, target_changes); UpdateTrackedLimboDocuments(view_change.limbo_changes(), query_view->target_id()); From 3ec0f5ff72d6e915517afa358d52b77d2e1ddee2 Mon Sep 17 00:00:00 2001 From: Wu-Hui Date: Wed, 4 Sep 2019 14:25:51 -0400 Subject: [PATCH 7/7] addressing comments #3 --- .../firebase/firestore/core/sync_engine.mm | 31 +++++++++---------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/Firestore/core/src/firebase/firestore/core/sync_engine.mm b/Firestore/core/src/firebase/firestore/core/sync_engine.mm index 4d03ccd958b..8d9a61c0598 100644 --- a/Firestore/core/src/firebase/firestore/core/sync_engine.mm +++ b/Firestore/core/src/firebase/firestore/core/sync_engine.mm @@ -36,23 +36,6 @@ namespace { using firestore::Error; -using model::ListenSequenceNumber; -using util::Status; - -// Limbo documents don't use persistence, and are eagerly GC'd. So, listens for -// them don't need real sequence numbers. -const ListenSequenceNumber kIrrelevantSequenceNumber = -1; - -bool ErrorIsInteresting(const Status& error) { - bool missing_index = - (error.code() == Error::FailedPrecondition && - error.error_message().find("requires an index") != std::string::npos); - bool no_permission = (error.code() == Error::PermissionDenied); - return missing_index || no_permission; -} - -} // namespace - using auth::User; using local::LocalViewChanges; using local::LocalWriteResult; @@ -74,6 +57,20 @@ bool ErrorIsInteresting(const Status& error) { using util::Status; using util::StatusCallback; +// Limbo documents don't use persistence, and are eagerly GC'd. So, listens for +// them don't need real sequence numbers. +const ListenSequenceNumber kIrrelevantSequenceNumber = -1; + +bool ErrorIsInteresting(const Status& error) { + bool missing_index = + (error.code() == Error::FailedPrecondition && + error.error_message().find("requires an index") != std::string::npos); + bool no_permission = (error.code() == Error::PermissionDenied); + return missing_index || no_permission; +} + +} // namespace + SyncEngine::SyncEngine(FSTLocalStore* local_store, remote::RemoteStore* remote_store, const auth::User& initial_user)