/*** * ==++== * * Copyright (c) Microsoft Corporation. All rights reserved. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * ==--== * =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ * * Asynchronous I/O: streams API, used for formatted input and output, based on unformatted I/O using stream buffers * * For the latest on this and related APIs, please see http://casablanca.codeplex.com. * * =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- ****/ #pragma once #ifndef _CASA_STREAMS_H #define _CASA_STREAMS_H #include "cpprest/astreambuf.h" #include namespace Concurrency { namespace streams { template class basic_ostream; template class basic_istream; namespace details { template class basic_ostream_helper { public: basic_ostream_helper(streams::streambuf buffer) : m_buffer(buffer) { } ~basic_ostream_helper() { } private: template friend class streams::basic_ostream; concurrency::streams::streambuf m_buffer; }; template class basic_istream_helper { public: basic_istream_helper(streams::streambuf buffer) : m_buffer(buffer) { } ~basic_istream_helper() { } private: template friend class streams::basic_istream; concurrency::streams::streambuf m_buffer; }; template struct Value2StringFormatter { template static std::basic_string format(const T &val) { std::basic_ostringstream ss; ss << val; return ss.str(); } }; template <> struct Value2StringFormatter { template static std::basic_string format(const T &val) { std::basic_ostringstream ss; ss << val; return reinterpret_cast(ss.str().c_str()); } static std::basic_string format(const utf16string &val) { return format(utility::conversions::utf16_to_utf8(val)); } }; static const char *_in_stream_msg = "stream not set up for input of data"; static const char *_in_streambuf_msg = "stream buffer not set up for input of data"; static const char *_out_stream_msg = "stream not set up for output of data"; static const char *_out_streambuf_msg = "stream buffer not set up for output of data"; } /// /// Base interface for all asynchronous output streams. /// template class basic_ostream { public: typedef Concurrency::streams::char_traits traits; typedef typename traits::int_type int_type; typedef typename traits::pos_type pos_type; typedef typename traits::off_type off_type; /// /// Default constructor /// basic_ostream() {} /// /// Copy constructor /// /// The source object basic_ostream(const basic_ostream &other) : m_helper(other.m_helper) { } /// /// Assignment operator /// /// The source object /// A reference to the stream object that contains the result of the assignment. basic_ostream & operator =(const basic_ostream &other) { m_helper = other.m_helper; return *this; } /// /// Constructor /// /// A stream buffer. basic_ostream(streams::streambuf buffer) : m_helper(std::make_shared>(buffer)) { _verify_and_throw(details::_out_streambuf_msg); } /// /// Close the stream, preventing further write operations. /// pplx::task close() const { return is_valid() ? helper()->m_buffer.close(std::ios_base::out) : pplx::task_from_result(); } /// /// Close the stream with exception, preventing further write operations. /// /// Pointer to the exception. pplx::task close(std::exception_ptr eptr) const { return is_valid() ? helper()->m_buffer.close(std::ios_base::out, eptr) : pplx::task_from_result(); } /// /// Put a single character into the stream. /// /// A character pplx::task write(CharType ch) const { pplx::task result; if ( !_verify_and_return_task(details::_out_stream_msg, result) ) return result; return helper()->m_buffer.putc(ch); } /// /// Write a single value of "blittable" type T into the stream. /// /// A value of type T. /// /// This is not a replacement for a proper binary serialization solution, but it may /// form the foundation for one. Writing data bit-wise to a stream is a primitive /// operation of binary serialization. /// Currently, no attention is paid to byte order. All data is written in the platform's /// native byte order, which means little-endian on all platforms that have been tested. /// This function is only available for streams using a single-byte character size. /// template CASABLANCA_DEPRECATED("Unsafe API that will be removed in future releases, use one of the other write overloads instead.") pplx::task write(T value) const { static_assert(sizeof(CharType) == 1, "binary write is only supported for single-byte streams"); static_assert(std::is_trivial::value, "unsafe to use with non-trivial types"); pplx::task result; if ( !_verify_and_return_task(details::_out_stream_msg, result) ) return result; auto copy = std::make_shared(std::move(value)); return helper()->m_buffer.putn_nocopy((CharType*)copy.get(), sizeof(T)).then([copy](pplx::task op) -> size_t { return op.get(); }); } /// /// Write a number of characters from a given stream buffer into the stream. /// /// A source stream buffer. /// The number of characters to write. pplx::task write(streams::streambuf source, size_t count) const { pplx::task result; if ( !_verify_and_return_task(details::_out_stream_msg, result) ) return result; if ( !source.can_read() ) return pplx::task_from_exception(std::make_exception_ptr(std::runtime_error("source buffer not set up for input of data"))); if (count == 0) return pplx::task_from_result((size_t)0); auto buffer = helper()->m_buffer; auto data = buffer.alloc(count); if ( data != nullptr ) { auto post_read = [buffer](pplx::task op)-> pplx::task { auto b = buffer; b.commit(op.get()); return op; }; return source.getn(data, count).then(post_read); } else { size_t available = 0; const bool acquired = source.acquire(data, available); if (available >= count) { auto post_write = [source,data](pplx::task op)-> pplx::task { auto s = source; s.release(data,op.get()); return op; }; return buffer.putn_nocopy(data, count).then(post_write); } else { // Always have to release if acquire returned true. if(acquired) { source.release(data, 0); } std::shared_ptr buf(new CharType[count], [](CharType *buf) { delete [] buf; }); auto post_write = [buf](pplx::task op)-> pplx::task { return op; }; auto post_read = [buf,post_write,buffer](pplx::task op) -> pplx::task { auto b = buffer; return b.putn_nocopy(buf.get(), op.get()).then(post_write); }; return source.getn(buf.get(), count).then(post_read); } } } /// /// Write the specified string to the output stream. /// /// Input string. pplx::task print(const std::basic_string& str) const { pplx::task result; if ( !_verify_and_return_task(details::_out_stream_msg, result) ) return result; if (str.empty()) { return pplx::task_from_result(0); } else { auto sharedStr = std::make_shared>(str); return helper()->m_buffer.putn_nocopy(sharedStr->c_str(), sharedStr->size()).then([sharedStr](size_t size) { return size; }); } } /// /// Write a value of type T to the output stream. /// /// /// The data type of the object to be written to the stream /// /// Input object. template pplx::task print(const T& val) const { pplx::task result; if ( !_verify_and_return_task(details::_out_stream_msg, result) ) return result; // TODO in the future this could be improved to have Value2StringFormatter avoid another unnecessary copy // by putting the string on the heap before calling the print string overload. return print(details::Value2StringFormatter::format(val)); } /// /// Write a value of type T to the output stream and append a newline character. /// /// /// The data type of the object to be written to the stream /// /// Input object. template pplx::task print_line(const T& val) const { pplx::task result; if ( !_verify_and_return_task(details::_out_stream_msg, result) ) return result; auto str = details::Value2StringFormatter::format(val); str.push_back(CharType('\n')); return print(str); } /// /// Flush any buffered output data. /// pplx::task flush() const { pplx::task result; if ( !_verify_and_return_task(details::_out_stream_msg, result) ) return result; return helper()->m_buffer.sync(); } /// /// Seeks to the specified write position. /// /// An offset relative to the beginning of the stream. /// The new position in the stream. pos_type seek(pos_type pos) const { _verify_and_throw(details::_out_stream_msg); return helper()->m_buffer.seekpos(pos, std::ios_base::out); } /// /// Seeks to the specified write position. /// /// An offset relative to the beginning, current write position, or the end of the stream. /// The starting point (beginning, current, end) for the seek. /// The new position in the stream. pos_type seek(off_type off, std::ios_base::seekdir way) const { _verify_and_throw(details::_out_stream_msg); return helper()->m_buffer.seekoff(off, way, std::ios_base::out); } /// /// Get the current write position, i.e. the offset from the beginning of the stream. /// /// The current write position. pos_type tell() const { _verify_and_throw(details::_out_stream_msg); return helper()->m_buffer.getpos(std::ios_base::out); } /// /// can_seek is used to determine whether the stream supports seeking. /// /// true if the stream supports seeking, false otherwise. bool can_seek() const { return is_valid() && m_helper->m_buffer.can_seek(); } /// /// Test whether the stream has been initialized with a valid stream buffer. /// /// true if the stream has been initialized with a valid stream buffer, false otherwise. bool is_valid() const { return (m_helper != nullptr) && ((bool)m_helper->m_buffer); } /// /// Test whether the stream has been initialized or not. /// operator bool() const { return is_valid(); } /// /// Test whether the stream is open for writing. /// /// true if the stream is open for writing, false otherwise. bool is_open() const { return is_valid() && m_helper->m_buffer.can_write(); } /// /// Get the underlying stream buffer. /// /// The underlying stream buffer. concurrency::streams::streambuf streambuf() const { return helper()->m_buffer; } protected: void set_helper(std::shared_ptr> helper) { m_helper = helper; } private: template bool _verify_and_return_task(const char *msg, pplx::task &tsk) const { auto buffer = helper()->m_buffer; if ( !(buffer.exception() == nullptr) ) { tsk = pplx::task_from_exception(buffer.exception()); return false; } if ( !buffer.can_write() ) { tsk = pplx::task_from_exception(std::make_exception_ptr(std::runtime_error(msg))); return false; } return true; } void _verify_and_throw(const char *msg) const { auto buffer = helper()->m_buffer; if ( !(buffer.exception() == nullptr) ) std::rethrow_exception(buffer.exception()); if ( !buffer.can_write() ) throw std::runtime_error(msg); } std::shared_ptr> helper() const { if ( !m_helper ) throw std::logic_error("uninitialized stream object"); return m_helper; } std::shared_ptr> m_helper; }; template struct _type_parser_integral_traits { typedef std::false_type _is_integral; typedef std::false_type _is_unsigned; }; #ifdef _WIN32 #define _INT_TRAIT(_t,_low,_high) template<> struct _type_parser_integral_traits<_t>{typedef std::true_type _is_integral;typedef std::false_type _is_unsigned;static const int64_t _min = _low;static const int64_t _max = _high;}; #define _UINT_TRAIT(_t,_low,_high) template<> struct _type_parser_integral_traits<_t>{typedef std::true_type _is_integral;typedef std::true_type _is_unsigned;static const uint64_t _max = _high;}; _INT_TRAIT(char,INT8_MIN,INT8_MAX) _INT_TRAIT(signed char,INT8_MIN,INT8_MAX) _INT_TRAIT(short,INT16_MIN,INT16_MAX) _INT_TRAIT(utf16char,INT16_MIN,INT16_MAX) _INT_TRAIT(int,INT32_MIN,INT32_MAX) _INT_TRAIT(long, LONG_MIN, LONG_MAX) _INT_TRAIT(long long, LLONG_MIN, LLONG_MAX) _UINT_TRAIT(unsigned char,UINT8_MIN,UINT8_MAX) _UINT_TRAIT(unsigned short,UINT16_MIN,UINT16_MAX) _UINT_TRAIT(unsigned int,UINT32_MIN,UINT32_MAX) _UINT_TRAIT(unsigned long, ULONG_MIN, ULONG_MAX) _UINT_TRAIT(unsigned long long, ULLONG_MIN, ULLONG_MAX) #else #define _INT_TRAIT(_t) template<> struct _type_parser_integral_traits<_t>{typedef std::true_type _is_integral;typedef std::false_type _is_unsigned;static const int64_t _min = std::numeric_limits<_t>::min();static const int64_t _max = (std::numeric_limits<_t>::max)();}; #define _UINT_TRAIT(_t) template<> struct _type_parser_integral_traits<_t>{typedef std::true_type _is_integral;typedef std::true_type _is_unsigned;static const uint64_t _max = (std::numeric_limits<_t>::max)();}; _INT_TRAIT(char) _INT_TRAIT(signed char) _INT_TRAIT(short) _INT_TRAIT(utf16char) _INT_TRAIT(int) _INT_TRAIT(long) _INT_TRAIT(long long) _UINT_TRAIT(unsigned char) _UINT_TRAIT(unsigned short) _UINT_TRAIT(unsigned int) _UINT_TRAIT(unsigned long) _UINT_TRAIT(unsigned long long) #endif template class _type_parser_base { public: typedef typename ::concurrency::streams::char_traits::int_type int_type; _type_parser_base() { } protected: // Aid in parsing input: skipping whitespace characters. static pplx::task _skip_whitespace(streams::streambuf buffer); // Aid in parsing input: peek at a character at a time, call type-specific code to examine, extract value when done. // AcceptFunctor should model std::function, int_type)> // ExtractFunctor should model std::function(std::shared_ptr)> template static pplx::task _parse_input(streams::streambuf buffer, AcceptFunctor accept_character, ExtractFunctor extract); }; /// /// Class used to handle asychronous parsing for basic_istream::extract. To support new /// types create a new template specialization and implement the parse function. /// template class type_parser { public: static pplx::task parse(streams::streambuf buffer) { typename _type_parser_integral_traits::_is_integral ii; typename _type_parser_integral_traits::_is_unsigned ui; return _parse(buffer, ii, ui); } private: static pplx::task _parse(streams::streambuf buffer, std::false_type, std::false_type) { _parse_floating_point(buffer); } static pplx::task _parse(streams::streambuf, std::false_type, std::true_type) { #ifdef _WIN32 static_assert(false, "type is not supported for extraction from a stream"); #else throw std::runtime_error("type is not supported for extraction from a stream"); #endif } static pplx::task _parse(streams::streambuf buffer, std::true_type, std::false_type) { return type_parser::parse(buffer).then( [] (pplx::task op) -> T { int64_t val = op.get(); if ( val <= _type_parser_integral_traits::_max && val >= _type_parser_integral_traits::_min ) return (T)val; else throw std::range_error("input out of range for target type"); }); } static pplx::task _parse(streams::streambuf buffer, std::true_type, std::true_type) { return type_parser::parse(buffer).then( [] (pplx::task op) -> T { uint64_t val = op.get(); if ( val <= _type_parser_integral_traits::_max ) return (T)val; else throw std::range_error("input out of range for target type"); }); } }; /// /// Base interface for all asynchronous input streams. /// template class basic_istream { public: typedef char_traits traits; typedef typename char_traits::int_type int_type; typedef typename traits::pos_type pos_type; typedef typename traits::off_type off_type; /// /// Default constructor /// basic_istream() {} /// /// Constructor /// /// /// The data type of the basic element of the stream. /// /// A stream buffer. basic_istream(streams::streambuf buffer) : m_helper(std::make_shared>(buffer)) { _verify_and_throw(details::_in_streambuf_msg); } /// /// Copy constructor /// /// The source object basic_istream(const basic_istream &other) : m_helper(other.m_helper) { } /// /// Assignment operator /// /// The source object /// A reference to the stream object that contains the result of the assignment. basic_istream & operator =(const basic_istream &other) { m_helper = other.m_helper; return *this; } /// /// Close the stream, preventing further read operations. /// pplx::task close() const { return is_valid() ? helper()->m_buffer.close(std::ios_base::in) : pplx::task_from_result(); } /// /// Close the stream with exception, preventing further read operations. /// /// Pointer to the exception. pplx::task close(std::exception_ptr eptr) const { return is_valid() ? m_helper->m_buffer.close(std::ios_base::in, eptr) : pplx::task_from_result(); } /// /// Tests whether last read cause the stream reach EOF. /// /// True if the read head has reached the end of the stream, false otherwise. bool is_eof() const { return is_valid() ? m_helper->m_buffer.is_eof() : false; } /// /// Get the next character and return it as an int_type. Advance the read position. /// /// A task that holds the next character as an int_type on successful completion. pplx::task read() const { pplx::task result; if ( !_verify_and_return_task(details::_in_stream_msg, result) ) return result; return helper()->m_buffer.bumpc(); } /// /// Read a single value of "blittable" type T from the stream. /// /// A value of type T. /// /// This is not a replacement for a proper binary serialization solution, but it may /// form the foundation for one. Reading data bit-wise to a stream is a primitive /// operation of binary serialization. /// Currently, no attention is paid to byte order. All data is read in the platform's /// native byte order, which means little-endian on all platforms that have been tested. /// This function is only available for streams using a single-byte character size. /// template CASABLANCA_DEPRECATED("Unsafe API that will be removed in future releases, use one of the other read overloads instead.") pplx::task read() const { static_assert(sizeof(CharType) == 1, "binary read is only supported for single-byte streams"); static_assert(std::is_trivial::value, "unsafe to use with non-trivial types"); pplx::task result; if ( !_verify_and_return_task(details::_in_stream_msg, result) ) return result; auto copy = std::make_shared(); return helper()->m_buffer.getn((CharType*)copy.get(), sizeof(T)).then([copy](pplx::task op) -> T { return std::move(*copy); }); } /// /// Reads up to count characters and place into the provided buffer. /// /// An async stream buffer supporting write operations. /// The maximum number of characters to read /// A task that holds the number of characters read. This number is 0 if the end of the stream is reached. pplx::task read(streams::streambuf target, size_t count) const { pplx::task result; if ( !_verify_and_return_task(details::_in_stream_msg, result) ) return result; if ( !target.can_write() ) return pplx::task_from_exception(std::make_exception_ptr(std::runtime_error("target not set up for output of data"))); // Capture 'buffer' rather than 'helper' here due to VC++ 2010 limitations. auto buffer = helper()->m_buffer; auto data = target.alloc(count); if ( data != nullptr ) { auto post_read = [target](pplx::task op)-> pplx::task { auto t = target; t.commit(op.get()); return op; }; return buffer.getn(data, count).then(post_read); } else { size_t available = 0; const bool acquired = buffer.acquire(data, available); if (available >= count) { auto post_write = [buffer,data](pplx::task op)-> pplx::task { auto b = buffer; b.release(data, op.get()); return op; }; return target.putn_nocopy(data, count).then(post_write); } else { // Always have to release if acquire returned true. if(acquired) { buffer.release(data, 0); } std::shared_ptr buf(new CharType[count], [](CharType *buf) { delete [] buf; }); auto post_write = [buf](pplx::task op) -> pplx::task { return op; }; auto post_read = [buf,target,post_write](pplx::task op) -> pplx::task { auto trg = target; return trg.putn_nocopy(buf.get(), op.get()).then(post_write); }; return helper()->m_buffer.getn(buf.get(), count).then(post_read); } } } /// /// Get the next character and return it as an int_type. Do not advance the read position. /// /// A task that holds the character, widened to an integer. This character is EOF when the peek operation fails. pplx::task peek() const { pplx::task result; if ( !_verify_and_return_task(details::_in_stream_msg, result) ) return result; return helper()->m_buffer.getc(); } /// /// Read characters until a delimiter or EOF is found, and place them into the target. /// Proceed past the delimiter, but don't include it in the target buffer. /// /// An async stream buffer supporting write operations. /// The delimiting character to stop the read at. /// A task that holds the number of characters read. pplx::task read_to_delim(streams::streambuf target, int_type delim) const { pplx::task result; if ( !_verify_and_return_task(details::_in_stream_msg, result) ) return result; if ( !target.can_write() ) return pplx::task_from_exception(std::make_exception_ptr(std::runtime_error("target not set up for output of data"))); // Capture 'buffer' rather than 'helper' here due to VC++ 2010 limitations. auto buffer = helper()->m_buffer; int_type req_async = ::concurrency::streams::char_traits::requires_async(); std::shared_ptr<_read_helper> _locals = std::make_shared<_read_helper>(); auto flush = [=]() mutable { return target.putn_nocopy(_locals->outbuf, _locals->write_pos).then([=](size_t wrote) mutable { _locals->total += wrote; _locals->write_pos = 0; return target.sync(); }); }; auto update = [=](int_type ch) mutable { if (ch == ::concurrency::streams::char_traits::eof()) return false; if (ch == delim) return false; _locals->outbuf[_locals->write_pos] = static_cast(ch); _locals->write_pos += 1; if (_locals->is_full()) { // Flushing synchronously because performance is terrible if we // schedule an empty task. This isn't on a user's thread. flush().get(); } return true; }; auto loop = pplx::details::do_while([=]() mutable -> pplx::task { while (buffer.in_avail() > 0) { int_type ch = buffer.sbumpc(); if (ch == req_async) { break; } if (!update(ch)) { return pplx::task_from_result(false); } } return buffer.bumpc().then(update); }); return loop.then([=](bool) mutable { return flush().then([=] { return _locals->total; }); }); } /// /// Read until reaching a newline character. The newline is not included in the target. /// /// An asynchronous stream buffer supporting write operations. /// A task that holds the number of characters read. This number is 0 if the end of the stream is reached. pplx::task read_line(streams::streambuf target) const { pplx::task result; if ( !_verify_and_return_task(details::_in_stream_msg, result) ) return result; if ( !target.can_write() ) return pplx::task_from_exception(std::make_exception_ptr(std::runtime_error("target not set up for receiving data"))); // Capture 'buffer' rather than 'helper' here due to VC++ 2010 limitations. concurrency::streams::streambuf buffer = helper()->m_buffer; typename concurrency::streams::char_traits::int_type req_async = concurrency::streams::char_traits::requires_async(); std::shared_ptr<_read_helper> _locals = std::make_shared<_read_helper>(); auto flush = [=]() mutable { return target.putn_nocopy(_locals->outbuf, _locals->write_pos).then([=](size_t wrote) mutable { _locals->total += wrote; _locals->write_pos = 0; return target.sync(); }); }; auto update = [=](typename concurrency::streams::char_traits::int_type ch) mutable { if (ch == concurrency::streams::char_traits::eof()) return false; if (ch == '\n') return false; if (ch == '\r') { _locals->saw_CR = true; return true; } _locals->outbuf[_locals->write_pos] = static_cast(ch); _locals->write_pos += 1; if (_locals->is_full()) { // Flushing synchronously because performance is terrible if we // schedule an empty task. This isn't on a user's thread. flush().wait(); } return true; }; auto update_after_cr = [=] (typename concurrency::streams::char_traits::int_type ch) mutable -> pplx::task { if (ch == concurrency::streams::char_traits::eof()) return pplx::task_from_result(false); if (ch == '\n') { return buffer.bumpc().then([]( #ifndef _WIN32 // Required by GCC typename #endif concurrency::streams::char_traits::int_type) { return false; }); } return pplx::task_from_result(false); }; auto loop = pplx::details::do_while([=]() mutable -> pplx::task { while ( buffer.in_avail() > 0 ) { #ifndef _WIN32 // Required by GCC, because concurrency::streams::char_traits is a dependent scope typename #endif concurrency::streams::char_traits::int_type ch; if (_locals->saw_CR) { ch = buffer.sgetc(); if (ch == '\n') buffer.sbumpc(); return pplx::task_from_result(false); } ch = buffer.sbumpc(); if (ch == req_async) break; if (!update(ch)) { return pplx::task_from_result(false); } } if (_locals->saw_CR) { return buffer.getc().then(update_after_cr); } return buffer.bumpc().then(update); }); return loop.then([=](bool) mutable { return flush().then([=] { return _locals->total; }); }); } /// /// Read until reaching the end of the stream. /// /// An asynchronous stream buffer supporting write operations. /// The number of characters read. pplx::task read_to_end(streams::streambuf target) const { pplx::task result; if ( !_verify_and_return_task("stream not set up for output of data", result) ) return result; if ( !target.can_write() ) return pplx::task_from_exception(std::make_exception_ptr(std::runtime_error("source buffer not set up for input of data"))); auto l_buffer = helper()->m_buffer; auto l_buf_size = this->buf_size; std::shared_ptr<_read_helper> l_locals = std::make_shared<_read_helper>(); auto copy_to_target = [l_locals, target, l_buffer, l_buf_size]() mutable -> pplx::task { // We need to capture these, because the object itself may go away // before we're done processing the data. //auto locs = _locals; //auto trg = target; return l_buffer.getn(l_locals->outbuf, l_buf_size).then([=](size_t rd) mutable -> pplx::task { if (rd == 0) return pplx::task_from_result(false); // Must be nested to capture rd return target.putn_nocopy(l_locals->outbuf, rd).then([target, l_locals, rd](size_t wr) mutable -> pplx::task { l_locals->total += wr; if (rd != wr) // Number of bytes written is less than number of bytes received. throw std::runtime_error("failed to write all bytes"); return target.sync().then([]() { return true; }); }); }); }; auto loop = pplx::details::do_while(copy_to_target); return loop.then([=](bool) mutable -> size_t { return l_locals->total; }); } /// /// Seeks to the specified write position. /// /// An offset relative to the beginning of the stream. /// The new position in the stream. pos_type seek(pos_type pos) const { _verify_and_throw(details::_in_stream_msg); return helper()->m_buffer.seekpos(pos, std::ios_base::in); } /// /// Seeks to the specified write position. /// /// An offset relative to the beginning, current write position, or the end of the stream. /// The starting point (beginning, current, end) for the seek. /// The new position in the stream. pos_type seek(off_type off, std::ios_base::seekdir way) const { _verify_and_throw(details::_in_stream_msg); return helper()->m_buffer.seekoff(off, way, std::ios_base::in); } /// /// Get the current write position, i.e. the offset from the beginning of the stream. /// /// The current write position. pos_type tell() const { _verify_and_throw(details::_in_stream_msg); return helper()->m_buffer.getpos(std::ios_base::in); } /// /// can_seek is used to determine whether the stream supports seeking. /// /// true if the stream supports seeking, false otherwise. bool can_seek() const { return is_valid() && m_helper->m_buffer.can_seek(); } /// /// Test whether the stream has been initialized with a valid stream buffer. /// bool is_valid() const { return (m_helper != nullptr) && ((bool)m_helper->m_buffer); } /// /// Test whether the stream has been initialized or not. /// operator bool() const { return is_valid(); } /// /// Test whether the stream is open for writing. /// /// true if the stream is open for writing, false otherwise. bool is_open() const { return is_valid() && m_helper->m_buffer.can_read(); } /// /// Get the underlying stream buffer. /// concurrency::streams::streambuf streambuf() const { return helper()->m_buffer; } /// /// Read a value of type T from the stream. /// /// /// Supports the C++ primitive types. Can be expanded to additional types /// by adding template specializations for type_parser. /// /// /// The data type of the element to be read from the stream. /// /// A task that holds the element read from the stream. template pplx::task extract() const { pplx::task result; if ( !_verify_and_return_task(details::_in_stream_msg, result) ) return result; return type_parser::parse(helper()->m_buffer); } private: template bool _verify_and_return_task(const char *msg, pplx::task &tsk) const { auto buffer = helper()->m_buffer; if ( !(buffer.exception() == nullptr) ) { tsk = pplx::task_from_exception(buffer.exception()); return false; } if ( !buffer.can_read() ) { tsk = pplx::task_from_exception(std::make_exception_ptr(std::runtime_error(msg))); return false; } return true; } void _verify_and_throw(const char *msg) const { auto buffer = helper()->m_buffer; if ( !(buffer.exception() == nullptr) ) std::rethrow_exception(buffer.exception()); if ( !buffer.can_read() ) throw std::runtime_error(msg); } std::shared_ptr> helper() const { if ( !m_helper ) throw std::logic_error("uninitialized stream object"); return m_helper; } static const size_t buf_size = 16*1024; struct _read_helper { size_t total; CharType outbuf[buf_size]; size_t write_pos; bool saw_CR; bool is_full() const { return write_pos == buf_size; } _read_helper() : total(0), write_pos(0), saw_CR(false) { } }; std::shared_ptr> m_helper; }; typedef basic_ostream ostream; typedef basic_istream istream; typedef basic_ostream wostream; typedef basic_istream wistream; template pplx::task concurrency::streams::_type_parser_base::_skip_whitespace(streams::streambuf buffer) { int_type req_async = concurrency::streams::char_traits::requires_async(); auto update = [=] (int_type ch) mutable { if (isspace(ch)) { if (buffer.sbumpc() == req_async) { // Synchronously because performance is terrible if we // schedule an empty task. This isn't on a user's thread. buffer.nextc().wait(); } return true; } return false; }; auto loop = pplx::details::do_while([=]() mutable -> pplx::task { while (buffer.in_avail() > 0) { int_type ch = buffer.sgetc(); if (ch == req_async) break; if (!update(ch)) { return pplx::task_from_result(false); } } return buffer.getc().then(update); }); return loop.then([=](pplx::task op) { op.wait(); }); } template template pplx::task concurrency::streams::_type_parser_base::_parse_input( concurrency::streams::streambuf buffer, AcceptFunctor accept_character, ExtractFunctor extract) { std::shared_ptr state = std::make_shared(); auto update = [=] (pplx::task op) -> pplx::task { int_type ch = op.get(); if (ch == concurrency::streams::char_traits::eof()) return pplx::task_from_result(false); bool accptd = accept_character(state, ch); if (!accptd) return pplx::task_from_result(false); // We peeked earlier, so now we must advance the position. concurrency::streams::streambuf buf = buffer; return buf.bumpc().then([](int_type) { return true; }); }; auto peek_char = [=]() -> pplx::task { concurrency::streams::streambuf buf = buffer; // If task results are immediately available, there's little need to use ".then()," // so optimize for prompt values. auto get_op = buf.getc(); while (get_op.is_done()) { auto condition = update(get_op); if (!condition.is_done() || !condition.get()) return condition; get_op = buf.getc(); } return get_op.then(update); }; auto finish = [=](pplx::task op) -> pplx::task { op.wait(); pplx::task result = extract(state); return result; }; return _skip_whitespace(buffer).then([=](pplx::task op) -> pplx::task { op.wait(); return pplx::details::do_while(peek_char).then(finish); }); } template class type_parser> : public _type_parser_base { typedef typename _type_parser_base::int_type int_type; public: static pplx::task parse(streams::streambuf buffer) { return concurrency::streams::_type_parser_base::template _parse_input, std::string>(buffer, _accept_char, _extract_result); } private: static bool _accept_char(std::shared_ptr> state, int_type ch) { if ( ch == concurrency::streams::char_traits::eof() || isspace(ch)) return false; state->push_back(CharType(ch)); return true; } static pplx::task> _extract_result(std::shared_ptr> state) { return pplx::task_from_result(*state); } }; template class type_parser : public _type_parser_base { public: typedef typename _type_parser_base::int_type int_type; static pplx::task parse(streams::streambuf buffer) { return _type_parser_base::template _parse_input<_int64_state, int64_t>(buffer, _accept_char, _extract_result); } private: struct _int64_state { _int64_state() : result(0), correct(false), minus(0) {} int64_t result; bool correct; char minus; // 0 -- no sign, 1 -- plus, 2 -- minus }; static bool _accept_char(std::shared_ptr<_int64_state> state, int_type ch) { if ( ch == concurrency::streams::char_traits::eof()) return false; if ( state->minus == 0 ) { // OK to find a sign. if ( !::isdigit(ch) && ch != int_type('+') && ch != int_type('-') ) return false; } else { if ( !::isdigit(ch) ) return false; } // At least one digit was found. state->correct = true; if ( ch == int_type('+') ) { state->minus = 1; } else if ( ch == int_type('-') ) { state->minus = 2; } else { if (state->minus == 0) state->minus = 1; // Shift the existing value by 10, then add the new value. bool positive = state->result >= 0; state->result *= 10; state->result += int64_t(ch-int_type('0')); if ( (state->result >= 0) != positive ) { state->correct = false; return false; } } return true; } static pplx::task _extract_result(std::shared_ptr<_int64_state> state) { if (!state->correct) throw std::range_error("integer value is too large to fit in 64 bits"); int64_t result = (state->minus == 2) ? -state->result : state->result; return pplx::task_from_result(result); } }; template struct _double_state { _double_state() : result(0), minus(0), after_comma(0), exponent(false), exponent_number(0), exponent_minus(0), complete(false), p_exception_string() {} FloatingPoint result; char minus; // 0 -- no sign, 1 -- plus, 2 -- minus int after_comma; bool exponent; int exponent_number; char exponent_minus; // 0 -- no sign, 1 -- plus, 2 -- minus bool complete; std::string p_exception_string; }; template static std::string create_exception_message(int_type ch, bool exponent) { std::ostringstream os; os << "Invalid character '" << char(ch) << "'" << (exponent ? " in exponent" : ""); return os.str(); } template static bool _accept_char(std::shared_ptr<_double_state> state, int_type ch) { if ( state->minus == 0 ) { if ( !::isdigit(ch) && ch != int_type('.') && ch != int_type('+') && ch != int_type('-') ) { if (!state->complete) state->p_exception_string = create_exception_message(ch, false); return false; } } else { if (!state->exponent && !::isdigit(ch) && ch != int_type('.') && ch != int_type('E') && ch != int_type('e')) { if (!state->complete) state->p_exception_string = create_exception_message(ch, false); return false; } if (state->exponent && !::isdigit(ch) && ch != int_type('+') && ch != int_type('-')) { if (!state->complete) state->p_exception_string = create_exception_message(ch, true); return false; } } switch (ch) { case int_type('+') : state->complete = false; if (state->exponent) { if (state->exponent_minus != 0) { state->p_exception_string = "The exponent sign already set"; return false; } state->exponent_minus = 1; } else { state->minus = 1; } break; case int_type('-') : state->complete = false; if (state->exponent) { if (state->exponent_minus != 0) { state->p_exception_string = "The exponent sign already set"; return false; } state->exponent_minus = 2; } else { state->minus = 2; } break; case int_type('.') : state->complete = false; if (state->after_comma > 0) return false; state->after_comma = 1; break; case int_type('E') : case int_type('e') : state->complete = false; if (state->exponent) return false; state->exponent_number = 0; state->exponent = true; break; default: state->complete = true; if (!state->exponent) { if (state->minus == 0) state->minus = 1; state->result *= 10; state->result += int64_t(ch-int_type('0')); if (state->after_comma > 0) state->after_comma++; } else { if (state->exponent_minus == 0) state->exponent_minus = 1; state->exponent_number *= 10; state->exponent_number += int64_t(ch-int_type('0')); } } return true; } template static pplx::task _extract_result(std::shared_ptr<_double_state> state) { if (state->p_exception_string.length() > 0) throw std::runtime_error(state->p_exception_string.c_str()); if (!state->complete && state->exponent) throw std::runtime_error("Incomplete exponent"); FloatingPoint result = static_cast((state->minus == 2) ? -state->result : state->result); if (state->exponent_minus == 2) state->exponent_number = 0 - state->exponent_number; if (state->after_comma > 0) state->exponent_number -= state->after_comma-1; if (state->exponent_number >= 0) { result *= pow(FloatingPoint(10.0), state->exponent_number); #pragma push_macro ("max") #undef max if (result > std::numeric_limits::max() || result < -std::numeric_limits::max()) throw std::overflow_error("The value is too big"); #pragma pop_macro ("max") } else { bool is_zero = (result == 0); result /= pow(FloatingPoint(10.0), -state->exponent_number); if (!is_zero && result > -std::numeric_limits::denorm_min() && result < std::numeric_limits::denorm_min()) throw std::underflow_error("The value is too small"); } return pplx::task_from_result(result); } template class type_parser : public _type_parser_base { public: typedef typename _type_parser_base::int_type int_type; static pplx::task parse(streams::streambuf buffer) { return _type_parser_base::template _parse_input<_double_state, double>(buffer, _accept_char, _extract_result); } protected: }; template class type_parser : public _type_parser_base { public: typedef typename _type_parser_base::int_type int_type; static pplx::task parse(streams::streambuf buffer) { return _type_parser_base::template _parse_input<_double_state, float>(buffer, _accept_char, _extract_result); } protected: }; template class type_parser : public _type_parser_base { public: typedef typename _type_parser_base::int_type int_type; static pplx::task parse(streams::streambuf buffer) { return _type_parser_base::template _parse_input<_uint64_state,uint64_t>(buffer, _accept_char, _extract_result); } private: struct _uint64_state { _uint64_state() : result(0), correct(false) {} uint64_t result; bool correct; }; static bool _accept_char(std::shared_ptr<_uint64_state> state, int_type ch) { if ( !::isdigit(ch) ) return false; // At least one digit was found. state->correct = true; // Shift the existing value by 10, then add the new value. state->result *= 10; state->result += uint64_t(ch-int_type('0')); return true; } static pplx::task _extract_result(std::shared_ptr<_uint64_state> state) { if (!state->correct) throw std::range_error("integer value is too large to fit in 64 bits"); return pplx::task_from_result(state->result); } }; template class type_parser : public _type_parser_base { public: typedef typename _type_parser_base::int_type int_type; static pplx::task parse(streams::streambuf buffer) { return _type_parser_base::template _parse_input<_bool_state,bool>(buffer, _accept_char, _extract_result); } private: struct _bool_state { _bool_state() : state(0) { } // { 0 -- not started, 1 -- 't', 2 -- 'tr', 3 -- 'tru', 4 -- 'f', 5 -- 'fa', 6 -- 'fal', 7 -- 'fals', 8 -- 'true', 9 -- 'false' } short state; }; static bool _accept_char(std::shared_ptr<_bool_state> state, int_type ch) { switch (state->state) { case 0: if ( ch == int_type('t') ) state->state = 1; else if ( ch == int_type('f') ) state->state = 4; else if ( ch == int_type('1') ) state->state = 8; else if ( ch == int_type('0') ) state->state = 9; else return false; break; case 1: if ( ch == int_type('r') ) state->state = 2; else return false; break; case 2: if ( ch == int_type('u') ) state->state = 3; else return false; break; case 3: if ( ch == int_type('e') ) state->state = 8; else return false; break; case 4: if ( ch == int_type('a') ) state->state = 5; else return false; break; case 5: if ( ch == int_type('l') ) state->state = 6; else return false; break; case 6: if ( ch == int_type('s') ) state->state = 7; else return false; break; case 7: if ( ch == int_type('e') ) state->state = 9; else return false; break; case 8: case 9: return false; } return true; } static pplx::task _extract_result(std::shared_ptr<_bool_state> state) { bool correct = (state->state == 8 || state->state == 9); if (!correct) { std::runtime_error exc("cannot parse as Boolean value"); throw exc; } return pplx::task_from_result(state->state == 8); } }; template class type_parser : public _type_parser_base { typedef typename concurrency::streams::streambuf::int_type int_type; public: static pplx::task parse(streams::streambuf buffer) { return _type_parser_base::_skip_whitespace(buffer).then( [=](pplx::task op) -> pplx::task { op.wait(); return type_parser::_get_char(buffer); }); } private: static pplx::task _get_char(streams::streambuf buffer) { concurrency::streams::streambuf buf = buffer; return buf.bumpc().then( [=](pplx::task::int_type> op) -> signed char { int_type val = op.get(); if (val == concurrency::streams::char_traits::eof()) throw std::runtime_error("reached end-of-stream while constructing a value"); return static_cast(val); }); } }; template class type_parser : public _type_parser_base { typedef typename concurrency::streams::streambuf::int_type int_type; public: static pplx::task parse(streams::streambuf buffer) { return _type_parser_base::_skip_whitespace(buffer).then( [=](pplx::task op) -> pplx::task { op.wait(); return type_parser::_get_char(buffer); }); } private: static pplx::task _get_char(streams::streambuf buffer) { concurrency::streams::streambuf buf = buffer; return buf.bumpc().then( [=](pplx::task::int_type> op) -> unsigned char { int_type val = op.get(); if (val == concurrency::streams::char_traits::eof()) throw std::runtime_error("reached end-of-stream while constructing a value"); return static_cast(val); }); } }; template class type_parser : public _type_parser_base { typedef typename concurrency::streams::streambuf::int_type int_type; public: static pplx::task parse(streams::streambuf buffer) { return _type_parser_base::_skip_whitespace(buffer).then( [=](pplx::task op) -> pplx::task { op.wait(); return _get_char(buffer); }); } private: static pplx::task _get_char(streams::streambuf buffer) { concurrency::streams::streambuf buf = buffer; return buf.bumpc().then( [=](pplx::task::int_type> op) -> char { int_type val = op.get(); if (val == concurrency::streams::char_traits::eof()) throw std::runtime_error("reached end-of-stream while constructing a value"); return char(val); }); } }; #ifdef _WIN32 template<> class type_parser> : public _type_parser_base { public: static pplx::task parse(streams::streambuf buffer) { return _parse_input,std::basic_string>(buffer, _accept_char, _extract_result); } private: static bool _accept_char(const std::shared_ptr> &state, int_type ch) { if ( ch == concurrency::streams::char_traits::eof() || isspace(ch)) return false; state->push_back(char(ch)); return true; } static pplx::task> _extract_result(std::shared_ptr> state) { return pplx::task_from_result(utility::conversions::utf8_to_utf16(*state)); } }; template<> class type_parser> : public _type_parser_base { public: static pplx::task parse(streams::streambuf buffer) { return _parse_input,std::basic_string>(buffer, _accept_char, _extract_result); } private: static bool _accept_char(const std::shared_ptr> &state, int_type ch) { if ( ch == concurrency::streams::char_traits::eof() || isspace(ch)) return false; state->push_back(char(ch)); return true; } static pplx::task> _extract_result(std::shared_ptr> state) { return pplx::task_from_result(utility::conversions::utf8_to_utf16(*state)); } }; template<> class type_parser> : public _type_parser_base { public: static pplx::task parse(streams::streambuf buffer) { return _parse_input,std::basic_string>(buffer, _accept_char, _extract_result); } private: static bool _accept_char(const std::shared_ptr> &state, int_type ch) { if ( ch == concurrency::streams::char_traits::eof() || isspace(ch)) return false; state->push_back(char(ch)); return true; } static pplx::task> _extract_result(std::shared_ptr> state) { return pplx::task_from_result(utility::conversions::utf8_to_utf16(*state)); } }; #endif //_WIN32 }} #endif