Skip to content

Commit edcf1f5

Browse files
committed
add download data code
1 parent 3fd0c3f commit edcf1f5

8 files changed

+585
-0
lines changed

prop.properties

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package org.tron.core.db.fast.download;
2+
3+
import io.netty.bootstrap.ServerBootstrap;
4+
import io.netty.channel.ChannelFuture;
5+
import io.netty.channel.ChannelInitializer;
6+
import io.netty.channel.ChannelOption;
7+
import io.netty.channel.ChannelPipeline;
8+
import io.netty.channel.EventLoopGroup;
9+
import io.netty.channel.nio.NioEventLoopGroup;
10+
import io.netty.channel.socket.SocketChannel;
11+
import io.netty.channel.socket.nio.NioServerSocketChannel;
12+
import io.netty.handler.codec.LineBasedFrameDecoder;
13+
import io.netty.handler.codec.string.StringDecoder;
14+
import io.netty.handler.codec.string.StringEncoder;
15+
import io.netty.handler.logging.LogLevel;
16+
import io.netty.handler.logging.LoggingHandler;
17+
import io.netty.handler.ssl.SslContext;
18+
import io.netty.handler.ssl.SslContextBuilder;
19+
import io.netty.handler.ssl.util.SelfSignedCertificate;
20+
import io.netty.handler.stream.ChunkedWriteHandler;
21+
import io.netty.util.CharsetUtil;
22+
23+
public class DataDownloadServer {
24+
25+
static final boolean SSL = System.getProperty("ssl") != null;
26+
// Use the same default port with the telnet example so that we can use the telnet client example to access it.
27+
static final int PORT = Integer.parseInt(System.getProperty("port", SSL ? "8992" : "8023"));
28+
29+
public static void main(String[] args) throws Exception {
30+
// Configure SSL.
31+
final SslContext sslCtx;
32+
if (SSL) {
33+
SelfSignedCertificate ssc = new SelfSignedCertificate();
34+
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
35+
} else {
36+
sslCtx = null;
37+
}
38+
39+
// Configure the server.
40+
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
41+
EventLoopGroup workerGroup = new NioEventLoopGroup();
42+
try {
43+
ServerBootstrap b = new ServerBootstrap();
44+
b.group(bossGroup, workerGroup)
45+
.channel(NioServerSocketChannel.class)
46+
.option(ChannelOption.SO_BACKLOG, 100)
47+
.handler(new LoggingHandler(LogLevel.INFO))
48+
.childHandler(new ChannelInitializer<SocketChannel>() {
49+
@Override
50+
public void initChannel(SocketChannel ch) throws Exception {
51+
ChannelPipeline p = ch.pipeline();
52+
if (sslCtx != null) {
53+
p.addLast(sslCtx.newHandler(ch.alloc()));
54+
}
55+
p.addLast(
56+
new StringEncoder(CharsetUtil.UTF_8),
57+
new LineBasedFrameDecoder(8192),
58+
new StringDecoder(CharsetUtil.UTF_8),
59+
new ChunkedWriteHandler(),
60+
new DataDownloadServerHandler());
61+
}
62+
});
63+
64+
// Start the server.
65+
ChannelFuture f = b.bind(PORT).sync();
66+
67+
// Wait until the server socket is closed.
68+
f.channel().closeFuture().sync();
69+
} finally {
70+
// Shut down all event loops to terminate all threads.
71+
bossGroup.shutdownGracefully();
72+
workerGroup.shutdownGracefully();
73+
}
74+
}
75+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package org.tron.core.db.fast.download;
2+
3+
import io.netty.channel.ChannelFutureListener;
4+
import io.netty.channel.ChannelHandlerContext;
5+
import io.netty.channel.DefaultFileRegion;
6+
import io.netty.channel.SimpleChannelInboundHandler;
7+
import io.netty.handler.ssl.SslHandler;
8+
import io.netty.handler.stream.ChunkedFile;
9+
import java.io.RandomAccessFile;
10+
11+
public class DataDownloadServerHandler extends SimpleChannelInboundHandler<String> {
12+
13+
@Override
14+
public void channelActive(ChannelHandlerContext ctx) {
15+
ctx.writeAndFlush("HELLO: Type the path of the file to retrieve.\n");
16+
}
17+
18+
@Override
19+
public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
20+
RandomAccessFile raf = null;
21+
long length = -1;
22+
try {
23+
raf = new RandomAccessFile(msg, "r");
24+
length = raf.length();
25+
} catch (Exception e) {
26+
ctx.writeAndFlush("ERR: " + e.getClass().getSimpleName() + ": " + e.getMessage() + '\n');
27+
return;
28+
} finally {
29+
if (length < 0 && raf != null) {
30+
raf.close();
31+
}
32+
}
33+
34+
ctx.write("OK: " + raf.length() + '\n');
35+
if (ctx.pipeline().get(SslHandler.class) == null) {
36+
// SSL not enabled - can use zero-copy file transfer.
37+
ctx.write(new DefaultFileRegion(raf.getChannel(), 0, length));
38+
} else {
39+
// SSL enabled - cannot use zero-copy file transfer.
40+
ctx.write(new ChunkedFile(raf));
41+
}
42+
ctx.writeAndFlush("\n");
43+
}
44+
45+
@Override
46+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
47+
cause.printStackTrace();
48+
49+
if (ctx.channel().isActive()) {
50+
ctx.writeAndFlush("ERR: " +
51+
cause.getClass().getSimpleName() + ": " +
52+
cause.getMessage() + '\n').addListener(ChannelFutureListener.CLOSE);
53+
}
54+
}
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
package org.tron.core.db.fast.download.http;
2+
3+
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
4+
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
5+
import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
6+
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
7+
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
8+
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
9+
10+
import io.netty.buffer.Unpooled;
11+
import io.netty.channel.ChannelFuture;
12+
import io.netty.channel.ChannelFutureListener;
13+
import io.netty.channel.ChannelHandlerContext;
14+
import io.netty.channel.SimpleChannelInboundHandler;
15+
import io.netty.handler.codec.http.DefaultFullHttpResponse;
16+
import io.netty.handler.codec.http.DefaultHttpResponse;
17+
import io.netty.handler.codec.http.FullHttpRequest;
18+
import io.netty.handler.codec.http.FullHttpResponse;
19+
import io.netty.handler.codec.http.HttpChunkedInput;
20+
import io.netty.handler.codec.http.HttpHeaders;
21+
import io.netty.handler.codec.http.HttpResponse;
22+
import io.netty.handler.codec.http.HttpResponseStatus;
23+
import io.netty.handler.codec.http.LastHttpContent;
24+
import io.netty.handler.stream.ChunkedFile;
25+
import io.netty.util.CharsetUtil;
26+
import io.netty.util.internal.SystemPropertyUtil;
27+
import java.io.File;
28+
import java.io.FileNotFoundException;
29+
import java.io.RandomAccessFile;
30+
import java.io.UnsupportedEncodingException;
31+
import java.net.URLDecoder;
32+
import java.util.regex.Pattern;
33+
import javax.activation.MimetypesFileTypeMap;
34+
import lombok.extern.slf4j.Slf4j;
35+
import org.apache.commons.lang3.StringUtils;
36+
37+
@Slf4j
38+
public class HttpChannelHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
39+
40+
public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";
41+
public static final String HTTP_DATE_GMT_TIMEZONE = "GMT";
42+
public static final int HTTP_CACHE_SECONDS = 60;
43+
44+
@Override
45+
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
46+
// 监测解码情况
47+
if (!request.getDecoderResult().isSuccess()) {
48+
sendError(ctx, BAD_REQUEST);
49+
return;
50+
}
51+
final String uri = request.getUri();
52+
final String path = sanitizeUri(uri);
53+
System.out.println("get file:" + path);
54+
if (path == null) {
55+
sendError(ctx, FORBIDDEN);
56+
return;
57+
}
58+
//读取要下载的文件
59+
File file = new File(path);
60+
if (file.isHidden() || !file.exists()) {
61+
sendError(ctx, NOT_FOUND);
62+
return;
63+
}
64+
// if (!file.isFile()) {
65+
// sendError(ctx, FORBIDDEN);
66+
// return;
67+
// }
68+
RandomAccessFile raf;
69+
try {
70+
raf = new RandomAccessFile(path, "r");
71+
} catch (FileNotFoundException ignore) {
72+
sendError(ctx, NOT_FOUND);
73+
return;
74+
}
75+
long fileLength = raf.length();
76+
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK);
77+
HttpHeaders.setContentLength(response, fileLength);
78+
setContentTypeHeader(response, file);
79+
//setDateAndCacheHeaders(response, file);
80+
if (HttpHeaders.isKeepAlive(request)) {
81+
response.headers().set("CONNECTION", HttpHeaders.Values.KEEP_ALIVE);
82+
}
83+
84+
logger.info("begin send header data");
85+
// Write the initial line and the header.
86+
ctx.write(response);
87+
logger.info("begin send content data");
88+
// Write the content.
89+
//ChannelFuture sendFileFuture =
90+
ctx.write(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)),
91+
ctx.newProgressivePromise());
92+
// sendFuture用于监视发送数据的状态
93+
// sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
94+
// @Override
95+
// public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
96+
// if (total < 0) { // total unknown
97+
// System.err.println(future.channel() + " Transfer progress: " + progress);
98+
// } else {
99+
// System.err.println(future.channel() + " Transfer progress: " + progress + " / " + total);
100+
// }
101+
// }
102+
//
103+
// @Override
104+
// public void operationComplete(ChannelProgressiveFuture future) {
105+
// System.err.println(future.channel() + " Transfer complete.");
106+
// }
107+
// });
108+
109+
// Write the end marker
110+
ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
111+
112+
// Decide whether to close the connection or not.
113+
if (!HttpHeaders.isKeepAlive(request)) {
114+
// Close the connection when the whole content is written out.
115+
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
116+
}
117+
}
118+
119+
@Override
120+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
121+
cause.printStackTrace();
122+
if (ctx.channel().isActive()) {
123+
sendError(ctx, INTERNAL_SERVER_ERROR);
124+
}
125+
ctx.close();
126+
}
127+
128+
private static final Pattern INSECURE_URI = Pattern.compile(".*[<>&\"].*");
129+
130+
private static String sanitizeUri(String uri) {
131+
// Decode the path.
132+
try {
133+
uri = URLDecoder.decode(uri, "UTF-8");
134+
} catch (UnsupportedEncodingException e) {
135+
throw new Error(e);
136+
}
137+
138+
if (!uri.startsWith("/")) {
139+
return null;
140+
}
141+
142+
// Convert file separators.
143+
uri = uri.replace('/', File.separatorChar);
144+
145+
// Simplistic dumb security check.
146+
// You will have to do something serious in the production environment.
147+
if (uri.contains(File.separator + '.') || uri.contains('.' + File.separator) || uri
148+
.startsWith(".") || uri.endsWith(".")
149+
|| INSECURE_URI.matcher(uri).matches()) {
150+
return null;
151+
}
152+
153+
// Convert to absolute path.
154+
if (StringUtils.startsWith(uri, File.separator)) {
155+
return SystemPropertyUtil.get("user.dir") + uri;
156+
}
157+
return SystemPropertyUtil.get("user.dir") + File.separator + uri;
158+
}
159+
160+
161+
private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
162+
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled
163+
.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8));
164+
response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
165+
166+
// Close the connection as soon as the error message is sent.
167+
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
168+
}
169+
170+
/**
171+
* Sets the content type header for the HTTP Response
172+
*
173+
* @param response HTTP response
174+
* @param file file to extract content type
175+
*/
176+
private static void setContentTypeHeader(HttpResponse response, File file) {
177+
MimetypesFileTypeMap m = new MimetypesFileTypeMap();
178+
String contentType = m.getContentType(file.getPath());
179+
if (!contentType.equals("application/octet-stream")) {
180+
contentType += "; charset=utf-8";
181+
}
182+
response.headers().set(CONTENT_TYPE, contentType);
183+
}
184+
185+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package org.tron.core.db.fast.download.http;
2+
3+
import io.netty.channel.ChannelInitializer;
4+
import io.netty.channel.ChannelPipeline;
5+
import io.netty.channel.socket.SocketChannel;
6+
import io.netty.handler.codec.http.HttpObjectAggregator;
7+
import io.netty.handler.codec.http.HttpServerCodec;
8+
import io.netty.handler.stream.ChunkedWriteHandler;
9+
10+
public class HttpChannelInitlalizer extends ChannelInitializer<SocketChannel> {
11+
12+
@Override
13+
protected void initChannel(SocketChannel ch) throws Exception {
14+
ChannelPipeline pipeline = ch.pipeline();
15+
pipeline.addLast(new HttpServerCodec());
16+
pipeline.addLast(new HttpObjectAggregator(65536));
17+
pipeline.addLast(new ChunkedWriteHandler());
18+
pipeline.addLast(new HttpChannelHandler());
19+
}
20+
21+
}

0 commit comments

Comments
 (0)