Source code for skosprovider_rdf.providers

'''
This module contains an RDFProvider, an implementation of the
:class:`skosprovider.providers.VocabularyProvider` interface that uses a
:class:`rdflib.graph.Graph` as input.
'''

import logging

import rdflib
from language_tags import tags
from rdflib.namespace import DC
from rdflib.namespace import DCTERMS
from rdflib.namespace import RDF
from rdflib.namespace import SKOS
from rdflib.term import URIRef
from skosprovider.providers import MemoryProvider
from skosprovider.skos import Collection
from skosprovider.skos import Concept
from skosprovider.skos import ConceptScheme
from skosprovider.skos import Label
from skosprovider.skos import Note
from skosprovider.skos import Source
from skosprovider.uri import DefaultConceptSchemeUrnGenerator

from skosprovider_rdf.utils import text_

log = logging.getLogger(__name__)

SKOS_THES = rdflib.Namespace('http://purl.org/iso25964/skos-thes#')


class RDFProvider(MemoryProvider):
    '''
    A simple vocabulary provider that uses an :class:`rdflib.graph.Graph`
    as input. The provider expects a RDF graph with elements that represent
    the SKOS concepts and collections.

    Please be aware that this provider needs to load the entire graph in
    memory.
    '''

    #: Should the provider only take concepts into account explicitly linked
    #: to the conceptscheme?
    check_in_scheme = False

    def __init__(self, metadata, graph, **kwargs):
        '''
        Build the provider from a graph.

        :param metadata: Provider metadata, handed on to
            :class:`skosprovider.providers.MemoryProvider`. Its `id` is used
            to generate a concept scheme URI when the graph declares no
            concept scheme.
        :param graph: The graph to read concepts and collections from.
        :param kwargs: Handed on to the parent class. When `concept_scheme`
            is present it is used as is and only subjects linked to that
            scheme are loaded. Otherwise the scheme is read from the graph,
            where `concept_scheme_uri` selects one scheme if the graph
            declares several.
        :raises RuntimeError: If the graph declares several concept schemes
            and `concept_scheme_uri` is missing or matches none of them.
        '''
        self.graph = graph
        self.check_in_scheme = False
        if 'concept_scheme' not in kwargs:
            kwargs['concept_scheme'] = self._cs_from_graph(metadata, **kwargs)
        else:
            self.check_in_scheme = True
        super().__init__(metadata, [], **kwargs)
        # The parent was initialised with an empty list because the concepts
        # can only be filtered once self.concept_scheme has been set.
        self.list = self._from_graph()

    def _cs_from_graph(self, metadata, **kwargs):
        '''
        Determine the concept scheme of this provider from the graph.

        :param metadata: Provider metadata; `metadata.get('id')` feeds the
            default URN generator when the graph has no concept scheme.
        :param kwargs: May contain `concept_scheme_uri` to pick one scheme
            out of several.
        :returns: A :class:`skosprovider.skos.ConceptScheme`.
        :raises RuntimeError: If several schemes are found and none was
            selected, or the selected one is not among them.
        '''
        cslist = [
            ConceptScheme(
                uri=self.to_text(sub),
                labels=self._create_from_subject_typelist(
                    sub, self._scrub_label_types()),
                notes=self._create_from_subject_typelist(
                    sub, Note.valid_types),
                sources=self._create_sources(sub),
                languages=self._create_languages(sub)
            )
            for sub in self.graph.subjects(RDF.type, SKOS.ConceptScheme)
        ]
        if not cslist:
            return ConceptScheme(
                uri=DefaultConceptSchemeUrnGenerator().generate(
                    id=metadata.get('id')
                )
            )
        if len(cslist) == 1:
            return cslist[0]

        found = ', '.join(str(cs.uri) for cs in cslist)
        if 'concept_scheme_uri' not in kwargs:
            raise RuntimeError(
                'This RDF file contains more than one ConceptScheme. '
                'Please specify one. The following schemes were found: '
                '%s' % found
            )
        # Several schemes share this graph, so from here on only subjects
        # explicitly linked to the selected scheme may be loaded.
        self.check_in_scheme = True
        csuri = kwargs['concept_scheme_uri']
        for cs in cslist:
            if cs.uri == csuri:
                return cs
        raise RuntimeError(
            'This RDF file contains more than one ConceptScheme. '
            'You specified an unexisting one. The following schemes '
            'were found: %s' % found
        )

    def _from_graph(self):
        '''
        Read all concepts and collections from the graph.

        Subjects that are not linked to the provider's concept scheme are
        skipped when :attr:`check_in_scheme` is set.

        :returns: A :class:`list` of :class:`skosprovider.skos.Concept` and
            :class:`skosprovider.skos.Collection` objects, concepts first.
        '''
        clist = []
        for sub, _, _ in self.graph.triples((None, RDF.type, SKOS.Concept)):
            if (self.check_in_scheme
                    and self._get_in_scheme(sub) != self.concept_scheme.uri):
                continue
            uri = self.to_text(sub)
            matches = {
                k: self._create_from_subject_predicate(
                    sub, URIRef(SKOS[k + 'Match']))
                for k in Concept.matchtypes
            }
            clist.append(Concept(
                id=self._get_id_for_subject(sub, uri),
                uri=uri,
                concept_scheme=self.concept_scheme,
                labels=self._create_from_subject_typelist(
                    sub, self._scrub_label_types()),
                notes=self._create_from_subject_typelist(
                    sub, Note.valid_types),
                sources=self._create_sources(sub),
                broader=self._create_from_subject_predicate(
                    sub, SKOS.broader),
                narrower=self._create_from_subject_predicate(
                    sub, SKOS.narrower),
                related=self._create_from_subject_predicate(
                    sub, SKOS.related),
                member_of=[],
                subordinate_arrays=self._create_from_subject_predicate(
                    sub, SKOS_THES.subordinateArray),
                matches=matches
            ))

        for sub, _, _ in self.graph.triples(
                (None, RDF.type, SKOS.Collection)):
            if (self.check_in_scheme
                    and self._get_in_scheme(sub) != self.concept_scheme.uri):
                continue
            uri = self.to_text(sub)
            clist.append(Collection(
                id=self._get_id_for_subject(sub, uri),
                uri=uri,
                concept_scheme=self.concept_scheme,
                labels=self._create_from_subject_typelist(
                    sub, self._scrub_label_types()),
                notes=self._create_from_subject_typelist(
                    sub, Note.valid_types),
                sources=self._create_sources(sub),
                members=self._create_from_subject_predicate(
                    sub, SKOS.member),
                member_of=[],
                superordinates=self._create_from_subject_predicate(
                    sub, SKOS_THES.superOrdinate)
            ))

        # Both passes need the complete list, so they run after loading.
        self._fill_member_of(clist)
        self._set_infer_concept_relations(clist)
        return clist

    def _get_in_scheme(self, subject):
        '''
        Determine the scheme a subject is part of.

        `skos:inScheme` is consulted first, `skos:topConceptOf` second.

        :param subject: Subject to look up the scheme for.
        :returns: A URI for the scheme a subject is part of or None if
            it's not part of a scheme.
        '''
        scheme = self.graph.value(subject, SKOS.inScheme)
        if not scheme:
            scheme = self.graph.value(subject, SKOS.topConceptOf)
        return self.to_text(scheme) if scheme else None

    def _fill_member_of(self, clist):
        '''
        Fill in `member_of` on every item that is a member of a collection.

        Collections are visited in the order they appear in `clist`, so the
        resulting `member_of` lists have a stable order.

        :param clist: All concepts and collections; modified in place.
        '''
        collections = [c for c in clist if isinstance(c, Collection)]
        for col in collections:
            for c in clist:
                if c.id in col.members:
                    c.member_of.append(col.id)

    def _set_infer_concept_relations(self, clist):
        '''
        Set `infer_concept_relations` on every collection in `clist`.

        The flag is False for a collection without superordinates.
        Otherwise it is True only when at least one of the collection's
        superordinates is listed as broader by one of its (recursive)
        member concepts.

        :param clist: All concepts and collections; modified in place.
        '''
        def _collect_broader(collection):
            '''
            Collect all broader concepts of members of a collection or
            their (recursive) members.
            '''
            broader = []
            for m in clist:
                if m.id not in collection.members:
                    continue
                if m.type == 'concept':
                    broader.extend(m.broader)
                elif m.type == 'collection':
                    broader.extend(_collect_broader(m))
            return broader

        for col in clist:
            if not isinstance(col, Collection):
                continue
            if not col.superordinates:
                col.infer_concept_relations = False
                continue
            broader = _collect_broader(col)
            col.infer_concept_relations = len(
                set(broader).intersection(col.superordinates)) > 0

    def _create_from_subject_typelist(self, subject, typelist):
        '''
        Collect the values of several SKOS properties of a subject.

        :param subject: Subject to read the properties of.
        :param typelist: Local names of SKOS properties, e.g. label or note
            types.
        :returns: A :class:`list` with the results of
            :meth:`_create_from_subject_predicate` for every property, in
            `typelist` order.
        '''
        items = []
        for p in typelist:
            items.extend(
                self._create_from_subject_predicate(subject, SKOS[p]))
        return items

    def _get_id_for_subject(self, subject, uri):
        '''
        Determine the id of a subject.

        :param subject: Subject to determine the id for.
        :param uri: Fallback value, returned when the subject has neither a
            `dcterms:identifier` nor a `dc:identifier`.
        :returns: The text of the identifier, or `uri`.
        '''
        # any=False is handed to rdflib so that a subject with several
        # identifiers is not silently resolved to an arbitrary one.
        if (subject, DCTERMS.identifier, None) in self.graph:
            return self.to_text(self.graph.value(
                subject=subject, predicate=DCTERMS.identifier, any=False))
        if (subject, DC.identifier, None) in self.graph:
            return self.to_text(self.graph.value(
                subject=subject, predicate=DC.identifier, any=False))
        return uri

    def _create_from_subject_predicate(self, subject, predicate):
        '''
        Convert all objects of a subject/predicate pair.

        The part of the predicate after the last `#` decides the result
        type: a :class:`skosprovider.skos.Label` for label types, a
        :class:`skosprovider.skos.Note` for note types, and the id of the
        referenced subject for anything else.

        :param subject: Subject to read.
        :param predicate: Predicate to read.
        :returns: A :class:`list` of labels, notes or ids.
        '''
        type_name = predicate.split('#')[-1]
        items = []
        for _, _, obj in self.graph.triples((subject, predicate, None)):
            if Label.is_valid_type(type_name):
                item = self._create_label(obj, type_name)
            elif Note.is_valid_type(type_name):
                item = self._create_note(obj, type_name)
            else:
                item = self._get_id_for_subject(obj, self.to_text(obj))
            items.append(item)
        return items

    def _create_label(self, literal, type):
        '''
        Create a label from a literal.

        :param literal: The literal holding the label text.
        :param type: The label type.
        :returns: A :class:`skosprovider.skos.Label`.
        :raises ValueError: If `type` is not a valid label type.
        '''
        if not Label.is_valid_type(type):
            raise ValueError(
                'Type of Label is not valid.'
            )
        return Label(
            self.to_text(literal),
            type,
            self._get_language_from_literal(literal)
        )

    def _read_markupped_literal(self, literal):
        '''
        Split a literal into its content, language and markup.

        For a `rdf:HTML` literal the language is taken from the `xml:lang`
        attribute of the first child node, which is then removed from the
        serialised content; without that attribute the language is `und`.

        :param literal: The object to read. Objects without a `datatype`
            attribute are treated as plain text.
        :returns: A tuple `(content, language, markup)` where markup is
            `'HTML'` or None.
        '''
        # getattr: the object may be a resource instead of a literal.
        if getattr(literal, 'datatype', None) == RDF.HTML:
            # Work on a copy so the value inside the graph stays untouched.
            df = literal.value.cloneNode(True)
            first = df.firstChild
            if (first and first.attributes
                    and 'xml:lang' in first.attributes.keys()):
                lang = self._scrub_language(
                    first.attributes.get('xml:lang').value)
                del first.attributes['xml:lang']
            else:
                lang = 'und'
            return (df.toxml(), lang, 'HTML')
        return (literal, self._get_language_from_literal(literal), None)

    def _create_note(self, literal, type):
        '''
        Create a note from a literal.

        :param literal: The literal holding the note.
        :param type: The note type.
        :returns: A :class:`skosprovider.skos.Note`.
        :raises ValueError: If `type` is not a valid note type.
        '''
        if not Note.is_valid_type(type):
            raise ValueError(
                'Type of Note is not valid.'
            )
        content, language, markup = self._read_markupped_literal(literal)
        return Note(self.to_text(content), type, language, markup)

    def _create_sources(self, subject):
        '''
        Create the sources for this subject.

        :param subject: Subject to get the sources for.
        :returns: A :class:`list` of :class:`skosprovider.skos.Source`
            objects.
        '''
        ret = []
        for _, _, src in self.graph.triples(
                (subject, DCTERMS.source, None)):
            for _, _, citation in self.graph.triples(
                    (src, DCTERMS.bibliographicCitation, None)):
                # getattr: a citation that is not a literal has no datatype.
                is_html = getattr(citation, 'datatype', None) == RDF.HTML
                ret.append(
                    Source(
                        self.to_text(citation),
                        'HTML' if is_html else None
                    )
                )
        return ret

    def _create_languages(self, subject):
        '''
        Create the languages for this subject.

        Both `dcterms:language` and `dc:language` are read.

        :param subject: Subject to get the languages for.
        :returns: A :class:`set` of language tags; invalid tags are
            replaced by `und`.
        '''
        ret = set()
        for predicate in (DCTERMS.language, DC.language):
            for _, _, lang in self.graph.triples((subject, predicate, None)):
                ret.add(self.to_text(self._scrub_language(lang)))
        return ret

    def _scrub_language(self, language):
        '''
        Validate a language tag.

        :param language: The language tag to check.
        :returns: `language` if `tags.check` accepts it, else `'und'`.
        '''
        if tags.check(language):
            return language
        log.warning(
            'Encountered an invalid language %s. Falling back to "und".',
            language
        )
        return 'und'

    def _scrub_label_types(self):
        '''
        List the label types this provider reads from the graph.

        :returns: A new :class:`list` with all of `Label.valid_types`
            except `sortLabel`.
        '''
        return [t for t in Label.valid_types if t != 'sortLabel']

    def _get_language_from_literal(self, data):
        '''
        Get the language of a literal.

        :param data: The object to read the language from.
        :returns: The scrubbed language as text, or None when `data` has no
            `language` attribute or the attribute is None.
        '''
        if getattr(data, 'language', None) is None:
            return None
        return self.to_text(self._scrub_language(data.language))

    def to_text(self, data):
        '''
        Convert data to text.

        The data is encoded as UTF-8 and the resulting bytes are handed to
        :func:`skosprovider_rdf.utils.text_`.

        :param data: Data that needs to be converted to text; it must offer
            an `encode` method.
        :return: Text representation of the data.
        '''
        return text_(data.encode('utf-8'), 'utf-8')