Friday, November 5, 2010

Parallel ForEach

I will present a class for running loops in parallel; the class is located in BB.Task:


type
  // Callback applied to a single item of the list; the item is passed by value.
  TProc<T> = reference to procedure(aValue: T);

  // Distributes the items of a TList<T> over several worker threads and
  // applies a caller-supplied procedure to every item.
  TParallelForEach<T> = class
  private
    type
      // Worker thread: applies FProc to every element of its own FItems list.
      // Execute frees that list once the loop has finished.
      TTask = class(TThread)
      private
        FProc: TProc<T>;
        FItems: TList<T>;
      protected
        procedure Execute; override;
      public
        // Stores the callback and the group of items; the thread is created suspended.
        constructor Create(aProc: TProc<T>; aItems: TList<T>);
      end;

    var
      // Source list handed to the constructor; it is not freed by this class.
      FItems: TList<T>;
      // Multiplied by the CPU count in Run to obtain the number of threads (set to 2 by the constructor).
      FMaxThreadsPerCPU: integer;
      // Worker threads created by Run; freed in the destructor.
      FTasks: array of TThread;

    // Number of CPUs available to the current process, taken from its affinity mask.
    function GetCPUCount: integer;
  public
    // aItems: the list whose elements will be processed.
    constructor Create(aItems: TList<T>);
    destructor Destroy; override;
    // Splits the items into groups and starts one worker thread per group,
    // each calling aProc for every item of its group. Returns without waiting.
    procedure Run(aProc: TProc<T>);
    // Blocks until every worker thread started by Run has finished.
    procedure Wait;

    // Upper bound of threads created per available CPU.
    property MaxThreadsPerCPU: integer read FMaxThreadsPerCPU write FMaxThreadsPerCPU;
  end;
You pass a list of T to the constructor (OK, it should be an iterator, but in Delphi, I suppose for compatibility reasons, no list implements a common iterator interface; what a pity...)
You pass a closure to the Run() method, and you can also set the maximum number of threads created per CPU (this property is very important!)
You can also wait for all tasks to finish with the Wait() method (this also happens when you free the object)
An easy example:
var
  p: TParallelForEach<integer>;
  list: TList<integer>;
  i: integer;
  t: Cardinal;

begin
  //Start time; the elapsed milliseconds are GetTickCount - t once the work is done
  t := GetTickCount;

  list := TList<integer>.Create;
  try
    //100 random "durations"; each item is used below as a Sleep() time in ms
    for i := 1 to 100 do
      list.Add(Random(i * 10));

    p := TParallelForEach<integer>.Create(list);
    try
      p.MaxThreadsPerCPU := 10;
      p.Run(procedure(aItem: integer)
            begin
              //Heavy task
              Sleep(aItem);
            end
            );
      p.Wait;
    finally
      //Release the parallel loop object and its worker threads
      p.Free;
    end;
  finally
    list.Free;
  end;
end;
Depending on how heavy the closure is, you must choose a suitable number of threads; for this silly example with 2 CPUs I found that 10 is the best option (although the default value is 2). It does not make sense to overload the system with 100 threads...


And finally the implementation:



{ TParallelForEach<T> }

constructor TParallelForEach<T>.Create(aItems: TList<T>);
begin
  //Two threads per CPU unless the caller changes MaxThreadsPerCPU
  FMaxThreadsPerCPU := 2;
  //Only the reference is stored; the list itself is not copied
  FItems := aItems;
end;

destructor TParallelForEach<T>.Destroy;
var
  task: TThread;

begin
  //Release every worker thread object created by Run, in creation order
  for task in FTasks do
    task.Free;

  inherited;
end;

function TParallelForEach<T>.GetCPUCount: integer;
var
  ProcessMask, SystemMask: dword;

begin
  //Counts the CPUs available to the process (the set bits of its affinity
  //mask), not necessarily all the CPUs of the system.
  //Returns: the number of set bits, and never less than 1.
  Result := 0;
  if GetProcessAffinityMask(GetCurrentProcess, ProcessMask, SystemMask) then
  begin
    while ProcessMask <> 0 do
    begin
      if Odd(ProcessMask) then
        Inc(Result);
      ProcessMask := ProcessMask shr 1;
    end;
  end;

  //The call may fail, or succeed with an all-zero mask; report one CPU in
  //both cases so callers never compute a thread count of zero
  if Result < 1 then
    Result := 1;
end;

procedure TParallelForEach<T>.Run(aProc: TProc<T>);
var
  i, ThreadCount: integer;
  groups: array of TList<T>;

begin
  //Starts the worker threads that apply aProc to every item of the list.
  //The call returns immediately; use Wait to block until all of them finish.

  //Release the workers of a previous Run so that their objects are not leaked
  //when FTasks is overwritten below
  for i := 0 to Length(FTasks) - 1 do
    FTasks[i].Free;
  SetLength(FTasks, 0);

  //Nothing to process, no thread is needed
  if FItems.Count = 0 then
    Exit;

  //Calculate total threads
  ThreadCount := FMaxThreadsPerCPU * GetCPUCount;
  if ThreadCount > FItems.Count then
    ThreadCount := FItems.Count;
  //A zero or negative MaxThreadsPerCPU would otherwise lead to "i mod 0"
  //or a negative array length below
  if ThreadCount < 1 then
    ThreadCount := 1;

  //Create as many data groups as required
  SetLength(groups, ThreadCount);
  for i := 0 to ThreadCount - 1 do
    groups[i] := TList<T>.Create;

  //Dispersion of items (round-robin over the groups)
  for i := 0 to FItems.Count - 1 do
    groups[i mod ThreadCount].Add(FItems[i]);

  //Launch all tasks; each task frees its own group when it finishes
  SetLength(FTasks, Length(groups));
  for i := Low(groups) to High(groups) do
  begin
    FTasks[i] := TTask.Create(aProc, groups[i]);
    FTasks[i].Start;
  end;
end;

procedure TParallelForEach<T>.Wait;
var
  task: TThread;

begin
  //Block on each worker in turn until all of them have finished
  for task in FTasks do
    task.WaitFor;
end;

{ TParallelForEach<T>.TTask }

constructor TParallelForEach<T>.TTask.Create(aProc: TProc<T>; aItems: TList<T>);
begin
  //Create the thread suspended; it runs only when Start is called
  inherited Create(True);

  //The thread object must outlive Execute so that it can be waited on and freed explicitly
  FreeOnTerminate := False;

  //Group of items to process and the callback applied to each one
  FItems := aItems;
  FProc := aProc;
end;

procedure TParallelForEach<T>.TTask.Execute;
var
  i: integer;

begin
  //Applies the callback to every item of this task's group, in list order
  try
    for i := 0 to FItems.Count - 1 do
      FProc(FItems[i]);
  finally
    //The group list is released here even when the callback raises an
    //exception, so it does not leak
    FItems.Free;
  end;
end;

No comments:

Post a Comment