Skip to content

Commit 9045930

Browse files
author
Dominique Quatravaux
committed
[finish] the syncer
- Abandon the idea of bidirectional sync, due to hackmdio/codimd#1013 - Therefore, revert everything chokidar, and the `initialScan` of the PostgreSQL database - We only track changes after the syncer starts syncing - Last command-line argument designates the directory to sync to - AtomicCompareUpdateFile class that does the ole `O_WRONLY | O_CREAT | O_EXCL` dance to create a backup file, then uses `write-file-atomic` to update the main file, then deletes the backup file if and only if it is known to be useless (i.e. equal to the `oldContent` of the PostgreSQL event) - Use a `defer()` + `concatAll()` combo to prevent multiple attempts to save to the file system from occuring in parallel - Touch up Dockerfile + docker-compose rig
1 parent 7cd767b commit 9045930

File tree

3 files changed

+72
-74
lines changed

3 files changed

+72
-74
lines changed

Diff for: docker-compose.yml

+2-1
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@ services:
3131
image: epflidevfsd/codimd-syncer
3232
environment:
3333
- DB_URL=postgres://codimd:${POSTGRES_PASSWORD}@postgres/codimd
34+
- DEBUG=syncer
3435
volumes:
35-
- ./:/home/work:rw
36+
- ./:/markdown:rw
3637
volumes:
3738
database-data: {}
3839
upload-data: {}

Diff for: syncer/Dockerfile

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@ WORKDIR /sync2git
55
ADD . .
66
RUN npm i
77

8-
RUN mkdir /depot
9-
CMD npx ts-node /sync2git/index.ts /depot
8+
RUN mkdir /markdown
9+
CMD npx ts-node /sync2git/index.ts /markdown

Diff for: syncer/index.ts

+68-71
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
1-
import * as path from 'path'
2-
import { promises as fs } from 'fs'
1+
/**
2+
* Sync changes from CodiMD's PostgreSQL database to the file system.
3+
*
4+
* Due to https://github.com/hackmdio/codimd/issues/1013 it is not
5+
* currently possible to do the opposite.
6+
*/
7+
8+
import { promises as fs, constants as fsc } from 'fs'
39
import { Pool as PgPool } from 'pg'
410
import pgCreateSubscriber from 'pg-listen'
511
import { Subscriber as PgSubscriber } from 'pg-listen'
6-
import { concat, from, Observable } from "rxjs"
7-
import { flatMap } from 'rxjs/operators'
8-
import * as chokidar from 'chokidar'
12+
import { concat, defer, from, of, Observable } from "rxjs"
13+
import { concatAll, map, flatMap, tap } from 'rxjs/operators'
14+
import * as writeFileAtomic from 'write-file-atomic'
915

1016
import debug_ from "debug"
1117
const debug = debug_("syncer")
@@ -14,10 +20,28 @@ async function main (argv: string[]) {
1420
const config = parseCommandLine(argv)
1521

1622
const pgNotesStream = new PgNotesStream(config)
17-
pgNotesStream.stream().subscribe((c) => console.log([c.filename, c.shortid, c.newContent.length]))
18-
19-
const fsNotesStream = new FsNotesStream(config)
20-
fsNotesStream.stream().subscribe((c) => console.log([c.filename, c.newContent.length]))
23+
pgNotesStream.stream()
24+
.pipe(
25+
tap((chg) => debug('Change: %o %o', chg.shortid, chg.newContent.length)),
26+
map((chg) => {
27+
if (! chg.newContent) return of()
28+
if (chg.oldContent === chg.newContent) return of()
29+
const content = chg.newContent,
30+
filename = parseFilenameFromHeader(content)
31+
if (! filename) return of()
32+
debug('%o needs sync to %o', chg.shortid, filename)
33+
34+
return defer(() => {
35+
debug('Sync starting on %o', filename)
36+
const updater = new AtomicCompareUpdateFile(config, filename)
37+
38+
return from(updater.update(chg.oldContent, content))
39+
.pipe(tap(() => debug('Sync done on %o', filename)))
40+
})
41+
}),
42+
concatAll() // defer() + concatAll() = don't parallelize saves
43+
)
44+
.subscribe(() => {})
2145
}
2246

2347
/////////////////////////// PostgreSQL interface //////////////////////////////
@@ -149,7 +173,6 @@ module PgNotesStream {
149173
shortid: string
150174
oldContent: string
151175
newContent: string
152-
filename: string
153176
}
154177
}
155178

@@ -210,85 +233,59 @@ class PgNotesStream {
210233
return {
211234
shortid: row.shortid,
212235
oldContent: row.old_content,
213-
newContent: row.new_content,
214-
filename: parseFilenameFromHeader(row.new_content ||
215-
row.old_content)
236+
newContent: row.new_content
216237
}
217238
}
218239

219-
220-
private async initialScan () : Promise<PgNotesStream.Change[]> {
221-
const client = getPool(),
222-
res = await client.query('SELECT shortid, content FROM "Notes"')
223-
224-
return res.rows.map((row) => ({
225-
shortid: row.shortid,
226-
oldContent: null,
227-
newContent: row.content,
228-
filename: parseFilenameFromHeader(row.content)
229-
}))
230-
}
231-
232240
public stream () : Observable<PgNotesStream.Change> {
233-
function fromPromisedArray<T> (p : Promise<T[]>)
234-
: Observable<T> {
235-
return from(p).pipe(flatMap((array) => from(array)))
236-
}
237-
238241
return concat(
239-
fromPromisedArray(this.initialScan()),
240242
this.listenPg().pipe(
241243
flatMap((pgEvent) => this.consumeChange(
242244
pgEvent.sync_revisions_id))
243245
))
244246
}
245247
}
246248

247-
/////////////////////////// FsNotesStream class //////////////////////
249+
/////////////////////////// Filesystem operations //////////////////////
248250

249-
module FsNotesStream {
250-
export type Change = {
251-
filename: string,
252-
newContent: string
251+
class AtomicCompareUpdateFile {
252+
private filename : string
253+
constructor (private config: Config, basename : string) {
254+
this.filename = this.config.markdownDir + '/' + basename
255+
}
256+
257+
public async update (from : string, to : string) {
258+
const [oldContents, backupPath] = await this.mkbackup()
259+
await writeFileAtomic(this.filename, to)
260+
if (from === oldContents) await fs.unlink(backupPath)
253261
}
254-
}
255-
256-
class FsNotesStream {
257-
constructor (private config : Config) {}
258262

259-
stream () : Observable<FsNotesStream.Change> {
260-
return new Observable(sub => {
261-
async function fileChanged(f : string) {
262-
const fileText = await fs.readFile(f, "utf8")
263-
sub.next({
264-
filename: path.basename(f),
265-
newContent: fileText
266-
})
263+
private async mkbackup () : Promise<[string, string]> {
264+
let uniqueCounter = 0
265+
const anotherBackupFilename = () => this.filename + '.BAK-' + uniqueCounter++
266+
let fd : fs.FileHandle, backupFilename : string
267+
268+
for(backupFilename = anotherBackupFilename();;
269+
backupFilename = anotherBackupFilename()) {
270+
try {
271+
fd = await fs.open(backupFilename,
272+
fsc.O_WRONLY | fsc.O_CREAT | fsc.O_EXCL)
273+
break
274+
} catch (e) {
275+
if (e.code === 'EEXIST') {
276+
continue
277+
} else {
278+
throw e
279+
}
267280
}
281+
}
268282

269-
const watched = this.config.markdownDir + '/*.md'
270-
debug('chokidar: watching %o', watched)
271-
const watcher = chokidar.watch(
272-
watched,
273-
{ persistent: true })
274-
watcher.on('add', fileChanged)
275-
watcher.on('change', fileChanged)
276-
})
277-
}
278-
}
283+
const contents = await fs.readFile(this.filename)
284+
await fd.writeFile(contents)
285+
await fd.close()
279286

280-
/**
281-
* Creates an observable for the action of committing a revision to Git.
282-
*
283-
* The observable starts the commit operation only when it is subscribed
284-
* to. It then returns a single value (the commit SHA1) and completes.
285-
*/
286-
async function saveNote (_config: Config, shortId: string,
287-
text: string) {
288-
debug('Save on %o starts; text: %o', shortId, text)
289-
await new Promise(resolve => setTimeout(resolve, 2000)); // Sleep
290-
debug('Save on %o done; text: %o', shortId, text)
291-
return `XXXXsha1sha1${shortId}`
287+
return [contents.toString("utf8"), backupFilename]
288+
}
292289
}
293290

294291
////////////////////////// Ancillary functions ///////////////////////////

0 commit comments

Comments
 (0)