Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/lemon-cars-count.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'svelte': patch
---

fix: rebase pending batches when other batches are committed
206 changes: 126 additions & 80 deletions packages/svelte/src/internal/client/reactivity/batch.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/** @import { Derived, Effect, Source } from '#client' */
/** @import { Derived, Effect, Source, Value } from '#client' */
import {
BLOCK_EFFECT,
BRANCH_EFFECT,
Expand All @@ -10,10 +10,11 @@ import {
INERT,
RENDER_EFFECT,
ROOT_EFFECT,
MAYBE_DIRTY
MAYBE_DIRTY,
DERIVED
} from '#client/constants';
import { async_mode_flag } from '../../flags/index.js';
import { deferred, define_property } from '../../shared/utils.js';
import { deferred, define_property, noop } from '../../shared/utils.js';
import {
active_effect,
is_dirty,
Expand Down Expand Up @@ -97,22 +98,8 @@ export class Batch {
#deferred = null;

/**
* True if an async effect inside this batch resolved and
* its parent branch was already deleted
*/
#neutered = false;

/**
* Async effects (created inside `async_derived`) encountered during processing.
* These run after the rest of the batch has updated, since they should
* always have the latest values
* @type {Effect[]}
*/
#async_effects = [];

/**
* The same as `#async_effects`, but for effects inside a newly-created
* `<svelte:boundary>` — these do not prevent the batch from committing
* Async effects inside a newly-created `<svelte:boundary>`
* — these do not prevent the batch from committing
* @type {Effect[]}
*/
#boundary_async_effects = [];
Expand Down Expand Up @@ -165,40 +152,15 @@ export class Batch {

previous_batch = null;

/** @type {Map<Source, { v: unknown, wv: number }> | null} */
var current_values = null;

// if there are multiple batches, we are 'time travelling' —
// we need to undo the changes belonging to any batch
// other than the current one
if (async_mode_flag && batches.size > 1) {
current_values = new Map();
batch_deriveds = new Map();

for (const [source, current] of this.current) {
current_values.set(source, { v: source.v, wv: source.wv });
source.v = current;
}

for (const batch of batches) {
if (batch === this) continue;

for (const [source, previous] of batch.#previous) {
if (!current_values.has(source)) {
current_values.set(source, { v: source.v, wv: source.wv });
source.v = previous;
}
}
}
}
var revert = Batch.apply(this);

for (const root of root_effects) {
this.#traverse_effect_tree(root);
}

// if we didn't start any new async work, and no async work
// is outstanding from a previous flush, commit
if (this.#async_effects.length === 0 && this.#pending === 0) {
if (this.#pending === 0) {
this.#commit();

var render_effects = this.#render_effects;
Expand All @@ -210,7 +172,7 @@ export class Batch {

// If sources are written to, then work needs to happen in a separate batch, else prior sources would be mixed with
// newly updated sources, which could lead to infinite loops when effects run over and over again.
previous_batch = current_batch;
previous_batch = this;
current_batch = null;

flush_queued_effects(render_effects);
Expand All @@ -223,27 +185,12 @@ export class Batch {
this.#defer_effects(this.#block_effects);
}

if (current_values) {
for (const [source, { v, wv }] of current_values) {
// reset the source to the current value (unless
// it got a newer value as a result of effects running)
if (source.wv <= wv) {
source.v = v;
}
}

batch_deriveds = null;
}

for (const effect of this.#async_effects) {
update_effect(effect);
}
revert();

for (const effect of this.#boundary_async_effects) {
update_effect(effect);
}

this.#async_effects = [];
this.#boundary_async_effects = [];
}

Expand Down Expand Up @@ -272,12 +219,8 @@ export class Batch {
} else if (async_mode_flag && (flags & RENDER_EFFECT) !== 0) {
this.#render_effects.push(effect);
} else if ((flags & CLEAN) === 0) {
if ((flags & ASYNC) !== 0) {
var effects = effect.b?.is_pending()
? this.#boundary_async_effects
: this.#async_effects;

effects.push(effect);
if ((flags & ASYNC) !== 0 && effect.b?.is_pending()) {
this.#boundary_async_effects.push(effect);
} else if (is_dirty(effect)) {
if ((effect.f & BLOCK_EFFECT) !== 0) this.#block_effects.push(effect);
update_effect(effect);
Expand Down Expand Up @@ -350,10 +293,6 @@ export class Batch {
}
}

neuter() {
this.#neutered = true;
}

flush() {
if (queued_root_effects.length > 0) {
this.activate();
Expand All @@ -374,13 +313,58 @@ export class Batch {
* Append and remove branches to/from the DOM
*/
#commit() {
if (!this.#neutered) {
for (const fn of this.#callbacks) {
fn();
}
for (const fn of this.#callbacks) {
fn();
}

this.#callbacks.clear();

// If there are other pending batches, they now need to be 'rebased' —
// in other words, we re-run block/async effects with the newly
// committed state, unless the batch in question has a more
// recent value for a given source
if (batches.size > 1) {
this.#previous.clear();

let is_earlier = true;

for (const batch of batches) {
if (batch === this) {
is_earlier = false;
continue;
}

for (const [source, value] of this.current) {
if (batch.current.has(source)) {
if (is_earlier) {
// bring the value up to date
batch.current.set(source, value);
} else {
// later batch has more recent value,
// no need to re-run these effects
continue;
}
}

mark_effects(source);
}

if (queued_root_effects.length > 0) {
current_batch = batch;
const revert = Batch.apply(batch);

for (const root of queued_root_effects) {
batch.#traverse_effect_tree(root);
}

queued_root_effects = [];
revert();
}
}

current_batch = null;
}

batches.delete(this);
}

Expand All @@ -402,9 +386,6 @@ export class Batch {
schedule_effect(e);
}

this.#render_effects = [];
this.#effects = [];

this.flush();
} else {
this.deactivate();
Expand Down Expand Up @@ -444,6 +425,51 @@ export class Batch {
static enqueue(task) {
queue_micro_task(task);
}

/**
* @param {Batch} current_batch
*/
static apply(current_batch) {
if (!async_mode_flag || batches.size === 1) {
return noop;
}

// if there are multiple batches, we are 'time travelling' —
// we need to undo the changes belonging to any batch
// other than the current one

/** @type {Map<Source, { v: unknown, wv: number }>} */
var current_values = new Map();
batch_deriveds = new Map();

for (const [source, current] of current_batch.current) {
current_values.set(source, { v: source.v, wv: source.wv });
source.v = current;
}

for (const batch of batches) {
if (batch === current_batch) continue;

for (const [source, previous] of batch.#previous) {
if (!current_values.has(source)) {
current_values.set(source, { v: source.v, wv: source.wv });
source.v = previous;
}
}
}

return () => {
for (const [source, { v, wv }] of current_values) {
// reset the source to the current value (unless
// it got a newer value as a result of effects running)
if (source.wv <= wv) {
source.v = v;
}
}

batch_deriveds = null;
};
}
}

/**
Expand Down Expand Up @@ -615,6 +641,26 @@ function flush_queued_effects(effects) {
eager_block_effects = null;
}

/**
* This is similar to `mark_reactions`, but it only marks async/block effects
* so that these can re-run after another batch has been committed
* @param {Value} value
*/
function mark_effects(value) {
if (value.reactions !== null) {
for (const reaction of value.reactions) {
const flags = reaction.f;

if ((flags & DERIVED) !== 0) {
mark_effects(/** @type {Derived} */ (reaction));
} else if ((flags & (ASYNC | BLOCK_EFFECT)) !== 0) {
set_signal_status(reaction, DIRTY);
schedule_effect(/** @type {Effect} */ (reaction));
}
}
}
}

/**
* @param {Effect} signal
* @returns {void}
Expand Down
Loading
Loading