-
Notifications
You must be signed in to change notification settings - Fork 12
Changes from 2 commits
ef838ef
7911c2e
4e3f16f
9f37157
2967b86
5fe5737
3607af8
db9c2e3
24cca4c
bf9c623
c913f2f
caf77ff
6a40f02
1288e8e
f71f71b
badb86c
394a435
1df202d
80732c6
c55a82f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
include README.md LICENSE |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,5 +24,6 @@ autopep8: | |
lint: | ||
@echo 'Linting...' | ||
@pylint --rcfile=pylintrc pyoozie tests | ||
@flake8 | ||
|
||
autolint: autopep8 lint |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,8 @@ | ||
pytest>=2.7 | ||
pytest-cov | ||
pytest-randomly | ||
pytest-mock | ||
pylint | ||
xmltodict | ||
autopep8 | ||
flake8 |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,8 +35,18 @@ load-plugins= | |
# it should appear only once). | ||
|
||
# CHANGED: | ||
# too-few-public-methods | ||
# too-many-instance-attributes | ||
# too-many-arguments | ||
# protected-access; _xml() calls | ||
# invalid-name; some 2-letter variables are OK | ||
# redefined-outer-name; doesn't play well with pytest fixtures | ||
# fixme; we're in-development, so we need some of these | ||
# C0111: Missing docstring | ||
disable=C0111 | ||
# D101 Missing docstring in public class | ||
# D102 Missing docstring in public method | ||
# D105 Missing docstring in magic method | ||
disable=C0111,D101,D102,D105,too-few-public-methods,too-many-instance-attributes,too-many-arguments,protected-access,invalid-name,redefined-outer-name,fixme | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. So our lint config, which disallows lines longer than 120 characters, itself has lines far longer than 120. |
||
|
||
|
||
[REPORTS] | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,21 @@ | ||
# Copyright (c) 2017 "Shopify inc." All rights reserved. | ||
# Use of this source code is governed by a BSD-style license that can be found in the LICENSE file. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Our policy for open-source projects is to license them under MIT. |
||
|
||
from pyoozie.coordinator import Coordinator, ExecutionOrder | ||
from pyoozie.tags import validate, Parameters, Configuration, Credentials, Shell, SubWorkflow, GlobalConfiguration, \ | ||
Email, IdentifierTooLongError | ||
from pyoozie.builder import workflow, coordinator, _coordinator_submission_xml, _workflow_submission_xml | ||
|
||
__version__ = '0.0.0' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When should we update this; on every intended release? Every PR would become a bit tedious. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not every PR, it just seems like |
||
|
||
# Public names of the package, grouped by the module they are imported from.
# Each name is listed exactly once.
__all__ = (
    # coordinator
    'Coordinator', 'ExecutionOrder',

    # tags
    'validate', 'Parameters', 'Configuration', 'Credentials', 'Shell', 'SubWorkflow', 'GlobalConfiguration',
    'Email', 'IdentifierTooLongError',

    # builder
    # NOTE(review): the two underscore-prefixed helpers are private by naming convention; exporting them
    # was questioned in review -- confirm whether they belong in the public API.
    'workflow', 'coordinator', '_coordinator_submission_xml', '_workflow_submission_xml',
)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
# Copyright (c) 2017 "Shopify inc." All rights reserved. | ||
# Use of this source code is governed by a BSD-style license that can be found in the LICENSE file. | ||
from __future__ import unicode_literals | ||
|
||
from pyoozie.coordinator import Coordinator | ||
from pyoozie.tags import Configuration, Parameters | ||
|
||
|
||
def _workflow_submission_xml(hadoop_user, hdfs_path, configuration=None, indent=False):
    """Build the XML configuration message that is POSTed to Oozie to submit a workflow.

    ``configuration`` seeds the message; ``user.name`` and
    ``oozie.wf.application.path`` are then set from ``hadoop_user`` and
    ``hdfs_path``. ``indent`` is handed through to ``Configuration.xml``.
    """
    # These keys are applied after the caller's configuration on purpose (confirmed in review):
    # assuming ``Configuration.update`` behaves like ``dict.update``, they override caller values.
    enforced = {
        'user.name': hadoop_user,
        'oozie.wf.application.path': hdfs_path,
    }
    message = Configuration(configuration)
    message.update(enforced)
    return message.xml(indent)
|
||
|
||
def _coordinator_submission_xml(hadoop_user, hdfs_path, configuration=None, indent=False):
    """Build the XML configuration message that is POSTed to Oozie to submit a coordinator.

    ``configuration`` seeds the message; ``user.name`` and
    ``oozie.coord.application.path`` are then set from ``hadoop_user`` and
    ``hdfs_path``. ``indent`` is handed through to ``Configuration.xml``.
    """
    message = Configuration(configuration)
    # Set last so that these two keys are written after any caller-supplied entries.
    message.update({'user.name': hadoop_user, 'oozie.coord.application.path': hdfs_path})
    return message.xml(indent)
|
||
|
||
class workflow(object):
    """Builder for a single-action Oozie workflow definition.

    The document is rendered from a static inline template that holds exactly
    one action plus one error-handling action. ``add_action`` must be called
    before ``build`` or ``submit``.

    NOTE(review): the lowercase class name was questioned in review; it is
    kept because callers import the class under this name.
    """

    # XML declaration emitted by the action objects' ``xml()``; it is removed so
    # the fragments can be embedded inside the workflow document.
    _XML_HEADER = "<?xml version='1.0' encoding='UTF-8'?>"

    # Inline template (meant to be a short-lived exception; switch to real
    # templates if more of these appear). Kept flush-left on purpose.
    _TEMPLATE = '''
<?xml version="1.0" encoding="UTF-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.5"
name="{name}">
<start to="action-{action_name}" />
<action name="action-{action_name}">
{action_payload_xml}
<ok to="end" />
<error to="action-error" />
</action>
<action name="action-error">
{action_error_xml}
<ok to="kill" />
<error to="kill" />
</action>
<kill name="kill">
<message>{kill_message}</message>
</kill>
<end name="end" />
</workflow-app>
'''

    def __init__(self, name):
        """Create an empty workflow called ``name``; the action is attached via ``add_action``."""
        # Initially, let's just use a static template and only one action payload and one action on error
        self._name = name
        self._action_name = None
        self._action_payload = None
        self._action_error = None
        self._kill_message = None

    def add_action(self, name, action, action_on_error, kill_on_error='${wf:lastErrorNode()} - ${wf:id()}'):
        """Attach the workflow's single action and return ``self`` for chaining.

        Args:
            name: suffix of the generated ``action-<name>`` node.
            action: object whose ``xml(indent=...)`` renders the action payload.
            action_on_error: object whose ``xml(indent=...)`` renders the error-handling payload.
            kill_on_error: text of the ``<message>`` element of the kill node.

        Raises:
            NotImplementedError: if an action has already been added.
        """
        # Today you can't rename your action and you can only have one, but in the future you can add
        # multiple named actions
        current = (self._action_name, self._action_payload, self._action_error, self._kill_message)
        if any(value is not None for value in current):
            raise NotImplementedError("Can only add one action in this version")
        self._action_name = name
        self._action_payload = action
        self._action_error = action_on_error
        self._kill_message = kill_on_error
        return self

    def build(self, indent=False):
        """Render the workflow definition and return it as an XML string.

        ``indent`` is forwarded to the ``xml()`` call of both action objects.
        The workflow name, action name and kill message are XML-escaped so
        that characters such as ``&``, ``<`` or ``"`` cannot produce a
        malformed document.
        """
        from xml.sax.saxutils import escape  # function-scope import: only needed here

        def fragment(tag):
            # Action objects render a full document; drop the declaration before embedding.
            return tag.xml(indent=indent).replace(self._XML_HEADER, '')

        def attribute(value):
            # Attribute values sit inside double quotes in the template, so quotes are escaped too.
            return escape('{0}'.format(value), {'"': '&quot;'})

        document = self._TEMPLATE.format(
            name=attribute(self._name),
            action_name=attribute(self._action_name),
            action_payload_xml=fragment(self._action_payload),
            action_error_xml=fragment(self._action_error),
            kill_message=escape('{0}'.format(self._kill_message)),
        )
        return document.strip()

    def submit(self, oozie_url, hdfs_path, hadoop_user, hdfs_callback, timeout_in_seconds=None,
               verbose=False, start=False, indent=False):
        """Write the workflow XML through ``hdfs_callback`` and hand it to the (placeholder) Oozie API.

        ``hdfs_callback`` is called as ``hdfs_callback(hdfs_path, xml)``.

        Raises:
            NotImplementedError: always, for now -- the real Oozie API client does not exist yet and a
                ``Mock`` stands in for it.
        """
        hdfs_callback(hdfs_path, self.build(indent=indent))
        # TODO create Oozie API and submit
        # Review note: near term it may make more sense to accept a preconstructed API object instead.
        try:
            from unittest.mock import Mock
        except ImportError:  # Python 2: fall back to the ``mock`` backport
            from mock import Mock
        OozieAPI = Mock()
        api = OozieAPI(url=oozie_url, user=hadoop_user, timeout=timeout_in_seconds, verbose=verbose)
        api.jobs_submit_workflow(hdfs_path=hdfs_path, start=start)
        raise NotImplementedError()
|
||
|
||
class coordinator(object):
    """Builder that pairs a workflow builder with the ``Coordinator`` that schedules it.

    Review note: a fluent builder-style API was suggested for the many
    constructor options; the question was left open for discussion.
    """

    def __init__(self, name, workflow, frequency_in_minutes, start, end=None, timezone=None,
                 workflow_configuration=None, timeout_in_minutes=None, concurrency=None, execution_order=None,
                 throttle=None, parameters=None):
        """Create the underlying ``Coordinator`` and remember the workflow it triggers.

        ``workflow`` is the builder whose ``submit`` is invoked from ``submit``.
        ``workflow_configuration`` and ``parameters`` are wrapped in
        ``Configuration`` / ``Parameters`` when truthy and passed as ``None``
        otherwise. All remaining arguments are handed to ``Coordinator``.
        """
        wrapped_configuration = Configuration(workflow_configuration) if workflow_configuration else None
        wrapped_parameters = Parameters(parameters) if parameters else None
        self._coordinator = Coordinator(
            name=name,
            workflow_app_path=None,  # Defer this until the build
            frequency=frequency_in_minutes,
            start=start,
            end=end,
            timezone=timezone,
            workflow_configuration=wrapped_configuration,
            timeout=timeout_in_minutes,
            concurrency=concurrency,
            execution_order=execution_order,
            throttle=throttle,
            parameters=wrapped_parameters,
        )
        self._workflow = workflow

    def build(self, workflow_hdfs_path, indent=False):
        """Point the coordinator at ``workflow_hdfs_path`` and return its XML."""
        self._coordinator.workflow_app_path = workflow_hdfs_path
        return self._coordinator.xml(indent)

    def submit(self, oozie_url, workflow_hdfs_path, coord_hdfs_path, hadoop_user, hdfs_callback,
               timeout_in_seconds=None, verbose=False, indent=False):
        """Submit the workflow (not started), then write and submit the coordinator.

        ``timeout_in_seconds``, ``verbose`` and ``indent`` are forwarded to the
        workflow's ``submit`` so both submissions use the same settings.
        ``hdfs_callback`` is called as ``hdfs_callback(coord_hdfs_path, xml)``
        for the coordinator definition.

        Raises:
            NotImplementedError: always, for now -- the real Oozie API client does not exist yet and a
                ``Mock`` stands in for it.
        """
        self._workflow.submit(
            oozie_url=oozie_url,
            hdfs_path=workflow_hdfs_path,
            hadoop_user=hadoop_user,
            hdfs_callback=hdfs_callback,
            timeout_in_seconds=timeout_in_seconds,
            verbose=verbose,
            start=False,  # the coordinator, not this call, is what triggers the workflow
            indent=indent)
        hdfs_callback(coord_hdfs_path, self.build(workflow_hdfs_path, indent=indent))
        # TODO create Oozie API and submit
        try:
            from unittest.mock import Mock
        except ImportError:  # Python 2: fall back to the ``mock`` backport
            from mock import Mock
        OozieAPI = Mock()
        api = OozieAPI(url=oozie_url, user=hadoop_user, timeout=timeout_in_seconds, verbose=verbose)
        api.job_submit_coordinator(hdfs_path=coord_hdfs_path)
        raise NotImplementedError()
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
# Copyright (c) 2017 "Shopify inc." All rights reserved. | ||
# Use of this source code is governed by a BSD-style license that can be found in the LICENSE file. | ||
from __future__ import unicode_literals | ||
|
||
from datetime import timedelta | ||
from enum import Enum | ||
|
||
from pyoozie.tags import Xml, Parameters, validate | ||
|
||
|
||
class ExecutionOrder(Enum):
    """Execution order used for coordinator jobs.

    Members (descriptions of the last two are quoted from the Oozie documentation):

    ``FIFO``
        Value ``'FIFO'``.

    ``LIFO``
        Value ``'LIFO'``.

    ``LAST_ONLY``
        "When LAST_ONLY is set, an action that is WAITING or READY will be
        SKIPPED when the current time is past the next action's nominal time.
        For example, suppose action 1 and 2 are both WAITING, the current time
        is 5:00pm, and action 2's nominal time is 5:10pm. In 10 minutes from
        now, at 5:10pm, action 1 will become SKIPPED, assuming it doesn't
        transition to SUBMITTED (or a terminal state) before then. Another way
        of thinking about this is to view it as similar to setting the timeout
        equal to the frequency, except that the SKIPPED status doesn't cause
        the coordinator job to eventually become DONEWITHERROR and can
        actually become SUCCEEDED (i.e. it's a "good" version of TIMEDOUT).
        LAST_ONLY is useful if you want a recurring job, but do not actually
        care about the individual instances and just always want the latest
        action."

    ``NONE``
        "Similar to LAST_ONLY except all older materializations are skipped.
        When NONE is set, an action that is WAITING or READY will be SKIPPED
        when the current time is more than a certain configured number of
        minutes (tolerance) past the action's nominal time."
    """

    FIFO = 'FIFO'
    LIFO = 'LIFO'
    LAST_ONLY = 'LAST_ONLY'
    NONE = 'NONE'

    def __str__(self):
        # Render as the raw value so a member can be written straight into XML text.
        return self.value
|
||
|
||
def format_datetime(value):
    """Format ``value`` as ``YYYY-MM-DDTHH:MMZ`` (minute resolution, no seconds).

    Naive values are formatted unchanged. Timezone-aware values are converted
    to UTC first, because the literal ``Z`` suffix labels the result as UTC.
    """
    offset = value.utcoffset() if hasattr(value, 'utcoffset') else None
    if offset is not None:
        # Shift the wall-clock time to UTC and drop tzinfo so the ``Z`` suffix is truthful.
        value = (value - offset).replace(tzinfo=None)
    return value.strftime('%Y-%m-%dT%H:%MZ')
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No seconds? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oozie's time grain is
|
||
|
||
|
||
class Coordinator(Xml):
    """Oozie ``coordinator-app`` definition that triggers one workflow at a fixed frequency.

    Args:
        name: coordinator name; passed through ``validate``.
        workflow_app_path: text of the ``app-path`` element (may be assigned later, before rendering).
        frequency: frequency in minutes; must be at least 5.
        start: start datetime.
        end: end datetime; defaults to roughly one hundred years after ``start``.
        timezone: timezone attribute; defaults to ``'UTC'``.
        workflow_configuration: optional object whose ``_xml`` is rendered inside the workflow element.
        timeout, concurrency, execution_order, throttle: optional entries of the ``controls`` element.
        parameters: wrapped in ``Parameters``.

    Raises:
        AssertionError: if ``end`` is not after ``start`` or ``frequency`` is below 5.
    """

    # Span used to derive ``end`` when the caller does not supply one (100 years, in days).
    _ONE_HUNDRED_YEARS = timedelta(days=100 * 365.24)

    def __init__(self, name, workflow_app_path, frequency, start, end=None, timezone=None,
                 workflow_configuration=None, timeout=None, concurrency=None, execution_order=None, throttle=None,
                 parameters=None):
        super(Coordinator, self).__init__('coordinator-app')
        # Compose and validate dates/frequencies
        if end is None:
            end = start + Coordinator._ONE_HUNDRED_YEARS
        # The checks raise explicitly instead of using ``assert`` so they still run under ``python -O``.
        # AssertionError is kept as the exception type so existing callers and tests see no change.
        if not end > start:
            raise AssertionError("End time (%s) must be greater than the start time (%s)" %
                                 (format_datetime(end), format_datetime(start)))
        if not frequency >= 5:
            raise AssertionError("Frequency (%d min) must be greater than or equal to 5 min" % frequency)

        # Coordinator
        self.name = validate(name)
        self.frequency = frequency
        self.start = start
        self.end = end
        self.timezone = timezone if timezone else 'UTC'

        # Workflow action
        self.workflow_app_path = workflow_app_path
        self.workflow_configuration = workflow_configuration

        # Controls
        self.timeout = timeout
        self.concurrency = concurrency
        self.execution_order = execution_order
        self.throttle = throttle

        self.parameters = Parameters(parameters)

    def _xml(self, doc, tag, text):
        """Write the coordinator element through the ``tag`` / ``text`` helpers and return ``doc``."""
        # ``self.xml_tag`` is presumably set by the ``Xml`` base class from the constructor argument.
        with tag(self.xml_tag, xmlns="uri:oozie:coordinator:0.5", name=self.name, frequency=str(self.frequency),
                 start=format_datetime(self.start), end=format_datetime(self.end), timezone=self.timezone):

            if self.parameters:
                self.parameters._xml(doc, tag, text)

            # A timeout of 0 is meaningful to Oozie (time out immediately), so it is tested
            # against None rather than by truthiness and is no longer silently dropped.
            has_timeout = self.timeout is not None
            if has_timeout or self.concurrency or self.execution_order or self.throttle:
                with tag('controls'):
                    if has_timeout:
                        with tag('timeout'):
                            text(str(self.timeout))
                    if self.concurrency:
                        with tag('concurrency'):
                            text(str(self.concurrency))
                    if self.execution_order:
                        with tag('execution'):
                            text(str(self.execution_order))
                    if self.throttle:
                        with tag('throttle'):
                            text(str(self.throttle))

            with tag('action'):
                with tag('workflow'):
                    with tag('app-path'):
                        text(self.workflow_app_path)
                    if self.workflow_configuration:
                        self.workflow_configuration._xml(doc, tag, text)

        return doc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use both linters