From af9d7517b7d07ef98e9283ac29948c8abfb5bbda Mon Sep 17 00:00:00 2001 From: Chris Tallon Date: Fri, 16 Nov 2007 22:40:14 +0000 Subject: [PATCH] Upgrade to protocol --- mvpclient.c | 124 +++++++++++++++++++++++++++++++++++----------------- tcp.c | 16 +++++-- tcp.h | 8 ++-- 3 files changed, 102 insertions(+), 46 deletions(-) diff --git a/mvpclient.c b/mvpclient.c index 5260240..fcdae49 100644 --- a/mvpclient.c +++ b/mvpclient.c @@ -167,114 +167,158 @@ void MVPClient::run2() tcp.setSoKeepTime(3); tcp.setNonBlocking(); - UCHAR* buffer; - UCHAR* data; - int packetLength; + ULONG channelID; + ULONG serialNumber; ULONG opcode; + ULONG extraDataLength; + UCHAR* data; + int result = 0; while(1) { log->log("Client", Log::DEBUG, "Waiting"); - buffer = (UCHAR*)tcp.receivePacket(); - log->log("Client", Log::DEBUG, "Received packet, length = %u", tcp.getDataLength()); - if (buffer == NULL) + + if (!tcp.readData((UCHAR*)&channelID, sizeof(ULONG))) break; + channelID = ntohl(channelID); + if (channelID != 1) + { + log->log("Client", Log::ERR, "Incoming channel number not 1!"); + break; + } + + log->log("Client", Log::DEBUG, "Got chan"); + + if (!tcp.readData((UCHAR*)&serialNumber, sizeof(ULONG))) break; + serialNumber = ntohl(serialNumber); + + log->log("Client", Log::DEBUG, "Got ser"); + + if (!tcp.readData((UCHAR*)&opcode, sizeof(ULONG))) break; + opcode = ntohl(opcode); + + log->log("Client", Log::DEBUG, "Got op %lu", opcode); + + if (!tcp.readData((UCHAR*)&extraDataLength, sizeof(ULONG))) break; + extraDataLength = ntohl(extraDataLength); + if (extraDataLength > 200000) { - log->log("Client", Log::DEBUG, "Detected connection closed"); + log->log("Client", Log::ERR, "ExtraDataLength > 200000!"); break; } - packetLength = tcp.getDataLength() - 4; - opcode = ntohl(*(ULONG*)buffer); - data = buffer + 4; + log->log("Client", Log::DEBUG, "Got edl %lu", extraDataLength); + + if (extraDataLength) + { + data = (UCHAR*)malloc(extraDataLength); + if (!data) + { + log->log("Client", Log::ERR, "Extra data buffer malloc error"); + break; + } + + if (!tcp.readData(data, extraDataLength)) + { + log->log("Client", Log::ERR, "Could not read extradata"); + free(data); + break; + } + } + else + { + data = NULL; + } + + log->log("Client", Log::DEBUG, "Received chan=%lu, ser=%lu, op=%lu, edl=%lu", channelID, serialNumber, opcode, extraDataLength); if (!loggedIn && (opcode != 1)) { - free(buffer); + log->log("Client", Log::ERR, "Not logged in and opcode != 1"); + if (data) free(data); break; } - log->log("Client", Log::DEBUG, "SwitchOp"); switch(opcode) { case 1: - result = processLogin(data, packetLength); + result = processLogin(data, extraDataLength); break; case 2: - result = processGetRecordingsList(data, packetLength); + result = processGetRecordingsList(data, extraDataLength); break; case 3: - result = processDeleteRecording(data, packetLength); + result = processDeleteRecording(data, extraDataLength); break; case 5: - result = processGetChannelsList(data, packetLength); + result = processGetChannelsList(data, extraDataLength); break; case 6: - result = processStartStreamingChannel(data, packetLength); + result = processStartStreamingChannel(data, extraDataLength); break; case 7: - result = processGetBlock(data, packetLength); + result = processGetBlock(data, extraDataLength); break; case 8: - result = processStopStreaming(data, packetLength); + result = processStopStreaming(data, extraDataLength); break; case 9: - result = processStartStreamingRecording(data, packetLength); + result = processStartStreamingRecording(data, extraDataLength); break; case 10: - result = processGetChannelSchedule(data, packetLength); + result = processGetChannelSchedule(data, extraDataLength); break; case 11: - result = processConfigSave(data, packetLength); + result = processConfigSave(data, extraDataLength); break; case 12: - result = processConfigLoad(data, packetLength); + result = processConfigLoad(data, extraDataLength); break; case 13: - result = processReScanRecording(data, packetLength); // FIXME obselete + result = processReScanRecording(data, extraDataLength); // FIXME obselete break; case 14: - result = processGetTimers(data, packetLength); + result = processGetTimers(data, extraDataLength); break; case 15: - result = processSetTimer(data, packetLength); + result = processSetTimer(data, extraDataLength); break; case 16: - result = processPositionFromFrameNumber(data, packetLength); + result = processPositionFromFrameNumber(data, extraDataLength); break; case 17: - result = processFrameNumberFromPosition(data, packetLength); + result = processFrameNumberFromPosition(data, extraDataLength); break; case 18: - result = processMoveRecording(data, packetLength); + result = processMoveRecording(data, extraDataLength); break; case 19: - result = processGetIFrame(data, packetLength); + result = processGetIFrame(data, extraDataLength); break; case 20: - result = processGetRecInfo(data, packetLength); + result = processGetRecInfo(data, extraDataLength); break; case 21: - result = processGetMarks(data, packetLength); + result = processGetMarks(data, extraDataLength); break; case 22: - result = processGetChannelPids(data, packetLength); + result = processGetChannelPids(data, extraDataLength); break; case 23: - result = processDeleteTimer(data, packetLength); + result = processDeleteTimer(data, extraDataLength); break; case 30: - result = processGetMediaList(data, packetLength); + result = processGetMediaList(data, extraDataLength); break; case 31: - result = processGetPicture(data, packetLength); + result = processGetPicture(data, extraDataLength); break; case 32: - result = processGetImageBlock(data, packetLength); + result = processGetImageBlock(data, extraDataLength); break; } - free(buffer); + if (data) free(data); if (!result) break; } } @@ -695,7 +739,7 @@ int MVPClient::processGetChannelPids(UCHAR* data, int length) #endif // printf("About to send getchannelpids response. length = %u\n", spaceRequired); - tcp.dump(sendBuffer, spaceRequired); + //tcp.dump(sendBuffer, spaceRequired); tcp.sendPacket(sendBuffer, spaceRequired); delete[] sendBuffer; @@ -1581,6 +1625,8 @@ int MVPClient::processGetTimers(UCHAR* buffer, int length) log->log("Client", Log::DEBUG, "recorded size as %u", ntohl(*(ULONG*)&sendBuffer[0])); +//tcp.dump(sendBuffer, count); + tcp.sendPacket(sendBuffer, count); delete[] sendBuffer; log->log("Client", Log::DEBUG, "Written timers list"); diff --git a/tcp.c b/tcp.c index 0a6e402..14ed0d4 100644 --- a/tcp.c +++ b/tcp.c @@ -27,6 +27,7 @@ TCP::TCP(int tsocket) sock = -1; connected = 0; readTimeoutEnabled = 1; + pthread_mutex_init(&sendLock, NULL); if (tsocket) { @@ -221,7 +222,13 @@ int TCP::readData(UCHAR* buffer, int totalBytes) int TCP::sendPacket(UCHAR* buf, size_t count) { - if (!connected) return 0; + pthread_mutex_lock(&sendLock); + + if (!connected) + { + pthread_mutex_unlock(&sendLock); + return 0; + } unsigned int bytesWritten = 0; int thisWrite; @@ -241,6 +248,7 @@ int TCP::sendPacket(UCHAR* buf, size_t count) { cleanup(); log->log("TCP", Log::DEBUG, "TCP: error or timeout"); + pthread_mutex_unlock(&sendLock); return 0; // error, or timeout } @@ -252,6 +260,7 @@ int TCP::sendPacket(UCHAR* buf, size_t count) // and sets errno to EGAGAIN. but we use select so it wouldn't do that anyway. cleanup(); log->log("TCP", Log::DEBUG, "Detected connection closed"); + pthread_mutex_unlock(&sendLock); return 0; } bytesWritten += thisWrite; @@ -259,6 +268,7 @@ int TCP::sendPacket(UCHAR* buf, size_t count) // log->log("TCP", Log::DEBUG, "Bytes written now: %u", bytesWritten); if (bytesWritten == count) { + pthread_mutex_unlock(&sendLock); return 1; } else @@ -267,15 +277,13 @@ int TCP::sendPacket(UCHAR* buf, size_t count) { cleanup(); log->log("TCP", Log::DEBUG, "too many writes"); + pthread_mutex_unlock(&sendLock); return 0; } } } } - - - void TCP::dump(unsigned char* data, USHORT size) { printf("Size = %u\n", size); diff --git a/tcp.h b/tcp.h index 489dfa4..7f0c092 100644 --- a/tcp.h +++ b/tcp.h @@ -36,6 +36,7 @@ #include #include #include +#include #include "log.h" @@ -59,7 +60,8 @@ class TCP int connectTo(char *host, unsigned short port); int sendPacket(UCHAR*, size_t size); UCHAR* receivePacket(); - + int readData(UCHAR* buffer, int totalBytes); + // Get methods int isConnected(); int getDataLength(); @@ -73,9 +75,9 @@ class TCP int connected; int readTimeoutEnabled; int dataLength; - + pthread_mutex_t sendLock; + void cleanup(); - int readData(UCHAR* buffer, int totalBytes); }; #endif -- 2.39.5