Skip to content

Commit

Permalink
fix: pass cas api cpus explicitly to the process
Browse files Browse the repository at this point in the history
  • Loading branch information
smrz2001 committed Jul 9, 2024
1 parent 7b659c0 commit 9691ce3
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 8 deletions.
25 changes: 24 additions & 1 deletion operator/src/network/cas.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::BTreeMap;
use std::{cmp, collections::BTreeMap};

use k8s_openapi::{
api::{
Expand Down Expand Up @@ -317,6 +317,14 @@ pub fn cas_stateful_set_spec(
]
.concat();

let num_api_cpus = config
.cas_resource_limits
.cpu
.clone()
.and_then(|cpu_quantity| quantity_to_milli(&cpu_quantity))
.map(|cpu_millis| cmp::max(cpu_millis / 1000, 1))
.unwrap_or(1);

let mut cas_api_env = [
cas_node_env.clone(),
vec![
Expand All @@ -335,6 +343,11 @@ pub fn cas_stateful_set_spec(
value: Some(DEFAULT_METRICS_PORT.to_string()),
..Default::default()
},
EnvVar {
name: "MULTIPROCESS_SIZE".to_owned(),
value: Some(num_api_cpus.to_string()),
..Default::default()
},
],
]
.concat();
Expand Down Expand Up @@ -548,6 +561,16 @@ pub fn cas_stateful_set_spec(
..Default::default()
}
}

fn quantity_to_milli(quantity: &Quantity) -> Option<i64> {
if let Some(value) = quantity.0.strip_suffix("m") {
return value.parse::<i64>().ok();
} else if let Ok(value) = quantity.0.parse::<f64>() {
return Some((value * 1000.0) as i64);
}
None
}

pub fn cas_service_spec() -> ServiceSpec {
ServiceSpec {
ports: Some(vec![ServicePort {
Expand Down
107 changes: 100 additions & 7 deletions operator/src/network/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2848,7 +2848,7 @@ mod tests {
stub.cas_stateful_set.patch(expect![[r#"
--- original
+++ modified
@@ -134,8 +134,8 @@
@@ -138,8 +138,8 @@
"value": "http://localstack:4566/000000000000/cas-anchor-dev-"
}
],
Expand All @@ -2859,7 +2859,7 @@ mod tests {
"name": "cas-api",
"ports": [
{
@@ -276,8 +276,8 @@
@@ -280,8 +280,8 @@
"value": "1000"
}
],
Expand All @@ -2870,7 +2870,7 @@ mod tests {
"name": "cas-worker",
"resources": {
"limits": {
@@ -446,8 +446,8 @@
@@ -450,8 +450,8 @@
"value": "dev"
}
],
Expand Down Expand Up @@ -2962,7 +2962,7 @@ mod tests {
stub.cas_stateful_set.patch(expect![[r#"
--- original
+++ modified
@@ -144,12 +144,12 @@
@@ -148,12 +148,12 @@
],
"resources": {
"limits": {
Expand All @@ -2977,7 +2977,7 @@ mod tests {
"ephemeral-storage": "1Gi",
"memory": "1Gi"
}
@@ -281,12 +281,12 @@
@@ -285,12 +285,12 @@
"name": "cas-worker",
"resources": {
"limits": {
Expand All @@ -2992,7 +2992,7 @@ mod tests {
"ephemeral-storage": "1Gi",
"memory": "1Gi"
}
@@ -369,12 +369,12 @@
@@ -373,12 +373,12 @@
"name": "cas-scheduler",
"resources": {
"limits": {
Expand All @@ -3007,7 +3007,7 @@ mod tests {
"ephemeral-storage": "1Gi",
"memory": "1Gi"
}
@@ -474,7 +474,7 @@
@@ -478,7 +478,7 @@
],
"resources": {
"requests": {
Expand Down Expand Up @@ -4702,6 +4702,99 @@ mod tests {
timeout_after_1s(mocksrv).await;
}
#[tokio::test]
async fn cas_api_env_cpu_override() {
// Setup network spec and status
let network = Network::test().with_spec(NetworkSpec {
cas: Some(CasSpec {
cas_resource_limits: Some(ResourceLimitsSpec {
cpu: Some(Quantity("2.5".to_owned())),
..Default::default()
}),
..Default::default()
}),
..Default::default()
});
let mock_rpc_client = default_ipfs_rpc_mock();
let mut stub = Stub::default().with_network(network.clone());
stub.cas_stateful_set.patch(expect![[r#"
--- original
+++ modified
@@ -119,7 +119,7 @@
},
{
"name": "MULTIPROCESS_SIZE",
- "value": "1"
+ "value": "2"
},
{
"name": "NODE_ENV",
@@ -148,14 +148,12 @@
],
"resources": {
"limits": {
- "cpu": "250m",
- "ephemeral-storage": "1Gi",
- "memory": "1Gi"
+ "cpu": "2.5",
+ "ephemeral-storage": "1Gi"
},
"requests": {
- "cpu": "250m",
- "ephemeral-storage": "1Gi",
- "memory": "1Gi"
+ "cpu": "2.5",
+ "ephemeral-storage": "1Gi"
}
}
},
@@ -285,14 +283,12 @@
"name": "cas-worker",
"resources": {
"limits": {
- "cpu": "250m",
- "ephemeral-storage": "1Gi",
- "memory": "1Gi"
+ "cpu": "2.5",
+ "ephemeral-storage": "1Gi"
},
"requests": {
- "cpu": "250m",
- "ephemeral-storage": "1Gi",
- "memory": "1Gi"
+ "cpu": "2.5",
+ "ephemeral-storage": "1Gi"
}
}
},
@@ -373,14 +369,12 @@
"name": "cas-scheduler",
"resources": {
"limits": {
- "cpu": "250m",
- "ephemeral-storage": "1Gi",
- "memory": "1Gi"
+ "cpu": "2.5",
+ "ephemeral-storage": "1Gi"
},
"requests": {
- "cpu": "250m",
- "ephemeral-storage": "1Gi",
- "memory": "1Gi"
+ "cpu": "2.5",
+ "ephemeral-storage": "1Gi"
}
}
}
"#]]);
let (testctx, api_handle) = Context::test(mock_rpc_client);
let fakeserver = ApiServerVerifier::new(api_handle);
let mocksrv = stub.run(fakeserver);
reconcile(Arc::new(network), testctx)
.await
.expect("reconciler");
timeout_after_1s(mocksrv).await;
}
#[tokio::test]
#[traced_test]
async fn migration_cmd() {
// Setup network spec and status
Expand Down
4 changes: 4 additions & 0 deletions operator/src/network/testdata/default_stubs/cas_stateful_set
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ Request {
"name": "METRICS_PORT",
"value": "9464"
},
{
"name": "MULTIPROCESS_SIZE",
"value": "1"
},
{
"name": "NODE_ENV",
"value": "dev"
Expand Down

0 comments on commit 9691ce3

Please sign in to comment.