This repository has been archived by the owner on Aug 4, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 7
/
hiveclient.go
130 lines (101 loc) · 3.14 KB
/
hiveclient.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/*
Package hive is a Go Thrift client library for connecting to a Hive server.
It consists of the generated Thrift-Hive bindings plus a very small connection-pool wrapper.
Usage:
func main() {
hive.MakePool("192.168.1.17:10000")
conn, err := hive.GetHiveConn()
if err == nil {
er, err := conn.Client.Execute("SELECT * FROM logevent")
if er == nil && err == nil {
for {
row, _, _ := conn.Client.FetchOne()
log.Println("row ", row)
}
}
}
if conn != nil {
// make sure to check connection back into pool
conn.Checkin()
}
}
*/
package hive
import (
"errors"
"fmt"
thrifthive "github.com/araddon/hive/thriftlib"
"github.com/araddon/thrift4go/lib/go/thrift"
"log"
"net"
)
// HiveConnection is one pooled connection slot to a Hive server. MakePool
// creates the slots without a client; Open fills in Client on demand.
type HiveConnection struct {
// Server is the address handed to net.Dial("tcp", ...) by Open.
Server string
// Id is the slot index assigned by MakePool; it is only used in log output here.
Id int
// Client is the thrift Hive client; it stays nil until Open succeeds in creating one.
Client *thrifthive.ThriftHiveClient
}
// hivePool is the package-level connection pool: a buffered channel that
// MakePool allocates and fills, getConnFromPool receives from, and Checkin
// sends back to. It is nil until MakePool has been called.
var hivePool chan *HiveConnection
// MakePool allocates the package-level pool and fills it with 100 connection
// slots for server, numbered 0..99. No network connection is made here; each
// slot is opened lazily the first time it is checked out. Calling MakePool
// again replaces the previous pool.
func MakePool(server string) {
	const poolSize = 100

	hivePool = make(chan *HiveConnection, poolSize)
	for id := 0; id < poolSize; id++ {
		// Slots start without a client; only the address and index are set.
		slot := &HiveConnection{Id: id, Server: server}
		hivePool <- slot
	}
}
// GetHiveConn is the public entry point for checking a connection out of the
// pool. It delegates to getConnFromPool and passes both results through
// unchanged. Hand the connection back with Checkin when finished.
func GetHiveConn() (conn *HiveConnection, err error) {
	conn, err = getConnFromPool()
	return conn, err
}
// getConnFromPool checks a connection out of hivePool, blocking until one is
// available, and opens it when it has no client yet or its transport reports
// that it is not open.
//
// If MakePool has not been called the pool is nil; receiving from a nil
// channel blocks forever, so an error is returned instead. When Open fails the
// connection is still returned together with the error, so the caller can
// hand the slot back with Checkin rather than losing it from the pool.
func getConnFromPool() (conn *HiveConnection, err error) {
	if hivePool == nil {
		return nil, errors.New("hive: connection pool not initialized; call MakePool first")
	}

	conn = <-hivePool
	log.Printf("in checkout, pulled off pool: remaining = %d, connid=%d Server=%s\n", len(hivePool), conn.Id, conn.Server)

	// Known issue (ar): an error occurred on batch mutate <nil> <nil> <nil>
	// "Cannot read. Remote side has closed. Tried to read 4 bytes, but only got 0 bytes."
	// NOTE(review): presumably the IsOpen check below does not detect a
	// connection closed by the remote side — confirm.
	if conn.Client != nil && conn.Client.Transport.IsOpen() {
		// Already usable: hand it out as-is.
		return conn, nil
	}

	err = conn.Open()
	log.Println(" in create conn, how is client? ", conn.Client, " is err? ", err)
	return conn, err
}
// Open dials conn.Server over TCP, wraps the socket in a thrift non-blocking
// socket transport, and stores a new ThriftHiveClient using the default binary
// protocol factory in conn.Client.
//
// It returns the dial error, a transport-construction error, or "no client"
// if the client factory yields nil. On every error path after a successful
// dial the socket is closed, so a failed Open does not leak the connection.
func (conn *HiveConnection) Open() error {
	log.Println("creating new hive connection ")

	tcpConn, dialErr := net.Dial("tcp", conn.Server)
	if dialErr != nil {
		return dialErr
	}

	// Close the socket unless we reach the successful return below.
	opened := false
	defer func() {
		if !opened {
			tcpConn.Close()
		}
	}()

	// NOTE(review): this TSocket is never used as the client's transport; it
	// is kept only because a failure here has always been treated as fatal.
	// Confirm whether it can be dropped.
	ts, tsErr := thrift.NewTSocketConn(tcpConn)
	if tsErr != nil {
		return tsErr
	}
	if ts == nil {
		return errors.New("No TSocket connection?")
	}

	// The transport actually handed to the client.
	trans, transErr := thrift.NewTNonblockingSocketConn(tcpConn)
	if transErr != nil {
		return fmt.Errorf("creating nonblocking transport for %s: %v", conn.Server, transErr)
	}
	// The socket is already connected, so the result of Open is deliberately
	// not treated as fatal. NOTE(review): presumably it reports "already
	// open" in this situation — confirm against the thrift library.
	trans.Open()

	protocolfac := thrift.NewTBinaryProtocolFactoryDefault()
	conn.Client = thrifthive.NewThriftHiveClientFactory(trans, protocolfac)

	log.Println("is open? ", trans.IsOpen())
	log.Println(" in conn.Open, how is client? ", conn.Client)

	if conn.Client == nil {
		log.Println("ERROR, no client")
		return errors.New("no client")
	}

	opened = true
	return nil
}
// Checkin returns conn to the package-level pool so it can be checked out
// again. The send blocks if the pool's buffer is already full.
//
// A nil receiver is ignored: putting nil into the pool would make a later
// checkout panic when it reads the connection's fields.
func (conn *HiveConnection) Checkin() {
	if conn == nil {
		return
	}
	hivePool <- conn
}