From c7fdccc32f7eeb18e09c1185416ca1fc31f9777a Mon Sep 17 00:00:00 2001 From: levy Date: Mon, 14 Sep 2020 17:28:18 +0800 Subject: [PATCH 1/9] feat(security): start negotiation --- idl/recompile_thrift.sh | 1 + idl/security.thrift | 61 ++ .../pegasus/apps/negotiation_request.java | 500 +++++++++ .../pegasus/apps/negotiation_response.java | 500 +++++++++ .../pegasus/apps/negotiation_status.java | 71 ++ .../xiaomi/infra/pegasus/apps/security.java | 977 ++++++++++++++++++ .../operator/negotiation_operator.java | 49 + .../pegasus/rpc/async/ClusterManager.java | 3 +- .../infra/pegasus/rpc/async/Negotiation.java | 68 +- .../pegasus/rpc/async/ReplicaSession.java | 19 +- .../pegasus/rpc/async/NegotiationTest.java | 20 + 11 files changed, 2265 insertions(+), 4 deletions(-) create mode 100644 idl/security.thrift create mode 100644 src/main/java/com/xiaomi/infra/pegasus/apps/negotiation_request.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/apps/negotiation_response.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/apps/negotiation_status.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/apps/security.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/operator/negotiation_operator.java create mode 100644 src/test/java/com/xiaomi/infra/pegasus/rpc/async/NegotiationTest.java diff --git a/idl/recompile_thrift.sh b/idl/recompile_thrift.sh index d6bfaa0f..b3f8b367 100755 --- a/idl/recompile_thrift.sh +++ b/idl/recompile_thrift.sh @@ -24,6 +24,7 @@ rm -rf $TMP_DIR mkdir -p $TMP_DIR $thrift --gen java rrdb.thrift $thrift --gen java replication.thrift +$thrift --gen java security.thrift for gen_file in `find $TMP_DIR -name "*.java"`; do cat apache-licence-template $gen_file > $gen_file.tmp diff --git a/idl/security.thrift b/idl/security.thrift new file mode 100644 index 00000000..e94e5e54 --- /dev/null +++ b/idl/security.thrift @@ -0,0 +1,61 @@ +include "base.thrift" + +namespace cpp dsn.apps +namespace java com.xiaomi.infra.pegasus.apps +namespace py pypegasus.rrdb + +// negotiation process: +// +// client server +// | --- SASL_LIST_MECHANISMS --> | +// | <-- SASL_LIST_MECHANISMS_RESP --- | +// | -- SASL_SELECT_MECHANISMS --> | +// | <-- SASL_SELECT_MECHANISMS_RESP --- | +// | | +// | --- SASL_INITIATE --> | +// | | +// | <-- SASL_CHALLENGE --- | +// | --- SASL_CHALLENGE_RESP --> | +// | | +// | ..... | +// | | +// | <-- SASL_CHALLENGE --- | +// | --- SASL_CHALLENGE_RESP --> | +// | | (authentication will succeed +// | | if all challenges passed) +// | <-- SASL_SUCC --- | +// (client won't response | | +// if servers says ok) | | +// | --- RPC_CALL ---> | +// | <-- RPC_RESP ---- | + +enum negotiation_status { + INVALID + SASL_LIST_MECHANISMS + SASL_LIST_MECHANISMS_RESP + SASL_SELECT_MECHANISMS + SASL_SELECT_MECHANISMS_RESP + SASL_INITIATE + SASL_CHALLENGE + SASL_CHALLENGE_RESP + SASL_SUCC + SASL_AUTH_DISABLE + SASL_AUTH_FAIL +} + +struct negotiation_request +{ + 1: negotiation_status status; + 2: base.blob msg; +} + +struct negotiation_response +{ + 1: negotiation_status status; + 2: base.blob msg; +} + +service security +{ + negotiation_response negotiate(1:negotiation_request request); +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/negotiation_request.java b/src/main/java/com/xiaomi/infra/pegasus/apps/negotiation_request.java new file mode 100644 index 00000000..6246c2e1 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/negotiation_request.java @@ -0,0 +1,500 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. +/** + * Autogenerated by Thrift Compiler (0.11.0) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package com.xiaomi.infra.pegasus.apps; + +@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2020-09-14") +public class negotiation_request implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("negotiation_request"); + + private static final org.apache.thrift.protocol.TField STATUS_FIELD_DESC = new org.apache.thrift.protocol.TField("status", org.apache.thrift.protocol.TType.I32, (short)1); + private static final org.apache.thrift.protocol.TField MSG_FIELD_DESC = new org.apache.thrift.protocol.TField("msg", org.apache.thrift.protocol.TType.STRUCT, (short)2); + + private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new negotiation_requestStandardSchemeFactory(); + private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new negotiation_requestTupleSchemeFactory(); + + /** + * + * @see negotiation_status + */ + public negotiation_status status; // required + public com.xiaomi.infra.pegasus.base.blob msg; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + /** + * + * @see negotiation_status + */ + STATUS((short)1, "status"), + MSG((short)2, "msg"); + + private static final java.util.Map byName = new java.util.HashMap(); + + static { + for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // STATUS + return STATUS; + case 2: // MSG + return MSG; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(java.lang.String name) { + return byName.get(name); + } + + private final short _thriftId; + private final java.lang.String _fieldName; + + _Fields(short thriftId, java.lang.String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public java.lang.String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.STATUS, new org.apache.thrift.meta_data.FieldMetaData("status", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, negotiation_status.class))); + tmpMap.put(_Fields.MSG, new org.apache.thrift.meta_data.FieldMetaData("msg", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, com.xiaomi.infra.pegasus.base.blob.class))); + metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(negotiation_request.class, metaDataMap); + } + + public negotiation_request() { + } + + public negotiation_request( + negotiation_status status, + com.xiaomi.infra.pegasus.base.blob msg) + { + this(); + this.status = status; + this.msg = msg; + } + + /** + * Performs a deep copy on other. + */ + public negotiation_request(negotiation_request other) { + if (other.isSetStatus()) { + this.status = other.status; + } + if (other.isSetMsg()) { + this.msg = new com.xiaomi.infra.pegasus.base.blob(other.msg); + } + } + + public negotiation_request deepCopy() { + return new negotiation_request(this); + } + + @Override + public void clear() { + this.status = null; + this.msg = null; + } + + /** + * + * @see negotiation_status + */ + public negotiation_status getStatus() { + return this.status; + } + + /** + * + * @see negotiation_status + */ + public negotiation_request setStatus(negotiation_status status) { + this.status = status; + return this; + } + + public void unsetStatus() { + this.status = null; + } + + /** Returns true if field status is set (has been assigned a value) and false otherwise */ + public boolean isSetStatus() { + return this.status != null; + } + + public void setStatusIsSet(boolean value) { + if (!value) { + this.status = null; + } + } + + public com.xiaomi.infra.pegasus.base.blob getMsg() { + return this.msg; + } + + public negotiation_request setMsg(com.xiaomi.infra.pegasus.base.blob msg) { + this.msg = msg; + return this; + } + + public void unsetMsg() { + this.msg = null; + } + + /** Returns true if field msg is set (has been assigned a value) and false otherwise */ + public boolean isSetMsg() { + return this.msg != null; + } + + public void setMsgIsSet(boolean value) { + if (!value) { + this.msg = null; + } + } + + public void setFieldValue(_Fields field, java.lang.Object value) { + switch (field) { + case STATUS: + if (value == null) { + unsetStatus(); + } else { + setStatus((negotiation_status)value); + } + break; + + case MSG: + if (value == null) { + unsetMsg(); + } else { + setMsg((com.xiaomi.infra.pegasus.base.blob)value); + } + break; + + } + } + + public java.lang.Object getFieldValue(_Fields field) { + switch (field) { + case STATUS: + return getStatus(); + + case MSG: + return getMsg(); + + } + throw new java.lang.IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new java.lang.IllegalArgumentException(); + } + + switch (field) { + case STATUS: + return isSetStatus(); + case MSG: + return isSetMsg(); + } + throw new java.lang.IllegalStateException(); + } + + @Override + public boolean equals(java.lang.Object that) { + if (that == null) + return false; + if (that instanceof negotiation_request) + return this.equals((negotiation_request)that); + return false; + } + + public boolean equals(negotiation_request that) { + if (that == null) + return false; + if (this == that) + return true; + + boolean this_present_status = true && this.isSetStatus(); + boolean that_present_status = true && that.isSetStatus(); + if (this_present_status || that_present_status) { + if (!(this_present_status && that_present_status)) + return false; + if (!this.status.equals(that.status)) + return false; + } + + boolean this_present_msg = true && this.isSetMsg(); + boolean that_present_msg = true && that.isSetMsg(); + if (this_present_msg || that_present_msg) { + if (!(this_present_msg && that_present_msg)) + return false; + if (!this.msg.equals(that.msg)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + int hashCode = 1; + + hashCode = hashCode * 8191 + ((isSetStatus()) ? 131071 : 524287); + if (isSetStatus()) + hashCode = hashCode * 8191 + status.getValue(); + + hashCode = hashCode * 8191 + ((isSetMsg()) ? 131071 : 524287); + if (isSetMsg()) + hashCode = hashCode * 8191 + msg.hashCode(); + + return hashCode; + } + + @Override + public int compareTo(negotiation_request other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = java.lang.Boolean.valueOf(isSetStatus()).compareTo(other.isSetStatus()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetStatus()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.status, other.status); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = java.lang.Boolean.valueOf(isSetMsg()).compareTo(other.isSetMsg()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetMsg()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.msg, other.msg); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + scheme(iprot).read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + scheme(oprot).write(oprot, this); + } + + @Override + public java.lang.String toString() { + java.lang.StringBuilder sb = new java.lang.StringBuilder("negotiation_request("); + boolean first = true; + + sb.append("status:"); + if (this.status == null) { + sb.append("null"); + } else { + sb.append(this.status); + } + first = false; + if (!first) sb.append(", "); + sb.append("msg:"); + if (this.msg == null) { + sb.append("null"); + } else { + sb.append(this.msg); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + if (msg != null) { + msg.validate(); + } + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class negotiation_requestStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public negotiation_requestStandardScheme getScheme() { + return new negotiation_requestStandardScheme(); + } + } + + private static class negotiation_requestStandardScheme extends org.apache.thrift.scheme.StandardScheme { + + public void read(org.apache.thrift.protocol.TProtocol iprot, negotiation_request struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // STATUS + if (schemeField.type == org.apache.thrift.protocol.TType.I32) { + struct.status = com.xiaomi.infra.pegasus.apps.negotiation_status.findByValue(iprot.readI32()); + struct.setStatusIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 2: // MSG + if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { + struct.msg = new com.xiaomi.infra.pegasus.base.blob(); + struct.msg.read(iprot); + struct.setMsgIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, negotiation_request struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.status != null) { + oprot.writeFieldBegin(STATUS_FIELD_DESC); + oprot.writeI32(struct.status.getValue()); + oprot.writeFieldEnd(); + } + if (struct.msg != null) { + oprot.writeFieldBegin(MSG_FIELD_DESC); + struct.msg.write(oprot); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class negotiation_requestTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public negotiation_requestTupleScheme getScheme() { + return new negotiation_requestTupleScheme(); + } + } + + private static class negotiation_requestTupleScheme extends org.apache.thrift.scheme.TupleScheme { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, negotiation_request struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet optionals = new java.util.BitSet(); + if (struct.isSetStatus()) { + optionals.set(0); + } + if (struct.isSetMsg()) { + optionals.set(1); + } + oprot.writeBitSet(optionals, 2); + if (struct.isSetStatus()) { + oprot.writeI32(struct.status.getValue()); + } + if (struct.isSetMsg()) { + struct.msg.write(oprot); + } + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, negotiation_request struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet incoming = iprot.readBitSet(2); + if (incoming.get(0)) { + struct.status = com.xiaomi.infra.pegasus.apps.negotiation_status.findByValue(iprot.readI32()); + struct.setStatusIsSet(true); + } + if (incoming.get(1)) { + struct.msg = new com.xiaomi.infra.pegasus.base.blob(); + struct.msg.read(iprot); + struct.setMsgIsSet(true); + } + } + } + + private static S scheme(org.apache.thrift.protocol.TProtocol proto) { + return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme(); + } +} + diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/negotiation_response.java b/src/main/java/com/xiaomi/infra/pegasus/apps/negotiation_response.java new file mode 100644 index 00000000..8de65734 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/negotiation_response.java @@ -0,0 +1,500 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. +/** + * Autogenerated by Thrift Compiler (0.11.0) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package com.xiaomi.infra.pegasus.apps; + +@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2020-09-14") +public class negotiation_response implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("negotiation_response"); + + private static final org.apache.thrift.protocol.TField STATUS_FIELD_DESC = new org.apache.thrift.protocol.TField("status", org.apache.thrift.protocol.TType.I32, (short)1); + private static final org.apache.thrift.protocol.TField MSG_FIELD_DESC = new org.apache.thrift.protocol.TField("msg", org.apache.thrift.protocol.TType.STRUCT, (short)2); + + private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new negotiation_responseStandardSchemeFactory(); + private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new negotiation_responseTupleSchemeFactory(); + + /** + * + * @see negotiation_status + */ + public negotiation_status status; // required + public com.xiaomi.infra.pegasus.base.blob msg; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + /** + * + * @see negotiation_status + */ + STATUS((short)1, "status"), + MSG((short)2, "msg"); + + private static final java.util.Map byName = new java.util.HashMap(); + + static { + for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // STATUS + return STATUS; + case 2: // MSG + return MSG; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(java.lang.String name) { + return byName.get(name); + } + + private final short _thriftId; + private final java.lang.String _fieldName; + + _Fields(short thriftId, java.lang.String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public java.lang.String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.STATUS, new org.apache.thrift.meta_data.FieldMetaData("status", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, negotiation_status.class))); + tmpMap.put(_Fields.MSG, new org.apache.thrift.meta_data.FieldMetaData("msg", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, com.xiaomi.infra.pegasus.base.blob.class))); + metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(negotiation_response.class, metaDataMap); + } + + public negotiation_response() { + } + + public negotiation_response( + negotiation_status status, + com.xiaomi.infra.pegasus.base.blob msg) + { + this(); + this.status = status; + this.msg = msg; + } + + /** + * Performs a deep copy on other. + */ + public negotiation_response(negotiation_response other) { + if (other.isSetStatus()) { + this.status = other.status; + } + if (other.isSetMsg()) { + this.msg = new com.xiaomi.infra.pegasus.base.blob(other.msg); + } + } + + public negotiation_response deepCopy() { + return new negotiation_response(this); + } + + @Override + public void clear() { + this.status = null; + this.msg = null; + } + + /** + * + * @see negotiation_status + */ + public negotiation_status getStatus() { + return this.status; + } + + /** + * + * @see negotiation_status + */ + public negotiation_response setStatus(negotiation_status status) { + this.status = status; + return this; + } + + public void unsetStatus() { + this.status = null; + } + + /** Returns true if field status is set (has been assigned a value) and false otherwise */ + public boolean isSetStatus() { + return this.status != null; + } + + public void setStatusIsSet(boolean value) { + if (!value) { + this.status = null; + } + } + + public com.xiaomi.infra.pegasus.base.blob getMsg() { + return this.msg; + } + + public negotiation_response setMsg(com.xiaomi.infra.pegasus.base.blob msg) { + this.msg = msg; + return this; + } + + public void unsetMsg() { + this.msg = null; + } + + /** Returns true if field msg is set (has been assigned a value) and false otherwise */ + public boolean isSetMsg() { + return this.msg != null; + } + + public void setMsgIsSet(boolean value) { + if (!value) { + this.msg = null; + } + } + + public void setFieldValue(_Fields field, java.lang.Object value) { + switch (field) { + case STATUS: + if (value == null) { + unsetStatus(); + } else { + setStatus((negotiation_status)value); + } + break; + + case MSG: + if (value == null) { + unsetMsg(); + } else { + setMsg((com.xiaomi.infra.pegasus.base.blob)value); + } + break; + + } + } + + public java.lang.Object getFieldValue(_Fields field) { + switch (field) { + case STATUS: + return getStatus(); + + case MSG: + return getMsg(); + + } + throw new java.lang.IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new java.lang.IllegalArgumentException(); + } + + switch (field) { + case STATUS: + return isSetStatus(); + case MSG: + return isSetMsg(); + } + throw new java.lang.IllegalStateException(); + } + + @Override + public boolean equals(java.lang.Object that) { + if (that == null) + return false; + if (that instanceof negotiation_response) + return this.equals((negotiation_response)that); + return false; + } + + public boolean equals(negotiation_response that) { + if (that == null) + return false; + if (this == that) + return true; + + boolean this_present_status = true && this.isSetStatus(); + boolean that_present_status = true && that.isSetStatus(); + if (this_present_status || that_present_status) { + if (!(this_present_status && that_present_status)) + return false; + if (!this.status.equals(that.status)) + return false; + } + + boolean this_present_msg = true && this.isSetMsg(); + boolean that_present_msg = true && that.isSetMsg(); + if (this_present_msg || that_present_msg) { + if (!(this_present_msg && that_present_msg)) + return false; + if (!this.msg.equals(that.msg)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + int hashCode = 1; + + hashCode = hashCode * 8191 + ((isSetStatus()) ? 131071 : 524287); + if (isSetStatus()) + hashCode = hashCode * 8191 + status.getValue(); + + hashCode = hashCode * 8191 + ((isSetMsg()) ? 131071 : 524287); + if (isSetMsg()) + hashCode = hashCode * 8191 + msg.hashCode(); + + return hashCode; + } + + @Override + public int compareTo(negotiation_response other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = java.lang.Boolean.valueOf(isSetStatus()).compareTo(other.isSetStatus()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetStatus()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.status, other.status); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = java.lang.Boolean.valueOf(isSetMsg()).compareTo(other.isSetMsg()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetMsg()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.msg, other.msg); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + scheme(iprot).read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + scheme(oprot).write(oprot, this); + } + + @Override + public java.lang.String toString() { + java.lang.StringBuilder sb = new java.lang.StringBuilder("negotiation_response("); + boolean first = true; + + sb.append("status:"); + if (this.status == null) { + sb.append("null"); + } else { + sb.append(this.status); + } + first = false; + if (!first) sb.append(", "); + sb.append("msg:"); + if (this.msg == null) { + sb.append("null"); + } else { + sb.append(this.msg); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + if (msg != null) { + msg.validate(); + } + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class negotiation_responseStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public negotiation_responseStandardScheme getScheme() { + return new negotiation_responseStandardScheme(); + } + } + + private static class negotiation_responseStandardScheme extends org.apache.thrift.scheme.StandardScheme { + + public void read(org.apache.thrift.protocol.TProtocol iprot, negotiation_response struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // STATUS + if (schemeField.type == org.apache.thrift.protocol.TType.I32) { + struct.status = com.xiaomi.infra.pegasus.apps.negotiation_status.findByValue(iprot.readI32()); + struct.setStatusIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 2: // MSG + if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { + struct.msg = new com.xiaomi.infra.pegasus.base.blob(); + struct.msg.read(iprot); + struct.setMsgIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, negotiation_response struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.status != null) { + oprot.writeFieldBegin(STATUS_FIELD_DESC); + oprot.writeI32(struct.status.getValue()); + oprot.writeFieldEnd(); + } + if (struct.msg != null) { + oprot.writeFieldBegin(MSG_FIELD_DESC); + struct.msg.write(oprot); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class negotiation_responseTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public negotiation_responseTupleScheme getScheme() { + return new negotiation_responseTupleScheme(); + } + } + + private static class negotiation_responseTupleScheme extends org.apache.thrift.scheme.TupleScheme { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, negotiation_response struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet optionals = new java.util.BitSet(); + if (struct.isSetStatus()) { + optionals.set(0); + } + if (struct.isSetMsg()) { + optionals.set(1); + } + oprot.writeBitSet(optionals, 2); + if (struct.isSetStatus()) { + oprot.writeI32(struct.status.getValue()); + } + if (struct.isSetMsg()) { + struct.msg.write(oprot); + } + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, negotiation_response struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet incoming = iprot.readBitSet(2); + if (incoming.get(0)) { + struct.status = com.xiaomi.infra.pegasus.apps.negotiation_status.findByValue(iprot.readI32()); + struct.setStatusIsSet(true); + } + if (incoming.get(1)) { + struct.msg = new com.xiaomi.infra.pegasus.base.blob(); + struct.msg.read(iprot); + struct.setMsgIsSet(true); + } + } + } + + private static S scheme(org.apache.thrift.protocol.TProtocol proto) { + return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme(); + } +} + diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/negotiation_status.java b/src/main/java/com/xiaomi/infra/pegasus/apps/negotiation_status.java new file mode 100644 index 00000000..da082975 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/negotiation_status.java @@ -0,0 +1,71 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. +/** + * Autogenerated by Thrift Compiler (0.11.0) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package com.xiaomi.infra.pegasus.apps; + + +public enum negotiation_status implements org.apache.thrift.TEnum { + INVALID(0), + SASL_LIST_MECHANISMS(1), + SASL_LIST_MECHANISMS_RESP(2), + SASL_SELECT_MECHANISMS(3), + SASL_SELECT_MECHANISMS_RESP(4), + SASL_INITIATE(5), + SASL_CHALLENGE(6), + SASL_CHALLENGE_RESP(7), + SASL_SUCC(8), + SASL_AUTH_DISABLE(9), + SASL_AUTH_FAIL(10); + + private final int value; + + private negotiation_status(int value) { + this.value = value; + } + + /** + * Get the integer value of this enum value, as defined in the Thrift IDL. + */ + public int getValue() { + return value; + } + + /** + * Find a the enum type by its integer value, as defined in the Thrift IDL. + * @return null if the value is not found. + */ + public static negotiation_status findByValue(int value) { + switch (value) { + case 0: + return INVALID; + case 1: + return SASL_LIST_MECHANISMS; + case 2: + return SASL_LIST_MECHANISMS_RESP; + case 3: + return SASL_SELECT_MECHANISMS; + case 4: + return SASL_SELECT_MECHANISMS_RESP; + case 5: + return SASL_INITIATE; + case 6: + return SASL_CHALLENGE; + case 7: + return SASL_CHALLENGE_RESP; + case 8: + return SASL_SUCC; + case 9: + return SASL_AUTH_DISABLE; + case 10: + return SASL_AUTH_FAIL; + default: + return null; + } + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/security.java b/src/main/java/com/xiaomi/infra/pegasus/apps/security.java new file mode 100644 index 00000000..e41ec283 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/security.java @@ -0,0 +1,977 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. +/** + * Autogenerated by Thrift Compiler (0.11.0) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package com.xiaomi.infra.pegasus.apps; + +@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2020-09-14") +public class security { + + public interface Iface { + + public negotiation_response negotiate(negotiation_request request) throws org.apache.thrift.TException; + + } + + public interface AsyncIface { + + public void negotiate(negotiation_request request, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; + + } + + public static class Client extends org.apache.thrift.TServiceClient implements Iface { + public static class Factory implements org.apache.thrift.TServiceClientFactory { + public Factory() {} + public Client getClient(org.apache.thrift.protocol.TProtocol prot) { + return new Client(prot); + } + public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) { + return new Client(iprot, oprot); + } + } + + public Client(org.apache.thrift.protocol.TProtocol prot) + { + super(prot, prot); + } + + public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) { + super(iprot, oprot); + } + + public negotiation_response negotiate(negotiation_request request) throws org.apache.thrift.TException + { + send_negotiate(request); + return recv_negotiate(); + } + + public void send_negotiate(negotiation_request request) throws org.apache.thrift.TException + { + negotiate_args args = new negotiate_args(); + args.setRequest(request); + sendBase("negotiate", args); + } + + public negotiation_response recv_negotiate() throws org.apache.thrift.TException + { + negotiate_result result = new negotiate_result(); + receiveBase(result, "negotiate"); + if (result.isSetSuccess()) { + return result.success; + } + throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "negotiate failed: unknown result"); + } + + } + public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface { + public static class Factory implements org.apache.thrift.async.TAsyncClientFactory { + private org.apache.thrift.async.TAsyncClientManager clientManager; + private org.apache.thrift.protocol.TProtocolFactory protocolFactory; + public Factory(org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.protocol.TProtocolFactory protocolFactory) { + this.clientManager = clientManager; + this.protocolFactory = protocolFactory; + } + public AsyncClient getAsyncClient(org.apache.thrift.transport.TNonblockingTransport transport) { + return new AsyncClient(protocolFactory, clientManager, transport); + } + } + + public AsyncClient(org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.transport.TNonblockingTransport transport) { + super(protocolFactory, clientManager, transport); + } + + public void negotiate(negotiation_request request, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { + checkReady(); + negotiate_call method_call = new negotiate_call(request, resultHandler, this, ___protocolFactory, ___transport); + this.___currentMethod = method_call; + ___manager.call(method_call); + } + + public static class negotiate_call extends org.apache.thrift.async.TAsyncMethodCall { + private negotiation_request request; + public negotiate_call(negotiation_request request, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { + super(client, protocolFactory, transport, resultHandler, false); + this.request = request; + } + + public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { + prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("negotiate", org.apache.thrift.protocol.TMessageType.CALL, 0)); + negotiate_args args = new negotiate_args(); + args.setRequest(request); + args.write(prot); + prot.writeMessageEnd(); + } + + public negotiation_response getResult() throws org.apache.thrift.TException { + if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) { + throw new java.lang.IllegalStateException("Method call not finished!"); + } + org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array()); + org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport); + return (new Client(prot)).recv_negotiate(); + } + } + + } + + public static class Processor extends org.apache.thrift.TBaseProcessor implements org.apache.thrift.TProcessor { + private static final org.slf4j.Logger _LOGGER = org.slf4j.LoggerFactory.getLogger(Processor.class.getName()); + public Processor(I iface) { + super(iface, getProcessMap(new java.util.HashMap>())); + } + + protected Processor(I iface, java.util.Map> processMap) { + super(iface, getProcessMap(processMap)); + } + + private static java.util.Map> getProcessMap(java.util.Map> processMap) { + processMap.put("negotiate", new negotiate()); + return processMap; + } + + public static class negotiate extends org.apache.thrift.ProcessFunction { + public negotiate() { + super("negotiate"); + } + + public negotiate_args getEmptyArgsInstance() { + return new negotiate_args(); + } + + protected boolean isOneway() { + return false; + } + + @Override + protected boolean handleRuntimeExceptions() { + return false; + } + + public negotiate_result getResult(I iface, negotiate_args args) throws org.apache.thrift.TException { + negotiate_result result = new negotiate_result(); + result.success = iface.negotiate(args.request); + return result; + } + } + + } + + public static class AsyncProcessor extends org.apache.thrift.TBaseAsyncProcessor { + private static final org.slf4j.Logger _LOGGER = org.slf4j.LoggerFactory.getLogger(AsyncProcessor.class.getName()); + public AsyncProcessor(I iface) { + super(iface, getProcessMap(new java.util.HashMap>())); + } + + protected AsyncProcessor(I iface, java.util.Map> processMap) { + super(iface, getProcessMap(processMap)); + } + + private static java.util.Map> getProcessMap(java.util.Map> processMap) { + processMap.put("negotiate", new negotiate()); + return processMap; + } + + public static class negotiate extends org.apache.thrift.AsyncProcessFunction { + public negotiate() { + super("negotiate"); + } + + public negotiate_args getEmptyArgsInstance() { + return new negotiate_args(); + } + + public org.apache.thrift.async.AsyncMethodCallback getResultHandler(final org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer fb, final int seqid) { + final org.apache.thrift.AsyncProcessFunction fcall = this; + return new org.apache.thrift.async.AsyncMethodCallback() { + public void onComplete(negotiation_response o) { + negotiate_result result = new negotiate_result(); + result.success = o; + try { + fcall.sendResponse(fb, result, org.apache.thrift.protocol.TMessageType.REPLY,seqid); + } catch (org.apache.thrift.transport.TTransportException e) { + _LOGGER.error("TTransportException writing to internal frame buffer", e); + fb.close(); + } catch (java.lang.Exception e) { + _LOGGER.error("Exception writing to internal frame buffer", e); + onError(e); + } + } + public void onError(java.lang.Exception e) { + byte msgType = org.apache.thrift.protocol.TMessageType.REPLY; + org.apache.thrift.TSerializable msg; + negotiate_result result = new negotiate_result(); + if (e instanceof org.apache.thrift.transport.TTransportException) { + _LOGGER.error("TTransportException inside handler", e); + fb.close(); + return; + } else if (e instanceof org.apache.thrift.TApplicationException) { + _LOGGER.error("TApplicationException inside handler", e); + msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION; + msg = (org.apache.thrift.TApplicationException)e; + } else { + _LOGGER.error("Exception inside handler", e); + msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION; + msg = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage()); + } + try { + fcall.sendResponse(fb,msg,msgType,seqid); + } catch (java.lang.Exception ex) { + _LOGGER.error("Exception writing to internal frame buffer", ex); + fb.close(); + } + } + }; + } + + protected boolean isOneway() { + return false; + } + + public void start(I iface, negotiate_args args, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { + iface.negotiate(args.request,resultHandler); + } + } + + } + + public static class negotiate_args implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("negotiate_args"); + + private static final org.apache.thrift.protocol.TField REQUEST_FIELD_DESC = new org.apache.thrift.protocol.TField("request", org.apache.thrift.protocol.TType.STRUCT, (short)1); + + private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new negotiate_argsStandardSchemeFactory(); + private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new negotiate_argsTupleSchemeFactory(); + + public negotiation_request request; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + REQUEST((short)1, "request"); + + private static final java.util.Map byName = new java.util.HashMap(); + + static { + for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // REQUEST + return REQUEST; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(java.lang.String name) { + return byName.get(name); + } + + private final short _thriftId; + private final java.lang.String _fieldName; + + _Fields(short thriftId, java.lang.String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public java.lang.String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.REQUEST, new org.apache.thrift.meta_data.FieldMetaData("request", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, negotiation_request.class))); + metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(negotiate_args.class, metaDataMap); + } + + public negotiate_args() { + } + + public negotiate_args( + negotiation_request request) + { + this(); + this.request = request; + } + + /** + * Performs a deep copy on other. + */ + public negotiate_args(negotiate_args other) { + if (other.isSetRequest()) { + this.request = new negotiation_request(other.request); + } + } + + public negotiate_args deepCopy() { + return new negotiate_args(this); + } + + @Override + public void clear() { + this.request = null; + } + + public negotiation_request getRequest() { + return this.request; + } + + public negotiate_args setRequest(negotiation_request request) { + this.request = request; + return this; + } + + public void unsetRequest() { + this.request = null; + } + + /** Returns true if field request is set (has been assigned a value) and false otherwise */ + public boolean isSetRequest() { + return this.request != null; + } + + public void setRequestIsSet(boolean value) { + if (!value) { + this.request = null; + } + } + + public void setFieldValue(_Fields field, java.lang.Object value) { + switch (field) { + case REQUEST: + if (value == null) { + unsetRequest(); + } else { + setRequest((negotiation_request)value); + } + break; + + } + } + + public java.lang.Object getFieldValue(_Fields field) { + switch (field) { + case REQUEST: + return getRequest(); + + } + throw new java.lang.IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new java.lang.IllegalArgumentException(); + } + + switch (field) { + case REQUEST: + return isSetRequest(); + } + throw new java.lang.IllegalStateException(); + } + + @Override + public boolean equals(java.lang.Object that) { + if (that == null) + return false; + if (that instanceof negotiate_args) + return this.equals((negotiate_args)that); + return false; + } + + public boolean equals(negotiate_args that) { + if (that == null) + return false; + if (this == that) + return true; + + boolean this_present_request = true && this.isSetRequest(); + boolean that_present_request = true && that.isSetRequest(); + if (this_present_request || that_present_request) { + if (!(this_present_request && that_present_request)) + return false; + if (!this.request.equals(that.request)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + int hashCode = 1; + + hashCode = hashCode * 8191 + ((isSetRequest()) ? 131071 : 524287); + if (isSetRequest()) + hashCode = hashCode * 8191 + request.hashCode(); + + return hashCode; + } + + @Override + public int compareTo(negotiate_args other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = java.lang.Boolean.valueOf(isSetRequest()).compareTo(other.isSetRequest()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetRequest()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.request, other.request); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + scheme(iprot).read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + scheme(oprot).write(oprot, this); + } + + @Override + public java.lang.String toString() { + java.lang.StringBuilder sb = new java.lang.StringBuilder("negotiate_args("); + boolean first = true; + + sb.append("request:"); + if (this.request == null) { + sb.append("null"); + } else { + sb.append(this.request); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + if (request != null) { + request.validate(); + } + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class negotiate_argsStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public negotiate_argsStandardScheme getScheme() { + return new negotiate_argsStandardScheme(); + } + } + + private static class negotiate_argsStandardScheme extends org.apache.thrift.scheme.StandardScheme { + + public void read(org.apache.thrift.protocol.TProtocol iprot, negotiate_args struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // REQUEST + if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { + struct.request = new negotiation_request(); + struct.request.read(iprot); + struct.setRequestIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, negotiate_args struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.request != null) { + oprot.writeFieldBegin(REQUEST_FIELD_DESC); + struct.request.write(oprot); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class negotiate_argsTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public negotiate_argsTupleScheme getScheme() { + return new negotiate_argsTupleScheme(); + } + } + + private static class negotiate_argsTupleScheme extends org.apache.thrift.scheme.TupleScheme { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, negotiate_args struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet optionals = new java.util.BitSet(); + if (struct.isSetRequest()) { + optionals.set(0); + } + oprot.writeBitSet(optionals, 1); + if (struct.isSetRequest()) { + struct.request.write(oprot); + } + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, negotiate_args struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet incoming = iprot.readBitSet(1); + if (incoming.get(0)) { + struct.request = new negotiation_request(); + struct.request.read(iprot); + struct.setRequestIsSet(true); + } + } + } + + private static S scheme(org.apache.thrift.protocol.TProtocol proto) { + return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme(); + } + } + + public static class negotiate_result implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("negotiate_result"); + + private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.STRUCT, (short)0); + + private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new negotiate_resultStandardSchemeFactory(); + private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new negotiate_resultTupleSchemeFactory(); + + public negotiation_response success; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + SUCCESS((short)0, "success"); + + private static final java.util.Map byName = new java.util.HashMap(); + + static { + for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 0: // SUCCESS + return SUCCESS; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(java.lang.String name) { + return byName.get(name); + } + + private final short _thriftId; + private final java.lang.String _fieldName; + + _Fields(short thriftId, java.lang.String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public java.lang.String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, negotiation_response.class))); + metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(negotiate_result.class, metaDataMap); + } + + public negotiate_result() { + } + + public negotiate_result( + negotiation_response success) + { + this(); + this.success = success; + } + + /** + * Performs a deep copy on other. + */ + public negotiate_result(negotiate_result other) { + if (other.isSetSuccess()) { + this.success = new negotiation_response(other.success); + } + } + + public negotiate_result deepCopy() { + return new negotiate_result(this); + } + + @Override + public void clear() { + this.success = null; + } + + public negotiation_response getSuccess() { + return this.success; + } + + public negotiate_result setSuccess(negotiation_response success) { + this.success = success; + return this; + } + + public void unsetSuccess() { + this.success = null; + } + + /** Returns true if field success is set (has been assigned a value) and false otherwise */ + public boolean isSetSuccess() { + return this.success != null; + } + + public void setSuccessIsSet(boolean value) { + if (!value) { + this.success = null; + } + } + + public void setFieldValue(_Fields field, java.lang.Object value) { + switch (field) { + case SUCCESS: + if (value == null) { + unsetSuccess(); + } else { + setSuccess((negotiation_response)value); + } + break; + + } + } + + public java.lang.Object getFieldValue(_Fields field) { + switch (field) { + case SUCCESS: + return getSuccess(); + + } + throw new java.lang.IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new java.lang.IllegalArgumentException(); + } + + switch (field) { + case SUCCESS: + return isSetSuccess(); + } + throw new java.lang.IllegalStateException(); + } + + @Override + public boolean equals(java.lang.Object that) { + if (that == null) + return false; + if (that instanceof negotiate_result) + return this.equals((negotiate_result)that); + return false; + } + + public boolean equals(negotiate_result that) { + if (that == null) + return false; + if (this == that) + return true; + + boolean this_present_success = true && this.isSetSuccess(); + boolean that_present_success = true && that.isSetSuccess(); + if (this_present_success || that_present_success) { + if (!(this_present_success && that_present_success)) + return false; + if (!this.success.equals(that.success)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + int hashCode = 1; + + hashCode = hashCode * 8191 + ((isSetSuccess()) ? 131071 : 524287); + if (isSetSuccess()) + hashCode = hashCode * 8191 + success.hashCode(); + + return hashCode; + } + + @Override + public int compareTo(negotiate_result other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = java.lang.Boolean.valueOf(isSetSuccess()).compareTo(other.isSetSuccess()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetSuccess()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.success, other.success); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + scheme(iprot).read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + scheme(oprot).write(oprot, this); + } + + @Override + public java.lang.String toString() { + java.lang.StringBuilder sb = new java.lang.StringBuilder("negotiate_result("); + boolean first = true; + + sb.append("success:"); + if (this.success == null) { + sb.append("null"); + } else { + sb.append(this.success); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + if (success != null) { + success.validate(); + } + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class negotiate_resultStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public negotiate_resultStandardScheme getScheme() { + return new negotiate_resultStandardScheme(); + } + } + + private static class negotiate_resultStandardScheme extends org.apache.thrift.scheme.StandardScheme { + + public void read(org.apache.thrift.protocol.TProtocol iprot, negotiate_result struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 0: // SUCCESS + if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { + struct.success = new negotiation_response(); + struct.success.read(iprot); + struct.setSuccessIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, negotiate_result struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.success != null) { + oprot.writeFieldBegin(SUCCESS_FIELD_DESC); + struct.success.write(oprot); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class negotiate_resultTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public negotiate_resultTupleScheme getScheme() { + return new negotiate_resultTupleScheme(); + } + } + + private static class negotiate_resultTupleScheme extends org.apache.thrift.scheme.TupleScheme { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, negotiate_result struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet optionals = new java.util.BitSet(); + if (struct.isSetSuccess()) { + optionals.set(0); + } + oprot.writeBitSet(optionals, 1); + if (struct.isSetSuccess()) { + struct.success.write(oprot); + } + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, negotiate_result struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet incoming = iprot.readBitSet(1); + if (incoming.get(0)) { + struct.success = new negotiation_response(); + struct.success.read(iprot); + struct.setSuccessIsSet(true); + } + } + } + + private static S scheme(org.apache.thrift.protocol.TProtocol proto) { + return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme(); + } + } + +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/negotiation_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/negotiation_operator.java new file mode 100644 index 00000000..3a1e3250 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/operator/negotiation_operator.java @@ -0,0 +1,49 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. +package com.xiaomi.infra.pegasus.operator; + +import com.xiaomi.infra.pegasus.apps.negotiation_request; +import com.xiaomi.infra.pegasus.apps.negotiation_response; +import com.xiaomi.infra.pegasus.apps.security; +import com.xiaomi.infra.pegasus.base.gpid; +import org.apache.thrift.TApplicationException; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TMessage; +import org.apache.thrift.protocol.TMessageType; +import org.apache.thrift.protocol.TProtocol; + +public class negotiation_operator extends client_operator { + public negotiation_operator(negotiation_request request) { + super(new gpid(), "", 0); + this.request = request; + } + + public String name() { + return "negotiate"; + } + + public void send_data(TProtocol oprot, int seqid) throws TException { + TMessage msg = new TMessage("RPC_NEGOTIATION", TMessageType.CALL, seqid); + oprot.writeMessageBegin(msg); + security.negotiate_args get_args = new security.negotiate_args(request); + get_args.write(oprot); + oprot.writeMessageEnd(); + } + + public void recv_data(TProtocol iprot) throws TException { + security.negotiate_result result = new security.negotiate_result(); + result.read(iprot); + if (result.isSetSuccess()) resp = result.success; + else + throw new TApplicationException( + TApplicationException.MISSING_RESULT, "get failed: unknown result"); + } + + public negotiation_response get_response() { + return resp; + } + + private negotiation_request request; + private negotiation_response resp; +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java index 89d9aa04..5ccefb94 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java @@ -56,14 +56,13 @@ public ClusterManager(ClientOptions opts) throws IllegalArgumentException { replicaGroup = getEventLoopGroupInstance(opts.getAsyncWorkers()); metaGroup = getEventLoopGroupInstance(1); tableGroup = getEventLoopGroupInstance(1); + enableAuth = opts.isEnableAuth(); metaList = opts.getMetaServers().split(","); // the constructor of meta session is depend on the replicaSessions, // so the replicaSessions should be initialized earlier metaSession = new MetaSession(this, metaList, (int) opts.getMetaQueryTimeout().toMillis(), 10, metaGroup); - - this.enableAuth = opts.isEnableAuth(); } public EventExecutor getExecutor() { diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java index 19152fe9..342364dd 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java @@ -1,13 +1,79 @@ package com.xiaomi.infra.pegasus.rpc.async; +import com.xiaomi.infra.pegasus.apps.negotiation_request; +import com.xiaomi.infra.pegasus.apps.negotiation_response; +import com.xiaomi.infra.pegasus.apps.negotiation_status; +import com.xiaomi.infra.pegasus.base.blob; +import com.xiaomi.infra.pegasus.base.error_code; +import com.xiaomi.infra.pegasus.operator.negotiation_operator; +import com.xiaomi.infra.pegasus.rpc.ReplicationException; +import org.slf4j.Logger; + public class Negotiation { public Negotiation(ReplicaSession session) { this.session = session; } public void start() { - // TBD(zlw) + status = negotiation_status.SASL_LIST_MECHANISMS; + send(status, new blob(new byte[0])); + } + + public void send(negotiation_status status, blob msg) { + negotiation_request request = new negotiation_request(); + request.status = status; + request.msg = msg; + session.sendNegoMsg(request, defaultTimeoutMs); + } + + public static class RecvHandler implements Runnable { + negotiation_operator op; + ReplicaSession session; + + RecvHandler(negotiation_operator op, ReplicaSession session) { + this.op = op; + this.session = session; + } + + @Override + public void run() { + try { + if (op.rpc_error.errno != error_code.error_types.ERR_OK) { + throw new ReplicationException(op.rpc_error.errno); + } + handleResponse(); + } catch (Exception e) { + logger.error("Negotiation failed", e); + session.markSessionDisconnect(); + } + } + + private void handleResponse() throws Exception { + final negotiation_response resp = op.get_response(); + if (resp == null) { + logger.error("RecvHandler received a null response, abandon it"); + return; + } + + switch (resp.status) { + case SASL_LIST_MECHANISMS_RESP: + case SASL_SELECT_MECHANISMS_RESP: + case SASL_CHALLENGE: + case SASL_SUCC: + break; + default: + throw new Exception("Received an unexpected response, status " + resp.status); + } + } + } + + public negotiation_status get_status() { + return status; } private ReplicaSession session; + private negotiation_status status; + + private static final Logger logger = org.slf4j.LoggerFactory.getLogger(Negotiation.class); + private static final int defaultTimeoutMs = 3000; } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java index d916bd56..2aa5361e 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java @@ -3,9 +3,11 @@ // can be found in the LICENSE file in the root directory of this source tree. package com.xiaomi.infra.pegasus.rpc.async; +import com.xiaomi.infra.pegasus.apps.negotiation_request; import com.xiaomi.infra.pegasus.base.error_code.error_types; import com.xiaomi.infra.pegasus.base.rpc_address; import com.xiaomi.infra.pegasus.operator.client_operator; +import com.xiaomi.infra.pegasus.operator.negotiation_operator; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.socket.SocketChannel; @@ -119,6 +121,17 @@ public int asyncSend( return entry.sequenceId; } + public void sendNegoMsg(negotiation_request msg, long timeoutInMilliseconds) { + final RequestEntry entry = new ReplicaSession.RequestEntry(); + entry.sequenceId = seqId.getAndIncrement(); + entry.op = new negotiation_operator(msg); + entry.callback = new Negotiation.RecvHandler((negotiation_operator) entry.op, this); + entry.timeoutTask = addTimer(entry.sequenceId, timeoutInMilliseconds); + pendingResponse.put(entry.sequenceId, entry); + + write(entry, fields); + } + public void closeSession() { VolatileFields f = fields; if (f.state == ConnState.CONNECTED && f.nettyChannel != null) { @@ -210,7 +223,11 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception { } private void startNegotiation(Channel activeChannel) { - logger.info("{}: mark session state negotiation"); + VolatileFields newCache = new VolatileFields(); + newCache.state = ConnState.CONNECTING; + newCache.nettyChannel = activeChannel; + fields = newCache; + if (enableAuth) { negotiation = new Negotiation(this); negotiation.start(); diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/NegotiationTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/NegotiationTest.java new file mode 100644 index 00000000..b99de766 --- /dev/null +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/NegotiationTest.java @@ -0,0 +1,20 @@ +package com.xiaomi.infra.pegasus.rpc.async; + +import static com.xiaomi.infra.pegasus.apps.negotiation_status.SASL_LIST_MECHANISMS; +import static org.mockito.ArgumentMatchers.any; + +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +public class NegotiationTest { + @Test + public void testStart() { + Negotiation negotiation = new Negotiation(null); + Negotiation mockNegotiation = Mockito.spy(negotiation); + + Mockito.doNothing().when(mockNegotiation).send(any(), any()); + mockNegotiation.start(); + Assert.assertEquals(mockNegotiation.get_status(), SASL_LIST_MECHANISMS); + } +} From 6e2e949dbc2e8597e574d4abbd0d690dd0c45998 Mon Sep 17 00:00:00 2001 From: levy Date: Mon, 14 Sep 2020 17:40:47 +0800 Subject: [PATCH 2/9] fix --- .../infra/pegasus/client/ClientOptions.java | 2 +- .../infra/pegasus/rpc/async/ReplicaSession.java | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java b/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java index 949dfc29..28662441 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java @@ -56,7 +56,7 @@ public class ClientOptions { public static final Duration DEFAULT_FALCON_PUSH_INTERVAL = Duration.ofSeconds(10); public static final boolean DEFAULT_ENABLE_WRITE_LIMIT = true; public static final Duration DEFAULT_META_QUERY_TIMEOUT = Duration.ofMillis(5000); - public static final boolean DEFAULT_ENABLE_AUTH = false; + public static final boolean DEFAULT_ENABLE_AUTH = true; private final String metaServers; private final Duration operationTimeout; diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java index 2aa5361e..30b54622 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java @@ -228,13 +228,8 @@ private void startNegotiation(Channel activeChannel) { newCache.nettyChannel = activeChannel; fields = newCache; - if (enableAuth) { - negotiation = new Negotiation(this); - negotiation.start(); - } else { - logger.info("{}: mark session state connected"); - markSessionConnected(activeChannel); - } + negotiation = new Negotiation(this); + negotiation.start(); } private void markSessionConnected(Channel activeChannel) { @@ -398,7 +393,12 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.info("Channel {} for session {} is active", ctx.channel().toString(), name()); - startNegotiation(ctx.channel()); + if (enableAuth) { + startNegotiation(ctx.channel()); + } else { + logger.info("{}: mark session state connected"); + markSessionConnected(ctx.channel()); + } } @Override From 8631927334b32b5a89141b4eeca6613fd2cad15a Mon Sep 17 00:00:00 2001 From: levy Date: Mon, 14 Sep 2020 17:41:20 +0800 Subject: [PATCH 3/9] fix --- .../java/com/xiaomi/infra/pegasus/client/ClientOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java b/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java index 28662441..949dfc29 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java @@ -56,7 +56,7 @@ public class ClientOptions { public static final Duration DEFAULT_FALCON_PUSH_INTERVAL = Duration.ofSeconds(10); public static final boolean DEFAULT_ENABLE_WRITE_LIMIT = true; public static final Duration DEFAULT_META_QUERY_TIMEOUT = Duration.ofMillis(5000); - public static final boolean DEFAULT_ENABLE_AUTH = true; + public static final boolean DEFAULT_ENABLE_AUTH = false; private final String metaServers; private final Duration operationTimeout; From 172239f580c18fb307a4e377184f0dbafac533d5 Mon Sep 17 00:00:00 2001 From: levy Date: Tue, 15 Sep 2020 18:49:51 +0800 Subject: [PATCH 4/9] fix --- .../xiaomi/infra/pegasus/rpc/async/Negotiation.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java index 342364dd..dfd3c52f 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java @@ -10,6 +10,12 @@ import org.slf4j.Logger; public class Negotiation { + private static final Logger logger = org.slf4j.LoggerFactory.getLogger(Negotiation.class); + + private ReplicaSession session; + private negotiation_status status; + private static final int defaultTimeoutMs = 3000; + public Negotiation(ReplicaSession session) { this.session = session; } @@ -70,10 +76,4 @@ private void handleResponse() throws Exception { public negotiation_status get_status() { return status; } - - private ReplicaSession session; - private negotiation_status status; - - private static final Logger logger = org.slf4j.LoggerFactory.getLogger(Negotiation.class); - private static final int defaultTimeoutMs = 3000; } From 5aceaa1377d7c6dd4e3e5024b38f560eabf9640c Mon Sep 17 00:00:00 2001 From: levy Date: Tue, 15 Sep 2020 19:12:20 +0800 Subject: [PATCH 5/9] feat(security): implement start negotiation --- .../infra/pegasus/rpc/async/Negotiation.java | 18 +++--------- .../pegasus/rpc/async/ReplicaSession.java | 28 ++++++------------- 2 files changed, 12 insertions(+), 34 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java index dfd3c52f..17017e86 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java @@ -1,6 +1,5 @@ package com.xiaomi.infra.pegasus.rpc.async; -import com.xiaomi.infra.pegasus.apps.negotiation_request; import com.xiaomi.infra.pegasus.apps.negotiation_response; import com.xiaomi.infra.pegasus.apps.negotiation_status; import com.xiaomi.infra.pegasus.base.blob; @@ -11,10 +10,8 @@ public class Negotiation { private static final Logger logger = org.slf4j.LoggerFactory.getLogger(Negotiation.class); - - private ReplicaSession session; private negotiation_status status; - private static final int defaultTimeoutMs = 3000; + ReplicaSession session; public Negotiation(ReplicaSession session) { this.session = session; @@ -26,19 +23,14 @@ public void start() { } public void send(negotiation_status status, blob msg) { - negotiation_request request = new negotiation_request(); - request.status = status; - request.msg = msg; - session.sendNegoMsg(request, defaultTimeoutMs); + // TODO: send negotiation message, using RecvHandler to handle the corresponding response. } public static class RecvHandler implements Runnable { negotiation_operator op; - ReplicaSession session; - RecvHandler(negotiation_operator op, ReplicaSession session) { + RecvHandler(negotiation_operator op) { this.op = op; - this.session = session; } @Override @@ -50,15 +42,13 @@ public void run() { handleResponse(); } catch (Exception e) { logger.error("Negotiation failed", e); - session.markSessionDisconnect(); } } private void handleResponse() throws Exception { final negotiation_response resp = op.get_response(); if (resp == null) { - logger.error("RecvHandler received a null response, abandon it"); - return; + throw new Exception("RecvHandler received a null response, abandon it"); } switch (resp.status) { diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java index 30b54622..f82a874c 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java @@ -3,11 +3,9 @@ // can be found in the LICENSE file in the root directory of this source tree. package com.xiaomi.infra.pegasus.rpc.async; -import com.xiaomi.infra.pegasus.apps.negotiation_request; import com.xiaomi.infra.pegasus.base.error_code.error_types; import com.xiaomi.infra.pegasus.base.rpc_address; import com.xiaomi.infra.pegasus.operator.client_operator; -import com.xiaomi.infra.pegasus.operator.negotiation_operator; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.socket.SocketChannel; @@ -121,17 +119,6 @@ public int asyncSend( return entry.sequenceId; } - public void sendNegoMsg(negotiation_request msg, long timeoutInMilliseconds) { - final RequestEntry entry = new ReplicaSession.RequestEntry(); - entry.sequenceId = seqId.getAndIncrement(); - entry.op = new negotiation_operator(msg); - entry.callback = new Negotiation.RecvHandler((negotiation_operator) entry.op, this); - entry.timeoutTask = addTimer(entry.sequenceId, timeoutInMilliseconds); - pendingResponse.put(entry.sequenceId, entry); - - write(entry, fields); - } - public void closeSession() { VolatileFields f = fields; if (f.state == ConnState.CONNECTED && f.nettyChannel != null) { @@ -223,13 +210,14 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception { } private void startNegotiation(Channel activeChannel) { - VolatileFields newCache = new VolatileFields(); - newCache.state = ConnState.CONNECTING; - newCache.nettyChannel = activeChannel; - fields = newCache; - - negotiation = new Negotiation(this); - negotiation.start(); + logger.info("{}: mark session state negotiation"); + if (enableAuth) { + negotiation = new Negotiation(this); + negotiation.start(); + } else { + logger.info("{}: mark session state connected"); + markSessionConnected(activeChannel); + } } private void markSessionConnected(Channel activeChannel) { From 566e99630d3fb4caa419c8ba784e4b201aa228dd Mon Sep 17 00:00:00 2001 From: levy Date: Tue, 15 Sep 2020 19:14:08 +0800 Subject: [PATCH 6/9] fix --- .../com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java index f82a874c..d916bd56 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java @@ -381,12 +381,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.info("Channel {} for session {} is active", ctx.channel().toString(), name()); - if (enableAuth) { - startNegotiation(ctx.channel()); - } else { - logger.info("{}: mark session state connected"); - markSessionConnected(ctx.channel()); - } + startNegotiation(ctx.channel()); } @Override From 91a5932fb35d7e3d14abb2ce78b821b01df21dc2 Mon Sep 17 00:00:00 2001 From: levy Date: Wed, 16 Sep 2020 11:32:56 +0800 Subject: [PATCH 7/9] fix --- .../java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java index 17017e86..c6132462 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java @@ -11,7 +11,7 @@ public class Negotiation { private static final Logger logger = org.slf4j.LoggerFactory.getLogger(Negotiation.class); private negotiation_status status; - ReplicaSession session; + private ReplicaSession session; public Negotiation(ReplicaSession session) { this.session = session; From ac3070c809039e2e46a655f5ad4a7189762d9122 Mon Sep 17 00:00:00 2001 From: levy Date: Wed, 16 Sep 2020 11:34:16 +0800 Subject: [PATCH 8/9] fix --- .../java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java index c6132462..6f82f48c 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java @@ -26,7 +26,7 @@ public void send(negotiation_status status, blob msg) { // TODO: send negotiation message, using RecvHandler to handle the corresponding response. } - public static class RecvHandler implements Runnable { + private class RecvHandler implements Runnable { negotiation_operator op; RecvHandler(negotiation_operator op) { From 0935fad8c804fc4a7352f88cd08c28c0a3a59b28 Mon Sep 17 00:00:00 2001 From: levy Date: Thu, 17 Sep 2020 14:34:59 +0800 Subject: [PATCH 9/9] fix --- .../java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java index d916bd56..106f8bd5 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java @@ -210,12 +210,10 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception { } private void startNegotiation(Channel activeChannel) { - logger.info("{}: mark session state negotiation"); if (enableAuth) { negotiation = new Negotiation(this); negotiation.start(); } else { - logger.info("{}: mark session state connected"); markSessionConnected(activeChannel); } }