[BLD] Debug log timeout #4357

Closed
wants to merge 44 commits into main from hammad/debug_log_timeout
44 commits
8e4a75e
[BLD] Debug log timeout
HammadB Apr 23, 2025
f6d0502
fix test
HammadB Apr 23, 2025
5ef61b3
fix test
HammadB Apr 23, 2025
70bd45f
artifact
HammadB Apr 23, 2025
d2df452
urgh
HammadB Apr 23, 2025
c85092c
n
HammadB Apr 23, 2025
2669de9
to
HammadB Apr 23, 2025
a628202
more debug
HammadB Apr 23, 2025
5ed08c2
f
HammadB Apr 23, 2025
d3194c6
debug
HammadB Apr 23, 2025
45c5398
debug
HammadB Apr 24, 2025
cb60d38
debug
HammadB Apr 24, 2025
c0e6518
timout debug
HammadB Apr 24, 2025
ee329ea
i/o
HammadB Apr 24, 2025
51eaa4b
wt
HammadB Apr 24, 2025
40ab431
pop
HammadB Apr 24, 2025
6d750e8
log timeout
HammadB Apr 24, 2025
fee72ff
this seems to untrigger the race
HammadB Apr 24, 2025
05eb9f9
debug
HammadB Apr 24, 2025
0dd0f7d
debug
HammadB Apr 24, 2025
4c909cd
pop
HammadB Apr 24, 2025
3ef76d1
poop
HammadB Apr 25, 2025
0488b73
garb
HammadB Apr 25, 2025
3f53b37
trigger it ???
HammadB Apr 25, 2025
a661d76
uganda
HammadB Apr 25, 2025
d567c3c
remove some debug. see if triggers
HammadB Apr 25, 2025
8353d73
...
HammadB Apr 25, 2025
3bc0bda
...
HammadB Apr 25, 2025
626ef5d
:wock
HammadB Apr 25, 2025
7f84f17
to
HammadB Apr 27, 2025
c8766e3
repro
HammadB Apr 27, 2025
8258290
lower block size to trigger bug
HammadB Apr 29, 2025
d96cab9
debug
HammadB Apr 29, 2025
1d89837
ha
HammadB Apr 29, 2025
2e3cb4f
track
HammadB Apr 29, 2025
e426246
debug
HammadB Apr 30, 2025
5cdf315
Merge branch 'main' into hammad/debug_log_timeout
HammadB Apr 30, 2025
2b99891
disable spann
HammadB Apr 30, 2025
e440d05
repro
HammadB Apr 30, 2025
806d08b
repri count
HammadB Apr 30, 2025
71cee5d
are we over notifiying?
HammadB Apr 30, 2025
bc2f6d6
remove
HammadB Apr 30, 2025
53c4094
sender overwrite
HammadB Apr 30, 2025
4cfb4f2
show this fixes
HammadB Apr 30, 2025
442 changes: 217 additions & 225 deletions .github/workflows/_python-tests.yml

Large diffs are not rendered by default.

152 changes: 76 additions & 76 deletions .github/workflows/pr.yml
@@ -119,44 +119,44 @@ jobs:
VERCEL_ORG_ID: ${{ secrets.VERCEL_ORG_ID }}
VERCEL_PROJECT_ID: ${{ secrets.VERCEL_DOCS_PROJECT_ID }}

check-helm-version-bump:
name: Warn if Helm chart was updated without version bump
needs: change-detection
if: needs.change-detection.outputs.helm-changes == 'true'
runs-on: blacksmith-4vcpu-ubuntu-2204
permissions:
pull-requests: write
steps:
- uses: actions/checkout@v4
- name: Comment warning
if: needs.change-detection.outputs.helm-version-changed == 'false'
uses: marocchino/sticky-pull-request-comment@v2
with:
header: helm-chart-version-info
message: |
:warning: The Helm chart was updated without a version bump. Your changes will only be published if the version field in `k8s/distributed-chroma/Chart.yaml` is updated.
# check-helm-version-bump:
# name: Warn if Helm chart was updated without version bump
# needs: change-detection
# if: needs.change-detection.outputs.helm-changes == 'true'
# runs-on: blacksmith-4vcpu-ubuntu-2204
# permissions:
# pull-requests: write
# steps:
# - uses: actions/checkout@v4
# - name: Comment warning
# if: needs.change-detection.outputs.helm-version-changed == 'false'
# uses: marocchino/sticky-pull-request-comment@v2
# with:
# header: helm-chart-version-info
# message: |
# :warning: The Helm chart was updated without a version bump. Your changes will only be published if the version field in `k8s/distributed-chroma/Chart.yaml` is updated.

- name: Comment success
if: needs.change-detection.outputs.helm-version-changed == 'true'
uses: marocchino/sticky-pull-request-comment@v2
with:
header: helm-chart-version-info
message: |
:white_check_mark: The Helm chart's version was changed. Your changes to the chart will be published upon merge to `main`.
# - name: Comment success
# if: needs.change-detection.outputs.helm-version-changed == 'true'
# uses: marocchino/sticky-pull-request-comment@v2
# with:
# header: helm-chart-version-info
# message: |
# :white_check_mark: The Helm chart's version was changed. Your changes to the chart will be published upon merge to `main`.

delete-helm-comment:
name: Delete Helm chart comment if not changed
needs: change-detection
if: needs.change-detection.outputs.helm-changes == 'false'
runs-on: blacksmith-4vcpu-ubuntu-2204
permissions:
pull-requests: write
steps:
- name: Delete comment (Helm chart was not changed)
uses: marocchino/sticky-pull-request-comment@v2
with:
header: helm-chart-version-info
delete: true
# delete-helm-comment:
# name: Delete Helm chart comment if not changed
# needs: change-detection
# if: needs.change-detection.outputs.helm-changes == 'false'
# runs-on: blacksmith-4vcpu-ubuntu-2204
# permissions:
# pull-requests: write
# steps:
# - name: Delete comment (Helm chart was not changed)
# uses: marocchino/sticky-pull-request-comment@v2
# with:
# header: helm-chart-version-info
# delete: true

python-tests:
name: Python tests
@@ -167,30 +167,30 @@ jobs:
with:
property_testing_preset: 'normal'

python-vulnerability-scan:
name: Python vulnerability scan
needs: change-detection
if: contains(fromJson(needs.change-detection.outputs.tests-to-run), 'python')
uses: ./.github/workflows/_python-vulnerability-scan.yml
# python-vulnerability-scan:
# name: Python vulnerability scan
# needs: change-detection
# if: contains(fromJson(needs.change-detection.outputs.tests-to-run), 'python')
# uses: ./.github/workflows/_python-vulnerability-scan.yml

javascript-client-tests:
name: JavaScript client tests
needs: change-detection
if: contains(fromJson(needs.change-detection.outputs.tests-to-run), 'js-client')
uses: ./.github/workflows/_javascript-client-tests.yml
# javascript-client-tests:
# name: JavaScript client tests
# needs: change-detection
# if: contains(fromJson(needs.change-detection.outputs.tests-to-run), 'js-client')
# uses: ./.github/workflows/_javascript-client-tests.yml

rust-tests:
name: Rust tests
needs: change-detection
if: contains(fromJson(needs.change-detection.outputs.tests-to-run), 'rust')
uses: ./.github/workflows/_rust-tests.yml
secrets: inherit
# rust-tests:
# name: Rust tests
# needs: change-detection
# if: contains(fromJson(needs.change-detection.outputs.tests-to-run), 'rust')
# uses: ./.github/workflows/_rust-tests.yml
# secrets: inherit

go-tests:
name: Go tests
needs: change-detection
if: contains(fromJson(needs.change-detection.outputs.tests-to-run), 'go')
uses: ./.github/workflows/_go-tests.yml
# go-tests:
# name: Go tests
# needs: change-detection
# if: contains(fromJson(needs.change-detection.outputs.tests-to-run), 'go')
# uses: ./.github/workflows/_go-tests.yml

check-title:
name: Check PR Title
@@ -254,22 +254,22 @@ jobs:
# When creating a branch protection rule, you have to specify a static list
# of checks.
# So since this job always runs, we can specify it in the branch protection rule.
all-required-pr-checks-passed:
if: always()
needs:
- python-tests
- python-vulnerability-scan
- javascript-client-tests
- rust-tests
- go-tests
- check-title
- lint
- check-helm-version-bump
- delete-helm-comment
runs-on: blacksmith-4vcpu-ubuntu-2204
steps:
- name: Decide whether the needed jobs succeeded or failed
uses: re-actors/alls-green@release/v1
with:
jobs: ${{ toJSON(needs) }}
allowed-skips: python-tests,python-vulnerability-scan,javascript-client-tests,rust-tests,go-tests,check-helm-version-bump,delete-helm-comment
# all-required-pr-checks-passed:
# if: always()
# needs:
# - python-tests
# - python-vulnerability-scan
# - javascript-client-tests
# - rust-tests
# - go-tests
# - check-title
# - lint
# - check-helm-version-bump
# - delete-helm-comment
# runs-on: blacksmith-4vcpu-ubuntu-2204
# steps:
# - name: Decide whether the needed jobs succeeded or failed
# uses: re-actors/alls-green@release/v1
# with:
# jobs: ${{ toJSON(needs) }}
# allowed-skips: python-tests,python-vulnerability-scan,javascript-client-tests,rust-tests,go-tests,check-helm-version-bump,delete-helm-comment
4 changes: 2 additions & 2 deletions chromadb/test/conftest.py
@@ -47,7 +47,7 @@
logger = logging.getLogger(__name__)

VALID_PRESETS = ["fast", "normal", "slow"]
CURRENT_PRESET = os.getenv("PROPERTY_TESTING_PRESET", "fast")
CURRENT_PRESET = os.getenv("PROPERTY_TESTING_PRESET", "normal")

if CURRENT_PRESET not in VALID_PRESETS:
raise ValueError(
@@ -62,7 +62,7 @@
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.function_scoped_fixture,
],
verbosity=hypothesis.Verbosity.verbose
verbosity=hypothesis.Verbosity.verbose,
)

hypothesis.settings.register_profile(
23 changes: 22 additions & 1 deletion chromadb/test/property/test_add.py
@@ -36,6 +36,12 @@
normal=hypothesis.settings(max_examples=500),
fast=hypothesis.settings(max_examples=200),
),
phases=[
hypothesis.Phase.explicit,
hypothesis.Phase.reuse,
hypothesis.Phase.generate,
hypothesis.Phase.target,
],
)
def test_add_small(
client: ClientAPI,
@@ -77,6 +83,12 @@ def test_add_small(
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.function_scoped_fixture,
],
phases=[
hypothesis.Phase.explicit,
hypothesis.Phase.reuse,
hypothesis.Phase.generate,
hypothesis.Phase.target,
],
)
def test_add_medium(
client: ClientAPI,
@@ -183,7 +195,16 @@ def create_large_recordset(


@given(collection=collection_st, should_compact=st.booleans())
@settings(deadline=None, max_examples=5)
@settings(
deadline=None,
max_examples=5,
phases=[
hypothesis.Phase.explicit,
hypothesis.Phase.reuse,
hypothesis.Phase.generate,
hypothesis.Phase.target,
],
)
def test_add_large(
client: ClientAPI, collection: strategies.Collection, should_compact: bool
) -> None:
2 changes: 1 addition & 1 deletion rust/frontend/sample_configs/distributed.yaml
@@ -46,5 +46,5 @@ scorecard:
- "collection_id:*"
score: 100
enable_span_indexing: true
default_knn_index: "spann"
default_knn_index: "hnsw"
enable_set_index_params: false
6 changes: 3 additions & 3 deletions rust/frontend/sample_configs/tilt_config.yaml
@@ -23,7 +23,7 @@ log:
host: "logservice.chroma"
port: 50051
connect_timeout_ms: 5000
request_timeout_ms: 60000 # 1 minute
request_timeout_ms: 5000
alt_host: "rust-log-service.chroma"
#use_alt_host_for_everything: true

@@ -32,7 +32,7 @@ executor:
connections_per_node: 5
replication_factor: 2
connect_timeout_ms: 5000
request_timeout_ms: 60000 # 1 minute
request_timeout_ms: 5000
assignment:
rendezvous_hashing:
hasher: Murmur3
@@ -54,5 +54,5 @@ scorecard:
circuit_breaker:
requests: 1000
enable_span_indexing: true
default_knn_index: "spann"
default_knn_index: "hnsw"
enable_set_index_params: true
2 changes: 1 addition & 1 deletion rust/log/src/grpc_log.rs
@@ -183,7 +183,7 @@ impl Configurable<GrpcLogConfig> for GrpcLog {
};
let endpoint_res = endpoint_res
.connect_timeout(Duration::from_millis(my_config.connect_timeout_ms))
.timeout(Duration::from_millis(my_config.request_timeout_ms));
.timeout(Duration::from_millis(5000)); // HARDCODE TO 5s for debugging
let channel = endpoint_res.connect_lazy();
let channel = ServiceBuilder::new()
.layer(chroma_tracing::GrpcTraceLayer)
75 changes: 71 additions & 4 deletions rust/storage/src/admissioncontrolleds3.rs
@@ -70,6 +70,10 @@ impl InflightRequest {
.store(priority.as_usize(), std::sync::atomic::Ordering::SeqCst);
// Ignore send errors since it can happen that the receiver is dropped
// and the task is busy reading the data from s3.
tracing::info!(
"Sending notification to reprioritize request for key with priority {}",
priority.as_usize()
);
let _ = channel.send(()).await;
}
}
@@ -231,7 +235,7 @@ impl AdmissionControlledS3Storage {
let maybe_inflight = requests.get(&key).cloned();
future_to_await = match maybe_inflight {
Some(fut) => {
tracing::trace!("[AdmissionControlledS3] Found inflight request to s3 for key: {:?}. Deduping", key);
tracing::debug!("[AdmissionControlledS3] Found inflight request to s3 for key: {:?}. Deduping", key);
fut.update_priority(options.priority).await;
fut.future
}
@@ -245,14 +249,21 @@ impl AdmissionControlledS3Storage {
)
.boxed()
.shared();
requests.insert(
let old_val = requests.insert(
key.clone(),
InflightRequest {
priority: atomic_priority,
future: get_parallel_storage_future.clone(),
notify_channel: None,
},
);
if old_val.is_some() {
tracing::error!(
"There was already an inflight request for key: {:?}. This should not happen.",
key
);
panic!("There was already an inflight request for key: {:?}. This should not happen.", key);
}
get_parallel_storage_future
}
};
@@ -302,14 +313,21 @@ impl AdmissionControlledS3Storage {
)
.boxed()
.shared();
requests.insert(
let old_val = requests.insert(
key.to_string(),
InflightRequest {
priority: atomic_priority,
future: get_storage_future.clone(),
notify_channel: Some(tx),
},
);
if old_val.is_some() {
tracing::error!(
"There was already an inflight request for key: {:?}. This should not happen.",
key
);
panic!("There was already an inflight request for key: {:?}. This should not happen.", key);
}
get_storage_future
}
};
@@ -318,8 +336,22 @@ impl AdmissionControlledS3Storage {
let res = future_to_await.await;
{
let mut requests = self.outstanding_read_requests.lock().await;
// debug requests
let mut requests_per_priority_level = HashMap::new();
for (_key, request) in requests.iter() {
let priority = request.priority.load(Ordering::SeqCst);
let curr_count = requests_per_priority_level.get(&priority).unwrap_or(&0);
requests_per_priority_level.insert(priority, *curr_count + 1);
// let curr_count = requests_per_priority_level.get(&priority).unwrap();
}
for (priority, count) in requests_per_priority_level.iter() {
if *count > 30 {
tracing::info!("There are {} requests with priority {}", count, priority);
}
}
requests.remove(key);
}

res
}

@@ -526,6 +558,26 @@ impl CountBasedPolicy {
priority: Arc<AtomicUsize>,
mut channel_receiver: Option<tokio::sync::mpsc::Receiver<()>>,
) -> SemaphorePermit<'_> {
let timeout_duration = std::time::Duration::from_secs(10);
let (timeout_tx, mut timeout_rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
tokio::time::sleep(timeout_duration).await;
let did_timeout = timeout_rx.try_recv();
match did_timeout {
Ok(_) => {}
Err(e) => match e {
tokio::sync::oneshot::error::TryRecvError::Closed => {
tracing::error!("Timeout channel closed");
}
tokio::sync::oneshot::error::TryRecvError::Empty => {
tracing::error!("Timeout channel empty");
}
},
}
});

let mut repri_count = 0;

loop {
let current_priority = priority.load(Ordering::SeqCst);
let current_priority: StorageRequestPriority = current_priority.into();
@@ -538,6 +590,7 @@ impl CountBasedPolicy {
{
match self.remaining_tokens[pri].try_acquire() {
Ok(token) => {
let _ = timeout_tx.send(());
return token;
}
Err(TryAcquireError::NoPermits) => continue,
@@ -548,14 +601,27 @@ impl CountBasedPolicy {
match &mut channel_receiver {
Some(rx) => {
select! {
_ = rx.recv() => {
msg = rx.recv() => {
// Reevaluate priority if we got a notification.
// WITHOUT the changes below, this will spin forever
tracing::info!("Got notification to reevaluate priority, repriority count: {}", repri_count);
repri_count += 1;
match msg {
Some(_) => {
continue;
}
None => {
// Sender dropped, exit loop.
channel_receiver = None;
}
}
continue;
}
token = self.remaining_tokens[current_priority.as_usize()].acquire() => {
match token {
Ok(token) => {
// If we got a token, return it.
let _ = timeout_tx.send(());
return token;
},
Err(e) => {
@@ -574,6 +640,7 @@ impl CountBasedPolicy {
match token {
Ok(token) => {
// If we got a token, return it.
let _ = timeout_tx.send(());
return token;
}
Err(e) => {
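
Note on the acquire-loop change above: the diff's own comment ("WITHOUT the changes below, this will spin forever") points at the added `None` arm. On a tokio mpsc channel, `recv()` resolves immediately with `None` once the sender is dropped and no messages remain buffered, so without clearing the receiver the loop re-enters the select and spins instead of waiting on the semaphore. A minimal, self-contained sketch of the same pattern (hypothetical names such as `acquire`, `tokens`, and `notify_rx`; not the PR's code verbatim):

use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use tokio::{
    select,
    sync::{mpsc, OwnedSemaphorePermit, Semaphore},
};

// Acquire a permit, re-reading `priority` whenever a reprioritization notification arrives.
async fn acquire(
    tokens: Arc<Semaphore>,
    priority: Arc<AtomicUsize>,
    mut notify_rx: Option<mpsc::Receiver<()>>,
) -> OwnedSemaphorePermit {
    loop {
        // Re-read the (possibly updated) priority each iteration; a real implementation
        // would pick the token pool for this priority level.
        let _current = priority.load(Ordering::SeqCst);
        match &mut notify_rx {
            Some(rx) => {
                select! {
                    msg = rx.recv() => {
                        match msg {
                            // Reprioritized: retry with the new priority.
                            Some(_) => continue,
                            // Sender dropped: recv() would now return None immediately,
                            // so stop polling the closed channel to avoid spinning.
                            None => notify_rx = None,
                        }
                    }
                    permit = tokens.clone().acquire_owned() => {
                        return permit.expect("semaphore closed");
                    }
                }
            }
            None => return tokens.clone().acquire_owned().await.expect("semaphore closed"),
        }
    }
}

#[tokio::main]
async fn main() {
    let tokens = Arc::new(Semaphore::new(1));
    let priority = Arc::new(AtomicUsize::new(0));
    let (_notify_tx, notify_rx) = mpsc::channel::<()>(1);
    let _permit = acquire(tokens, priority, Some(notify_rx)).await;
    println!("acquired a permit");
}

Once the receiver is cleared, later iterations fall through to the plain `acquire_owned()` branch, which is what prevents the busy loop.
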
2 changes: 2 additions & 0 deletions rust/storage/src/s3.rs
@@ -320,6 +320,8 @@ impl S3Storage {
.instrument(tracing::trace_span!("S3 get stream"))
.await?;
let read_block_span = tracing::trace_span!("S3 read bytes to end");
// add some additional latency to try to trigger the issue
// tokio::time::sleep(Duration::from_millis(20)).await;
let buf = async {
let mut buf: Vec<u8> = Vec::new();
while let Some(res) = stream.next().await {
7 changes: 7 additions & 0 deletions rust/system/src/execution/dispatcher.rs
@@ -102,6 +102,13 @@ impl Dispatcher {
/// # Parameters
/// - task: The task to enqueue
async fn enqueue_task(&mut self, mut task: TaskMessage) {
tracing::info!(
task_name = task.get_name(),
task_id = task.id().to_string(),
"Enqueueing task"
);
let task_name = task.get_name();
let task_id = task.id().to_string();
match task.get_type() {
OperatorType::IO => {
let child_span = trace_span!(parent: Span::current(), "IO task execution", name = task.get_name());
2 changes: 1 addition & 1 deletion rust/system/src/execution/operator.rs
@@ -250,7 +250,7 @@ where
err
);
} else {
tracing::debug!(
tracing::info!(
"Failed to send task result for task {} to reply channel: {}",
self.task_id,
err
6 changes: 6 additions & 0 deletions rust/system/src/execution/orchestrator.rs
@@ -31,14 +31,20 @@ pub trait Orchestrator: Debug + Send + Sized + 'static {
async fn run(mut self, system: System) -> Result<Self::Output, Self::Error> {
let (tx, rx) = oneshot::channel();
self.set_result_channel(tx);
tracing::info!("{} starting", Self::name());
let mut handle = system.start_component(self);
let res = rx.await;
tracing::info!("{} finished", Self::name());
handle.stop();
res?
}

/// Sends a task to the dispatcher and return whether the task is successfully sent
async fn send(&mut self, task: TaskMessage, ctx: &ComponentContext<Self>) -> bool {
tracing::info!(
task_name = %task.get_name(),
"Sending task to dispatcher"
);
let res = self.dispatcher().send(task, Some(Span::current())).await;
self.ok_or_terminate(res, ctx).await.is_some()
}
35 changes: 33 additions & 2 deletions rust/system/src/execution/worker_thread.rs
@@ -1,7 +1,10 @@
use super::{dispatcher::TaskRequestMessage, operator::TaskMessage};
use crate::{Component, ComponentContext, ComponentRuntime, Handler, ReceiverForMessage};
use async_trait::async_trait;
use std::fmt::{Debug, Formatter, Result};
use std::{
fmt::{Debug, Formatter, Result},
time::Duration,
};
use tracing::{trace_span, Instrument, Span};

/// A worker thread is responsible for executing tasks
@@ -57,11 +60,39 @@ impl Handler<TaskMessage> for WorkerThread {
type Result = ();

async fn handle(&mut self, mut task: TaskMessage, ctx: &ComponentContext<WorkerThread>) {
tracing::info!("Worker thread: executing task {}", task.get_name());
let child_span =
trace_span!(parent: Span::current(), "Task execution", name = task.get_name());
let task_timeout = Duration::from_secs(15);
let (mark_done_tx, mut mark_done_rx) = tokio::sync::oneshot::channel();
let task_name = task.get_name().to_string();
tokio::spawn(async move {
tokio::time::sleep(task_timeout).await;
let attempted_recv = mark_done_rx.try_recv();
match attempted_recv {
Ok(_) => {
//tracing::info!("Task {} completed", task_name)
}
Err(e) => match e {
tokio::sync::oneshot::error::TryRecvError::Empty => {
tracing::info!("[HAMMAD] Task {} timed out", task_name);
}
tokio::sync::oneshot::error::TryRecvError::Closed => {
tracing::error!("[HAMMAD] Never got confirmation for task {}", task_name);
}
},
};
});
task.run().instrument(child_span).await;
mark_done_tx.send(()).unwrap_or_else(|_| {
tracing::error!("[HAMMAD] Failed to send task completion signal");
});
let req: TaskRequestMessage = TaskRequestMessage::new(ctx.receiver());
let _res = self.dispatcher.send(req, None).await;
let res = self.dispatcher.send(req, None).await;
if let Err(err) = res {
tracing::error!("Error sending task request: {}", err);
tracing::error!("Worker thread error");
}
// TODO: task run should be able to error and we should send it as part of the result
}
}
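
Note on the worker-thread change above: it follows a watchdog pattern — spawn a timer task holding a oneshot receiver, signal the sender when the work completes, and if the timer fires while `try_recv()` still reports `Empty`, the task has run past the deadline. A minimal, self-contained sketch of that pattern (the `run_with_watchdog` helper is hypothetical; not the PR's code verbatim):

use std::future::Future;
use tokio::sync::oneshot;
use tokio::time::{sleep, Duration};

// Run `work`, and log if it has not signalled completion within `timeout`.
async fn run_with_watchdog<F: Future<Output = ()>>(name: &str, timeout: Duration, work: F) {
    let (done_tx, mut done_rx) = oneshot::channel::<()>();
    let task_name = name.to_string();
    tokio::spawn(async move {
        sleep(timeout).await;
        match done_rx.try_recv() {
            // Completion signal arrived before the deadline: nothing to report.
            Ok(_) => {}
            // Channel still empty: the work is still running past the deadline.
            Err(oneshot::error::TryRecvError::Empty) => {
                eprintln!("task {} exceeded {:?}", task_name, timeout);
            }
            // Sender dropped without signalling: the work never confirmed completion.
            Err(oneshot::error::TryRecvError::Closed) => {
                eprintln!("task {} never confirmed completion", task_name);
            }
        }
    });
    work.await;
    // Ignore the error: the watchdog may already have fired and dropped its receiver.
    let _ = done_tx.send(());
}

#[tokio::main]
async fn main() {
    run_with_watchdog("demo", Duration::from_secs(15), async {
        sleep(Duration::from_millis(10)).await;
    })
    .await;
}

The same oneshot-plus-timer shape appears in the admission-controlled S3 change earlier in this diff, there guarding permit acquisition rather than task execution.
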
1 change: 1 addition & 0 deletions rust/worker/src/execution/operators/fetch_log.rs
@@ -7,6 +7,7 @@ use chroma_log::Log;
use chroma_system::{Operator, OperatorType};
use chroma_types::{Chunk, CollectionUuid, LogRecord};
use thiserror::Error;
use tokio::select;

/// The `FetchLogOperator` fetches logs from the log service
///
2 changes: 1 addition & 1 deletion rust/worker/src/execution/operators/filter.rs
@@ -402,7 +402,7 @@ impl Operator<FilterInput, FilterOutput> for FilterOperator {
type Error = FilterError;

async fn run(&self, input: &FilterInput) -> Result<FilterOutput, FilterError> {
tracing::debug!("[{}]: {:?}", self.get_name(), input);
tracing::info!("Running filter operator");

let record_segment_reader = match RecordSegmentReader::from_segment(
&input.record_segment,
8 changes: 6 additions & 2 deletions rust/worker/src/execution/operators/prefetch_record.rs
@@ -6,7 +6,7 @@ use chroma_segment::{
blockfile_record::{RecordSegmentReader, RecordSegmentReaderCreationError},
types::{materialize_logs, LogMaterializerError},
};
use chroma_system::Operator;
use chroma_system::{Operator, OperatorType};
use thiserror::Error;
use tracing::{trace, Instrument, Span};

@@ -57,7 +57,7 @@ impl Operator<PrefetchRecordInput, PrefetchRecordOutput> for PrefetchRecordOpera
&self,
input: &PrefetchRecordInput,
) -> Result<PrefetchRecordOutput, PrefetchRecordError> {
trace!("[{}]: {:?}", self.get_name(), input);
tracing::info!("RUNNING THE PREFETCH [{}]", self.get_name());

let record_segment_reader = match RecordSegmentReader::from_segment(
&input.record_segment,
@@ -94,4 +94,8 @@ impl Operator<PrefetchRecordInput, PrefetchRecordOutput> for PrefetchRecordOpera
fn errors_when_sender_dropped(&self) -> bool {
false
}

// fn get_type(&self) -> OperatorType {
// OperatorType::IO
// }
}
3 changes: 3 additions & 0 deletions rust/worker/src/execution/orchestration/count.rs
@@ -138,12 +138,14 @@ impl Handler<TaskResult<FetchLogOutput, FetchLogError>> for CountOrchestrator {
message: TaskResult<FetchLogOutput, FetchLogError>,
ctx: &ComponentContext<Self>,
) {
tracing::info!("CountOrchestrator: FetchLogOperator finished");
let output = match self.ok_or_terminate(message.into_inner(), ctx).await {
Some(output) => output,
None => return,
};
self.fetch_log_bytes
.replace(output.iter().map(|(l, _)| l.size_bytes()).sum());
tracing::info!("sending CountRecordsOperator");
let task = wrap(
CountRecordsOperator::new(),
CountRecordsInput::new(
@@ -166,6 +168,7 @@ impl Handler<TaskResult<CountRecordsOutput, CountRecordsError>> for CountOrchest
message: TaskResult<CountRecordsOutput, CountRecordsError>,
ctx: &ComponentContext<Self>,
) {
tracing::info!("CountOrchestrator: CountRecordsOperator finished");
self.terminate_with_result(
message.into_inner().map_err(|e| e.into()).map(|output| {
(
4 changes: 4 additions & 0 deletions rust/worker/src/execution/orchestration/get.rs
@@ -204,6 +204,7 @@ impl Handler<TaskResult<FetchLogOutput, FetchLogError>> for GetOrchestrator {
message: TaskResult<FetchLogOutput, FetchLogError>,
ctx: &ComponentContext<Self>,
) {
tracing::info!("GetOrchestrator: FetchLogOperator finished");
let output = match self.ok_or_terminate(message.into_inner(), ctx).await {
Some(output) => output,
None => return,
@@ -234,6 +235,7 @@ impl Handler<TaskResult<FilterOutput, FilterError>> for GetOrchestrator {
message: TaskResult<FilterOutput, FilterError>,
ctx: &ComponentContext<Self>,
) {
tracing::info!("GetOrchestrator: FilterOperator finished");
let output = match self.ok_or_terminate(message.into_inner(), ctx).await {
Some(output) => output,
None => return,
@@ -266,6 +268,7 @@ impl Handler<TaskResult<LimitOutput, LimitError>> for GetOrchestrator {
message: TaskResult<LimitOutput, LimitError>,
ctx: &ComponentContext<Self>,
) {
tracing::info!("GetOrchestrator: LimitOperator finished");
let output = match self.ok_or_terminate(message.into_inner(), ctx).await {
Some(output) => output,
None => return,
@@ -320,6 +323,7 @@ impl Handler<TaskResult<ProjectionOutput, ProjectionError>> for GetOrchestrator
message: TaskResult<ProjectionOutput, ProjectionError>,
ctx: &ComponentContext<Self>,
) {
tracing::info!("GetOrchestrator: ProjectionOperator finished");
let output = match self.ok_or_terminate(message.into_inner(), ctx).await {
Some(output) => output,
None => return,
8 changes: 4 additions & 4 deletions rust/worker/tilt_config.yaml
@@ -40,7 +40,7 @@ query_service:
host: "logservice.chroma"
port: 50051
connect_timeout_ms: 5000
request_timeout_ms: 60000 # 1 minute
request_timeout_ms: 5000
alt_host: "rust-log-service.chroma"
#use_alt_host_for_everything: true
dispatcher:
@@ -52,7 +52,7 @@ query_service:
blockfile_provider:
arrow:
block_manager_config:
max_block_size_bytes: 8388608 # 8MB
max_block_size_bytes: 16384 # 16KiB
block_cache_config:
disk:
dir: "/cache/chroma/query-service/block-cache"
@@ -117,7 +117,7 @@ compaction_service:
host: "logservice.chroma"
port: 50051
connect_timeout_ms: 5000
request_timeout_ms: 60000 # 1 minute
request_timeout_ms: 5000
alt_host: "rust-log-service.chroma"
#use_alt_host_for_everything: true
dispatcher:
@@ -138,7 +138,7 @@ compaction_service:
blockfile_provider:
arrow:
block_manager_config:
max_block_size_bytes: 8388608 # 8MB
max_block_size_bytes: 16384 # 16KiB
block_cache_config:
lru:
capacity: 1000