[boinc-app-seti] 01/05: Imported Upstream version 7.28~svn2781
Gianfranco Costamagna
locutusofborg-guest at moszumanska.debian.org
Mon Feb 23 14:19:47 UTC 2015
This is an automated email from the git hooks/post-receive script.
locutusofborg-guest pushed a commit to branch master
in repository boinc-app-seti.
commit 9088dc52774c23b54a4f05d00d22f49912d1fe50
Author: Gianfranco Costamagna <costamagnagianfranco at yahoo.it>
Date: Mon Feb 23 14:31:53 2015 +0100
Imported Upstream version 7.28~svn2781
---
splitter_pfb/Makefile.am | 1 +
splitter_pfb/mb_read_blocks_dr2.cpp | 133 ++++++++++
splitter_pfb/mb_read_blocks_dr2.h | 7 +
splitter_pfb/mb_splitter.cpp | 510 ++++++++++++++----------------------
tools/Makefile.am | 17 +-
tools/readwu.cpp | 161 ++++++++++++
tools/readwu.dlm | 5 +
7 files changed, 515 insertions(+), 319 deletions(-)
diff --git a/splitter_pfb/Makefile.am b/splitter_pfb/Makefile.am
index 990f9ef..eb5fd96 100644
--- a/splitter_pfb/Makefile.am
+++ b/splitter_pfb/Makefile.am
@@ -35,6 +35,7 @@ noinst_PROGRAMS = mb_splitter
mb_splitter_SOURCES=mb_angdist.cpp \
mb_message.cpp \
mb_splitter.cpp \
+ mb_read_blocks_dr2.cpp \
mb_wufiles.cpp \
mb_dotransform.cpp \
mb_validrun.cpp \
diff --git a/splitter_pfb/mb_read_blocks_dr2.cpp b/splitter_pfb/mb_read_blocks_dr2.cpp
new file mode 100644
index 0000000..20f5615
--- /dev/null
+++ b/splitter_pfb/mb_read_blocks_dr2.cpp
@@ -0,0 +1,133 @@
+#include "boinc_db.h"
+#include "backend_lib.h"
+#include "setilib.h"
+#include "mb_splitter.h"
+#include "mb_validrun.h"
+
+//-------------------------------------------------------------------------
+int read_blocks_dr2(int tape_fd,
+ long startblock,
+ long num_blocks_to_read,
+ int beam,
+ int pol,
+ int vflag) {
+//-------------------------------------------------------------------------
+
+ static bool first_time = true;
+ blanking_filter<complex<signed char> > blanker_filter;
+ int num_blocks_read;
+ int good_read = 0; // assume bad read until proven otherwise
+
+ if(first_time) {
+ first_time = false;
+ // set up blanking filter
+ if (strcmp(splitter_settings.splitter_cfg->blanker_filter, "randomize") == 0) {
+ log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG,"Setting blanker filter to randomize\n" );
+ blanker_filter = randomize;
+ } else {
+ log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG,"Setting blanker filter to null\n" );
+ blanker_filter = NULLC; // no blanking
+ }
+ }
+
+ num_blocks_read = seti_StructureDr2Data(tape_fd, beam, pol,
+ num_blocks_to_read,
+ tapebuffer, blanker_filter);
+ if(num_blocks_read == num_blocks_to_read) {
+ //check for an uninterrupted run
+ if (valid_run(tapebuffer,splitter_settings.receiver_cfg->min_vgc)) {
+ std::vector<dr2_compact_block_t>::iterator i=tapebuffer.begin();
+ // insert telescope coordinates into the coordinate history.
+ // this should be converted to a more accurate routine.
+ for (;i!=tapebuffer.end();i++) {
+ coord_history[i->header.coord_time].ra = i->header.ra;
+ coord_history[i->header.coord_time].dec = i->header.dec;
+ coord_history[i->header.coord_time].time = i->header.coord_time.jd().uval();
+ }
+ good_read = 1;
+ }
+ }
+ return(good_read);
+}
+
+//-------------------------------------------------------------------------
+int find_start_point_dr2(int tape_fd, int beam, int pol) {
+//-------------------------------------------------------------------------
+ char buf[1024];
+
+ // In early tapes there was a bug where the first N blocks would be duplicates
+ // of data from previous files. So we do a preemptive fast forward until we see a
+ // frame sequence number of 1.
+ int i,readbytes=HeaderSize;
+ dataheader_t header;
+ header.frameseq=100000;
+
+ while ((readbytes==HeaderSize) && (header.frameseq>10)) {
+ char buffer[HeaderSize];
+ int nread;
+ readbytes=0;
+ while ((readbytes!=HeaderSize) && (nread = read(tape_fd,buffer,HeaderSize-readbytes))) {
+ readbytes+=nread;
+ }
+ if (nread < 0) {
+ log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL,"File error %d.\n", errno);
+ exit(1);
+ }
+ if (readbytes == HeaderSize) {
+ header.populate_from_data(buffer);
+ if (header.frameseq>10) {
+ lseek64(tape_fd,DataSize,SEEK_CUR);
+ } else {
+ lseek64(tape_fd,-1*(off64_t)HeaderSize,SEEK_CUR);
+ }
+ }
+ }
+
+ if (readbytes != HeaderSize) {
+ // we fast forwarded through the entire tape without finding the first frame
+ // maybe this is one of the really early tapes that was split into chunks.
+ lseek64(tape_fd,0,SEEK_SET);
+ log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"Warning: First block not found\n");
+ }
+ // End preemptive fast forward
+
+ // Optionally fast forward to the point of resumption
+ if (resumetape) {
+ tape thistape;
+ thistape.id=0;
+ readbytes=HeaderSize;
+ sprintf(buf,"%d",rcvr.s4_id-AO_ALFA_0_0);
+ if (thistape.fetch(std::string("where name=\'")+header.name+"\' and beam="+buf)) {
+ log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG,"Resuming tape %s beam %d pol %d\n",thistape.name,beam,pol );
+ while ((readbytes==HeaderSize) && (header.dataseq!=thistape.last_block_done)) {
+ int nread=0;
+ char buffer[HeaderSize];
+ readbytes=0;
+ while ((readbytes!=HeaderSize) &&
+ ((nread = read(tape_fd,buffer,HeaderSize-readbytes)) > 0 )) {
+ readbytes+=nread;
+ }
+ if (readbytes == HeaderSize) {
+ header.populate_from_data(buffer);
+ if (header.dataseq!=thistape.last_block_done) {
+ lseek64(tape_fd,(off64_t)(DataSize+HeaderSize)*(thistape.last_block_done-header.dataseq)-HeaderSize,SEEK_CUR);
+ } else {
+ lseek64(tape_fd,-1*(off_t)HeaderSize,SEEK_CUR);
+ log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"Found starting point");
+ }
+ }
+ if (nread == 0) {
+ log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL,"End of file.\n");
+ exit(0);
+ }
+ if (nread < 0) {
+ log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL,"File error %d.\n",errno);
+ exit(1);
+ }
+ }
+ }
+ }
+ // End fast forward to the point of resumption
+
+ return 0;
+}
diff --git a/splitter_pfb/mb_read_blocks_dr2.h b/splitter_pfb/mb_read_blocks_dr2.h
new file mode 100644
index 0000000..f1a6364
--- /dev/null
+++ b/splitter_pfb/mb_read_blocks_dr2.h
@@ -0,0 +1,7 @@
+int read_blocks_dr2(int tape_fd,
+ long startblock,
+ long num_blocks_to_read,
+ int beam,
+ int pol,
+ int vflag);
+int find_start_point_dr2(int tape_fd, int beam, int pol);
diff --git a/splitter_pfb/mb_splitter.cpp b/splitter_pfb/mb_splitter.cpp
index 6479b5c..0b9714d 100644
--- a/splitter_pfb/mb_splitter.cpp
+++ b/splitter_pfb/mb_splitter.cpp
@@ -27,6 +27,7 @@
#include "splitparms.h"
#include "splittypes.h"
#include "mb_splitter.h"
+#include "mb_read_blocks_dr2.h"
#include "mb_validrun.h"
#include "mb_wufiles.h"
#include "mb_dotransform.h"
@@ -41,8 +42,10 @@ extern "C" {
int sqldetach();
}
+// Global Variables ---------------------------------------------------
char trigger_file_path[1024]="/disks/setifiler1/wutape/tapedir/splitter_stop";
+APP_CONFIG sah_config;
SCHED_CONFIG boinc_config;
DB_APP app;
R_RSA_PRIVATE_KEY key;
@@ -113,18 +116,11 @@ unsigned long minvfsbuf=-1;
char wd[1024];
//char * scidb = NULL;
char * projectdir = NULL;
-
-APP_CONFIG sah_config;
-
//const char *result_template_filename="projectdir"/"SAH_APP_NAME"_result.tpl";
char result_template_filename[1024];
char result_template_filepath[1024];
char wu_template_filename[1024];
-
-int check_for_halt();
-int wait_until_night();
-int check_free_disk_space();
-int wait_for_db_wus_ondisk();
+// End Global Variables ---------------------------------------------------
void cprint(char *p) {
printf("%s\n",p);
@@ -138,7 +134,9 @@ buffer_pos_t start_of_wu,end_of_wu; /* position of start and end of wu in
tape buffer */
int seqno;
+//-------------------------------------------------------------------------
void cleanup(void) {
+//-------------------------------------------------------------------------
FILE *tmpfile;
if ((tmpfile=fopen("seqno.dat","w"))) {
fprintf(tmpfile,"%d\n",seqno);
@@ -149,6 +147,7 @@ void cleanup(void) {
}
+//-------------------------------------------------------------------------
int process_command_line(int argc, char *argv[],char **tape_device,
int *norewind, int *startblock, int *resumetape,
int *nodb, int *dataclass, int *atnight,
@@ -156,6 +155,7 @@ int process_command_line(int argc, char *argv[],char **tape_device,
int *iters, int *beam, int *pol, int *alfa,
int *useanalysiscfgid, int *usereceivercfgid,
int *userecordercfgid, int *usesplittercfgid) {
+//-------------------------------------------------------------------------
int nargs=0,i;
char *ep;
strcpy(appname,SAH_APP_NAME);
@@ -260,52 +260,14 @@ int process_command_line(int argc, char *argv[],char **tape_device,
return 0;
}
-int main(int argc, char *argv[]) {
- int tape_fd;
- char *tape_device;
+//-------------------------------------------------------------------------
+int get_config(telescope_id &tel) {
+//-------------------------------------------------------------------------
FILE *tmpfile;
- int retval;
char keyfile[1024];
- char tmpstr[1024];
char buf[1024];
-
- /* Process command line arguments */
- if (process_command_line(argc,argv,&tape_device,&norewind,&startblock,&resumetape,
- &nodb,&dataclass,&atnight,&max_wus_ondisk,&projectdir,&iters,
- &beam,&pol,&alfa,&useanalysiscfgid,&usereceivercfgid,
- &userecordercfgid,&usesplittercfgid)) {
- fprintf(stderr,"Usage: splitter tape_device -projectdir=s [-atnight] [-nodb]\n"
- "[-xml] [-gregorian] [-resumetape | -norewind | -startblock=n] [-dataclass=n]\n"
- "[-max_wus_on_disk=n] [-iterations=n] [-trigger_file_path=filename]\n"
- "[-alfa=beam,pol] [-analysis_config=id] [-receiver_config=id]\n"
- "[-recorder_config=id] [-splitter_config=id]\n");
- exit(EXIT_FAILURE);
- }
-
- // MATTL
- if (blanking_bit == SOFTWARE_BLANKING_BIT) log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"blanking bit: %d (SOFTWARE)\n",blanking_bit);
- else log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"blanking bit: %d (HARDWARE)\n",blanking_bit);
-
- /* Open log files */
- log_messages.set_debug_level(3);
-
- retval = sah_config.parse_file(projectdir);
- if (retval) {
- log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL,"Can't parse config file\n");
- exit(EXIT_FAILURE);
- } else {
- log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"Using configuration:\n");
- log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"scidb_name = %s\n", sah_config.scidb_name);
- log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"max_wus_ondisk = %d\n", sah_config.max_wus_ondisk);
- log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"min_quorum = %d\n", sah_config.min_quorum);
- log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"target_nresults = %d\n", sah_config.target_nresults);
- log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"max_error_results = %d\n", sah_config.max_error_results);
- log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"max_success_results = %d\n", sah_config.max_success_results);
- log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"max_total_results = %d\n", sah_config.max_total_results);
- }
-
- // Will initially open
+ // Will initially open the science DB
if (!db_change(sah_config.scidb_name)) {
log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL,"Could not open science database %s\n", sah_config.scidb_name);
if (!nodb) exit(EXIT_FAILURE);
@@ -313,7 +275,6 @@ int main(int argc, char *argv[]) {
log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"Using science database %s\n", sah_config.scidb_name);
}
-
boinc_config.parse_file(projectdir);
log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"Using boinc config dir %s\n", projectdir);
log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"Using boinc database %s\n", boinc_config.db_name);
@@ -354,8 +315,7 @@ int main(int argc, char *argv[]) {
boinc_db.close();
- telescope_id tel=
- channel_to_receiverid[beam_pol_to_channel[bmpol_t(beam,pol)]];
+ tel= channel_to_receiverid[beam_pol_to_channel[bmpol_t(beam,pol)]];
sprintf(buf,"where active=%d and receiver_cfg=(select id from receiver_config where s4_id=%d)",app.id,AO_ALFA_0_0);
if (usereceivercfgid > 0) {
@@ -401,273 +361,12 @@ int main(int argc, char *argv[]) {
exit(1);
}
- check_for_halt();
-
- if ((tape_fd=open(tape_device,O_RDONLY|0x2000, 0777))<0) {
- log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL,"Unable to open tape device\n");
- exit(EXIT_FAILURE);
- }
-
- // In early tapes there was a bug where the first N blocks would be duplicates
- // of data from previous files. So we need to fast forward until we see a
- // frame sequence number of 1.
- int i,readbytes=HeaderSize;
- dataheader_t header;
- header.frameseq=100000;
-
- while ((readbytes==HeaderSize) && (header.frameseq>10)) {
- char buffer[HeaderSize];
- int nread;
- readbytes=0;
- while ((readbytes!=HeaderSize) && (nread = read(tape_fd,buffer,HeaderSize-readbytes))) {
- readbytes+=nread;
- }
- if (nread < 0) {
- log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL,"File error %d.\n", errno);
- exit(1);
- }
- if (readbytes == HeaderSize) {
- header.populate_from_data(buffer);
- if (header.frameseq>10) {
- lseek64(tape_fd,DataSize,SEEK_CUR);
- } else {
- lseek64(tape_fd,-1*(off64_t)HeaderSize,SEEK_CUR);
- }
- }
- }
-
- if (readbytes != HeaderSize) {
- // we fast forwarded through the entire tape without finding the first frame
- // maybe this is one of the really early tapes that was split into chunks.
- lseek64(tape_fd,0,SEEK_SET);
- log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"Warning: First block not found\n");
- }
-
- if (resumetape) {
- tape thistape;
- thistape.id=0;
- readbytes=HeaderSize;
- sprintf(buf,"%d",rcvr.s4_id-AO_ALFA_0_0);
- if (thistape.fetch(std::string("where name=\'")+header.name+"\' and beam="+buf)) {
- log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG,"Resuming tape %s beam %d pol %d\n",thistape.name,beam,pol );
- while ((readbytes==HeaderSize) && (header.dataseq!=thistape.last_block_done)) {
- int nread=0;
- char buffer[HeaderSize];
- readbytes=0;
- while ((readbytes!=HeaderSize) &&
- ((nread = read(tape_fd,buffer,HeaderSize-readbytes)) > 0 )) {
- readbytes+=nread;
- }
- if (readbytes == HeaderSize) {
- header.populate_from_data(buffer);
- if (header.dataseq!=thistape.last_block_done) {
- lseek64(tape_fd,(off64_t)(DataSize+HeaderSize)*(thistape.last_block_done-header.dataseq)-HeaderSize,SEEK_CUR);
- } else {
- lseek64(tape_fd,-1*(off_t)HeaderSize,SEEK_CUR);
- log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"Found starting point");
- }
- }
- if (nread == 0) {
- log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL,"End of file.\n");
- exit(0);
- }
- if (nread < 0) {
- log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL,"File error %d.\n",errno);
- exit(1);
- }
- }
- }
- }
-
- /* Start of main loop */
-
-
- atexit(cleanup);
-
- getcwd(wd,1024);
-
- if (polyphase) {
- log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL,"Polyphase not implemented\n" );
- exit(1);
-#if 0
- int filter_len;
-
- filter_len = P_FFT_LEN*N_WINDOWS;
-
- filter_r = (double *)malloc(sizeof(double)*filter_len);
- filter_i = (double *)malloc(sizeof(double)*filter_len);
-
- f_data = (float *)malloc(sizeof(float)*P_FFT_LEN*2);
-
- make_FIR(filter_len, N_WINDOWS, filter_window, filter_r);
- make_FIR(filter_len, N_WINDOWS, filter_window, filter_i);
-
- /*
- int i;
- float *data;
- data = (float*)malloc(sizeof(float)*2*2048);
- for (double n=-20;n<20;n+=.05) {
- for (i=0;i<2*2048;i+=2) {
- data[i] = cos(2*(24+n)*3.1415926535*i/(2*2048));
- data[i+1] = sin(2*(24+n)*3.1415926535*i/(2*2048));
- }
- polyphase_seg(data);
- //fprintf( stderr, "%f\n", 3+n/8);
- }
- exit(0);
- for(i=0;i<filter_len;i++)
- printf( "%f\n", filter_r[i] );
- exit(0);*/
-#endif
- }
- log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG,"Entering loop\n" );
-
- blanking_filter<complex<signed char> > blanker_filter;
- if (strcmp(splitter_settings.splitter_cfg->blanker_filter, "randomize") == 0) {
- log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG,"Setting blanker filter to randomize\n" );
- blanker_filter = randomize;
- } else {
- log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG,"Setting blanker filter to null\n" );
- blanker_filter = NULLC; // no blanking
- }
- while (iters-- &&
- ((512-tapebuffer.size())==seti_StructureDr2Data(tape_fd, beam, pol,
- 512-tapebuffer.size(), tapebuffer, blanker_filter))) {
- /* check if we should be running now */
- fflush(stderr);
- if (atnight) wait_until_night();
-
- sprintf(buf,"where active=%d and receiver_cfg=(select id from receiver_config where s4_id=%d)",app.id,AO_ALFA_0_0);
- if (usereceivercfgid > 0) {
- sprintf(buf,"where active=%d and receiver_cfg=%d",app.id,usereceivercfgid);
- }
- if (!splitter_settings.fetch(std::string(buf))) {
- log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL,"Unable to find active settings for app.id=%d\n",app.id);
- exit(1);
- }
-
- sprintf(buf,"where s4_id=%d and id>=%d",tel,splitter_settings.receiver_cfg.id);
- if (usereceivercfgid > 0) { sprintf(buf,"where id=%d",usereceivercfgid); }
- rcvr.fetch(buf);
- if (usereceivercfgid > 0) {
- log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"Using receiver cfg id: %d (set by user)\n",usereceivercfgid);
- splitter_settings.receiver_cfg = usereceivercfgid;
- splitter_settings.receiver_cfg->fetch();
- } else {
- splitter_settings.receiver_cfg->fetch(buf);
- }
- if (userecordercfgid > 0) {
- log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"Using recorder cfg id: %d (set by user)\n",userecordercfgid);
- splitter_settings.recorder_cfg = userecordercfgid;
- }
- splitter_settings.recorder_cfg->fetch();
- if (usesplittercfgid > 0) {
- log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"Using splitter cfg id: %d (set by user)\n",usesplittercfgid);
- splitter_settings.splitter_cfg = usesplittercfgid;
- }
- splitter_settings.splitter_cfg->fetch();
- if (useanalysiscfgid > 0) {
- log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"Using analysis cfg id: %d (set by user)\n",useanalysiscfgid);
- splitter_settings.analysis_cfg = useanalysiscfgid;
- }
- splitter_settings.analysis_cfg->fetch();
-
- log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG, "settings: \n\n%s\n", splitter_settings.print_xml(1,1,0).c_str());
-
- check_for_halt();
-
- // Make sure we have enough free disk space
- check_free_disk_space();
-
- // Wait for ondisk wus in the database to drop below the threshold
- if (!nodb) wait_for_db_wus_ondisk();
- //fprintf( stderr, "Read data\n" );
- log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"less than max wus on disk, continuing\n" );
- fflush(stderr);
-
- //printf( "Read tape data, analyzing\n" );
- check_for_halt();
-
- //check for an uninterrupted run
- if (valid_run(tapebuffer,splitter_settings.receiver_cfg->min_vgc)) {
- std::vector<dr2_compact_block_t>::iterator i=tapebuffer.begin();
- // insert telescope coordinates into the coordinate history.
- // this should be converted to a more accurate routine.
- for (;i!=tapebuffer.end();i++) {
- coord_history[i->header.coord_time].ra = i->header.ra;
- coord_history[i->header.coord_time].dec = i->header.dec;
- coord_history[i->header.coord_time].time = i->header.coord_time.jd().uval();
- }
- if (make_wu_headers(tapebuffer,tel,wuheaders)) {
- int child_pid=-1;
- switch (polyphase) {
- case 1:
- #if 0
- do_polyphase(&start_of_wu,&end_of_wu);
- #endif
- break;
- default:
- log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG,"doing transform..." );
- do_transform(tapebuffer);
- log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG," done\n" );
- }
- wait(0);
- if (!nodb) sql_finish();
-
- do {
- sleep(1);
- child_pid=fork();
- if (child_pid<0) log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL,"splitter cannot fork");
- //fprintf( stderr, "child pid: %d\n", child_pid);
- } while (child_pid<0);
-
- if (!child_pid) {
- if (!nodb) {
- sqldetach();
- while (!sql_database(sah_config.scidb_name)) {
- //fprintf(stderr,"child sleeping\n");
- sleep(10);
- }
- }
- rename_wu_files();
- if (!nodb) sql_finish();
- _exit(0);
- }
-
- if (!nodb) {
- while (!sql_database(sah_config.scidb_name)) {
- //fprintf(stderr,"parent sleeping\n");
- sleep(10);
- }
- }
- }
- }
- }
-
- // iters is initalized to -1. If the -iters flag is specified on the command
- // line, iters is re-initialized to the user desired number of interations.
- // Each time a WUG is proccessed iters is decremineted. If the user specifies
- // iters and we do that number of iterations without reaching EOF then iters
- // here will be zero. !Zero means we have reached EOF prior to performing the user
- // desired number of iterations.
- //
- // On the other hand, if iters is not specified by the user, iters is decrmented
- // downward from -1. It will never be zero in this case. But since we are not
- // limiting the number of iterations, the only way will get here is if we reach EOF.
- //
- // Thus in both cases, a non-zero iters means we have reached EOF.
- if (iters != 0) {
- log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL,"End of file.\n");
- // clean stop at EOF
- return (EXIT_NORMAL_EOF);
- } else {
- // clean stop, but not EOF (iters satisfied or triggered stop)
- // (A return of 1 is an error exit somewhere else in the code.)
- return(EXIT_NORMAL_NOT_EOF);
- }
+ return 0;
}
+//-------------------------------------------------------------------------
int check_for_halt() {
+//-------------------------------------------------------------------------
FILE *tf;
tf = fopen(trigger_file_path, "r");
@@ -679,7 +378,9 @@ int check_for_halt() {
return(0); // Keep going
}
+//-------------------------------------------------------------------------
int wait_until_night() {
+//-------------------------------------------------------------------------
time_t t;
struct tm *lt;
do {
@@ -693,7 +394,9 @@ int wait_until_night() {
return 0;
}
+//-------------------------------------------------------------------------
int check_free_disk_space() {
+//-------------------------------------------------------------------------
struct statvfs vfsbuf;
/* check disk free space in working directory */
@@ -724,7 +427,9 @@ int check_free_disk_space() {
return 0;
}
+//-------------------------------------------------------------------------
int wait_for_db_wus_ondisk() {
+//-------------------------------------------------------------------------
//int wus_ondisk,rv;
DB_STATE_COUNTS state_counts;
int retval;
@@ -766,7 +471,6 @@ int wait_for_db_wus_ondisk() {
log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"%d WUs ondisk which is greater than the max of %d\n",
state_counts.result_server_state_2, sah_config.max_wus_ondisk);
if (state_counts.result_server_state_2>sah_config.max_wus_ondisk) sleep(600);
-
#if 0
sprintf(query,"where appid=%d and server_state=2",app.id);
rv=boinc_result.count(wus_ondisk,query);
@@ -787,6 +491,176 @@ int wait_for_db_wus_ondisk() {
return 0;
}
+//-------------------------------------------------------------------------
+int main(int argc, char *argv[]) {
+//-------------------------------------------------------------------------
+ int tape_fd;
+ char *tape_device;
+ FILE *tmpfile;
+ int retval;
+ char keyfile[1024];
+ char tmpstr[1024];
+ char buf[1024];
+ telescope_id tel;
+
+ int good_read;
+ long num_blocks_read;
+ int vflag;
+ enum file_type_t {dr2, guppi};
+ int file_type = 0; // default to dr2 for now
+
+
+ /* Process command line arguments */
+ if (process_command_line(argc,argv,&tape_device,&norewind,&startblock,&resumetape,
+ &nodb,&dataclass,&atnight,&max_wus_ondisk,&projectdir,&iters,
+ &beam,&pol,&alfa,&useanalysiscfgid,&usereceivercfgid,
+ &userecordercfgid,&usesplittercfgid)) {
+ fprintf(stderr,"Usage: splitter tape_device -projectdir=s [-atnight] [-nodb]\n"
+ "[-xml] [-gregorian] [-resumetape | -norewind | -startblock=n] [-dataclass=n]\n"
+ "[-max_wus_on_disk=n] [-iterations=n] [-trigger_file_path=filename]\n"
+ "[-alfa=beam,pol] [-analysis_config=id] [-receiver_config=id]\n"
+ "[-recorder_config=id] [-splitter_config=id]\n");
+ exit(EXIT_FAILURE);
+ }
+
+ // MATTL
+ if (blanking_bit == SOFTWARE_BLANKING_BIT) log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"blanking bit: %d (SOFTWARE)\n",blanking_bit);
+ else log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"blanking bit: %d (HARDWARE)\n",blanking_bit);
+
+ /* Open log files */
+ log_messages.set_debug_level(3);
+
+ retval = sah_config.parse_file(projectdir);
+ if (retval) {
+ log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL,"Can't parse config file\n");
+ exit(EXIT_FAILURE);
+ } else {
+ log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"Using configuration:\n");
+ log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"scidb_name = %s\n", sah_config.scidb_name);
+ log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"max_wus_ondisk = %d\n", sah_config.max_wus_ondisk);
+ log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"min_quorum = %d\n", sah_config.min_quorum);
+ log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"target_nresults = %d\n", sah_config.target_nresults);
+ log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"max_error_results = %d\n", sah_config.max_error_results);
+ log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"max_success_results = %d\n", sah_config.max_success_results);
+ log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"max_total_results = %d\n", sah_config.max_total_results);
+ }
+
+ get_config(tel);
+
+ check_for_halt();
+
+ if ((tape_fd=open(tape_device,O_RDONLY|0x2000, 0777))<0) {
+ log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL,"Unable to open tape device\n");
+ exit(EXIT_FAILURE);
+ }
+
+ if(file_type == dr2) {
+ find_start_point_dr2(tape_fd, beam, pol);
+ } else if(file_type == guppi) {
+ log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL,"guppi file type not implemented");
+ exit(1);
+ } else {
+ log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL,"invalid file type");
+ exit(1);
+ }
+
+ atexit(cleanup);
+ getcwd(wd,1024);
+
+ log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG,"Entering main loop\n" );
+ // Here is where we read the data that will be processed into WUGs and WUs.
+ // seti_StructureDr2Data() handles both the reading of header and data.
+ // iters will be a positve number if the command line specified a certain
+ // number of iterations. Otherwise, it is initialized to -1 and will thus
+ // always be true.
+ // We always want tapebuffer to be 512 blocks in size upon return from
+ // seti_StructureDr2Data(). If this is not the case then we have hit
+ // EOF or a read error and it time to exit. Otherwise, tapebuffer.size()
+ // will change during the body of this loop in one of two ways:
+ // - valid_run() will erase invalid elements of tapebuffer.
+ // - do_transform() will erase from the start of tapebuffer up to
+ // what is needed for WU overlap.
+ while (iters--) {
+
+ // check, and maaybe wait on, various run conditions
+ fflush(stderr); // TODO why are we fflushing stderr?
+ if (atnight) wait_until_night();
+ check_for_halt();
+ check_free_disk_space(); // Make sure we have enough free disk space
+ if (!nodb) wait_for_db_wus_ondisk(); // Wait for ondisk wus in the database to drop below the threshold
+ log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,"less than max wus on disk, continuing\n" );
+ check_for_halt();
+ fflush(stderr);
+ // End check, and maybe wait on, various run conditions
+
+ if(file_type == dr2) {
+ good_read = read_blocks_dr2(tape_fd,
+ startblock, // long int
+ 512-tapebuffer.size(), // the 512 allows for WU time overlap
+ beam,
+ pol,
+ vflag);
+ } else if(file_type == guppi) {
+ log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL,"guppi file type not implemented");
+ exit(1);
+ } else {
+ log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL,"invalid file type");
+ exit(1);
+ }
+
+ if(!good_read) {
+ break; // drop out of main loop on first bad read
+ } else {
+ if (make_wu_headers(tapebuffer,tel,wuheaders)) {
+ int child_pid=-1;
+ log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG,"doing transform..." );
+ do_transform(tapebuffer);
+ log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG," done\n" );
+ wait(0);
+ if (!nodb) sql_finish();
+
+ do {
+ sleep(1);
+ child_pid=fork();
+ if (child_pid<0) log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL,"splitter cannot fork");
+ //fprintf( stderr, "child pid: %d\n", child_pid);
+ } while (child_pid<0);
+
+ if (!child_pid) {
+ if (!nodb) {
+ sqldetach();
+ while (!sql_database(sah_config.scidb_name)) {
+ //fprintf(stderr,"child sleeping\n");
+ sleep(10);
+ }
+ }
+ rename_wu_files();
+ if (!nodb) sql_finish();
+ _exit(0);
+ }
+
+ if (!nodb) {
+ while (!sql_database(sah_config.scidb_name)) {
+ //fprintf(stderr,"parent sleeping\n");
+ sleep(10);
+ }
+ }
+ } // End if(make_wu_headers())
+ } // End if(good_read)
+ } // End main while() loop
+
+ // A non-zero iters always means we have reached EOF (or had a read error).
+ if (iters != 0) {
+ log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL,"End of file.\n");
+ // clean stop at EOF
+ return (EXIT_NORMAL_EOF);
+ } else {
+ // clean stop, but not EOF (iters satisfied or triggered stop)
+ // (A return of 1 is an error exit somewhere else in the code.)
+ return(EXIT_NORMAL_NOT_EOF);
+ }
+} // End main()
+
/*
* $Log: mb_splitter.cpp,v $
* Revision 1.1.2.6 2007/08/16 23:03:19 jeffc
diff --git a/tools/Makefile.am b/tools/Makefile.am
index 5bef77d..e4a45ee 100644
--- a/tools/Makefile.am
+++ b/tools/Makefile.am
@@ -18,7 +18,7 @@ CLIENT_C_FLAGS = $(CFLAGS) \
CLIENT_LD_FLAGS = $(PTHREAD_CFLAGS) $(LDFLAGS) $(PTHREAD_LIBS) $(BOINC_LIBS)
-noinst_PROGRAMS = fakedata workunit_resample
+noinst_PROGRAMS = fakedata workunit_resample readwu.so
fakedata_SOURCES = \
fakedata.cpp \
@@ -48,3 +48,18 @@ workunit_resample_CXXFLAGS = $(CLIENT_C_FLAGS)
workunit_resample_LDFLAGS = $(CLIENT_LD_FLAGS)
workunit_resample_LDADD = -lfftw3f
+readwu_so_SOURCES = \
+ readwu.cpp \
+ ../client/seti_header.cpp \
+ ../client/timecvt.cpp \
+ ../client/s_util.cpp \
+ ../db/schema_master.cpp \
+ ../db/sqlrow.cpp \
+ ../db/sqlblob.cpp \
+ ../db/xml_util.cpp
+
+readwu_so_CFLAGS = $(CLIENT_C_FLAGS) -fPIC -I/usr/local/rsi/idl/external
+readwu_so_CXXFLAGS = $(CLIENT_C_FLAGS) -fPIC -I/usr/local/rsi/idl/external
+readwu_so_LDFLAGS = $(CLIENT_LD_FLAGS) -shared -fPIC -Bsymbolic --warn-once
+readwu_so_LDADD = $(BOINCDIR)/lib/libboinc.a
+readwu_so_LINK = $(CXX) $(LDFLAGS) $(readwu_so_LDFLAGS) -o $@
diff --git a/tools/readwu.cpp b/tools/readwu.cpp
new file mode 100644
index 0000000..14d1c71
--- /dev/null
+++ b/tools/readwu.cpp
@@ -0,0 +1,161 @@
+// SETI_BOINC is free software; you can redistribute it and/or modify it under
+// the terms of the GNU General Public License as published by the Free
+// Software Foundation; either version 2, or (at your option) any later
+// version.
+
+// SETI_BOINC is distributed in the hope that it will be useful, but WITHOUT
+// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+// more details.
+
+// You should have received a copy of the GNU General Public License along
+// with SETI_BOINC; see the file COPYING. If not, write to the Free Software
+// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+// In addition, as a special exception, the Regents of the University of
+// California give permission to link the code of this program with libraries
+// that provide specific optimized fast Fourier transform (FFT) functions and
+// distribute a linked executable. You must obey the GNU General Public
+// License in all respects for all of the code used other than the FFT library
+// itself. Any modification required to support these libraries must be
+// distributed in source code form. If you modify this file, you may extend
+// this exception to your version of the file, but you are not obligated to
+// do so. If you do not wish to do so, delete this exception statement from
+// your version.
+
+// workunit_resample - a program to read in a workunit and convert it to
+// real samples at twice the sampling rate with all frequencies shifted to
+// be positive.
+
+#include "sah_config.h"
+
+#include <cstdio>
+#ifdef HAVE_SYS_TYPES_H
+#include <sys/types.h>
+#endif
+#ifdef HAVE_SYS_STAT_H
+#include <sys/stat.h>
+#endif
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+#include <ctime>
+#include <cstdlib>
+#include <cstring>
+#include <string>
+#include <vector>
+#include "export.h"
+
+
+#ifdef HAVE_IEEEFP_H
+#include <ieeefp.h>
+#endif
+
+#include "client/timecvt.h"
+#include "client/s_util.h"
+#include "db/db_table.h"
+#include "db/schema_master.h"
+#include "seti.h"
+
+#define ARRLEN(arr) (sizeof(arr)/sizeof(arr[0]))
+
+workunit_header header;
+extern "C" {
+IDL_VPTR readwu(int argc, IDL_VPTR argv[], char *argk);
+void readwu_exit_handler(void) { }
+int IDL_Load(void);
+}
+
+static IDL_SYSFUN_DEF2 readwu_fns[] = {
+ {(IDL_VARIABLE *(*)())readwu, "READWU", 1, 1, IDL_SYSFUN_DEF_F_KEYWORDS, 0},
+};
+
+static IDL_MSG_DEF msg_arr[] = {
+ {"READWU_ERROR", "%NError: %s"},
+};
+
+static IDL_MSG_BLOCK readwu_msg_block;
+
+int readwu_startup(void) {
+ if (!IDL_SysRtnAdd(readwu_fns,TRUE,ARRLEN(readwu_fns))) {
+ return NULL;
+ }
+ IDL_ExitRegister(readwu_exit_handler);
+ return IDL_TRUE;
+}
+
+int IDL_Load(void) {
+ if (!(readwu_msg_block=IDL_MessageDefineBlock("readwu", ARRLEN(msg_arr),msg_arr))) {
+ return NULL;
+ }
+ if (!readwu_startup()) {
+ IDL_MessageFromBlock(readwu_msg_block,0,IDL_MSG_RET,"can't load readwu");
+ }
+ return IDL_TRUE;
+}
+
+
+
+IDL_VPTR readwu(int argc, IDL_VPTR argv[], char *argk) {
+ IDL_VPTR filename=NULL;
+ static IDL_VARIABLE rv;
+ rv.type=IDL_TYP_INT;
+ rv.flags=IDL_V_CONST|IDL_V_NULL;
+ rv.value.i=-1;
+
+ char *outfile=NULL, buf[256];
+ struct stat statbuf;
+ int nbytes,nread,nsamples;
+ std::string tmpbuf("");
+ int i=0,j;
+
+ if (argc != 1) {
+ fprintf(stderr,"argc=%d\n",argc);
+ fprintf(stderr,"array=readwu(wufile_name)\n");
+ return &rv;
+ }
+ IDL_STRING *infile=NULL;
+ if (argv[0]->type != IDL_TYP_STRING) {
+ IDL_MessageFromBlock(readwu_msg_block,0,IDL_MSG_RET,"Parameter 1 must be type STRING");
+ } else {
+ infile=(IDL_STRING *)(&argv[0]->value.s);
+ }
+ FILE *in=fopen(infile->s,"r");
+ if (!in) {
+ IDL_MessageFromBlock(readwu_msg_block,0,IDL_MSG_RET,"File not found");
+ return &rv;
+ }
+ stat(infile->s,&statbuf);
+ nbytes=statbuf.st_size;
+ fseek(in,0,SEEK_SET);
+ tmpbuf.reserve(nbytes);
+ // read entire file into a buffer.
+ while ((nread=(int)fread(buf,1,sizeof(buf),in))) {
+ tmpbuf+=std::string(&(buf[0]),nread);
+ }
+ // parse the header
+ header.parse_xml(tmpbuf);
+ // decode the data
+ std::vector<unsigned char> datav(
+ xml_decode_field<unsigned char>(tmpbuf,"data")
+ );
+ tmpbuf.clear();
+ nsamples=header.group_info->data_desc.nsamples;
+ nbytes=nsamples*header.group_info->recorder_cfg->bits_per_sample/8;
+ if (datav.size() < nbytes) {
+ fprintf(stderr,"Data size does not match number of samples\n");
+ return &rv;
+ }
+ // convert the data to floating point
+ sah_complex *fpdata=(sah_complex *)IDL_MemAlloc(nsamples*sizeof(sah_complex),0,IDL_MSG_RET);
+ if (!fpdata) {
+ fprintf(stderr,"Unable to allocate memory!\r\n");
+ return &rv;
+ }
+ bits_to_floats(&(datav[0]),fpdata,nsamples);
+ datav.clear();
+ IDL_MEMINT dims[]={nsamples};
+ return IDL_ImportArray(1,dims,IDL_TYP_COMPLEX,(UCHAR *)fpdata,NULL,NULL);
+}
+
+
diff --git a/tools/readwu.dlm b/tools/readwu.dlm
new file mode 100644
index 0000000..86dc205
--- /dev/null
+++ b/tools/readwu.dlm
@@ -0,0 +1,5 @@
+
+MODULE readwu
+DESCRIPTION reads data from a SETI at home Work Unit
+BUILD_DATE DECEMBER 2014
+FUNCTION READWU 1 1 KEYWORDS
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-boinc/boinc-app-seti.git
More information about the pkg-boinc-commits
mailing list