Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix various issues with limiting streams #3035

Merged
merged 5 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 31 additions & 32 deletions assets/js/phoenix_live_view/dom_patch.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export default class DOMPatch {
this.html = html
this.streams = streams
this.streamInserts = {}
this.streamComponentRestore = {}
this.targetCID = targetCID
this.cidPatch = isCid(this.targetCID)
this.pendingRemoves = []
Expand Down Expand Up @@ -108,9 +109,7 @@ export default class DOMPatch {
})
if(reset !== undefined){
DOM.all(container, `[${PHX_STREAM_REF}="${ref}"]`, child => {
if(!this.streamInserts[child.id]){
this.removeStreamChildElement(child)
}
this.removeStreamChildElement(child)
})
}
deleteIds.forEach(id => {
Expand All @@ -123,9 +122,7 @@ export default class DOMPatch {
if(isJoinPatch){
DOM.all(this.container, `[${phxUpdate}=${PHX_STREAM}]`, el => {
Array.from(el.children).forEach(child => {
if(!this.streamInserts[child.id]){
this.removeStreamChildElement(child)
}
this.removeStreamChildElement(child)
})
})
}
Expand All @@ -143,11 +140,20 @@ export default class DOMPatch {
skipFromChildren: (from) => { return from.getAttribute(phxUpdate) === PHX_STREAM },
// tell morphdom how to add a child
addChild: (parent, child) => {
let {ref, streamAt, limit} = this.getStreamInsert(child)
let {ref, streamAt} = this.getStreamInsert(child)
if(ref === undefined){ return parent.appendChild(child) }

DOM.putSticky(child, PHX_STREAM_REF, el => el.setAttribute(PHX_STREAM_REF, ref))

// we may need to restore the component, see removeStreamChildElement
if(child.getAttribute(PHX_COMPONENT)){
if(child.getAttribute(PHX_SKIP) !== null){
child = this.streamComponentRestore[child.id]
} else {
delete this.streamComponentRestore[child.id]
}
}

// streaming
if(streamAt === 0){
parent.insertAdjacentElement("afterbegin", child)
Expand All @@ -157,19 +163,6 @@ export default class DOMPatch {
let sibling = Array.from(parent.children)[streamAt]
parent.insertBefore(child, sibling)
}
let children = limit !== null && Array.from(parent.children)
let childrenToRemove = []
if(limit && limit < 0 && children.length > limit * -1){
childrenToRemove = children.slice(0, children.length + limit)
} else if(limit && limit >= 0 && children.length > limit){
childrenToRemove = children.slice(limit)
}
childrenToRemove.forEach(removeChild => {
// do not remove child as part of limit if we are re-adding it
if(!this.streamInserts[removeChild.id]){
this.removeStreamChildElement(removeChild)
}
})
},
onBeforeNodeAdded: (el) => {
DOM.maybeAddPrivateHooks(el, phxViewportTop, phxViewportBottom)
Expand Down Expand Up @@ -198,17 +191,6 @@ export default class DOMPatch {
}
added.push(el)
},
onBeforeElChildrenUpdated: (fromEl, toEl) => {
// when resetting a stream, we override the order of the streamInserts to
// the order of the elements in the diff (toEl)
if(fromEl.getAttribute(phxUpdate) === PHX_STREAM){
Array.from(toEl.children).forEach((child, idx) => {
// if this is not a reset, the streamAt must be preserved
let insert = this.streamInserts[child.id]
if(insert && insert.reset){ insert.streamAt = idx }
})
}
},
onNodeDiscarded: (el) => this.onNodeDiscarded(el),
onBeforeNodeDiscarded: (el) => {
if(el.getAttribute && el.getAttribute(PHX_PRUNE) !== null){ return true }
Expand Down Expand Up @@ -340,6 +322,11 @@ export default class DOMPatch {

removeStreamChildElement(child){
if(!this.maybePendingRemove(child)){
if(child.getAttribute(PHX_COMPONENT) && this.streamInserts[child.id]){
SteffenDE marked this conversation as resolved.
Show resolved Hide resolved
// live component would be removed and then be re-added;
// because of the PHX_SKIP optimization we need to temporarily store the DOM node
this.streamComponentRestore[child.id] = child
}
child.remove()
this.onNodeDiscarded(child)
}
Expand All @@ -351,7 +338,7 @@ export default class DOMPatch {
}

maybeReOrderStream(el, isNew){
let {ref, streamAt, limit} = this.getStreamInsert(el)
let {ref, streamAt} = this.getStreamInsert(el)
if(streamAt === undefined || (streamAt === 0 && !isNew)){ return }

// we need to the PHX_STREAM_REF here as well as addChild is invoked only for parents
Expand All @@ -373,6 +360,18 @@ export default class DOMPatch {
}
}
}

this.maybeLimitStream(el)
}

maybeLimitStream(el){
let {limit} = this.getStreamInsert(el)
let children = limit !== null && Array.from(el.parentElement.children)
if(limit && limit < 0 && children.length > limit * -1){
children.slice(0, children.length + limit).forEach(child => this.removeStreamChildElement(child))
} else if(limit && limit >= 0 && children.length > limit){
children.slice(limit).forEach(child => this.removeStreamChildElement(child))
}
}

transitionPendingRemoves(){
Expand Down
3 changes: 2 additions & 1 deletion lib/phoenix_live_view.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1746,6 +1746,7 @@ defmodule Phoenix.LiveView do
def stream_insert(%Socket{} = socket, name, item, opts \\ []) do
at = Keyword.get(opts, :at, -1)
limit = Keyword.get(opts, :limit)

update_stream(socket, name, &LiveStream.insert_item(&1, item, at, limit))
end

Expand Down Expand Up @@ -1819,7 +1820,7 @@ defmodule Phoenix.LiveView do
Enum.reduce(items, new_socket, fn item, acc -> stream_insert(acc, name, item, opts) end)

%{} ->
config = get_in(streams, [:__configured__, name]) || []
config = get_in(streams, [:__configured__, name]) || opts
SteffenDE marked this conversation as resolved.
Show resolved Hide resolved

ref =
if cid = socket.assigns[:myself] do
Expand Down
44 changes: 33 additions & 11 deletions lib/phoenix_live_view/test/dom.ex
SteffenDE marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -430,20 +430,35 @@ defmodule Phoenix.LiveViewTest.DOM do
content_changed? && type == "stream" ->
children = updated_existing_children ++ updated_appended

# we always reduce over all streams, even if only one of them is actually relevant
# for this container...
new_children =
Enum.reduce(streams, children, fn item, acc ->
[ref, inserts, _deletes | _maybe_reset] = item
Enum.reduce(streams, updated_existing_children, fn item, acc ->
[ref, inserts, _deletes | maybe_reset] = item
reset = maybe_reset == [true]

# remove all children that are in a stream that was reset
acc =
if reset do
Enum.reject(acc, fn child ->
item_ref = attribute(child, "data-phx-stream")
item_ref == ref
end)
else
acc
end

Enum.reduce(inserts, acc, fn [id, insert_at, _limit], acc ->
old_index = Enum.find_index(acc, &(attribute(&1, "id") == id))
Enum.reduce(inserts, acc, fn [id, insert_at, limit], acc ->
old_index = Enum.find_index(children, &(attribute(&1, "id") == id))
current_index = Enum.find_index(acc, &(attribute(&1, "id") == id))

appended? = Enum.any?(updated_appended, &(attribute(&1, "id") == id))

existing? = Enum.any?(updated_existing_children, &(attribute(&1, "id") == id))
deleted? = MapSet.member?(stream_deletes, id)

child =
case old_index && Enum.at(acc, old_index) do
case old_index && Enum.at(children, old_index) do
nil -> nil
child -> set_attr(child, "data-phx-stream", ref)
end
Expand All @@ -459,19 +474,20 @@ defmodule Phoenix.LiveViewTest.DOM do
acc

# do not append existing child if already present, only update in place
old_index && insert_at == -1 && (existing? or appended?) ->
current_index && insert_at == -1 && (existing? or appended?) ->
if deleted? do
acc |> List.delete_at(old_index) |> List.insert_at(insert_at, child)
acc |> List.delete_at(current_index) |> List.insert_at(insert_at, child)
else
List.replace_at(acc, old_index, child)
List.replace_at(acc, current_index, child)
end

old_index && insert_at ->
acc |> List.delete_at(old_index) |> List.insert_at(insert_at, child)
current_index && insert_at ->
acc |> List.delete_at(current_index) |> List.insert_at(insert_at, child)

!old_index && insert_at ->
!current_index && insert_at ->
List.insert_at(acc, insert_at, child)
end
|> maybe_apply_stream_limit(limit)
end)
end)
|> Enum.reject(fn child ->
Expand Down Expand Up @@ -615,4 +631,10 @@ defmodule Phoenix.LiveViewTest.DOM do
do: Enum.map(values, &normalize_attribute_order/1)

defp normalize_attribute_order(value), do: value

defp maybe_apply_stream_limit(children, nil), do: children

defp maybe_apply_stream_limit(children, limit) do
Enum.take(children, limit)
end
end
1 change: 1 addition & 0 deletions test/e2e/test_helper.exs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ defmodule Phoenix.LiveViewTest.E2E.Router do
live "/stream", Phoenix.LiveViewTest.StreamLive
live "/stream/reset", Phoenix.LiveViewTest.StreamResetLive
live "/stream/reset-lc", Phoenix.LiveViewTest.StreamResetLCLive
live "/stream/limit", Phoenix.LiveViewTest.StreamLimitLive
live "/healthy/:category", Phoenix.LiveViewTest.HealthyLive

live "/upload", Phoenix.LiveViewTest.E2E.UploadLive
Expand Down
Loading
Loading