Appendix D includes the code listing for the client
program. A description of this program can be found in Chapter
5.
//****************************************************************
// Author: Hu Imm Lee
// Filename: client.h
// File Created: February 12, 1998
//
// Comments:
// Global settings for the client.
// Include this file before everything else.
//
//****************************************************************
// MPEG-1 system-layer start codes
#define PACK_START_CODE 0x000001ba
#define SYSTEM_HEADER_START_CODE 0x000001bb
#define ISO_11172_END_CODE 0x000001b9
// MPEG-1 video-layer start codes
#define PICTURE_START_CODE 0x00000100
#define SEQUENCE_HEADER_CODE 0x000001b3
#define SEQUENCE_END_CODE 0x000001b7
#define GROUP_START_CODE 0x000001b8
#define DEFAULT_VID_BITRATE 565000 // in bits per second
#define DEFAULT_FRAMERATE 29.97 // broadcast rate NTSC (fps)
// frame/packet types (UDP_PACKET.pkt_type)
#define CONTROL 0 // not part of MPEG stream
#define I_FRAME 1
#define P_FRAME 2
#define B_FRAME 3
#define HEADER 4
// packet controls (UDP_PACKET.control)
#define RESPONSE 6 // response from a retransmission request
#define RETRANS 7 // retransmission request
#define TERMINAL 8 // signal to end a transmission
#define PINGREQ 9 // request a ping service
#define PINGRES 10 // respond to a ping
#define INVALID 0 // indicates that packet is not a control type
                  // (same numeric value as CONTROL, but used in the
                  // 'control' field rather than in 'pkt_type')
// TCP packet headers (TCP_PACKET.pkt_type)
#define REQUEST 5 // requesting a connection
#define RATE 11 // transmission rate
// TEMPORARILY hard-coded:
// UDP-related stuff
#define UDPSERVERPORT 8080
#define TCPSERVERPORT 8181
#define PINGSERVERPORT 8282
#define MYUDPPORT 8888
#define MYPINGPORT 8989
#define MAXPAYLOAD 2048 // 2K for UDP payload
#define DEFAULT_BUFSIZE 100 // minimum playout-buffer size passed to
                            // PlayoutBufferPool (a count of POOL_ITEM
                            // slots, not bytes)
// Datagram exchanged with the server over UDP: a fixed header followed by
// a MAXPAYLOAD-byte payload area. Field order and sizes define the on-wire
// layout, so do not reorder.
typedef struct UDP_packet {
WORD seqnum; // NOTE: need to reinitialize seqnum if seqnum >= 2^16-1
WORD pkt_type; // one of the frame/packet types (CONTROL, I_FRAME, ...)
WORD control; // one of the packet controls (RETRANS, TERMINAL, ...)
WORD f_parts[2]; // f_parts[0] = order of frame pkt
//f_parts[1] = total number of pkts per frame
WORD pl_size; // payload size (bytes of 'payload' written to the pipe)
BYTE payload[MAXPAYLOAD];
} UDP_PACKET;
// Message exchanged with the server over the TCP control connection.
typedef struct TCP_packet {
int pkt_type; // REQUEST, RATE or TERMINAL
double data; // for a RATE packet: the rate; the client stores 1/data*1000
             // as its inter-packet interval in milliseconds
struct sockaddr_in udpclientaddr; // in a REQUEST: the client's UDP address
                                  // (family, port MYUDPPORT, INADDR_ANY)
} TCP_PACKET;
// Packet passed to NetInterface::sendping / recvping.
// NOTE(review): field meanings below are inferred from the names and the
// PINGREQ/PINGRES constants; the ping implementation is not visible here.
typedef struct PING_packet {
WORD pkt_type;
WORD seqnum;
WORD control; // presumably PINGREQ or PINGRES -- confirm
DWORD timestamp; // presumably a tick count used to compute the round trip
} PING_PACKET;
// item of Accounting List, a doubly linked-list for convenience
// (the list is circular: AccountingList links the rear node to the front)
typedef struct AccountItem {
WORD id; // packet # (sequence number of the anticipated packet)
DWORD expire; //time to consider retransmission if packet is delayed/lost
              // (compared against GetTickCount(), i.e. milliseconds)
DWORD playtime; // time the packet will be consumed/played-out
struct AccountItem* next; // towards the rear; rear->next is the front
struct AccountItem* back; // towards the front
} ACC_ITEM;
// item of Playout Buffer: one slot of the circular array owned by
// PlayoutBufferPool
typedef struct PoolItem {
BOOL arrived; // flag to indicate packet's presence in the pool
//ACC_ITEM* pAccounting; // pointer to node item in accounting list
// pAccounting's item shall account for pkt's loss/delay
UDP_PACKET packet; // stored by value; slots are copied in and out whole
} POOL_ITEM;
//****************************************************************
//
// Author: Hu Imm Lee
// Filename: main.cpp
// File Created: March 28, 1998
//
// Comments:
// + Contain the main function along with thread functions
//
//****************************************************************
#include <windows.h>
#include <stdio.h>
#include <math.h>
// Direct X Media SDK
#include <initguid.h>
#include <mmsystem.h>
#include <streams.h>
// end Direct X
#include "client.h"
#include "cNetInterface.h"
#include "PlayoutBufferPool.h"
#include "AccountingList.h"
#include "CAsyncIo.h"
#include "CAsyncReader.h"
#include "CPipeStream.h"
#include "memfile.h"
// Thread entry points (all take an unused parameter and return 0)
DWORD __stdcall TCPdialogThread(LPVOID);
DWORD __stdcall BufferManagerThread(LPVOID);
DWORD __stdcall RetransmissionManagerThread(LPVOID);
DWORD __stdcall VideoStreamManagerThread(LPVOID);
DWORD __stdcall PingManagerThread(LPVOID);
DWORD __stdcall DisplayManagerThread(LPVOID);
void PrintWin32Error(char *);
// GLOBALS
// Server address as given on the command line. Sized for the longest
// dotted-quad IPv4 string ("255.255.255.255" + NUL = 16 bytes); the
// previous size of 12 overflowed on most real addresses.
char hostaddr[16];
// Thread handles, published so that threads can poll each other's state
HANDLE gh_TCPdialog;
HANDLE gh_BufferManager;
HANDLE gh_RetransmissionManager;
HANDLE gh_VideoStreamManager;
// stays INVALID_HANDLE_VALUE until VideoStreamManagerThread has created
// the display thread
HANDLE gh_DisplayManager = INVALID_HANDLE_VALUE;
HANDLE gh_PingManager;
DWORD rtd = 30; // round-trip-delay (in millisec)
DWORD inv_transmit_rate; // interval between transmitted packets, in millisec
DWORD startup_delay = 200; // in millisec
DWORD inv_playout_rate; // interval between played-out packets, in millisec
// Shared singletons used by every thread
NetInterface* pNetI = new NetInterface();
PlayoutBufferPool* pBufPool = new PlayoutBufferPool(DEFAULT_BUFSIZE);
AccountingList* pAccountList = new AccountingList();
void __cdecl main(int argc, char* argv[]) {
// for TCPdialog thread
HANDLE h_TCPdialog;
DWORD TCPdialogThreadId;
LPDWORD lpExitCode_TCPdialog = new DWORD;
// for BufferManager thread
HANDLE h_BufferManager;
DWORD BufferManagerThreadId;
LPDWORD lpExitCode_BufferManager = new DWORD;
// for RetransmissionManager thread
HANDLE h_RetransmissionManager;
DWORD RetransmissionManagerThreadId;
LPDWORD lpExitCode_RetransmissionManager = new DWORD;
// for Video Stream Manager thread
HANDLE h_VideoStreamManager;
DWORD VideoStreamManagerThreadId;
LPDWORD lpExitCode_VideoStreamManager = new DWORD;
// for Display Manager (created by VideoStreamManager)
LPDWORD lpExitCode_DisplayManager = new DWORD;
*lpExitCode_DisplayManager = STILL_ACTIVE;
// for Ping Manager thread
HANDLE h_PingManager;
DWORD PingManagerThreadId;
LPDWORD lpExitCode_PingManager = new DWORD;
BOOL done = FALSE;
if (argc != 2) {
printf("usage: client [IP address]\n");
return;
}
strcpy(hostaddr, argv[1]);
if (!pNetI->netinit())
return;
// execute first *ping* at server to determine round-trip-delay
rtd = pNetI->ping();
printf("rtd: %d\n", rtd);
// inverse transmission rate
inv_transmit_rate = ((double)1/DEFAULT_FRAMERATE)
* 1000; // in millisecs
inv_playout_rate = ((double)1/DEFAULT_FRAMERATE)
* 1000;
// TCP Dialog thread startup
h_TCPdialog = CreateThread(
NULL,
0,
TCPdialogThread,
NULL,
0,
&TCPdialogThreadId);
if (h_TCPdialog == NULL) {
PrintWin32Error("CreateThread for TCPdialog");
exit(0);
}
gh_TCPdialog = h_TCPdialog;
// Buffer Manager thread startup
h_BufferManager = CreateThread(
NULL,
0,
BufferManagerThread,
NULL,
0,
&BufferManagerThreadId);
if (h_BufferManager == NULL) {
PrintWin32Error("CreateThread for BufferManager");
exit(0);
}
gh_BufferManager = h_BufferManager;
// Retransmission Manager thread startup
h_RetransmissionManager = CreateThread(
NULL,
0,
RetransmissionManagerThread,
NULL,
0,
&RetransmissionManagerThreadId);
if (h_RetransmissionManager == NULL) {
PrintWin32Error("CreateThread for RetransmissionManager");
exit(0);
}
gh_RetransmissionManager = h_RetransmissionManager;
// Video Stream Manager thread startup
h_VideoStreamManager = CreateThread(
NULL,
0,
VideoStreamManagerThread,
NULL,
0,
&VideoStreamManagerThreadId);
if (h_VideoStreamManager == NULL) {
PrintWin32Error("CreateThread for VideoManager");
exit(0);
}
gh_VideoStreamManager = h_VideoStreamManager;
// Ping Manager thread startup
h_PingManager = CreateThread(
NULL,
0,
PingManagerThread,
NULL,
0,
&PingManagerThreadId);
if(h_PingManager == NULL) {
PrintWin32Error("CreateThread for PingManager");
exit(0);
}
gh_PingManager = h_PingManager;
while (gh_DisplayManager == INVALID_HANDLE_VALUE)
continue;
WaitForSingleObject(gh_DisplayManager, INFINITE);
}
//****************************************************************
// TCP DIALOG THREAD -
// effects
// + sends a startup request to server via TCP connection to
// initiate sending of UDP
// + reads the server's reply; if it is a RATE packet, updates
// inv_transmit_rate from it
// + waits to receive terminating message from server and
// acknowledges it with a UDP CONTROL/TERMINAL packet
// returns
// + always 0
//****************************************************************
DWORD __stdcall TCPdialogThread(LPVOID lpdwThreadParam)
{
    TCP_PACKET request;
    TCP_PACKET reply;
    TCP_PACKET tcp_recvpkt;
    UDP_PACKET ack;
    struct sockaddr_in myaddr_info;

    printf("TCPdialog started...\n");

    // sending request to start ... in TCP
    ZeroMemory(&request, sizeof(TCP_PACKET));
    request.pkt_type = REQUEST;
    // zero the whole address first so no stack garbage (sin_zero padding)
    // is copied into the packet
    ZeroMemory(&myaddr_info, sizeof(myaddr_info));
    myaddr_info.sin_family = AF_INET;
    myaddr_info.sin_port = htons(MYUDPPORT);
    myaddr_info.sin_addr.s_addr = htonl(INADDR_ANY);
    request.udpclientaddr = myaddr_info;
    if ( !pNetI->tcpsend(&request) ) {
        printf("TCPdialog: failed to set up UDP connection with server\n");
        return(0);
    }

    // wait for a reply ... should contain transmission rate;
    ZeroMemory(&reply, sizeof(TCP_PACKET));
    if ( !pNetI->tcprecv(reply) ) {
        printf("TCPdialog - tcprecv error\n");
        return(0);
    }
    if (reply.pkt_type == RATE) {
        printf("TCP dialog: receiving RATE\n");
        // a zero or negative rate would divide by zero or yield a
        // meaningless interval, so keep the current value in that case
        if (reply.data > 0) {
            inv_transmit_rate = (DWORD)(1/reply.data * 1000);
            printf("\t1/rate = %lu\n", inv_transmit_rate);
        }
        else {
            printf("TCP dialog: ignoring non-positive RATE\n");
        }
    }

    // wait for TERMINAL packet via TCP
    ZeroMemory(&tcp_recvpkt, sizeof(TCP_PACKET));
    if ( !pNetI->tcprecv(tcp_recvpkt) ) {
        // nothing valid was received, so there is no packet type to inspect
        printf("TCPdialog - tcprecv error\n");
        return(0);
    }
    if (tcp_recvpkt.pkt_type != TERMINAL) {
        printf("TCPdialog: Hmmm ... not a terminal pkt %d\n", tcp_recvpkt.pkt_type);
        return(0);
    }

    // send an ACK to acknowlege termination by server
    ZeroMemory(&ack, sizeof(UDP_PACKET));
    ack.pkt_type = CONTROL;
    ack.control = TERMINAL;
    pNetI->sendpacket(&ack);

    printf("\nTCPdialog exiting...\n");
    return(0);
}
//****************************************************************
// BUFFER MANAGER THREAD -
// thread runs in a loop until the display thread is no longer
// active
//
// responsibilities
// + receive UDP packets and copy each into the Playout Buffer
// + acknowledge a CONTROL/TERMINAL packet from the server
// + for every packet that is not a retransmission, create an
// accounting item on the Accounting List for the next
// anticipated packet
// returns
// + always 0
//****************************************************************
DWORD __stdcall BufferManagerThread(LPVOID lpdwThreadParam)
{
    UDP_PACKET packet;
    UDP_PACKET ackpacket;
    POOL_ITEM bufItem;                 // PlayoutBufferPool::add copies it
    ACC_ITEM* pAccountItem;
    DWORD arrivaltime = 0;             // arrival time of 1st packet
    DWORD accumulated_timeunits = 0;   // packets seen since first arrival
    DWORD nextpacket = 0;
    // initialised so the loop condition is well defined on every path
    DWORD displayExitCode = STILL_ACTIVE;
    int npkt = 0;

    printf("BufferManager started...\n");
    do {
        ZeroMemory(&packet, sizeof(UDP_PACKET)); // zero out packet for next recv
        if (pNetI->recvpacket(packet)) {
            // first packet to arrive (won't detect a loss for any packets
            // prior to this one)
            if (arrivaltime == 0) {
                // approx. time packet was received
                arrivaltime = GetTickCount();
                nextpacket = packet.seqnum;
            }
            accumulated_timeunits++;
            nextpacket++;
            npkt++;

            BOOL is_control = (packet.pkt_type == CONTROL);
            BOOL is_retrans = is_control && (packet.control == RETRANS);

            if (is_control && (packet.control == TERMINAL)) {
                // send an ACK to server
                // NOTE(review): TCPdialogThread acknowledges with
                // pkt_type CONTROL / control TERMINAL, whereas this ACK
                // puts TERMINAL in pkt_type -- confirm what the server
                // expects before changing either.
                printf("\nOK. Recvd TERMINAL udp packet fr server\n");
                ZeroMemory(&ackpacket, sizeof(UDP_PACKET));
                ackpacket.pkt_type = TERMINAL;
                pNetI->sendpacket(&ackpacket);
            }
            else if (is_retrans) {
                printf("\n\tRecvd packet type RETRANS\n");
            }

            // fill playout buffer with packet
            ZeroMemory(&bufItem, sizeof(POOL_ITEM));
            bufItem.arrived = TRUE;
            bufItem.packet = packet;
            pBufPool->add(&bufItem); // add packet to Playout buffer

            // A retransmitted packet does not start a new expectation;
            // everything else schedules the check for the NEXT packet.
            if (!is_retrans) {
                pAccountItem = new ACC_ITEM;
                pAccountItem->id = (WORD)nextpacket;
                pAccountItem->expire = accumulated_timeunits *
                    inv_transmit_rate + arrivaltime;
                pAccountItem->playtime = accumulated_timeunits * startup_delay
                    + arrivaltime;
                // add a node into Accountant List to wait for next
                // incoming packet
                pAccountList->add(pAccountItem);
            }
        }

        // The display thread is created asynchronously; until its handle
        // has been published there is nothing to query yet.
        if (gh_DisplayManager != INVALID_HANDLE_VALUE &&
            GetExitCodeThread(gh_DisplayManager, &displayExitCode) == FALSE)
            break;
    } while (displayExitCode == STILL_ACTIVE);

    printf("BufferManager exiting...\n");
    printf("\tReceived %d packets\n", npkt);
    return(0);
}
//****************************************************************
// RETRANSMISSION MANAGER THREAD
// - Thread runs in a loop, terminated only by Main
//
// effects
// + sleeps for a specific duration (approx the transmission
// rate)
// + when thread wakes up, it verifies the arrival of an
// anticipated packet. If it arrived, account item is removed
// If not, then packet was lost/delayed so do retransmission
// if possible.
//****************************************************************
DWORD __stdcall RetransmissionManagerThread(LPVOID
lpdwThreadParam) {
ACC_ITEM* pFrontItem;
UDP_PACKET request;
int sleep_duration;
printf("Retransmission Manager started\n");
DWORD exitCode;
do {
pFrontItem = pAccountList->getfront();
if (pFrontItem == NULL) // empty list means nothing to do
continue;
sleep_duration = pFrontItem->expire - GetTickCount();
if (sleep_duration > 0)
Sleep(sleep_duration);
// wake up and see if the front of list is still the same
pFrontItem = pAccountList->getfront();
if (pFrontItem == NULL)
continue;
POOL_ITEM* pItem = pBufPool->getbufferptr(pFrontItem->id);
if ( pItem->arrived &&
(pItem->packet.seqnum >= pFrontItem->id) ) {
// it's possible that while this thread was sleeping,
// the item arrived, was consumed immediately and
// then replaced by a later packet, so the buffer slot
// now has a later packet
pAccountList->remove(pFrontItem);
continue;
}
if ( pFrontItem->expire >= GetTickCount()
) {
// time is up, packet should've arrived
printf("\tPkt[%d] lost (or delayed)\n",
pFrontItem->id);
printf("rtd: %d\n", rtd);
// check to see if retransmission is possible...
if (pFrontItem->expire + rtd < pFrontItem->playtime)
{
printf("\trequesting retransmission\n");
// request for lost/delayed packet
ZeroMemory(&request, sizeof(UDP_PACKET));
request.pkt_type = CONTROL;
request.control = RETRANS;
request.seqnum = pFrontItem->id; //prev+1;
pNetI->sendpacket(&request);
// reset expiry time & move this item to the end of list
pFrontItem->expire = pFrontItem->expire
+ rtd;
pAccountList->popitem(pFrontItem);
pAccountList->add(pFrontItem);
}
else { // if not enough time left to request retransmission, abandon
printf("\tNOT requesting retransmission\n");
pAccountList->remove(pFrontItem);
}
}
if (GetExitCodeThread(gh_DisplayManager, &exitCode) == FALSE)
break;
} while (exitCode == STILL_ACTIVE);
printf("Retransmission Manager exiting\n");
return (0);
}
//****************************************************************
// VIDEO STREAM MANAGER THREAD
// - reconstruct MPEG stream from the packets in Playout buffer
//
// effects
// + creates the named pipe and then the Display Manager thread
// (which connects to that pipe)
// + waits until the playout buffer signals the "StartStream"
// event, then repeatedly takes the front packet and writes
// its payload to the pipe, pausing inv_transmit_rate ms
// between packets
// + loops until the display thread is no longer active
// returns
// + always 0
//****************************************************************
DWORD __stdcall VideoStreamManagerThread(LPVOID lpdwThreadParam)
{
    UDP_PACKET packet;
    HANDLE pipeH; // pipe handle
    BOOL status;
    // for Display Manager thread
    HANDLE h_DisplayManager;
    DWORD DisplayManagerThreadId;

    // frontitem() leaves the packet untouched when it fails, so start
    // from a defined state
    ZeroMemory(&packet, sizeof(UDP_PACKET));

    // create a NAMED PIPE to "flush" MPEG stream into the Filter Graph
    // data will be written in stream of messages & read in a stream of bytes
    // pipe is blocking (synchronous)
    LPCTSTR PipeName = "\\\\.\\pipe\\MPEGSTREAMPIPE\0";
    pipeH = CreateNamedPipe(
        PipeName,
        PIPE_ACCESS_OUTBOUND, // pipe open mode
        PIPE_TYPE_MESSAGE|PIPE_READMODE_BYTE|PIPE_WAIT, // pipe mode
        1, // max number of pipe instances
        32768, // out buffer size (not actual but advised)
        1024, // in buffer size (obviously won't be used in this pipe)
        100, // default timeout in milliseconds (for connection
             // establishment)
        NULL // security attributes (default)
        );
    if (pipeH == INVALID_HANDLE_VALUE)
        PrintWin32Error("CreateNamedPipe");
    printf("Video Stream Manager started...\n");

    // Display Manager thread startup
    // note: this thread will read an MPEG bytestream from a named pipe
    // It will also setup ActiveMovie components to display the stream
    h_DisplayManager = CreateThread(
        NULL,
        0,
        DisplayManagerThread,
        NULL,
        0,
        &DisplayManagerThreadId);
    if (h_DisplayManager == NULL) {
        PrintWin32Error("CreateThread for DisplayManager");
        exit(0);
    }
    gh_DisplayManager = h_DisplayManager;

    printf("Creating event\n");
    // create event to signal when to start the streaming
    HANDLE evtStartStreamOut = CreateEvent(
        NULL, // security attributes
        FALSE, // auto reset
        FALSE, // initial state unsignaled
        "StartStream");
    if (evtStartStreamOut == NULL)
        PrintWin32Error("VideoStreamManagerThread: Create Event");
    printf("\nVideoStream Mgr: Event Start Stream created at %lu\n",
        GetTickCount());
    // pass handle to playout buffer class in order for it to signal when it's full
    pBufPool->StartStreamFlag = evtStartStreamOut;

    // wait for the pipe's "client" to connect; the display thread is
    // already running, so it may have connected before this call, which
    // ConnectNamedPipe reports as ERROR_PIPE_CONNECTED (not a failure)
    status = ConnectNamedPipe(pipeH, NULL);
    if (!status && GetLastError() != ERROR_PIPE_CONNECTED)
        PrintWin32Error("ConnectNamedPipe");
    printf("VideoStreamMgr: pipe connection is est.\n");

    // wait till the playout buffer is near full ... or till startup delay? ... may not be accurate
    if (WaitForSingleObject(evtStartStreamOut, INFINITE) == WAIT_FAILED)
        PrintWin32Error("WaitForSingleObject");
    CloseHandle(evtStartStreamOut);
    printf("\nVideoStream Mgr: Event signaled at time %lu\n",
        GetTickCount());

    DWORD nwrite;
    DWORD exitCode = STILL_ACTIVE;
    do {
        // get an item from the playout queue
        status = pBufPool->frontitem(packet);
        // write to the pipe ...
        if (status) {
            if (packet.seqnum == 0)
                printf("%d ", packet.seqnum);
            nwrite = 0; // defined even if WriteFile fails
            status = WriteFile(pipeH, (char*)&packet.payload[0],
                packet.pl_size, &nwrite, NULL);
            if (!status)
                PrintWin32Error("WriteFile");
            ASSERT(nwrite == packet.pl_size);
            if (nwrite != packet.pl_size)
                printf("requested to write %d bytes to pipe, wrote %lu\n",
                    packet.pl_size, nwrite);
        }
        Sleep((DWORD)inv_transmit_rate);
        // if the query itself fails, stop rather than loop on a stale code
        if (!GetExitCodeThread(h_DisplayManager, &exitCode))
            break;
    } while (exitCode == STILL_ACTIVE);

    if (pipeH != INVALID_HANDLE_VALUE)
        CloseHandle(pipeH);
    printf("VideoStream Manager exiting\n");
    return (0);
}
//****************************************************************
// DISPLAY MANAGER THREAD
// called by VideoStreamManager to ensure that named pipe is
// already established before this thread can connect to it
//****************************************************************
DWORD __stdcall DisplayManagerThread(LPVOID lpdwThreadParam)
{
LPCTSTR PipeName = "\\\\.\\pipe\\MPEGSTREAMPIPE\0";
HRESULT hr;
// instantiate DirectX's Filter Graph Manager
IFilterGraph *pFG = NULL;
IGraphBuilder *pGB = NULL;
IMediaControl *pMC = NULL;
IMediaEvent *pME = NULL;
char ErrorMsg[512];
printf("Display Manager started ...\n");
// COM initialization, before you can start using DirectX stuff
hr = CoInitialize(NULL);
if (!SUCCEEDED(hr))
printf("CoInitialize failed\n");
hr = CoCreateInstance(CLSID_FilterGraph, NULL, CLSCTX_INPROC,
IID_IFilterGraph,
(void **)&pFG);
if (FAILED(hr)) {
printf("CoCreateInstance failed\n");
AMGetErrorText(hr, &ErrorMsg[0], sizeof(ErrorMsg));
printf("%s\n", ErrorMsg);
return 0;
}
CMediaType mediatype;
ZeroMemory(&mediatype, sizeof(AM_MEDIA_TYPE));
mediatype.majortype = MEDIATYPE_Stream;
mediatype.subtype = MEDIASUBTYPE_MPEG1System;
CPipeStream Stream(PipeName);
printf("pipe stream created\n");
CPipeReader* rdr = new CPipeReader(&Stream, &mediatype, &hr);
if (!SUCCEEDED(hr)) {
printf("CPipeReader...\n");
AMGetErrorText(hr, &ErrorMsg[0], sizeof(ErrorMsg));
printf("%s\n", ErrorMsg);
return 0;
}
rdr->AddRef();
// add the source filter
hr = pFG->AddFilter(rdr, L"Video Source");
if (FAILED(hr)) {
printf("AddFilter...\n");
AMGetErrorText(hr, &ErrorMsg[0], sizeof(ErrorMsg));
printf("%s\n", ErrorMsg);
return 0;
}
printf("source filter added\n");
hr = pFG->QueryInterface(IID_IGraphBuilder, (void **)&pGB);
if (FAILED(hr)) {
printf("QueryInterface(IGraphBuilder)...\n");
AMGetErrorText(hr, &ErrorMsg[0], sizeof(ErrorMsg));
printf("%s\n", ErrorMsg);
return 0;
}
if (pGB == NULL)
printf("graph builder null!\n");
CBasePin* pin = rdr->GetPin(0);
if (pin == NULL)
printf("null pin\n");
printf("Rendering...\n");
// render
hr = pGB->Render(pin);
if (FAILED(hr)) {
printf("Render...\n");
AMGetErrorText(hr, &ErrorMsg[0], sizeof(ErrorMsg));
printf("%s\n", ErrorMsg);
return 0;
}
printf("rendered\n");
pGB->Release();
// get ready to play
printf("querying IMediaControl\n");
hr = pFG->QueryInterface(IID_IMediaControl, (void **)&pMC);
if (FAILED(hr)) {
printf("QueryInterface(IMediaControl)...\n");
AMGetErrorText(hr, &ErrorMsg[0], sizeof(ErrorMsg));
printf("%s\n", ErrorMsg);
pMC->Release();
return 0;
}
hr = pFG->QueryInterface(IID_IMediaEvent, (void **)&pME);
if (FAILED(hr)) {
printf("QueryInterface(IMediaEvent)...\n");
AMGetErrorText(hr, &ErrorMsg[0], sizeof(ErrorMsg));
printf("%s\n", ErrorMsg);
pMC->Release();
return 0;
}
OAEVENT oEvent;
hr = pME->GetEventHandle(&oEvent);
if (SUCCEEDED(hr)) {
hr = pMC->Run();
}
if (SUCCEEDED(hr)) {
LONG levCode;
hr = pME->WaitForCompletion(INFINITE, &levCode);
}
pMC->Release();
pME->Release();
rdr->Release();
pFG->Release();
// deinitialize COM interfacing
CoUninitialize();
printf("Display thread quitting\n");
return(0);
}
//****************************************************************
// PING MANAGER THREAD
// - thread that manages PINGs to consistently determine avg RTD
// - pings every 'sleeptime' seconds and stores each successful
// result in the global rtd
// - exits when a ping returns a negative value
// returns
// + always 0
//****************************************************************
DWORD __stdcall PingManagerThread(LPVOID lpdwThreadParam)
{
    DWORD sleeptime = 5; // sleep duration in seconds
    int val;
    // perform a ping at every 'sleeptime' interval
    while (1) {
        // Assign first, then compare. Without the separation the
        // comparison binds tighter than '=', so val (and hence rtd)
        // only ever received 0 or 1.
        val = pNetI->ping();
        if (val < 0)
            break;
        rtd = (DWORD)val;
        Sleep(sleeptime * 1000);
    }
    printf("Ping Manager Thread exiting\n");
    return (0);
}
//****************************************************************
// PRINTWIN32ERROR -
// + formats error number into string and prints it out
// + only format for GetLastError()
//****************************************************************
void PrintWin32Error(char *pszErrorString) {
LPVOID lpMsgBuf;
printf("Error: returned from [%s]\n",
pszErrorString);
if (FormatMessage(
FORMAT_MESSAGE_ALLOCATE_BUFFER |
FORMAT_MESSAGE_FROM_SYSTEM,
NULL,
GetLastError(),
MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
(LPSTR)&lpMsgBuf,
0,
NULL))
printf("Error Message: %s\n", lpMsgBuf);
}
//****************************************************************
// Author: Hu Imm Lee
// Filename: PlayoutBufferPool.h
// File Created: March 28, 1998
//
// Comments: Contains class definition for PlayoutBufferPool
//
//****************************************************************
typedef struct UDP_packet UDP_PACKET;
typedef struct PoolItem POOL_ITEM;
// Playout Buffer class (a fixed-size circular queue)
// add() appends at the rear and frontitem() takes from the front; both
// wait on the semaphores below before touching the array.
// NOTE: the array, its size and both indices are static members, so every
// instance of this class shares one buffer.
class PlayoutBufferPool {
public:
// constructor and destructor
PlayoutBufferPool(int minbufsize); // minbufsize is rounded up to a multiple of 8
~PlayoutBufferPool();
BOOL add(POOL_ITEM*); // copies *item into the rear slot
BOOL frontitem(UDP_PACKET&); // copies the front packet out and clears its slot
POOL_ITEM* getbufferptr(int); // slot at (index % buffsize)
BOOL buf_full; // flag to indicate that buffer is full for a first time
HANDLE StartStreamFlag; // event that add() sets once; assigned by VideoStreamManagerThread
static LPCTSTR semaphore_full; // name of semaphores
static LPCTSTR semaphore_empty;
static LPCTSTR semaphore_mutex;
HANDLE h_full; // created with count buffsize; waited on by add(), released by frontitem()
HANDLE h_empty; // created with count 0; released by add(), waited on by frontitem()
HANDLE h_mutex; // count-1 semaphore serializing add() and frontitem()
private:
static int buffsize;
static POOL_ITEM* pBuf; // ptr to circular array
static int iFront; // index to front of buffer
static int iRear; // index to rear of buffer
};
//****************************************************************
// Author: Hu Imm Lee
// Filename: PlayoutBufferPool.cpp
// File Created: March 28, 1998
//
// Comments: Contains implementation of PlayoutBufferPool class
//
//****************************************************************
#include <windows.h>
#include <stdio.h>
#include <math.h>
#include "client.h"
#include "PlayoutBufferPool.h"
// Timeout for the playout-buffer semaphore waits: 3 seconds, expressed in
// milliseconds. Parenthesized so the macro expands safely inside larger
// expressions.
#define TIMEOUTVAL (3*1000)
extern void PrintWin32Error(char *);
//****************************************************************
// Playout Buffer Pool Class Implementation Below
//****************************************************************
// public vars
// names under which the three semaphores are created (named kernel objects)
LPCTSTR PlayoutBufferPool::semaphore_full = "S_Full";
LPCTSTR PlayoutBufferPool::semaphore_empty = "S_Empty";
LPCTSTR PlayoutBufferPool::semaphore_mutex = "S_Mutex";
// private vars
// shared buffer state; the constructor sets buffsize and allocates pBuf
int PlayoutBufferPool::buffsize = 0;
POOL_ITEM* PlayoutBufferPool::pBuf = NULL;
int PlayoutBufferPool::iRear = 0;
int PlayoutBufferPool::iFront = 0;
// Constructs the playout buffer.
// params
// + minbufsize: minimum number of slots wanted; rounded up to the next
//   multiple of 8 (e.g. 14 -> 16). Values below 1 are treated as 1.
// effects
// + allocates and zeroes the circular array
// + creates the 'full', 'empty' and 'mutex' semaphores
PlayoutBufferPool::PlayoutBufferPool(int minbufsize):buf_full(FALSE) {
    // add() passes this handle to SetEvent; start from a defined value
    // until VideoStreamManagerThread assigns the real event
    StartStreamFlag = NULL;

    // a non-positive request would produce a zero-length array and
    // semaphores with an invalid maximum count
    if (minbufsize < 1)
        minbufsize = 1;

    // rear starts off pointing to the front because buffer is empty
    // make the buffer size divisible by 8 because the udp
    // packet's seqnum is a max of 2^16 - 1, i.e., 2 bytes long
    // for example: if minbufsize is 14, then buffsize should be 16
    // NOTE(review): getbufferptr() maps seqnum % buffsize; that mapping is
    // only continuous across the 16-bit seqnum wrap if buffsize divides
    // 65536, which a multiple of 8 does not guarantee.
    PlayoutBufferPool::buffsize = (int) ceil((double)minbufsize/8) * 8;
    printf("PlayoutBufferPool initialization:\n\tbuffsize is: %d\n",
        PlayoutBufferPool::buffsize);

    // make empty circular buffer here
    PlayoutBufferPool::pBuf = new POOL_ITEM[PlayoutBufferPool::buffsize];
    // zero out the WHOLE array ('buffsize' alone is a slot count, not a
    // byte count)
    ZeroMemory(pBuf, buffsize * sizeof(POOL_ITEM));

    // the 'full' semaphore: counts free slots
    h_full = CreateSemaphore(NULL, buffsize, buffsize, semaphore_full);
    if (h_full == NULL)
        PrintWin32Error("PlayoutBufferPool: CreateSemaphore(full)");
    // the 'empty' semaphore: counts filled slots
    h_empty = CreateSemaphore(NULL, 0, buffsize, semaphore_empty);
    if (h_empty == NULL)
        PrintWin32Error("PlayoutBufferPool: CreateSemaphore(empty)");
    // the 'mutex' semaphore
    h_mutex = CreateSemaphore(NULL, 1, 1, semaphore_mutex);
    if (h_mutex == NULL)
        PrintWin32Error("PlayoutBufferPool: CreateSemaphore(mutex)");
}
// Releases the circular array and the three semaphore handles.
PlayoutBufferPool::~PlayoutBufferPool() {
    // pBuf was allocated with new[], so it must be released with delete[]
    delete [] PlayoutBufferPool::pBuf;
    PlayoutBufferPool::pBuf = NULL;

    if (h_full != NULL)
        CloseHandle(h_full);
    if (h_empty != NULL)
        CloseHandle(h_empty);
    if (h_mutex != NULL)
        CloseHandle(h_mutex);
    h_full = h_empty = h_mutex = NULL;
}
//****************************************************************
// ADD
// adds to the rear of the buffer pool
// effects
// + waits up to TIMEOUTVAL for a free slot; only WAIT_FAILED is
// reported, so after a timeout the write still goes ahead
// (i.e. it OVERWRITES the front item IF buffer stays full)
// + makes deep copy when adding new item
// + moves iRear forward, wrapping to 0 at the end of the array
// + the first time iRear reaches buffsize-2, sets buf_full and
// signals StartStreamFlag
// returns
// + always TRUE
//****************************************************************
BOOL PlayoutBufferPool::add(POOL_ITEM* pNewItem)
{
    // wait for the 'full' semaphore
    // if 'full' reaches 0, it indicates the buffer is full
    if (WaitForSingleObject(h_full, TIMEOUTVAL) == WAIT_FAILED) {
        PrintWin32Error("Buffer add: WaitForSingleObject");
        printf("last item: pkt%d\n", pBuf[iRear].packet.seqnum);
    }
    // after 'wait', the semaphore count is decremented

    // mutual exclusion semaphore, to 'deny' access to 'frontitem'
    if (WaitForSingleObject(h_mutex, INFINITE) == WAIT_FAILED) {
        PrintWin32Error("Buffer add: WaitForSingleObject");
        printf("last item: pkt%d\n", pBuf[iRear].packet.seqnum);
    }

    /** now, you're safe to write to the buffer **/
    // check to see whether iRear is pointing to the last element of pBuf
    BOOL rear_at_end = (iRear == (buffsize - 1));

    // write to buffer
    pBuf[iRear] = *pNewItem;

    // iRear points ahead of the last element in the queue ...
    // move iRear to index 0 if it's reached the edge of pBuf
    iRear = rear_at_end ? 0 : iRear + 1;

    // use a flag because you want to signal only once
    if (buf_full == FALSE) {
        if ( iRear == (buffsize - 2) ) {
            buf_full = TRUE;
            if (SetEvent(StartStreamFlag) == 0)
                PrintWin32Error("PlayoutBufferPool: SetEvent");
        }
    }

    // there is now at least one slot available for a read
    // so do ReleaseSemaphore to increment the 'empty' count
    if (ReleaseSemaphore(h_empty, 1, NULL) == FALSE)
        PrintWin32Error("Buffer add: ReleaseSemaphore");
    if (ReleaseSemaphore(h_mutex, 1, NULL) == FALSE)
        PrintWin32Error("Buffer add: ReleaseSemaphore(mutex)");
    return TRUE;
}
//****************************************************************
// FRONTITEM
// gets the front item of the playout buffer pool
// effects
// + waits up to TIMEOUTVAL for a filled slot
// + copies front item's packet into parameter FrontItem
// + zeroes the slot out
// + advances PlayoutBufferPool::iFront (wrapping to 0)
//
// returns
// + TRUE if a packet was copied into FrontItem
// + FALSE if no packet became available in time (FrontItem is
// left untouched); no semaphore is left held in that case
//****************************************************************
BOOL PlayoutBufferPool::frontitem(UDP_PACKET& FrontItem) {
    // wait for the 'empty' semaphore
    // if 'empty' is 0, it indicates that the buffer is empty
    // (so there's nothing to read)
    DWORD waitResult = WaitForSingleObject(h_empty, TIMEOUTVAL);
    if (waitResult != WAIT_OBJECT_0) {
        if (waitResult == WAIT_FAILED) {
            PrintWin32Error("Buffer frontitem: WaitForSingleObject");
            printf("front item: pkt%d\n", pBuf[iFront].packet.seqnum);
        }
        // Timed out or failed: nothing to read. Return before taking the
        // mutex -- the earlier code returned FALSE while still holding it,
        // which blocked every later add()/frontitem() forever.
        return FALSE;
    }
    // after 'wait', the semaphore count is decremented

    // mutual exclusion semaphore, to 'deny' access to 'add'
    if (WaitForSingleObject(h_mutex, INFINITE) == WAIT_FAILED) {
        PrintWin32Error("Buffer frontitem: WaitForSingleObject");
        printf("last item: pkt%d\n", pBuf[iRear].packet.seqnum);
        // the mutex is not held, so the buffer must not be touched;
        // hand the 'empty' count back since nothing was consumed
        ReleaseSemaphore(h_empty, 1, NULL);
        return FALSE;
    }

    /** Now, it's safe to read the buffer **/
    // The 'empty' semaphore was acquired, so a filled slot exists even
    // when iFront == iRear (that index state also occurs when the buffer
    // is completely full, so it cannot be used as the emptiness test).
    FrontItem = pBuf[iFront].packet;
    // zero out buffer after reading so that this slot can be used
    // again by the 'add' function
    ZeroMemory(&pBuf[iFront], sizeof(POOL_ITEM));

    // move iFront to the next buffer in the pool
    // if iFront has reached the end of the pool, revert back to index 0
    iFront = (iFront == buffsize - 1) ? 0 : iFront + 1;

    // there is now at least one slot free to be written to
    // so do ReleaseSemaphore to increment the 'full' count
    if (ReleaseSemaphore(h_full, 1, NULL) == FALSE)
        PrintWin32Error("Buffer frontitem: ReleaseSemaphore");
    if (ReleaseSemaphore(h_mutex, 1, NULL) == FALSE)
        PrintWin32Error("Buffer frontitem: ReleaseSemaphore(mutex)");
    return TRUE;
}
//
// GETBUFFERPTR
// + maps 'index' (a packet sequence number) onto a slot of the
//   circular array by taking it modulo the buffer size
// + returns the address of that slot; no locking is performed
POOL_ITEM* PlayoutBufferPool::getbufferptr(int index)
{
    const int slot = index % buffsize;
    return pBuf + slot;
}
//****************************************************************
// Author: Hu Imm Lee
// Filename: AccountingList.h
// File Created: March 29, 1998
//
// Class Definition of AccountingList
//****************************************************************
typedef struct AccountItem ACC_ITEM;
// Circular doubly linked list of ACC_ITEM nodes, tracked through its rear
// node (rear->next is the front). add(), remove() and popitem() run inside
// the critical section.
class AccountingList {
public:
AccountingList();
~AccountingList();
void add(ACC_ITEM*); // append at the rear
void remove(ACC_ITEM*); // unlink and delete the item
ACC_ITEM* getfront(); // front item, or NULL if the list is empty
void popitem(ACC_ITEM*); // unlink without deleting
private:
ACC_ITEM* pRear; // rear node; NULL when the list is empty
LPCRITICAL_SECTION lpCriticalSection; // allocated and initialized by the constructor
};
//**********************************************************************
// Author: Hu Imm Lee
// Filename: AccountingList.cpp
// File Created: March 29, 1998
//
// Comments:
// + Contains class implementation of Accounting List, used to
// determine when to ask for a retransmitted packet
// + AccountingList is a doubly linked list ...
// eliminates search time
// + rear node points to front and vice versa
//**********************************************************************
#include <windows.h>
#include <stdio.h>
#include "client.h"
#include "AccountingList.h"
// Creates an empty list and the critical section that guards it.
AccountingList::AccountingList() {
    pRear = NULL; // empty list
    LPCRITICAL_SECTION pSection = new CRITICAL_SECTION;
    InitializeCriticalSection(pSection);
    lpCriticalSection = pSection;
}
// Frees every node still linked into the list (remove() deletes nodes, so
// the list owns the ones it holds) and then the critical section created
// by the constructor.
AccountingList::~AccountingList() {
    if (pRear != NULL) {
        ACC_ITEM* pNode = pRear->next; // front of the ring
        pRear->next = NULL;            // break the ring so the walk ends
        while (pNode != NULL) {
            ACC_ITEM* pNext = pNode->next;
            delete pNode;
            pNode = pNext;
        }
        pRear = NULL;
    }
    if (lpCriticalSection != NULL) {
        DeleteCriticalSection(lpCriticalSection);
        delete lpCriticalSection;
        lpCriticalSection = NULL;
    }
}
//
// ADD
// + links pNewItem in as the new rear of the circular list
// + the list keeps the pointer itself (no copy is made)
//
void AccountingList::add(ACC_ITEM* pNewItem) {
    EnterCriticalSection(lpCriticalSection);
    if (pRear != NULL) {
        // splice between the current rear and the front
        ACC_ITEM* pFront = pRear->next;
        pNewItem->next = pFront;
        pFront->back = pNewItem;
        pRear->next = pNewItem;
        pNewItem->back = pRear;
    }
    else {
        // first node: it is both front and rear, so it links to itself
        pNewItem->next = pNewItem;
        pNewItem->back = pNewItem;
    }
    pRear = pNewItem;
    LeaveCriticalSection(lpCriticalSection);
}
// REMOVE
// + unlinks pDeadItem from the list and deletes it
// + if the list is empty, or its only node is not pDeadItem, the item
//   cannot be a member: a diagnostic is printed and nothing is unlinked
//   or deleted
void AccountingList::remove(ACC_ITEM* pDeadItem)
{
    if (pDeadItem == NULL)
        return;

    EnterCriticalSection(lpCriticalSection);
    BOOL single_node = (pRear != NULL) && (pRear->next == pRear);
    if ( pRear == NULL || (single_node && pDeadItem != pRear) ) {
        // Dropping the real node (or dereferencing an empty list) would
        // corrupt the list, so leave everything as it is.
        printf("TRAGEDY in AccountingList::remove - item to be removed does not exist\n");
        LeaveCriticalSection(lpCriticalSection);
        return;
    }

    if (single_node) { // only one item left
        pRear = NULL;
    }
    else {
        // bypass the node in both directions
        pDeadItem->back->next = pDeadItem->next;
        pDeadItem->next->back = pDeadItem->back;
        if (pDeadItem == pRear) // removing the rear: its predecessor takes over
            pRear = pDeadItem->back;
    }
    delete pDeadItem;
    LeaveCriticalSection(lpCriticalSection);
}
//**********************************************************************
// GETFRONT
// effects
// + references the front item without modification
// + reads the list inside the critical section, because add() briefly
// leaves pRear->next pointing at the new rear while another thread
// is appending
// returns
// + NULL if the list is empty
// + pointer to the front item, if otherwise
//**********************************************************************
ACC_ITEM* AccountingList::getfront() {
    ACC_ITEM* pFront = NULL;
    EnterCriticalSection(lpCriticalSection);
    if (pRear != NULL)
        pFront = pRear->next;
    LeaveCriticalSection(lpCriticalSection);
    return pFront;
}
//****************************************************************
// POPITEM
// 'pops' an item off the list
//
// effects
// + similar to REMOVE except that item is not deallocated; the
// caller keeps ownership and may add() it again
// + if the list is empty, or its only node is not pPopItem, the
// item cannot be a member: a diagnostic is printed and the
// list is left unchanged
//****************************************************************
void AccountingList::popitem(ACC_ITEM* pPopItem)
{
    if (pPopItem == NULL)
        return;

    EnterCriticalSection(lpCriticalSection);
    BOOL single_node = (pRear != NULL) && (pRear->next == pRear);
    if ( pRear == NULL || (single_node && pPopItem != pRear) ) {
        // Dropping the real node (or dereferencing an empty list) would
        // corrupt the list, so leave everything as it is.
        printf("TRAGEDY in AccountingList::popitem - item to be popped does not exist\n");
        LeaveCriticalSection(lpCriticalSection);
        return;
    }

    if (single_node) { // only one item left
        pRear = NULL;
    }
    else {
        // bypass the node in both directions
        pPopItem->back->next = pPopItem->next;
        pPopItem->next->back = pPopItem->back;
        if (pPopItem == pRear) // popping the rear: its predecessor takes over
            pRear = pPopItem->back;
    }
    LeaveCriticalSection(lpCriticalSection);
}
//****************************************************************
// Author: Hu Imm Lee
// Filename: cNetInterface.h
// File Created: February 12, 1998
//
// Contains class definition for client's NetInterface
//
// Note: this class is slightly different from the server's
//****************************************************************
typedef struct UDP_packet UDP_PACKET;
typedef struct TCP_packet TCP_PACKET;
// Client-side network endpoint.  Owns three sockets (UDP data, TCP,
// UDP ping); the socket handles are static members and are therefore
// shared by every NetInterface instance.
class NetInterface {
public:
// starts up the WinSock API
NetInterface();
// closes sockets and calls WSACleanup()
~NetInterface();
// creates the sockets, binds the two UDP ports and connects the TCP
// socket to the server; returns FALSE as soon as any step fails
BOOL netinit();
// sends one UDP_PACKET-sized datagram to servaddr; FALSE on socket error
BOOL sendpacket(UDP_PACKET* pPacket);
// receives one datagram from the UDP socket into packet; FALSE on socket error
BOOL recvpacket(UDP_PACKET& packet);
// sends / receives a TCP_PACKET over the TCP connection; FALSE on socket error
BOOL tcpsend(TCP_PACKET* pPacket);
BOOL tcprecv(TCP_PACKET& packet);
// same as sendpacket/recvpacket but over the ping socket, to/from pingservaddr
BOOL sendping(PING_PACKET* pPacket);
BOOL recvping(PING_PACKET& packet);
// returns the average round-trip delay in milliseconds measured with
// several ping requests; -1 if select() reports a socket error
int ping();
private:
// TCP socket, connected to the server (TCPSERVERPORT) in netinit
static SOCKET tcpd;
// UDP socket, bound to MYUDPPORT in netinit
static SOCKET udpd;
// UDP socket, bound to MYPINGPORT in netinit
static SOCKET pingd;
struct sockaddr_in udpaddr; // address of client host
// server's UDP address (UDPSERVERPORT); destination used by sendpacket
struct sockaddr_in servaddr;
// server's ping address (PINGSERVERPORT); destination used by sendping
struct sockaddr_in pingservaddr;
};
//****************************************************************
// Author: Hu Imm Lee
// Filename: cNetInterface.cpp
// File Created: February 12, 1998
//
// Comments:
// Contains implementation of the client's
// NetInterface class
//****************************************************************
#include <stdio.h>
#include <windows.h>
#include "client.h"
#include "cNetInterface.h"
//****************************************************************
// NETWORK INTERFACE CLASS IMPLEMENTATION BELOW
//****************************************************************
// server address string, defined elsewhere; netinit passes it to inet_addr()
extern char hostaddr[];
// Definitions of the class-wide socket handles.  They start out as
// INVALID_SOCKET and receive real handles in NetInterface::netinit().
SOCKET NetInterface::tcpd = INVALID_SOCKET;
SOCKET NetInterface::udpd = INVALID_SOCKET;
SOCKET NetInterface::pingd = INVALID_SOCKET;
// Starts up the WinSock API (version 1.1 is requested).
// If WinSock cannot be initialized the program is terminated; the exit
// status is non-zero so that the failure is visible to the caller of
// the process.
NetInterface::NetInterface() {
    WSADATA WsaData;

    // start initialization of WinSock API
    if (WSAStartup(MAKEWORD(1,1), &WsaData) != 0) {
        printf("Could not find WinSock 1.1 DLL\n");
        exit(1);
    }
}
// Closes every socket that was opened (UDP, TCP and ping) and shuts
// WinSock down.  The handles are static, so each one is reset to
// INVALID_SOCKET after closing to keep a second instance from closing
// the same handle again.
NetInterface::~NetInterface() {
    if (udpd != INVALID_SOCKET) {
        closesocket(udpd);
        udpd = INVALID_SOCKET;
    }
    if (tcpd != INVALID_SOCKET) {
        closesocket(tcpd);
        tcpd = INVALID_SOCKET;
    }
    if (pingd != INVALID_SOCKET) {
        closesocket(pingd);
        pingd = INVALID_SOCKET;
    }
    WSACleanup();
}
//****************************************************************
// NETINIT
// initializes the network interface
// effects
// + validates the server address string (hostaddr)
// + creates UDP socket and allocates handle to
//   NetInterface::udpd
// + binds UDP socket to a "well-known" port
// + creates TCP socket and connect to server
// + creates the ping UDP socket and binds it to its own port
// + records the server's UDP and ping addresses for later sends
// returns
// + TRUE if UDP, TCP & ping startups successful
// + FALSE on failure of any components
//****************************************************************
BOOL NetInterface::netinit() {
    struct sockaddr_in localudpaddr;
    struct sockaddr_in tcpservaddr;
    struct sockaddr_in localpingaddr;

    // inet_addr() returns INADDR_NONE for a string that is not a valid
    // dotted-decimal address; catch that here instead of silently
    // storing it as the server address.
    unsigned long serverip = inet_addr(hostaddr);
    if (serverip == INADDR_NONE) {
        printf("Invalid server address [%s]\n", hostaddr);
        return FALSE;
    }

    // UDP/IP setup
    NetInterface::udpd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
    if (NetInterface::udpd == INVALID_SOCKET) {
        printf("SOCKET for udp ERR[%d]\n", WSAGetLastError() );
        return FALSE;
    }

    // udp bind
    ZeroMemory( (char *)&localudpaddr, sizeof(localudpaddr) );
    localudpaddr.sin_family = AF_INET;
    localudpaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    localudpaddr.sin_port = htons(MYUDPPORT);
    if ( bind(NetInterface::udpd, (struct sockaddr *)&localudpaddr,
              sizeof(localudpaddr)) == SOCKET_ERROR ) {
        printf("BIND for udp ERR[%d]\n", WSAGetLastError() );
        return FALSE;
    }

    // allocate server's UDP address for later use when sending UDP packets
    ZeroMemory(&servaddr, sizeof(struct sockaddr_in));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(UDPSERVERPORT);
    servaddr.sin_addr.s_addr = serverip;

    // TCP/IP setup
    NetInterface::tcpd = socket(AF_INET, SOCK_STREAM, 0);
    if (NetInterface::tcpd == INVALID_SOCKET) {
        printf("SOCKET for tcp ERR[%d]\n", WSAGetLastError() );
        return FALSE;
    }

    // connect
    ZeroMemory( (char *)&tcpservaddr, sizeof(tcpservaddr) );
    tcpservaddr.sin_family = AF_INET;
    tcpservaddr.sin_addr.s_addr = serverip;
    tcpservaddr.sin_port = htons(TCPSERVERPORT);
    if ( connect(NetInterface::tcpd, (struct sockaddr *)&tcpservaddr,
                 sizeof(tcpservaddr)) == SOCKET_ERROR ) {
        printf("CONNECT ERR[%d]\n", WSAGetLastError() );
        return FALSE;
    }

    // PING service port setup
    NetInterface::pingd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
    if (NetInterface::pingd == INVALID_SOCKET) {
        printf("SOCKET for ping ERR[%d]\n", WSAGetLastError() );
        return FALSE;
    }

    // ping bind
    ZeroMemory( (char *)&localpingaddr, sizeof(localpingaddr) );
    localpingaddr.sin_family = AF_INET;
    localpingaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    localpingaddr.sin_port = htons(MYPINGPORT);
    if ( bind(NetInterface::pingd, (struct sockaddr *)&localpingaddr,
              sizeof(localpingaddr)) == SOCKET_ERROR ) {
        printf("BIND for ping ERR[%d]\n", WSAGetLastError() );
        return FALSE;
    }

    // allocate server's PING service address for later use when sending PING
    // packets
    ZeroMemory(&pingservaddr, sizeof(struct sockaddr_in));
    pingservaddr.sin_family = AF_INET;
    pingservaddr.sin_port = htons(PINGSERVERPORT);
    pingservaddr.sin_addr.s_addr = serverip;

    return TRUE;
}
//****************************************************************
// SENDPACKET
// Transmits one UDP_PACKET-sized datagram to the server's UDP
// address (servaddr) through the UDP socket.
// returns
// + TRUE when sendto() succeeds
// + FALSE when sendto() reports SOCKET_ERROR (the WinSock error
//   code is printed)
//****************************************************************
BOOL NetInterface::sendpacket(UDP_PACKET* pPacket)
{
    const int sent = sendto( NetInterface::udpd,
                             (char *)pPacket, sizeof(UDP_PACKET), 0,
                             (struct sockaddr *)&servaddr,
                             sizeof(struct sockaddr_in) );
    if (sent != SOCKET_ERROR)
        return TRUE;

    printf("NetInterface::sendpacket ERR[%d]\n", WSAGetLastError() );
    return FALSE;
}
//****************************************************************
// RECVPACKET
// Reads one datagram from the UDP socket into 'packet'.  The
// sender's address is collected by recvfrom() but not used.
// returns
// + TRUE when recvfrom() succeeds
// + FALSE when recvfrom() reports SOCKET_ERROR (the WinSock error
//   code is printed)
//****************************************************************
BOOL NetInterface::recvpacket(UDP_PACKET& packet)
{
    struct sockaddr_in sender;
    int senderlen = sizeof(sender);

    const int received = recvfrom( NetInterface::udpd,
                                   (char *)&packet, sizeof(UDP_PACKET), 0,
                                   (struct sockaddr *)&sender, &senderlen );
    if (received != SOCKET_ERROR)
        return TRUE;

    printf("NetInterface::recvpacket ERR[%d]\n", WSAGetLastError() );
    return FALSE;
}
//****************************************************************
// TCPSEND
// Writes one TCP_PACKET-sized block to the TCP socket with a
// single send() call.
// returns
// + TRUE when send() succeeds
// + FALSE when send() reports SOCKET_ERROR (the WinSock error
//   code is printed)
//****************************************************************
BOOL NetInterface::tcpsend(TCP_PACKET* pPacket) {
    const int sent = send( NetInterface::tcpd, (char *)pPacket,
                           sizeof(TCP_PACKET), 0 );
    if (sent != SOCKET_ERROR)
        return TRUE;

    printf("NetInterface::tcpsend ERR[%d]\n", WSAGetLastError() );
    return FALSE;
}
//****************************************************************
// TCPRECV -
// receives packet via TCP
//
// TCP is a byte stream, so one recv() may deliver only part of a
// packet.  This keeps reading until a whole TCP_PACKET has arrived.
// (Assumes the peer always sends whole TCP_PACKET-sized messages,
// as tcpsend does -- confirm against the server.)
// returns
// + TRUE once sizeof(TCP_PACKET) bytes have been stored in packet
// + FALSE on a socket error or if the peer closes the connection
//   before a complete packet was received
//****************************************************************
BOOL NetInterface::tcprecv(TCP_PACKET& packet) {
    char* cursor = (char *)&packet;
    int remaining = (int)sizeof(TCP_PACKET);

    while (remaining > 0) {
        int nrecv = recv( NetInterface::tcpd, cursor, remaining, 0 );
        if (nrecv == SOCKET_ERROR) {
            printf("NetInterface::tcprecv ERR[%d]\n", WSAGetLastError() );
            return FALSE;
        }
        if (nrecv == 0) {
            // recv() returns 0 when the connection was closed gracefully
            printf("NetInterface::tcprecv connection closed by peer\n");
            return FALSE;
        }
        cursor += nrecv;
        remaining -= nrecv;
    }
    return TRUE;
}
//****************************************************************
// SENDPING
// Transmits one PING_PACKET-sized datagram to the server's ping
// address (pingservaddr) through the ping socket.
// returns
// + TRUE when sendto() succeeds
// + FALSE when sendto() reports SOCKET_ERROR (the WinSock error
//   code is printed)
//****************************************************************
BOOL NetInterface::sendping(PING_PACKET* pPacket)
{
    const int sent = sendto( NetInterface::pingd,
                             (char *)pPacket, sizeof(PING_PACKET), 0,
                             (struct sockaddr *)&pingservaddr,
                             sizeof(struct sockaddr_in) );
    if (sent != SOCKET_ERROR)
        return TRUE;

    printf("NetInterface::sendping ERR[%d]\n", WSAGetLastError() );
    return FALSE;
}
//****************************************************************
// RECVPING
// Reads one datagram from the ping socket into 'packet'.  The
// sender's address is collected by recvfrom() but not used.
// returns
// + TRUE when recvfrom() succeeds
// + FALSE when recvfrom() reports SOCKET_ERROR (the WinSock error
//   code is printed)
//****************************************************************
BOOL NetInterface::recvping(PING_PACKET& packet)
{
    struct sockaddr_in sender;
    int senderlen = sizeof(sender);

    const int received = recvfrom( NetInterface::pingd,
                                   (char *)&packet, sizeof(PING_PACKET), 0,
                                   (struct sockaddr *)&sender, &senderlen );
    if (received != SOCKET_ERROR)
        return TRUE;

    printf("NetInterface::recvping ERR[%d]\n", WSAGetLastError() );
    return FALSE;
}
// PING
// returns round-trip-delay in milliseconds
//
int NetInterface::ping() {
PING_PACKET pingreq; // request packet
PING_PACKET pingres; // response packet
DWORD timenow;
DWORD accumulated_duration = 0;
int pingcount = 5;
int actualcount = 0; // number of ping packets actually received (after
// accounting for loss)
WORD seqnum = 0;
fd_set fd_read;
FD_ZERO(&fd_read);
FD_SET(pingd, &fd_read);
struct timeval tv;
tv.tv_sec = 3; // wait for # seconds before timing out on the read
int select_status;
for (int i=0; i < pingcount; i++) { // loop #pingcount times
pingreq.pkt_type = CONTROL;
pingreq.seqnum = seqnum;
pingreq.control = PINGREQ;
pingreq.timestamp = GetTickCount();
sendping(&pingreq);
if ( (select_status = select(pingd+1, &fd_read,
NULL, NULL, &tv)) > 0 ) {
recvping(pingres);
actualcount++;
}
else if (select_status == SOCKET_ERROR) {
printf("select: socket error [%d]\n", WSAGetLastError());
return -1;
}
else continue; // timeout ... indicating possibly
lost pkt
// calculate round-trip-delay
timenow = GetTickCount();
accumulated_duration += timenow - pingres.timestamp;
seqnum++;
}
return (accumulated_duration/actualcount); // avg roundtrip
}
//****************************************************************
// Author: Hu Imm Lee
// Filename: CPipeStream.h
//
// Comments:
// + Contains class definition & implementation for CPipeStream
// + CPipeStream is a derived class of CAsyncStream
// + CPipeStream reads data from pipe into stream
//****************************************************************
#include <stdlib.h>
#include <malloc.h>
// Both bodies are parenthesized so the macros expand safely inside
// larger expressions (e.g. MAXBUF/BLOCK must be 2).
#define MAXBUF (1024*1024) // 1 MB RAM to be allocated to this stream
#define BLOCK (MAXBUF/2) // block size the window should slide by
extern void PrintWin32Error(char*);
class CPipeStream : public CAsyncStream {
public:
CPipeStream(LPCTSTR pipename) : m_llPosition(0), m_llAvailable(0),
offset(0) {
OpenPipe(pipename);
// allocate a sliding window
m_pbData = (PBYTE)malloc (MAXBUF);
}
HRESULT OpenPipe(LPCTSTR name) {
HRESULT hr;
// wait (indefinitely) for pipe to become available
BOOL status = WaitNamedPipe(
name,
NMPWAIT_WAIT_FOREVER);
ASSERT(status);
if (!status)
hr = E_FAIL;
// open & connect to the pipe
if (status == TRUE) {
hpipe = CreateFile(name,
GENERIC_READ, // read access only (data stream
// inbound)
0, // object cannot be shared
NULL, // security attributes -- handle cannot be inherited
OPEN_EXISTING, // actions to take if pipe exists
FILE_ATTRIBUTE_NORMAL, // flags and attributes
NULL // template file
);
ASSERT(hpipe != INVALID_HANDLE_VALUE);
if (hpipe == INVALID_HANDLE_VALUE)
hr = E_FAIL;
}
return S_OK;
}
HRESULT SetPointer(LONGLONG llPos) {
if (llPos < 0 || llPos-offset < 0 || llPos-offset
> m_llAvailable) {
printf("setpos failed: req(%ld) ", llPos);
printf("actual(%ld) ", llPos-offset);
printf("available(%ld)\n", m_llAvailable);
return S_FALSE;
}
else {
m_llPosition = llPos - offset;
return S_OK;
}
return S_OK;
}
HRESULT Read(PBYTE pbBuffer, DWORD dwBytesToRead,
BOOL bAlign, LPDWORD pdwBytesRead) {
BOOL status;
LONGLONG dwBytesLeftToRead = (m_llAvailable-m_llPosition
< dwBytesToRead) ?
dwBytesToRead - m_llAvailable + m_llPosition // bytes to read
// from the pipe
: 0; // nothing to read fr pipe //dwBytesToRead;
DWORD bytesread;
CAutoLock lck(&m_csLock);
*pdwBytesRead = 0;
// Read from pipe (it blocks if need be) into the allocated window
while ( dwBytesLeftToRead != 0 ) {
// slide the window
if (MAXBUF-m_llAvailable < dwBytesLeftToRead)
{
printf("sliding window\n");
PBYTE newdata = (PBYTE) malloc(MAXBUF);
memcpy(&newdata[0], &m_pbData[BLOCK],
MAXBUF-BLOCK );
free(m_pbData);
m_pbData = newdata;
if (m_pbData == NULL) {
printf("realloc failed!\n");
break;
}
// remember to change the offset!
offset += BLOCK;
printf("offset(%ld)\n", offset);
m_llAvailable -= BLOCK;
m_llPosition -= BLOCK;
}
status = ReadFile(hpipe,
&m_pbData[m_llAvailable],
(DWORD)dwBytesLeftToRead, &bytesread, NULL);
if (!status) {
PrintWin32Error("ReadFile");
printf("m_llAvailable(%ld) ", m_llAvailable);
printf("m_llPosition(%ld) ", m_llPosition);
printf("dwBytesLeftToRead(%ld) ",
dwBytesLeftToRead);
printf("sizeof[m_pbData] (%ld)\n",
_msize(m_pbData));
break;
}
// safety check in case pipe is already empty (because
// transmission has already ended)
if (bytesread == 0) {
break;
}
dwBytesLeftToRead -= bytesread;
m_llAvailable += bytesread;
}
memcpy(pbBuffer, &m_pbData[m_llPosition], dwBytesToRead-
(DWORD)dwBytesLeftToRead);
*pdwBytesRead = dwBytesToRead-
(DWORD)dwBytesLeftToRead;
m_llPosition += dwBytesToRead-(DWORD)dwBytesLeftToRead;
return S_OK;
}
LONGLONG Size(LONGLONG *pSizeAvailable) {
*pSizeAvailable = min(m_llAvailable+offset, MAXBUF+offset);
return INFINITE; // this is live data, so don't know how long
}
DWORD Alignment() {
return 1;
}
void Lock() {
m_csLock.Lock();
}
void Unlock() {
m_csLock.Unlock();
}
private:
HANDLE hpipe;
LONGLONG m_llPosition;
LONGLONG m_llAvailable; // data so far available to the stream
PBYTE m_pbData; // ptr to a sliding window
CCritSec m_csLock;
LONGLONG offset;
};
// CPipeReader
// CAsyncReader specialization named "Pipe Reader" that is driven by a
// CPipeStream and advertises the media type supplied at construction.
class CPipeReader : public CAsyncReader {
public:
    // Hands the stream to CAsyncReader and copies *pmt into m_mt.
    CPipeReader(CPipeStream* pStream, CMediaType* pmt, HRESULT* phr)
        : CAsyncReader(NAME("Pipe Reader"), NULL, pStream, phr)
    {
        m_mt = *pmt;
    }

    // Registration is a no-op that reports success.
    STDMETHODIMP Register() { return S_OK; }

    // Unregistration is a no-op that reports success.
    STDMETHODIMP Unregister() { return S_OK; }
};
//****************************************************************
// Filename: CAsyncReader.h
// Copyright (c) 1992 - 1997 Microsoft Corporation. All Rights Reserved.
//****************************************************************
#ifndef __ASYNCRDR_H__
#define __ASYNCRDR_H__
//
// AsyncRdr
//
// Defines an IO source filter.
//
// This filter (CAsyncReader) supports IBaseFilter and IFileSourceFilter interfaces from the
// filter object itself. It has a single output pin (CAsyncOutputPin)
// which supports IPin and IAsyncReader.
//
// This filter is essentially a wrapper for the CAsyncFile class that does
// all the work.
//
// the filter class (defined below)
class CAsyncReader;
// the output pin class
// The output pin: a CBasePin that also exposes IAsyncReader and forwards
// the IAsyncReader calls to a CAsyncIo object.
class CAsyncOutputPin
    : public IAsyncReader,
      public CBasePin
{
protected:
    CAsyncReader* m_pReader;
    CAsyncIo * m_pIo;

    // This is set every time we're asked to return an IAsyncReader
    // interface.
    // This allows us to know if the downstream pin can use
    // this transport, otherwise we can hook up to things like the
    // dump filter and nothing happens
    BOOL m_bQueriedForAsyncReader;

    HRESULT InitAllocator(IMemAllocator **ppAlloc);

public:
    // constructor and destructor
    CAsyncOutputPin(
        HRESULT * phr,
        CAsyncReader *pReader,
        CAsyncIo *pIo,
        CCritSec * pLock);
    ~CAsyncOutputPin();

    // --- CUnknown ---
    // need to expose IAsyncReader
    DECLARE_IUNKNOWN
    STDMETHODIMP NonDelegatingQueryInterface(REFIID, void**);

    // --- CBasePin methods ---
    // return the types we prefer - this will return the known
    // file type
    HRESULT GetMediaType(int iPosition, CMediaType *pMediaType);

    // can we support this type?
    HRESULT CheckMediaType(const CMediaType* pType);

    // Clear the flag so we see if IAsyncReader is queried for
    HRESULT CheckConnect(IPin *pPin)
    {
        m_bQueriedForAsyncReader = FALSE;
        return CBasePin::CheckConnect(pPin);
    }

    // See if it was asked for
    HRESULT CompleteConnect(IPin *pReceivePin)
    {
        if (m_bQueriedForAsyncReader) {
            return CBasePin::CompleteConnect(pReceivePin);
        } else {
#ifdef VFW_E_NO_TRANSPORT
            return VFW_E_NO_TRANSPORT;
#else
            return E_FAIL;
#endif
        }
    }

    // --- IAsyncReader methods ---
    // pass in your preferred allocator and your preferred properties.
    // method returns the actual allocator to be used. Call GetProperties
    // on returned allocator to learn alignment and prefix etc chosen.
    // this allocator will be not be committed and decommitted by
    // the async reader, only by the consumer.
    STDMETHODIMP RequestAllocator(
        IMemAllocator* pPreferred,
        ALLOCATOR_PROPERTIES* pProps,
        IMemAllocator ** ppActual);

    // queue a request for data.
    // media sample start and stop times contain the requested absolute
    // byte position (start inclusive, stop exclusive).
    // may fail if sample not obtained from agreed allocator.
    // may fail if start/stop position does not match agreed alignment.
    // samples allocated from source pin's allocator may fail
    // GetPointer until after returning from WaitForNext.
    STDMETHODIMP Request(
        IMediaSample* pSample,
        DWORD dwUser);              // user context

    // block until the next sample is completed or the timeout occurs.
    // timeout (millisecs) may be 0 or INFINITE. Samples may not
    // be delivered in order. If there is a read error of any sort, a
    // notification will already have been sent by the source filter,
    // and STDMETHODIMP will be an error.
    STDMETHODIMP WaitForNext(
        DWORD dwTimeout,
        IMediaSample** ppSample,    // completed sample
        DWORD * pdwUser);           // user context

    // sync read of data. Sample passed in must have been acquired from
    // the agreed allocator. Start and stop position must be aligned.
    // equivalent to a Request/WaitForNext pair, but may avoid the
    // need for a thread on the source filter.
    STDMETHODIMP SyncReadAligned(
        IMediaSample* pSample);

    // sync read. works in stopped state as well as run state.
    // need not be aligned. Will fail if read is beyond actual total
    // length.
    STDMETHODIMP SyncRead(
        LONGLONG llPosition,        // absolute file position
        LONG lLength,               // nr bytes required
        BYTE* pBuffer);             // write data here

    // return total length of stream, and currently available length.
    // reads for beyond the available length but within the total length will
    // normally succeed but may block for a long period.
    STDMETHODIMP Length(
        LONGLONG* pTotal,
        LONGLONG* pAvailable);

    // cause all outstanding reads to return, possibly with a failure code
    // (VFW_E_TIMEOUT) indicating they were cancelled.
    // these are defined on IAsyncReader and IPin
    STDMETHODIMP BeginFlush(void);
    STDMETHODIMP EndFlush(void);
};
//
// The filter object itself. Supports IBaseFilter through
// CBaseFilter and also IFileSourceFilter directly in this object
// Source filter with exactly one output pin.  The filter owns the i/o
// object (m_Io) and the pin (m_OutputPin) as direct members.
class CAsyncReader : public CBaseFilter
{
protected:
// filter-wide lock
CCritSec m_csFilter;
// all i/o done here
// (declared before m_OutputPin, so it is constructed before the pin
// that is handed its address)
CAsyncIo m_Io;
// our output pin
CAsyncOutputPin m_OutputPin;
// Type we think our data is
CMediaType m_mt;
public:
// construction / destruction
// pStream is passed on to m_Io; phr is passed on to the output pin
CAsyncReader(
TCHAR *pName,
LPUNKNOWN pUnk,
CAsyncStream *pStream,
HRESULT *phr);
~CAsyncReader();
// --- CBaseFilter methods ---
// number of pins on this filter
int GetPinCount();
// pin number n, or NULL if there is no such pin
CBasePin *GetPin(int n);
// --- Access our media type
// returns a pointer to m_mt; the pointee must not be modified through it
const CMediaType *LoadType() const
{
return &m_mt;
}
};
#endif //__ASYNCRDR_H__
//****************************************************************
// Filename: CAsyncReader.cpp
// Copyright (c) 1992 - 1997 Microsoft Corporation. All Rights Reserved.
//
// Implementation of Io source filter methods and output pin methods for
// CAsyncReader and CAsyncOutputPin
//
//****************************************************************
#include <streams.h>
#include "CAsyncIo.h"
#include "CAsyncReader.h"
// --- CAsyncOutputPin implementation ---
// Builds the output pin (named "Output", direction PINDIR_OUTPUT) for
// the given filter.
//   phr     - passed to CBasePin
//   pReader - the filter this pin belongs to
//   pIo     - the i/o object all IAsyncReader calls are forwarded to
//   pLock   - lock object passed to CBasePin
// m_bQueriedForAsyncReader starts out FALSE so that CompleteConnect()
// never reads an indeterminate value.
CAsyncOutputPin::CAsyncOutputPin(
    HRESULT * phr,
    CAsyncReader *pReader,
    CAsyncIo *pIo,
    CCritSec * pLock)
    : CBasePin(
        NAME("Async output pin"),
        pReader,
        pLock,
        phr,
        L"Output",
        PINDIR_OUTPUT),
      m_pReader(pReader),
      m_pIo(pIo),
      m_bQueriedForAsyncReader(FALSE)
{
}
// Nothing is released here: m_pReader and m_pIo are not deleted by the pin.
CAsyncOutputPin::~CAsyncOutputPin()
{
}
// Hands out IAsyncReader from this object and remembers that it was
// asked for (CompleteConnect checks the flag); every other interface
// request goes to CBasePin.  Returns E_POINTER if ppv is NULL.
STDMETHODIMP
CAsyncOutputPin::NonDelegatingQueryInterface(REFIID riid, void** ppv)
{
    CheckPointer(ppv,E_POINTER);

    if (!(riid == IID_IAsyncReader)) {
        return CBasePin::NonDelegatingQueryInterface(riid, ppv);
    }

    m_bQueriedForAsyncReader = TRUE;
    return GetInterface((IAsyncReader*) this, ppv);
}
// Enumerates the pin's preferred media types.  There is exactly one:
// the filter's own type, at position 0.
//   iPosition < 0  -> E_INVALIDARG
//   iPosition > 0  -> VFW_S_NO_MORE_ITEMS
//   iPosition == 0 -> *pMediaType receives a copy of the type, S_OK
HRESULT
CAsyncOutputPin::GetMediaType(int iPosition, CMediaType *pMediaType)
{
    if (iPosition < 0) {
        return E_INVALIDARG;
    }
    if (iPosition != 0) {
        return VFW_S_NO_MORE_ITEMS;
    }

    const CMediaType *pmtFilter = m_pReader->LoadType();
    *pMediaType = *pmtFilter;
    return S_OK;
}
// Accepts a proposed type when its major type equals the filter's and
// the subtype either matches or the filter's subtype is
// MEDIASUBTYPE_NULL (treated as a wild card).
// Returns S_OK when acceptable, S_FALSE otherwise.
HRESULT
CAsyncOutputPin::CheckMediaType(const CMediaType* pType)
{
    CAutoLock lck(m_pLock);

    const CMediaType *pmtOurs = m_pReader->LoadType();

    if (!(pmtOurs->majortype == pType->majortype)) {
        return S_FALSE;
    }

    /* MEDIASUBTYPE_NULL on our side matches any subtype */
    if (pmtOurs->subtype == MEDIASUBTYPE_NULL ||
        pmtOurs->subtype == pType->subtype) {
        return S_OK;
    }
    return S_FALSE;
}
// Creates a default CMemAllocator and returns its IMemAllocator
// interface in *ppAlloc (NULL on failure).
//   E_OUTOFMEMORY  - the allocator object could not be created
//   <ctor error>   - the allocator's constructor reported a failure
//   E_NOINTERFACE  - QueryInterface for IID_IMemAllocator failed
//   NOERROR        - success
HRESULT
CAsyncOutputPin::InitAllocator(IMemAllocator **ppAlloc)
{
    *ppAlloc = NULL;

    /* Create a default memory allocator */
    HRESULT hrCreate = NOERROR;
    CMemAllocator *pAllocObj =
        new CMemAllocator(NAME("Base memory allocator"),NULL, &hrCreate);
    if (pAllocObj == NULL) {
        return E_OUTOFMEMORY;
    }
    if (FAILED(hrCreate)) {
        delete pAllocObj;
        return hrCreate;
    }

    /* Get a reference counted IID_IMemAllocator interface */
    HRESULT hrQuery =
        pAllocObj->QueryInterface(IID_IMemAllocator,(void **)ppAlloc);
    if (FAILED(hrQuery)) {
        delete pAllocObj;
        return E_NOINTERFACE;
    }

    ASSERT(*ppAlloc != NULL);
    return NOERROR;
}
// Negotiates the allocator to use.  Only alignment matters to us: an
// unset or unsuitable pProps->cbAlign is replaced by the i/o object's
// alignment.  The caller's preferred allocator is tried first, then one
// of our own.
// The allocator returned in *ppActual always carries a reference for
// the caller, even when it is the preferred one, because the caller
// cannot tell which one it got.
// Returns S_OK, the failure from InitAllocator/SetProperties, or
// VFW_E_BADALIGN when properties were accepted but the resulting
// alignment is unsuitable.
STDMETHODIMP
CAsyncOutputPin::RequestAllocator(
    IMemAllocator* pPreferred,
    ALLOCATOR_PROPERTIES* pProps,
    IMemAllocator ** ppActual)
{
    // we care about alignment but nothing else
    if (!pProps->cbAlign || !m_pIo->IsAligned(pProps->cbAlign)) {
        m_pIo->Alignment(&pProps->cbAlign);
    }

    ALLOCATOR_PROPERTIES Actual;
    HRESULT hr;

    // first choice: the caller's allocator
    if (pPreferred) {
        hr = pPreferred->SetProperties(pProps, &Actual);
        if (SUCCEEDED(hr) && m_pIo->IsAligned(Actual.cbAlign)) {
            pPreferred->AddRef();
            *ppActual = pPreferred;
            return S_OK;
        }
    }

    // second choice: an allocator of our own
    IMemAllocator* pOwnAlloc;
    hr = InitAllocator(&pOwnAlloc);
    if (FAILED(hr)) {
        return hr;
    }

    hr = pOwnAlloc->SetProperties(pProps, &Actual);
    if (SUCCEEDED(hr) && m_pIo->IsAligned(Actual.cbAlign)) {
        // the reference obtained from InitAllocator is simply handed
        // over to the caller (release + addref would cancel out)
        *ppActual = pOwnAlloc;
        return S_OK;
    }

    // no suitable allocator
    pOwnAlloc->Release();

    // a success code at this point means only the alignment test failed
    return SUCCEEDED(hr) ? VFW_E_BADALIGN : hr;
}
// queue an aligned read request. call WaitForNext to get
// completion.
//
// The sample's start/stop times (divided by UNITS) give the byte range
// to read.  A range that runs past the end of the stream is cut back to
// the stream length rounded up to the alignment, and the sample's stop
// time is adjusted to match.
// Returns the failure from GetTime, Length or GetPointer if any of them
// fails, otherwise the result of queuing the request on the i/o object.
STDMETHODIMP
CAsyncOutputPin::Request(
    IMediaSample* pSample,
    DWORD dwUser) // user context
{
    REFERENCE_TIME tStart, tStop;
    HRESULT hr = pSample->GetTime(&tStart, &tStop);
    if (FAILED(hr)) {
        return hr;
    }

    LONGLONG llPos = tStart / UNITS;
    LONG lLength = (LONG) ((tStop - tStart) / UNITS);

    LONGLONG llTotal;
    LONGLONG llAvailable;
    hr = m_pIo->Length(&llTotal, &llAvailable);
    if (FAILED(hr)) {
        // llTotal cannot be trusted if the length query failed
        return hr;
    }

    if (llPos + lLength > llTotal) {
        // the end needs to be aligned, but may have been aligned
        // on a coarser alignment.
        LONG lAlign;
        m_pIo->Alignment(&lAlign);
        llTotal = (llTotal + lAlign -1) & ~(lAlign-1);
        if (llPos + lLength > llTotal) {
            lLength = (LONG) (llTotal - llPos);
            // must be reducing this!
            ASSERT((llTotal * UNITS) <= tStop);
            tStop = llTotal * UNITS;
            pSample->SetTime(&tStart, &tStop);
        }
    }

    BYTE* pBuffer;
    hr = pSample->GetPointer(&pBuffer);
    if (FAILED(hr)) {
        return hr;
    }

    return m_pIo->Request(
        llPos,
        lLength,
        TRUE,
        pBuffer,
        (LPVOID)pSample,
        dwUser);
}
// sync-aligned request. just like a request/waitfornext pair.
//
// The sample's start/stop times (divided by UNITS) give the byte range
// to read.  A range that runs past the end of the stream is cut back to
// the stream length rounded up to the alignment, and the sample's stop
// time is adjusted to match.  The sample's actual data length is set
// from the byte count the i/o object reports (0 if it reports none).
// Returns the failure from GetTime, Length or GetPointer if any of them
// fails, otherwise the result of the read.
STDMETHODIMP
CAsyncOutputPin::SyncReadAligned(
    IMediaSample* pSample)
{
    REFERENCE_TIME tStart, tStop;
    HRESULT hr = pSample->GetTime(&tStart, &tStop);
    if (FAILED(hr)) {
        return hr;
    }

    LONGLONG llPos = tStart / UNITS;
    LONG lLength = (LONG) ((tStop - tStart) / UNITS);

    LONGLONG llTotal;
    LONGLONG llAvailable;
    hr = m_pIo->Length(&llTotal, &llAvailable);
    if (FAILED(hr)) {
        // llTotal cannot be trusted if the length query failed
        return hr;
    }

    if (llPos + lLength > llTotal) {
        // the end needs to be aligned, but may have been aligned
        // on a coarser alignment.
        LONG lAlign;
        m_pIo->Alignment(&lAlign);
        llTotal = (llTotal + lAlign -1) & ~(lAlign-1);
        if (llPos + lLength > llTotal) {
            lLength = (LONG) (llTotal - llPos);
            // must be reducing this!
            ASSERT((llTotal * UNITS) <= tStop);
            tStop = llTotal * UNITS;
            pSample->SetTime(&tStart, &tStop);
        }
    }

    BYTE* pBuffer;
    hr = pSample->GetPointer(&pBuffer);
    if (FAILED(hr)) {
        return hr;
    }

    // start from 0 so that a read that fails before reporting a count
    // does not stamp the sample with an indeterminate length
    LONG cbActual = 0;
    hr = m_pIo->SyncReadAligned(
        llPos,
        lLength,
        pBuffer,
        &cbActual
        );
    pSample->SetActualDataLength(cbActual);
    return hr;
}
//
// collect the next ready sample
//
// Waits up to dwTimeout for the i/o object to complete a request.  On
// success the sample's actual data length is set to the number of bytes
// read.  *ppSample receives whatever sample the i/o object handed back,
// or NULL if it handed back none.  Returns the i/o object's result.
STDMETHODIMP
CAsyncOutputPin::WaitForNext(
    DWORD dwTimeout,
    IMediaSample** ppSample, // completed sample
    DWORD * pdwUser) // user context
{
    // initialized so that a failing wait that leaves them untouched
    // cannot leak indeterminate values to the caller
    LONG cbActual = 0;
    IMediaSample* pSample = NULL;

    HRESULT hr = m_pIo->WaitForNext(
        dwTimeout,
        (LPVOID*) &pSample,
        pdwUser,
        &cbActual
        );
    if (SUCCEEDED(hr)) {
        pSample->SetActualDataLength(cbActual);
    }
    *ppSample = pSample;
    return hr;
}
//
// Synchronous read with no alignment requirement; forwarded unchanged
// to the i/o object, whose result is returned.
STDMETHODIMP
CAsyncOutputPin::SyncRead(
    LONGLONG llPosition, // absolute Io position
    LONG lLength,        // nr bytes required
    BYTE* pBuffer)       // write data here
{
    HRESULT hrRead = m_pIo->SyncRead(llPosition, lLength, pBuffer);
    return hrRead;
}
// Reports the total and the currently available length of the stream by
// asking the i/o object; its result is returned unchanged.
STDMETHODIMP
CAsyncOutputPin::Length(
    LONGLONG* pTotal,
    LONGLONG* pAvailable)
{
    return m_pIo->Length(pTotal, pAvailable);
}
// Starts a flush on the i/o object and returns its result.
STDMETHODIMP
CAsyncOutputPin::BeginFlush(void)
{
    HRESULT hrFlush = m_pIo->BeginFlush();
    return hrFlush;
}
// Ends a flush on the i/o object and returns its result.
STDMETHODIMP
CAsyncOutputPin::EndFlush(void)
{
    HRESULT hrFlush = m_pIo->EndFlush();
    return hrFlush;
}
// --- CAsyncReader implementation ---
#pragma warning(disable:4355)
// Builds the filter: the base filter uses m_csFilter as its lock, the
// i/o object wraps pStream, and the output pin is given this filter,
// the i/o object and the same lock.  phr is handed to the pin.
// The initializers are written in member declaration order, which is
// the order in which the members are constructed.
CAsyncReader::CAsyncReader(
    TCHAR *pName,
    LPUNKNOWN pUnk,
    CAsyncStream *pStream,
    HRESULT *phr)
    : CBaseFilter(pName, pUnk, &m_csFilter, CLSID_AsyncReader, NULL),
      m_Io(pStream),
      m_OutputPin(phr, this, &m_Io, &m_csFilter)
{
}
// No explicit cleanup: the body is empty and the member objects are
// destroyed automatically.
CAsyncReader::~CAsyncReader()
{
}
// The filter has exactly one pin (the output pin).
int
CAsyncReader::GetPinCount()
{
return 1;
}
// Returns the output pin for n == 0 (provided the filter reports at
// least one pin) and NULL for every other index.
CBasePin *
CAsyncReader::GetPin(int n)
{
    if (GetPinCount() <= 0 || n != 0) {
        return NULL;
    }
    return &m_OutputPin;
}
//****************************************************************
// Filename: CAsyncIo.h
// Copyright (c) 1992 - 1997 Microsoft Corporation. All Rights Reserved.
//****************************************************************
#ifndef __ASYNCIO_H__
#define __ASYNCIO_H__
//
// definition of CAsyncFile object that performs file access. It provides
// asynchronous, unbuffered, aligned reads from a file, using a worker thread
// on win95 and potentially overlapped i/o if available.
// !!! Need to use real overlapped i/o if available
// currently only uses worker thread, not overlapped i/o
class CAsyncIo;
class CAsyncStream;
//
// Model the stream we read from based on a file-like interface
//
// Abstract interface: every operation is pure virtual and must be
// supplied by the concrete stream.
class CAsyncStream
{
public:
virtual ~CAsyncStream() {};
// move the read position to llPos
virtual HRESULT SetPointer(LONGLONG llPos) = 0;
// read up to dwBytesToRead bytes into pbBuffer; the count actually read
// is reported through pdwBytesRead
// NOTE(review): the meaning of bAlign is not visible here -- confirm
// against the implementations
virtual HRESULT Read(PBYTE pbBuffer,
DWORD dwBytesToRead,
BOOL bAlign,
LPDWORD pdwBytesRead) = 0;
// returns the stream size; pSizeAvailable is optional (defaults to NULL)
// and presumably receives the currently available size -- implementations
// must cope with NULL
virtual LONGLONG Size(LONGLONG *pSizeAvailable = NULL) = 0;
// alignment required by the stream
virtual DWORD Alignment() = 0;
// lock / unlock the stream
virtual void Lock() = 0;
virtual void Unlock() = 0;
};
// One read request, together with the code that carries it out.
// May be used from the worker thread or from the application thread;
// the caller must hold pcsFile for the whole file access
// (i.e. across a SetFilePointer/ReadFile pair).
class CAsyncRequest
{
    CAsyncIo *m_pIo;
    CAsyncStream *m_pStream;
    LONGLONG m_llPos;
    BOOL m_bAligned;
    LONG m_lLength;
    BYTE* m_pBuffer;
    LPVOID m_pContext;
    DWORD m_dwUser;
    HRESULT m_hr;

public:
    // Record the parameters of this request; the i/o is issued here
    // only if overlapped i/o is possible.
    HRESULT Request(
        CAsyncIo *pIo,
        CAsyncStream *pStream,
        LONGLONG llPos,
        LONG lLength,
        BOOL bAligned,
        BYTE* pBuffer,
        LPVOID pContext,    // filter's context
        DWORD dwUser);      // downstream filter's context

    // Issue the i/o if it is not overlapped and block until it has
    // finished.  Returns the error code of the file i/o.
    HRESULT Complete();

    // Cancel the i/o, blocking until it is no longer pending.
    // This implementation has nothing to cancel and reports S_OK.
    HRESULT Cancel() { return S_OK; }

    // --- accessors ---
    LPVOID GetContext() { return m_pContext; }
    DWORD GetUser() { return m_dwUser; }
    HRESULT GetHResult() { return m_hr; }

    // m_lLength is overwritten with the actual length
    LONG GetActualLength() { return m_lLength; }

    LONGLONG GetStart() { return m_llPos; }
};
typedef CGenericList<CAsyncRequest> CRequestList;
// this class needs a worker thread, but the ones defined in classes\base
// are not suitable (they assume you have one message sent or posted per
// request, whereas here for efficiency we want just to set an event when
// there is work on the queue).
//
// we create CAsyncRequest objects and queue them on m_listWork. The worker
// thread pulls them off, completes them and puts them on m_listDone.
// The events m_evWork and m_evDone are set when the corresponding lists are
// not empty.
//
// Synchronous requests are done on the caller thread. These should be
// synchronised by the caller, but to make sure we hold m_csFile across
// the SetFilePointer/ReadFile code.
//
// Flush by calling BeginFlush. This rejects all further requests (by
// setting m_bFlushing within m_csLists), cancels all requests and moves them
// to the done list, and sets m_evDone to ensure that no WaitForNext operations
// will block. Call EndFlush to cancel this state.
//
// we support unaligned calls to SyncRead. This is done by opening the file
// twice if we are using unbuffered i/o (m_dwAlign > 1).
// !!!fix this to buffer on top of existing file handle?
// Performs reads on a CAsyncStream, either synchronously on the caller's
// thread or asynchronously through a work list / done list pair.
class CAsyncIo
{
    CCritSec m_csReader;
    CAsyncStream *m_pStream;

    CCritSec m_csLists;         // locks access to the list and events
    BOOL m_bFlushing;           // true if between BeginFlush/EndFlush
    CRequestList m_listWork;
    CRequestList m_listDone;
    CAMEvent m_evWork;          // set when list is not empty
    CAMEvent m_evDone;

    // for correct flush behaviour: all protected by m_csLists
    LONG m_cItemsOut;           // nr of items not on listDone or listWork
    BOOL m_bWaiting;            // TRUE if someone waiting for m_evAllDone
    CAMEvent m_evAllDone;       // signal when m_cItemsOut goes to 0 if m_bWaiting

    CAMEvent m_evStop;          // set when thread should exit
    HANDLE m_hThread;

    // total size as reported by the stream (no "available" query)
    LONGLONG Size() {
        ASSERT(m_pStream != NULL);
        return m_pStream->Size();
    }

    // start the thread
    HRESULT StartThread(void);

    // stop the thread and close the handle
    HRESULT CloseThread(void);

    // manage the list of requests. hold m_csLists and ensure
    // that the (manual reset) event hevList is set when things on
    // the list but reset when the list is empty.
    // returns null if list empty
    CAsyncRequest* GetWorkItem();

    // get an item from the done list
    CAsyncRequest* GetDoneItem();

    // put an item on the work list
    HRESULT PutWorkItem(CAsyncRequest* pRequest);

    // put an item on the done list
    HRESULT PutDoneItem(CAsyncRequest* pRequest);

    // called on thread to process any active requests
    void ProcessRequests(void);

    // initial static thread proc calls ThreadProc with DWORD
    // param as this
    static DWORD InitialThreadProc(LPVOID pv) {
        CAsyncIo * pThis = (CAsyncIo*) pv;
        return pThis->ThreadProc();
    }

    DWORD ThreadProc(void);

public:
    CAsyncIo(CAsyncStream *pStream);
    ~CAsyncIo();

    // open the file
    HRESULT Open(LPCTSTR pName);

    // ready for async activity - call this before
    // calling Request
    HRESULT AsyncActive(void);

    // call this when no more async activity will happen before
    // the next AsyncActive call
    HRESULT AsyncInactive(void);

    // queue a requested read. must be aligned.
    HRESULT Request(
        LONGLONG llPos,
        LONG lLength,
        BOOL bAligned,
        BYTE* pBuffer,
        LPVOID pContext,
        DWORD dwUser);

    // wait for the next read to complete
    HRESULT WaitForNext(
        DWORD dwTimeout,
        LPVOID *ppContext,
        DWORD * pdwUser,
        LONG * pcbActual
        );

    // perform a read of an already aligned buffer
    HRESULT SyncReadAligned(
        LONGLONG llPos,
        LONG lLength,
        BYTE* pBuffer,
        LONG* pcbActual
        );

    // perform a synchronous read. will be buffered
    // if not aligned.
    HRESULT SyncRead(
        LONGLONG llPos,
        LONG lLength,
        BYTE* pBuffer);

    // return length
    HRESULT Length(LONGLONG *pllTotal, LONGLONG* pllAvailable);

    // all Reader positions, read lengths and memory locations must
    // be aligned to this.
    HRESULT Alignment(LONG* pl);

    HRESULT BeginFlush();
    HRESULT EndFlush();

    // alignment required by the underlying stream
    LONG Alignment()
    {
        return m_pStream->Alignment();
    }

    // TRUE if l is a multiple of the alignment; the mask test
    // presumes the alignment is a power of two
    BOOL IsAligned(LONG l) {
        if ((l & (Alignment() -1)) == 0) {
            return TRUE;
        } else {
            return FALSE;
        }
    }

    // 64-bit variant: only the low 32 bits take part in the test
    BOOL IsAligned(LONGLONG ll) {
        return IsAligned( (LONG) (ll & 0xffffffff));
    }
};
#endif // __ASYNCIO_H__
//****************************************************************
// Filename: CAsyncIo.cpp
// Copyright (c) 1992 - 1997 Microsoft Corporation. All Rights Reserved.
//****************************************************************
#include <streams.h>
#include "CAsyncIo.h"
// --- CAsyncRequest ---
// implementation of CAsyncRequest representing a single
// outstanding request. All the i/o for this object is done
// in the Complete method.
// init the params for this request.
// Read is not issued until the complete call
HRESULT
CAsyncRequest::Request(
CAsyncIo *pIo,
CAsyncStream *pStream,
LONGLONG llPos,
LONG lLength,
BOOL bAligned,
BYTE* pBuffer,
LPVOID pContext, // filter's cont