diff --git a/LICENSE b/LICENSE index 2e7778c..e857789 100644 --- a/LICENSE +++ b/LICENSE @@ -1,11 +1,19 @@ Copyright 2017 "Shopify inc." -Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: -1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. -2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. - -3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..64ad321 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1 @@ +include README.md LICENSE diff --git a/Makefile b/Makefile index 9fc2229..f51359b 100644 --- a/Makefile +++ b/Makefile @@ -24,5 +24,6 @@ autopep8: lint: @echo 'Linting...' @pylint --rcfile=pylintrc pyoozie tests + @flake8 autolint: autopep8 lint diff --git a/dev_requirements.txt b/dev_requirements.txt index 30568a2..4ef4469 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -1,5 +1,8 @@ pytest>=2.7 pytest-cov pytest-randomly +pytest-mock pylint +xmltodict autopep8 +flake8 diff --git a/pylintrc b/pylintrc index 77e058d..c430a2e 100644 --- a/pylintrc +++ b/pylintrc @@ -35,8 +35,19 @@ load-plugins= # it should appear only once). # CHANGED: +# too-few-public-methods +# too-many-instance-attributes +# too-many-arguments +# protected-access; _xml() calls +# invalid-name; some 2-letter variables are OK +# redefined-outer-name; doesn't play well with pytest fixtures +# fixme; we're in-development, so we need some of these # C0111: Missing docstring -disable=C0111 +# D101 Missing docstring in public class +# D102 Missing docstring in public method +# D105 Missing docstring in magic method +disable=C0111,D101,D102,D105,too-few-public-methods,too-many-instance-attributes,too-many-arguments, + protected-access,invalid-name,redefined-outer-name,fixme [REPORTS] diff --git a/pyoozie/__init__.py b/pyoozie/__init__.py index 3c57adb..15bb064 100644 --- a/pyoozie/__init__.py +++ b/pyoozie/__init__.py @@ -1,4 +1,19 @@ # Copyright (c) 2017 "Shopify inc." All rights reserved. -# Use of this source code is governed by a BSD-style license that can be found in the LICENSE file. +# Use of this source code is governed by a MIT-style license that can be found in the LICENSE file. + +from pyoozie.coordinator import Coordinator, ExecutionOrder +from pyoozie.tags import Parameters, Configuration, Credentials, Shell, SubWorkflow, GlobalConfiguration, Email +from pyoozie.builder import WorkflowBuilder, CoordinatorBuilder __version__ = '0.0.0' + +__all__ = ( + # coordinator + 'Coordinator', 'ExecutionOrder', 'Configuration', 'Parameters', + + # tags + 'Parameters', 'Configuration', 'Credentials', 'Shell', 'SubWorkflow', 'GlobalConfiguration', 'Email', + + # builder + 'WorkflowBuilder', 'CoordinatorBuilder', +) diff --git a/pyoozie/builder.py b/pyoozie/builder.py new file mode 100644 index 0000000..c3e16d8 --- /dev/null +++ b/pyoozie/builder.py @@ -0,0 +1,104 @@ +# Copyright (c) 2017 "Shopify inc." All rights reserved. +# Use of this source code is governed by a MIT-style license that can be found in the LICENSE file. +from __future__ import unicode_literals + +from pyoozie.coordinator import Coordinator +from pyoozie.tags import Configuration + + +def _workflow_submission_xml(username, workflow_xml_path, configuration=None, indent=False): + """Generate a Workflow XML submission message to POST to Oozie.""" + submission = Configuration(configuration) + submission.update({ + 'user.name': username, + 'oozie.wf.application.path': workflow_xml_path, + }) + return submission.xml(indent) + + +def _coordinator_submission_xml(username, coord_xml_path, configuration=None, indent=False): + """Generate a Coordinator XML submission message to POST to Oozie.""" + submission = Configuration(configuration) + submission.update({ + 'user.name': username, + 'oozie.coord.application.path': coord_xml_path, + }) + return submission.xml(indent) + + +class WorkflowBuilder(object): + + def __init__(self, name): + # Initially, let's just use a static template and only one action payload and one action on error + self._name = name + self._action_name = None + self._action_payload = None + self._action_error = None + self._kill_message = None + + def add_action(self, name, action, action_on_error, kill_on_error='${wf:lastErrorNode()} - ${wf:id()}'): + # Today you can't rename your action and you can only have one, but in the future you can add multiple + # named actions + if any((self._action_name, self._action_payload, self._action_error, self._kill_message)): + raise NotImplementedError("Can only add one action in this version") + else: + self._action_name = name + self._action_payload = action + self._action_error = action_on_error + self._kill_message = kill_on_error + + return self + + def build(self, indent=False): + def format_xml(xml): + xml = xml.replace("", '') + return '\n'.join([(' ' * 8) + line for line in xml.strip().split('\n')]) + return ''' + + + + +{action_payload_xml} + + + + +{action_error_xml} + + + + + {kill_message} + + + +'''.format(action_payload_xml=format_xml(self._action_payload.xml(indent=indent)), + action_error_xml=format_xml(self._action_error.xml(indent=indent)), + kill_message=self._kill_message, + action_name=self._action_name, + name=self._name).strip() + + +class CoordinatorBuilder(object): + + def __init__(self, name, workflow_xml_path, frequency_in_minutes, start, end=None, timezone=None, + workflow_configuration=None, timeout_in_minutes=None, concurrency=None, execution_order=None, + throttle=None, parameters=None): + self._coordinator = Coordinator( + name=name, + workflow_app_path=workflow_xml_path, + frequency=frequency_in_minutes, + start=start, + end=end, + timezone=timezone, + workflow_configuration=workflow_configuration, + timeout=timeout_in_minutes, + concurrency=concurrency, + execution_order=execution_order, + throttle=throttle, + parameters=parameters, + ) + + def build(self, indent=False): + return self._coordinator.xml(indent) diff --git a/pyoozie/coordinator.py b/pyoozie/coordinator.py new file mode 100644 index 0000000..e59cd34 --- /dev/null +++ b/pyoozie/coordinator.py @@ -0,0 +1,92 @@ +# Copyright (c) 2017 "Shopify inc." All rights reserved. +# Use of this source code is governed by a MIT-style license that can be found in the LICENSE file. +from __future__ import unicode_literals + +from datetime import timedelta +from enum import Enum + +from pyoozie.tags import _validate, XMLSerializable, Parameters, Configuration + + +ONE_HUNDRED_YEARS = 100 * 365.24 + + +class ExecutionOrder(Enum): + """Execution order used for coordinator jobs.""" + + FIFO = 'FIFO' + LIFO = 'LIFO' + LAST_ONLY = 'LAST_ONLY' + NONE = 'NONE' + + def __str__(self): + return self.value + + +def format_datetime(value): + return value.strftime('%Y-%m-%dT%H:%MZ') + + +class Coordinator(XMLSerializable): + + def __init__(self, name, workflow_app_path, frequency, start, end=None, timezone=None, + workflow_configuration=None, timeout=None, concurrency=None, execution_order=None, throttle=None, + parameters=None): + super(Coordinator, self).__init__('coordinator-app') + # Compose and validate dates/frequencies + if end is None: + end = start + timedelta(days=ONE_HUNDRED_YEARS) + assert end > start, "End time ({end}) must be greater than the start time ({start})".format( + end=format_datetime(end), start=format_datetime(start)) + assert frequency >= 5, "Frequency ({frequency} min) must be greater than or equal to 5 min".format( + frequency=frequency) + + # Coordinator + self.name = _validate(name) + self.frequency = frequency + self.start = start + self.end = end + self.timezone = timezone if timezone else 'UTC' + + # Workflow action + self.workflow_app_path = workflow_app_path + self.workflow_configuration = Configuration(workflow_configuration) + + # Controls + self.timeout = timeout + self.concurrency = concurrency + self.execution_order = execution_order + self.throttle = throttle + + self.parameters = Parameters(parameters) + + def _xml(self, doc, tag, text): + with tag(self.xml_tag, xmlns="uri:oozie:coordinator:0.5", name=self.name, frequency=str(self.frequency), + start=format_datetime(self.start), end=format_datetime(self.end), timezone=self.timezone): + + if self.parameters: + self.parameters._xml(doc, tag, text) + + if self.timeout or self.concurrency or self.execution_order or self.throttle: + with tag('controls'): + if self.timeout: + with tag('timeout'): + text(str(self.timeout)) + if self.concurrency: + with tag('concurrency'): + text(str(self.concurrency)) + if self.execution_order: + with tag('execution'): + text(str(self.execution_order)) + if self.throttle: + with tag('throttle'): + text(str(self.throttle)) + + with tag('action'): + with tag('workflow'): + with tag('app-path'): + text(self.workflow_app_path) + if self.workflow_configuration: + self.workflow_configuration._xml(doc, tag, text) + + return doc diff --git a/pyoozie/tags.py b/pyoozie/tags.py new file mode 100644 index 0000000..e29491a --- /dev/null +++ b/pyoozie/tags.py @@ -0,0 +1,306 @@ +# Copyright (c) 2017 "Shopify inc." All rights reserved. +# Use of this source code is governed by a MIT-style license that can be found in the LICENSE file. +from __future__ import unicode_literals +from abc import ABCMeta, abstractmethod +import re +import yattag + + +MAX_IDENTIFIER_LENGTH = 39 +REGEX_IDENTIFIER = r'^[a-zA-Z_][\-_a-zA-Z0-9]{0,38}$' +COMPILED_REGEX_IDENTIFIER = re.compile(REGEX_IDENTIFIER) + + +def _validate(identifier): + + assert len(identifier) <= MAX_IDENTIFIER_LENGTH, \ + "Identifier must be less than {max_length} chars long, '{identifier}' is {length}".format( + max_length=MAX_IDENTIFIER_LENGTH, + identifier=identifier, + length=len(identifier)) + + assert COMPILED_REGEX_IDENTIFIER.match(identifier), \ + "Identifier must match {regex}, '{identifier}' does not".format( + regex=REGEX_IDENTIFIER, + identifier=identifier) + + return identifier + + +class XMLSerializable(object): + """An abstract object that can be serialized to XML.""" + + __metaclass__ = ABCMeta + + def __init__(self, xml_tag): + self.xml_tag = xml_tag + + def xml(self, indent=False): + doc, tag, text = yattag.Doc().tagtext() + doc.asis("") + xml = self._xml(doc, tag, text).getvalue() + if indent: + return yattag.indent(xml, indentation=' ' * 4, newline='\r\n') + else: + return xml + + @abstractmethod + def _xml(self, doc, tag, text): + raise NotImplementedError + + def __str__(self): + return self.xml_tag + + +class _PropertyList(XMLSerializable, dict): + """ + Object used to represent Oozie workflow/coordinator property-value sets. + + Generates XML of the form: + ... + + + [PROPERTY-NAME] + [PROPERTY-VALUE] + + ... + + """ + + def __init__(self, xml_tag, attributes=None, values=None): + XMLSerializable.__init__(self, xml_tag) + if values: + dict.__init__(self, values) + else: + dict.__init__(self) + self.attributes = attributes or {} + + def _xml(self, doc, tag, text): + with tag(self.xml_tag, **self.attributes): + for name, value in sorted(self.items()): + with tag('property'): + with tag('name'): + doc.text('{}'.format(name)) + with tag('value'): + doc.text('{}'.format(value) if value is not None else '') + return doc + + +class Parameters(_PropertyList): + """Coordinator/workflow parameters. + + Allows one to specify properties that can be reused in actions. "Properties that are a valid Java identifier, + [A-Za-z_][0-9A-Za-z_]* , are available as '${NAME}' variables within the workflow definition." + + "Properties that are not valid Java Identifier, for example 'job.tracker', are available via the + String wf:conf(String name) function. Valid identifier properties are available via this function as well." + """ + + def __init__(self, values=None): + _PropertyList.__init__(self, 'parameters', values=values) + + +class Configuration(_PropertyList): + """Coordinator job submission, workflow, workflow action configuration XML.""" + + def __init__(self, values=None): + _PropertyList.__init__(self, 'configuration', values=values) + + +class Credentials(_PropertyList): + """HCatalog, Hive Metastore, HBase, or Hive Server 2 action credentials. + + Generates XML of the form: + ``` + ... + + + + hcat.metastore.uri + HCAT_URI + + ... + + + + + ... + ``` + """ + + def __init__(self, values, credential_name, credential_type): + _PropertyList.__init__(self, 'credentials', + attributes={ + 'name': credential_name, + 'type': credential_type, + }, + values=values) + self.name = _validate(credential_name) + + +class Shell(XMLSerializable): + """Workflow shell action (v0.3).""" + + def __init__(self, exec_command, job_tracker=None, name_node=None, prepares=None, job_xml_files=None, + configuration=None, arguments=None, env_vars=None, files=None, archives=None, capture_output=False): + XMLSerializable.__init__(self, 'shell') + self.exec_command = exec_command + self.job_tracker = job_tracker + self.name_node = name_node + self.prepares = prepares if prepares else list() + self.job_xml_files = job_xml_files if job_xml_files else list() + self.configuration = Configuration(configuration) + self.arguments = arguments if arguments else list() + self.env_vars = env_vars if env_vars else dict() + self.files = files if files else list() + self.archives = archives if archives else list() + self.capture_output = capture_output + + def _xml(self, doc, tag, text): + with tag(self.xml_tag, xmlns='uri:oozie:shell-action:0.3'): + if self.job_tracker: + with tag('job-tracker'): + doc.text(self.job_tracker) + + if self.name_node: + with tag('name-node'): + doc.text(self.name_node) + + if self.prepares: + raise NotImplementedError("Shell action's prepares has not yet been implemented") + + for xml_file in self.job_xml_files: + with tag('job-xml'): + doc.text(xml_file) + + if self.configuration: + self.configuration._xml(doc, tag, text) + + with tag('exec'): + doc.text(self.exec_command) + + for argument in self.arguments: + with tag('argument'): + doc.text(argument) + + for key, value in self.env_vars.items(): + with tag('env-var'): + doc.text('{key}={value}'.format(key=key, value=value)) + + for filename in self.files: + with tag('file'): + doc.text(filename) + + for archive in self.archives: + with tag('archive'): + doc.text(archive) + + if self.capture_output: + doc.stag('capture-output') + + return doc + + +class SubWorkflow(XMLSerializable): + """Run another workflow defined in another XML file on HDFS. + + An Oozie sub-workflow is an "action [that] runs a child workflow job [...]. The parent workflow job will wait + until the child workflow job has completed." + """ + + def __init__(self, app_path, propagate_configuration=True, configuration=None): + XMLSerializable.__init__(self, 'sub-workflow') + self.app_path = app_path + self.propagate_configuration = propagate_configuration + self.configuration = Configuration(configuration) + + def _xml(self, doc, tag, text): + with tag(self.xml_tag): + with tag('app-path'): + doc.text(self.app_path) + if self.propagate_configuration: + doc.stag('propagate-configuration') + if self.configuration: + self.configuration._xml(doc, tag, text) + + return doc + + +class GlobalConfiguration(XMLSerializable): + """Global configuration values for all actions in a workflow. + + "Oozie allows a global section to reduce the redundant job-tracker and name-node declarations for each action. + [...] The global section may contain the job-xml, configuration, job-tracker, or name-node that the user would + like to set for every action. If a user then redefines one of these in a specific action node, Oozie will + update [sic] use the specific declaration instead of the global one for that action." + + "The job-xml element, if present, must refer to a Hadoop JobConf job.xml file bundled in the workflow + application." + """ + + def __init__(self, job_tracker=None, name_node=None, job_xml_files=None, configuration=None): + XMLSerializable.__init__(self, 'global') + self.job_tracker = job_tracker + self.name_node = name_node + self.job_xml_files = job_xml_files if job_xml_files else list() + self.configuration = Configuration(configuration) + + def _xml(self, doc, tag, text): + with tag(self.xml_tag): + if self.job_tracker: + with tag('job-tracker'): + doc.text(self.job_tracker) + if self.name_node: + with tag('name-node'): + doc.text(self.name_node) + if self.job_xml_files: + for xml_file in self.job_xml_files: + with tag('job-xml'): + doc.text(xml_file) + if self.configuration: + self.configuration._xml(doc, tag, text) + + return doc + + +class Email(XMLSerializable): + """Email action for use within a workflow.""" + + def __init__(self, to, subject, body, cc=None, bcc=None, content_type=None, attachments=None): + XMLSerializable.__init__(self, 'email') + self.to = to + self.subject = subject + self.body = body + self.cc = cc + self.bcc = bcc + self.content_type = content_type + self.attachments = attachments + + def _xml(self, doc, tag, text): + def format_list(emails): + if hasattr(emails, '__iter__') and not isinstance(emails, str): + return ','.join(sorted(emails)) + else: + return emails + + with tag(self.xml_tag, xmlns='uri:oozie:email-action:0.2'): + with tag('to'): + doc.text(format_list(self.to)) + with tag('subject'): + doc.text(self.subject) + with tag('body'): + doc.text(self.body) + if self.cc: + with tag('cc'): + doc.text(format_list(self.cc)) + if self.bcc: + with tag('bcc'): + doc.text(format_list(self.bcc)) + if self.content_type: + with tag('content_type'): + doc.text(self.content_type) + if self.attachments: + with tag('attachment'): + doc.text(format_list(self.attachments)) + + return doc diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..5588c28 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,10 @@ +[flake8] +# Ignoring: +# D100 Missing docstring in public module +# D101 Missing docstring in public class +# D102 Missing docstring in public method +# D103 Missing docstring in public function +# D104 Missing docstring in public package +# D105 Missing docstring in magic method +ignore=D100,D101,D102,D103,D104,D105 +max-line-length = 120 diff --git a/setup.py b/setup.py index dc195aa..9f2de88 100755 --- a/setup.py +++ b/setup.py @@ -1,9 +1,13 @@ #!/usr/bin/env python # Copyright (c) 2017 "Shopify inc." All rights reserved. -# Use of this source code is governed by a BSD-style license that can be found in the LICENSE file. +# Use of this source code is governed by a MIT-style license that can be found in the LICENSE file. import re -from distutils.core import setup + +try: + from setuptools import setup +except: + from distutils.core import setup with open('README.md') as fh: @@ -15,7 +19,7 @@ if not version: raise RuntimeError('Cannot find version information') - + setup( name='pyoozie', version=version, @@ -26,15 +30,16 @@ url='https://github.com/Shopify/pyoozie', packages=['pyoozie'], install_requires=[ - 'requests>=2.12.3' + 'enum34>=0.9.23', + 'yattag>=1.7.2', + 'setuptools>=0.9', ], - license="BSD", - zip_safe=False, - keywords='oozie', + license="MIT", + keywords=['oozie'], classifiers=[ 'Development Status :: 2 - Pre-Alpha', 'Intended Audience :: Developers', - 'License :: OSI Approved :: BSD License', + 'License :: OSI Approved :: MIT License', 'Natural Language :: English', 'Programming Language :: Python :: 2', 'Programming Language :: Python :: 2.7', diff --git a/tests/__init__.py b/tests/__init__.py index 8835f60..cd96953 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,2 +1,2 @@ # Copyright (c) 2017 "Shopify inc." All rights reserved. -# Use of this source code is governed by a BSD-style license that can be found in the LICENSE file. +# Use of this source code is governed by a MIT-style license that can be found in the LICENSE file. diff --git a/tests/data/coordinator-with-controls.xml b/tests/data/coordinator-with-controls.xml new file mode 100644 index 0000000..375d635 --- /dev/null +++ b/tests/data/coordinator-with-controls.xml @@ -0,0 +1,31 @@ + + + + + throttle + 1 + + + + 10 + 1 + LAST_ONLY + ${throttle} + + + + /user/oozie/workflows/descriptive-name + + + mapred.job.queue.name + production + + + + + diff --git a/tests/data/coordinator.xml b/tests/data/coordinator.xml new file mode 100644 index 0000000..deabe6a --- /dev/null +++ b/tests/data/coordinator.xml @@ -0,0 +1,12 @@ + + + + /user/oozie/workflows/descriptive-name + + + diff --git a/tests/data/workflow.xml b/tests/data/workflow.xml new file mode 100644 index 0000000..ca4e43b --- /dev/null +++ b/tests/data/workflow.xml @@ -0,0 +1,25 @@ + + + + + + echo "test" + + + + + + + person@example.com + Error + A bad thing happened + + + + + + Failure message + + + diff --git a/tests/pyoozie/__init__.py b/tests/pyoozie/__init__.py new file mode 100644 index 0000000..cd96953 --- /dev/null +++ b/tests/pyoozie/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) 2017 "Shopify inc." All rights reserved. +# Use of this source code is governed by a MIT-style license that can be found in the LICENSE file. diff --git a/tests/pyoozie/conftest.py b/tests/pyoozie/conftest.py new file mode 100644 index 0000000..e6bf304 --- /dev/null +++ b/tests/pyoozie/conftest.py @@ -0,0 +1,9 @@ +# Copyright (c) 2017 "Shopify inc." All rights reserved. +# Use of this source code is governed by a MIT-style license that can be found in the LICENSE file. +import pytest + + +@pytest.fixture +def coordinator_xml_with_controls(): + with open('tests/data/coordinator-with-controls.xml', 'r') as fh: + return fh.read() diff --git a/tests/pyoozie/test_builder.py b/tests/pyoozie/test_builder.py new file mode 100644 index 0000000..4cf7386 --- /dev/null +++ b/tests/pyoozie/test_builder.py @@ -0,0 +1,159 @@ +# Copyright (c) 2017 "Shopify inc." All rights reserved. +# Use of this source code is governed by a MIT-style license that can be found in the LICENSE file. +from __future__ import unicode_literals + +from datetime import datetime + +import pytest + +from tests.utils import xml_to_dict_unordered +from pyoozie import WorkflowBuilder, CoordinatorBuilder, Shell, Email, ExecutionOrder +from pyoozie.builder import _workflow_submission_xml, _coordinator_submission_xml + + +@pytest.fixture +def workflow_app_path(): + return '/user/oozie/workflows/descriptive-name' + + +@pytest.fixture +def coord_app_path(): + return '/user/oozie/coordinators/descriptive-name' + + +@pytest.fixture +def username(): + return 'test' + + +def test_workflow_submission_xml(username, workflow_app_path): + actual = _workflow_submission_xml( + username=username, + workflow_xml_path=workflow_app_path, + indent=True, + ) + assert xml_to_dict_unordered(''' + + + oozie.wf.application.path + /user/oozie/workflows/descriptive-name + + + user.name + test + + ''') == xml_to_dict_unordered(actual) + + +def test_workflow_submission_xml_with_configuration(username, workflow_app_path): + actual = _workflow_submission_xml( + username=username, + workflow_xml_path=workflow_app_path, + configuration={ + 'other.key': 'other value', + }, + indent=True + ) + + assert xml_to_dict_unordered(''' + + + other.key + other value + + + oozie.wf.application.path + /user/oozie/workflows/descriptive-name + + + user.name + test + + ''') == xml_to_dict_unordered(actual) + + +def test_coordinator_submission_xml(username, coord_app_path): + actual = _coordinator_submission_xml( + username=username, + coord_xml_path=coord_app_path, + indent=True + ) + assert xml_to_dict_unordered(''' + + + oozie.coord.application.path + /user/oozie/coordinators/descriptive-name + + + user.name + test + + ''') == xml_to_dict_unordered(actual) + + +def test_coordinator_submission_xml_with_configuration(username, coord_app_path): + actual = _coordinator_submission_xml( + username=username, + coord_xml_path=coord_app_path, + configuration={ + 'oozie.coord.group.name': 'descriptive-group', + }, + indent=True + ) + assert xml_to_dict_unordered(''' + + + oozie.coord.application.path + /user/oozie/coordinators/descriptive-name + + + oozie.coord.group.name + descriptive-group + + + user.name + test + + ''') == xml_to_dict_unordered(actual) + + +def test_workflow_builder(): + with open('tests/data/workflow.xml', 'r') as fh: + expected = fh.read() + + # Can it XML? + builder = WorkflowBuilder( + name='descriptive-name' + ).add_action( + name='payload', + action=Shell(exec_command='echo "test"'), + action_on_error=Email(to='person@example.com', subject='Error', body='A bad thing happened'), + kill_on_error='Failure message', + ) + + actual_xml = builder.build() + assert xml_to_dict_unordered(expected) == xml_to_dict_unordered(actual_xml) + + +def test_coordinator_builder(coordinator_xml_with_controls, workflow_app_path): + + builder = CoordinatorBuilder( + name='coordinator-name', + workflow_xml_path=workflow_app_path, + frequency_in_minutes=24 * 60, # In minutes + start=datetime(2015, 1, 1, 10, 56), + end=datetime(2115, 1, 1, 10, 56), + concurrency=1, + throttle='${throttle}', + timeout_in_minutes=10, + execution_order=ExecutionOrder.LAST_ONLY, + parameters={ + 'throttle': 1, + }, + workflow_configuration={ + 'mapred.job.queue.name': 'production', + }) + + # Can it XML? + expected_xml = builder.build() + assert xml_to_dict_unordered(coordinator_xml_with_controls) == xml_to_dict_unordered(expected_xml) diff --git a/tests/pyoozie/test_coordinator.py b/tests/pyoozie/test_coordinator.py new file mode 100644 index 0000000..3fb875d --- /dev/null +++ b/tests/pyoozie/test_coordinator.py @@ -0,0 +1,81 @@ +# Copyright (c) 2017 "Shopify inc." All rights reserved. +# Use of this source code is governed by a MIT-style license that can be found in the LICENSE file. +from datetime import datetime, timedelta + +import pytest + +from pyoozie import Coordinator, ExecutionOrder, Configuration, Parameters +from tests.utils import xml_to_dict_unordered + + +def parse_datetime(string): + return datetime.strptime(string, '%Y-%m-%dT%H:%MZ') + + +@pytest.fixture +def expected_coordinator_options(): + return { + 'name': 'coordinator-name', + 'frequency': 1440, + 'start': parse_datetime('2015-01-01T10:56Z'), + 'end': parse_datetime('2115-01-01T10:56Z'), + 'workflow_app_path': '/user/oozie/workflows/descriptive-name', + } + + +@pytest.fixture +def coordinator_xml(): + with open('tests/data/coordinator.xml', 'r') as fh: + return fh.read() + + +def test_coordinator(coordinator_xml, expected_coordinator_options): + actual = Coordinator(**expected_coordinator_options).xml() + assert xml_to_dict_unordered(coordinator_xml) == xml_to_dict_unordered(actual) + + +def test_coordinator_end_default(coordinator_xml, expected_coordinator_options): + del expected_coordinator_options['end'] + actual = Coordinator(**expected_coordinator_options).xml() + assert xml_to_dict_unordered(coordinator_xml) == xml_to_dict_unordered(actual) + + +def test_coordinator_with_controls_and_more(coordinator_xml_with_controls, expected_coordinator_options): + actual = Coordinator( + timeout=10, + concurrency=1, + execution_order=ExecutionOrder.LAST_ONLY, + throttle='${throttle}', + workflow_configuration=Configuration({ + 'mapred.job.queue.name': 'production' + }), + parameters=Parameters({ + 'throttle': 1 + }), + **expected_coordinator_options + ).xml() + assert xml_to_dict_unordered(coordinator_xml_with_controls) == xml_to_dict_unordered(actual) + + +def test_really_long_coordinator_name(expected_coordinator_options): + with pytest.raises(AssertionError) as assertion_info: + del expected_coordinator_options['name'] + Coordinator(name='long' * 10, **expected_coordinator_options) + assert str(assertion_info.value) == \ + "Identifier must be less than 39 chars long, 'longlonglonglonglonglonglonglonglonglong' is 40" + + +def test_coordinator_bad_frequency(expected_coordinator_options): + expected_coordinator_options['frequency'] = 0 + with pytest.raises(AssertionError) as assertion_info: + Coordinator(**expected_coordinator_options) + assert str(assertion_info.value) == \ + 'Frequency (0 min) must be greater than or equal to 5 min' + + +def test_coordinator_end_before_start(expected_coordinator_options): + expected_coordinator_options['end'] = expected_coordinator_options['start'] - timedelta(days=10) + with pytest.raises(AssertionError) as assertion_info: + Coordinator(**expected_coordinator_options) + assert str(assertion_info.value) == \ + 'End time (2014-12-22T10:56Z) must be greater than the start time (2015-01-01T10:56Z)' diff --git a/tests/pyoozie/test_package.py b/tests/pyoozie/test_package.py index f7c6bab..faf3a2c 100644 --- a/tests/pyoozie/test_package.py +++ b/tests/pyoozie/test_package.py @@ -1,6 +1,7 @@ # Copyright (c) 2017 "Shopify inc." All rights reserved. -# Use of this source code is governed by a BSD-style license that can be found in the LICENSE file. +# Use of this source code is governed by a MIT-style license that can be found in the LICENSE file. import pyoozie + def test_version(): assert pyoozie.__version__ diff --git a/tests/pyoozie/test_tags.py b/tests/pyoozie/test_tags.py new file mode 100644 index 0000000..a9c4886 --- /dev/null +++ b/tests/pyoozie/test_tags.py @@ -0,0 +1,289 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2017 "Shopify inc." All rights reserved. +# Use of this source code is governed by a MIT-style license that can be found in the LICENSE file. +from __future__ import unicode_literals, print_function + +import decimal +import pytest + +from pyoozie import Parameters, Configuration, Credentials, Shell, SubWorkflow, GlobalConfiguration, Email +from pyoozie.tags import _validate +from tests.utils import xml_to_dict_unordered + + +@pytest.fixture +def expected_property_values(): + return { + 'boolean': False, + 'decimal': decimal.Decimal('0.75'), + 'float': 0.5, + 'int': 0, + 'long': 10, + 'none': None, + 'unicode': 'ǝnlɐʌ', + 'string': str('value'), + } + + +@pytest.fixture +def expected_property_values_xml(): + return ''' + + boolean + False + + + decimal + 0.75 + + + float + 0.5 + + + int + 0 + + + long + 10 + + + none + + + + string + value + + + unicode + ǝnlɐʌ + ''' + + +def test_validate(): + _validate('ok-id') + + _validate('very-long-flow-name-that-spans-39-chars') + + with pytest.raises(AssertionError) as assertion_info: + _validate('too-long-flow-name-that-spans-more-than-39-chars') + assert str(assertion_info.value) == "Identifier must be less than 39 " \ + "chars long, 'too-long-flow-name-that-spans-more-than-39-chars' is 48" + + with pytest.raises(AssertionError) as assertion_info: + _validate('0-id-starting-with-a-non-alpha-char') + assert str(assertion_info.value) == "Identifier must match ^[a-zA-Z_]" \ + "[\\-_a-zA-Z0-9]{0,38}$, '0-id-starting-with-a-non-alpha-char' " \ + "does not" + + with pytest.raises(AssertionError) as assertion_info: + _validate('id.with.illlegal.chars') + assert str(assertion_info.value) == "Identifier must match ^[a-zA-Z_]" \ + "[\\-_a-zA-Z0-9]{0,38}$, 'id.with.illlegal.chars' does not" + + +def test_parameters(expected_property_values, expected_property_values_xml): + actual = Parameters(expected_property_values).xml(indent=True) + expected = '{xml}'.format(xml=expected_property_values_xml) + assert xml_to_dict_unordered(expected) == xml_to_dict_unordered(actual) + + +def test_configuration(expected_property_values, expected_property_values_xml): + actual = Configuration(expected_property_values).xml(indent=True) + expected = '{xml}'.format(xml=expected_property_values_xml) + assert xml_to_dict_unordered(expected) == xml_to_dict_unordered(actual) + + +def test_credentials(expected_property_values, expected_property_values_xml): + actual = Credentials(expected_property_values, + credential_name='my-hcat-creds', + credential_type='hcat').xml(indent=True) + expected = "{xml}".format( + xml=expected_property_values_xml) + assert xml_to_dict_unordered(expected) == xml_to_dict_unordered(actual) + + +def test_shell(): + actual = Shell( + exec_command='${EXEC}', + job_tracker='${jobTracker}', + name_node='${nameNode}', + prepares=None, + job_xml_files=['/user/${wf:user()}/job.xml'], + configuration={ + 'mapred.job.queue.name': '${queueName}' + }, + arguments=['A', 'B'], + env_vars=None, + files=['/users/blabla/testfile.sh#testfile'], + archives=['/users/blabla/testarchive.jar#testarchive'], + capture_output=False + ).xml(indent=True) + assert xml_to_dict_unordered(''' + + ${jobTracker} + ${nameNode} + /user/${wf:user()}/job.xml + + + mapred.job.queue.name + ${queueName} + + + ${EXEC} + A + B + /users/blabla/testfile.sh#testfile + /users/blabla/testarchive.jar#testarchive + ''') == xml_to_dict_unordered(actual) + + +def test_subworkflow(): + app_path = '/user/username/workflows/cool-flow' + + actual = SubWorkflow( + app_path=app_path, + propagate_configuration=False, + configuration=None, + ).xml(indent=True) + assert xml_to_dict_unordered(''' + + /user/username/workflows/cool-flow + + ''') == xml_to_dict_unordered(actual) + + actual = SubWorkflow( + app_path=app_path, + propagate_configuration=True, + configuration=None, + ).xml(indent=True) + assert xml_to_dict_unordered(''' + + /user/username/workflows/cool-flow + + + ''') == xml_to_dict_unordered(actual) + + actual = SubWorkflow( + app_path=app_path, + propagate_configuration=True, + configuration={ + 'job_tracker': 'a_jobtracker', + 'name_node': 'hdfs://localhost:50070', + }, + ).xml(indent=True) + assert xml_to_dict_unordered(''' + + /user/username/workflows/cool-flow + + + + job_tracker + a_jobtracker + + + name_node + hdfs://localhost:50070 + + + + ''') == xml_to_dict_unordered(actual) + + +def test_global_configuration(): + configuration = { + 'mapred.job.queue.name': '${queueName}' + } + + actual = GlobalConfiguration( + job_tracker='a_jobtracker', + name_node='hdfs://localhost:50070', + job_xml_files=None, + configuration=None, + ).xml(indent=True) + assert xml_to_dict_unordered(''' + + a_jobtracker + hdfs://localhost:50070 + + ''') == xml_to_dict_unordered(actual) + + actual = GlobalConfiguration( + job_tracker='a_jobtracker', + name_node='hdfs://localhost:50070', + job_xml_files=['/user/${wf:user()}/job.xml'], + configuration=configuration, + ).xml(indent=True) + assert xml_to_dict_unordered(''' + + a_jobtracker + hdfs://localhost:50070 + /user/${wf:user()}/job.xml + + + mapred.job.queue.name + ${queueName} + + + + ''') == xml_to_dict_unordered(actual) + + +def test_email(): + actual = Email( + to='mrt@example.com', + subject='Chains', + body='Do you need more?', + ).xml(indent=True) + assert xml_to_dict_unordered(''' + + mrt@example.com + Chains + Do you need more? + + ''') == xml_to_dict_unordered(actual) + + actual = Email( + to='mrt@example.com', + subject='Chains', + body='Do you need more?', + cc='ateam@example.com', + bcc='jewelrystore@example.com', + content_type='text/plain', + attachments='/path/to/attachment/on/hdfs.txt', + ).xml(indent=True) + assert xml_to_dict_unordered(''' + + mrt@example.com + Chains + Do you need more? + ateam@example.com + jewelrystore@example.com + text/plain + /path/to/attachment/on/hdfs.txt + + ''') == xml_to_dict_unordered(actual) + + actual = Email( + to=['mrt@example.com', 'b.a.baracus@example.com'], + subject='Chains', + body='Do you need more?', + cc=('ateam@example.com', 'webmaster@example.com'), + bcc=set(['jewelrystore@example.com', 'goldchains4u@example.com']), + content_type='text/plain', + attachments=['/path/on/hdfs.txt', + '/another/path/on/hdfs.txt'], + ).xml(indent=True) + assert xml_to_dict_unordered(''' + + b.a.baracus@example.com,mrt@example.com + Chains + Do you need more? + ateam@example.com,webmaster@example.com + goldchains4u@example.com,jewelrystore@example.com + text/plain + /another/path/on/hdfs.txt,/path/on/hdfs.txt + + ''') == xml_to_dict_unordered(actual) diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 0000000..4b26f94 --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,16 @@ +# Copyright (c) 2017 "Shopify inc." All rights reserved. +# Use of this source code is governed by a MIT-style license that can be found in the LICENSE file. +from __future__ import unicode_literals, print_function + +import xmltodict + + +def xml_to_dict_unordered(xml): + def unorder(value): + if hasattr(value, 'items'): + return {k: unorder(v) for k, v in value.items()} + elif isinstance(value, list): + return sorted([unorder(v) for v in value], key=str) + else: + return value + return unorder(xmltodict.parse(xml))