1818import io .dapr .client .domain .ConfigurationItem ;
1919import io .dapr .client .domain .GetConfigurationRequest ;
2020import io .dapr .client .domain .SubscribeConfigurationRequest ;
21+ import io .dapr .client .domain .SubscribeConfigurationResponse ;
22+ import io .dapr .client .domain .UnsubscribeConfigurationResponse ;
2123import reactor .core .Disposable ;
2224import reactor .core .publisher .Flux ;
2325import reactor .core .publisher .Mono ;
2426
2527import java .io .IOException ;
2628import java .util .ArrayList ;
2729import java .util .Arrays ;
30+ import java .util .Collections ;
2831import java .util .List ;
2932import java .util .Map ;
3033import java .util .concurrent .atomic .AtomicReference ;
@@ -43,41 +46,8 @@ public class ConfigurationClient {
4346 public static void main (String [] args ) throws Exception {
4447 try (DaprPreviewClient client = (new DaprClientBuilder ()).buildPreviewClient ()) {
4548 System .out .println ("Using preview client..." );
46- getConfigurationForaSingleKey (client );
47- getConfigurationsUsingVarargs (client );
4849 getConfigurations (client );
49- subscribeConfigurationRequestWithSubscribe (client );
50- }
51- }
52-
53- /**
54- * Gets configuration for a single key.
55- *
56- * @param client DaprPreviewClient object
57- */
58- public static void getConfigurationForaSingleKey (DaprPreviewClient client ) {
59- System .out .println ("*******trying to retrieve configuration given a single key********" );
60- try {
61- Mono <ConfigurationItem > item = client .getConfiguration (CONFIG_STORE_NAME , keys .get (0 ));
62- System .out .println ("Value ->" + item .block ().getValue () + " key ->" + item .block ().getKey ());
63- } catch (Exception ex ) {
64- System .out .println (ex .getMessage ());
65- }
66- }
67-
68- /**
69- * Gets configurations for varibale no. of arguments.
70- *
71- * @param client DaprPreviewClient object
72- */
73- public static void getConfigurationsUsingVarargs (DaprPreviewClient client ) {
74- System .out .println ("*******trying to retrieve configurations for a variable no. of keys********" );
75- try {
76- Mono <Map <String , ConfigurationItem >> items =
77- client .getConfiguration (CONFIG_STORE_NAME , "myconfig1" , "myconfig3" );
78- items .block ().forEach ((k ,v ) -> print (v , k ));
79- } catch (Exception ex ) {
80- System .out .println (ex .getMessage ());
50+ subscribeConfigurationRequest (client );
8151 }
8252 }
8353
@@ -106,36 +76,25 @@ public static void getConfigurations(DaprPreviewClient client) {
10676 *
10777 * @param client DaprPreviewClient object
10878 */
109- public static void subscribeConfigurationRequestWithSubscribe (DaprPreviewClient client ) {
110- System .out .println ("*****Subscribing to keys using subscribe method: " + keys .toString () + " *****" );
111- AtomicReference <Disposable > disposableAtomicReference = new AtomicReference <>();
112- SubscribeConfigurationRequest req = new SubscribeConfigurationRequest (CONFIG_STORE_NAME , keys );
79+ public static void subscribeConfigurationRequest (DaprPreviewClient client ) {
80+ System .out .println ("Subscribing to key: myconfig1" );
81+ SubscribeConfigurationRequest req = new SubscribeConfigurationRequest (
82+ CONFIG_STORE_NAME , Collections .singletonList ("myconfig1" ));
83+ Flux <SubscribeConfigurationResponse > outFlux = client .subscribeConfiguration (req );
11384 Runnable subscribeTask = () -> {
114- Flux <Map <String , ConfigurationItem >> outFlux = client .subscribeToConfiguration (req );
115- disposableAtomicReference .set (outFlux
116- .subscribe (
117- cis -> cis .forEach ((k ,v ) -> print (v , k ))
118- ));
85+ outFlux .subscribe (cis -> {
86+ System .out .println ("subscription ID : " + cis .getSubscriptionId ());
87+ System .out .println ("subscribing to key myconfig1 is successful" );
88+ });
11989 };
12090 new Thread (subscribeTask ).start ();
91+ // To ensure main thread does not die before outFlux subscribe gets called
92+ inducingSleepTime (5000 );
93+ }
94+
95+ private static void inducingSleepTime (int timeInMillis ) {
12196 try {
122- // To ensure that subscribeThread gets scheduled
123- Thread .sleep (0 );
124- } catch (InterruptedException e ) {
125- e .printStackTrace ();
126- }
127- Runnable updateKeys = () -> {
128- int i = 1 ;
129- while (i <= 3 ) {
130- executeDockerCommand (i );
131- i ++;
132- }
133- };
134- new Thread (updateKeys ).start ();
135- try {
136- // To ensure main thread does not die before outFlux subscribe gets called
137- Thread .sleep (10000 );
138- disposableAtomicReference .get ().dispose ();
97+ Thread .sleep (timeInMillis );
13998 } catch (InterruptedException e ) {
14099 e .printStackTrace ();
141100 }
@@ -144,22 +103,4 @@ public static void subscribeConfigurationRequestWithSubscribe(DaprPreviewClient
144103 private static void print (ConfigurationItem item , String key ) {
145104 System .out .println (item .getValue () + " : key ->" + key );
146105 }
147-
148- private static void executeDockerCommand (int postfix ) {
149- String [] command = new String [] {
150- "docker" , "exec" , "dapr_redis" , "redis-cli" ,
151- "SET" ,
152- "myconfig" + postfix , "update_myconfigvalue" + postfix + "||2"
153- };
154- ProcessBuilder processBuilder = new ProcessBuilder (command );
155- Process process = null ;
156- try {
157- process = processBuilder .start ();
158- process .waitFor ();
159- } catch (IOException e ) {
160- e .printStackTrace ();
161- } catch (InterruptedException e ) {
162- e .printStackTrace ();
163- }
164- }
165106}
0 commit comments