Skip to content

Commit 9bb16d3

Browse files
committed
Fixed authentication issue with remote, auth-enabled databases.
1 parent 60c0985 commit 9bb16d3

File tree

6 files changed

+87
-96
lines changed

6 files changed

+87
-96
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ build
77
*.ipr
88
out
99
dist
10+
jvm*.properties

src/java/cascading/mongodb/MongoDBConfiguration.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,5 +176,26 @@ void setPort(int port)
176176
{
177177
jobConf.setInt(PORT, port);
178178
}
179+
180+
String getUsername()
181+
{
182+
return jobConf.get(USERNAME, null);
183+
}
184+
185+
void setUsername(String username)
186+
{
187+
jobConf.set(USERNAME, username);
188+
}
189+
190+
char[] getPassword()
191+
{
192+
String s = jobConf.get(PASSWORD, null);
193+
return s.toCharArray();
194+
}
195+
196+
void setPassword(char[] password)
197+
{
198+
jobConf.set(PASSWORD, new String(password));
199+
}
179200

180201
}
Lines changed: 12 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,20 @@
11
package cascading.mongodb;
22

3-
import cascading.tuple.Tuple;
4-
import com.mongodb.BasicDBObject;
53
import com.mongodb.DB;
6-
import com.mongodb.DBCollection;
7-
import com.mongodb.MongoException;
84
import org.apache.hadoop.fs.FileSystem;
95
import org.apache.hadoop.mapred.JobConf;
106
import org.apache.hadoop.mapred.OutputFormat;
117
import org.apache.hadoop.mapred.RecordWriter;
12-
import org.apache.hadoop.mapred.Reporter;
138
import org.apache.hadoop.util.Progressable;
149
import org.apache.log4j.Logger;
15-
import org.bson.types.ObjectId;
1610

1711
import java.io.IOException;
1812

1913
/**
2014
* An OutputFormat that sends reduce output to a MongoDB collection.
2115
* <p/>
2216
* {@link MongoDBOutputFormat} accepts &lt;key, value&gt; pairs, where
23-
* the key has a type extending MongoDBWritable. Returned {@link MongoDBRecordWriter}
17+
* the key has a type extending MongoDBWritable. Returned {@link cascading.mongodb.DefaultMongoDBRecordWriter}
2418
* writes a document to the collection with the Fields names specified as the
2519
* attributes of the document.
2620
*
@@ -36,7 +30,17 @@ public RecordWriter<K, V> getRecordWriter(FileSystem fileSystem, JobConf jobConf
3630
String collection = dbConfiguration.getCollection();
3731
DB db = dbConfiguration.getDB();
3832

39-
return new MongoDBRecordWriter(db, collection);
33+
34+
String username = dbConfiguration.getUsername();
35+
if (username == null && "".equals(username))
36+
log.warn("Username was not set. If your database has security enabled, your upcoming writes will fail. If not, no worries!");
37+
38+
else if (!db.authenticate(dbConfiguration.getUsername(), dbConfiguration.getPassword()))
39+
{
40+
throw new IllegalArgumentException("Username or password provided was incorrect. Check your configuration.");
41+
}
42+
43+
return new DefaultMongoDBRecordWriter(db, collection);
4044

4145
}
4246

@@ -47,68 +51,5 @@ public void checkOutputSpecs(FileSystem fileSystem, JobConf jobConf) throws IOEx
4751
//Don't do anything with this.
4852
}
4953

50-
protected class MongoDBRecordWriter implements RecordWriter<K, V> {
51-
private DB db;
52-
private String collection;
53-
private int retryAttempts;
54-
55-
protected MongoDBRecordWriter(DB db, String collection) {
56-
this.db = db;
57-
this.collection = collection;
58-
this.retryAttempts = 10;
59-
}
60-
61-
/**
62-
* {@inheritDoc}
63-
*/
64-
public synchronized void write(K k, V v) throws IOException {
65-
66-
log.debug("MongoDBRecordWriter: {key = " + k + ", value = " + v + ", collection = " + collection + "};");
67-
BasicDBObject d = (BasicDBObject) v;
68-
69-
log.debug("Write Document: {document = " + d + ", collection = " + collection + "};");
70-
71-
int insertAttemptsRemaining = retryAttempts;
72-
73-
while(true)
74-
{
75-
try
76-
{
77-
db.getCollection(collection).insert(d);
78-
}
79-
catch (MongoException e)
80-
{
81-
if (insertAttemptsRemaining >= 0)
82-
{
83-
log.info("Waiting one second before retry...");
84-
try
85-
{
86-
Thread.sleep(1000);
87-
}
88-
catch (InterruptedException ie)
89-
{}
90-
log.info("Document insert failed: { message = " + e.getMessage() + ", attemptsRemaining = " + insertAttemptsRemaining);
91-
insertAttemptsRemaining--;
92-
continue;
93-
}
94-
else
95-
{
96-
throw new IOException("Exhausted 10 attempts to insert document: {document = " + d + "};", e);
97-
}
98-
}
99-
insertAttemptsRemaining = retryAttempts;
100-
break;
101-
}
102-
}
103-
104-
/**
105-
* {@inheritDoc}
106-
*/
107-
public void close(Reporter reporter) throws IOException {
108-
109-
}
110-
111-
}
112-
11354

11455
}

src/java/cascading/mongodb/MongoDBScheme.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -51,21 +51,21 @@ public Tuple source(Object key, Object value) {
5151
// {@inheritDoc}
5252
public void sink(TupleEntry tupleEntry, OutputCollector outputCollector) throws IOException {
5353

54-
BasicDBObject document = new BasicDBObject();
55-
56-
for (int i = 0; i < fields.length; i++) {
57-
Fields field = fields[i];
58-
TupleEntry values = tupleEntry.selectEntry(field);
59-
60-
for (int j = 0; j < values.getFields().size(); j++) {
61-
Fields fields = values.getFields();
62-
Tuple tuple = values.getTuple();
63-
64-
document.put(fields.get(j).toString(), tuple.getString(j));
65-
}
66-
}
67-
68-
outputCollector.collect(null, document);
54+
// BasicDBObject document = new BasicDBObject();
55+
//
56+
// for (int i = 0; i < fields.length; i++) {
57+
// Fields field = fields[i];
58+
// TupleEntry values = tupleEntry.selectEntry(field);
59+
//
60+
// for (int j = 0; j < values.getFields().size(); j++) {
61+
// Fields fields = values.getFields();
62+
// Tuple tuple = values.getTuple();
63+
//
64+
// document.put(fields.get(j).toString(), tuple.getString(j));
65+
// }
66+
// }
67+
68+
outputCollector.collect(null, tupleEntry);
6969

7070
// Tuple result = tupleEntry.selectTuple(getSinkFields());
7171
// result = cleanTuple(result);

src/java/cascading/mongodb/MongoDBTap.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,11 +103,20 @@ private DBAddress getDBAddress()
103103

104104
private DB getDB() {
105105

106+
log.debug("Requesting params for DB: {db=" + getMongo().getDB(getDBAddress().getDBName()) + "};");
106107
if (db == null) {
108+
log.debug("DB was null - requesting refresh.");
107109
db = getMongo().getDB(getDBAddress().getDBName());
110+
108111
if (username != null)
112+
{
113+
log.debug("Attempting to authenticate user...");
109114
if (!db.authenticate(username, password))
115+
{
110116
throw new IllegalArgumentException("MongoDBTap: Auth Failed: {username = " + username + ", password = " + Arrays.toString(password) + "};");
117+
}
118+
log.debug("Apparently authentication passed for: {username=" + username + "};");
119+
}
111120
}
112121

113122
return db;
@@ -164,7 +173,7 @@ public boolean deletePath(JobConf jobConf) throws IOException {
164173
return true;
165174

166175
log.debug("Removing collection: {name = " + collection + "};");
167-
//getDB().getCollection(collection).drop();
176+
getDB().getCollection(collection).drop();
168177

169178
return true;
170179

src/test/cascading/mongodb/MongoTest.java

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
package cascading.mongodb;
22

3-
import com.mongodb.BasicDBObject;
4-
import com.mongodb.DB;
5-
import com.mongodb.DBCollection;
6-
import com.mongodb.Mongo;
3+
import com.mongodb.*;
74
import org.junit.Assert;
85
import org.junit.Before;
96
import org.junit.Test;
107

8+
import java.util.Arrays;
119
import java.util.Set;
1210

1311
/**
@@ -17,17 +15,21 @@
1715
public class MongoTest
1816
{
1917
private Mongo m;
18+
private String username = "gameattain";
19+
private char[] password = {'G', 'A', '.', '2', '0', '0', '9'};
20+
21+
//private DBAddress connection = new DBAddress("arrow.mongohq.com", 27025, "gameattain");
2022

2123
@Before
2224
public void setup() throws Exception
2325
{
24-
m = new Mongo("localhost");
26+
m = new Mongo(new DBAddress("arrow.mongohq.com", 27025, "gameattain"));
2527
}
2628

2729
@Test
2830
public void testMongoConnection() throws Exception
2931
{
30-
Mongo pm = new Mongo("localhost");
32+
Mongo pm = new Mongo(new DBAddress("arrow.mongohq.com", 27025, "gameattain"));
3133

3234
Assert.assertNotNull(pm);
3335
System.out.println("Established connection with local Mongo instance.");
@@ -36,8 +38,13 @@ public void testMongoConnection() throws Exception
3638
@Test
3739
public void testGetCollection()
3840
{
39-
DB db = m.getDB("testdb");
41+
DB db = m.getDB("gameattain");
42+
4043

44+
if (!db.authenticate(username, password))
45+
{
46+
throw new IllegalArgumentException("MongoTest: Auth Failed: {username = " + username + ", password = " + Arrays.toString(password) + "};");
47+
}
4148
Assert.assertNotNull(db);
4249

4350
Set<String> collection = db.getCollectionNames();
@@ -54,7 +61,13 @@ public void exec(Object o)
5461
@Test
5562
public void testGetTestCollection()
5663
{
57-
DB db = m.getDB("testdb");
64+
DB db = m.getDB("gameattain");
65+
66+
67+
if (!db.authenticate(username, password))
68+
{
69+
throw new IllegalArgumentException("MongoTest: Auth Failed: {username = " + username + ", password = " + Arrays.toString(password) + "};");
70+
}
5871

5972
Assert.assertNotNull(db);
6073

@@ -66,7 +79,13 @@ public void testGetTestCollection()
6679
@Test
6780
public void testInsertDoc()
6881
{
69-
DB db = m.getDB("testdb");
82+
DB db = m.getDB("gameattain");
83+
84+
85+
if (!db.authenticate(username, password))
86+
{
87+
throw new IllegalArgumentException("MongoTest: Auth Failed: {username = " + username + ", password = " + Arrays.toString(password) + "};");
88+
}
7089
DBCollection collection = db.getCollection("foo");
7190

7291
BasicDBObject doc = new BasicDBObject();

0 commit comments

Comments
 (0)