Tuesday, April 30, 2013

C# style async/await in C++ - Part 2 Using with Microsoft PPL/PPLX

Last time we talked a little about asynchrony and about the cpp_async_await project. The previous article is located at http://jrb-programming.blogspot.com/2013/04/c-style-asyncawait-in-c-part-1.html. All code for the project is located at https://github.com/jbandela/cpp_async_await/. We talked about how to use the library with Boost.Asio.

As mentioned before, the other major C++ library for this style of programming is Microsoft PPL/PPLX (PPLX is the cross-platform port of PPL by the Microsoft Casablanca project). You can obtain PPLX and its documentation at http://casablanca.codeplex.com/, along with a host of other really neat stuff such as an HTTP client, a JSON library, etc. From here on out, unless specified otherwise, you can take what I say about PPL and assume that it applies to PPLX.

While Boost.Asio uses a callback model, PPL/PPLX uses a continuation model. The key class is

template < typename _Type>
class task;

_Type specifies the type of value produced by the task and it can be void. Task is very similar to std::future with the addition of the .then method. Whereas std::future has a .get method which blocks until the future is complete, the .then method allows a lambda to be specified which will be called when the task is complete. You can read more about PPL tasks at http://msdn.microsoft.com/en-us/library/dd492427(v=vs.110).aspx.

Here is an example of how to use tasks and continuations taken from the above link

// basic-continuation.cpp 
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
    auto t = create_task([]() -> int
    {
        return 42;
    });

    t.then([](int result)
    {
        wcout << result << endl;
    }).wait();

    // Alternatively, you can chain the tasks directly and 
    // eliminate the local variable. 
    /*create_task([]() -> int
    {
        return 42;
    }).then([](int result)
    {
        wcout << result << endl;
    }).wait();*/
}

/* Output:
    42
*/

This is actually pretty neat and it is easier to chain tasks than in Boost.Asio.

There is currently a proposal to add .then to std::future. You can find the proposal at http://isocpp.org/files/papers/N3558.pdf
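
To make the idea concrete, here is a minimal sketch of a .then-style continuation built only from standard C++11 pieces. It is an illustration under my own assumptions, not the N3558 interface and not how PPL implements task::then: the free function then and the helper chained_answer are names I made up, and the sketch parks a thread inside get() until the first future is ready, which a real implementation would avoid.

```cpp
#include <cassert>
#include <future>
#include <utility>

// Hypothetical free function (not the N3558 or PPL API): run `func` on the
// value of `fut` once it is ready and return a future for func's result.
template <typename T, typename F>
auto then(std::future<T> fut, F func)
    -> std::future<decltype(func(std::declval<T>()))>
{
    assert(fut.valid());  // a moved-from or default-constructed future has no result
    // std::async moves the future and the continuation onto a new thread,
    // which blocks in get() so that the caller does not have to.
    return std::async(std::launch::async,
        [](std::future<T> f, F fn) { return fn(f.get()); },
        std::move(fut), std::move(func));
}

// Usage: the 42 example from above, chained with the helper.
inline int chained_answer()
{
    auto t = std::async(std::launch::async, [] { return 42; });
    auto doubled = then(std::move(t), [](int result) { return result * 2; });
    return doubled.get();  // the only blocking call the caller makes
}
```

Calling chained_answer() runs the first task, feeds its 42 to the continuation, and returns 84.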

However, it gets hard to use once you need to do anything in a loop. Due to this, along with other reasons, there is a proposal to add resumable functions to the C++ standard. You can find the paper at http://isocpp.org/files/papers/N3564.pdf

Here is one of the motivating examples from that paper. Note that they are using a future with .then continuations, just like a PPL task today:

auto write =
    [&buf](future<int> size) -> future<bool> 
{ 
    return streamW.write(size.get(), buf).then(
        [](future<int> op){ return op.get() > 0; });
};
auto flse = [](future<int> op){ return 
    future::make_ready_future(false);};
auto copy = do_while(
    [&buf]() -> future<bool> 
{ 
    return streamR.read(512, buf)
        .choice(
        [](future<int> op){ return op.get() > 0; }, write, flse);
});

The code asynchronously reads a stream 512 bytes at a time until no more bytes are read, asynchronously writing out each chunk that was read. Here is how the code looks with the proposed C++ language additions. Note that resumable marks a function as resumable, while await suspends the function, resumes it when the awaited future (task) is complete, and returns the value generated by the task that was awaited.

int cnt = 0;
do 
{
    cnt = await streamR.read(512, buf);
    if ( cnt == 0 ) break;
    cnt = await streamW.write(cnt, buf);
} while (cnt > 0);

Notice how much easier to follow the code is with the language additions. The downside is you will have to wait for the proposal to be approved, become part of a standard, and for your compiler to implement it.

The good news is you can have much of the same convenience using cpp_async_await now. First, a motivating example. In the Casablanca REST SDK that provides PPLX, there is an example of asynchronously searching a file for lines that contain a given string and writing them asynchronously to another file. You can find the code at http://casablanca.codeplex.com/SourceControl/changeset/view/040c323727ca7747beb254ecf2b8eac73632f3be#Release/collateral/Samples/SearchFile/searchfile.cpp. We are using PPLX because it is a bit easier to have a real example with a command-line app. You would use PPL tasks in the same way as PPLX tasks.

#include <filestream.h>
#include <containerstream.h>
#include <producerconsumerstream.h>

using namespace utility;
using namespace concurrency::streams;

/// <summary>
/// A convenient helper function to loop asynchronously until a condition is met.
/// </summary>
pplx::task<bool> _do_while_iteration(std::function<pplx::task<bool>(void)> func)
{
    pplx::task_completion_event<bool> ev;
    func().then([=](bool guard)
    {
        ev.set(guard);
    });
    return pplx::create_task(ev);
}
pplx::task<bool> _do_while_impl(std::function<pplx::task<bool>(void)> func)
{
    return _do_while_iteration(func).then([=](bool guard) -> pplx::task<bool>
    {
        if(guard)
        {
            return ::_do_while_impl(func);
        }
        else
        {
            return pplx::task_from_result(false);
        }
    });
}
pplx::task<void> do_while(std::function<pplx::task<bool>(void)> func)
{
    return _do_while_impl(func).then([](bool){});
}

/// <summary>
/// Structure used to store individual line results.
/// </summary>
typedef std::vector<std::string> matched_lines;
namespace Concurrency { namespace streams {
/// <summary>
/// Parser implementation for 'matched_lines' type.
/// </summary>
template <typename CharType>
class _type_parser<CharType, matched_lines>
{
public:
    static pplx::task<matched_lines> parse(streambuf<CharType> buffer)
    {
        basic_istream<CharType> in(buffer);
        auto lines = std::make_shared<matched_lines>();
        return do_while([=]()
        {
            container_buffer<std::string> line;
            return in.read_line(line).then([=](const size_t bytesRead)
            {
                if(bytesRead == 0 && in.is_eof())
                {
                    return false;
                }
                else
                {
                    lines->push_back(std::move(line.collection()));
                    return true;
                }
            });
        }).then([=]()
        {
            return matched_lines(std::move(*lines));
        });
    }
};
}}
/// <summary>
/// Function to create in data from a file and search for a given string writing all lines containing the string to memory_buffer.
/// </summary>
static pplx::task<void> find_matches_in_file(const string_t &fileName, const std::string &searchString, basic_ostream<char> results)
{
    return file_stream<char>::open_istream(fileName).then([=](basic_istream<char> inFile)
    {           
        auto lineNumber = std::make_shared<int>(1);
        return ::do_while([=]()
        {
            container_buffer<std::string> inLine;
            return inFile.read_line(inLine).then([=](size_t bytesRead)
            {
                if(bytesRead == 0 && inFile.is_eof())
                {
                    return pplx::task_from_result(false);
                }

                else if(inLine.collection().find(searchString) != std::string::npos)
                {
                    results.print("line ");
                    results.print((*lineNumber)++);
                    return results.print(":").then([=](size_t)
                    {
                        container_buffer<std::string> outLine(std::move(inLine.collection()));
                        return results.write(outLine, outLine.collection().size());
                    }).then([=](size_t)
                    {
                        return results.print("\r\n");
                    }).then([=](size_t)
                    {
                        return true;
                    });
                }

                else
                {
                    ++(*lineNumber);
                    return pplx::task_from_result(true);
                }
            });
        }).then([=]()
        {
            // Close the file and results stream.
            return inFile.close() && results.close();
        });
    })

    // Continuation to erase the bool and return task of void.
    .then([](std::vector<bool>) {});
}

/// <summary>
/// Function to write out results from matched_lines type to file
/// </summary>
static pplx::task<void> write_matches_to_file(const string_t &fileName, matched_lines results)
{
    // Create a shared pointer to the matched_lines structure to avoid copying it repeatedly.
    auto sharedResults = std::make_shared<matched_lines>(std::move(results));

    return file_stream<char>::open_ostream(fileName, std::ios::trunc).then([=](basic_ostream<char> outFile)
    {
        auto currentIndex = std::make_shared<size_t>(0);
        return ::do_while([=]()
        {
            if(*currentIndex >= sharedResults->size())
            {
                return pplx::task_from_result(false);
            }

            container_buffer<std::string> lineData((*sharedResults)[(*currentIndex)++]);
            outFile.write(lineData, lineData.collection().size());
            return outFile.print("\r\n").then([](size_t)
            {
                return true;
            });
        }).then([=]()
        {
            return outFile.close();
        });
    })

    // Continuation to erase the bool and return task of void.
    .then([](bool) {});
}

#ifdef _MS_WINDOWS
int wmain(int argc, wchar_t *args[])
#else
int main(int argc, char *args[])
#endif
{
    if(argc != 4)
    {
        printf("Usage: SearchFile.exe input_file search_string output_file\n");
        return -1;
    }
    const string_t inFileName = args[1];
    const std::string searchString = utility::conversions::to_utf8string(args[2]);
    const string_t outFileName = args[3];
    producer_consumer_buffer<char> lineResultsBuffer;

    // Find all matches in file.
    basic_ostream<char> outLineResults(lineResultsBuffer);
    find_matches_in_file(inFileName, searchString, outLineResults)

    // Write matches into custom data structure.
    .then([&]()
    {
        basic_istream<char> inLineResults(lineResultsBuffer);
        return inLineResults.extract<matched_lines>();
    })

    // Write out stored match data to a new file.
    .then([&](matched_lines lines)
    {
        return write_matches_to_file(outFileName, std::move(lines));
    })

    // Wait for everything to complete.
    .wait();

    return 0;
}

Notice how painful iteration is. Now here is the code using the cpp_async_await pplx_helper. Just a quick note: the code above first copies the matching lines into a producer_consumer_buffer, then into a vector, and then to the output file. My code copies into the producer_consumer_buffer and then uses that buffer to copy to the output. I think my code achieves the same level of concurrency as the example program. If I am incorrect in this, please let me know in the comments below. You can find the whole file at https://github.com/jbandela/cpp_async_await/blob/master/PplxExample2.cpp

#include "pplx_helper.hpp"
#include <filestream.h>
#include <containerstream.h>
#include <producerconsumerstream.h>
#include <iostream> // for std::cerr in the catch block below

using namespace utility;
using namespace concurrency::streams;



#ifdef _MS_WINDOWS
int wmain(int argc, wchar_t *args[])
#else
int main(int argc, char *args[])
#endif
{
    if(argc != 4)
    {
        printf("Usage: PplxExample2 input_file search_string output_file\n");
        return -1;
    }
    const string_t inFileName = args[1];
    const std::string searchString = utility::conversions::to_utf8string(args[2]);
    const string_t outFileName = args[3];
    producer_consumer_buffer<char> lineResultsBuffer;

    // Find all matches in file.
    basic_ostream<char> outLineResults(lineResultsBuffer);

    auto reader = pplx_helper::do_async([&](pplx_helper::async_helper<void> helper){
        auto inFile = helper.await(file_stream<char>::open_istream(inFileName));
        int lineNumber = 1;
        bool done = false;
        while(!done){
            container_buffer<std::string> inLine;
            auto bytesRead = helper.await(inFile.read_line(inLine));
            if(bytesRead==0 && inFile.is_eof()){
                done = true;
            }
            else if(inLine.collection().find(searchString) != std::string::npos){
                helper.await(outLineResults.print("line "));
                helper.await(outLineResults.print(lineNumber++));
                helper.await(outLineResults.print(":"));
                container_buffer<std::string> outLine(std::move(inLine.collection()));
                helper.await(outLineResults.write(outLine,outLine.collection().size()));
                helper.await(outLineResults.print("\r\n"));
            }
            else{
                ++lineNumber;
            }

        }
        helper.await(inFile.close() && outLineResults.close());
    });

    auto writer = pplx_helper::do_async([&](pplx_helper::async_helper<void> helper){
        basic_istream<char> inLineResults(lineResultsBuffer);
        auto outFile = helper.await(file_stream<char>::open_ostream(outFileName,std::ios::trunc));
        auto currentIndex = 0;
        bool done = false;
        while(!done){
            container_buffer<std::string> lineData;
            auto bytesRead = helper.await(inLineResults.read_line(lineData));
            if(bytesRead==0 && inLineResults.is_eof()){
                done = true;
            }
            else{
                container_buffer<std::string> lineDataOut(std::move(lineData.collection()));
                helper.await(outFile.write(lineDataOut,lineDataOut.collection().size()));
                helper.await(outFile.print("\r\n"));
            }
        }
        helper.await(inLineResults.close() && outFile.close());

    });


    try{
    // Wait for everything to complete and catch any exceptions
    (reader && writer).wait();

    }
    catch(std::exception& e){
        std::cerr << e.what();
    }

    return 0;
}

Notice how we can easily do iteration. The library is pretty similar to what can be achieved with the language additions. Instead of using resumable to mark a function as resumable, we use pplx_helper::do_async, which takes a lambda. The lambda takes a single parameter of type pplx_helper::async_helper<void>. If the lambda were to return an int, for example, it would take pplx_helper::async_helper<int>. In general, a lambda with return type T takes pplx_helper::async_helper<T>. In the example code the parameter is named helper. In the language proposal you use the unary await keyword to suspend the function until a task is complete and then resume the function, returning the value generated by the task we were awaiting. In our code we call helper.await on the task we want to await. helper.await provides pretty much the same convenience as the language keyword await.
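
To show the shape of this API without pulling in PPLX, here is a rough stand-in written against std::future. Everything in namespace sketch is hypothetical and is not the cpp_async_await implementation: this await simply blocks a worker thread in get(), whereas the real library suspends a coroutine, and the return type T is passed explicitly to do_async here as a simplification of mine. What carries over is the pairing between the lambda's return type T and async_helper<T>, and the fact that await hands back the value produced by the thing awaited.

```cpp
#include <cassert>
#include <future>
#include <utility>

namespace sketch {  // hypothetical names, not the cpp_async_await API

// Stand-in for async_helper<T>: T records the lambda's return type.
template <typename T>
struct async_helper {
    // Blocking imitation of await: wait for the future, hand back its value.
    template <typename U>
    U await(std::future<U> f) const
    {
        assert(f.valid());
        return f.get();
    }
};

// Stand-in for do_async: runs the lambda on its own thread and returns a
// future for the lambda's result.
template <typename T, typename F>
std::future<T> do_async(F func)
{
    return std::async(std::launch::async,
        [](F fn) { return fn(async_helper<T>()); }, std::move(func));
}

}  // namespace sketch

// Usage: a lambda returning int takes async_helper<int>.
inline int sum_two_results()
{
    auto task = sketch::do_async<int>(
        [](sketch::async_helper<int> helper) -> int {
            int a = helper.await(std::async(std::launch::async, [] { return 40; }));
            int b = helper.await(std::async(std::launch::async, [] { return 2; }));
            return a + b;  // ordinary sequential code between the awaits
        });
    return task.get();
}
```

The body of the lambda reads like synchronous code, which is the whole point; the difference in the real library is that no thread sits blocked while an await is pending.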

You can use the same syntax to work with PPL tasks by using namespace ppl_helper. In summary, for PPLX (Project Casablanca) use namespace pplx_helper, and for PPL (shipped with Visual C++ on Windows) use ppl_helper.

This functionality is packaged up for you at https://github.com/jbandela/cpp_async_await. It is licensed under the Boost Software License which allows usage for both open source and commercial applications. It is a header only library and does not need to be built, but it does depend on Boost.Coroutine and needs to be linked to the boost_context library. The library has been tested with Visual C++ 2012 on Windows, and G++ 4.7.2 on Fedora Linux.

I hope you have enjoyed this discussion. Download the code and try it out, and let me know what you think. If people are interested, I will talk in a future post about how the library actually works.

Thanks,

John Bandela

Friday, April 26, 2013

C# style async/await in C++ - Part 1: Introduction and use with Boost.Asio

Asynchronous Programming


Asynchronous programming has become more and more important recently as a way to efficiently use the resources available with multicore processors yet at the same time avoid dealing with locking primitives.
In C++, two important libraries for this type of programming are Boost.Asio and Microsoft's Parallel Patterns Library (PPL) Task Library. Boost.Asio provides asynchronous operations with callback handlers. You can learn more about Boost.Asio here. The PPL Task Library provides asynchronous operations using continuations. You can learn more about PPL here.

Async/Await


The problem with using these libraries is that they operate differently from synchronous programming. Your logic ends up being in either multiple callback handlers or in multiple lambda continuations. C# recently added async/await to make it easier to write asynchronous code. You can find out more about them here and watch a presentation here.
There is even a proposal to add this to C++. You can see the proposal here.

Async/Await using Boost.Coroutine


However, you don't want to wait for a language proposal to be approved and then implemented by compilers to make your programming easier. In fact, you can have a lot of the benefit now. The key that you need is Boost.Coroutine. Boost.Coroutine is in the 1.53 release of Boost. You can read about Boost.Coroutine here. Using Boost.Coroutine, I wrote cpp_async_await, an open source library under the Boost Software License that allows (as much as possible with a library-only solution) async/await style programming in C++ with Boost.Asio and Microsoft PPL/PPLx.

Motivating example


Go take a look at a simple async http client using raw Boost.Asio http://www.boost.org/doc/libs/1_53_0/doc/html/boost_asio/example/http/client/async_client.cpp



Welcome back. Boost.Asio is very powerful, but the callbacks make the logic hard to follow. In contrast here is our version of the same code. You can find the full code at

https://github.com/jbandela/cpp_async_await/blob/master/Example2.cpp

void get_http(boost::asio::io_service& io,std::string server, std::string path){

    using namespace asio_helper::handlers;
    // This allows us to do await
    asio_helper::do_async(io,[=,&io](asio_helper::async_helper helper){

        using boost::asio::ip::tcp;

        // This allows us to use the predefined handlers
        // such as read_handler, write_handler, etc
        using namespace asio_helper::handlers;

        tcp::resolver resolver_(io);
        tcp::socket socket_(io);
        boost::asio::streambuf request_;
        boost::asio::streambuf response_;

        // Form the request. We specify the "Connection: close" header so that the
        // server will close the socket after transmitting the response. This will
        // allow us to treat all data up until the EOF as the content.
        std::ostream request_stream(&request_);
        request_stream << "GET " << path << " HTTP/1.0\r\n";
        request_stream << "Host: " << server << "\r\n";
        request_stream << "Accept: */*\r\n";
        request_stream << "Connection: close\r\n\r\n";

        // Start an asynchronous resolve to translate the server and service names
        // into a list of endpoints.
        tcp::resolver::query query(server, "http");

        // Do async resolve
        tcp::resolver::iterator endpoint_iterator;
        boost::system::error_code ec;
        std::tie(ec,endpoint_iterator) =  helper.await<resolve_handler>(
            [&](resolve_handler::callback_type cb){
                resolver_.async_resolve(query,cb);
        });
        if(ec) {throw boost::system::system_error(ec);}

        // Do async connect
        std::tie(ec,std::ignore) = helper.await<composed_connect_handler>(
            [&](composed_connect_handler::callback_type cb){
                boost::asio::async_connect(socket_,endpoint_iterator,cb);    
        });
        if(ec){throw boost::system::system_error(ec);}

        // Connection was successful, send request
        std::tie(ec,std::ignore) = helper.await<write_handler>(
            [&](write_handler::callback_type cb){
                boost::asio::async_write(socket_,request_,cb);
        });
        if(ec){throw boost::system::system_error(ec);}

        // Read the response status line
        std::tie(ec,std::ignore) = helper.await<read_handler>(
            [&](read_handler::callback_type cb){
                boost::asio::async_read_until(socket_,response_,"\r\n",cb);
        });
        if(ec){throw boost::system::system_error(ec);}

        // Check that the response is OK
        std::istream response_stream(&response_);
        std::string http_version;
        response_stream >> http_version;
        unsigned int status_code;
        response_stream >> status_code;
        std::string status_message;
        std::getline(response_stream, status_message);
        if (!response_stream || http_version.substr(0, 5) != "HTTP/")
        {
            std::cout << "Invalid response\n";
            return;
        }
        if (status_code != 200)
        {
            std::cout << "Response returned with status code ";
            std::cout << status_code << "\n";
            return;
        }

        // Read the response headers, which are terminated by a blank line.
        std::tie(ec,std::ignore) = helper.await<read_handler>(
           [&](read_handler::callback_type cb){
                boost::asio::async_read_until(socket_, response_, "\r\n\r\n",cb);
        });
        if(ec){throw boost::system::system_error(ec);}

        // Process the response headers.
        std::istream response_stream2(&response_);
        std::string header;
        while (std::getline(response_stream2, header) && header != "\r")
            std::cout << header << "\n";
        std::cout << "\n";

        // Write whatever content we already have to output.
        if (response_.size() > 0)
            std::cout << &response_;

        // Continue reading remaining data until EOF.
        bool done = false;
        while(!done){

            std::tie(ec,std::ignore) = helper.await<read_handler>(
                [&](read_handler::callback_type cb){ 
                    boost::asio::async_read(socket_, response_,
                        boost::asio::transfer_at_least(1), cb);         
            });
            if(ec && ec != boost::asio::error::eof){
                throw boost::system::system_error(ec);
            }
            done = (ec == boost::asio::error::eof);
            // Write all of the data so far
            std::cout << &response_;
        }
   });
}

Discussion

Notice how we can have the code all in one function instead of spreading it out, and can read it in a single pass instead of jumping to the handler and back. The magic happens in await. Let's look at a single call:
// Connection was successful, send request
std::tie(ec,std::ignore) = helper.await<write_handler>(
        [&](write_handler::callback_type cb){
            boost::asio::async_write(socket_,request_,cb);
});

helper.await takes a template parameter to specify what handler to use. Handlers are defined in namespace asio_helper::handlers. Await takes a single function parameter that consists of a lambda. The lambda takes a parameter of write_handler::callback_type. If we were using a read_handler, it would be read_handler::callback_type and so on.

helper.await returns whatever parameters were passed into the callback handler as a single value, pair, or tuple, depending on the number of parameters in the handler. A read_handler has boost::system::error_code ec and std::size_t bytes_transferred as parameters, so it returns a std::pair. We can then use std::tie to get the error code and ignore the bytes transferred.
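
The std::tie idiom is plain standard C++ and can be tried without Boost. In the sketch below, fake_read is a hypothetical stand-in for what an await on a read_handler might hand back, with std::error_code used in place of boost::system::error_code; only the unpacking pattern is the point.

```cpp
#include <cassert>
#include <cstddef>
#include <system_error>
#include <tuple>
#include <utility>

// Hypothetical stand-in for the pair returned by awaiting a read:
// an error code plus the number of bytes transferred.
inline std::pair<std::error_code, std::size_t> fake_read(bool fail)
{
    if (fail) {
        return std::make_pair(
            std::make_error_code(std::errc::connection_reset), std::size_t(0));
    }
    return std::make_pair(std::error_code(), std::size_t(128));
}

// Keep only the error code; std::ignore swallows the byte count.
inline bool read_failed(bool fail)
{
    std::error_code ec;
    std::tie(ec, std::ignore) = fake_read(fail);
    return static_cast<bool>(ec);
}

// Unpack both members, turning an error into an exception.
inline std::size_t bytes_or_throw(bool fail)
{
    std::error_code ec;
    std::size_t bytes = 0;
    std::tie(ec, bytes) = fake_read(fail);
    assert(!ec || bytes == 0);  // in this sketch a failed read reports no bytes
    if (ec) { throw std::system_error(ec); }
    return bytes;
}
```

The second function follows the same check-then-throw shape used after each await in get_http.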

The await function calls the asynchronous Boost.Asio function and then uses Boost.Coroutine to suspend our function and "return" to the calling function. Meanwhile, the callback_type is a special function object that, when called by Boost.Asio, uses Boost.Coroutine to resume our function.
The cpp_async_await library defines handlers for the following Boost.Asio handler types in namespace asio_helper::handlers:
  • read_handler for ReadHandler
  • write_handler for WriteHandler
  • completion_handler for CompletionHandler
  • accept_handler for AcceptHandler
  • composed_connect_handler for ComposedConnectHandler
  • connect_handler for ConnectHandler
  • resolve_handler for ResolveHandler
  • wait_handler for WaitHandler
  • signal_handler for SignalHandler
  • ssl_handshake_handler for HandshakeHandler
  • ssl_shutdown_handler for ShutdownHandler

The handlers allow async_helper::await to return, as a value, pair, or tuple, whatever values are passed to the callback function.
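
As a rough illustration of the handler idea, here is a sketch using only the standard library. It is not how asio_helper is written: the names in namespace sketch are hypothetical, the result is always packed into a std::tuple (the library uses a single value or a pair where that fits), and the waiting is done by blocking on a std::future where the library would suspend and resume a Boost.Coroutine. The part that matches the description above is the flow: await builds a callback, hands it to the code that starts the operation, and returns whatever arguments the callback eventually receives.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <system_error>
#include <thread>
#include <tuple>

namespace sketch {  // hypothetical names, not the asio_helper API

// A handler fixes the callback signature and the packed result type.
template <typename... Args>
struct handler {
    typedef std::tuple<Args...> return_type;
    typedef std::function<void(Args...)> callback_type;

    // Build a callback that stores its arguments in the promise, which
    // wakes up whoever is waiting on the matching future.
    static callback_type make_callback(
        std::shared_ptr<std::promise<return_type>> done)
    {
        return [done](Args... args) {
            done->set_value(return_type(args...));
        };
    }
};

// Start an operation and wait for its callback (blocking stand-in for the
// coroutine suspend/resume that the real library performs).
template <typename Handler, typename Starter>
typename Handler::return_type await(Starter start)
{
    typedef typename Handler::return_type result_t;
    auto done = std::make_shared<std::promise<result_t>>();
    std::future<result_t> result = done->get_future();
    start(Handler::make_callback(done));  // kick off the operation
    return result.get();                  // wait until the callback fires
}

}  // namespace sketch

typedef sketch::handler<std::error_code, std::size_t> write_handler;

// Usage: a pretend write that completes on another thread.
inline std::size_t write_and_wait(std::size_t size)
{
    std::thread worker;
    std::error_code ec;
    std::size_t bytes = 0;
    std::tie(ec, bytes) = sketch::await<write_handler>(
        [&](write_handler::callback_type cb) {
            worker = std::thread([cb, size] { cb(std::error_code(), size); });
        });
    worker.join();
    assert(!ec);  // the pretend write never fails
    return bytes;
}
```

The calling code has the same outline as the await calls in get_http: a lambda receives the callback, passes it to the asynchronous operation, and the result comes back through std::tie.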

The code is at https://github.com/jbandela/cpp_async_await/

It is a header only library. For Boost.Asio you need to include asio_helper.hpp. You will need to link to boost_system and boost_context libraries. The code will compile on Windows with MSVC 2012 and on Linux with gcc 4.7.2. You need Boost version 1.53 as that is the version that has Coroutine.

There is also support for Microsoft PPL and PPLx. Include ppl_helper.hpp for PPL and pplx_helper.hpp for PPLx. Because PPL and PPLx are different from Boost.Asio, there are a few minor changes in how you use the library with them.

Thanks for taking the time to read this. Download the code, take a look at it, and play around with it. Let me know what you think. Next time we will talk about using this library with PPL and PPLx.

-John Bandela