Commit e6a0095

try fix compile error

wenym1 committed Sep 10, 2024
1 parent e51dd6c commit e6a0095
Showing 6 changed files with 218 additions and 103 deletions.
44 changes: 26 additions & 18 deletions src/connector/src/sink/google_pubsub.rs
@@ -15,9 +15,6 @@
use std::collections::BTreeMap;

use anyhow::{anyhow, Context};
use futures::future::try_join_all;
use futures::prelude::future::FutureExt;
use futures::prelude::TryFuture;
use futures::TryFutureExt;
use google_cloud_gax::conn::Environment;
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
@@ -26,7 +23,7 @@ use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile
use google_cloud_pubsub::client::google_cloud_auth::project;
use google_cloud_pubsub::client::google_cloud_auth::token::DefaultTokenSourceProvider;
use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_pubsub::publisher::{Awaiter, Publisher};
use google_cloud_pubsub::publisher::Publisher;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use serde_derive::Deserialize;
@@ -46,19 +43,33 @@ use crate::dispatch_sink_formatter_str_key_impl;
pub const PUBSUB_SINK: &str = "google_pubsub";
const PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;

fn may_delivery_future(awaiter: Vec<Awaiter>) -> GooglePubSubSinkDeliveryFuture {
    try_join_all(awaiter.into_iter().map(|awaiter| {
        awaiter.get().map(|result| {
            result
                .context("Google Pub/Sub sink error")
                .map_err(SinkError::GooglePubSub)
                .map(|_| ())
        })
    }))
    .map_ok(|_: Vec<()>| ())
    .boxed()
}
mod delivery_future {
    use anyhow::Context;
    use futures::future::try_join_all;
    use futures::{FutureExt, TryFuture, TryFutureExt};
    use google_cloud_pubsub::publisher::Awaiter;

    use crate::sink::SinkError;

    pub type GooglePubSubSinkDeliveryFuture =
        impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;

    pub(super) fn may_delivery_future(awaiter: Vec<Awaiter>) -> GooglePubSubSinkDeliveryFuture {
        try_join_all(awaiter.into_iter().map(|awaiter| {
            awaiter.get().map(|result| {
                result
                    .context("Google Pub/Sub sink error")
                    .map_err(SinkError::GooglePubSub)
                    .map(|_| ())
            })
        }))
        .map_ok(|_: Vec<()>| ())
        .boxed()
    }
}

use delivery_future::*;
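This hunk shows the pattern repeated throughout the commit: each `impl Trait` type alias (a TAIT, under the nightly `type_alias_impl_trait` feature) moves into a small dedicated module together with its defining function. A plausible reason is the TAIT defining-scope rule: only items in the module that declares the alias may constrain its hidden type, so giving each alias a minimal module keeps unrelated code from being treated as a defining use and tripping compile errors. A minimal sketch of the pattern, assuming a nightly toolchain and the `futures` crate as a dependency; all names here are illustrative, not from the diff:

#![feature(type_alias_impl_trait)]

mod adder_future {
    use std::future::Future;

    // The opaque alias and its sole defining function share one small
    // module, so nothing else can accidentally constrain the hidden type.
    pub type AdderFuture = impl Future<Output = u64> + 'static;

    pub(super) fn make_adder(a: u64, b: u64) -> AdderFuture {
        async move { a + b }
    }
}

use adder_future::*;

fn main() {
    // `futures` is used here only for its lightweight executor.
    let sum = futures::executor::block_on(make_adder(1, 2));
    assert_eq!(sum, 3);
}

The `use delivery_future::*;` glob re-export keeps existing call sites unchanged; the other files below apply the same trick.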

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct GooglePubSubConfig {
@@ -172,9 +183,6 @@ struct GooglePubSubPayloadWriter<'w> {
    add_future: DeliveryFutureManagerAddFuture<'w, GooglePubSubSinkDeliveryFuture>,
}

pub type GooglePubSubSinkDeliveryFuture =
    impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;

impl GooglePubSubSinkWriter {
    pub async fn new(
        config: GooglePubSubConfig,
@@ -27,7 +27,6 @@ pub struct MonitoredFanoutPartitionedWriterBuilder<B: IcebergWriterBuilder> {
}

impl<B: IcebergWriterBuilder> MonitoredFanoutPartitionedWriterBuilder<B> {
    #[expect(dead_code)]
    pub fn new(
        inner: FanoutPartitionedWriterBuilder<B>,
        partition_num: LabelGuardedIntGauge<2>,
@@ -28,7 +28,6 @@ pub struct MonitoredWriteWriterBuilder<B: IcebergWriterBuilder> {

impl<B: IcebergWriterBuilder> MonitoredWriteWriterBuilder<B> {
    /// Create writer context.
    #[expect(dead_code)]
    pub fn new(
        inner: B,
        write_qps: LabelGuardedIntCounter<2>,
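Both hunks above drop an `#[expect(dead_code)]` attribute from a constructor. The `expect` lint attribute (stabilized in Rust 1.81, released a few days before this commit) is an inverted suppression: the compiler emits `unfulfilled_lint_expectations` when the expected lint does not fire. Once these `new` functions gained callers, the attribute itself became a diagnostic (an error under `-D warnings`), which fits the commit's goal of fixing a compile error. A tiny illustration with hypothetical function names:

// Rust >= 1.81
#[expect(dead_code)] // fulfilled: `helper` is never called, so this stays silent
fn helper() {}

#[expect(dead_code)] // unfulfilled: `used` *is* called, so the compiler
fn used() {}         // reports `unfulfilled_lint_expectations` right here

fn main() {
    used();
}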
64 changes: 39 additions & 25 deletions src/meta/src/barrier/rpc.rs
@@ -58,33 +58,47 @@ struct ControlStreamNode {
    sender: UnboundedSender<StreamingControlStreamRequest>,
}

fn into_future(
    worker_id: WorkerId,
    stream: BoxStream<
        'static,
        risingwave_rpc_client::error::Result<StreamingControlStreamResponse>,
    >,
) -> ResponseStreamFuture {
    stream.into_future().map(move |(opt, stream)| {
        (
            worker_id,
            stream,
            opt.ok_or_else(|| anyhow!("end of stream").into())
                .and_then(|result| result.map_err(|e| e.into())),
        )
    })
}
mod response_stream_future {
    use std::future::Future;

    use anyhow::anyhow;
    use futures::stream::BoxStream;
    use futures::{FutureExt, StreamExt};
    use risingwave_pb::stream_service::StreamingControlStreamResponse;

    use crate::manager::WorkerId;
    use crate::MetaResult;

    pub(super) fn into_future(
        worker_id: WorkerId,
        stream: BoxStream<
            'static,
            risingwave_rpc_client::error::Result<StreamingControlStreamResponse>,
        >,
    ) -> ResponseStreamFuture {
        stream.into_future().map(move |(opt, stream)| {
            (
                worker_id,
                stream,
                opt.ok_or_else(|| anyhow!("end of stream").into())
                    .and_then(|result| result.map_err(|e| e.into())),
            )
        })
    }

    pub(super) type ResponseStreamFuture = impl Future<
        Output = (
            WorkerId,
            BoxStream<
                'static,
                risingwave_rpc_client::error::Result<StreamingControlStreamResponse>,
            >,
            MetaResult<StreamingControlStreamResponse>,
        ),
    > + 'static;
}

type ResponseStreamFuture = impl Future<
    Output = (
        WorkerId,
        BoxStream<
            'static,
            risingwave_rpc_client::error::Result<StreamingControlStreamResponse>,
        >,
        MetaResult<StreamingControlStreamResponse>,
    ),
> + 'static;
use response_stream_future::*;
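The relocated helper turns a worker's boxed response stream into a future that resolves with the worker id, the remaining stream, and the next item (end of stream mapped to an error), letting the barrier manager poll many workers through one concrete future type. A standalone sketch of the `StreamExt::into_future` plus `FutureExt::map` combination, using simplified illustrative types rather than the meta-service ones:

use std::future::Future;

use futures::stream::{self, BoxStream, StreamExt};
use futures::FutureExt;

type WorkerId = u32;

// Resolves to (worker id, rest of the stream, next item if any).
fn tagged_next(
    worker_id: WorkerId,
    stream: BoxStream<'static, i32>,
) -> impl Future<Output = (WorkerId, BoxStream<'static, i32>, Option<i32>)> {
    stream
        .into_future()
        .map(move |(opt, rest)| (worker_id, rest, opt))
}

fn main() {
    let s: BoxStream<'static, i32> = stream::iter(vec![1, 2, 3]).boxed();
    let (id, _rest, first) = futures::executor::block_on(tagged_next(7, s));
    assert_eq!((id, first), (7, Some(1)));
}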

pub(super) struct ControlStreamManager {
    context: GlobalBarrierManagerContext,
127 changes: 75 additions & 52 deletions src/stream/src/common/log_store_impl/kv_log_store/reader.rs
@@ -53,18 +53,28 @@ use crate::common::log_store_impl::kv_log_store::serde::{
};
use crate::common::log_store_impl::kv_log_store::KvLogStoreMetrics;

type RewindBackoffPolicy = impl Iterator<Item = Duration>;
pub(crate) const REWIND_BASE_DELAY: Duration = Duration::from_secs(1);
pub(crate) const REWIND_BACKOFF_FACTOR: u64 = 2;
pub(crate) const REWIND_MAX_DELAY: Duration = Duration::from_secs(180);

fn initial_rewind_backoff_policy() -> RewindBackoffPolicy {
    tokio_retry::strategy::ExponentialBackoff::from_millis(REWIND_BASE_DELAY.as_millis() as _)
        .factor(REWIND_BACKOFF_FACTOR)
        .max_delay(REWIND_MAX_DELAY)
        .map(tokio_retry::strategy::jitter)
}
mod rewind_backoff_policy {
    use std::time::Duration;

    use crate::common::log_store_impl::kv_log_store::{
        REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY,
    };

    pub(super) type RewindBackoffPolicy = impl Iterator<Item = Duration>;

    pub(super) fn initial_rewind_backoff_policy() -> RewindBackoffPolicy {
        tokio_retry::strategy::ExponentialBackoff::from_millis(REWIND_BASE_DELAY.as_millis() as _)
            .factor(REWIND_BACKOFF_FACTOR)
            .max_delay(REWIND_MAX_DELAY)
            .map(tokio_retry::strategy::jitter)
    }
}

use rewind_backoff_policy::*;
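`initial_rewind_backoff_policy` yields an endless sequence of sleep durations: an exponential schedule seeded from `REWIND_BASE_DELAY`, scaled by `REWIND_BACKOFF_FACTOR`, capped at `REWIND_MAX_DELAY`, with jitter applied to each delay. A self-contained sketch of the same `tokio_retry` combinator chain, with smaller illustrative constants:

use std::time::Duration;

use tokio_retry::strategy::{jitter, ExponentialBackoff};

fn backoff_delays() -> impl Iterator<Item = Duration> {
    // tokio_retry treats the `from_millis` argument as the exponent base,
    // so with factor 2 this yields roughly 20 ms, 200 ms, 2 s, 20 s,
    // then the 180 s cap.
    ExponentialBackoff::from_millis(10)
        .factor(2)
        .max_delay(Duration::from_secs(180))
        .map(jitter) // randomize each delay to avoid thundering herds
}

fn main() {
    for (attempt, delay) in backoff_delays().take(5).enumerate() {
        println!("rewind attempt {attempt}: wait ~{delay:?}");
    }
}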

struct RewindDelay {
    last_rewind_truncate_offset: Option<TruncateOffset>,
    backoff_policy: RewindBackoffPolicy,
@@ -218,58 +228,71 @@ impl<S: StateStoreRead, F: FnMut() -> bool> AutoRebuildStateStoreReadIter<S, F>
}
}

type TimeoutAutoRebuildIter<S: StateStoreRead> =
    AutoRebuildStateStoreReadIter<S, impl FnMut() -> bool + Send>;

async fn iter_with_timeout_rebuild<S: StateStoreRead>(
    state_store: S,
    range: TableKeyRange,
    epoch: HummockEpoch,
    options: ReadOptions,
    timeout: Duration,
) -> StorageResult<TimeoutAutoRebuildIter<S>> {
    const CHECK_TIMEOUT_PERIOD: usize = 100;
    // use a struct here to avoid accidental copy instead of move on primitive usize
    struct Count(usize);
    let mut check_count = Count(0);
    let mut total_count = Count(0);
    let mut curr_iter_item_count = Count(0);
    let mut start_time = Instant::now();
    let initial_start_time = start_time;
    AutoRebuildStateStoreReadIter::new(
        state_store,
        move || {
            check_count.0 += 1;
            curr_iter_item_count.0 += 1;
            total_count.0 += 1;
            if check_count.0 == CHECK_TIMEOUT_PERIOD {
                check_count.0 = 0;
                if start_time.elapsed() > timeout {
                    let prev_iter_item_count = curr_iter_item_count.0;
                    curr_iter_item_count.0 = 0;
                    start_time = Instant::now();
                    info!(
                        table_id = options.table_id.table_id,
                        iter_exist_time_secs = initial_start_time.elapsed().as_secs(),
                        prev_iter_item_count,
                        total_iter_item_count = total_count.0,
                        "kv log store iter is rebuilt"
                    );
                    true
                } else {
                    false
                }
            } else {
                false
            }
        },
        range,
        epoch,
        options,
    )
    .await
}

mod timeout_auto_rebuild {
    use std::time::{Duration, Instant};

    use risingwave_hummock_sdk::key::TableKeyRange;
    use risingwave_hummock_sdk::HummockEpoch;
    use risingwave_storage::error::StorageResult;
    use risingwave_storage::store::{ReadOptions, StateStoreRead};

    use crate::common::log_store_impl::kv_log_store::reader::AutoRebuildStateStoreReadIter;

    pub(super) type TimeoutAutoRebuildIter<S: StateStoreRead> =
        AutoRebuildStateStoreReadIter<S, impl FnMut() -> bool + Send>;

    pub(super) async fn iter_with_timeout_rebuild<S: StateStoreRead>(
        state_store: S,
        range: TableKeyRange,
        epoch: HummockEpoch,
        options: ReadOptions,
        timeout: Duration,
    ) -> StorageResult<TimeoutAutoRebuildIter<S>> {
        const CHECK_TIMEOUT_PERIOD: usize = 100;
        // use a struct here to avoid accidental copy instead of move on primitive usize
        struct Count(usize);
        let mut check_count = Count(0);
        let mut total_count = Count(0);
        let mut curr_iter_item_count = Count(0);
        let mut start_time = Instant::now();
        let initial_start_time = start_time;
        AutoRebuildStateStoreReadIter::new(
            state_store,
            move || {
                check_count.0 += 1;
                curr_iter_item_count.0 += 1;
                total_count.0 += 1;
                if check_count.0 == CHECK_TIMEOUT_PERIOD {
                    check_count.0 = 0;
                    if start_time.elapsed() > timeout {
                        let prev_iter_item_count = curr_iter_item_count.0;
                        curr_iter_item_count.0 = 0;
                        start_time = Instant::now();
                        info!(
                            table_id = options.table_id.table_id,
                            iter_exist_time_secs = initial_start_time.elapsed().as_secs(),
                            prev_iter_item_count,
                            total_iter_item_count = total_count.0,
                            "kv log store iter is rebuilt"
                        );
                        true
                    } else {
                        false
                    }
                } else {
                    false
                }
            },
            range,
            epoch,
            options,
        )
        .await
    }
}

use timeout_auto_rebuild::*;
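One detail preserved in the move: the counters live in a single-field `Count` struct because a bare `usize` is `Copy`, so a `move` closure would capture a copy, and a later read of the original variable would silently see stale state; a non-Copy wrapper turns that mistake into a compile error instead. A minimal demonstration with invented names:

fn main() {
    // With a Copy type, a `move` closure captures a *copy*:
    let mut n: usize = 0;
    let mut bump = move || {
        n += 1;
        n
    };
    assert_eq!(bump(), 1);
    assert_eq!(n, 0); // outer `n` is unchanged: easy to misread

    // Wrapping the counter in a non-Copy struct makes the move explicit:
    struct Count(usize);
    let mut c = Count(0);
    let mut bump2 = move || {
        c.0 += 1;
        c.0
    };
    assert_eq!(bump2(), 1);
    // Reading `c.0` here would not compile: `c` was moved into `bump2`.
}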

impl<S: StateStoreRead, F: FnMut() -> bool + Send> StateStoreIter
    for AutoRebuildStateStoreReadIter<S, F>
{
(Diff for the sixth changed file was not loaded in this rendering.)