Skip to content

Commit

Permalink
fix various issues with limiting streams
Browse files Browse the repository at this point in the history
997f15c
introduces an issue where limits were not correctly enforced when adding
items in bulk.

It also fixes an issue with limits being applied differently on resets,
because we previously did not remove stream elements when they were being
re-added later. I think this was initially introduced to avoid issues
with live components not being correctly rendered because of the PHX_SKIP
optimization. To counteract this, we now remove all elements on reset to
force them being added in the correct order, but also store the live
component DOM nodes to correctly restore them.

References #2686.
  • Loading branch information
SteffenDE committed Jan 21, 2024
1 parent 2d573f7 commit 0c14b8a
Show file tree
Hide file tree
Showing 5 changed files with 359 additions and 33 deletions.
63 changes: 31 additions & 32 deletions assets/js/phoenix_live_view/dom_patch.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export default class DOMPatch {
this.html = html
this.streams = streams
this.streamInserts = {}
this.streamComponentRestore = {}
this.targetCID = targetCID
this.cidPatch = isCid(this.targetCID)
this.pendingRemoves = []
Expand Down Expand Up @@ -108,9 +109,7 @@ export default class DOMPatch {
})
if(reset !== undefined){
DOM.all(container, `[${PHX_STREAM_REF}="${ref}"]`, child => {
if(!this.streamInserts[child.id]){
this.removeStreamChildElement(child)
}
this.removeStreamChildElement(child)
})
}
deleteIds.forEach(id => {
Expand All @@ -123,9 +122,7 @@ export default class DOMPatch {
if(isJoinPatch){
DOM.all(this.container, `[${phxUpdate}=${PHX_STREAM}]`, el => {
Array.from(el.children).forEach(child => {
if(!this.streamInserts[child.id]){
this.removeStreamChildElement(child)
}
this.removeStreamChildElement(child)
})
})
}
Expand All @@ -143,11 +140,20 @@ export default class DOMPatch {
skipFromChildren: (from) => { return from.getAttribute(phxUpdate) === PHX_STREAM },
// tell morphdom how to add a child
addChild: (parent, child) => {
let {ref, streamAt, limit} = this.getStreamInsert(child)
let {ref, streamAt} = this.getStreamInsert(child)
if(ref === undefined){ return parent.appendChild(child) }

DOM.putSticky(child, PHX_STREAM_REF, el => el.setAttribute(PHX_STREAM_REF, ref))

// we may need to restore the component, see removeStreamChildElement
if(child.getAttribute(PHX_COMPONENT)){
if(child.getAttribute(PHX_SKIP) !== null){
child = this.streamComponentRestore[child.id]
} else {
delete this.streamComponentRestore[child.id]
}
}

// streaming
if(streamAt === 0){
parent.insertAdjacentElement("afterbegin", child)
Expand All @@ -157,19 +163,6 @@ export default class DOMPatch {
let sibling = Array.from(parent.children)[streamAt]
parent.insertBefore(child, sibling)
}
let children = limit !== null && Array.from(parent.children)
let childrenToRemove = []
if(limit && limit < 0 && children.length > limit * -1){
childrenToRemove = children.slice(0, children.length + limit)
} else if(limit && limit >= 0 && children.length > limit){
childrenToRemove = children.slice(limit)
}
childrenToRemove.forEach(removeChild => {
// do not remove child as part of limit if we are re-adding it
if(!this.streamInserts[removeChild.id]){
this.removeStreamChildElement(removeChild)
}
})
},
onBeforeNodeAdded: (el) => {
DOM.maybeAddPrivateHooks(el, phxViewportTop, phxViewportBottom)
Expand Down Expand Up @@ -198,17 +191,6 @@ export default class DOMPatch {
}
added.push(el)
},
onBeforeElChildrenUpdated: (fromEl, toEl) => {
// when resetting a stream, we override the order of the streamInserts to
// the order of the elements in the diff (toEl)
if(fromEl.getAttribute(phxUpdate) === PHX_STREAM){
Array.from(toEl.children).forEach((child, idx) => {
// if this is not a reset, the streamAt must be preserved
let insert = this.streamInserts[child.id]
if(insert && insert.reset){ insert.streamAt = idx }
})
}
},
onNodeDiscarded: (el) => this.onNodeDiscarded(el),
onBeforeNodeDiscarded: (el) => {
if(el.getAttribute && el.getAttribute(PHX_PRUNE) !== null){ return true }
Expand Down Expand Up @@ -340,6 +322,11 @@ export default class DOMPatch {

removeStreamChildElement(child){
if(!this.maybePendingRemove(child)){
if(child.getAttribute(PHX_COMPONENT) && this.streamInserts[child.id]){
// live component would be removed and then be re-added;
// because of the PHX_SKIP optimization we need to temporarily store the DOM node
this.streamComponentRestore[child.id] = child
}
child.remove()
this.onNodeDiscarded(child)
}
Expand All @@ -351,7 +338,7 @@ export default class DOMPatch {
}

maybeReOrderStream(el, isNew){
let {ref, streamAt, limit} = this.getStreamInsert(el)
let {ref, streamAt} = this.getStreamInsert(el)
if(streamAt === undefined || (streamAt === 0 && !isNew)){ return }

// we need to the PHX_STREAM_REF here as well as addChild is invoked only for parents
Expand All @@ -373,6 +360,18 @@ export default class DOMPatch {
}
}
}

this.maybeLimitStream(el)
}

maybeLimitStream(el){
let {limit} = this.getStreamInsert(el)
let children = limit !== null && Array.from(el.parentElement.children)
if(limit && limit < 0 && children.length > limit * -1){
children.slice(0, children.length + limit).forEach(child => this.removeStreamChildElement(child))
} else if(limit && limit >= 0 && children.length > limit){
children.slice(limit).forEach(child => this.removeStreamChildElement(child))
}
}

transitionPendingRemoves(){
Expand Down
3 changes: 2 additions & 1 deletion lib/phoenix_live_view.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1746,6 +1746,7 @@ defmodule Phoenix.LiveView do
def stream_insert(%Socket{} = socket, name, item, opts \\ []) do
at = Keyword.get(opts, :at, -1)
limit = Keyword.get(opts, :limit)

update_stream(socket, name, &LiveStream.insert_item(&1, item, at, limit))
end

Expand Down Expand Up @@ -1819,7 +1820,7 @@ defmodule Phoenix.LiveView do
Enum.reduce(items, new_socket, fn item, acc -> stream_insert(acc, name, item, opts) end)

%{} ->
config = get_in(streams, [:__configured__, name]) || []
config = get_in(streams, [:__configured__, name]) || opts

ref =
if cid = socket.assigns[:myself] do
Expand Down
1 change: 1 addition & 0 deletions test/e2e/test_helper.exs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ defmodule Phoenix.LiveViewTest.E2E.Router do
live "/stream", Phoenix.LiveViewTest.StreamLive
live "/stream/reset", Phoenix.LiveViewTest.StreamResetLive
live "/stream/reset-lc", Phoenix.LiveViewTest.StreamResetLCLive
live "/stream/limit", Phoenix.LiveViewTest.StreamLimitLive
live "/healthy/:category", Phoenix.LiveViewTest.HealthyLive

live "/upload", Phoenix.LiveViewTest.E2E.UploadLive
Expand Down
240 changes: 240 additions & 0 deletions test/e2e/tests/streams.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -293,3 +293,243 @@ test.describe("Issue #3023", () => {
await expect(await listItems(page)).toEqual(["items-a", "items-e", "items-f", "items-g", "items-b", "items-c", "items-d"]);
});
});

test.describe("stream limit - issue #2686", () => {
const listItems = async (page) => page.locator("ul > li").evaluateAll(list => list.map(el => el.id));

test("limit is enforced on mount, but not dead render", async ({ page, request }) => {
const html = await (request.get("/stream/limit").then(r => r.text()));
for (let i = 1; i <= 10; i++) {
await expect(html).toContain(`id="items-${i}"`);
}

await page.goto("/stream/limit");
await syncLV(page);

await expect(await listItems(page)).toEqual([
"items-6",
"items-7",
"items-8",
"items-9",
"items-10"
]);
});

test("removes item at front when appending and limit is negative", async ({ page }) => {
await page.goto("/stream/limit");
await syncLV(page);

// these are the defaults in the LV
await expect(page.locator("input[name='at']")).toHaveValue("-1");
await expect(page.locator("input[name='limit']")).toHaveValue("-5");

await page.getByRole("button", { name: "add 1", exact: true }).click();
await syncLV(page);

await expect(await listItems(page)).toEqual([
"items-7",
"items-8",
"items-9",
"items-10",
"items-11"
]);

await page.getByRole("button", { name: "add 10", exact: true }).click();
await syncLV(page);

await expect(await listItems(page)).toEqual([
"items-17",
"items-18",
"items-19",
"items-20",
"items-21"
]);
});

test("removes item at back when prepending and limit is positive", async ({ page }) => {
await page.goto("/stream/limit");
await syncLV(page);

await page.locator("input[name='at']").fill("0");
await page.locator("input[name='limit']").fill("5");
await page.getByRole("button", { name: "recreate stream" }).click();
await syncLV(page);

await expect(await listItems(page)).toEqual([
"items-10",
"items-9",
"items-8",
"items-7",
"items-6"
]);

await page.getByRole("button", { name: "add 1", exact: true }).click();
await syncLV(page);

await expect(await listItems(page)).toEqual([
"items-11",
"items-10",
"items-9",
"items-8",
"items-7"
]);

await page.getByRole("button", { name: "add 10", exact: true }).click();
await syncLV(page);

await expect(await listItems(page)).toEqual([
"items-21",
"items-20",
"items-19",
"items-18",
"items-17"
]);
});

test("does nothing if appending and positive limit is reached", async ({ page }) => {
await page.goto("/stream/limit");
await syncLV(page);

await page.locator("input[name='at']").fill("-1");
await page.locator("input[name='limit']").fill("5");
await page.getByRole("button", { name: "recreate stream" }).click();
await syncLV(page);

await page.getByRole("button", { name: "clear" }).click();
await syncLV(page);

await expect(await listItems(page)).toEqual([]);

const items = [];
for (let i = 1; i <= 5; i++) {
await page.getByRole("button", { name: "add 1", exact: true }).click();
await syncLV(page);
items.push(`items-${i}`);
await expect(await listItems(page)).toEqual(items);
}

// now adding new items should do nothing, as the limit is reached
await page.getByRole("button", { name: "add 1", exact: true }).click();
await syncLV(page);

await expect(await listItems(page)).toEqual([
"items-1",
"items-2",
"items-3",
"items-4",
"items-5"
]);

// same when bulk inserting
await page.getByRole("button", { name: "add 10", exact: true }).click();
await syncLV(page);

await expect(await listItems(page)).toEqual([
"items-1",
"items-2",
"items-3",
"items-4",
"items-5"
]);
});

test("does nothing if prepending and negative limit is reached", async ({ page }) => {
await page.goto("/stream/limit");
await syncLV(page);

await page.locator("input[name='at']").fill("0");
await page.locator("input[name='limit']").fill("-5");
await page.getByRole("button", { name: "recreate stream" }).click();
await syncLV(page);

await page.getByRole("button", { name: "clear" }).click();
await syncLV(page);

await expect(await listItems(page)).toEqual([]);

const items = [];
for (let i = 1; i <= 5; i++) {
await page.getByRole("button", { name: "add 1", exact: true }).click();
await syncLV(page);
items.unshift(`items-${i}`);
await expect(await listItems(page)).toEqual(items);
}

// now adding new items should do nothing, as the limit is reached
await page.getByRole("button", { name: "add 1", exact: true }).click();
await syncLV(page);

await expect(await listItems(page)).toEqual([
"items-5",
"items-4",
"items-3",
"items-2",
"items-1"
]);

// same when bulk inserting
await page.getByRole("button", { name: "add 10", exact: true }).click();
await syncLV(page);

await expect(await listItems(page)).toEqual([
"items-5",
"items-4",
"items-3",
"items-2",
"items-1"
]);
});

test("arbitrary index", async ({ page }) => {
await page.goto("/stream/limit");
await syncLV(page);

await page.locator("input[name='at']").fill("1");
await page.locator("input[name='limit']").fill("5");
await page.getByRole("button", { name: "recreate stream" }).click();
await syncLV(page);

// we tried to insert 10 items
await expect(await listItems(page)).toEqual([
"items-1",
"items-10",
"items-9",
"items-8",
"items-7"
]);

await page.getByRole("button", { name: "add 10", exact: true }).click();
await syncLV(page);
await expect(await listItems(page)).toEqual([
"items-1",
"items-20",
"items-19",
"items-18",
"items-17"
]);

await page.locator("input[name='at']").fill("1");
await page.locator("input[name='limit']").fill("-5");
await page.getByRole("button", { name: "recreate stream" }).click();
await syncLV(page);

// we tried to insert 10 items
await expect(await listItems(page)).toEqual([
"items-10",
"items-5",
"items-4",
"items-3",
"items-2"
]);

await page.getByRole("button", { name: "add 10", exact: true }).click();
await syncLV(page);
await expect(await listItems(page)).toEqual([
"items-20",
"items-5",
"items-4",
"items-3",
"items-2"
]);
});
});
Loading

0 comments on commit 0c14b8a

Please sign in to comment.