Skip to content

Commit

Permalink
Add support for sending datetime values in UTC (#1247)
Browse files Browse the repository at this point in the history
* Add support for sending datetime values in UTC

This update brings 2 new Bolt structures for transmitting datetime in UTC, specifically tag bytes 49 and 69.

Bolt 5 will only support these structures and will no longer accept tag bytes 46 and 66. Bolt 4.4 and 4.3 will negotiate with server and use the new structures if possible. Previous protocol versions behaviour remains unchanged.

* Add type feature

* Skip tests needing additional type support

* Update tests name

* Update matching

* Update matching
  • Loading branch information
injectives authored Jun 21, 2022
1 parent 30f5ed3 commit e056f71
Show file tree
Hide file tree
Showing 43 changed files with 419 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@

import io.netty.channel.Channel;
import io.netty.util.AttributeKey;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
import org.neo4j.driver.internal.messaging.BoltPatchesListener;
import org.neo4j.driver.internal.messaging.BoltProtocolVersion;

public final class ChannelAttributes {
Expand All @@ -39,6 +43,8 @@ public final class ChannelAttributes {
private static final AttributeKey<String> TERMINATION_REASON = newInstance("terminationReason");
private static final AttributeKey<AuthorizationStateListener> AUTHORIZATION_STATE_LISTENER =
newInstance("authorizationStateListener");
private static final AttributeKey<Set<BoltPatchesListener>> BOLT_PATCHES_LISTENERS =
newInstance("boltPatchesListeners");

// configuration hints provided by the server
private static final AttributeKey<Long> CONNECTION_READ_TIMEOUT = newInstance("connectionReadTimeout");
Expand Down Expand Up @@ -134,6 +140,20 @@ public static void setConnectionReadTimeout(Channel channel, Long connectionRead
setOnce(channel, CONNECTION_READ_TIMEOUT, connectionReadTimeout);
}

public static void addBoltPatchesListener(Channel channel, BoltPatchesListener listener) {
Set<BoltPatchesListener> boltPatchesListeners = get(channel, BOLT_PATCHES_LISTENERS);
if (boltPatchesListeners == null) {
boltPatchesListeners = new HashSet<>();
setOnce(channel, BOLT_PATCHES_LISTENERS, boltPatchesListeners);
}
boltPatchesListeners.add(listener);
}

public static Set<BoltPatchesListener> boltPatchesListeners(Channel channel) {
Set<BoltPatchesListener> boltPatchesListeners = get(channel, BOLT_PATCHES_LISTENERS);
return boltPatchesListeners != null ? boltPatchesListeners : Collections.emptySet();
}

private static <T> T get(Channel channel, AttributeKey<T> key) {
return channel.attr(key).get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.neo4j.driver.internal.async.connection;

import static org.neo4j.driver.internal.async.connection.ChannelAttributes.addBoltPatchesListener;

import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import org.neo4j.driver.Logging;
import org.neo4j.driver.internal.async.inbound.ChannelErrorHandler;
Expand All @@ -33,10 +36,15 @@ public void build(MessageFormat messageFormat, ChannelPipeline pipeline, Logging
// inbound handlers
pipeline.addLast(new ChunkDecoder(logging));
pipeline.addLast(new MessageDecoder());
pipeline.addLast(new InboundMessageHandler(messageFormat, logging));
Channel channel = pipeline.channel();
InboundMessageHandler inboundMessageHandler = new InboundMessageHandler(messageFormat, logging);
addBoltPatchesListener(channel, inboundMessageHandler);
pipeline.addLast(inboundMessageHandler);

// outbound handlers
pipeline.addLast(OutboundMessageHandler.NAME, new OutboundMessageHandler(messageFormat, logging));
OutboundMessageHandler outboundMessageHandler = new OutboundMessageHandler(messageFormat, logging);
addBoltPatchesListener(channel, outboundMessageHandler);
pipeline.addLast(OutboundMessageHandler.NAME, outboundMessageHandler);

// last one - error handler
pipeline.addLast(new ChannelErrorHandler(logging));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,32 +23,38 @@
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.messageDispatcher;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.DecoderException;
import java.util.Set;
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
import org.neo4j.driver.internal.messaging.BoltPatchesListener;
import org.neo4j.driver.internal.messaging.MessageFormat;

public class InboundMessageHandler extends SimpleChannelInboundHandler<ByteBuf> {
public class InboundMessageHandler extends SimpleChannelInboundHandler<ByteBuf> implements BoltPatchesListener {
private final ByteBufInput input;
private final MessageFormat.Reader reader;
private final MessageFormat messageFormat;
private final Logging logging;

private InboundMessageDispatcher messageDispatcher;
private MessageFormat.Reader reader;
private Logger log;

public InboundMessageHandler(MessageFormat messageFormat, Logging logging) {
this.input = new ByteBufInput();
this.reader = messageFormat.newReader(input);
this.messageFormat = messageFormat;
this.logging = logging;
this.reader = messageFormat.newReader(input);
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) {
messageDispatcher = requireNonNull(messageDispatcher(ctx.channel()));
log = new ChannelActivityLogger(ctx.channel(), logging, getClass());
Channel channel = ctx.channel();
messageDispatcher = requireNonNull(messageDispatcher(channel));
log = new ChannelActivityLogger(channel, logging, getClass());
}

@Override
Expand Down Expand Up @@ -79,4 +85,12 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
input.stop();
}
}

@Override
public void handle(Set<String> patches) {
if (patches.contains(DATE_TIME_UTC_PATCH)) {
messageFormat.enableDateTimeUtc();
reader = messageFormat.newReader(input);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,29 @@
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
import java.util.Set;
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
import org.neo4j.driver.internal.async.connection.BoltProtocolUtil;
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
import org.neo4j.driver.internal.messaging.BoltPatchesListener;
import org.neo4j.driver.internal.messaging.Message;
import org.neo4j.driver.internal.messaging.MessageFormat;

public class OutboundMessageHandler extends MessageToMessageEncoder<Message> {
public class OutboundMessageHandler extends MessageToMessageEncoder<Message> implements BoltPatchesListener {
public static final String NAME = OutboundMessageHandler.class.getSimpleName();
private final ChunkAwareByteBufOutput output;
private final MessageFormat.Writer writer;
private final MessageFormat messageFormat;
private final Logging logging;

private MessageFormat.Writer writer;
private Logger log;

public OutboundMessageHandler(MessageFormat messageFormat, Logging logging) {
this.output = new ChunkAwareByteBufOutput();
this.writer = messageFormat.newWriter(output);
this.messageFormat = messageFormat;
this.logging = logging;
this.writer = messageFormat.newWriter(output);
}

@Override
Expand Down Expand Up @@ -79,4 +83,12 @@ protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out)
BoltProtocolUtil.writeMessageBoundary(messageBuf);
out.add(messageBuf);
}

@Override
public void handle(Set<String> patches) {
if (patches.contains(DATE_TIME_UTC_PATCH)) {
messageFormat.enableDateTimeUtc();
writer = messageFormat.newWriter(output);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,24 @@
*/
package org.neo4j.driver.internal.handlers;

import static org.neo4j.driver.internal.async.connection.ChannelAttributes.boltPatchesListeners;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.protocolVersion;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setConnectionId;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setConnectionReadTimeout;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setServerAgent;
import static org.neo4j.driver.internal.util.MetadataExtractor.extractBoltPatches;
import static org.neo4j.driver.internal.util.MetadataExtractor.extractServer;

import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import org.neo4j.driver.Value;
import org.neo4j.driver.internal.messaging.BoltProtocolVersion;
import org.neo4j.driver.internal.messaging.v43.BoltProtocolV43;
import org.neo4j.driver.internal.messaging.v44.BoltProtocolV44;
import org.neo4j.driver.internal.spi.ResponseHandler;

public class HelloResponseHandler implements ResponseHandler {
Expand All @@ -55,6 +62,14 @@ public void onSuccess(Map<String, Value> metadata) {

processConfigurationHints(metadata);

BoltProtocolVersion protocolVersion = protocolVersion(channel);
if (BoltProtocolV44.VERSION.equals(protocolVersion) || BoltProtocolV43.VERSION.equals(protocolVersion)) {
Set<String> boltPatches = extractBoltPatches(metadata);
if (!boltPatches.isEmpty()) {
boltPatchesListeners(channel).forEach(listener -> listener.handle(boltPatches));
}
}

connectionInitializedPromise.setSuccess();
} catch (Throwable error) {
onFailure(error);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.messaging;

import java.util.Set;

public interface BoltPatchesListener {
String DATE_TIME_UTC_PATCH = "utc";

void handle(Set<String> patches);
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,11 @@ interface Reader {
Writer newWriter(PackOutput output);

Reader newReader(PackInput input);

/**
* Enables datetime in UTC if supported by the given message format. This is only for use with formats that support multiple modes.
* <p>
* This only takes effect on subsequent writer and reader creation via {@link #newWriter(PackOutput)} and {@link #newReader(PackInput)}.
*/
default void enableDateTimeUtc() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
public class CommonMessageReader implements MessageFormat.Reader {
private final ValueUnpacker unpacker;

public CommonMessageReader(PackInput input) {
this(new CommonValueUnpacker(input));
public CommonMessageReader(PackInput input, boolean dateTimeUtcEnabled) {
this(new CommonValueUnpacker(input, dateTimeUtcEnabled));
}

protected CommonMessageReader(ValueUnpacker unpacker) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static java.time.ZoneOffset.UTC;

import java.io.IOException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
Expand Down Expand Up @@ -54,7 +55,9 @@ public class CommonValuePacker implements ValuePacker {
public static final int LOCAL_DATE_TIME_STRUCT_SIZE = 2;

public static final byte DATE_TIME_WITH_ZONE_OFFSET = 'F';
public static final byte DATE_TIME_WITH_ZONE_OFFSET_UTC = 'I';
public static final byte DATE_TIME_WITH_ZONE_ID = 'f';
public static final byte DATE_TIME_WITH_ZONE_ID_UTC = 'i';
public static final int DATE_TIME_STRUCT_SIZE = 3;

public static final byte DURATION = 'E';
Expand All @@ -66,9 +69,11 @@ public class CommonValuePacker implements ValuePacker {
public static final byte POINT_3D_STRUCT_TYPE = 'Y';
public static final int POINT_3D_STRUCT_SIZE = 4;

private final boolean dateTimeUtcEnabled;
protected final PackStream.Packer packer;

public CommonValuePacker(PackOutput output) {
public CommonValuePacker(PackOutput output, boolean dateTimeUtcEnabled) {
this.dateTimeUtcEnabled = dateTimeUtcEnabled;
this.packer = new PackStream.Packer(output);
}

Expand Down Expand Up @@ -119,7 +124,11 @@ protected void packInternalValue(InternalValue value) throws IOException {
packLocalDateTime(value.asLocalDateTime());
break;
case DATE_TIME:
packZonedDateTime(value.asZonedDateTime());
if (dateTimeUtcEnabled) {
packZonedDateTimeUsingUtcBaseline(value.asZonedDateTime());
} else {
packZonedDateTime(value.asZonedDateTime());
}
break;
case DURATION:
packDuration(value.asIsoDuration());
Expand Down Expand Up @@ -199,6 +208,29 @@ private void packLocalDateTime(LocalDateTime localDateTime) throws IOException {
packer.pack(nano);
}

private void packZonedDateTimeUsingUtcBaseline(ZonedDateTime zonedDateTime) throws IOException {
Instant instant = zonedDateTime.toInstant();
long epochSecondLocal = instant.getEpochSecond();
int nano = zonedDateTime.getNano();
ZoneId zone = zonedDateTime.getZone();

if (zone instanceof ZoneOffset) {
int offsetSeconds = ((ZoneOffset) zone).getTotalSeconds();

packer.packStructHeader(DATE_TIME_STRUCT_SIZE, DATE_TIME_WITH_ZONE_OFFSET_UTC);
packer.pack(epochSecondLocal);
packer.pack(nano);
packer.pack(offsetSeconds);
} else {
String zoneId = zone.getId();

packer.packStructHeader(DATE_TIME_STRUCT_SIZE, DATE_TIME_WITH_ZONE_ID_UTC);
packer.pack(epochSecondLocal);
packer.pack(nano);
packer.pack(zoneId);
}
}

private void packZonedDateTime(ZonedDateTime zonedDateTime) throws IOException {
long epochSecondLocal = zonedDateTime.toLocalDateTime().toEpochSecond(UTC);
int nano = zonedDateTime.getNano();
Expand Down
Loading

0 comments on commit e056f71

Please sign in to comment.