Skip to content
This repository has been archived by the owner on Nov 1, 2024. It is now read-only.

Add basic XML tag generation #2

Merged
merged 20 commits into from
Feb 9, 2017
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include README.md LICENSE
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ autopep8:
lint:
@echo 'Linting...'
@pylint --rcfile=pylintrc pyoozie tests
@flake8
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use both linters


autolint: autopep8 lint
3 changes: 3 additions & 0 deletions dev_requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
pytest>=2.7
pytest-cov
pytest-randomly
pytest-mock
pylint
xmltodict
autopep8
flake8
12 changes: 11 additions & 1 deletion pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,18 @@ load-plugins=
# it should appear only once).

# CHANGED:
# too-few-public-methods
# too-many-instance-attributes
# too-many-arguments
# protected-access; _xml() calls
# invalid-name; some 2-letter variables are OK
# redefined-outer-name; doesn't play well with pytest fixtures
# fixme; we're in-development, so we need some of these
# C0111: Missing docstring
disable=C0111
# D101 Missing docstring in public class
# D102 Missing docstring in public method
# D105 Missing docstring in magic method
disable=C0111,D101,D102,D105,too-few-public-methods,too-many-instance-attributes,too-many-arguments,protected-access,invalid-name,redefined-outer-name,fixme
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So our lint config which disallows lines longer than 120 itself has lines far longer than 120.
Sounds like a workaround exists: https://lists.logilab.org/pipermail/python-projects/2010-August/002579.html



[REPORTS]
Expand Down
17 changes: 17 additions & 0 deletions pyoozie/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,21 @@
# Copyright (c) 2017 "Shopify inc." All rights reserved.
# Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our policy for open-source is to license with MIT.


from pyoozie.coordinator import Coordinator, ExecutionOrder
from pyoozie.tags import validate, Parameters, Configuration, Credentials, Shell, SubWorkflow, GlobalConfiguration, \
Email, IdentifierTooLongError
from pyoozie.builder import workflow, coordinator, _coordinator_submission_xml, _workflow_submission_xml

__version__ = '0.0.0'

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0.0.1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When should we update this; on every intended release? Every PR would become a bit tedious.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not every PR, it just seems like 0.0.1 is a better starting place.


__all__ = (
# coordinator
'Coordinator', 'ExecutionOrder', 'Configuration', 'Parameters',

# tags
'validate', 'Parameters', 'Configuration', 'Credentials', 'Shell', 'SubWorkflow', 'GlobalConfiguration', \
'Email', 'IdentifierTooLongError',

# builder
'workflow', 'coordinator', '_coordinator_submission_xml', '_workflow_submission_xml',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still weirded out by exporting private functions.

)
135 changes: 135 additions & 0 deletions pyoozie/builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# Copyright (c) 2017 "Shopify inc." All rights reserved.
# Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.
from __future__ import unicode_literals

from pyoozie.coordinator import Coordinator
from pyoozie.tags import Configuration, Parameters


def _workflow_submission_xml(hadoop_user, hdfs_path, configuration=None, indent=False):
"""Generate a Workflow XML submission message to POST to Oozie."""
submission = Configuration(configuration)
submission.update({
'user.name': hadoop_user,
'oozie.wf.application.path': hdfs_path
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You've inverted the order of precedence here from the existing code. Intentional?
https://github.com/Shopify/starscream/blob/master/starscream/scheduling/oozie/message.py#L8-L13

Copy link
Contributor Author

@cfournie cfournie Feb 8, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes; I wanted to force hadoop_user and hdfs_path to be the preferred way to specify user.name and oozie.wf.application.path. Should I warn the user if they specify an overlapping config? Or raise an exception?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I warn the user if they specify an overlapping config? Or raise an exception?

That's not a bad idea, but also not that important.

return submission.xml(indent)


def _coordinator_submission_xml(hadoop_user, hdfs_path, configuration=None, indent=False):
"""Generate a Coordinator XML submission message to POST to Oozie."""
submission = Configuration(configuration)
submission.update({
'user.name': hadoop_user,
'oozie.coord.application.path': hdfs_path,
})
return submission.xml(indent)


class workflow(object):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm shocked that lint/flake doesn't protest about a lowercase class name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably because I added invalid-name to pylintrc's ignore


def __init__(self, name):
# Initially, let's just use a static template and only one action payload and one action on error
self._name = name
self._action_name = None
self._action_payload = None
self._action_error = None
self._kill_message = None

def add_action(self, name, action, action_on_error, kill_on_error='${wf:lastErrorNode()} - ${wf:id()}'):
# Today you can't rename your action and you can only have one, but in the future you can add multiple
# named actions
if self._action_name is None and self._action_payload is None and self._action_error is None and \
self._kill_message is None:
self._action_name = name
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The single space indentation difference here is super confusing.

self._action_payload = action
self._action_error = action_on_error
self._kill_message = kill_on_error
else:
raise NotImplementedError("Can only add one action in this version")
return self

def build(self, indent=False):
def remove_header(xml):
return xml.replace("<?xml version='1.0' encoding='UTF-8'?>", '')
return '''

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you like, you could use implicit line joining here to avoid the unindent, e.g.

('<?xml version="1.0" encoding="UTF-8"?>'
 '<workflow-app xmlns="uri:oozie:workflow:0.5"'
 '              name="{name}">'
 'etc...')

The downside is you'd have to put the \ns in yourself if you want them in the output.

(This is fine as-is; I just wanted to mention it.)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, if we start having too many of these, we should consider using templates.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, if we start having too many of these, we should consider using templates.

This inline template hopefully be a short-lived exception.

<?xml version="1.0" encoding="UTF-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.5"
name="{name}">
<start to="action-{action_name}" />
<action name="action-{action_name}">
{action_payload_xml}
<ok to="end" />
<error to="action-error" />
</action>
<action name="action-error">
{action_error_xml}
<ok to="kill" />
<error to="kill" />
</action>
<kill name="kill">
<message>{kill_message}</message>
</kill>
<end name="end" />
</workflow-app>
'''.format(action_payload_xml=remove_header(self._action_payload.xml(indent=indent)),
action_error_xml=remove_header(self._action_error.xml(indent=indent)),
kill_message=self._kill_message,
action_name=self._action_name,
name=self._name).strip()

def submit(self, oozie_url, hdfs_path, hadoop_user, hdfs_callback, timeout_in_seconds=None,
verbose=False, start=False, indent=False):
xml = self.build(indent=indent)
hdfs_callback(hdfs_path, xml)
# TODO create Oozie API and submit
from mock import Mock
OozieAPI = Mock()
api = OozieAPI(url=oozie_url, user=hadoop_user, timeout=timeout_in_seconds, verbose=verbose)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Near term I think it'd make more sense to pass in a preconstructed OozieAPI, since the likely usecase is submitting multiple objects. Alternately, don't directly link these two at all (i.e. have the builder write to hdfs and stop).

api.jobs_submit_workflow(hdfs_path=hdfs_path, start=start)
raise NotImplementedError()


class coordinator(object):

def __init__(self, name, workflow, frequency_in_minutes, start, end=None, timezone=None,
workflow_configuration=None, timeout_in_minutes=None, concurrency=None, execution_order=None,
throttle=None, parameters=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not builder-pattern all of this? Something like:

coordinator = coordinator('my-flow', workflow)
coordinator.set_frequency(360)
coordinator.set_start(datetime.now())

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may demonstrate that the builder pattern in Python is unneeded; let's discuss IRL.

workflow_configuration = Configuration(workflow_configuration) if workflow_configuration else None
self._coordinator = Coordinator(
name=name,
workflow_app_path=None, # Defer this until the build
frequency=frequency_in_minutes,
start=start,
end=end,
timezone=timezone,
workflow_configuration=workflow_configuration,
timeout=timeout_in_minutes,
concurrency=concurrency,
execution_order=execution_order,
throttle=throttle,
parameters=Parameters(parameters) if parameters else None
)
self._workflow = workflow

def build(self, workflow_hdfs_path, indent=False):
self._coordinator.workflow_app_path = workflow_hdfs_path
return self._coordinator.xml(indent)

def submit(self, oozie_url, workflow_hdfs_path, coord_hdfs_path, hadoop_user, hdfs_callback,
timeout_in_seconds=None, verbose=False, indent=False):
self._workflow.submit(
oozie_url=oozie_url,
hdfs_path=workflow_hdfs_path,
hadoop_user=hadoop_user,
hdfs_callback=hdfs_callback,
start=False,
indent=False)
xml = self.build(workflow_hdfs_path, indent=indent)
hdfs_callback(coord_hdfs_path, xml)
# TODO create Oozie API and submit
from mock import Mock
OozieAPI = Mock()
api = OozieAPI(url=oozie_url, user=hadoop_user, timeout=timeout_in_seconds, verbose=verbose)
api.job_submit_coordinator(hdfs_path=coord_hdfs_path)
raise NotImplementedError()
102 changes: 102 additions & 0 deletions pyoozie/coordinator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Copyright (c) 2017 "Shopify inc." All rights reserved.
# Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.
from __future__ import unicode_literals

from datetime import timedelta
from enum import Enum

from pyoozie.tags import Xml, Parameters, validate


class ExecutionOrder(Enum):
"""Execution order used for coordinator jobs."""

FIFO = 'FIFO'

LIFO = 'LIFO'

LAST_ONLY = 'LAST_ONLY'
# "When LAST_ONLY is set, an action that is WAITING or READY will be SKIPPED when the current time is past the
# next action's nominal time. For example, suppose action 1 and 2 are both WAITING , the current time is
# 5:00pm, and action 2's nominal time is 5:10pm. In 10 minutes from now, at 5:10pm, action 1 will become
# SKIPPED, assuming it doesn't transition to SUBMITTED (or a terminal state) before then. Another way of
# thinking about this is to view it as similar to setting the timeout equal to the frequency, except that the
# SKIPPED status doesn't cause the coordinator job to eventually become DONEWITHERROR and can actually become
# SUCCEEDED (i.e. it's a "good" version of TIMEDOUT ). LAST_ONLY is useful if you want a recurring job, but do
# not actually care about the individual instances and just always want the latest action."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need Oozie doc pasted here? (I know, was there in the original source too.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not, but maybe as a docstring? I do want to explain what these are.


NONE = 'NONE'
# "Similar to LAST_ONLY except all older materializations are skipped. When NONE is set, an action that is
# WAITING or READY will be SKIPPED when the current time is more than a certain configured number of minutes
# (tolerance) past the action's nominal time."

def __str__(self):
return self.value


def format_datetime(value):
return value.strftime('%Y-%m-%dT%H:%MZ')

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No seconds?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oozie's time grain is minute. You can probably specify seconds, but they'd be ignored.
From https://oozie.apache.org/docs/4.1.0/CoordinatorFunctionalSpec.html#a4._Datetime_Frequency_and_Time-Period_Representation

If the Oozie processing timezone is UTC , all datetime values are always in UTC down to a minute precision, 'YYYY-MM-DDTHH:mmZ'.



class Coordinator(Xml):

def __init__(self, name, workflow_app_path, frequency, start, end=None, timezone=None,
workflow_configuration=None, timeout=None, concurrency=None, execution_order=None, throttle=None,
parameters=None):
super(Coordinator, self).__init__('coordinator-app')
# Compose and validate dates/frequencies
if end is None:
end = start + timedelta(days=100 * 365.24)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please extract this to a constant? It's not likely to be reused, but a name would make it easier to see your intention, e.g.

ONE_HUNDRED_YEARS = 100 * 365.24 # days

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Boo for Python not supporting timedelta(years=100)

assert end > start, "End time (%s) must be greater than the start time (%s)" % \
(format_datetime(end), format_datetime(start))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we standardize on .format instead of %?

assert frequency >= 5, "Frequency (%d min) must be greater than or equal to 5 min" % frequency

# Coordinator
self.name = validate(name)
self.frequency = frequency
self.start = start
self.end = end
self.timezone = timezone if timezone else 'UTC'

# Workflow action
self.workflow_app_path = workflow_app_path
self.workflow_configuration = workflow_configuration

# Controls
self.timeout = timeout
self.concurrency = concurrency
self.execution_order = execution_order
self.throttle = throttle

self.parameters = Parameters(parameters)

def _xml(self, doc, tag, text):
with tag(self.xml_tag, xmlns="uri:oozie:coordinator:0.5", name=self.name, frequency=str(self.frequency),
start=format_datetime(self.start), end=format_datetime(self.end), timezone=self.timezone):

if self.parameters:
self.parameters._xml(doc, tag, text)

if self.timeout or self.concurrency or self.execution_order or self.throttle:
with tag('controls'):
if self.timeout:
with tag('timeout'):
text(str(self.timeout))
if self.concurrency:
with tag('concurrency'):
text(str(self.concurrency))
if self.execution_order:
with tag('execution'):
text(str(self.execution_order))
if self.throttle:
with tag('throttle'):
text(str(self.throttle))

with tag('action'):
with tag('workflow'):
with tag('app-path'):
text(self.workflow_app_path)
if self.workflow_configuration:
self.workflow_configuration._xml(doc, tag, text)

return doc
Loading