-
Notifications
You must be signed in to change notification settings - Fork 12
Changes from 19 commits
ef838ef
7911c2e
4e3f16f
9f37157
2967b86
5fe5737
3607af8
db9c2e3
24cca4c
bf9c623
c913f2f
caf77ff
6a40f02
1288e8e
f71f71b
badb86c
394a435
1df202d
80732c6
c55a82f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,19 @@ | ||
Copyright 2017 "Shopify inc." | ||
|
||
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: | ||
Permission is hereby granted, free of charge, to any person obtaining a copy | ||
of this software and associated documentation files (the "Software"), to deal | ||
in the Software without restriction, including without limitation the rights | ||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
copies of the Software, and to permit persons to whom the Software is | ||
furnished to do so, subject to the following conditions: | ||
|
||
1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. | ||
The above copyright notice and this permission notice shall be included in all | ||
copies or substantial portions of the Software. | ||
|
||
2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. | ||
|
||
3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. | ||
|
||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
SOFTWARE. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
include README.md LICENSE |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,5 +24,6 @@ autopep8: | |
lint: | ||
@echo 'Linting...' | ||
@pylint --rcfile=pylintrc pyoozie tests | ||
@flake8 | ||
|
||
autolint: autopep8 lint |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,8 @@ | ||
pytest>=2.7 | ||
pytest-cov | ||
pytest-randomly | ||
pytest-mock | ||
pylint | ||
xmltodict | ||
autopep8 | ||
flake8 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,19 @@ | ||
# Copyright (c) 2017 "Shopify inc." All rights reserved. | ||
# Use of this source code is governed by a BSD-style license that can be found in the LICENSE file. | ||
# Use of this source code is governed by a MIT-style license that can be found in the LICENSE file. | ||
|
||
from pyoozie.coordinator import Coordinator, ExecutionOrder | ||
from pyoozie.tags import Parameters, Configuration, Credentials, Shell, SubWorkflow, GlobalConfiguration, Email | ||
from pyoozie.builder import WorkflowBuilder, CoordinatorBuilder | ||
|
||
__version__ = '0.0.0' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When should we update this; on every intended release? Every PR would become a bit tedious. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not every PR, it just seems like |
||
|
||
__all__ = ( | ||
# coordinator | ||
'Coordinator', 'ExecutionOrder', 'Configuration', 'Parameters', | ||
|
||
# tags | ||
'Parameters', 'Configuration', 'Credentials', 'Shell', 'SubWorkflow', 'GlobalConfiguration', 'Email', | ||
|
||
# builder | ||
'WorkflowBuilder', 'CoordinatorBuilder', | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
# Copyright (c) 2017 "Shopify inc." All rights reserved. | ||
# Use of this source code is governed by a MIT-style license that can be found in the LICENSE file. | ||
from __future__ import unicode_literals | ||
|
||
from pyoozie.coordinator import Coordinator | ||
from pyoozie.tags import Configuration | ||
|
||
|
||
def _workflow_submission_xml(username, workflow_xml_path, configuration=None, indent=False): | ||
"""Generate a Workflow XML submission message to POST to Oozie.""" | ||
submission = Configuration(configuration) | ||
submission.update({ | ||
'user.name': username, | ||
'oozie.wf.application.path': workflow_xml_path, | ||
}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You've inverted the order of precedence here from the existing code. Intentional? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes; I wanted to force There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
That's not a bad idea, but also not that important. |
||
return submission.xml(indent) | ||
|
||
|
||
def _coordinator_submission_xml(username, coord_xml_path, configuration=None, indent=False): | ||
"""Generate a Coordinator XML submission message to POST to Oozie.""" | ||
submission = Configuration(configuration) | ||
submission.update({ | ||
'user.name': username, | ||
'oozie.coord.application.path': coord_xml_path, | ||
}) | ||
return submission.xml(indent) | ||
|
||
|
||
class WorkflowBuilder(object): | ||
|
||
def __init__(self, name): | ||
# Initially, let's just use a static template and only one action payload and one action on error | ||
self._name = name | ||
self._action_name = None | ||
self._action_payload = None | ||
self._action_error = None | ||
self._kill_message = None | ||
|
||
def add_action(self, name, action, action_on_error, kill_on_error='${wf:lastErrorNode()} - ${wf:id()}'): | ||
# Today you can't rename your action and you can only have one, but in the future you can add multiple | ||
# named actions | ||
if any((self._action_name, self._action_payload, self._action_error, self._kill_message)): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ha. I considered suggesting this pattern as a replacement for the multiline There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
raise NotImplementedError("Can only add one action in this version") | ||
else: | ||
self._action_name = name | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The single space indentation difference here is super confusing. |
||
self._action_payload = action | ||
self._action_error = action_on_error | ||
self._kill_message = kill_on_error | ||
|
||
return self | ||
|
||
def build(self, indent=False): | ||
def format_xml(xml): | ||
xml = xml.replace("<?xml version='1.0' encoding='UTF-8'?>", '') | ||
return '\n'.join([(' ' * 8) + line for line in xml.strip().split('\n')]) | ||
return ''' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you like, you could use implicit line joining here to avoid the unindent, e.g. ('<?xml version="1.0" encoding="UTF-8"?>'
'<workflow-app xmlns="uri:oozie:workflow:0.5"'
' name="{name}">'
'etc...') The downside is you'd have to put the (This is fine as-is; I just wanted to mention it.) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, if we start having too many of these, we should consider using templates. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This inline template hopefully be a short-lived exception. |
||
<?xml version="1.0" encoding="UTF-8"?> | ||
<workflow-app xmlns="uri:oozie:workflow:0.5" | ||
name="{name}"> | ||
<start to="action-{action_name}" /> | ||
<action name="action-{action_name}"> | ||
{action_payload_xml} | ||
<ok to="end" /> | ||
<error to="action-error" /> | ||
</action> | ||
<action name="action-error"> | ||
{action_error_xml} | ||
<ok to="kill" /> | ||
<error to="kill" /> | ||
</action> | ||
<kill name="kill"> | ||
<message>{kill_message}</message> | ||
</kill> | ||
<end name="end" /> | ||
</workflow-app> | ||
'''.format(action_payload_xml=format_xml(self._action_payload.xml(indent=indent)), | ||
action_error_xml=format_xml(self._action_error.xml(indent=indent)), | ||
kill_message=self._kill_message, | ||
action_name=self._action_name, | ||
name=self._name).strip() | ||
|
||
|
||
class CoordinatorBuilder(object): | ||
|
||
def __init__(self, name, workflow_xml_path, frequency_in_minutes, start, end=None, timezone=None, | ||
workflow_configuration=None, timeout_in_minutes=None, concurrency=None, execution_order=None, | ||
throttle=None, parameters=None): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not builder-pattern all of this? Something like:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This may demonstrate that the builder pattern in Python is unneeded; let's discuss IRL. |
||
self._coordinator = Coordinator( | ||
name=name, | ||
workflow_app_path=workflow_xml_path, | ||
frequency=frequency_in_minutes, | ||
start=start, | ||
end=end, | ||
timezone=timezone, | ||
workflow_configuration=workflow_configuration, | ||
timeout=timeout_in_minutes, | ||
concurrency=concurrency, | ||
execution_order=execution_order, | ||
throttle=throttle, | ||
parameters=parameters, | ||
) | ||
|
||
def build(self, indent=False): | ||
return self._coordinator.xml(indent) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
# Copyright (c) 2017 "Shopify inc." All rights reserved. | ||
# Use of this source code is governed by a MIT-style license that can be found in the LICENSE file. | ||
from __future__ import unicode_literals | ||
|
||
from datetime import timedelta | ||
from enum import Enum | ||
|
||
from pyoozie.tags import _validate, XMLSerializable, Parameters, Configuration | ||
|
||
|
||
ONE_HUNDRED_YEARS = 100 * 365.24 | ||
|
||
|
||
class ExecutionOrder(Enum): | ||
"""Execution order used for coordinator jobs.""" | ||
|
||
FIFO = 'FIFO' | ||
LIFO = 'LIFO' | ||
LAST_ONLY = 'LAST_ONLY' | ||
NONE = 'NONE' | ||
|
||
def __str__(self): | ||
return self.value | ||
|
||
|
||
def format_datetime(value): | ||
return value.strftime('%Y-%m-%dT%H:%MZ') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No seconds? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oozie's time grain is
|
||
|
||
|
||
class Coordinator(XMLSerializable): | ||
|
||
def __init__(self, name, workflow_app_path, frequency, start, end=None, timezone=None, | ||
workflow_configuration=None, timeout=None, concurrency=None, execution_order=None, throttle=None, | ||
parameters=None): | ||
super(Coordinator, self).__init__('coordinator-app') | ||
# Compose and validate dates/frequencies | ||
if end is None: | ||
end = start + timedelta(days=ONE_HUNDRED_YEARS) | ||
assert end > start, "End time ({end}) must be greater than the start time ({start})".format( | ||
end=format_datetime(end), start=format_datetime(start)) | ||
assert frequency >= 5, "Frequency ({frequency} min) must be greater than or equal to 5 min".format( | ||
frequency=frequency) | ||
|
||
# Coordinator | ||
self.name = _validate(name) | ||
self.frequency = frequency | ||
self.start = start | ||
self.end = end | ||
self.timezone = timezone if timezone else 'UTC' | ||
|
||
# Workflow action | ||
self.workflow_app_path = workflow_app_path | ||
self.workflow_configuration = Configuration(workflow_configuration) | ||
|
||
# Controls | ||
self.timeout = timeout | ||
self.concurrency = concurrency | ||
self.execution_order = execution_order | ||
self.throttle = throttle | ||
|
||
self.parameters = Parameters(parameters) | ||
|
||
def _xml(self, doc, tag, text): | ||
with tag(self.xml_tag, xmlns="uri:oozie:coordinator:0.5", name=self.name, frequency=str(self.frequency), | ||
start=format_datetime(self.start), end=format_datetime(self.end), timezone=self.timezone): | ||
|
||
if self.parameters: | ||
self.parameters._xml(doc, tag, text) | ||
|
||
if self.timeout or self.concurrency or self.execution_order or self.throttle: | ||
with tag('controls'): | ||
if self.timeout: | ||
with tag('timeout'): | ||
text(str(self.timeout)) | ||
if self.concurrency: | ||
with tag('concurrency'): | ||
text(str(self.concurrency)) | ||
if self.execution_order: | ||
with tag('execution'): | ||
text(str(self.execution_order)) | ||
if self.throttle: | ||
with tag('throttle'): | ||
text(str(self.throttle)) | ||
|
||
with tag('action'): | ||
with tag('workflow'): | ||
with tag('app-path'): | ||
text(self.workflow_app_path) | ||
if self.workflow_configuration: | ||
self.workflow_configuration._xml(doc, tag, text) | ||
|
||
return doc |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use both linters