diff --git a/crates/service/src/worker.rs b/crates/service/src/worker.rs index ec6180e1d..473b89a08 100644 --- a/crates/service/src/worker.rs +++ b/crates/service/src/worker.rs @@ -1,5 +1,4 @@ use crate::instance::*; -use crate::version::Version; use arc_swap::ArcSwap; use base::index::*; use base::search::*; @@ -31,7 +30,6 @@ impl Worker { }); let protect = WorkerProtect { startup, indexes }; sync_dir(&path); - Version::write(path.join("metadata")); Arc::new(Worker { path, protect: Mutex::new(protect), diff --git a/src/index/am.rs b/src/index/am.rs index d2bc2ef87..7e9316812 100644 --- a/src/index/am.rs +++ b/src/index/am.rs @@ -143,12 +143,42 @@ pub unsafe extern "C" fn ambuild( index_relation, Some((heap_relation, index_info, result.as_ptr())), ); + make_well_formed(index_relation); result.into_pg() } #[pgrx::pg_guard] -pub unsafe extern "C" fn ambuildempty(index_relation: pgrx::pg_sys::Relation) { - am_build::build(index_relation, None); +pub unsafe extern "C" fn ambuildempty(_index: pgrx::pg_sys::Relation) {} + +unsafe fn make_well_formed(index_relation: pgrx::pg_sys::Relation) { + unsafe { + let meta_buffer = pgrx::pg_sys::ReadBuffer(index_relation, 0xFFFFFFFF /* P_NEW */); + pgrx::pg_sys::LockBuffer(meta_buffer, pgrx::pg_sys::BUFFER_LOCK_EXCLUSIVE as _); + assert!(pgrx::pg_sys::BufferGetBlockNumber(meta_buffer) == 0); + let state = pgrx::pg_sys::GenericXLogStart(index_relation); + let meta_page = pgrx::pg_sys::GenericXLogRegisterBuffer( + state, + meta_buffer, + pgrx::pg_sys::GENERIC_XLOG_FULL_IMAGE as _, + ); + meta_page.write_bytes(0, pgrx::pg_sys::BLCKSZ as usize); + pgrx::pg_sys::GenericXLogFinish(state); + pgrx::pg_sys::UnlockReleaseBuffer(meta_buffer); + } +} + +unsafe fn check_well_formed(index_relation: pgrx::pg_sys::Relation) { + if !test_well_formed(index_relation) { + am_build::build(index_relation, None); + make_well_formed(index_relation); + } +} + +unsafe fn test_well_formed(index_relation: pgrx::pg_sys::Relation) -> bool { + pgrx::pg_sys::RelationGetNumberOfBlocksInFork( + index_relation, + pgrx::pg_sys::ForkNumber_MAIN_FORKNUM, + ) == 1 } #[pgrx::pg_guard] @@ -162,6 +192,7 @@ pub unsafe extern "C" fn aminsert( _index_unchanged: bool, _index_info: *mut pgrx::pg_sys::IndexInfo, ) -> bool { + check_well_formed(index_relation); let oid = (*index_relation).rd_id; let id = get_handle(oid); let vector = from_datum(*values.add(0), *is_null.add(0)); @@ -177,6 +208,7 @@ pub unsafe extern "C" fn ambeginscan( n_keys: std::os::raw::c_int, n_orderbys: std::os::raw::c_int, ) -> pgrx::pg_sys::IndexScanDesc { + check_well_formed(index_relation); assert!(n_keys == 0); assert!(n_orderbys == 1); am_scan::make_scan(index_relation) @@ -218,6 +250,9 @@ pub unsafe extern "C" fn ambulkdelete( callback: pgrx::pg_sys::IndexBulkDeleteCallback, callback_state: *mut std::os::raw::c_void, ) -> *mut pgrx::pg_sys::IndexBulkDeleteResult { + if !test_well_formed((*info).index) { + pgrx::warning!("The vector index is not initialized."); + } let oid = (*(*info).index).rd_id; let id = get_handle(oid); if let Some(callback) = callback { @@ -234,9 +269,12 @@ pub unsafe extern "C" fn ambulkdelete( #[pgrx::pg_guard] pub unsafe extern "C" fn amvacuumcleanup( - _info: *mut pgrx::pg_sys::IndexVacuumInfo, + info: *mut pgrx::pg_sys::IndexVacuumInfo, _stats: *mut pgrx::pg_sys::IndexBulkDeleteResult, ) -> *mut pgrx::pg_sys::IndexBulkDeleteResult { + if !test_well_formed((*info).index) { + pgrx::warning!("The vector index is not initialized."); + } let result = pgrx::PgBox::::alloc0(); result.into_pg() } diff --git a/src/index/hook_maintain.rs b/src/index/hook_maintain.rs index e9b6d400c..cd28d5137 100644 --- a/src/index/hook_maintain.rs +++ b/src/index/hook_maintain.rs @@ -80,7 +80,7 @@ pub unsafe fn maintain_index_in_object_access( pgrx::pg_sys::ObjectAccessType_OAT_DROP => { let search = pgrx::pg_catalog::PgClass::search_reloid(object_id).unwrap(); if let Some(pg_class) = search.get() { - if let Some(()) = check(pg_class) { + if let Some(()) = check_vector_index(pg_class) { let handle = get_handle(object_id); let mut t = TRANSACTION.borrow_mut(); match t.index.get(&handle) { @@ -110,7 +110,7 @@ pub unsafe fn maintain_index_in_object_access( } } -fn check(pg_class: pgrx::pg_catalog::PgClass<'_>) -> Option<()> { +fn check_vector_index(pg_class: pgrx::pg_catalog::PgClass<'_>) -> Option<()> { if pg_class.relkind() != pgrx::pg_catalog::PgClassRelkind::Index { return None; } @@ -124,10 +124,10 @@ fn check(pg_class: pgrx::pg_catalog::PgClass<'_>) -> Option<()> { return None; } // probably a vector index, so enter a slow path to ensure it - slow_path(pg_am) + check_vector_index_slow_path(pg_am) } -fn slow_path(pg_am: pgrx::pg_catalog::PgAm<'_>) -> Option<()> { +fn check_vector_index_slow_path(pg_am: pgrx::pg_catalog::PgAm<'_>) -> Option<()> { let amhandler = pg_am.amhandler(); let mut flinfo = unsafe { let mut flinfo = pgrx::pg_sys::FmgrInfo::default(); diff --git a/tests/unlogged/test.sh b/tests/unlogged/test.sh new file mode 100755 index 000000000..15ec9b4a6 --- /dev/null +++ b/tests/unlogged/test.sh @@ -0,0 +1,27 @@ +#!/usr/bin/env bash +set -e + +d=$(psql -U postgres -tAqX -c "SELECT CURRENT_SETTING('data_directory')")/pg_vectors/indexes + +a=$(sudo ls -1 $d | wc -l) + +printf "entries = $a\n" + +psql -f $(dirname $0)/test.sql & +sleep 10s + +sudo killall -9 postgres +sleep 1s + +sudo systemctl restart postgresql + +b=$(sudo ls -1 $d | wc -l) + +printf "entries = $b\n" + +if [ "$a" == "$b" ]; then + echo "Unlogged test [OK]" +else + echo "Unlogged test [FAILED]" + exit 1 +fi diff --git a/tests/unlogged/test.sql b/tests/unlogged/test.sql new file mode 100644 index 000000000..8fd59e49f --- /dev/null +++ b/tests/unlogged/test.sql @@ -0,0 +1,7 @@ +CREATE UNLOGGED TABLE tests_unlogged_t(val vector(3)); + +INSERT INTO tests_unlogged_t (val) SELECT ARRAY[random(), random(), random()]::real[] FROM generate_series(1, 1000); + +CREATE INDEX ON tests_unlogged_t USING vectors (val vector_l2_ops); + +SELECT pg_sleep(60); -- 60 seconds