Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: add changes to make quitstore working with eccenca DataPlatform #240

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 52 additions & 11 deletions quit/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,14 +415,41 @@ def getFileReferenceAndContext(self, blob, commit):
return self._blobs.get(blob)

def applyQueryOnCommit(self, parsedQuery, parent_commit_ref, target_ref, query=None,
default_graph=[], named_graph=[]):
default_graph=[], named_graph=[], queryType=None, comment=None):
"""Apply an update query on the graph and the git repository."""
graph, commitid = self.instance(parent_commit_ref)
resultingChanges, exception = graph.update(parsedQuery)
if exception:
# TODO need to revert or invalidate the graph at this point.
pass
oid = self.commit(graph, resultingChanges, 'New Commit from QuitStore', parent_commit_ref,

graphuri = None

if comment is not None:
queryType = comment
elif len(resultingChanges) > 1:
queryType = 'Edit resource in'
for entry in resultingChanges:
if "delta" in entry:
for x in entry["delta"]:
graphuri = str(x)
if queryType == 'Modify':
ls = entry["delta"][x]
if len(ls) == 1 and "removals" in ls[0]:
queryType = 'Remove resource in'
elif len(ls) == 1 and "additions" in ls[0]:
queryType = 'Add resource in'

if queryType is not None and graphuri is not None:
if queryType == 'InsertData' or queryType == 'Load':
message = 'Insert data into Graph <' + graphuri + '>'
elif queryType == 'DeleteData' or queryType == 'DeleteWhere':
message = 'Delete data from Graph <' + graphuri + '>'
else:
message = queryType + ' Graph <' + graphuri + '>'
else:
message = 'New Commit from QuitStore'
oid = self.commit(graph, resultingChanges, message, parent_commit_ref,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, 'New Commit from QuitStore' is a placeholder. It should be a message given by the client not generated based on the query, as the query is already represented in the commit message.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #237

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the tip! Yes, I understand — I just want to show the possibility during the demonstration =)

target_ref, query=query, default_graph=default_graph,
named_graph=named_graph)
if exception:
Expand Down Expand Up @@ -474,7 +501,7 @@ def commit(self, graph, delta, message, parent_commit_ref, target_ref, query=Non
graphconfig = self._graphconfigs.get(parent_commit_id)
known_files = graphconfig.getfiles().keys()

blobs_new = self._applyKnownGraphs(delta, blobs, parent_commit, index)
blobs_new = self._applyKnownGraphs(delta, blobs, parent_commit, index, graphconfig)
new_contexts = self._applyUnknownGraphs(delta, known_files)
new_config = copy(graphconfig)

Expand Down Expand Up @@ -536,25 +563,39 @@ def _build_message(self, message, query, result, default_graph, named_graph, **k
out.append('{}: "{}"'.format(k, v.replace('"', "\\\"")))
return "\n".join(out)

def _applyKnownGraphs(self, delta, blobs, parent_commit, index):
def _applyKnownGraphs(self, delta, blobs, parent_commit, index, graphconfig):
blobs_new = set()
for blob in blobs:
(fileName, oid) = blob
type = None

try:
file_reference, context = self.getFileReferenceAndContext(blob, parent_commit)
for entry in delta:

changeset = entry['delta'].get(context.identifier, None)

if changeset:
applyChangeset(file_reference, changeset, context.identifier)
del(entry['delta'][context.identifier])

index.add(file_reference.path, file_reference.content)
type = entry['type']
if type == 'DROP':
index.remove(file_reference.path)
index.remove(file_reference.path + '.graph')
graphconfig.removegraph(context.identifier)
del (entry['delta'][context.identifier])
else:
applyChangeset(file_reference, changeset, context.identifier)
del (entry['delta'][context.identifier])

self._blobs.remove(blob)
blob = fileName, index.stash[file_reference.path][0]
self._blobs.set(blob, (file_reference, context))
blobs_new.add(blob)

if type == 'DROP':
pass
else:
index.add(file_reference.path, file_reference.content)
blob = fileName, index.stash[file_reference.path][0]
self._blobs.set(blob, (file_reference, context))
blobs_new.add(blob)

except KeyError:
pass
return blobs_new
Expand Down
92 changes: 86 additions & 6 deletions quit/helpers.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
#!/usr/bin/env python3
import cgi
import logging
import os
from pprint import pprint
from xml.dom.minidom import parse

import uwsgi
from pyparsing import ParseException
from rdflib import Graph
from werkzeug.wsgi import make_chunk_iter

from quit.exceptions import UnSupportedQuery, SparqlProtocolError, NonAbsoluteBaseError
from rdflib.term import URIRef
from rdflib.term import URIRef, Variable
from rdflib.plugins.sparql.parserutils import CompValue, plist
from rdflib.plugins.sparql.parser import parseQuery, parseUpdate
from quit.tools.algebra import translateQuery, translateUpdate
from rdflib.plugins.sparql.parser import parseQuery, parseUpdate, Query
from quit.tools.algebra import translateQuery, translateUpdate, pprintAlgebra
from rdflib.plugins.serializers.nt import _nt_row as _nt
from rdflib.plugins.sparql import parser, algebra
from rdflib.plugins import sparql
Expand Down Expand Up @@ -167,7 +175,8 @@ def configure_query_dataset(parsed_query, default_graphs, named_graphs):
for uri in default_graphs:
parsed_query[1]['datasetClause'].append(CompValue('DatasetClause', default=URIRef(uri)))
for uri in named_graphs:
parsed_query[1]['datasetClause'].append(CompValue('DatasetClause', named=URIRef(uri)))
if uri not in default_graphs:
parsed_query[1]['datasetClause'].append(CompValue('DatasetClause', named=URIRef(uri)))

return parsed_query

Expand Down Expand Up @@ -210,6 +219,7 @@ def parse_query_type(query, base=None, default_graph=[], named_graph=[]):
"""Parse a query and add default and named graph uri if possible."""
try:
parsed_query = parseQuery(query)
parsed_query = parse_named_graph_query(parsed_query)
parsed_query = configure_query_dataset(parsed_query, default_graph, named_graph)
translated_query = translateQuery(parsed_query, base=base)
except ParseException:
Expand Down Expand Up @@ -287,6 +297,7 @@ def parse_sparql_request(request):
default_graph = []
named_graph = []
accept_header = None
comment = None

if request.method == "GET":
default_graph = request.args.getlist('default-graph-uri')
Expand Down Expand Up @@ -317,5 +328,74 @@ def parse_sparql_request(request):
named_graph = request.args.getlist('using-named-graph-uri')
query = request.data.decode("utf-8")
type = 'update'

return query, type, default_graph, named_graph
elif content_mimetype == "application/rdf+xml":
default_graph = request.args.getlist('default-graph-uri')
named_graph = request.args.getlist('named-graph-uri')
graph = request.args.get('graph')
data = request.data.decode("utf-8")
g = Graph()
g.parse(data=data, format='application/rdf+xml')
query = ('INSERT DATA {{ GRAPH <{graph}> '
'{{ {data} }} }}').format(graph=graph,
data=g.serialize(format="nt").decode("utf-8"))
type = 'update'
elif request.method == "PUT":
if 'Content-Type' in request.headers:
content_mimetype, options = parse_options_header(request.headers['Content-Type'])
default_graph = request.args.getlist('default-graph-uri')
named_graph = request.args.getlist('named-graph-uri')
graph = request.args.get('graph')
data = request.input_stream.read()
g = Graph()
if content_mimetype is not None:
g.parse(data=data, format=content_mimetype)
else:
g.parse(data=data, format='application/rdf+xml')
query = ('WITH <{graph}> DELETE {{ ?s ?p ?o }} INSERT {{ {data} }} '
'WHERE {{ ?s ?p ?o }}').format(graph=graph,
data=g.serialize(format="nt").decode("utf-8"))
type = 'update'
comment = 'Replace'

return query, type, default_graph, named_graph, comment


def parse_named_graph_query(query):
    """Rewrite the dataset clause and WHERE pattern of a parsed SPARQL query.

    Drops FROM NAMED entries that duplicate a FROM (default graph) entry,
    then either appends a FILTER restricting results to the remaining named
    graphs, or — when no named graphs remain — wraps the WHERE pattern in a
    GRAPH pattern bound to the synthetic variable
    ``selfDefinedGraphVariable`` (which is later stripped from the
    projection in ``translateQuery``).

    NOTE(review): mutates *query* in place and also returns it.

    :param query: result of ``rdflib.plugins.sparql.parser.parseQuery``.
    :return: the (mutated) parsed query.
    :raises UnSupportedQuery: if a WHERE part provides no usable term.
    """
    datasetClause = query[1].datasetClause

    if datasetClause is not None:
        default_list = []
        named_list = []
        for d in datasetClause:
            if d.default:
                default_list.append(d.default)

        # Iterate over a copy: removing from the same list while iterating
        # it would silently skip the element following each removal.
        for d in list(datasetClause):
            if d.named:
                if d.named in default_list:
                    query[1].datasetClause.remove(d)
                else:
                    named_list.append(d.named)

        if len(named_list) > 0:
            # Build a throwaway query whose parsed FILTER is grafted onto
            # the original WHERE clause.
            # TODO(review): when where.part has more than one entry the
            # suffix is appended once per entry, yielding a malformed query
            # string — confirm a single part is the expected shape here.
            q = "SELECT * WHERE{ FILTER ( ?"
            for t in query[1].where.part:
                try:
                    # NOTE(review): attribute access on a parse node is
                    # unlikely to raise ParseException — confirm this guard
                    # catches what it intends to.
                    term = t.term
                except ParseException:
                    raise UnSupportedQuery()
                q = q + term + " IN (<" + '>,<'.join(named_list) + ">))}"

            parsedFilter = Query.parseString(q, parseAll=True)[1].where.part[0]
            query[1].where.part.append(parsedFilter)
        else:
            if 'graph' in query[1].where.part[0]:
                pass
            else:
                graphValue = query[1].where
                whereValue = CompValue('GroupGraphPatternSub', part=[CompValue('GraphGraphPattern',
                    term=Variable('selfDefinedGraphVariable'), graph=graphValue)])
                query[1].where = whereValue

    return query
4 changes: 3 additions & 1 deletion quit/tools/algebra.py
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,8 @@ def translateQuery(q, base=None, initNs=None):
q[1], visitPost=functools.partial(translatePName, prologue=prologue))

P, PV = translate(q[1])
if Variable('selfDefinedGraphVariable') in PV:
PV.remove(Variable('selfDefinedGraphVariable'))
datasetClause = q[1].datasetClause
if q[1].name == 'ConstructQuery':

Expand Down Expand Up @@ -777,7 +779,7 @@ def pp(p, ind=" "):
return
print("%s(" % (p.name, ))
for k in p:
print("%s%s =" % (ind, k,), end=' ')
#print("%s%s =" % (ind, k,), end=' ')
pp(p[k], ind + " ")
print("%s)" % ind)

Expand Down
6 changes: 3 additions & 3 deletions quit/tools/evaluate.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,8 +483,8 @@ def evalQuery(graph, query, initBindings, base=None):
# TODO re-enable original behaviour if FROM NAMED works with named graphs
# https://github.com/AKSW/QuitStore/issues/144
elif d.named:
raise FromNamedError
# g = d.named
# ctx.load(g, default=False)
# raise FromNamedError
g = d.named
ctx.load(g, default=False)

return evalPart(ctx, main)
13 changes: 12 additions & 1 deletion quit/tools/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,21 @@ def evalDrop(ctx, u):
"""
http://www.w3.org/TR/sparql11-update/#drop
"""
res = {}
res["type"] = "DROP"
res["delta"] = {}
if ctx.dataset.store.graph_aware:
for g in _graphAll(ctx, u.graphiri):
_append(res["delta"], u.graphiri, 'removals', g)
ctx.dataset.store.remove_graph(g)
graph = ctx.dataset.get_context(u.graphiri)
graph -= g
else:
_append(res["delta"], u.graphiri, 'removals', list(u.triples))
evalClear(ctx, u)

return res


def evalInsertData(ctx, u):
"""
Expand Down Expand Up @@ -390,7 +399,9 @@ def evalUpdate(graph, update, initBindings=None, actionLog=False):
elif u.name == 'Clear':
evalClear(ctx, u)
elif u.name == 'Drop':
evalDrop(ctx, u)
result = evalDrop(ctx, u)
if result:
res.append(result)
elif u.name == 'Create':
evalCreate(ctx, u)
elif u.name == 'Add':
Expand Down
2 changes: 1 addition & 1 deletion quit/web/modules/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def login():
else:
state = session["state"]
logger.debug("request url: {}".format(request.url))
redirect_uri = request.url
redirect_uri = 'http://docker.local/quitstore/login'
authorizeEndpoint = "https://github.com/login/oauth/authorize"
tokenEndpoint = "https://github.com/login/oauth/access_token"

Expand Down
21 changes: 13 additions & 8 deletions quit/web/modules/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from quit.conf import Feature
from quit import helpers as helpers
from quit.helpers import parse_sparql_request, parse_query_type
from quit.tools.algebra import pprintAlgebra
from quit.web.app import render_template, feature_required
from quit.exceptions import UnSupportedQuery, SparqlProtocolError, NonAbsoluteBaseError
from quit.exceptions import FromNamedError, QuitMergeConflict, RevisionNotFound
Expand Down Expand Up @@ -33,8 +34,8 @@
'application/json']


@endpoint.route("/sparql", defaults={'branch_or_ref': None}, methods=['POST', 'GET'])
@endpoint.route("/sparql/<path:branch_or_ref>", methods=['POST', 'GET'])
@endpoint.route("/sparql", defaults={'branch_or_ref': None}, methods=['POST', 'GET', 'PUT'])
@endpoint.route("/sparql/<path:branch_or_ref>", methods=['POST', 'GET', 'PUT'])
def sparql(branch_or_ref):
"""Process a SPARQL query (Select or Update).

Expand All @@ -51,7 +52,7 @@ def sparql(branch_or_ref):

logger.debug("Request method: {}".format(request.method))

query, type, default_graph, named_graph = parse_sparql_request(request)
query, type, default_graph, named_graph, comment = parse_sparql_request(request)

if query is None:
if request.accept_mimetypes.best_match(['text/html']) == 'text/html':
Expand All @@ -62,22 +63,24 @@ def sparql(branch_or_ref):
'to the SPARQL 1.1 standard', 400)
else:
# TODO allow USING NAMED when fixed in rdflib
if len(named_graph) > 0:
return make_response('FROM NAMED and USING NAMED not supported, yet', 400)

parse_type = getattr(helpers, 'parse_' + type + '_type')

try:
queryType, parsedQuery = parse_type(
query, quit.config.namespace, default_graph, named_graph)
if queryType != 'AskQuery':
print(queryType)
print("query:")
print(query)
except UnSupportedQuery:
return make_response('Unsupported Query', 400)
except NonAbsoluteBaseError:
return make_response('Non absolute Base URI given', 400)
except SparqlProtocolError:
return make_response('Sparql Protocol Error', 400)

if queryType in ['InsertData', 'DeleteData', 'Modify', 'DeleteWhere', 'Load']:
if queryType in ['InsertData', 'DeleteData', 'Modify', 'DeleteWhere', 'Load', 'Drop']:
if branch_or_ref:
commit_id = quit.repository.revision(branch_or_ref).id
else:
Expand Down Expand Up @@ -107,7 +110,8 @@ def sparql(branch_or_ref):
logger.debug("target ref is: {}".format(target_ref))
oid = quit.applyQueryOnCommit(parsedQuery, parent_commit_id, target_ref,
query=query, default_graph=default_graph,
named_graph=named_graph)
named_graph=named_graph, queryType=queryType,
comment=comment)

if resolution_method == "merge":
logger.debug(("going to merge update into {} because it is at {} but {} was "
Expand Down Expand Up @@ -137,7 +141,8 @@ def sparql(branch_or_ref):
try:
oid = quit.applyQueryOnCommit(parsedQuery, branch_or_ref, target_ref,
query=query, default_graph=default_graph,
named_graph=named_graph)
named_graph=named_graph, queryType=queryType,
comment=comment)
response = make_response('', 200)
response.headers["X-CurrentBranch"] = target_head
if oid is not None:
Expand Down
2 changes: 1 addition & 1 deletion quit/web/templates/sparql.html
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
value: '{{ config["defaultQuery"] | safe }}',
sparql: {
showQueryButton: true,
endpoint: '{{ request.url }}'
endpoint: '{{ url_for('endpoint.sparql') }}'
}
});
var yasr = YASR(document.getElementById("yasr"), {
Expand Down