Class: MPipe
- Inherits:
-
Data
- Object
- Data
- MPipe
- Defined in:
- lib/mpipe.rb,
lib/mpipe/version.rb,
ext/mpipe/mpipe.c
Defined Under Namespace
Modules: Comm
Constant Summary collapse
- VERSION =
"0.3.0"
- MPI_VERSION =
INT2NUM(MPI_VERSION)
- MPI_SUBVERSION =
INT2NUM(MPI_SUBVERSION)
- SUCCESS =
INT2NUM(MPI_SUCCESS)
- PROC_NULL =
INT2NUM(MPI_PROC_NULL)
- @@min_poll =
0.01
- @@max_poll =
0.32
Class Method Summary collapse
- .abort(rerror) ⇒ Object
- .buffer_size ⇒ Object
- .buffer_size=(size) ⇒ Object
- .finalize ⇒ Object
-
.init(*args) ⇒ Object
MPI.
- .max_polling_interval=(time) ⇒ Object
- .min_polling_interval=(time) ⇒ Object
-
.new(vrank) ⇒ Object
:nodoc:.
-
.select(rd_ary, wt_ary = nil, er_ary = nil, timeout = nil) ⇒ Object
emulate IO.select.
Instance Method Summary collapse
- #close ⇒ Object
- #closed? ⇒ Boolean
- #flush ⇒ Object
-
#new(string = ""[, mode]) ⇒ Object
constructor
Creates new MPipe instance from with string and mode.
- #print(str) ⇒ Object
-
#rank ⇒ Object
Returns
rank
. -
#read(length[, outbuf]) ⇒ String?
Reads length bytes from the I/O stream.
-
#read_nonblock(maxlen[, outbuf [, opts]]) ⇒ String
Similar to #read, but raises
EOFError
at end of string unless the exception: false option is passed in. -
#test_recv ⇒ Boolean
calls MPI_Test.
- #write(str) ⇒ Object
- #write_nonblock(str) ⇒ Object
Constructor Details
#new(string = ""[, mode]) ⇒ Object
Creates new MPipe instance from with string and mode.
226 227 228 229 230 231 232 233 234 235 236 |
# File 'ext/mpipe/mpipe.c', line 226
static VALUE
mp_initialize(VALUE self, VALUE rank)
{
struct MPipe *ptr = check_mpipe(self);
if (!ptr) {
DATA_PTR(self) = ptr = mp_alloc();
}
rb_call_super(0, 0);
return mp_init(ptr, self, rank);
}
|
Class Method Details
.abort(rerror) ⇒ Object
82 83 84 85 86 87 88 89 |
# File 'ext/mpipe/mpipe.c', line 82
static VALUE
mp_mpi_abort(VALUE klass, VALUE rerror)
{
int ierror;
ierror = MPI_Abort(MPI_COMM_WORLD, NUM2INT(rerror));
return INT2NUM(ierror);
}
|
.buffer_size ⇒ Object
91 92 93 94 95 |
# File 'ext/mpipe/mpipe.c', line 91
static VALUE
mp_mpi_buffer_size(VALUE mod)
{
return INT2NUM(mp_buffer_size);
}
|
.buffer_size=(size) ⇒ Object
97 98 99 100 101 102 103 104 105 |
# File 'ext/mpipe/mpipe.c', line 97
static VALUE
mp_mpi_set_buffer_size(VALUE mod, VALUE size)
{
if (mp_initialized) {
rb_raise(rb_eStandardError,"buffer_size must be set before MPipe.init");
}
mp_buffer_size = NUM2INT(size);
return size;
}
|
.finalize ⇒ Object
75 76 77 78 79 80 |
# File 'ext/mpipe/mpipe.c', line 75
static VALUE
mp_mpi_finalize(VALUE klass)
{
mp_finalize();
return Qnil;
}
|
.init(*args) ⇒ Object
MPI
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'ext/mpipe/mpipe.c', line 39
static VALUE
mp_mpi_init(int argc, VALUE *argv, VALUE klass)
{
char **cargv;
VALUE progname, mpipe_ary;
int i, size;
cargv = ALLOCA_N(char *, argc+1);
progname = rb_gv_get("$0");
cargv[0] = StringValueCStr(progname);
for(i=0; i<argc; i++) {
if (TYPE(argv[i]) == T_STRING) {
cargv[i+1] = StringValueCStr(argv[i]);
} else {
rb_raise(rb_eArgError, "argument must be string");
}
}
argc++;
MPI_Init(&argc, &cargv);
if (mp_initialized) {
return Qnil;
} else {
mp_initialized = 1;
}
atexit(mp_finalize);
MPI_Comm_size(MPI_COMM_WORLD, &size);
mpipe_ary = rb_ary_new2(size);
rb_ivar_set(klass, id_allocated_mpipe, mpipe_ary);
return Qnil;
}
|
.max_polling_interval=(time) ⇒ Object
13 14 15 |
# File 'lib/mpipe.rb', line 13 def self.max_polling_interval=(time) @@max_poll = time end |
.min_polling_interval=(time) ⇒ Object
9 10 11 |
# File 'lib/mpipe.rb', line 9 def self.min_polling_interval=(time) @@min_poll = time end |
.new(vrank) ⇒ Object
:nodoc:
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 |
# File 'ext/mpipe/mpipe.c', line 239
static VALUE
mp_s_new(VALUE klass, VALUE vrank)
{
VALUE mpipe, mpipe_ary;
int rank;
rank = NUM2INT(vrank);
mpipe_ary = rb_ivar_get(klass, id_allocated_mpipe);
mpipe = rb_ary_entry(mpipe_ary, rank);
if (NIL_P(mpipe)) {
mpipe = rb_class_new_instance(1, &vrank, klass);
rb_ary_store(mpipe_ary, rank, mpipe);
}
return mpipe;
}
|
.select(rd_ary, wt_ary = nil, er_ary = nil, timeout = nil) ⇒ Object
emulate IO.select
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/mpipe.rb', line 18 def self.select(rd_ary, wt_ary=nil, er_ary=nil, timeout=nil) rd_ary = [] if rd_ary.nil? wt_ary = [] if wt_ary.nil? rd_mpi = [] rd_io = [] wt_io = [] rd_ret = Array.new(rd_ary.size) wt_ret = Array.new(wt_ary.size) rd_idx = {} wt_idx = {} found = false rd_ary.each_with_index do |rd,i| rd_idx[rd] = i case rd when MPipe rd_mpi << rd if rd.test_recv rd_ret[i] = rd found = true end when IO rd_io << rd else raise ArgumentError, "elements should be IO or MPipe" end end wt_ary.each_with_index do |wt,i| wt_idx[wt] = i case wt when MPipe wt_ret[i] = wt found = true when IO wt_io << wt else raise ArgumentError, "elements should be IO or MPipe" end end if er_ary er_ary.each do |er| if !er.kind_of?(IO) raise ArgumentError, "er_ary contains non-IO object" end end end time_start = Time.now # first check rd_res,wt_res,er_res = IO.select(rd_io, wt_io, er_ary, 0) if rd_res rd_res.each{|io| rd_ret[rd_idx[io]] = io} found = true end if wt_res wt_res.each{|io| wt_ret[wt_idx[io]] = io} found = true end if er_res found = true end if found return [rd_ret.compact, wt_ret.compact, er_res] end dt = @@min_poll max_dt = @@max_poll loop do if timeout elap = Time.now - time_start if timeout <= elap return nil else dto = timeout - elap dt = (dto < dt) ? dto : dt end end # check with timeout rd_res,wt_res,er_res = IO.select(rd_io, wt_io, er_ary, dt) if rd_res rd_res.each{|io| rd_ret[rd_idx[io]] = io} found = true end if wt_res wt_res.each{|io| wt_ret[wt_idx[io]] = io} found = true end if er_res found = true end rd_mpi.each do |mp| if mp.test_recv rd_ret[rd_idx[mp]] = mp found = true end end if found return [rd_ret.compact,wt_ret.compact,er_res] end if dt != max_dt dt *= 2 if dt < max_dt dt = max_dt if dt > max_dt end end end |
Instance Method Details
#close ⇒ Object
#closed? ⇒ Boolean
#flush ⇒ Object
#print(str) ⇒ Object
334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 |
# File 'ext/mpipe/mpipe.c', line 334
static VALUE
mp_write(VALUE self, VALUE str)
{
struct MPipe *ptr = MPipe(self);
int istat;
int pos, count;
str = StringValue(str);
pos = 0;
while (pos < RSTRING_LEN(str)) {
count = RSTRING_LEN(str) - pos;
if (count > mp_buffer_size) {
count = mp_buffer_size;
}
memcpy(ptr->send_buffer, RSTRING_PTR(str)+pos, count);
istat = MPI_Send(ptr->send_buffer, count, MPI_CHAR, ptr->rank,
0, MPI_COMM_WORLD);
if (istat != MPI_SUCCESS) {
rb_raise(rb_eStandardError,"MPI_Send failed with status=%d\n",istat);
}
pos += count;
}
return INT2NUM(pos);
}
|
#rank ⇒ Object
Returns rank
.
259 260 261 262 263 264 |
# File 'ext/mpipe/mpipe.c', line 259
static VALUE
mp_rank(VALUE self)
{
struct MPipe *ptr = MPipe(self);
return INT2NUM(ptr->rank);
}
|
#read(length[, outbuf]) ⇒ String?
Reads length bytes from the I/O stream.
length must be a non-negative integer.
390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 |
# File 'ext/mpipe/mpipe.c', line 390
static VALUE
mp_read(int argc, VALUE *argv, VALUE self)
{
struct MPipe *ptr = MPipe(self);
MPI_Status status;
int istat;
int outbuf_pos = 0;
int max_len;
VALUE maxlen = Qnil;
VALUE outbuf = Qnil;
rb_scan_args(argc, argv, "11", &maxlen, &outbuf);
max_len = NUM2INT(maxlen);
if (max_len < 0) {
rb_raise(rb_eArgError, "negative length %d given", max_len);
}
if (NIL_P(outbuf)) {
outbuf = rb_str_new(0, 0);
}
rb_str_resize(outbuf, max_len);
if (ptr->recv_count == -1) { // requesting
istat = MPI_Wait(&ptr->recv_request, &status);
if (istat != MPI_SUCCESS) {
rb_raise(rb_eStandardError,"MPI_Wait failed with status=%d",istat);
}
MPI_Get_count(&status, MPI_CHAR, &ptr->recv_count);
}
if (ptr->recv_count > 0) {
outbuf_pos += copy_substr(ptr, max_len, outbuf, outbuf_pos);
}
while (outbuf_pos < max_len) {
istat = MPI_Recv(ptr->recv_buffer, mp_buffer_size, MPI_CHAR, ptr->rank,
0, MPI_COMM_WORLD, &status);
if (istat != MPI_SUCCESS) {
rb_raise(rb_eStandardError,"MPI_Recv failed with status=%d\n",istat);
}
MPI_Get_count(&status, MPI_CHAR, &ptr->recv_count);
outbuf_pos += copy_substr(ptr, max_len, outbuf, outbuf_pos);
}
return outbuf;
}
|
#read_nonblock(maxlen[, outbuf [, opts]]) ⇒ String
Similar to #read, but raises EOFError
at end of string unless the exception: false option is passed in.
487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 |
# File 'ext/mpipe/mpipe.c', line 487
static VALUE
mp_read_nonblock(int argc, VALUE *argv, VALUE self)
{
struct MPipe *ptr = MPipe(self);
int outbuf_pos = 0;
int max_len;
VALUE maxlen = Qnil;
VALUE outbuf = Qnil;
VALUE opts = Qnil;
rb_scan_args(argc, argv, "11:", &maxlen, &outbuf, &opts);
max_len = NUM2INT(maxlen);
if (max_len < 0) {
rb_raise(rb_eArgError, "negative length %d given", max_len);
}
if (NIL_P(outbuf)) {
outbuf = rb_str_new(0, 0);
}
rb_str_resize(outbuf, max_len);
if (maxlen == 0) {
return outbuf;
}
if (ptr->recv_count == -1) { // requesting
if (call_test(ptr) == 0) {
if (!NIL_P(opts) && rb_hash_lookup2(opts, sym_exception, Qundef) == Qfalse) {
return Qnil;
} else {
rb_raise(eEAGAINWaitReadable,"MPI_Irecv would block");
}
}
}
if (ptr->recv_count > 0) {
outbuf_pos += copy_substr(ptr, max_len, outbuf, outbuf_pos);
}
while (outbuf_pos < max_len) {
call_irecv(ptr);
if (call_test(ptr) == 0) {
if (outbuf_pos > 0) {
rb_str_resize(outbuf, outbuf_pos);
return outbuf;
} else
if (!NIL_P(opts) && rb_hash_lookup2(opts, sym_exception, Qundef) == Qfalse) {
return Qnil;
} else {
rb_raise(eEAGAINWaitReadable,"MPI_Irecv would block");
}
}
outbuf_pos += copy_substr(ptr, max_len, outbuf, outbuf_pos);
}
return outbuf;
}
|
#test_recv ⇒ Boolean
calls MPI_Test
551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 |
# File 'ext/mpipe/mpipe.c', line 551
static VALUE
mp_test_recv(VALUE self)
{
struct MPipe *ptr = MPipe(self);
call_irecv(ptr);
if (ptr->recv_count == -1) { // requesting
call_test(ptr);
}
if (ptr->recv_count > 0) {
return Qtrue;
} else {
return Qfalse;
}
}
|
#write(str) ⇒ Object
334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 |
# File 'ext/mpipe/mpipe.c', line 334
static VALUE
mp_write(VALUE self, VALUE str)
{
struct MPipe *ptr = MPipe(self);
int istat;
int pos, count;
str = StringValue(str);
pos = 0;
while (pos < RSTRING_LEN(str)) {
count = RSTRING_LEN(str) - pos;
if (count > mp_buffer_size) {
count = mp_buffer_size;
}
memcpy(ptr->send_buffer, RSTRING_PTR(str)+pos, count);
istat = MPI_Send(ptr->send_buffer, count, MPI_CHAR, ptr->rank,
0, MPI_COMM_WORLD);
if (istat != MPI_SUCCESS) {
rb_raise(rb_eStandardError,"MPI_Send failed with status=%d\n",istat);
}
pos += count;
}
return INT2NUM(pos);
}
|
#write_nonblock(str) ⇒ Object
334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 |
# File 'ext/mpipe/mpipe.c', line 334
static VALUE
mp_write(VALUE self, VALUE str)
{
struct MPipe *ptr = MPipe(self);
int istat;
int pos, count;
str = StringValue(str);
pos = 0;
while (pos < RSTRING_LEN(str)) {
count = RSTRING_LEN(str) - pos;
if (count > mp_buffer_size) {
count = mp_buffer_size;
}
memcpy(ptr->send_buffer, RSTRING_PTR(str)+pos, count);
istat = MPI_Send(ptr->send_buffer, count, MPI_CHAR, ptr->rank,
0, MPI_COMM_WORLD);
if (istat != MPI_SUCCESS) {
rb_raise(rb_eStandardError,"MPI_Send failed with status=%d\n",istat);
}
pos += count;
}
return INT2NUM(pos);
}
|