From: Maciej Sobczak on
Hi all,

Imagine a server with a fixed number of worker tasks. There is no queue
of jobs and jobs are immediately passed to one of the tasks that is
currently idle. There is a separate task (or just the main one) that
provides jobs for worker tasks.

I am concerned with the proper structure of objects - I mean in the
sense of recommended Ada practice.
Obviously there is a need for some shared resource where the
requesting task will put the job and from where the worker task will
pick it up.

This is more or less what I came up with, where the "channel" is a
single processing pipeline:

   type Worker_State is (Idle, Ready, Working);

   protected type Channel_State is
      procedure Post (J : in Job_Type);
      entry Get_Job (J : out Job_Type);
      function Busy return Boolean;
   private
      State : Worker_State := Idle;
      Job_To_Do : Job_Type;
   end Channel_State;

   protected body Channel_State is

      procedure Post (J : in Job_Type) is
      begin
         if State /= Idle then
            raise Program_Error;
         end if;

         Job_To_Do := J;
         State := Ready;
      end Post;

      entry Get_Job (J : out Job_Type) when State = Ready is
      begin
         J := Job_To_Do;
         State := Working;
      end Get_Job;

      function Busy return Boolean is
      begin
         return State /= Idle;
      end Busy;

   end Channel_State;

   type Channel;
   task type Worker_Task (Ch : access Channel);

   type Channel is record
      State : Channel_State;
      Worker : Worker_Task (Channel'Access);
   end record;

   task body Worker_Task is
      Job : Job_Type;
   begin
      loop
         Ch.State.Get_Job (Job);

         -- do the job ...

      end loop;
   end Worker_Task;

   Max_Channels : constant := 5;

   Channels : array (1 .. Max_Channels) of Channel;

My question is whether this is what a seasoned Ada programmer would
do.
Initially I tried to have two separate arrays, one for jobs and one
for worker tasks, but I found it difficult to link workers with their
respective jobs. After bundling them together in a single record that
is referenced from the task it worked and I actually find it
structured better.

The main task after constructing a job object finds some channel where
the worker task is not busy and posts the job to its shared state
component:

   loop
      Job := ...

      Found_Worker := False;
      for I in Channels'Range loop
         if not Channels (I).State.Busy then
            Channels (I).State.Post (Job);
            Found_Worker := True;
            exit;
         end if;
      end loop;

      if not Found_Worker then
         -- all pipelines are busy,
         -- but the overflow handling is not shown...
      end if;
   end loop;

All this works fine, but my question concerns the choice of language
constructs and idioms.
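
One possible variation on the protected type above, sketched under assumptions: the check and the post are folded into a single protected procedure, so the two steps cannot be separated if more than one task ever posts jobs, and a Done procedure returns the channel to Idle (the code as posted does not show where State goes back to Idle). The names Try_Post, Done and Channel_Demo, and the use of Positive as the job payload, are hypothetical and not part of the original design.

```ada
with Ada.Text_IO; use Ada.Text_IO;

procedure Channel_Demo is

   type Job_Type is new Positive;   --  placeholder payload (assumption)

   type Worker_State is (Idle, Ready, Working);

   protected type Channel_State is
      --  Stores the job and reports True only if the channel was idle.
      procedure Try_Post (J : in Job_Type; Accepted : out Boolean);
      entry Get_Job (J : out Job_Type);
      --  Called by the worker once the job is finished.
      procedure Done;
   private
      State     : Worker_State := Idle;
      Job_To_Do : Job_Type := 1;
   end Channel_State;

   protected body Channel_State is

      procedure Try_Post (J : in Job_Type; Accepted : out Boolean) is
      begin
         Accepted := State = Idle;
         if Accepted then
            Job_To_Do := J;
            State := Ready;
         end if;
      end Try_Post;

      entry Get_Job (J : out Job_Type) when State = Ready is
      begin
         J := Job_To_Do;
         State := Working;
      end Get_Job;

      procedure Done is
      begin
         State := Idle;
      end Done;

   end Channel_State;

   Ch       : Channel_State;
   Accepted : Boolean;
   Job      : Job_Type;
begin
   --  Single-task walk through the state machine, no worker involved.
   Ch.Try_Post (1, Accepted);
   Put_Line ("first post accepted: " & Boolean'Image (Accepted));

   Ch.Try_Post (2, Accepted);   --  channel is Ready, so this is refused
   Put_Line ("second post accepted: " & Boolean'Image (Accepted));

   Ch.Get_Job (Job);            --  barrier is open because State = Ready
   Ch.Done;

   Ch.Try_Post (3, Accepted);   --  channel is Idle again
   Put_Line ("post after Done accepted: " & Boolean'Image (Accepted));
end Channel_Demo;
```

With this shape the dispatching loop would call Try_Post on each channel in turn and stop at the first one that reports True, instead of calling Busy and Post separately.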

--
Maciej Sobczak * www.msobczak.com * www.inspirel.com

Database Access Library for Ada: www.inspirel.com/soci-ada
From: Jean-Pierre Rosen on
Maciej Sobczak wrote:
> Hi all,
>
> Imagine a server with fixed number of worker tasks. There is no queue
> of jobs and jobs are immediately passed to one of the tasks that is
> currently idle. There is a separate task (or just the main one) that
> provides jobs for worker tasks.
[...]

Why not simply use a rendezvous?
Each worker has an entry Get_Job:

   task body Worker_Task is
      Job : Job_Type;
   begin
      loop
         Get_Job (Job);

         -- do the job ...

      end loop;
   end Worker_Task;

and the server is simply (assuming Servers is an array of Worker_Task):

   loop
      Job := ...

      Found_Worker := False;
      for I in Servers'Range loop
         select
            Servers (I).Get_Job (Job);
            Found_Worker := True;
            exit;
         else
            -- This server is busy
            null;
         end select;
      end loop;

      if not Found_Worker then
         -- all pipelines are busy,
         -- but the overflow handling is not shown...
      end if;
   end loop;
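
A complete program along these lines might look like the sketch below. It is only an illustration under assumptions: the names Rendezvous_Demo and the Positive-based Job_Type, the number of workers and jobs, and the delays are all made up for the example. The worker uses an accept statement for Get_Job, and a terminate alternative so that the program can end. A conditional entry call succeeds only if the worker is already waiting at its accept, so which jobs get dropped depends on timing.

```ada
with Ada.Text_IO; use Ada.Text_IO;

procedure Rendezvous_Demo is

   type Job_Type is new Positive;   --  placeholder payload (assumption)

   task type Worker_Task is
      entry Get_Job (J : in Job_Type);
   end Worker_Task;

   task body Worker_Task is
      Job : Job_Type;
   begin
      loop
         select
            accept Get_Job (J : in Job_Type) do
               Job := J;   --  copy the job, then release the caller
            end Get_Job;
         or
            terminate;     --  taken once the main procedure has finished
         end select;

         delay 0.1;        --  stand-in for real work on Job
      end loop;
   end Worker_Task;

   Servers      : array (1 .. 2) of Worker_Task;
   Found_Worker : Boolean;
begin
   for Job in Job_Type range 1 .. 4 loop
      Found_Worker := False;

      for I in Servers'Range loop
         select
            Servers (I).Get_Job (Job);
            Found_Worker := True;
            Put_Line ("job" & Job_Type'Image (Job)
                      & " given to worker" & Integer'Image (I));
         else
            null;   --  this worker is not waiting at its accept
         end select;
         exit when Found_Worker;
      end loop;

      if not Found_Worker then
         Put_Line ("all workers busy, dropping job"
                   & Job_Type'Image (Job));
      end if;

      delay 0.04;   --  pace of incoming jobs (arbitrary)
   end loop;
end Rendezvous_Demo;
```

Only the main procedure writes to standard output here, which avoids several tasks calling Ada.Text_IO at the same time.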
--
---------------------------------------------------------
J-P. Rosen (rosen(a)adalog.fr)
Visit Adalog's web site at http://www.adalog.fr
From: Dmitry A. Kazakov on
On Fri, 19 Sep 2008 15:34:00 +0200, Jean-Pierre Rosen wrote:

> Maciej Sobczak wrote:
>>
>> Imagine a server with fixed number of worker tasks. There is no queue
>> of jobs and jobs are immediately passed to one of the tasks that is
>> currently idle. There is a separate task (or just the main one) that
>> provides jobs for worker tasks.
> [...]
>
> Why not simply use a rendezvous?
> Each worker has an entry Get_Job:
>
> task body Worker_Task is
> Job : Job_Type;
> begin
> loop
> Get_Job (Job);

You mean:

accept Get_Job (Requested : Job_Type) do
Job := Requested;
end Get_Job;

> end loop;
> end Worker_Task;
>
> and the server is simply (assuming Servers is an array of Worker_Task):
>
> loop
> Job := ...
>
> Found_Worker := False;
> for I in Servers'Range loop
> select
> Servers (I).Get_Job (Job);
> Found_Worker := True;
> exit;
> else
> -- This server is busy
> null;
> end select;
> end loop;
>
> if not Found_Worker then
> -- all pipelines are busy,
> -- but the overflow handling is not shown...
> end if;
> end loop;

This scheme requires some additional effort in order to maintain the list
of idle workers.

I would use an inverse one, which is much simpler:

   with Ada.Text_IO; use Ada.Text_IO;

   type Job_Type is (Quit, Run);
   type Job (Kind : Job_Type := Quit) is null record;
   task type Worker;
   task Server is
      entry Get (Work : out Job);
   end Server;

   task body Server is
      Request : Job;
      Workers : array (1..5) of Worker;
   begin
      loop
         -- Get a job to do
         Request := (Kind => Run);
         -- Wait for a worker to come
         accept Get (Work : out Job) do
            Work := Request;
         end Get;
      end loop;

      -- Terminating workers
      for Index in Workers'Range loop
         Request := (Kind => Quit);
         accept Get (Work : out Job) do
            Work := Request;
         end Get;
      end loop;
   end Server;

   task body Worker is
      Work : Job;
   begin
      loop
         Server.Get (Work);
         case Work.Kind is
            when Quit => exit;
            when Run => Put_Line ("Doing things");
         end case;
      end loop;
      Put_Line ("I am done");
   end Worker;

Note that task termination is usually a difficult problem in Ada. You
should pay attention to it early. (The "terminate" alternative is
unusable in 80% of cases.) In the solution above a special job kind is
used to kill the worker.
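
A self-contained variant of this scheme, as a sketch only: the server hands out a bounded number of jobs and then exactly one Quit per worker, so the shutdown phase is actually reached. The names Pull_Demo, Job_Kind and Tally, the Id payload, and the job and worker counts are assumptions made for the example.

```ada
with Ada.Text_IO; use Ada.Text_IO;

procedure Pull_Demo is

   type Job_Kind is (Quit, Run);

   --  Id is a hypothetical payload added for the example.
   type Job (Kind : Job_Kind := Quit) is record
      case Kind is
         when Run  => Id : Positive;
         when Quit => null;
      end case;
   end record;

   Worker_Count : constant := 3;
   Job_Count    : constant := 7;

   --  Counts finished jobs so that only the main task writes output.
   protected Tally is
      procedure Add;
      function Total return Natural;
   private
      Count : Natural := 0;
   end Tally;

   protected body Tally is
      procedure Add is
      begin
         Count := Count + 1;
      end Add;

      function Total return Natural is
      begin
         return Count;
      end Total;
   end Tally;

begin
   declare
      task Server is
         entry Get (Work : out Job);
      end Server;

      task type Worker;

      task body Server is
      begin
         --  Hand out the real jobs, one per rendezvous.
         for N in 1 .. Job_Count loop
            accept Get (Work : out Job) do
               Work := (Kind => Run, Id => N);
            end Get;
         end loop;

         --  Then hand exactly one Quit to each worker.
         for N in 1 .. Worker_Count loop
            accept Get (Work : out Job) do
               Work := (Kind => Quit);
            end Get;
         end loop;
      end Server;

      task body Worker is
         Work : Job;
      begin
         loop
            Server.Get (Work);
            exit when Work.Kind = Quit;
            Tally.Add;   --  stand-in for doing the job
         end loop;
      end Worker;

      Workers : array (1 .. Worker_Count) of Worker;
   begin
      null;   --  the block is left only after all its tasks terminate
   end;

   Put_Line ("jobs done:" & Natural'Image (Tally.Total));
end Pull_Demo;
```

Because a worker stops asking after it receives Quit, each of the Worker_Count Quit jobs goes to a different worker, and the program prints "jobs done: 7" once every task has terminated.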

--
Regards,
Dmitry A. Kazakov
http://www.dmitry-kazakov.de
From: anon on
Since you are using Channels, I am assuming that you're talking about TCP/IP
servers. In this case you should look into using Check_Selector together
with Signalling_Fds and Socket_Sets instead of using arrays.

Now, given the way GNAT has written the Selector function and the way
C_Select is written, the maximum number of servers per Check_Selector is 27.
That is, each C_Select call can only monitor 32 sockets, but
GNAT.Sockets.Create_Selector uses two sockets and the TCP/IP sub-system
uses or predefines 3, so you are left with 27 user-defined servers.

I found this client/server pair on the net. It shows how to use the
Check_Selector routine. Both programs were written in all caps and
use the Ada-95 spec, so you may need to adjust the Signalling_Fds routine
calls for Ada-2005 (GNAT-2008), but they do compile and execute under
GNAT GPL 2007 on Linux.

Note: 1) I altered the spacing and capitalization to make them more readable.
2) The call to "Set_Socket_Option" may need to be commented
out. Some TCP/IP systems do not like GNAT's version of that
routine.
3) As for Windows, I did not test!


--------------------------------------------------------------
--
-- Pool_Server
--
with GNAT.Sockets ;
use GNAT.Sockets ;
with Ada.Text_IO ;

procedure Pool_Server is

MaxTasks : constant Positive := 5 ; -- buffer size
type Index is mod MaxTasks ;

function Rev ( S : String ) return String is
Res : String ( S'Range ) ;
J : Integer := S'First ;
begin
for I in reverse S'Range loop
Res ( J ) := S ( I ) ;
J := J + 1 ;
end loop ;
return Res ;
end Rev ;

protected Aborted is
procedure Set ;
function Check return Boolean ;
private
Done : Boolean := False ;
end Aborted ;

protected body Aborted is
procedure Set is
begin
Done := True ;
end Set ;

function Check return Boolean is
begin
return Done ;
end Check ;
end Aborted ;


type Echo ;
type Echo_Access is access Echo ;

task type Echo is
entry Start ( N_Sock : IN Socket_Type ;
Self : IN Echo_Access ) ;
entry ReStart ( N_Sock : IN Socket_Type ) ;
end Echo ;

type Task_Array is array ( Index ) of Echo_Access ;

protected Buffer is
entry Deposit ( X : in Echo_Access ) ;
entry Extract ( X : out Echo_Access ) ;
function NumWaiting return Natural ;
private
Buf : Task_Array ;
I, J : Index := 0 ;
Count : Natural range 0 .. MaxTasks := 0 ;
end Buffer ;


task body Echo is
Sock : Socket_Type ;
S : Stream_Access ;
Me : Echo_Access ;
Input_Selector : Selector_Type ;
Input_Set : Socket_Set_Type ;
WSet : Socket_Set_Type ;
Input_Status : Selector_Status ;
begin
--set up selector
Create_Selector ( Input_Selector ) ;

--Initialise socket sets
--WSet is always empty as we are not interested in output events
-- Input_Set only ever contains one socket, namely Sock
Empty ( Input_Set ) ;
Empty ( WSet ) ;

ACCEPT Start ( N_Sock : IN Socket_Type ;
Self : IN Echo_Access ) DO
Sock := N_Sock ;
Me := Self ;
end Start ;

loop
begin -- block for exception handling
S := Stream ( Sock ) ; -- set up stream on socket
Boolean'Write ( S, True ) ; -- acknowledge connection

loop
-- check for input on Sock socket
Set ( Input_Set, Sock ) ;

-- time-out on check if no input within 0.5 second
Check_Selector ( Input_Selector,
Input_Set,
WSet,
Input_Status,
0.5 ) ;
if Input_Status = Completed then
-- we have input, so process it
declare
Str : String := String'Input ( S ) ;
begin
exit when Str = "quit" ;
String'Output ( S, Rev ( Str ) ) ;
end ;
end if ;
if Aborted.Check then
String'Output ( S, "Server aborted" ) ;
exit ;
end if ;
end loop ;

Ada.Text_IO.New_Line ;
Ada.Text_IO.Put_Line ( "Slave Closing Connection" ) ;
ShutDown_Socket ( Sock, Shut_Read_Write ) ;
Buffer.Deposit ( Me ) ;

exception
-- The most likely exception is that the client quits unexpectedly:
-- close the socket and deposit ourselves in the buffer
when others =>
Ada.Text_IO.New_Line ;
Ada.Text_IO.Put_Line ( "Connection closed unexpectedly" ) ;
Close_Socket ( Sock ) ;
Buffer.Deposit ( Me ) ;
end ;

select
ACCEPT ReStart ( N_Sock : IN Socket_Type ) DO
Sock := N_Sock ;
end ReStart ;
or
-- terminate if all slaves are queued here and
-- if the main server task has finished
terminate ;
end select ;

end loop ;
end Echo ;

protected body Buffer is
entry Deposit ( X : IN Echo_Access ) when Count < MaxTasks is
begin
Buf ( I ) := X ;
I := I + 1 ;
Count := Count + 1 ;
end Deposit ;

entry Extract ( X : OUT Echo_Access ) when Count > 0 is
begin
X := Buf ( J ) ;
J := J + 1 ;
Count := Count - 1 ;
end Extract ;

function NumWaiting return Natural is
begin
return Count ;
end NumWaiting ;
end Buffer ;

Server : Socket_Type ;
New_Sock : Socket_Type ;
Slave : Echo_Access ;
Addr : Sock_Addr_Type ( Family_Inet ) ;
Peer_Addr : Sock_Addr_Type ( Family_Inet ) ;
Avail : Boolean := False ;
Ch : Character ;
TotalTasks : Natural := 0 ;
Accept_Selector : Selector_Type ;
Accept_Set : Socket_Set_Type ;
WSet : Socket_Set_Type ;
Accept_Status : Selector_Status ;

begin -- main server task
Ada.Text_IO.Put_Line ( "WARNING server loops for ever." ) ;
Ada.Text_IO.Put ( "Press A to terminate server and all " ) ;
Ada.Text_IO.Put_Line ( "tasks immediately or press Q to ") ;
Ada.Text_IO.Put ( "accept no further connections and " ) ;
Ada.Text_IO.Put ( "terminate gracefully when all clients " ) ;
Ada.Text_IO.Put ( "are through." ) ;
Ada.Text_IO.New_Line ;
Initialize ;
Create_Socket ( Server) ;
Addr := ( Family_Inet,
Addresses ( Get_Host_By_Name ( Host_Name ), 1 ),
50000 ) ;
-- allow server address to be reused for multiple connections
Set_Socket_Option ( Server,
Socket_Level,
( Reuse_Address, True ) ) ;

Bind_Socket ( Server, Addr ) ;
Listen_Socket ( Server, 4 ) ;

-- set up selector
Create_Selector ( Accept_Selector ) ;

-- Initialise socket sets
-- WSet is always empty as we are not interested in output
-- events Accept_Set only ever contains one socket namely
-- Server
Empty ( Accept_Set ) ;
Empty ( WSet ) ;
loop
Ada.Text_IO.Get_Immediate ( Ch, Avail ) ;
if Avail and then
( Ch = 'q' or Ch = 'Q' or Ch = 'a' or Ch = 'A' ) then
exit ;
end if ;

-- check for input (connection requests) on Server socket
Set ( Accept_Set, Server ) ;
-- time-out on check if no request within 1 second
Check_Selector ( Accept_Selector,
Accept_Set,
WSet,
Accept_Status,
1.0 ) ;

if Accept_Status = Completed then
-- must be an event on Server socket as it is the only
-- one that we are checking.
-- Hence the Accept_Socket call should not block.

Accept_Socket ( Server, New_Sock, Peer_Addr ) ;
Ada.Text_IO.New_Line ;
Ada.Text_IO.Put_Line
( "Connection accepted -- allocating slave" ) ;


if Buffer.NumWaiting = 0 and TotalTasks < MaxTasks then
Slave := NEW Echo ; -- start new task
TotalTasks := TotalTasks + 1 ;
Ada.Text_IO.Put_Line ( "New slave task started" ) ;
-- call entry Start to activate task
Slave.Start ( New_Sock, Slave ) ;
else
Ada.Text_IO.Put_Line ( "Waiting for an idle slave task" ) ;
Buffer.Extract ( Slave ) ;
-- call entry ReStart to re-activate task
Slave.ReStart ( New_Sock ) ;
Ada.Text_IO.Put_Line ( " Idle slave task reactivated" ) ;
end if ;
end if ;
end loop ;

if Ch = 'a' or Ch = 'A' then
-- signal slave tasks to terminate
Aborted.Set ;
end if ;

-- tidy up
Close_Selector ( Accept_Selector ) ;
Empty ( Accept_Set ) ;

Close_Socket ( Server ) ;
Ada.Text_IO.New_Line ;
Ada.Text_IO.Put_Line ( "Main server task exiting ..." ) ;
Finalize ;
end Pool_Server ;


--------------------------------------------------------------
--
-- Pool_Client
--
with Gnat.Sockets ;
use Gnat.Sockets ;
with Ada.Command_Line ;
use Ada.Command_Line ;
with Ada.Text_IO ;
use Ada.Text_IO ;

procedure Pool_Client is
Sock : Socket_Type ;
S : Stream_Access ;
Addr : Sock_Addr_Type ( Family_Inet ) ;
Msg : String ( 1 .. 80 ) ;
Last : Natural ;
B : Boolean ;
Read_Selector : Selector_Type ;
Read_Set, WSet : Socket_Set_Type ;
Read_Status : Selector_Status ;
begin
Initialize ;
Create_Socket ( Sock ) ;
Addr := ( Family_Inet,
Addresses ( Get_Host_By_Name ( Argument ( 1 ) ), 1 ),
50000 ) ;
Create_Selector ( Read_Selector ) ;
Empty ( Read_Set ) ;
Empty ( WSet ) ;


Connect_Socket ( Sock, Addr ) ;
S := Stream ( Sock ) ;
Boolean'Read ( S, B ) ;
-- wait for connection to be accepted

loop
Set ( Read_Set, Sock ) ;

-- check for input on socket (server may be aborting)
-- time-out immediately if no input pending
-- We seem to need a small delay here (using zero seems to block
-- forever)
-- Is this a GNAT bug or AB misreading Check_Selector docs?

Check_Selector ( Read_Selector,
Read_Set,
WSet,
Read_Status,
0.005 ) ;
if Read_Status = Expired then
Ada.Text_IO.Put ( "Message> " ) ; -- prompt user for message
Ada.Text_IO.Get_Line ( Msg, Last ) ;

-- send message to socket unless server is aborting
String'Output ( S, Msg ( 1 .. Last ) ) ;
exit when Msg ( 1 .. Last ) = "quit" ;
end if ;

declare
-- receive message
Str : String := String'Input ( S ) ;
begin Ada.Text_IO.Put_Line ( Str ) ;
exit when Str = "Server aborted" ;
end ;
end loop ;

Ada.Text_IO.Put_Line ( "Client quitting ..." ) ;
ShutDown_Socket ( Sock ) ;
Close_Selector ( Read_Selector ) ;
Finalize ;
exception
when others =>
Ada.Text_IO.Put_Line ("Exception: Client quitting ..." ) ;
Close_Socket ( Sock ) ;
Close_Selector( Read_Selector ) ;
Finalize ;
end Pool_Client ;



From: Maciej Sobczak on
On 19 Sep, 15:34, Jean-Pierre Rosen <ro...(a)adalog.fr> wrote:

> Why not simply use a rendezvous?

This is something that I deliberately wanted to avoid.
I don't like the idea of tasks interacting with each other directly
and I prefer to have the communication part extracted away (Ravenscar
got that right for a reason, I think?).

In theory, I could also change the queuing strategy without modifying
the worker task.
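
To illustrate the point about swapping the queuing strategy, here is a minimal sketch in which the shared object becomes a small bounded FIFO while Get_Job keeps the same profile, so a worker that only calls Get_Job would not need to change. The names Queue_Demo, Job_Queue, Slot and Capacity, and the Positive-based Job_Type, are assumptions for the example. Unlike the per-worker channels, a single queue is shared by all workers.

```ada
with Ada.Text_IO; use Ada.Text_IO;

procedure Queue_Demo is

   type Job_Type is new Positive;   --  placeholder payload (assumption)

   Capacity : constant := 4;
   type Slot is mod Capacity;
   type Job_Array is array (Slot) of Job_Type;

   protected Job_Queue is
      --  Blocks the caller while the queue is full.
      entry Post (J : in Job_Type);
      --  Same profile as the Get_Job entry of the single-slot channel.
      entry Get_Job (J : out Job_Type);
   private
      Jobs  : Job_Array := (others => 1);
      Head  : Slot := 0;
      Tail  : Slot := 0;
      Count : Natural := 0;
   end Job_Queue;

   protected body Job_Queue is

      entry Post (J : in Job_Type) when Count < Capacity is
      begin
         Jobs (Tail) := J;
         Tail := Tail + 1;   --  modular type wraps around
         Count := Count + 1;
      end Post;

      entry Get_Job (J : out Job_Type) when Count > 0 is
      begin
         J := Jobs (Head);
         Head := Head + 1;
         Count := Count - 1;
      end Get_Job;

   end Job_Queue;

   Job : Job_Type;
begin
   --  Single-task demonstration of FIFO order; no worker tasks here.
   for N in Job_Type range 1 .. 3 loop
      Job_Queue.Post (N);
   end loop;

   for N in 1 .. 3 loop
      Job_Queue.Get_Job (Job);
      Put_Line ("got job" & Job_Type'Image (Job));
   end loop;
end Queue_Demo;
```

The jobs come out in the order they were posted (1, 2, 3). The cost of this strategy is that Post can now block the requesting task when the queue is full, which the single-slot version reported immediately.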

--
Maciej Sobczak * www.msobczak.com * www.inspirel.com

Database Access Library for Ada: www.inspirel.com/soci-ada