60 changes: 41 additions & 19 deletions rust/operator-binary/src/airflow_controller.rs
@@ -45,6 +45,7 @@ use stackable_operator::{
DeepMerge,
},
kube::{
core::{error_boundary, DeserializeGuard},
runtime::{controller::Action, reflector::ObjectRef},
Resource, ResourceExt,
},
@@ -289,6 +290,11 @@ pub enum Error {
"failed to write to String (Vec<u8> to be precise) containing Airflow config"
))]
WriteToConfigFileString { source: std::io::Error },

#[snafu(display("AirflowCluster object is invalid"))]
InvalidAirflowCluster {
source: error_boundary::InvalidObject,
},
}

type Result<T, E = Error> = std::result::Result<T, E>;
@@ -299,9 +305,18 @@ impl ReconcilerError for Error {
}
}

pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> Result<Action> {
pub async fn reconcile_airflow(
airflow: Arc<DeserializeGuard<AirflowCluster>>,
ctx: Arc<Ctx>,
) -> Result<Action> {
tracing::info!("Starting reconcile");

let airflow = airflow
.0
.as_ref()
.map_err(error_boundary::InvalidObject::clone)
.context(InvalidAirflowClusterSnafu)?;

let client = &ctx.client;
let resolved_product_image: ResolvedProductImage = airflow
.spec
@@ -338,7 +353,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
}
}

let role_config = transform_all_roles_to_config::<AirflowConfigFragment, _>(&airflow, roles);
let role_config = transform_all_roles_to_config::<AirflowConfigFragment, _>(airflow, roles);
let validated_role_config = validate_all_roles_and_groups_config(
&resolved_product_image.product_version,
&role_config.context(ProductConfigTransformSnafu)?,
@@ -350,7 +365,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R

let vector_aggregator_address = resolve_vector_aggregator_address(
client,
airflow.as_ref(),
airflow,
airflow
.spec
.cluster_config
@@ -374,8 +389,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
.context(BuildLabelSnafu)?;

let (rbac_sa, rbac_rolebinding) =
build_rbac_resources(airflow.as_ref(), APP_NAME, required_labels)
.context(BuildRBACObjectsSnafu)?;
build_rbac_resources(airflow, APP_NAME, required_labels).context(BuildRBACObjectsSnafu)?;

let rbac_sa = cluster_resources
.add(client, rbac_sa)
@@ -397,7 +411,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
} = &airflow_executor
{
build_executor_template(
&airflow,
airflow,
common_configuration,
&resolved_product_image,
&authentication_config,
@@ -418,7 +432,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
// some roles will only run "internally" and do not need to be created as services
if let Some(resolved_port) = role_port(role_name) {
let role_service =
build_role_service(&airflow, &resolved_product_image, role_name, resolved_port)?;
build_role_service(airflow, &resolved_product_image, role_name, resolved_port)?;
cluster_resources
.add(client, role_service)
.await
@@ -427,7 +441,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R

for (rolegroup_name, rolegroup_config) in role_config.iter() {
let rolegroup = RoleGroupRef {
cluster: ObjectRef::from_obj(&*airflow),
cluster: ObjectRef::from_obj(airflow),
role: role_name.into(),
role_group: rolegroup_name.into(),
};
@@ -436,16 +450,15 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
.merged_config(&airflow_role, &rolegroup)
.context(FailedToResolveConfigSnafu)?;

let rg_service =
build_rolegroup_service(&airflow, &resolved_product_image, &rolegroup)?;
let rg_service = build_rolegroup_service(airflow, &resolved_product_image, &rolegroup)?;
cluster_resources.add(client, rg_service).await.context(
ApplyRoleGroupServiceSnafu {
rolegroup: rolegroup.clone(),
},
)?;

let rg_statefulset = build_server_rolegroup_statefulset(
&airflow,
airflow,
&resolved_product_image,
&airflow_role,
&rolegroup,
@@ -466,7 +479,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
);

let rg_configmap = build_rolegroup_config_map(
&airflow,
airflow,
&resolved_product_image,
&rolegroup,
rolegroup_config,
@@ -488,7 +501,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
pod_disruption_budget: pdb,
}) = role_config
{
add_pdbs(pdb, &airflow, &airflow_role, client, &mut cluster_resources)
add_pdbs(pdb, airflow, &airflow_role, client, &mut cluster_resources)
.await
.context(FailedToCreatePdbSnafu)?;
}
@@ -501,13 +514,13 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R

let status = AirflowClusterStatus {
conditions: compute_conditions(
airflow.as_ref(),
airflow,
&[&ss_cond_builder, &cluster_operation_cond_builder],
),
};

client
.apply_patch_status(OPERATOR_NAME, &*airflow, &status)
.apply_patch_status(OPERATOR_NAME, airflow, &status)
.await
.context(ApplyStatusSnafu)?;

@@ -516,7 +529,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R

#[allow(clippy::too_many_arguments)]
async fn build_executor_template(
airflow: &Arc<AirflowCluster>,
airflow: &AirflowCluster,
common_config: &CommonConfiguration<ExecutorConfigFragment>,
resolved_product_image: &ResolvedProductImage,
authentication_config: &Vec<AirflowAuthenticationConfigResolved>,
@@ -529,7 +542,7 @@ async fn build_executor_template(
.merged_executor_config(&common_config.config)
.context(FailedToResolveConfigSnafu)?;
let rolegroup = RoleGroupRef {
cluster: ObjectRef::from_obj(&**airflow),
cluster: ObjectRef::from_obj(airflow),
role: "executor".into(),
role_group: "kubernetes".into(),
};
@@ -1223,8 +1236,17 @@ fn build_gitsync_container(
Ok(gitsync_container)
}

pub fn error_policy(_obj: Arc<AirflowCluster>, _error: &Error, _ctx: Arc<Ctx>) -> Action {
Action::requeue(*Duration::from_secs(5))
pub fn error_policy(
_obj: Arc<DeserializeGuard<AirflowCluster>>,
error: &Error,
_ctx: Arc<Ctx>,
) -> Action {
match error {
// root object is invalid, will be requeued when modified anyway
Error::InvalidAirflowCluster { .. } => Action::await_change(),

_ => Action::requeue(*Duration::from_secs(10)),
}
}

fn add_authentication_volumes_and_volume_mounts(
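Note: the controller change above comes down to two patterns, unwrapping the DeserializeGuard at the top of the reconciler and waiting for a modification instead of requeuing on a timer when the root object is invalid. A minimal sketch of both, with simplified signatures that are not the operator's actual code:

```rust
use std::{sync::Arc, time::Duration};

use stackable_operator::kube::{
    core::{error_boundary, DeserializeGuard},
    runtime::controller::Action,
};

// Unwrap a guarded object: Ok(&K) if it deserialized cleanly, otherwise a cloned
// InvalidObject error that the reconciler can surface through its own error enum.
fn unwrap_guard<K>(
    guard: &Arc<DeserializeGuard<K>>,
) -> Result<&K, error_boundary::InvalidObject> {
    guard.0.as_ref().map_err(error_boundary::InvalidObject::clone)
}

// Error policy: an invalid manifest only changes when a user edits it, so wait
// for a modification; transient failures are retried on a fixed delay.
fn error_action(object_is_invalid: bool) -> Action {
    if object_is_invalid {
        Action::await_change()
    } else {
        Action::requeue(Duration::from_secs(10))
    }
}
```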
46 changes: 27 additions & 19 deletions rust/operator-binary/src/main.rs
@@ -10,14 +10,13 @@ use crate::airflow_controller::AIRFLOW_CONTROLLER_NAME;

use clap::{crate_description, crate_version, Parser};
use futures::StreamExt;
use stackable_airflow_crd::{
authentication::AirflowAuthentication, AirflowCluster, APP_NAME, OPERATOR_NAME,
};
use stackable_airflow_crd::{AirflowCluster, APP_NAME, OPERATOR_NAME};
use stackable_operator::{
cli::{Command, ProductOperatorRun},
commons::authentication::AuthenticationClass,
k8s_openapi::api::{apps::v1::StatefulSet, core::v1::Service},
kube::{
core::DeserializeGuard,
runtime::{reflector::ObjectRef, watcher, Controller},
ResourceExt,
},
@@ -72,7 +71,7 @@ async fn main() -> anyhow::Result<()> {
stackable_operator::client::create_client(Some(OPERATOR_NAME.to_string())).await?;

let airflow_controller_builder = Controller::new(
watch_namespace.get_api::<AirflowCluster>(&client),
watch_namespace.get_api::<DeserializeGuard<AirflowCluster>>(&client),
watcher::Config::default(),
);

@@ -88,17 +87,14 @@ async fn main() -> anyhow::Result<()> {
)
.shutdown_on_signal()
.watches(
client.get_api::<AuthenticationClass>(&()),
client.get_api::<DeserializeGuard<AuthenticationClass>>(&()),
watcher::Config::default(),
move |authentication_class| {
airflow_store_1
.state()
.into_iter()
.filter(move |airflow: &Arc<AirflowCluster>| {
references_authentication_class(
&airflow.spec.cluster_config.authentication,
&authentication_class,
)
.filter(move |airflow: &Arc<DeserializeGuard<AirflowCluster>>| {
references_authentication_class(airflow, &authentication_class)
})
.map(|airflow| ObjectRef::from_obj(&*airflow))
},
@@ -127,15 +123,27 @@ async fn main() -> anyhow::Result<()> {
}

fn references_authentication_class(
authentication_config: &AirflowAuthentication,
authentication_class: &AuthenticationClass,
airflow: &DeserializeGuard<AirflowCluster>,
authentication_class: &DeserializeGuard<AuthenticationClass>,
) -> bool {
assert!(authentication_class.metadata.name.is_some());
airflow
.0
.as_ref()
.expect("AirflowCluster object is invalid");
authentication_class
.0
.as_ref()
.expect("AuthenticationClass object is invalid");

authentication_config
.authentication_class_names()
.into_iter()
.filter(|c| *c == authentication_class.name_any())
.count()
> 0
if let Ok(airflow_cluster) = airflow.0.as_ref() {
let authentication_config = airflow_cluster.spec.clone().cluster_config.authentication;
authentication_config
.authentication_class_names()
.into_iter()
.filter(|c| *c == authentication_class.name_any())
.count()
> 0
} else {
false
}
}
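Note: because the watches now yield DeserializeGuard-wrapped objects, the relation mapper closure has to decide what to do with objects that fail to deserialize. A condensed sketch of folding that decision into a predicate check, under the assumption that an invalid cluster should simply not match; this is a hypothetical helper, not the operator's actual function:

```rust
use stackable_operator::kube::core::DeserializeGuard;

// Evaluate a predicate against a guarded object, treating an object that failed
// to deserialize as "does not match" rather than panicking inside the mapper.
fn guarded_matches<K>(guard: &DeserializeGuard<K>, pred: impl FnOnce(&K) -> bool) -> bool {
    guard.0.as_ref().map(pred).unwrap_or(false)
}
```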