Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telegram content proxy #163

Merged
merged 23 commits into from
Aug 28, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
9eb8f30
(#102) Web: add a dependency on Telegram link resolver
ForNeVeR Aug 6, 2022
afbe2f8
(#102) ContentProxy: add a FileCache
ForNeVeR Aug 7, 2022
800546a
(#102) ContentProxy: finally, make it compile
ForNeVeR Aug 20, 2022
3e7b422
(#102) FileCacheTests: preliminary test API
ForNeVeR Aug 20, 2022
97db22a
(#102) TestFramework: extract the code from TestUtils
ForNeVeR Aug 20, 2022
4d6bfc5
(#102) ContentProxy: finish working FileCache
ForNeVeR Aug 21, 2022
a5e26a1
(#102) FileCacheTests: implement an ordering test
ForNeVeR Aug 25, 2022
6ea4892
(#102) FileCache: cache directory validation tests
ForNeVeR Aug 25, 2022
cb218b6
(#102) FileCache: additional tests
ForNeVeR Aug 27, 2022
b50d615
(#102) FileCache: finish the last tests
ForNeVeR Aug 27, 2022
9100471
(#102) ContentController: test redirect mode
ForNeVeR Aug 27, 2022
107c4be
(#102) ContentController: last test groundwork
ForNeVeR Aug 27, 2022
e8e8153
(#102) FileCache: async stream optimization
ForNeVeR Aug 28, 2022
067da2d
(#102) ContentController: add last tests
ForNeVeR Aug 28, 2022
9292428
(#102) ContentController: make it work in manual tests
ForNeVeR Aug 28, 2022
b02512c
(#102) ContentProxy: some small fixes
ForNeVeR Aug 28, 2022
5d954d6
(#102) ContentProxy: add file names and MIME types
ForNeVeR Aug 28, 2022
3977248
(#102) FileCache: support older versions of Windows
ForNeVeR Aug 28, 2022
fb5dc3a
Docs: a slight improvement
ForNeVeR Aug 28, 2022
a58f54e
(#102) FileCache: drop redundant rec
ForNeVeR Aug 28, 2022
b2cccee
(#102) FileCache: improve the workarounds for the older versions of W…
ForNeVeR Aug 28, 2022
2861ee8
(#102) ContentProxy: redesign the attribute optionality
ForNeVeR Aug 28, 2022
7936682
(#102) Settings: update the example
ForNeVeR Aug 28, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Emulsion.ContentProxy/Emulsion.ContentProxy.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,19 @@
<ItemGroup>
<Compile Include="Proxy.fs" />
<Compile Include="ContentStorage.fs" />
<Compile Include="FileCache.fs" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Emulsion.Database\Emulsion.Database.fsproj" />
<ProjectReference Include="..\Emulsion.Settings\Emulsion.Settings.fsproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Hashids.net" Version="1.4.1" />
<PackageReference Include="Microsoft.Extensions.Http" Version="6.0.0" />
<PackageReference Include="Serilog" Version="2.10.0" />
<PackageReference Include="SimpleBase" Version="3.1.0" />
</ItemGroup>

</Project>
164 changes: 164 additions & 0 deletions Emulsion.ContentProxy/FileCache.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
namespace Emulsion.ContentProxy

open System
open System.IO
open System.Net.Http
open System.Security.Cryptography
open System.Text
open System.Threading

open Serilog
open SimpleBase

open Emulsion.Settings

/// A single request handled by the file cache: what to download and how to identify it in the cache.
type DownloadRequest = {
/// Address the content is downloaded from on a cache miss.
Uri: Uri
/// Key identifying the content; the cached file name is derived from the SHA-256 hash of this key.
CacheKey: string
/// Size of the content as supplied by the caller; it is compared against the configured cache size limits.
Size: uint64
}

module Base58 =
/// Base58 codec using a custom 58-character alphabet: the digits 1-9 followed by lower- and upper-case letters
/// in keyboard-row order. The characters 0, l, I and O are not part of the alphabet.
/// Suggested by @ttldtor.
let M4N71KR = Base58(Base58Alphabet "123456789qwertyuiopasdfghjkzxcvbnmQWERTYUPASDFGHJKLZXCVBNM")

module FileCache =
    /// Derives the cache file name for a cache key: the key is encoded as UTF-8, hashed with the supplied
    /// SHA-256 instance, and the digest is rendered with the `Base58.M4N71KR` codec.
    let EncodeFileName(sha256: SHA256, cacheKey: string): string =
        let keyBytes = Encoding.UTF8.GetBytes cacheKey
        let digest = sha256.ComputeHash keyBytes
        Base58.M4N71KR.Encode digest

    /// Decodes a cache file name back into the raw bytes it encodes (expected to be a SHA-256 digest).
    let DecodeFileNameToSha256Hash(fileName: string): byte[] =
        Base58.M4N71KR.Decode(fileName).ToArray()

type FileCache(logger: ILogger,
settings: FileCacheSettings,
httpClientFactory: IHttpClientFactory,
sha256: SHA256) =

/// Maps a cache key to the full path of its file inside the configured cache directory.
let getFilePath(cacheKey: string) =
    let fileName = FileCache.EncodeFileName(sha256, cacheKey)
    Path.Combine(settings.Directory, fileName)

/// Looks the cache key up on disk. Yields `Some` read-only stream over the cached file when it exists,
/// `None` otherwise. The stream is opened so that other readers and deletion of the file are still allowed.
let getFromCache(cacheKey: string) = async {
    let cachedPath = getFilePath cacheKey
    if not (File.Exists cachedPath) then
        return None
    else
        let sharing = FileShare.Read ||| FileShare.Delete
        return Some(new FileStream(cachedPath, FileMode.Open, FileAccess.Read, sharing))
}

/// Verifies that the cache directory contains only files whose names decode to a hash of SHA-256 length.
/// Fails with an exception on the first subdirectory or wrongly named entry encountered.
let assertCacheValid() = async {
    let expectedHashLength = sha256.HashSize / 8 // HashSize is in bits

    let validateEntry (entry: string) =
        let entryName = Path.GetFileName entry

        // Anything that is not a plain file is reported as a subdirectory.
        if not (File.Exists entry) then
            failwith $"Cache directory invalid: contains a subdirectory: \"{entryName}\"."

        let decoded = FileCache.DecodeFileNameToSha256Hash entryName
        if decoded.Length <> expectedHashLength then
            failwith (
                $"Cache directory invalid: contains entry \"{entryName}\" which doesn't correspond to a " +
                "base58-encoded SHA-256 hash."
            )

    Seq.iter validateEntry (Directory.EnumerateFileSystemEntries settings.Directory)
}

/// Decides whether an item of the given size may be cached and, if so, makes room for it.
/// Yields `false` when the item exceeds the per-file or the total limit; otherwise validates the cache
/// directory, deletes the entries that no longer fit, and yields `true`.
let ensureFreeCache size = async {
    let withinLimits =
        size <= settings.FileSizeLimitBytes && size <= settings.TotalCacheSizeLimitBytes

    if not withinLimits then
        return false
    else
        do! assertCacheValid()

        // Walk the entries from newest to oldest; once the running total plus the new item no longer fits
        // into the total limit, that entry and every older one gets deleted.
        let newestFirst =
            Directory.EnumerateFileSystemEntries settings.Directory
            |> Seq.map FileInfo
            |> Seq.sortByDescending(fun entry -> entry.LastWriteTimeUtc)
            |> Seq.toArray

        let mutable occupied = 0UL
        for entry in newestFirst do
            occupied <- occupied + Checked.uint64 entry.Length
            if occupied + size > settings.TotalCacheSizeLimitBytes then
                logger.Information("Deleting a cache item \"{FileName}\" ({Size} bytes)", entry.Name, entry.Length)
                entry.Delete()

        return true
}

/// Performs an HTTP GET for the URI with a client from the factory and yields the response body as a stream.
/// A non-success status code raises an exception (via `EnsureSuccessStatusCode`).
let download(uri: Uri): Async<Stream> = async {
    let! token = Async.CancellationToken

    use httpClient = httpClientFactory.CreateClient()
    let! httpResponse = httpClient.GetAsync(uri, token) |> Async.AwaitTask
    let successfulResponse = httpResponse.EnsureSuccessStatusCode()
    return! successfulResponse.Content.ReadAsStreamAsync() |> Async.AwaitTask
}

/// Downloads the resource at `uri`, writes it into the cache file derived from `cacheKey`, and yields a read
/// stream opened on that cached file.
/// The cache file is created with `FileMode.CreateNew`, so an already existing file makes this fail.
let downloadIntoCacheAndGet uri cacheKey: Async<Stream> = async {
    let! ct = Async.CancellationToken
    // The downloaded stream is only needed for the copy below; `use!` disposes it when this workflow ends
    // (the caller receives a separate stream opened on the cached file).
    use! stream = download uri
    let path = getFilePath cacheKey
    logger.Information("Saving {Uri} to path {Path}…", uri, path)

    do! async { // to limit the cachedFile scope
        use cachedFile = new FileStream(path, FileMode.CreateNew, FileAccess.Write, FileShare.None)
        do! Async.AwaitTask(stream.CopyToAsync(cachedFile, ct))
        // The template placeholders need their values; without them the raw "{Uri}"/"{Path}" text is logged.
        logger.Information("Download successful: \"{Uri}\" to \"{Path}\".", uri, path)
    }

    // The write handle is closed at this point, so the file can be reopened for reading.
    let! file = getFromCache cacheKey
    return upcast Option.get file
}

let cancellation = new CancellationTokenSource()
/// Serves one download request: yields the cached file on a hit; on a miss either downloads the resource
/// into the cache (when `ensureFreeCache` allows it) or downloads it directly without caching.
let processRequest request: Async<Stream> = async {
    let { Uri = uri; CacheKey = cacheKey; Size = size } = request
    logger.Information("Cache lookup for content {Uri} (cache key {CacheKey})", uri, cacheKey)

    let! cached = getFromCache cacheKey
    match cached with
    | Some content ->
        logger.Information("Cache hit for content {Uri} (cache key {CacheKey})", uri, cacheKey)
        return content
    | None ->
        logger.Information("No cache hit for content {Uri} (cache key {CacheKey}), will download", uri, cacheKey)
        let! shouldCache = ensureFreeCache size
        if not shouldCache then
            logger.Information("Resource {Uri} (cache key {CacheKey}) won't fit into cache, directly downloading", uri, cacheKey)
            return! download uri
        else
            logger.Information("Resource {Uri} (cache key {CacheKey}, {Size} bytes) will fit into cache, caching", uri, cacheKey, size)
            let! result = downloadIntoCacheAndGet uri cacheKey
            logger.Information("Resource {Uri} (cache key {CacheKey}, {Size} bytes) downloaded", uri, cacheKey, size)
            return result
}

let rec processLoop(processor: MailboxProcessor<_ * AsyncReplyChannel<_>>) = async {
while true do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have several places with while true in async code already, and they seem non-problematic for now. I think I'll leave that as-is, but will keep an eye on it.

let! request, replyChannel = processor.Receive()
try
let! result = processRequest request
replyChannel.Reply(Some result)
with
| ex ->
logger.Error(ex, "Exception while processing the file download queue")
replyChannel.Reply None
}
let processor = MailboxProcessor.Start(processLoop, cancellation.Token)

interface IDisposable with
    /// Releases the cancellation token source first and the mailbox processor second.
    // NOTE(review): the token source is disposed without a preceding Cancel() call, so the token handed to
    // the mailbox processor is never signalled here; confirm whether the processing loop should be cancelled.
    member _.Dispose() =
        cancellation.Dispose()
        let processorResource = processor :> IDisposable
        processorResource.Dispose()

/// Queues a download request on the mailbox processor and asynchronously waits for its reply.
/// The reply is `Some` stream on success and `None` when processing the request raised an exception.
member _.Download(uri: Uri, cacheKey: string, size: uint64): Async<Stream option> =
    let request: DownloadRequest = { Uri = uri; CacheKey = cacheKey; Size = size }
    processor.PostAndAsyncReply(fun replyChannel -> (request, replyChannel))
23 changes: 22 additions & 1 deletion Emulsion.Settings/Settings.fs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,19 @@ type HostingSettings = {
HashIdSalt: string
}

/// Settings of the on-disk file cache.
type FileCacheSettings = {
/// Directory the cached files are stored in.
Directory: string
/// Largest size, in bytes, a single item may have to be cached.
FileSizeLimitBytes: uint64
/// Upper bound, in bytes, for the combined size of the cached files.
TotalCacheSizeLimitBytes: uint64
}

/// Root application settings record; this is what `read` returns.
type EmulsionSettings = {
Xmpp: XmppSettings
Telegram: TelegramSettings
Log: LogSettings
Database: DatabaseSettings option
Hosting: HostingSettings option
/// File cache settings; `None` when the "fileCache" section has no "directory" value.
FileCache: FileCacheSettings option
}

let defaultConnectionTimeout = TimeSpan.FromMinutes 5.0
Expand All @@ -56,6 +63,12 @@ let private readTimeSpan defaultVal key section =
|> Option.defaultValue defaultVal

let read (config : IConfiguration) : EmulsionSettings =
/// Converts `value` to an unsigned 64-bit integer, or yields ``default`` when `value` is null.
let uint64OrDefault value ``default`` =
    match Option.ofObj value with
    | Some text -> uint64 text
    | None -> ``default``

let readXmpp (section : IConfigurationSection) = {
Login = section["login"]
Password = section["password"]
Expand Down Expand Up @@ -91,9 +104,17 @@ let read (config : IConfiguration) : EmulsionSettings =
}
| None, None, None -> None
| other -> failwith $"Parameter pack {other} represents invalid hosting settings."
/// Reads the file cache settings from the given configuration section.
/// Yields `None` when the section has no "directory" value. Otherwise the size limits are taken from
/// "fileSizeLimitBytes" (default: 1 MiB) and "totalCacheSizeLimitBytes" (default: 20 MiB).
let readFileCache(section: IConfigurationSection) =
    Option.ofObj section["directory"]
    |> Option.map(fun directory -> {
        Directory = directory
        // The defaults must be parenthesized: function application binds tighter than `*`, so without the
        // parentheses a configured value would itself be multiplied by the trailing factors.
        FileSizeLimitBytes = uint64OrDefault section["fileSizeLimitBytes"] (1024UL * 1024UL)
        // The total limit has its own key; it must not reuse the per-file one.
        TotalCacheSizeLimitBytes = uint64OrDefault section["totalCacheSizeLimitBytes"] (20UL * 1024UL * 1024UL)
    })

{ Xmpp = readXmpp <| config.GetSection("xmpp")
Telegram = readTelegram <| config.GetSection("telegram")
Log = readLog <| config.GetSection "log"
Database = readDatabase <| config.GetSection "database"
Hosting = readHosting <| config.GetSection "hosting" }
Hosting = readHosting <| config.GetSection "hosting"
FileCache = readFileCache <| config.GetSection "fileCache" }
28 changes: 27 additions & 1 deletion Emulsion.Telegram/Client.fs
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
namespace Emulsion.Telegram

open System
open System.Threading

open Emulsion.Database
open Emulsion.Messaging.MessageSystem
open Emulsion.Settings

/// Information about a file known to Telegram, as returned by `ITelegramClient.GetFileInfo`.
type FileInfo = {
/// Link the file can be fetched from. The `Client` implementation embeds the bot token into this link.
TemporaryLink: Uri
/// File size as reported by the Telegram server (presumably in bytes; confirm against the Telegram API).
Size: uint64
}

/// Abstraction over the Telegram operations needed to resolve files.
type ITelegramClient =
/// Queries information about the file with the given id; the `Client` implementation yields `None` when the
/// server reports neither a path nor a size for it.
abstract GetFileInfo: fileId: string -> Async<FileInfo option>

type Client(ctx: ServiceContext,
cancellationToken: CancellationToken,
telegramSettings: TelegramSettings,
Expand All @@ -15,10 +24,27 @@ type Client(ctx: ServiceContext,

let botConfig = { Funogram.Telegram.Bot.Config.defaultConfig with Token = telegramSettings.Token }

interface ITelegramClient with
    /// Asks Telegram for the file with the given id and converts the answer into a `FileInfo`.
    /// Yields `None` (with a warning in the log) when neither a path nor a size is reported, and fails
    /// when only one of the two is present.
    member this.GetFileInfo(fileId) = async {
        let log = ctx.Logger
        log.Information("Querying file information for file {FileId}", fileId)
        let! file = Funogram.sendGetFile botConfig fileId
        return
            match file.FilePath, file.FileSize with
            | Some filePath, Some fileSize ->
                Some {
                    TemporaryLink = Uri $"https://api.telegram.org/file/bot{telegramSettings.Token}/{filePath}"
                    Size = Checked.uint64 fileSize
                }
            | None, None ->
                log.Warning("File {FileId} was not found on server", fileId)
                None
            | path, size -> failwith $"Unknown data received from Telegram server: {path}, {size}"
    }

/// Produces the Telegram run loop for `receiver`: the outer async simply returns whatever `Funogram.run`
/// yields for the current logger, settings and bot configuration.
override _.RunUntilError receiver = async {
// Run loop of Telegram is in no need of any complicated start, so just return an async that will perform it:
return Funogram.run ctx.Logger telegramSettings databaseSettings hostingSettings botConfig receiver
}

override _.Send message =
Funogram.send telegramSettings botConfig message
Funogram.sendMessage telegramSettings botConfig message
18 changes: 13 additions & 5 deletions Emulsion.Telegram/Funogram.fs
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,9 @@ module MessageConverter =
else extractMessageContent replyTo links.ReplyToContentLinks
{ main = mainMessage; replyTo = Some replyToMessage }

let internal processSendResult(result: Result<'a, ApiResponseError>): unit =
let internal processSendResult(result: Result<'a, ApiResponseError>): 'a =
match result with
| Ok _ -> ()
| Ok x -> x
| Error e ->
failwith $"Telegram API Call processing error {e.ErrorCode}: {e.Description}"

Expand Down Expand Up @@ -347,15 +347,23 @@ let internal prepareHtmlMessage: Message -> string = function
| Authored {author = author; text = text} -> $"<b>{Html.escape author}</b>\n{Html.escape text}"
| Event {text = text} -> Html.escape text

let send (settings: TelegramSettings) (botConfig: BotConfig) (OutgoingMessage content): Async<unit> =
/// Forwards `request` to `api` using the given bot configuration.
let private send (botConfig: BotConfig) request = api botConfig request

/// Sends a GetFile request for `fileId` and unwraps the response with `processSendResult`.
let sendGetFile (botConfig: BotConfig) (fileId: string): Async<File> =
    async {
        let request = Req.GetFile.Make fileId
        let! response = send botConfig request
        return processSendResult response
    }

let sendMessage (settings: TelegramSettings) (botConfig: BotConfig) (OutgoingMessage content): Async<unit> =
let sendHtmlMessage (groupId: ChatId) text =
Req.SendMessage.Make(groupId, text, ParseMode.HTML)

let groupId = Int(int64 settings.GroupId)
let message = prepareHtmlMessage content
async {
let! result = api botConfig (sendHtmlMessage groupId message)
return processSendResult result
let! result = send botConfig (sendHtmlMessage groupId message)
processSendResult result |> ignore
return ()
}

let run (logger: ILogger)
Expand Down
25 changes: 25 additions & 0 deletions Emulsion.TestFramework/Emulsion.TestFramework.fsproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<!-- Project file of Emulsion.TestFramework: built as a library (see OutputType) on top of the Web SDK. -->
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<OutputType>Library</OutputType>
</PropertyGroup>

<!-- F# compiles the files in the order they are listed here, so a file may only use definitions from the files above it. -->
<ItemGroup>
<Compile Include="LockedBuffer.fs" />
<Compile Include="Logging.fs" />
<Compile Include="Waiter.fs" />
<Compile Include="TestDataStorage.fs" />
<Compile Include="Exceptions.fs" />
<Compile Include="TelegramClientMock.fs" />
<Compile Include="WebFileStorage.fs" />
<Compile Include="SimpleHttpClientFactory.fs" />
</ItemGroup>

<!-- External packages and the projects this library references. -->
<ItemGroup>
<PackageReference Include="Serilog.Sinks.XUnit" Version="1.0.8" />
<ProjectReference Include="..\Emulsion.Database\Emulsion.Database.fsproj" />
<ProjectReference Include="..\Emulsion.Telegram\Emulsion.Telegram.fsproj" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module Emulsion.Tests.TestUtils.Exceptions
module Emulsion.TestFramework.Exceptions

open System
open Microsoft.EntityFrameworkCore
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace Emulsion.Tests.TestUtils
namespace Emulsion.TestFramework

type LockedBuffer<'T>() =
let messages = ResizeArray<'T>()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module Emulsion.Tests.TestUtils.Logging
module Emulsion.TestFramework.Logging

open Serilog
open Xunit.Abstractions
Expand Down
7 changes: 7 additions & 0 deletions Emulsion.TestFramework/SimpleHttpClientFactory.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Emulsion.TestFramework

open System.Net.Http

/// `IHttpClientFactory` implementation that ignores the requested client name and creates a new
/// `HttpClient` on every call.
type SimpleHttpClientFactory() =
    interface IHttpClientFactory with
        member _.CreateClient(_name: string): HttpClient =
            new HttpClient()
14 changes: 14 additions & 0 deletions Emulsion.TestFramework/TelegramClientMock.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace Emulsion.TestFramework

open System.Collections.Generic

open Emulsion.Telegram

/// `ITelegramClient` test double that answers `GetFileInfo` from responses registered beforehand.
type TelegramClientMock() =
    let cannedResponses = Dictionary<string, FileInfo option>()

    /// Registers the response for `fileId`, replacing a previously registered one.
    member _.SetResponse(fileId: string, fileInfo: FileInfo option): unit =
        cannedResponses[fileId] <- fileInfo

    interface ITelegramClient with
        /// Yields the registered response. The dictionary lookup happens when the method is called,
        /// not when the resulting async is run.
        member _.GetFileInfo fileId =
            let response = cannedResponses[fileId]
            async.Return response
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module Emulsion.Tests.TestUtils.TestDataStorage
module Emulsion.TestFramework.TestDataStorage

open System.IO

Expand Down
Loading