
Commit

Merge pull request #193 from ultima-ib/polars39
Polars40
AnatolyBuga authored May 26, 2024
2 parents 1261c88 + 9049fac commit 8895474
Showing 65 changed files with 376 additions and 501 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/lint-test-rust-frontend.yml
@@ -62,7 +62,7 @@ jobs:

- uses: actions-rs/toolchain@v1
with:
-          toolchain: stable
+          toolchain: nightly
override: true
profile: minimal

2 changes: 1 addition & 1 deletion .github/workflows/publish-python.yml
@@ -13,7 +13,7 @@ permissions:
id-token: write

env:
-  RUST_TOOLCHAIN: stable
+  RUST_TOOLCHAIN: nightly
PYTHON_VERSION: '3.9'
MATURIN_VERSION: '1.2.1'

8 changes: 4 additions & 4 deletions .github/workflows/test-python.yaml
@@ -74,7 +74,7 @@ jobs:
- name: Set up Rust
uses: dtolnay/rust-toolchain@master
with:
-          toolchain: stable
+          toolchain: nightly

- name: Cache Rust
uses: Swatinem/rust-cache@v2
@@ -99,7 +99,7 @@ jobs:
RUSTFLAGS: -C debuginfo=0 # Do not produce debug symbols to keep memory usage down
run: |
source activate
-          maturin develop --all-features
+          maturin develop --all-features --release
- name: Run tests and report coverage
run: pytest --cov
@@ -142,7 +142,7 @@ jobs:
- name: Set up Rust
uses: dtolnay/rust-toolchain@master
with:
-          toolchain: stable
+          toolchain: nightly

- name: Cache Rust
uses: Swatinem/rust-cache@v2
@@ -168,7 +168,7 @@ jobs:
RUSTFLAGS: -C debuginfo=0 # Do not produce debug symbols to keep memory usage down
# we do build and install to better mimic real life
run: |
-          maturin build --all-features
+          maturin build --all-features --release
pip install ../target/wheels/ultibi-*.whl
- name: Run tests
9 changes: 5 additions & 4 deletions Cargo.toml
@@ -20,7 +20,8 @@ exclude = ['templates']
[workspace.dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
polars = { version = "0.35.4", features = [
# git = "https://github.com/pola-rs/polars", tag="py-0.20.27"
polars = { version = "0.40.0", features = [
"performant",
"strings",
"ndarray",
@@ -31,9 +32,9 @@ polars = { version = "0.35.4", features = [
"diagonal_concat",
"serde-lazy"
] }
polars-plan = { version = "0.35.4", features = ["ffi_plugin"]}
polars-arrow = {version = "0.35.4", features=["arrow_rs"]}
arrow-array = "49"
polars-plan = { version = "0.40.0", features = ["ffi_plugin"]}
polars-arrow = {version = "0.40.0", features=["arrow_rs"]}
arrow-array = "51" # must match connector-x
toml = "0.8.8"
once_cell = "1.12"
dashmap = "5.4.0"
4 changes: 2 additions & 2 deletions frtb_engine/src/drc/common.rs
@@ -17,12 +17,12 @@ pub fn drc_scalinng(dc: Option<&String>, format: Option<&String>) -> Expr {
apply_multiple(
move |columns| {
let z = columns[0]
-                .utf8()?
+                .str()?
.as_date(format.as_deref(), false)?
.as_date_iter()
.zip(
columns[1]
-                    .utf8()?
+                    .str()?
.as_date(format.as_deref(), false)?
.as_date_iter(),
)
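The two one-line swaps above are the polars 0.40 accessor rename: `.utf8()` became `.str()`, and `Utf8Chunked` became `StringChunked`. A minimal standalone sketch of the new accessor — illustrative only, assuming polars 0.40 with the `strings` and `temporal` features enabled:

```rust
use polars::prelude::*;

// Parse a string Series into dates, as the closure above does for both columns.
fn parse_dates(s: &Series, fmt: Option<&str>) -> PolarsResult<Series> {
    // 0.35: s.utf8()? -> &Utf8Chunked;  0.40: s.str()? -> &StringChunked
    let ca: &StringChunked = s.str()?;
    // the boolean mirrors the hunk's second argument to `as_date`
    Ok(ca.as_date(fmt, false)?.into_series())
}
```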
6 changes: 4 additions & 2 deletions frtb_engine/src/drc/drc_nonsec.rs
@@ -1,4 +1,5 @@
use crate::prelude::*;
+use polars::chunked_array::ops::SortMultipleOptions;
use ultibi::{
polars::prelude::{
apply_multiple, df, ChunkApply, DataType, GetOutput, IntoSeries, NamedFromOwned,
@@ -93,8 +94,9 @@ fn drc_nonsec_charge_calculator(rtrn: ReturnMetric, offset: bool, weights: Expr)
// This shouldn't be a problem since we sum positions (netShort netLong) anyway,
// And THEN apply CreditQuality weights, BECAUSE Obligor - CreditQuality should be 1to1 map
if offset {
+        let sort_options = SortMultipleOptions::default().with_maintain_order(true);
lf = lf
.sort_by_exprs(&[col("rft")], [false], false, true)
.sort_by_exprs(&[col("rft")], sort_options)
.group_by(["b", "rf"])
.apply(
|mut df| {
@@ -222,7 +224,7 @@ fn drc_nonsec_charge_calculator(rtrn: ReturnMetric, offset: bool, weights: Expr)
col("RiskFactor"),
col("SeniorityRank"),
col("GrossJTD"),
-                weights.list().get(lit(0)),
+                weights.list().get(lit(0), false),
col("ScaleFactor"),
],
GetOutput::from_type(DataType::Float64),
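The sort hunk above reflects a 0.40 API change: the positional flags of `LazyFrame::sort_by_exprs` were folded into a `SortMultipleOptions` builder. A hedged sketch of the mapping (illustrative, not the engine's code):

```rust
use polars::chunked_array::ops::SortMultipleOptions;
use polars::prelude::*;

fn stable_sort_by_rft(lf: LazyFrame) -> LazyFrame {
    // 0.35: lf.sort_by_exprs(&[col("rft")], [false], /*nulls_last*/ false, /*maintain_order*/ true)
    let opts = SortMultipleOptions::default()
        .with_order_descending(false) // replaces the `[false]` descending slice
        .with_maintain_order(true); // replaces the trailing `true`
    lf.sort_by_exprs([col("rft")], opts)
}
```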
2 changes: 1 addition & 1 deletion frtb_engine/src/drc/drc_secnonctp.rs
@@ -174,7 +174,7 @@ fn drc_secnonctp_charge_calculator(rtrn: ReturnMetric) -> Expr {
col("RiskFactorType"), //Seniority
col("Tranche"),
col("GrossJTD"),
col("SensWeights").list().get(lit(0)),
col("SensWeights").list().get(lit(0), false),
col("ScaleFactor"),
],
GetOutput::from_type(DataType::Float64),
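Same 0.40 change as in drc_nonsec.rs: `list().get` gained a `null_on_oob` flag. A minimal sketch — `false` keeps the pre-0.40 behaviour of erroring on an out-of-bounds index, while `true` would yield null instead:

```rust
use polars::prelude::*;

// First element of the SensWeights list column, as in the hunk above.
fn first_sens_weight() -> Expr {
    col("SensWeights").list().get(lit(0), false)
}
```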
15 changes: 13 additions & 2 deletions frtb_engine/src/drc/drc_weights.rs
@@ -1,6 +1,17 @@
-use polars::prelude::*;
use std::collections::BTreeMap;

+use polars::{
+    datatypes::StringChunked,
+    df,
+    lazy::{
+        dsl::{col, concat_list},
+        frame::LazyFrame,
+    },
+    prelude::{JoinType, NamedFrom, NamedFromOwned},
+    series::{IntoSeries, Series},
+};
+use ultibi::{DataFrame, IntoLazy};

pub(crate) fn dcr_nonsec_default_weights() -> DataFrame {
let s0 = Series::new("AAA", &[0.005]);
let s1 = Series::new("AA", &[0.02]);
@@ -89,7 +100,7 @@ pub(crate) fn drc_secnonctp_weights_frame() -> DataFrame {
.into_iter()
.map(|(k, v)| (k.to_string(), v / 100.))
.unzip();
-    let seniority_arr = Utf8Chunked::from_iter(key);
+    let seniority_arr = StringChunked::from_iter(key);
df![
"Key" => seniority_arr.into_series(),
"RiskWeightDRC" => Series::from_vec("RiskWeight",weight),
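A standalone sketch of the rename above — `Utf8Chunked` is `StringChunked` in polars 0.40. The keys and weights here are illustrative placeholders, not the DRC weights:

```rust
use polars::prelude::*;

fn seniority_frame() -> PolarsResult<DataFrame> {
    // 0.35: Utf8Chunked::from_iter(...);  0.40: StringChunked::from_iter(...)
    let seniority_arr = StringChunked::from_iter(["AAA_SENIOR", "AA_SENIOR"]);
    df![
        "Key" => seniority_arr.into_series(),
        "RiskWeightDRC" => Series::from_vec("RiskWeight", vec![0.005, 0.02]),
    ]
}
```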
4 changes: 2 additions & 2 deletions frtb_engine/src/helpers.rs
@@ -1,7 +1,7 @@
#![allow(clippy::unnecessary_lazy_evaluations)]

use ndarray::Array2;
-use polars::prelude::{BooleanType, ChunkedArray, PolarsError, Utf8Type};
+use polars::prelude::{BooleanType, ChunkedArray, PolarsError, StringType};
use serde::{Deserialize, Serialize};
use ultibi::{PolarsResult, CPM};

@@ -150,7 +150,7 @@ pub(crate) enum ReturnMetric {
Hbr,
}

-pub fn first_appearance(ca: &ChunkedArray<Utf8Type>) -> ChunkedArray<BooleanType> {
+pub fn first_appearance(ca: &ChunkedArray<StringType>) -> ChunkedArray<BooleanType> {
let mut unique_values = std::collections::HashSet::new();

ca.into_iter()
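A hedged completion of the function above (the diff truncates the body, so the `map`/`collect` tail is assumed). `Utf8Type` is `StringType` in 0.40, which makes `ChunkedArray<StringType>` just `StringChunked` spelled generically:

```rust
use polars::prelude::{BooleanType, ChunkedArray, StringType};

// `true` at a value's first appearance, `false` on repeats (nulls count too).
pub fn first_appearance(ca: &ChunkedArray<StringType>) -> ChunkedArray<BooleanType> {
    let mut unique_values = std::collections::HashSet::new();
    ca.into_iter().map(|opt| unique_values.insert(opt)).collect()
}
```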
110 changes: 67 additions & 43 deletions frtb_engine/src/risk_weights.rs
@@ -4,12 +4,14 @@
use crate::drc::drc_weights;
use once_cell::sync::OnceCell;
+use polars::io::csv::read::CsvReadOptions;
use std::collections::BTreeMap;
+use std::path::Path;
use ultibi::polars::prelude::concat_lf_diagonal;
use ultibi::polars::prelude::{
-    col, concat_list, concat_str, df, lit, CsvReader, DataFrame, DataType, Expr, GetOutput,
-    IntoLazy, IntoSeries, JoinType, LazyFrame, NamedFrom, PolarsError, PolarsResult, SerReader,
-    Series, Utf8NameSpaceImpl,
+    col, concat_list, concat_str, df, lit, DataFrame, DataType, Expr, GetOutput, IntoLazy,
+    IntoSeries, JoinType, LazyFrame, NamedFrom, PolarsError, PolarsResult, SerReader, Series,
+    StringNameSpaceImpl,
};

static FX_SPECIAL_DELTA_FULL_RW: OnceCell<LazyFrame> = OnceCell::new();
@@ -60,40 +62,40 @@ pub fn weights_assign(
) -> PolarsResult<LazyFrame> {
// check columns. Some of the cast weights files must contain these:
let check_columns0 = [
col("RiskClass").cast(DataType::Utf8),
col("RiskCategory").cast(DataType::Utf8),
col("RiskFactorType").cast(DataType::Utf8),
col("Weights").cast(DataType::Utf8),
col("RiskClass").cast(DataType::String),
col("RiskCategory").cast(DataType::String),
col("RiskFactorType").cast(DataType::String),
col("Weights").cast(DataType::String),
];
let check_columns_rc_rcat_b_w = [
col("RiskClass").cast(DataType::Utf8),
col("RiskCategory").cast(DataType::Utf8),
col("BucketBCBS").cast(DataType::Utf8),
col("Weights").cast(DataType::Utf8),
col("RiskClass").cast(DataType::String),
col("RiskCategory").cast(DataType::String),
col("BucketBCBS").cast(DataType::String),
col("Weights").cast(DataType::String),
];
let check_columns_rcat_rc_rft_b_w = [
col("RiskClass").cast(DataType::Utf8),
col("RiskCategory").cast(DataType::Utf8),
col("BucketBCBS").cast(DataType::Utf8),
col("Weights").cast(DataType::Utf8),
col("RiskFactorType").cast(DataType::Utf8),
col("RiskClass").cast(DataType::String),
col("RiskCategory").cast(DataType::String),
col("BucketBCBS").cast(DataType::String),
col("Weights").cast(DataType::String),
col("RiskFactorType").cast(DataType::String),
];
let check_columns_rc_rcat_w = [
col("RiskClass").cast(DataType::Utf8),
col("RiskCategory").cast(DataType::Utf8),
col("Weights").cast(DataType::Utf8),
col("RiskClass").cast(DataType::String),
col("RiskCategory").cast(DataType::String),
col("Weights").cast(DataType::String),
];
let check_columns4 = [
col("RiskClass").cast(DataType::Utf8),
col("RiskCategory").cast(DataType::Utf8),
col("CreditQuality").cast(DataType::Utf8),
col("Weights").cast(DataType::Utf8),
col("RiskClass").cast(DataType::String),
col("RiskCategory").cast(DataType::String),
col("CreditQuality").cast(DataType::String),
col("Weights").cast(DataType::String),
];
let check_columns_key_rc_rcat_drcrw = [
col("RiskClass").cast(DataType::Utf8),
col("RiskCategory").cast(DataType::Utf8),
col("Key").cast(DataType::Utf8),
col("RiskWeightDRC").cast(DataType::Utf8),
col("RiskClass").cast(DataType::String),
col("RiskCategory").cast(DataType::String),
col("Key").cast(DataType::String),
col("RiskWeightDRC").cast(DataType::String),
];

// FX - can't be put into a frame due to regex requirement
@@ -575,8 +577,29 @@ pub fn weight_assign_logic(lf: LazyFrame, weights: SensWeightsConfig) -> PolarsR
lf1 = lf1.with_column(
col("BucketBCBS")
.map(
-                |s| Ok(Some(s.utf8()?.str_slice(0, Some(3)).into_series())),
-                GetOutput::from_type(DataType::Utf8),
+                |s| {
+                    Ok(Some(
+                        s.str()?
+                            .str_slice(
+                                &Series::from_any_values_and_dtype(
+                                    "offset",
+                                    &[0.into()],
+                                    &DataType::Int64,
+                                    true,
+                                )
+                                .unwrap(),
+                                &Series::from_any_values_and_dtype(
+                                    "offset",
+                                    &[3.into()],
+                                    &DataType::UInt64,
+                                    true,
+                                )
+                                .unwrap(),
+                            )? // shall not fail
+                            .into_series(),
+                    ))
+                },
+                GetOutput::from_type(DataType::String),
)
.alias("Bucket"),
);
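The verbose replacement above is driven by 0.40's eager `str_slice`, which now takes per-row offset and length `Series`. For a constant slice, an expression-level alternative may be terser — a hedged sketch, assuming 0.40's `str().slice` accepts `Expr` arguments:

```rust
use polars::prelude::*;

// First three characters of BucketBCBS, as computed by the closure above.
fn bucket_prefix() -> Expr {
    col("BucketBCBS").str().slice(lit(0i64), lit(3u64)).alias("Bucket")
}
```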
@@ -632,8 +655,8 @@ pub fn weight_assign_logic(lf: LazyFrame, weights: SensWeightsConfig) -> PolarsR
col("RiskClass"),
col("RiskCategory"),
col("CreditQuality").map(
-            |s| Ok(Some(s.utf8()?.to_uppercase().into_series())),
-            GetOutput::from_type(DataType::Utf8),
+            |s| Ok(Some(s.str()?.to_uppercase().into_series())),
+            GetOutput::from_type(DataType::String),
),
];
let mut lf1 = lf1.join(
@@ -656,15 +679,16 @@ pub fn weight_assign_logic(lf: LazyFrame, weights: SensWeightsConfig) -> PolarsR
concat_str(
[
col("CreditQuality").map(
-                    |s| Ok(Some(s.utf8()?.to_uppercase().into_series())),
-                    GetOutput::from_type(DataType::Utf8),
+                    |s| Ok(Some(s.str()?.to_uppercase().into_series())),
+                    GetOutput::from_type(DataType::String),
),
col("RiskFactorType").map(
-                    |s| Ok(Some(s.utf8()?.to_uppercase().into_series())),
-                    GetOutput::from_type(DataType::Utf8),
+                    |s| Ok(Some(s.str()?.to_uppercase().into_series())),
+                    GetOutput::from_type(DataType::String),
),
],
"_",
+            true,
)
.alias("Key"),
);
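The lone `true` added above is `concat_str`'s new `ignore_nulls` flag in polars 0.40: with `true`, null parts are skipped; with `false`, any null component makes the whole concatenation null. A minimal sketch of the call shape:

```rust
use polars::prelude::*;

fn uppercase_key() -> Expr {
    // concat_str(exprs, separator, ignore_nulls)
    concat_str([col("CreditQuality"), col("RiskFactorType")], "_", true).alias("Key")
}
```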
@@ -787,18 +811,18 @@ pub(crate) fn girr_infl_xccy_weights_frame(
/// If not tries to serialise it
/// Checks for expected columns
pub fn frame_from_path_or_str(
-    some_str: &str,
+    path_or_serialised_df: &str,
check_columns: &[Expr],
cast_to_lst_f64: &str,
) -> PolarsResult<DataFrame> {
-    let df = if let Ok(csv) = CsvReader::from_path(some_str) {
-        csv.has_header(true)
-            .finish()?
-            .lazy()
-            .select(check_columns)
-            .collect()
+    let df = if Path::new(path_or_serialised_df).exists() {
+        CsvReadOptions::default()
+            .with_has_header(true)
+            .try_into_reader_with_file_path(Some(path_or_serialised_df.into()))
+            .unwrap() // path exists, we shouldn't panic
+            .finish()
} else {
-        serde_json::from_str::<DataFrame>(some_str)
+        serde_json::from_str::<DataFrame>(path_or_serialised_df)
.map_err(|_| PolarsError::InvalidOperation("couldn't serialise string to frame".into()))
.and_then(|df| df.lazy().select(check_columns).collect())
}?;
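The CSV hunk above tracks the 0.40 reader rework: `CsvReader::from_path` was replaced by a `CsvReadOptions` builder that is converted into a reader. A minimal sketch of the new call chain, propagating errors instead of the diff's `unwrap`:

```rust
use polars::io::csv::read::CsvReadOptions;
use polars::prelude::*;

fn read_csv_frame(path: &str) -> PolarsResult<DataFrame> {
    CsvReadOptions::default()
        .with_has_header(true)
        // fails here (rather than panicking) if the file cannot be opened
        .try_into_reader_with_file_path(Some(path.into()))?
        .finish()
}
```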
22 changes: 11 additions & 11 deletions frtb_engine/src/risk_weights_crr2.rs
@@ -6,7 +6,7 @@ use once_cell::sync::OnceCell;
use ultibi::polars::prelude::concat_lf_diagonal;
use ultibi::polars::prelude::{
col, DataType, GetOutput, IntoLazy, IntoSeries, JoinType, LazyFrame, NamedFrom, PolarsResult,
-    Series, Utf8NameSpaceImpl,
+    Series, StringNameSpaceImpl,
};

use crate::{
@@ -33,16 +33,16 @@ pub fn weights_assign_crr2(
build_params: &BTreeMap<String, String>,
) -> PolarsResult<LazyFrame> {
let check_columns = [
col("RiskClass").cast(DataType::Utf8),
col("RiskCategory").cast(DataType::Utf8),
col("BucketCRR2").cast(DataType::Utf8),
col("WeightsCRR2").cast(DataType::Utf8),
col("RiskClass").cast(DataType::String),
col("RiskCategory").cast(DataType::String),
col("BucketCRR2").cast(DataType::String),
col("WeightsCRR2").cast(DataType::String),
];
let check_columns4 = [
col("RiskClass").cast(DataType::Utf8),
col("RiskCategory").cast(DataType::Utf8),
col("CreditQuality").cast(DataType::Utf8),
col("WeightsCRR2").cast(DataType::Utf8),
col("RiskClass").cast(DataType::String),
col("RiskCategory").cast(DataType::String),
col("CreditQuality").cast(DataType::String),
col("WeightsCRR2").cast(DataType::String),
];

let csr_nonsec_weights_crr2 = [
@@ -141,8 +141,8 @@ pub fn weight_assign_logic_crr2(
col("RiskClass"),
col("RiskCategory"),
col("CreditQuality").map(
-            |s| Ok(Some(s.utf8()?.to_uppercase().into_series())),
-            GetOutput::from_type(DataType::Utf8),
+            |s| Ok(Some(s.str()?.to_uppercase().into_series())),
+            GetOutput::from_type(DataType::String),
),
];
let mut lf1 = lf1.join(