[libcatmandu-rdf-perl] 08/20: Adding a streaming parser

Jonas Smedegaard dr at jones.dk
Sat Oct 28 03:10:21 UTC 2017


This is an automated email from the git hooks/post-receive script.

js pushed a commit to annotated tag upstream/0.32
in repository libcatmandu-rdf-perl.

commit 199d842fff636410b45ff895d87c3a526643b01b
Author: Patrick Hochstenbach <patrick.hochstenbach at ugent.be>
Date:   Sat Jul 29 10:41:20 2017 +0200

    Adding a streaming parser
---
 META.json                    |   1 +
 cpanfile                     |   1 +
 lib/Catmandu/Importer/RDF.pm | 119 +++++++++++++++++++++++++++++++++++--------
 3 files changed, 99 insertions(+), 22 deletions(-)

diff --git a/META.json b/META.json
index aeb8947..d3e6735 100644
--- a/META.json
+++ b/META.json
@@ -38,6 +38,7 @@
       "runtime" : {
          "requires" : {
             "Catmandu" : "0.9209",
+            "Future" : "0.35",
             "RDF::LDF" : "0.23",
             "RDF::NS" : "20140910",
             "RDF::Query" : "2.913",
diff --git a/cpanfile b/cpanfile
index 165175a..a571ff0 100644
--- a/cpanfile
+++ b/cpanfile
@@ -6,6 +6,7 @@ requires 'RDF::NS', '20140910';
 requires 'RDF::Query', '2.913';
 requires 'RDF::Trine', '1.013';
 requires 'RDF::aREF', '0.25';
+requires 'Future', '0.35';
 
 test_requires 'Test::More', '0.99';
 test_requires 'Test::LWP::UserAgent', '0.025';
diff --git a/lib/Catmandu/Importer/RDF.pm b/lib/Catmandu/Importer/RDF.pm
index d5b5681..c8dbc2c 100644
--- a/lib/Catmandu/Importer/RDF.pm
+++ b/lib/Catmandu/Importer/RDF.pm
@@ -14,6 +14,8 @@ use RDF::LDF;
 use RDF::aREF;
 use RDF::aREF::Encoder;
 use RDF::NS;
+use IO::Pipe;
+use JSON;
 use LWP::UserAgent::CHICaching;
 
 our $VERSION = '0.31';
@@ -146,23 +148,27 @@ sub sparql_generator {
 sub rdf_generator {
     my ($self) = @_;
     sub {
-        state $stream = $self->_rdf_stream;
+        state $stream = $self->_aref_stream;
         return unless $stream;
 
         my $aref = { };
+
         if ($self->triples) {
-            if (my $triple = $stream->next) {
-                $aref = $self->encoder->triple(
-                        $triple->subject,
-                        $triple->predicate,
-                        $triple->object
-                );
-            } else {
+            if (my $hashref = $stream->()) {
+                $self->encoder->add_hashref($hashref, $aref);
+            }
+            else {
                 return ($stream = undef);
             }
-        } else {
+        }
+        else {
             # TODO: include namespace mappings if requested
-            $self->encoder->add_hashref( $stream->as_hashref, $aref );
+            while (my $hashref = $stream->()) {
+              $self->encoder->add_hashref(
+                  $hashref,
+                  $aref
+              );
+            }
 
             if ($self->url) {
                 $aref->{_url} = $self->url;
@@ -241,31 +247,120 @@ sub _sparql_stream {
     }
 }
 
-sub _rdf_stream {
-    my ($self) = @_;
-
-    my $model  = RDF::Trine::Model->new;
-    my $parser = $self->type
-               ? RDF::Trine::Parser->new( $self->type ) : 'RDF::Trine::Parser';
+# Stream the importer's RDF source as RDF/JSON-style hashrefs, one per triple:
+#
+#   { $subject => { $predicate => [ { type => ..., value => ... } ] } }
+#
+# with type 'uri', 'bnode' or 'literal' (plus optional lang/datatype) and
+# blank nodes written as '_:id' -- the shape of RDF::Trine's as_hashref,
+# which RDF::aREF::Encoder's add_hashref consumes.
+#
+# Parsing runs in a forked child that writes one JSON document per line to a
+# pipe. Returns a code ref that yields the next hashref per call and returns
+# nothing at end of input; it dies at end of input if the child failed.
+sub _aref_stream {
+    my ($self) = @_;
+
+    my $parser = $self->type
+               ? RDF::Trine::Parser->new( $self->type ) : 'RDF::Trine::Parser';
+
+    my $pipe = IO::Pipe->new()
+        or die "Catmandu::Importer::RDF: cannot create pipe: $!";
+    my $pid = fork() // die "Catmandu::Importer::RDF: cannot fork: $!";
+
+    if ($pid) {
+        # Parent: decode one line per call.
+        $pipe->reader();
+
+        my $done = 0;
+        return sub {
+            return if $done;
+
+            # Plain 'my': a fresh line must be read on every call.
+            my $line = <$pipe>;
+            return decode_json($line) if defined $line;
+
+            # EOF: reap the child and surface a failed parse.
+            $done = 1;
+            close $pipe;
+            if (waitpid($pid, 0) == $pid && $? != 0) {
+                die "Catmandu::Importer::RDF: RDF parser failed (wait status $?)\n";
+            }
+            return;
+        };
+    }
+
+    # Child: parse the source, streaming each triple to the parent.
+    $pipe->writer();
+
+    my $handler = sub {
+        my ($triple) = @_;
+        my $subject  = $triple->subject;
+        my $object   = $triple->object;
+
+        my %node;
+        if ($object->is_literal) {
+            %node = ( type => 'literal', value => $object->literal_value );
+            $node{lang}     = $object->literal_value_language
+                if $object->has_language;
+            $node{datatype} = $object->literal_datatype
+                if $object->has_datatype;
+        }
+        elsif ($object->is_blank) {
+            %node = ( type => 'bnode', value => '_:' . $object->blank_identifier );
+        }
+        else {
+            %node = ( type => 'uri', value => $object->uri_value );
+        }
+
+        my $s = $subject->is_blank
+              ? '_:' . $subject->blank_identifier
+              : $subject->uri_value;
+
+        print {$pipe} encode_json(
+            { $s => { $triple->predicate->uri_value => [ \%node ] } }
+        ), "\n" or die "Catmandu::Importer::RDF: cannot write to pipe: $!";
+    };
+
+    # Never let an exception unwind into the caller's code inside the forked
+    # child: report it and exit non-zero. POSIX::_exit skips END blocks and
+    # destructors of objects inherited from the parent.
+    my $ok = eval { $self->_parse_with_handler( $parser, $handler ); 1 };
+    warn $@ unless $ok;
+    close $pipe or $ok = 0;
+    require POSIX;
+    POSIX::_exit( $ok ? 0 : 1 );
+}
+
+# Run $parser over the importer's source (url, in-memory scalar ref, file
+# name or handle), passing each parsed triple to $handler.
+sub _parse_with_handler {
+    my ($self, $parser, $handler) = @_;
 
     if ($self->url) {
-        $parser->parse_url_into_model( $self->url, $model );
-    } else {
+        $parser->parse_url( $self->url, $handler );
+    }
+    else {
         my $from_scalar = (ref $self->file // '') eq 'SCALAR';
+
         if (!$self->type and $self->file and !$from_scalar) {
             $parser = $parser->guess_parser_by_filename($self->file);
         }
+
         if ($from_scalar) {
-            $parser->parse_into_model( $self->base, ${$self->file}, $model );
-        } else {
-            $parser->parse_file_into_model( $self->base, $self->file // $self->fh, $model );
+            $parser->parse( $self->base, ${$self->file}, $handler );
+        }
+        else {
+            $parser->parse_file( $self->base, $self->file // $self->fh, $handler );
         }
     }
 
-    return $model->as_stream;
+    return;
 }
 
+
 1;
+
 __END__
 
 =head1 NAME

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libcatmandu-rdf-perl.git



More information about the Pkg-perl-cvs-commits mailing list