Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telegram content proxy #163

Merged
merged 23 commits into from
Aug 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
9eb8f30
(#102) Web: add a dependency on Telegram link resolver
ForNeVeR Aug 6, 2022
afbe2f8
(#102) ContentProxy: add a FileCache
ForNeVeR Aug 7, 2022
800546a
(#102) ContentProxy: finally, make it compile
ForNeVeR Aug 20, 2022
3e7b422
(#102) FileCacheTests: preliminary test API
ForNeVeR Aug 20, 2022
97db22a
(#102) TestFramework: extract the code from TestUtils
ForNeVeR Aug 20, 2022
4d6bfc5
(#102) ContentProxy: finish working FileCache
ForNeVeR Aug 21, 2022
a5e26a1
(#102) FileCacheTests: implement an ordering test
ForNeVeR Aug 25, 2022
6ea4892
(#102) FileCache: cache directory validation tests
ForNeVeR Aug 25, 2022
cb218b6
(#102) FileCache: additional tests
ForNeVeR Aug 27, 2022
b50d615
(#102) FileCache: finish the last tests
ForNeVeR Aug 27, 2022
9100471
(#102) ContentController: test redirect mode
ForNeVeR Aug 27, 2022
107c4be
(#102) ContentController: last test groundwork
ForNeVeR Aug 27, 2022
e8e8153
(#102) FileCache: async stream optimization
ForNeVeR Aug 28, 2022
067da2d
(#102) ContentController: add last tests
ForNeVeR Aug 28, 2022
9292428
(#102) ContentController: make it work in manual tests
ForNeVeR Aug 28, 2022
b02512c
(#102) ContentProxy: some small fixes
ForNeVeR Aug 28, 2022
5d954d6
(#102) ContentProxy: add file names and MIME types
ForNeVeR Aug 28, 2022
3977248
(#102) FileCache: support older versions of Windows
ForNeVeR Aug 28, 2022
fb5dc3a
Docs: a slight improvement
ForNeVeR Aug 28, 2022
a58f54e
(#102) FileCache: drop redundant rec
ForNeVeR Aug 28, 2022
b2cccee
(#102) FileCache: improve the workarounds for the older versions of W…
ForNeVeR Aug 28, 2022
2861ee8
(#102) ContentProxy: redesign the attribute optionality
ForNeVeR Aug 28, 2022
7936682
(#102) Settings: update the example
ForNeVeR Aug 28, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion Emulsion.ContentProxy/ContentStorage.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ type MessageContentIdentity = {
ChatUserName: string
MessageId: int64
FileId: string
FileName: string
MimeType: string
}

let getOrCreateMessageRecord (context: EmulsionDbContext) (id: MessageContentIdentity): Async<TelegramContent> = async {
Expand All @@ -17,7 +19,9 @@ let getOrCreateMessageRecord (context: EmulsionDbContext) (id: MessageContentIde
for content in context.TelegramContents do
where (content.ChatUserName = id.ChatUserName
&& content.MessageId = id.MessageId
&& content.FileId = id.FileId)
&& content.FileId = id.FileId
&& content.FileName = id.FileName
&& content.MimeType = id.MimeType)
} |> tryExactlyOneAsync
match existingItem with
| None ->
Expand All @@ -26,6 +30,8 @@ let getOrCreateMessageRecord (context: EmulsionDbContext) (id: MessageContentIde
ChatUserName = id.ChatUserName
MessageId = id.MessageId
FileId = id.FileId
FileName = id.FileName
MimeType = id.MimeType
}
do! addAsync context.TelegramContents newItem
return newItem
Expand Down
7 changes: 7 additions & 0 deletions Emulsion.ContentProxy/Emulsion.ContentProxy.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,21 @@
<ItemGroup>
<Compile Include="Proxy.fs" />
<Compile Include="ContentStorage.fs" />
<Compile Include="FileCache.fs" />
<Compile Include="SimpleHttpClientFactory.fs" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Emulsion.Database\Emulsion.Database.fsproj" />
<ProjectReference Include="..\Emulsion.Settings\Emulsion.Settings.fsproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Hashids.net" Version="1.4.1" />
<PackageReference Include="JetBrains.Lifetimes" Version="2020.2.2" />
<PackageReference Include="Microsoft.Extensions.Http" Version="6.0.0" />
<PackageReference Include="Serilog" Version="2.10.0" />
<PackageReference Include="SimpleBase" Version="3.1.0" />
</ItemGroup>

</Project>
218 changes: 218 additions & 0 deletions Emulsion.ContentProxy/FileCache.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
namespace Emulsion.ContentProxy

open System
open System.IO
open System.Net.Http
open System.Security.Cryptography
open System.Text
open System.Threading

open JetBrains.Collections.Viewable
open Serilog
open SimpleBase

open Emulsion.Settings

type DownloadRequest = {
Uri: Uri
CacheKey: string
Size: uint64
}

module Base58 =
/// Suggested by @ttldtor.
let M4N71KR = Base58(Base58Alphabet "123456789qwertyuiopasdfghjkzxcvbnmQWERTYUPASDFGHJKLZXCVBNM")

module FileCache =
let EncodeFileName(sha256: SHA256, cacheKey: string): string =
cacheKey
|> Encoding.UTF8.GetBytes
|> sha256.ComputeHash
|> Base58.M4N71KR.Encode

let TryDecodeFileNameToSha256Hash(fileName: string): byte[] option =
try
Some <| (Base58.M4N71KR.Decode fileName).ToArray()
with
| :? ArgumentException -> None

let IsMoveAndDeleteModeEnabled =
// NOTE: On older versions of Windows (known to reproduce on windows-2019 GitHub Actions image), the following
// scenario may be defunct:
//
// - open a file with FileShare.Delete (i.e. for download)
// - delete a file (i.e. during the cache cleanup)
// - try to create a file with the same name again
//
// According to this article
// (https://boostgsoc13.github.io/boost.afio/doc/html/afio/FAQ/deleting_open_files.html), it is impossible to do
// since file will occupy its disk name until the last handle is closed.
//
// In practice, this is allowed (checked at least on Windows 10 20H2 and windows-2022 GitHub Actions image), but
// some tests are known to be broken on older versions of Windows (windows-2019).
//
// As a workaround, let's rename the file to a random name before deleting it.
//
// This workaround may be removed after these older versions of Windows goes out of support.
OperatingSystem.IsWindows()

type FileCache(logger: ILogger,
settings: FileCacheSettings,
httpClientFactory: IHttpClientFactory,
sha256: SHA256) =

let error = Signal<Exception>()

let getFilePath(cacheKey: string) =
Path.Combine(settings.Directory, FileCache.EncodeFileName(sha256, cacheKey))

let readFileOptions =
FileStreamOptions(Mode = FileMode.Open, Access = FileAccess.Read, Options = FileOptions.Asynchronous, Share = (FileShare.Read ||| FileShare.Delete))

let writeFileOptions =
FileStreamOptions(Mode = FileMode.CreateNew, Access = FileAccess.Write, Options = FileOptions.Asynchronous, Share = FileShare.None)

let getFromCache(cacheKey: string) = async {
let path = getFilePath cacheKey
return
if File.Exists path then
Some(new FileStream(path, readFileOptions))
else
None
}

let enumerateCacheFiles() =
let entries = Directory.EnumerateFileSystemEntries settings.Directory
if FileCache.IsMoveAndDeleteModeEnabled then
entries |> Seq.filter(fun p -> not(p.EndsWith ".deleted"))
else
entries

let deleteFileSafe (fileInfo: FileInfo) = async {
if FileCache.IsMoveAndDeleteModeEnabled then
fileInfo.MoveTo(Path.Combine(fileInfo.DirectoryName, $"{Guid.NewGuid().ToString()}.deleted"))
fileInfo.Delete()
else
fileInfo.Delete()
}

let assertCacheDirectoryExists() = async {
Directory.CreateDirectory settings.Directory |> ignore
}

let assertCacheValid() = async {
enumerateCacheFiles()
|> Seq.iter(fun entry ->
let entryName = Path.GetFileName entry

if not <| File.Exists entry
then failwith $"Cache directory invalid: contains a subdirectory \"{entryName}\"."

match FileCache.TryDecodeFileNameToSha256Hash entryName with
| Some hash when hash.Length = sha256.HashSize / 8 -> ()
| _ ->
failwith (
$"Cache directory invalid: contains an entry \"{entryName}\" which doesn't correspond to a " +
"base58-encoded SHA-256 hash."
)
)
}

let ensureFreeCache size = async {
if size > settings.FileSizeLimitBytes || size > settings.TotalCacheSizeLimitBytes then
return false
else
do! assertCacheDirectoryExists()
do! assertCacheValid()

let allEntries = enumerateCacheFiles() |> Seq.map FileInfo

// Now, sort the entries from newest to oldest, and start deleting if required at a point when we understand
// that there are too much files:
let entriesByPriority =
allEntries
|> Seq.sortByDescending(fun info -> info.LastWriteTimeUtc)
|> Seq.toArray

let mutable currentSize = 0UL
for info in entriesByPriority do
currentSize <- currentSize + Checked.uint64 info.Length
if currentSize + size > settings.TotalCacheSizeLimitBytes then
logger.Information("Deleting a cache item \"{FileName}\" ({Size} bytes)", info.Name, info.Length)
do! deleteFileSafe info

return true
}

let download(uri: Uri): Async<Stream> = async {
let! ct = Async.CancellationToken

use client = httpClientFactory.CreateClient()
let! response = Async.AwaitTask <| client.GetAsync(uri, ct)
return! Async.AwaitTask <| response.EnsureSuccessStatusCode().Content.ReadAsStreamAsync()
}

let downloadIntoCacheAndGet uri cacheKey: Async<Stream> = async {
let! ct = Async.CancellationToken
let! stream = download uri
let path = getFilePath cacheKey
logger.Information("Saving {Uri} to path {Path}…", uri, path)

do! async { // to limit the cachedFile scope
use cachedFile = new FileStream(path, writeFileOptions)
do! Async.AwaitTask(stream.CopyToAsync(cachedFile, ct))
logger.Information("Download successful: \"{Uri}\" to \"{Path}\".", uri, path)
}

let! file = getFromCache cacheKey
return upcast Option.get file
}

let cancellation = new CancellationTokenSource()
let processRequest request: Async<Stream> = async {
logger.Information("Cache lookup for content {Uri} (cache key {CacheKey})", request.Uri, request.CacheKey)
match! getFromCache request.CacheKey with
| Some content ->
logger.Information("Cache hit for content {Uri} (cache key {CacheKey})", request.Uri, request.CacheKey)
return content
| None ->
logger.Information("No cache hit for content {Uri} (cache key {CacheKey}), will download", request.Uri, request.CacheKey)
let! shouldCache = ensureFreeCache request.Size
if shouldCache then
logger.Information("Resource {Uri} (cache key {CacheKey}, {Size} bytes) will fit into cache, caching", request.Uri, request.CacheKey, request.Size)
let! result = downloadIntoCacheAndGet request.Uri request.CacheKey
logger.Information("Resource {Uri} (cache key {CacheKey}, {Size} bytes) downloaded", request.Uri, request.CacheKey, request.Size)
return result
else
logger.Information("Resource {Uri} (cache key {CacheKey}) won't fit into cache, directly downloading", request.Uri, request.CacheKey)
let! result = download request.Uri
return result
}

let processLoop(processor: MailboxProcessor<_ * AsyncReplyChannel<_>>) = async {
while true do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have several places with while true in async code already, and they seem non-problematic for now. I think I'll leave that as-is, but will keep an eye on it.

let! request, replyChannel = processor.Receive()
try
let! result = processRequest request
replyChannel.Reply(Some result)
with
| ex ->
logger.Error(ex, "Exception while processing the file download queue")
error.Fire ex
replyChannel.Reply None
}
let processor = MailboxProcessor.Start(processLoop, cancellation.Token)

interface IDisposable with
member _.Dispose() =
cancellation.Dispose()
(processor :> IDisposable).Dispose()

member _.Download(uri: Uri, cacheKey: string, size: uint64): Async<Stream option> =
processor.PostAndAsyncReply(fun chan -> ({
Uri = uri
CacheKey = cacheKey
Size = size
}, chan))

member _.Error: ISource<Exception> = error
7 changes: 7 additions & 0 deletions Emulsion.ContentProxy/SimpleHttpClientFactory.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Emulsion.ContentProxy

open System.Net.Http

type SimpleHttpClientFactory() =
interface IHttpClientFactory with
member this.CreateClient _ = new HttpClient()
2 changes: 2 additions & 0 deletions Emulsion.Database/Entities.fs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ type TelegramContent = {
ChatUserName: string
MessageId: int64
FileId: string
FileName: string
MimeType: string
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// <auto-generated />
namespace Emulsion.Database.Migrations

open System
open Emulsion.Database
open Microsoft.EntityFrameworkCore
open Microsoft.EntityFrameworkCore.Infrastructure
open Microsoft.EntityFrameworkCore.Migrations

[<DbContext(typeof<EmulsionDbContext>)>]
[<Migration("20220828133844_ContentFileNameAndMimeType")>]
type ContentFileNameAndMimeType() =
inherit Migration()

override this.Up(migrationBuilder:MigrationBuilder) =
migrationBuilder.AddColumn<string>(
name = "FileName"
,table = "TelegramContents"
,``type`` = "TEXT"
,nullable = true
,defaultValue = "file.bin"
) |> ignore

migrationBuilder.AddColumn<string>(
name = "MimeType"
,table = "TelegramContents"
,``type`` = "TEXT"
,nullable = true
,defaultValue = "application/octet-stream"
) |> ignore

migrationBuilder.Sql @"
drop index TelegramContents_Unique;

create unique index TelegramContents_Unique
on TelegramContents(ChatUserName, MessageId, FileId, FileName, MimeType)
" |> ignore


override this.Down(migrationBuilder:MigrationBuilder) =
migrationBuilder.DropColumn(
name = "FileName"
,table = "TelegramContents"
) |> ignore

migrationBuilder.DropColumn(
name = "MimeType"
,table = "TelegramContents"
) |> ignore

migrationBuilder.Sql @"
drop index TelegramContents_Unique;

create unique index TelegramContents_Unique
on TelegramContents(ChatUserName, MessageId, FileId)
" |> ignore


override this.BuildTargetModel(modelBuilder: ModelBuilder) =
modelBuilder
.HasAnnotation("ProductVersion", "5.0.10")
|> ignore

modelBuilder.Entity("Emulsion.Database.Entities.TelegramContent", (fun b ->

b.Property<Int64>("Id")
.IsRequired(true)
.ValueGeneratedOnAdd()
.HasColumnType("INTEGER") |> ignore
b.Property<string>("ChatUserName")
.IsRequired(false)
.HasColumnType("TEXT") |> ignore
b.Property<string>("FileId")
.IsRequired(false)
.HasColumnType("TEXT") |> ignore
b.Property<string>("FileName")
.IsRequired(false)
.HasColumnType("TEXT") |> ignore
b.Property<Int64>("MessageId")
.IsRequired(true)
.HasColumnType("INTEGER") |> ignore
b.Property<string>("MimeType")
.IsRequired(false)
.HasColumnType("TEXT") |> ignore

b.HasKey("Id") |> ignore

b.ToTable("TelegramContents") |> ignore

)) |> ignore

Loading