From 37a19ae5a6c89d2c40da5733e2bd0c13c100768a Mon Sep 17 00:00:00 2001 From: VioletBuse <44533409+VioletBuse@users.noreply.github.com> Date: Tue, 18 Jun 2024 23:46:27 +0200 Subject: [PATCH] init --- .github/workflows/test.yml | 23 +++++ .gitignore | 4 + README.md | 28 ++++++ gleam.toml | 23 +++++ manifest.toml | 28 ++++++ src/lite_fs.gleam | 194 +++++++++++++++++++++++++++++++++++++ test/lite_fs_test.gleam | 12 +++ 7 files changed, 312 insertions(+) create mode 100644 .github/workflows/test.yml create mode 100644 .gitignore create mode 100644 README.md create mode 100644 gleam.toml create mode 100644 manifest.toml create mode 100644 src/lite_fs.gleam create mode 100644 test/lite_fs_test.gleam diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..6ac2397 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,23 @@ +name: test + +on: + push: + branches: + - master + - main + pull_request: + +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: erlef/setup-beam@v1 + with: + otp-version: "26.0.2" + gleam-version: "1.2.1" + rebar3-version: "3" + # elixir-version: "1.15.4" + - run: gleam deps download + - run: gleam test + - run: gleam format --check src test diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..599be4e --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +*.beam +*.ez +/build +erl_crash.dump diff --git a/README.md b/README.md new file mode 100644 index 0000000..88b2be6 --- /dev/null +++ b/README.md @@ -0,0 +1,28 @@ +# lite_fs + +[![Package Version](https://img.shields.io/hexpm/v/lite_fs)](https://hex.pm/packages/lite_fs) +[![Hex Docs](https://img.shields.io/badge/hex-docs-ffaff3)](https://hexdocs.pm/lite_fs/) + +```sh +gleam add lite_fs +``` +```gleam +import lite_fs +import gleam/io + +pub fn main() { + // localhost:20202/events + // print events to console when you get them + lite_fs.start(port: 20202, with: io.debug) +} +``` + +Further documentation can be found at . + +## Development + +```sh +gleam run # Run the project +gleam test # Run the tests +gleam shell # Run an Erlang shell +``` diff --git a/gleam.toml b/gleam.toml new file mode 100644 index 0000000..e108286 --- /dev/null +++ b/gleam.toml @@ -0,0 +1,23 @@ +name = "lite_fs" +version = "1.0.0" + +# Fill out these fields if you intend to generate HTML documentation or publish +# your project to the Hex package manager. +# +description = "litefs event stream client for gleam." +licences = ["Apache-2.0"] +repository = { type = "github", user = "VioletBuse", repo = "gleam_litefs" } +# links = [{ title = "Website", href = "https://gleam.run" }] +# +# For a full reference of all the available options, you can have a look at +# https://gleam.run/writing-gleam/gleam-toml/. + +[dependencies] +gleam_stdlib = ">= 0.34.0 and < 2.0.0" +httpp = ">= 1.1.0 and < 2.0.0" +jackson = ">= 1.2.0 and < 2.0.0" +gleam_http = ">= 3.6.0 and < 4.0.0" +gleam_erlang = ">= 0.25.0 and < 1.0.0" + +[dev-dependencies] +gleeunit = ">= 1.0.0 and < 2.0.0" diff --git a/manifest.toml b/manifest.toml new file mode 100644 index 0000000..d9e0c57 --- /dev/null +++ b/manifest.toml @@ -0,0 +1,28 @@ +# This file was generated by Gleam +# You typically do not need to edit this file + +packages = [ + { name = "certifi", version = "2.12.0", build_tools = ["rebar3"], requirements = [], otp_app = "certifi", source = "hex", outer_checksum = "EE68D85DF22E554040CDB4BE100F33873AC6051387BAF6A8F6CE82272340FF1C" }, + { name = "gleam_erlang", version = "0.25.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "054D571A7092D2A9727B3E5D183B7507DAB0DA41556EC9133606F09C15497373" }, + { name = "gleam_http", version = "3.6.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_http", source = "hex", outer_checksum = "8C07DF9DF8CC7F054C650839A51C30A7D3C26482AC241C899C1CEA86B22DBE51" }, + { name = "gleam_otp", version = "0.10.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "0B04FE915ACECE539B317F9652CAADBBC0F000184D586AAAF2D94C100945D72B" }, + { name = "gleam_stdlib", version = "0.38.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "663CF11861179AF415A625307447775C09404E752FF99A24E2057C835319F1BE" }, + { name = "gleeunit", version = "1.1.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "72CDC3D3F719478F26C4E2C5FED3E657AC81EC14A47D2D2DEBB8693CA3220C3B" }, + { name = "hackney", version = "1.20.1", build_tools = ["rebar3"], requirements = ["certifi", "idna", "metrics", "mimerl", "parse_trans", "ssl_verify_fun", "unicode_util_compat"], otp_app = "hackney", source = "hex", outer_checksum = "FE9094E5F1A2A2C0A7D10918FEE36BFEC0EC2A979994CFF8CFE8058CD9AF38E3" }, + { name = "httpp", version = "1.1.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_http", "gleam_otp", "gleam_stdlib", "hackney"], otp_app = "httpp", source = "hex", outer_checksum = "66E6744054539734B5C769C50873198E36DDFA9A20F9BACD9590888C042FC020" }, + { name = "idna", version = "6.1.1", build_tools = ["rebar3"], requirements = ["unicode_util_compat"], otp_app = "idna", source = "hex", outer_checksum = "92376EB7894412ED19AC475E4A86F7B413C1B9FBB5BD16DCCD57934157944CEA" }, + { name = "jackson", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "jackson", source = "hex", outer_checksum = "4D7F81FE3C46F83D201E8985564E3D986526FEFA5783CDC206A3BFAF1D89B27D" }, + { name = "metrics", version = "1.0.1", build_tools = ["rebar3"], requirements = [], otp_app = "metrics", source = "hex", outer_checksum = "69B09ADDDC4F74A40716AE54D140F93BEB0FB8978D8636EADED0C31B6F099F16" }, + { name = "mimerl", version = "1.3.0", build_tools = ["rebar3"], requirements = [], otp_app = "mimerl", source = "hex", outer_checksum = "A1E15A50D1887217DE95F0B9B0793E32853F7C258A5CD227650889B38839FE9D" }, + { name = "parse_trans", version = "3.4.1", build_tools = ["rebar3"], requirements = [], otp_app = "parse_trans", source = "hex", outer_checksum = "620A406CE75DADA827B82E453C19CF06776BE266F5A67CFF34E1EF2CBB60E49A" }, + { name = "ssl_verify_fun", version = "1.1.7", build_tools = ["mix", "rebar3", "make"], requirements = [], otp_app = "ssl_verify_fun", source = "hex", outer_checksum = "FE4C190E8F37401D30167C8C405EDA19469F34577987C76DDE613E838BBC67F8" }, + { name = "unicode_util_compat", version = "0.7.0", build_tools = ["rebar3"], requirements = [], otp_app = "unicode_util_compat", source = "hex", outer_checksum = "25EEE6D67DF61960CF6A794239566599B09E17E668D3700247BC498638152521" }, +] + +[requirements] +gleam_erlang = { version = ">= 0.25.0 and < 1.0.0"} +gleam_http = { version = ">= 3.6.0 and < 4.0.0" } +gleam_stdlib = { version = ">= 0.34.0 and < 2.0.0" } +gleeunit = { version = ">= 1.0.0 and < 2.0.0" } +httpp = { version = ">= 1.1.0 and < 2.0.0" } +jackson = { version = ">= 1.2.0 and < 2.0.0" } diff --git a/src/lite_fs.gleam b/src/lite_fs.gleam new file mode 100644 index 0000000..3559ac5 --- /dev/null +++ b/src/lite_fs.gleam @@ -0,0 +1,194 @@ +import gleam/bit_array +import gleam/bool +import gleam/bytes_builder.{type BytesBuilder} +import gleam/dynamic +import gleam/erlang/process +import gleam/http.{Get} +import gleam/http/request.{type Request, Request} +import gleam/int +import gleam/option.{None, Some} +import gleam/result +import gleam/string +import httpp/streaming +import jackson + +pub type Message { + Shutdown +} + +fn build_request(port: Int) -> Request(BytesBuilder) { + Request( + method: Get, + headers: [#("connection", "keep-alive")], + body: bytes_builder.new(), + scheme: http.Http, + host: "localhost", + port: Some(port), + path: "/events", + query: None, + ) +} + +fn create_on_data(callback: fn(Event) -> Nil) { + fn(msg: streaming.Message, _, _) { + case msg { + streaming.Done -> Error(process.Normal) + streaming.Bits(bits) -> { + let evt = { + use str <- result.try(bit_array.to_string(bits) |> result.nil_error) + use event <- result.try(parse_event(string.trim(str))) + + Ok(event) + } + + case evt, bit_array.to_string(bits) { + Error(_), Ok(evt_string) -> + Error(process.Abnormal("unable to parse event " <> evt_string)) + Error(_), _ -> + Error(process.Abnormal( + "received bits from server that could not be parsed into string", + )) + Ok(event), _ -> { + callback(event) + Ok(Nil) + } + } + } + } + } +} + +fn on_message(message: Message, _, _) { + case message { + Shutdown -> Error(process.Normal) + } +} + +fn on_error(_, _, _) { + Error(process.Abnormal("error received")) +} + +fn build_request_handler(port: Int, on_event_callback: fn(Event) -> Nil) { + let req = build_request(port) + + streaming.StreamingRequestHandler( + initial_state: Nil, + req: req, + on_data: create_on_data(on_event_callback), + on_message: on_message, + on_error: on_error, + initial_response_timeout: 10_000, + ) +} + +/// start the event listener +pub fn start(port port: Int, with callback: fn(Event) -> Nil) { + build_request_handler(port, callback) + |> streaming.start +} + +pub type Primary { + Local + Remote(hostname: String) +} + +/// The different types of events that you can get from the event stream +pub type Event { + Init(primary_data: Primary) + Tx( + db: String, + tx_id: Int, + post_apply_checksum: Int, + commit: Int, + page_size: Int, + timestamp: String, + ) + PrimaryChange(primary_data: Primary) +} + +fn parse_event(json: String) -> Result(Event, Nil) { + use parsed_json <- result.try(jackson.parse(json) |> result.nil_error) + use resolved_type <- result.try(jackson.resolve_pointer(parsed_json, "/type")) + use event_type <- result.try( + jackson.decode(resolved_type, dynamic.string) |> result.nil_error, + ) + + case event_type { + "init" -> parse_primary(json, Init) + "primaryChange" -> parse_primary(json, PrimaryChange) + "tx" -> parse_tx_event(json) + _ -> Error(Nil) + } +} + +fn parse_primary( + event: String, + constructor: fn(Primary) -> Event, +) -> Result(Event, Nil) { + use parsed_json <- result.try(jackson.parse(event) |> result.nil_error) + use resolved_is_primary <- result.try(jackson.resolve_pointer( + parsed_json, + "/data/is_primary", + )) + use is_primary <- result.try( + jackson.decode(resolved_is_primary, dynamic.bool) |> result.nil_error, + ) + + use <- bool.guard(when: is_primary, return: Ok(constructor(Local))) + + use resolved_hostname <- result.try(jackson.resolve_pointer( + parsed_json, + "/data/hostname", + )) + use hostname <- result.try( + jackson.decode(resolved_hostname, dynamic.string) |> result.nil_error, + ) + + Ok(constructor(Remote(hostname))) +} + +type TxData { + TxData( + txid: String, + checksum: String, + page_size: Int, + commit: Int, + timestamp: String, + ) +} + +fn parse_tx_event(event: String) -> Result(Event, Nil) { + use json <- result.try(jackson.parse(event) |> result.nil_error) + use db_resolved <- result.try(jackson.resolve_pointer(json, "/db")) + use db <- result.try( + jackson.decode(db_resolved, dynamic.string) |> result.nil_error, + ) + use data <- result.try(jackson.resolve_pointer(json, "/data")) + + use raw_data <- result.try( + jackson.decode( + data, + dynamic.decode5( + TxData, + dynamic.field("txId", dynamic.string), + dynamic.field("postApplyChecksum", dynamic.string), + dynamic.field("pageSize", dynamic.int), + dynamic.field("commit", dynamic.int), + dynamic.field("timestamp", dynamic.string), + ), + ) + |> result.nil_error, + ) + + use tx_id <- result.try(int.base_parse(raw_data.txid, 16)) + use checksum <- result.try(int.base_parse(raw_data.checksum, 16)) + + Ok(Tx( + db, + tx_id, + checksum, + raw_data.page_size, + raw_data.commit, + raw_data.timestamp, + )) +} diff --git a/test/lite_fs_test.gleam b/test/lite_fs_test.gleam new file mode 100644 index 0000000..3831e7a --- /dev/null +++ b/test/lite_fs_test.gleam @@ -0,0 +1,12 @@ +import gleeunit +import gleeunit/should + +pub fn main() { + gleeunit.main() +} + +// gleeunit test functions end in `_test` +pub fn hello_world_test() { + 1 + |> should.equal(1) +}