[DRE-commits] [ruby-delayer-deferred] 01/08: New upstream version 2.0.0
Daisuke Higuchi
dai at moszumanska.debian.org
Sun Aug 20 14:07:56 UTC 2017
This is an automated email from the git hooks/post-receive script.
dai pushed a commit to branch exp/debian
in repository ruby-delayer-deferred.
commit afbe41e75bfdd98ea32187a0812a1b9d044ab8e9
Author: HIGUCHI Daisuke (VDR dai) <dai at debian.org>
Date: Sun Aug 20 22:43:35 2017 +0900
New upstream version 2.0.0
---
README.md | 80 +++++++++-
delayer-deferred.gemspec | 1 +
lib/delayer/deferred.rb | 21 +--
lib/delayer/deferred/chain.rb | 10 ++
lib/delayer/deferred/chain/await.rb | 43 ++++++
lib/delayer/deferred/chain/base.rb | 35 +++++
lib/delayer/deferred/chain/next.rb | 16 ++
lib/delayer/deferred/chain/trap.rb | 16 ++
lib/delayer/deferred/deferred.rb | 76 +---------
lib/delayer/deferred/deferredable.rb | 166 +--------------------
lib/delayer/deferred/deferredable/awaitable.rb | 27 ++++
lib/delayer/deferred/deferredable/chainable.rb | 155 +++++++++++++++++++
lib/delayer/deferred/deferredable/graph.rb | 118 +++++++++++++++
lib/delayer/deferred/deferredable/node_sequence.rb | 158 ++++++++++++++++++++
lib/delayer/deferred/deferredable/trigger.rb | 34 +++++
lib/delayer/deferred/enumerator.rb | 12 +-
lib/delayer/deferred/error.rb | 9 ++
lib/delayer/deferred/promise.rb | 78 ++++++++++
lib/delayer/deferred/request.rb | 61 ++++++++
lib/delayer/deferred/response.rb | 26 ++++
lib/delayer/deferred/thread.rb | 43 ++++--
lib/delayer/deferred/tools.rb | 30 ++--
lib/delayer/deferred/version.rb | 2 +-
lib/delayer/deferred/worker.rb | 112 ++++++++++++++
test/deferred_test.rb | 14 ++
test/enumerable_test.rb | 2 +-
test/graph_test.rb | 52 +++++++
test/helper.rb | 11 +-
test/promise_test.rb | 91 +++++++++++
test/thread_test.rb | 14 +-
30 files changed, 1229 insertions(+), 284 deletions(-)
diff --git a/README.md b/README.md
index ba9f63d..b138ec8 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,6 @@
# Delayer::Deferred
-Delayerを使って、jsdeferredをRubyに移植したものです。
-jsdeferredでできること以外に、Thread、Enumeratorを拡張します。
+Delayerの、ブロックを実行キューにキューイングする機能を利用し、エラー処理やasync/awaitのような機能をサポートするライブラリです。
## Installation
@@ -101,7 +100,7 @@ Error occured!
```
### Thread
-Threadには、Delayer::Deferred::Deferredableモジュールがincludeされていて、nextやtrapメソッドが使えます。
+Threadには、nextやtrapメソッドが実装されているので、Deferredのように扱うことができます。
```ruby
Delayer.default = Delayer.generate_class # Delayerの準備
@@ -117,6 +116,8 @@ Delayer.run
2
```
+この場合、nextやtrapのブロックは、全て `Delayer.run` メソッドが実行された側のThreadで実行されます。
+
### Automatically Divide a Long Loop
`Enumerable#deach`, `Enumerator#deach`はeachの変種で、Delayerのexpireの値よりループに時間がかかったら一旦処理を中断して、続きを実行するDeferredを新たに作ります。
@@ -156,6 +157,79 @@ divided
また、このメソッドはDeferredを返すので、ループが終わった後に処理をしたり、エラーを受け取ったりできます。
+### Pass to another Delayer
+
+Deferredのコンテキストの中で `Deferred.pass` を呼ぶと、そこで一旦処理が中断し、キューの最後に並び直します。
+他のDelayerが処理され終わると `Deferred.pass` から処理が戻ってきて、再度そこから実行が再開されます。
+
+`Deferred.pass` は常に処理を中断するわけではなく、Delayerの時間制限を過ぎている場合にのみ処理をブレークします。
+用途としては `Enumerator#deach` が使えないようなループの中で毎回呼び出して、長時間処理をブロックしないようにするといった用途が考えられます。
+
+`Enumerator#deach` は `Deferred.pass` を用いて作られています。
+
+### Combine Deferred
+
+`Deferred.when` は、引数に2つ以上のDeferredを受け取り、新たなDeferredを一つ返します。
+
+引数のDeferredすべてが正常に終了したら、戻り値のDeferredのnextブロックが呼ばれ、whenの引数の順番通りに戻り値が渡されます。
+複数のDeferredがあって、それらすべてが終了するのを待ち合わせる時に使うと良いでしょう。
+
+```ruby
+web_a = Thread.new{ open("http://example.com/a.html") }
+web_b = Thread.new{ open("http://example.com/b.html") }
+web_c = Thread.new{ open("http://example.com/c.html") }
+
+# 引数の順番は対応している
+Deferred.when(web_a, web_b, web_c).next do |a, b, c|
+ ...
+end
+
+# 配列を渡すことも出来る
+Deferred.when([web_a, web_b, web_c]).next do |a, b, c|
+ ...
+end
+
+```
+
+引数のDeferredのうち、どれか一つでも失敗すると、直ちに `Deferred.when` の戻り値のtrapブロックが呼ばれます。trapの引数は、失敗したDeferredのそれです。
+
+どれか一つでも失敗すると、他のDeferredが成功していたとしてもその結果は破棄されるということに気をつけてください。より細かく制御したい場合は、Async/Awaitを利用しましょう。
+
+```ruby
+divzero = Delayer::Deferred.new {
+ 1 / 0
+}
+web_a = Thread.new{ open("http://example.com/a.html") }
+
+Deferred.when(divzero, web_a).next{
+ puts 'success'
+}.trap{|err|
+ p err
+}
+```
+
+```
+\#<ZeroDivisionError: divided by 0>
+```
+
+### Async/Await
+
+Deferred#next や Deferred#trap のブロック内では、Deferredable#+@ が使えます。非同期な処理を同期処理のように書くことができます。
+
++@を呼び出すと、呼び出し元のDeferredの処理が一時停止し、+@のレシーバになっているDeferredableが完了した後に処理が再開されます。また、戻り値はレシーバのDeferredableのそれになります。
+
+```
+request = Thread.new{ open("http://mikutter.hachune.net/download/unstable.json").read }
+Deferred.next{
+ puts "最新の不安定版mikutterのバージョンは"
+ response = JSON.parse(+request)
+ puts response.first["version_string"]
+ puts "です"
+}
+```
+
+`+request` が呼ばれた時、リクエスト完了まで処理は一時止まりますが、他にDelayerキューにジョブが溜まっていたら、そちらが実行されます。この機能を使わない場合は、HTTPレスポンスを受け取るまでDelayerの他のジョブは停止してしまいます。
+
## Contributing
1. Fork it ( https://github.com/toshia/delayer-deferred/fork )
diff --git a/delayer-deferred.gemspec b/delayer-deferred.gemspec
index 4ad228b..47f0fb5 100644
--- a/delayer-deferred.gemspec
+++ b/delayer-deferred.gemspec
@@ -25,6 +25,7 @@ Gem::Specification.new do |spec|
spec.add_development_dependency "bundler", "~> 1.7"
spec.add_development_dependency "rake", "~> 10.0"
spec.add_development_dependency "minitest", "~> 5.7"
+ spec.add_development_dependency "simplecov"
spec.add_development_dependency "ruby-prof"
spec.add_development_dependency 'guard'
spec.add_development_dependency 'guard-shell'
diff --git a/lib/delayer/deferred.rb b/lib/delayer/deferred.rb
index 43d0a88..0a660da 100644
--- a/lib/delayer/deferred.rb
+++ b/lib/delayer/deferred.rb
@@ -10,25 +10,26 @@ require "delayer/deferred/version"
module Delayer
module Deferred
- extend Delayer::Deferred::Tools
-
class << self
#真ならデバッグ情報を集める
attr_accessor :debug
- def new(*rest, &proc)
- Delayer::Deferred::Deferred.new(*rest, &proc) end
+ def method_missing(*rest, &block)
+ Delayer::Deferred::Deferred.__send__(*rest, &block)
+ end
end
end
module Extend
- def Deferred
- @deferred ||= begin
- the_delayer = self
- Class.new(::Delayer::Deferred::Deferred) {
- define_singleton_method(:delayer) {
- the_delayer } } end
+ def Promise
+ @promise ||= begin
+ the_delayer = self
+ Class.new(::Delayer::Deferred::Promise) {
+ define_singleton_method(:delayer) {
+ the_delayer } } end
end
+ alias :Deferred :Promise
+ #deprecate :Deferred, "Promise", 2018, 03
end
end
diff --git a/lib/delayer/deferred/chain.rb b/lib/delayer/deferred/chain.rb
new file mode 100644
index 0000000..c83748e
--- /dev/null
+++ b/lib/delayer/deferred/chain.rb
@@ -0,0 +1,10 @@
+# -*- coding: utf-8 -*-
+
+module Delayer::Deferred
+ module Chain; end
+end
+
+require "delayer/deferred/chain/await"
+require "delayer/deferred/chain/base"
+require "delayer/deferred/chain/next"
+require "delayer/deferred/chain/trap"
diff --git a/lib/delayer/deferred/chain/await.rb b/lib/delayer/deferred/chain/await.rb
new file mode 100644
index 0000000..55ab02b
--- /dev/null
+++ b/lib/delayer/deferred/chain/await.rb
@@ -0,0 +1,43 @@
+# -*- coding: utf-8 -*-
+require "delayer/deferred/chain/base"
+
+module Delayer::Deferred::Chain
+ class Await < Base
+ def initialize(worker:, deferred:)
+ super()
+ @worker, @awaiting_deferred = worker, deferred
+ deferred.add_awaited(self)
+ end
+
+ def activate(response)
+ change_sequence(:activate)
+ @worker.give_response(response, @awaiting_deferred)
+ # TODO: 即座にspoilさせてよさそう
+ ensure
+ change_sequence(:complete)
+ end
+
+ def graph_child(output:)
+ output << graph_mynode
+ if has_child?
+ @child.graph_child(output: output)
+ @awaiting_deferred.graph_child(output: output)
+ output << "#{__id__} -> #{@child.__id__}"
+ end
+ nil
+ end
+
+ def node_name
+ "Await"
+ end
+
+ def graph_shape
+ 'circle'.freeze
+ end
+
+ def graph_mynode
+ label = "#{node_name}\n(#{sequence.name})"
+ "#{__id__} [shape=#{graph_shape},label=#{label.inspect}]"
+ end
+ end
+end
diff --git a/lib/delayer/deferred/chain/base.rb b/lib/delayer/deferred/chain/base.rb
new file mode 100644
index 0000000..41b9e5f
--- /dev/null
+++ b/lib/delayer/deferred/chain/base.rb
@@ -0,0 +1,35 @@
+# -*- coding: utf-8 -*-
+require "delayer/deferred/deferredable/chainable"
+require "delayer/deferred/deferredable/node_sequence"
+
+module Delayer::Deferred::Chain
+ class Base
+ include Delayer::Deferred::Deferredable::NodeSequence
+ include Delayer::Deferred::Deferredable::Chainable
+
+ def initialize(&proc)
+ fail Error, "Delayer::Deferred::Chain can't create instance." if self.class == Delayer::Deferred::Chain::Base
+ @proc = proc
+ end
+
+ def activate(response)
+ change_sequence(:activate)
+ if evaluate?(response)
+ @proc.(response.value)
+ else
+ response
+ end
+ ensure
+ change_sequence(:complete)
+ end
+
+ def inspect
+ "#<#{self.class} seq:#{sequence.name} child:#{has_child?}>"
+ end
+
+ def node_name
+ @proc.source_location.join(':'.freeze)
+ end
+ end
+end
+
diff --git a/lib/delayer/deferred/chain/next.rb b/lib/delayer/deferred/chain/next.rb
new file mode 100644
index 0000000..0226bb7
--- /dev/null
+++ b/lib/delayer/deferred/chain/next.rb
@@ -0,0 +1,16 @@
+# -*- coding: utf-8 -*-
+require "delayer/deferred/chain/base"
+
+module Delayer::Deferred::Chain
+ class Next < Base
+ def evaluate?(response)
+ response.ok?
+ end
+
+ private
+
+ def graph_shape
+ 'box'.freeze
+ end
+ end
+end
diff --git a/lib/delayer/deferred/chain/trap.rb b/lib/delayer/deferred/chain/trap.rb
new file mode 100644
index 0000000..462de0a
--- /dev/null
+++ b/lib/delayer/deferred/chain/trap.rb
@@ -0,0 +1,16 @@
+# -*- coding: utf-8 -*-
+require "delayer/deferred/chain/base"
+
+module Delayer::Deferred::Chain
+ class Trap < Base
+ def evaluate?(response)
+ response.ng?
+ end
+
+ private
+
+ def graph_shape
+ 'diamond'.freeze
+ end
+ end
+end
diff --git a/lib/delayer/deferred/deferred.rb b/lib/delayer/deferred/deferred.rb
index 02c5c41..bb84767 100644
--- a/lib/delayer/deferred/deferred.rb
+++ b/lib/delayer/deferred/deferred.rb
@@ -1,78 +1,10 @@
# -*- coding: utf-8 -*-
+require "delayer/deferred/promise"
+require "delayer/deferred/chain"
require "delayer/deferred/deferredable"
-require "delayer/deferred/tools"
+require "delayer/deferred/worker"
require "delayer/deferred/version"
module Delayer::Deferred
- class Deferred
- extend Delayer::Deferred::Tools
- include Deferredable
-
- def self.inherited(subclass)
- subclass.extend(::Delayer::Deferred)
- end
-
- def self.Thread
- @thread_class ||= gen_thread_class end
-
- def self.gen_thread_class
- the_delayer = delayer
- Class.new(Thread) do
- define_singleton_method(:delayer) do
- the_delayer end end end
-
- def self.delayer
- ::Delayer end
-
- def self.new(*args)
- deferred = super(*args)
- if block_given?
- deferred.next(&Proc.new)
- else
- deferred end
- end
-
- def initialize(follow = nil)
- super()
- @follow = follow
- @backtrace = caller if ::Delayer::Deferred.debug end
-
- alias :deferredable_cancel :cancel
- def cancel
- deferredable_cancel
- @follow.cancel if @follow.is_a? Deferredable end
-
- def inspect
- if ::Delayer::Deferred.debug
- sprintf("#<%s: %p %s follow:%p stat:%s value:%s>".freeze, self.class, object_id, @backtrace.find{|n|not n.include?("delayer/deferred".freeze)}, @follow ? @follow.object_id : 0, @next_call_stat.inspect, @next_call_value.inspect)
- else
- sprintf("#<%s: %p stat:%s value:%s>".freeze, self.class, object_id, @next_call_stat.inspect, @next_call_value.inspect)
- end
- end
-
- def self.fiber(&block)
- @_fiber ||= _gen_new_fiber
- begin
- result = @_fiber.resume(block)
- rescue FiberError
- @_fiber = _gen_new_fiber
- result = @_fiber.resume(block)
- end
- if result.is_a?(Delayer::Deferred::ResultContainer)
- result
- else
- _fiber = @_fiber
- @_fiber = nil
- return result, _fiber
- end
- end
-
- def self._gen_new_fiber
- Fiber.new do |b|
- loop do
- b = Fiber.yield(b.())
- end
- end
- end
- end
+ Deferred = Promise
end
diff --git a/lib/delayer/deferred/deferredable.rb b/lib/delayer/deferred/deferredable.rb
index c0d16cd..54c21e1 100644
--- a/lib/delayer/deferred/deferredable.rb
+++ b/lib/delayer/deferred/deferredable.rb
@@ -1,162 +1,12 @@
# -*- coding: utf-8 -*-
require "delayer/deferred/version"
-require "delayer/deferred/result_container"
-
-# なんでもDeferred
-module Delayer::Deferred::Deferredable
- Callback = Struct.new(*%i<ok ng backtrace>)
- CallbackDefaultOK = lambda{ |x| x }
- CallbackDefaultNG = lambda{ |err| Delayer::Deferred.fail(err) }
-
- # このDeferredが成功した場合の処理を追加する。
- # 新しいDeferredのインスタンスを返す
- def next(&proc)
- _post(:ok, &proc) end
- alias deferred next
-
- # このDeferredが失敗した場合の処理を追加する。
- # 新しいDeferredのインスタンスを返す
- def trap(&proc)
- _post(:ng, &proc) end
- alias error trap
-
- # Deferredを直ちに実行する
- def call(value = nil)
- _call(:ok, value) end
-
- # Deferredを直ちに失敗させる
- def fail(exception = nil)
- _call(:ng, exception) end
-
- # _self_ が終了して結果が出るまで呼び出し側のDeferredを停止し、 _self_ の結果を返す。
- # 呼び出し側はDeferredブロック内でなければならないが、 _Deferred#next_ を使わずに
- # 直接戻り値を得ることが出来る。
- # _self_ が失敗した場合は、呼び出し側のDeferredの直近の _trap_ ブロックが呼ばれる。
- def +@
- interrupt = Fiber.yield(self)
- if interrupt.ok?
- interrupt.value
- else
- Delayer::Deferred.fail(interrupt.value)
- end
- end
-
- # この一連のDeferredをこれ以上実行しない
- def cancel
- @callback = Callback.new(CallbackDefaultOK,
- CallbackDefaultNG).freeze end
-
- def callback
- @callback ||= Callback.new(CallbackDefaultOK,
- CallbackDefaultNG) end
-
- # second 秒待って次を実行する
- # ==== Args
- # [second] 待つ秒数(second)
- # ==== Return
- # Deferred
- def wait(second)
- self.next{ Thread.new{ sleep(second) } } end
-
- private
-
- def delayer
- self.class.delayer
- end
-
- def _call(stat = :ok, value = nil)
- delayer.new do
- result, fiber = delayer.Deferred.fiber do
- begin
- result = catch(:__deferredable_fail) do
- Delayer::Deferred::ResultContainer.new(true, _execute(stat, value))
- end
- if result.is_a?(Delayer::Deferred::ResultContainer)
- result
- else
- Delayer::Deferred::ResultContainer.new(false, result)
- end
- rescue Exception => exception
- Delayer::Deferred::ResultContainer.new(false, exception)
- end
- end
- #_wait_fiber(fiber, nil)
- if fiber
- _fiber_stopped(result){|i| _wait_fiber(fiber, i) }
- else
- _fiber_completed(result)
- end
- end
- end
-
-
- def _execute(stat, value)
- callback[stat].call(value)
- end
-
- def _wait_fiber(fiber, resume_value)
- result = fiber.resume(resume_value)
- if result.is_a?(Delayer::Deferred::ResultContainer)
- _fiber_completed(result)
- else
- _fiber_stopped(result){|i| _wait_fiber(fiber, i) }
- end
- end
-
- # Deferredブロックが最後まで終わり、これ以上やることがない時に呼ばれる
- def _fiber_completed(result)
- result_value = result.value
- if result.ok?
- if result_value.is_a?(Delayer::Deferred::Deferredable)
- result_value.next{|v|
- _success_action(v)
- }.trap{|v|
- _fail_action(v)
- }
- else
- _success_action(result_value)
- end
- else
- _fail_action(result_value)
- end
- end
-
- # Deferredable#@+によって停止され、 _defer_ の完了次第処理を再開する必要がある時に呼ばれる
- def _fiber_stopped(defer, &cont)
- defer.next{|v|
- cont.(Delayer::Deferred::ResultContainer.new(true, v))
- }.trap{|v|
- cont.(Delayer::Deferred::ResultContainer.new(false, v))
- }
- end
-
- def _post(kind, &proc)
- @next = delayer.Deferred.new(self)
- @next.callback[kind] = proc
- if defined?(@next_call_stat) and defined?(@next_call_value)
- @next.__send__({ok: :call, ng: :fail}[@next_call_stat], @next_call_value)
- elsif defined?(@follow) and @follow.nil?
- call end
- @next end
-
- def register_next_call(stat, value)
- @next_call_stat, @next_call_value = stat, value
- self end
-
- def _success_action(obj)
- if defined?(@next)
- @next.call(obj)
- else
- register_next_call(:ok, obj)
- end
- end
-
- def _fail_action(err_obj)
- if defined?(@next)
- @next.fail(err_obj)
- else
- register_next_call(:ng, err_obj)
- end
- end
+module Delayer::Deferred
+ module Deferredable; end
end
+
+require "delayer/deferred/deferredable/awaitable"
+require "delayer/deferred/deferredable/chainable"
+require "delayer/deferred/deferredable/graph"
+require "delayer/deferred/deferredable/node_sequence"
+require "delayer/deferred/deferredable/trigger"
diff --git a/lib/delayer/deferred/deferredable/awaitable.rb b/lib/delayer/deferred/deferredable/awaitable.rb
new file mode 100644
index 0000000..f46b264
--- /dev/null
+++ b/lib/delayer/deferred/deferredable/awaitable.rb
@@ -0,0 +1,27 @@
+# -*- coding: utf-8 -*-
+
+module Delayer::Deferred::Deferredable
+ module Awaitable
+
+ # _self_ が終了して結果が出るまで呼び出し側のDeferredを停止し、 _self_ の結果を返す。
+ # 呼び出し側はDeferredブロック内でなければならないが、 _Deferred#next_ を使わずに
+ # 直接戻り値を得ることが出来る。
+ # _self_ が失敗した場合は、呼び出し側のDeferredの直近の _trap_ ブロックが呼ばれる。
+ def +@
+ response = Fiber.yield(Delayer::Deferred::Request::Await.new(self))
+ if response.ok?
+ response.value
+ else
+ Delayer::Deferred.fail(response.value)
+ end
+ end
+
+ def enter_await
+ change_sequence(:await)
+ end
+
+ def exit_await
+ change_sequence(:resume)
+ end
+ end
+end
diff --git a/lib/delayer/deferred/deferredable/chainable.rb b/lib/delayer/deferred/deferredable/chainable.rb
new file mode 100644
index 0000000..5bc4ad9
--- /dev/null
+++ b/lib/delayer/deferred/deferredable/chainable.rb
@@ -0,0 +1,155 @@
+# -*- coding: utf-8 -*-
+require "delayer/deferred/deferredable/awaitable"
+require "delayer/deferred/deferredable/graph"
+require "delayer/deferred/deferredable/node_sequence"
+
+module Delayer::Deferred::Deferredable
+ module Chainable
+ include Awaitable
+ include Graph
+ include NodeSequence
+
+ attr_reader :child
+
+ # このDeferredが成功した場合の処理を追加する。
+ # 新しいDeferredのインスタンスを返す。
+ # このメソッドはスレッドセーフです。
+ # TODO: procが空のとき例外を発生させる
+ def next(&proc)
+ add_child(Delayer::Deferred::Chain::Next.new(&proc))
+ end
+ alias deferred next
+
+ # このDeferredが失敗した場合の処理を追加する。
+ # 新しいDeferredのインスタンスを返す。
+ # このメソッドはスレッドセーフです。
+ # TODO: procが空のとき例外を発生させる
+ def trap(&proc)
+ add_child(Delayer::Deferred::Chain::Trap.new(&proc))
+ end
+ alias error trap
+
+ # この一連のDeferredをこれ以上実行しない。
+ # このメソッドはスレッドセーフです。
+ def cancel
+ change_sequence(:genocide) unless spoiled?
+ end
+
+ def has_child?
+ child ? true : false
+ end
+
+ # 子を追加する。
+ # _Delayer::Deferred::Chainable_ を直接指定できる。通常外部から呼ぶときは _next_ か _trap_ メソッドを使うこと。
+ # このメソッドはスレッドセーフです。
+ # ==== Args
+ # [chainable] 子となるDeferred
+ # ==== Return
+ # 必ず _chainable_ を返す
+ # ==== Raise
+ # [Delayer::Deferred::SequenceError]
+ # 既に子が存在している場合
+ def add_child(chainable)
+ change_sequence(:get_child) do
+ chainable.parent = self
+ @child = chainable
+ end
+ end
+
+ # 子が追加された時に一度だけコールバックするオブジェクトを登録する。
+ # observerと言っているが、実際には _Delayer::Deferred::Worker_ を渡して利用している。
+ # このメソッドはスレッドセーフです。
+ # ==== Args
+ # [observer] pushメソッドを備えているもの。引数に _ at child_ の値が渡される
+ # ==== Return
+ # self
+ def add_child_observer(observer)
+ change_sequence(:gaze) do
+ @child_observer = observer
+ end
+ self
+ end
+
+ def awaited
+ @awaited ||= [].freeze
+ end
+
+ def has_awaited?
+ not awaited.empty?
+ end
+
+ def add_awaited(awaitable)
+ @awaited = [*awaited, awaitable].freeze
+ self
+ end
+
+ # activateメソッドを呼ぶDelayerジョブを登録する寸前に呼ばれる。
+ def reserve_activate
+ change_sequence(:reserve)
+ end
+
+ def enter_pass
+ change_sequence(:pass)
+ end
+
+ def exit_pass
+ change_sequence(:resume)
+ end
+
+ protected
+
+ # 親を再帰的に辿り、一番最初のノードを返す。
+ # 親が複数見つかった場合は、それらを返す。
+ def ancestor
+ if @parent
+ @parent.ancestor
+ else
+ self
+ end
+ end
+
+ # cancelとかデバッグ用のコールグラフを得るために親を登録しておく。
+ # add_childから呼ばれる。
+ def parent=(chainable)
+ @parent = chainable
+ end
+
+ private
+
+ def call_child_observer
+ if has_child? and defined?(@child_observer)
+ change_sequence(:called)
+ @child_observer.push(@child)
+ end
+ end
+
+ def on_sequence_changed(old_seq, flow, new_seq)
+ case new_seq
+ when NodeSequence::BURST_OUT
+ call_child_observer
+ when NodeSequence::GENOCIDE
+ @parent.cancel if defined?(@parent) and @parent
+ when NodeSequence::RESERVED_C, NodeSequence::RUN_C, NodeSequence::PASS_C, NodeSequence::AWAIT_C, NodeSequence::GRAFT_C
+ if !has_child?
+ notice "child: #{@child.inspect}"
+ raise Delayer::Deferred::SequenceError.new("Sequence changed `#{old_seq.name}' to `#{flow}', but it has no child")
+ end
+ end
+ end
+
+ # ノードの名前。サブクラスでオーバライドし、ノードが定義されたファイルの名前や行数などを入れておく。
+ def node_name
+ self.class.to_s
+ end
+
+ def graph_mynode
+ if defined?(@seq_logger)
+ label = "#{node_name}\n(#{@seq_logger.map(&:name).join('→')})"
+ else
+ label = "#{node_name}\n(#{sequence.name})"
+ end
+ "#{__id__} [shape=#{graph_shape},label=#{label.inspect}]"
+ end
+
+ end
+end
diff --git a/lib/delayer/deferred/deferredable/graph.rb b/lib/delayer/deferred/deferredable/graph.rb
new file mode 100644
index 0000000..23ba803
--- /dev/null
+++ b/lib/delayer/deferred/deferredable/graph.rb
@@ -0,0 +1,118 @@
+# -*- coding: utf-8 -*-
+
+module Delayer::Deferred::Deferredable
+=begin rdoc
+graphvizによってChainableなDeferredをDOT言語形式でダンプする機能を追加するmix-in。
+いずれかのノードに対して _graph_ メソッドを呼ぶと、再帰的に親子を全て辿り、digraphの断片の文字列を得ることが出来る。
+
+== 出力例
+
+ 20892180 [shape=egg,label="#<Class:0x000000027da288>.Promise\n(reserved)"]
+ 20892480 [shape=box,label="test/thread_test.rb:53\n(connected)"]
+ 20891440 [shape=diamond,label="test/thread_test.rb:56\n(fresh)"]
+ 20892480 -> 20891440
+ 20892180 -> 20892480
+
+=end
+ module Graph
+ # この一連のDeferredチェインの様子を、DOT言語フォーマットで出力する
+ # ==== Args
+ # [child_only:]
+ # _true_ なら、このノードとその子孫のみを描画する。
+ # _false_ なら、再帰的に親を遡り、そこから描画を開始する。
+ # [output:]
+ # このオブジェクトに、 _<<_ メソッドで内容が書かれる。
+ # 省略した場合は、戻り値が _String_ になる。
+ # ==== Return
+ # [String] DOT言語によるグラフ
+ # [output:] 引数 output: に指定されたオブジェクト
+ def graph(child_only: false, output: String.new)
+ if child_only
+ output << "digraph Deferred {\n".freeze
+ Enumerator.new{ |yielder|
+ graph_child(output: yielder)
+ }.lazy.each{|l|
+ output << "\t#{l}\n"
+ }
+ output << '}'.freeze
+ else
+ ancestor.graph(child_only: true, output: output)
+ end
+ end
+
+ # Graph.graph の結果を内容とする一時ファイルを作成して返す。
+ # ただし、ブロックを渡された場合は、一時ファイルを引数にそのブロックを一度だけ実行し、ブロックの戻り値をこのメソッドの戻り値とする。
+ # ==== Args
+ # [&block] 一時ファイルを利用する処理
+ # ==== Return
+ # [Tempfile] ブロックを指定しなかった場合。作成された一時ファイルオブジェクト
+ # [Object] ブロックが指定された場合。ブロックの実行結果。
+ def graph_save(permanent: false, &block)
+ if block
+ Tempfile.open{|tmp|
+ graph(output: tmp)
+ tmp.seek(0)
+ block.(tmp)
+ }
+ else
+ tmp = Tempfile.open
+ graph(output: tmp).tap{|t|t.seek(0)}
+ end
+ end
+
+ # 画像ファイルとしてグラフを書き出す。
+ # dotコマンドが使えないと失敗する。
+ # ==== Args
+ # [format:] 画像の拡張子
+ # ==== Return
+ # [String] 書き出したファイル名
+ def graph_draw(dir: '/tmp', format: 'svg'.freeze)
+ graph_save do |dotfile|
+ base = File.basename(dotfile.path)
+ dest = File.join(dir, "#{base}.#{format}")
+ system("dot -T#{format} #{dotfile.path} -o #{dest}")
+ dest
+ end
+ end
+
+ # このノードとその子全てのDeferredチェインの様子を、DOT言語フォーマットで出力する。
+ # Delayer::Deferred::Deferredable::Graph#graph の内部で利用されるため、将来このメソッドのインターフェイスは変更される可能性がある。
+ def graph_child(output:)
+ output << graph_mynode
+ if has_child?
+ @child.graph_child(output: output)
+ output << "#{__id__} -> #{@child.__id__}"
+ end
+ if has_awaited?
+ awaited.each do |awaitable|
+ if awaitable.is_a?(Delayer::Deferred::Deferredable::Chainable)
+ awaitable.ancestor.graph_child(output: output)
+ else
+ label = "#{awaitable.class}"
+ output << "#{awaitable.__id__} [shape=oval,label=#{label.inspect}]"
+ end
+ output << "#{awaitable.__id__} -> #{__id__} [label = \"await\", style = \"dotted\"]"
+ end
+ end
+ nil
+ end
+
+ private
+
+ # このノードを描画する時の形の名前を文字列で返す。
+ # 以下のページにあるような、graphvizで取り扱える形の中から選ぶこと。
+ # http://www.graphviz.org/doc/info/shapes.html
+ def graph_shape
+ 'oval'.freeze
+ end
+
+ # このノードの形などをDOT言語の断片で返す。
+ # このメソッドをオーバライドすることで、描画されるノードの見た目を自由に変更することが出来る。
+ # ただし、簡単な変更だけなら別のメソッドをオーバライドするだけで可能なので、このmix-inの他のメソッドも参照すること。
+ def graph_mynode
+ label = "#{node_name}\n(#{sequence.name})"
+ "#{__id__} [shape=#{graph_shape},label=#{label.inspect}]"
+ end
+
+ end
+end
diff --git a/lib/delayer/deferred/deferredable/node_sequence.rb b/lib/delayer/deferred/deferredable/node_sequence.rb
new file mode 100644
index 0000000..dd57887
--- /dev/null
+++ b/lib/delayer/deferred/deferredable/node_sequence.rb
@@ -0,0 +1,158 @@
+# -*- coding: utf-8 -*-
+require 'delayer/deferred/error'
+
+require 'thread'
+
+module Delayer::Deferred::Deferredable
+ module NodeSequence
+ class Sequence
+ attr_reader :name
+
+ def initialize(name)
+ @name = name.to_sym
+ @map = {}
+ @exceptions = Hash.new(Delayer::Deferred::SequenceError)
+ end
+
+ def add(seq, flow = seq.name)
+ @map[flow] = seq
+ self
+ end
+
+ def exception(exc, flow)
+ @exceptions[flow] = exc
+ self
+ end
+
+ def pull(flow)
+ if @map.has_key?(flow.to_sym)
+ @map[flow.to_sym]
+ else
+ raise @exceptions[flow.to_sym], "Invalid sequence flow `#{name}' to `#{flow}'."
+ end
+ end
+
+ def inspect
+ "#<#{self.class}: #{name}>"
+ end
+ end
+
+ FRESH = Sequence.new(:fresh)
+ CONNECTED = Sequence.new(:connected) # 子がいる、未実行
+ RESERVED = Sequence.new(:reserved) # 実行キュー待ち
+ RESERVED_C= Sequence.new(:reserved) # 実行キュー待ち(子がいる)
+ RUN = Sequence.new(:run) # 実行中
+ RUN_C = Sequence.new(:run) # 実行中(子がいる)
+ PASS = Sequence.new(:pass) # パス中
+ PASS_C = Sequence.new(:pass) # パス中
+ AWAIT = Sequence.new(:await) # Await中
+ AWAIT_C = Sequence.new(:await) # Await中(子がいる)
+ GRAFT = Sequence.new(:graft) # 戻り値がAwaitableの時
+ GRAFT_C = Sequence.new(:graft) # 戻り値がAwaitableの時(子がいる)
+ CALL_CHILD= Sequence.new(:call_child) # 完了、子がいる
+ STOP = Sequence.new(:stop) # 完了、子なし
+ WAIT = Sequence.new(:wait) # 完了、オブザーバ登録済み
+ BURST_OUT = Sequence.new(:burst_out) # 完了、オブザーバ登録済み、子追加済み
+ ROTTEN = Sequence.new(:rotten).freeze # 終了
+ GENOCIDE = Sequence.new(:genocide).freeze# この地ではかつて大量虐殺があったという。
+
+ FRESH
+ .add(CONNECTED, :get_child)
+ .add(RESERVED, :reserve)
+ .add(GENOCIDE).freeze
+ CONNECTED
+ .add(RESERVED_C, :reserve)
+ .exception(Delayer::Deferred::MultipleAssignmentError, :get_child)
+ .add(GENOCIDE).freeze
+ RESERVED
+ .add(RUN, :activate)
+ .add(RESERVED_C, :get_child)
+ .add(GENOCIDE).freeze
+ RESERVED_C
+ .add(RUN_C, :activate)
+ .exception(Delayer::Deferred::MultipleAssignmentError, :get_child)
+ .add(GENOCIDE).freeze
+ RUN
+ .add(RUN_C, :get_child)
+ .add(PASS)
+ .add(AWAIT, :await)
+ .add(STOP, :complete)
+ .add(GENOCIDE).freeze
+ RUN_C
+ .add(PASS_C)
+ .add(AWAIT_C, :await)
+ .add(CALL_CHILD, :complete)
+ .exception(Delayer::Deferred::MultipleAssignmentError, :get_child)
+ .add(GENOCIDE).freeze
+ PASS
+ .add(PASS_C, :get_child)
+ .add(RUN, :resume)
+ .add(GENOCIDE).freeze
+ PASS_C
+ .add(RUN_C, :resume)
+ .add(GENOCIDE).freeze
+ AWAIT
+ .add(RUN, :resume)
+ .add(AWAIT_C, :get_child)
+ .add(GENOCIDE).freeze
+ AWAIT_C
+ .add(RUN_C, :resume)
+ .exception(Delayer::Deferred::MultipleAssignmentError, :get_child)
+ .add(GENOCIDE).freeze
+ CALL_CHILD
+ .add(GRAFT_C, :await)
+ .add(ROTTEN, :called)
+ .add(GENOCIDE).freeze
+ GRAFT
+ .add(STOP, :resume)
+ .add(GRAFT_C, :get_child)
+ .add(GENOCIDE).freeze
+ GRAFT_C
+ .add(CALL_CHILD, :resume)
+ .add(GENOCIDE).freeze
+ STOP
+ .add(GRAFT, :await)
+ .add(WAIT, :gaze)
+ .add(GENOCIDE).freeze
+ WAIT
+ .add(BURST_OUT, :get_child)
+ .add(GENOCIDE).freeze
+ BURST_OUT
+ .add(ROTTEN, :called)
+ .add(GENOCIDE).freeze
+
+ SEQUENCE_LOCK = Monitor.new
+
+ def sequence
+ @sequence ||= FRESH
+ end
+
+ # このメソッドはスレッドセーフです
+ def change_sequence(flow, &block)
+ SEQUENCE_LOCK.synchronize do
+ old_seq = sequence
+ new_seq = @sequence = sequence.pull(flow)
+ (@seq_logger ||= [old_seq]) << new_seq
+ if block
+ result = block.()
+ on_sequence_changed(old_seq, flow, new_seq)
+ result
+ else
+ on_sequence_changed(old_seq, flow, new_seq)
+ nil
+ end
+ end
+ end
+
+ def on_sequence_changed(old_seq, flow, new_seq)
+ end
+
+ def activated?
+ ![FRESH, CONNECTED, RUN, RUN_C].include?(sequence)
+ end
+
+ def spoiled?
+ sequence == ROTTEN || sequence == GENOCIDE
+ end
+ end
+end
diff --git a/lib/delayer/deferred/deferredable/trigger.rb b/lib/delayer/deferred/deferredable/trigger.rb
new file mode 100644
index 0000000..63b4fe5
--- /dev/null
+++ b/lib/delayer/deferred/deferredable/trigger.rb
@@ -0,0 +1,34 @@
+# -*- coding: utf-8 -*-
+require "delayer/deferred/deferredable/chainable"
+require "delayer/deferred/deferredable/node_sequence"
+require "delayer/deferred/response"
+
+module Delayer::Deferred::Deferredable
+=begin rdoc
+Promiseなど、親を持たず、自身がWorkerを作成できるもの。
+=end
+ module Trigger
+ include NodeSequence
+ include Chainable
+
+ # Deferredを直ちに実行する。
+ # このメソッドはスレッドセーフです。
+ def call(value = nil)
+ execute(Delayer::Deferred::Response::Ok.new(value))
+ end
+
+ # Deferredを直ちに失敗させる。
+ # このメソッドはスレッドセーフです。
+ def fail(exception = nil)
+ execute(Delayer::Deferred::Response::Ng.new(exception))
+ end
+
+ private
+
+ def execute(value)
+ worker = Delayer::Deferred::Worker.new(delayer: self.class.delayer,
+ initial: value)
+ worker.push(self)
+ end
+ end
+end
diff --git a/lib/delayer/deferred/enumerator.rb b/lib/delayer/deferred/enumerator.rb
index caa3b16..7776a23 100644
--- a/lib/delayer/deferred/enumerator.rb
+++ b/lib/delayer/deferred/enumerator.rb
@@ -5,12 +5,10 @@ require "delayer/deferred/deferred"
class Enumerator
def deach(delayer=Delayer, &proc)
delayer.Deferred.new.next do
- begin
- loop do
- proc.call(self.next())
- if delayer.expire?
- break deach(delayer, &proc) end end
- rescue StopIteration
- nil end end
+ self.each do |node|
+ delayer.Deferred.pass
+ proc.(node)
+ end
+ end
end
end
diff --git a/lib/delayer/deferred/error.rb b/lib/delayer/deferred/error.rb
index ab597df..3d0d2bc 100644
--- a/lib/delayer/deferred/error.rb
+++ b/lib/delayer/deferred/error.rb
@@ -10,4 +10,13 @@ module Delayer::Deferred
@process = process
end
end
+
+ SequenceError = Class.new(Error) do
+ attr_accessor :deferred
+ def initialize(message, deferred: nil)
+ super(message)
+ @deferred = deferred
+ end
+ end
+ MultipleAssignmentError = Class.new(SequenceError)
end
diff --git a/lib/delayer/deferred/promise.rb b/lib/delayer/deferred/promise.rb
new file mode 100644
index 0000000..23ae235
--- /dev/null
+++ b/lib/delayer/deferred/promise.rb
@@ -0,0 +1,78 @@
+# -*- coding: utf-8 -*-
+require "delayer/deferred/tools"
+require "delayer/deferred/deferredable/trigger"
+
+module Delayer::Deferred
+ class Promise
+ extend Delayer::Deferred::Tools
+ include Deferredable::Trigger
+
+ class << self
+ def new(stop=false, name: caller_locations(1,1).first.to_s, &block)
+ result = promise = super(name: name)
+ result = promise.next(&block) if block_given?
+ promise.call(true) unless stop
+ result
+ end
+
+ def Thread
+ @thread_class ||= gen_thread_class end
+
+ def Promise
+ self
+ end
+
+ def delayer
+ ::Delayer
+ end
+
+ def to_s
+ "#{self.delayer}.Promise"
+ end
+
+ private
+
+ def gen_thread_class
+ the_delayer = delayer
+ Class.new(Thread) do
+ define_singleton_method(:delayer) do
+ the_delayer
+ end
+ end
+ end
+ end
+
+ def initialize(name:)
+ super()
+ @name = name
+ end
+
+ def activate(response)
+ change_sequence(:activate)
+ change_sequence(:complete)
+ response
+ end
+
+ def inspect
+ "#<#{self.class} seq:#{sequence.name}>"
+ end
+
+ def ancestor
+ self
+ end
+
+ def parent=(chainable)
+ fail Error, "#{self.class} can't has parent."
+ end
+
+ private
+
+ def graph_shape
+ 'egg'.freeze
+ end
+
+ def node_name
+ @name.to_s
+ end
+ end
+end
diff --git a/lib/delayer/deferred/request.rb b/lib/delayer/deferred/request.rb
new file mode 100644
index 0000000..ca7b631
--- /dev/null
+++ b/lib/delayer/deferred/request.rb
@@ -0,0 +1,61 @@
+# -*- coding: utf-8 -*-
+
+# -*- coding: utf-8 -*-
+
+module Delayer::Deferred::Request
+ class Base
+ attr_reader :value
+ def initialize(value)
+ @value = value
+ end
+ end
+
+=begin rdoc
+Fiberが次のWorkerを要求している時に返す値。
+新たなインスタンスは作らず、 _NEXT_WORKER_ にあるインスタンスを使うこと。
+=end
+ class NextWorker < Base
+ # _deferred_ に渡された次のChainableに、 _deferred_ の戻り値を渡す要求を出す。
+ # ==== Args
+ # [deferred] 実行が完了したDeferred 。次のDeferredとして _deferred.child_ を呼び出す
+ # [worker] このDeferredチェインを実行しているWorker
+ def accept_request(worker:, deferred:)
+ if deferred.has_child?
+ worker.push(deferred.child)
+ else
+ deferred.add_child_observer(worker)
+ end
+ end
+ end
+
+=begin rdoc
+Chainable#+@ が呼ばれた時に、一旦そこで処理を止めるためのリクエスト。
+_value_ には、実行完了を待つDeferredが入っている。
+==== わかりやすい!
+ accept_requestメソッドの引数のdeferred {
+ +value
+ }
+=end
+ class Await < Base
+ alias_method :foreign_deferred, :value
+ def accept_request(worker:, deferred:)
+ deferred.enter_await
+ foreign_deferred.add_child(Delayer::Deferred::Chain::Await.new(worker: worker, deferred: deferred))
+ end
+ end
+
+=begin rdoc
+一旦処理を中断して、Delayerキューに並び直すためのリクエスト。
+Tools#pass から利用される。
+新たなインスタンスは作らず、 _PASS_ にあるインスタンスを使うこと。
+=end
+ class Pass < Base
+ def accept_request(worker:, deferred:)
+ deferred.enter_pass
+ worker.resume_pass(deferred)
+ end
+ end
+
+ NEXT_WORKER = NextWorker.new(nil).freeze
+ PASS = Pass.new(nil).freeze
+end
diff --git a/lib/delayer/deferred/response.rb b/lib/delayer/deferred/response.rb
new file mode 100644
index 0000000..5734df8
--- /dev/null
+++ b/lib/delayer/deferred/response.rb
@@ -0,0 +1,26 @@
+# -*- coding: utf-8 -*-
+
+module Delayer::Deferred::Response
+ class Base
+ attr_reader :value
+ def initialize(value)
+ @value = value
+ end
+
+ def ng?
+ !ok?
+ end
+ end
+
+ class Ok < Base
+ def ok?
+ true
+ end
+ end
+
+ class Ng < Base
+ def ok?
+ false
+ end
+ end
+end
diff --git a/lib/delayer/deferred/thread.rb b/lib/delayer/deferred/thread.rb
index 38c7cee..7bd7d7a 100644
--- a/lib/delayer/deferred/thread.rb
+++ b/lib/delayer/deferred/thread.rb
@@ -1,26 +1,40 @@
# -*- coding: utf-8 -*-
require "delayer"
-require "delayer/deferred/deferredable"
+require "delayer/deferred/deferredable/awaitable"
class Thread
- include ::Delayer::Deferred::Deferredable
+ include ::Delayer::Deferred::Deferredable::Awaitable
def self.delayer
Delayer
end
- def next(*rest, &block)
- __gen_promise.next(*rest, &block)
+ # このDeferredが成功した場合の処理を追加する。
+ # 新しいDeferredのインスタンスを返す。
+ # このメソッドはスレッドセーフです。
+ # TODO: procが空のとき例外を発生させる
+ def next(name: caller_locations(1,1).first.to_s, &proc)
+ add_child(Delayer::Deferred::Chain::Next.new(&proc), name: name)
end
+ alias deferred next
+
+ # このDeferredが失敗した場合の処理を追加する。
+ # 新しいDeferredのインスタンスを返す。
+ # このメソッドはスレッドセーフです。
+ # TODO: procが空のとき例外を発生させる
+ def trap(name: caller_locations(1,1).first.to_s, &proc)
+ add_child(Delayer::Deferred::Chain::Trap.new(&proc), name: name)
+ end
+ alias error trap
- def trap(*rest, &block)
- __gen_promise.trap(*rest, &block)
+ def add_child(chainable, name: caller_locations(1,1).first.to_s)
+ __gen_promise(name).add_child(chainable)
end
private
- def __gen_promise
- promise = delayer.Deferred.new(true)
+ def __gen_promise(name)
+ promise = self.class.delayer.Promise.new(true, name: name)
Thread.new(self) do |tt|
__promise_callback(tt, promise)
end
@@ -28,15 +42,16 @@ class Thread
end
def __promise_callback(tt, promise)
- failed = catch(:__deferredable_fail) do
- begin
- promise.call(tt.value)
- rescue Exception => err
+ begin
+ result = tt.value
+ self.class.delayer.new do
+ promise.call(result)
+ end
+ rescue Exception => err
+ self.class.delayer.new do
promise.fail(err)
end
- return
end
- promise.fail(failed)
end
end
diff --git a/lib/delayer/deferred/tools.rb b/lib/delayer/deferred/tools.rb
index 12784ba..7bbd7f7 100644
--- a/lib/delayer/deferred/tools.rb
+++ b/lib/delayer/deferred/tools.rb
@@ -18,6 +18,12 @@ module Delayer::Deferred
def fail(value)
throw(:__deferredable_fail, value) end
+ # 実行中のDeferredを、Delayerのタイムリミットが来ている場合に限り一旦中断する。
+ # 長期に渡る可能性のある処理で、必要に応じて他のタスクを先に実行してもよい場合に呼び出す。
+ def pass
+ Fiber.yield(Request::PASS) if delayer.expire?
+ end
+
# 複数のdeferredを引数に取って、それら全ての実行が終了したら、
# その結果を引数の順番通りに格納したArrayを引数に呼ばれるDeferredを返す。
# 引数のDeferredが一つでも失敗するとこのメソッドの返すDeferredも失敗する。
@@ -27,16 +33,20 @@ module Delayer::Deferred
# Deferred
def when(*args)
return self.next{[]} if args.empty?
- defer, *follow = args
- raise TypeError, "Argument of Deferred.when must be Delayer::Deferred::Deferredable" unless defer.is_a? Delayer::Deferred::Deferredable
- if follow.empty?
- defer.next{|res| [res] }
- else
- remain = self.when(*follow)
- defer.next do |res|
- remain.next do |follow_res|
- follow_res.unshift(res) end end end end
-
+ args = args.flatten
+ args.each_with_index{|d, index|
+ unless d.is_a?(Deferredable::Chainable) || d.is_a?(Deferredable::Awaitable)
+ raise TypeError, "Argument #{index} of Deferred.when must be #{Deferredable::Chainable}, but given #{d.class}"
+ end
+ if d.respond_to?(:has_child?) && d.has_child?
+ raise "Already assigned child for argument #{index}"
+ end
+ }
+ defer, *follow = *args
+ defer.next{|res|
+ [res, *follow.map{|d| +d }]
+ }
+ end
# Kernel#systemを呼び出して、コマンドが成功たら成功するDeferredを返す。
# 失敗した場合、trap{}ブロックには $? の値(Process::Status)か、例外が発生した場合それが渡される
# ==== Args
diff --git a/lib/delayer/deferred/version.rb b/lib/delayer/deferred/version.rb
index 1f7821f..4d0143c 100644
--- a/lib/delayer/deferred/version.rb
+++ b/lib/delayer/deferred/version.rb
@@ -1,5 +1,5 @@
module Delayer
module Deferred
- VERSION = "1.1.1"
+ VERSION = "2.0.0"
end
end
diff --git a/lib/delayer/deferred/worker.rb b/lib/delayer/deferred/worker.rb
new file mode 100644
index 0000000..0548124
--- /dev/null
+++ b/lib/delayer/deferred/worker.rb
@@ -0,0 +1,112 @@
+# -*- coding: utf-8 -*-
+require "delayer/deferred/request"
+require "delayer/deferred/response"
+
+module Delayer::Deferred
+=begin rdoc
+Deferredを実行するためのWorker。Deferredチェインを実行するFiberを
+管理する。
+
+== pushに渡すオブジェクトについて
+Worker#push に渡す引数は、activateメソッドを実装している必要がある。
+
+=== activate(response)
+==== Args
+response :: Delayer::Deferred::Response::Base Deferredに渡す値
+==== Returns
+[Delayer::Deferred::Response::Base]
+ これを返すと、値の自動変換が行われないため、意図的に失敗させたり、Deferredを次のブロックに伝搬させることができる。
+[Delayer::Deferred::Chainable]
+ 戻り値のDeferredが終わるまでWorkerの処理を停止する。
+ 再開された時、結果は戻り値のDeferredの結果に置き換えられる。
+[else]
+ _Delayer::Deferred::Response::Ok.new_ の引数に渡され、その結果が利用される
+=end
+ class Worker
+ def initialize(delayer:, initial:)
+ @delayer, @initial = delayer, initial
+ end
+
+ def push(deferred)
+ deferred.reserve_activate
+ @delayer.new do
+ next if deferred.spoiled?
+ begin
+ fiber.resume(deferred).accept_request(worker: self,
+ deferred: deferred)
+ rescue Delayer::Deferred::SequenceError => err
+ err.deferred = deferred
+ raise
+ end
+ end
+ nil
+ end
+
+ # Awaitから復帰した時に呼ばれる。
+ # ==== Args
+ # [response] Awaitの結果(Delayer::Deferred::Response::Base)
+ # [deferred] 現在実行中のDeferred
+ def give_response(response, deferred)
+ @delayer.new do
+ next if deferred.spoiled?
+ deferred.exit_await
+ fiber.resume(response).accept_request(worker: self,
+ deferred: deferred)
+ end
+ nil
+ end
+
+ # Tools#pass から復帰した時に呼ばれる。
+ # ==== Args
+ # [deferred] 現在実行中のDeferred
+ def resume_pass(deferred)
+ deferred.exit_pass
+ @delayer.new do
+ next if deferred.spoiled?
+ fiber.resume(nil).accept_request(worker: self,
+ deferred: deferred)
+ end
+ end
+
+ private
+
+ def fiber
+ @fiber ||= Fiber.new{|response|
+ loop do
+ response = wait_and_activate(response)
+ case response.value
+ when Delayer::Deferred::SequenceError
+ raise response.value
+ end
+ end
+ }.tap{|f| f.resume(@initial); @initial = nil }
+ end
+
+ def wait_and_activate(argument)
+ response = catch(:success) do
+ failed = catch(:__deferredable_fail) do
+ begin
+ if argument.value.is_a? Deferredable::Awaitable
+ throw :success, +argument.value
+ else
+ defer = Fiber.yield(Request::NEXT_WORKER)
+ res = defer.activate(argument)
+ if res.is_a? Delayer::Deferred::Deferredable::Awaitable
+ defer.add_awaited(res)
+ end
+ end
+ throw :success, res
+ rescue Exception => err
+ throw :__deferredable_fail, err
+ end
+ end
+ Response::Ng.new(failed)
+ end
+ if response.is_a?(Response::Base)
+ response
+ else
+ Response::Ok.new(response)
+ end
+ end
+ end
+end
diff --git a/test/deferred_test.rb b/test/deferred_test.rb
index ca4fd2a..f7ef9e2 100644
--- a/test/deferred_test.rb
+++ b/test/deferred_test.rb
@@ -98,6 +98,20 @@ describe(Delayer::Deferred) do
assert succeed, "Deferred did not executed."
end
+ it "assign twice" do
+ succeed = false
+ delayer = Delayer.generate_class
+ assert_raises(Delayer::Deferred::MultipleAssignmentError) do
+ eval_all_events(delayer) do
+ defer = delayer.Deferred.new.next {
+ succeed = 0
+ }
+ defer.next{ succeed = 1 }
+ defer.next{ succeed = 2 }
+ end
+ end
+ end
+
describe "Deferred.when" do
it "give 3 deferred" do
result = failure = false
diff --git a/test/enumerable_test.rb b/test/enumerable_test.rb
index 39c34b5..d285ddc 100644
--- a/test/enumerable_test.rb
+++ b/test/enumerable_test.rb
@@ -31,7 +31,7 @@ describe(Enumerable) do
c = a + b
yielder << a
a, b = b, c end end
- timeout(1) {
+ Timeout.timeout(1) {
fib.deach(@delayer) {|digit|
log << digit
}.next{
diff --git a/test/graph_test.rb b/test/graph_test.rb
new file mode 100644
index 0000000..3677afe
--- /dev/null
+++ b/test/graph_test.rb
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+require_relative 'helper'
+
+describe(Delayer::Deferred) do
+ include TestUtils
+
+ before do
+ Delayer.default = Delayer.generate_class
+ end
+
+ describe 'auto execution Promise' do
+ it 'should include Promise result of Promise.graph' do
+ promise = Delayer::Deferred::Promise.new
+ assert_includes promise.graph, 'egg', ->{"[[#{promise.graph_draw}]]"}
+ assert_includes promise.graph, 'reserved', ->{"[[#{promise.graph_draw}]]"}
+ end
+ end
+
+ describe 'Promise' do
+ it 'should include Promise result of Promise.graph' do
+ promise = Delayer::Deferred::Promise.new(true)
+ assert_includes promise.graph, 'egg', ->{"[[#{promise.graph_draw}]]"}
+ assert_includes promise.graph, 'fresh', ->{"[[#{promise.graph_draw}]]"}
+ end
+ end
+
+ describe 'Chain' do
+ it 'should include ' do
+ promise = Delayer::Deferred::Promise.new.next{ ; }
+ assert_includes promise.graph, 'graph_test.rb', ->{"[[#{promise.graph_draw}]]"}
+ end
+ end
+
+ describe 'Awaiting' do
+ it 'await' do
+ promise_a = Delayer::Deferred::Promise.new(true).next{ |buf|
+ buf << :a
+ }.next{ |buf|
+ buf << :b
+ }.trap{ |buf|
+ buf << :c
+ }
+ promise_b = Delayer::Deferred::Promise.new.next{
+ +promise_a << :e
+ }.trap{ |buf|
+ buf << :f
+ }
+ eval_all_events
+ end
+ end
+
+end
diff --git a/test/helper.rb b/test/helper.rb
index e3c188d..3c3ee53 100644
--- a/test/helper.rb
+++ b/test/helper.rb
@@ -1,9 +1,14 @@
require 'bundler/setup'
-require 'minitest/autorun'
-require 'delayer/deferred'
require 'securerandom'
require 'set'
require 'timeout'
-
require_relative 'testutils'
+
+require 'simplecov'
+SimpleCov.start do
+ add_filter "/test/"
+end
+
+require 'delayer/deferred'
+require 'minitest/autorun'
diff --git a/test/promise_test.rb b/test/promise_test.rb
new file mode 100644
index 0000000..f3eab1d
--- /dev/null
+++ b/test/promise_test.rb
@@ -0,0 +1,91 @@
+# -*- coding: utf-8 -*-
+
+require_relative 'helper'
+
+describe(Delayer::Deferred::Promise) do
+ include TestUtils
+
+ before do
+ Delayer.default = Delayer.generate_class
+ @delayer = Delayer.generate_class
+ end
+
+ describe 'get instance' do
+ it 'default delayer' do
+ assert_instance_of Delayer::Deferred::Promise, Delayer::Deferred::Promise.new
+ end
+
+ it 'another delayer' do
+ promise = @delayer.Promise.new
+ assert_instance_of @delayer.Promise, promise
+ end
+
+ describe "with block" do
+ before do
+ @promise = @delayer.Promise.new{ ; }
+ end
+
+ it 'was generated' do
+ assert_kind_of Delayer::Deferred::Chain::Next, @promise
+ end
+
+ it "doesn't have child" do
+ refute @promise.has_child?
+ end
+ end
+ end
+
+ describe 'chain' do
+ before do
+ @promise = @delayer.Promise.new(true)
+ end
+
+ describe 'next' do
+ before do
+ @record = nil
+ @chain = @promise.next{|x| @record = x + 1 }
+ end
+
+ it 'should execute next block if called promise#call' do
+ val = rand(1000)
+ eval_all_events(@delayer) do
+ @promise.call(val)
+ end
+ assert_equal val + 1, @record, ->{ "next block did not executed.\n[[#{@chain.graph_draw}]]" }
+ end
+
+ it "shouldn't execute next block if called promise#fail" do
+ val = rand(1000)
+ eval_all_events(@delayer) do
+ @promise.fail(val)
+ end
+ refute_equal val + 1, @record, ->{ "next block did executed.\n[[#{@chain.graph_draw}]]" }
+ end
+ end
+
+ describe 'trap' do
+ before do
+ @record = nil
+ @chain = @promise.trap{|x| @record = x + 1 }
+ end
+
+ it 'should execute trap block if called promise#fail' do
+ val = rand(1000)
+ eval_all_events(@delayer) do
+ @promise.fail(val)
+ end
+ assert_equal val + 1, @record, ->{ "trap block did not executed.\n[[#{@chain.graph_draw}]]" }
+ end
+
+ it "shouldn't execute trap block if called promise#call" do
+ val = rand(1000)
+ eval_all_events(@delayer) do
+ @promise.call(val)
+ end
+ refute_equal val + 1, @record, ->{ "trap block did executed.\n[[#{@chain.graph_draw}]]" }
+ end
+ end
+
+ end
+
+end
diff --git a/test/thread_test.rb b/test/thread_test.rb
index 5b9fb00..9d8e494 100644
--- a/test/thread_test.rb
+++ b/test/thread_test.rb
@@ -46,6 +46,7 @@ describe(Thread) do
thread = succeed = failure = result = false
uuid = SecureRandom.uuid
delayer = Delayer.generate_class
+ assert_equal delayer, delayer.Deferred.Thread.delayer
eval_all_events(delayer) do
delayer.Deferred.Thread.new {
thread = true
@@ -55,7 +56,7 @@ describe(Thread) do
result = param
}.trap{ |exception|
failure = exception } end
- assert_equal false, failure
+ assert_equal false, failure, 'Unexpected failed.'
assert thread, "Thread did not executed."
assert succeed, "next block did not executed."
assert_equal uuid, result
@@ -101,16 +102,19 @@ describe(Thread) do
result = failure = false
delayer = Delayer.generate_class
uuid = SecureRandom.uuid
- eval_all_events(delayer) do
+ node = eval_all_events(delayer) do
delayer.Deferred.new.next{
- delayer.Deferred.Thread.new{
- uuid }
+ delayer.Deferred.new.next{
+ delayer.Deferred.Thread.new{
+ uuid
+ }
+ }
}.next{ |value|
result = value
}.trap{ |exception|
failure = exception }
end
- assert_equal uuid, result
+ assert_equal uuid, result, ->{ "[[#{node.graph_draw}]]" }
assert_equal false, failure
end
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-ruby-extras/ruby-delayer-deferred.git
More information about the Pkg-ruby-extras-commits
mailing list