Commit dfd1837c authored by William Woodall's avatar William Woodall
Browse files

Serial Listener changes compile against the example reference, time to merge with John.

parent 7c0c9760
......@@ -36,10 +36,10 @@ int main(void) {
// Method #2:
// comparator - blocking
{
BlockingFilter f2 =
BlockingFilterPtr f2 =
listener.createBlockingFilter(SerialListener::endsWith("post"));
for (size_t i = 0; i < 3; i++) {
std::string token = f2.wait(100); // Wait for 100 ms or a matched token
std::string token = f2->wait(100); // Wait for 100 ms or a matched token
if (token == "")
std::cout << "Found something ending with 'post'" << std::endl;
else
......@@ -54,18 +54,18 @@ int main(void) {
// comparator, token buffer size - blocking
{
// Give it a comparator, then a buffer size of 10
BufferedFilter f3 =
BufferedFilterPtr f3 =
listener.createBufferedFilter(SerialListener::contains("substr"), 10);
SerialListener::sleep(75); // Sleep 75ms, should have about 7
std::cout << "Caught " << f3.count();
std::cout << "Caught " << f3->count();
std::cout << " tokens containing 'substr'" << std::endl;
for(size_t i = 0; i < 20; ++i) {
std::string token = f3.wait(5); // Pull message from the buffer
std::string token = f3->wait(5); // Pull message from the buffer
if (token == "") // If an empty string is returned, a timeout occured
break;
}
f3.clear(); // Empties the buffer
if (f3.wait(0) == "") // Non-blocking wait
f3->clear(); // Empties the buffer
if (f3->wait(0) == "") // Non-blocking wait
std::cout << "We won the race condition!" << std::endl;
else
std::cout << "We lost the race condition..." << std::endl;
......
......@@ -178,160 +178,31 @@ public:
*/
typedef boost::shared_ptr<Filter> FilterPtr;
class BlockingFilter;
/*!
* This is the a filter that provides a wait function for blocking until a
* match is found.
*
* This should probably not be created manually, but instead should be
* constructed using SerialListener::createBlockingFilter(ComparatorType)
* function which returns a BlockingFilter instance.
* Shared Pointer of BlockingFilter, returned by
* SerialListener::createBlockingFilter.
*
* \see serial::SerialListener::ComparatorType,
* serial::SerialListener::createBlockingFilter
* \see serial::BlockingFilter, SerialListener::createBlockingFilter
*/
class BlockingFilter
{
public:
BlockingFilter (ComparatorType comparator,
boost::shared_ptr<SerialListener> listener)
: listener(listener)
{
DataCallback cb = boost::bind(&BlockingFilter::callback, this, _1);
this->filter_ptr = listener.createFilter(comparator, cb);
}
virtual ~BlockingFilter () {
this->listener.removeFilter(filter_ptr);
this->result = "";
this->cond.notify_all();
}
/*!
* Waits a given number of milliseconds or until a token is matched. If a
* token is matched it is returned, otherwise an empty string is returned.
*
* \param ms Time in milliseconds to wait on a new token.
*
* \return std::string token that was matched or "" if none were matched.
*/
std::string wait(size_t ms) {
this->result = "";
boost::unique_lock<boost::mutex> lock(this->mutex);
this->cond.timed_wait(lock, boost::posix_time::milliseconds(ms));
return this->result;
}
typedef boost::shared_ptr<BlockingFilter> BlockingFilterPtr;
private:
void callback(const std::string& token) {
this->cond.notify_all();
this->result = token;
}
FilterPtr filter_ptr;
boost::shared_ptr<SerialListener> listener;
boost::condition_variable cond;
boost::mutex mutex;
std::string result;
};
class BufferedFilter;
/*!
* This is the a filter that provides a wait function for blocking until a
* match is found. It will also buffer up to a given buffer size of tokens so
* that they can be counted or accessed after they are matched by the filter.
* Shared Pointer of BufferedFilter, returned by
* SerialListener::createBufferedFilter.
*
* This should probably not be created manually, but instead should be
* constructed using SerialListener::createBufferedFilter(ComparatorType)
* function which returns a BufferedFilter instance.
*
* The internal buffer is a circular queue buffer, so when the buffer is full,
* the oldest token is dropped and the new one is added. Additionally, when
* wait is a called the oldest available token is returned.
*
* \see serial::SerialListener::ComparatorType,
* serial::SerialListener::createBufferedFilter
* \see serial::BufferedFilter, SerialListener::createBufferedFilter
*/
class BufferedFilter
{
public:
BufferedFilter (ComparatorType comparator, size_t buffer_size,
boost::shared_ptr<SerialListener> listener)
: listener(listener), buffer_size(buffer_size)
{
DataCallback cb = boost::bind(&BlockingFilter::callback, this, _1);
this->filter_ptr = listener.createFilter(comparator, cb);
}
virtual ~BufferedFilter () {
this->listener.removeFilter(filter_ptr);
this->queue.clear();
this->result = "";
this->cond.notify_all();
}
/*!
* Waits a given number of milliseconds or until a matched token is
* available in the buffer. If a token is matched it is returned, otherwise
* an empty string is returned.
*
* \param ms Time in milliseconds to wait on a new token. If ms is set to 0
* then it will try to get a new token if one is available but will not
* block.
*
* \return std::string token that was matched or "" if none were matched.
*/
std::string wait(size_t ms) {
if (ms == 0)
if (!this->queue.try_pop(this->result))
this->result = "";
else
if (!this->queue.timed_wait_and_pop(this->result, ms))
this->result = "";
return result;
}
/*!
* Clears the buffer of any tokens.
*/
void clear() {
queue.clear();
}
/*!
* Returns the number of tokens waiting in the buffer.
*/
size_t count() {
return queue.size();
}
/*!
* Returns the capacity of the buffer.
*/
size_t capacity() {
return buffer_size;
}
private:
void callback(const std::string &token) {
std::string throw_away;
if (this->queue.size() == buffer_size)
this->queue.wait_and_pop(throw_away);
this->queue.push(token);
}
FilterPtr filter_ptr;
size_t buffer_size;
boost::shared_ptr<SerialListener> listener;
ConcurrentQueue<std::string> queue;
std::string result;
};
typedef boost::shared_ptr<BufferedFilter> BufferedFilterPtr;
/*!
* This is a general exception generated by the SerialListener class.
*
* Check the SerialListenerException::what function for the cause.
*
* \param e_what is a std::string that describes the cause of the error.
*/
class SerialListenerException : public std::exception {
......@@ -489,98 +360,115 @@ public:
/***** Filter Functions ******/
/*!
* Setups up a filter that calls a callback when a comparator returns true.
* Creates a filter that calls a callback when the comparator returns true.
*
* The user provides a comparator and a callback, and every time a line is
* received the comparator is called and the comparator has to evaluate the
* line and return true if it matches and false if it doesn't. If it does
* The user provides a comparator and a callback, and every time a line is
* received the comparator is called and the comparator has to evaluate the
* line and return true if it matches and false if it doesn't. If it does
* match, the callback is called with the resulting line.
*
* \param comparator This is a comparator for detecting if a line matches.
* The comparartor receives a std::string reference and must return a true
* The comparartor receives a std::string reference and must return a true
* if it matches and false if it doesn't.
*
* \param callback This is the handler for when a match occurs. It is given
* \param callback This is the handler for when a match occurs. It is given
* a std::string reference of the line that matched your comparator.
*
* \return boost::shared_ptr<Filter> so you can remove it later.
*
* \see SerialListener::stopListeningFor
* \see SerialListener::removeFilter
*/
FilterPtr
listenFor (ComparatorType comparator, DataCallback callback);
createFilter (ComparatorType comparator, DataCallback callback);
/*!
* Blocks until the comparator returns true or until the timeout occurs.
* Creates a BlockingFilter which blocks until the comparator returns true.
*
* The user provides a comparator, and every time a line is
* received the comparator is called and the comparator has to evaluate the
* line and return true if it matches and false if it doesn't. If it does
* match, any threads that have called BlockingFilter::wait will be
* notified. The BlockingFilter will remove itself when its destructor is
* called, i.e. when it leaves the scope, so in those cases an explicit call
* to SerialListener::removeFilter is not needed.
*
* \param comparator ComparatorType function that should return true if the
* given std::string matches otherwise false.
* \param comparator This is a comparator for detecting if a line matches.
* The comparartor receives a std::string reference and must return a true
* if it matches and false if it doesn't.
*
* \param timeout in milliseconds before timing out and returning false.
* Defaults to 1000 milliseconds or 1 second.
* \return BlockingFilterPtr So you can call BlockingFilter::wait on it.
*
* \return std::string the token that was matched, returns an empty string
* if the timeout occurs first.
* i.e. if (listenForOnce(...) != "") // Got match
* \see SerialListener::removeFilter, serial::BlockingFilter,
* serial::BlockingFilterPtr
*/
std::string
listenForOnce (ComparatorType comparator, size_t timeout = 1000);
BlockingFilterPtr
createBlockingFilter (ComparatorType comparator);
/*!
* Writes to the seiral port then blocks until the comparator returns true
* or until the timeout occurs.
* Creates a BlockingFilter blocks until the comparator returns true.
*
* The user provides a comparator, and every time a line is
* received the comparator is called and the comparator has to evaluate the
* line and return true if it matches and false if it doesn't. If it does
* match, any threads that have called BlockingFilter::wait will be
* notified. The BlockingFilter will remove itself when its destructor is
* called, i.e. when it leaves the scope, so in those cases an explicit call
* to SerialListener::removeFilter is not needed.
*
* This function creates a filter, writes the data, then waits for it to
* match atleast once.
* \param comparator This is a comparator for detecting if a line matches.
* The comparartor receives a std::string reference and must return a true
* if it matches and false if it doesn't.
*
* \param to_be_written const std::string reference of data to be written to
* the serial port.
* \param buffer_size This is the number of tokens to be buffered by the
* BufferedFilter, defaults to 1024.
*
* \param comparator ComparatorType function that should return true if the
* given std::string matches otherwise false.
* \return BlockingFilter So you can call BlockingFilter::wait on it.
*
* \see SerialListener::removeFilter, serial::BufferedFilter,
* serial::BufferedFilterPtr
*/
BufferedFilterPtr
createBufferedFilter (ComparatorType comparator, size_t buffer_size = 1024);
/*!
* Removes a filter by a given FilterPtr.
*
* \param timeout in milliseconds before timing out and returning false.
* Defaults to 1000 milliseconds or 1 second.
* \param filter_ptr A shared pointer to the filter to be removed.
*
* \return std::string the token that was matched, returns an empty string
* if the timeout occurs first.
* i.e. if (listenForOnce(...) != "") // Got match
* \see SerialListener::createFilter
*/
std::string
listenForOnce (ComparatorType comparator, size_t timeout = 1000);
void
removeFilter (FilterPtr filter_ptr);
/*!
* Blocks until the given string is detected or until the timeout occurs.
* Removes a BlockingFilter.
*
* \param token std::string that should be watched for, this string must
* match the message exactly.
* The BlockingFilter will remove itself if the destructor is called.
*
* \param timeout in milliseconds before timing out and returning false.
* Defaults to 1000 milliseconds or 1 second.
* \param blocking_filter A BlockingFilter to be removed.
*
* \return bool If true then the token was detected before the token, false
* if the token was not heard and the timeout occured.
* \see SerialListener::createBlockingFilter
*/
bool
listenForStringOnce (std::string token, size_t timeout = 1000);
void
removeFilter (BlockingFilterPtr blocking_filter);
/*!
* Removes a filter by a given uuid.
* Removes a BufferedFilter.
*
* The uuid for a filter is returned by the listenFor function.
* The BufferedFilter will remove itself if the destructor is called.
*
* \param filter_ptr A shared pointer to the filter.
* \param buffered_filter A BufferedFilter to be removed.
*
* \see SerialListener::listenFor
* \see SerialListener::createBufferedFilter
*/
void
stopListeningFor (FilterPtr filter_ptr);
void
removeFilter (BufferedFilterPtr buffered_filter);
/*!
* Stops listening for anything, but doesn't stop reading the serial port.
* Removes all filters.
*/
void
stopListeningForAll ();
removeAllFilters ();
/***** Hooks and Handlers ******/
......@@ -912,6 +800,156 @@ private:
};
}
/*!
* This is the a filter that provides a wait function for blocking until a
* match is found.
*
* This should probably not be created manually, but instead should be
* constructed using SerialListener::createBlockingFilter(ComparatorType)
* function which returns a BlockingFilter instance.
*
* \see serial::SerialListener::ComparatorType,
* serial::SerialListener::createBlockingFilter
*/
class BlockingFilter
{
public:
BlockingFilter (ComparatorType comparator,
boost::shared_ptr<SerialListener> listener)
: listener(listener)
{
DataCallback cb = boost::bind(&BlockingFilter::callback, this, _1);
this->filter_ptr = listener->createFilter(comparator, cb);
}
virtual ~BlockingFilter () {
this->listener->removeFilter(filter_ptr);
this->result = "";
this->cond.notify_all();
}
/*!
* Waits a given number of milliseconds or until a token is matched. If a
* token is matched it is returned, otherwise an empty string is returned.
*
* \param ms Time in milliseconds to wait on a new token.
*
* \return std::string token that was matched or "" if none were matched.
*/
std::string wait(size_t ms) {
this->result = "";
boost::unique_lock<boost::mutex> lock(this->mutex);
this->cond.timed_wait(lock, boost::posix_time::milliseconds(ms));
return this->result;
}
FilterPtr filter_ptr;
void callback(const std::string& token) {
this->cond.notify_all();
this->result = token;
}
private:
boost::shared_ptr<SerialListener> listener;
boost::condition_variable cond;
boost::mutex mutex;
std::string result;
};
/*!
* This is the a filter that provides a wait function for blocking until a
* match is found. It will also buffer up to a given buffer size of tokens so
* that they can be counted or accessed after they are matched by the filter.
*
* This should probably not be created manually, but instead should be
* constructed using SerialListener::createBufferedFilter(ComparatorType)
* function which returns a BufferedFilter instance.
*
* The internal buffer is a circular queue buffer, so when the buffer is full,
* the oldest token is dropped and the new one is added. Additionally, when
* wait is a called the oldest available token is returned.
*
* \see serial::SerialListener::ComparatorType,
* serial::SerialListener::createBufferedFilter
*/
class BufferedFilter
{
public:
BufferedFilter (ComparatorType comparator, size_t buffer_size,
boost::shared_ptr<SerialListener> listener)
: listener(listener), buffer_size(buffer_size)
{
DataCallback cb = boost::bind(&BufferedFilter::callback, this, _1);
this->filter_ptr = listener->createFilter(comparator, cb);
}
virtual ~BufferedFilter () {
this->listener->removeFilter(filter_ptr);
this->queue.clear();
this->result = "";
}
/*!
* Waits a given number of milliseconds or until a matched token is
* available in the buffer. If a token is matched it is returned, otherwise
* an empty string is returned.
*
* \param ms Time in milliseconds to wait on a new token. If ms is set to 0
* then it will try to get a new token if one is available but will not
* block.
*
* \return std::string token that was matched or "" if none were matched.
*/
std::string wait(size_t ms) {
if (ms == 0)
if (!this->queue.try_pop(this->result))
this->result = "";
else
if (!this->queue.timed_wait_and_pop(this->result, ms))
this->result = "";
return result;
}
/*!
* Clears the buffer of any tokens.
*/
void clear() {
queue.clear();
}
/*!
* Returns the number of tokens waiting in the buffer.
*/
size_t count() {
return queue.size();
}
/*!
* Returns the capacity of the buffer.
*/
size_t capacity() {
return buffer_size;
}
FilterPtr filter_ptr;
void callback(const std::string &token) {
std::string throw_away;
if (this->queue.size() == buffer_size)
this->queue.wait_and_pop(throw_away);
this->queue.push(token);
}
private:
size_t buffer_size;
boost::shared_ptr<SerialListener> listener;
ConcurrentQueue<std::string> queue;
std::string result;
};
} // namespace serial
#endif // SERIAL_LISTENER_H
\ No newline at end of file
......@@ -111,7 +111,7 @@ SerialListener::stopListening() {
this->serial_port = NULL;
// Delete all the filters
this->stopListeningForAll();
this->removeAllFilters();
}
size_t
......@@ -186,7 +186,8 @@ SerialListener::listen() {
/***** Filter Functions *****/
FilterPtr
SerialListener::listenFor(ComparatorType comparator, DataCallback callback) {
SerialListener::createFilter(ComparatorType comparator, DataCallback callback)
{
FilterPtr filter_ptr(new Filter(comparator, callback));
boost::mutex::scoped_lock l(filter_mux);
......@@ -195,50 +196,40 @@ SerialListener::listenFor(ComparatorType comparator, DataCallback callback) {
return filter_ptr;
}
typedef boost::shared_ptr<boost::condition_variable> shared_cond_var_ptr_t;
BlockingFilterPtr
SerialListener::createBlockingFilter(ComparatorType comparator) {
return BlockingFilterPtr(
new BlockingFilter(comparator, boost::shared_ptr<SerialListener>(this)));
}
inline void
listenForOnceCallback(const std::string &token,
shared_cond_var_ptr_t cond,
boost::shared_ptr<std::string> result)
BufferedFilterPtr
SerialListener::createBufferedFilter(ComparatorType comparator,
size_t buffer_size)
{
(*result) = token;
cond->notify_all();
return BufferedFilterPtr(
new BufferedFilter(comparator,
buffer_size,
boost::shared_ptr<SerialListener>(this)));
}
std::string
SerialListener::listenForOnce(ComparatorType comparator, size_t ms) {
boost::shared_ptr<std::string> result(new std::string(""));
shared_cond_var_ptr_t cond(new boost::condition_variable());
boost::mutex mutex;
DataCallback callback = boost::bind(listenForOnceCallback,_1,cond,result);
FilterPtr filter_id = this->listenFor(comparator, callback);
boost::unique_lock<boost::mutex> lock(mutex);
cond->timed_wait(lock, boost::posix_time::milliseconds(ms)));
this->stopListeningFor(filter_id);
// If the callback never got called then result will be "" because tokens
// can never be ""
return (*result);
void
SerialListener::removeFilter(FilterPtr filter_ptr) {
boost::mutex::scoped_lock l(filter_mux);
filters.erase(std::find(filters.begin(),filters.end(),filter_ptr));
}
bool
SerialListener::listenForStringOnce(std::string token, size_t milliseconds) {
return this->listenForOnce(exactly(token), milliseconds) == token;
void
SerialListener::removeFilter(BlockingFilterPtr blocking_filter) {
this->removeFilter(blocking_filter->filter_ptr);
}
void
SerialListener::stopListeningFor(FilterPtr filter_ptr) {
boost::mutex::scoped_lock l(filter_mux);
filters.erase(std::find(filters.begin(),filters.end(),filter_ptr));
SerialListener::removeFilter(BufferedFilterPtr buffered_filter) {
this->removeFilter(buffered_filter->filter_ptr);
}
void
SerialListener::stopListeningForAll() {
SerialListener::removeAllFilters() {
boost::mutex::scoped_lock l(filter_mux);
filters.clear();
callback_queue.clear();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment