diff --git a/src/lib/vacuum-filter/vacuum-filter.ts b/src/lib/vacuum-filter/vacuum-filter.ts index 4415f7324..2e8c802f3 100644 --- a/src/lib/vacuum-filter/vacuum-filter.ts +++ b/src/lib/vacuum-filter/vacuum-filter.ts @@ -3,6 +3,28 @@ import { randomBytes } from "@/lib/vacuum-filter/random"; const textEncoder = new TextEncoder(); const BUCKET_SIZE = 4 as const; const DEFAULT_SCRATCH_BYTES = 256; +const INV_2_32 = 1 / 4294967296; +const FAST_REDUCE_MAX_BUCKETS = 1 << 21; // 2^21:保证 hv(32-bit) * buckets 在 IEEE754 中仍可精确表示整数 +const IS_LITTLE_ENDIAN = (() => { + const buf = new ArrayBuffer(4); + const view = new DataView(buf); + view.setUint32(0, 0x11223344, true); + return new Uint32Array(buf)[0] === 0x11223344; +})(); + +function computeFastReduceParams(numBuckets: number): { + bucketMask: number; + fastReduceMul: number | null; +} { + // 1) numBuckets 为 2 的幂:位与最快 + // 2) 否则使用 multiply-high 等价式:floor(hvIndex * numBuckets / 2^32) + // 该实现依赖 IEEE754 精度:当 numBuckets <= 2^21 时,32-bit hvIndex 与 numBuckets 的乘积 < 2^53, + // 因而计算与截断都保持精确,结果必定落在 [0, numBuckets)。 + const bucketMask = (numBuckets & (numBuckets - 1)) === 0 ? numBuckets - 1 : 0; + const fastReduceMul = + bucketMask === 0 && numBuckets <= FAST_REDUCE_MAX_BUCKETS ? numBuckets * INV_2_32 : null; + return { bucketMask, fastReduceMul }; +} /** * Vacuum Filter(真空过滤器) @@ -174,16 +196,19 @@ function readU32LE(bytes: Uint8Array, offset: number): number { ); } -// MurmurHash3 x86 32-bit x2(共享同一份 bytes 扫描;用于生成 index/tag) -function murmur3X86_32x2( - bytes: Uint8Array, - len: number, - seedA: number, - seedB: number, - out: Uint32Array -): void { - let hA = seedA >>> 0; - let hB = seedB >>> 0; +function fmix32(h: number): number { + let x = h >>> 0; + x ^= x >>> 16; + x = Math.imul(x, 0x85ebca6b) >>> 0; + x ^= x >>> 13; + x = Math.imul(x, 0xc2b2ae35) >>> 0; + x ^= x >>> 16; + return x >>> 0; +} + +// MurmurHash3 x86 32-bit(用于生成 index;tag 由二次混合派生) +function murmur3X86_32(bytes: Uint8Array, len: number, seed: number, words?: Uint32Array): number { + let h = seed >>> 0; const c1 = 0xcc9e2d51; const c2 = 0x1b873593; @@ -191,22 +216,35 @@ function murmur3X86_32x2( const nblocks = (length / 4) | 0; const blockLen = nblocks * 4; - for (let base = 0; base < blockLen; base += 4) { - let k = - (bytes[base] | (bytes[base + 1] << 8) | (bytes[base + 2] << 16) | (bytes[base + 3] << 24)) >>> - 0; - - k = Math.imul(k, c1) >>> 0; - k = ((k << 15) | (k >>> 17)) >>> 0; - k = Math.imul(k, c2) >>> 0; + if (words && IS_LITTLE_ENDIAN && bytes.byteOffset === 0) { + for (let i = 0; i < nblocks; i++) { + let k = words[i] >>> 0; - hA ^= k; - hA = ((hA << 13) | (hA >>> 19)) >>> 0; - hA = (Math.imul(hA, 5) + 0xe6546b64) >>> 0; + k = Math.imul(k, c1) >>> 0; + k = ((k << 15) | (k >>> 17)) >>> 0; + k = Math.imul(k, c2) >>> 0; - hB ^= k; - hB = ((hB << 13) | (hB >>> 19)) >>> 0; - hB = (Math.imul(hB, 5) + 0xe6546b64) >>> 0; + h ^= k; + h = ((h << 13) | (h >>> 19)) >>> 0; + h = (Math.imul(h, 5) + 0xe6546b64) >>> 0; + } + } else { + for (let base = 0; base < blockLen; base += 4) { + let k = + (bytes[base] | + (bytes[base + 1] << 8) | + (bytes[base + 2] << 16) | + (bytes[base + 3] << 24)) >>> + 0; + + k = Math.imul(k, c1) >>> 0; + k = ((k << 15) | (k >>> 17)) >>> 0; + k = Math.imul(k, c2) >>> 0; + + h ^= k; + h = ((h << 13) | (h >>> 19)) >>> 0; + h = (Math.imul(h, 5) + 0xe6546b64) >>> 0; + } } // tail @@ -224,28 +262,11 @@ function murmur3X86_32x2( k1 = Math.imul(k1, c1) >>> 0; k1 = ((k1 << 15) | (k1 >>> 17)) >>> 0; k1 = Math.imul(k1, c2) >>> 0; - hA ^= k1; - hB ^= k1; + h ^= k1; } - // fmix (A) - hA ^= length; - hA ^= hA >>> 16; - hA = Math.imul(hA, 0x85ebca6b) >>> 0; - hA ^= hA >>> 13; - hA = Math.imul(hA, 0xc2b2ae35) >>> 0; - hA ^= hA >>> 16; - - // fmix (B) - hB ^= length; - hB ^= hB >>> 16; - hB = Math.imul(hB, 0x85ebca6b) >>> 0; - hB ^= hB >>> 13; - hB = Math.imul(hB, 0xc2b2ae35) >>> 0; - hB ^= hB >>> 16; - - out[0] = hA >>> 0; - out[1] = hB >>> 0; + h ^= length; + return fmix32(h); } export class VacuumFilter { @@ -261,12 +282,14 @@ export class VacuumFilter { private readonly lenMasks: [number, number, number, number]; private readonly numBuckets: number; + private readonly bucketMask: number; + private readonly fastReduceMul: number | null; private readonly table: Uint32Array; private numItems = 0; // 热路径优化:避免 TextEncoder.encode 分配;每次 has/add/delete 复用同一块 scratch private scratch: Uint8Array = new Uint8Array(DEFAULT_SCRATCH_BYTES); - private readonly hashOut: Uint32Array = new Uint32Array(2); + private scratch32: Uint32Array = new Uint32Array(this.scratch.buffer); private tmpIndex = 0; private tmpTag = 0; @@ -320,6 +343,9 @@ export class VacuumFilter { const mask = bigSeg - 1; this.lenMasks = [mask, mask, mask, mask]; this.numBuckets = bucketCount; + const fast = computeFastReduceParams(this.numBuckets); + this.bucketMask = fast.bucketMask; + this.fastReduceMul = fast.fastReduceMul; this.table = new Uint32Array(this.numBuckets * BUCKET_SIZE); return; } @@ -341,6 +367,9 @@ export class VacuumFilter { const segLens = [l0 + 1, l1 + 1, l2 + 1, l3 + 1]; const maxSegLen = Math.max(...segLens); this.numBuckets = roundUpToMultiple(bucketCount, upperPower2(maxSegLen)); + const fast = computeFastReduceParams(this.numBuckets); + this.bucketMask = fast.bucketMask; + this.fastReduceMul = fast.fastReduceMul; this.table = new Uint32Array(this.numBuckets * BUCKET_SIZE); } @@ -432,32 +461,60 @@ export class VacuumFilter { private indexTag(key: string): void { // 使用 seeded MurmurHash3(32-bit)生成确定性哈希,降低可控输入退化风险 - // 关键优化:ASCII 快路径(API Key/ID 通常为 ASCII),避免 TextEncoder.encode 分配 + // 关键优化:尽量走 TextEncoder.encodeInto(无分配,且编码在原生层完成) const strLen = key.length; if (this.scratch.length < strLen) { - this.scratch = new Uint8Array(Math.max(this.scratch.length * 2, strLen)); + // 注意:scratch32 需要 4-byte 对齐;否则 new Uint32Array(buffer) 会抛 RangeError。 + const newLen = roundUpToMultiple(Math.max(this.scratch.length * 2, strLen), 4); + this.scratch = new Uint8Array(newLen); + this.scratch32 = new Uint32Array(this.scratch.buffer); } - let asciiLen = 0; - for (; asciiLen < strLen; asciiLen++) { - const c = key.charCodeAt(asciiLen); - if (c > 0x7f) break; - this.scratch[asciiLen] = c; + // encodeInto 可能因 out buffer 不足而截断:read < strLen 时扩容重试 + let encoded = textEncoder.encodeInto(key, this.scratch); + if (encoded.read < strLen) { + // UTF-8 最坏 4 bytes/char;用 4x 作为上界(仅影响少见的非 ASCII key) + const newLen = roundUpToMultiple(Math.max(this.scratch.length * 2, strLen * 4), 4); + this.scratch = new Uint8Array(newLen); + this.scratch32 = new Uint32Array(this.scratch.buffer); + encoded = textEncoder.encodeInto(key, this.scratch); } - if (asciiLen === strLen) { - murmur3X86_32x2(this.scratch, strLen, this.hashSeedA, this.hashSeedB, this.hashOut); + // 极端情况下 encodeInto 仍可能因缓冲不足而截断:回退到 encode(保证正确性) + let bytes: Uint8Array; + let byteLen: number; + let words: Uint32Array | undefined; + + if (encoded.read < strLen) { + bytes = textEncoder.encode(key); + byteLen = bytes.length; + words = undefined; } else { - // 非 ASCII:交给 TextEncoder(少见路径) - const keyBytes = textEncoder.encode(key); - murmur3X86_32x2(keyBytes, keyBytes.length, this.hashSeedA, this.hashSeedB, this.hashOut); + bytes = this.scratch; + byteLen = encoded.written; + words = this.scratch32; } - const hvIndex = this.hashOut[0] >>> 0; - const hvTag = this.hashOut[1] >>> 0; - - // 参考实现使用 `hash % numBuckets`。这里保持简单、快速(即便 numBuckets 非 2 的幂也可用)。 - const index = hvIndex % this.numBuckets; + const hvIndex = murmur3X86_32(bytes, byteLen, this.hashSeedA, words); + + // tag 从 index hash 二次混合派生(避免再扫一遍 bytes) + // 注意:tag 不再来自“第二份独立 hash”。这会降低 (index, tag) 的独立性,但在 32-bit fingerprint 场景下碰撞概率仍极低。 + const hvTag = fmix32((hvIndex ^ this.hashSeedB) >>> 0); + + // 参考实现使用 `hash % numBuckets`;这里做一个“尽量快”的等价映射: + // - numBuckets 为 2 的幂:位与(最快) + // - numBuckets 较小:使用 multiply-high 等价式(避免 `%`) + // - 其它:回退到 `%` + const bucketMask = this.bucketMask; + const fastReduceMul = this.fastReduceMul; + // fastReduceMul != null 时:value = hvIndex * numBuckets / 2^32 < numBuckets <= 2^21, + // 因此 `>>> 0` 与 Math.floor 等价且不会发生 2^32 wrap。 + const index = + bucketMask !== 0 + ? (hvIndex & bucketMask) >>> 0 + : fastReduceMul !== null + ? (hvIndex * fastReduceMul) >>> 0 + : hvIndex % this.numBuckets; let tag = (hvTag & this.tagMask) >>> 0; if (tag === 0) tag = 1; diff --git a/tests/unit/lib/endpoint-circuit-breaker.test.ts b/tests/unit/lib/endpoint-circuit-breaker.test.ts index 90b938f9f..7efa8ce92 100644 --- a/tests/unit/lib/endpoint-circuit-breaker.test.ts +++ b/tests/unit/lib/endpoint-circuit-breaker.test.ts @@ -19,6 +19,12 @@ function createLoggerMock() { }; } +async function flushPromises(rounds = 2): Promise { + for (let i = 0; i < rounds; i++) { + await new Promise((resolve) => setTimeout(resolve, 0)); + } +} + afterEach(() => { vi.useRealTimers(); }); @@ -134,6 +140,10 @@ describe("endpoint-circuit-breaker", () => { findProviderEndpointById: vi.fn(async () => null), })); + // recordEndpointFailure 会 non-blocking 触发告警;先让 event-loop 跑完再清空计数,避免串台导致误判 + await flushPromises(); + sendAlertMock.mockClear(); + const { triggerEndpointCircuitBreakerAlert } = await import("@/lib/endpoint-circuit-breaker"); await triggerEndpointCircuitBreakerAlert( @@ -184,6 +194,10 @@ describe("endpoint-circuit-breaker", () => { })), })); + // recordEndpointFailure 会 non-blocking 触发告警;先让 event-loop 跑完再清空计数,避免串台导致误判 + await flushPromises(); + sendAlertMock.mockClear(); + const { triggerEndpointCircuitBreakerAlert } = await import("@/lib/endpoint-circuit-breaker"); await triggerEndpointCircuitBreakerAlert(10, 3, "2026-01-01T00:05:00.000Z", "timeout"); diff --git a/tests/unit/vacuum-filter/vacuum-filter-has.bench.test.ts b/tests/unit/vacuum-filter/vacuum-filter-has.bench.test.ts new file mode 100644 index 000000000..2c5aec50a --- /dev/null +++ b/tests/unit/vacuum-filter/vacuum-filter-has.bench.test.ts @@ -0,0 +1,222 @@ +import { VacuumFilter } from "@/lib/vacuum-filter/vacuum-filter"; +import os from "node:os"; +import { describe, expect, test } from "vitest"; + +// 说明: +// - 这是一个“可复现的本机 microbench”,用于量化 VacuumFilter.has 的优化收益。 +// - 默认跳过(避免 CI/本地 `npm test` 触发超时);需要显式开启: +// - *nix: RUN_VACUUM_FILTER_BENCH=1 node --expose-gc node_modules/vitest/vitest.mjs run tests/unit/vacuum-filter/vacuum-filter-has.bench.test.ts +// - pwsh: $env:RUN_VACUUM_FILTER_BENCH='1'; node --expose-gc node_modules/vitest/vitest.mjs run tests/unit/vacuum-filter/vacuum-filter-has.bench.test.ts + +const shouldRunBench = process.env.RUN_VACUUM_FILTER_BENCH === "1"; +const benchTest = shouldRunBench ? test : test.skip; + +function xorshift32(seed: number): () => number { + let s = seed >>> 0; + return () => { + s ^= (s << 13) >>> 0; + s ^= s >>> 17; + s ^= (s << 5) >>> 0; + return s >>> 0; + }; +} + +const ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_-"; + +function makeAsciiKey(rng: () => number, len: number): string { + let out = ""; + for (let i = 0; i < len; i++) out += ALPHABET[rng() % ALPHABET.length]; + return out; +} + +function freshSameContent(s: string): string { + // 让 V8 很难复用同一个 string 实例(模拟“请求头解析后每次都是新字符串对象”) + return (" " + s).slice(1); +} + +function median(values: number[]): number { + if (values.length === 0) return Number.NaN; + const sorted = values.slice().sort((a, b) => a - b); + const mid = (sorted.length / 2) | 0; + return sorted.length % 2 === 1 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2; +} + +function nsPerOp(elapsedNs: bigint, ops: number): number { + return Number(elapsedNs) / ops; +} + +function opsPerSecFromNsPerOp(ns: number): number { + return 1e9 / ns; +} + +function gcIfAvailable(): void { + const g = (globalThis as unknown as { gc?: () => void }).gc; + if (typeof g === "function") g(); +} + +type Scenario = { + name: string; + kind: "hit" | "miss"; + strings: "reuse" | "single_use"; + makeLookups: () => string[]; + expectedHitsSet: number; + expectedHitsVf: number | null; // miss 场景 VF 允许 false positive +}; + +function formatMops(opsPerSec: number): string { + return `${(opsPerSec / 1e6).toFixed(2)} Mops/s`; +} + +describe("VacuumFilter.has bench (local only)", () => { + benchTest( + "quantify Set.has vs VacuumFilter.has", + () => { + const N = 50_000; + const KEY_LEN = 48; + const LOOKUPS = 200_000; + const WARMUP_ROUNDS = 2; + const MEASURE_ROUNDS = 7; + + // eslint-disable-next-line no-console + console.log( + `[bench-env] node=${process.version} v8=${process.versions.v8} platform=${process.platform} arch=${process.arch}` + ); + // eslint-disable-next-line no-console + console.log(`[bench-env] cpu=${os.cpus()[0]?.model ?? "unknown"}`); + // eslint-disable-next-line no-console + console.log( + `[bench-params] N=${N} keyLen=${KEY_LEN} lookups=${LOOKUPS} warmup=${WARMUP_ROUNDS} rounds=${MEASURE_ROUNDS}` + ); + + const rng = xorshift32(0x12345678); + const keys: string[] = Array.from({ length: N }, () => makeAsciiKey(rng, KEY_LEN)); + + const set = new Set(keys); + const vf = new VacuumFilter({ maxItems: N, fingerprintBits: 32, seed: "bench-seed" }); + for (const k of keys) { + expect(vf.add(k)).toBe(true); + } + + const lookupIdx: number[] = Array.from({ length: LOOKUPS }, () => rng() % N); + + const reusedHits: string[] = lookupIdx.map((i) => keys[i]); + const reusedMisses: string[] = lookupIdx.map((i) => `!${keys[i].slice(1)}`); // '!' 不在 ALPHABET,保证 miss + + // Sanity: misses must be misses for Set + for (let i = 0; i < 1000; i++) { + expect(set.has(reusedMisses[i])).toBe(false); + } + + const scenarios: Scenario[] = [ + { + name: "hit/reuse", + kind: "hit", + strings: "reuse", + makeLookups: () => reusedHits, + expectedHitsSet: LOOKUPS, + expectedHitsVf: LOOKUPS, + }, + { + name: "miss/reuse", + kind: "miss", + strings: "reuse", + makeLookups: () => reusedMisses, + expectedHitsSet: 0, + expectedHitsVf: null, + }, + { + name: "hit/single_use", + kind: "hit", + strings: "single_use", + makeLookups: () => lookupIdx.map((i) => freshSameContent(keys[i])), + expectedHitsSet: LOOKUPS, + expectedHitsVf: LOOKUPS, + }, + { + name: "miss/single_use", + kind: "miss", + strings: "single_use", + makeLookups: () => lookupIdx.map((i) => freshSameContent(`!${keys[i].slice(1)}`)), + expectedHitsSet: 0, + expectedHitsVf: null, + }, + ]; + + function runSet(lookups: string[]): number { + let hits = 0; + for (let i = 0; i < lookups.length; i++) hits += set.has(lookups[i]) ? 1 : 0; + return hits; + } + + function runVf(lookups: string[]): number { + let hits = 0; + for (let i = 0; i < lookups.length; i++) hits += vf.has(lookups[i]) ? 1 : 0; + return hits; + } + + for (const scenario of scenarios) { + const setNsSamples: number[] = []; + const vfNsSamples: number[] = []; + + // Warmup(同时也让 Set 可能缓存 string hash;这正是需要量化的差异) + for (let round = 0; round < WARMUP_ROUNDS; round++) { + const lookups = scenario.makeLookups(); + runSet(lookups); + runVf(lookups); + } + + for (let round = 0; round < MEASURE_ROUNDS; round++) { + const lookups = scenario.makeLookups(); + + // 交替测量顺序,减少“先测导致的 cache/JIT 影响” + const measureSetFirst = round % 2 === 0; + if (measureSetFirst) { + gcIfAvailable(); + const t0 = process.hrtime.bigint(); + const hitsSet = runSet(lookups); + const t1 = process.hrtime.bigint(); + setNsSamples.push(nsPerOp(t1 - t0, LOOKUPS)); + expect(hitsSet).toBe(scenario.expectedHitsSet); + + gcIfAvailable(); + const t2 = process.hrtime.bigint(); + const hitsVf = runVf(lookups); + const t3 = process.hrtime.bigint(); + vfNsSamples.push(nsPerOp(t3 - t2, LOOKUPS)); + if (typeof scenario.expectedHitsVf === "number") + expect(hitsVf).toBe(scenario.expectedHitsVf); + } else { + gcIfAvailable(); + const t0 = process.hrtime.bigint(); + const hitsVf = runVf(lookups); + const t1 = process.hrtime.bigint(); + vfNsSamples.push(nsPerOp(t1 - t0, LOOKUPS)); + if (typeof scenario.expectedHitsVf === "number") + expect(hitsVf).toBe(scenario.expectedHitsVf); + + gcIfAvailable(); + const t2 = process.hrtime.bigint(); + const hitsSet = runSet(lookups); + const t3 = process.hrtime.bigint(); + setNsSamples.push(nsPerOp(t3 - t2, LOOKUPS)); + expect(hitsSet).toBe(scenario.expectedHitsSet); + } + } + + const setMedianNs = median(setNsSamples); + const vfMedianNs = median(vfNsSamples); + const setMedianOps = opsPerSecFromNsPerOp(setMedianNs); + const vfMedianOps = opsPerSecFromNsPerOp(vfMedianNs); + const ratio = vfMedianNs / setMedianNs; + + // eslint-disable-next-line no-console + console.log( + `[bench] ${scenario.name} Set=${formatMops(setMedianOps)} (${setMedianNs.toFixed(1)} ns/op) | ` + + `VF=${formatMops(vfMedianOps)} (${vfMedianNs.toFixed(1)} ns/op) | ` + + `VF/Set=${ratio.toFixed(2)}x` + ); + } + }, + 60_000 + ); +}); diff --git a/tests/unit/vacuum-filter/vacuum-filter.test.ts b/tests/unit/vacuum-filter/vacuum-filter.test.ts index a05a8db24..d7ab167b9 100644 --- a/tests/unit/vacuum-filter/vacuum-filter.test.ts +++ b/tests/unit/vacuum-filter/vacuum-filter.test.ts @@ -107,6 +107,55 @@ describe("VacuumFilter", () => { } }); + test("超长 key 也应可用(避免 scratch32 对齐导致 RangeError)", () => { + const vf = new VacuumFilter({ + maxItems: 1000, + fingerprintBits: 32, + maxKickSteps: 500, + seed: "unit-test-long-key", + }); + + // 触发 scratch 扩容:> DEFAULT_SCRATCH_BYTES*2 且不是 4 的倍数 + const longKey = "a".repeat(1001); + expect(vf.add(longKey)).toBe(true); + expect(vf.has(longKey)).toBe(true); + expect(vf.delete(longKey)).toBe(true); + expect(vf.has(longKey)).toBe(false); + }); + + test("fast-reduce bucket index 映射不越界(非 2 的幂 numBuckets)", () => { + const vf = new VacuumFilter({ + maxItems: 20_000, + fingerprintBits: 32, + maxKickSteps: 500, + seed: "unit-test-fast-reduce-index", + }); + + const numBuckets = vf.capacitySlots() / 4; + + // @ts-expect-error: 单测需要覆盖私有字段(确保走 fast-reduce 分支) + const bucketMask = vf.bucketMask as number; + // @ts-expect-error: 单测需要覆盖私有字段(确保走 fast-reduce 分支) + const fastReduceMul = vf.fastReduceMul as number | null; + + expect(bucketMask).toBe(0); + expect(fastReduceMul).not.toBeNull(); + + const indexTag = (key: string): void => { + // @ts-expect-error: 单测需要覆盖私有方法的核心不变量 + vf.indexTag(key); + }; + + for (let i = 0; i < 10_000; i++) { + indexTag(`key_${i}`); + // @ts-expect-error: 单测需要覆盖私有字段的核心不变量 + const index = vf.tmpIndex as number; + if (index < 0 || index >= numBuckets) { + throw new Error(`tmpIndex out of range: ${index} (numBuckets=${numBuckets})`); + } + } + }); + test("alternate index 应为可逆映射(alt(alt(i,tag),tag)=i)且不越界", () => { const vf = new VacuumFilter({ maxItems: 50_000,