Monday, September 29, 2008

More A Sink Kronos programming

Today, we'll look at the implementation behind some of the code presented in this post. Let's start by looking at the implementation of the IAsyncResult interface. Because there are two pairs of BeginInvoke/EndInvoke, one that just calls a procedure and one that is designed to call a function, we'll start with a base implementation that will be shared between these two patterns. Here's the declaration for the TBaseAsyncResult class that forms the core of the IAsyncResult implementation:

  TBaseAsyncResult = class;

IGetAsyncResult = interface
['{E833C81B-0F16-4A67-BE7C-48E21A784B1A}']
function GetAsyncResult: TBaseAsyncResult;

property AsyncResult: TBaseAsyncResult read GetAsyncResult;
end;

TBaseAsyncResult = class(TInterfacedObject, IAsyncResult, IGetAsyncResult)
private
FCompleted: Boolean;
FSynchronous: Boolean;
FAsyncHandle: TSimpleEvent;
FInvokingThread: Cardinal;
procedure Complete;
procedure DoAsyncDispatch;
{ IGetAsyncResult }
function GetAsyncResult: TBaseAsyncResult;
{ IAsyncResult }
function GetAsyncHandleObject: THandleObject;
function GetCompletedSynchronously: Boolean;
function GetIsCompleted: Boolean;

property AsyncHandleObject: THandleObject read GetAsyncHandleObject;
protected
FInvokingException: TObject;
procedure AsyncDispatch; virtual; abstract;
function Invoke: IAsyncResult;
procedure InitDispatch;
procedure WaitForCompletion;
public
destructor Destroy; override;
end;

Let me explain the IGetAsyncResult interface. Since you cannot merely typecast an interface instance back to an object, we needed some way to get back to the actual implementation. Using this internal-only interface we can simply perform and "as" cast on an IAsyncResult variable or parameter to get access to the underlying implementation. If an exception is raised due to an invalid cast, most likely someone passed in another implementation of IAsyncResult. The the rule is that you can only use the result of BeginInvoke, with the corresponding EndInvoke. Calling the wrong EndInvoke method will cause an invalid type cast exception to be raised.


We could have done all of this by returning an object instance (say a pure abstract TAsyncResult type) and eliminated the interface altogether. However, that would have meant that the BeginInvoke caller would now be responsible for destroying the instance, it would also have actually complicated the implementation because There would now have to be two object instances because the lifetime of the instance returned by BeginInvoke() would have to specifically be independent from the actual asynchronous call. Otherwise, the destructor would have to block until the call completed. Another reason is that we'll re-use this interface in other asynchronous scenarios.


Let's cover the Invoke and WaitForCompletion methods first:

function TBaseAsyncResult.Invoke: IAsyncResult;
begin
FInvokingThread := GetCurrentThreadId;
Result := Self;
_AddRef();
TThread.Queue(nil, DoAsyncDispatch);
end;

procedure TBaseAsyncResult.WaitForCompletion;
var
LException: TObject;
begin
if not FCompleted then
ASyncHandleObject.WaitFor(INFINITE);
LException := InterlockedExchangePointer(Pointer(FInvokingException), nil);
if LException <> nil then
raise LException;
end;

The intended usage of the Invoke method is to be called immediately upon successful construction, preferably within the same expression:

  Result := TAsyncProcResult.Create(AProc).Invoke;

We don't schedule, or queue, the procedure until the Invoke call because that causes a race-condition. Because the queued procedure, DoAsyncDispatch, is a normal method pointer, it doesn't serve as an interface reference. There is a chance that the BeginInvoke caller could schedule a call, then immediately drop the IAsyncResult (or simply go out of scope) reference which would have freed the instance prematurely. We could have done some of the Invoke work in the constructor, but we'd also have to deal with the way TInterfacedObject does its construction (another topic, look in System.pas).


Invoke, first gets the invoking thread so we know which thread scheduled the call. We'll use this later to determine whether or not TThread.Queue made a direct method call because BeginInvoke was called from the main thread and is the basis for the CompletedSynchronously property on IAsyncResult. Next is assigns the Result. Here the compiler will perform a static cast of "Self" to get the interface and then call _AddRef(). The caller is guaranteed a valid reference now. Next we manually call _AddRef() because as indicated in the previous paragraph, the caller is free to drop the IAsyncResult reference at any point. By doing the _AddRef() before the call is queued, we can now guarantee that when DoAsyncDispatch is called, it is still a valid instance. Finally the call is queued and the function returns.


WaitForCompletion, serves a dual-fold purpose. The first is as the name implies, it merely blocks the caller until the queued method is completed. The other purpose is to properly propagate any exception that may have occurred during the call on the main thread. We'll see how this is handled when we look at DoAsyncDispatch() and Complete() next.

procedure TBaseAsyncResult.DoAsyncDispatch;
begin
FSynchronous := FInvokingThread = GetCurrentThreadId;
try
try
AsyncDispatch;
except
FInvokingException := AcquireExceptionObject
end;
finally
Complete;
_Release;
end;
end;

procedure TBaseAsyncResult.Complete;
begin
System.TMonitor.Enter(Self);
try
FCompleted := True;
if FAsyncHandle <> nil then
FAsyncHandle.SetEvent;
finally
System.TMonitor.Exit(Self);
end;
end;

The first thing that DoAsyncDispatch does is to determine whether or not this method was called synchronously. Again, this can happen if the TThread.Queue call above decided to directly execute the method. Then it proceeds to do the actual work of the call by calling AsyncDispatch, which is an abstract virtual method. The intent here is that only descendants should ever be instantiated and they must override AsyncDispatch. Here is also where we'll trap all exceptions. The AquireExceptionObject function is from the System unit which will find the current exception object from the raise list and return a reference to it. It will also drop that reference from the raise list which will mean it will no longer be automatically destroyed. When an except block is exited normally or another object instance is raised (raise; is a special case), the existing exception object instance is automatically freed. This prevents that instance from being freed and allows us to hang on to a reference for use in WaitForCompletion where we can raise it again.


The next call is to Complete which encapsulates the steps necessary to safely set the completed state and optionally release any callers blocked in WaitForCompletion. Here is uses the new "monitor" support available on any object instance for locking. Now back in DoAsyncDispatch, the last call is to _Release. This undoes the manual _AddRef call Invoke because we now know the TThread queue doesn't have a reference to this instance anymore. It is also entirely possible that this _Release call could cause this instance to be freed. This is safe because there is no other code following this call that accesses any instance fields. Only the local stack and exception frames are cleaned up and control is returned to the caller.


Here are the remaining methods of TBaseAsyncResult. They should be self-explanatory:

destructor TBaseAsyncResult.Destroy;
begin
FAsyncHandle.Free;
FInvokingException.Free;
inherited;
end;

function TBaseAsyncResult.GetAsyncHandleObject: THandleObject;
begin
if FAsyncHandle = nil then
begin
System.TMonitor.Enter(Self);
try
if FAsyncHandle = nil then
begin
FAsyncHandle := TSimpleEvent.Create();
if FCompleted then
FAsyncHandle.SetEvent;
end;
finally
System.TMonitor.Exit(Self);
end;
end;
Result := FAsyncHandle;
end;

function TBaseAsyncResult.GetAsyncResult: TBaseAsyncResult;
begin
Result := Self;
end;

function TBaseAsyncResult.GetCompletedSynchronously: Boolean;
begin
Result := FSynchronous and FCompleted;
end;

function TBaseAsyncResult.GetIsCompleted: Boolean;
begin
Result := FCompleted;
end;

Now that the base functionality is defined, we can create the specialized descendant classes that will provide the specific functionality for the procedure and function call cases. We'll start with the simple procedure call first:

type
TAsyncProcedureResult = class sealed (TBaseAsyncResult)
private
FAsyncProcedure: TProc;
protected
procedure AsyncDispatch; override;
constructor Create(const AAsyncProcedure: TProc);
end;

As you can imagine, this one is pretty simple now. All the "meat" of the functionality is in TBaseAsyncResult. All this class needs to do is to add an instance field for a place to store the TProc reference, and override the AsyncDispatch virtual method. There is some tricky code here :-):

constructor TAsyncProcedureResult.Create(const AAsyncProcedure: TProc);
begin
inherited Create;
FAsyncProcedure := AAsyncProcedure;
end;

procedure TAsyncProcedureResult.AsyncDispatch;
begin
FAsyncProcedure();
end;

The function call version gets a little more interesting and is made possible through the judicious application of a generic type:

  TAsyncFunctionResult<T> = class sealed (TBaseAsyncResult)
private
FAsyncFunction: TFunc<T>;
FRetVal: T;
protected
procedure AsyncDispatch; override;
function GetRetVal: T;
constructor Create(const AAsyncFunction: TFunc<T>);
end;

Here we've created a generic type from a non-generic ancestor. The reason is that we don't know what the result type will be. Like the TAsyncProcedureResult type, we need an instance field to hold on to the TFunc<T> reference. For this class, however, we also need to save off the result of the function call, thus the FRetVal: T; field. We also need to have a way to get this value, so I've added the GetRetVal: T; function. The reason for the function call rather than directly accessing the field is to ensure that the caller blocks until the actual async call is completed by calling WaitForCompletion.

constructor TAsyncFunctionResult<T>.Create(const AAsyncFunction: TFunc<T>);
begin
inherited Create;
FAsyncFunction := AAsyncFunction;
end;

procedure TAsyncFunctionResult<T>.AsyncDispatch;
begin
FRetVal := FAsyncFunction();
end;

function TAsyncFunctionResult<T>.GetRetVal: T;
begin
WaitForCompletion;
Result := FRetVal;
end;

The final step is to put all of this together. Now that the core functionality is handled, the implementation of BeginInvoke/EndInvoke becomes very simple:

function TControlHelper.BeginInvoke(const AProc: TProc): IAsyncResult;
begin
Result := TAsyncProcedureResult.Create(AProc).Invoke;
end;

function TControlHelper.BeginInvoke<T>(const AFunc: TFunc<T>): IAsyncResult;
begin
Result := TAsyncFunctionResult<T>.Create(AFunc).Invoke;
end;

procedure TControlHelper.EndInvoke(const ASyncResult: IAsyncResult);
begin
(AsyncResult as IGetAsyncResult).AsyncResult.WaitForCompletion;
end;

function TControlHelper.EndInvoke<T>(const AsyncResult: IAsyncResult): T;
begin
Result := ((AsyncResult as IGetAsyncResult).AsyncResult as TAsyncFunctionResult<T>).GetRetVal;
end;

Finally, I need to explain the little "gotcha" with the usage example from my last post. Understanding how anonymous methods "capture" the enclosing context will serve to protect you from potentially confusing runtime errors, especially when dealing with asynchronous code. When the compiler "captures" the context it does so by capturing variables, not values. This is an important and critical distinction. It essentially means that you still have to work to protect variables being accessed by multiple threads. In the example code, the SR: TSearchRec; variable is used both inside and outside the anonymous method. The outer context is looping and updating the SR structure, and the inner anonymous method context is reading the value. Because of the asynchronous nature of the whole BeginInvoke/EndInvoke thing, they are now running completely independent of one another. Because of the very thing we're trying to accomplish, this causes a race on the SR variable.  The example implicitly synchronizes access to SR by not allowing FindNext() to execute until after the FForm.EndInvoke(AR); call. In this case, it essentially "re-serializes" the code and doesn't really demonstrate truly async code. Next time I'll present a technique that allows you to capture the values instead of the variables so that we don't actually "capture" SR and can "stack" lots of asynchronous calls. We'll ignore the exception propagation thing for now.

2 comments:

  1. No Comments? Are comments busted here? Is Allen's peanut gallery asleep on the job here?

    Nick

    ReplyDelete
  2. The Comment Crisis is hitting hard..
    Just kidding, but there is a lot of new Delphi news, blogs code samples and all to absorb, these last weeks. You know, I actually printed these Async topics on paper, to read them later.

    ReplyDelete

Please keep your comments related to the post on which you are commenting. No spam, personal attacks, or general nastiness. I will be watching and will delete comments I find irrelevant, offensive and unnecessary.