11package org .tarantool ;
22
3- import java .io .DataInputStream ;
4- import java .io .DataOutputStream ;
3+ import org .tarantool .protocol .ProtoUtils ;
4+ import org .tarantool .protocol .TarantoolGreeting ;
5+
56import java .io .IOException ;
6- import java .io .OutputStream ;
77import java .net .Socket ;
8- import java .nio .ByteBuffer ;
98import java .nio .channels .SocketChannel ;
10- import java .security .MessageDigest ;
11- import java .security .NoSuchAlgorithmException ;
12- import java .util .ArrayList ;
13- import java .util .EnumMap ;
14- import java .util .LinkedHashMap ;
159import java .util .List ;
16- import java .util .Map ;
1710import java .util .concurrent .atomic .AtomicLong ;
1811
1912public abstract class TarantoolBase <Result > extends AbstractTarantoolOps <Integer , List <?>, Object , Result > {
20- protected static final String WELCOME = "Tarantool " ;
2113 protected String serverVersion ;
2214 /**
2315 * Connection state
2416 */
25- protected String salt ;
2617 protected MsgPackLite msgPackLite = MsgPackLite .INSTANCE ;
2718 protected AtomicLong syncId = new AtomicLong ();
2819 protected int initialRequestSize = 4096 ;
29- /**
30- * Read properties
31- */
32- protected DataInputStream is ;
33- protected CountInputStream cis ;
34- protected Map <Integer , Object > headers ;
35- protected Map <Integer , Object > body ;
3620
3721 public TarantoolBase () {
3822 }
3923
4024 public TarantoolBase (String username , String password , Socket socket ) {
4125 super ();
4226 try {
43- this .is = new DataInputStream (cis = new CountInputStreamImpl (socket .getInputStream ()));
44- byte [] bytes = new byte [64 ];
45- is .readFully (bytes );
46- String firstLine = new String (bytes );
47- if (!firstLine .startsWith (WELCOME )) {
48- close ();
49- throw new CommunicationException ("Welcome message should starts with tarantool but starts with '" + firstLine + "'" , new IllegalStateException ("Invalid welcome packet" ));
50- }
51- serverVersion = firstLine .substring (WELCOME .length ());
52- is .readFully (bytes );
53- this .salt = new String (bytes );
54- if (username != null && password != null ) {
55- ByteBuffer authPacket = createAuthPacket (username , password );
56- OutputStream os = socket .getOutputStream ();
57- os .write (authPacket .array (), 0 , authPacket .remaining ());
58- os .flush ();
59- readPacket (is );
60- Long code = (Long ) headers .get (Key .CODE .getId ());
61- if (code != 0 ) {
62- throw serverError (code , body .get (Key .ERROR .getId ()));
63- }
64- }
27+ TarantoolGreeting greeting = ProtoUtils .connect (socket , username , password );
28+ this .serverVersion = greeting .getServerVersion ();
6529 } catch (IOException e ) {
66- try {
67- is .close ();
68- } catch (IOException ignored ) {
69-
70- }
71- try {
72- cis .close ();
73- } catch (IOException ignored ) {
74-
75- }
7630 throw new CommunicationException ("Couldn't connect to tarantool" , e );
7731 }
7832 }
7933
80-
81- protected ByteBuffer createAuthPacket (String username , final String password ) throws IOException {
82- final MessageDigest sha1 ;
83- try {
84- sha1 = MessageDigest .getInstance ("SHA-1" );
85- } catch (NoSuchAlgorithmException e ) {
86- throw new IllegalStateException (e );
87- }
88- List auth = new ArrayList (2 );
89- auth .add ("chap-sha1" );
90-
91- byte [] p = sha1 .digest (password .getBytes ());
92-
93- sha1 .reset ();
94- byte [] p2 = sha1 .digest (p );
95-
96- sha1 .reset ();
97- sha1 .update (Base64 .decode (salt ), 0 , 20 );
98- sha1 .update (p2 );
99- byte [] scramble = sha1 .digest ();
100- for (int i = 0 , e = 20 ; i < e ; i ++) {
101- p [i ] ^= scramble [i ];
102- }
103- auth .add (p );
104- return createPacket (Code .AUTH , 0L , null , Key .USER_NAME , username , Key .TUPLE , auth );
105- }
106-
107- protected ByteBuffer createPacket (Code code , Long syncId , Long schemaId , Object ... args ) throws IOException {
108- TarantoolClientImpl .ByteArrayOutputStream bos = new TarantoolClientImpl .ByteArrayOutputStream (initialRequestSize );
109- bos .write (new byte [5 ]);
110- DataOutputStream ds = new DataOutputStream (bos );
111- Map <Key , Object > header = new EnumMap <Key , Object >(Key .class );
112- Map <Key , Object > body = new EnumMap <Key , Object >(Key .class );
113- header .put (Key .CODE , code );
114- header .put (Key .SYNC , syncId );
115- if (schemaId != null ) {
116- header .put (Key .SCHEMA_ID , schemaId );
117- }
118- if (args != null ) {
119- for (int i = 0 , e = args .length ; i < e ; i += 2 ) {
120- Object value = args [i + 1 ];
121- body .put ((Key ) args [i ], value );
122- }
123- }
124- msgPackLite .pack (header , ds );
125- msgPackLite .pack (body , ds );
126- ds .flush ();
127- ByteBuffer buffer = bos .toByteBuffer ();
128- buffer .put (0 , (byte ) 0xce );
129- buffer .putInt (1 , bos .size () - 5 );
130- return buffer ;
131- }
132-
133- protected void readPacket (DataInputStream is ) throws IOException {
134- int size = ((Number ) msgPackLite .unpack (is )).intValue ();
135- long mark = cis .getBytesRead ();
136- headers = (Map <Integer , Object >) msgPackLite .unpack (is );
137- if (cis .getBytesRead () - mark < size ) {
138- body = (Map <Integer , Object >) msgPackLite .unpack (is );
139- }
140- is .skipBytes ((int ) (cis .getBytesRead () - mark - size ));
141- }
142-
14334 protected static class SQLMetaData {
14435 protected String name ;
14536
@@ -159,57 +50,10 @@ public String toString() {
15950 }
16051 }
16152
162- protected List <SQLMetaData > getSQLMetadata () {
163- List <Map <Integer , Object >> meta = (List <Map <Integer , Object >>) body .get (Key .SQL_METADATA .getId ());
164- List <SQLMetaData > values = new ArrayList <SQLMetaData >(meta .size ());
165- for (Map <Integer ,Object > c :meta ) {
166- values .add (new SQLMetaData ((String ) c .get (Key .SQL_FIELD_NAME .getId ())));
167- }
168- return values ;
169- }
170-
171- protected List <List <Object >> getSQLData () {
172- return (List <List <Object >>) body .get (Key .DATA .getId ());
173- }
174-
175- protected List <Map <String , Object >> readSqlResult (List <List <?>> data ) {
176- List <Map <String , Object >> values = new ArrayList <Map <String , Object >>(data .size ());
177- List <SQLMetaData > metaData = getSQLMetadata ();
178- LinkedHashMap <String , Object > value = new LinkedHashMap <String , Object >();
179- for (List row : data ) {
180- for (int i = 0 ; i < row .size (); i ++) {
181- value .put (metaData .get (i ).getName (), row .get (i ));
182- }
183- values .add (value );
184- }
185- return values ;
186- }
187-
188-
189- protected Long getSqlRowCount () {
190- Map <Key , Object > info = (Map <Key , Object >) body .get (Key .SQL_INFO .getId ());
191- Number rowCount ;
192- if (info != null && (rowCount = ((Number ) info .get (Key .SQL_ROW_COUNT .getId ()))) != null ) {
193- return rowCount .longValue ();
194- }
195- return null ;
196- }
197-
198-
19953 protected TarantoolException serverError (long code , Object error ) {
20054 return new TarantoolException (code , error instanceof String ? (String ) error : new String ((byte []) error ));
20155 }
20256
203- protected class ByteArrayOutputStream extends java .io .ByteArrayOutputStream {
204- public ByteArrayOutputStream (int size ) {
205- super (size );
206- }
207-
208- ByteBuffer toByteBuffer () {
209- return ByteBuffer .wrap (buf , 0 , count );
210- }
211- }
212-
21357 protected void closeChannel (SocketChannel channel ) {
21458 if (channel != null ) {
21559 try {
0 commit comments