Skip to content

Commit 15569a9

Browse files
committed
File index writing
1 parent 8ac4bb9 commit 15569a9

File tree

5 files changed

+104
-98
lines changed

5 files changed

+104
-98
lines changed

src/InMemoryIndex.zig

+26-33
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ const Item = common.Item;
1010
const SearchResults = common.SearchResults;
1111
const Change = common.Change;
1212

13-
const Segment = @import("InMemorySegment.zig");
14-
const Segments = std.DoublyLinkedList(Segment);
13+
const InMemorySegment = @import("InMemorySegment.zig");
14+
const InMemorySegments = std.DoublyLinkedList(InMemorySegment);
1515

1616
allocator: std.mem.Allocator,
1717
write_lock: std.Thread.RwLock,
1818
merge_lock: std.Thread.Mutex,
19-
segments: Segments,
20-
max_items_per_segment: usize = 100000,
19+
segments: InMemorySegments,
20+
max_items_per_segment: usize = 1_000_000,
2121
max_segments: usize = 16,
2222
auto_cleanup: bool = true,
2323

@@ -37,28 +37,27 @@ pub fn deinit(self: *Self) void {
3737
defer self.write_lock.unlock();
3838

3939
while (self.segments.popFirst()) |node| {
40-
self.destroyNode(node);
40+
self.destroySegment(node);
4141
}
4242
}
4343

44-
fn destroyNode(self: *Self, node: *Segments.Node) void {
44+
/// Allocates a new list node holding an empty `InMemorySegment`.
///
/// The whole node is assigned at once so that the `prev`/`next` links take
/// their declared defaults instead of staying undefined after `create`.
/// The caller owns the returned node: it must either be linked into
/// `self.segments` or released with `destroySegment`.
///
/// Returns an allocation error if the node cannot be created.
fn createSegment(self: *Self) !*InMemorySegments.Node {
    const node = try self.allocator.create(InMemorySegments.Node);
    node.* = .{ .data = InMemorySegment.init(self.allocator) };
    return node;
}
49+
50+
/// Releases a segment node: deinitializes the segment stored in `node.data`,
/// then frees the node itself through `self.allocator`.
/// The callers visible in this file take the node out of `self.segments`
/// (via `popFirst` or `remove`) before calling this.
fn destroySegment(self: *Self, node: *InMemorySegments.Node) void {
4551
node.data.deinit();
4652
self.allocator.destroy(node);
4753
}
4854

4955
pub fn update(self: *Self, changes: []const Change) !void {
5056
var committed = false;
5157

52-
const node = try self.allocator.create(Segments.Node);
58+
const node = try self.createSegment();
5359
defer {
54-
if (!committed) self.allocator.destroy(node);
55-
}
56-
57-
node.data = Segment.init(self.allocator);
58-
defer {
59-
if (!committed) {
60-
node.data.deinit();
61-
}
60+
if (!committed) self.destroySegment(node);
6261
}
6362

6463
var num_items: usize = 0;
@@ -134,9 +133,9 @@ fn hasNewerVersion(self: *Self, id: u32, version: u32) bool {
134133
}
135134

136135
/// Describes a prepared merge of a run of segment nodes.
/// `commitMerge` inserts `replacement` after `last`, then removes and
/// destroys every node from `first` through `last` (inclusive).
const Merge = struct {
137-
first: *Segments.Node,
138-
last: *Segments.Node,
139-
replacement: *Segments.Node,
136+
first: *InMemorySegments.Node,
137+
last: *InMemorySegments.Node,
138+
replacement: *InMemorySegments.Node,
140139
};
141140

142141
fn prepareMerge(self: *Self) !?Merge {
@@ -158,7 +157,7 @@ fn prepareMerge(self: *Self) !?Merge {
158157
return null;
159158
}
160159

161-
var best_node: ?*Segments.Node = null;
160+
var best_node: ?*InMemorySegments.Node = null;
162161
var best_score: f64 = std.math.inf(f64);
163162
segments_iter = self.segments.first;
164163
var level_size = @as(f64, @floatFromInt(total_size)) / 2;
@@ -187,23 +186,17 @@ fn prepareMerge(self: *Self) !?Merge {
187186

188187
const segment1 = node1.data;
189188
const segment2 = node2.data;
190-
const segments = [2]Segment{ segment1, segment2 };
189+
const segments = [2]InMemorySegment{ segment1, segment2 };
191190

192191
var committed = false;
193192

194-
const node = try self.allocator.create(Segments.Node);
193+
const node = try self.createSegment();
195194
defer {
196-
if (!committed) self.allocator.destroy(node);
195+
if (!committed) self.destroySegment(node);
197196
}
198197

199198
const merge = Merge{ .first = node1, .last = node2, .replacement = node };
200199

201-
node.data = Segment.init(self.allocator);
202-
defer {
203-
if (!committed) {
204-
node.data.deinit();
205-
}
206-
}
207200
node.data.version = segment1.version;
208201

209202
var total_docs: usize = 0;
@@ -267,11 +260,11 @@ fn commitMerge(self: *Self, merge: Merge) void {
267260

268261
self.segments.insertAfter(merge.last, merge.replacement);
269262

270-
var iter: ?*Segments.Node = merge.first;
263+
var iter: ?*InMemorySegments.Node = merge.first;
271264
while (iter) |node| {
272265
iter = node.next;
273266
self.segments.remove(node);
274-
self.destroyNode(node);
267+
self.destroySegment(node);
275268
if (node == merge.last) break;
276269
}
277270

@@ -288,7 +281,7 @@ fn mergeSegments(self: *Self) !void {
288281
}
289282
}
290283

291-
pub fn freezeFirstSegment(self: *Self) ?*Segment {
284+
pub fn freezeFirstSegment(self: *Self) ?*InMemorySegment {
292285
self.merge_lock.lock();
293286
defer self.merge_lock.unlock();
294287

@@ -311,7 +304,7 @@ pub fn freezeFirstSegment(self: *Self) ?*Segment {
311304
return null;
312305
}
313306

314-
pub fn removeFrozenSegment(self: *Self, segment: *Segment) void {
307+
pub fn removeSegment(self: *Self, segment: *InMemorySegment) void {
315308
self.merge_lock.lock();
316309
defer self.merge_lock.unlock();
317310

@@ -323,7 +316,7 @@ pub fn removeFrozenSegment(self: *Self, segment: *Segment) void {
323316
if (&node.data == segment) {
324317
if (node.data.frozen) {
325318
self.segments.remove(node);
326-
self.destroyNode(node);
319+
self.destroySegment(node);
327320
return;
328321
}
329322
}

src/Index.zig

+38-9
Original file line numberDiff line numberDiff line change
@@ -62,32 +62,61 @@ pub fn start(self: *Self) !void {
6262
/// Tears down the index: deinitializes the in-memory stage, pops and destroys
/// every node in `self.segments`, and finally deinitializes the scheduler.
pub fn deinit(self: *Self) void {
6363
self.stage.deinit();
6464
while (self.segments.popFirst()) |node| {
65-
node.data.deinit();
66-
self.allocator.destroy(node);
65+
self.destroySegment(node);
6766
}
6867
self.scheduler.deinit();
6968
}
7069

70+
/// Allocates a new list node holding an empty `Segment`.
///
/// The whole node is assigned at once so that the `prev`/`next` links take
/// their declared defaults instead of staying undefined after `create`.
/// The caller owns the returned node: it must either be appended to
/// `self.segments` or released with `destroySegment`.
///
/// Returns an allocation error if the node cannot be created.
fn createSegment(self: *Self) !*Segments.Node {
    const node = try self.allocator.create(Segments.Node);
    node.* = .{ .data = Segment.init(self.allocator) };
    return node;
}
75+
76+
/// Releases a segment node: deinitializes the segment stored in `node.data`,
/// then frees the node itself through `self.allocator`.
fn destroySegment(self: *Self, node: *Segments.Node) void {
77+
node.data.deinit();
78+
self.allocator.destroy(node);
79+
}
80+
81+
/// Persists the current segment list to `index.dat` inside `self.dir`.
///
/// File layout, all values little-endian `u32`:
///   - number of segments
///   - for each segment in list order: `version[0]`, then `version[1]`
///
/// The data goes through an atomic file, and `finish` is only reached after
/// every write has succeeded.
///
/// Returns `error.TooManySegments` when the segment count does not fit the
/// 32-bit field (previously the count was silently truncated), or any error
/// from creating, writing or finishing the file.
fn write(self: *Self) !void {
    // Reject counts that cannot be represented instead of wrapping them.
    const max_count: usize = 0xFFFF_FFFF;
    if (self.segments.len > max_count) return error.TooManySegments;
    const num_segments: u32 = @intCast(self.segments.len);

    var file = try self.dir.atomicFile("index.dat", .{});
    defer file.deinit();

    const writer = file.file.writer();
    try writer.writeInt(u32, num_segments, .little);

    var it = self.segments.first;
    while (it) |node| : (it = node.next) {
        try writer.writeInt(u32, node.data.version[0], .little);
        try writer.writeInt(u32, node.data.version[1], .little);
    }

    try file.finish();
}
95+
7196
/// Moves the oldest staged in-memory segment into the persistent segment list.
///
/// Does nothing when `self.run_cleanup` is false or when the stage has no
/// segment to freeze. Otherwise the frozen segment is converted into a new
/// on-disk `Segment`, linked into `self.segments`, recorded in the index file
/// and finally removed from the stage.
///
/// The node is appended *before* `write` is called so that the index file
/// lists the segment that was just converted; if writing fails the node is
/// unlinked again and destroyed, and the frozen segment stays in the stage.
///
/// Returns any error from node allocation, segment conversion or index writing.
fn cleanup(self: *Self) !void {
    if (!self.run_cleanup) return;

    log.info("running cleanup", .{});

    // try self.stage.cleanup();

    if (self.stage.freezeFirstSegment()) |segment| {
        var committed = false;
        const node = try self.createSegment();
        defer {
            if (!committed) self.destroySegment(node);
        }

        try node.data.convert(self.dir, segment);

        self.write_lock.lock();
        defer self.write_lock.unlock();

        // Link first so that `write` sees the new segment, roll back on failure.
        self.segments.append(node);
        self.write() catch |err| {
            self.segments.remove(node);
            return err;
        };

        self.stage.removeSegment(segment);
        committed = true;
    }
}
93122

src/Segment.zig

+36-9
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@ const InMemorySegment = @import("InMemorySegment.zig");
99

1010
const filefmt = @import("filefmt.zig");
1111

12+
pub const Version = std.meta.Tuple(&.{ u32, u32 });
13+
1214
allocator: std.mem.Allocator,
13-
version: u32 = 0,
15+
version: Version = .{ 0, 0 },
1416
docs: std.AutoHashMap(u32, bool),
1517
index: std.ArrayList(u32),
1618
block_size: usize = 0,
@@ -68,14 +70,20 @@ pub fn search(self: *Self, hashes: []const u32, results: *SearchResults) !void {
6870
}
6971
const matches = std.sort.equalRange(Item, Item{ .hash = hash, .id = 0 }, block_items.items, {}, Item.cmpByHash);
7072
for (matches[0]..matches[1]) |i| {
71-
try results.incr(block_items.items[i].id, self.version);
73+
try results.incr(block_items.items[i].id, self.version[1]);
7274
}
7375
}
7476
}
7577
}
7678

7779
const max_file_name_size = 255;
78-
const file_name_fmt = "segment-{d}.data";
80+
const file_name_fmt = "segment-{d}-{d}.dat";
81+
82+
/// Opens `file_name` inside `dir` and loads it into this segment through
/// `filefmt.readFile`. The file handle is closed before returning, whether
/// loading succeeded or failed.
///
/// Returns any error from opening the file or from `filefmt.readFile`.
fn read(self: *Self, dir: std.fs.Dir, file_name: []const u8) !void {
    const handle = try dir.openFile(file_name, .{});
    defer handle.close();

    return filefmt.readFile(handle, self);
}
7987

8088
pub fn convert(self: *Self, dir: std.fs.Dir, source: *InMemorySegment) !void {
8189
if (!source.frozen) {
@@ -86,13 +94,31 @@ pub fn convert(self: *Self, dir: std.fs.Dir, source: *InMemorySegment) !void {
8694
}
8795

8896
var file_name_buf: [max_file_name_size]u8 = undefined;
89-
const file_name = try std.fmt.bufPrint(&file_name_buf, file_name_fmt, .{source.version});
97+
const file_name = try std.fmt.bufPrint(&file_name_buf, file_name_fmt, .{ source.version, source.version });
9098

91-
const file = try dir.createFile(file_name, .{ .exclusive = true, .read = true });
92-
defer file.close();
99+
var file = try dir.atomicFile(file_name, .{});
100+
defer file.deinit();
101+
try filefmt.writeFile(file.file.writer(), source);
102+
try file.finish();
93103

94-
try filefmt.writeFile(file.writer(), source);
95-
try filefmt.readFile(file, self);
104+
try self.read(dir, file_name);
105+
}
106+
107+
/// Prepares this segment as the merge of two adjacent source segments.
///
/// The sources must be consecutive: `sources[0].version[1] + 1` has to equal
/// `sources[1].version[0]`, otherwise `error.SourceSegmentVersionMismatch` is
/// returned. On a match, this segment's version range becomes
/// `{ sources[0].version[0], sources[1].version[1] }` and the file name is
/// derived from that range via `file_name_fmt`.
///
/// NOTE(review): in this block nothing is written to `file` and `finish` is
/// never called, so the atomic file appears to be discarded by `deinit` and
/// the final `read` opens whatever already exists under `file_name`.
/// Presumably the merged data is meant to be written and finished before the
/// read - confirm.
pub fn merge(self: *Self, dir: std.fs.Dir, sources: [2]*Self) !void {
108+
if (sources[0].version[1] + 1 != sources[1].version[0]) {
109+
return error.SourceSegmentVersionMismatch;
110+
}
111+
112+
self.version[0] = sources[0].version[0];
113+
self.version[1] = sources[1].version[1];
114+
115+
var file_name_buf: [max_file_name_size]u8 = undefined;
116+
const file_name = try std.fmt.bufPrint(&file_name_buf, file_name_fmt, .{ self.version[0], self.version[1] });
117+
118+
// Atomic file for the merged output, named after the combined version range.
var file = try dir.atomicFile(file_name, .{});
119+
defer file.deinit();
120+
121+
// Load this segment's contents from `file_name`.
try self.read(dir, file_name);
96122
}
97123

98124
test "convert" {
@@ -113,7 +139,8 @@ test "convert" {
113139

114140
try segment.convert(tmpDir.dir, &source);
115141

116-
try std.testing.expectEqual(1, segment.version);
142+
try std.testing.expectEqual(1, segment.version[0]);
143+
try std.testing.expectEqual(1, segment.version[1]);
117144
try std.testing.expectEqual(1, segment.docs.count());
118145
try std.testing.expectEqual(1, segment.index.items.len);
119146
}

src/SegmentList.zig

-45
This file was deleted.

src/filefmt.zig

+4-2
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,8 @@ pub fn readFile(file: fs.File, segment: *Segment) !void {
342342
}
343343
const num_blocks = blocks_data_size / header.block_size;
344344

345-
segment.version = header.version;
345+
segment.version[0] = header.version;
346+
segment.version[1] = header.version;
346347
segment.block_size = header.block_size;
347348

348349
try segment.docs.ensureTotalCapacity(header.num_docs);
@@ -403,7 +404,8 @@ test "writeFile/readFile" {
403404

404405
try readFile(file, &segment);
405406

406-
try testing.expectEqual(1, segment.version);
407+
try testing.expectEqual(1, segment.version[0]);
408+
try testing.expectEqual(1, segment.version[1]);
407409
try testing.expectEqual(1, segment.docs.count());
408410
try testing.expectEqual(1, segment.index.items.len);
409411
try testing.expectEqual(1, segment.index.items[0]);

0 commit comments

Comments
 (0)