Posts Tagged Lambda

Why lambdas seem broken in multithreaded environments (or how closures in C# works).

I’ve been playing around with some code for.NET 3.5 to enable us to split big operations into small parallel tasks. During this work I was reminded why Resharper has the “Access to modified closure” warning. This warning tells us about a inconsistency in handling the ”Immutable” loop variable created in a foreach loop when lambdas or anonymous delegates are involved.

Take this code:

public static void ForEach(IEnumerable list)
{
    foreach(var item in list)
    {
        Action action = () => Console.WriteLine(item);
        action.Invoke();
    }
}

This will yield the expected result. When we call WriteLine the current item will be displayed:

A naïve and not so interesting example, but creates a baseline. Now let’s look at the real code where the reminder was at, a mimic of .NET 4’s parallel foreach:

public static void ForEach(IEnumerable list, Action task)
{
    var waitHandles = new ManualResetEvent[list.Count()];

    var index = 0;
    foreach(var item in list)
    {
        var handleForCurrentTask = new ManualResetEvent(false);
        waitHandles[index] = handleForCurrentTask;
        ThreadPool.QueueUserWorkItem(delegate
         {
             task(item);
             handleForCurrentTask.Set();
         });

        index++;
    }

    WaitHandle.WaitAll(waitHandles);
}

Calling this with the following line:

ForEach(items, Console.WriteLine);

Will give a more unexpected result (if you don’t know what’s going on that is).

So what is going on, or how closures are implemented in C#.

To understand the issue we must examine how the C# compiler handles anonymous delegates and lambdas (called lambda from now on). Since they are really a compiler trick and not a real feature of the Common Language Runtime.

For every lambda we define; the compiler will generate a new method (there are some nice tricks to avoid duplication but for the sake of this post let’s go with this simple definition). As illustrated in this screenshot from Reflector:

As you can see this is encapsulated inside an auto generated class. That class also contains a couple of instances variables that we’ll discuss.

We can see that the method encapsulated the two lines of code we had inside our lambda and has a dependency on “task” and “item” from instance variables that are also auto generated types.

To be able to execute the Foreach method in the auto generated class, we’ll need to initialize the instance variables of the lambda class. The compiler neatly picks this up and creates a couple lines of code in the beginning of the method to encapsulate Task:

public static void ForEach(IEnumerable list, Action task)
{
    <>c__DisplayClass1 CS$<>8__locals2 = new <>c__DisplayClass1();
    CS$<>8__locals2.task = task;

 

A bit further down in the method we can see how the the item variable is encapsulated inside another auto generated instance variable.

  <>c__DisplayClass3 CS$<>8__locals4 = new <>c__DisplayClass3();
        while (CS$5$0000.MoveNext())
        {
            CS$<>8__locals4.item = CS$5$0000.Current;

Notice how the creation of the object that encapsulates the item is declared outside of the iteration and the item is then passed into the object inside of the iteration. Here is the heart of the problem. In the interest of optimization, the compiler encapsulates parameters and variables as few times as possible. It seems like the optimizer do not recognize the immutable nature of the loop variable, nor the perceived local scope of it.

Further down this is passed into the ThreadPool as a delegate:

<>c__DisplayClass5 CS$<>8__locals6 = new <>c__DisplayClass5();
CS$<>8__locals6.CS$<>8__locals4 = CS$<>8__locals4;
CS$<>8__locals6.CS$<>8__locals2 = CS$<>8__locals2;
CS$<>8__locals6.handleForCurrentTask = new ManualResetEvent(false);
waitHandles[index] = CS$<>8__locals6.handleForCurrentTask;
ThreadPool.QueueUserWorkItem(new WaitCallback(CS$<>8__locals6.b__0));

So while the delegate that is passed to the thread pool is instantiated for each iteration, the variables and parameters used by the closure isn’t.

In a scenario where the lambda is executed inside the loop this will not be a problem, the next time the item variable is assigned, the previous execution will already be done. In a multithreaded, or a deferred execution of the lambda the story is quite different.

The assignment of the Item variable is far from safe here. Since the reference to the item encapsulation will be shared between all of the instances of the delegate passed to the ThreadPool. Thus they will share the last (or the last at the time of execution) assignment.

To solve this we need to tell the compiler that the Item variable is locally scoped to the loop and not a method-wide variable.

A simple change to:

foreach(var item in list)
{
    var handleForCurrentTask = new ManualResetEvent(false);
    waitHandles[index] = handleForCurrentTask;
    var currentItem = item;
    ThreadPool.QueueUserWorkItem(delegate
     {
         task(currentItem);
         handleForCurrentTask.Set();
     });

Will be enough for the compiler do do the right thing. It will now move Item inside the closure class for the lambda and not create a separate class:

 

Since this class had an instance created for each iteration, we will now have a separate copy of the item value in each delegate passed to the ThreadPool:

<>c__DisplayClass3 CS$<>8__locals4 = new <>c__DisplayClass3();
CS$<>8__locals4.CS$<>8__locals2 = CS$<>8__locals2;
CS$<>8__locals4.handleForCurrentTask = new ManualResetEvent(false);
waitHandles[index] = CS$<>8__locals4.handleForCurrentTask;
CS$<>8__locals4.currentItem = item;
ThreadPool.QueueUserWorkItem(new WaitCallback(CS$<>8__locals4.b__0)); 

And we’ll get the expected result again:

In conclusion

Most people will not run into this problem, but it is an interesting one never the less. I would argue that for deferred executed lambdas, the semantics of foreach is not the same as it is for the rest of the language. I’m sure the C# team has a really good reason for ignoring that Item really should be handled as a locally scoped variable, but my brain isn’t big enough to figure that out at the moment.

Maybe some of you know?

,

3 Comments

Tip: Mocking callbacks using RhinoMocks when lambdas are involved

There is several cool language features available in C# 3.0, one of them is lambdas. It’s a handy tool when you want to execute a few lines of code where a delegate is expected. Fredrik Normén and I chatted about one of those places today, callbacks in async scenarios.

Given this callback:

public string ResultOfServiceCall { get; private set; }
public ContextThatDoesStuff(IServiceWithCallback service)
{
    service.Begin(
            () => ResultOfServiceCall = service.End()
            );
}

How would you set up a mock to handle that? Somehow you need to intercept that lambda and execute it. One way, there might be others, to solve this with RhinoMocks is to use the WhenCalling feature. A simple test to demonstrate:

 

[TestMethod]
public void TheTest()
{
    var mock = MockRepository.GenerateMock();

    mock.Expect(service => service.Begin(null))
        .IgnoreArguments()
        .WhenCalled(invocation => ((Action)invocation.Arguments[0]).Invoke());

    mock.Expect(service => service.End())
        .Return("The String");

    var context = new ContextThatDoesStuff(mock);


    Assert.AreEqual(context.ResultOfServiceCall, "The String");
}

In the above example we capture the in parameter, here it’s a lambda, casting it to an Action delegate and invoke it.

I just love the flexibility in RhinoMocks, don’t you?

, , , TDD

No Comments

Executing lambdas on transaction commit – or Transactional delegates.

One great thing with the TransactionScope class is that it makes it possible to include multiple levels in your object hierarchy.  Everything that is done from the point that the scope was created until it is disposed will be in the exact same transaction. This makes it easy to control business transactions from high up in the call stack and allow for everything to participate.

Everything but calls to delegates, until now. I checked in code in our project today to allow for this:

   1: using(var scope = new TransactionScope)

   2: {

   3:     repository.Save(order);

   4:     bus.SendMessage(new OrderStatusChanged {OrderId = order.Id, Status = order.Status});

   5:     scope.Complete();

   6: }

   7:  

   8: // In some message listener somwhere that listens to the Message sent

   9: // But still in the transaction scope.

  10:  

  11: public void HandleMessage(OrderStatusChanged message)

  12: {

  13:     new OnTransactionComplete

  14:     (

  15:         () => publisher.NotifyClientsStatusChange(message.OrderId, message.NewStatus);

  16:     ) 

  17: }

  18:  

  19:  

 

Why is this interesting?

Why would anyone want a delegate to be called on commit and only then? The scenarios where I find this and other similar scenarios (the code allows for OnTransactionRollback and AfterTransactionComplete as well) is where you have events or message sent to other subsystems asking them react. It can be more then one that listens and often you might not want to carry out the action if the transaction started high above in the business layer isn’t commited (like telling all clients that the status has changed and then the transaction rolls back).

How can I implement this?

The code to achieve this was fairly simple. Transactions in System.Transaction has a EnlistVoilatile method where you can send in anything that implements IEnlistmentNotification. This handy little interface defines four methods:

   1:  

   2: public interface IEnlistmentNotification

   3: {

   4:     void Prepare(PreparingEnlistment preparingEnlistment);

   5:     void Commit(Enlistment enlistment);

   6:     void Rollback(Enlistment enlistment);

   7:     void InDoubt(Enlistment enlistment);

   8: }

Any object enlisting in a transaction with this interface will be told what’s happening and can participate in the voting. This particular implementation don’t vote it just reacts on Commit or Rollbacks.

The implementation spins around the class TransactionalAction that in it’s constructor accepts an Action delegate as a target and then enlists the object in the current transaction:

 

   1: public TransactionalAction(Action action)

   2: {

   3:      Action = action;

   4:      Enlist();

   5: }

   6:  

   7: public void Enlist()

   8: {

   9:      var currentTransaction = Transaction.Current;

  10:      currentTransaction.EnlistVolatile(this, EnlistmentOptions.None);

  11: }

Enlisting the object into the transaction will effectively add it to the transactions graph. As long as the transaction scope have a root reference, the object will stay alive. That is why this works by only stating new OnTransactionComplete( () => ) and you don’t have to save the reference.

The specific OnTransactionComplete, OnTransactionRollback  and WhenTransactionInDoubt then inherits from TransactionalAction and overrides the appropriate method (the base commit ensures that we behave well in a transactional scenario):

   1: public class OnTransactionCommit : TransactionalAction

   2: {

   3:     public OnTransactionCommit(Action action)

   4:         : base(action)

   5:     { }

   6:  

   7:     public override void Commit(System.Transactions.Enlistment enlistment)

   8:     {

   9:         Action.Invoke();

  10:         base.Commit(enlistment);

  11:     }

  12: }

Hey, what about AfterTransactionCommit?

The IEnlistmentNotification isn’t an interceptor model. It just allows you to participate in the transactional work. To be able to make After calls we need to use another mechanism. The TransactionCompleted event. As the previous code, this is fairly simple;

   1: public class AfterTransactionComplete

   2: {

   3:     private Action action;

   4:  

   5:     public AfterTransactionComplete(Action action)

   6:     {

   7:         this.action = action;

   8:         Enlist();

   9:     }

  10:  

  11:     private void Enlist()

  12:     {

  13:         var currentTransaction = Transaction.Current;

  14:  

  15:         if(NoTransactionInScope(currentTransaction))

  16:             throw new InvalidOperationException("No active transaction in scope");

  17:  

  18:         currentTransaction.TransactionCompleted += TransactionCompleted;

  19:     }

  20:  

  21:     private void TransactionCompleted(object sender, TransactionEventArgs e)

  22:     {

  23:         if(TransactionHasBeenCommited(e))

  24:             action.Invoke();

  25:     }

  26: }

Summary

With some simple tricks using Transaction.Current and lambdas we can now participate in our transactions with our delegates. Download the code to get to do this:

   1: new OnTransactionComplete

   2: (

   3:   () => DoStuff()

   4: )

   5:  

   6: new OnTransactionRollback

   7: (

   8:     () => DoStuff()

   9: )

  10:  

  11: new AfterTransactionCommit

  12: (

  13:     () => DoStuff();

  14: )

  15:  

  16: // Also includes a more "Standard" api:

  17: Call.OnCommit( () => DoStuff );

  18:  

  19: // Since Call has extension methods, this is also valid:

  20: ((Action)(()=> firstActionWasCalled=true)).OnTransactionCommit();

I hope this helps you build more robust solutions.

Code download (project is VS2010 but it works on .NET 2.0) [306kb]

More information on IEnlistmentNotification

More information on EnlistVolatile

, ,

2 Comments