diff --git a/http/get_compressed/README.md b/http/get_compressed/README.md index dde6e20..c8ea47d 100644 --- a/http/get_compressed/README.md +++ b/http/get_compressed/README.md @@ -19,4 +19,168 @@ # HTTP GET Arrow Data: Compression Examples -This directory contains examples of HTTP servers/clients that transmit/receive data in the Arrow IPC streaming format and use compression (in various ways) to reduce the size of the transmitted data. +This directory contains examples of HTTP servers/clients that transmit/receive +data in the Arrow IPC streaming format and use compression (in various ways) to +reduce the size of the transmitted data. + +Since we re-use the [Arrow IPC format][ipc] for transferring Arrow data over +HTTP and both Arrow IPC and HTTP standards support compression on their own, +there are at least two approaches to this problem: + +1. Compressed HTTP responses carrying Arrow IPC streams with uncompressed + array buffers. +2. Uncompressed HTTP responses carrying Arrow IPC streams with compressed + array buffers. + +Applying both IPC buffer and HTTP compression to the same data is not +recommended. The extra CPU overhead of decompressing the data twice is +not worth any possible gains that double compression might bring. If +compression ratios are unambiguously more important than reducing CPU +overhead, then a different compression algorithm that optimizes for that can +be chosen. + +This table shows the support for different compression algorithms in HTTP and +Arrow IPC: + +| Codec | Identifier | HTTP Support | IPC Support | +|----------- | ----------- | ------------- | ------------ | +| GZip | `gzip` | X | | +| DEFLATE | `deflate` | X | | +| Brotli | `br` | X[^2] | | +| Zstandard | `zstd` | X[^2] | X[^3] | +| LZ4 | `lz4` | | X[^3] | + +Since not all Arrow IPC implementations support compression, HTTP compression +based on accepted formats negotiated with the client is a great way to increase +the chances of efficient data transfer. 
+ +Servers may check the `Accept-Encoding` header of the client and choose the +compression format in this order of preference: `zstd`, `br`, `gzip`, +`identity` (no compression). If the client does not specify a preference, the +only constraint on the server is the availability of the compression algorithm +in the server environment. + +## Arrow IPC Compression + +When IPC buffer compression is preferred and servers can't assume all clients +support it[^4], clients may be asked to explicitly list the supported compression +algorithms in the request headers. The `Accept` header can be used for this +since `Accept-Encoding` (and `Content-Encoding`) is used to control compression +of the entire HTTP response stream and instruct HTTP clients (like browsers) to +decompress the response before giving data to the application or saving the +data. + + Accept: application/vnd.apache.arrow.stream; codecs="zstd, lz4" + +This is similar to clients requesting video streams by specifying the +container format and the codecs they support +(e.g. `Accept: video/webm; codecs="vp8, vorbis"`). + +The server is allowed to choose any of the listed codecs, or not compress the +IPC buffers at all. Uncompressed IPC buffers should always be acceptable by +clients. + +If a server adopts this approach and a client does not specify any codecs in +the `Accept` header, the server can fall back to checking `Accept-Encoding` +header to pick a compression algorithm for the entire HTTP response stream. + +To make debugging easier servers may include the chosen compression codec(s) +in the `Content-Type` header of the response (quotes are optional): + + Content-Type: application/vnd.apache.arrow.stream; codecs=zstd + +This is not necessary for correct decompression because the payload already +contains information that tells the IPC reader how to decompress the buffers, +but it can help developers understand what is going on. 
+ +When programmatically checking if the `Content-Type` header contains a specific +format, it is important to use a parser that can handle parameters or look +only at the media type part of the header. This is not an exclusivity of the +Arrow IPC format, but a general rule for all media types. For example, +`application/json; charset=utf-8` should match `application/json`. + +When considering use of IPC buffer compression, check the [IPC format section of +the Arrow Implementation Status page][^5] to see whether the Arrow +implementations you are targeting support it. + +## HTTP/1.1 Response Compression + +HTTP/1.1 offers an elaborate way for clients to specify their preferred +content encoding (read compression algorithm) using the `Accept-Encoding` +header.[^1] + +At least the Python server (in [`python/`](./python)) implements a fully +compliant parser for the `Accept-Encoding` header. Application servers may +choose to implement a simpler check of the `Accept-Encoding` header or assume +that the client accepts the chosen compression scheme when talking to that +server. + +Here is an example of a header that a client may send and what it means: + + Accept-Encoding: zstd;q=1.0, gzip;q=0.5, br;q=0.8, identity;q=0 + +This header says that the client prefers that the server compress the +response with `zstd`, but if that is not possible, then `brotli` and `gzip` +are acceptable (in that order because 0.8 is greater than 0.5). The client +does not want the response to be uncompressed. This is communicated by +`"identity"` being listed with `q=0`. + +To tell the server the client only accepts `zstd` responses and nothing +else, not even uncompressed responses, the client would send: + + Accept-Encoding: zstd, *;q=0 + +RFC 2616[^1] specifies the rules for how a server should interpret the +`Accept-Encoding` header: + + A server tests whether a content-coding is acceptable, according to + an Accept-Encoding field, using these rules: + + 1. 
If the content-coding is one of the content-codings listed in + the Accept-Encoding field, then it is acceptable, unless it is + accompanied by a qvalue of 0. (As defined in section 3.9, a + qvalue of 0 means "not acceptable.") + + 2. The special "*" symbol in an Accept-Encoding field matches any + available content-coding not explicitly listed in the header + field. + + 3. If multiple content-codings are acceptable, then the acceptable + content-coding with the highest non-zero qvalue is preferred. + + 4. The "identity" content-coding is always acceptable, unless + specifically refused because the Accept-Encoding field includes + "identity;q=0", or because the field includes "*;q=0" and does + not explicitly include the "identity" content-coding. If the + Accept-Encoding field-value is empty, then only the "identity" + encoding is acceptable. + +If you're targeting web browsers, check the compatibility table of [compression +algorithms on MDN Web Docs][^2]. + +Another important rule is that if the server compresses the response, it +must include a `Content-Encoding` header in the response. + + If the content-coding of an entity is not "identity", then the + response MUST include a Content-Encoding entity-header (section + 14.11) that lists the non-identity content-coding(s) used. + +Since not all servers implement the full `Accept-Encoding` header parsing logic, +clients tend to stick to simple header values like `Accept-Encoding: identity` +when no compression is desired, and `Accept-Encoding: gzip, deflate, zstd, br` +when the client supports different compression formats and is indifferent to +which one the server chooses. Clients should expect uncompressed responses as +well in these cases. The only way to force a "406 Not Acceptable" response when +no compression is available is to send `identity;q=0` or `*;q=0` somewhere at +the end of the `Accept-Encoding` header. But that relies on the server +implementing the full `Accept-Encoding` handling logic. 
+ + +[^1]: [Fielding, R. et al. (1999). HTTP/1.1. RFC 2616, Section 14.3 Accept-Encoding.](https://www.rfc-editor.org/rfc/rfc2616#section-14.3) +[^2]: [MDN Web Docs: Accept-Encoding](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding#browser_compatibility) +[^3]: [Arrow Columnar Format: Compression](https://arrow.apache.org/docs/format/Columnar.html#compression) +[^4]: Web applications using the JavaScript Arrow implementation don't have + access to the compression APIs to decompress `zstd` and `lz4` IPC buffers. +[^5]: [Arrow Implementation Status: IPC Format](https://arrow.apache.org/docs/status.html#ipc-format) + +[ipc]: https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc diff --git a/http/get_compressed/curl/client/README.md b/http/get_compressed/curl/client/README.md new file mode 100644 index 0000000..6694ce0 --- /dev/null +++ b/http/get_compressed/curl/client/README.md @@ -0,0 +1,80 @@ + + +# HTTP GET Arrow Data: Compressed Arrow Data Examples + +This directory contains a simple `curl` script that issues multiple HTTP GET +requests to the server implemented in the parent directory, negotiating +different compression algorithms for the Arrow IPC stream data piping the output +to different files with extensions that indicate the compression algorithm used. + +To run this example, first start one of the server examples in the parent +directory, then run the `client.sh` script. 
+ +You can check all the sizes with a simple command: + +```bash +$ du -sh out* | sort -gr +816M out.arrows +804M out_from_chunked.arrows +418M out_from_chunked.arrows+lz4 +405M out.arrows+lz4 +257M out.arrows.gz +256M out_from_chunked.arrows.gz +229M out_from_chunked.arrows+zstd +229M out.arrows+zstd +220M out.arrows.zstd +219M out_from_chunked.arrows.zstd + 39M out_from_chunked.arrows.br + 38M out.arrows.br +``` + +> [!WARNING] +> Better compression is not the only relevant metric as it might come with a +> trade-off in terms of CPU usage. The best compression algorithm for your use +> case will depend on your specific requirements. + +## Meaning of the file extensions + +Files produced by HTTP/1.0 requests are not chunked, they get buffered in memory +at the server before being sent to the client. If compressed, they end up +slightly smaller than the results of chunked responses, but the extra delay for +first byte is not worth it in most cases. + + - `out.arrows` (Uncompressed) + - `out.arrows.gz` (Gzip HTTP compression) + - `out.arrows.zstd` (Zstandard HTTP compression) + - `out.arrows.br` (Brotli HTTP compression) + + - `out.arrows+zstd` (Zstandard IPC compression) + - `out.arrows+lz4` (LZ4 IPC compression) + +HTTP/1.1 requests are returned by the server with `Transfer-Encoding: chunked` +to send the data in smaller chunks that are sent to the socket as soon as they +are ready. This is useful for large responses that take a long time to generate +at the cost of a small overhead caused by the independent compression of each +chunk. 
+ + - `out_from_chunked.arrows` (Uncompressed) + - `out_from_chunked.arrows.gz` (Gzip HTTP compression) + - `out_from_chunked.arrows.zstd` (Zstandard HTTP compression) + - `out_from_chunked.arrows.br` (Brotli HTTP compression) + + - `out_from_chunked.arrows+lz4` (LZ4 IPC compression) + - `out_from_chunked.arrows+zstd` (Zstandard IPC compression) diff --git a/http/get_compressed/curl/client/client.sh b/http/get_compressed/curl/client/client.sh new file mode 100755 index 0000000..1706953 --- /dev/null +++ b/http/get_compressed/curl/client/client.sh @@ -0,0 +1,46 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +CURL="curl --verbose" +URI="http://localhost:8008" +OUT_HTTP1=out.arrows +OUT_CHUNKED=out_from_chunked.arrows + +# HTTP/1.0 means that response is not chunked and not compressed... +$CURL --http1.0 -o $OUT_HTTP1 $URI +# ...but it may be compressed with an explicitly set Accept-Encoding +# header +$CURL --http1.0 -H "Accept-Encoding: gzip, *;q=0" -o $OUT_HTTP1.gz $URI +$CURL --http1.0 -H "Accept-Encoding: zstd, *;q=0" -o $OUT_HTTP1.zstd $URI +$CURL --http1.0 -H "Accept-Encoding: br, *;q=0" -o $OUT_HTTP1.br $URI +# ...or with IPC buffer compression if the Accept header specifies codecs. 
+$CURL --http1.0 -H "Accept: application/vnd.apache.arrow.stream; codecs=\"zstd, lz4\"" -o $OUT_HTTP1+zstd $URI +$CURL --http1.0 -H "Accept: application/vnd.apache.arrow.stream; codecs=lz4" -o $OUT_HTTP1+lz4 $URI + +# HTTP/1.1 means compression is on by default... +# ...but it can be refused with the Accept-Encoding: identity header. +$CURL -H "Accept-Encoding: identity" -o $OUT_CHUNKED $URI +# ...with gzip if no Accept-Encoding header is set. +$CURL -o $OUT_CHUNKED.gz $URI +# ...or with the compression algorithm specified in the Accept-Encoding. +$CURL -H "Accept-Encoding: zstd, *;q=0" -o $OUT_CHUNKED.zstd $URI +$CURL -H "Accept-Encoding: br, *;q=0" -o $OUT_CHUNKED.br $URI +# ...or with IPC buffer compression if the Accept header specifies codecs. +$CURL -H "Accept: application/vnd.apache.arrow.stream; codecs=\"zstd, lz4\"" -o $OUT_CHUNKED+zstd $URI +$CURL -H "Accept: application/vnd.apache.arrow.stream; codecs=lz4" -o $OUT_CHUNKED+lz4 $URI diff --git a/http/get_compressed/python/client/README.md b/http/get_compressed/python/client/README.md new file mode 100644 index 0000000..1285a74 --- /dev/null +++ b/http/get_compressed/python/client/README.md @@ -0,0 +1,32 @@ + + +# HTTP GET Arrow Data: Compressed Arrow Data Examples + +This directory contains an HTTP client implemented in Python that issues multiple +requests to one of the server examples implemented in the parent directory, +negotiating different compression algorithms for the Arrow IPC stream data. + +To run this example, first start one of the compressed server examples in the +parent directory, then: + +```sh +pip install pyarrow +python client.py +``` diff --git a/http/get_compressed/python/client/client.py b/http/get_compressed/python/client/client.py new file mode 100644 index 0000000..d09aeb0 --- /dev/null +++ b/http/get_compressed/python/client/client.py @@ -0,0 +1,96 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import urllib.request +import pyarrow as pa +import time + +URI = "http://localhost:8008" +ARROW_STREAM_FORMAT = "application/vnd.apache.arrow.stream" + + +def make_request(uri, compression): + coding = "identity" if compression.startswith("identity") else compression + # urllib.request.urlopen() always sends an HTTP/1.1 request + # with Accept-Encoding: identity, so we need to setup a request + # object with custom headers to request a specific compression + headers = { + "Accept-Encoding": f"{coding}, *;q=0", + } + if compression.startswith("identity+"): + # request IPC buffer compression instead of HTTP compression + ipc_codec = compression.split("+")[1] + headers["Accept"] = f'{ARROW_STREAM_FORMAT};codecs="{ipc_codec}"' + request = urllib.request.Request(uri, headers=headers) + + response = urllib.request.urlopen(request) + content_type = response.headers["Content-Type"] + if not content_type.startswith(ARROW_STREAM_FORMAT): + raise ValueError(f"Expected {ARROW_STREAM_FORMAT}, got {content_type}") + if compression.startswith("identity"): + return response + # IANA nomenclature for Brotli is "br" and not "brotli" + compression = "brotli" if compression == "br" else compression + return pa.CompressedInputStream(response, compression) + + +def request_and_process(uri, compression): + 
batches = [] + log_prefix = f"{'[' + compression + ']':>10}:" + print( + f"{log_prefix} Requesting data from {uri} with `{compression}` compression strategy." + ) + start_time = time.time() + response = make_request(uri, compression) + with pa.ipc.open_stream(response) as reader: + schema = reader.schema + time_to_schema = time.time() - start_time + try: + batch = reader.read_next_batch() + time_to_first_batch = time.time() - start_time + batches.append(batch) + while True: + batch = reader.read_next_batch() + batches.append(batch) + except StopIteration: + pass + processing_time = time.time() - start_time + reader_stats = reader.stats + print( + f"{log_prefix} Schema received in {time_to_schema:.3f} seconds." + f" schema=({', '.join(schema.names)})." + ) + print( + f"{log_prefix} First batch received and processed in" + f" {time_to_first_batch:.3f} seconds" + ) + print( + f"{log_prefix} Processing of all batches completed in" + f" {processing_time:.3f} seconds." + ) + print(f"{log_prefix}", reader_stats) + return batches + + +# HTTP compression +request_and_process(URI, "identity") +request_and_process(URI, "zstd") +request_and_process(URI, "br") +request_and_process(URI, "gzip") +# using IPC buffer compression instead of HTTP compression +request_and_process(URI, "identity+zstd") +request_and_process(URI, "identity+lz4") diff --git a/http/get_compressed/python/server/README.md b/http/get_compressed/python/server/README.md new file mode 100644 index 0000000..cf4bed7 --- /dev/null +++ b/http/get_compressed/python/server/README.md @@ -0,0 +1,32 @@ + + + +# HTTP GET Arrow Data: Compressed Arrow Data Examples + +This directory contains an example of an HTTP server implemented in Python +able to serve Arrow IPC streams compressed with different algorithms negotiated +with the client via different standard HTTP headers. 
+ +To run this example: + +```sh +pip install pyarrow +python server.py +``` diff --git a/http/get_compressed/python/server/server.py b/http/get_compressed/python/server/server.py new file mode 100644 index 0000000..c82c551 --- /dev/null +++ b/http/get_compressed/python/server/server.py @@ -0,0 +1,564 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from random import choice, randint +from http.server import BaseHTTPRequestHandler, HTTPServer +import io +import pyarrow as pa +import pyarrow.compute as pc +import re +import socketserver +import string + +# use dictionary encoding for the ticker column +USE_DICTIONARY_ENCODING = True + + +def random_string(alphabet, length): + return "".join(choice(alphabet) for _ in range(length)) + + +def random_name(initial): + length = randint(3, 7) + return initial + random_string(string.ascii_lowercase, length) + + +def example_tickers(num_tickers): + tickers = [] + while len(tickers) < num_tickers: + length = randint(3, 4) + random_ticker = random_string(string.ascii_uppercase, length) + if random_ticker not in tickers: + tickers.append(random_ticker) + return tickers + + +the_ticker_type = ( + pa.dictionary(pa.int32(), pa.utf8()) if USE_DICTIONARY_ENCODING else pa.utf8() +) +the_schema = pa.schema( + [ + ("ticker", the_ticker_type), + ("price", pa.int64()), + ("volume", pa.int64()), + ] +) + + +def example_batch(tickers, length): + ticker_indices = [] + price = [] + volume = [] + for _ in range(length): + ticker_indices.append(randint(0, len(tickers) - 1)) + price.append(randint(1, 1000) * 100) + volume.append(randint(1, 10000)) + ticker = ( + pa.DictionaryArray.from_arrays(ticker_indices, tickers) + if USE_DICTIONARY_ENCODING + else pc.take(tickers, ticker_indices, boundscheck=False) + ) + return pa.RecordBatch.from_arrays([ticker, price, volume], schema=the_schema) + + +def example_batches(tickers): + # these parameters are chosen to generate a response + # of ~1 GB and chunks of ~140 KB (uncompressed) + total_records = 42_000_000 + batch_len = 6 * 1024 + # all the batches sent are random slices of the larger base batch + base_batch = example_batch(tickers, length=8 * batch_len) + batches = [] + records = 0 + while records < total_records: + length = min(batch_len, total_records - records) + offset = randint(0, base_batch.num_rows - length - 1) + batch = 
base_batch.slice(offset, length) + batches.append(batch) + records += length + return batches + + +# end of example data generation + +# what the HTTP spec calls a token (any character except CTLs or separators) +TOKEN_RE = r"(?:[A-Za-z0-9!#$%&'*+./^_`|~-]+)" +# [L]inear [W]hite [S]pace pattern (HTTP/1.1 - RFC 2616) +LWS_RE = r"(?:[ \t]|\r\n[ \t]+)*" +TOKENIZER_PAT = re.compile( + f"(?P{TOKEN_RE})" + r'|(?P"([^"\\]|\\.)*")' # a quoted string (escaped pairs allowed) + r"|(?P,)" + r"|(?P;)" + r"|(?P=)" + f"|(?P{LWS_RE})" # LWS is skipped + r"|(?P.+)", + flags=re.ASCII, # HTTP headers are encoded in ASCII +) + + +def parse_header_value(header_name, header_value): + """ + Parse the Accept or Accept-Encoding request header values. + + Returns + ------- + list of (str, dict) + The list of lowercase tokens and their parameters in the order they + appear in the header. The parameters are stored in a dictionary where + the keys are the parameter names and the values are the parameter + values. If a parameter is not followed by an equal sign and a value, + the value is None. 
+ """ + + def unexpected(label, value): + msg = f"Malformed {header_name} header: unexpected {label} at {value!r}" + return ValueError(msg) + + def tokenize(): + for mo in re.finditer(TOKENIZER_PAT, header_value): + kind = mo.lastgroup + if kind == "SKIP": + continue + elif kind == "MISMATCH": + raise unexpected("character", mo.group()) + yield (kind, mo.group()) + + tokens = tokenize() + + def expect(expected_kind): + kind, text = next(tokens) + if kind != expected_kind: + raise unexpected("token", text) + return text + + accepted = [] + while True: + try: + name, params = None, {} + name = expect("TOK").lower() + kind, text = next(tokens) + while True: + if kind == "COMMA": + accepted.append((name, params)) + break + if kind == "SEMI": + ident = expect("TOK") + params[ident] = None # init param to None + kind, text = next(tokens) + if kind != "EQ": + continue + kind, text = next(tokens) + if kind in ["TOK", "QUOTED"]: + if kind == "QUOTED": + text = text[1:-1] # remove the quotes + params[ident] = text # set param to value + kind, text = next(tokens) + continue + raise unexpected("token", text) + except StopIteration: + break + if name is not None: + # any unfinished ;param=value sequence or trailing separators are ignored + accepted.append((name, params)) + return accepted + + +ARROW_STREAM_FORMAT = "application/vnd.apache.arrow.stream" + + +def pick_ipc_codec(accept_header, available, default): + """ + Pick the IPC stream codec according to the Accept header. + + This is used when deciding which codec to use for compression of IPC buffer + streams. This is a feature of the Arrow IPC stream format and is different + from the HTTP content-coding used to compress the entire HTTP response. + + This is how a client may specify the IPC buffer compression codecs it + accepts: + + Accept: application/vnd.apache.arrow.stream; codecs="zstd, lz4" + + Parameters + ---------- + accept_header : str|None + The value of the Accept header from an HTTP request. 
+ available : list of str + The codecs that the server can provide in the order preferred by the + server. Example: ["zstd", "lz4"]. + default : str|None + The codec to use if the client does not specify the ";codecs" parameter + in the Accept header. + + Returns + ------- + str|None + The codec that the server should use to compress the IPC buffer stream. + None if the client does not accept any of the available codecs + explicitly listed. ;codecs="" means no codecs are accepted. + If the client does not specify the codecs parameter, the default codec + is returned. + """ + did_specify_codecs = False + accepted_codecs = [] + if accept_header is not None: + accepted = parse_header_value("Accept", accept_header) + for media_range, params in accepted: + if ( + media_range == "*/*" + or media_range == "application/*" + or media_range == ARROW_STREAM_FORMAT + ): + did_specify_codecs = "codecs" in params + codecs_str = params.get("codecs") + if codecs_str is None: + continue + for codec in codecs_str.split(","): + accepted_codecs.append(codec.strip()) + + for codec in available: + if codec in accepted_codecs: + return codec + return None if did_specify_codecs else default + + +def pick_coding(accept_encoding_header, available): + """ + Pick the content-coding according to the Accept-Encoding header. + + This is used when using HTTP response compression instead of IPC buffer + compression. + + Parameters + ---------- + accept_encoding_header : str + The value of the Accept-Encoding header from an HTTP request. + available : list of str + The content-codings that the server can provide in the order preferred + by the server. Example: ["zstd", "br", "gzip"]. + + Returns + ------- + str + The content-coding that the server should use to compress the response. + "identity" is returned if no acceptable content-coding is found in the + list of available codings. 
+ + None if the client does not accept any of the available content-codings + and doesn't accept "identity" (uncompressed) either. In this case, + a "406 Not Acceptable" response should be sent. + """ + accepted = parse_header_value("Accept-Encoding", accept_encoding_header) + + def qvalue_or(params, default): + qvalue = params.get("q") + if qvalue is not None: + try: + return float(qvalue) + except ValueError: + raise ValueError(f"Invalid qvalue in Accept-Encoding header: {qvalue}") + return default + + if "identity" not in available: + available = available + ["identity"] + state = {} + for coding, params in accepted: + qvalue = qvalue_or(params, 0.0001 if coding == "identity" else 1.0) + if coding == "*": + for coding in available: + if coding not in state: + state[coding] = qvalue + elif coding in available: + state[coding] = qvalue + # "identity" is always acceptable unless explicitly refused (;q=0) + if "identity" not in state: + state["identity"] = 0.0001 + # all the candidate codings are now in the state dictionary and we + # have to consider only the ones that have the maximum qvalue + max_qvalue = max(state.values()) + if max_qvalue == 0.0: + return None + for coding in available: + if coding in state and state[coding] == max_qvalue: + return coding + return None + + +def pick_compression(headers, available_ipc_codecs, available_codings, default): + """ + Pick the compression strategy based on the Accept and Accept-Encoding headers. + + Parameters + ---------- + headers : dict + The HTTP request headers. + available_ipc_codecs : list of str + The codecs that the server can provide for IPC buffer compression. + available_codings : list of str + The content-codings that the server can provide for HTTP response + compression. + default : str + The default compression strategy to use if the client does not explicitly + choose. + + Returns + ------- + str|None + The compression strategy to use. It can be one of the following: + "identity": no compression at all. 
+ "identity+zstd": No HTTP compression + IPC buffer compression with Zstd. + "identity+lz4": No HTTP compression + IPC buffer compression with LZ4. + "zstd", "br", "gzip", ...: HTTP compression without IPC buffer compression. + None means a "406 Not Acceptable" response should be sent. + """ + accept = headers.get("Accept") + ipc_codec = pick_ipc_codec(accept, available_ipc_codecs, default=None) + if ipc_codec is None: + accept_encoding = headers.get("Accept-Encoding") + return ( + default + if accept_encoding is None + else pick_coding(accept_encoding, available_codings) + ) + return "identity+" + ipc_codec + + +class LateClosingBytesIO(io.BytesIO): + """ + BytesIO that does not close on close(). + + When a stream wrapping a a file-like object is closed, the underlying + file-like object is also closed. This subclass prevents that from + happening by overriding the close method. + + If we close a RecordBatchStreamWriter wrapping a BytesIO object, we want + to be able to create a memory view of the buffer. But that is only possible + if the BytesIO object is not closed yet. + """ + + def close(self): + pass + + def close_now(self): + super().close() + + +class SocketWriterSink(socketserver._SocketWriter): + """Wrapper to make wfile usable as a sink for Arrow stream writing.""" + + def __init__(self, wfile): + self.wfile = wfile + + def writable(self): + return True + + def write(self, b): + self.wfile.write(b) + + def fileno(self): + return self._sock.fileno() + + def close(self): + """Do nothing so Arrow stream wrappers don't close the socket.""" + pass + + +def generate_chunk_buffers(schema, source, compression): + # the sink holds the buffer and we give a view of it to the caller + with LateClosingBytesIO() as sink: + # keep buffering until we have at least MIN_BUFFER_SIZE bytes + # in the buffer before yielding it to the caller. Setting it + # to 1 means we yield as soon as the compression blocks are + # formed and reach the sink buffer. 
+ MIN_BUFFER_SIZE = 64 * 1024 + if compression.startswith("identity"): + if compression == "identity+zstd": + options = pa.ipc.IpcWriteOptions(compression="zstd") + elif compression == "identity+lz4": + options = pa.ipc.IpcWriteOptions(compression="lz4") + else: + options = None + # source: RecordBatchReader + # |> writer: RecordBatchStreamWriter + # |> sink: LateClosingBytesIO + writer = pa.ipc.new_stream(sink, schema, options=options) + for batch in source: + writer.write_batch(batch) + if sink.tell() >= MIN_BUFFER_SIZE: + sink.truncate() + with sink.getbuffer() as buffer: + yield buffer + sink.seek(0) + + writer.close() # write EOS marker and flush + else: + compression = "brotli" if compression == "br" else compression + with pa.CompressedOutputStream(sink, compression) as compressor: + # has the first buffer been yielded already? + sent_first = False + # source: RecordBatchReader + # |> writer: RecordBatchStreamWriter + # |> compressor: CompressedOutputStream + # |> sink: LateClosingBytesIO + writer = pa.ipc.new_stream(compressor, schema) + for batch in source: + writer.write_batch(batch) + # we try to yield a buffer ASAP no matter how small + if not sent_first and sink.tell() == 0: + compressor.flush() + pos = sink.tell() + if pos >= MIN_BUFFER_SIZE or (not sent_first and pos >= 1): + sink.truncate() + with sink.getbuffer() as buffer: + yield buffer + sink.seek(0) + sent_first = True + + writer.close() # write EOS marker and flush + compressor.close() + + sink.truncate() + with sink.getbuffer() as buffer: + yield buffer + sink.close_now() + + +AVAILABLE_IPC_CODECS = ["zstd", "lz4"] +"""List of available codecs Arrow IPC buffer compression.""" + +AVAILABLE_CODINGS = ["zstd", "br", "gzip"] +""" +List of available content-codings as used in HTTP. + +Note that Arrow stream classes refer to Brotli as "brotli" and not "br". +""" + + +class MyRequestHandler(BaseHTTPRequestHandler): + """ + Response handler for a simple HTTP server. 
+
+ This HTTP request handler serves a compressed HTTP response with an Arrow
+ stream in it or a (TODO) compressed Arrow stream in an uncompressed HTTP
+ response.
+
+ The Arrow data is randomly generated "trading data" with a schema consisting
+ of a ticker, price (in cents), and volume.
+ """
+
+ def _resolve_batches(self):
+ return pa.RecordBatchReader.from_batches(the_schema, all_batches)
+
+ def _send_not_acceptable(self, parsing_error=None):
+ self.send_response(406, "Not Acceptable")
+ self.send_header("Content-Type", "text/plain")
+ self.end_headers()
+ if parsing_error:
+ message = f"Error parsing header: {parsing_error}\n"
+ else:
+ message = "None of the available codings are accepted by this client.\n"
+ accept = self.headers.get("Accept")
+ if accept is not None:
+ message += f"`Accept` header was {accept!r}.\n"
+ accept_encoding = self.headers.get("Accept-Encoding")
+ if accept_encoding is not None:
+ message += f"`Accept-Encoding` header was {accept_encoding!r}.\n"
+ self.wfile.write(bytes(message, "utf-8"))
+
+ def do_GET(self):
+ # HTTP/1.0 requests don't get chunked responses
+ if self.request_version == "HTTP/1.0":
+ self.protocol_version = "HTTP/1.0"
+ chunked = False
+ else:
+ self.protocol_version = "HTTP/1.1"
+ chunked = True
+
+ # if client's intent cannot be derived from the headers, return
+ # uncompressed data for HTTP/1.0 requests and compressed data for
+ # HTTP/1.1 requests with the safest compression format choice: "gzip". 
+ default_compression = ( + "identity" + if self.request_version == "HTTP/1.0" or ("gzip" not in AVAILABLE_CODINGS) + else "gzip" + ) + try: + compression = pick_compression( + self.headers, + AVAILABLE_IPC_CODECS, + AVAILABLE_CODINGS, + default_compression, + ) + if compression is None: + self._send_not_acceptable() + return + except ValueError as e: + self._send_not_acceptable(str(e)) + return + + ### in a real application the data would be resolved from a database or + ### another source like a file and error handling would be done here + ### before the 200 OK response starts being sent to the client. + source = self._resolve_batches() + + self.send_response(200) + ### set these headers if testing with a local browser-based client: + # self.send_header('Access-Control-Allow-Origin', 'http://localhost:8008') + # self.send_header('Access-Control-Allow-Methods', 'GET') + # self.send_header('Access-Control-Allow-Headers', 'Content-Type') + self.send_header( + "Content-Type", + ( + f"{ARROW_STREAM_FORMAT}; codecs={compression[9:]}" + if compression.startswith("identity+") + else ARROW_STREAM_FORMAT + ), + ) + # suggest a default filename in case this response is saved by the user + self.send_header("Content-Disposition", r'attachment; filename="output.arrows"') + + if not compression.startswith("identity"): + self.send_header("Content-Encoding", compression) + if chunked: + self.send_header("Transfer-Encoding", "chunked") + self.end_headers() + for buffer in generate_chunk_buffers(the_schema, source, compression): + self.wfile.write(f"{len(buffer):X}\r\n".encode("utf-8")) + self.wfile.write(buffer) + self.wfile.write("\r\n".encode("utf-8")) + self.wfile.write("0\r\n\r\n".encode("utf-8")) + else: + self.end_headers() + sink = SocketWriterSink(self.wfile) + for buffer in generate_chunk_buffers(the_schema, source, compression): + sink.write(buffer) + + +print("Generating example data...") + +all_tickers = example_tickers(60) +all_batches = example_batches(all_tickers) + 
+server_address = ("localhost", 8008) +try: + httpd = HTTPServer(server_address, MyRequestHandler) + print(f"Serving on {server_address[0]}:{server_address[1]}...") + httpd.serve_forever() +except KeyboardInterrupt: + print("Shutting down server") + httpd.socket.close() diff --git a/http/get_simple/python/client/urllib.request/client.py b/http/get_simple/python/client/urllib.request/client.py index 1d5d198..a2f24de 100644 --- a/http/get_simple/python/client/urllib.request/client.py +++ b/http/get_simple/python/client/urllib.request/client.py @@ -25,7 +25,7 @@ response = urllib.request.urlopen('http://localhost:8008') content_type = response.headers['Content-Type'] -if content_type != ARROW_STREAM_FORMAT: +if not content_type.startswith(ARROW_STREAM_FORMAT): raise ValueError(f"Expected {ARROW_STREAM_FORMAT}, got {content_type}") batches = []