-
Notifications
You must be signed in to change notification settings - Fork 1
/
ewave.py
450 lines (377 loc) · 15 KB
/
ewave.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
# -*- mode: python -*-
"""
Read and write WAVE format files.
Differences from wave.py in the python standard library (on which much of this
is based):
* Support is provided for reading and writing standard and extended format files
with 8, 16, 32, or 64-bit linear PCM encoding, or with 32 or 64-bit IEEE float
encoding.
* Data access is through numpy.memmap whenever possible. This speeds reading
large files and allows files to be edited in place, by opening a file in 'r+'
mode and calling read() with memmap='r+'; appending also works in this mode,
as long as the data chunk is the last in the file. Files opened in 'w+' mode
can be read after writing.
* A single class handles both read and write operations.
* Non-accessor methods can be chained, e.g. fp.write(data).flush()
Note that WAV files cannot store more than 2-4 GiB of data. Mu-law, A-law, and
other exotic encoding schemes are not supported. Does not support bit packings
where the container sizes don't correspond to mmapable types (e.g. 24 bit). Try
libsndfile for those sorts of files.
Copyright (C) 2012-2023 Dan Meliza <dan // AT // meliza.org>
"""
from pathlib import Path
from typing import BinaryIO, Optional, Union
import numpy as np
import numpy.typing as npt
# chunk.Chunk is deprecated in py311 and will be removed in py313
try:
from wave import _Chunk as Chunk
except ImportError:
from chunk import Chunk
# Format tags stored in (and read from) the ``fmt `` chunk of a WAVE file.
WAVE_FORMAT_PCM = 0x0001  # integer linear PCM samples
WAVE_FORMAT_IEEE_FLOAT = 0x0003  # IEEE floating-point samples
# Extended header: the effective format tag is read from the format extension.
WAVE_FORMAT_EXTENSIBLE = 0xFFFE
# Version string of this module.
__version__ = "1.0.9"
class Error(Exception):
    """Raised for malformed WAVE data and for unsupported formats or operations."""
class wavfile:
    """A WAVE file for reading and/or writing.

    file:          the path of the file to open, or an open file-like object
    mode:          the mode to open the file (r, r+, w, w+). If `file` is
                   already open, its handle is used as-is and the mode is taken
                   from the handle's ``mode`` attribute; this argument is only
                   a fallback for handles without a string ``mode``
                   (e.g. io.BytesIO).
    sampling_rate: for 'w' mode only, set the sampling rate of the data
    dtype:         for 'w' mode only, set the storage format using one of the
                   following codes:
                   'b','h','i','l': 8,16,32,64-bit PCM
                   'f','d':         32,64-bit IEEE float
                   Note that 'l' is the platform's C long; use 'q' (or 'i8')
                   for a type that is 64 bits everywhere. 8-bit PCM files are
                   read back as unsigned ('B').
    nchannels:     for 'w' mode only, set the number of channels to store

    additional keyword arguments are ignored

    The returned object may be used as a context manager, and will close the
    underlying file when the context exits.
    """

    def __init__(
        self,
        file: Union[str, Path, BinaryIO],
        mode: str = "r",
        sampling_rate: int = 20000,
        dtype: npt.DTypeLike = "h",
        nchannels: int = 1,
        **kwargs,
    ):
        # the module-level name ``open`` is rebound to this class, so the
        # builtin has to be imported explicitly
        from builtins import open  # noqa: UP029

        # validate arguments; props are overwritten if header is read
        self._dtype = np.dtype(dtype)
        self._nchannels = int(nchannels)
        self._framerate = int(sampling_rate)
        self._file_format(self._dtype)
        # fallback used by the `mode` property for handles without `.mode`
        self._mode = mode

        if hasattr(file, "read"):
            self.fp = file
        else:
            if mode not in ("r", "r+", "w", "w+"):
                raise ValueError("Invalid mode (use 'r', 'r+', 'w', 'w+')")
            self.fp = open(file, mode=mode + "b")

        if self.mode == "r":
            self._load_header()
        elif self.mode == "r+":
            try:
                self._load_header()
            except EOFError:
                # file is empty; needs header
                self._write_header(sampling_rate, dtype, nchannels)
        else:
            self._write_header(sampling_rate, dtype, nchannels)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def __del__(self):
        self.close()

    def close(self):
        """Flushes pending header updates and closes the underlying file.

        Safe to call more than once. The handle is released even if the
        final flush raises.
        """
        fp = getattr(self, "fp", None)
        if fp is None:
            return
        try:
            self.flush()
        finally:
            del self.fp
            if hasattr(fp, "close"):
                fp.close()

    @property
    def filename(self) -> str:
        """The path of the file"""
        return self.fp.name

    @property
    def mode(self) -> str:
        """The mode for the file"""
        fmode = getattr(self.fp, "mode", None)
        if not isinstance(fmode, str):
            # e.g. io.BytesIO has no mode; some wrappers expose a non-string
            fmode = getattr(self, "_mode", "r")
        return fmode.replace("b", "")

    @property
    def sampling_rate(self) -> int:
        """Sampling rate of the data"""
        return self._framerate

    @property
    def nchannels(self) -> int:
        """Number of channels"""
        return self._nchannels

    @property
    def nframes(self) -> int:
        """Number of frames (samples per channel) in the data chunk"""
        if hasattr(self, "_bytes_written"):
            nbytes = self._bytes_written
        else:
            nbytes = self._data_chunk.chunksize
        return nbytes // (self.dtype.itemsize * self.nchannels)

    @property
    def dtype(self) -> np.dtype:
        """Data storage type"""
        return self._dtype

    def __repr__(self) -> str:
        cls = self.__class__
        return (
            f"<open {cls.__module__}.{cls.__name__} '{self.filename}', "
            f"mode '{self.mode}', dtype '{self.dtype}', "
            f"sampling rate {self.sampling_rate:d} at {hex(id(self))}>"
        )

    def flush(self):
        """Flushes data to disk and update header with correct size information

        Returns the object itself in every mode so that calls can be chained.
        """
        import struct

        # nothing to patch for read-only files or when the header state was
        # never initialized (e.g. the constructor failed part-way)
        if self.mode == "r" or not hasattr(self, "_bytes_written"):
            return self
        if getattr(self, "_postdata_chunk", None) is not None:
            # write() refuses to append to such files, so the stored sizes are
            # still correct; rewriting them from the end-of-file position
            # would count the trailing chunks as sample data
            self.fp.flush()
            return self

        # RIFF chunk size: everything after the 8-byte RIFF header
        self.fp.seek(4)
        self.fp.write(struct.pack(b"<L", self._data_offset + self._bytes_written - 8))
        # fact chunk: number of frames (samples per channel)
        fact_offset = getattr(self, "_fact_offset", None)
        if fact_offset is not None:
            frame_bytes = self._dtype.itemsize * self._nchannels
            nframes = self._bytes_written // frame_bytes if frame_bytes else 0
            self.fp.seek(fact_offset)
            self.fp.write(struct.pack(b"<L", nframes))
        # data chunk size
        self.fp.seek(self._data_offset - 4)
        self.fp.write(struct.pack(b"<L", self._bytes_written))
        self.fp.flush()
        return self

    def read(
        self,
        frames: Optional[int] = None,
        offset: int = 0,
        memmap: Union[str, bool, None] = "c",
    ) -> np.ndarray:
        """Returns acoustic data from file.

        By default, the returned value is a memmap of the data in
        'copy-on-write' mode, which means read operations are delayed until the
        data are actually accessed or modified, and changes to the memmap object
        are not propagated to the disk.

        For multichannel WAV files, the data are returned as a 2D
        array with dimensions frames x channels

        - frames: number of frames to return. None for all the frames in the file
        - offset: start read at specific frame
        - memmap: if False, reads the whole file into memory at once; if not, returns
                  a numpy.memmap object using this value as the mode argument. 'c'
                  corresponds to copy-on-write; use 'r+' to write changes to disk. Be
                  warned that 'w' modes may corrupt data. Memmap may not work with
                  certain input types (e.g., files in zip archives) and does not
                  currently work on Windows.

        Raises Error if the file was opened write-only.
        """
        if self.mode == "w":
            raise Error("file is write-only")
        if self.mode in ("r+", "w+"):
            self.fp.flush()

        itemsize = self._dtype.itemsize
        frame_bytes = self.nchannels * itemsize
        # byte position of the first requested frame
        coff = self._data_offset + offset * frame_bytes
        if frames is None:
            frames = max(self.nframes - offset, 0)

        if memmap:
            A = np.memmap(
                self.fp,
                offset=coff,
                dtype=self._dtype,
                mode=memmap,
                shape=frames * self.nchannels,
            )
        else:
            # restore the file position afterwards so reading does not
            # disturb other users of the handle
            pos = self.fp.tell()
            self.fp.seek(coff)
            data = self.fp.read(frames * frame_bytes)
            self.fp.seek(pos)
            # drop a truncated trailing sample instead of failing in frombuffer
            partial = len(data) % itemsize
            if partial:
                data = data[:-partial]
            A = np.frombuffer(data, dtype=self._dtype)

        if self.nchannels > 1:
            # keep whole frames only, then expose as frames x channels
            nsamples = (A.size // self.nchannels) * self.nchannels
            A = A[:nsamples].reshape(nsamples // self.nchannels, self.nchannels)
        return A

    def write(self, data: npt.ArrayLike, scale: bool = True):
        """Writes data to the WAVE file

        - data : input data, in any form that can be converted to an array with
                 the file's dtype. Data are silently coerced into an array whose
                 shape matches the number of channels in the file.
        - scale : if True, data are rescaled so that their maximum range matches
                  that of the file's encoding. If not, the raw values are
                  used, which can result in clipping.

        Data are always appended at the end of the data chunk, regardless of
        the current position of the file handle. Raises Error if the file is
        read-only or if chunks follow the data chunk. Returns the object
        itself so that calls can be chained.
        """
        from numpy import asarray

        if self.mode == "r":
            raise Error("file is read-only")
        if getattr(self, "_postdata_chunk", None):
            raise Error("cannot append to data chunk without overwriting other chunks")

        if not scale:
            # already in the file's dtype, so rescale() passes it through
            data = asarray(data, self._dtype)
        data = rescale(data, self._dtype).tobytes()
        # flush() and memmap-based reads move the file position, so always
        # seek to the end of the data written so far before appending
        self.fp.seek(self._data_offset + self._bytes_written)
        self.fp.write(data)
        self._bytes_written += len(data)
        return self

    def _load_header(self):
        """Reads metadata from header

        Raises EOFError if the file is empty and Error if it is not a
        supported WAVE file.
        """
        import struct
        from numpy import dtype

        fp = Chunk(self.fp, bigendian=0)
        if fp.getname() != b"RIFF":
            raise Error("file does not start with RIFF id")
        if fp.read(4) != b"WAVE":
            raise Error("not a WAVE file")

        self._fmt_chunk = None
        self._fact_chunk = None
        self._data_chunk = None
        self._postdata_chunk = None
        while True:
            try:
                chunk = Chunk(fp, bigendian=0)
            except EOFError:
                break
            chunkname = chunk.getname()
            if self._data_chunk is not None:
                # any chunk after the data chunk would be overwritten by an
                # append; remember it so write() can refuse
                self._postdata_chunk = chunk
            if chunkname == b"fmt ":
                self._fmt_chunk = chunk
            elif chunkname == b"fact":
                self._fact_chunk = chunk
            elif chunkname == b"data":
                if not self._fmt_chunk:
                    raise Error("data chunk before fmt chunk")
                self._data_chunk = chunk
            chunk.skip()
        if not self._fmt_chunk or not self._data_chunk:
            raise Error("fmt and/or data chunk missing")

        self._dtype = None
        if self._fmt_chunk.chunksize < 16:
            raise Error("fmt chunk is too short")
        self._fmt_chunk.seek(0)
        # fields: tag, channels, sample rate, avg bytes/sec, block align, bits
        (
            self._tag,
            self._nchannels,
            self._framerate,
            _,
            _,
            bits,
        ) = struct.unpack(b"<HHLLHH", self._fmt_chunk.read(16))
        # load extended block if it's there
        if self._tag == WAVE_FORMAT_EXTENSIBLE:
            # need 10 more bytes: cbSize, valid bits, channel mask, and the
            # first two bytes of the sub-format GUID (the real format tag)
            if self._fmt_chunk.chunksize < 26:
                raise Error("extensible format but no format extension")
            *_, self._tag = struct.unpack(b"<HHLH", self._fmt_chunk.read(10))

        if self._tag == WAVE_FORMAT_PCM:
            # bit size is rounded up to the nearest multiple of 8; I'm
            # not going to support any format that can't be easily
            # mmap'd, i.e. files that have weird container sizes (like 24)
            if bits <= 8:
                self._dtype = dtype("B")
            elif bits <= 16:
                self._dtype = dtype("<i2")
            elif bits <= 24:
                raise Error("unsupported bit depth: %d" % bits)
            elif bits <= 32:
                self._dtype = dtype("<i4")
            elif bits == 64:
                # explicit 8-byte type: 'l' is only 32 bits on some platforms
                self._dtype = dtype("<i8")
            else:
                raise Error("unsupported bit depth: %d" % bits)
        elif self._tag == WAVE_FORMAT_IEEE_FLOAT:
            try:
                self._dtype = dtype("float%d" % bits)
            except TypeError as err:
                raise Error("unsupported bit depth for IEEE floats: %d" % bits) from err
        else:
            raise Error(f"unsupported format: {self._tag}")

        # chunk offsets are relative to the body of the RIFF chunk, which
        # starts 8 bytes into the file
        self._data_offset = self._data_chunk.offset + 8
        self._fact_offset = None
        if self._fact_chunk is not None and self._fact_chunk.chunksize >= 4:
            self._fact_offset = self._fact_chunk.offset + 8

        if self.mode == "r+":
            if self._postdata_chunk is not None:
                # data does not run to the end of the file; trust the header
                self._bytes_written = self._data_chunk.chunksize
            else:
                self.fp.seek(0, 2)
                self._bytes_written = self.fp.tell() - self._data_offset

    @classmethod
    def _file_format(cls, dtype):
        """Returns appropriate file format or raises an error"""
        if dtype.kind == "i" or (dtype.kind == "u" and dtype.itemsize == 1):
            return WAVE_FORMAT_PCM
        elif dtype.kind == "f":
            return WAVE_FORMAT_IEEE_FLOAT
        else:
            raise Error(f"unsupported type {dtype} cannot be stored in wave files")

    def _write_header(self, sampling_rate, dtype, nchannels, write_fact=None):
        """Creates header for wave file based on sampling rate and data type

        The positional arguments are accepted for compatibility; the values
        stored on the instance by the constructor are what is written.

        - write_fact: if True, always write a fact chunk; if False, never; if
                      None, write one for float and extensible formats
        """
        # this is a bit tricky b/c Chunk is a read-only class
        # however, this only gets called for a pristine file
        # we'll have to go back and patch up the sizes later
        import struct

        itemsize = self._dtype.itemsize
        # main chunk
        out = struct.pack(b"<4sL4s", b"RIFF", 0, b"WAVE")
        # fmt chunk
        tag = etag = self._file_format(self._dtype)
        fmt_size = 16
        if itemsize > 2 or self._nchannels > 2:
            fmt_size = 40
            tag = WAVE_FORMAT_EXTENSIBLE
        out += struct.pack(
            b"<4sLHHLLHH",
            b"fmt ",
            fmt_size,
            tag,
            self._nchannels,
            self._framerate,
            self._nchannels * self._framerate * itemsize,
            self._nchannels * itemsize,
            itemsize * 8,
        )
        if tag == WAVE_FORMAT_EXTENSIBLE:
            out += struct.pack(
                b"<HHLH14s",
                22,
                # use the full bitdepth
                itemsize * 8,
                # one bit per channel, limited to the 32 bits of the field
                ((1 << self._nchannels) - 1) & 0xFFFFFFFF,
                # sub-format GUID: the format tag followed by a fixed suffix
                etag,
                b"\x00\x00\x00\x00\x10\x00\x80\x00\x00\xaa\x008\x9b\x71",
            )
        # fact chunk; the frame count is filled in by flush()
        self._fact_offset = None
        if write_fact or (
            write_fact is None
            and tag in (WAVE_FORMAT_IEEE_FLOAT, WAVE_FORMAT_EXTENSIBLE)
        ):
            self._fact_offset = len(out) + 8
            out += struct.pack(b"<4sLL", b"fact", 4, 0)
        # beginning of data chunk
        out += struct.pack(b"<4sL", b"data", 0)

        self.fp.seek(0)
        self.fp.write(out)
        self._data_offset = self.fp.tell()
        self._bytes_written = 0
# Module-level alias for the wavfile class. It shadows the builtin ``open``
# inside this module, which is why wavfile.__init__ imports it from builtins.
open = wavfile
def rescale(data: npt.ArrayLike, tgt_dtype: npt.DTypeLike) -> np.ndarray:
    """Rescales data to the correct range for tgt_dtype.

    - data: a numpy array or anything convertible into one.
    - tgt_dtype: the data type of the target container

    Conversions:

    - same dtype: the input array is returned unchanged (no copy)
    - float -> float: plain cast
    - integer -> float: scaled to [-1.0, 1.0); unsigned input is treated as
      offset binary (the mid-point maps to 0.0)
    - float -> integer: scaled by the target's full range, clipped, and
      truncated toward zero; unsigned targets are offset binary
    - integer -> integer: bits are shifted to fill (or fit) the target width
      and the sign bit is flipped when exactly one side is unsigned

    Raises Error if the target type is neither a float nor an integer type.
    """
    # convert to numpy array, retaining best type
    data = np.asarray(data)
    src = data.dtype
    tgt = np.dtype(tgt_dtype)
    if src == tgt:
        return data

    if tgt.kind == "f":
        if src.kind == "f":
            return data.astype(tgt)
        # divide by a float: a Python int of 2**63 cannot be combined with
        # 64-bit integer arrays under NumPy 2 promotion rules
        umax = float(1 << (src.itemsize * 8 - 1))
        out = (data / umax).astype(tgt)
        if src.kind == "u":
            out -= 1.0
        return out

    if tgt.kind not in ("i", "u"):
        raise Error(f"unsupported target type {tgt}")

    tgt_bits = tgt.itemsize * 8
    umax = 1 << (tgt_bits - 1)

    if src.kind == "f":
        scale = float(umax)
        lower = -scale
        upper = float(umax - 1)
        if upper >= umax:
            # umax - 1 is not representable and rounded up; use the largest
            # float below umax so the cast cannot overflow
            upper = float(np.nextafter(scale, 0.0))
        # work in float64 so the clipping bounds are exact for 32-bit targets
        scaled = np.clip(data.astype(np.float64) * scale, lower, upper)
        if tgt.kind == "u":
            # shift into the unsigned range before casting; casting negative
            # floats to unsigned integers is platform-dependent
            scaled = np.trunc(scaled) + scale
        return scaled.astype(tgt)

    # integer -> integer: decide by container width, not by dtype ordering
    shift = (tgt.itemsize - src.itemsize) * 8
    if shift > 0:
        out = data.astype(tgt) << shift
    else:
        out = (data >> -shift).astype(tgt)
    if (src.kind == "u") != (tgt.kind == "u"):
        # converting between signed and offset-binary unsigned is a flip of
        # the most significant bit
        if tgt.kind == "i":
            sign_bit = tgt.type(np.iinfo(tgt).min)
        else:
            sign_bit = tgt.type(umax)
        out ^= sign_bit
    return out
# Variables:
# End: