Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make generic pipeline work with airflow2x #3167

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

shalberd
Copy link
Contributor

@shalberd shalberd commented Jun 21, 2023

fixes #3166

@ianonavy @lresende NOT intended to work with Airflow 1.x

What changes were proposed in this pull request?

changes to airflow processor and airflow pipeline template with regards to Airflow 2.8.2 or higher support
also added pendulum instead of days_ago since deprecated

https://github.com/apache/airflow/pull/21653/files

Conceptual overlap with a fork, came to the same code in parallel

change to the Kubernetes client SDK in generic pipeline part of template since the Airflow abstractions
were all deprecated and removed except for Secret.

Finally, Airflow 2 adds logic that makes config_file mutually exclusive
with in_cluster, so we need to ensure that None is passed as None and
not string "None".

See also

kflow-ai@f9d1329#diff-dc6c3f666aad9271fa5e9b8c31e3f0582cd39a7d2516cbc2240731fe4456e641

How was this pull request tested?

In contrast to kubeflow pipelines, even for Airflow 1.x and the different pipeline editors, there do not seem to be any tests.
I'd like to test the built wheel file in a docker image in conjunction with ODH.
Mostly seeing whether the generated DAG code works with Airflow 2.8.2 and higher.

Developer's Certificate of Origin 1.1

   By making a contribution to this project, I certify that:

   (a) The contribution was created in whole or in part by me and I
       have the right to submit it under the Apache License 2.0; or

   (b) The contribution is based upon previous work that, to the best
       of my knowledge, is covered under an appropriate open source
       license and I have the right under that license to submit that
       work with modifications, whether created in whole or in part
       by me, under the same open source license (unless I am
       permitted to submit under a different license), as indicated
       in the file; or

   (c) The contribution was provided directly to me by some other
       person who certified (a), (b) or (c) and I have not modified
       it.

   (d) I understand and agree that this project and the contribution
       are public and that a record of the contribution (including all
       personal information I submit with it, including my sign-off) is
       maintained indefinitely and may be redistributed consistent with
       this project or the open source license(s) involved.

@ianonavy
Copy link

One thing I forgot to note is that from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator was deprecated in favor of from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator in Airflow 2.6.0. Decided to keep the older one for compatibility with earlier releases; not sure which version we are targeting.

@shalberd
Copy link
Contributor Author

shalberd commented Jun 21, 2023

Regarding KubernetesPodOperator location: Yes, good point. I missed that in the release notes. Well, 2.5.x is already well past, plus there were some security bugs in 2.5.x that someone in our org mentioned that make it a good idea to go further than 2.5.x. Airflow 2.6.0 came out in May, so we can assume it is current. …@lresende what do you think in terms of which Airflow 2.x release we are targeting in our efforts here? greater or equal to 2.6.0?

@shalberd shalberd force-pushed the make_generic_pipeline_work_with_airflow2x branch from 4cbf21d to 4b8386f Compare June 22, 2023 08:08
@giladd123
Copy link

giladd123 commented Jun 22, 2023

I did approximatly the same thing in an air gapped environment so it would be a problem for me to commit my code but I think I can help.
I also encountered a problem with the resources, what seemed to work for me is switching resources = ... to something like:

{% if operation.cpu_request or operation.mem_request or operation.gpu_limit %}
container_resources=k8s.V1ResourceRequirements(
    requests={
        {% if operation.cpu_request %}
        'cpu': '{{ operation.cpu_request }}',
        {% endif %}
        {% if operation.mem_request %}
        'memory': '{{ operation.mem_request }}G',
        {% endif %}
    },
    limits={
        {% if operation.cpu_request %}
        'cpu': '{{ operation.cpu_request }}',
        {% endif %}
        {% if operation.mem_request %}
        'memory': '{{ operation.mem_request }}G',
        {% endif %}
        {% if operation.gpu_limit %}
        'nvidia.com/gpu': '{{ operation.gpu_limit }}', # or 'amd.com/gpu'
        {% endif %}
    },
)
{% endif %}

In this case, the UI doesn't have an option for CPU and Memory limits, so the limits could probably be removed (In my k8s cluster a limit must be set so I just set it in some ratio of the request).

I lack the knowledge to change the UI to include limits but I'm willing to help with anything else!

@shalberd
Copy link
Contributor Author

shalberd commented Jun 26, 2023

@giladd123 agreed, setting limits is good practice, even when not enforced by LimitRange and max ratio request to limit.

Quoting

Requests and Limits should never be treated as min/max requirements for any pod. 
The requested amount should be a reflection of the pod’s required resources (both CPU and memory) 
under load. The ratio between request and limit should be as close to 1.0 as possible. 
Any divergence from this should have a valid business reason. The request amount reflects 
the pod’s normal usage, the limit is there as a safety net to protect the rest of the cluster’s load, 
in case it misbehaves, or a mistake was made during the profiling of the application.

For the template, setting limits to either equal to requests or minimally higher by a factor of x 1.2 would be good practice and make the cluster as a whole more stable. In the end, developers sets those requests in the GUI and they must be admonished to set realistic request sizes. With a limit set on more than 2 or 3 times the request, they'll realize soon they are wrong in their pipeline steps definition for resources whne competing resources lead to e.g. node OOM.

@giladd123
Copy link

For the template, setting limits to either equal to requests or minimally higher by a factor of x 1.2 would be good practice and make the cluster as a whole more stable.

Altough being a good practice, I don't think setting a hard ratio is a good idea as in the end this gives the user less flexability (for example, my cluster enforces a 1:4 ratio on cpu request and limit, with a 1.2x jobs will just not go up). I don't think we should force good practices on users, we should let them deal with this themselves.

@shalberd
Copy link
Contributor Author

shalberd commented Jun 26, 2023

@giladd123 Agreed, however, not setting limits at all leads to pods being scheduled on a node that they really should not be scheduled on. In your example of 1 to 4 ratio, should all pods ever run on the same node at the same time, you're in for huge trouble, having a node outage and re-scheduling, leading to interruptions. We never went beyond 1 to 2 ratio in our production and test clusters.

As you mentioned, it'd be best to at least have the option to set limits at the GUI level.
Then, each enterprise can make their developers input educated settings and enforce them also via LimitRange.
I need to check whether cpu request, memory request, gpu request, gpu memory request are GUI required values.
If they are, then the same (mandatory fields) should apply for limits on the GUI as well, being of course user-defineable.
Not providing limits is simply sloppy, I've got no idea why noone ever criticized that.

@lresende
Copy link
Member

Where we are with this? I am assuming still a WIP and we are waiting for more changes before a full review?

@dolevi101
Copy link

Hi, is there any update?

@lresende lresende added status:Work in Progress Development in progress. A PR tagged with this label is not review ready unless stated otherwise. platform: pipeline-Airflow Related to usage of Apache Airflow as pipeline runtime labels Oct 4, 2023
@shalberd
Copy link
Contributor Author

shalberd commented Oct 9, 2023

@lresende yes, it is WIP still, in large part due to the fact that I have not received any feedback, neither from ODH nor here, on why CPU and Memory limits are not in the variables and thus cannot be used at least optionally. In Kubeflow notebooks, limits can be used, why not in pipelines? @harshad16 My aim is to conceptually keep Airflow 2.x runtime code as much aligned as possible with the Redhat KFP side.

@shalberd
Copy link
Contributor Author

shalberd commented Nov 17, 2023

I am leaving out the issue of resource limits for now. My aim is to test build this and integrate it into an open data hub notebook container, then test it with our Airflow 2.6.2 instance that is reading the Elyra-generated DAGs from our Gitlab company-internally.
Since that won't be at pypi, I'll probably have to find another way to load the three modified builds into my custom docker image, i.e. https://github.com/opendatahub-io/notebooks/blob/main/jupyter/pytorch/ubi9-python-3.9/Pipfile#L37

PIP_INDEX_URL or similar. You can probably tell I never installed from dev builds before ...
Anyone is welcome do do code review and give me some hints how I can best test this out running in Jupyter on ODH.

@shalberd shalberd changed the title [WIP] Make generic pipeline work with airflow2x Make generic pipeline work with airflow2x Nov 17, 2023
@shalberd
Copy link
Contributor Author

shalberd commented Nov 24, 2023

Testing with a built .whl file

$ make build-dependencies
$ make build-server
python3 -m build
* Creating virtualenv isolated environment...
* Installing packages in isolated environment... (hatchling>=1.2.0)
* Getting build dependencies for sdist...
* Building sdist...
* Building wheel from sdist
* Creating virtualenv isolated environment...
* Installing packages in isolated environment... (hatchling>=1.2.0)
* Getting build dependencies for wheel...
* Building wheel...
Successfully built elyra-3.16.0.dev0.tar.gz and elyra-3.16.0.dev0-py3-none-any.whl
$ find . -type f -name "*.whl"
./dist/elyra-3.16.0.dev0-py3-none-any.whl

and integrated it into Open Data Hub Notebook Containers build process.

$ elyra-pipeline --version
v3.16.0.dev0

getting there. Muuch better. Just tested this and realized I forgot about what Gilad told me back then regarding changes to resources assembly,
Bildschirmfoto 2023-11-24 um 17 11 29
Bildschirmfoto 2023-11-24 um 17 11 53

https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/_modules/airflow/providers/cncf/kubernetes/operators/pod.html

making the changes now and will test the locally-built wheel file once more with Airflow 2.6.x then, next week.

i.e. in the airflow template file, for now only with that gpu limit, no cpu and memory limits as we do not have them yet in GUI.

        {% if operation.cpu_request or operation.mem_request or operation.gpu_limit %}
                                                            container_resources=k8s.V1ResourceRequirements(
                                                                requests={
                                                                    {% if operation.cpu_request %}
                                                                        'cpu': '{{ operation.cpu_request }}',
                                                                    {% endif %}
                                                                    {% if operation.mem_request %}
                                                                        'memory': '{{ operation.mem_request }}G',
                                                                    {% endif %}
                                                                },
                                                                limits={
                                                                    {% if operation.gpu_limit %}
                                                                        'nvidia.com/gpu': '{{ operation.gpu_limit }}', # or 'amd.com/gpu'
                                                                    {% endif %}
                                                                },
                                                            )
        {% endif %}

I don't use GPUs, but I want to find out before I make the whl again whether there is also a property for the kind of gpu, don't wanna hard-code either nvidia.com/gpu or amd.com/gpu. Looks like the field I am looking for is gpu_vendor. Need to add gpu_vendor to airflow processor target_ops

$ git diff
diff --git a/elyra/pipeline/airflow/processor_airflow.py b/elyra/pipeline/airflow/processor_airflow.py
index 3248367a..7f46f317 100644
--- a/elyra/pipeline/airflow/processor_airflow.py
+++ b/elyra/pipeline/airflow/processor_airflow.py
@@ -343,6 +343,7 @@ be fully qualified (i.e., prefixed with their package names).
                     "cpu_request": operation.cpu,
                     "mem_request": operation.memory,
                     "gpu_limit": operation.gpu,
+                    "gpu_vendor": operation.gpu_vendor,
                     "operator_source": operation.filename,
                 }
 
diff --git a/elyra/templates/airflow/airflow_template.jinja2 b/elyra/templates/airflow/airflow_template.jinja2
index b9314edd..5f033317 100644
--- a/elyra/templates/airflow/airflow_template.jinja2
+++ b/elyra/templates/airflow/airflow_template.jinja2
@@ -47,17 +47,21 @@ op_{{ operation.id|regex_replace }} = KubernetesPodOperator(name='{{ operation.n
                                                             task_id='{{ operation.notebook|regex_replace }}',
                                                             env_vars={{ operation.pipeline_envs }},
         {% if operation.cpu_request or operation.mem_request or operation.gpu_limit %}
-                                                            resources = {
-            {% if operation.cpu_request %}
-                                                                       'request_cpu': '{{ operation.cpu_request }}',
-            {% endif %}
-            {% if operation.mem_request %}
-                                                                       'request_memory': '{{ operation.mem_request }}G',
-            {% endif %}
-            {% if operation.gpu_limit %}
-                                                                       'limit_gpu': '{{ operation.gpu_limit }}',
-            {% endif %}
-                                                            },
+                                                            container_resources=k8s.V1ResourceRequirements(
+                                                                requests={
+                                                                    {% if operation.cpu_request %}
+                                                                        'cpu': '{{ operation.cpu_request }}',
+                                                                    {% endif %}
+                                                                    {% if operation.mem_request %}
+                                                                        'memory': '{{ operation.mem_request }}G',
+                                                                    {% endif %}
+                                                                },
+                                                                limits={
+                                                                    {% if operation.gpu_limit %}
+                                                                        '{{ operation.gpu_vendor }}': '{{ operation.gpu_limit }}',
+                                                                    {% endif %}
+                                                                },
+                                                            )
         {% endif %}
                                                             volumes=[{{ processor.render_volumes(operation.elyra_props) }}],
                                                             volume_mounts=[{{ processor.render_mounts(operation.elyra_props) }}],

Another thing I noticed: CPU always needs to be an integer greater or equal than 1, memory also. This is not good. We should later fix that GUI and property part to more K8S and Openshift style resource units: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/

@shalberd
Copy link
Contributor Author

shalberd commented Nov 27, 2023

Ok, that worked nicely and my pipelines are getting picked up without error, GPU field, if specified, is picked up as well.

Bildschirmfoto 2023-11-27 um 12 56 27

Bildschirmfoto 2023-11-27 um 12 57 23

Bildschirmfoto 2023-11-27 um 13 00 18

I am having a minor issue with PodTemplate not allowing for Volume Creation from ConfigMap Content (only from PVCs), which would be super useful for mounting in custom CA-bundle file from a configmap to /etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem. This is for now presenting me with problems when downloading a spawned pipeline task in Airflow from an Artifactory with custom CA based server certificate, but that is a different story. I mean what I define at Airflow config for the worker

extraVolumes
  - name: trusted-ca
    configMap:
      name: trusted-ca
      items:
      - key: 'ca-bundle.crt'
        path: "tls-ca-bundle.pem"

extraVolumeMounts
  - name: trusted-ca
    readOnly: true
    mountPath: '/etc/pki/ca-trust/extracted/pem'

is present in the airflow worker container, but not in the Elyra-defined dependent Pod Container unfortunately. Some stuff for a different story, volumes from ConfigMaps and volumeMounts based on that volume, similar to CPU limits ;-)
There has already been some discussion on pod_template_file, of which above spec can already be a part, and it not rippling through. Mabe @thesuperzapper can give me some input on how to volumes from configmaps instead of PVCs in K8S / Openshift with https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/_modules/airflow/providers/cncf/kubernetes/operators/pod.html. If we were to go with the pod template file approach, currently, Elyra is lacking a way to define that field in the main DAG def as well as mounting in a file, as with custom-ca, into a volume from a configmap, I think.

@lresende lresende removed the status:Work in Progress Development in progress. A PR tagged with this label is not review ready unless stated otherwise. label Dec 8, 2023
@lresende lresende self-requested a review December 8, 2023 16:36
@shalberd shalberd force-pushed the make_generic_pipeline_work_with_airflow2x branch 2 times, most recently from 1f21e5f to a6b7e82 Compare December 19, 2023 14:38
@shalberd
Copy link
Contributor Author

shalberd commented Dec 19, 2023

squashed commits as well to be more readable. As mentioned, the built whl file together with Open Data Hub Jupyter Notebooks is working fine together with Airflow 2.6.2.

@lresende
Copy link
Member

lresende commented Dec 21, 2023

It would be good to have a 2nd person to validate this is working before we look into merging it...
I will try to find some time next week during my break, but if anyone has the Airflow 2.x available and could validate

@kevin-bates
Copy link
Member

@shalberd - thank you for this pull request. I am surprised at how little changes there are and don't see any kind of Airflow version checking happening. With these changes, is elyra agnostic to the Airflow version or do these changes replace the existing Airflow 1x support with Airflow 2x support? I guess I was expecting to see a new subclass (e.g., AirflowPipelineProcessor2) or runtime type (APACHE_AIRFLOW_2) so as to preserve existing support. If a replacement - what do we tell our existing Airflow 1x users?

@shalberd
Copy link
Contributor Author

shalberd commented Jan 2, 2024

@kevin-bates it is a replacement for Airflow 2.x. I have already talked with @lresende in a Friday community call and we have the idea so far to make this part of Elyra 4.x, i.e. no more Airflow 1.x support soon. About lifecycle management: I only have contact with Astronomer and the Airflow community helm chart maintainer @thesuperzapper ... my judgment call would be: Airflow 1.x is long deprecated, no more security updates ... announce Airflow 1.x support derelease with Elyra 4.x
By the way: I'll fix the tests at elyra/tests to be Airflow 2.x compatible as well as part of this PR.

@MR-GOYAL
Copy link

MR-GOYAL commented Jan 2, 2024

@shalberd I have tried changes as mentioned in the fork and its working fine with the airflow 2.7.2(version we are using). I have tested in our environment and airflow pipeline are executing as expected.
Thanks for the airflow 2.x support on elyra

@shalberd shalberd force-pushed the make_generic_pipeline_work_with_airflow2x branch from d0c2050 to 81bf416 Compare January 12, 2024 08:48
@shalberd shalberd force-pushed the make_generic_pipeline_work_with_airflow2x branch from 81bf416 to d724719 Compare January 12, 2024 08:58
@romeokienzler
Copy link
Member

waiting for #3202 to be merged

@lresende
Copy link
Member

Looks great that the generic pipelines are working with these changes, we need to make sure custom components made for Airflow 2.x are also working well.

@shalberd
Copy link
Contributor Author

shalberd commented Jan 12, 2024

Looks great that the generic pipelines are working with these changes

yes, I added the new limits fields to airflow template as well and it is working nicely, was able to push generated DAG code to our Git (see screenshot) and it ran through as intended on Airflow, i.e. as task container with cpu and memory limits nice. Thank you, @giladd123 (limits support via GUI will be in elyra 4) and @ianonavy (publicly visible fork and tips) for the support along the way, y'all rock / ata totach

Bildschirmfoto 2024-01-10 um 17 32 04

we need to make sure custom components made for Airflow 2.x are also working well.

@lresende yes, I will test out the functionality behind those core airflow components in generic pipelines and give feedback in our weekly call.

Bildschirmfoto 2024-01-12 um 17 48 41

but that is part of a different story as well #2124

@kevin-bates

Tracker ticket #3165

It's not just about core operators provided by airflow (via package catalog connector), but also what is called provider package catalog connector and importing what is called community maintained operators from outside Airflow for Airflow

we only close #3166 with this PR, but I've got the other ones on the radar, will test, see, and make PRs separately where needed.

@shalberd
Copy link
Contributor Author

confirming this here is still working beautifully for Airflow 2.8.2

…ocessor airflow and jinja2 template airflow to comply with Airflow 2.x. Added new location of KubernetesPodOperator library in Airflow 2.x to test Pipeline for processor airflow. Added cpu and memory limits fields in airflow 2.x fashion as well.

Signed-off-by: Sven Thoms <[email protected]>
@shalberd shalberd force-pushed the make_generic_pipeline_work_with_airflow2x branch from 4f8996f to d4176b9 Compare September 3, 2024 11:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
platform: pipeline-Airflow Related to usage of Apache Airflow as pipeline runtime
Projects
None yet
Development

Successfully merging this pull request may close these issues.

make generic pipelines to DAG rendering work with Airflow 2.x
8 participants