diff --git a/core/src/main/java/com/google/adk/models/Gemini.java b/core/src/main/java/com/google/adk/models/Gemini.java
index 74cf78b9..7aaaa27d 100644
--- a/core/src/main/java/com/google/adk/models/Gemini.java
+++ b/core/src/main/java/com/google/adk/models/Gemini.java
@@ -17,9 +17,12 @@ package com.google.adk.models;
 
 import static com.google.common.base.StandardSystemProperty.JAVA_VERSION;
+import static net.javacrumbs.futureconverter.java8guava.FutureConverter.toListenableFuture;
 
 import com.google.adk.Version;
+import com.google.async.rxjava3.Singles;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.errorprone.annotations.CanIgnoreReturnValue;
 import com.google.genai.Client;
 import com.google.genai.ResponseStream;
@@ -32,11 +35,11 @@ import com.google.genai.types.LiveConnectConfig;
 import com.google.genai.types.Part;
 import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.schedulers.Schedulers;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -218,14 +221,17 @@ public Flowable<LlmResponse> generateContent(LlmRequest llmRequest, boolean stre
 
     if (stream) {
       logger.debug("Sending streaming generateContent request to model {}", effectiveModelName);
-      CompletableFuture<ResponseStream<GenerateContentResponse>> streamFuture =
-          apiClient.async.models.generateContentStream(
-              effectiveModelName, llmRequest.contents(), config);
+      ListenableFuture<ResponseStream<GenerateContentResponse>> streamFuture =
+          toListenableFuture(
+              apiClient.async.models.generateContentStream(
+                  effectiveModelName, llmRequest.contents(), config));
 
       return Flowable.defer(
               () ->
                   processRawResponses(
-                      Flowable.fromFuture(streamFuture).flatMapIterable(iterable -> iterable)))
+                      Singles.toSingle(() -> streamFuture, Schedulers.io())
+                          .toFlowable()
+                          .flatMapIterable(iterable -> iterable)))
           .filter(
               llmResponse ->
                   llmResponse
@@ -243,12 +249,17 @@ public Flowable<LlmResponse> generateContent(LlmRequest llmRequest, boolean stre
                       .orElse(false));
     } else {
       logger.debug("Sending generateContent request to model {}", effectiveModelName);
-      return Flowable.fromFuture(
-          apiClient
-              .async
-              .models
-              .generateContent(effectiveModelName, llmRequest.contents(), config)
-              .thenApplyAsync(LlmResponse::create));
+      final LlmRequest finalLlmRequest = llmRequest;
+      return Singles.toSingle(
+              () ->
+                  toListenableFuture(
+                      apiClient
+                          .async
+                          .models
+                          .generateContent(effectiveModelName, finalLlmRequest.contents(), config)
+                          .thenApplyAsync(LlmResponse::create)),
+              Schedulers.io())
+          .toFlowable();
     }
   }