Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blockstore and other repo building blocks #34

Merged
merged 12 commits into from
Oct 4, 2024
2 changes: 1 addition & 1 deletion .formatter.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Used by "mix format"
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"],
locals_without_parens: [from: 2],
locals_without_parens: [from: 2]
]
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ auth.ets
# Cargo build output
/target/

/repos/

# SQLite
pds
pds-shm
Expand All @@ -40,4 +42,4 @@ pds-wal
/target/

#DS_Store
.DS_Store
.DS_Store
3 changes: 2 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import Config

config :hexpds,
plc_server: "plc.directory",
appview_server: "public.api.bsky.app",
appview_server: "api.bsky.app",
relay_server: "bsky.network",
# ignore pls for now
pds_host: "kawaii.social",
Expand All @@ -28,5 +28,6 @@ config :hexpds, Hexpds.Database,
# Replace with Postgres URL in production!
url: "sqlite3:///pds"


config :mnesia,
dir: ~c".mnesia/#{Mix.env()}/#{node()}"
8 changes: 4 additions & 4 deletions lib/hexpds/auth/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ defmodule Hexpds.Auth.Session do
{:error, reason} ->
{:error, reason}

json ->
case json["scope"] do
"com.atproto.access" -> if find_r_jwt(jwt), do: json, else: {:error, "Invalid token"}
"com.atproto.refresh" -> if find_a_jwt(jwt), do: json, else: {:error, "Invalid token"}
%{"sub" => sub} = json ->
case sub["scope"] do
"com.atproto.access" -> if find_a_jwt(jwt), do: json, else: {:error, "Invalid token"}
"com.atproto.refresh" -> if find_r_jwt(jwt), do: json, else: {:error, "Invalid token"}
_ -> {:error, "Invalid scope"}
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/hexpds/blob.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ defmodule Hexpds.Blob do
though SQL is fast enough that it doesn't really matter.

(This is not part of the ATP spec, just a weird
hack I added)
hack I added) -- sj
"""
def hash(did, "bafkr" <> _ = cid) do
hash(did, CID.decode_cid!(cid))
Expand Down
54 changes: 25 additions & 29 deletions lib/hexpds/blockstore.ex
Original file line number Diff line number Diff line change
@@ -1,53 +1,49 @@
defmodule Hexpds.BlockStore do
@callback put_block(key :: binary(), value :: binary()) :: :ok | {:error, term()}
@callback put_block(value :: binary()) :: :ok | {:error, term()}
@callback get_block(key :: binary()) :: {:ok, binary()} | {:error, term()}
@callback del_block(key :: binary()) :: :ok | {:error, term()}
end

defmodule BlocksTable do
defmodule Hexpds.BlocksTable do
use Ecto.Schema

schema "blocks" do
field(:key, :string)
field(:value, :binary)
field(:block_cid, :string) # A CID
field(:block_value, :binary) # A Dag-CBOR blob

timestamps()
end
end

defmodule Hexpds.EctoBlockStore do
import Ecto.Query
@behaviour Hexpds.BlockStore

def init(_type, config) do
{:ok, Keyword.put(config, :database, :memory)}
end

def put_block(key, value) do
case Hexpds.Database.get_by(BlocksTable, key: key) do
nil ->
case Hexpds.Database.insert!(%BlocksTable{key: key, value: value}) do
{:ok, _} -> :ok
{:error, _} -> {:error, :insert_failed}
end

{:ok, res} when res == value ->
:ok

{:ok, _} ->
{:error, :different_value}

{:error, _} ->
{:error, :get_failed}
alias Hexpds.{BlockStore, DagCBOR, BlocksTable}
@behaviour BlockStore

@impl BlockStore
def put_block(value) do
cid = Hexpds.Repo.Helpers.term_to_dagcbor_cid(value)
case get_block(cid) do
{:error, :not_found} ->
%BlocksTable{
block_cid: cid,
block_value: DagCBOR.encode!(value)
}
|> Hexpds.User.Sqlite.insert!()
anything_else -> anything_else
end
end

@impl BlockStore
def get_block(key) do
case Hexpds.Database.get_by(BlocksTable, key: key) do
case Hexpds.User.Sqlite.get_by(BlocksTable, block_cid: key) do
nil -> {:error, :not_found}
block -> {:ok, block.value}
%BlocksTable{} = block -> block
end
end

@impl BlockStore
def del_block(key) do
Hexpds.Database.delete_all(from(b in BlocksTable, where: b.key == ^key))
Hexpds.User.Sqlite.delete_all(from(b in BlocksTable, where: b.block_cid == ^key))
end
end
8 changes: 8 additions & 0 deletions lib/hexpds/dagcbor.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
defmodule Hexpds.DagCBOR do
import Hexpds.Helpers
defmodule Internal do
use Rustler, otp_app: :hexpds, crate: "hexpds_dagcbor_internal"
@spec encode_dag_cbor(binary()) :: {:ok, binary()} | {:error, String.t()}
Expand Down Expand Up @@ -33,6 +34,8 @@ defmodule Hexpds.DagCBOR do
with {:ok, json} <- Jason.encode(l), do: encode(json)
end

def! encode(json)

@doc """
Decodes a CBOR binary into a JSON string.
"""
Expand All @@ -53,4 +56,9 @@ defmodule Hexpds.DagCBOR do
def decode(cbor) do
with {:ok, json} <- decode_json(cbor), do: Jason.decode(json)
end

def bytes(bytes) do
"hexpds_dagcbor_bytes" <> Base.encode16(bytes, case: :lower)
end

end
2 changes: 1 addition & 1 deletion lib/hexpds/db/cid.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ defmodule Ecto.Types.Cid do
do:
{:ok,
term
|> CID.encode!(:base32_lower)}
|> Hexpds.Repo.Helpers.cid_string}
end
13 changes: 12 additions & 1 deletion lib/hexpds/db/user.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defmodule Hexpds.User do
field(:did, :string)
field(:handle, :string)
field(:password_hash, :string)
field(:signing_key, Ecto.Type.ErlangTerm)
field(:signing_key, Ecto.Type.ErlangTerm) # Why store as erlangterms? So that the type of key is automatically encoded with the key
field(:rotation_key, Ecto.Type.ErlangTerm)
field(:data, :map)
end
Expand Down Expand Up @@ -68,5 +68,16 @@ defmodule Hexpds.User do
data: %{"preferences" => %{}}
}
|> tap(&Hexpds.Database.insert/1)
|> tap(&setup_repo_for/1)
end

def setup_repo_for(%__MODULE__{did: did} = u) do
File.mkdir_p!("./repos/#{did}")
Hexpds.User.Sqlite.exec(u,
fn ->
Hexpds.User.Sqlite.Migrations.all()
|> Hexpds.User.Sqlite.migrate()
end
)
end
end
34 changes: 34 additions & 0 deletions lib/hexpds/db/user/sqlite.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
defmodule Hexpds.User.Sqlite do
@moduledoc """
Users' repos will be stored in individual sqlite DBs
"""

use Ecto.Repo,
otp_app: :hexpds,
adapter: Ecto.Adapters.SQLite3

def get_for_user(%Hexpds.User{did: did}) do
repo_path = "repos/#{did}/repo.db"
{:ok, db} = start_link(name: nil, database: repo_path)
db
end


def migrate(migrations) do
order_migrations =
migrations
|> Enum.with_index()
|> Enum.map(fn {m, i} -> {i, m} end)
Ecto.Migrator.run(__MODULE__, order_migrations, :up, all: true, dynamic_repo: get_dynamic_repo())
end

def exec(user, callback) do
repo = get_for_user(user)
try do
put_dynamic_repo(repo)
callback.()
after
Supervisor.stop(repo)
end
end
end
74 changes: 74 additions & 0 deletions lib/hexpds/db/user/sqlite/migrations.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
defmodule Hexpds.User.Sqlite.Migrations.Macros do
defmacro migration(migration_name, do: change_block) do
# need to satisfy my DSL addiction - sj
quote do
@migrations __MODULE__.unquote(migration_name)
defmodule unquote(migration_name) do
use Ecto.Migration

def change do
unquote(change_block)
end
end
end
end
end

defmodule Hexpds.User.Sqlite.Migrations do
Module.register_attribute(__MODULE__, :migrations, accumulate: true)

import __MODULE__.Macros

migration SetupBlockstore do
create table(:blocks) do
add(:block_cid, :string, primary_key: true, null: false)
add(:block_value, :binary, null: false)

timestamps() # Maybe will be helpful if we do special sync APIs
end
end

migration CommitsAndRecords do
create table(:commits) do
add :seq, :integer, primary_key: true, null: false
add :cid, :string, null: false

timestamps() # Probably not strictly necessary, but why not? Better safe than sorry, can easily remove later
end
create table(:records) do
add :record_path, :text, primary_key: true, null: false
add :collection, :string, null: false
add :record_cid, :string, null: false

timestamps() # Will help with sorting for e.g. listRecords
end
end

migration MstNodes do
# Have probably gotten some of this wrong and may need to change implementation
create table(:mst_nodes) do
add :cid, :string, primary_key: true, null: false
add :left, :string, comment: "CID link, optional: link to sub-tree Node on a lower level and with all keys sorting before keys at this node"
add :parent_node_cid, :string
add :depth, :int, null: false
end
create table(:tree_entries) do
add :tree_entry_key, :text, primary_key: true, null: false # This is equivalent to record path
add :parent_node_cid, :string, null: false
add :value, :string, null: false # CID link to the record data (CBOR) for this entry
add :right, :string # link to a sub-tree Node at a lower level which has keys sorting after this TreeEntry's key (to the "right"), but before the next TreeEntry's key in this Node (if any)
end
end

migration SomeUsefulIndices do
# Admittedly I might also be getting this wrong
create index(:tree_entries, :parent_node_cid)
create index(:mst_nodes, :parent_node_cid)
create unique_index(:records, :record_cid)
create index(:records, :collection)
create unique_index(:tree_entries, :value)
create index(:mst_nodes, :depth)
end

def all, do: Enum.reverse(@migrations)
end
16 changes: 16 additions & 0 deletions lib/hexpds/firehose/firehose.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
defmodule Hexpds.Firehose do
defp cbor_concat(header, op) do
[header, op]
|> Enum.map(&Hexpds.DagCBOR.encode!/1)
|> Enum.reverse()
|> Enum.reduce(&<>/2)
end

def error(type, message \\ nil) do
cbor_concat(%{op: -1}, optional_join(%{error: type}, if(message, do: %{message: message})))
end

defp optional_join(map, nil), do: map
defp optional_join(map1, map2), do: Map.merge(map1, map2)

end
26 changes: 26 additions & 0 deletions lib/hexpds/firehose/websocket.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
defmodule Hexpds.Firehose.Websocket do
@behaviour WebSock

@impl WebSock
def init(_params) do
# We can ignore the params for now but we should use them for backfilling later
pid = self()
:syn.join(:firehose, :websockets, pid)
{:ok, nil}
end

@impl WebSock
def handle_info({:firehose_message, bindata}, _) do
{:push, {:binary, bindata}, nil}
end

@impl WebSock
def handle_in(_, _) do
# Ignore incoming messages
{:ok, nil}
end

def push_frame(frame) do
:syn.publish(:firehose, :websockets, {:firehose_message, frame})
end
end
2 changes: 1 addition & 1 deletion lib/hexpds/k256.ex
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ defmodule Hexpds.K256 do
@doc """
Signs a binary message with a Secp256k1 private key. Returns a binary signature.
"""
def sign(%__MODULE__{privkey: privkey}, message)
def sign(%{privkey: privkey}, message)
when is_binary(message) and is_valid_key(privkey) do
with {:ok, sig} <- Hexpds.K256.Internal.sign_message(privkey, message),
{:ok, sig_bytes} <- Base.decode16(sig, case: :lower),
Expand Down
Empty file added lib/hexpds/mst_test.ex
Empty file.
Loading
Loading