APPENDIX D

Appendix D includes the code listing for the client program. A description of this program can be found in Chapter 5.

//****************************************************************

// Author: Hu Imm Lee

// Filename: client.h

// File Created: February 12, 1998

//

// Comments:

// Global settings for the client.

// Include this file before everything else.

//

//****************************************************************

// system start codes

#define PACK_START_CODE 0x000001ba

#define SYSTEM_HEADER_START_CODE 0x000001bb

#define ISO_11172_END_CODE 0x000001b9

// video start codes

#define PICTURE_START_CODE 0x00000100

#define SEQUENCE_HEADER_CODE 0x000001b3

#define SEQUENCE_END_CODE 0x000001b7

#define GROUP_START_CODE 0x000001b8

#define DEFAULT_VID_BITRATE 565000 // in bits per second

#define DEFAULT_FRAMERATE 29.97 // Broadcast rate NTSC (fps)

// frame/packet types

#define CONTROL 0 // not part of MPEG stream

#define I_FRAME 1

#define P_FRAME 2

#define B_FRAME 3

#define HEADER 4

// packet controls

#define RESPONSE 6 // response from a retransmission request

#define RETRANS 7 // retransmission request

#define TERMINAL 8 // signal to end a transmission

#define PINGREQ 9 // request a ping service

#define PINGRES 10 // respond to a ping

#define INVALID 0 // indicates that packet not control type

// TCP packet headers

#define REQUEST 5 // requesting a connection

#define RATE 11 // transmission rate

// TEMPORARILY hard-coded:

// UDP-related stuff

#define UDPSERVERPORT 8080

#define TCPSERVERPORT 8181

#define PINGSERVERPORT 8282

#define MYUDPPORT 8888

#define MYPINGPORT 8989

#define MAXPAYLOAD 2048 // 2K for UDP payload

#define DEFAULT_BUFSIZE 100 // 100K bytes

typedef struct UDP_packet {

WORD seqnum; // NOTE: need to reinitialize seqnum if seqnum >= 2^16-1

WORD pkt_type;

WORD control;

WORD f_parts[2]; // f_parts[0] = order of frame pkt

//f_parts[1] = total number of pkts per frame

WORD pl_size; // payload size

BYTE payload[MAXPAYLOAD];

} UDP_PACKET;

typedef struct TCP_packet {

int pkt_type;

double data;

struct sockaddr_in udpclientaddr;

} TCP_PACKET;

typedef struct PING_packet {

WORD pkt_type;

WORD seqnum;

WORD control;

DWORD timestamp;

} PING_PACKET;

// item of Accounting List, a doubly linked-list for convenience

typedef struct AccountItem {

WORD id; // packet #

DWORD expire; //time to consider retransmission if packet is delayed/lost

DWORD playtime; // time the packet will be consumed/played-out

struct AccountItem* next;

struct AccountItem* back;

} ACC_ITEM;

// item of Playout Buffer

typedef struct PoolItem {

BOOL arrived; // flag to indicate packet's presence in the pool

//ACC_ITEM* pAccounting; // pointer to node item in accounting list

// pAccounting's item shall account for pkt's loss/delay

UDP_PACKET packet;

} POOL_ITEM;

//****************************************************************

//

// Author: Hu Imm Lee

// Filename: main.cpp

// File Created: March 28, 1998

//

// Comments:

// + Contain the main function along with thread functions

//

//****************************************************************

#include <windows.h>

#include <stdio.h>

#include <math.h>

// Direct X Media SDK

#include <initguid.h>

#include <mmsystem.h>

#include <streams.h>

// end Direct X

#include "client.h"

#include "cNetInterface.h"

#include "PlayoutBufferPool.h"

#include "AccountingList.h"

#include "CAsyncIo.h"

#include "CAsyncReader.h"

#include "CPipeStream.h"

#include "memfile.h"

DWORD __stdcall TCPdialogThread(LPVOID);

DWORD __stdcall BufferManagerThread(LPVOID);

DWORD __stdcall RetransmissionManagerThread(LPVOID);

DWORD __stdcall VideoStreamManagerThread(LPVOID);

DWORD __stdcall PingManagerThread(LPVOID);

DWORD __stdcall DisplayManagerThread(LPVOID);

void PrintWin32Error(char *);

// GLOBALS

char hostaddr[12];

HANDLE gh_TCPdialog;

HANDLE gh_BufferManager;

HANDLE gh_RetransmissionManager;

HANDLE gh_VideoStreamManager;

HANDLE gh_DisplayManager = INVALID_HANDLE_VALUE;

HANDLE gh_PingManager;

DWORD rtd = 30; // round-trip-delay (in millisec)

DWORD inv_transmit_rate; // in millisec

DWORD startup_delay = 200; // in millisec

DWORD inv_playout_rate; // in millisec

NetInterface* pNetI = new NetInterface();

PlayoutBufferPool* pBufPool = new PlayoutBufferPool(DEFAULT_BUFSIZE);

AccountingList* pAccountList = new AccountingList();

void __cdecl main(int argc, char* argv[]) {

// for TCPdialog thread

HANDLE h_TCPdialog;

DWORD TCPdialogThreadId;

LPDWORD lpExitCode_TCPdialog = new DWORD;

// for BufferManager thread

HANDLE h_BufferManager;

DWORD BufferManagerThreadId;

LPDWORD lpExitCode_BufferManager = new DWORD;

// for RetransmissionManager thread

HANDLE h_RetransmissionManager;

DWORD RetransmissionManagerThreadId;

LPDWORD lpExitCode_RetransmissionManager = new DWORD;

// for Video Stream Manager thread

HANDLE h_VideoStreamManager;

DWORD VideoStreamManagerThreadId;

LPDWORD lpExitCode_VideoStreamManager = new DWORD;

// for Display Manager (created by VideoStreamManager)

LPDWORD lpExitCode_DisplayManager = new DWORD;

*lpExitCode_DisplayManager = STILL_ACTIVE;

// for Ping Manager thread

HANDLE h_PingManager;

DWORD PingManagerThreadId;

LPDWORD lpExitCode_PingManager = new DWORD;

BOOL done = FALSE;

if (argc != 2) {

printf("usage: client [IP address]\n");

return;

}

strcpy(hostaddr, argv[1]);

if (!pNetI->netinit())

return;

// execute first *ping* at server to determine round-trip-delay

rtd = pNetI->ping();

printf("rtd: %d\n", rtd);

// inverse transmission rate

inv_transmit_rate = ((double)1/DEFAULT_FRAMERATE) * 1000; // in millisecs

inv_playout_rate = ((double)1/DEFAULT_FRAMERATE) * 1000;

// TCP Dialog thread startup

h_TCPdialog = CreateThread(

NULL,

0,

TCPdialogThread,

NULL,

0,

&TCPdialogThreadId);

if (h_TCPdialog == NULL) {

PrintWin32Error("CreateThread for TCPdialog");

exit(0);

}

gh_TCPdialog = h_TCPdialog;

// Buffer Manager thread startup

h_BufferManager = CreateThread(

NULL,

0,

BufferManagerThread,

NULL,

0,

&BufferManagerThreadId);

if (h_BufferManager == NULL) {

PrintWin32Error("CreateThread for BufferManager");

exit(0);

}

gh_BufferManager = h_BufferManager;

// Retransmission Manager thread startup

h_RetransmissionManager = CreateThread(

NULL,

0,

RetransmissionManagerThread,

NULL,

0,

&RetransmissionManagerThreadId);

if (h_RetransmissionManager == NULL) {

PrintWin32Error("CreateThread for RetransmissionManager");

exit(0);

}

gh_RetransmissionManager = h_RetransmissionManager;

// Video Stream Manager thread startup

h_VideoStreamManager = CreateThread(

NULL,

0,

VideoStreamManagerThread,

NULL,

0,

&VideoStreamManagerThreadId);

if (h_VideoStreamManager == NULL) {

PrintWin32Error("CreateThread for VideoManager");

exit(0);

}

gh_VideoStreamManager = h_VideoStreamManager;

// Ping Manager thread startup

h_PingManager = CreateThread(

NULL,

0,

PingManagerThread,

NULL,

0,

&PingManagerThreadId);

if(h_PingManager == NULL) {

PrintWin32Error("CreateThread for PingManager");

exit(0);

}

gh_PingManager = h_PingManager;

while (gh_DisplayManager == INVALID_HANDLE_VALUE)

continue;

WaitForSingleObject(gh_DisplayManager, INFINITE);

}

//****************************************************************

// TCP DIALOG THREAD -

// effects

// + sends a startup request to server via TCP connection to

// initiate sending of UDP

// + waits to receive terminating message from server

//****************************************************************

DWORD __stdcall TCPdialogThread(LPVOID lpdwThreadParam) {

TCP_PACKET tcp_recvpkt;

TCP_PACKET request;

UDP_PACKET ack;

struct sockaddr_in myaddr_info;

printf("TCPdialog started...\n");

// sending request to start ... in TCP

ZeroMemory(&request, sizeof(TCP_PACKET));

request.pkt_type = REQUEST;

myaddr_info.sin_family = AF_INET;

myaddr_info.sin_port = htons(MYUDPPORT);

myaddr_info.sin_addr.s_addr = htonl(INADDR_ANY);

request.udpclientaddr = myaddr_info;

if ( !pNetI->tcpsend(&request) ) {

printf("TCPdialog: failed to set up UDP connection with server\n");

return(0);

}

// wait for a reply ... should contain transmission rate;

TCP_PACKET reply;

pNetI->tcprecv(reply);

if (reply.pkt_type == RATE) {

printf("TCP dialog: receiving RATE\n");

inv_transmit_rate = 1/reply.data * 1000;

printf("\t1/rate = %ld\n", inv_transmit_rate);

}

// wait for TERMINAL packet via TCP

if ( !pNetI->tcprecv(tcp_recvpkt) )

printf("TCPdialog - tcprecv error\n");

if (tcp_recvpkt.pkt_type != TERMINAL) {

printf("TCPdialog: Hmmm ... not a terminal pkt %d\n", tcp_recvpkt.pkt_type);

return(0);

}

else {

// send an ACK to acknowlege termination by server

ZeroMemory(&ack, sizeof(UDP_PACKET));

ack.pkt_type = CONTROL;

ack.control = TERMINAL;

pNetI->sendpacket(&ack);

}

printf("\nTCPdialog exiting...\n");

return(0);

}


//****************************************************************

// BUFFER MANAGER THREAD -

// thread runs in a loop, gets terminated by Main process

//

// responsibilities

// + create accounting item on the Accounting List for next

// anticipated packet

// + fill in Playout Buffer as packets arrive

//

//****************************************************************

DWORD __stdcall BufferManagerThread(LPVOID lpdwThreadParam) {

UDP_PACKET packet;

UDP_PACKET ackpacket;

//UDP_PACKET request_pkt;

POOL_ITEM* pBufItem = new POOL_ITEM;

ACC_ITEM* pAccountItem;

DWORD arrivaltime = 0; // arrival time of 1st packet

DWORD accumulated_timeunits = 0; // accumulated units of time (in millisec)

// since first packet arrived

DWORD nextpacket = 0;

LPDWORD lpExitCode_TCPdialog = new DWORD;

LPDWORD lpExitCode_DisplayManager = new DWORD;

int npkt = 0;

int prev=-1;

printf("BufferManager started...\n");

do {

ZeroMemory(&packet, sizeof(UDP_PACKET)); // zero out packet for next recv

pNetI->recvpacket(packet);

// first packet to arrive (won't detect a loss for any packets prior to

// this one)

if (arrivaltime == 0) {

// approx. time packet was received

arrivaltime = GetTickCount();

nextpacket = packet.seqnum;

}

accumulated_timeunits++;

nextpacket++;

npkt++;

if ( (packet.pkt_type == CONTROL) && (packet.control==TERMINAL) ) {

// send an ACK to server

printf("\nOK. Recvd TERMINAL udp packet fr server\n");

ZeroMemory(&ackpacket, sizeof(UDP_PACKET));

ackpacket.pkt_type = TERMINAL;

pNetI->sendpacket(&ackpacket);

}

else if ( (packet.pkt_type == CONTROL)

&& (packet.control == RETRANS) ) {

printf("\n\tRecvd packet type RETRANS\n");

// place into playout buffer

ZeroMemory(pBufItem, sizeof(POOL_ITEM));

pBufItem->arrived = TRUE;

pBufItem->packet = packet;

pBufPool->add(pBufItem); // add packet to Playout buffer

continue;

}

// get ready to fill playout buffer with packet

ZeroMemory(pBufItem, sizeof(POOL_ITEM));

pBufItem->arrived = TRUE;

pBufItem->packet = packet;

pBufPool->add(pBufItem); // add packet to Playout buffer

// anticipate time to request a retransmission for the NEXT packet,

// etc

pAccountItem = new ACC_ITEM;

pAccountItem->id = nextpacket;

pAccountItem->expire = accumulated_timeunits *

inv_transmit_rate + arrivaltime;

pAccountItem->playtime = accumulated_timeunits * startup_delay

+ arrivaltime;

// add a node into Accountant List to wait for next incoming packet

pAccountList->add(pAccountItem);

if (GetExitCodeThread(gh_DisplayManager, lpExitCode_DisplayManager)

== FALSE)

break;

} while (*lpExitCode_DisplayManager == STILL_ACTIVE);

printf("BufferManager exiting...\n");

printf("\tReceived %d packets\n", npkt);

return(0);

}

//****************************************************************

// RETRANSMISSION MANAGER THREAD

// - Thread runs in a loop, terminated only by Main

//

// effects

// + sleeps for a specific duration (approx the transmission

// rate)

// + when thread wakes up, it verifies the arrival of an

// anticipated packet. If it arrived, account item is removed

// If not, then packet was lost/delayed so do retransmission

// if possible.

//****************************************************************

DWORD __stdcall RetransmissionManagerThread(LPVOID lpdwThreadParam) {

ACC_ITEM* pFrontItem;

UDP_PACKET request;

int sleep_duration;

printf("Retransmission Manager started\n");

DWORD exitCode;

do {

pFrontItem = pAccountList->getfront();

if (pFrontItem == NULL) // empty list means nothing to do

continue;

sleep_duration = pFrontItem->expire - GetTickCount();

if (sleep_duration > 0)

Sleep(sleep_duration);

// wake up and see if the front of list is still the same

pFrontItem = pAccountList->getfront();

if (pFrontItem == NULL)

continue;

POOL_ITEM* pItem = pBufPool->getbufferptr(pFrontItem->id);

if ( pItem->arrived &&

(pItem->packet.seqnum >= pFrontItem->id) ) {

// it's possible that while this thread was sleeping,

// the item arrived, was consumed immediately and

// then replaced by a later packet, so the buffer slot

// now has a later packet

pAccountList->remove(pFrontItem);

continue;

}

if ( pFrontItem->expire >= GetTickCount() ) {

// time is up, packet should've arrived

printf("\tPkt[%d] lost (or delayed)\n", pFrontItem->id);

printf("rtd: %d\n", rtd);

// check to see if retransmission is possible...

if (pFrontItem->expire + rtd < pFrontItem->playtime) {

printf("\trequesting retransmission\n");

// request for lost/delayed packet

ZeroMemory(&request, sizeof(UDP_PACKET));

request.pkt_type = CONTROL;

request.control = RETRANS;

request.seqnum = pFrontItem->id; //prev+1;

pNetI->sendpacket(&request);

// reset expiry time & move this item to the end of list

pFrontItem->expire = pFrontItem->expire + rtd;

pAccountList->popitem(pFrontItem);

pAccountList->add(pFrontItem);

}

else { // if not enough time left to request retransmission, abandon

printf("\tNOT requesting retransmission\n");

pAccountList->remove(pFrontItem);

}

}

if (GetExitCodeThread(gh_DisplayManager, &exitCode) == FALSE)

break;

} while (exitCode == STILL_ACTIVE);

printf("Retransmission Manager exiting\n");

return (0);

}

//****************************************************************

// VIDEO STREAM MANAGER THREAD

// - reconstruct MPEG stream from the packets in Playout buffer

//

//****************************************************************

DWORD __stdcall VideoStreamManagerThread(LPVOID lpdwThreadParam) {

UDP_PACKET packet;

HANDLE pipeH; // pipe handle

BOOL status;

// for Display Manager thread

HANDLE h_DisplayManager;

DWORD DisplayManagerThreadId;

LPDWORD lpExitCode_DisplayManager = new DWORD;

// create a NAMED PIPE to "flush" MPEG stream into the Filter Graph

// data will be written in stream of messages & read in a stream of bytes

// pipe is blocking (synchronous)

LPCTSTR PipeName = "\\\\.\\pipe\\MPEGSTREAMPIPE\0";

pipeH = CreateNamedPipe(

PipeName,

PIPE_ACCESS_OUTBOUND, // pipe open mode

PIPE_TYPE_MESSAGE|PIPE_READMODE_BYTE|PIPE_WAIT, // pipe mode

1, // max number of pipe instances

32768, // out buffer size (not actual but advised)

1024, // in buffer size (obviously won't be used in this pipe)

100, // default timeout in milliseconds (for connection

// establishment)

NULL // security attributes (default)

);

if (pipeH == INVALID_HANDLE_VALUE)

PrintWin32Error("CreateNamedPipe");

printf("Video Stream Manager started...\n");

// Display Manager thread startup

// note: this thread will read an MPEG bytestream from a named pipe

// It will also setup ActiveMovie components to display the stream

h_DisplayManager = CreateThread(

NULL,

0,

DisplayManagerThread,

NULL,

0,

&DisplayManagerThreadId);

if (h_DisplayManager == NULL) {

PrintWin32Error("CreateThread for DisplayManager");

exit(0);

}

gh_DisplayManager = h_DisplayManager;

printf("Creating event\n");

// create event to signal when to start the streaming

HANDLE evtStartStreamOut = CreateEvent(

NULL, // security attributes

FALSE, // auto reset

FALSE, // initial state unsignaled

"StartStream");

if (evtStartStreamOut == NULL)

PrintWin32Error("VideoStreamManagerThread: Create Event");

printf("\nVideoStream Mgr: Event Start Stream created at %d\n", GetTickCount());

// pass handle to playout buffer class in order for it to signal when it's full

pBufPool->StartStreamFlag = evtStartStreamOut;

// wait for the pipe's "client" to connect

status = ConnectNamedPipe(pipeH, NULL);

if (!status)

PrintWin32Error("ConnectNamedPipe");

printf("VideoStreamMgr: pipe connection is est.\n");

// wait till the playout buffer is near full ... or till startup delay? ... may not be accurate

if (WaitForSingleObject(evtStartStreamOut, INFINITE) ==

WAIT_FAILED)

PrintWin32Error("WaitForSingleObject");

CloseHandle(evtStartStreamOut);

printf("\nVideoStream Mgr: Event signaled at time %d\n",

GetTickCount());

DWORD nwrite;

DWORD exitCode;

do {

// get an item from the playout queue

status = pBufPool->frontitem(packet);

if (packet.seqnum == 0)

printf("%d ", packet.seqnum);

// write to the pipe ...

if (status) {

status = WriteFile(pipeH, (char*)&packet.payload[0],

packet.pl_size, &nwrite, NULL);

if (!status)

PrintWin32Error("WriteFile");

ASSERT(nwrite == packet.pl_size);

if (nwrite != packet.pl_size)

printf("requested to write %d bytes to pipe, wrote

%d\n", packet.pl_size, nwrite);

}

Sleep((DWORD)inv_transmit_rate);

GetExitCodeThread(h_DisplayManager, &exitCode);

} while (exitCode == STILL_ACTIVE);

printf("VideoStream Manager exiting\n");

return (0);

}

//****************************************************************

// DISPLAY MANAGER THREAD

// called by VideoStreamManager to ensure that named pipe is

// already established before this thread can connect to it

//****************************************************************

DWORD __stdcall DisplayManagerThread(LPVOID lpdwThreadParam) {

LPCTSTR PipeName = "\\\\.\\pipe\\MPEGSTREAMPIPE\0";

HRESULT hr;

// instantiate DirectX's Filter Graph Manager

IFilterGraph *pFG = NULL;

IGraphBuilder *pGB = NULL;

IMediaControl *pMC = NULL;

IMediaEvent *pME = NULL;

char ErrorMsg[512];

printf("Display Manager started ...\n");

// COM initialization, before you can start using DirectX stuff

hr = CoInitialize(NULL);

if (!SUCCEEDED(hr))

printf("CoInitialize failed\n");

hr = CoCreateInstance(CLSID_FilterGraph, NULL, CLSCTX_INPROC,

IID_IFilterGraph,

(void **)&pFG);

if (FAILED(hr)) {

printf("CoCreateInstance failed\n");

AMGetErrorText(hr, &ErrorMsg[0], sizeof(ErrorMsg));

printf("%s\n", ErrorMsg);

return 0;

}

CMediaType mediatype;

ZeroMemory(&mediatype, sizeof(AM_MEDIA_TYPE));

mediatype.majortype = MEDIATYPE_Stream;

mediatype.subtype = MEDIASUBTYPE_MPEG1System;

CPipeStream Stream(PipeName);

printf("pipe stream created\n");

CPipeReader* rdr = new CPipeReader(&Stream, &mediatype, &hr);

if (!SUCCEEDED(hr)) {

printf("CPipeReader...\n");

AMGetErrorText(hr, &ErrorMsg[0], sizeof(ErrorMsg));

printf("%s\n", ErrorMsg);

return 0;

}

rdr->AddRef();

// add the source filter

hr = pFG->AddFilter(rdr, L"Video Source");

if (FAILED(hr)) {

printf("AddFilter...\n");

AMGetErrorText(hr, &ErrorMsg[0], sizeof(ErrorMsg));

printf("%s\n", ErrorMsg);

return 0;

}

printf("source filter added\n");

hr = pFG->QueryInterface(IID_IGraphBuilder, (void **)&pGB);

if (FAILED(hr)) {

printf("QueryInterface(IGraphBuilder)...\n");

AMGetErrorText(hr, &ErrorMsg[0], sizeof(ErrorMsg));

printf("%s\n", ErrorMsg);

return 0;

}

if (pGB == NULL)

printf("graph builder null!\n");

CBasePin* pin = rdr->GetPin(0);

if (pin == NULL)

printf("null pin\n");

printf("Rendering...\n");

// render

hr = pGB->Render(pin);

if (FAILED(hr)) {

printf("Render...\n");

AMGetErrorText(hr, &ErrorMsg[0], sizeof(ErrorMsg));

printf("%s\n", ErrorMsg);

return 0;

}

printf("rendered\n");

pGB->Release();

// get ready to play

printf("querying IMediaControl\n");

hr = pFG->QueryInterface(IID_IMediaControl, (void **)&pMC);

if (FAILED(hr)) {

printf("QueryInterface(IMediaControl)...\n");

AMGetErrorText(hr, &ErrorMsg[0], sizeof(ErrorMsg));

printf("%s\n", ErrorMsg);

pMC->Release();

return 0;

}

hr = pFG->QueryInterface(IID_IMediaEvent, (void **)&pME);

if (FAILED(hr)) {

printf("QueryInterface(IMediaEvent)...\n");

AMGetErrorText(hr, &ErrorMsg[0], sizeof(ErrorMsg));

printf("%s\n", ErrorMsg);

pMC->Release();

return 0;

}

OAEVENT oEvent;

hr = pME->GetEventHandle(&oEvent);

if (SUCCEEDED(hr)) {

hr = pMC->Run();

}

if (SUCCEEDED(hr)) {

LONG levCode;

hr = pME->WaitForCompletion(INFINITE, &levCode);

}

pMC->Release();

pME->Release();

rdr->Release();

pFG->Release();

// deinitialize COM interfacing

CoUninitialize();

printf("Display thread quitting\n");

return(0);

}

//****************************************************************

// PING MANAGER THREAD

// - thread that manages PINGs to consistently determine avg RTD

// - expects termination from Main()

//****************************************************************

DWORD __stdcall PingManagerThread(LPVOID lpdwThreadParam) {

DWORD sleeptime = 5; // sleep duration in seconds

int val;

// perform a ping at every 'sleeptime' interval

while (1) {

if (val= pNetI->ping() >= 0)

rtd = val;

else break;

Sleep(sleeptime * 1000);

}

printf("Ping Manager Thread exiting\n");

return (0);

}

//****************************************************************

// PRINTWIN32ERROR -

// + formats error number into string and prints it out

// + only format for GetLastError()

//****************************************************************

void PrintWin32Error(char *pszErrorString) {

LPVOID lpMsgBuf;

printf("Error: returned from [%s]\n", pszErrorString);

if (FormatMessage(

FORMAT_MESSAGE_ALLOCATE_BUFFER |

FORMAT_MESSAGE_FROM_SYSTEM,

NULL,

GetLastError(),

MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),

(LPSTR)&lpMsgBuf,

0,

NULL))

printf("Error Message: %s\n", lpMsgBuf);

}

//****************************************************************

// Author: Hu Imm Lee

// Filename: PlayoutBufferPool.h

// File Created: March 28, 1998

//

// Comments: Contains class definition for PlayoutBufferPool

//

//****************************************************************

typedef struct UDP_packet UDP_PACKET;

typedef struct PoolItem POOL_ITEM;

// Playout Buffer class (a fixed-size circular queue)

class PlayoutBufferPool {

public:

// constructor and destructor

PlayoutBufferPool(int minbufsize);

~PlayoutBufferPool();

BOOL add(POOL_ITEM*);

BOOL frontitem(UDP_PACKET&);

POOL_ITEM* getbufferptr(int);

BOOL buf_full; // flag to indicate that buffer is full for a first time

HANDLE StartStreamFlag;

static LPCTSTR semaphore_full; // name of semaphores

static LPCTSTR semaphore_empty;

static LPCTSTR semaphore_mutex;

HANDLE h_full;

HANDLE h_empty;

HANDLE h_mutex;

private:

static int buffsize;

static POOL_ITEM* pBuf; // ptr to circular array

static int iFront; // index to front of buffer

static int iRear; // index to rear of buffer

};

//****************************************************************

// Author: Hu Imm Lee

// Filename: PlayoutBufferPool.cpp

// File Created: March 28, 1998

//

// Comments: Contains implementation of PlayoutBufferPool class

//

//****************************************************************

#include <windows.h>

#include <stdio.h>

#include <math.h>

#include "client.h"

#include "PlayoutBufferPool.h"

#define TIMEOUTVAL 3*1000 // 5 seconds

extern void PrintWin32Error(char *);

//****************************************************************

// Playout Buffer Pool Class Implementation Below

//****************************************************************

// public vars

LPCTSTR PlayoutBufferPool::semaphore_full = "S_Full";

LPCTSTR PlayoutBufferPool::semaphore_empty = "S_Empty";

LPCTSTR PlayoutBufferPool::semaphore_mutex = "S_Mutex";

// private vars

int PlayoutBufferPool::buffsize = 0;

POOL_ITEM* PlayoutBufferPool::pBuf = NULL;

int PlayoutBufferPool::iRear = 0;

int PlayoutBufferPool::iFront = 0;

PlayoutBufferPool::PlayoutBufferPool(int minbufsize):buf_full(FALSE) {

// rear starts off pointing to the front because buffer is empty

// make the buffer size divisible by 8 because the udp

// packet's seqnum is a max of 2^16 - 1, i.e., 2 bytes long

// for example: if minbufsize is 14, then buffsize should be 16

PlayoutBufferPool::buffsize = (int) ceil((double)minbufsize/8) * 8;

printf("PlayoutBufferPool initialization:\n\tbuffsize is: %d\n",

PlayoutBufferPool::buffsize);

// make empty circular buffer here

PlayoutBufferPool::pBuf = new

POOL_ITEM[PlayoutBufferPool::buffsize];

ZeroMemory(pBuf, buffsize); // zero out buffer for 'safety' precautions

// the 'full' semaphore

h_full = CreateSemaphore(NULL, buffsize, buffsize, semaphore_full);

// the 'empty' semaphore

h_empty = CreateSemaphore(NULL, 0, buffsize, semaphore_empty);

// the 'mutex' semaphore

h_mutex = CreateSemaphore(NULL, 1, 1, semaphore_mutex);

}

PlayoutBufferPool::~PlayoutBufferPool() {

if (PlayoutBufferPool::pBuf != NULL)

delete PlayoutBufferPool::pBuf;

PlayoutBufferPool::pBuf = NULL;

}

//****************************************************************

// ADD

// adds to the rear of the buffer pool

// effects

// + moves iRear forward

// + OVERWRITES the front item IF buffer gets full

// + makes deep copy when adding new item

//****************************************************************

BOOL PlayoutBufferPool::add(POOL_ITEM* pNewItem) {

// wait for the 'full' semaphore

// if 'full' reaches 0, it indicates the buffer is full

if (WaitForSingleObject(h_full, TIMEOUTVAL) == WAIT_FAILED) {

PrintWin32Error("Buffer add: WaitForSingleObject");

printf("last item: pkt%d\n", pBuf[iRear].packet.seqnum);

}

// after 'wait', the semaphore count is decremented

// mutual exclusion semaphore, to 'deny' access to 'frontitem'

if (WaitForSingleObject(h_mutex, INFINITE) == WAIT_FAILED) {

PrintWin32Error("Buffer add: WaitForSingleObject");

printf("last item: pkt%d\n", pBuf[iRear].packet.seqnum);

}

/** now, you're safe to write to the buffer **/

// check to see whether iRear is pointing to the last element of pBuf

BOOL rear_at_end = boolean(PlayoutBufferPool::iRear ==

(PlayoutBufferPool::buffsize-1) );

BOOL front_at_end = boolean(PlayoutBufferPool::iFront ==

(PlayoutBufferPool::buffsize-1));

// write to buffer

PlayoutBufferPool::pBuf[PlayoutBufferPool::iRear] = *pNewItem;

// iRear points ahead of the last element in the queue ...

// move iRear to index 0 if it's reached the edge of pBuf

PlayoutBufferPool::iRear = (rear_at_end) ? 0 :

PlayoutBufferPool::iRear+1;

if (buf_full == FALSE) // use a flag because you want to signal only once

if ( iRear == (buffsize - 2) ) {

buf_full = TRUE;

if (SetEvent(StartStreamFlag) == 0)

PrintWin32Error("PlayoutBufferPool: SetEvent");

}

// there is now at least one slot available for a read

// so do ReleaseSemaphore to increment the 'empty' count

if (ReleaseSemaphore(h_empty, 1, NULL) == FALSE)

PrintWin32Error("Buffer add: ReleaseSemaphore");

if (ReleaseSemaphore(h_mutex, 1, NULL) == FALSE)

PrintWin32Error("Buffer add: ReleaseSemaphore(mutex)");

return TRUE;

}

//****************************************************************

// FRONTITEM

// gets the front item of the playout buffer pool

// effects

// + copies front item's packet into parameter pFrontItem

// + zeroes it out

// + corrects PlayoutBufferPool::iFront

//

// returns

// + TRUE if buffer is not empty

// + FALSE if otherwise

//****************************************************************

BOOL PlayoutBufferPool::frontitem(UDP_PACKET& FrontItem) {

// wait for the 'empty' semaphore

// if 'empty' is 0, it indicates that the buffer is empty

// (so there's nothing to read)

if (WaitForSingleObject(h_empty, TIMEOUTVAL) == WAIT_FAILED) {

PrintWin32Error("Buffer frontitem: WaitForSingleObject");

printf("front item: pkt%d\n", pBuf[iFront].packet.seqnum);

}

// after 'wait', the semaphore count is decremented

// mutual exclusion semaphore, to 'deny' access to 'add'

if (WaitForSingleObject(h_mutex, INFINITE) == WAIT_FAILED) {

PrintWin32Error("Buffer frontitem: WaitForSingleObject");

printf("last item: pkt%d\n", pBuf[iRear].packet.seqnum);

}

/** Now, it's safe to read the buffer **/

// check to see whether iFront is pointing to the last buffer in buffer pool

BOOL front_at_end = boolean(iFront == buffsize-1);

FrontItem;

if (iFront != iRear) {

FrontItem = (PlayoutBufferPool::pBuf[iFront]).packet;

// zero out buffer after reading so that this slot can be used

// again by the 'add' function

ZeroMemory(&PlayoutBufferPool::pBuf[iFront],

sizeof(POOL_ITEM));

}

else return FALSE; // buffer virtually empty if both ptrs are equal

// move iFront to the next buffer in the pool

// if iFront has reached the end of the pool, revert back to index 0

iFront = (front_at_end) ? 0 : iFront + 1;

// there is now at least one slot free to be written to

// so do ReleaseSemaphore to increment the 'full' count

if (ReleaseSemaphore(h_full, 1, NULL) == FALSE)

PrintWin32Error("Buffer frontitem: ReleaseSemaphore");

if (ReleaseSemaphore(h_mutex, 1, NULL) == FALSE)

PrintWin32Error("Buffer frontitem: ReleaseSemaphore(mutex)");

return TRUE;

}

//

// GETBUFFERPTR

// + return an indexing pointer to the buffer

// + index based on packet sequence number

POOL_ITEM* PlayoutBufferPool::getbufferptr(int index) {

return &pBuf[index % buffsize];

}

//****************************************************************

// Author: Hu Imm Lee

// Filename: AccountingList.h

// File Created: March 29, 1998

//

// Class Definition of AccountingList

//****************************************************************

typedef struct AccountItem ACC_ITEM;

class AccountingList {

public:

AccountingList();

~AccountingList();

void add(ACC_ITEM*);

void remove(ACC_ITEM*);

ACC_ITEM* getfront();

void popitem(ACC_ITEM*);

private:

ACC_ITEM* pRear;

LPCRITICAL_SECTION lpCriticalSection;

};

//**********************************************************************

// Author: Hu Imm Lee

// Filename: AccountingList.cpp

// File Created: March 29, 1998

//

// Comments:

// + Contains class implementation of Accounting List, used to

// determine when to ask for a retransmitted packet

// + AccountingList is a doubly linked list ...

// eliminates search time

// + rear node points to front and vice versa

//**********************************************************************

#include <windows.h>

#include <stdio.h>

#include "client.h"

#include "AccountingList.h"

AccountingList::AccountingList():pRear(NULL) {

lpCriticalSection = new CRITICAL_SECTION;

InitializeCriticalSection(lpCriticalSection);

}

AccountingList::~AccountingList() {

}

//

// add to the end of the list

//

void AccountingList::add(ACC_ITEM* pNewItem) {

EnterCriticalSection(lpCriticalSection);

if (pRear == NULL) { // starts off empty

pNewItem->next = pNewItem; // point to itself

pNewItem->back = pNewItem; // point to itself

pRear = pNewItem;

}

else {

pNewItem->next = pRear->next; // point new node to front

pRear->next->back = pNewItem; // point front to the new node

pRear->next = pNewItem; // link new node to the current rear

pNewItem->back = pRear; // point new node back the preceding node

pRear = pNewItem;

}

LeaveCriticalSection(lpCriticalSection);

}

void AccountingList::remove(ACC_ITEM* pDeadItem) {

EnterCriticalSection(lpCriticalSection);

if ( (pRear->next == pRear) ) { // only one item left

if (pDeadItem != pRear)

printf("TRAGEDY in AccountingList::remove - item to be removed

does not exist\n");

pRear = NULL;

}

else if (pDeadItem == pRear) { // if it's last item

pRear->back->next = pRear->next;

pRear->next->back = pRear->back;

pRear = pRear->back;

}

else {

pDeadItem->back->next = pDeadItem->next;

pDeadItem->next->back = pDeadItem->back;

}

delete pDeadItem;

pDeadItem = NULL;

LeaveCriticalSection(lpCriticalSection);

}

//**********************************************************************

// GETFRONT

// effects

// + references the front item without modification

// returns

// + NULL if the list is empty

// + pointer to the front item, if otherwise

//**********************************************************************

ACC_ITEM* AccountingList::getfront() {

if (pRear == NULL)

return NULL;

return pRear->next;

}

//****************************************************************

// POPITEM

// 'pops' an item off the list

//

// effects

// + similar to REMOVE except that item is not deallocated

//****************************************************************

void AccountingList::popitem(ACC_ITEM* pPopItem) {

EnterCriticalSection(lpCriticalSection);

if ( (pRear->next == pRear) ) { // only one item left

if (pPopItem != pRear)

printf("TRAGEDY in AccountingList::popitem - item to be

popped does not exist\n");

pRear = NULL;

}

else if (pPopItem == pRear) { // if it's last item

pRear->back->next = pRear->next;

pRear->next->back = pRear->back;

pRear = pRear->back;

}

else {

pPopItem->back->next = pPopItem->next;

pPopItem->next->back = pPopItem->back;

}

LeaveCriticalSection(lpCriticalSection);

}

//****************************************************************

// Author: Hu Imm Lee

// Filename: cNetInterface.h

// File Created: February 12, 1998

//

// Contains class definition for client's NetInterface

//

// Note: this class is slightly different from the server's

//****************************************************************

typedef struct UDP_packet UDP_PACKET;

typedef struct TCP_packet TCP_PACKET;

class NetInterface {

public:

NetInterface();

~NetInterface();

BOOL netinit();

BOOL sendpacket(UDP_PACKET* pPacket);

BOOL recvpacket(UDP_PACKET& packet);

BOOL tcpsend(TCP_PACKET* pPacket);

BOOL tcprecv(TCP_PACKET& packet);

BOOL sendping(PING_PACKET* pPacket);

BOOL recvping(PING_PACKET& packet);

int ping();

private:

static SOCKET tcpd;

static SOCKET udpd;

static SOCKET pingd;

struct sockaddr_in udpaddr; // address of client host

struct sockaddr_in servaddr;

struct sockaddr_in pingservaddr;

};

//****************************************************************

// Author: Hu Imm Lee

// Filename: cNetInterface.cpp

// File Created: February 12, 1998

//

// Comments:

// Contains implementation of the client's

// NetInterface class

//****************************************************************

#include <stdio.h>

#include <windows.h>

#include "client.h"

#include "cNetInterface.h"

//****************************************************************

// NETWORK INTERFACE CLASS IMPLEMENTATION BELOW

//****************************************************************

extern char hostaddr[];

SOCKET NetInterface::tcpd = INVALID_SOCKET;

SOCKET NetInterface::udpd = INVALID_SOCKET;

SOCKET NetInterface::pingd = INVALID_SOCKET;

NetInterface::NetInterface() {

WSADATA WsaData;

WORD wVersionRequested;

int err;

// start initialization of WinSock API

wVersionRequested = MAKEWORD(1,1);

if ( (err = WSAStartup(wVersionRequested, &WsaData)) != 0 ) {

printf("Could not find WinSock 1.1 DLL\n");

exit(0);

}

}

NetInterface::~NetInterface() {

closesocket(udpd);

closesocket(tcpd);

WSACleanup();

}

//****************************************************************

// NETINIT

// initializes the network interface

// effects

// + creates UDP socket and allocates handle to

// NetInterface::udpd

// + binds UDP socket to a "well-known" port

// + creates TCP socket and connect to server

// returns

// + TRUE if UDP & TCP connection startups successful

// + FALSE on failure of any components

//****************************************************************

BOOL NetInterface::netinit() {

struct sockaddr_in udpaddr;

struct sockaddr_in tcpservaddr;

struct sockaddr_in pingaddr;

// UDP/IP setup

if ( (NetInterface::udpd = socket(AF_INET, SOCK_DGRAM,

IPPROTO_UDP)) == INVALID_SOCKET ) {

printf("SOCKET for udp ERR[%d]\n", WSAGetLastError() );

return FALSE;

}

// udp bind

ZeroMemory( (char *)&udpaddr, sizeof(udpaddr) );

udpaddr.sin_family = AF_INET;

udpaddr.sin_addr.s_addr = htonl(INADDR_ANY);

udpaddr.sin_port = htons(MYUDPPORT);

if ( bind(NetInterface::udpd, (struct sockaddr *)&udpaddr,

sizeof(udpaddr)) == SOCKET_ERROR ) {

printf("BIND for udp ERR[%d]\n", WSAGetLastError() );

return FALSE;

}

// allocate server's UDP address for later use when sending UDP packets

ZeroMemory(&servaddr, sizeof(struct sockaddr_in));

servaddr.sin_family = AF_INET;

servaddr.sin_port = htons(UDPSERVERPORT);

servaddr.sin_addr.s_addr = inet_addr(hostaddr);

// TCP/IP setup

if ( (NetInterface::tcpd = socket(AF_INET, SOCK_STREAM, 0))

== INVALID_SOCKET ) {

printf("SOCKET for tcp ERR[%d]\n", WSAGetLastError() );

return FALSE;

}

// connect

ZeroMemory( (char *)&tcpservaddr, sizeof(tcpservaddr) );

tcpservaddr.sin_family = AF_INET;

tcpservaddr.sin_addr.s_addr = inet_addr(hostaddr);

tcpservaddr.sin_port = htons(TCPSERVERPORT);

if ( connect(NetInterface::tcpd, (struct sockaddr *)&tcpservaddr,

sizeof(tcpservaddr)) == SOCKET_ERROR ) {

printf("CONNECT ERR[%d]\n", WSAGetLastError() );

return FALSE;

}

// PING service port setup

if ( (NetInterface::pingd = socket(AF_INET, SOCK_DGRAM,

IPPROTO_UDP)) == INVALID_SOCKET ) {

printf("SOCKET for udp ERR[%d]\n", WSAGetLastError() );

return FALSE;

}

// ping bind

ZeroMemory( (char *)&pingaddr, sizeof(pingaddr) );

pingaddr.sin_family = AF_INET;

pingaddr.sin_addr.s_addr = htonl(INADDR_ANY);

pingaddr.sin_port = htons(MYPINGPORT);

if ( bind(NetInterface::pingd, (struct sockaddr *)&pingaddr,

sizeof(pingaddr)) == SOCKET_ERROR ) {

printf("BIND for udp ERR[%d]\n", WSAGetLastError() );

return FALSE;

}

// allocate server's PING service address for later use when sending PING

// packets

ZeroMemory(&pingservaddr, sizeof(struct sockaddr_in));

pingservaddr.sin_family = AF_INET;

pingservaddr.sin_port = htons(PINGSERVERPORT);

pingservaddr.sin_addr.s_addr = inet_addr(hostaddr);

return TRUE;

}

//****************************************************************

// SENDPACKET -

// effects

// sends a datagram via UDP

// returns

// + TRUE is successful

// + FALSE is failure

//****************************************************************

BOOL NetInterface::sendpacket(UDP_PACKET* pPacket) {

int nsend;

nsend = sendto( NetInterface::udpd, (char *)pPacket,

sizeof(UDP_PACKET), 0,

(struct sockaddr *)&servaddr, sizeof(struct sockaddr_in) );

if (nsend == SOCKET_ERROR) {

printf("NetInterface::sendpacket ERR[%d]\n",

WSAGetLastError() );

return FALSE;

}

return TRUE;

}

//****************************************************************

// RECVPACKET

// effects

// + receives datagram via UDP

//****************************************************************

BOOL NetInterface::recvpacket(UDP_PACKET& packet) {

int nrecv;

struct sockaddr_in address;

int addrlen = sizeof(address);

nrecv = recvfrom( NetInterface::udpd, (char *)&packet,

sizeof(UDP_PACKET), 0,

(struct sockaddr *)&address, &addrlen );

if (nrecv == SOCKET_ERROR) {

printf("NetInterface::recvpacket ERR[%d]\n",

WSAGetLastError() );

return FALSE;

}

return TRUE;

}

//****************************************************************

// TCPSEND -

// sends packet via TCP

//****************************************************************

BOOL NetInterface::tcpsend(TCP_PACKET* pPacket) {

int nsend;

nsend = send( NetInterface::tcpd, (char *)pPacket, sizeof(TCP_PACKET),

0);

if (nsend == SOCKET_ERROR) {

printf("NetInterface::tcpsend ERR[%d]\n", WSAGetLastError() );

return FALSE;

}

return TRUE;

}

//****************************************************************

// TCPRECV -

// receives packet via TCP

//****************************************************************

BOOL NetInterface::tcprecv(TCP_PACKET& packet) {

int nrecv;

nrecv = recv( NetInterface::tcpd, (char *)&packet, sizeof(TCP_PACKET),

0);

if (nrecv == SOCKET_ERROR) {

printf("NetInterface::tcprecv ERR[%d]\n", WSAGetLastError() );

return FALSE;

}

return TRUE;

}

BOOL NetInterface::sendping(PING_PACKET* pPacket) {

int nsend;

nsend = sendto( NetInterface::pingd, (char *)pPacket,

sizeof(PING_PACKET), 0,

(struct sockaddr *)&pingservaddr, sizeof(struct sockaddr_in) );

if (nsend == SOCKET_ERROR) {

printf("NetInterface::sendping ERR[%d]\n",

WSAGetLastError() );

return FALSE;

}

return TRUE;

}

BOOL NetInterface::recvping(PING_PACKET& packet) {

int nrecv;

struct sockaddr_in address;

int addrlen = sizeof(address);

nrecv = recvfrom( NetInterface::pingd, (char *)&packet,

sizeof(PING_PACKET), 0,

(struct sockaddr *)&address, &addrlen );

if (nrecv == SOCKET_ERROR) {

printf("NetInterface::recvping ERR[%d]\n", WSAGetLastError() );

return FALSE;

}

return TRUE;

}

// PING

// returns round-trip-delay in milliseconds

//

int NetInterface::ping() {

PING_PACKET pingreq; // request packet

PING_PACKET pingres; // response packet

DWORD timenow;

DWORD accumulated_duration = 0;

int pingcount = 5;

int actualcount = 0; // number of ping packets actually received (after

// accounting for loss)

WORD seqnum = 0;

fd_set fd_read;

FD_ZERO(&fd_read);

FD_SET(pingd, &fd_read);

struct timeval tv;

tv.tv_sec = 3; // wait for # seconds before timing out on the read

int select_status;

for (int i=0; i < pingcount; i++) { // loop #pingcount times

pingreq.pkt_type = CONTROL;

pingreq.seqnum = seqnum;

pingreq.control = PINGREQ;

pingreq.timestamp = GetTickCount();

sendping(&pingreq);

if ( (select_status = select(pingd+1, &fd_read,

NULL, NULL, &tv)) > 0 ) {

recvping(pingres);

actualcount++;

}

else if (select_status == SOCKET_ERROR) {

printf("select: socket error [%d]\n", WSAGetLastError());

return -1;

}

else continue; // timeout ... indicating possibly lost pkt

// calculate round-trip-delay

timenow = GetTickCount();

accumulated_duration += timenow - pingres.timestamp;

seqnum++;

}

return (accumulated_duration/actualcount); // avg roundtrip

}

//****************************************************************

// Author: Hu Imm Lee

// Filename: CPipeStream.h

//

// Comments:

// + Contains class definition & implemention for CPipeStream

// + CPipsStream is a derived class of CAsyncStream

// + CPipeStream reads data from pipe into stream

//****************************************************************

#include <stdlib.h>

#include <malloc.h>

#define MAXBUF 1024*1024 // 0.5 MB RAM to be allocated to this stream

#define BLOCK MAXBUF/2 // block size the window should slide by

extern void PrintWin32Error(char*);

class CPipeStream : public CAsyncStream {

public:

CPipeStream(LPCTSTR pipename) : m_llPosition(0), m_llAvailable(0),

offset(0) {

OpenPipe(pipename);

// allocate a sliding window

m_pbData = (PBYTE)malloc (MAXBUF);

}

HRESULT OpenPipe(LPCTSTR name) {

HRESULT hr;

// wait (indefinitely) for pipe to become available

BOOL status = WaitNamedPipe(

name,

NMPWAIT_WAIT_FOREVER);

ASSERT(status);

if (!status)

hr = E_FAIL;

// open & connect to the pipe

if (status == TRUE) {

hpipe = CreateFile(name,

GENERIC_READ, // read access only (data stream

// inbound)

0, // object cannot be shared

NULL, // security attributes -- handle cannot be inherited

OPEN_EXISTING, // actions to take if pipe exists

FILE_ATTRIBUTE_NORMAL, // flags and attributes

NULL // template file

);

ASSERT(hpipe != INVALID_HANDLE_VALUE);

if (hpipe == INVALID_HANDLE_VALUE)

hr = E_FAIL;

}

return S_OK;

}

HRESULT SetPointer(LONGLONG llPos) {

if (llPos < 0 || llPos-offset < 0 || llPos-offset > m_llAvailable) {

printf("setpos failed: req(%ld) ", llPos);

printf("actual(%ld) ", llPos-offset);

printf("available(%ld)\n", m_llAvailable);

return S_FALSE;

}

else {

m_llPosition = llPos - offset;

return S_OK;

}

return S_OK;

}

HRESULT Read(PBYTE pbBuffer, DWORD dwBytesToRead,

BOOL bAlign, LPDWORD pdwBytesRead) {

BOOL status;

LONGLONG dwBytesLeftToRead = (m_llAvailable-m_llPosition

< dwBytesToRead) ?

dwBytesToRead - m_llAvailable + m_llPosition // bytes to read

// from the pipe

: 0; // nothing to read fr pipe //dwBytesToRead;

DWORD bytesread;

CAutoLock lck(&m_csLock);

*pdwBytesRead = 0;

// Read from pipe (it blocks if need be) into the allocated window

while ( dwBytesLeftToRead != 0 ) {

// slide the window

if (MAXBUF-m_llAvailable < dwBytesLeftToRead) {

printf("sliding window\n");

PBYTE newdata = (PBYTE) malloc(MAXBUF);

memcpy(&newdata[0], &m_pbData[BLOCK],

MAXBUF-BLOCK );

free(m_pbData);

m_pbData = newdata;

if (m_pbData == NULL) {

printf("realloc failed!\n");

break;

}

// remember to change the offset!

offset += BLOCK;

printf("offset(%ld)\n", offset);

m_llAvailable -= BLOCK;

m_llPosition -= BLOCK;

}

status = ReadFile(hpipe,

&m_pbData[m_llAvailable],

(DWORD)dwBytesLeftToRead, &bytesread, NULL);

if (!status) {

PrintWin32Error("ReadFile");

printf("m_llAvailable(%ld) ", m_llAvailable);

printf("m_llPosition(%ld) ", m_llPosition);

printf("dwBytesLeftToRead(%ld) ",

dwBytesLeftToRead);

printf("sizeof[m_pbData] (%ld)\n",

_msize(m_pbData));

break;

}

// safety check in case pipe is already empty (because

// transmission has already ended)

if (bytesread == 0) {

break;

}

dwBytesLeftToRead -= bytesread;

m_llAvailable += bytesread;

}

memcpy(pbBuffer, &m_pbData[m_llPosition], dwBytesToRead-

(DWORD)dwBytesLeftToRead);

*pdwBytesRead = dwBytesToRead-

(DWORD)dwBytesLeftToRead;

m_llPosition += dwBytesToRead-(DWORD)dwBytesLeftToRead;

return S_OK;

}

LONGLONG Size(LONGLONG *pSizeAvailable) {

*pSizeAvailable = min(m_llAvailable+offset, MAXBUF+offset);

return INFINITE; // this is live data, so don't know how long

}

DWORD Alignment() {

return 1;

}

void Lock() {

m_csLock.Lock();

}

void Unlock() {

m_csLock.Unlock();

}

private:

HANDLE hpipe;

LONGLONG m_llPosition;

LONGLONG m_llAvailable; // data so far available to the stream

PBYTE m_pbData; // ptr to a sliding window

CCritSec m_csLock;

LONGLONG offset;

};

class CPipeReader : public CAsyncReader {

public:

STDMETHODIMP Register() {

return S_OK;

}

STDMETHODIMP Unregister() {

return S_OK;

}

CPipeReader(CPipeStream* pStream, CMediaType* pmt, HRESULT*

phr) :

CAsyncReader(NAME("Pipe Reader"), NULL, pStream, phr) {

m_mt = *pmt;

}

};

//****************************************************************

// Filename: CAsyncReader.h

// Copyright (c) 1992 - 1997 Microsoft Corporation. All Rights Reserved.

//****************************************************************

#ifndef __ASYNCRDR_H__

#define __ASYNCRDR_H__

//

// AsyncRdr

//

// Defines an IO source filter.

//

// This filter (CAsyncReader) supports IBaseFilter and IFileSourceFilter interfaces from the

// filter object itself. It has a single output pin (CAsyncOutputPin)

// which supports IPin and IAsyncReader.

//

// This filter is essentially a wrapper for the CAsyncFile class that does

// all the work.

//

// the filter class (defined below)

class CAsyncReader;

// the output pin class

class CAsyncOutputPin

: public IAsyncReader,

public CBasePin

{

protected:

CAsyncReader* m_pReader;

CAsyncIo * m_pIo;

// This is set every time we're asked to return an IAsyncReader

// interface

// This allows us to know if the downstream pin can use

// this transport, otherwise we can hook up to thinks like the

// dump filter and nothing happens

BOOL m_bQueriedForAsyncReader;

HRESULT InitAllocator(IMemAllocator **ppAlloc);

public:

// constructor and destructor

CAsyncOutputPin(

HRESULT * phr,

CAsyncReader *pReader,

CAsyncIo *pIo,

CCritSec * pLock);

~CAsyncOutputPin();

// --- CUnknown ---

// need to expose IAsyncReader

DECLARE_IUNKNOWN

STDMETHODIMP NonDelegatingQueryInterface(REFIID, void**);

// --- CBasePin methods ---

// return the types we prefer - this will return the known

// file type

HRESULT GetMediaType(int iPosition, CMediaType *pMediaType);

// can we support this type?

HRESULT CheckMediaType(const CMediaType* pType);

// Clear the flag so we see if IAsyncReader is queried for

HRESULT CheckConnect(IPin *pPin)

{

m_bQueriedForAsyncReader = FALSE;

return CBasePin::CheckConnect(pPin);

}

// See if it was asked for

HRESULT CompleteConnect(IPin *pReceivePin)

{

if (m_bQueriedForAsyncReader) {

return CBasePin::CompleteConnect(pReceivePin);

} else {

#ifdef VFW_E_NO_TRANSPORT

return VFW_E_NO_TRANSPORT;

#else

return E_FAIL;

#endif

}

}

// --- IAsyncReader methods ---

// pass in your preferred allocator and your preferred properties.

// method returns the actual allocator to be used. Call GetProperties

// on returned allocator to learn alignment and prefix etc chosen.

// this allocator will be not be committed and decommitted by

// the async reader, only by the consumer.

STDMETHODIMP RequestAllocator(

IMemAllocator* pPreferred,

ALLOCATOR_PROPERTIES* pProps,

IMemAllocator ** ppActual);

// queue a request for data.

// media sample start and stop times contain the requested absolute

// byte position (start inclusive, stop exclusive).

// may fail if sample not obtained from agreed allocator.

// may fail if start/stop position does not match agreed alignment.

// samples allocated from source pin's allocator may fail

// GetPointer until after returning from WaitForNext.

STDMETHODIMP Request(

IMediaSample* pSample,

DWORD dwUser); // user context

// block until the next sample is completed or the timeout occurs.

// timeout (millisecs) may be 0 or INFINITE. Samples may not

// be delivered in order. If there is a read error of any sort, a

// notification will already have been sent by the source filter,

// and STDMETHODIMP will be an error.

STDMETHODIMP WaitForNext(

DWORD dwTimeout,

IMediaSample** ppSample, // completed sample

DWORD * pdwUser); // user context

// sync read of data. Sample passed in must have been acquired from

// the agreed allocator. Start and stop position must be aligned.

// equivalent to a Request/WaitForNext pair, but may avoid the

// need for a thread on the source filter.

STDMETHODIMP SyncReadAligned(

IMediaSample* pSample);

// sync read. works in stopped state as well as run state.

// need not be aligned. Will fail if read is beyond actual total

// length.

STDMETHODIMP SyncRead(

LONGLONG llPosition, // absolute file position

LONG lLength, // nr bytes required

BYTE* pBuffer); // write data here

// return total length of stream, and currently available length.

// reads for beyond the available length but within the total length will

// normally succeed but may block for a long period.

STDMETHODIMP Length(

LONGLONG* pTotal,

LONGLONG* pAvailable);

// cause all outstanding reads to return, possibly with a failure code

// (VFW_E_TIMEOUT) indicating they were cancelled.

// these are defined on IAsyncReader and IPin

STDMETHODIMP BeginFlush(void);

STDMETHODIMP EndFlush(void);

};

//

// The filter object itself. Supports IBaseFilter through

// CBaseFilter and also IFileSourceFilter directly in this object

class CAsyncReader : public CBaseFilter

{

protected:

// filter-wide lock

CCritSec m_csFilter;

// all i/o done here

CAsyncIo m_Io;

// our output pin

CAsyncOutputPin m_OutputPin;

// Type we think our data is

CMediaType m_mt;

public:

// construction / destruction

CAsyncReader(

TCHAR *pName,

LPUNKNOWN pUnk,

CAsyncStream *pStream,

HRESULT *phr);

~CAsyncReader();

// --- CBaseFilter methods ---

int GetPinCount();

CBasePin *GetPin(int n);

// --- Access our media type

const CMediaType *LoadType() const

{

return &m_mt;

}

};

#endif //__ASYNCRDR_H__

//****************************************************************

// Filename: CAsyncReader.cpp

// Copyright (c) 1992 - 1997 Microsoft Corporation. All Rights Reserved.

//

// Implementation of Io source filter methods and output pin methods for

// CAsyncReader and CAsyncOutputPin

//

//****************************************************************

#include <streams.h>

#include "CAsyncIo.h"

#include "CAsyncReader.h"

// --- CAsyncOutputPin implementation ---

CAsyncOutputPin::CAsyncOutputPin(

HRESULT * phr,

CAsyncReader *pReader,

CAsyncIo *pIo,

CCritSec * pLock)

: CBasePin(

NAME("Async output pin"),

pReader,

pLock,

phr,

L"Output",

PINDIR_OUTPUT),

m_pReader(pReader),

m_pIo(pIo)

{

}

CAsyncOutputPin::~CAsyncOutputPin()

{

}

STDMETHODIMP

CAsyncOutputPin::NonDelegatingQueryInterface(REFIID riid, void** ppv)

{

CheckPointer(ppv,E_POINTER);

if (riid == IID_IAsyncReader) {

m_bQueriedForAsyncReader = TRUE;

return GetInterface((IAsyncReader*) this, ppv);

} else {

return CBasePin::NonDelegatingQueryInterface(riid, ppv);

}

}

HRESULT

CAsyncOutputPin::GetMediaType(int iPosition, CMediaType *pMediaType)

{

if (iPosition < 0) {

return E_INVALIDARG;

}

if (iPosition > 0) {

return VFW_S_NO_MORE_ITEMS;

}

*pMediaType = *m_pReader->LoadType();

return S_OK;

}

HRESULT

CAsyncOutputPin::CheckMediaType(const CMediaType* pType)

{

CAutoLock lck(m_pLock);

/* We treat MEDIASUBTYPE_NULL subtype as a wild card */

if ((m_pReader->LoadType()->majortype == pType->majortype) &&

(m_pReader->LoadType()->subtype == MEDIASUBTYPE_NULL ||

m_pReader->LoadType()->subtype == pType->subtype)) {

return S_OK;

}

return S_FALSE;

}

HRESULT

CAsyncOutputPin::InitAllocator(IMemAllocator **ppAlloc)

{

HRESULT hr = NOERROR;

*ppAlloc = NULL;

CMemAllocator *pMemObject = NULL;

/* Create a default memory allocator */

pMemObject = new CMemAllocator(NAME("Base memory allocator"),NULL, &hr);

if (pMemObject == NULL) {

return E_OUTOFMEMORY;

}

if (FAILED(hr)) {

delete pMemObject;

return hr;

}

/* Get a reference counted IID_IMemAllocator interface */

hr = pMemObject->QueryInterface(IID_IMemAllocator,(void **)ppAlloc);

if (FAILED(hr)) {

delete pMemObject;

return E_NOINTERFACE;

}

ASSERT(*ppAlloc != NULL);

return NOERROR;

}

// we need to return an addrefed allocator, even if it is the preferred

// one, since he doesn't know whether it is the preferred one or not.

STDMETHODIMP

CAsyncOutputPin::RequestAllocator(

IMemAllocator* pPreferred,

ALLOCATOR_PROPERTIES* pProps,

IMemAllocator ** ppActual)

{

// we care about alignment but nothing else

if (!pProps->cbAlign || !m_pIo->IsAligned(pProps->cbAlign)) {

m_pIo->Alignment(&pProps->cbAlign);

}

ALLOCATOR_PROPERTIES Actual;

HRESULT hr;

if (pPreferred) {

hr = pPreferred->SetProperties(pProps, &Actual);

if (SUCCEEDED(hr) && m_pIo->IsAligned(Actual.cbAlign)) {

pPreferred->AddRef();

*ppActual = pPreferred;

return S_OK;

}

}

// create our own allocator

IMemAllocator* pAlloc;

hr = InitAllocator(&pAlloc);

if (FAILED(hr)) {

return hr;

}

//...and see if we can make it suitable

hr = pAlloc->SetProperties(pProps, &Actual);

if (SUCCEEDED(hr) && m_pIo->IsAligned(Actual.cbAlign)) {

// we need to release our refcount on pAlloc, and addref

// it to pass a refcount to the caller - this is a net nothing.

*ppActual = pAlloc;

return S_OK;

}

// failed to find a suitable allocator

pAlloc->Release();

// if we failed because of the IsAligned test, the error code will

// not be failure

if (SUCCEEDED(hr)) {

hr = VFW_E_BADALIGN;

}

return hr;

}

// queue an aligned read request. call WaitForNext to get

// completion.

STDMETHODIMP

CAsyncOutputPin::Request(

IMediaSample* pSample,

DWORD dwUser) // user context

{

REFERENCE_TIME tStart, tStop;

HRESULT hr = pSample->GetTime(&tStart, &tStop);

if (FAILED(hr)) {

return hr;

}

LONGLONG llPos = tStart / UNITS;

LONG lLength = (LONG) ((tStop - tStart) / UNITS);

LONGLONG llTotal;

LONGLONG llAvailable;

hr = m_pIo->Length(&llTotal, &llAvailable);

if (llPos + lLength > llTotal) {

// the end needs to be aligned, but may have been aligned

// on a coarser alignment.

LONG lAlign;

m_pIo->Alignment(&lAlign);

llTotal = (llTotal + lAlign -1) & ~(lAlign-1);

if (llPos + lLength > llTotal) {

lLength = (LONG) (llTotal - llPos);

// must be reducing this!

ASSERT((llTotal * UNITS) <= tStop);

tStop = llTotal * UNITS;

pSample->SetTime(&tStart, &tStop);

}

}



BYTE* pBuffer;

hr = pSample->GetPointer(&pBuffer);

if (FAILED(hr)) {

return hr;

}

return m_pIo->Request(

llPos,

lLength,

TRUE,

pBuffer,

(LPVOID)pSample,

dwUser);

}

// sync-aligned request. just like a request/waitfornext pair.

STDMETHODIMP

CAsyncOutputPin::SyncReadAligned(

IMediaSample* pSample)

{

REFERENCE_TIME tStart, tStop;

HRESULT hr = pSample->GetTime(&tStart, &tStop);

if (FAILED(hr)) {

return hr;

}

LONGLONG llPos = tStart / UNITS;

LONG lLength = (LONG) ((tStop - tStart) / UNITS);

LONGLONG llTotal;

LONGLONG llAvailable;

hr = m_pIo->Length(&llTotal, &llAvailable);

if (llPos + lLength > llTotal) {

// the end needs to be aligned, but may have been aligned

// on a coarser alignment.

LONG lAlign;

m_pIo->Alignment(&lAlign);

llTotal = (llTotal + lAlign -1) & ~(lAlign-1);

if (llPos + lLength > llTotal) {

lLength = (LONG) (llTotal - llPos);

// must be reducing this!

ASSERT((llTotal * UNITS) <= tStop);

tStop = llTotal * UNITS;

pSample->SetTime(&tStart, &tStop);

}

}



BYTE* pBuffer;

hr = pSample->GetPointer(&pBuffer);

if (FAILED(hr)) {

return hr;

}

LONG cbActual;

hr = m_pIo->SyncReadAligned(

llPos,

lLength,

pBuffer,

&cbActual

);

pSample->SetActualDataLength(cbActual);

return hr;

}

//

// collect the next ready sample

STDMETHODIMP

CAsyncOutputPin::WaitForNext(

DWORD dwTimeout,

IMediaSample** ppSample, // completed sample

DWORD * pdwUser) // user context

{

LONG cbActual;

IMediaSample* pSample;

HRESULT hr = m_pIo->WaitForNext(

dwTimeout,

(LPVOID*) &pSample,

pdwUser,

&cbActual

);

if (SUCCEEDED(hr)) {

pSample->SetActualDataLength(cbActual);

}

*ppSample = pSample;

return hr;

}

//

// synchronous read that need not be aligned.

STDMETHODIMP

CAsyncOutputPin::SyncRead(

LONGLONG llPosition, // absolute Io position

LONG lLength, // nr bytes required

BYTE* pBuffer) // write data here

{

return m_pIo->SyncRead(llPosition, lLength, pBuffer);

}

// return the length of the file, and the length currently

// available locally. We only support locally accessible files,

// so they are always the same

STDMETHODIMP

CAsyncOutputPin::Length(

LONGLONG* pTotal,

LONGLONG* pAvailable)

{

HRESULT hr = m_pIo->Length(pTotal, pAvailable);

return hr;

}

STDMETHODIMP

CAsyncOutputPin::BeginFlush(void)

{

return m_pIo->BeginFlush();

}

STDMETHODIMP

CAsyncOutputPin::EndFlush(void)

{

return m_pIo->EndFlush();

}



// --- CAsyncReader implementation ---

#pragma warning(disable:4355)

CAsyncReader::CAsyncReader(

TCHAR *pName,

LPUNKNOWN pUnk,

CAsyncStream *pStream,

HRESULT *phr)

: CBaseFilter(

pName,

pUnk,

&m_csFilter,

CLSID_AsyncReader,

NULL

),

m_OutputPin(

phr,

this,

&m_Io,

&m_csFilter),

m_Io(pStream)

{

}

CAsyncReader::~CAsyncReader()

{

}

int

CAsyncReader::GetPinCount()

{

return 1;

}

CBasePin *

CAsyncReader::GetPin(int n)

{

if ((GetPinCount() > 0) &&

(n == 0)) {

return &m_OutputPin;

} else {

return NULL;

}

}

//****************************************************************

// Filename: CAsyncIo.h

// Copyright (c) 1992 - 1997 Microsoft Corporation. All Rights Reserved.

//****************************************************************

#ifndef __ASYNCIO_H__

#define __ASYNCIO_H__

//

// definition of CAsyncFile object that performs file access. It provides

// asynchronous, unbuffered, aligned reads from a file, using a worker thread

// on win95 and potentially overlapped i/o if available.

// !!! Need to use real overlapped i/o if available

// currently only uses worker thread, not overlapped i/o

class CAsyncIo;

class CAsyncStream;

//

// Model the stream we read from based on a file-like interface

//

class CAsyncStream

{

public:

virtual ~CAsyncStream() {};

virtual HRESULT SetPointer(LONGLONG llPos) = 0;

virtual HRESULT Read(PBYTE pbBuffer,

DWORD dwBytesToRead,

BOOL bAlign,

LPDWORD pdwBytesRead) = 0;

virtual LONGLONG Size(LONGLONG *pSizeAvailable = NULL) = 0;

virtual DWORD Alignment() = 0;

virtual void Lock() = 0;

virtual void Unlock() = 0;

};

// represents a single request and performs the i/o. Can be called on either

// worker thread or app thread, but must hold pcsFile across file accesses.

// (ie across SetFilePointer/ReadFile pairs)

class CAsyncRequest

{

CAsyncIo *m_pIo;

CAsyncStream *m_pStream;

LONGLONG m_llPos;

BOOL m_bAligned;

LONG m_lLength;

BYTE* m_pBuffer;

LPVOID m_pContext;

DWORD m_dwUser;

HRESULT m_hr;

public:

// init the params for this request. Issue the i/o

// if overlapped i/o is possible.

HRESULT Request(

CAsyncIo *pIo,

CAsyncStream *pStream,

LONGLONG llPos,

LONG lLength,

BOOL bAligned,

BYTE* pBuffer,

LPVOID pContext, // filter's context

DWORD dwUser); // downstream filter's context

// issue the i/o if not overlapped, and block until i/o complete.

// returns error code of file i/o

HRESULT Complete();

// cancels the i/o. blocks until i/o is no longer pending

HRESULT Cancel()

{

return S_OK;

};

// accessor functions

LPVOID GetContext()

{

return m_pContext;

};

DWORD GetUser()

{

return m_dwUser;

};

HRESULT GetHResult() {

return m_hr;

};

// we set m_lLength to the actual length

LONG GetActualLength() {

return m_lLength;

};

LONGLONG GetStart() {

return m_llPos;

};

};

typedef CGenericList<CAsyncRequest> CRequestList;

// this class needs a worker thread, but the ones defined in classes\base

// are not suitable (they assume you have one message sent or posted per

// request, whereas here for efficiency we want just to set an event when

// there is work on the queue).

//

// we create CAsyncRequest objects and queue them on m_listWork. The worker

// thread pulls them off, completes them and puts them on m_listDone.

// The events m_evWork and m_evDone are set when the corresponding lists are

// not empty.

//

// Synchronous requests are done on the caller thread. These should be

// synchronised by the caller, but to make sure we hold m_csFile across

// the SetFilePointer/ReadFile code.

//

// Flush by calling BeginFlush. This rejects all further requests (by

// setting m_bFlushing within m_csLists), cancels all requests and moves them

// to the done list, and sets m_evDone to ensure that no WaitForNext operations

// will block. Call EndFlush to cancel this state.

//

// we support unaligned calls to SyncRead. This is done by opening the file

// twice if we are using unbuffered i/o (m_dwAlign > 1).

// !!!fix this to buffer on top of existing file handle?

class CAsyncIo

{

CCritSec m_csReader;

CAsyncStream *m_pStream;

CCritSec m_csLists; // locks access to the list and events

BOOL m_bFlushing; // true if between BeginFlush/EndFlush

CRequestList m_listWork;

CRequestList m_listDone;

CAMEvent m_evWork; // set when list is not empty

CAMEvent m_evDone;

// for correct flush behaviour: all protected by m_csLists

LONG m_cItemsOut; // nr of items not on listDone or listWork

BOOL m_bWaiting; // TRUE if someone waiting for m_evAllDone

CAMEvent m_evAllDone; // signal when m_cItemsOut goes to 0 if m_cWaiting

CAMEvent m_evStop; // set when thread should exit

HANDLE m_hThread;

LONGLONG Size() {

ASSERT(m_pStream != NULL);

return m_pStream->Size();

};

// start the thread

HRESULT StartThread(void);

// stop the thread and close the handle

HRESULT CloseThread(void);

// manage the list of requests. hold m_csLists and ensure

// that the (manual reset) event hevList is set when things on

// the list but reset when the list is empty.

// returns null if list empty

CAsyncRequest* GetWorkItem();

// get an item from the done list

CAsyncRequest* GetDoneItem();

// put an item on the work list

HRESULT PutWorkItem(CAsyncRequest* pRequest);

// put an item on the done list

HRESULT PutDoneItem(CAsyncRequest* pRequest);

// called on thread to process any active requests

void ProcessRequests(void);

// initial static thread proc calls ThreadProc with DWORD

// param as this

static DWORD InitialThreadProc(LPVOID pv) {

CAsyncIo * pThis = (CAsyncIo*) pv;

return pThis->ThreadProc();

};

DWORD ThreadProc(void);

public:

CAsyncIo(CAsyncStream *pStream);

~CAsyncIo();

// open the file

HRESULT Open(LPCTSTR pName);

// ready for async activity - call this before

// calling Request

HRESULT AsyncActive(void);

// call this when no more async activity will happen before

// the next AsyncActive call

HRESULT AsyncInactive(void);

// queue a requested read. must be aligned.

HRESULT Request(

LONGLONG llPos,

LONG lLength,

BOOL bAligned,

BYTE* pBuffer,

LPVOID pContext,

DWORD dwUser);

// wait for the next read to complete

HRESULT WaitForNext(

DWORD dwTimeout,

LPVOID *ppContext,

DWORD * pdwUser,

LONG * pcbActual

);

// perform a read of an already aligned buffer

HRESULT SyncReadAligned(

LONGLONG llPos,

LONG lLength,

BYTE* pBuffer,

LONG* pcbActual

);

// perform a synchronous read. will be buffered

// if not aligned.

HRESULT SyncRead(

LONGLONG llPos,

LONG lLength,

BYTE* pBuffer);

// return length

HRESULT Length(LONGLONG *pllTotal, LONGLONG* pllAvailable);

// all Reader positions, read lengths and memory locations must

// be aligned to this.

HRESULT Alignment(LONG* pl);

HRESULT BeginFlush();

HRESULT EndFlush();

LONG Alignment()

{

return m_pStream->Alignment();

};

BOOL IsAligned(LONG l) {

if ((l & (Alignment() -1)) == 0) {

return TRUE;

} else {

return FALSE;

}

};

BOOL IsAligned(LONGLONG ll) {

return IsAligned( (LONG) (ll & 0xffffffff));

};

};

#endif // __ASYNCIO_H__

//****************************************************************

// Filename: CAsyncIo.cpp

// Copyright (c) 1992 - 1997 Microsoft Corporation. All Rights Reserved.

//****************************************************************

#include <streams.h>

#include "CAsyncIo.h"

// --- CAsyncRequest ---

// implementation of CAsyncRequest representing a single

// outstanding request. All the i/o for this object is done

// in the Complete method.

// init the params for this request.

// Read is not issued until the complete call

HRESULT

CAsyncRequest::Request(

CAsyncIo *pIo,

CAsyncStream *pStream,

LONGLONG llPos,

LONG lLength,

BOOL bAligned,

BYTE* pBuffer,

LPVOID pContext, // filter's cont