where : ibrtses delphi

Delphi - timer message queue

disclaimer

the source code of this page may not appear correctly in certain browsers
due to special characters. Have a look at the source of this HTML page
with notepad instead


timer message queue

Communication applications may require messages being sent a predefined times.
There is one thread getting the messages from this queue, usually a TxThread,
and multiple threads may queue messages. A TimerQueue does exactly that.
Message entries are added with a time (from now) in ticks. The entries are
inserted in increasing order, and the time is changed to an incremental value.
Should there be multiple messages scheduled for the same tick, the last queueed
is the last retrieved.

simple message queue

The simple message queue is simply first in, first out. access to the queue
is also protected with an internal TCriticalSection. The messages are also
build from the TMyMsg class.

message class

The TMyMsg class is generic. It holds the data as binary array and can
be filled/read as String. The size has to be set at creation and cannot be
changed later. The data contained can be shorter than allocated though.

the code

{
  Message Queue with time control for threads

  A TMyMsg is generated and queued with a set delay (from now)
  The Queue stores the messages in time sequenced order. For multiple 
  messages of the same time the last entered will be the last.
  The message delays are incremental.

  TMyMsg holds any data and mus be filled before queueing

  Access to the queue is locked with TCriticalSection

  Use :
   MAIN
   queue:=TTimerMsgQueue.create;
   p:=TMyMsg.create(10);
   p.Asstring:='1234567890';
   p.delay:=1;
   queue.queuein(p);

   THREAD
   while (not terminated) do begin
    if (queue.count<>0)and(queue.ndelay=0) then begin
      p:=queue.queueout;
      ..
      p.destroy;
     end;
    sleep(..);
    queue.decrement(1);
   end;
   

}
unit MyMsgQueue;

interface

uses
  Windows, Messages, SysUtils, Classes, Graphics, Controls, Forms, Dialogs,
  Syncobjs,StdCtrls;

type
  TMyMsg=class(TObject)
   private
    fprev,fnext:TMyMsg;
    fdata:pointer;
    fsize,falloc:integer;
    forigin:integer;
    fdelay:integer;
   protected
    procedure setitem(index:integer;b:byte);
    function getitem(index:integer):byte;
    function getstring:string;
    procedure setstring(s:string);
   public
    property data[index:integer]:byte read getitem write setitem; default;
    constructor create(alloc:integer);  // size fixed on create
    destructor destroy;    override;
   published
    property size:integer read fsize write fsize;  // true size <= allocated
    property origin:integer read forigin write forigin;
    property delay:integer read fdelay write fdelay;
    property prev:TMyMsg read fprev write fprev;
    property next:TMyMsg read fnext write fnext;
    property AsString:string read getstring write setstring;
  end;

  TSimpleMsgQueue=class(TObject)
  private
    { Private declarations }
   fhead,ftail:TMyMsg;
   fcount:integer;
   fcs:TCriticalSection;
  protected
    { Protected declarations }
  public
    { Public declarations }
   constructor create;
   destructor destroy;    override;   // destroys all messages contained
   procedure queuein(p:TMyMsg);
   function queueout:TMyMsg;
   procedure list(m:TMemo);
  published
    { Published declarations }
   property count:integer read fcount;
  end;   

  TTimerMsgQueue = class(TObject)
  private
    { Private declarations }
   fhead,ftail:TMyMsg;
   fcount,fqdelay,fndelay:integer;
   fcs:TCriticalSection;
  protected
    { Protected declarations }
  public
    { Public declarations }
   constructor create;
   destructor destroy;    override;   // destroys all messages contained
   procedure queuein(p:TMyMsg);
   function queueout:TMyMsg;
   procedure list(m:TMemo);
   procedure decrement(i:integer);  // used by thread - dec ndelay
  published
    { Published declarations }
   property count:integer read fcount;
   property qdelay:integer read fqdelay; // total delay of queue
   property ndelay:integer read fndelay; // delay till next msg
  end;

procedure Register;

implementation
{$define debug}

type
 BytePtr=^Byte;
{$R+}
procedure TMyMsg.setitem(index:integer;b:byte);
var p:BytePtr;
    k:integer absolute p;
begin
 p:=fdata;
 k:=k+index*sizeof(Byte);
 p^:=b;
end;

function TMyMsg.getitem(index:integer):byte;
var p:BytePtr;
    k:integer absolute p;
begin
 p:=fdata;
 k:=k+index*sizeof(Byte);
 result:=p^;
end;

constructor TMyMsg.create(alloc:integer);
begin
 inherited create;
 fnext:=nil; fprev:=nil;
 falloc:=alloc;
 fsize:=alloc;          // size set to full size
 GetMem(fdata,alloc*sizeof(Byte));
end;

destructor TMyMsg.destroy;
begin
 FreeMem(fdata,falloc*sizeof(Byte));
 inherited destroy;
end;

function TMyMsg.getstring:string;
begin
 setlength(result,fsize);
 move(fdata^,result[1],fsize);
end;
procedure TMyMsg.setstring(s:string);
begin
 if length(s) <= falloc then move(s[1],fdata^,length(s))
 else move(s[1],fdata^,falloc);  // exception here ????
 fsize:=length(s);
end;
//##############################################################
constructor TSimpleMsgQueue.create;
begin
 inherited create;
 fhead:=nil; ftail:=nil; fcount:=0; 
 fcs:=TCriticalSection.create;
end;

destructor TSimpleMsgQueue.destroy;
var p:TMymsg;
begin
 while fhead<>nil do begin
  p:=queueout;
  p.destroy;
 end;
 fcs.free;
 inherited destroy;
end;

procedure TSimpleMsgQueue.queuein(p:TMyMsg); 
var q:TMyMsg;

begin
 fcs.enter;
 if fcount=0 then begin // the first
  fhead:=p; ftail:=p; fcount:=1; 
 end
 else begin  // append at tail
  q:=ftail;
  q.next:=p;
  p.prev:=q;
  ftail:=p;
  inc(fcount);
 end;
 fcs.leave;
end;


function TSimpleMsgQueue.queueout:TMyMsg;
begin
 fcs.enter;
 if fcount=0 then result:=nil
 else begin
  if fcount=1 then begin // the only one
   result:=fhead;
   fhead:=nil; ftail:=nil; fcount:=0;
   end
  else begin
   result:=fhead;
   fhead:=fhead.next;
   fhead.prev:=nil;
   dec(fcount);
   end;
  end; 
 fcs.leave;
end;

procedure TSimpleMsgQueue.list(m:TMemo);
var p:TMyMsg;
i:integer;
begin
 m.clear;
 m.lines.add('Queue '+inttostr(fcount)+' items '+ inttohex(integer(fhead),8));
 p:=fhead; i:=1;
 while p<>nil do begin
  m.lines.add(inttohex(i,3)+' '+inttohex(integer(p),8)+' '+inttohex(integer(p.prev),8)+' '+inttohex(integer(p.next),8));
  p:=p.next;
  inc(i);
 end;
end; 

//##############################################################
constructor TTimerMsgQueue.create;
begin
 inherited create;
 fhead:=nil; ftail:=nil; fcount:=0; fqdelay:=0; fndelay:=0;
 fcs:=TCriticalSection.create;
end;

destructor TTimerMsgQueue.destroy;
var p:TMymsg;
begin
 while fhead <> nil do begin
  p:=queueout;
  p.destroy;
 end;
 fcs.free;
 inherited destroy;
end;

procedure TTimerMsgQueue.queuein(p:TMyMsg); 
var q,q2:TMyMsg;
    s,s2:integer;
begin
 fcs.enter;
 if fcount=0 then begin // the first
  fhead:=p; ftail:=p; fcount:=1; fqdelay:=p.delay;
 end
 else begin
  q:=fhead;
  s:=q.delay; 
  s2:=q.delay;
  if s > p.delay then begin // insert before head
   p.next:=fhead; fhead.prev:=p; fhead:=p;
   p.next.delay:=p.next.delay-p.delay;
   end
  else begin
   q2:=q.next; // q:=head, q2:=head.next
   if (q2 <> nil) then s2:=s+q2.delay;
   while(p.delay >= s)and(q2 <> nil)and(p.delay >= s2) do begin
    q:=q2;
    s:=s+q.delay;
    q2:=q.next;
    if (q2 <> nil) then s2:=s+q2.delay;
    end;
   // insert after q
   q.next:=p; p.prev:=q; p.next:=q2;
   p.delay:=p.delay-s; 
   if q2<>nil then begin
    q2.prev:=p;
    q2.delay:=q2.delay-p.delay;
    end
   else begin
    fqdelay:=p.delay+s;
    end;
   end;
  inc(fcount);
 end;
 fcs.leave;
 if fhead<>nil then fndelay:=fhead.delay else fndelay:=0;
end;


function TTimerMsgQueue.queueout:TMyMsg;
//var q:TMyMsg;
begin
 fcs.enter;
 if fcount=0 then result:=nil
 else begin
  if fcount=1 then begin // the last
   result:=fhead;
   fhead:=nil; ftail:=nil; fcount:=0;
   fqdelay:=0; fndelay:=0;
   end
  else begin
   result:=fhead;
   fhead:=result.next;
   fhead.prev:=nil;
   dec(fcount);
   fqdelay:=fqdelay-result.delay;
   fndelay:=fhead.delay;
   end;
  end; 
 fcs.leave;
end;

procedure TTimerMsgQueue.list(m:TMemo);
var p:TMyMsg;
i:integer;
begin
 m.clear;
 m.lines.add('Queue '+inttostr(fcount)+' items '+inttostr(fqdelay)+ ' sum '+inttostr(fndelay)+' nd '+ inttohex(integer(fhead),8));
 p:=fhead; i:=1;
 while p<>nil do begin
  m.lines.add(inttohex(i,3)+' '+inttohex(integer(p),8)+' '+inttohex(integer(p.prev),8)+' '+inttohex(integer(p.next),8)+' '+inttostr(p.delay));
  p:=p.next;
  inc(i);
 end;
end; 

procedure TTimerMsgQueue.decrement(i:integer);
begin
 if fndelay > 0 then dec(fndelay,i);
end;

//#####################################################


procedure Register;
begin
  //RegisterComponents('Samples', [TMyMsgQueue]);
end;

end.



Feedback is welcome





sponsored links




Delphi
home

last updated: 1.june.99

Copyright (99,2000) Ing.Büro R.Tschaggelar