pbm hosting in .NET 4.7.2 fsi (F# 8) of SSIS script task #417

Open
ingted opened this issue Dec 14, 2023 · 4 comments

Comments

@ingted
Contributor

ingted commented Dec 14, 2023

Version of Akka.NET? .NET 4.7.2 in an SSIS script task
Which Akka.NET Modules? Akka.Hosting &

To Reproduce
Create a HostBuilder configured with .AddPetabridgeCmd and start it from an F# fsiEvaluationSession hosted in an SSIS 2022 script task (.NET 4.7.2).

Expected behavior
The pbm client should connect to the pbm host without trouble, but it cannot.

Actual behavior
In a normal fsi session, the same F# script successfully creates the (web) service, and the ActorSystem starts the pbm host.
But when the script is executed in an fsiEvaluationSession hosted in SSIS, the ActorSystem does not start the pbm host, even though the HOCON is configured correctly.

Environment
Windows Server 2022/SQL Server 2022 (SSIS)

Additional context
pbm has to be started manually with:

let cmd = PetabridgeCmd.Get asys
cmd.Start()
@ingted
Contributor Author

ingted commented Dec 14, 2023

I know the whole scenario is complex and that it is very likely unrelated to Akka.Hosting... I just want to share this for anyone who sees similar symptoms: in some cases the HOCON is not enough, and cmd.Start could be the cure.
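
A minimal sketch of how the workaround could be guarded, so the manual start only runs when nothing is listening on the pbm port yet. The helper names `isListening` and `ensurePbmStarted`, the loopback address and the 1000 ms timeout are assumptions made for illustration; only `PetabridgeCmd.Get asys` and `cmd.Start()` come from the report above, and they are passed in as a callback so the snippet itself depends on nothing beyond the BCL.

```fsharp
open System.Net.Sockets

/// Returns true when something accepts TCP connections on host:port
/// within timeoutMs milliseconds. A refused connection or a timeout yields false.
let isListening (host: string) (port: int) (timeoutMs: int) =
    try
        use client = new TcpClient()
        let connect = client.ConnectAsync(host, port)
        connect.Wait(timeoutMs) && client.Connected
    with
    | _ -> false

/// Calls startPbm only when nothing is listening on the port yet.
/// Returns true when the manual start was triggered, false otherwise.
let ensurePbmStarted (host: string) (port: int) (startPbm: unit -> unit) =
    if isListening host port 1000 then
        false
    else
        startPbm ()
        true

// Hypothetical usage with the calls from the report (needs Petabridge.Cmd.Host):
// ensurePbmStarted "127.0.0.1" 7000 (fun () -> (PetabridgeCmd.Get asys).Start())
```

The probe is only a convenience. Since the workaround is the two lines shown earlier, calling them unconditionally after the host has started may be just as good if a second Start() turns out to be harmless in your setup.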

@ingted
Contributor Author

ingted commented Dec 14, 2023

let fsiDict = new System.Collections.Generic.Dictionary<string, obj>()
fsiDict.Add("defSqlConn", @"data source=10.36.205.125,1433;initial catalog=Dynakka;persist security info=True;user id=; password=; MultipleActiveResultSets=True;Trust Server Certificate=True;Max Pool Size=20000")
fsiDict.Add("akkaSysName", @"cluster-system")
fsiDict.Add("akkaHostName", @"10.28.199.143")
fsiDict.Add("akkaPbmPort", @"7000")
fsiDict.Add("akkaJournalTbl", @"journal_new")
fsiDict.Add("akkaMetaTbl", @"metadata_new")
fsiDict.Add("akkaSnapShotTbl", @"snapshot_new")
fsiDict.Add("akkaPersistConn", @"data source=10.28.112.94,1433;initial catalog=AkkaPersistence_MDC_DEV;persist security info=True;user id=; password=; MultipleActiveResultSets=True;Trust Server Certificate=True;Max Pool Size=20000")
fsiDict.Add("akkaShardingName", @"myShard")
fsiDict.Add("akkaRoles", @"")
//module SHARED
open System
open System.Collections.Generic
open System.IO
open System.IO.IsolatedStorage
open System.Reflection
open System.Data.SqlClient
type FsiEvaluationResult() =
    member val IsSuccess = false with get, set
    member val Value = null with get, set
    member val Exception = null with get, set
    member val Diagnostics = List<FsiDiagnostic>() with get, set
and FsiDiagnostic() =
    member val FileName = "" with get, set
    member val StartLine = 0 with get, set
    member val StartColumn = 0 with get, set
    member val EndLine = 0 with get, set
    member val EndColumn = 0 with get, set
    member val RangeFileName = "" with get, set
    member val Severity = 0 with get, set
    member val Message = "" with get, set
    member val Subcategory = "" with get, set
    member val ErrorNumber = 0 with get, set
    member val ErrorNumberPrefix = "" with get, set
    member val ErrorNumberText = "" with get, set
type DynakkaAssembly(id, fnm, nm, b64, bArr) =
    member val Id : int = id with get, set
    member val AssFullName :string = fnm with get, set
    member val AssName : string = nm with get, set
    member val AssB64 : string = b64 with get, set
    member val BArr : byte[] = bArr with get, set
    static member val connection : SqlConnection = null with get, set
    static member val connectionString : string = null with get, set
    static member val locker : obj = obj() with get, set
    static member AdHocFromDatabase (fnm : string) =
        lock DynakkaAssembly.locker (fun () ->
            if DynakkaAssembly.connection = null then
                try
                    DynakkaAssembly.connection <- new SqlConnection(DynakkaAssembly.connectionString)
                with
                | :? ArgumentException as exn when exn.Message.StartsWith("Keyword not supported: 'trust server certificate'") ->
                    DynakkaAssembly.connection <- new SqlConnection(DynakkaAssembly.connectionString.Replace("Trust Server Certificate=True;", ""))
                DynakkaAssembly.connection.Open()
        )
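        // Pass the assembly name as a SQL parameter instead of interpolating it into the query text.
        let query = "SELECT top 1 * FROM DynakkaAssembly where assFullName = @fnm or assName = @fnm"
        use command = new SqlCommand(query, DynakkaAssembly.connection)
        command.Parameters.AddWithValue("@fnm", fnm) |> ignore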
        use reader = command.ExecuteReader()
        let readData () =
            if reader.Read() then
                let dynakka = 
                    DynakkaAssembly(
                        reader.["id"] :?> int
                        , reader.["assFullName"].ToString()
                        , reader.["assName"].ToString()
                        , reader.["assB64"].ToString()
                        , reader.["bArr"] :?> byte[])
                Some dynakka
            else
                None
        readData()
    static member ReadDataFromDatabase (connectionString : string) =
        let dynakkaList = new List<DynakkaAssembly>()
        lock DynakkaAssembly.locker (fun () ->
            if DynakkaAssembly.connection = null then
                try
                    DynakkaAssembly.connection <- new SqlConnection(connectionString)
                with
                | :? ArgumentException as exn when exn.Message.StartsWith("Keyword not supported: 'trust server certificate'") ->
                    DynakkaAssembly.connection <- new SqlConnection(connectionString.Replace("Trust Server Certificate=True;", ""))
                DynakkaAssembly.connection.Open()
        )
            
        let query = "SELECT * FROM DynakkaAssembly where catagory in ('fsi', 'posh')"
        use command = new SqlCommand(query, DynakkaAssembly.connection)
        use reader = command.ExecuteReader()
        while reader.Read() do
            let dynakka = 
                DynakkaAssembly(
                    reader.["id"] :?> int
                    , reader.["assFullName"].ToString()
                    , reader.["assName"].ToString()
                    , reader.["assB64"].ToString()
                    , reader.["bArr"] :?> byte[])
            dynakkaList.Add(dynakka)
        dynakkaList
module Isst =
    let GetIsolatedStoragePhysicalPath (isst : IsolatedStorageFile) =
        if isst = null then ""
        else
            let field = 
                let rd = typeof<IsolatedStorageFile>.GetField("_rootDirectory", BindingFlags.NonPublic ||| BindingFlags.Instance)
                if rd = null then
                    typeof<IsolatedStorageFile>.GetField("m_RootDir", BindingFlags.NonPublic ||| BindingFlags.Instance)
                else
                    rd
            match field with
            | null -> null
            | _ ->
                let rootDir = field.GetValue(isst) :?> string
                rootDir
    let GetIsoFile() =
        try
            let store = IsolatedStorageFile.GetStore(IsolatedStorageScope.Assembly ||| IsolatedStorageScope.Machine, null, null)
#if DEBUG
            Console.WriteLine($"Util IsolatedStoragePhysicalPath: {GetIsolatedStoragePhysicalPath store}")
#endif
            store
        with
        | ex -> 
            Console.WriteLine(ex.ToString())
            null
    let instance = GetIsoFile()
    let physicalPath = GetIsolatedStoragePhysicalPath instance
    let GetBytesFromFile (isst : IsolatedStorageFile) (fileName : string) =
        if isst.FileExists(fileName) then
            use isoStream = new IsolatedStorageFileStream(fileName, FileMode.Open, isst)
            use reader = new BinaryReader(isoStream)
            let length = int isoStream.Length
            reader.ReadBytes length
        else
            null
    let WriteBytesToFile (isst : IsolatedStorageFile) (fileName : string) (data : byte[]) =
        if not (isst.FileExists(fileName)) then
            use isoStream = new IsolatedStorageFileStream(fileName, FileMode.Create, isst)
            use writer = new BinaryWriter(isoStream)
            writer.Write(data)
#if DEBUG
            Console.WriteLine("Data has been written to the file.")
#endif
module Assm =
    let mutable assmFiles = null
    let dict = Dictionary<string, Assembly>()
    let assmFileDict = Dictionary<string, DynakkaAssembly>()
    //let mutable SystemPrivateCoreLib = string fsiDict.["SystemPrivateCoreLib"]
    let mutable defSqlConn = string fsiDict.["defSqlConn"]
    let GetAssemblyFromRDBMS (da : DynakkaAssembly) =
        let fileName = da.AssName + ".dll"
        if dict.ContainsKey(fileName) then
            dict.[fileName]
        else
            if Isst.instance = null then
                try
                    let assmb = Assembly.Load(da.BArr)
                    dict.Add(fileName, assmb)
#if DEBUG
                    Console.WriteLine($"{fileName} loaded!")
#endif
                    assmb
                with
                | _ ->
                    if fileName = "System.Private.CoreLib.dll" then
                        //let aAgain = Assembly.LoadFile(SystemPrivateCoreLib)
                        //dict.Add(fileName, aAgain)
#if DEBUG
                        Console.WriteLine($"{fileName} loaded again!")
#endif
                        //aAgain
                        null
                    else
                        failwith "Hopeless: the assembly could not be loaded"
            else
                if Isst.instance.FileExists(fileName) then
                    let file2Load = $"{Isst.physicalPath}\\{fileName}"
                    try
                        let assmb = Assembly.LoadFile(file2Load)
                        dict.Add(fileName, assmb)
#if DEBUG
                        Console.WriteLine($"{fileName} loaded!")
#endif
                        assmb
                    with
                    | ex ->
#if DEBUG
                        Console.WriteLine($"Load {file2Load} failed! {ex.Message}")
#endif
                        null
                else
                    Isst.WriteBytesToFile Isst.instance fileName da.BArr
                    let file2Load = $"{Isst.physicalPath}\\{fileName}"
                    try
                        let assmb = Assembly.LoadFile(file2Load)
                        dict.Add(fileName, assmb)
#if DEBUG
                        Console.WriteLine($"{fileName} loaded! Length: {da.BArr.Length}")
#endif
                        assmb
                    with
                    | ex ->
#if DEBUG
                        Console.WriteLine($"Load {file2Load} failed! {ex.Message}")
#endif
                        null
    
    let RDBMSResolveEventHandler (sender : obj) (args : ResolveEventArgs) =
        if args.Name <> null then
            if args.Name.StartsWith("FSI-ASSEMBLY") then
                let assmsObj = AppDomain.CurrentDomain.GetAssemblies()
                let where = assmsObj |> Array.filter (fun a -> a.FullName = args.Name)
                if where |> Array.length > 0 then
                    Console.WriteLine(args.RequestingAssembly.FullName)
                    let fst = where.[0]
                    Console.WriteLine(fst.FullName)
                    fst
                else
                    let assms = assmsObj |> Array.map (fun a -> a.FullName)
                    Console.WriteLine(String.Join("\r\n", assms))
                    null
            else
#if DEBUG
                Console.WriteLine($"Util Resolving...{args.Name}")
#endif
                let nm = args.Name.Split(',').[0] + ".dll"
                if dict.ContainsKey(nm) then
                    dict.[nm]
                else
                    if assmFileDict.ContainsKey(nm) then
                        GetAssemblyFromRDBMS assmFileDict.[nm]
                    else
                        match DynakkaAssembly.AdHocFromDatabase(args.Name) with
                        | Some da ->
                            GetAssemblyFromRDBMS da
                        | None ->
                            null
        else
#if DEBUG
            Console.WriteLine($"Util Resolving...{args.RequestingAssembly.FullName}")
#endif
            let nm = args.RequestingAssembly.FullName.Substring(0, args.RequestingAssembly.FullName.IndexOf(",")) + ".dll"
            if dict.ContainsKey(nm) then
                dict.[nm]
            else
                if assmFileDict.ContainsKey(nm) then
                    GetAssemblyFromRDBMS assmFileDict.[nm]
                else
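                    // args.Name is null in this branch, so look up by the requesting assembly's full name instead.
                    match DynakkaAssembly.AdHocFromDatabase(args.RequestingAssembly.FullName) with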
                    | Some da ->
                        GetAssemblyFromRDBMS da
                    | None ->
                        null
    
    let Init (args : string array) =
        if args.Length > 0 then
            DynakkaAssembly.connectionString <- args.[0]
            assmFiles <- DynakkaAssembly.ReadDataFromDatabase(args.[0])
        else
            DynakkaAssembly.connectionString <- defSqlConn
            assmFiles <- DynakkaAssembly.ReadDataFromDatabase(DynakkaAssembly.connectionString)
        assmFiles
        |> Seq.iter (fun da ->
            GetAssemblyFromRDBMS da |> ignore
            assmFileDict.Add(da.AssName + ".dll", da)
        )
        //if args.Length > 1 then
        //    SystemPrivateCoreLib <- args.[1]
        Console.WriteLine($"Util dict count: {dict.Count}")
        let eh = new ResolveEventHandler(RDBMSResolveEventHandler)
        AppDomain.CurrentDomain.add_AssemblyResolve eh

#I @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Microsoft.Extensions.Hosting.Abstractions.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Microsoft.Extensions.Hosting.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Akka.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Akka.Hosting.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Akka.Cluster.Hosting.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Akka.Cluster.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Akka.Persistence.Hosting.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Akka.Persistence.Sql.Hosting.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Akka.Cluster.Sharding.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Akkling.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Akkling.Cluster.Sharding.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Akkling.Persistence.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\FsPickler.nstd20.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Argu.dll"
#r @"C:\Program Files\Microsoft Visual Studio\2022\Community\MSBuild\Current\Bin\System.Collections.Immutable.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Microsoft.Data.SqlClient.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Microsoft.SqlServer.Types.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Akka.Persistence.Sql.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Microsoft.Extensions.Configuration.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Microsoft.Extensions.Hosting.Abstractions.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\System.Memory.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Microsoft.Extensions.DependencyInjection.Abstractions.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Microsoft.Extensions.Logging.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Microsoft.Extensions.ObjectPool.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\System.Threading.Tasks.Extensions.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Microsoft.Extensions.Logging.Abstractions.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Microsoft.Extensions.Configuration.Abstractions.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\FParsec.dll"
Assembly.LoadFile @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\FParsec.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\FParsecCS.dll"
Assembly.LoadFile @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\FParsecCS.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Aether.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Chiron.nstd20.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Akka.Remote.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Petabridge.Cmd.Cluster.dll"
Assembly.LoadFile @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Petabridge.Cmd.Cluster.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Petabridge.Cmd.Cluster.Sharding.dll"
Assembly.LoadFile @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Petabridge.Cmd.Cluster.Sharding.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Petabridge.Cmd.Common.dll"
Assembly.LoadFile @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Petabridge.Cmd.Common.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Petabridge.Cmd.Remote.dll"
Assembly.LoadFile @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Petabridge.Cmd.Remote.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Phobos.Actor.Common.dll"
#r @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Petabridge.Cmd.Host.dll"
Assembly.LoadFile @"C:\ProgramData\IsolatedStorage\mz4o11kq.jdy\qgd4b1ez.pue\StrongName.15qvdbdd4t2fbn4hyk4s5kg2clcrckpe\AssemFiles\\Phobos.Actor.Common.dll"
open Akka.Cluster.Sharding
open Akkling.Cluster.Sharding


open Networking
open IOrm

open Akka.Hosting
open Microsoft.Extensions.DependencyInjection

open Petabridge.Cmd.Host


typeof<AkkaHostingExtensions>.GetMethods()|>Array.filter (fun mi -> mi.Name = "AddHocon")|>fun a -> printfn "AddHocon count: %d" a.Length
let ah = typeof<AkkaHostingExtensions>.GetMethod("AddHocon", [| typeof<AkkaConfigurationBuilder>; typeof<Akka.Configuration.Config>;  typeof<Akka.Hosting.HoconAddMode> |])
printfn "AddHocon: %A" ah
let ahs = typeof<ServiceCollectionHostedServiceExtensions>.GetMethod("AddHostedService", [|typeof<IServiceCollection>|])
printfn "AddHostedService: %A" ahs
let chiron = typeof<Chiron.nstd20.Json>.Assembly
let fparsec = AppDomain.CurrentDomain.GetAssemblies()|>Array.find (fun a -> a.FullName.Split([|','|]).[0] = "FParsec")
open FParsec
//let ac = CharParsers.anyChar
//let run = ac.GetType().Assembly.GetType("FParsec.CharParsers").GetMethod "run"
//let ac = FParsec.CharParsers.anyChar
//let fp = fparsec.GetType().Assembly
let fc = fparsec.GetType "FParsec.CharParsers"
//fc.Name
let run = fc.GetMethod "run"
//run = null
printfn "%A" run
////let rp = fp.GetTypes()|>Array.map _.Name|>Array.sort
Assm.Init([||])
//Assembly.Load "Microsoft.Data.SqlClient"
//let a = Assembly.Load "Microsoft.Data.SqlClient"
//(new FileInfo(a.Location)).DirectoryName
//typeof<executable_statistics>.Name

open Akka.Actor
open Akka.Configuration
open Akka.Hosting
//open Akka.Persistence.Sql.Query
open Akka.Cluster.Sharding
open Akka.Cluster
open Akka.Persistence.Sql.Hosting
open Akkling
open Akkling.Cluster.Sharding
open Akkling.Persistence
open Akka.Serialization
open Akka.Actor
open MBrace.FsPickler.nstd20
open System.Threading.Tasks
open Microsoft.FSharp.Quotations
open Argu
open System.Collections.Immutable

let stdoutloglevel = "WARNING"
let actorSystemName = string fsiDict.["akkaSysName"]
let hostName = string fsiDict.["akkaHostName"]
let seedHostName = 
    if fsiDict.ContainsKey "seedHostName" then
        string fsiDict.["seedHostName"]
    else
        hostName
let hostPort = getFreePort ()
let ifSelfSeed = not (fsiDict.ContainsKey "seedHostPort")
let seedHostPort = 
    if ifSelfSeed then
        string hostPort
    else
        string fsiDict.["seedHostPort"]
let seednode = $"akka.tcp://{actorSystemName}@{seedHostName}:{seedHostPort}"
//let seednode = $"akka.tcp://{actorSystemName}@10.36.205.150:9000"
let rolesList = 
    (string fsiDict.["akkaRoles"]).Split([|','|]) 
    |> Array.append (if ifSelfSeed then [|"SeedNode"|] else [||])
let roles = rolesList |> (fun arr -> String.Join(",", arr))
let journalTbl = string fsiDict.["akkaJournalTbl"]
let metaTbl = string fsiDict.["akkaMetaTbl"]
let ssTbl = string fsiDict.["akkaSnapShotTbl"]
let pbmPort = 
    match string fsiDict.["akkaPbmPort"] with
    | "dyna" -> (getFreePort ()).ToString()
    | s -> s 
let connStrAkkaPersist = (string fsiDict.["akkaPersistConn"])//.Replace("Trust Server Certificate=True;", "")
let appConf = $"""
petabridge.cmd {{
    # default IP address used to listen for incoming petabridge.cmd client connections
    # should be a safe default as it listens on "all network interfaces".
    host = "0.0.0.0"
    # default port number used to listen for incoming petabridge.cmd client connections
    port = {pbmPort}
    # when true, logs all loaded palettes on startup
    log-palettes-on-startup = on
}}
akka {{
    stdout-loglevel = {stdoutloglevel}
    actor {{
        provider = cluster
        serializers {{
                  hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
                  #akka-cluster-client = "Akka.Cluster.Tools.Client.Serialization.ClusterClientMessageSerializer, Akka.Cluster.Tools"

        }}
        serialization-bindings {{
                #"System.Object" = hyperion #dot-netty.tcp
                "Microsoft.FSharp.Quotations.FSharpExpr, FSharp.Core" = "Expr"
                "Akkling.Cluster.Sharding.ShardEnvelope, Akkling.Cluster.Sharding" = "Expr"
                "Akka.Persistence.Journal.Tagged, Akka.Persistence" = "Expr"

        }}
        serialization-identifiers {{

                "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion" = -5
        }}
    }}
                        
    remote {{
        dot-netty.tcp {{
            hostname = "{hostName}"
            port = {hostPort}
        }}
    }}
    cluster {{
        #will inject this node as a self-seed node at run-time
        seed-nodes = ["{seednode}"] 
        roles = ["pubsub", "ShardNode", {roles}]
        pub-sub {{
            role = "pubsub"
        }}
        sharding {{
            role = "ShardNode"
        }}

    }}
    persistence {{
        journal {{
            plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
            plugin = "akka.persistence.journal.sql"
            sql {{
                class = "Akka.Persistence.Sql.Journal.SqlWriteJournal, Akka.Persistence.Sql"
                event-adapters = {{

                }}
                event-adapter-bindings = {{
                    
                }}
                #connection-string = "{{database-connection-string}}"
                #provider-name = "{{provider-name}}"
                table-mapping = sql-server
                serializer = Expr
                sql-server {{
                    schema-name = dbo
                    journal {{
                        use-writer-uuid-column = false
                        table-name = "{journalTbl}"
                        columns {{
                            ordering = Ordering
                            deleted = IsDeleted
                            persistence-id = PersistenceId
                            sequence-number = SequenceNr
                            created = Timestamp
                            tags = Tags
                            message = Payload
                            identifier = SerializerId
                            manifest = Manifest
                        }}
                    }}
                
                    metadata {{
                        table-name = "{metaTbl}"
                        columns {{
                            persistence-id = PersistenceId
                            sequence-number = SequenceNr
                        }}
                    }}
                }}
                sqlserver = ${{akka.persistence.journal.sql.sql-server}}
            }}
        }}
        query.journal.sql {{
            class = "Akka.Persistence.Sql.Query.SqlReadJournalProvider, Akka.Persistence.Sql"
            #connection-string = "{{database-connection-string}}"
            #provider-name = "{{provider-name}}"
            table-mapping = sql-server
            sql-server = ${{akka.persistence.journal.sql.sql-server}}
        }}
        snapshot-store {{
            plugin = "akka.persistence.snapshot-store.sql"
            sql {{
                class = "Akka.Persistence.Sql.Snapshot.SqlSnapshotStore, Akka.Persistence.Sql"
                #connection-string = "{{database-connection-string}}"
                #provider-name = "{{provider-name}}"
                table-mapping = sql-server
                serializer = Expr
                sql-server {{
                    schema-name = dbo
                    snapshot {{
                        table-name = "{ssTbl}"
                        columns {{
                            persistence-id = PersistenceId
                            sequence-number = SequenceNr
                            created = Timestamp
                            snapshot = Snapshot
                            manifest = Manifest
                            serializerId = SerializerId
                        }}
                    }}
                }}
            }}
        }}
    }}
}}
"""
Environment.SetEnvironmentVariable("SQL_CONNECTION_STR", connStrAkkaPersist)
Environment.SetEnvironmentVariable("SQL_PROVIDER_NAME", "SqlServer.2022")
System.IO.File.WriteAllText(@"c:\assms.txt", System.String.Join("\r\n", AppDomain.CurrentDomain.GetAssemblies()|>Array.sortBy _.FullName|>Array.map (fun a -> try $"{a.FullName} {a.Location}" with | _ -> "")))
let entityFactoryWithRoleSimpleFor2 (system: ActorSystem) (name: string) (roleOpt:string option) (props: Props<'Message>) = //: EntityFac<'Message> =
    let clusterSharding = ClusterSharding.Get(system)
    let adjustedProps = adjustMultiPersistentProps props
    let settings = 
        match roleOpt with
        | Some r ->
            ClusterShardingSettings.Create(system).WithRole(r)
        | None ->
            ClusterShardingSettings.Create(system)
    let shardRegion = clusterSharding.Start(name, adjustedProps.ToProps(), settings, new Cluster.TypedMessageExtractorSimple2<'Message>())
    { ShardRegion = shardRegion; TypeName = name }
let asys = 
    //try
        let host = 
            Infra.NewNode(
                appConf
                , (string fsiDict.["akkaSysName"], FuncHelper.func2 (fun builder options -> 
                                        builder.WithSqlPersistence(
                                            //https://github.com/linq2db/linq2db/blob/master/Source/LinqToDB/ProviderName.cs
                                            connStrAkkaPersist, "SqlServer.2022"
                                        ) |> ignore
                                    ))
                , (string fsiDict.["akkaShardingName"], FuncHelper.func1 (fun name -> Unchecked.defaultof<Props>), Unchecked.defaultof<IMessageExtractor>)
                , (FuncHelper.func1 (fun jReader -> Unchecked.defaultof<Props>))
                , (FuncHelper.func3 (fun asys registry jReader -> 
                    exprJSerializationSupport asys
                    let behavior (ctx : Actor<_>) (msg:obj) = 
                        match msg with
                        | :? Msg as f ->
                            match f with
                            | Ping (str, (id,dt), arr) -> 
                                printfn ".net 472 received: %A, from %A" f (ctx.Self.Path.ToStringWithAddress())
                                //ctx.Sender() <! Pong
                                ctx.Sender() <! Ping ("YYY", (0, DateTime.UtcNow), [|3;3;3;3|])
                            | Pong ->
                                printfn ".net 472 received: %A, from %A" f (ctx.Self.Path.ToStringWithAddress())
                                ctx.Sender() <! Ping ("YYY", (0, DateTime.UtcNow), [|3;3;3;3|])
                        | :? string as "self-test fsi" ->
                            fsiDict.Add ("entityTest", true)
                        | :? string as "self-test2 fsi" ->
                            fsiDict.Add ("shardingTest", true)
                        | _ ->
                            
                            printfn "received: %A" msg
                        Ignore
                    let fac1 = entityFactoryWithRoleSimpleFor2 asys "pfcf.ssis" (Some "") <| props (actorOf2 behavior)
                    let entityRef = fac1.RefFor "2" "1"
   
                    let proxy = spawnShardedProxy (fun (msg:ShardingMsg) -> msg.ShardId, msg.EntityId, msg.Message) asys "pfcf.ssis" (Some "")
                    entityRef <! box "self-test fsi"
                    let msg : ShardingMsg = {
                        ShardId = "1"
                        EntityId = "5"
                        Message = "self-test2 fsi"
                    }
                    proxy <! msg
            
                    entityRef
                    |> box
                    ))
                , (FuncHelper.func5 (fun asys registry iaref props object -> 
                                        printfn "==================== %A ====================" asys.Name
                                    ))
                , (FuncHelper.func2 (fun registerPalette registry -> ()))
                , false
                , true
            )
        host.StartAsync().Wait()
        //|> Async.AwaitTask
        //|> (fun a ->
        //    Async.StartWithContinuations (
        //        a
        //        , (fun () -> 
        //            printfn "host started!"
        //            (host.Services.GetService(typeof<ActorSystem>) :?> ActorSystem).WhenTerminated.Wait()
        //            )
        //        , (fun exn -> printfn "%s" exn.Message)
        //        , (fun cancel -> printfn "canceled: %s" cancel.Message)
        //    )
        //)
        let asys : ActorSystem = unbox <| host.Services.GetService(typeof<ActorSystem>)
        asys
    //with
    //| exn ->
    //    null
let cluster = Akka.Cluster.Cluster.Get asys
fsiDict.Add ("akkaHostPort", box cluster.SelfAddress.Port.Value)
fsiDict.Add ("akkaActorSystem", box asys)
let mbrs : ImmutableSortedSet<Member> = cluster.State.Members
let seedNone : Member seq = mbrs |> Seq.filter (fun mbr -> mbr.Roles.Contains "SeedNode")
if seedNone |> Seq.length > 0 then
    fsiDict.Add ("akkaSeedNodeExisted", box true)    
    let seedNoneMbr = seedNone |> Seq.item 0
    if seedNoneMbr.Address.Port.HasValue then
        if seedNoneMbr.Address.Port.Value = System.Int32.Parse seedHostPort then
            fsiDict.Add ("akkaSeedNodeMbrPortVerified", box true) 
    
    if seedNoneMbr.Address.Host = seedHostName then
        fsiDict.Add ("akkaSeedNodeMbrHostVerified", box true) 
System.IO.File.WriteAllText(@"c:\assms2.txt", System.String.Join("\r\n", AppDomain.CurrentDomain.GetAssemblies()|>Array.sortBy _.FullName|>Array.map (fun a -> try $"{a.FullName} {a.Location}" with | _ -> "")))
asys

ingted commented Dec 14, 2023

using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Bootstrap.Docker;
using Akka.Cluster.Hosting;
using Akka.Cluster.Sharding;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.Hosting;
using LL = Akka.Event.LogLevel;
using Akka.Persistence.Hosting;
using Akka.Persistence.Query;
using Akka.Persistence.Sql;
using Akka.Persistence.Sql.Query;
using Akka.Util;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Petabridge.Cmd.Cluster;
using Petabridge.Cmd.Cluster.Sharding;
using Petabridge.Cmd.Host;
using Petabridge.Cmd.Remote;
using Microsoft.FSharp.Core;

using Akka.Cluster;
using System.Reactive;
using Unit = Microsoft.FSharp.Core.Unit;
using Akka.Logger.NLog;

namespace Akka.Platform
{
    public static partial class Infra
    {
        public delegate void CommandHandler(CommandPaletteHandler h);
        public static IHost NewNode(
            string hoconConf,
            Tuple<string, Func<AkkaConfigurationBuilder, Config, Unit>> addAkka,
            Tuple<string, Func<string, Props>, IMessageExtractor> addSharding,
            Func<SqlReadJournal, Props> singletonPropFun,
            Func<ActorSystem, IActorRegistry, SqlReadJournal, object> clusterInit,
            Func<ActorSystem, IActorRegistry, IActorRef, Props, object, Unit> clusterPostInit,
            Func<CommandHandler, IActorRegistry, Unit> postPbmRegister, bool ifSharding, bool ifSingleton
            )
        {
            var sqlConnectionString = Environment.GetEnvironmentVariable("SQL_CONNECTION_STR")?.Trim();
            if (string.IsNullOrEmpty(sqlConnectionString))
            {
                Console.WriteLine("ERROR! SQL connection string not provided. Can't start.");
            }
            //Console.WriteLine($"Connecting to SQL server at {sqlConnectionString}");

            var sqlProviderName = Environment.GetEnvironmentVariable("SQL_PROVIDER_NAME")?.Trim();
            if (string.IsNullOrEmpty(sqlProviderName))
            {
                Console.WriteLine("ERROR! SQL provider name not provided. Can't start.");
            }
            Console.WriteLine($"Connecting to SQL provider {sqlProviderName}");

            // Need to wait for the SQL server to spin up
            //await Task.Delay(TimeSpan.FromSeconds(15));

            // Use the supplied HOCON when it is non-empty; otherwise fall back to app.conf on disk.
            var config = string.IsNullOrWhiteSpace(hoconConf)
                ? File.ReadAllText("app.conf")
                : hoconConf;
            var host = 
                new HostBuilder()
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddAkka(addAkka.Item1, configurationBuilder =>
                    {
                        var conf =
                            ConfigurationFactory.ParseString(config)
                            .WithFallback(GetSqlHocon(sqlConnectionString, sqlProviderName))
                            .WithFallback(OpsConfig.GetOpsConfig())
                            .WithFallback(ClusterSharding.DefaultConfig())
                            .WithFallback(DistributedPubSub.DefaultConfig())
                            .WithFallback(SqlPersistence.DefaultConfiguration);

                        //options.AddHocon(conf.BootstrapFromDocker(), HoconAddMode.Prepend)
                        configurationBuilder
                        .AddHocon(conf, HoconAddMode.Prepend)
                        .ConfigureLoggers(setup =>
                        {
                            // Example: This sets the minimum log level
                            setup.LogLevel = LL.WarningLevel;
                            setup.ClearLoggers();

                            // Example: Add the default logger
                            // NOTE: You can also use setup.AddLogger<DefaultLogger>();
                            //setup.AddDefaultLogger();
                            setup.AddLoggerFactory();

                            setup.AddLogger<NLogLogger>();
                        })
                        .WithActors((system, registry) =>
                        {
                            var readJournal = system.ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier);

                            var inited = clusterInit.Invoke(system, registry, readJournal);


                            Cluster.Get(system).RegisterOnMemberUp(() =>
                            {
                                var sharding = ClusterSharding.Get(system);

                                var shardRegion =
                                    ifSharding ?
                                    sharding.Start(
                                        addSharding.Item1,
                                        addSharding.Item2,
                                        ClusterShardingSettings.Create(system),
                                        addSharding.Item3) : ActorRefs.Nobody;


                                var singletonProp =
                                    ifSingleton ?
                                        ClusterSingletonManager.Props(
                                            singletonPropFun(readJournal),
                                            ClusterSingletonManagerSettings.Create(
                                                system.Settings.Config.GetConfig("akka.cluster.singleton"))) : null;

                                clusterPostInit.Invoke(system, registry, shardRegion, singletonProp, inited);
                            });
                        })
                        .AddPetabridgeCmd(cmd =>
                        {
                            void RegisterPalette(CommandPaletteHandler h)
                            {
                                if (cmd.RegisterCommandPalette(h))
                                {
                                    Console.WriteLine("Petabridge.Cmd - Registered {0}", h.Palette.ModuleName);
                                }
                                else
                                {
                                    Console.WriteLine("Petabridge.Cmd - DID NOT REGISTER {0}", h.Palette.ModuleName);
                                }
                            }

                            var actorSystem = cmd.Sys;
                            var actorRegistry = ActorRegistry.For(actorSystem);
                            

                            RegisterPalette(ClusterCommands.Instance);
                            RegisterPalette(new RemoteCommands());
                            RegisterPalette(ClusterShardingCommands.Instance);

                            var regPalette = new CommandHandler(RegisterPalette);

                            postPbmRegister(regPalette, actorRegistry);

                            cmd.Start();
                        });

                        addAkka.Item2.Invoke(configurationBuilder, config);
                    });
                })
                .ConfigureLogging((hostContext, configLogging) =>
                {
                    // Akka.Persistence.Sql.Query.InternalProtocol.QueryOrderingIdsTimerKey log output is very noisy, so console logging is turned off.
                    //configLogging.AddConsole();
                })
                .UseConsoleLifetime()
                .Build();

            return host;
            //host.Dispose();
            //return 0;
        }
    }
}

ingted commented Dec 14, 2023

When the script is executed in an fsiEvaluationSession, the following needs to be added to the F# script to start pbm manually:

let cmd = PetabridgeCmd.Get asys
cmd.Start()
