Skip to content

Commit

Permalink
Unify ca_path and ca_file configuration parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
davisp committed Jul 20, 2023
1 parent 3467385 commit e517a90
Show file tree
Hide file tree
Showing 18 changed files with 909 additions and 58 deletions.
2 changes: 2 additions & 0 deletions scripts/ci/posix/build-services-stop.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
# GCS emulator
if [[ "$TILEDB_CI_BACKEND" == "GCS" ]] && [[ "$GCS_PID" ]]; then
kill -9 "$GCS_PID" || true # failure to stop should not fail job
kill -9 "$GCS_SSL_PID" || true
fi

# Azure emulator
if [[ "$TILEDB_CI_BACKEND" = "AZURE" ]] && [[ "$AZURITE_PID" ]]; then
# Kill the running Azurite server
kill -n 9 "$AZURITE_PID" || true
kill -n 9 "$AZURITE_SSL_PID" || true
fi

if [[ "$TILEDB_CI_BACKEND" == "S3" ]] && [[ "$TILEDB_CI_OS" == "macOS" ]]; then
Expand Down
11 changes: 11 additions & 0 deletions scripts/run-azurite.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
set -xe

# Starts an Azurite server

Expand All @@ -38,11 +39,21 @@ run_azurite() {
export AZURITE_PID=$!
}

run_azurite_ssl_proxy() {
$DIR/run-ssl-proxy.py \
--source-port 10001 \
--target-port 10000 \
--public-certificate /tmp/azurite-data/test_certs/public.crt \
--private-key /tmp/azurite-data/test_certs/private.key &
export AZURITE_SSL_PID=$!
}

run() {
mkdir -p /tmp/azurite-data
cp -f -r $DIR/../test/inputs/test_certs /tmp/azurite-data

run_azurite
run_azurite_ssl_proxy
}

run
18 changes: 15 additions & 3 deletions scripts/run-gcs-emu.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
# This script should be sourced from tiledb/build folder
set -xe

export_gcs_env(){
export CLOUD_STORAGE_EMULATOR_ENDPOINT=http://localhost:9000 # For JSON and XML API
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"

export_gcs_env() {
export TILEDB_TEST_GCS_ENDPOINT=http://localhost:9000 # For JSON and XML API
}

run_gcs(){
run_gcs() {
pushd .
source /tmp/storage-testbench-venv/bin/activate
cd /tmp/storage-testbench
Expand All @@ -43,10 +45,20 @@ run_gcs(){
popd
}

run_gcs_ssl_proxy() {
$DIR/run-ssl-proxy.py \
--source-port 9001 \
--target-port 9000 \
--public-certificate $DIR/../test/inputs/test_certs/public.crt \
--private-key $DIR/../test/inputs/test_certs/private.key &
export GCS_SSL_PID=$!
}

run() {
export_gcs_env

run_gcs
run_gcs_ssl_proxy
}

run
151 changes: 151 additions & 0 deletions scripts/run-ssl-proxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
#!/usr/bin/env python3

import argparse as ap
import os
import queue
import select
import socket
import ssl
import sys
import threading

NUM_THREADS = os.cpu_count()

def log(*args):
print(*args, flush=True)

class Session(object):
def __init__(self, sock, addr, dst_port):
self.src_sock = sock
self.addr = addr
self.dst_port = dst_port
self.dst_sock = None

def close(self):
try:
self.src_sock.close()
except:
pass
try:
self.dst_sock.close()
except:
pass

def forward(self, from_sock, to_sock):
data = from_sock.recv(1024)
if not data:
self.close()
return False
to_sock.sendall(data)
return True

def run(self):
self.dst_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.dst_sock.connect(('127.0.0.1', self.dst_port))
sockets = [self.src_sock, self.dst_sock]
while True:
(rlist, wlist, xlist) = select.select(sockets, [], sockets)
if xlist:
self.close()
return
for r in rlist:
if r == self.src_sock:
if not self.forward(self.src_sock, self.dst_sock):
return
elif r == self.dst_sock:
if not self.forward(self.dst_sock, self.src_sock):
return
else:
self.close()
raise RuntimeError("Unknown socket: {}".format(r))

def handle_clients(conn_queue):
while True:
session = conn_queue.get()
if session is None:
return
try:
session.run()
except Exception as e:
log("Error handling client: {}".format(e))
finally:
log("Client disconnected: {}".format(session.addr))

def run_proxy(cfg):
ssl_cert = cfg.public_certificate
ssl_key = cfg.private_key
if not os.path.isfile(ssl_cert):
log("Missing public certificate: {}".format(ssl_cert))
exit(3)
if not os.path.isfile(ssl_key):
log("Missing private key: {}".format(ssl_key))
exit(3)
ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_ctx.load_cert_chain(ssl_cert, ssl_key)

conn_queue = queue.Queue()

threads = []
for _ in range(NUM_THREADS):
t = threading.Thread(target=handle_clients, args=(conn_queue,))
t.daemon = True
t.start()
threads.append(t)

addr = ("None", 0)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as listener:
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.bind(('127.0.0.1', cfg.source_port))
listener.listen(1024)
msg = "SSL Proxy Server listening at https://127.0.0.1:{}"
log(msg.format(cfg.source_port))
with ssl_ctx.wrap_socket(listener, server_side=True) as ssock:
while True:
try:
conn, addr = ssock.accept()
log("Client connected: {}".format(addr))
conn_queue.put(Session(conn, addr, cfg.target_port))
except Exception as e:
log("Error creationg session for {} : {}".format(addr, e))

def parse_args():
parser = ap.ArgumentParser(
prog = "run-ssl-proxy.py",
description = "A simple SSL Proxy - Not for Production Use"
)
parser.add_argument("-s", "--source-port", type=int,
help = "Source port on which to accept connections")
parser.add_argument("-d", "--target-port", type=int,
help = "Target port to proxy connections to")
parser.add_argument("-c", "--public-certificate",
help = "The server public certificate to use")
parser.add_argument("-k", "--private-key",
help = "The server private key to use")
args = parser.parse_args()

if args.source_port is None:
log("Missing source port")
exit(1)
if args.target_port is None:
log("Missing target port")
exit(1)
if args.public_certificate is None:
log("Missing public certificate")
exit(1)
if args.private_key is None:
log("Missing private key")
exit(1)

return args

def main():
log("SSL Proxy Initializing...")
try:
run_proxy(parse_args())
except KeyboardInterrupt:
pass
finally:
log("SSL Proxy Shutting Down")

if __name__ == "__main__":
main()
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ set(TILEDB_UNIT_TEST_SOURCES
src/unit-s3.cc
src/unit-sparse-global-order-reader.cc
src/unit-sparse-unordered-with-dups-reader.cc
src/unit-ssl-config.cc
src/unit-Subarray.cc
src/unit-SubarrayPartitioner-dense.cc
src/unit-SubarrayPartitioner-error.cc
Expand Down
8 changes: 8 additions & 0 deletions test/src/unit-capi-config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ void check_save_to_file() {
ss << "sm.var_offsets.bitsize 64\n";
ss << "sm.var_offsets.extra_element false\n";
ss << "sm.var_offsets.mode bytes\n";
ss << "ssl.verify true\n";
ss << "vfs.azure.block_list_block_size 5242880\n";
ss << "vfs.azure.max_parallel_ops " << std::thread::hardware_concurrency()
<< "\n";
Expand Down Expand Up @@ -659,12 +660,17 @@ TEST_CASE("C API: Test config iter", "[capi][config]") {
all_param_values["sm.fragment_info.preload_mbrs"] = "true";
all_param_values["sm.partial_tile_offsets_loading"] = "false";

all_param_values["ssl.ca_file"] = "";
all_param_values["ssl.ca_path"] = "";
all_param_values["ssl.verify"] = "true";

all_param_values["vfs.max_batch_size"] = "104857600";
all_param_values["vfs.min_batch_gap"] = "512000";
all_param_values["vfs.min_batch_size"] = "20971520";
all_param_values["vfs.min_parallel_size"] = "10485760";
all_param_values["vfs.read_ahead_size"] = "102400";
all_param_values["vfs.read_ahead_cache_size"] = "10485760";
all_param_values["vfs.gcs.endpoint"] = "";
all_param_values["vfs.gcs.project_id"] = "";
all_param_values["vfs.gcs.max_parallel_ops"] =
std::to_string(std::thread::hardware_concurrency());
Expand Down Expand Up @@ -732,6 +738,7 @@ TEST_CASE("C API: Test config iter", "[capi][config]") {
vfs_param_values["min_parallel_size"] = "10485760";
vfs_param_values["read_ahead_size"] = "102400";
vfs_param_values["read_ahead_cache_size"] = "10485760";
vfs_param_values["gcs.endpoint"] = "";
vfs_param_values["gcs.project_id"] = "";
vfs_param_values["gcs.max_parallel_ops"] =
std::to_string(std::thread::hardware_concurrency());
Expand Down Expand Up @@ -793,6 +800,7 @@ TEST_CASE("C API: Test config iter", "[capi][config]") {
vfs_param_values["hdfs.name_node_uri"] = "";

std::map<std::string, std::string> gcs_param_values;
gcs_param_values["endpoint"] = "";
gcs_param_values["project_id"] = "";
gcs_param_values["max_parallel_ops"] =
std::to_string(std::thread::hardware_concurrency());
Expand Down
2 changes: 1 addition & 1 deletion test/src/unit-cppapi-config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ TEST_CASE("C++ API: Config iterator", "[cppapi][config]") {
names.push_back(it->first);
}
// Check number of VFS params in default config object.
CHECK(names.size() == 62);
CHECK(names.size() == 63);
}

TEST_CASE("C++ API: Config Environment Variables", "[cppapi][config]") {
Expand Down
Loading

0 comments on commit e517a90

Please sign in to comment.