Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use snapshots when initiating github sync #2273

Merged
merged 4 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ and this project adheres to

### Changed

- Use snapshots when initiating Github Sync
[#1827](https://github.com/OpenFn/lightning/issues/1827)

### Fixed

## [v2.7.11] - 2024-07-26
Expand Down
66 changes: 38 additions & 28 deletions lib/lightning/export_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ defmodule Lightning.ExportUtils do
"""

alias Lightning.Projects
alias Lightning.Repo
alias Lightning.Workflows
alias Lightning.Workflows.Snapshot

defp hyphenate(string) when is_binary(string) do
string |> String.replace(" ", "-")
Expand Down Expand Up @@ -41,36 +41,38 @@ defmodule Lightning.ExportUtils do
else: base
end

defp edge_to_treenode(%{source_job_id: nil} = edge, triggers) do
edge = Repo.preload(edge, [:source_trigger, :target_job])
trigger_name = edge.source_trigger.type |> Atom.to_string()
target_job = edge.target_job.name |> hyphenate()
defp edge_to_treenode(%{source_job_id: nil} = edge, triggers, jobs) do
source_trigger =
Enum.find(triggers, fn t -> t.id == edge.source_trigger_id end)

target_job = Enum.find(jobs, fn j -> j.id == edge.target_job_id end)
trigger_name = to_string(source_trigger.type)
target_job_name = hyphenate(target_job.name)

%{
name: "#{trigger_name}->#{target_job}",
source_trigger: find_trigger_name(edge, triggers)
name: "#{trigger_name}->#{target_job_name}",
source_trigger: trigger_name
}
|> merge_edge_common_fields(edge)
|> merge_edge_common_fields(edge, target_job)
end

defp edge_to_treenode(%{source_trigger_id: nil} = edge, _unused_triggers) do
edge = Repo.preload(edge, [:source_job, :target_job])
source_job = edge.source_job.name |> hyphenate()
target_job = edge.target_job.name |> hyphenate()
defp edge_to_treenode(%{source_trigger_id: nil} = edge, _triggers, jobs) do
target_job = Enum.find(jobs, fn j -> j.id == edge.target_job_id end)
source_job = Enum.find(jobs, fn j -> j.id == edge.source_job_id end)
source_job_name = hyphenate(source_job.name)
target_job_name = hyphenate(target_job.name)

%{
name: "#{source_job}->#{target_job}",
source_job: source_job
name: "#{source_job_name}->#{target_job_name}",
source_job: source_job_name
}
|> merge_edge_common_fields(edge)
|> merge_edge_common_fields(edge, target_job)
end

defp merge_edge_common_fields(json, edge) do
target_job = edge.target_job.name |> hyphenate()

defp merge_edge_common_fields(json, edge, target_job) do
json
|> Map.merge(%{
target_job: target_job,
target_job: hyphenate(target_job.name),
condition_type: edge.condition_type |> Atom.to_string(),
enabled: edge.enabled,
node_type: :edge
Expand All @@ -87,12 +89,6 @@ defmodule Lightning.ExportUtils do
end)
end

defp find_trigger_name(edge, triggers) do
[trigger] = Enum.filter(triggers, fn t -> t.id == edge.source_trigger_id end)

trigger.name
end

defp pick_and_sort(map) do
ordering_map = %{
project: [:name, :description, :credentials, :globals, :workflows],
Expand Down Expand Up @@ -229,17 +225,21 @@ defmodule Lightning.ExportUtils do
edges =
workflow.edges
|> Enum.sort_by(& &1.inserted_at, NaiveDateTime)
|> Enum.map(fn e -> edge_to_treenode(e, triggers) end)
|> Enum.map(fn e ->
edge_to_treenode(e, workflow.triggers, workflow.jobs)
end)

flow_map = %{jobs: jobs, edges: edges, triggers: triggers}

flow_map
|> to_workflow_yaml_tree(workflow)
end

def generate_new_yaml(project_id) do
project = Projects.get_project!(project_id)
@spec generate_new_yaml(Projects.Project.t(), [Snapshot.t()] | nil) ::
{:ok, binary()}
def generate_new_yaml(project, snapshots \\ nil)

def generate_new_yaml(project, nil) do
yaml =
project
|> Workflows.get_workflows_for()
Expand All @@ -248,4 +248,14 @@ defmodule Lightning.ExportUtils do

{:ok, yaml}
end

def generate_new_yaml(project, snapshots) when is_list(snapshots) do
yaml =
snapshots
|> Enum.sort_by(& &1.name)
|> build_yaml_tree(project)
|> to_new_yaml()

{:ok, yaml}
end
end
13 changes: 9 additions & 4 deletions lib/lightning/projects.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ defmodule Lightning.Projects do
alias Lightning.Workflows.Job
alias Lightning.Workflows.Trigger
alias Lightning.Workflows.Workflow
alias Lightning.Workflows.Snapshot
alias Lightning.WorkOrder

require Logger
Expand Down Expand Up @@ -558,11 +559,15 @@ defmodule Lightning.Projects do
{:ok, string}

"""
@spec export_project(:yaml, any) :: {:ok, binary}
def export_project(:yaml, project_id) do
{:ok, yaml} = ExportUtils.generate_new_yaml(project_id)
@spec export_project(atom(), Ecto.UUID.t(), [Ecto.UUID.t()] | nil) ::
{:ok, binary}
def export_project(:yaml, project_id, snapshot_ids \\ nil) do
project = get_project!(project_id)

{:ok, yaml}
snapshots =
if snapshot_ids, do: Snapshot.get_all_by_ids(snapshot_ids), else: nil

{:ok, _yaml} = ExportUtils.generate_new_yaml(project, snapshots)
end

@doc """
Expand Down
11 changes: 9 additions & 2 deletions lib/lightning/projects/provisioner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,11 @@ defmodule Lightning.Projects.Provisioner do

Exclude deleted workflows.
"""
@spec preload_dependencies(Project.t()) :: Project.t()
def preload_dependencies(project) do
@spec preload_dependencies(Project.t(), nil | [Ecto.UUID.t(), ...]) ::
Project.t()
def preload_dependencies(project, snapshots \\ nil)

def preload_dependencies(project, nil) do
w = from(w in Workflow, where: is_nil(w.deleted_at))

Repo.preload(
Expand All @@ -146,6 +149,10 @@ defmodule Lightning.Projects.Provisioner do
)
end

def preload_dependencies(project, snapshots) when is_list(snapshots) do
%{project | workflows: Snapshot.get_all_by_ids(snapshots)}
end

defp project_changeset(project, attrs) do
project
|> cast(attrs, [:id, :name, :description])
Expand Down
50 changes: 42 additions & 8 deletions lib/lightning/version_control/version_control.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ defmodule Lightning.VersionControl do
alias Lightning.VersionControl.GithubError
alias Lightning.VersionControl.ProjectRepoConnection
alias Lightning.VersionControl.VersionControlUsageLimiter
alias Lightning.Workflows.Snapshot
alias Lightning.Workflows.Workflow

defdelegate subscribe(user), to: Events

Expand Down Expand Up @@ -130,6 +132,8 @@ defmodule Lightning.VersionControl do
VersionControlUsageLimiter.limit_github_sync(
repo_connection.project_id
),
snapshots <-
list_or_create_snapshots_for_project(repo_connection.project_id),
{:ok, client} <-
GithubClient.build_installation_client(
repo_connection.github_installation_id
Expand All @@ -143,20 +147,50 @@ defmodule Lightning.VersionControl do
pull_yml_target_path() |> Path.basename(),
%{
ref: default_branch,
inputs: %{
projectId: repo_connection.project_id,
apiSecretName: api_secret_name(repo_connection),
pathToConfig: config_target_path(repo_connection),
branch: repo_connection.branch,
commitMessage:
"user #{user_email} initiated a sync from Lightning"
}
inputs:
%{
projectId: repo_connection.project_id,
apiSecretName: api_secret_name(repo_connection),
pathToConfig: config_target_path(repo_connection),
branch: repo_connection.branch,
commitMessage:
"user #{user_email} initiated a sync from Lightning"
}
|> maybe_add_snapshots(snapshots)
}
) do
:ok
end
end

defp list_or_create_snapshots_for_project(project_id) do
current_query =
from w in Workflow,
left_join: s in assoc(w, :snapshots),
on: s.lock_version == w.lock_version,
where: w.project_id == ^project_id and is_nil(w.deleted_at),
select: {w, s.id}

workflows = Repo.all(current_query)

Enum.reduce(workflows, [], fn {workflow, snapshot_id}, acc ->
if is_nil(snapshot_id) do
{:ok, snapshot} = Snapshot.get_or_create_latest_for(workflow)
[snapshot.id | acc]
else
[snapshot_id | acc]
end
end)
end

defp maybe_add_snapshots(inputs, snapshot_ids) do
if Enum.empty?(snapshot_ids) do
inputs
else
Map.put(inputs, :snapshots, Enum.join(snapshot_ids, " "))
end
end

def fetch_user_installations(user) do
with {:ok, access_token} <- fetch_user_access_token(user),
{:ok, client} <- GithubClient.build_bearer_client(access_token) do
Expand Down
7 changes: 7 additions & 0 deletions lib/lightning/workflows/snapshot.ex
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,13 @@ defmodule Lightning.Workflows.Snapshot do
|> Repo.all()
end

def get_all_by_ids(ids) do
from(s in __MODULE__,
where: s.id in ^ids
)
|> Repo.all()
end

@doc """
Get the latest snapshot for a workflow, based on the lock_version.

Expand Down
7 changes: 4 additions & 3 deletions lib/lightning_web/controllers/api/provisioning_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ defmodule LightningWeb.API.ProvisioningController do
conn.assigns.current_resource,
project
),
project <- Provisioner.preload_dependencies(project) do
project <-
Provisioner.preload_dependencies(project, params["snapshots"]) do
conn
|> put_status(:ok)
|> render("create.json", project: project)
Expand All @@ -78,7 +79,7 @@ defmodule LightningWeb.API.ProvisioningController do
Returns a description of the project as yaml. Same as the export project to
yaml button (see Downloads Controller) but made for the API.
"""
def show_yaml(conn, %{"id" => id}) do
def show_yaml(conn, %{"id" => id} = params) do
with %Projects.Project{} = project <-
Projects.get_project(id) || {:error, :not_found},
:ok <-
Expand All @@ -88,7 +89,7 @@ defmodule LightningWeb.API.ProvisioningController do
conn.assigns.current_resource,
project
) do
{:ok, yaml} = Projects.export_project(:yaml, id)
{:ok, yaml} = Projects.export_project(:yaml, id, params["snapshots"])

conn
|> put_resp_content_type("text/yaml")
Expand Down
35 changes: 23 additions & 12 deletions lib/lightning_web/controllers/api/provisioning_json.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule LightningWeb.API.ProvisioningJSON do
alias Lightning.Projects.Project
alias Lightning.Workflows.Edge
alias Lightning.Workflows.Job
alias Lightning.Workflows.Snapshot
alias Lightning.Workflows.Trigger
alias Lightning.Workflows.Workflow

Expand All @@ -21,47 +22,57 @@ defmodule LightningWeb.API.ProvisioningJSON do
def as_json(%Project{} = project) do
Ecto.embedded_dump(project, :json)
|> Map.put(
"workflows",
:workflows,
project.workflows
|> Enum.sort_by(& &1.inserted_at, NaiveDateTime)
|> Enum.map(&as_json/1)
)
end

def as_json(%Workflow{} = workflow) do
Ecto.embedded_dump(workflow, :json)
def as_json(%module{} = workflow_or_snapshot)
when module in [Workflow, Snapshot] do
workflow_id =
if module == Workflow do
workflow_or_snapshot.id
else
workflow_or_snapshot.workflow_id
end

workflow_or_snapshot
|> Ecto.embedded_dump(:json)
|> Map.put(:id, workflow_id)
|> Map.put(
"jobs",
workflow.jobs
:jobs,
workflow_or_snapshot.jobs
|> Enum.sort_by(& &1.inserted_at, NaiveDateTime)
|> Enum.map(&as_json/1)
)
|> Map.put(
"triggers",
workflow.triggers
:triggers,
workflow_or_snapshot.triggers
|> Enum.sort_by(& &1.inserted_at, NaiveDateTime)
|> Enum.map(&as_json/1)
)
|> Map.put(
"edges",
workflow.edges
:edges,
workflow_or_snapshot.edges
|> Enum.sort_by(& &1.inserted_at, NaiveDateTime)
|> Enum.map(&as_json/1)
)
end

def as_json(%Job{} = job) do
def as_json(%module{} = job) when module in [Job, Snapshot.Job] do
Ecto.embedded_dump(job, :json)
|> Map.take(~w(id adaptor body name)a)
end

def as_json(%Trigger{} = trigger) do
def as_json(%module{} = trigger) when module in [Trigger, Snapshot.Trigger] do
Ecto.embedded_dump(trigger, :json)
|> Map.take(~w(id type cron_expression enabled)a)
|> drop_keys_with_nil_value()
end

def as_json(%Edge{} = edge) do
def as_json(%module{} = edge) when module in [Edge, Snapshot.Edge] do
Ecto.embedded_dump(edge, :json)
|> Map.take(~w(
id enabled source_job_id source_trigger_id
Expand Down
Loading