[libcatmandu-rdf-perl] 08/20: Adding a streaming parser
Jonas Smedegaard
dr at jones.dk
Sat Oct 28 03:10:21 UTC 2017
This is an automated email from the git hooks/post-receive script.
js pushed a commit to annotated tag upstream/0.32
in repository libcatmandu-rdf-perl.
commit 199d842fff636410b45ff895d87c3a526643b01b
Author: Patrick Hochstenbach <patrick.hochstenbach at ugent.be>
Date: Sat Jul 29 10:41:20 2017 +0200
Adding a streaming parser
---
META.json | 1 +
cpanfile | 1 +
lib/Catmandu/Importer/RDF.pm | 119 +++++++++++++++++++++++++++++++++++--------
3 files changed, 99 insertions(+), 22 deletions(-)
diff --git a/META.json b/META.json
index aeb8947..d3e6735 100644
--- a/META.json
+++ b/META.json
@@ -38,6 +38,7 @@
"runtime" : {
"requires" : {
"Catmandu" : "0.9209",
+ "Future" : "0.35",
"RDF::LDF" : "0.23",
"RDF::NS" : "20140910",
"RDF::Query" : "2.913",
diff --git a/cpanfile b/cpanfile
index 165175a..a571ff0 100644
--- a/cpanfile
+++ b/cpanfile
@@ -6,6 +6,7 @@ requires 'RDF::NS', '20140910';
requires 'RDF::Query', '2.913';
requires 'RDF::Trine', '1.013';
requires 'RDF::aREF', '0.25';
+requires 'Future', '0.35';
test_requires 'Test::More', '0.99';
test_requires 'Test::LWP::UserAgent', '0.025';
diff --git a/lib/Catmandu/Importer/RDF.pm b/lib/Catmandu/Importer/RDF.pm
index d5b5681..c8dbc2c 100644
--- a/lib/Catmandu/Importer/RDF.pm
+++ b/lib/Catmandu/Importer/RDF.pm
@@ -14,6 +14,8 @@ use RDF::LDF;
use RDF::aREF;
use RDF::aREF::Encoder;
use RDF::NS;
+use IO::Pipe;
+use JSON;
use LWP::UserAgent::CHICaching;
our $VERSION = '0.31';
@@ -146,23 +148,27 @@ sub sparql_generator {
sub rdf_generator {
my ($self) = @_;
sub {
- state $stream = $self->_rdf_stream;
+ state $stream = $self->_aref_stream;
return unless $stream;
my $aref = { };
+
if ($self->triples) {
- if (my $triple = $stream->next) {
- $aref = $self->encoder->triple(
- $triple->subject,
- $triple->predicate,
- $triple->object
- );
- } else {
+ if (my $hashref = $stream->()) {
+ $self->encoder->add_hashref($hashref, $aref);
+ }
+ else {
return ($stream = undef);
}
- } else {
+ }
+ else {
# TODO: include namespace mappings if requested
- $self->encoder->add_hashref( $stream->as_hashref, $aref );
+ while (my $hashref = $stream->()) {
+ $self->encoder->add_hashref(
+ $hashref,
+ $aref
+ );
+ }
if ($self->url) {
$aref->{_url} = $self->url;
@@ -241,31 +247,100 @@ sub _sparql_stream {
}
}
-sub _rdf_stream {
- my ($self) = @_;
-
- my $model = RDF::Trine::Model->new;
- my $parser = $self->type
- ? RDF::Trine::Parser->new( $self->type ) : 'RDF::Trine::Parser';
+# Stream the triples of the configured source (url, file, scalar ref or fh).
+#
+# The source is parsed in a forked child, which writes every triple as one
+# line of JSON to a pipe, so triples can be consumed before parsing ends.
+# Returns a code ref; each call returns the next triple as a hashref
+#
+#   { $subject => { $predicate => [ { type => ..., value => ... } ] } }
+#
+# in RDF/JSON layout (type 'uri', 'literal' or 'bnode'; blank nodes written
+# as "_:id"; 'lang'/'datatype' only on literals that carry them). Once the
+# child has finished, it returns empty (undef in scalar context).
+# Dies if fork() fails or if the child exited with a non-zero status
+# (e.g. because parsing failed).
+sub _aref_stream {
+    my ($self) = @_;
+
+    my $parser = $self->type
+        ? RDF::Trine::Parser->new( $self->type ) : 'RDF::Trine::Parser';
+
+    my $pipe = IO::Pipe->new();
+
+    # fork() returns undef on failure; fail loudly instead of running the
+    # child branch (and its exit) in this process.
+    my $pid = fork();
+    die "Catmandu::Importer::RDF: fork failed: $!" unless defined $pid;
+
+    if ($pid) {
+        # parent: read one JSON-encoded triple per line
+        $pipe->reader();
+
+        my $finished = 0;
+
+        return sub {
+            return if $finished;
+
+            # A fresh read on every call (a `state` variable here would
+            # keep returning the first line forever).
+            my $line = <$pipe>;
+            return decode_json($line) if defined $line;
+
+            # EOF: the child closed its end. Reap it exactly once and
+            # report a failed parse instead of silently ending the stream.
+            $finished = 1;
+            close $pipe;
+            my $reaped = waitpid($pid, 0);
+            die "Catmandu::Importer::RDF: RDF parser process failed (wait status $?)"
+                if $reaped == $pid && $?;
+
+            return;
+        };
+    }
+    else {
+        # child: parse the source, writing each triple to the pipe
+        $pipe->writer();
+
+        my $handler = sub {
+            my ($triple) = @_;
+
+            my $s = $triple->subject;
+            my $o = $triple->object;
+
+            # Blank nodes have no URI (uri_value is not available on
+            # them); RDF/JSON writes them as "_:<id>".
+            my $subject = $s->is_blank
+                ? '_:' . $s->blank_identifier
+                : $s->uri_value;
+
+            my $object;
+            if ($o->is_literal) {
+                $object = { type => 'literal', value => $o->literal_value };
+                my $lang     = $o->literal_value_language;
+                my $datatype = $o->literal_datatype;
+                $object->{lang}     = $lang     if defined $lang;
+                $object->{datatype} = $datatype if defined $datatype;
+            }
+            elsif ($o->is_blank) {
+                $object = { type => 'bnode', value => '_:' . $o->blank_identifier };
+            }
+            else {
+                $object = { type => 'uri', value => $o->uri_value };
+            }
+
+            print {$pipe} encode_json({
+                $subject => { $triple->predicate->uri_value => [ $object ] },
+            }), "\n";
+        };
+
+        # Catch parse errors here: an exception escaping from the child
+        # would unwind into the caller's code and keep running it in this
+        # forked process.
+        my $ok = eval {
if ($self->url) {
- $parser->parse_url_into_model( $self->url, $model );
- } else {
+                $parser->parse_url( $self->url, $handler );
+            }
+            else {
my $from_scalar = (ref $self->file // '') eq 'SCALAR';
+
if (!$self->type and $self->file and !$from_scalar) {
$parser = $parser->guess_parser_by_filename($self->file);
}
+
if ($from_scalar) {
- $parser->parse_into_model( $self->base, ${$self->file}, $model );
- } else {
- $parser->parse_file_into_model( $self->base, $self->file // $self->fh, $model );
+                    $parser->parse( $self->base, ${$self->file}, $handler );
+                }
+                else {
+                    $parser->parse_file( $self->base, $self->file // $self->fh, $handler );
}
}
- return $model->as_stream;
+            1;
+        };
+        my $error = $@;
+
+        # Flush and close our end so the parent sees EOF.
+        close $pipe;
+
+        if (!$ok) {
+            warn "Catmandu::Importer::RDF: $error";
+            exit(1);
+        }
+
+        # NOTE(review): exit() also runs END blocks and destructors of
+        # objects copied from the parent; POSIX::_exit may be preferable.
+        exit(0);
+    }
}
+
1;
+
__END__
=head1 NAME
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libcatmandu-rdf-perl.git
More information about the Pkg-perl-cvs-commits
mailing list