from IPython.display import display, Markdown, Latex
import pandas as pd
import textwrap
import re
import os
import pydot
from io import BytesIO
import IPython
from cognipy.interop import cognipy_create, cognipy_delete, cognipy_call
def CQL(cql, ns='http://www.cognitum.eu/onto#'):
    """Converts CQL query into pure SPARQL by replacing CNL names with their IRI representations.

    Every ``<...>`` token that contains neither ``:`` nor ``/`` (i.e. is not
    already an IRI) is treated as a CNL name. The dash-separated name is
    turned into camel case (``big-red-car`` -> ``bigRedCar``), prefixed with
    ``ns`` and wrapped in angle brackets again.

    Args:
        cql (str): cql string
        ns (str): namespace used to expand CNL names into IRIs

    Returns:
        str: the query with all CNL names replaced by IRIs
    """
    def to_iri(cnl_name):
        # First segment is kept verbatim; the following ones are title-cased
        # (note that str.title() also lower-cases the rest of each segment).
        head, *tail = cnl_name.split('-')
        return '<' + ns + head + "".join(part.title() for part in tail) + '>'

    # m.group() includes the surrounding angle brackets, hence the [1:-1].
    return re.sub(r'<[^>:/]+>', lambda m: to_iri(m.group()[1:-1]), cql)
# Characters that delimit fields/ports in Graphviz record labels, mapped to
# the character entities that make Graphviz render them literally.
_GRAPHVIZ_LABEL_ESCAPES = str.maketrans({
    '{': '&#123;',
    '|': '&#124;',
    '}': '&#125;',
    '<': '&lt;',
    '>': '&gt;',
})


def encode_string_for_graph_label(val):
    """Encodes reserved graphviz characters.

    Replaces ``{``, ``|``, ``}``, ``<`` and ``>`` with character entities so
    that the text can be embedded in a record-shaped node label without being
    interpreted as label structure.

    Args:
        val (str): raw text

    Returns:
        str: text with the reserved characters replaced by entities
    """
    return val.translate(_GRAPHVIZ_LABEL_ESCAPES)
def basic_backquote_string_evaluator(val):
    """The basic evaluator for backquoted strings.

    Evaluates ``val`` as a Python expression using this module's globals and
    this function's locals (so the name ``val`` itself is visible to the
    expression).

    Args:
        val (str): the Python expression (the text between the backquotes)

    Returns:
        the value the expression evaluates to
    """
    # SECURITY: eval() executes arbitrary Python code. Only pass this function
    # as an evaluator for ontologies that come from a trusted source.
    return eval(val, globals(), locals())
class Ontology:
    """
    A class used to represent an ontology.

    This is the main entry point for the cognipy package.
    You can create many ontology objects and use them at the same time.
    An instance can be used as a context manager; the engine-side object is
    released when the ``with`` block exits (and at the latest when the
    Python object is finalized).

    Args:
        source (str):
            'cnl/file' - local cnl file *.encnl
            'cnl/string' - arg is a string ontology in a cnl format
            'rdf/uri' - uri to OWL/RDF or RDF/XML file
            'rdf/string' - arg is a string ontology in OWL/RDF or RDF/XML format
            'rdf/file' - local file whose text content is loaded the same way as 'rdf/string'
        arg (str): path/string/uri
        verbose (bool): should the content of the ontology be displayed
        evaluator (function): a function that is used to evaluate string values within
            the ontology that are embraced with backquotes i.e.:`...`.
            It enables encoding complex structures within the ontology.
            It is applied to the cells of the DataFrames returned by the query methods.
            Default = None - no backquoted string evaluation is performed.
        graph_attribute_formatter (function): a function that is used to format
            an attribute value when diagram is rendered
        stop_on_error (bool): if True (default) the method will throw an error if any
            occurred during the ontology loading process.

    Raises:
        ValueError: if ``source`` is not one of the values listed above.
    """

    # Maps the public ``source`` value to the engine method that loads it.
    _LOADERS = {
        "cnl/file": "LoadCnl",
        "cnl/string": "LoadCnlFromString",
        "rdf/uri": "LoadRdf",
        "rdf/string": "LoadRdfFromString",
        "rdf/file": "LoadRdfFromString",
    }

    def _resolve_str(self, v):
        """Run the evaluator on ``v`` if it is a backquoted string, else return ``v`` unchanged."""
        # len >= 2: a lone "`" is both first and last character but is not a
        # backquoted expression, so it must not be sent to the evaluator.
        if isinstance(v, str) and len(v) >= 2 and v[0] == '`' and v[-1] == '`':
            return self._evaluator(v[1:-1])
        return v

    def _resolve_value(self, col):
        """Recursively apply ``_resolve_str`` to a value or to the items of a list/set/dict."""
        if isinstance(col, list):
            return [self._resolve_value(item) for item in col]
        if isinstance(col, set):
            return {self._resolve_value(item) for item in col}
        if isinstance(col, dict):
            # Only the values are resolved; keys are kept as they are.
            return {key: self._resolve_value(item) for key, item in col.items()}
        return self._resolve_str(col)

    def __init__(self, source, arg, verbose=False, evaluator=None, graph_attribute_formatter=default_graph_attribute_formatter, stop_on_error=True):
        # Validate before anything is created on the engine side so that an
        # invalid call does not leave an engine object behind.
        if not isinstance(source, str) or source not in self._LOADERS:
            raise ValueError("Invalid source parameter")
        loader = self._LOADERS[source]

        # Fixed loader flags, passed through to the engine as-is.
        load_annotations = True
        pass_params_as_cnl = True
        modal_check = True

        self._verbose = verbose
        self._evaluator = evaluator
        self._materialized = True
        self._graph_attribute_formatter = graph_attribute_formatter
        self._released = False
        self._uid = cognipy_create()

        if source == "rdf/file":
            # The engine is given the file content, not the path.
            with open(arg, "rt") as f:
                arg = f.read()
        cognipy_call(self._uid, loader, arg, load_annotations,
                     modal_check, pass_params_as_cnl, stop_on_error)

        if self._verbose:
            self._display_cnl(self.as_cnl())

    def _display_cnl(self, cnl):
        """Highlight ``cnl`` and display it as Markdown in the notebook."""
        markdown = self.highlight(cnl)
        # Un-bold the article/variable tokens and normalise line breaks.
        markdown = markdown.replace('**a**', 'a').replace('**X**', 'X').replace('**Y**', 'Y')
        markdown = markdown.replace('\r\n', '<br>').replace("<br><br>", "<br>")
        display(Markdown(markdown))

    def get_load_error(self):
        """Returns the last error that happened during ontology loading."""
        return cognipy_call(self._uid, "GetLoadError")

    def _release(self):
        """Delete the engine-side object, at most once per instance."""
        # getattr: __del__ also runs for instances whose __init__ raised
        # before _uid was assigned.
        uid = getattr(self, "_uid", None)
        if uid is None or getattr(self, "_released", False):
            return
        # Marked before the call so that __exit__ followed by __del__ never
        # deletes the same engine object twice.
        self._released = True
        cognipy_delete(uid)

    def __del__(self):
        self._release()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self._release()

    def as_rdf(self, conclusions=False):
        """Returns the content of the ontology in OWL/RDF format.

        Args:
            conclusions (bool): passed to the engine's ``ToRDF`` call
                (presumably: include inferred knowledge - confirm against the engine)
        """
        return cognipy_call(self._uid, "ToRDF", conclusions)

    def as_cnl(self, conclusions=False):
        """Returns the content of the ontology as CNL.

        Args:
            conclusions (bool): passed to the engine's ``ToCNL`` call
                (presumably: include inferred knowledge - confirm against the engine)
        """
        return cognipy_call(self._uid, "ToCNL", conclusions, True)

    def sub_concepts_of(self, cnl, direct=False):
        """Get all the sub-concepts of the given concept specification

        Args:
            cnl (str): the cnl expression that evaluates to the concept definition
            direct (bool): passed to the engine (presumably: only direct sub-concepts - confirm)

        Returns:
            The unmodified result of the engine's ``GetSubConceptsOf`` call
            (the sub-concepts of the given concept expression).
        """
        return cognipy_call(self._uid, "GetSubConceptsOf", cnl, direct)

    def sparql_query_for_instances(self, cnl):
        """Converts CNL concept definition into corresponding SPARQL query

        Args:
            cnl (str): the cnl expression that evaluates to the concept definition

        Returns:
            str: SPARQL query
        """
        return cognipy_call(self._uid, "SelectInstancesSPARQL", cnl, False)

    def super_concepts_of(self, cnl, direct=False):
        """Get all the super-concepts of the given concept specification

        Args:
            cnl (str): the cnl expression that evaluates to the concept definition
            direct (bool): passed to the engine (presumably: only direct super-concepts - confirm)

        Returns:
            The unmodified result of the engine's ``GetSuperConceptsOf`` call
            (the super-concepts of the given concept expression).
        """
        return cognipy_call(self._uid, "GetSuperConceptsOf", cnl, direct)

    def instances_of(self, cnl, direct=False):
        """Get list of all the instances of the given concept specification

        Args:
            cnl (str): the cnl expression that evaluates to the concept definition
            direct (bool): if True, only the direct instances of the given concept
                specification will be returned

        Returns:
            List of all the instances of the given concept expression
        """
        return cognipy_call(self._uid, "GetInstancesOf", cnl, direct)

    def _to_pandas(self, vals, cols):
        """Build a DataFrame from rows ``vals`` and column names ``cols``.

        When an evaluator was supplied, every cell is passed through
        ``_resolve_value`` first so that backquoted strings are evaluated.
        """
        if self._evaluator is not None:
            vals = [[self._resolve_value(cell) for cell in row] for row in vals]
        return pd.DataFrame(vals, columns=cols)

    def select_instances_of(self, cnl):
        """Get all the instances of the given concept specification

        Args:
            cnl (str): the cnl expression that evaluates to the concept definition

        Returns:
            Pandas DataFrame containing all the instances of the given concept expression together with all their attributes and relations
        """
        val = cognipy_call(
            self._uid, "SparqlQueryForInstancesWithDetails", cnl)
        # The engine answers with a pair: Item1 = column names, Item2 = rows.
        return self._to_pandas(val["Item2"], val["Item1"])

    def autocomplete(self, str):
        """Gives the autocompletion lists for a given string

        Args:
            str (str): the partially defined cnl expression

        Returns:
            List[str]: autocompletions
        """
        # NOTE: the parameter name shadows the builtin; it is kept because it
        # is part of the public signature.
        return list(cognipy_call(self._uid, "AutoComplete", str))

    def highlight(self, cnl):
        """Gives the Markdown of the given cnl

        Args:
            cnl (str): the cnl string

        Returns:
            str: highlighted cnl string
        """
        return cognipy_call(self._uid, "Highlight", cnl)

    def insert_abox_cnl(self, cnl):
        """Inserts new knowledge into the ontology.
        Only A-Box is accepted here.
        In verbose mode the inserted cnl is displayed.

        Args:
            cnl (str): the cnl string
        """
        cognipy_call(self._uid, "KnowledgeInsert", cnl, True, True)
        if self._verbose:
            self._display_cnl(cnl)

    def delete_abox_cnl(self, cnl):
        """Deletes the specified knowledge from the A-Box ontology
        Only A-Box is accepted here. The exception will be thrown if T-Box is given.
        In verbose mode the deleted cnl is displayed.

        Args:
            cnl (str): the cnl string
        """
        cognipy_call(self._uid, "KnowledgeDelete", cnl, True)
        if self._verbose:
            self._display_cnl(cnl)

    def delete_abox_instance(self, inst):
        """Deletes the specified instance from the A-Box of the ontology including all the connections
        it has to other instances and concepts. If the instance is involved in the T-Box definition the
        exception will be thrown.
        In verbose mode the instance name is displayed in parentheses.

        Args:
            inst (str): the cnl name of the instance.
        """
        cognipy_call(self._uid, "RemoveInstance", inst)
        if self._verbose:
            display(Markdown("(" + inst + ")"))

    def sparql_query(self, query, asCNL=True, column_names=None):
        """Executes the SPARQL query

        Args:
            query: the SPARQL query. You can directly use prefixes like: [rdf:,rdfs:,owl:]
            asCNL : should the result names be automatically converted back to their CNL representation (default)
                or should they remain rdf identifiers.
            column_names : (default=None). List of column names for the returned dataframe. If None, the names of the variables are used.

        Returns:
            Pandas DataFrame containing all the results of the query
        """
        val = cognipy_call(self._uid, "SparqlQuery", query, True, asCNL)
        # Item1 = variable names reported by the engine, Item2 = result rows.
        columns = val["Item1"] if column_names is None else column_names
        return self._to_pandas(val["Item2"], columns)

    def why(self, cnl):
        """Explains why

        Args:
            cnl (str): the cnl string

        Returns:
            The unmodified result of the engine's ``Why`` call.
        """
        return cognipy_call(self._uid, "Why", cnl, True)

    def reasoningInfo(self):
        """Returns the engine's reasoning information (``GetReasoningInfo``).

        A ``super_concepts_of("a thing")`` query is issued first and its
        result discarded (presumably to make sure the reasoner has run before
        the information is read - confirm against the engine).
        """
        self.super_concepts_of("a thing")
        return cognipy_call(self._uid, "GetReasoningInfo")

    def create_graph(self, layout="hierarchical",
                     show={"subsumptions", "types", "relations", "attributes"},
                     include={}, exclude={},
                     constrains=[],
                     format="svg",
                     filename=None, fontname=None, fontsize=11):
        """Creates the ontology diagram as an image

        Args:
            layout(str): type of layout (one of: "hierarchical"(default), "force directed")
            show(set(str)): one or more of strings {"subsumptions","types","relations","attributes"}
            include(set(str)): one or more names of entities that must be included (even if show argument says no)
            exclude(set(str)): one or more names of entities that must be excluded (even if show argument says yes)
            constrains(list(str)): one or more complex concept expressions that constrain the list of instances to be displayed
                this allows to create graphs that are focusing on specific instance. Multiple constrains
                are joined using OR expression
            format(str): format of the output image(one of: "svg"(default) ,"png")
            filename(str): if None then output is returned, otherwise the output is written under the filename on the disk
            fontname(str): if None then default font is used, otherwise the fontname is the name of the typeface to be used
            fontsize(int): size of the font

        Returns:
            The rendered image data when ``filename`` is None, otherwise the
            result of pydot's ``write_svg`` / ``write_png`` call.

        Raises:
            ValueError: for an unknown ``layout`` or ``format``.
        """
        # NOTE: the mutable default arguments are part of the public
        # signature; they are only read here, never mutated.
        show_concepts = "subsumptions" in show
        show_instances = "types" in show
        show_relations = "relations" in show
        show_values = "attributes" in show

        subsumption_edge_color = "black"
        relation_edge_color = "black"
        concept_color = "aliceblue"
        instance_color = "whitesmoke"

        def style_text(element):
            # Shared font settings for nodes and edges.
            if fontname is not None:
                element.set_fontname(fontname)
            element.set_fontsize(fontsize)

        def style_node(node):
            style_text(node)
            node.set_shape('Mrecord')
            node.set_penwidth(0.5)

        def fill(node, color):
            node.set_style('filled')
            node.set_fillcolor(color)

        def get_or_add_node(graph, name):
            # Reuse the node if the graph already has one with this name.
            found = graph.get_node(name)
            if len(found) == 0:
                node = pydot.Node(name)
                graph.add_node(node)
                return node
            return found[0]

        def add_node(graph, name):
            node = get_or_add_node(graph, name)
            style_node(node)
            return node

        def add_edge(graph, frm, to):
            frm_node = get_or_add_node(graph, frm)
            to_node = get_or_add_node(graph, to)
            # The edge is stored as to -> frm; both callers draw it with
            # dir="back", so the arrow tail ends up at the ``to`` node.
            edge = pydot.Edge(to, frm)
            graph.add_edge(edge)
            edge.set_penwidth(0.5)
            style_text(edge)
            style_node(frm_node)
            style_node(to_node)
            return frm_node, edge, to_node

        def add_hierarchy_edge(graph, frm, to, line_style, frm_color):
            frm_node, edge, to_node = add_edge(graph, frm, to)
            edge.set_dir("back")
            edge.set_arrowtail("empty")
            edge.set_style(line_style)
            edge.set_color(subsumption_edge_color)
            fill(to_node, concept_color)
            fill(frm_node, frm_color)

        def add_instance_of(graph, frm, to):
            # instance --(dashed)--> concept
            add_hierarchy_edge(graph, frm, to, "dashed", instance_color)

        def add_subsumption(graph, frm, to):
            # sub-concept --(solid)--> super-concept
            add_hierarchy_edge(graph, frm, to, "solid", concept_color)

        def add_label(labels, node, name, text):
            # Record label grows by one "|{...}" field per call; the closing
            # brace of the previous label is dropped and re-added at the end.
            label = labels[name] if name in labels else "{" + name + "}"
            label = label[:-1] + "|{" + text + "}}"
            node.set_label(label)
            labels[name] = label

        def add_assertion(graph, frm, to, edge_text):
            frm_node = add_node(graph, frm)
            to_node = add_node(graph, to)
            edge = pydot.Edge(frm_node, to_node)
            graph.add_edge(edge)
            edge.set_penwidth(0.5)
            style_text(edge)
            edge.set_dir("forward")
            edge.set_arrowhead("open")
            edge.set_style("solid")
            edge.set_color(relation_edge_color)
            edge.set_label(edge_text)
            fill(to_node, instance_color)
            fill(frm_node, instance_color)

        def add_data_value(graph, labels, frm, val, edge_text):
            frm_node = add_node(graph, frm)
            fill(frm_node, instance_color)
            add_label(labels, frm_node, frm, edge_text + "|" + val)

        def can_add(shown, *names):
            # Excluded names always win; otherwise the item is drawn when its
            # category is shown or when any of its names is explicitly included.
            if any(name in exclude for name in names):
                return False
            return shown or any(name in include for name in names)

        def constrains_union_sparql(var):
            # Builds a sub-select that restricts ``var`` to the instances of
            # any of the constraining concept expressions.
            if len(constrains) == 0:
                return ""

            def single(expr):
                query = self.sparql_query_for_instances(expr)
                # Keep only the body of the WHERE clause ...
                query = query[query.find("WHERE") + 7:]
                query = query[:len(query) - 1]
                # ... and rename the engine's variables to ``var``.
                return query.replace("?x0", var).replace("?z0", var)

            return "{SELECT " + var + " {{" + "} UNION {".join([single(expr) for expr in constrains]) + "}}}"

        graph = pydot.Dot(graph_type='graph')
        indirect_subsumptions = set()
        indirect_types = set()
        subsumptions = set()
        instances = set()
        relations = set()
        datavalues = set()
        labels = {}
        any_included = len(include) > 0

        if show_concepts or any_included:
            # Pairs reachable through an intermediate class are not drawn as
            # direct edges.
            indirect = self.sparql_query(
                "select ?x ?y ?z {?x rdfs:subClassOf ?y. ?y rdfs:subClassOf ?z. ?x rdf:type owl:Class. ?y rdf:type owl:Class. ?z rdf:type owl:Class. filter (?x!=?y && ?y !=?z && ?x!=?z)}")
            for frm, mid, to in indirect.values:
                if frm not in exclude and to not in exclude and mid not in exclude:
                    indirect_subsumptions.add((frm, to))
            res = self.sparql_query(
                "select ?x ?y {?x rdfs:subClassOf ?y. ?x rdf:type owl:Class. ?y rdf:type owl:Class. filter (?x!=?y)}")
            for frm, to in res.values:
                pair = (frm, to)
                if can_add(show_concepts, frm, to) and pair not in indirect_subsumptions and pair not in subsumptions:
                    subsumptions.add(pair)
                    add_subsumption(graph, frm, to)

        xcspq = constrains_union_sparql("?x")

        if show_instances or any_included:
            # Types implied through a sub-class chain are not drawn directly.
            indirect = self.sparql_query(
                "select ?x ?y ?z {?x rdf:type ?y. ?y rdfs:subClassOf ?z. ?x rdf:type owl:NamedIndividual. ?y rdf:type owl:Class. ?z rdf:type owl:Class. filter (?x!=?y && ?y !=?z && ?x!=?z)."+xcspq+" }")
            for frm, mid, to in indirect.values:
                if frm not in exclude and to not in exclude and mid not in exclude:
                    indirect_types.add((frm, to))
            res = self.sparql_query(
                "select ?x ?y {?x rdf:type ?y. ?x rdf:type owl:NamedIndividual. ?y rdf:type owl:Class."+xcspq+"}")
            for frm, to in res.values:
                pair = (frm, to)
                if can_add(show_instances, frm, to) and pair not in indirect_types and pair not in instances:
                    instances.add(pair)
                    add_instance_of(graph, frm, to)

        if show_relations or any_included:
            res = self.sparql_query(
                "select ?x ?y ?r {?x ?r ?y. ?x rdf:type owl:NamedIndividual. ?y rdf:type owl:NamedIndividual. ?r rdf:type owl:ObjectProperty. "+xcspq+"}")
            for frm, to, rel in res.values:
                triple = (frm, rel, to)
                if can_add(show_relations, frm, rel, to) and triple not in relations:
                    relations.add(triple)
                    add_assertion(graph, frm, to, rel)

        if show_values or any_included:
            res = self.sparql_query(
                "select ?x ?y ?r {?x ?r ?y. ?x rdf:type owl:NamedIndividual. ?r rdf:type owl:DatatypeProperty. "+xcspq+"}")
            # (instance, property) -> formatted values, in first-seen order.
            values_by_property = {}
            for frm, to, rel in res.values:
                # Only the instance and the property take part in the
                # include/exclude check; ``to`` is the attribute value.
                if can_add(show_values, frm, rel):
                    to = self._graph_attribute_formatter(to)
                    triple = (frm, rel, to)
                    if triple not in datavalues:
                        datavalues.add(triple)
                        values_by_property.setdefault((frm, rel), []).append(to)
            for (frm, rel), values in values_by_property.items():
                add_data_value(graph, labels, frm, "{" + "|".join(values) + "}", rel)

        graph.set_K("1")
        if layout == "hierarchical":
            graph.set_layout("dot")
        elif layout == "force directed":
            graph.set_layout("fdp")
        else:
            raise ValueError("unknown layout type")
        graph.set_splines("true")

        if format not in ("svg", "png"):
            raise ValueError("unknown image format")
        if filename is None:
            render = graph.create_svg if format == "svg" else graph.create_png
            return render(prog='dot', encoding='utf8')
        write = graph.write_svg if format == "svg" else graph.write_png
        return write(filename, prog='dot', encoding='utf8')

    def draw_graph(self, layout="hierarchical", show={"subsumptions", "types", "relations", "attributes"}, include={}, exclude={}, constrains=[], fontname=None, fontsize=11):
        """Draws the ontology

        Args:
            layout(str): type of layout (one of: "hierarchical"(default), "force directed")
            show(set(str)): one or more of strings {"subsumptions","types","relations","attributes"}
            include(set(str)): one or more names of entities that must be included (even if show argument says no)
            exclude(set(str)): one or more names of entities that must be excluded (even if show argument says yes)
            constrains(list(str)): one or more complex concept expressions that constrain the list of instances to be displayed
                this allows to create graphs that are focusing on specific instance. Multiple constrains
                are joined using OR expression
            fontname(str): if None then default font is used, otherwise the fontname is the name of the typeface to be used
            fontsize(int): size of the font

        Returns:
            IPython.display.SVG: the diagram, rendered by ``create_graph`` in its default (svg) format
        """
        svg_data = self.create_graph(layout, show, include, exclude, constrains,
                                     fontname=fontname, fontsize=fontsize)
        return IPython.display.SVG(data=svg_data)
class ABoxBatch:
    """
    A class used to create batch for A-Box manipulations on the ontology.

    Assertions are collected in one flat list, four consecutive entries per
    assertion: kind, instance, property, target. The builder methods return
    the batch itself so that calls can be chained.
    """

    def __init__(self):
        self._rb = []      # flat assertion buffer, 4 entries per assertion
        self._insts = []   # instances to remove when delete() is called

    def _add(self, kind, inst, prop, target):
        """Append one assertion to the buffer and return the batch."""
        self._rb.extend((kind, inst, prop, target))
        return self

    def has_type(self, inst, cls):
        """[inst] is [cls]. has-type relationship.

        Args:
            inst(str): instance name
            cls(str): concept name
        """
        return self._add("type", inst, "", cls)

    def same_as(self, inst, inst2):
        """[inst] is [inst2]. same-as relationship.

        Args:
            inst,inst2(str): instance names
        """
        return self._add("==", inst, "", inst2)

    def different_from(self, inst, inst2):
        """[inst] is not [inst2]. different-from relationship.

        Args:
            inst,inst2(str): instance names
        """
        return self._add("!=", inst, "", inst2)

    def relates(self, inst, prop, inst2):
        """[inst] [prop] [inst2]. object relationship.

        Args:
            inst,inst2(str): instance names
            prop(str): property name
        """
        return self._add("R", inst, prop, inst2)

    def value(self, inst, prop, v):
        """[inst] [prop] [v]. attribute.

        The value is stored as a type-prefixed string: ``B:[1]`` / ``B:[0]``
        for bool, ``I:<n>`` for int, ``F:<x>`` for float and ``S:'<text>'``
        for everything else.

        Args:
            inst(str): instance names
            prop(str): property name
            v(int,float,bool,str): a value for the instance property
        """
        # bool must be tested before int: bool is a subclass of int, so the
        # int test would otherwise swallow True/False.
        if isinstance(v, bool):
            encoded = "B:[1]" if v else "B:[0]"
        elif isinstance(v, int):
            encoded = "I:" + str(v)
        elif isinstance(v, float):
            encoded = "F:" + str(v)
        else:
            # NOTE(review): quotes inside the text are not escaped - confirm
            # what the engine expects for values containing "'".
            encoded = "S:'" + str(v) + "'"
        return self._add("D", inst, prop, encoded)

    def delete_instance(self, inst):
        """the instance to be deleted. Used only if delete is called

        Args:
            inst(str): instance name
        """
        self._insts.append(inst)
        return self

    def _assertions_markdown(self):
        """Render the buffered assertions, one per ``<br>``-terminated line."""
        lines = []
        for start in range(0, len(self._rb), 4):
            kind, inst, prop, target = self._rb[start:start + 4]
            if kind == "type":
                text = target + "(" + inst + ")"
            elif kind in ("==", "!="):
                text = inst + kind + target
            else:
                # "R" (relation) and "D" (data value) share one notation.
                text = prop + "(" + inst + "," + target + ")"
            lines.append(text + "<br>")
        return "".join(lines)

    def insert(self, onto):
        """Inserts the current A-Box into the specific ontology.
        If the ontology is verbose, the inserted assertions are displayed.

        Args:
            onto(Ontology): ontology to be modified
        """
        cognipy_call(onto._uid, "AssertionsInsert", self._rb)
        if onto._verbose:
            display(Markdown(self._assertions_markdown()))

    def delete(self, onto):
        """Deletes the current A-Box from the specific ontology and removes
        the instances registered with ``delete_instance``.
        If the ontology is verbose, the deleted items are displayed.

        Args:
            onto(Ontology): ontology to be modified
        """
        cognipy_call(onto._uid, "AssertionsDelete", self._rb)
        for inst in self._insts:
            cognipy_call(onto._uid, "RemoveInstance", inst)
        if onto._verbose:
            removed = "".join("*(" + inst + ")<br>" for inst in self._insts)
            display(Markdown(self._assertions_markdown() + removed))