-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTorrent.java
348 lines (304 loc) · 8.72 KB
/
Torrent.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
import java.io.*;
import java.net.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.Collections;
import java.util.concurrent.*;
/**
 * Local side of a piece-based file-sharing session.
 *
 * <p>The class keeps the list of connected {@code Peer}s, tracks which pieces
 * are already stored locally, accepts incoming connections on the port read
 * from the peer roster file, and drives the download loop that requests the
 * missing pieces from peers that advertise them.
 *
 * <p>Pieces are stored one per file as {@code peer_<local id>/TheFile.dat.<index>}.
 *
 * <p>Expected call order, as far as this class shows: construct,
 * {@link #initialPeer()} (reads the roster, which also sets the listening
 * port), then {@link #start()} for the accept loop and
 * {@link #file_download()} for the download loop.
 */
public class Torrent extends Thread {
    /** Runs the handshake of every accepted connection off the accept thread. */
    private final ExecutorService task = Executors.newCachedThreadPool();
    /** Connected (or about to be connected) peers; iterate only under {@code synchronized (peers)}. */
    private final List<Peer> peers = Collections.synchronizedList(new ArrayList<Peer>());
    /** Total number of pieces, derived from FileSize / PieceSize (rounded up). */
    private final int pieceNUM;
    /** Port of the local server socket; filled in by {@link #readFile(String)}. */
    private int LISTENING_PORT;
    /** Identifier of the local peer as it appears in the roster file. */
    private final String local_id;
    /** Server socket of the accept loop; kept so that shutdown can close it. */
    private volatile ServerSocket listener;
    /** For every piece index, the peers that advertised the piece when the download started. */
    private HashMap<Integer, ArrayList<Peer>> peerTracker;
    /** Controls the accept loop in {@link #run()}. */
    public volatile boolean isRunning;
    /** {@code piecesAsked[i]} is true while a request for piece {@code i} is outstanding. */
    private final boolean[] piecesAsked;
    /** Number of pieces stored locally; guarded by {@code this}. */
    private int doneNUM;
    /** {@code pieceHas[i]} is true once piece {@code i} is stored locally; written under {@code this}. */
    private final boolean[] pieceHas;
    /** Controls the download loop in {@link #file_download()}. */
    public volatile boolean is_Running;
    /** Set once by {@link #shutdownTorrent()} so that the shutdown work runs only one time. */
    private volatile boolean shut_down;

    /**
     * Creates the torrent state for the given local peer.
     *
     * <p>The piece count is computed from the {@code FileSize} and
     * {@code PieceSize} properties of {@code peerProcess.config}.
     *
     * @param peerID identifier of the local peer
     * @throws NumberFormatException if either property is missing or not a number
     * @throws IllegalArgumentException if {@code PieceSize} is not positive or
     *         {@code FileSize} is negative
     */
    public Torrent(String peerID) {
        local_id = peerID;
        long total = Long.parseLong(peerProcess.config.getProperty("FileSize"));
        long piece = Long.parseLong(peerProcess.config.getProperty("PieceSize"));
        if (piece <= 0 || total < 0) {
            throw new IllegalArgumentException(
                    "Invalid FileSize/PieceSize: " + total + "/" + piece);
        }
        // Round up: a trailing partial piece still counts as a piece.
        long count = total / piece + (total % piece == 0 ? 0 : 1);
        this.pieceNUM = (int) count;
        this.piecesAsked = new boolean[pieceNUM];
        // Allocated here so the array exists even if the roster never lists the local id.
        this.pieceHas = new boolean[pieceNUM];
    }

    /**
     * Loads {@code PeerInfo.cfg}, then handshakes with every peer the roster
     * produced. Peers whose handshake succeeds are started and told which
     * pieces are held locally; the others are dropped from the peer list.
     * Finally both running flags are raised.
     */
    public void initialPeer() {
        peers.clear();
        readFile("PeerInfo.cfg");
        // Iterate over a copy: failed peers are removed from the live list while
        // looping, which made the original index loop skip the following entry.
        // (The original also called addNewPeer here, but that call always
        // returned early because the peer was already in the list.)
        for (Peer candidate : peerSnapshot()) {
            if (candidate.startHandshake(local_id)) {
                candidate.start();
                send_Havemsg(candidate);
            } else {
                peers.remove(candidate);
            }
        }
        is_Running = true;
        // The accept loop tests isRunning, which nothing in this class used to set.
        isRunning = true;
    }

    /**
     * Accept loop: opens the server socket on the configured port and hands
     * every incoming connection to {@link #add_Peer(Socket)} on the executor
     * until {@link #isRunning} becomes false or the socket is closed.
     */
    @Override
    public void run() {
        try (ServerSocket server = new ServerSocket(LISTENING_PORT)) {
            listener = server;
            System.out.println("Listening on port " + LISTENING_PORT);
            while (isRunning) {
                final Socket incomingConn = server.accept();
                try {
                    task.execute(new Runnable() {
                        @Override
                        public void run() {
                            add_Peer(incomingConn);
                        }
                    });
                } catch (RejectedExecutionException rejected) {
                    // The executor was shut down between accept() and execute().
                    incomingConn.close();
                }
            }
        } catch (IOException e) {
            // Closing the listener is how shutdownTorrent() unblocks accept();
            // only report failures that were not caused by that.
            if (!shut_down) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Registers an outgoing peer: rejects duplicates, performs the handshake,
     * adds the peer to the list, starts it and sends the local "have" messages.
     * A peer whose handshake fails is disconnected.
     *
     * @param newPeer peer to connect to
     */
    public void addNewPeer(Peer newPeer) {
        if (checkDuplicatePeer(newPeer)) {
            return;
        }
        if (!newPeer.startHandshake(local_id) || !registerIfAbsent(newPeer)) {
            newPeer.disconnect_Peer();
            return;
        }
        newPeer.start();
        send_Havemsg(newPeer);
    }

    /**
     * Registers a peer for an accepted connection: answers the handshake, adds
     * the peer to the list, starts it and sends the local "have" messages.
     * Duplicates and peers whose handshake fails are disconnected.
     *
     * @param incomingsock socket returned by the accept loop
     */
    public void add_Peer(final Socket incomingsock) {
        Peer newPeer = new Peer(incomingsock, this);
        if (checkDuplicatePeer(newPeer)) {
            newPeer.disconnect_Peer();
            return;
        }
        // NOTE(review): the remote id is presumably only known once the
        // handshake has been answered, so the duplicate check is repeated
        // (atomically with the insertion) afterwards - confirm against Peer.
        if (newPeer.answer_Handshake(local_id) && registerIfAbsent(newPeer)) {
            newPeer.start();
            send_Havemsg(newPeer);
        } else {
            newPeer.disconnect_Peer();
        }
    }

    /** Adds the peer unless a peer with the same id is already listed; returns whether it was added. */
    private boolean registerIfAbsent(Peer peer) {
        synchronized (peers) {
            if (checkDuplicatePeer(peer)) {
                return false;
            }
            peers.add(peer);
            return true;
        }
    }

    /** Returns true when a listed peer reports the same id as {@code peer}; a null id never matches. */
    private boolean checkDuplicatePeer(Peer peer) {
        Object candidateId = peer.getPeerId();
        if (candidateId == null) {
            return false;
        }
        synchronized (peers) {
            for (Peer existPeer : peers) {
                if (candidateId.equals(existPeer.getPeerId())) {
                    return true;
                }
            }
        }
        return false;
    }

    /** Returns a copy of the peer list so callers can call into peers without holding the list lock. */
    private List<Peer> peerSnapshot() {
        synchronized (peers) {
            return new ArrayList<Peer>(peers);
        }
    }

    /**
     * Sends one "have" message to {@code peer} for every piece stored locally.
     *
     * @param peer recipient of the messages
     */
    public void send_Havemsg(Peer peer) {
        for (int i = 0; i < pieceNUM; i++) {
            if (hasPiece(i)) {
                peer.sendHaveMessage(i);
            }
        }
    }

    /**
     * Marks a piece as not requested so the download loop asks for it again.
     * Indices outside the piece range are ignored.
     *
     * @param index piece index
     */
    public void rerequest(int index) {
        if (index >= 0 && index < pieceNUM) {
            piecesAsked[index] = false;
        }
    }

    /**
     * Download loop. Builds the piece-to-peers table from the piece status of
     * the currently listed peers, tells the peers that hold a missing piece
     * that we are interested, and then keeps requesting missing pieces until
     * every piece is stored, {@link #is_Running} is cleared, or the thread is
     * interrupted.
     */
    public void file_download() {
        if (getDoneNUM() == pieceNUM) {
            System.out.println("Skipping download(), file already done");
            return;
        }
        buildPeerTracker();
        announceInterest();
        Arrays.fill(piecesAsked, false);
        while (is_Running && getDoneNUM() != pieceNUM) {
            requestMissingPieces();
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller and stop polling.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /** Records, for every piece index, which of the current peers report having that piece. */
    private void buildPeerTracker() {
        peerTracker = new HashMap<Integer, ArrayList<Peer>>();
        // Create every list first; the original shared one loop variable between
        // the piece loop and the status loop and left most indices without a list.
        for (int i = 0; i < pieceNUM; i++) {
            peerTracker.put(i, new ArrayList<Peer>());
        }
        for (Peer peer : peerSnapshot()) {
            boolean[] pieceStatus = peer.getPieceStatus();
            if (pieceStatus == null) {
                continue;
            }
            int limit = Math.min(pieceStatus.length, pieceNUM);
            for (int p = 0; p < limit; p++) {
                if (pieceStatus[p]) {
                    peerTracker.get(p).add(peer);
                }
            }
        }
    }

    /** Sends "interested" once to every peer that holds at least one piece missing locally. */
    private void announceInterest() {
        List<Peer> notified = new ArrayList<Peer>();
        for (int i = 0; i < pieceNUM; i++) {
            if (hasPiece(i)) {
                continue;
            }
            for (Peer peer : peerTracker.get(i)) {
                if (!notified.contains(peer)) {
                    notified.add(peer);
                    peer.setMeInterested(true);
                    peer.sendInterested();
                }
            }
        }
    }

    /** One sweep over all pieces: requests each missing, un-requested piece from the first usable peer. */
    private void requestMissingPieces() {
        for (int i = 0; i < pieceNUM; i++) {
            // Do not request a piece that is already requested or already stored.
            if (piecesAsked[i] || hasPiece(i)) {
                continue;
            }
            ArrayList<Peer> availablePeers = peerTracker.get(i);
            int available = -1;
            for (int j = 0; j < availablePeers.size(); j++) {
                // Usable: connected, no request in flight, and not choking us.
                Peer temp = availablePeers.get(j);
                if (temp.is_Connected() && !temp.reqCheck && !temp.Choking()) {
                    available = j;
                    break;
                }
            }
            if (available >= 0) {
                piecesAsked[i] = true;
                // The chosen peer is taken off this piece's list, as in the original.
                request_Piece(availablePeers.remove(available), i);
            }
        }
    }

    /**
     * Sends a request for piece {@code index} to {@code peer} and marks the
     * peer as busy. If sending fails the piece is released again so that the
     * download loop can ask another peer.
     *
     * @param peer  peer to ask
     * @param index piece index
     */
    public void request_Piece(Peer peer, int index) {
        try {
            peer.reqCheck = true;
            peer.currIndex = index;
            peer.sendRequestMessage(index);
            peer.sendRequestToOut();
        } catch (IOException e) {
            e.printStackTrace();
            // Without this the piece stayed marked as requested for ever.
            rerequest(index);
        }
    }

    /**
     * Serves a piece request: reads the stored piece file and sends it to the
     * peer. A request with an index outside the piece range disconnects the peer.
     *
     * @param request received request
     * @param peer    peer that sent it
     * @throws IOException if the piece file cannot be read
     */
    public void request_Receiv_ed(AbstractMessage.Request request, Peer peer) throws IOException {
        int index = request.getreqIndex();
        if (index >= 0 && index < pieceNUM) {
            byte[] data = Files.readAllBytes(piecePath(index));
            AbstractMessage.Piece pm = new AbstractMessage.Piece(index, data);
            peer.sendPiece(pm);
        } else {
            peer.disconnect_Peer();
        }
    }

    /**
     * Handles a received piece: drops the matching pending request, lets the
     * peer continue with its next request, stores the data, and - only when
     * the piece is new and was written successfully - counts it and announces
     * it to all connected peers. A piece with an index outside the piece range
     * disconnects the peer; a piece that cannot be written is released for
     * re-request.
     *
     * @param rvfPiece received piece message
     * @param peer     peer that sent it
     */
    public void pieceReceived(AbstractMessage.Piece rvfPiece, Peer peer) {
        final int index = rvfPiece.getPieceIndex();
        if (index < 0 || index >= pieceNUM) {
            // The index comes from the remote side; treat it like a bad request.
            peer.disconnect_Peer();
            return;
        }
        synchronized (peer.pendingReqs) {
            AbstractMessage.Request expectReq = peer.pendingReqs.peek();
            // peek() returns null on an empty queue; check before dereferencing.
            if (expectReq != null && index == expectReq.getreqIndex()) {
                peer.pendingReqs.poll();
            }
        }
        peer.reqNext();

        boolean stored = true;
        if (!hasPiece(index)) {
            try {
                storePiece(rvfPiece.getPiece(), index);
            } catch (IOException e) {
                stored = false;
                System.err.println("Error: could not store piece " + index + ": " + e.getMessage());
            }
        }
        // The peer is free for the next request whatever happened to the data.
        peer.reqCheck = false;
        peer.currIndex = -1;

        if (!stored) {
            rerequest(index);
            return;
        }
        // markPieceDone is false for a duplicate, which must not be counted twice.
        if (markPieceDone(index)) {
            broadcast(index);
        }
    }

    /**
     * Sends a "have" message for {@code index} to every connected peer.
     *
     * @param index index of the piece that was just stored
     */
    public void broadcast(int index) {
        // Work on a copy so a peer that drops out of the list while being
        // written to cannot break the iteration.
        for (Peer peer : peerSnapshot()) {
            if (peer.is_Connected()) {
                peer.sendHaveMessage(index);
            }
        }
    }

    /**
     * Removes a peer from the peer list.
     *
     * @param a peer to remove
     */
    public void removePeer(Peer a) {
        this.peers.remove(a);
    }

    /** @return number of pieces stored locally */
    public synchronized int getDoneNUM() {
        return doneNUM;
    }

    private synchronized void incrementDone() {
        doneNUM++;
    }

    /** Returns whether piece {@code index} is stored locally. */
    private synchronized boolean hasPiece(int index) {
        return pieceHas[index];
    }

    /** Marks a piece as stored and counts it; returns false if it was already marked. */
    private synchronized boolean markPieceDone(int index) {
        if (pieceHas[index]) {
            return false;
        }
        pieceHas[index] = true;
        incrementDone();
        return true;
    }

    /** Sets every piece to the same state and the done counter to match. */
    private synchronized void resetPieces(boolean hasAll) {
        Arrays.fill(pieceHas, hasAll);
        doneNUM = hasAll ? pieceNUM : 0;
    }

    /** @return port the accept loop listens on */
    public int getPort() {
        return LISTENING_PORT;
    }

    /** @return total number of pieces */
    public int getpieceNUM() {
        return pieceNUM;
    }

    /** Location of the file that holds piece {@code index}. */
    private Path piecePath(int index) {
        return Paths.get("peer_" + local_id, "TheFile.dat." + index);
    }

    /** Writes the raw bytes of a piece to its piece file, replacing any previous content. */
    private void storePiece(byte[] piece, int index) throws IOException {
        Files.write(piecePath(index), piece);
    }

    /**
     * Stores a piece on disk as {@code peer_<local id>/TheFile.dat.<index>}.
     * The raw bytes are written (the original printed the array reference
     * through a {@code PrintWriter}, which produced text such as {@code [B@1a2b3c}
     * instead of the data). Failures are reported on standard error.
     *
     * @param piece piece content
     * @param index piece index
     */
    public void writePiece(byte[] piece, int index) {
        try {
            storePiece(piece, index);
        } catch (IOException ex) {
            System.err.println("Error: could not store piece " + index + ": " + ex.getMessage());
        }
    }

    /**
     * Reads the peer roster. Each non-blank line is split on whitespace into
     * id, host, port and a has-file flag. Lines before the local id become
     * {@code Peer}s in the peer list; the line with the local id sets the
     * listening port and the initial piece state (all pieces when the flag is
     * 1, none otherwise) and ends the scan. Any failure is reported on
     * standard error.
     *
     * @param s1 path of the roster file
     */
    public void readFile(String s1) {
        try (BufferedReader br = new BufferedReader(new FileReader(s1))) {
            String s;
            while ((s = br.readLine()) != null) {
                String row = s.trim();
                if (row.isEmpty()) {
                    continue; // tolerate blank lines instead of aborting the scan
                }
                String[] str = row.split("\\s+");
                if (str[0].equals(local_id)) {
                    this.LISTENING_PORT = Integer.parseInt(str[2]);
                    resetPieces(Integer.parseInt(str[3]) == 1);
                    break;
                }
                peers.add(new Peer(str[0], str[1], Integer.parseInt(str[2]),
                        Integer.parseInt(str[3]), this.pieceNUM, this));
            }
        } catch (Exception e) {
            System.err.println("Error: " + e.getMessage());
        }
    }

    /**
     * Stops the torrent once: clears both running flags, closes the server
     * socket so the accept loop wakes up, stops the executor from taking new
     * work, and disconnects every listed peer. Later calls do nothing.
     */
    public void shutdownTorrent() {
        synchronized (this) {
            if (shut_down) {
                return;
            }
            shut_down = true;
        }
        this.is_Running = false;
        this.isRunning = false;
        ServerSocket server = listener;
        if (server != null) {
            try {
                server.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        task.shutdown();
        // Iterate over a copy: disconnecting may remove the peer from the live list.
        for (Peer peer : peerSnapshot()) {
            peer.disconnect_Peer();
        }
    }
}