Skip to content

Commit

Permalink
refactor: move source connectors to source dir (#3826)
Browse files Browse the repository at this point in the history
move source connectors to source/

Signed-off-by: tabVersion <tabvision@bupt.icu>

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
tabVersion and mergify[bot] authored Jul 13, 2022
1 parent 3e72311 commit 369b57e
Show file tree
Hide file tree
Showing 58 changed files with 140 additions and 126 deletions.
20 changes: 1 addition & 19 deletions src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,7 @@

extern crate core;

pub mod base;
pub mod datagen;
mod filesystem;
pub mod kafka;
pub mod kinesis;
mod nexmark;
mod pulsar;

pub use base::*;

pub mod aws_utils;
pub mod dummy_connector;
mod macros;
pub mod sink;

pub use base::ConnectorState;
pub use datagen::DATAGEN_CONNECTOR;
pub use kafka::KAFKA_CONNECTOR;
pub use kinesis::KINESIS_CONNECTOR;
pub use nexmark::NEXMARK_CONNECTOR;

pub use crate::pulsar::PULSAR_CONNECTOR;
pub mod source;
32 changes: 18 additions & 14 deletions src/connector/src/base.rs → src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,26 @@ use prost::Message;
use risingwave_pb::source::ConnectorSplit;
use serde::{Deserialize, Serialize};

use crate::datagen::{
use crate::source::datagen::{
DatagenProperties, DatagenSplit, DatagenSplitEnumerator, DatagenSplitReader, DATAGEN_CONNECTOR,
};
use crate::dummy_connector::DummySplitReader;
use crate::filesystem::s3::{S3Properties, S3_CONNECTOR};
use crate::kafka::enumerator::KafkaSplitEnumerator;
use crate::kafka::source::KafkaSplitReader;
use crate::kafka::{KafkaProperties, KafkaSplit, KAFKA_CONNECTOR};
use crate::kinesis::enumerator::client::KinesisSplitEnumerator;
use crate::kinesis::source::reader::KinesisMultiSplitReader;
use crate::kinesis::split::KinesisSplit;
use crate::kinesis::{KinesisProperties, KINESIS_CONNECTOR};
use crate::nexmark::source::reader::NexmarkSplitReader;
use crate::nexmark::{NexmarkProperties, NexmarkSplit, NexmarkSplitEnumerator, NEXMARK_CONNECTOR};
use crate::pulsar::source::reader::PulsarSplitReader;
use crate::pulsar::{PulsarProperties, PulsarSplit, PulsarSplitEnumerator, PULSAR_CONNECTOR};
use crate::source::dummy_connector::DummySplitReader;
use crate::source::filesystem::s3::{S3Properties, S3_CONNECTOR};
use crate::source::kafka::enumerator::KafkaSplitEnumerator;
use crate::source::kafka::source::KafkaSplitReader;
use crate::source::kafka::{KafkaProperties, KafkaSplit, KAFKA_CONNECTOR};
use crate::source::kinesis::enumerator::client::KinesisSplitEnumerator;
use crate::source::kinesis::source::reader::KinesisMultiSplitReader;
use crate::source::kinesis::split::KinesisSplit;
use crate::source::kinesis::{KinesisProperties, KINESIS_CONNECTOR};
use crate::source::nexmark::source::reader::NexmarkSplitReader;
use crate::source::nexmark::{
NexmarkProperties, NexmarkSplit, NexmarkSplitEnumerator, NEXMARK_CONNECTOR,
};
use crate::source::pulsar::source::reader::PulsarSplitReader;
use crate::source::pulsar::{
PulsarProperties, PulsarSplit, PulsarSplitEnumerator, PULSAR_CONNECTOR,
};
use crate::{impl_connector_properties, impl_split, impl_split_enumerator, impl_split_reader};

/// [`SplitEnumerator`] fetches the split metadata from the external source service.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

use async_trait::async_trait;

use crate::base::SplitEnumerator;
use crate::datagen::{DatagenProperties, DatagenSplit};
use crate::source::datagen::{DatagenProperties, DatagenSplit};
use crate::source::SplitEnumerator;

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct DatagenSplitEnumerator {
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use serde_json::{Map, Value};
use tokio::time::{sleep, Duration, Instant};

use super::DEFAULT_DATAGEN_INTERVAL;
use crate::SourceMessage;
use crate::source::SourceMessage;

pub struct DatagenEventGenerator {
pub fields_map: HashMap<String, FieldGeneratorImpl>,
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use async_trait::async_trait;
use risingwave_common::field_generator::FieldGeneratorImpl;

use super::generator::DatagenEventGenerator;
use crate::datagen::source::SEQUENCE_FIELD_KIND;
use crate::datagen::{DatagenProperties, DatagenSplit};
use crate::{
use crate::source::datagen::source::SEQUENCE_FIELD_KIND;
use crate::source::datagen::{DatagenProperties, DatagenSplit};
use crate::source::{
Column, ConnectorState, DataType, SourceMessage, SplitImpl, SplitMetaData, SplitReader,
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use anyhow::anyhow;
use bytes::Bytes;
use serde::{Deserialize, Serialize};

use crate::base::SplitMetaData;
use crate::source::base::SplitMetaData;

#[derive(Clone, Serialize, Deserialize, Debug, Default, PartialEq, Hash)]
pub struct DatagenSplit {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use anyhow::Result;
use async_trait::async_trait;
use futures::future;

use crate::{Column, ConnectorState, SourceMessage, SplitReader};
use crate::source::{Column, ConnectorState, SourceMessage, SplitReader};

/// [`DummySplitReader`] is a placeholder for source executor that is assigned no split. It will
/// wait forever when calling `next`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ mod test {
use itertools::Itertools;
use tokio::{sync, time};

use crate::filesystem::file_common::{
use crate::source::filesystem::file_common::{
Directory, EntryDiscover, EntryOpt, EntryOptEvent, EntryStat, StatusWatch,
};

Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ use thiserror::Error;
use tokio::sync;
use tokio::sync::mpsc::Sender;

use crate::filesystem::file_common::{
use crate::source::filesystem::file_common::{
Directory, EntryDiscover, EntryOpt, EntryOptEvent, EntryStat, StatusWatch,
};
use crate::filesystem::s3::s3_notification_event::{NotificationEvent, NotifyEventType};
use crate::filesystem::s3::S3Properties;
use crate::source::filesystem::s3::s3_notification_event::{NotificationEvent, NotifyEventType};
use crate::source::filesystem::s3::S3Properties;

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum AwsCredential {
Expand Down Expand Up @@ -478,8 +478,8 @@ pub(crate) mod test {
use aws_smithy_http::byte_stream::ByteStream;
use chrono::Utc;

use crate::filesystem::file_common::{Directory, StatusWatch};
use crate::filesystem::s3::s3_dir::{
use crate::source::filesystem::file_common::{Directory, StatusWatch};
use crate::source::filesystem::s3::s3_dir::{
new_share_config, AwsCredential, S3Directory, S3SourceBasicConfig, S3SourceConfig,
SqsReceiveMsgConfig,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ use tokio_util::io;
use tokio_util::io::ReaderStream;

use crate::aws_utils::{default_conn_config, s3_client, AwsConfigV2, AwsCredentialV2};
use crate::base::{SourceMessage, SplitReader};
use crate::filesystem::file_common::{EntryStat, StatusWatch};
use crate::filesystem::s3::s3_dir::FileSystemOptError::IllegalS3FilePath;
use crate::filesystem::s3::s3_dir::{
use crate::source::base::{SourceMessage, SplitReader};
use crate::source::filesystem::file_common::{EntryStat, StatusWatch};
use crate::source::filesystem::s3::s3_dir::FileSystemOptError::IllegalS3FilePath;
use crate::source::filesystem::s3::s3_dir::{
AwsCustomConfig, S3SourceBasicConfig, S3SourceConfig, SqsReceiveMsgConfig,
};
use crate::filesystem::s3::S3Properties;
use crate::{Column, ConnectorState, SplitMetaData};
use crate::source::filesystem::s3::S3Properties;
use crate::source::{Column, ConnectorState, SplitMetaData};

const MAX_CHANNEL_BUFFER_SIZE: usize = 2048;
const READ_CHUNK_SIZE: usize = 1024;
Expand Down Expand Up @@ -246,7 +246,7 @@ impl S3FileReader {
payload: read_bytes,
};
if s3_msg_sender.send(s3_inner_msg).await.is_err() {
return Err(anyhow::Error::from(crate::filesystem::s3::s3_dir::FileSystemOptError::GetS3ObjectError(
return Err(anyhow::Error::from(crate::source::filesystem::s3::s3_dir::FileSystemOptError::GetS3ObjectError(
bucket.clone(),
s3_file.clone().object.path,
)));
Expand Down Expand Up @@ -288,7 +288,7 @@ impl S3FileReader {
match get_object {
Ok(get_object_out) => Ok(get_object_out.body),
Err(sdk_err) => Err(anyhow::Error::from(
crate::filesystem::s3::s3_dir::FileSystemOptError::AwsSdkInnerError(
crate::source::filesystem::s3::s3_dir::FileSystemOptError::AwsSdkInnerError(
format!("S3 GetObject from {} error:", bucket),
sdk_err.to_string(),
),
Expand Down Expand Up @@ -375,8 +375,8 @@ impl SplitReader for S3FileReader {
#[cfg(test)]
mod test {

use crate::filesystem::s3::source::s3_file_reader::S3FileSplit;
use crate::filesystem::s3::S3Properties;
use crate::source::filesystem::s3::source::s3_file_reader::S3FileSplit;
use crate::source::filesystem::s3::S3Properties;

const TEST_REGION_NAME: &str = "cn-north-1";
const BUCKET_NAME: &str = "dd-storage-s3";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ use rdkafka::consumer::{BaseConsumer, Consumer, DefaultConsumerContext};
use rdkafka::error::KafkaResult;
use rdkafka::{Offset, TopicPartitionList};

use crate::base::SplitEnumerator;
use crate::kafka::split::KafkaSplit;
use crate::kafka::{KafkaProperties, KAFKA_SYNC_CALL_TIMEOUT};
use crate::source::base::SplitEnumerator;
use crate::source::kafka::split::KafkaSplit;
use crate::source::kafka::{KafkaProperties, KAFKA_SYNC_CALL_TIMEOUT};

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum KafkaEnumeratorOffset {
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use bytes::Bytes;
use rdkafka::message::BorrowedMessage;
use rdkafka::Message;

use crate::base::SourceMessage;
use crate::source::base::SourceMessage;

impl<'a> From<BorrowedMessage<'a>> for SourceMessage {
fn from(message: BorrowedMessage<'a>) -> Self {
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ use rdkafka::config::RDKafkaLogLevel;
use rdkafka::consumer::{Consumer, DefaultConsumerContext, StreamConsumer};
use rdkafka::{ClientConfig, Offset, TopicPartitionList};

use crate::base::{SourceMessage, SplitReader};
use crate::kafka::split::KafkaSplit;
use crate::kafka::KafkaProperties;
use crate::{Column, ConnectorState, SplitImpl};
use crate::source::base::{SourceMessage, SplitReader};
use crate::source::kafka::split::KafkaSplit;
use crate::source::kafka::KafkaProperties;
use crate::source::{Column, ConnectorState, SplitImpl};

const KAFKA_MAX_FETCH_MESSAGES: usize = 1024;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use anyhow::anyhow;
use bytes::Bytes;
use serde::{Deserialize, Serialize};

use crate::base::SplitMetaData;
use crate::source::SplitMetaData;

#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct KafkaSplit {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use http::Uri;
use maplit::hashmap;
use serde::{Deserialize, Serialize};

use crate::kinesis::KinesisProperties;
use crate::source::kinesis::KinesisProperties;

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct AwsAssumeRole {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use async_trait::async_trait;
use aws_sdk_kinesis::model::Shard;
use aws_sdk_kinesis::Client as kinesis_client;

use crate::base::SplitEnumerator;
use crate::kinesis::split::{KinesisOffset, KinesisSplit};
use crate::kinesis::*;
use crate::source::kinesis::split::{KinesisOffset, KinesisSplit};
use crate::source::kinesis::*;
use crate::source::SplitEnumerator;

pub struct KinesisSplitEnumerator {
stream_name: String,
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use aws_sdk_kinesis::model::Record;
use bytes::Bytes;
use serde::{Deserialize, Serialize};

use crate::base::SourceMessage;
use crate::source::SourceMessage;

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct KinesisMessage {
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@ use futures_concurrency::prelude::*;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;

use crate::base::{SourceMessage, SplitReader};
use crate::kinesis::source::message::KinesisMessage;
use crate::kinesis::split::{KinesisOffset, KinesisSplit};
use crate::kinesis::{build_client, KinesisProperties};
use crate::{Column, ConnectorState, SplitImpl};
use crate::source::kinesis::source::message::KinesisMessage;
use crate::source::kinesis::split::{KinesisOffset, KinesisSplit};
use crate::source::kinesis::{build_client, KinesisProperties};
use crate::source::{Column, ConnectorState, SourceMessage, SplitImpl, SplitReader};

pub struct KinesisMultiSplitReader {
/// splits are not allowed to be empty, otherwise connector source should create
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use anyhow::anyhow;
use bytes::Bytes;
use serde::{Deserialize, Serialize};

use crate::base::SplitMetaData;
use crate::source::SplitMetaData;

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub enum KinesisOffset {
Expand Down
28 changes: 28 additions & 0 deletions src/connector/src/source/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Source connector modules. `base` holds the shared definitions that the
// per-connector modules import (e.g. `SplitEnumerator`, `SplitReader`,
// `SplitMetaData`, `SourceMessage`); every other module here is one connector.
pub mod base;
pub mod datagen;
// `dummy_connector` provides `DummySplitReader`, the placeholder reader used
// when a source executor is assigned no split.
pub mod dummy_connector;
pub mod filesystem;
pub mod kafka;
pub mod kinesis;
pub mod nexmark;
pub mod pulsar;
// Flatten `base` into this module so callers may write either
// `crate::source::SplitEnumerator` or `crate::source::base::SplitEnumerator`.
pub use base::*;
// Re-export the per-connector `*_CONNECTOR` constants at the `source` level.
// NOTE(review): the previous crate root also re-exported
// `datagen::DATAGEN_CONNECTOR`; it is not re-exported here, so it appears to be
// reachable only as `crate::source::datagen::DATAGEN_CONNECTOR` — confirm that
// this omission is intentional.
pub use kafka::KAFKA_CONNECTOR;
pub use kinesis::KINESIS_CONNECTOR;
pub use nexmark::NEXMARK_CONNECTOR;

// Same kind of re-export as the ones above, written with an absolute crate path.
pub use crate::source::pulsar::PULSAR_CONNECTOR;
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
use std::f64::consts::PI;

use super::NEXMARK_BASE_TIME;
use crate::nexmark::NexmarkProperties;
use crate::source::nexmark::NexmarkProperties;

#[derive(PartialEq)]
enum RateShape {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

use async_trait::async_trait;

use crate::base::SplitEnumerator;
use crate::nexmark::split::NexmarkSplit;
use crate::nexmark::NexmarkProperties;
use crate::source::nexmark::split::NexmarkSplit;
use crate::source::nexmark::NexmarkProperties;
use crate::source::SplitEnumerator;

pub struct NexmarkSplitEnumerator {
split_num: i32,
Expand Down Expand Up @@ -52,7 +52,7 @@ mod tests {
use anyhow::Result;

use super::*;
use crate::SplitMetaData;
use crate::source::SplitMetaData;

#[tokio::test]
async fn test_nexmark_split_enumerator() -> Result<()> {
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use rand::seq::SliceRandom;
use rand::{Rng, SeedableRng};
use serde::{Deserialize, Serialize};

use crate::nexmark::config::NexmarkConfig;
use crate::source::nexmark::config::NexmarkConfig;

const MIN_STRING_LENGTH: usize = 3;

Expand Down Expand Up @@ -336,7 +336,7 @@ mod tests {
use std::time::{SystemTime, UNIX_EPOCH};

use super::*;
use crate::nexmark::{NexmarkProperties, NEXMARK_BASE_TIME};
use crate::source::nexmark::{NexmarkProperties, NEXMARK_BASE_TIME};

#[test]
fn test_milli_ts_to_timestamp_string() -> Result<()> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ use std::time::{SystemTime, UNIX_EPOCH};

use anyhow::{Ok, Result};

use crate::nexmark::config::NexmarkConfig;
use crate::nexmark::source::event::{Event, EventType};
use crate::nexmark::source::message::NexmarkMessage;
use crate::SourceMessage;
use crate::source::nexmark::config::NexmarkConfig;
use crate::source::nexmark::source::event::{Event, EventType};
use crate::source::nexmark::source::message::NexmarkMessage;
use crate::source::SourceMessage;

#[derive(Clone, Debug)]
pub struct NexmarkEventGenerator {
Expand Down
Loading

0 comments on commit 369b57e

Please sign in to comment.