Skip to content
This repository was archived by the owner on Aug 19, 2018. It is now read-only.

Commit 799d71c

Browse files
committed
Add draft of FastTransport::ClientSession
1 parent dfe9d37 commit 799d71c

File tree

6 files changed

+412
-51
lines changed

6 files changed

+412
-51
lines changed

ft/run2.sh

+17-3
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,29 @@
11
#!/bin/bash
22
set -e
3-
(cd ..; make ./obj.fasttransport/FastEcho)
3+
4+
SERVER="python server.py"
5+
CLIENT="python client.py"
6+
while getopts "sc" OPT; do
7+
case $OPT in
8+
s)
9+
SERVER="../obj.fasttransport/FastEcho"
10+
;;
11+
c)
12+
CLIENT="../obj.fasttransport/FastTelnet -x"
13+
;;
14+
esac
15+
done
16+
17+
(cd ..; make ./obj.fasttransport/Fast{Echo,Telnet})
418
TIME=5
519
CAP_FILE=/tmp/x.cap
620
sudo rm -f $CAP_FILE
721
sudo dumpcap -a duration:$TIME -i lo -w $CAP_FILE &
822
sleep .1
9-
../obj.fasttransport/FastEcho &
23+
$SERVER &
1024
SERVER_PID=$!
1125
sleep .1
12-
python client.py &
26+
$CLIENT &
1327
CLIENT_PID=$!
1428
trap "kill $SERVER_PID; kill $CLIENT_PID" exit
1529
sleep $TIME

ft/transport.py

+5-24
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151

5252
# The fraction of packets that will be dropped on transmission.
5353
# This should be 0 for production!
54-
PACKET_LOSS = 0.05
54+
PACKET_LOSS = 0.00
5555

5656
WINDOW_SIZE = 10
5757
REQ_ACK_AFTER = 5
@@ -936,7 +936,6 @@ class _ClientChannel(object):
936936
# beginSending() transitions from IDLE to SENDING.
937937
# processReceivedData() transitions from SENDING to RECEIVING.
938938
state = None
939-
session = None
940939
rpcId = None
941940
currentRpc = None
942941
outboundMsg = None
@@ -1062,11 +1061,9 @@ def connect(self, service):
10621061
header = Header()
10631062
header.direction = Header.CLIENT_TO_SERVER
10641063
header.clientSessionHint = self._id
1065-
header.serverSessionHint = self._serverSessionHint
1066-
header.sessionToken = self._token
1067-
header.rpcId = 0
10681064
header.serverSessionHint = 0
10691065
header.sessionToken = 0
1066+
header.rpcId = 0
10701067
header.channelId = 0
10711068
header.payloadType = Header.PT_SESSION_OPEN
10721069
self._transport._sendOne(self.getAddress(), header, Buffer([]))
@@ -1089,16 +1086,14 @@ def getAddress(self):
10891086
return self._service.address
10901087

10911088
def processInboundPacket(self, payloadCM):
1092-
"""Return whether the session is still valid."""
1093-
10941089
self._lastActivityTime = gettime()
10951090

10961091
header = Header.fromString(payloadCM.payload[:Header.LENGTH])
10971092

10981093
if header.channelId >= self._numChannels:
10991094
if header.payloadType == Header.PT_SESSION_OPEN:
11001095
self._processSessionOpenResponse(payloadCM)
1101-
return True
1096+
return
11021097

11031098
channel = self._channels[header.channelId]
11041099
if channel.rpcId == header.rpcId:
@@ -1117,12 +1112,9 @@ def processInboundPacket(self, payloadCM):
11171112
self._serverSessionHint = None
11181113
self._token = None
11191114
self.connect(self._service)
1120-
return False
11211115
else:
1122-
if (0 < channel.rpcId - header.rpcId < 1024 and
1123-
header.payloadType == Header.PT_DATA and header.requestAck):
1116+
if header.payloadType == Header.PT_DATA and header.requestAck:
11241117
raise NotImplementedError("faked full ACK response")
1125-
return True
11261118

11271119
def startRpc(self, rpc):
11281120
"""Queue an RPC for transmission on this session.
@@ -1140,15 +1132,6 @@ def startRpc(self, rpc):
11401132
channel.currentRpc = rpc
11411133
channel.outboundMsg.beginSending(rpc.getRequestBuffer())
11421134

1143-
def getActiveChannels(self):
1144-
"""Used for timers."""
1145-
if not self._isConnected():
1146-
return
1147-
for channelId in range(self._numChannels):
1148-
channel = self._channels[channelId]
1149-
if channel.state != channel.IDLE_STATE:
1150-
yield channelId
1151-
11521135
def close(self):
11531136
debug("Aborting session")
11541137
for channel in self._channels:
@@ -1312,9 +1295,7 @@ def _tryProcessPacket(self):
13121295
else:
13131296
if header.clientSessionHint < len(self._clientSessions):
13141297
session = self._clientSessions[header.clientSessionHint]
1315-
stillValid = session.processInboundPacket(payloadCM)
1316-
if not stillValid:
1317-
self._clientSessions.put(session)
1298+
session.processInboundPacket(payloadCM)
13181299
return True
13191300

13201301
def poll(self):

src/Driver.cc

+9
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,15 @@ UDPDriver::release(char *payload, uint32_t len)
4141
packetBufsUtilized--;
4242
}
4343

44+
UDPDriver::UDPDriver()
45+
: socketFd(-1), packetBufsUtilized(0)
46+
{
47+
int fd = socket(AF_INET, SOCK_DGRAM, 0);
48+
if (fd == -1)
49+
throw UnrecoverableDriverException(errno);
50+
socketFd = fd;
51+
}
52+
4453
UDPDriver::UDPDriver(const sockaddr *addr, socklen_t addrlen)
4554
: socketFd(-1), packetBufsUtilized(0)
4655
{

src/Driver.h

+1
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ class UDPDriver : public Driver {
102102
Buffer::Iterator *payload);
103103
virtual bool tryRecvPacket(Received *received);
104104
virtual void release(char *payload, uint32_t len);
105+
UDPDriver();
105106
UDPDriver(const sockaddr *addr, socklen_t addrlen);
106107
virtual ~UDPDriver();
107108
private:

0 commit comments

Comments
 (0)