Here I present a class for running loops in parallel. The class lives in the BB.Task unit:
type
// Callback invoked once per item; receives the item as its only argument.
TProc<T> = reference to procedure(aValue: T);
// Runs a callback over every element of a list, spreading the elements
// across several worker threads.
TParallelForEach<T> = class
private
type
// Worker thread: applies FProc to each element of its own FItems list.
TTask = class(TThread)
private
FProc: TProc<T>;
// Sub-list handed over by Run; Execute frees it after processing.
FItems: TList<T>;
protected
procedure Execute; override;
public
constructor Create(aProc: TProc<T>; aItems: TList<T>);
end;
var
// The caller's list; only the reference is stored, it is not copied or freed here.
FItems: TList<T>;
// Multiplier applied to the CPU count to get the thread count (set to 2 by the constructor).
FMaxThreadsPerCPU: integer;
// Threads started by Run; waited on by Wait and freed by Destroy.
FTasks: array of TThread;
// Number of CPUs enabled in the process affinity mask.
function GetCPUCount: integer;
public
constructor Create(aItems: TList<T>);
destructor Destroy; override;
// Splits the items round-robin into groups and starts one thread per group.
procedure Run(aProc: TProc<T>);
// Blocks until every thread started by Run has finished.
procedure Wait;
property MaxThreadsPerCPU: integer read FMaxThreadsPerCPU write FMaxThreadsPerCPU;
end;
You pass a list of T to the constructor (OK, it should really be an iterator, but in Delphi, I suppose for compatibility reasons, the list classes do not implement a common iterator interface. What a pity...)
You pass a closure to the Run() method, and you can also set the maximum number of threads created per CPU (this property is very important!)
You can wait for all tasks to finish with the Wait() method (this also happens when you free the object)
An easy example:
var
  p: TParallelForEach<integer>;
  list: TList<integer>;
  i: integer;
  t: Cardinal; // tick count at start, later the elapsed milliseconds
begin
  t := GetTickCount;
  list := TList<integer>.Create;
  try
    // 100 random sleep durations; item i is in the range 0 .. i*10 - 1 ms
    for i := 1 to 100 do
      list.Add(Random(i * 10));
    p := TParallelForEach<integer>.Create(list);
    try
      p.MaxThreadsPerCPU := 10;
      p.Run(procedure(aItem: integer)
        begin
          //Heavy task
          Sleep(aItem);
        end
      );
      p.Wait;
    finally
      // Release the runner and its thread objects; the list is still ours
      p.Free;
    end;
  finally
    list.Free;
  end;
  // Elapsed wall-clock time in milliseconds
  t := GetTickCount - t;
end;
Depending on how heavy the closure is, you should choose an appropriate number of threads. For this silly example, on a machine with 2 CPUs, I found that 10 is the best option (although the default value is 2). It does not make sense to overload the system with 100 threads...
And finally the implementation:
{ TParallelForEach<T> }
// Stores a reference to the list to process (no copy is made) and
// starts with the default of two threads per CPU.
constructor TParallelForEach<T>.Create(aItems: TList<T>);
begin
  FMaxThreadsPerCPU := 2;
  FItems := aItems;
end;
// Frees every thread object created by Run, in creation order,
// then runs the inherited destructor.
destructor TParallelForEach<T>.Destroy;
var
  worker: TThread;
begin
  for worker in FTasks do
    worker.Free;
  inherited;
end;
// Counts the CPUs available to this process (not necessarily every CPU
// in the system) by counting the set bits of the process affinity mask.
// Returns 1 when the affinity mask cannot be queried.
function TParallelForEach<T>.GetCPUCount: integer;
var
  affinity, systemAffinity: dword;
begin
  Result := 1;
  if not GetProcessAffinityMask(GetCurrentProcess, affinity, systemAffinity) then
    Exit;

  // Query succeeded: the answer is the number of bits set in the mask
  Result := 0;
  while affinity <> 0 do
  begin
    if (affinity and 1) <> 0 then
      Inc(Result);
    affinity := affinity shr 1;
  end;
end;
// Distributes the items round-robin over a set of worker threads and starts
// them. Each thread calls aProc once for every item in its group.
//
// The number of threads is MaxThreadsPerCPU * GetCPUCount, clamped to the
// range 1 .. FItems.Count. With an empty list nothing is started.
//
// Run may be called more than once: threads from an earlier call are waited
// for and freed first, so they are not leaked when FTasks is reused.
procedure TParallelForEach<T>.Run(aProc: TProc<T>);
var
  i, ThreadCount: integer;
  groups: array of TList<T>;
begin
  // Release the threads of a previous Run before overwriting FTasks
  Wait;
  for i := 0 to Length(FTasks) - 1 do
    FTasks[i].Free;
  SetLength(FTasks, 0);

  // Calculate total threads; a non-positive MaxThreadsPerCPU (or CPU count)
  // would otherwise lead to "mod 0" or a negative array length below
  ThreadCount := FMaxThreadsPerCPU * GetCPUCount;
  if ThreadCount < 1 then
    ThreadCount := 1;
  if ThreadCount > FItems.Count then
    ThreadCount := FItems.Count;
  if ThreadCount = 0 then
    Exit; // empty list: nothing to do

  // Create as many data groups as required
  SetLength(groups, ThreadCount);
  for i := 0 to ThreadCount - 1 do
    groups[i] := TList<T>.Create;

  // Dispersion of items: item i goes to group (i mod ThreadCount)
  for i := 0 to FItems.Count - 1 do
    groups[i mod ThreadCount].Add(FItems[i]);

  // Launch all tasks; each task takes ownership of its group list
  SetLength(FTasks, Length(groups));
  for i := Low(groups) to High(groups) do
  begin
    FTasks[i] := TTask.Create(aProc, groups[i]);
    FTasks[i].Start;
  end;
end;
// Blocks the caller until every thread launched by Run has finished.
// Threads are waited on one after another, in launch order.
procedure TParallelForEach<T>.Wait;
var
  worker: TThread;
begin
  for worker in FTasks do
    worker.WaitFor;
end;
{ TParallelForEach<T>.TTask }
// Creates the worker thread suspended, so the caller decides when it starts.
// aProc is the callback to apply; aItems is the group of items this thread
// will process.
constructor TParallelForEach<T>.TTask.Create(aProc: TProc<T>; aItems: TList<T>);
begin
  inherited Create(True);
  // The owner frees the thread object explicitly
  FreeOnTerminate := False;
  FItems := aItems;
  FProc := aProc;
end;
// Thread body: applies the callback to every item of this thread's group,
// in list order, then releases the group list.
// The list is freed in a finally block so that an exception raised by the
// callback does not leak it.
procedure TParallelForEach<T>.TTask.Execute;
var
  i: integer;
begin
  try
    for i := 0 to FItems.Count - 1 do
      FProc(FItems[i]);
  finally
    FItems.Free;
    // Do not keep a dangling reference to the freed list
    FItems := nil;
  end;
end;
No comments:
Post a Comment