Skip to content

Commit

Permalink
Merge c186978 into 5594dac
Browse files Browse the repository at this point in the history
  • Loading branch information
FloatingCrowbar authored Feb 4, 2024
2 parents 5594dac + c186978 commit c2c899d
Showing 1 changed file with 84 additions and 7 deletions.
91 changes: 84 additions & 7 deletions ydb/core/persqueue/key.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ class TKeyPrefix : public TBuffer
: Partition(partition)
{
Resize(UnmarkedSize());
*PtrType() = type;
memcpy(PtrPartition(), Sprintf("%.10" PRIu32, Partition.InternalPartitionId).data(), 10);
SetType(type);
memcpy(PtrPartition(), Sprintf("%.10" PRIu32, Partition.InternalPartitionId).data(), 10);
}

TKeyPrefix(EType type, const TPartitionId& partition, EMark mark)
Expand All @@ -62,25 +62,100 @@ class TKeyPrefix : public TBuffer
static constexpr ui32 MarkPosition() { return UnmarkedSize(); }
static constexpr ui32 MarkedSize() { return UnmarkedSize() + 1; }

void SetType(EType type) {
*PtrType() = type;

void SetType(EType type) {
if (!IsServicePartition()) {
*PtrType() = type;
return;
}

switch (*PtrType()) {
case TypeNone:
*PtrType() = ServiceTypeNone;
return;
case TypeData:
*PtrType() = ServiceTypeData;
return;
case TypeTmpData:
*PtrType() = ServiceTypeTmpData;
return;
case TypeInfo:
*PtrType() = ServiceTypeInfo;
return;
case TypeMeta:
*PtrType() = ServiceTypeMeta;
return;
case TypeTxMeta:
*PtrType() = ServiceTypeTxMeta;
return;
default:
Y_ABORT();
}
}


EType GetType() const {
return EType(*PtrType());
switch (*PtrType()) {
case TypeNone:
case ServiceTypeNone:
return TypeNone;
case TypeData:
case ServiceTypeData:
return TypeData;
case TypeTmpData:
case ServiceTypeTmpData:
return TypeTmpData;
case TypeInfo:
case ServiceTypeInfo:
return TypeInfo;
case TypeMeta:
case ServiceTypeMeta:
return TypeMeta;
case TypeTxMeta:
case ServiceTypeTxMeta:
return TypeTxMeta;
default:
Y_ABORT();
}
}

bool IsServicePartition() const {return ServicePartition.GetOrElse(false) || Partition.WriteId.Defined();}

const TPartitionId& GetPartition() const { return Partition; }

protected:
static constexpr ui32 UnmarkedSize() { return 1 + 10; }

void ParsePartition()
void ParsePartition()
{
Partition.InternalPartitionId = FromString<ui32>(TStringBuf{PtrPartition(), 10});
Partition.OriginalPartitionId = FromString<ui32>(TStringBuf{PtrPartition(), 10});
//ToDo: deside what to do here. We only save one number for partition
Partition.InternalPartitionId = Partition.OriginalPartitionId;
switch (*PtrType()) {
case TypeNone:
case TypeData:
case TypeTmpData:
case TypeInfo:
case TypeMeta:
case TypeTxMeta:
ServicePartition = false;
break;
default:
ServicePartition = true;
}
}


private:
enum EServiceType : char {
ServiceTypeNone = 1,
ServiceTypeInfo = 'M',
ServiceTypeData = 'D',
ServiceTypeTmpData = 'X',
ServiceTypeMeta = 'J',
ServiceTypeTxMeta = 'K'
};

char* PtrType() { return Data(); }
char* PtrMark() { return Data() + UnmarkedSize(); }
char* PtrPartition() { return Data() + 1; }
Expand All @@ -90,6 +165,8 @@ class TKeyPrefix : public TBuffer
const char* PtrPartition() const { return Data() + 1; }

TPartitionId Partition;
TMaybe<bool> ServicePartition;

};

// {char type; ui32 partiton; ui64 offset; ui16 partNo; ui32 count, ui16 internalPartsCount}
Expand Down

0 comments on commit c2c899d

Please sign in to comment.