Skip to content

Commit

Permalink
V8
Browse files Browse the repository at this point in the history
  • Loading branch information
Pastor committed Nov 1, 2024
1 parent 14f2c5b commit cbe234e
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 17 deletions.
4 changes: 4 additions & 0 deletions vol8/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
<artifactId>lombok</artifactId>
<version>1.18.34</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>bson</artifactId>
Expand Down
10 changes: 4 additions & 6 deletions vol8/src/main/java/ru/mifi/practice/vol8/User.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@
import org.bson.BsonBinaryWriter;
import org.bson.BsonReader;
import org.bson.BsonType;
import org.bson.io.BasicOutputBuffer;
import ru.mifi.practice.vol8.streaming.Bson;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

@RequiredArgsConstructor
public final class User {
Expand All @@ -18,17 +17,16 @@ public final class User {

public static void main(String[] args) {
User user = new User("bob", "password");
BasicOutputBuffer output = new BasicOutputBuffer();
try (AbstractBsonWriter writer = new BsonBinaryWriter(output)) {
ByteArrayOutputStream output = new ByteArrayOutputStream();
try (var bsonOutput = Bson.newOutput(output); AbstractBsonWriter writer = new BsonBinaryWriter(bsonOutput)) {
writer.writeStartDocument();
writer.writeString("name", user.name);
writer.writeString("password", user.password);
writer.writeEndDocument();
writer.flush();
}
byte[] bytes = output.toByteArray();
try (BsonReader reader = new BsonBinaryReader(new Bson.BsonInputStream(
new ByteArrayInputStream(bytes)))) {
try (BsonReader reader = new BsonBinaryReader(Bson.newInput(bytes))) {
boolean reading = true;
while (reading) {
BsonType type = reader.readBsonType();
Expand Down
119 changes: 108 additions & 11 deletions vol8/src/main/java/ru/mifi/practice/vol8/streaming/Bson.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,115 @@
package ru.mifi.practice.vol8.streaming;

import com.google.common.io.ByteStreams;
import lombok.SneakyThrows;
import org.bson.BsonSerializationException;
import org.bson.ByteBuf;
import org.bson.io.BsonInput;
import org.bson.io.BsonInputMark;
import org.bson.io.BsonOutput;
import org.bson.io.OutputBuffer;
import org.bson.types.ObjectId;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.UUID;

import static java.lang.String.format;

public final class Bson {
public static final class BsonInputStream implements BsonInput {
public interface Bson {

static BsonInput newInput(byte[] bytes) {
return newInput(new ByteArrayInputStream(bytes));
}

static BsonInput newInput(InputStream stream) {
return new BsonInputStream(stream);
}

static BsonOutput newOutput(OutputStream stream) {
return new BsonOutputStream(stream);
}

final class BsonOutputStream extends OutputBuffer {
private final File temporaryFile;
private final RandomAccessFile raf;
private final OutputStream stream;

@SneakyThrows
private BsonOutputStream(OutputStream stream) {
this.temporaryFile = File.createTempFile(UUID.randomUUID().toString(), ".stream");
this.raf = new RandomAccessFile(temporaryFile, "rwd");
this.stream = stream;
}

@SneakyThrows
@Override
public void close() {
raf.close();
try (stream; var inputStream = new FileInputStream(temporaryFile)) {
ByteStreams.copy(inputStream, stream);
}
temporaryFile.delete();
}

@Override
public int pipe(OutputStream out) throws IOException {
throw new UnsupportedOperationException("BsonOutputStream pipe not supported");
}

@Override
public List<ByteBuf> getByteBuffers() {
throw new UnsupportedOperationException("BsonOutputStream getByteBuffers not supported");
}

@SneakyThrows
@Override
public int getPosition() {
return (int) raf.getFilePointer();
}

@SneakyThrows
@Override
public int getSize() {
return (int) raf.length();
}

@Override
public void truncateToPosition(int newPosition) {
throw new UnsupportedOperationException("BsonOutputStream truncateToPosition not supported");
}

@SneakyThrows
@Override
public void writeBytes(byte[] bytes, int offset, int length) {
raf.write(bytes, offset, length);
}

@SneakyThrows
@Override
public void writeByte(int value) {
raf.write(value);
}

@SneakyThrows
@Override
protected void write(int position, int value) {
long lastPosition = getPosition();
raf.seek(position);
raf.write(value);
raf.seek(lastPosition);
}
}

final class BsonInputStream implements BsonInput {
private static final int READ_LIMIT = 4096;
private static final String[] ONE_BYTE_ASCII_STRINGS = new String[Byte.MAX_VALUE + 1];

Expand All @@ -25,7 +121,7 @@ public static final class BsonInputStream implements BsonInput {

private final PosBufferedInputStream stream;

public BsonInputStream(InputStream stream) {
private BsonInputStream(InputStream stream) {
this.stream = new PosBufferedInputStream(stream);
}

Expand Down Expand Up @@ -119,6 +215,7 @@ public ObjectId readObjectId() {
return new ObjectId(bytes);
}

//FIXME: Переписать на потоковое чтение строки
@SneakyThrows
@Override
public String readCString() {
Expand Down Expand Up @@ -179,17 +276,17 @@ private void ensureAvailable(final int bytesNeeded) {
+ "but only %d remain", bytesNeeded, stream.available()));
}
}
}

private static final class PosBufferedInputStream extends BufferedInputStream {
private static final class PosBufferedInputStream extends BufferedInputStream {

private PosBufferedInputStream(InputStream in) {
super(in);
}
private PosBufferedInputStream(InputStream in) {
super(in);
}

@SuppressWarnings("PMD.UnusedPrivateMethod")
private int getPosition() {
return pos;
@SuppressWarnings("PMD.UnusedPrivateMethod")
private int getPosition() {
return pos;
}
}
}
}

0 comments on commit cbe234e

Please sign in to comment.