From: Peter Makholm on
Ben Morrow <ben(a)morrow.me.uk> writes:

> SGI::FAM only works under Irix. I've been meaning to port it to other
> systems that support fam (and gamin, the GNU rewrite) but haven't got
> round to it yet.

Never used the module myself (should have made that clear) and I have
to admit that my only reason to assume that it is usable on other
platforms is that File::Tail::FAM talks about Linux.

//Makholm
From: nolo contendere on
On Apr 23, 5:24 pm, xhos...(a)gmail.com wrote:
> nolo contendere <simon.c...(a)fmr.com> wrote:
> > Scenario:
> >      I am expecting 3 files in a drop directory. They won't
> > necessarily all arrive at the same time. I want to begin processing
> > each file as soon as it arrives (or as close to arrival time as is
> > reasonable).
>
> What is the relationship between the 3 files?  Presumably, this whole
> thing will happen more than once, right, otherwise you wouldn't need
> to automate it?  So what is the difference between "3 files show up,
> and that happens 30 times" and just "90 files show up"?

The timing. But you do point out that I can move the glob logic into
each thread. Currently I have it outside the init of my $pm object.

>
> > Would the best way to go about this be to simply have a
> > script that takes a filename as a parameter and marks the file as
> > 'currently processing' when it begins to process the file (or could
> > move the file to a different directory)?
>
> > I could kick off 3 daemon processes looking in the drop directory, and
> > sleep every 5 secs, for instance.
>
> Do the file's contents show up atomically with the file's name?  If not,
> the process could see the file is there and start processing it, even
> though it is not completely written yet.
>
>

Yes. This is handled by a separate, asynchronous process.

>
> > That seems to me, to be a straightforward, if clumsy, approach. I was
> > wondering if there was a module that could accomplish this task more
> > elegantly--Parallel::ForkManager, at least in my experience, doesn't
> > seem entirely suited to this particular task.
>
> Why don't you think it is suited?  It seems well suited, unless there
> are details that I am missing (or maybe you are on Windows or something where
> forking isn't as robust).
>
> my $pm=Parallel::ForkManager->new(3);
> foreach my $file (@ARGV) {
>   $pm->start() and next;
>   process($file);
>   $pm->finish();
> }
>
> $pm->wait_all_children();
>
> Where process() subroutine  first waits for the named $file to exist, then
> processes it.
>

This is what I have, and again, I think I just needed to move the glob
function ( get_files() below ) into each thread. I won't know the
exact filename beforehand, so can't pass that to the child process and
have it wait for it.

my $done = 0;
while ( is_before($stop_checking_time) && !$done ) {
    get_files( $loadcount, \$filecount, \@files, \$num_threads );

    print "About to process $class files...\n";

    if ( $filecount > $loadcount ) {
        die "ERROR: Found too many files: expecting $loadcount files, "
          . "but found $filecount files. "
          . "Maybe you want to increase the 'loadcount' parameter in "
          . "'$conf_file'?";
    }
    else {
        my $pm = Parallel::ForkManager->new( $num_threads );
        init_pm( $pm );

        my $itr;
        while ( @files ) {
            my $file = shift @files;
            ++$itr;

            my ( $err_log, $txn_log ) = init_logs( $file );
            my $id = "file=$file\:\:err_log=$err_log";

            my @parms;
            if ( $class eq 'PRICE' ) {
                @parms = ( $file, $err_log, $txn_log );
            }
            else {
                @parms = ( $file );
            }

            $pm->start( $id ) and next;

            $process{$class}->( @parms );
            archive_file( $file );

            $pm->finish;
        }
        $pm->wait_all_children;
        if ( $filecount == $loadcount ) {
            $done = 1;
        }
    }
}

sub get_files {
    my ( $loadcount, $filecount_ref, $filesref, $numthreads_ref ) = @_;

    if ( $$filecount_ref == $loadcount ) {
        ++$$filecount_ref;
        return;
    }

    @$filesref = glob("$dropdir/$class\_*");

    my $diff = $loadcount - $$filecount_ref;

    if ( @$filesref == 0 ) {
        print localtime() . " Waiting on $diff out of $loadcount file(s). "
          . "About to sleep $check_interval seconds before checking again...\n";
        sleep $check_interval;
    }
    else {
        $$numthreads_ref = @$filesref;
        $$filecount_ref += @$filesref;
        show_files( $filesref );
    }
}
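
For comparison, here is a minimal sketch of the same polling idea with the glob inside the loop and one child forked per newly seen file, so each file starts processing on the pass in which it is first noticed. It uses plain fork/waitpid rather than Parallel::ForkManager, and dispatch_new_files() with its named arguments (dropdir, pattern, expected, deadline, interval, process) is a hypothetical name invented for illustration, not part of the script above. It assumes the drop directory path contains no whitespace, since glob() splits its argument on spaces.

```perl
use strict;
use warnings;
use POSIX qw(WNOHANG);

# Hypothetical helper: poll a drop directory and fork one child per newly
# seen file, until 'expected' files have been dispatched or 'deadline'
# (epoch seconds) has passed. Returns the number of files dispatched.
sub dispatch_new_files {
    my (%arg) = @_;
    my ( %seen, %kids );
    my $dispatched = 0;

    while ( $dispatched < $arg{expected} && time() < $arg{deadline} ) {
        for my $file ( glob("$arg{dropdir}/$arg{pattern}") ) {
            next if $seen{$file}++;          # already handed to a child

            my $pid = fork();
            die "fork failed: $!" unless defined $pid;
            if ( $pid == 0 ) {               # child: process one file, then leave
                $arg{process}->($file);
                POSIX::_exit(0);             # skip the parent's END blocks
            }
            $kids{$pid} = $file;
            ++$dispatched;
        }

        # Reap any children that have already finished, without blocking.
        while ( ( my $pid = waitpid( -1, WNOHANG ) ) > 0 ) {
            delete $kids{$pid};
        }

        sleep $arg{interval} if $dispatched < $arg{expected};
    }

    # Block until the remaining children are done.
    waitpid( $_, 0 ) for keys %kids;
    return $dispatched;
}
```

Because only the parent globs and dispatches, no two children can pick up the same file; the callback passed as 'process' would do the work and the archiving.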
From: nolo contendere on
On Apr 23, 5:35 pm, Ted Zlatanov <t...(a)lifelogs.com> wrote:
> On Wed, 23 Apr 2008 11:29:42 -0700 (PDT) nolo contendere <simon.c...(a)fmr.com> wrote:
>
> nc>      I am expecting 3 files in a drop directory. They won't
> nc> necessarily all arrive at the same time. I want to begin processing
> nc> each file as soon as it arrives (or as close to arrival time as is
> nc> reasonable). Would the best way to go about this be to simply have a
> nc> script that takes a filename as a parameter and marks the file as
> nc> 'currently processing' when it begins to process the file (or could
> nc> move the file to a different directory)?
>
> nc> I could kick off 3 daemon processes looking in the drop directory, and
> nc> sleep every 5 secs, for instance.
>
> nc> That seems to me, to be a straightforward, if clumsy, approach. I was
> nc> wondering if there was a module that could accomplish this task more
> nc> elegantly--Parallel::ForkManager, at least in my experience, doesn't
> nc> seem entirely suited to this particular task.
>
> nc> Or I could code my own fork,exec,wait/waitpid.
>
> Get Tie::ShareLite from CPAN.
>
> In each process, lock a shared hash and insert an entry for the new file
> when it's noticed in the idle loop.  If the file already exists in the
> hash, do nothing.  The first process to notice the file wins.
>
> Now, unlock the hash and work with the file.  When done, move the file
> out, lock the hash again, and remove the entry you inserted.
>
> The advantage is that you can store much more in the hash than just the
> filename, so this is handy for complex processing.  Also, no file
> renaming is needed.

This is similar in concept to what I was doing with
Parallel::ForkManager, only with a "global" array.

>
> A simpler version is just to rename the file to "$file.$$" where $$ is
> your PID.  If, after the rename, the renamed file is there, your process
> won against the others and you can work with the file.  Note there could
> be name collisions with an existing file, but since PIDs are unique on
> the machine, you can just remove that bogus file.  Just be aware this is
> the quick and dirty solution.

Yeah, PIDs can be reused, but a filename/timestamp/pid combo would be
effectively unique. This is an example of my "mark the file as
currently processing" tactic. Another solution would be to move it to
a tmp or work dir.
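
A minimal sketch of that combination: claim a file by renaming it into a work directory under a filename.timestamp.pid name. claim_file() is a hypothetical helper name, and the sketch assumes the drop and work directories are on the same filesystem, where rename() is atomic, so only one of several competing processes can succeed.

```perl
use strict;
use warnings;
use File::Basename qw(basename);

# Hypothetical helper: try to claim $file by renaming it into $workdir as
# "<name>.<epoch>.<pid>". Returns the new path on success, or undef if the
# rename failed (typically because another process claimed the file first).
sub claim_file {
    my ( $file, $workdir ) = @_;
    my $claimed = sprintf '%s/%s.%d.%d',
        $workdir, basename($file), time(), $$;
    return rename( $file, $claimed ) ? $claimed : undef;
}
```

A worker would call claim_file() on every file it sees in the drop directory and process only the ones for which it gets a path back.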

>
> Another approach is to use a Maildir structure, which can handle
> multiple readers and writers atomically, even over NFS.  You just need
> to map your incoming queue into a Maildir structure; there's no need to
> actually have mail in the files.  This is good if you expect lots of
> volume, network access, etc. complications to your original model.
>

This is interesting! I'll do some research into Maildir.
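
As I understand the Maildir idea, a writer creates the file under tmp/, renames it into new/ once it is complete, and a reader claims it by renaming it from new/ into cur/. A rough sketch under that assumption follows. The helper names (maildir_init, maildir_deliver, maildir_take) and the simplified time.pid.host naming are my own, and the sketch leaves out the ":2," flag suffix that real Maildir readers add in cur/.

```perl
use strict;
use warnings;
use Sys::Hostname qw(hostname);

my $seq = 0;    # per-process counter so two deliveries in one second differ

# Create the queue root and its tmp/, new/ and cur/ subdirectories.
sub maildir_init {
    my ($root) = @_;
    for my $dir ( $root, map { "$root/$_" } qw(tmp new cur) ) {
        -d $dir or mkdir $dir or die "mkdir $dir: $!";
    }
}

# Writer side: write the whole file in tmp/, then rename it into new/,
# so readers never see a partially written file.
sub maildir_deliver {
    my ( $root, $content ) = @_;
    my $name = join '.', time(), "P$$" . 'Q' . ++$seq, hostname();
    open my $fh, '>', "$root/tmp/$name" or die "open: $!";
    print {$fh} $content;
    close $fh or die "close: $!";
    rename "$root/tmp/$name", "$root/new/$name" or die "rename: $!";
    return $name;
}

# Reader side: claim the oldest entry by renaming it from new/ into cur/.
# Returns the claimed path, or nothing if the queue is empty.
sub maildir_take {
    my ($root) = @_;
    opendir my $dh, "$root/new" or die "opendir: $!";
    for my $name ( sort grep { !/^\./ } readdir $dh ) {
        my $claimed = "$root/cur/$name";
        return $claimed if rename "$root/new/$name", $claimed;
    }
    return;
}
```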

Ted, thanks for the ideas! I appreciate the different perspectives.
From: nolo contendere on
On Apr 24, 4:15 am, Peter Makholm <pe...(a)makholm.net> wrote:
> Ben Morrow <b...(a)morrow.me.uk> writes:
> > SGI::FAM only works under Irix. I've been meaning to port it to other
> > systems that support fam (and gamin, the GNU rewrite) but haven't got
> > round to it yet.
>
> Never used the module myself (should have made that clear) and I have
> to admit that my only reason to assume that it is usable on other
> platforms is that File::Tail::FAM talks about Linux.
>
> //Makholm

I appreciate the effort Peter, however I'm currently stuck on Solaris.
From: Ted Zlatanov on
On Thu, 24 Apr 2008 07:28:20 -0700 (PDT) nolo contendere <simon.chao(a)fmr.com> wrote:

nc> On Apr 23, 5:35 pm, Ted Zlatanov <t...(a)lifelogs.com> wrote:
>> Get Tie::ShareLite from CPAN.
>>
>> In each process, lock a shared hash and insert an entry for the new file
>> when it's noticed in the idle loop.  If the file already exists in the
>> hash, do nothing.  The first process to notice the file wins.
>>
>> Now, unlock the hash and work with the file.  When done, move the file
>> out, lock the hash again, and remove the entry you inserted.
>>
>> The advantage is that you can store much more in the hash than just the
>> filename, so this is handy for complex processing.  Also, no file
>> renaming is needed.

nc> This is similar in concept to what I was doing with
nc> Parallel::ForkManager, only with a "global" array.

Yes, but notice you can suddenly access the global hash from any Perl
program, not just the managed ones. The hash becomes your API.

nc> Yeah, PIDs can be reused, but a filename/timestamp/pid combo would be
nc> effectively unique. This is an example of my "mark the file as
nc> currently processing" tactic. Another solution would be to move it to
nc> a tmp or work dir.

By the way, if you need a unique name, use the File::Temp module. I
should have mentioned that.
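
Something like the following, as a rough sketch: tempfile() picks a name in the work directory that is guaranteed not to exist yet, and the incoming file is then renamed over that placeholder. claim_with_tempname() is a made-up helper name, and this assumes a POSIX system (rename replaces an existing file atomically) with both directories on the same filesystem.

```perl
use strict;
use warnings;
use File::Temp qw(tempfile);
use File::Basename qw(basename);

# Hypothetical helper: reserve a unique name in $workdir with File::Temp,
# then rename $file onto it. Returns the new path, or nothing if the rename
# failed (for example because another process claimed the file first).
sub claim_with_tempname {
    my ( $file, $workdir ) = @_;

    # Creates an empty placeholder such as "$workdir/PRICE_1.a8Xk2Q".
    my ( $fh, $unique ) =
        tempfile( basename($file) . '.XXXXXX', DIR => $workdir );
    close $fh;

    return $unique if rename $file, $unique;

    unlink $unique;    # lost the race: remove our unused placeholder
    return;
}
```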

Ted