Skip to content

Commit fade6ea

Browse files
authored
Merge c9d70a3 into d7a56a1
2 parents d7a56a1 + c9d70a3 commit fade6ea

File tree

5 files changed

+90
-103
lines changed

5 files changed

+90
-103
lines changed

firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectCredentialsTokenManager.kt

+42-53
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import kotlinx.coroutines.ensureActive
4747
import kotlinx.coroutines.flow.MutableStateFlow
4848
import kotlinx.coroutines.flow.filter
4949
import kotlinx.coroutines.flow.first
50+
import kotlinx.coroutines.flow.getAndUpdate
5051
import kotlinx.coroutines.launch
5152

5253
/** Base class that shares logic for managing the Auth token and AppCheck token. */
@@ -148,9 +149,18 @@ internal sealed class DataConnectCredentialsTokenManager<T : Any>(
148149
*/
149150
fun close() {
150151
logger.debug { "close()" }
152+
151153
weakThis.clear()
152154
coroutineScope.cancel()
153-
setClosedState()
155+
156+
val oldState = state.getAndUpdate { State.Closed }
157+
when (oldState) {
158+
is State.Closed -> {}
159+
is State.New -> {}
160+
is State.StateWithProvider -> {
161+
removeTokenListener(oldState.provider)
162+
}
163+
}
154164
}
155165

156166
/**
@@ -175,51 +185,30 @@ internal sealed class DataConnectCredentialsTokenManager<T : Any>(
175185
logger.debug { "awaitTokenProvider() done: currentState=$currentState" }
176186
}
177187

178-
// This function must ONLY be called from close().
179-
private fun setClosedState() {
180-
while (true) {
181-
val oldState = state.value
182-
val provider: T? =
183-
when (oldState) {
184-
is State.Closed -> return
185-
is State.New -> null
186-
is State.Idle -> oldState.provider
187-
is State.Active -> oldState.provider
188-
}
189-
190-
if (state.compareAndSet(oldState, State.Closed)) {
191-
provider?.let { removeTokenListener(it) }
192-
break
193-
}
194-
}
195-
}
196-
197188
/**
198189
* Sets a flag to force-refresh the token upon the next call to [getToken].
199190
*
200191
* If [close] has been called, this method does nothing.
201192
*/
202193
fun forceRefresh() {
203194
logger.debug { "forceRefresh()" }
204-
while (true) {
205-
val oldState = state.value
206-
val newState: State.StateWithForceTokenRefresh<T> =
195+
val oldState =
196+
state.getAndUpdate { oldState ->
207197
when (oldState) {
208-
is State.Closed -> return
198+
is State.Closed -> State.Closed
209199
is State.New -> oldState.copy(forceTokenRefresh = true)
210200
is State.Idle -> oldState.copy(forceTokenRefresh = true)
211-
is State.Active -> {
212-
val message = "needs token refresh (wgrwbrvjxt)"
213-
oldState.job.cancel(message, ForceRefresh(message))
214-
State.Idle(oldState.provider, forceTokenRefresh = true)
215-
}
201+
is State.Active -> State.Idle(oldState.provider, forceTokenRefresh = true)
216202
}
217-
218-
check(newState.forceTokenRefresh) {
219-
"newState.forceTokenRefresh should be true (error code gnvr2wx7nz)"
220203
}
221-
if (state.compareAndSet(oldState, newState)) {
222-
break
204+
205+
when (oldState) {
206+
is State.Closed -> {}
207+
is State.New -> {}
208+
is State.Idle -> {}
209+
is State.Active -> {
210+
val message = "needs token refresh (wgrwbrvjxt)"
211+
oldState.job.cancel(message, ForceRefresh(message))
223212
}
224213
}
225214
}
@@ -350,30 +339,30 @@ internal sealed class DataConnectCredentialsTokenManager<T : Any>(
350339
logger.debug { "onProviderAvailable(newProvider=$newProvider)" }
351340
addTokenListener(newProvider)
352341

353-
while (true) {
354-
val oldState = state.value
355-
val newState =
342+
val oldState =
343+
state.getAndUpdate { oldState ->
356344
when (oldState) {
357-
is State.Closed -> {
358-
logger.debug {
359-
"onProviderAvailable(newProvider=$newProvider)" +
360-
" unregistering token listener that was just added"
361-
}
362-
removeTokenListener(newProvider)
363-
break
364-
}
345+
is State.Closed -> State.Closed
365346
is State.New -> State.Idle(newProvider, oldState.forceTokenRefresh)
366347
is State.Idle -> State.Idle(newProvider, oldState.forceTokenRefresh)
367-
is State.Active -> {
368-
val newProviderClassName = newProvider::class.qualifiedName
369-
val message = "a new provider $newProviderClassName is available (symhxtmazy)"
370-
oldState.job.cancel(message, NewProvider(message))
371-
State.Idle(newProvider, forceTokenRefresh = false)
372-
}
348+
is State.Active -> State.Idle(newProvider, forceTokenRefresh = false)
373349
}
350+
}
374351

375-
if (state.compareAndSet(oldState, newState)) {
376-
break
352+
when (oldState) {
353+
is State.Closed -> {
354+
logger.debug {
355+
"onProviderAvailable(newProvider=$newProvider)" +
356+
" unregistering token listener that was just added"
357+
}
358+
removeTokenListener(newProvider)
359+
}
360+
is State.New -> {}
361+
is State.Idle -> {}
362+
is State.Active -> {
363+
val newProviderClassName = newProvider::class.qualifiedName
364+
val message = "a new provider $newProviderClassName is available (symhxtmazy)"
365+
oldState.job.cancel(message, NewProvider(message))
377366
}
378367
}
379368
}

firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt

+29-22
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ import kotlinx.coroutines.async
5454
import kotlinx.coroutines.cancel
5555
import kotlinx.coroutines.flow.MutableStateFlow
5656
import kotlinx.coroutines.flow.collect
57+
import kotlinx.coroutines.flow.updateAndGet
5758
import kotlinx.coroutines.runBlocking
5859
import kotlinx.coroutines.sync.Mutex
5960
import kotlinx.coroutines.sync.withLock
@@ -406,34 +407,40 @@ internal class FirebaseDataConnectImpl(
406407
dataConnectAuth.close()
407408
dataConnectAppCheck.close()
408409

409-
// Start the job to asynchronously close the gRPC client.
410-
while (true) {
411-
val oldCloseJob = closeJob.value
412-
413-
oldCloseJob.ref?.let {
414-
if (!it.isCancelled) {
415-
return it
416-
}
410+
// Create the "close job" to asynchronously close the gRPC client.
411+
@OptIn(DelicateCoroutinesApi::class)
412+
val newCloseJob =
413+
GlobalScope.async<Unit>(start = CoroutineStart.LAZY) {
414+
lazyGrpcRPCs.initializedValueOrNull?.close()
417415
}
416+
newCloseJob.invokeOnCompletion { exception ->
417+
if (exception === null) {
418+
logger.debug { "close() completed successfully" }
419+
} else {
420+
logger.warn(exception) { "close() failed" }
421+
}
422+
}
418423

419-
@OptIn(DelicateCoroutinesApi::class)
420-
val newCloseJob =
421-
GlobalScope.async<Unit>(start = CoroutineStart.LAZY) {
422-
lazyGrpcRPCs.initializedValueOrNull?.close()
423-
}
424-
425-
newCloseJob.invokeOnCompletion { exception ->
426-
if (exception === null) {
427-
logger.debug { "close() completed successfully" }
424+
// Register the new "close job", unless there is a "close job" already in progress or one that
425+
// completed successfully.
426+
val updatedCloseJob =
427+
closeJob.updateAndGet { oldCloseJob ->
428+
if (oldCloseJob.ref !== null && !oldCloseJob.ref.isCancelled) {
429+
oldCloseJob
428430
} else {
429-
logger.warn(exception) { "close() failed" }
431+
NullableReference(newCloseJob)
430432
}
431433
}
432434

433-
if (closeJob.compareAndSet(oldCloseJob, NullableReference(newCloseJob))) {
434-
newCloseJob.start()
435-
return newCloseJob
436-
}
435+
// If the updated "close job" was the one that we created, then start it!
436+
if (updatedCloseJob.ref === newCloseJob) {
437+
newCloseJob.start()
438+
}
439+
440+
// Return the job "close job" that is active or already completed so that the caller can await
441+
// its result.
442+
return checkNotNull(updatedCloseJob.ref) {
443+
"updatedCloseJob.ref should not have been null (error code y5fk4ntdnd)"
437444
}
438445
}
439446

firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/QuerySubscriptionImpl.kt

+10-14
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import kotlinx.coroutines.cancelAndJoin
2727
import kotlinx.coroutines.flow.Flow
2828
import kotlinx.coroutines.flow.MutableStateFlow
2929
import kotlinx.coroutines.flow.channelFlow
30+
import kotlinx.coroutines.flow.update
3031
import kotlinx.coroutines.launch
3132

3233
internal class QuerySubscriptionImpl<Data, Variables>(query: QueryRefImpl<Data, Variables>) :
@@ -80,22 +81,17 @@ internal class QuerySubscriptionImpl<Data, Variables>(query: QueryRefImpl<Data,
8081
}
8182

8283
private fun updateLastResult(prospectiveLastResult: QuerySubscriptionResultImpl) {
83-
// Update the last result in a compare-and-swap loop so that there is no possibility of
84-
// clobbering a newer result with an older result, compared using their sequence numbers.
8584
// TODO: Fix this so that results from an old query do not clobber results from a new query,
8685
// as set by a call to update()
87-
while (true) {
88-
val currentLastResult = _lastResult.value
89-
if (currentLastResult.ref != null) {
90-
val currentSequenceNumber = currentLastResult.ref.sequencedResult.sequenceNumber
91-
val prospectiveSequenceNumber = prospectiveLastResult.sequencedResult.sequenceNumber
92-
if (currentSequenceNumber >= prospectiveSequenceNumber) {
93-
return
94-
}
95-
}
96-
97-
if (_lastResult.compareAndSet(currentLastResult, NullableReference(prospectiveLastResult))) {
98-
return
86+
_lastResult.update { currentLastResult ->
87+
if (
88+
currentLastResult.ref != null &&
89+
currentLastResult.ref.sequencedResult.sequenceNumber >=
90+
prospectiveLastResult.sequencedResult.sequenceNumber
91+
) {
92+
currentLastResult
93+
} else {
94+
NullableReference(prospectiveLastResult)
9995
}
10096
}
10197
}

firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/querymgr/RegisteredDataDeserialzer.kt

+5-7
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import kotlinx.coroutines.channels.BufferOverflow
3131
import kotlinx.coroutines.flow.MutableSharedFlow
3232
import kotlinx.coroutines.flow.MutableStateFlow
3333
import kotlinx.coroutines.flow.onSubscription
34+
import kotlinx.coroutines.flow.update
3435
import kotlinx.coroutines.withContext
3536
import kotlinx.serialization.DeserializationStrategy
3637
import kotlinx.serialization.modules.SerializersModule
@@ -84,17 +85,14 @@ internal class RegisteredDataDeserializer<T>(
8485
lazyDeserialize(requestId, sequencedResult)
8586
)
8687

87-
// Use a compare-and-swap ("CAS") loop to ensure that an old update never clobbers a newer one.
88-
while (true) {
89-
val currentUpdate = latestUpdate.value
88+
latestUpdate.update { currentUpdate ->
9089
if (
9190
currentUpdate.ref !== null &&
9291
currentUpdate.ref.sequenceNumber > sequencedResult.sequenceNumber
9392
) {
94-
break // don't clobber a newer update with an older one
95-
}
96-
if (latestUpdate.compareAndSet(currentUpdate, NullableReference(newUpdate))) {
97-
break
93+
currentUpdate // don't clobber a newer update with an older one
94+
} else {
95+
NullableReference(newUpdate)
9896
}
9997
}
10098

firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/SuspendingCountDownLatch.kt

+4-7
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package com.google.firebase.dataconnect.testutil
1919
import kotlinx.coroutines.flow.MutableStateFlow
2020
import kotlinx.coroutines.flow.filter
2121
import kotlinx.coroutines.flow.first
22+
import kotlinx.coroutines.flow.update
2223

2324
/**
2425
* An implementation of [java.util.concurrent.CountDownLatch] that suspends instead of blocking.
@@ -60,14 +61,10 @@ class SuspendingCountDownLatch(count: Int) {
6061
* @throws IllegalStateException if called when the count has already reached zero.
6162
*/
6263
fun countDown(): SuspendingCountDownLatch {
63-
while (true) {
64-
val oldValue = _count.value
64+
_count.update { oldValue ->
6565
check(oldValue > 0) { "countDown() called too many times (oldValue=$oldValue)" }
66-
67-
val newValue = oldValue - 1
68-
if (_count.compareAndSet(oldValue, newValue)) {
69-
return this
70-
}
66+
oldValue - 1
7167
}
68+
return this
7269
}
7370
}

0 commit comments

Comments
 (0)