[DRE-commits] [ruby-eventmachine] 01/06: Imported Upstream version 1.0.7
zeha at debian.org
Thu Apr 9 13:59:59 UTC 2015
This is an automated email from the git hooks/post-receive script.
zeha pushed a commit to branch master
in repository ruby-eventmachine.
commit 08235b62ab7f9cfcb394d464c442ef96e81bdedb
Author: Christian Hofstaedtler <zeha at debian.org>
Date: Thu Apr 9 15:43:45 2015 +0200
Imported Upstream version 1.0.7
---
.travis.yml | 20 +-
CHANGELOG.md | 53 +++-
README.md | 9 +-
eventmachine.gemspec | 2 +
ext/binder.cpp | 2 +-
ext/cmain.cpp | 33 ++-
ext/ed.cpp | 52 +++-
ext/ed.h | 10 +-
ext/em.cpp | 270 +++++++++++++--------
ext/em.h | 56 ++++-
ext/eventmachine.h | 3 +
ext/extconf.rb | 24 +-
ext/fastfilereader/extconf.rb | 5 +-
ext/fastfilereader/mapper.cpp | 2 +-
ext/project.h | 15 +-
ext/rubymain.cpp | 61 ++++-
ext/ssl.cpp | 5 +-
ext/ssl.h | 4 +
java/src/com/rubyeventmachine/EmReactor.java | 16 ++
.../src/com/rubyeventmachine/EventableChannel.java | 2 +
.../rubyeventmachine/EventableDatagramChannel.java | 6 +
.../rubyeventmachine/EventableSocketChannel.java | 65 ++++-
lib/em/buftok.rb | 119 +++------
lib/em/connection.rb | 2 +-
lib/em/iterator.rb | 49 +---
lib/em/protocols/httpclient.rb | 42 +++-
lib/em/protocols/line_and_text.rb | 5 +-
lib/em/protocols/linetext2.rb | 1 -
lib/em/protocols/smtpserver.rb | 41 +++-
lib/em/pure_ruby.rb | 4 +-
lib/em/resolver.rb | 19 +-
lib/em/tick_loop.rb | 38 +--
lib/em/version.rb | 2 +-
lib/eventmachine.rb | 29 ++-
lib/jeventmachine.rb | 17 ++
metadata.yml | 169 +++++++------
rakelib/package.rake | 11 +-
tests/em_test_helper.rb | 4 +
tests/test_attach.rb | 25 ++
tests/test_basic.rb | 30 ++-
tests/test_completion.rb | 1 +
tests/test_connection_count.rb | 23 +-
tests/test_connection_write.rb | 35 +++
tests/test_epoll.rb | 40 +--
tests/test_httpclient.rb | 43 ++++
tests/test_idle_connection.rb | 10 +-
tests/test_iterator.rb | 97 ++++++++
tests/test_kb.rb | 44 ++--
tests/test_many_fds.rb | 22 ++
tests/test_pause.rb | 29 +++
tests/test_pool.rb | 2 +
tests/test_process_watch.rb | 2 +
tests/test_processes.rb | 14 +-
tests/test_resolver.rb | 40 ++-
tests/test_ssl_args.rb | 4 +-
tests/test_ssl_methods.rb | 8 +-
tests/test_ssl_verify.rb | 124 +++++-----
tests/test_threaded_resource.rb | 8 +
58 files changed, 1285 insertions(+), 583 deletions(-)
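
Note on the "use chunks for SSL write [#545]" entry: the ext/ed.cpp hunk below replaces a single PutPlaintext call with a loop that hands the SSL box at most SSLBOX_INPUT_CHUNKSIZE bytes at a time. The following is a minimal sketch of that size bookkeeping only, not the upstream code. The helper name `chunk_sizes` and its parameters are hypothetical and do not exist in EventMachine, and no particular value of the chunk-size constant is assumed.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Hypothetical helper (not part of EventMachine): returns the sizes of the
// pieces produced when a buffer of `length` bytes is written in pieces of at
// most `chunk_size` bytes, mirroring the loop shape in the ed.cpp hunk.
std::vector<std::size_t> chunk_sizes(std::size_t length, std::size_t chunk_size)
{
    std::vector<std::size_t> sizes;
    if (chunk_size == 0)
        return sizes; // a zero-sized chunk would never make progress

    std::size_t written = 0;
    while (written < length) {
        // Take a full chunk unless fewer bytes remain.
        std::size_t to_write = std::min(chunk_size, length - written);
        sizes.push_back(to_write);
        written += to_write;
    }
    return sizes;
}
```

In the diff itself each piece is passed to SslBox->PutPlaintext and followed by _DispatchCiphertext(); the sketch leaves out both calls and the error path.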
diff --git a/.travis.yml b/.travis.yml
index 6ec146a..bfe2217 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,12 +1,22 @@
-script: rake compile test
+script: bundle exec rake compile test
+env:
+ global:
+ - TESTOPTS=-v
language: ruby
+sudo: false
rvm:
- 1.8.7
+ - 1.9.2
- 1.9.3
- - rbx-18mode
- - rbx-19mode
+ - 2.0.0
+ - 2.1
+ - 2.2
+ - rbx
- jruby
matrix:
allow_failures:
- - rvm: rbx-18mode
- - rvm: rbx-19mode
+ - rvm: rbx
+ - rvm: jruby
+ include:
+ - rvm: 2.0.0
+ os: osx
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 48d0a9c..9ed8b94 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,57 @@
# Changelog
-## 1.0.x
+## 1.0.7 (February 10, 2015)
+* fix delay in kqueue/epoll reactor shutdown when timers exist [#587]
+* fix memory leak introduced in v1.0.5 [#586]
+* expose EM.set_simultaneous_accept_count [#420]
+* fix busy loop when EM.run and EM.next_tick are invoked from exception handler [#452]
+
+## 1.0.6 (February 3, 2015)
+* add support for Rubinius Process::Status [#568]
+* small bugfixes for SmtpServer [#449]
+* update buftok.rb [#547]
+* fix assertion on Write() [#525]
+* work around mkmf.rb bug preventing gem installation [#574]
+* add pause/resume support to jruby reactor [#556]
+* fix pure ruby reactor to use 127.0.0.1 instead of localhost [#439]
+* fix compilation under macruby [#243]
+* add chunked encoding to http client [#111]
+* fix errors on win32 when dealing with pipes [1ea45498] [#105]
+
+## 1.0.5 (February 2, 2015)
+* use monotonic clocks on Linux, OS X, Solaris, and Windows [#563]
+* use the rb_fd_* API to get autosized fd_sets [#502]
+* add basic tests that the DNS resolver isn't leaking timers [#571]
+* update to test-unit 2.x and improve various unit tests [#551]
+* remove EventMachine_t::Popen code marked by ifdef OBSOLETE [#551]
+* ruby 2.0 may fail at Queue.pop, so rescue and complain to $stderr [#551]
+* set file handle to INVALID_HANDLE_VALUE after closing the file [#565]
+* use `defined?` instead of rescuing NameError for flow control [#535]
+* fix closing files and sockets on Windows [#564]
+* fix file uploads in Windows [#562]
+* catch failure to fork [#539]
+* use chunks for SSL write [#545]
+
+## 1.0.4 (December 19, 2014)
+* add starttls_options to smtp server [#552]
+* fix closesocket on windows [#497]
+* fix build on ruby 2.2 [#503]
+* fix build error on ruby 1.9 [#508]
+* fix timer leak during dns resolution [#489]
+* add concurrency validation to EM::Iterator [#468]
+* add get_file_descriptor to get fd for a signature [#467]
+* add EM.attach_server and EM.attach_socket_server [#465, #466]
+* calling pause from receive_data takes effect immediately [#464]
+* reactor_running? returns false after fork [#455]
+* fix infinite loop on double close [edc4d0e6, #441, #445]
+* fix compilation issue on llvm [#433]
+* fix socket error codes on win32 [ff811a81]
+* fix EM.stop latency when timers exist [8b613d05, #426]
+* fix infinite loop when system time changes [1427a2c80, #428]
+* fix crash when callin attach/detach in the same tick [#427]
+* fix compilation issue on solaris [#416]
+
+## 1.0.3 (March 8, 2013)
* EM.system was broken in 1.0.2 release [#413]
## 1.0.2 (March 8, 2013)
diff --git a/README.md b/README.md
index 0138f74..b5321d9 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# About EventMachine #
+# About EventMachine [![Code Climate](https://codeclimate.com/github/eventmachine/eventmachine.png)](https://codeclimate.com/github/eventmachine/eventmachine)
## What is EventMachine ##
@@ -75,7 +75,7 @@ Here's a fully-functional echo server written with EventMachine:
def unbind
puts "-- someone disconnected from the echo server!"
- end
+ end
end
# Note that this will block current thread.
@@ -86,7 +86,7 @@ Here's a fully-functional echo server written with EventMachine:
## EventMachine documentation ##
-Currently we only have [reference documentation](http://eventmachine.rubyforge.org) and a [wiki](https://github.com/eventmachine/eventmachine/wiki).
+Currently we only have [reference documentation](http://rdoc.info/github/eventmachine/eventmachine/frames) and a [wiki](https://github.com/eventmachine/eventmachine/wiki).
## Community and where to get help ##
@@ -105,5 +105,4 @@ Copyright: (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
## Alternatives ##
-If you are unhappy with EventMachine and want to use Ruby, check out [Cool.io](http://coolio.github.com/).
-One caveat: by May 2011, it did not support JRuby and Windows.
+If you are unhappy with EventMachine and want to use Ruby, check out [Celluloid](https://celluloid.io/).
diff --git a/eventmachine.gemspec b/eventmachine.gemspec
index 3cb0862..3009377 100644
--- a/eventmachine.gemspec
+++ b/eventmachine.gemspec
@@ -8,6 +8,7 @@ Gem::Specification.new do |s|
s.version = EventMachine::VERSION
s.homepage = 'http://rubyeventmachine.com'
s.rubyforge_project = 'eventmachine'
+ s.licenses = ["Ruby", "GPL"]
s.authors = ["Francis Cianfrocca", "Aman Gupta"]
s.email = ["garbagecat10 at gmail.com", "aman at tmm1.net"]
@@ -15,6 +16,7 @@ Gem::Specification.new do |s|
s.files = `git ls-files`.split("\n")
s.extensions = ["ext/extconf.rb", "ext/fastfilereader/extconf.rb"]
+ s.add_development_dependency 'test-unit', '~> 2.0'
s.add_development_dependency 'rake-compiler', '~> 0.8.3'
s.add_development_dependency 'yard', ">= 0.8.5.2"
s.add_development_dependency 'bluecloth' unless RUBY_PLATFORM =~ /java/
diff --git a/ext/binder.cpp b/ext/binder.cpp
index 1b7d172..f62db86 100644
--- a/ext/binder.cpp
+++ b/ext/binder.cpp
@@ -32,7 +32,7 @@ STATIC Bindable_t::CreateBinding
unsigned long Bindable_t::CreateBinding()
{
static unsigned long num = 0;
- while(BindingBag[++num]);
+ while(BindingBag[++num]) {}
return num;
}
diff --git a/ext/cmain.cpp b/ext/cmain.cpp
index b354d2d..e87127d 100644
--- a/ext/cmain.cpp
+++ b/ext/cmain.cpp
@@ -259,6 +259,7 @@ evma_num_close_scheduled
extern "C" int evma_num_close_scheduled ()
{
+ ensure_eventmachine("evma_num_close_scheduled");
return EventMachine->NumCloseScheduled;
}
@@ -282,6 +283,16 @@ extern "C" const unsigned long evma_create_unix_domain_server (const char *filen
return EventMachine->CreateUnixDomainServer (filename);
}
+/***********************
+evma_attach_sd
+************************/
+
+extern "C" const unsigned long evma_attach_sd (int sd)
+{
+ ensure_eventmachine("evma_attach_sd");
+ return EventMachine->AttachSD (sd);
+}
+
/*************************
evma_open_datagram_socket
*************************/
@@ -642,7 +653,6 @@ extern "C" int evma_get_max_timer_count()
return EventMachine_t::GetMaxTimerCount();
}
-
/************************
evma_set_max_timer_count
************************/
@@ -661,6 +671,21 @@ extern "C" void evma_set_max_timer_count (int ct)
}
/******************
+evma_get/set_simultaneous_accept_count
+******************/
+
+extern "C" void evma_set_simultaneous_accept_count (int count)
+{
+ EventMachine_t::SetSimultaneousAcceptCount(count);
+}
+
+extern "C" int evma_get_simultaneous_accept_count()
+{
+ return EventMachine_t::GetSimultaneousAcceptCount();
+}
+
+
+/******************
evma_setuid_string
******************/
@@ -751,8 +776,11 @@ extern "C" int evma_send_file_data_to_connection (const unsigned long binding, c
ensure_eventmachine("evma_send_file_data_to_connection");
+#if defined(OS_WIN32)
+ int Fd = open (filename, O_RDONLY|O_BINARY);
+#else
int Fd = open (filename, O_RDONLY);
-
+#endif
if (Fd < 0)
return errno;
// From here on, all early returns MUST close Fd.
@@ -774,7 +802,6 @@ extern "C" int evma_send_file_data_to_connection (const unsigned long binding, c
return -1;
}
-
r = read (Fd, data, filesize);
if (r != filesize) {
int e = errno;
diff --git a/ext/ed.cpp b/ext/ed.cpp
index 544d3a8..de98965 100644
--- a/ext/ed.cpp
+++ b/ext/ed.cpp
@@ -217,6 +217,8 @@ EventableDescriptor::ScheduleClose
void EventableDescriptor::ScheduleClose (bool after_writing)
{
+ if (IsCloseScheduled())
+ return;
MyEventMachine->NumCloseScheduled++;
// KEEP THIS SYNCHRONIZED WITH ::IsCloseScheduled.
if (after_writing)
@@ -564,11 +566,25 @@ int ConnectionDescriptor::SendOutboundData (const char *data, int length)
#ifdef WITH_SSL
if (SslBox) {
if (length > 0) {
- int w = SslBox->PutPlaintext (data, length);
- if (w < 0)
- ScheduleClose (false);
- else
- _DispatchCiphertext();
+ int writed = 0;
+ char *p = (char*)data;
+
+ while (writed < length) {
+ int to_write = SSLBOX_INPUT_CHUNKSIZE;
+ int remaining = length - writed;
+
+ if (remaining < SSLBOX_INPUT_CHUNKSIZE)
+ to_write = remaining;
+
+ int w = SslBox->PutPlaintext (p, to_write);
+ if (w < 0) {
+ ScheduleClose (false);
+ }else
+ _DispatchCiphertext();
+
+ p += to_write;
+ writed += to_write;
+ }
}
// TODO: What's the correct return value?
return 1; // That's a wild guess, almost certainly wrong.
@@ -600,7 +616,6 @@ int ConnectionDescriptor::_SendRawOutboundData (const char *data, int length)
if (IsCloseScheduled())
return 0;
-
// 25Mar10: Ignore 0 length packets as they are not meaningful in TCP (as opposed to UDP)
// and can cause the assert(nbytes>0) to fail when OutboundPages has a bunch of 0 length pages.
if (length == 0)
@@ -765,7 +780,11 @@ void ConnectionDescriptor::Read()
int r = read (sd, readbuffer, sizeof(readbuffer) - 1);
+#ifdef OS_WIN32
+ int e = WSAGetLastError();
+#else
int e = errno;
+#endif
//cerr << "<R:" << r << ">";
if (r > 0) {
@@ -779,6 +798,8 @@ void ConnectionDescriptor::Read()
// a security guard against buffer overflows.
readbuffer [r] = 0;
_DispatchInboundData (readbuffer, r);
+ if (bPaused)
+ break;
}
else if (r == 0) {
break;
@@ -981,11 +1002,7 @@ void ConnectionDescriptor::_WriteOutboundData()
// Max of 16 outbound pages at a time
if (iovcnt > 16) iovcnt = 16;
- #ifdef CC_SUNWspro
- struct iovec iov[16];
- #else
- struct iovec iov[ iovcnt ];
- #endif
+ iovec iov[16];
for(int i = 0; i < iovcnt; i++){
OutboundPage *op = &(OutboundPages[i]);
@@ -1032,7 +1049,11 @@ void ConnectionDescriptor::_WriteOutboundData()
#endif
bool err = false;
+#ifdef OS_WIN32
+ int e = WSAGetLastError();
+#else
int e = errno;
+#endif
if (bytes_written < 0) {
err = true;
bytes_written = 0;
@@ -1218,7 +1239,7 @@ void ConnectionDescriptor::_DispatchCiphertext()
assert (SslBox);
- char BigBuf [2048];
+ char BigBuf [SSLBOX_OUTPUT_CHUNKSIZE];
bool did_work;
do {
@@ -1410,8 +1431,9 @@ void AcceptorDescriptor::Read()
struct sockaddr_in pin;
socklen_t addrlen = sizeof (pin);
+ int accept_count = EventMachine_t::GetSimultaneousAcceptCount();
- for (int i=0; i < 10; i++) {
+ for (int i=0; i < accept_count; i++) {
int sd = accept (GetSocket(), (struct sockaddr*)&pin, &addrlen);
if (sd == INVALID_SOCKET) {
// This breaks the loop when we've accepted everything on the kernel queue,
@@ -1663,7 +1685,11 @@ void DatagramDescriptor::Write()
// The nasty cast to (char*) is needed because Windows is brain-dead.
int s = sendto (sd, (char*)op->Buffer, op->Length, 0, (struct sockaddr*)&(op->From), sizeof(op->From));
+#ifdef OS_WIN32
+ int e = WSAGetLastError();
+#else
int e = errno;
+#endif
OutboundDataSize -= op->Length;
op->Free();
diff --git a/ext/ed.h b/ext/ed.h
index d28b536..b5da365 100644
--- a/ext/ed.h
+++ b/ext/ed.h
@@ -69,14 +69,14 @@ class EventableDescriptor: public Bindable_t
virtual bool GetSubprocessPid (pid_t*) {return false;}
virtual void StartTls() {}
- virtual void SetTlsParms (const char *privkey_filename, const char *certchain_filename, bool verify_peer) {}
+ virtual void SetTlsParms (const char *, const char *, bool) {}
#ifdef WITH_SSL
virtual X509 *GetPeerCert() {return NULL;}
#endif
virtual uint64_t GetCommInactivityTimeout() {return 0;}
- virtual int SetCommInactivityTimeout (uint64_t value) {return 0;}
+ virtual int SetCommInactivityTimeout (uint64_t) {return 0;}
uint64_t GetPendingConnectTimeout();
int SetPendingConnectTimeout (uint64_t value);
uint64_t GetLastActivity() { return LastActivity; }
@@ -215,7 +215,7 @@ class ConnectionDescriptor: public EventableDescriptor
protected:
struct OutboundPage {
OutboundPage (const char *b, int l, int o=0): Buffer(b), Length(l), Offset(o) {}
- void Free() {if (Buffer) free ((char*)Buffer); }
+ void Free() {if (Buffer) free (const_cast<char*>(Buffer)); }
const char *Buffer;
int Length;
int Offset;
@@ -292,7 +292,7 @@ class DatagramDescriptor: public EventableDescriptor
protected:
struct OutboundPage {
OutboundPage (const char *b, int l, struct sockaddr_in f, int o=0): Buffer(b), Length(l), Offset(o), From(f) {}
- void Free() {if (Buffer) free ((char*)Buffer); }
+ void Free() {if (Buffer) free (const_cast<char*>(Buffer)); }
const char *Buffer;
int Length;
int Offset;
@@ -354,7 +354,7 @@ class PipeDescriptor: public EventableDescriptor
protected:
struct OutboundPage {
OutboundPage (const char *b, int l, int o=0): Buffer(b), Length(l), Offset(o) {}
- void Free() {if (Buffer) free ((char*)Buffer); }
+ void Free() {if (Buffer) free (const_cast<char*>(Buffer)); }
const char *Buffer;
int Length;
int Offset;
diff --git a/ext/em.cpp b/ext/em.cpp
index 670da31..3bcaa4a 100644
--- a/ext/em.cpp
+++ b/ext/em.cpp
@@ -27,6 +27,11 @@ See the file COPYING for complete licensing information.
*/
static unsigned int MaxOutstandingTimers = 100000;
+/* The number of accept() done at once in a single tick when the acceptor
+ * socket becomes readable.
+ */
+static unsigned int SimultaneousAcceptCount = 10;
+
/* Internal helper to convert strings to internet addresses. IPv6-aware.
* Not reentrant or threadsafe, optimized for speed.
@@ -61,6 +66,17 @@ void EventMachine_t::SetMaxTimerCount (int count)
MaxOutstandingTimers = count;
}
+int EventMachine_t::GetSimultaneousAcceptCount()
+{
+ return SimultaneousAcceptCount;
+}
+
+void EventMachine_t::SetSimultaneousAcceptCount (int count)
+{
+ if (count < 1)
+ count = 1;
+ SimultaneousAcceptCount = count;
+}
/******************************
@@ -68,12 +84,12 @@ EventMachine_t::EventMachine_t
******************************/
EventMachine_t::EventMachine_t (EMCallback event_callback):
+ NumCloseScheduled (0),
HeartbeatInterval(2000000),
EventCallback (event_callback),
NextHeartbeatTime (0),
LoopBreakerReader (-1),
LoopBreakerWriter (-1),
- NumCloseScheduled (0),
bTerminateSignalReceived (false),
bEpoll (false),
epfd (-1),
@@ -85,6 +101,11 @@ EventMachine_t::EventMachine_t (EMCallback event_callback):
Quantum.tv_sec = 0;
Quantum.tv_usec = 90000;
+ /* Initialize monotonic timekeeping on OS X before the first call to GetRealTime */
+ #ifdef OS_DARWIN
+ (void) mach_timebase_info(&mach_timebase);
+ #endif
+
// Make sure the current loop time is sane, in case we do any initializations of
// objects before we start running.
_UpdateTime();
@@ -101,6 +122,7 @@ EventMachine_t::EventMachine_t (EMCallback event_callback):
#endif
_InitializeLoopBreaker();
+ SelectData = new SelectData_t();
}
@@ -130,6 +152,8 @@ EventMachine_t::~EventMachine_t()
close (epfd);
if (kqfd != -1)
close (kqfd);
+
+ delete SelectData;
}
@@ -187,6 +211,12 @@ void EventMachine_t::ScheduleHalt()
* The answer is to call evma_stop_machine, which calls here, from a SIGINT handler.
*/
bTerminateSignalReceived = true;
+
+ /* Signal the loopbreaker so we break out of long-running select/epoll/kqueue and
+ * notice the halt boolean is set. Signalling the loopbreaker also uses a single
+ * signal-safe syscall.
+ */
+ SignalLoopBreaker();
}
@@ -352,16 +382,49 @@ void EventMachine_t::_UpdateTime()
EventMachine_t::GetRealTime
***************************/
+// Two great writeups of cross-platform monotonic time are at:
+// http://www.python.org/dev/peps/pep-0418
+// http://nadeausoftware.com/articles/2012/04/c_c_tip_how_measure_elapsed_real_time_benchmarking
+// Uncomment the #pragma messages to confirm which compile-time option was used
uint64_t EventMachine_t::GetRealTime()
{
uint64_t current_time;
- #if defined(OS_UNIX)
+ #if defined(HAVE_CONST_CLOCK_MONOTONIC_RAW)
+ // #pragma message "GetRealTime: clock_gettime CLOCK_MONOTONIC_RAW"
+ // Linux 2.6.28 and above
+ struct timespec tv;
+ clock_gettime (CLOCK_MONOTONIC_RAW, &tv);
+ current_time = (((uint64_t)(tv.tv_sec)) * 1000000LL) + ((uint64_t)((tv.tv_nsec)/1000));
+
+ #elif defined(HAVE_CONST_CLOCK_MONOTONIC)
+ // #pragma message "GetRealTime: clock_gettime CLOCK_MONOTONIC"
+ // Linux, FreeBSD 5.0 and above, Solaris 8 and above, OpenBSD, NetBSD, DragonflyBSD
+ struct timespec tv;
+ clock_gettime (CLOCK_MONOTONIC, &tv);
+ current_time = (((uint64_t)(tv.tv_sec)) * 1000000LL) + ((uint64_t)((tv.tv_nsec)/1000));
+
+ #elif defined(HAVE_GETHRTIME)
+ // #pragma message "GetRealTime: gethrtime"
+ // Solaris and HP-UX
+ current_time = (uint64_t)gethrtime() / 1000;
+
+ #elif defined(OS_DARWIN)
+ // #pragma message "GetRealTime: mach_absolute_time"
+ // Mac OS X
+ // https://developer.apple.com/library/mac/qa/qa1398/_index.html
+ current_time = mach_absolute_time() * mach_timebase.numer / mach_timebase.denom / 1000;
+
+ #elif defined(OS_UNIX)
+ // #pragma message "GetRealTime: gettimeofday"
+ // Unix fallback
struct timeval tv;
gettimeofday (&tv, NULL);
current_time = (((uint64_t)(tv.tv_sec)) * 1000000LL) + ((uint64_t)(tv.tv_usec));
#elif defined(OS_WIN32)
+ // #pragma message "GetRealTime: GetTickCount"
+ // Future improvement: use GetTickCount64 in Windows Vista / Server 2008
unsigned tick = GetTickCount();
if (tick < LastTickCount)
TickCountTickover += 1;
@@ -370,6 +433,8 @@ uint64_t EventMachine_t::GetRealTime()
current_time *= 1000; // convert to microseconds
#else
+ // #pragma message "GetRealTime: time"
+ // Universal fallback
current_time = (uint64_t)time(NULL) * 1000000LL;
#endif
@@ -382,15 +447,27 @@ EventMachine_t::_DispatchHeartbeats
void EventMachine_t::_DispatchHeartbeats()
{
+ // Store the first processed heartbeat descriptor and bail out if
+ // we see it again. This fixes an infinite loop in case the system time
+ // is changed out from underneath MyCurrentLoopTime.
+ const EventableDescriptor *head = NULL;
+
while (true) {
multimap<uint64_t,EventableDescriptor*>::iterator i = Heartbeats.begin();
if (i == Heartbeats.end())
break;
if (i->first > MyCurrentLoopTime)
break;
+
EventableDescriptor *ed = i->second;
+ if (ed == head)
+ break;
+
ed->Heartbeat();
QueueHeartbeat(ed);
+
+ if (head == NULL)
+ head = ed;
}
}
@@ -675,7 +752,7 @@ EventMachine_t::_TimeTilNextEvent
timeval EventMachine_t::_TimeTilNextEvent()
{
- // 29jul11: Changed calculation base from MyCurrentLoopTime to the
+ // 29jul11: Changed calculation base from MyCurrentLoopTime to the
// real time. As MyCurrentLoopTime is set at the beginning of an
// iteration and this calculation is done at the end, evenmachine
// will potentially oversleep by the amount of time the iteration
@@ -697,10 +774,12 @@ timeval EventMachine_t::_TimeTilNextEvent()
if (!NewDescriptors.empty() || !ModifiedDescriptors.empty()) {
next_event = current_time;
}
-
+
timeval tv;
- if (next_event == 0 || NumCloseScheduled > 0) {
+ if (NumCloseScheduled > 0 || bTerminateSignalReceived) {
+ tv.tv_sec = tv.tv_usec = 0;
+ } else if (next_event == 0) {
tv = Quantum;
} else {
if (next_event > current_time) {
@@ -792,22 +871,28 @@ SelectData_t::SelectData_t
SelectData_t::SelectData_t()
{
maxsocket = 0;
- FD_ZERO (&fdreads);
- FD_ZERO (&fdwrites);
- FD_ZERO (&fderrors);
+ rb_fd_init (&fdreads);
+ rb_fd_init (&fdwrites);
+ rb_fd_init (&fderrors);
}
+SelectData_t::~SelectData_t()
+{
+ rb_fd_term (&fdreads);
+ rb_fd_term (&fdwrites);
+ rb_fd_term (&fderrors);
+}
#ifdef BUILD_FOR_RUBY
/*****************
_SelectDataSelect
*****************/
-#ifdef HAVE_TBR
+#if defined(HAVE_TBR) || defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL)
static VALUE _SelectDataSelect (void *v)
{
SelectData_t *sd = (SelectData_t*)v;
- sd->nSockets = select (sd->maxsocket+1, &(sd->fdreads), &(sd->fdwrites), &(sd->fderrors), &(sd->tv));
+ sd->nSockets = select (sd->maxsocket+1, rb_fd_ptr(&(sd->fdreads)), rb_fd_ptr(&(sd->fdwrites)), rb_fd_ptr(&(sd->fderrors)), &(sd->tv));
return Qnil;
}
#endif
@@ -818,18 +903,27 @@ SelectData_t::_Select
int SelectData_t::_Select()
{
- #ifdef HAVE_TBR
+ #if defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL)
+ // added in ruby 1.9.3
+ rb_thread_call_without_gvl ((void *(*)(void *))_SelectDataSelect, (void*)this, RUBY_UBF_IO, 0);
+ return nSockets;
+ #elif defined(HAVE_TBR)
+ // added in ruby 1.9.1, deprecated in ruby 2.0.0
rb_thread_blocking_region (_SelectDataSelect, (void*)this, RUBY_UBF_IO, 0);
return nSockets;
- #endif
-
- #ifndef HAVE_TBR
+ #else
return EmSelect (maxsocket+1, &fdreads, &fdwrites, &fderrors, &tv);
#endif
}
#endif
-
+void SelectData_t::_Clear()
+{
+ maxsocket = 0;
+ rb_fd_zero (&fdreads);
+ rb_fd_zero (&fdwrites);
+ rb_fd_zero (&fderrors);
+}
/******************************
EventMachine_t::_RunSelectOnce
@@ -846,23 +940,17 @@ void EventMachine_t::_RunSelectOnce()
// however it has the same problem interoperating with Ruby
// threads that select does.
- SelectData_t SelectData;
- /*
- fd_set fdreads, fdwrites;
- FD_ZERO (&fdreads);
- FD_ZERO (&fdwrites);
-
- int maxsocket = 0;
- */
+ // Get ready for select()
+ SelectData->_Clear();
// Always read the loop-breaker reader.
// Changed 23Aug06, provisionally implemented for Windows with a UDP socket
// running on localhost with a randomly-chosen port. (*Puke*)
// Windows has a version of the Unix pipe() library function, but it doesn't
// give you back descriptors that are selectable.
- FD_SET (LoopBreakerReader, &(SelectData.fdreads));
- if (SelectData.maxsocket < LoopBreakerReader)
- SelectData.maxsocket = LoopBreakerReader;
+ rb_fd_set (LoopBreakerReader, &(SelectData->fdreads));
+ if (SelectData->maxsocket < LoopBreakerReader)
+ SelectData->maxsocket = LoopBreakerReader;
// prepare the sockets for reading and writing
size_t i;
@@ -875,27 +963,28 @@ void EventMachine_t::_RunSelectOnce()
assert (sd != INVALID_SOCKET);
if (ed->SelectForRead())
- FD_SET (sd, &(SelectData.fdreads));
+ rb_fd_set (sd, &(SelectData->fdreads));
if (ed->SelectForWrite())
- FD_SET (sd, &(SelectData.fdwrites));
+ rb_fd_set (sd, &(SelectData->fdwrites));
#ifdef OS_WIN32
/* 21Sep09: on windows, a non-blocking connect() that fails does not come up as writable.
Instead, it is added to the error set. See http://www.mail-archive.com/openssl-users@openssl.org/msg58500.html
*/
- FD_SET (sd, &(SelectData.fderrors));
+ if (ed->IsConnectPending())
+ rb_fd_set (sd, &(SelectData->fderrors));
#endif
- if (SelectData.maxsocket < sd)
- SelectData.maxsocket = sd;
+ if (SelectData->maxsocket < sd)
+ SelectData->maxsocket = sd;
}
{ // read and write the sockets
//timeval tv = {1, 0}; // Solaris fails if the microseconds member is >= 1000000.
//timeval tv = Quantum;
- SelectData.tv = _TimeTilNextEvent();
- int s = SelectData._Select();
+ SelectData->tv = _TimeTilNextEvent();
+ int s = SelectData->_Select();
//rb_thread_blocking_region(xxx,(void*)&SelectData,RUBY_UBF_IO,0);
//int s = EmSelect (SelectData.maxsocket+1, &(SelectData.fdreads), &(SelectData.fdwrites), NULL, &(SelectData.tv));
//int s = SelectData.nSockets;
@@ -918,15 +1007,19 @@ void EventMachine_t::_RunSelectOnce()
continue;
assert (sd != INVALID_SOCKET);
- if (FD_ISSET (sd, &(SelectData.fdwrites)))
- ed->Write();
- if (FD_ISSET (sd, &(SelectData.fdreads)))
+ if (rb_fd_isset (sd, &(SelectData->fdwrites))) {
+ // Double-check SelectForWrite() still returns true. If not, one of the callbacks must have
+ // modified some value since we checked SelectForWrite() earlier in this method.
+ if (ed->SelectForWrite())
+ ed->Write();
+ }
+ if (rb_fd_isset (sd, &(SelectData->fdreads)))
ed->Read();
- if (FD_ISSET (sd, &(SelectData.fderrors)))
+ if (rb_fd_isset (sd, &(SelectData->fderrors)))
ed->HandleError();
}
- if (FD_ISSET (LoopBreakerReader, &(SelectData.fdreads)))
+ if (rb_fd_isset (LoopBreakerReader, &(SelectData->fdreads)))
_ReadLoopBreaker();
}
else if (s < 0) {
@@ -964,11 +1057,12 @@ void EventMachine_t::_CleanBadDescriptors()
tv.tv_sec = 0;
tv.tv_usec = 0;
- fd_set fds;
- FD_ZERO(&fds);
- FD_SET(sd, &fds);
+ rb_fdset_t fds;
+ rb_fd_init(&fds);
+ rb_fd_set(sd, &fds);
- int ret = select(sd + 1, &fds, NULL, NULL, &tv);
+ int ret = rb_fd_select(sd + 1, &fds, NULL, NULL, &tv);
+ rb_fd_term(&fds);
if (ret == -1) {
if (errno == EBADF)
@@ -1399,6 +1493,14 @@ int EventMachine_t::DetachFD (EventableDescriptor *ed)
// Prevent the descriptor from being modified, in case DetachFD was called from a timer or next_tick
ModifiedDescriptors.erase (ed);
+ // Prevent the descriptor from being added, in case DetachFD was called in the same tick as AttachFD
+ for (size_t i = 0; i < NewDescriptors.size(); i++) {
+ if (ed == NewDescriptors[i]) {
+ NewDescriptors.erase(NewDescriptors.begin() + i);
+ break;
+ }
+ }
+
// Set MySocket = INVALID_SOCKET so ShouldDelete() is true (and the descriptor gets deleted and removed),
// and also to prevent anyone from calling close() on the detached fd
ed->SetSocketInvalid();
@@ -1491,8 +1593,6 @@ const unsigned long EventMachine_t::CreateTcpServer (const char *server, int por
if (!bind_here)
return 0;
- unsigned long output_binding = 0;
-
//struct sockaddr_in sin;
int sd_accept = socket (family, SOCK_STREAM, 0);
@@ -1529,25 +1629,7 @@ const unsigned long EventMachine_t::CreateTcpServer (const char *server, int por
goto fail;
}
- {
- // Set the acceptor non-blocking.
- // THIS IS CRUCIALLY IMPORTANT because we read it in a select loop.
- if (!SetSocketNonblocking (sd_accept)) {
- //int val = fcntl (sd_accept, F_GETFL, 0);
- //if (fcntl (sd_accept, F_SETFL, val | O_NONBLOCK) == -1) {
- goto fail;
- }
- }
-
- { // Looking good.
- AcceptorDescriptor *ad = new AcceptorDescriptor (sd_accept, this);
- if (!ad)
- throw std::runtime_error ("unable to allocate acceptor");
- Add (ad);
- output_binding = ad->GetBinding();
- }
-
- return output_binding;
+ return AttachSD(sd_accept);
fail:
if (sd_accept != INVALID_SOCKET)
@@ -1838,7 +1920,6 @@ const unsigned long EventMachine_t::CreateUnixDomainServer (const char *filename
// The whole rest of this function is only compiled on Unix systems.
#ifdef OS_UNIX
- unsigned long output_binding = 0;
struct sockaddr_un s_sun;
@@ -1876,6 +1957,24 @@ const unsigned long EventMachine_t::CreateUnixDomainServer (const char *filename
goto fail;
}
+ return AttachSD(sd_accept);
+
+ fail:
+ if (sd_accept != INVALID_SOCKET)
+ close (sd_accept);
+ return 0;
+ #endif // OS_UNIX
+}
+
+
+/**************************************
+EventMachine_t::AttachSD
+**************************************/
+
+const unsigned long EventMachine_t::AttachSD (int sd_accept)
+{
+ unsigned long output_binding = 0;
+
{
// Set the acceptor non-blocking.
// THIS IS CRUCIALLY IMPORTANT because we read it in a select loop.
@@ -1900,51 +1999,9 @@ const unsigned long EventMachine_t::CreateUnixDomainServer (const char *filename
if (sd_accept != INVALID_SOCKET)
close (sd_accept);
return 0;
- #endif // OS_UNIX
}
-/*********************
-EventMachine_t::Popen
-*********************/
-#if OBSOLETE
-const char *EventMachine_t::Popen (const char *cmd, const char *mode)
-{
- #ifdef OS_WIN32
- throw std::runtime_error ("popen is currently unavailable on this platform");
- #endif
-
- // The whole rest of this function is only compiled on Unix systems.
- // Eventually we need this functionality (or a full-duplex equivalent) on Windows.
- #ifdef OS_UNIX
- const char *output_binding = NULL;
-
- FILE *fp = popen (cmd, mode);
- if (!fp)
- return NULL;
-
- // From here, all early returns must pclose the stream.
-
- // According to the pipe(2) manpage, descriptors returned from pipe have both
- // CLOEXEC and NONBLOCK clear. Do NOT set CLOEXEC. DO set nonblocking.
- if (!SetSocketNonblocking (fileno (fp))) {
- pclose (fp);
- return NULL;
- }
-
- { // Looking good.
- PipeDescriptor *pd = new PipeDescriptor (fp, this);
- if (!pd)
- throw std::runtime_error ("unable to allocate pipe");
- Add (pd);
- output_binding = pd->GetBinding();
- }
-
- return output_binding;
- #endif
-}
-#endif // OBSOLETE
-
/**************************
EventMachine_t::Socketpair
**************************/
@@ -2330,4 +2387,3 @@ int EventMachine_t::SetHeartbeatInterval(float interval)
return 0;
}
//#endif // OS_UNIX
-
diff --git a/ext/em.h b/ext/em.h
index df68ec1..e0a558b 100644
--- a/ext/em.h
+++ b/ext/em.h
@@ -22,7 +22,16 @@ See the file COPYING for complete licensing information.
#ifdef BUILD_FOR_RUBY
#include <ruby.h>
- #define EmSelect rb_thread_select
+ #ifdef HAVE_RB_THREAD_FD_SELECT
+ #define EmSelect rb_thread_fd_select
+ #else
+ // ruby 1.9.1 and below
+ #define EmSelect rb_thread_select
+ #endif
+
+ #ifdef HAVE_RB_THREAD_CALL_WITHOUT_GVL
+ #include <ruby/thread.h>
+ #endif
#ifdef HAVE_RB_WAIT_FOR_SINGLE_FD
#include <ruby/io.h>
@@ -60,9 +69,33 @@ See the file COPYING for complete licensing information.
#define EmSelect select
#endif
+#if !defined(HAVE_RB_FDSET_T)
+#define fd_check(n) (((n) < FD_SETSIZE) ? 1 : 0*fprintf(stderr, "fd %d too large for select\n", (n)))
+// These definitions are cribbed from include/ruby/intern.h in Ruby 1.9.3,
+// with this change: any macros that read or write the nth element of an
+// fdset first call fd_check to make sure n is in bounds.
+typedef fd_set rb_fdset_t;
+#define rb_fd_zero(f) FD_ZERO(f)
+#define rb_fd_set(n, f) do { if (fd_check(n)) FD_SET((n), (f)); } while(0)
+#define rb_fd_clr(n, f) do { if (fd_check(n)) FD_CLR((n), (f)); } while(0)
+#define rb_fd_isset(n, f) (fd_check(n) ? FD_ISSET((n), (f)) : 0)
+#define rb_fd_copy(d, s, n) (*(d) = *(s))
+#define rb_fd_dup(d, s) (*(d) = *(s))
+#define rb_fd_resize(n, f) ((void)(f))
+#define rb_fd_ptr(f) (f)
+#define rb_fd_init(f) FD_ZERO(f)
+#define rb_fd_init_copy(d, s) (*(d) = *(s))
+#define rb_fd_term(f) ((void)(f))
+#define rb_fd_max(f) FD_SETSIZE
+#define rb_fd_select(n, rfds, wfds, efds, timeout) \
+ select(fd_check((n)-1) ? (n) : FD_SETSIZE, (rfds), (wfds), (efds), (timeout))
+#define rb_thread_fd_select(n, rfds, wfds, efds, timeout) \
+ rb_thread_select(fd_check((n)-1) ? (n) : FD_SETSIZE, (rfds), (wfds), (efds), (timeout))
+#endif
+
class EventableDescriptor;
class InotifyDescriptor;
-
+struct SelectData_t;
/********************
class EventMachine_t
@@ -74,6 +107,9 @@ class EventMachine_t
static int GetMaxTimerCount();
static void SetMaxTimerCount (int);
+ static int GetSimultaneousAcceptCount();
+ static void SetSimultaneousAcceptCount (int);
+
public:
EventMachine_t (EMCallback);
virtual ~EventMachine_t();
@@ -88,6 +124,7 @@ class EventMachine_t
const unsigned long CreateTcpServer (const char *, int);
const unsigned long OpenDatagramSocket (const char *, int);
const unsigned long CreateUnixDomainServer (const char*);
+ const unsigned long AttachSD (int);
const unsigned long OpenKeyboard();
//const char *Popen (const char*, const char*);
const unsigned long Socketpair (char* const*);
@@ -165,7 +202,7 @@ class EventMachine_t
public:
void _ReadLoopBreaker();
void _ReadInotifyEvents();
- int NumCloseScheduled;
+ int NumCloseScheduled;
private:
enum {
@@ -203,8 +240,13 @@ class EventMachine_t
unsigned LastTickCount;
#endif
+ #ifdef OS_DARWIN
+ mach_timebase_info_data_t mach_timebase;
+ #endif
+
private:
bool bTerminateSignalReceived;
+ SelectData_t *SelectData;
bool bEpoll;
int epfd; // Epoll file-descriptor
@@ -229,13 +271,15 @@ struct SelectData_t
struct SelectData_t
{
SelectData_t();
+ ~SelectData_t();
int _Select();
+ void _Clear();
int maxsocket;
- fd_set fdreads;
- fd_set fdwrites;
- fd_set fderrors;
+ rb_fdset_t fdreads;
+ rb_fdset_t fdwrites;
+ rb_fdset_t fderrors;
timeval tv;
int nSockets;
};
diff --git a/ext/eventmachine.h b/ext/eventmachine.h
index 2dd8b1b..985b993 100644
--- a/ext/eventmachine.h
+++ b/ext/eventmachine.h
@@ -64,6 +64,7 @@ extern "C" {
void evma_stop_tcp_server (const unsigned long signature);
const unsigned long evma_create_tcp_server (const char *address, int port);
const unsigned long evma_create_unix_domain_server (const char *filename);
+ const unsigned long evma_attach_sd (int sd);
const unsigned long evma_open_datagram_socket (const char *server, int port);
const unsigned long evma_open_keyboard();
void evma_set_tls_parms (const unsigned long binding, const char *privatekey_filename, const char *certchain_filenane, int verify_peer);
@@ -95,6 +96,8 @@ extern "C" {
void evma_set_timer_quantum (int);
int evma_get_max_timer_count();
void evma_set_max_timer_count (int);
+ int evma_get_simultaneous_accept_count();
+ void evma_set_simultaneous_accept_count (int);
void evma_setuid_string (const char *username);
void evma_stop_machine();
float evma_get_heartbeat_interval();
diff --git a/ext/extconf.rb b/ext/extconf.rb
index 448802a..c21cdf1 100644
--- a/ext/extconf.rb
+++ b/ext/extconf.rb
@@ -39,10 +39,13 @@ def manual_ssl_config
check_libs(libs) and check_heads(heads)
end
+# Eager check devs tools
+have_devel? if respond_to?(:have_devel?)
+
if ENV['CROSS_COMPILING']
- openssl_version = ENV.fetch("OPENSSL_VERSION", "1.0.0j")
+ openssl_version = ENV.fetch("OPENSSL_VERSION", "1.0.1i")
openssl_dir = File.expand_path("~/.rake-compiler/builds/openssl-#{openssl_version}/")
- if File.exists?(openssl_dir)
+ if File.exist?(openssl_dir)
FileUtils.mkdir_p Dir.pwd+"/openssl/"
FileUtils.cp Dir[openssl_dir+"/include/openssl/*.h"], Dir.pwd+"/openssl/", :verbose => true
FileUtils.cp Dir[openssl_dir+"/lib*.a"], Dir.pwd, :verbose => true
@@ -67,9 +70,12 @@ end
add_define 'BUILD_FOR_RUBY'
add_define 'HAVE_RBTRAP' if have_var('rb_trap_immediate', ['ruby.h', 'rubysig.h'])
add_define "HAVE_TBR" if have_func('rb_thread_blocking_region')# and have_macro('RUBY_UBF_IO', 'ruby.h')
+add_define "HAVE_RB_THREAD_CALL_WITHOUT_GVL" if have_header('ruby/thread.h') && have_func('rb_thread_call_without_gvl', 'ruby/thread.h')
add_define "HAVE_INOTIFY" if inotify = have_func('inotify_init', 'sys/inotify.h')
add_define "HAVE_OLD_INOTIFY" if !inotify && have_macro('__NR_inotify_init', 'sys/syscall.h')
add_define 'HAVE_WRITEV' if have_func('writev', 'sys/uio.h')
+add_define 'HAVE_RB_THREAD_FD_SELECT' if have_func('rb_thread_fd_select')
+add_define 'HAVE_RB_FDSET_T' if have_type('rb_fdset_t', 'ruby/intern.h')
have_func('rb_wait_for_single_fd')
have_func('rb_enable_interrupt')
@@ -106,7 +112,7 @@ when /mswin32/, /mingw32/, /bccwin32/
check_libs(%w[kernel32 rpcrt4 gdi32], true)
if GNU_CHAIN
- CONFIG['LDSHARED'] = "$(CXX) -shared -lstdc++"
+ CONFIG['LDSHAREDXX'] = "$(CXX) -shared -static-libgcc -static-libstdc++"
else
$defs.push "-EHs"
$defs.push "-GR"
@@ -137,6 +143,8 @@ when /openbsd/
CONFIG['LDSHAREDXX'] = "$(CXX) -shared -lstdc++ -fPIC"
when /darwin/
+ add_define 'OS_DARWIN'
+
# on Unix we need a g++ link, not gcc.
# Ff line contributed by Daniel Harple.
CONFIG['LDSHARED'] = "$(CXX) " + CONFIG['LDSHARED'].split[1..-1].join(' ')
@@ -164,13 +172,21 @@ else
CONFIG['LDSHARED'] = "$(CXX) -shared"
end
+# Platform-specific time functions
+if have_func('clock_gettime')
+ # clock_gettime is POSIX, but the monotonic clocks are not
+ have_const('CLOCK_MONOTONIC_RAW', 'time.h') # Linux
+ have_const('CLOCK_MONOTONIC', 'time.h') # Linux, Solaris, BSDs
+else
+ have_func('gethrtime') # Older Solaris and HP-UX
+end
# solaris c++ compiler doesn't have make_pair()
TRY_LINK.sub!('$(CC)', '$(CXX)')
add_define 'HAVE_MAKE_PAIR' if try_link(<<SRC, '-lstdc++')
#include <utility>
using namespace std;
- int main(){ pair<int,int> tuple = make_pair(1,2); }
+ int main(){ pair<const int,int> tuple = make_pair(1,2); }
SRC
TRY_LINK.sub!('$(CXX)', '$(CC)')
diff --git a/ext/fastfilereader/extconf.rb b/ext/fastfilereader/extconf.rb
index c618930..c6d163f 100644
--- a/ext/fastfilereader/extconf.rb
+++ b/ext/fastfilereader/extconf.rb
@@ -12,6 +12,9 @@ def add_define(name)
$defs.push("-D#{name}")
end
+# Eager check devs tools
+have_devel? if respond_to?(:have_devel?)
+
add_define 'BUILD_FOR_RUBY'
# Minor platform details between *nix and Windows:
@@ -43,7 +46,7 @@ when /mswin32/, /mingw32/, /bccwin32/
check_libs(%w[kernel32 rpcrt4 gdi32], true)
if GNU_CHAIN
- CONFIG['LDSHARED'] = "$(CXX) -shared -lstdc++"
+ CONFIG['LDSHAREDXX'] = "$(CXX) -shared -static-libgcc -static-libstdc++"
else
$defs.push "-EHs"
$defs.push "-GR"
diff --git a/ext/fastfilereader/mapper.cpp b/ext/fastfilereader/mapper.cpp
index f9a913a..949103d 100644
--- a/ext/fastfilereader/mapper.cpp
+++ b/ext/fastfilereader/mapper.cpp
@@ -195,7 +195,7 @@ void Mapper_t::Close()
}
if (hFile != INVALID_HANDLE_VALUE) {
CloseHandle (hFile);
- hMapping = INVALID_HANDLE_VALUE;
+ hFile = INVALID_HANDLE_VALUE;
}
}
diff --git a/ext/project.h b/ext/project.h
index 2e54bfd..abcfea9 100644
--- a/ext/project.h
+++ b/ext/project.h
@@ -78,6 +78,11 @@ typedef int SOCKET;
#endif
#endif /* _AIX */
+#ifdef OS_DARWIN
+#include <mach/mach.h>
+#include <mach/mach_time.h>
+#endif /* OS_DARWIN */
+
#endif /* OS_UNIX */
#ifdef OS_WIN32
@@ -85,7 +90,9 @@ typedef int SOCKET;
// 18Jun12: fd_setsize must be changed in the ruby binary (not in this extension). redefining it also causes segvs, see eventmachine/eventmachine#333
//#define FD_SETSIZE 1024
+// WIN32_LEAN_AND_MEAN excludes APIs such as Cryptography, DDE, RPC, Shell, and Windows Sockets.
#define WIN32_LEAN_AND_MEAN
+
#include <windows.h>
#include <winsock2.h>
#include <ws2tcpip.h>
@@ -93,9 +100,11 @@ typedef int SOCKET;
#include <fcntl.h>
#include <assert.h>
-typedef int socklen_t;
-typedef int pid_t;
-#endif
+// Use the Win32 wrapper library that Ruby owns to be able to close sockets with the close() function
+#define RUBY_EXPORT
+#include <ruby/defines.h>
+#include <ruby/win32.h>
+#endif /* OS_WIN32 */
#if !defined(_MSC_VER) || _MSC_VER > 1500
#include <stdint.h>
diff --git a/ext/rubymain.cpp b/ext/rubymain.cpp
index 12c3449..a7e37a6 100644
--- a/ext/rubymain.cpp
+++ b/ext/rubymain.cpp
@@ -273,6 +273,17 @@ static VALUE t_start_unix_server (VALUE self, VALUE filename)
return ULONG2NUM (f);
}
+/********************
+t_attach_sd
+********************/
+
+static VALUE t_attach_sd(VALUE self, VALUE sd)
+{
+ const unsigned long f = evma_attach_sd(FIX2INT(sd));
+ if (!f)
+ rb_raise (rb_eRuntimeError, "%s", "no socket descriptor acceptor");
+ return ULONG2NUM (f);
+}
/***********
@@ -397,8 +408,22 @@ static VALUE t_get_subprocess_status (VALUE self, VALUE signature)
if (evma_get_subprocess_status (NUM2ULONG (signature), &status)) {
if (evma_get_subprocess_pid (NUM2ULONG (signature), &pid)) {
proc_status = rb_obj_alloc(rb_cProcStatus);
+
+ /* MRI Ruby uses hidden instance vars */
rb_iv_set(proc_status, "status", INT2FIX(status));
rb_iv_set(proc_status, "pid", INT2FIX(pid));
+
+#ifdef RUBINIUS
+ /* Rubinius uses standard instance vars */
+ rb_iv_set(proc_status, "@pid", INT2FIX(pid));
+ if (WIFEXITED(status)) {
+ rb_iv_set(proc_status, "@status", INT2FIX(WEXITSTATUS(status)));
+ } else if(WIFSIGNALED(status)) {
+ rb_iv_set(proc_status, "@termsig", INT2FIX(WTERMSIG(status)));
+ } else if(WIFSTOPPED(status)){
+ rb_iv_set(proc_status, "@stopsig", INT2FIX(WSTOPSIG(status)));
+ }
+#endif
}
}
@@ -566,6 +591,14 @@ static VALUE t_detach_fd (VALUE self, VALUE signature)
return INT2NUM(evma_detach_fd (NUM2ULONG (signature)));
}
+/*********************
+t_get_file_descriptor
+*********************/
+static VALUE t_get_file_descriptor (VALUE self, VALUE signature)
+{
+ return INT2NUM(evma_get_file_descriptor (NUM2ULONG (signature)));
+}
+
/**************
t_get_sock_opt
**************/
@@ -592,7 +625,7 @@ static VALUE t_set_sock_opt (VALUE self, VALUE signature, VALUE lev, VALUE optna
int fd = evma_get_file_descriptor (NUM2ULONG (signature));
int level = NUM2INT(lev), option = NUM2INT(optname);
int i;
- void *v;
+ const void *v;
socklen_t len;
switch (TYPE(optval)) {
@@ -780,6 +813,21 @@ static VALUE t_set_max_timer_count (VALUE self, VALUE ct)
return Qnil;
}
+/********************
+t_get/set_simultaneous_accept_count
+********************/
+
+static VALUE t_get_simultaneous_accept_count (VALUE self)
+{
+ return INT2FIX (evma_get_simultaneous_accept_count());
+}
+
+static VALUE t_set_simultaneous_accept_count (VALUE self, VALUE ct)
+{
+ evma_set_simultaneous_accept_count (FIX2INT (ct));
+ return Qnil;
+}
+
/***************
t_setuid_string
***************/
@@ -814,7 +862,12 @@ static VALUE t_invoke_popen (VALUE self, VALUE cmd)
}
strings[len] = NULL;
- const unsigned long f = evma_popen (strings);
+ unsigned long f = 0;
+ try {
+ f = evma_popen (strings);
+ } catch (std::runtime_error e) {
+ f = 0; // raise exception below
+ }
if (!f) {
char *err = strerror (errno);
char buf[100];
@@ -1204,6 +1257,7 @@ extern "C" void Init_rubyeventmachine()
rb_define_module_function (EmModule, "start_tcp_server", (VALUE(*)(...))t_start_server, 2);
rb_define_module_function (EmModule, "stop_tcp_server", (VALUE(*)(...))t_stop_server, 1);
rb_define_module_function (EmModule, "start_unix_server", (VALUE(*)(...))t_start_unix_server, 1);
+ rb_define_module_function (EmModule, "attach_sd", (VALUE(*)(...))t_attach_sd, 1);
rb_define_module_function (EmModule, "set_tls_parms", (VALUE(*)(...))t_set_tls_parms, 4);
rb_define_module_function (EmModule, "start_tls", (VALUE(*)(...))t_start_tls, 1);
rb_define_module_function (EmModule, "get_peer_cert", (VALUE(*)(...))t_get_peer_cert, 1);
@@ -1217,6 +1271,7 @@ extern "C" void Init_rubyeventmachine()
rb_define_module_function (EmModule, "attach_fd", (VALUE (*)(...))t_attach_fd, 2);
rb_define_module_function (EmModule, "detach_fd", (VALUE (*)(...))t_detach_fd, 1);
+ rb_define_module_function (EmModule, "get_file_descriptor", (VALUE (*)(...))t_get_file_descriptor, 1);
rb_define_module_function (EmModule, "get_sock_opt", (VALUE (*)(...))t_get_sock_opt, 3);
rb_define_module_function (EmModule, "set_sock_opt", (VALUE (*)(...))t_set_sock_opt, 4);
rb_define_module_function (EmModule, "set_notify_readable", (VALUE (*)(...))t_set_notify_readable, 2);
@@ -1250,6 +1305,8 @@ extern "C" void Init_rubyeventmachine()
rb_define_module_function (EmModule, "set_timer_quantum", (VALUE(*)(...))t_set_timer_quantum, 1);
rb_define_module_function (EmModule, "get_max_timer_count", (VALUE(*)(...))t_get_max_timer_count, 0);
rb_define_module_function (EmModule, "set_max_timer_count", (VALUE(*)(...))t_set_max_timer_count, 1);
+ rb_define_module_function (EmModule, "get_simultaneous_accept_count", (VALUE(*)(...))t_get_simultaneous_accept_count, 0);
+ rb_define_module_function (EmModule, "set_simultaneous_accept_count", (VALUE(*)(...))t_set_simultaneous_accept_count, 1);
rb_define_module_function (EmModule, "setuid_string", (VALUE(*)(...))t_setuid_string, 1);
rb_define_module_function (EmModule, "invoke_popen", (VALUE(*)(...))t_invoke_popen, 1);
rb_define_module_function (EmModule, "send_file_data", (VALUE(*)(...))t_send_file_data, 2);
diff --git a/ext/ssl.cpp b/ext/ssl.cpp
index f4744d6..31c73c7 100644
--- a/ext/ssl.cpp
+++ b/ext/ssl.cpp
@@ -392,13 +392,16 @@ int SslBox_t::PutPlaintext (const char *buf, int bufsize)
bool fatal = false;
bool did_work = false;
+ int pending = BIO_pending(pbioWrite);
- while (OutboundQ.HasPages()) {
+ while (OutboundQ.HasPages() && pending < SSLBOX_WRITE_BUFFER_SIZE) {
const char *page;
int length;
OutboundQ.Front (&page, &length);
assert (page && (length > 0));
int n = SSL_write (pSSL, page, length);
+ pending = BIO_pending(pbioWrite);
+
if (n > 0) {
did_work = true;
OutboundQ.PopFront();
diff --git a/ext/ssl.h b/ext/ssl.h
index 8378394..be9e752 100644
--- a/ext/ssl.h
+++ b/ext/ssl.h
@@ -54,6 +54,10 @@ class SslContext_t
class SslBox_t
**************/
+#define SSLBOX_INPUT_CHUNKSIZE 2019
+#define SSLBOX_OUTPUT_CHUNKSIZE 2048
+#define SSLBOX_WRITE_BUFFER_SIZE 8192 // (SSLBOX_OUTPUT_CHUNKSIZE * 4)
+
class SslBox_t
{
public:
diff --git a/java/src/com/rubyeventmachine/EmReactor.java b/java/src/com/rubyeventmachine/EmReactor.java
index 186902b..a31aa22 100644
--- a/java/src/com/rubyeventmachine/EmReactor.java
+++ b/java/src/com/rubyeventmachine/EmReactor.java
@@ -569,6 +569,22 @@ public class EmReactor {
return Connections.get(sig).isNotifyWritable();
}
+ public boolean pauseConnection (long sig) {
+ return ((EventableSocketChannel) Connections.get(sig)).pause();
+ }
+
+ public boolean resumeConnection (long sig) {
+ return ((EventableSocketChannel) Connections.get(sig)).resume();
+ }
+
+ public boolean isConnectionPaused (long sig) {
+ return ((EventableSocketChannel) Connections.get(sig)).isPaused();
+ }
+
+ public long getOutboundDataSize (long sig) {
+ return Connections.get(sig).getOutboundDataSize();
+ }
+
public int getConnectionCount() {
return Connections.size() + Acceptors.size();
}
diff --git a/java/src/com/rubyeventmachine/EventableChannel.java b/java/src/com/rubyeventmachine/EventableChannel.java
index 2ccd171..3d4f5fd 100644
--- a/java/src/com/rubyeventmachine/EventableChannel.java
+++ b/java/src/com/rubyeventmachine/EventableChannel.java
@@ -57,6 +57,8 @@ public interface EventableChannel {
public boolean writeOutboundData() throws IOException;
+ public long getOutboundDataSize();
+
public void setCommInactivityTimeout (long seconds);
public Object[] getPeerName();
diff --git a/java/src/com/rubyeventmachine/EventableDatagramChannel.java b/java/src/com/rubyeventmachine/EventableDatagramChannel.java
index a85d223..df1c9fd 100644
--- a/java/src/com/rubyeventmachine/EventableDatagramChannel.java
+++ b/java/src/com/rubyeventmachine/EventableDatagramChannel.java
@@ -54,6 +54,7 @@ public class EventableDatagramChannel implements EventableChannel {
Selector selector;
boolean bCloseScheduled;
LinkedList<Packet> outboundQ;
+ long outboundS;
SocketAddress returnAddress;
@@ -63,6 +64,7 @@ public class EventableDatagramChannel implements EventableChannel {
selector = sel;
bCloseScheduled = false;
outboundQ = new LinkedList<Packet>();
+ outboundS = 0;
dc.register(selector, SelectionKey.OP_READ, this);
}
@@ -71,6 +73,7 @@ public class EventableDatagramChannel implements EventableChannel {
try {
if ((!bCloseScheduled) && (bb.remaining() > 0)) {
outboundQ.addLast(new Packet(bb, returnAddress));
+ outboundS += bb.remaining();
channel.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, this);
}
} catch (ClosedChannelException e) {
@@ -82,6 +85,7 @@ public class EventableDatagramChannel implements EventableChannel {
try {
if ((!bCloseScheduled) && (bb.remaining() > 0)) {
outboundQ.addLast(new Packet (bb, new InetSocketAddress (recipAddress, recipPort)));
+ outboundS += bb.remaining();
channel.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, this);
}
} catch (ClosedChannelException e) {
@@ -136,6 +140,7 @@ public class EventableDatagramChannel implements EventableChannel {
try {
// With a datagram socket, it's ok to send an empty buffer.
written = channel.send(p.bb, p.recipient);
+ outboundS -= written;
}
catch (IOException e) {
return false;
@@ -192,4 +197,5 @@ public class EventableDatagramChannel implements EventableChannel {
public boolean isWatchOnly() { return false; }
public boolean isNotifyReadable() { return false; }
public boolean isNotifyWritable() { return false; }
+ public long getOutboundDataSize() { return outboundS; }
}
diff --git a/java/src/com/rubyeventmachine/EventableSocketChannel.java b/java/src/com/rubyeventmachine/EventableSocketChannel.java
index cdffda7..2905ec6 100644
--- a/java/src/com/rubyeventmachine/EventableSocketChannel.java
+++ b/java/src/com/rubyeventmachine/EventableSocketChannel.java
@@ -54,6 +54,7 @@ public class EventableSocketChannel implements EventableChannel {
long binding;
LinkedList<ByteBuffer> outboundQ;
+ long outboundS;
boolean bCloseScheduled;
boolean bConnectPending;
@@ -61,6 +62,7 @@ public class EventableSocketChannel implements EventableChannel {
boolean bAttached;
boolean bNotifyReadable;
boolean bNotifyWritable;
+ boolean bPaused;
SSLEngine sslEngine;
SSLContext sslContext;
@@ -76,6 +78,7 @@ public class EventableSocketChannel implements EventableChannel {
bNotifyReadable = false;
bNotifyWritable = false;
outboundQ = new LinkedList<ByteBuffer>();
+ outboundS = 0;
}
public long getBinding() {
@@ -164,12 +167,14 @@ public class EventableSocketChannel implements EventableChannel {
sslEngine.wrap(bb, b);
b.flip();
outboundQ.addLast(b);
+ outboundS += b.remaining();
} catch (SSLException e) {
throw new RuntimeException ("ssl error");
}
}
else {
outboundQ.addLast(bb);
+ outboundS += bb.remaining();
}
updateEvents();
@@ -188,6 +193,8 @@ public class EventableSocketChannel implements EventableChannel {
throw new IOException ("eof");
}
+ public long getOutboundDataSize() { return outboundS; }
+
/**
* Called by the reactor when we have selected writable.
* Return false to indicate an error that should cause the connection to close.
@@ -196,23 +203,35 @@ public class EventableSocketChannel implements EventableChannel {
* this code is written, we're depending on a nonblocking write NOT TO CONSUME
* the whole outbound buffer in this case, rather than firing an exception.
* We should somehow verify that this is indeed Java's defined behavior.
- * Also TODO, see if we can use gather I/O rather than one write at a time.
- * Ought to be a big performance enhancer.
* @return
*/
public boolean writeOutboundData() throws IOException {
+ ByteBuffer[] bufs = new ByteBuffer[64];
+ int i;
+ long written, toWrite;
while (!outboundQ.isEmpty()) {
- ByteBuffer b = outboundQ.getFirst();
- if (b.remaining() > 0)
- channel.write(b);
+ i = 0;
+ toWrite = 0;
+ written = 0;
+ while (i < 64 && !outboundQ.isEmpty()) {
+ bufs[i] = outboundQ.removeFirst();
+ toWrite += bufs[i].remaining();
+ i++;
+ }
+ if (toWrite > 0)
+ written = channel.write(bufs, 0, i);
+ outboundS -= written;
// Did we consume the whole outbound buffer? If yes,
// pop it off and keep looping. If no, the outbound network
// buffers are full, so break out of here.
- if (b.remaining() == 0)
- outboundQ.removeFirst();
- else
+ if (written < toWrite) {
+ while (i > 0 && bufs[i-1].remaining() > 0) {
+ outboundQ.addFirst(bufs[i-1]);
+ i--;
+ }
break;
+ }
}
if (outboundQ.isEmpty() && !bCloseScheduled) {
@@ -244,8 +263,10 @@ public class EventableSocketChannel implements EventableChannel {
public boolean scheduleClose (boolean afterWriting) {
// TODO: What the hell happens here if bConnectPending is set?
- if (!afterWriting)
+ if (!afterWriting) {
outboundQ.clear();
+ outboundS = 0;
+ }
if (outboundQ.isEmpty())
return true;
@@ -331,6 +352,30 @@ public class EventableSocketChannel implements EventableChannel {
}
public boolean isNotifyWritable() { return bNotifyWritable; }
+ public boolean pause() {
+ if (bWatchOnly) {
+ throw new RuntimeException ("cannot pause/resume 'watch only' connections, set notify readable/writable instead");
+ }
+ boolean old = bPaused;
+ bPaused = true;
+ updateEvents();
+ return !old;
+ }
+
+ public boolean resume() {
+ if (bWatchOnly) {
+ throw new RuntimeException ("cannot pause/resume 'watch only' connections, set notify readable/writable instead");
+ }
+ boolean old = bPaused;
+ bPaused = false;
+ updateEvents();
+ return old;
+ }
+
+ public boolean isPaused() {
+ return bPaused;
+ }
+
private void updateEvents() {
if (channelKey == null)
return;
@@ -353,7 +398,7 @@ public class EventableSocketChannel implements EventableChannel {
if (bNotifyWritable)
events |= SelectionKey.OP_WRITE;
}
- else
+ else if (!bPaused)
{
if (bConnectPending)
events |= SelectionKey.OP_CONNECT;
diff --git a/lib/em/buftok.rb b/lib/em/buftok.rb
index 2f2225b..caf4f77 100644
--- a/lib/em/buftok.rb
+++ b/lib/em/buftok.rb
@@ -1,110 +1,59 @@
# BufferedTokenizer takes a delimiter upon instantiation, or acts line-based
# by default. It allows input to be spoon-fed from some outside source which
# receives arbitrary length datagrams which may-or-may-not contain the token
-# by which entities are delimited.
-#
-# By default, new BufferedTokenizers will operate on lines delimited by "\n" by default
-# or allow you to specify any delimiter token you so choose, which will then
-# be used by String#split to tokenize the input data
-#
-# @example Using BufferedTokernizer to parse lines out of incoming data
-#
-# module LineBufferedConnection
-# def receive_data(data)
-# (@buffer ||= BufferedTokenizer.new).extract(data).each do |line|
-# receive_line(line)
-# end
-# end
-# end
-#
-# @author Tony Arcieri
-# @author Martin Emde
+# by which entities are delimited. In this respect it's ideally paired with
+# something like EventMachine (http://rubyeventmachine.com/).
class BufferedTokenizer
- # @param [String] delimiter
- # @param [Integer] size_limit
- def initialize(delimiter = "\n", size_limit = nil)
- @delimiter = delimiter
- @size_limit = size_limit
-
- # The input buffer is stored as an array. This is by far the most efficient
- # approach given language constraints (in C a linked list would be a more
- # appropriate data structure). Segments of input data are stored in a list
- # which is only joined when a token is reached, substantially reducing the
- # number of objects required for the operation.
+ # New BufferedTokenizers will operate on lines delimited by a delimiter,
+ # which is by default the global input delimiter $/ ("\n").
+ #
+ # The input buffer is stored as an array. This is by far the most efficient
+ # approach given language constraints (in C a linked list would be a more
+ # appropriate data structure). Segments of input data are stored in a list
+ # which is only joined when a token is reached, substantially reducing the
+ # number of objects required for the operation.
+ def initialize(delimiter = $/)
+ @delimiter = delimiter
@input = []
-
- # Size of the input buffer
- @input_size = 0
+ @tail = ''
+ @trim = @delimiter.length - 1
end
# Extract takes an arbitrary string of input data and returns an array of
- # tokenized entities, provided there were any available to extract.
- #
- # @example
+ # tokenized entities, provided there were any available to extract. This
+ # makes for easy processing of datagrams using a pattern like:
#
- # tokenizer.extract(data).
- # map { |entity| Decode(entity) }.each { ... }
+ # tokenizer.extract(data).map { |entity| Decode(entity) }.each do ...
#
- # @param [String] data
+ # Using -1 makes split to return "" if the token is at the end of
+ # the string, meaning the last element is the start of the next chunk.
def extract(data)
- # Extract token-delimited entities from the input string with the split command.
- # There's a bit of craftiness here with the -1 parameter. Normally split would
- # behave no differently regardless of if the token lies at the very end of the
- # input buffer or not (i.e. a literal edge case) Specifying -1 forces split to
- # return "" in this case, meaning that the last entry in the list represents a
- # new segment of data where the token has not been encountered
- entities = data.split @delimiter, -1
-
- # Check to see if the buffer has exceeded capacity, if we're imposing a limit
- if @size_limit
- raise 'input buffer full' if @input_size + entities.first.size > @size_limit
- @input_size += entities.first.size
+ if @trim > 0
+ tail_end = @tail.slice!(-@trim, @trim) # returns nil if string is too short
+ data = tail_end + data if tail_end
end
- # Move the first entry in the resulting array into the input buffer. It represents
- # the last segment of a token-delimited entity unless it's the only entry in the list.
- @input << entities.shift
-
- # If the resulting array from the split is empty, the token was not encountered
- # (not even at the end of the buffer). Since we've encountered no token-delimited
- # entities this go-around, return an empty array.
- return [] if entities.empty?
-
- # At this point, we've hit a token, or potentially multiple tokens. Now we can bring
- # together all the data we've buffered from earlier calls without hitting a token,
- # and add it to our list of discovered entities.
- entities.unshift @input.join
+ @input << @tail
+ entities = data.split(@delimiter, -1)
+ @tail = entities.shift
- # Now that we've hit a token, joined the input buffer and added it to the entities
- # list, we can go ahead and clear the input buffer. All of the segments that were
- # stored before the join can now be garbage collected.
- @input.clear
-
- # The last entity in the list is not token delimited, however, thanks to the -1
- # passed to split. It represents the beginning of a new list of as-yet-untokenized
- # data, so we add it to the start of the list.
- @input << entities.pop
-
- # Set the new input buffer size, provided we're keeping track
- @input_size = @input.first.size if @size_limit
+ unless entities.empty?
+ @input << @tail
+ entities.unshift @input.join
+ @input.clear
+ @tail = entities.pop
+ end
- # Now we're left with the list of extracted token-delimited entities we wanted
- # in the first place. Hooray!
entities
end
# Flush the contents of the input buffer, i.e. return the input buffer even though
- # a token has not yet been encountered.
- #
- # @return [String]
+ # a token has not yet been encountered
def flush
+ @input << @tail
buffer = @input.join
@input.clear
+ @tail = "" # @tail.clear is slightly faster, but not supported on 1.8.7
buffer
end
-
- # @return [Boolean]
- def empty?
- @input.empty?
- end
end
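
Reviewer's note on the buftok.rb hunk above: the new @tail/@trim bookkeeping appears intended to handle a multi-character delimiter that is split across two calls to extract. The sketch below is adapted from the added lines of the hunk and is not the shipped file; the class name SketchTokenizer and the sample inputs are made up for illustration, and `+''` is used only so the sketch also runs where string literals are frozen.

```ruby
# Sketch adapted from the "+" lines of the buftok.rb hunk (illustrative only).
class SketchTokenizer
  def initialize(delimiter = $/)
    @delimiter = delimiter
    @input = []
    @tail = +''
    # A delimiter of length n can straddle a chunk boundary by up to n-1 bytes.
    @trim = @delimiter.length - 1
  end

  def extract(data)
    if @trim > 0
      # Move the last @trim characters of the tail back in front of the new
      # data so a delimiter split across two chunks is seen whole.
      tail_end = @tail.slice!(-@trim, @trim) # nil if the tail is too short
      data = tail_end + data if tail_end
    end

    @input << @tail
    entities = data.split(@delimiter, -1)
    @tail = entities.shift

    unless entities.empty?
      @input << @tail
      entities.unshift @input.join
      @input.clear
      @tail = entities.pop
    end

    entities
  end

  def flush
    @input << @tail
    buffer = @input.join
    @input.clear
    @tail = +''
    buffer
  end
end

tokenizer = SketchTokenizer.new("\r\n")
first  = tokenizer.extract("foo\r")      # delimiter only half-arrived
second = tokenizer.extract("\nbar\r\n")  # second half arrives here
third  = tokenizer.extract("baz")        # no delimiter yet
rest   = tokenizer.flush
```

With a "\r\n" delimiter, the first call should return nothing, the second should return both "foo" and "bar", and flush should hand back the undelimited "baz". Note that, like the hunk it is adapted from, this sketch does not guard against an empty string being passed to extract.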
diff --git a/lib/em/connection.rb b/lib/em/connection.rb
index e22e204..10febf1 100644
--- a/lib/em/connection.rb
+++ b/lib/em/connection.rb
@@ -409,7 +409,7 @@ module EventMachine
[priv_key, cert_chain].each do |file|
next if file.nil? or file.empty?
raise FileNotFoundException,
- "Could not find #{file} for start_tls" unless File.exists? file
+ "Could not find #{file} for start_tls" unless File.exist? file
end
EventMachine::set_tls_parms(@signature, priv_key || '', cert_chain || '', verify_peer)
diff --git a/lib/em/iterator.rb b/lib/em/iterator.rb
index 3bebf2c..035aa9d 100644
--- a/lib/em/iterator.rb
+++ b/lib/em/iterator.rb
@@ -50,6 +50,7 @@ module EventMachine
#
def initialize(list, concurrency = 1)
raise ArgumentError, 'argument must be an array' unless list.respond_to?(:to_a)
+ raise ArgumentError, 'concurrency must be bigger than zero' unless (concurrency > 0)
@list = list.to_a.dup
@concurrency = concurrency
@@ -224,47 +225,7 @@ module EventMachine
end
end
-if __FILE__ == $0
- $:.unshift File.join(File.dirname(__FILE__), '..')
- require 'eventmachine'
-
- # TODO: real tests
- # TODO: pass in one object instead of two? .each{ |iter| puts iter.current; iter.next }
- # TODO: support iter.pause/resume/stop/break/continue?
- # TODO: create some exceptions instead of using RuntimeError
- # TODO: support proc instead of enumerable? EM::Iterator.new(proc{ return queue.pop })
-
- EM.run{
- EM::Iterator.new(1..50).each{ |num,iter| p num; iter.next }
- EM::Iterator.new([1,2,3], 10).each{ |num,iter| p num; iter.next }
-
- i = EM::Iterator.new(1..100, 5)
- i.each(proc{|num,iter|
- p num.to_s
- iter.next
- }, proc{
- p :done
- })
- EM.add_timer(0.03){
- i.concurrency = 1
- }
- EM.add_timer(0.04){
- i.concurrency = 3
- }
-
- EM::Iterator.new(100..150).map(proc{ |num,iter|
- EM.add_timer(0.01){ iter.return(num) }
- }, proc{ |results|
- p results
- })
-
- EM::Iterator.new(%w[ pwd uptime uname date ], 2).inject({}, proc{ |hash,cmd,iter|
- EM.system(cmd){ |output,status|
- hash[cmd] = status.exitstatus == 0 ? output.strip : nil
- iter.return(hash)
- }
- }, proc{ |results|
- p results
- })
- }
-end
+# TODO: pass in one object instead of two? .each{ |iter| puts iter.current; iter.next }
+# TODO: support iter.pause/resume/stop/break/continue?
+# TODO: create some exceptions instead of using RuntimeError
+# TODO: support proc instead of enumerable? EM::Iterator.new(proc{ return queue.pop })
diff --git a/lib/em/protocols/httpclient.rb b/lib/em/protocols/httpclient.rb
index eb2682f..344f2c0 100644
--- a/lib/em/protocols/httpclient.rb
+++ b/lib/em/protocols/httpclient.rb
@@ -23,8 +23,6 @@
#
#
-
-
module EventMachine
module Protocols
@@ -52,7 +50,6 @@ module EventMachine
# DNS: Some way to cache DNS lookups for hostnames we connect to. Ruby's
# DNS lookups are unbelievably slow.
# HEAD requests.
- # Chunked transfer encoding.
# Convenience methods for requests. get, post, url, etc.
# SSL.
# Handle status codes like 304, 100, etc.
@@ -184,6 +181,8 @@ module EventMachine
@content_length = nil # not zero
@content = ""
@status = nil
+ @chunked = false
+ @chunk_length = nil
@read_state = :header
@connection_close = nil
when :header
@@ -191,7 +190,7 @@ module EventMachine
if ary.length == 2
data = ary.last
if ary.first == ""
- if (@content_length and @content_length > 0) || @connection_close
+ if (@content_length and @content_length > 0) || @chunked || @connection_close
@read_state = :content
else
dispatch_response
@@ -211,6 +210,8 @@ module EventMachine
@content_length ||= $'.to_i
elsif ary.first =~ /\Aconnection:\s*close/i
@connection_close = true
+ elsif ary.first =~ /\Atransfer-encoding:\s*chunked/i
+ @chunked = true
end
end
else
@@ -218,12 +219,32 @@ module EventMachine
data = ""
end
when :content
- # If there was no content-length header, we have to wait until the connection
- # closes. Everything we get until that point is content.
- # TODO: Must impose a content-size limit, and also must implement chunking.
- # Also, must support either temporary files for large content, or calling
- # a content-consumer block supplied by the user.
- if @content_length
+ if @chunked && @chunk_length
+ bytes_needed = @chunk_length - @chunk_read
+ new_data = data[0, bytes_needed]
+ @chunk_read += new_data.length
+ @content += new_data
+ data = data[bytes_needed..-1] || ""
+ if @chunk_length == @chunk_read && data[0,2] == "\r\n"
+ @chunk_length = nil
+ data = data[2..-1]
+ end
+ elsif @chunked
+ if (m = data.match(/\A(\S*)\r\n/m))
+ data = data[m[0].length..-1]
+ @chunk_length = m[1].to_i(16)
+ @chunk_read = 0
+ if @chunk_length == 0
+ dispatch_response
+ @read_state = :base
+ end
+ end
+ elsif @content_length
+ # If there was no content-length header, we have to wait until the connection
+ # closes. Everything we get until that point is content.
+ # TODO: Must impose a content-size limit, and also must implement chunking.
+ # Also, must support either temporary files for large content, or calling
+ # a content-consumer block supplied by the user.
bytes_needed = @content_length - @content.length
@content += data[0, bytes_needed]
data = data[bytes_needed..-1] || ""
@@ -274,6 +295,5 @@ module EventMachine
end
end
end
-
end
end
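Note on the httpclient.rb hunks above: the new :content branch reads a chunked body as a series of hexadecimal length lines, each followed by that many bytes of data and a CRLF, and stops at a zero-length chunk. The following is a minimal standalone sketch of that framing, not the EventMachine code itself. `decode_chunked` is a hypothetical helper; it assumes the whole body is already in memory and ignores chunk extensions and trailers.

```ruby
# Hypothetical helper, not part of EventMachine: decode a complete
# chunked-encoded HTTP body held in a single string.
def decode_chunked(body)
  content = String.new                          # binary-encoded buffer
  rest = body.dup.force_encoding(Encoding::BINARY)
  loop do
    # Each chunk starts with its size in hex, terminated by CRLF.
    m = rest.match(/\A(\h+)\r\n/)
    raise ArgumentError, "malformed chunk header" unless m
    size = m[1].to_i(16)
    rest = rest.byteslice(m[0].bytesize, rest.bytesize) || ""
    break if size.zero?                         # zero-length chunk ends the body
    raise ArgumentError, "truncated chunk" if rest.bytesize < size + 2
    content << rest.byteslice(0, size)
    unless rest.byteslice(size, 2) == "\r\n"
      raise ArgumentError, "missing CRLF after chunk data"
    end
    rest = rest.byteslice(size + 2, rest.bytesize) || ""
  end
  content
end
```

Unlike the patched client, this sketch does not handle data arriving in pieces across several receive_data calls; it only illustrates the wire format.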
diff --git a/lib/em/protocols/line_and_text.rb b/lib/em/protocols/line_and_text.rb
index 7417278..c65fa53 100644
--- a/lib/em/protocols/line_and_text.rb
+++ b/lib/em/protocols/line_and_text.rb
@@ -32,7 +32,6 @@ module EventMachine
# for a version which is optimized for correctness with regard to binary text blocks
# that can switch back to line mode.
class LineAndTextProtocol < Connection
- MaxLineLength = 16*1024
MaxBinaryLength = 32*1024*1024
def initialize *args
@@ -42,7 +41,7 @@ module EventMachine
def receive_data data
if @lbp_mode == :lines
begin
- @lpb_buffer.extract(data).each do |line|
+ @lpb_buffer.extract(data).each do |line|
receive_line(line.chomp) if respond_to?(:receive_line)
end
rescue Exception
@@ -116,7 +115,7 @@ module EventMachine
#--
# For internal use, establish protocol baseline for handling lines.
def lbp_init_line_state
- @lpb_buffer = BufferedTokenizer.new("\n", MaxLineLength)
+ @lpb_buffer = BufferedTokenizer.new("\n")
@lbp_mode = :lines
end
private :lbp_init_line_state
diff --git a/lib/em/protocols/linetext2.rb b/lib/em/protocols/linetext2.rb
index 6d4a47f..9e10bc2 100644
--- a/lib/em/protocols/linetext2.rb
+++ b/lib/em/protocols/linetext2.rb
@@ -37,7 +37,6 @@ module EventMachine
# When we get around to that, call #receive_error if the user defined it, otherwise
# throw exceptions.
- MaxLineLength = 16*1024
MaxBinaryLength = 32*1024*1024
#--
diff --git a/lib/em/protocols/smtpserver.rb b/lib/em/protocols/smtpserver.rb
index 920919b..e0f1f05 100644
--- a/lib/em/protocols/smtpserver.rb
+++ b/lib/em/protocols/smtpserver.rb
@@ -227,18 +227,26 @@ module EventMachine
process_unknown
end
end
-
+
# TODO - implement this properly, the implementation is a stub!
- def process_vrfy
+ def process_help
send_data "250 Ok, but unimplemented\r\n"
end
+
+ # RFC2821, 3.5.3 Meaning of VRFY or EXPN Success Response:
+ # A server MUST NOT return a 250 code in response to a VRFY or EXPN
+ # command unless it has actually verified the address. In particular,
+ # a server MUST NOT return 250 if all it has done is to verify that the
+ # syntax given is valid. In that case, 502 (Command not implemented)
+ # or 500 (Syntax error, command unrecognized) SHOULD be returned.
+ #
# TODO - implement this properly, the implementation is a stub!
- def process_help
- send_data "250 Ok, but unimplemented\r\n"
+ def process_vrfy
+ send_data "502 Command not implemented\r\n"
end
# TODO - implement this properly, the implementation is a stub!
def process_expn
- send_data "250 Ok, but unimplemented\r\n"
+ send_data "502 Command not implemented\r\n"
end
#--
@@ -358,12 +366,23 @@ module EventMachine
def process_auth_line(line)
plain = line.unpack("m").first
_,user,psw = plain.split("\000")
- if receive_plain_auth user,psw
+
+ succeeded = proc {
send_data "235 authentication ok\r\n"
@state << :auth
- else
+ }
+ failed = proc {
send_data "535 invalid authentication\r\n"
+ }
+ auth = receive_plain_auth user,psw
+
+ if auth.respond_to?(:callback)
+ auth.callback(&succeeded)
+ auth.errback(&failed)
+ else
+ (auth ? succeeded : failed).call
end
+
@state.delete :auth_incomplete
end
@@ -409,8 +428,12 @@ module EventMachine
#--
# STARTTLS may not be issued before EHLO, or unless the user has chosen
# to support it.
- # TODO, must support user-supplied certificates.
#
+ # If :starttls_options is present and :starttls is set in the parms
+ # pass the options in :starttls_options to start_tls. Do this if you want to use
+ # your own certificate
+ # e.g. {:cert_chain_file => "/etc/ssl/cert.pem", :private_key_file => "/etc/ssl/private/cert.key"}
+
def process_starttls
if @@parms[:starttls]
if @state.include?(:starttls)
@@ -419,7 +442,7 @@ module EventMachine
send_data "503 EHLO required before STARTTLS\r\n"
else
send_data "220 Start TLS negotiation\r\n"
- start_tls
+ start_tls(@@parms[:starttls_options] || {})
@state << :starttls
end
else
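Note on the process_auth_line hunk above: receive_plain_auth may now return either a plain truthy/falsy value or an object that responds to #callback, in which case the 235/535 reply is deferred. The sketch below isolates that dispatch in plain Ruby so it can be run without a reactor. `FakeDeferrable` and `dispatch_auth` are hypothetical names used only for illustration, and the fake resolves synchronously, which a real deferrable normally would not.

```ruby
# Stand-in for an EventMachine deferrable; only the two methods the
# dispatch relies on are modelled, and it resolves immediately.
class FakeDeferrable
  def initialize(ok)
    @ok = ok
  end

  def callback(&blk)
    blk.call if @ok
    self
  end

  def errback(&blk)
    blk.call unless @ok
    self
  end
end

# Same shape as the dispatch in the hunk: accept either a boolean-like
# result or an object that responds to #callback.
def dispatch_auth(auth, succeeded, failed)
  if auth.respond_to?(:callback)
    auth.callback(&succeeded)
    auth.errback(&failed)
  else
    (auth ? succeeded : failed).call
  end
end
```

With this shape, an existing handler that returns true or false keeps working unchanged, while a handler that needs an asynchronous lookup can return a deferrable instead.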
diff --git a/lib/em/pure_ruby.rb b/lib/em/pure_ruby.rb
index b81afb6..64bf0ac 100644
--- a/lib/em/pure_ruby.rb
+++ b/lib/em/pure_ruby.rb
@@ -393,7 +393,7 @@ module EventMachine
100.times {
@loopbreak_port = rand(10000) + 40000
begin
- @loopbreak_reader.bind "localhost", @loopbreak_port
+ @loopbreak_reader.bind "127.0.0.1", @loopbreak_port
bound = true
break
rescue
@@ -410,7 +410,7 @@ module EventMachine
def signal_loopbreak
#@loopbreak_writer.write '+' if @loopbreak_writer
- @loopbreak_writer.send('+',0,"localhost",@loopbreak_port) if @loopbreak_writer
+ @loopbreak_writer.send('+',0,"127.0.0.1",@loopbreak_port) if @loopbreak_writer
end
def set_timer_quantum interval_in_seconds
diff --git a/lib/em/resolver.rb b/lib/em/resolver.rb
index fa74e34..ce8a1f8 100644
--- a/lib/em/resolver.rb
+++ b/lib/em/resolver.rb
@@ -66,7 +66,15 @@ module EventMachine
def post_init
@requests = {}
- EM.add_periodic_timer(0.1, &method(:tick))
+ end
+
+ def start_timer
+ @timer ||= EM.add_periodic_timer(0.1, &method(:tick))
+ end
+
+ def stop_timer
+ EM.cancel_timer(@timer)
+ @timer = nil
end
def unbind
@@ -84,6 +92,13 @@ module EventMachine
else
@requests[id] = req
end
+
+ start_timer
+ end
+
+ def deregister_request(id, req)
+ @requests.delete(id)
+ stop_timer if @requests.length == 0
end
def send_packet(pkt)
@@ -109,6 +124,7 @@ module EventMachine
req = @requests[msg.id]
if req
@requests.delete(msg.id)
+ stop_timer if @requests.length == 0
req.receive_answer(msg)
end
end
@@ -140,6 +156,7 @@ module EventMachine
if @tries < @max_tries
send
else
+ @socket.deregister_request(@id, self)
fail 'retries exceeded'
end
end
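Note on the resolver.rb hunks above: the periodic tick timer is no longer created in post_init. It is started when a request is registered and cancelled once no requests remain. A toy model of that bookkeeping, runnable without EventMachine, might look like the following. `TimerStub` and `RequestTable` are hypothetical names; the stub merely counts active timers in place of EM.add_periodic_timer and EM.cancel_timer, and the extra `@timer` guard in `deregister` is an addition of this sketch, not something the patch does.

```ruby
# Hypothetical stand-in for the reactor's timer API: it only counts
# how many timers are currently active.
class TimerStub
  attr_reader :active

  def initialize
    @active = 0
  end

  def add
    @active += 1
    :timer_handle
  end

  def cancel(_handle)
    @active -= 1
  end
end

# Toy model of the resolver socket's request table.
class RequestTable
  def initialize(timers)
    @timers = timers
    @requests = {}
    @timer = nil
  end

  def register(id, req)
    @requests[id] = req
    @timer ||= @timers.add        # started at most once while requests are pending
  end

  def deregister(id)
    @requests.delete(id)
    return unless @requests.empty? && @timer
    @timers.cancel(@timer)        # nothing pending: stop ticking
    @timer = nil
  end
end
```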
diff --git a/lib/em/tick_loop.rb b/lib/em/tick_loop.rb
index d8f648d..a95d516 100644
--- a/lib/em/tick_loop.rb
+++ b/lib/em/tick_loop.rb
@@ -7,25 +7,25 @@ module EventMachine
# A TickLoop is useful when one needs to distribute amounts of work
# throughout ticks in order to maintain response times. It is also useful for
# simple repeated checks and metrics.
- #
- # # Here we run through an array one item per tick until it is empty,
- # # printing each element.
- # # When the array is empty, we return :stop from the callback, and the
- # # loop will terminate.
- # # When the loop terminates, the on_stop callbacks will be called.
- # EM.run do
- # array = (1..100).to_a
- #
- # tickloop = EM.tick_loop do
- # if array.empty?
- # :stop
- # else
- # puts array.shift
- # end
- # end
- #
- # tickloop.on_stop { EM.stop }
- # end
+ # @example
+ # # Here we run through an array one item per tick until it is empty,
+ # # printing each element.
+ # # When the array is empty, we return :stop from the callback, and the
+ # # loop will terminate.
+ # # When the loop terminates, the on_stop callbacks will be called.
+ # EM.run do
+ # array = (1..100).to_a
+ #
+ # tickloop = EM.tick_loop do
+ # if array.empty?
+ # :stop
+ # else
+ # puts array.shift
+ # end
+ # end
+ #
+ # tickloop.on_stop { EM.stop }
+ # end
#
class TickLoop
diff --git a/lib/em/version.rb b/lib/em/version.rb
index 23a7311..3d0cc59 100644
--- a/lib/em/version.rb
+++ b/lib/em/version.rb
@@ -1,3 +1,3 @@
module EventMachine
- VERSION = "1.0.3"
+ VERSION = "1.0.7"
end
diff --git a/lib/eventmachine.rb b/lib/eventmachine.rb
index 5060154..7ee146d 100644
--- a/lib/eventmachine.rb
+++ b/lib/eventmachine.rb
@@ -156,7 +156,7 @@ module EventMachine
# will start without release_machine being called and will immediately throw
#
- if reactor_running? and @reactor_pid != Process.pid
+ if @reactor_running and @reactor_pid != Process.pid
# Reactor was started in a different parent, meaning we have forked.
# Clean up reactor state so a new reactor boots up in this child.
stop_event_loop
@@ -531,6 +531,15 @@ module EventMachine
s
end
+ # Attach to an existing socket's file descriptor. The socket may have been
+ # started with {EventMachine.start_server}.
+ def self.attach_server sock, handler=nil, *args, &block
+ klass = klass_from_handler(Connection, handler, *args)
+ sd = sock.respond_to?(:fileno) ? sock.fileno : sock
+ s = attach_sd(sd)
+ @acceptors[s] = [klass,args,block,sock]
+ s
+ end
# Stop a TCP server socket that was started with {EventMachine.start_server}.
# @see EventMachine.start_server
@@ -957,13 +966,16 @@ module EventMachine
callback = @next_tick_mutex.synchronize { @next_tick_queue.shift }
begin
callback.call
+ rescue
+ exception_raised = true
+ raise
ensure
# This is a little nasty. The problem is, if an exception occurs during
# the callback, then we need to send a signal to the reactor to actually
# do some work during the next_tick. The only mechanism we have from the
# ruby side is next_tick itself, although ideally, we'd just drop a byte
# on the loopback descriptor.
- EM.next_tick {} if $!
+ EM.next_tick {} if exception_raised
end
end
end
@@ -1033,7 +1045,12 @@ module EventMachine
thread = Thread.new do
Thread.current.abort_on_exception = true
while true
- op, cback = *@threadqueue.pop
+ begin
+ op, cback = *@threadqueue.pop
+ rescue ThreadError
+ $stderr.puts $!.message
+ break # Ruby 2.0 may fail at Queue.pop
+ end
result = op.call
@resultqueue << [result, cback]
EventMachine.signal_loopbreak
@@ -1181,7 +1198,7 @@ module EventMachine
#
# @return [Boolean] true if the EventMachine reactor loop is currently running
def self.reactor_running?
- (@reactor_running || false)
+ @reactor_running && Process.pid == @reactor_pid
end
@@ -1512,9 +1529,9 @@ module EventMachine
raise ArgumentError, "must provide module or subclass of #{klass.name}" unless klass >= handler
handler
elsif handler
- begin
+ if defined?(handler::EM_CONNECTION_CLASS)
handler::EM_CONNECTION_CLASS
- rescue NameError
+ else
handler::const_set(:EM_CONNECTION_CLASS, Class.new(klass) {include handler})
end
else
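Note on the klass_from_handler hunk above: the cached connection class is now looked up with defined? rather than by rescuing NameError. The pattern can be tried in isolation as sketched below. `BaseConnection` and `connection_class_for` are hypothetical names standing in for EventMachine::Connection and the relevant branch of klass_from_handler.

```ruby
# Hypothetical base class standing in for EventMachine::Connection.
class BaseConnection; end

# Return a class that mixes +handler+ into a subclass of +klass+,
# creating it once and caching it on the module as EM_CONNECTION_CLASS.
def connection_class_for(klass, handler)
  if defined?(handler::EM_CONNECTION_CLASS)
    handler::EM_CONNECTION_CLASS
  else
    handler.const_set(:EM_CONNECTION_CLASS, Class.new(klass) { include handler })
  end
end
```

Calling the helper twice with the same module returns the same class object, so repeated connections reuse one generated class without raising an exception on the first lookup.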
diff --git a/lib/jeventmachine.rb b/lib/jeventmachine.rb
index 5ac647f..383e748 100644
--- a/lib/jeventmachine.rb
+++ b/lib/jeventmachine.rb
@@ -268,6 +268,20 @@ module EventMachine
@em.getConnectionCount
end
+ def self.pause_connection(sig)
+ @em.pauseConnection(sig)
+ end
+ def self.resume_connection(sig)
+ @em.resumeConnection(sig)
+ end
+ def self.connection_paused?(sig)
+ @em.isConnectionPaused(sig)
+ end
+ def self._get_outbound_data_size(sig)
+ @em.getOutboundDataSize(sig)
+ end
+
+
def self.set_tls_parms(sig, params)
end
def self.start_tls(sig)
@@ -279,6 +293,9 @@ module EventMachine
def associate_callback_target sig
# No-op for the time being
end
+ def get_outbound_data_size
+ EM._get_outbound_data_size @signature
+ end
end
end
diff --git a/metadata.yml b/metadata.yml
index adc184b..e274ffc 100644
--- a/metadata.yml
+++ b/metadata.yml
@@ -1,70 +1,72 @@
---- !ruby/object:Gem::Specification
+--- !ruby/object:Gem::Specification
name: eventmachine
-version: !ruby/object:Gem::Version
- hash: 17
- prerelease:
- segments:
- - 1
- - 0
- - 3
- version: 1.0.3
+version: !ruby/object:Gem::Version
+ version: 1.0.7
platform: ruby
-authors:
+authors:
- Francis Cianfrocca
- Aman Gupta
autorequire:
bindir: bin
cert_chain: []
-
-date: 2013-03-08 00:00:00 Z
-dependencies:
-- !ruby/object:Gem::Dependency
- name: rake-compiler
+date: 2015-02-10 00:00:00.000000000 Z
+dependencies:
+- !ruby/object:Gem::Dependency
+ name: test-unit
+ requirement: !ruby/object:Gem::Requirement
+ requirements:
+ - - "~>"
+ - !ruby/object:Gem::Version
+ version: '2.0'
+ type: :development
prerelease: false
- requirement: &id001 !ruby/object:Gem::Requirement
- none: false
- requirements:
- - - ~>
- - !ruby/object:Gem::Version
- hash: 57
- segments:
- - 0
- - 8
- - 3
+ version_requirements: !ruby/object:Gem::Requirement
+ requirements:
+ - - "~>"
+ - !ruby/object:Gem::Version
+ version: '2.0'
+- !ruby/object:Gem::Dependency
+ name: rake-compiler
+ requirement: !ruby/object:Gem::Requirement
+ requirements:
+ - - "~>"
+ - !ruby/object:Gem::Version
version: 0.8.3
type: :development
- version_requirements: *id001
-- !ruby/object:Gem::Dependency
- name: yard
prerelease: false
- requirement: &id002 !ruby/object:Gem::Requirement
- none: false
- requirements:
+ version_requirements: !ruby/object:Gem::Requirement
+ requirements:
+ - - "~>"
+ - !ruby/object:Gem::Version
+ version: 0.8.3
+- !ruby/object:Gem::Dependency
+ name: yard
+ requirement: !ruby/object:Gem::Requirement
+ requirements:
- - ">="
- - !ruby/object:Gem::Version
- hash: 31
- segments:
- - 0
- - 8
- - 5
- - 2
+ - !ruby/object:Gem::Version
version: 0.8.5.2
type: :development
- version_requirements: *id002
-- !ruby/object:Gem::Dependency
- name: bluecloth
prerelease: false
- requirement: &id003 !ruby/object:Gem::Requirement
- none: false
- requirements:
+ version_requirements: !ruby/object:Gem::Requirement
+ requirements:
+ - - ">="
+ - !ruby/object:Gem::Version
+ version: 0.8.5.2
+- !ruby/object:Gem::Dependency
+ name: bluecloth
+ requirement: !ruby/object:Gem::Requirement
+ requirements:
- - ">="
- - !ruby/object:Gem::Version
- hash: 3
- segments:
- - 0
- version: "0"
+ - !ruby/object:Gem::Version
+ version: '0'
type: :development
- version_requirements: *id003
+ prerelease: false
+ version_requirements: !ruby/object:Gem::Requirement
+ requirements:
+ - - ">="
+ - !ruby/object:Gem::Version
+ version: '0'
description: |-
EventMachine implements a fast, single-threaded engine for arbitrary network
communications. It's extremely easy to use in Ruby. EventMachine wraps all
@@ -76,15 +78,14 @@ description: |-
are provided with the package, primarily to serve as examples. The real goal
of EventMachine is to enable programs to easily interface with other programs
using TCP/IP, especially if custom protocols are required.
-email:
+email:
- garbagecat10 at gmail.com
- aman at tmm1.net
executables: []
-
-extensions:
+extensions:
- ext/extconf.rb
- ext/fastfilereader/extconf.rb
-extra_rdoc_files:
+extra_rdoc_files:
- README.md
- docs/DocumentationGuidesIndex.md
- docs/GettingStarted.md
@@ -100,10 +101,10 @@ extra_rdoc_files:
- docs/old/SMTP
- docs/old/SPAWNED_PROCESSES
- docs/old/TODO
-files:
-- .gitignore
-- .travis.yml
-- .yardopts
+files:
+- ".gitignore"
+- ".travis.yml"
+- ".yardopts"
- CHANGELOG.md
- GNU
- Gemfile
@@ -218,6 +219,7 @@ files:
- tests/test_channel.rb
- tests/test_completion.rb
- tests/test_connection_count.rb
+- tests/test_connection_write.rb
- tests/test_defer.rb
- tests/test_deferrable.rb
- tests/test_epoll.rb
@@ -232,10 +234,12 @@ files:
- tests/test_httpclient2.rb
- tests/test_idle_connection.rb
- tests/test_inactivity_timeout.rb
+- tests/test_iterator.rb
- tests/test_kb.rb
- tests/test_line_protocol.rb
- tests/test_ltp.rb
- tests/test_ltp2.rb
+- tests/test_many_fds.rb
- tests/test_next_tick.rb
- tests/test_object_protocol.rb
- tests/test_pause.rb
@@ -267,44 +271,37 @@ files:
- tests/test_ud.rb
- tests/test_unbind_reason.rb
homepage: http://rubyeventmachine.com
-licenses: []
-
+licenses:
+- Ruby
+- GPL
+metadata: {}
post_install_message:
-rdoc_options:
-- --title
+rdoc_options:
+- "--title"
- EventMachine
-- --main
+- "--main"
- README.md
-- -x
+- "-x"
- lib/em/version
-- -x
+- "-x"
- lib/jeventmachine
-require_paths:
+require_paths:
- lib
-required_ruby_version: !ruby/object:Gem::Requirement
- none: false
- requirements:
+required_ruby_version: !ruby/object:Gem::Requirement
+ requirements:
- - ">="
- - !ruby/object:Gem::Version
- hash: 3
- segments:
- - 0
- version: "0"
-required_rubygems_version: !ruby/object:Gem::Requirement
- none: false
- requirements:
+ - !ruby/object:Gem::Version
+ version: '0'
+required_rubygems_version: !ruby/object:Gem::Requirement
+ requirements:
- - ">="
- - !ruby/object:Gem::Version
- hash: 3
- segments:
- - 0
- version: "0"
+ - !ruby/object:Gem::Version
+ version: '0'
requirements: []
-
rubyforge_project: eventmachine
-rubygems_version: 1.8.24
+rubygems_version: 2.2.2
signing_key:
-specification_version: 3
+specification_version: 4
summary: Ruby/EventMachine library
test_files: []
-
+has_rdoc:
diff --git a/rakelib/package.rake b/rakelib/package.rake
index 0ff5deb..f12ee2b 100644
--- a/rakelib/package.rake
+++ b/rakelib/package.rake
@@ -31,8 +31,13 @@ else
def hack_cross_compilation(ext)
# inject 1.8/1.9 pure-ruby entry point
# HACK: add these dependencies to the task instead of using cross_compiling
- ext.cross_platform.each do |platform|
- Rake::Task["native:#{GEMSPEC.name}:#{platform}"].prerequisites.unshift "lib/#{ext.name}.rb"
+ if ext.cross_platform.is_a?(Array)
+ ext.cross_platform.each do |platform|
+ task = "native:#{GEMSPEC.name}:#{platform}"
+ if Rake::Task.task_defined?(task)
+ Rake::Task[task].prerequisites.unshift "lib/#{ext.name}.rb"
+ end
+ end
end
end
@@ -60,7 +65,7 @@ end
require "\#{$1}/#{File.basename(t.name, '.rb')}"
eoruby
end
- at_exit{ FileUtils.rm t.name if File.exists?(t.name) }
+ at_exit{ FileUtils.rm t.name if File.exist?(t.name) }
end
end
diff --git a/tests/em_test_helper.rb b/tests/em_test_helper.rb
index 357f6bc..94121f1 100644
--- a/tests/em_test_helper.rb
+++ b/tests/em_test_helper.rb
@@ -45,6 +45,10 @@ class Test::Unit::TestCase
def jruby?
defined? JRUBY_VERSION
end
+
+ def rbx?
+ defined?(RUBY_ENGINE) && RUBY_ENGINE == 'rbx'
+ end
end
include PlatformHelper
diff --git a/tests/test_attach.rb b/tests/test_attach.rb
index 12c9fdf..4a55017 100644
--- a/tests/test_attach.rb
+++ b/tests/test_attach.rb
@@ -4,6 +4,7 @@ require 'socket'
class TestAttach < Test::Unit::TestCase
class EchoServer < EM::Connection
def receive_data data
+ $received_data << data
send_data data
end
end
@@ -31,12 +32,14 @@ class TestAttach < Test::Unit::TestCase
def setup
@port = next_port
$read, $r, $w, $fd = nil
+ $received_data = ""
end
def teardown
[$r, $w].each do |io|
io.close rescue nil
end
+ $received_data = nil
end
def test_attach
@@ -63,6 +66,28 @@ class TestAttach < Test::Unit::TestCase
end
end
+ def test_attach_server
+ omit_if(jruby?)
+ $before = TCPServer.new("127.0.0.1", @port)
+ sig = nil
+ EM.run {
+ sig = EM.attach_server $before, EchoServer
+
+ handler = Class.new(EM::Connection) do
+ def initialize
+ send_data "hello world"
+ close_connection_after_writing
+ EM.add_timer(0.1) { EM.stop }
+ end
+ end
+ EM.connect("127.0.0.1", @port, handler)
+ }
+
+ assert_equal false, $before.closed?
+ assert_equal "hello world", $received_data
+ assert sig.is_a?(Integer)
+ end
+
def test_attach_pipe
EM.run{
$r, $w = IO.pipe
diff --git a/tests/test_basic.rb b/tests/test_basic.rb
index d52d4d0..0202d3c 100644
--- a/tests/test_basic.rb
+++ b/tests/test_basic.rb
@@ -179,18 +179,15 @@ class TestBasic < Test::Unit::TestCase
assert x
end
- if EM.respond_to? :set_heartbeat_interval
- def test_set_heartbeat_interval
- interval = 0.5
- EM.run {
- EM.set_heartbeat_interval interval
- $interval = EM.get_heartbeat_interval
- EM.stop
- }
- assert_equal(interval, $interval)
- end
- else
- warn "EM.set_heartbeat_interval not implemented, skipping a test in #{__FILE__}"
+ def test_set_heartbeat_interval
+ omit_if(jruby?)
+ interval = 0.5
+ EM.run {
+ EM.set_heartbeat_interval interval
+ $interval = EM.get_heartbeat_interval
+ EM.stop
+ }
+ assert_equal(interval, $interval)
end
module PostInitRaiser
@@ -226,6 +223,7 @@ class TestBasic < Test::Unit::TestCase
end
def test_schedule_close
+ omit_if(jruby?)
localhost, port = '127.0.0.1', 9000
timer_ran = false
num_close_scheduled = nil
@@ -247,22 +245,22 @@ class TestBasic < Test::Unit::TestCase
end
def test_fork_safe
- return unless cpid = fork { exit! } rescue false
+ omit_if(jruby?)
+ omit_if(rbx?, 'Omitting test on Rubinius because it hangs for unknown reasons')
read, write = IO.pipe
EM.run do
- cpid = fork do
+ fork do
write.puts "forked"
EM.run do
EM.next_tick do
write.puts "EM ran"
- exit!
+ EM.stop
end
end
end
EM.stop
end
- Process.waitall
assert_equal "forked\n", read.readline
assert_equal "EM ran\n", read.readline
ensure
diff --git a/tests/test_completion.rb b/tests/test_completion.rb
index a2577c7..1bd9a8f 100644
--- a/tests/test_completion.rb
+++ b/tests/test_completion.rb
@@ -1,3 +1,4 @@
+require 'em_test_helper'
require 'em/completion'
class TestCompletion < Test::Unit::TestCase
diff --git a/tests/test_connection_count.rb b/tests/test_connection_count.rb
index 91611f0..416a301 100644
--- a/tests/test_connection_count.rb
+++ b/tests/test_connection_count.rb
@@ -30,4 +30,25 @@ class TestConnectionCount < Test::Unit::TestCase
assert_equal(1, $server_conns)
assert_equal(4, $client_conns + $server_conns)
end
-end
\ No newline at end of file
+
+ module DoubleCloseClient
+ def unbind
+ close_connection
+ $num_close_scheduled_1 = EM.num_close_scheduled
+ EM.next_tick do
+ $num_close_scheduled_2 = EM.num_close_scheduled
+ EM.stop
+ end
+ end
+ end
+
+ def test_num_close_scheduled
+ omit_if(jruby?)
+ EM.run {
+ assert_equal(0, EM.num_close_scheduled)
+ EM.connect("127.0.0.1", 9999, DoubleCloseClient) # nothing listening on 9999
+ }
+ assert_equal(1, $num_close_scheduled_1)
+ assert_equal(0, $num_close_scheduled_2)
+ end
+end
diff --git a/tests/test_connection_write.rb b/tests/test_connection_write.rb
new file mode 100644
index 0000000..7519927
--- /dev/null
+++ b/tests/test_connection_write.rb
@@ -0,0 +1,35 @@
+require 'em_test_helper'
+
+class TestConnectionWrite < Test::Unit::TestCase
+
+ # This test takes advantage of the fact that EM::_RunSelectOnce iterates over the connections twice:
+ # - once to determine which ones to call Write() on
+ # - and once to call Write() on each of them.
+ #
+ # But state may change in the meantime before Write() is finally called.
+ # And that is what we try to exploit to get Write() to be called when bWatchOnly is true, and bNotifyWritable is false,
+ # to cause an assertion failure.
+
+ module SimpleClient
+ def notify_writable
+ $conn2.notify_writable = false # Being naughty in callback
+ # If this doesn't crash anything, the test passed!
+ end
+ end
+
+ def test_with_naughty_callback
+ EM.run do
+ r1, w1 = IO.pipe
+ r2, w2 = IO.pipe
+
+ # Adding EM.watches
+ $conn1 = EM.watch(r1, SimpleClient)
+ $conn2 = EM.watch(r2, SimpleClient)
+
+ $conn1.notify_writable = true
+ $conn2.notify_writable = true
+
+ EM.stop
+ end
+ end
+end
diff --git a/tests/test_epoll.rb b/tests/test_epoll.rb
index e02cac1..47240d6 100644
--- a/tests/test_epoll.rb
+++ b/tests/test_epoll.rb
@@ -25,19 +25,16 @@ class TestEpoll < Test::Unit::TestCase
end
- if windows? || jruby?
- warn "EM.set_descriptor_table_size not implemented, skipping test in #{__FILE__}"
- else
- # We can set the rlimit/nofile of a process but we can only set it
- # higher if we're running as root.
- # On most systems, the default value is 1024.
- def test_rlimit
- unless EM.set_descriptor_table_size >= 1024
- a = EM.set_descriptor_table_size
- assert( a <= 1024 )
- a = EM.set_descriptor_table_size( 1024 )
- assert( a == 1024 )
- end
+ # We can set the rlimit/nofile of a process but we can only set it
+ # higher if we're running as root.
+ # On most systems, the default value is 1024.
+ def test_rlimit
+ omit_if(windows? || jruby?)
+ unless EM.set_descriptor_table_size >= 1024
+ a = EM.set_descriptor_table_size
+ assert( a <= 1024 )
+ a = EM.set_descriptor_table_size( 1024 )
+ assert( a == 1024 )
end
end
@@ -97,7 +94,7 @@ class TestEpoll < Test::Unit::TestCase
assert_equal( "abcdefghij", $out )
end
- # XXX this test fails randomly..
+ # XXX this test fails randomly...
def _test_unix_domain
fn = "/tmp/xxx.chain"
EM.epoll
@@ -126,5 +123,20 @@ class TestEpoll < Test::Unit::TestCase
File.unlink(fn) if File.exist?(fn)
end
+ def test_attach_detach
+ EM.epoll
+ EM.run {
+ EM.add_timer(0.01) { EM.stop }
+
+ r, w = IO.pipe
+
+ # This tests a regression where detach in the same tick as attach crashes EM
+ EM.watch(r) do |connection|
+ connection.detach
+ end
+ }
+
+ assert true
+ end
end
diff --git a/tests/test_httpclient.rb b/tests/test_httpclient.rb
index 06ad768..326d968 100644
--- a/tests/test_httpclient.rb
+++ b/tests/test_httpclient.rb
@@ -187,4 +187,47 @@ class TestHttpClient < Test::Unit::TestCase
assert ok
end
+ #-----------------------------------------
+
+ # Test a server that returns chunked encoding
+ #
+ class ChunkedEncodingContent < EventMachine::Connection
+ def initialize *args
+ super
+ end
+ def receive_data data
+ send_data ["HTTP/1.1 200 OK",
+ "Server: nginx/0.7.67",
+ "Date: Sat, 23 Oct 2010 16:41:32 GMT",
+ "Content-Type: application/json",
+ "Transfer-Encoding: chunked",
+ "Connection: keep-alive",
+ "",
+ "1800",
+ "chunk1" * 1024,
+ "5a",
+ "chunk2" * 15,
+ "0",
+ ""].join("\r\n")
+ close_connection_after_writing
+ end
+ end
+
+ def test_http_chunked_encoding_content
+ ok = false
+ EventMachine.run {
+ EventMachine.start_server "127.0.0.1", 9701, ChunkedEncodingContent
+ c = EventMachine::Protocols::HttpClient.send :request, :host => "127.0.0.1", :port => 9701
+ c.callback {|result|
+ if result[:content] == "chunk1" * 1024 + "chunk2" * 15
+ ok = true
+ end
+ EventMachine.stop
+ }
+ }
+ assert ok
+ end
+
end
+
+
diff --git a/tests/test_idle_connection.rb b/tests/test_idle_connection.rb
index d779092..995463a 100644
--- a/tests/test_idle_connection.rb
+++ b/tests/test_idle_connection.rb
@@ -9,15 +9,17 @@ class TestIdleConnection < Test::Unit::TestCase
$idle_time = conn.get_idle_time
conn.send_data "GET / HTTP/1.0\r\n\r\n"
EM.next_tick{
- $idle_time_after_send = conn.get_idle_time
- conn.close_connection
- EM.stop
+ EM.next_tick{
+ $idle_time_after_send = conn.get_idle_time
+ conn.close_connection
+ EM.stop
+ }
}
}
}
assert_in_delta 3, $idle_time, 0.2
- assert_equal 0, $idle_time_after_send
+ assert_in_delta 0, $idle_time_after_send, 0.1
end
end
end
diff --git a/tests/test_iterator.rb b/tests/test_iterator.rb
new file mode 100644
index 0000000..7014272
--- /dev/null
+++ b/tests/test_iterator.rb
@@ -0,0 +1,97 @@
+require 'em_test_helper'
+
+class TestIterator < Test::Unit::TestCase
+
+ def get_time
+ EM.current_time.strftime('%H:%M:%S')
+ end
+
+ def test_default_concurrency
+ items = {}
+ list = 1..10
+ EM.run {
+ EM::Iterator.new(list).each( proc {|num,iter|
+ time = get_time
+ items[time] ||= []
+ items[time] << num
+ EM::Timer.new(1) {iter.next}
+ }, proc {EM.stop})
+ }
+ assert_equal(10, items.keys.size)
+ assert_equal(list.to_a.sort, items.values.flatten.sort)
+ end
+
+ def test_concurrency_bigger_than_list_size
+ items = {}
+ list = [1,2,3]
+ EM.run {
+ EM::Iterator.new(list,10).each(proc {|num,iter|
+ time = get_time
+ items[time] ||= []
+ items[time] << num
+ EM::Timer.new(1) {iter.next}
+ }, proc {EM.stop})
+ }
+ assert_equal(1, items.keys.size)
+ assert_equal(list.to_a.sort, items.values.flatten.sort)
+ end
+
+
+ def test_changing_concurrency_affects_active_iteration
+ items = {}
+ list = 1..25
+ EM.run {
+ i = EM::Iterator.new(list,5)
+ i.each(proc {|num,iter|
+ time = get_time
+ items[time] ||= []
+ items[time] << num
+ EM::Timer.new(1) {iter.next}
+ }, proc {EM.stop})
+ EM.add_timer(1){
+ i.concurrency = 1
+ }
+ EM.add_timer(3){
+ i.concurrency = 3
+ }
+ }
+ assert_equal(9, items.keys.size)
+ assert_equal(list.to_a.sort, items.values.flatten.sort)
+ end
+
+ def test_map
+ list = 100..150
+ EM.run {
+ EM::Iterator.new(list).map(proc{ |num,iter|
+ EM.add_timer(0.01){ iter.return(num) }
+ }, proc{ |results|
+ assert_equal(list.to_a.size, results.size)
+ EM.stop
+ })
+ }
+ end
+
+ def test_inject
+ list = %w[ pwd uptime uname date ]
+ EM.run {
+ EM::Iterator.new(list, 2).inject({}, proc{ |hash,cmd,iter|
+ EM.system(cmd){ |output,status|
+ hash[cmd] = status.exitstatus == 0 ? output.strip : nil
+ iter.return(hash)
+ }
+ }, proc{ |results|
+ assert_equal(results.keys.sort, list.sort)
+ EM.stop
+ })
+ }
+ end
+
+ def test_concurrency_is_0
+ EM.run {
+ assert_raise ArgumentError do
+ EM::Iterator.new(1..5,0)
+ end
+ EM.stop
+ }
+ end
+end
diff --git a/tests/test_kb.rb b/tests/test_kb.rb
index 8e52408..9c31f5f 100644
--- a/tests/test_kb.rb
+++ b/tests/test_kb.rb
@@ -2,33 +2,27 @@ require 'em_test_helper'
class TestKeyboardEvents < Test::Unit::TestCase
- if !jruby?
- module KbHandler
- include EM::Protocols::LineText2
- def receive_line d
- EM::stop if d == "STOP"
- end
+ module KbHandler
+ include EM::Protocols::LineText2
+ def receive_line d
+ EM::stop if d == "STOP"
end
+ end
- # This test doesn't actually do anything useful but is here to
- # illustrate the usage. If you removed the timer and ran this test
- # by itself on a console, and then typed into the console, it would
- # work.
- # I don't know how to get the test harness to simulate actual keystrokes.
- # When someone figures that out, then we can make this a real test.
- #
- def test_kb
- EM.run {
- EM.open_keyboard KbHandler
- EM::Timer.new(1) { EM.stop }
- } if $stdout.tty? # don't run the test unless it stands a chance of validity.
- end
- else
- warn "EM.open_keyboard not implemented, skipping tests in #{__FILE__}"
-
- # Because some rubies will complain if a TestCase class has no tests
- def test_em_open_keyboard_unsupported
- assert true
+ # This test doesn't actually do anything useful but is here to
+ # illustrate the usage. If you removed the timer and ran this test
+ # by itself on a console, and then typed into the console, it would
+ # work.
+ # I don't know how to get the test harness to simulate actual keystrokes.
+ # When someone figures that out, then we can make this a real test.
+ #
+ def test_kb
+ omit_if(jruby?)
+ omit_if(!$stdout.tty?) # don't run the test unless it stands a chance of validity.
+ EM.run do
+ EM.open_keyboard KbHandler
+ EM::Timer.new(1) { EM.stop }
end
end
+
end
diff --git a/tests/test_many_fds.rb b/tests/test_many_fds.rb
new file mode 100644
index 0000000..74dc926
--- /dev/null
+++ b/tests/test_many_fds.rb
@@ -0,0 +1,22 @@
+require 'em_test_helper'
+require 'socket'
+
+class TestManyFDs < Test::Unit::TestCase
+ def setup
+ @port = next_port
+ end
+
+ def test_connection_class_cache
+ mod = Module.new
+ a = nil
+ Process.setrlimit(Process::RLIMIT_NOFILE,4096);
+ EM.run {
+ EM.start_server '127.0.0.1', @port, mod
+ 1100.times do
+ a = EM.connect '127.0.0.1', @port, mod
+ assert_kind_of EM::Connection, a
+ end
+ EM.stop
+ }
+ end
+end
diff --git a/tests/test_pause.rb b/tests/test_pause.rb
index 3ace948..d078a77 100644
--- a/tests/test_pause.rb
+++ b/tests/test_pause.rb
@@ -67,6 +67,35 @@ class TestPause < Test::Unit::TestCase
end
end
end
+
+ def test_pause_in_receive_data
+ incoming = []
+
+ test_server = Module.new do
+ define_method(:receive_data) do |data|
+ incoming << data
+ pause
+ EM.add_timer(0.5){ close_connection }
+ end
+ define_method(:unbind) do
+ EM.stop
+ end
+ end
+
+ buf = 'a' * 1024
+
+ EM.run do
+ EM.start_server "127.0.0.1", @port, test_server
+ cli = EM.connect "127.0.0.1", @port
+ 128.times do
+ cli.send_data buf
+ end
+ end
+
+ assert_equal 1, incoming.size
+ assert incoming[0].bytesize > buf.bytesize
+ assert incoming[0].bytesize < buf.bytesize * 128
+ end
else
warn "EM.pause_connection not implemented, skipping tests in #{__FILE__}"
diff --git a/tests/test_pool.rb b/tests/test_pool.rb
index e45c26e..6bd117e 100644
--- a/tests/test_pool.rb
+++ b/tests/test_pool.rb
@@ -1,3 +1,5 @@
+require 'em_test_helper'
+
class TestPool < Test::Unit::TestCase
def pool
@pool ||= EM::Pool.new
diff --git a/tests/test_process_watch.rb b/tests/test_process_watch.rb
index a0ecbd4..6e0c49a 100644
--- a/tests/test_process_watch.rb
+++ b/tests/test_process_watch.rb
@@ -27,6 +27,8 @@ if EM.kqueue?
end
def test_events
+ omit_if(rbx?)
+ omit_if(jruby?)
EM.run{
# watch ourselves for a fork notification
EM.watch_process(Process.pid, ParentProcessWatcher)
diff --git a/tests/test_processes.rb b/tests/test_processes.rb
index b3ba9f7..dd03cf0 100644
--- a/tests/test_processes.rb
+++ b/tests/test_processes.rb
@@ -37,8 +37,8 @@ class TestProcesses < Test::Unit::TestCase
}
assert( $out.length > 0 )
- assert_equal($status.exitstatus, 0)
- assert_equal($status.class, Process::Status)
+ assert_equal(0, $status.exitstatus)
+ assert_kind_of(Process::Status, $status)
end
def test_em_system_pid
@@ -57,8 +57,8 @@ class TestProcesses < Test::Unit::TestCase
}
assert( $out.length > 0 )
- assert_equal($status.exitstatus, 0)
- assert_equal($status.class, Process::Status)
+ assert_equal(0, $status.exitstatus)
+ assert_kind_of(Process::Status, $status)
end
def test_em_system_with_two_procs
@@ -111,9 +111,9 @@ class TestProcesses < Test::Unit::TestCase
end
end
- EM.run{
- EM.popen('cat /dev/random', test_client)
- }
+ EM.run do
+ EM.popen('echo 1', test_client)
+ end
assert_equal 1, c_rx
end
diff --git a/tests/test_resolver.rb b/tests/test_resolver.rb
index 9172428..8a2e14d 100644
--- a/tests/test_resolver.rb
+++ b/tests/test_resolver.rb
@@ -30,11 +30,11 @@ class TestResolver < Test::Unit::TestCase
def test_a_pair
EM.run {
- d = EM::DNS::Resolver.resolve "google.com"
- d.errback { assert false }
+ d = EM::DNS::Resolver.resolve "yahoo.com"
+ d.errback { |err| assert false, "failed to resolve yahoo.com: #{err}" }
d.callback { |r|
- assert_equal(Array, r.class)
- assert r.size > 1
+ assert_kind_of(Array, r)
+ assert r.size > 1, "returned #{r.size} results: #{r.inspect}"
EM.stop
}
}
@@ -45,11 +45,37 @@ class TestResolver < Test::Unit::TestCase
d = EM::DNS::Resolver.resolve "localhost"
d.errback { assert false }
d.callback { |r|
- assert_equal("127.0.0.1", r.first)
- assert_equal(Array, r.class)
+ assert_include(["127.0.0.1", "::1"], r.first)
+ assert_kind_of(Array, r)
+
+ EM.stop
+ }
+ }
+ end
+
+ def test_timer_cleanup
+ EM.run {
+ d = EM::DNS::Resolver.resolve "google.com"
+ d.errback { |err| assert false, "failed to resolve google.com: #{err}" }
+ d.callback { |r|
+ # This isn't a great test, but it's hard to get more canonical
+ # confirmation that the timer is cancelled
+ assert_nil(EM::DNS::Resolver.socket.instance_variable_get(:@timer))
+
+ EM.stop
+ }
+ }
+ end
+
+ def test_failure_timer_cleanup
+ EM.run {
+ d = EM::DNS::Resolver.resolve "asdfasdf"
+ d.callback { assert false }
+ d.errback {
+ assert_nil(EM::DNS::Resolver.socket.instance_variable_get(:@timer))
EM.stop
}
}
end
-end
\ No newline at end of file
+end
diff --git a/tests/test_ssl_args.rb b/tests/test_ssl_args.rb
index 2d6a054..d337628 100644
--- a/tests/test_ssl_args.rb
+++ b/tests/test_ssl_args.rb
@@ -45,7 +45,7 @@ class TestSslArgs < Test::Unit::TestCase
def test_tls_params_file_doesnt_exist
priv_file, cert_file = 'foo_priv_key', 'bar_cert_file'
[priv_file, cert_file].all? do |f|
- assert(!File.exists?(f), "Cert file #{f} seems to exist, and should not for the tests")
+ assert(!File.exist?(f), "Cert file #{f} seems to exist, and should not for the tests")
end
# associate_callback_target is a pain! (build!)
@@ -75,4 +75,4 @@ class TestSslArgs < Test::Unit::TestCase
assert(false, 'should not have raised an exception')
end
end
-end if EM.ssl?
\ No newline at end of file
+end if EM.ssl?
diff --git a/tests/test_ssl_methods.rb b/tests/test_ssl_methods.rb
index dade240..e4c9077 100644
--- a/tests/test_ssl_methods.rb
+++ b/tests/test_ssl_methods.rb
@@ -3,7 +3,6 @@ require 'em_test_helper'
class TestSSLMethods < Test::Unit::TestCase
module ServerHandler
-
def post_init
start_tls
end
@@ -12,11 +11,9 @@ class TestSSLMethods < Test::Unit::TestCase
$server_called_back = true
$server_cert_value = get_peer_cert
end
-
end
module ClientHandler
-
def post_init
start_tls
end
@@ -26,10 +23,11 @@ class TestSSLMethods < Test::Unit::TestCase
$client_cert_value = get_peer_cert
EM.stop_event_loop
end
-
end
def test_ssl_methods
+ omit_unless(EM.ssl?)
+ omit_if(rbx?)
$server_called_back, $client_called_back = false, false
$server_cert_value, $client_cert_value = nil, nil
@@ -45,4 +43,4 @@ class TestSSLMethods < Test::Unit::TestCase
assert($client_cert_value.is_a?(String))
end
-end if EM.ssl?
\ No newline at end of file
+end
diff --git a/tests/test_ssl_verify.rb b/tests/test_ssl_verify.rb
index cdaf5fb..8223ef5 100644
--- a/tests/test_ssl_verify.rb
+++ b/tests/test_ssl_verify.rb
@@ -1,82 +1,82 @@
require 'em_test_helper'
-if EM.ssl?
- class TestSslVerify < Test::Unit::TestCase
- def setup
- $dir = File.dirname(File.expand_path(__FILE__)) + '/'
- $cert_from_file = File.read($dir+'client.crt')
- end
+class TestSslVerify < Test::Unit::TestCase
+ def setup
+ $dir = File.dirname(File.expand_path(__FILE__)) + '/'
+ $cert_from_file = File.read($dir+'client.crt')
+ end
- module Client
- def connection_completed
- start_tls(:private_key_file => $dir+'client.key', :cert_chain_file => $dir+'client.crt')
- end
+ module Client
+ def connection_completed
+ start_tls(:private_key_file => $dir+'client.key', :cert_chain_file => $dir+'client.crt')
+ end
- def ssl_handshake_completed
- $client_handshake_completed = true
- close_connection
- end
+ def ssl_handshake_completed
+ $client_handshake_completed = true
+ close_connection
+ end
- def unbind
- EM.stop_event_loop
- end
+ def unbind
+ EM.stop_event_loop
end
+ end
- module AcceptServer
- def post_init
- start_tls(:verify_peer => true)
- end
+ module AcceptServer
+ def post_init
+ start_tls(:verify_peer => true)
+ end
- def ssl_verify_peer(cert)
- $cert_from_server = cert
- true
- end
+ def ssl_verify_peer(cert)
+ $cert_from_server = cert
+ true
+ end
- def ssl_handshake_completed
- $server_handshake_completed = true
- end
+ def ssl_handshake_completed
+ $server_handshake_completed = true
end
+ end
- module DenyServer
- def post_init
- start_tls(:verify_peer => true)
- end
+ module DenyServer
+ def post_init
+ start_tls(:verify_peer => true)
+ end
- def ssl_verify_peer(cert)
- $cert_from_server = cert
- # Do not accept the peer. This should now cause the connection to shut down without the SSL handshake being completed.
- false
- end
+ def ssl_verify_peer(cert)
+ $cert_from_server = cert
+ # Do not accept the peer. This should now cause the connection to shut down without the SSL handshake being completed.
+ false
+ end
- def ssl_handshake_completed
- $server_handshake_completed = true
- end
+ def ssl_handshake_completed
+ $server_handshake_completed = true
end
+ end
- def test_accept_server
- $client_handshake_completed, $server_handshake_completed = false, false
- EM.run {
- EM.start_server("127.0.0.1", 16784, AcceptServer)
- EM.connect("127.0.0.1", 16784, Client).instance_variable_get("@signature")
- }
+ def test_accept_server
+ omit_unless(EM.ssl?)
+ omit_if(rbx?)
+ $client_handshake_completed, $server_handshake_completed = false, false
+ EM.run {
+ EM.start_server("127.0.0.1", 16784, AcceptServer)
+ EM.connect("127.0.0.1", 16784, Client).instance_variable_get("@signature")
+ }
- assert_equal($cert_from_file, $cert_from_server)
- assert($client_handshake_completed)
- assert($server_handshake_completed)
- end
+ assert_equal($cert_from_file, $cert_from_server)
+ assert($client_handshake_completed)
+ assert($server_handshake_completed)
+ end
- def test_deny_server
- $client_handshake_completed, $server_handshake_completed = false, false
- EM.run {
- EM.start_server("127.0.0.1", 16784, DenyServer)
- EM.connect("127.0.0.1", 16784, Client)
- }
+ def test_deny_server
+ omit_unless(EM.ssl?)
+ omit_if(rbx?)
+ $client_handshake_completed, $server_handshake_completed = false, false
+ EM.run {
+ EM.start_server("127.0.0.1", 16784, DenyServer)
+ EM.connect("127.0.0.1", 16784, Client)
+ }
- assert_equal($cert_from_file, $cert_from_server)
- assert(!$client_handshake_completed)
- assert(!$server_handshake_completed)
- end
+ assert_equal($cert_from_file, $cert_from_server)
+ assert(!$client_handshake_completed)
+ assert(!$server_handshake_completed)
end
-else
- warn "EM built without SSL support, skipping tests in #{__FILE__}"
end
diff --git a/tests/test_threaded_resource.rb b/tests/test_threaded_resource.rb
index 10be5a5..dbbd776 100644
--- a/tests/test_threaded_resource.rb
+++ b/tests/test_threaded_resource.rb
@@ -15,6 +15,10 @@ class TestThreadedResource < Test::Unit::TestCase
def test_dispatch_completion
EM.run do
+ EM.add_timer(3) do
+ EM.stop
+ fail 'Resource dispatch timed out'
+ end
completion = resource.dispatch do |o|
o[:foo] = :bar
:foo
@@ -23,6 +27,10 @@ class TestThreadedResource < Test::Unit::TestCase
assert_equal :foo, result
EM.stop
end
+ completion.errback do |error|
+ EM.stop
+ fail "Unexpected error: #{error.message}"
+ end
end
assert_equal :bar, object[:foo]
end
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-ruby-extras/ruby-eventmachine.git