Skip to content

Commit

Permalink
Check arguments and function decorated with @task
Browse files Browse the repository at this point in the history
  • Loading branch information
sunank200 committed Jan 6, 2025
1 parent 72a9dd3 commit fb8b300
Show file tree
Hide file tree
Showing 3 changed files with 499 additions and 288 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,44 @@
import pendulum
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime
from airflow.plugins_manager import AirflowPlugin
from airflow.decorators import task, get_current_context
from airflow.models.baseoperator import BaseOperator
from airflow.decorators import dag, task
from airflow.providers.standard.operators.python import PythonOperator


def access_invalid_key_in_context(**context):
print("access invalid key", context["conf"])


@task
def access_invalid_key_task_out_of_dag(**context):
print("access invalid key", context.get("conf"))



@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=[""],
)
def invalid_dag():
@task()
def access_invalid_key_task(**context):
print("access invalid key", context.get("conf"))

task1 = PythonOperator(
task_id="task1",
python_callable=access_invalid_key_in_context,
)
access_invalid_key_task() >> task1
access_invalid_key_task_out_of_dag()


invalid_dag()

@task
def print_config(**context):
Expand Down Expand Up @@ -74,3 +109,16 @@ def execute(self, context):
tomorrow_ds = context["tomorrow_ds"]
yesterday_ds = context["yesterday_ds"]
yesterday_ds_nodash = context["yesterday_ds_nodash"]

@task
def access_invalid_argument_task_out_of_dag(execution_date, **context):
print("execution date", execution_date)
print("access invalid key", context.get("conf"))

@task(task_id="print_the_context")
def print_context(ds=None, **kwargs):
"""Print the Airflow context and ds variable from the context."""
print(ds)
print(kwargs.get("tomorrow_ds"))

run_this = print_context()
133 changes: 115 additions & 18 deletions crates/ruff_linter/src/rules/airflow/rules/removal_in_3.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use crate::checkers::ast::Checker;
use ruff_diagnostics::{Diagnostic, Edit, Fix, FixAvailability, Violation};
use ruff_macros::{derive_message_formats, ViolationMetadata};
use ruff_python_ast::helpers::map_callable;
use ruff_python_ast::{
name::QualifiedName, Arguments, Expr, ExprAttribute, ExprCall, ExprContext, ExprName,
ExprStringLiteral, ExprSubscript, StmtClassDef,
ExprStringLiteral, ExprSubscript, Stmt, StmtClassDef, StmtFunctionDef,
};
use ruff_python_semantic::analyze::typing;
use ruff_python_semantic::Modules;
use ruff_python_semantic::ScopeKind;
use ruff_text_size::Ranged;
use ruff_text_size::TextRange;

use crate::checkers::ast::Checker;

/// ## What it does
/// Checks for uses of deprecated Airflow functions and values.
///
Expand Down Expand Up @@ -71,6 +71,21 @@ impl Violation for Airflow3Removal {
}
}

const REMOVED_CONTEXT_KEYS: [&str; 12] = [
"conf",
"execution_date",
"next_ds",
"next_ds_nodash",
"next_execution_date",
"prev_ds",
"prev_ds_nodash",
"prev_execution_date",
"prev_execution_date_success",
"tomorrow_ds",
"yesterday_ds",
"yesterday_ds_nodash",
];

fn extract_name_from_slice(slice: &Expr) -> Option<String> {
match slice {
Expr::StringLiteral(ExprStringLiteral { value, .. }) => Some(value.to_string()),
Expand All @@ -79,21 +94,6 @@ fn extract_name_from_slice(slice: &Expr) -> Option<String> {
}

pub(crate) fn removed_context_variable(checker: &mut Checker, expr: &Expr) {
const REMOVED_CONTEXT_KEYS: [&str; 12] = [
"conf",
"execution_date",
"next_ds",
"next_ds_nodash",
"next_execution_date",
"prev_ds",
"prev_ds_nodash",
"prev_execution_date",
"prev_execution_date_success",
"tomorrow_ds",
"yesterday_ds",
"yesterday_ds_nodash",
];

if let Expr::Subscript(ExprSubscript { value, slice, .. }) = expr {
if let Expr::Name(ExprName { id, .. }) = &**value {
if id.as_str() == "context" {
Expand Down Expand Up @@ -144,6 +144,7 @@ pub(crate) fn removed_in_3(checker: &mut Checker, expr: &Expr) {
check_call_arguments(checker, &qualname, arguments);
};
check_method(checker, call_expr);
check_context_get(checker, call_expr);
}
Expr::Attribute(attribute_expr @ ExprAttribute { attr, .. }) => {
check_name(checker, expr, attr.range());
Expand Down Expand Up @@ -307,6 +308,50 @@ fn check_class_attribute(checker: &mut Checker, attribute_expr: &ExprAttribute)
}
}

/// Check whether a removed context key is access through context.get("key").
///
/// ```python
/// from airflow.decorators import task
///
///
/// @task
/// def access_invalid_key_task_out_of_dag(**context):
/// print("access invalid key", context.get("conf"))
/// ```
fn check_context_get(checker: &mut Checker, call_expr: &ExprCall) {
if is_task_context_referenced(checker, &call_expr.func) {
return;
}

let Expr::Attribute(ExprAttribute { value, attr, .. }) = &*call_expr.func else {
return;
};

if !value
.as_name_expr()
.is_some_and(|name| matches!(name.id.as_str(), "context" | "kwargs"))
{
return;
}

if attr.as_str() != "get" {
return;
}

for removed_key in REMOVED_CONTEXT_KEYS {
if let Some(argument) = call_expr.arguments.find_argument_value(removed_key, 0) {
checker.diagnostics.push(Diagnostic::new(
Airflow3Removal {
deprecated: removed_key.to_string(),
replacement: Replacement::None,
},
argument.range(),
));
return;
}
}
}

/// Check whether a removed Airflow class method is called.
///
/// For example:
Expand Down Expand Up @@ -909,3 +954,55 @@ fn is_airflow_builtin_or_provider(segments: &[&str], module: &str, symbol_suffix
_ => false,
}
}

fn is_task_context_referenced(checker: &mut Checker, expr: &Expr) -> bool {
let parents: Vec<_> = checker.semantic().current_statements().collect();

for stmt in parents {
if let Stmt::FunctionDef(function_def) = stmt {
if is_decorated_with(checker, function_def) {
let arguments = extract_task_function_arguments(function_def);

for deprecated_arg in REMOVED_CONTEXT_KEYS {
if arguments.contains(&deprecated_arg.to_string()) {
checker.diagnostics.push(Diagnostic::new(
Airflow3Removal {
deprecated: deprecated_arg.to_string(),
replacement: Replacement::None,
},
expr.range(),
));
return true;
}
}
}
}
}

false
}

fn extract_task_function_arguments(stmt: &StmtFunctionDef) -> Vec<String> {
let mut arguments = Vec::new();

for param in &stmt.parameters.args {
arguments.push(param.parameter.name.to_string());
}

if let Some(vararg) = &stmt.parameters.kwarg {
arguments.push(format!("**{}", vararg.name));
}

arguments
}

fn is_decorated_with(checker: &mut Checker, stmt: &StmtFunctionDef) -> bool {
stmt.decorator_list.iter().any(|decorator| {
checker
.semantic()
.resolve_qualified_name(map_callable(&decorator.expression))
.is_some_and(|qualified_name| {
matches!(qualified_name.segments(), ["airflow", "decorators", "task"])
})
})
}
Loading

0 comments on commit fb8b300

Please sign in to comment.