Commit 70aba73

Merge branch 'master' into ingvar/force-net8

RReverser authored Dec 11, 2024
2 parents eaa85b1 + 3782f88
Showing 28 changed files with 1,306 additions and 339 deletions.
crates/bindings-csharp/BSATN.Runtime/Builtins.cs (16 changes: 8 additions & 8 deletions)

@@ -304,30 +304,30 @@ public AlgebraicType GetAlgebraicType(ITypeRegistrar registrar) =>

 // [SpacetimeDB.Type] - we have custom representation of time in microseconds, so implementing BSATN manually
 public abstract partial record ScheduleAt
-    : SpacetimeDB.TaggedEnum<(DateTimeOffset Time, TimeSpan Interval)>
+    : SpacetimeDB.TaggedEnum<(TimeSpan Interval, DateTimeOffset Time)>
 {
     // Manual expansion of what would be otherwise generated by the [SpacetimeDB.Type] codegen.
-    public sealed record Time(DateTimeOffset Time_) : ScheduleAt;
-
     public sealed record Interval(TimeSpan Interval_) : ScheduleAt;
 
-    public static implicit operator ScheduleAt(DateTimeOffset time) => new Time(time);
+    public sealed record Time(DateTimeOffset Time_) : ScheduleAt;
 
     public static implicit operator ScheduleAt(TimeSpan interval) => new Interval(interval);
 
+    public static implicit operator ScheduleAt(DateTimeOffset time) => new Time(time);
+
     public readonly partial struct BSATN : IReadWrite<ScheduleAt>
     {
         [SpacetimeDB.Type]
         private partial record ScheduleAtRepr
-            : SpacetimeDB.TaggedEnum<(DateTimeOffsetRepr Time, TimeSpanRepr Interval)>;
+            : SpacetimeDB.TaggedEnum<(TimeSpanRepr Interval, DateTimeOffsetRepr Time)>;
 
         private static readonly ScheduleAtRepr.BSATN ReprBSATN = new();
 
         public ScheduleAt Read(BinaryReader reader) =>
             ReprBSATN.Read(reader) switch
             {
-                ScheduleAtRepr.Time(var timeRepr) => new Time(timeRepr.ToStd()),
                 ScheduleAtRepr.Interval(var intervalRepr) => new Interval(intervalRepr.ToStd()),
+                ScheduleAtRepr.Time(var timeRepr) => new Time(timeRepr.ToStd()),
                 _ => throw new SwitchExpressionException(),
             };

@@ -337,8 +337,8 @@ public void Write(BinaryWriter writer, ScheduleAt value)
                 writer,
                 value switch
                 {
-                    Time(var time) => new ScheduleAtRepr.Time(new(time)),
                     Interval(var interval) => new ScheduleAtRepr.Interval(new(interval)),
+                    Time(var time) => new ScheduleAtRepr.Time(new(time)),
                     _ => throw new SwitchExpressionException(),
                 }
             );
@@ -349,8 +349,8 @@ public AlgebraicType GetAlgebraicType(ITypeRegistrar registrar) =>
         // to avoid leaking the internal *Repr wrappers in generated SATS.
         new AlgebraicType.Sum(
             [
-                new("Time", new AlgebraicType.U64(default)),
                 new("Interval", new AlgebraicType.U64(default)),
+                new("Time", new AlgebraicType.U64(default)),
             ]
         );
 }
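The reordering above is a wire-format change, not a cosmetic one: a BSATN tagged enum is serialized as a one-byte variant tag (the variant's position in the declaration, as the AlgebraicType.Sum ordering in this diff suggests) followed by the payload, so swapping (Time, Interval) to (Interval, Time) makes Interval tag 0 and Time tag 1. A minimal Rust sketch of that framing — illustrative only, with a hypothetical mirror enum and assumed little-endian u64 microsecond payloads, not SpacetimeDB's actual decoder:

use std::io::{self, Read};

// Hypothetical mirror of the reordered enum: Interval is now tag 0, Time tag 1.
enum ScheduleAt {
    Interval(u64), // duration in microseconds
    Time(u64),     // timestamp in microseconds
}

fn read_schedule_at(r: &mut impl Read) -> io::Result<ScheduleAt> {
    let mut tag = [0u8; 1];
    r.read_exact(&mut tag)?; // one-byte variant tag
    let mut payload = [0u8; 8];
    r.read_exact(&mut payload)?;
    let micros = u64::from_le_bytes(payload);
    match tag[0] {
        0 => Ok(ScheduleAt::Interval(micros)),
        1 => Ok(ScheduleAt::Time(micros)),
        t => Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("unknown ScheduleAt tag {t}"),
        )),
    }
}

fn main() -> io::Result<()> {
    // 0x00 tag = Interval, followed by 1_000_000 µs.
    let bytes = [&[0u8][..], &1_000_000u64.to_le_bytes()[..]].concat();
    match read_schedule_at(&mut bytes.as_slice())? {
        ScheduleAt::Interval(us) => assert_eq!(us, 1_000_000),
        ScheduleAt::Time(_) => unreachable!(),
    }
    Ok(())
}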
crates/commitlog/src/repo/fs.rs (17 changes: 15 additions & 2 deletions)

@@ -1,10 +1,10 @@
 use std::fs::{self, File};
-use std::io;
+use std::io::{self, Seek};
 
 use log::{debug, warn};
 use spacetimedb_paths::server::{CommitLogDir, SegmentFile};
 
-use super::{Repo, TxOffset, TxOffsetIndex, TxOffsetIndexMut};
+use super::{Repo, Segment, TxOffset, TxOffsetIndex, TxOffsetIndexMut};
 
 const SEGMENT_FILE_EXT: &str = ".stdb.log";

@@ -57,6 +57,19 @@ impl Fs {
     }
 }
 
+impl Segment for File {
+    fn segment_len(&mut self) -> io::Result<u64> {
+        let old_pos = self.stream_position()?;
+        let len = self.seek(io::SeekFrom::End(0))?;
+        // If we're already at the end of the file, avoid seeking.
+        if old_pos != len {
+            self.seek(io::SeekFrom::Start(old_pos))?;
+        }
+
+        Ok(len)
+    }
+}
+
 impl Repo for Fs {
     type Segment = File;
 
crates/commitlog/src/repo/mem.rs (13 changes: 11 additions & 2 deletions)

@@ -28,6 +28,7 @@ impl Segment {
     pub fn len(&self) -> usize {
         self.buf.read().unwrap().len()
     }
+
     pub fn is_empty(&self) -> bool {
         self.len() == 0
     }
@@ -39,6 +40,12 @@ impl From<SharedBytes> for Segment {
     }
 }
 
+impl super::Segment for Segment {
+    fn segment_len(&mut self) -> io::Result<u64> {
+        Ok(Segment::len(self) as u64)
+    }
+}
+
 impl FileLike for Segment {
     fn fsync(&mut self) -> io::Result<()> {
         Ok(())
@@ -118,8 +125,10 @@ impl Repo for Memory {
         let mut inner = self.0.write().unwrap();
         match inner.entry(offset) {
             btree_map::Entry::Occupied(entry) => {
-                if entry.get().read().unwrap().len() == 0 {
-                    Ok(Segment::from(Arc::clone(entry.get())))
+                let entry = entry.get();
+                let read_guard = entry.read().unwrap();
+                if read_guard.len() == 0 {
+                    Ok(Segment::from(Arc::clone(entry)))
                 } else {
                     Err(io::Error::new(
                         io::ErrorKind::AlreadyExists,
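The rewritten Occupied branch binds `entry.get()` and its read guard to locals instead of re-evaluating them inline, keeping the guard alive across the emptiness check. A stand-alone sketch of the same shape, with stand-in types rather than the commitlog's actual API:

use std::sync::{Arc, RwLock};

type SharedBytes = Arc<RwLock<Vec<u8>>>;

// Return a clone of the shared buffer only if it is currently empty.
fn open_if_empty(entry: &SharedBytes) -> Option<SharedBytes> {
    let read_guard = entry.read().unwrap(); // one lock, held for the check
    if read_guard.is_empty() {
        Some(Arc::clone(entry)) // clone the Arc, not the bytes
    } else {
        None
    }
}

fn main() {
    let seg: SharedBytes = Arc::new(RwLock::new(Vec::new()));
    assert!(open_if_empty(&seg).is_some());
    seg.write().unwrap().extend_from_slice(b"data");
    assert!(open_if_empty(&seg).is_none());
}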
crates/commitlog/src/repo/mod.rs (17 changes: 16 additions & 1 deletion)

@@ -22,13 +22,28 @@ pub type TxOffset = u64;
 pub type TxOffsetIndexMut = IndexFileMut<TxOffset>;
 pub type TxOffsetIndex = IndexFile<TxOffset>;
 
+pub trait Segment: FileLike + io::Read + io::Write + io::Seek + Send + Sync {
+    /// Determine the length in bytes of the segment.
+    ///
+    /// This method does not rely on metadata `fsync`, and may use up to three
+    /// `seek` operations.
+    ///
+    /// If the method returns successfully, the seek position before the call is
+    /// restored. However, if it returns an error, the seek position is
+    /// unspecified.
+    //
+    // TODO: Replace with `Seek::stream_len` if / when stabilized:
+    // https://github.com/rust-lang/rust/issues/59359
+    fn segment_len(&mut self) -> io::Result<u64>;
+}
+
 /// A repository of log segments.
 ///
 /// This is mainly an internal trait to allow testing against an in-memory
 /// representation.
 pub trait Repo: Clone {
     /// The type of log segments managed by this repo, which must behave like a file.
-    type Segment: io::Read + io::Write + FileLike + io::Seek + Send + Sync + 'static;
+    type Segment: Segment + 'static;
 
     /// Create a new segment with the minimum transaction offset `offset`.
     ///
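The contract documented on `segment_len` (and implemented for `File` in fs.rs above) is the classic portable stream-length dance: record the current position, seek to the end to learn the length, and seek back unless the cursor was already at the end — at most three `seek`s, since `stream_position` is itself a seek. A self-contained sketch of the same trick over any `Seek` type, separate from the trait in the diff:

use std::io::{self, Seek, SeekFrom};

// Portable stand-in for the unstable `Seek::stream_len`: at most three seeks,
// and the original position is restored on success.
fn stream_len_portable<S: Seek>(s: &mut S) -> io::Result<u64> {
    let old_pos = s.stream_position()?; // seek #1 (SeekFrom::Current(0))
    let len = s.seek(SeekFrom::End(0))?; // seek #2
    if old_pos != len {
        s.seek(SeekFrom::Start(old_pos))?; // seek #3, skipped if already at end
    }
    Ok(len)
}

fn main() -> io::Result<()> {
    let mut cur = io::Cursor::new(vec![0u8; 10]);
    cur.seek(SeekFrom::Start(4))?;
    assert_eq!(stream_len_portable(&mut cur)?, 10);
    assert_eq!(cur.stream_position()?, 4); // cursor unchanged by the query
    Ok(())
}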
crates/commitlog/src/tests/partial.rs (8 changes: 7 additions & 1 deletion)

@@ -9,7 +9,7 @@ use log::debug;

 use crate::{
     commitlog, error, payload,
-    repo::{self, Repo},
+    repo::{self, Repo, Segment},
     segment::FileLike,
     tests::helpers::enable_logging,
     Commit, Encode, Options, DEFAULT_LOG_FORMAT_VERSION,
@@ -160,6 +160,12 @@ struct ShortSegment {
     max_len: u64,
 }
 
+impl Segment for ShortSegment {
+    fn segment_len(&mut self) -> io::Result<u64> {
+        self.inner.segment_len()
+    }
+}
+
 impl FileLike for ShortSegment {
     fn fsync(&mut self) -> std::io::Result<()> {
         self.inner.fsync()
crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs (26 changes: 23 additions & 3 deletions)

@@ -399,15 +399,35 @@ impl MutTxId {
             _ => unimplemented!(),
         };
         // Create and build the index.
+        //
+        // Ensure adding the index does not cause a unique constraint violation due to
+        // the existing rows having the same value for some column(s).
         let mut insert_index = table.new_index(index.index_id, &columns, is_unique)?;
-        insert_index.build_from_rows(&columns, table.scan_rows(blob_store))?;
+        let mut build_from_rows = |table: &Table, bs: &dyn BlobStore| -> Result<()> {
+            if let Some(violation) = insert_index.build_from_rows(&columns, table.scan_rows(bs))? {
+                let violation = table
+                    .get_row_ref(bs, violation)
+                    .expect("row came from scanning the table")
+                    .project(&columns)
+                    .expect("`cols` should consist of valid columns for this table");
+                return Err(IndexError::from(table.build_error_unique(&insert_index, &columns, violation)).into());
+            }
+            Ok(())
+        };
+        build_from_rows(table, blob_store)?;
         // NOTE: Also add all the rows in the already committed table to the index.
         //
         // FIXME: Is this correct? Index scan iterators (incl. the existing `Locking` versions)
         // appear to assume that a table's index refers only to rows within that table,
         // and does not handle the case where a `TxState` index refers to `CommittedState` rows.
-        if let Some(committed_table) = commit_table {
-            insert_index.build_from_rows(&columns, committed_table.scan_rows(commit_blob_store))?;
+        //
+        // TODO(centril): An alternative here is to actually add this index to `CommittedState`,
+        // pretending that it was already committed, and recording this pretense.
+        // Then, we can roll that back on a failed tx.
+        if let Some(commit_table) = commit_table {
+            build_from_rows(commit_table, commit_blob_store)?;
         }
 
         table.indexes.insert(columns.clone(), insert_index);
         // Associate `index_id -> (table_id, col_list)` for fast lookup.
         idx_map.insert(index_id, (table_id, columns.clone()));
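The change above turns index construction into a checked operation: `build_from_rows` now reports the first row whose projected key collides under a unique index, and the closure converts that row into a unique-constraint `IndexError` for both the transaction-local and the committed table. An illustrative sketch of the same pattern with stand-in types (the real code works over row references and a blob store):

use std::collections::HashMap;

type RowId = usize;
type Key = u64; // stand-in for a row projected onto the indexed columns

// Build a unique index over existing rows, surfacing the first duplicate key
// as an error instead of silently accepting it.
fn build_unique_index(rows: &[(RowId, Key)]) -> Result<HashMap<Key, RowId>, String> {
    let mut index = HashMap::new();
    for &(row, key) in rows {
        if let Some(prev) = index.insert(key, row) {
            return Err(format!(
                "unique constraint violation: key {key} indexed by rows {prev} and {row}"
            ));
        }
    }
    Ok(index)
}

fn main() {
    // Rows 0 and 2 collide on key 10, so index creation must fail.
    let rows: [(RowId, Key); 3] = [(0, 10), (1, 20), (2, 10)];
    assert!(build_unique_index(&rows).is_err());
}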
(Diff truncated: 22 of the 28 changed files are not shown.)
