From: Larry on
Hi,

I am this close to finishing my tiny streaming server... Having said
that, I have a problem with the following code. It basically fires a runtime
error when I disconnect from the server (by closing the telnet window)!

I wound up finding out that the error may be fired because of this line:

WaitForSingleObject(eventi[threadid], INFINITE);

If I replace it with Sleep(1000), everything works fine...


/*
*
* Streaming Server v1.0 by THEARTOFWEB Software
*
*/

#include <iostream>
#include <string>
#include <map>
#include <algorithm>
#include <process.h>
#include <cstdlib>
#include <ctime>
#include "socket.h"
#include <boost/circular_buffer.hpp>
using namespace std;
using namespace boost;

// Line terminator appended to every text line sent to a client.
const std::string CRLF("\r\n");

// Number of slots in each client's circular buffer.
const int numbuff(3);

// Thread entry point serving one connected client. `sock` is the Socket*
// returned by SocketServer::Accept(); the thread closes and deletes it
// before returning.
unsigned int __stdcall Consumer(void* sock);
// Thread entry point that, once per second, pushes a timestamp buffer to
// every registered client and signals that client's event. Its argument is
// unused.
unsigned int __stdcall Producer(void*);

// Writes the current UTC date/time, formatted as "%a, %d %b %Y %X GMT", into
// szTime. The caller must supply a buffer of at least 30 bytes.
void getDateTime(char * szTime);

// State stored in buffer::flag for one slot of a client's circular buffer.
enum buffer_status
{
    BUFF_EMPTY = 0,   // placeholder slot: nothing to send
    BUFF_DONE  = 1    // slot has been filled and may be sent
};

// One fixed-size slot of payload data together with its fill state.
//
// The constructor copies at most sizeof(data) bytes from the caller's memory
// and zero-fills the remainder, so `data` never contains indeterminate bytes
// and any payload shorter than sizeof(data) is NUL-terminated.
struct buffer
{
    unsigned char data[1024];   // payload; bytes past bytesRecorded are zero
    int bytesRecorded;          // number of valid bytes in data, 0..sizeof(data)
    int flag;                   // slot state (callers pass a buffer_status value)

    // data_          source bytes; may be null, in which case nothing is copied
    // bytesRecorded_ number of bytes to copy; negative values are treated as 0
    //                and values larger than sizeof(data) are clamped to it
    // flag_          slot state to store
    buffer(const unsigned char * data_, const int bytesRecorded_, const int flag_) :
        bytesRecorded(0), flag(flag_)
    {
        const int capacity = static_cast<int>(sizeof(data));

        // Clamp the requested size so the copy can never run past `data`.
        if (data_ != 0 && bytesRecorded_ > 0)
            bytesRecorded = (bytesRecorded_ < capacity) ? bytesRecorded_ : capacity;

        if (bytesRecorded > 0)
            std::copy(data_, data_ + bytesRecorded, data);

        // Zero the unused tail so readers never see uninitialised memory.
        std::fill(data + bytesRecorded, data + capacity, static_cast<unsigned char>(0));
    }
};

struct circular
{
circular_buffer<buffer> cb;
};

map<int, circular> users;
map<int, circular>::iterator uit;
map<int, HANDLE> eventi;

int main()
{
// Launch Producer
unsigned int prodRet;
_beginthreadex(0,0,Producer,NULL,0,&prodRet);
if(prodRet)
cout << "Launched Producer Thread!" << endl;

// Set up server (port: 8000, maxconn: 10)
SocketServer sockIn(8000, 10);

while(1)
{
// ...wait for incoming connections...
Socket* s = sockIn.Accept();
unsigned int sockRet;
_beginthreadex(0,0,Consumer,s,0,&sockRet);
if(sockRet)
cout << "Spawned a new thread!" << endl;
}

sockIn.Close();

return EXIT_SUCCESS;
}

// Consumer
unsigned int __stdcall Consumer(void* sock)
{
Socket* s = (Socket*) sock;

s->SendBytes("Hello World!" + CRLF);

int threadid = (int)GetCurrentThreadId();

// Create Event & push it in the event map
HANDLE hevent = CreateEvent(NULL,FALSE,FALSE,NULL);
eventi.insert(make_pair(threadid,hevent));

// Prepare & add circular buffer to the map
circular c;
c.cb.set_capacity(numbuff);

for(int i = 0; i<numbuff; i++)
{
c.cb.push_back(buffer(NULL,0,BUFF_EMPTY));
}

users.insert(make_pair(threadid, c));

//
// TODO:
// Read data from the buffer
// and send it to the client
//
// When using push_back the oldest
// element in the circular buffer
// will be in the index 0
//

Sleep(500);

while(1)
{
// CALLBACK EVENT
WaitForSingleObject(eventi[threadid], INFINITE);
if(users[threadid].cb.at(0).flag == BUFF_DONE)
{
string line = (char*)users[threadid].cb.at(0).data;
int ret = s->SendBytes(line + CRLF);
if(SOCKET_ERROR == ret)
break;
}
}

// Close & remove event from event map
CloseHandle(eventi[threadid]);
eventi.erase(threadid);

// Remove buffer from the map
users.erase(threadid);

// Say bye to the client
s->SendBytes("Bye bye!" + CRLF);

// Disconnect client
cout << "Closing thread..." << endl;
s->Close();
delete s;
return 0;
}

// Producer
unsigned int __stdcall Producer(void*)
{
while(1)
{
Sleep(1000);
char szTime[30]; getDateTime(szTime);
for(uit=users.begin(); uit!=users.end(); ++uit)
{
users[uit->first].cb.push_back(buffer((unsigned char*)szTime,
30, BUFF_DONE));
SetEvent(eventi[uit->first]);
cout << "Producer is writing to: " << uit->first << endl;
}
}
return 0;
}

// Writes the current UTC date/time into szTime, formatted as
// "%a, %d %b %Y %X GMT" (e.g. "Thu, 28 Jan 2010 01:23:45 GMT" in the "C"
// locale; %X is the locale's time representation).
//
// szTime  destination buffer of at least 30 bytes; a null pointer is ignored.
//
// On any failure (time conversion or formatting) szTime is set to the empty
// string, so the caller always receives a valid NUL-terminated string.
void getDateTime(char * szTime)
{
    const std::size_t bufSize = 30;   // size callers are required to provide

    if (szTime == NULL)
        return;
    szTime[0] = '\0';

    const std::time_t rawtime = std::time(NULL);
    std::tm timeinfo;

    // Use the thread-safe conversion available on the platform.
#if defined(_WIN32)
    if (gmtime_s(&timeinfo, &rawtime) != 0)
        return;
#else
    if (gmtime_r(&rawtime, &timeinfo) == NULL)
        return;
#endif

    // strftime returns 0 when the result does not fit; the buffer contents
    // are then indeterminate, so reset it to an empty string.
    if (std::strftime(szTime, bufSize, "%a, %d %b %Y %X GMT", &timeinfo) == 0)
        szTime[0] = '\0';
}

// thanks

From: Ulrich Eckhardt on
Larry wrote:
> It basically fires a runtime error when I disconect from the sever!
> (closing the telnet window)

Which error exactly?

> I wound up finding out that the error may be fired because of this line:
>
> WaitForSingleObject(eventi[threadid], INFINITE);
>
> If I replaceit with: Sleep(1000) everything goes ok....


> unsigned int __stdcall Consumer(void* sock)
> {
> Socket* s = (Socket*) sock;
>
> s->SendBytes("Hello World!" + CRLF);
>
> int threadid = (int)GetCurrentThreadId();
>
> // Create Event & push it in the event map
> HANDLE hevent = CreateEvent(NULL,FALSE,FALSE,NULL);
> eventi.insert(make_pair(threadid,hevent));
[...]
> while(1)
> {
> // CALLBACK EVENT
> WaitForSingleObject(eventi[threadid], INFINITE);
> if(users[threadid].cb.at(0).flag == BUFF_DONE)
> {
> string line = (char*)users[threadid].cb.at(0).data;
> int ret = s->SendBytes(line + CRLF);
> if(SOCKET_ERROR == ret)
> break;
> }
> }
>
> // Close & remove event from event map
> CloseHandle(eventi[threadid]);
> eventi.erase(threadid);
[...]
> }

If you close the connection, you will break out of the while loop and then
destroy the event and erase it from the map...

> unsigned int __stdcall Producer(void*)
> {
> while(1)
> {
> Sleep(1000);
> char szTime[30]; getDateTime(szTime);
> for(uit=users.begin(); uit!=users.end(); ++uit)
> {
> users[uit->first].cb.push_back(
> buffer((unsigned char*)szTime, 30, BUFF_DONE));
> SetEvent(eventi[uit->first]);
> cout << "Producer is writing to: " << uit->first << endl;
> }
> }
> return 0;
> }

.... while the producer is still reading it.

Generally, you didn't protect access to shared data in any way, it can
happen that one thread is reading the map with users while the other is
writing it. This is a no-go for multithreading. Take a look at boost::mutex
or win32's CRITICAL_SECTION.

Uli

--
C++ FAQ: http://parashift.com/c++-faq-lite

Sator Laser GmbH
Geschäftsführer: Thorsten Föcking, Amtsgericht Hamburg HR B62 932
From: Larry on

"Ulrich Eckhardt" <eckhardt(a)satorlaser.com> ha scritto nel messaggio
news:k4r537-krs.ln1(a)satorlaser.homedns.org...

> Which error exactly?

Expression: map/set iterator not incrementable

By the way, is it true that on Windows I should shy away from
using callback events and should rather be using boost::thread?

thanks


From: Larry on

"Ulrich Eckhardt" <eckhardt(a)satorlaser.com> ha scritto nel messaggio
news:k4r537-krs.ln1(a)satorlaser.homedns.org...

> Generally, you didn't protect access to shared data in any way, it can
> happen that one thread is reading the map with users while the other is
> writing it. This is a no-go for multithreading. Take a look at
> boost::mutex
> or win32's CRITICAL_SECTION.

So, do you think I should be using a win32 mutex? But where should I put it in
the code?

thanks

From: Scott McPhillips [MVP] on
"Larry" <dontmewithme(a)got.it> wrote in message
news:4b60e892$0$1102$4fafbaef(a)reader4.news.tin.it...
>
> "Ulrich Eckhardt" <eckhardt(a)satorlaser.com> ha scritto nel messaggio
> news:k4r537-krs.ln1(a)satorlaser.homedns.org...
>
>> Generally, you didn't protect access to shared data in any way, it can
>> happen that one thread is reading the map with users while the other is
>> writing it. This is a no-go for multithreading. Take a look at
>> boost::mutex
>> or win32's CRITICAL_SECTION.
>
> so, do you think I should be using win32 mutex? bu where should I put it
> in the code?
>
> thanks


No, use CRITICAL_SECTION. It has less overhead.

After initializing it (InitializeCriticalSection(&cs)), surround all
accesses to data shared between threads like this:

EnterCriticalSection(&cs);
....access or change shared data
LeaveCriticalSection(&cs);

The first statement suspends the calling thread if another thread is
"inside" a similar code block. When the other thread makes the Leave...
call, the first thread is allowed to proceed.

--
Scott McPhillips [VC++ MVP]