-
Notifications
You must be signed in to change notification settings - Fork 204
/
Copy pathclient.js
1180 lines (1072 loc) · 49.9 KB
/
client.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';
const events = require('events');
const util = require('util');
const utils = require('./utils.js');
const errors = require('./errors.js');
const types = require('./types');
const { ProfileManager } = require('./execution-profile');
const requests = require('./requests');
const clientOptions = require('./client-options');
const ClientState = require('./metadata/client-state');
const description = require('../package.json').description;
const { version } = require('../package.json');
const { DefaultExecutionOptions } = require('./execution-options');
const ControlConnection = require('./control-connection');
const RequestHandler = require('./request-handler');
const PrepareHandler = require('./prepare-handler');
const InsightsClient = require('./insights-client');
const cloud = require('./datastax/cloud');
const GraphExecutor = require('./datastax/graph/graph-executor');
const promiseUtils = require('./promise-utils');
/**
* Max amount of pools being warmup in parallel, when warmup is enabled
* @private
*/
const warmupLimit = 32;
/**
* Client options.
* <p>While the driver provides lots of extensibility points and configurability, few client options are required.</p>
* <p>Default values for all settings are designed to be suitable for the majority of use cases, you should avoid
* fine tuning it when not needed.</p>
* <p>See [Client constructor]{@link Client} documentation for recommended options.</p>
* @typedef {Object} ClientOptions
* @property {Array.<string>} contactPoints
* Array of addresses or host names of the nodes to add as contact points.
* <p>
* Contact points are addresses of Cassandra nodes that the driver uses to discover the cluster topology.
* </p>
* <p>
* Only one contact point is required (the driver will retrieve the address of the other nodes automatically),
* but it is usually a good idea to provide more than one contact point, because if that single contact point is
* unavailable, the driver will not be able to initialize correctly.
* </p>
* @property {String} [localDataCenter] The local data center to use.
* <p>
* If using DCAwareRoundRobinPolicy (default), this option is required and only hosts from this data center are
* connected to and used in query plans.
* </p>
* @property {String} [keyspace] The logged keyspace for all the connections created within the {@link Client} instance.
* @property {Object} [credentials] An object containing the username and password for plain-text authentication.
* It configures the authentication provider to be used against Apache Cassandra's PasswordAuthenticator or DSE's
* DseAuthenticator, when default auth scheme is plain-text.
* <p>
* Note that you should configure either <code>credentials</code> or <code>authProvider</code> to connect to an
* auth-enabled cluster, but not both.
* </p>
* @property {String} [credentials.username] The username to use for plain-text authentication.
* @property {String} [credentials.password] The password to use for plain-text authentication.
* @property {Uuid} [id] A unique identifier assigned to a {@link Client} object, that will be communicated to the
* server (DSE 6.0+) to identify the client instance created with this options. When not defined, the driver will
* generate a random identifier.
* @property {String} [applicationName] An optional setting identifying the name of the application using
* the {@link Client} instance.
* <p>This value is passed to DSE and is useful as metadata for describing a client connection on the server side.</p>
* @property {String} [applicationVersion] An optional setting identifying the version of the application using
* the {@link Client} instance.
* <p>This value is passed to DSE and is useful as metadata for describing a client connection on the server side.</p>
* @property {Object} [monitorReporting] Options for reporting mechanism from the client to the DSE server, for
* versions that support it.
* @property {Boolean} [monitorReporting.enabled=true] Determines whether the reporting mechanism is enabled.
* Defaults to <code>true</code>.
* @property {Object} [cloud] The options to connect to a cloud instance.
* @property {String|URL} cloud.secureConnectBundle Determines the file path for the credentials file bundle.
* @property {Number} [refreshSchemaDelay] The default window size in milliseconds used to debounce node list and schema
* refresh metadata requests. Default: 1000.
* @property {Boolean} [isMetadataSyncEnabled] Determines whether client-side schema metadata retrieval and update is
* enabled.
* <p>Setting this value to <code>false</code> will cause keyspace information not to be automatically loaded, affecting
* replica calculation per token in the different keyspaces. When disabling metadata synchronization, use
* [Metadata.refreshKeyspaces()]{@link module:metadata~Metadata#refreshKeyspaces} to keep keyspace information up to
* date or token-awareness will not work correctly.</p>
* Default: <code>true</code>.
* @property {Boolean} [prepareOnAllHosts] Determines if the driver should prepare queries on all hosts in the cluster.
* Default: <code>true</code>.
* @property {Boolean} [rePrepareOnUp] Determines if the driver should re-prepare all cached prepared queries on a
* host when it marks it back up.
* Default: <code>true</code>.
* @property {Number} [maxPrepared] Determines the maximum amount of different prepared queries before evicting items
* from the internal cache. Reaching a high threshold hints that the queries are not being reused, like when
* hard-coding parameter values inside the queries.
* Default: <code>500</code>.
* @property {Object} [policies]
* @property {LoadBalancingPolicy} [policies.loadBalancing] The load balancing policy instance to be used to determine
* the coordinator per query.
* @property {RetryPolicy} [policies.retry] The retry policy.
* @property {ReconnectionPolicy} [policies.reconnection] The reconnection policy to be used.
* @property {AddressTranslator} [policies.addressResolution] The address resolution policy.
* @property {SpeculativeExecutionPolicy} [policies.speculativeExecution] The <code>SpeculativeExecutionPolicy</code>
* instance to be used to determine if the client should send speculative queries when the selected host takes more
* time than expected.
* <p>
* Default: <code>[NoSpeculativeExecutionPolicy]{@link
* module:policies/speculativeExecution~NoSpeculativeExecutionPolicy}</code>
* </p>
* @property {TimestampGenerator} [policies.timestampGeneration] The client-side
* [query timestamp generator]{@link module:policies/timestampGeneration~TimestampGenerator}.
* <p>
* Default: <code>[MonotonicTimestampGenerator]{@link module:policies/timestampGeneration~MonotonicTimestampGenerator}
* </code>
* </p>
* <p>Use <code>null</code> to disable client-side timestamp generation.</p>
* @property {QueryOptions} [queryOptions] Default options for all queries.
* @property {Object} [pooling] Pooling options.
* @property {Number} [pooling.heartBeatInterval] The amount of idle time in milliseconds that has to pass before the
* driver issues a request on an active connection to avoid idle time disconnections. Default: 30000.
* @property {Object} [pooling.coreConnectionsPerHost] Associative array containing amount of connections per host
* distance.
* @property {Number} [pooling.maxRequestsPerConnection] The maximum number of requests per connection. The default
* value is:
* <ul>
* <li>For modern protocol versions (v3 and above): 2048</li>
* <li>For older protocol versions (v1 and v2): 128</li>
* </ul>
* @property {Boolean} [pooling.warmup] Determines if all connections to hosts in the local datacenter must be opened on
* connect. Default: true.
* @property {Object} [protocolOptions]
* @property {Number} [protocolOptions.port] The port to use to connect to the Cassandra host. If not set through this
* method, the default port (9042) will be used instead.
* @property {Number} [protocolOptions.maxSchemaAgreementWaitSeconds] The maximum time in seconds to wait for schema
* agreement between nodes before returning from a DDL query. Default: 10.
* @property {Number} [protocolOptions.maxVersion] When set, it limits the maximum protocol version used to connect to
* the nodes.
* Useful for using the driver against a cluster that contains nodes with different major/minor versions of Cassandra.
* @property {Boolean} [protocolOptions.noCompact] When set to true, enables the NO_COMPACT startup option.
* <p>
* When this option is supplied <code>SELECT</code>, <code>UPDATE</code>, <code>DELETE</code>, and <code>BATCH</code>
* statements on <code>COMPACT STORAGE</code> tables function in "compatibility" mode which allows seeing these tables
* as if they were "regular" CQL tables.
* </p>
* <p>
* This option only effects interactions with interactions with tables using <code>COMPACT STORAGE</code> and is only
* supported by C* 3.0.16+, 3.11.2+, 4.0+ and DSE 6.0+.
* </p>
* @property {Object} [socketOptions]
* @property {Number} [socketOptions.connectTimeout] Connection timeout in milliseconds. Default: 5000.
* @property {Number} [socketOptions.defunctReadTimeoutThreshold] Determines the amount of requests that simultaneously
* have to timeout before closing the connection. Default: 64.
* @property {Boolean} [socketOptions.keepAlive] Whether to enable TCP keep-alive on the socket. Default: true.
* @property {Number} [socketOptions.keepAliveDelay] TCP keep-alive delay in milliseconds. Default: 0.
* @property {Number} [socketOptions.readTimeout] Per-host read timeout in milliseconds.
* <p>
* Please note that this is not the maximum time a call to {@link Client#execute} may have to wait;
* this is the maximum time that call will wait for one particular Cassandra host, but other hosts will be tried if
* one of them timeout. In other words, a {@link Client#execute} call may theoretically wait up to
* <code>readTimeout * number_of_cassandra_hosts</code> (though the total number of hosts tried for a given query also
* depends on the LoadBalancingPolicy in use).
* <p>When setting this value, keep in mind the following:</p>
* <ul>
* <li>the timeout settings used on the Cassandra side (*_request_timeout_in_ms in cassandra.yaml) should be taken
* into account when picking a value for this read timeout. You should pick a value a couple of seconds greater than
* the Cassandra timeout settings.
* </li>
* <li>
* the read timeout is only approximate and only control the timeout to one Cassandra host, not the full query.
* </li>
* </ul>
* Setting a value of 0 disables read timeouts. Default: <code>12000</code>.
* @property {Boolean} [socketOptions.tcpNoDelay] When set to true, it disables the Nagle algorithm. Default: true.
* @property {Number} [socketOptions.coalescingThreshold] Buffer length in bytes use by the write queue before flushing
* the frames. Default: 8000.
* @property {AuthProvider} [authProvider] Provider to be used to authenticate to an auth-enabled cluster.
* @property {RequestTracker} [requestTracker] The instance of RequestTracker used to monitor or log requests executed
* with this instance.
* @property {Object} [sslOptions] Client-to-node ssl options. When set the driver will use the secure layer.
* You can specify cert, ca, ... options named after the Node.js <code>tls.connect()</code> options.
* <p>
* It uses the same default values as Node.js <code>tls.connect()</code> except for <code>rejectUnauthorized</code>
* which is set to <code>false</code> by default (for historical reasons). This setting is likely to change
* in upcoming versions to enable validation by default.
* </p>
* @property {Object} [encoding] Encoding options.
* @property {Function} [encoding.map] Map constructor to use for Cassandra map<k,v> type encoding and decoding.
* If not set, it will default to Javascript Object with map keys as property names.
* @property {Function} [encoding.set] Set constructor to use for Cassandra set<k> type encoding and decoding.
* If not set, it will default to Javascript Array.
* @property {Boolean} [encoding.copyBuffer] Determines if the network buffer should be copied for buffer based data
* types (blob, uuid, timeuuid and inet).
* <p>
* Setting it to true will cause that the network buffer is copied for each row value of those types,
* causing additional allocations but freeing the network buffer to be reused.
* Setting it to true is a good choice for cases where the Row and ResultSet returned by the queries are long-lived
* objects.
* </p>
* <p>
* Setting it to false will cause less overhead and the reference of the network buffer to be maintained until the row
* / result set are de-referenced.
* Default: true.
* </p>
* @property {Boolean} [encoding.useUndefinedAsUnset] Valid for Cassandra 2.2 and above. Determines that, if a parameter
* is set to
* <code>undefined</code> it should be encoded as <code>unset</code>.
* <p>
* By default, ECMAScript <code>undefined</code> is encoded as <code>null</code> in the driver. Cassandra 2.2
* introduced the concept of unset.
* At driver level, you can set a parameter to unset using the field <code>types.unset</code>. Setting this flag to
* true allows you to use ECMAScript undefined as Cassandra <code>unset</code>.
* </p>
* <p>
* Default: true.
* </p>
* @property {Boolean} [encoding.useBigIntAsLong] Use [BigInt ECMAScript type](https://tc39.github.io/proposal-bigint/)
* to represent CQL bigint and counter data types.
* @property {Boolean} [encoding.useBigIntAsVarint] Use [BigInt ECMAScript
* type](https://tc39.github.io/proposal-bigint/) to represent CQL varint data type.
* @property {Array.<ExecutionProfile>} [profiles] The array of [execution profiles]{@link ExecutionProfile}.
* @property {Function} [promiseFactory] Function to be used to create a <code>Promise</code> from a
* callback-style function.
* <p>
* Promise libraries often provide different methods to create a promise. For example, you can use Bluebird's
* <code>Promise.fromCallback()</code> method.
* </p>
* <p>
* By default, the driver will use the
* [Promise constructor]{@link https://developer.mozilla.org/en/docs/Web/JavaScript/Reference/Global_Objects/Promise}.
* </p>
*/
/**
* Query options
* @typedef {Object} QueryOptions
* @property {Boolean} [autoPage] Determines if the driver must retrieve the following result pages automatically.
* <p>
* This setting is only considered by the [Client#eachRow()]{@link Client#eachRow} method. For more information,
* check the
* [paging results documentation]{@link https://docs.datastax.com/en/developer/nodejs-driver/latest/features/paging/}.
* </p>
* @property {Boolean} [captureStackTrace] Determines if the stack trace before the query execution should be
* maintained.
* <p>
* Useful for debugging purposes, it should be set to <code>false</code> under production environment as it adds an
* unnecessary overhead to each execution.
* </p>
* Default: false.
* @property {Number} [consistency] [Consistency level]{@link module:types~consistencies}.
* <p>
* Defaults to <code>localOne</code> for Apache Cassandra and DSE deployments.
* For DataStax Astra, it defaults to <code>localQuorum</code>.
* </p>
* @property {Object} [customPayload] Key-value payload to be passed to the server. On the Cassandra side,
* implementations of QueryHandler can use this data.
* @property {String} [executeAs] The user or role name to act as when executing this statement.
* <p>When set, it executes as a different user/role than the one currently authenticated (a.k.a. proxy execution).</p>
* <p>This feature is only available in DSE 5.1+.</p>
* @property {String|ExecutionProfile} [executionProfile] Name or instance of the [profile]{@link ExecutionProfile} to
* be used for this execution. If not set, it will the use "default" execution profile.
* @property {Number} [fetchSize] Amount of rows to retrieve per page.
* @property {Array|Array<Array>} [hints] Type hints for parameters given in the query, ordered as for the parameters.
* <p>For batch queries, an array of such arrays, ordered as with the queries in the batch.</p>
* @property {Host} [host] The host that should handle the query.
* <p>
* Use of this option is <em>heavily discouraged</em> and should only be used in the following cases:
* </p>
* <ol>
* <li>
* Querying node-local tables, such as tables in the <code>system</code> and <code>system_views</code>
* keyspaces.
* </li>
* <li>
* Applying a series of schema changes, where it may be advantageous to execute schema changes in sequence on the
* same node.
* </li>
* </ol>
* <p>
* Configuring a specific host causes the configured
* [LoadBalancingPolicy]{@link module:policies/loadBalancing~LoadBalancingPolicy} to be completely bypassed.
* However, if the load balancing policy dictates that the host is at a
* [distance of ignored]{@link module:types~distance} or there is no active connectivity to the host, the request will
* fail with a [NoHostAvailableError]{@link module:errors~NoHostAvailableError}.
* </p>
* @property {Boolean} [isIdempotent] Defines whether the query can be applied multiple times without changing the result
* beyond the initial application.
* <p>
* The query execution idempotence can be used at [RetryPolicy]{@link module:policies/retry~RetryPolicy} level to
* determine if an statement can be retried in case of request error or write timeout.
* </p>
* <p>Default: <code>false</code>.</p>
* @property {String} [keyspace] Specifies the keyspace for the query. It is used for the following:
* <ol>
* <li>To indicate what keyspace the statement is applicable to (protocol V5+ only). This is useful when the
* query does not provide an explicit keyspace and you want to override the current {@link Client#keyspace}.</li>
* <li>For query routing when the query operates on a different keyspace than the current {@link Client#keyspace}.</li>
* </ol>
* @property {Boolean} [logged] Determines if the batch should be written to the batchlog. Only valid for
* [Client#batch()]{@link Client#batch}, it will be ignored by other methods. Default: true.
* @property {Boolean} [counter] Determines if its a counter batch. Only valid for
* [Client#batch()]{@link Client#batch}, it will be ignored by other methods. Default: false.
* @property {Buffer|String} [pageState] Buffer or string token representing the paging state.
* <p>Useful for manual paging, if provided, the query will be executed starting from a given paging state.</p>
* @property {Boolean} [prepare] Determines if the query must be executed as a prepared statement.
* @property {Number} [readTimeout] When defined, it overrides the default read timeout
* (<code>socketOptions.readTimeout</code>) in milliseconds for this execution per coordinator.
* <p>
* Suitable for statements for which the coordinator may allow a longer server-side timeout, for example aggregation
* queries.
* </p>
* <p>
* A value of <code>0</code> disables client side read timeout for the execution. Default: <code>undefined</code>.
* </p>
* @property {RetryPolicy} [retry] Retry policy for the query.
* <p>
* This property can be used to specify a different [retry policy]{@link module:policies/retry} to the one specified
* in the {@link ClientOptions}.policies.
* </p>
* @property {Array} [routingIndexes] Index of the parameters that are part of the partition key to determine
* the routing.
* @property {Buffer|Array} [routingKey] Partition key(s) to determine which coordinator should be used for the query.
* @property {Array} [routingNames] Array of the parameters names that are part of the partition key to determine the
* routing. Only valid for non-prepared requests, it's recommended that you use the prepare flag instead.
* @property {Number} [serialConsistency] Serial consistency is the consistency level for the serial phase of
* conditional updates.
* This option will be ignored for anything else that a conditional update/insert.
* @property {Number|Long} [timestamp] The default timestamp for the query in microseconds from the unix epoch
* (00:00:00, January 1st, 1970).
* <p>If provided, this will replace the server side assigned timestamp as default timestamp.</p>
* <p>Use [generateTimestamp()]{@link module:types~generateTimestamp} utility method to generate a valid timestamp
* based on a Date and microseconds parts.</p>
* @property {Boolean} [traceQuery] Enable query tracing for the execution. Use query tracing to diagnose performance
* problems related to query executions. Default: false.
* <p>To retrieve trace, you can call [Metadata.getTrace()]{@link module:metadata~Metadata#getTrace} method.</p>
* @property {Object} [graphOptions] Default options for graph query executions.
* <p>
* These options are meant to provide defaults for all graph query executions. Consider using
* [execution profiles]{@link ExecutionProfile} if you plan to reuse different set of options across different
* query executions.
* </p>
* @property {String} [graphOptions.language] The graph language to use in graph queries. Default:
* <code>'gremlin-groovy'</code>.
* @property {String} [graphOptions.name] The graph name to be used in all graph queries.
* <p>
* This property is required but there is no default value for it. This value can be overridden at query level.
* </p>
* @property {Number} [graphOptions.readConsistency] Overrides the
* [consistency level]{@link module:types~consistencies}
* defined in the query options for graph read queries.
* @property {Number} [graphOptions.readTimeout] Overrides the default per-host read timeout (in milliseconds) for all
* graph queries. Default: <code>0</code>.
* <p>
* Use <code>null</code> to reset the value and use the default on <code>socketOptions.readTimeout</code> .
* </p>
* @property {String} [graphOptions.source] The graph traversal source name to use in graph queries. Default:
* <code>'g'</code>.
* @property {Number} [graphOptions.writeConsistency] Overrides the [consistency
* level]{@link module:types~consistencies} defined in the query options for graph write queries.
*/
/**
* Creates a new instance of {@link Client}.
* @classdesc
* Represents a database client that maintains multiple connections to the cluster nodes, providing methods to
* execute CQL statements.
* <p>
* The <code>Client</code> uses [policies]{@link module:policies} to decide which nodes to connect to, which node
* to use per each query execution, when it should retry failed or timed-out executions and how reconnection to down
* nodes should be made.
* </p>
* @extends EventEmitter
* @param {ClientOptions} options The options for this instance.
* @example <caption>Creating a new client instance</caption>
* const client = new Client({
* contactPoints: ['10.0.1.101', '10.0.1.102'],
* localDataCenter: 'datacenter1'
* });
* @example <caption>Executing a query</caption>
* const result = await client.connect();
* console.log(`Connected to ${client.hosts.length} nodes in the cluster: ${client.hosts.keys().join(', ')}`);
* @example <caption>Executing a query</caption>
* const result = await client.execute('SELECT key FROM system.local');
* const row = result.first();
* console.log(row['key']);
* @constructor
*/
function Client(options) {
events.EventEmitter.call(this);
this.options = clientOptions.extend({ logEmitter: this.emit.bind(this), id: types.Uuid.random() }, options);
Object.defineProperty(this, 'profileManager', { value: new ProfileManager(this.options) });
Object.defineProperty(this, 'controlConnection', {
value: new ControlConnection(this.options, this.profileManager), writable: true }
);
Object.defineProperty(this, 'insightsClient', { value: new InsightsClient(this)});
//Unlimited amount of listeners for internal event queues by default
this.setMaxListeners(0);
this.connected = false;
this.isShuttingDown = false;
/**
* Gets the name of the active keyspace.
* @type {String}
*/
this.keyspace = options.keyspace;
/**
* Gets the schema and cluster metadata information.
* @type {Metadata}
*/
this.metadata = this.controlConnection.metadata;
/**
* Gets an associative array of cluster hosts.
* @type {HostMap}
*/
this.hosts = this.controlConnection.hosts;
/**
* The [ClientMetrics]{@link module:metrics~ClientMetrics} instance used to expose measurements of its internal
* behavior and of the server as seen from the driver side.
* <p>By default, a [DefaultMetrics]{@link module:metrics~DefaultMetrics} instance is used.</p>
* @type {ClientMetrics}
*/
this.metrics = this.options.metrics;
this._graphExecutor = new GraphExecutor(this, options, this._execute);
}
util.inherits(Client, events.EventEmitter);
/**
* Emitted when a new host is added to the cluster.
* <ul>
* <li>{@link Host} The host being added.</li>
* </ul>
* @event Client#hostAdd
*/
/**
* Emitted when a host is removed from the cluster
* <ul>
* <li>{@link Host} The host being removed.</li>
* </ul>
* @event Client#hostRemove
*/
/**
* Emitted when a host in the cluster changed status from down to up.
* <ul>
* <li>{@link Host host} The host that changed the status.</li>
* </ul>
* @event Client#hostUp
*/
/**
* Emitted when a host in the cluster changed status from up to down.
* <ul>
* <li>{@link Host host} The host that changed the status.</li>
* </ul>
* @event Client#hostDown
*/
/**
* Attempts to connect to one of the [contactPoints]{@link ClientOptions} and discovers the rest the nodes of the
* cluster.
* <p>When the {@link Client} is already connected, it resolves immediately.</p>
* <p>It returns a <code>Promise</code> when a <code>callback</code> is not provided.</p>
* @param {function} [callback] The optional callback that is invoked when the pool is connected or it failed to
* connect.
* @example <caption>Usage example</caption>
* await client.connect();
*/
Client.prototype.connect = function (callback) {
if (this.connected && callback) {
// Avoid creating Promise to immediately resolve them
return callback();
}
return promiseUtils.optionalCallback(this._connect(), callback);
};
/**
* Async-only version of {@link Client#connect()}.
* @private
*/
Client.prototype._connect = async function () {
if (this.connected) {
return;
}
if (this.isShuttingDown) {
//it is being shutdown, don't allow further calls to connect()
throw new errors.NoHostAvailableError(null, 'Connecting after shutdown is not supported');
}
if (this.connecting) {
return promiseUtils.fromEvent(this, 'connected');
}
this.connecting = true;
this.log('info', util.format("Connecting to cluster using '%s' version %s", description, version));
try {
await cloud.init(this.options);
await this.controlConnection.init();
this.hosts = this.controlConnection.hosts;
await this.profileManager.init(this, this.hosts);
if (this.keyspace) {
await RequestHandler.setKeyspace(this);
}
clientOptions.setMetadataDependent(this);
await this._warmup();
} catch (err) {
// We should close the pools (if any) and reset the state to allow successive calls to connect()
await this.controlConnection.reset();
this.connected = false;
this.connecting = false;
this.emit('connected', err);
throw err;
}
this._setHostListeners();
// Set the distance of the control connection host relatively to this instance
this.profileManager.getDistance(this.controlConnection.host);
this.insightsClient.init();
this.connected = true;
this.connecting = false;
this.emit('connected');
};
/**
* Executes a query on an available connection.
* <p>The query can be prepared (recommended) or not depending on the [prepare]{@linkcode QueryOptions} flag.</p>
* <p>
* Some execution failures can be handled transparently by the driver, according to the
* [RetryPolicy]{@linkcode module:policies/retry~RetryPolicy} or the
* [SpeculativeExecutionPolicy]{@linkcode module:policies/speculativeExecution} used.
* </p>
* <p>It returns a <code>Promise</code> when a <code>callback</code> is not provided.</p>
* @param {String} query The query to execute.
* @param {Array|Object} [params] Array of parameter values or an associative array (object) containing parameter names
* as keys and its value.
* @param {QueryOptions} [options] The query options for the execution.
* @param {ResultCallback} [callback] Executes callback(err, result) when execution completed. When not defined, the
* method will return a promise.
* @example <caption>Promise-based API, using async/await</caption>
* const query = 'SELECT name, email FROM users WHERE id = ?';
* const result = await client.execute(query, [ id ], { prepare: true });
* const row = result.first();
* console.log('%s: %s', row['name'], row['email']);
* @example <caption>Callback-based API</caption>
* const query = 'SELECT name, email FROM users WHERE id = ?';
* client.execute(query, [ id ], { prepare: true }, function (err, result) {
* assert.ifError(err);
* const row = result.first();
* console.log('%s: %s', row['name'], row['email']);
* });
* @see {@link ExecutionProfile} to reuse a set of options across different query executions.
*/
Client.prototype.execute = function (query, params, options, callback) {
// This method acts as a wrapper for the async method _execute()
if (!callback) {
// Set default argument values for optional parameters
if (typeof options === 'function') {
callback = options;
options = null;
} else if (typeof params === 'function') {
callback = params;
params = null;
}
}
try {
const execOptions = DefaultExecutionOptions.create(options, this);
return promiseUtils.optionalCallback(this._execute(query, params, execOptions), callback);
}
catch (err) {
// There was an error when parsing the user options
if (callback) {
return callback(err);
}
return Promise.reject(err);
}
};
/**
* Executes a graph query.
* <p>It returns a <code>Promise</code> when a <code>callback</code> is not provided.</p>
* @param {String} query The gremlin query.
* @param {Object|null} [parameters] An associative array containing the key and values of the parameters.
* @param {GraphQueryOptions|null} [options] The graph query options.
* @param {Function} [callback] Function to execute when the response is retrieved, taking two arguments:
* <code>err</code> and <code>result</code>. When not defined, the method will return a promise.
* @example <caption>Promise-based API, using async/await</caption>
* const result = await client.executeGraph('g.V()');
* // Get the first item (vertex, edge, scalar value, ...)
* const vertex = result.first();
* console.log(vertex.label);
* @example <caption>Callback-based API</caption>
* client.executeGraph('g.V()', (err, result) => {
* const vertex = result.first();
* console.log(vertex.label);
* });
* @example <caption>Iterating through the results</caption>
* const result = await client.executeGraph('g.E()');
* for (let edge of result) {
* console.log(edge.label); // created
* });
* @example <caption>Using result.forEach()</caption>
* const result = await client.executeGraph('g.V().hasLabel("person")');
* result.forEach(function(vertex) {
* console.log(vertex.type); // vertex
* console.log(vertex.label); // person
* });
* @see {@link ExecutionProfile} to reuse a set of options across different query executions.
*/
Client.prototype.executeGraph = function (query, parameters, options, callback) {
callback = callback || (options ? options : parameters);
if (typeof callback === 'function') {
parameters = typeof parameters !== 'function' ? parameters : null;
return promiseUtils.toCallback(this._graphExecutor.send(query, parameters, options), callback);
}
return this._graphExecutor.send(query, parameters, options);
};
/**
* Executes the query and calls <code>rowCallback</code> for each row as soon as they are received. Calls the final
* <code>callback</code> after all rows have been sent, or when there is an error.
* <p>
* The query can be prepared (recommended) or not depending on the [prepare]{@linkcode QueryOptions} flag.
* </p>
* @param {String} query The query to execute
* @param {Array|Object} [params] Array of parameter values or an associative array (object) containing parameter names
* as keys and its value.
* @param {QueryOptions} [options] The query options.
* @param {function} rowCallback Executes <code>rowCallback(n, row)</code> per each row received, where n is the row
* index and row is the current Row.
* @param {function} [callback] Executes <code>callback(err, result)</code> after all rows have been received.
* <p>
* When dealing with paged results, [ResultSet#nextPage()]{@link module:types~ResultSet#nextPage} method can be used
* to retrieve the following page. In that case, <code>rowCallback()</code> will be again called for each row and
* the final callback will be invoked when all rows in the following page has been retrieved.
* </p>
* @example <caption>Using per-row callback and arrow functions</caption>
* client.eachRow(query, params, { prepare: true }, (n, row) => console.log(n, row), err => console.error(err));
* @example <caption>Overloads</caption>
* client.eachRow(query, rowCallback);
* client.eachRow(query, params, rowCallback);
* client.eachRow(query, params, options, rowCallback);
* client.eachRow(query, params, rowCallback, callback);
* client.eachRow(query, params, options, rowCallback, callback);
*/
Client.prototype.eachRow = function (query, params, options, rowCallback, callback) {
if (!callback && rowCallback && typeof options === 'function') {
callback = utils.validateFn(rowCallback, 'rowCallback');
rowCallback = options;
} else {
callback = callback || utils.noop;
rowCallback = utils.validateFn(rowCallback || options || params, 'rowCallback');
}
params = typeof params !== 'function' ? params : null;
let execOptions;
try {
execOptions = DefaultExecutionOptions.create(options, this, rowCallback);
}
catch (e) {
return callback(e);
}
let rowLength = 0;
const nextPage = () => promiseUtils.toCallback(this._execute(query, params, execOptions), pageCallback);
function pageCallback (err, result) {
if (err) {
return callback(err);
}
// Next requests in case paging (auto or explicit) is used
rowLength += result.rowLength;
if (result.rawPageState !== undefined) {
// Use new page state as next request page state
execOptions.setPageState(result.rawPageState);
if (execOptions.isAutoPage()) {
// Issue next request for the next page
return nextPage();
}
// Allows for explicit (manual) paging, in case the caller needs it
result.nextPage = nextPage;
}
// Finished auto-paging
result.rowLength = rowLength;
callback(null, result);
}
promiseUtils.toCallback(this._execute(query, params, execOptions), pageCallback);
};
/**
* Executes the query and pushes the rows to the result stream as soon as they received.
* <p>
* The stream is a [ReadableStream]{@linkcode https://nodejs.org/api/stream.html#stream_class_stream_readable} object
* that emits rows.
* It can be piped downstream and provides automatic pause/resume logic (it buffers when not read).
* </p>
* <p>
* The query can be prepared (recommended) or not depending on {@link QueryOptions}.prepare flag. Retries on multiple
* hosts if needed.
* </p>
* @param {String} query The query to prepare and execute.
* @param {Array|Object} [params] Array of parameter values or an associative array (object) containing parameter names
* as keys and its value
* @param {QueryOptions} [options] The query options.
* @param {function} [callback] executes callback(err) after all rows have been received or if there is an error
* @returns {ResultStream}
*/
Client.prototype.stream = function (query, params, options, callback) {
callback = callback || utils.noop;
// NOTE: the nodejs stream maintains yet another internal buffer
// we rely on the default stream implementation to keep memory
// usage reasonable.
const resultStream = new types.ResultStream({ objectMode: 1 });
function onFinish(err, result) {
if (err) {
resultStream.emit('error', err);
}
if (result && result.nextPage ) {
// allows for throttling as per the
// default nodejs stream implementation
resultStream._valve(function pageValve() {
try {
result.nextPage();
}
catch( ex ) {
resultStream.emit('error', ex );
}
});
return;
}
// Explicitly dropping the valve (closure)
resultStream._valve(null);
resultStream.add(null);
callback(err);
}
let sync = true;
this.eachRow(query, params, options, function rowCallback(n, row) {
resultStream.add(row);
}, function eachRowFinished(err, result) {
if (sync) {
// Prevent sync callback
return setImmediate(function eachRowFinishedImmediate() {
onFinish(err, result);
});
}
onFinish(err, result);
});
sync = false;
return resultStream;
};
/**
* Executes batch of queries on an available connection to a host.
* <p>It returns a <code>Promise</code> when a <code>callback</code> is not provided.</p>
* @param {Array.<string>|Array.<{query, params}>} queries The queries to execute as an Array of strings or as an array
* of object containing the query and params
* @param {QueryOptions} [options] The query options.
* @param {ResultCallback} [callback] Executes callback(err, result) when the batch was executed
*/
Client.prototype.batch = function (queries, options, callback) {
if (!callback && typeof options === 'function') {
callback = options;
options = null;
}
return promiseUtils.optionalCallback(this._batch(queries, options), callback);
};
/**
* Async-only version of {@link Client#batch()} .
* @param {Array.<string>|Array.<{query, params}>}queries
* @param {QueryOptions} options
* @returns {Promise<ResultSet>}
* @private
*/
Client.prototype._batch = async function (queries, options) {
if (!Array.isArray(queries)) {
throw new errors.ArgumentError('Queries should be an Array');
}
if (queries.length === 0) {
throw new errors.ArgumentError('Queries array should not be empty');
}
await this._connect();
const execOptions = DefaultExecutionOptions.create(options, this);
let queryItems;
if (execOptions.isPrepared()) {
// use keyspace from query options if protocol supports per-query keyspace, otherwise use connection keyspace.
const version = this.controlConnection.protocolVersion;
const queryKeyspace = types.protocolVersion.supportsKeyspaceInRequest(version) && options.keyspace || this.keyspace;
queryItems = await PrepareHandler.getPreparedMultiple(
this, execOptions.getLoadBalancingPolicy(), queries, queryKeyspace);
} else {
queryItems = new Array(queries.length);
for (let i = 0; i < queries.length; i++) {
const item = queries[i];
if (!item) {
throw new errors.ArgumentError(`Invalid query at index ${i}`);
}
const query = typeof item === 'string' ? item : item.query;
if (!query) {
throw new errors.ArgumentError(`Invalid query at index ${i}`);
}
queryItems[i] = { query, params: item.params };
}
}
const request = await this._createBatchRequest(queryItems, execOptions);
return await RequestHandler.send(request, execOptions, this);
};
/**
* Gets the host that are replicas of a given token.
* @param {String} keyspace
* @param {Buffer} token
* @returns {Array<Host>}
*/
Client.prototype.getReplicas = function (keyspace, token) {
return this.metadata.getReplicas(keyspace, token);
};
/**
* Gets a snapshot containing information on the connections pools held by this Client at the current time.
* <p>
* The information provided in the returned object only represents the state at the moment this method was called and
* it's not maintained in sync with the driver metadata.
* </p>
* @returns {ClientState} A [ClientState]{@linkcode module:metadata~ClientState} instance.
*/
Client.prototype.getState = function () {
return ClientState.from(this);
};
Client.prototype.log = utils.log;
/**
* Closes all connections to all hosts.
* <p>It returns a <code>Promise</code> when a <code>callback</code> is not provided.</p>
* @param {Function} [callback] Optional callback to be invoked when finished closing all connections.
*/
Client.prototype.shutdown = function (callback) {
return promiseUtils.optionalCallback(this._shutdown(), callback);
};
/** @private */
Client.prototype._shutdown = async function () {
this.log('info', 'Shutting down');
if (!this.hosts || !this.connected) {
// not initialized
this.connected = false;
return;
}
if (this.connecting) {
this.log('warning', 'Shutting down while connecting');
// wait until finish connecting for easier troubleshooting
await promiseUtils.fromEvent(this, 'connected');
}
this.connected = false;
this.isShuttingDown = true;
const hosts = this.hosts.values();
this.insightsClient.shutdown();
// Shutdown the ControlConnection before shutting down the pools
this.controlConnection.shutdown();
this.options.policies.speculativeExecution.shutdown();
if (this.options.requestTracker) {
this.options.requestTracker.shutdown();
}
// go through all the host and shut down their pools
await Promise.all(hosts.map(h => h.shutdown(false)));
};
/**
* Waits until that the schema version in all nodes is the same or the waiting time passed.
* @param {Connection} connection
* @returns {Promise<boolean>}
* @ignore
*/
Client.prototype._waitForSchemaAgreement = async function (connection) {
if (this.hosts.length === 1) {
return true;
}
const start = process.hrtime();
const maxWaitSeconds = this.options.protocolOptions.maxSchemaAgreementWaitSeconds;
this.log('info', 'Waiting for schema agreement');
let versionsMatch;
while (!versionsMatch && process.hrtime(start)[0] < maxWaitSeconds) {
versionsMatch = await this.metadata.compareSchemaVersions(connection);
if (versionsMatch) {
this.log('info', 'Schema versions match');
break;
}
// Let some time pass before the next check
await promiseUtils.delay(500);
}
return versionsMatch;
};
/**
* Waits for schema agreements and schedules schema metadata refresh.
* @param {Connection} connection
* @param event
* @returns {Promise<boolean>}
* @ignore
* @internal
*/
Client.prototype.handleSchemaAgreementAndRefresh = async function (connection, event) {
let agreement = false;
try {
agreement = await this._waitForSchemaAgreement(connection);
} catch (err) {
//we issue a warning but we continue with the normal flow
this.log('warning', 'There was an error while waiting for the schema agreement between nodes', err);
}
if (!this.options.isMetadataSyncEnabled) {
return agreement;
}
// Refresh metadata immediately
try {
await this.controlConnection.handleSchemaChange(event, true);
} catch (err) {
this.log('warning', 'There was an error while handling schema change', err);
}
return agreement;
};
/**
* Connects and handles the execution of prepared and simple statements.
* @param {string} query
* @param {Array} params
* @param {ExecutionOptions} execOptions
* @returns {Promise<ResultSet>}
* @private
*/
Client.prototype._execute = async function (query, params, execOptions) {
const version = this.controlConnection.protocolVersion;
if (!execOptions.isPrepared() && params && !Array.isArray(params) &&
!types.protocolVersion.supportsNamedParameters(version)) {
// Only Cassandra 2.1 and above supports named parameters
throw new errors.ArgumentError('Named parameters for simple statements are not supported, use prepare flag');
}
let request;
if (!this.connected) {
// Micro optimization to avoid an async execution for a simple check
await this._connect();