An Example of TCP Socket
This article presents a simple of using IP socket in Unix. The code has been taken from one of my projects, which was cooperated with Silicon Labs’ EmberZNet App framework. It implements a socket client, and has been tested using a Node.js based server.
An example of Node.js test server.
var net = require('net'),
DeviceController = require('../controller/DeviceController.js'),
DeviceDBManagement = require('../controller/DeviceDBManagement.js'),
Config = require('../Config.js'),
_ = require('underscore'),
Logger = require('../Logger.js');
var clientConnected = false;
var socketServerPort = Config.SOCKET_PORT;
var socketServerAddress = Config.SOCKET_IPv4;
var messageParsed = {};
var socketFd = null;
var socketServer = net.createServer(function(socket) {
Logger.server.log('info', 'Node TCP server connected to client ' + socket.remoteAddress +':'+ socket.remotePort);
clientConnected = true;
socketFd = socket;
socket.on('close', function(data) {
Logger.server.log('info', 'Node TCP server disconnects to client ' + socketServerAddress +':'+ socketServerPort);
clientConnected = false;
});
socket.on('error', function() {
Logger.server.log('info', 'Socket Server Error: ');
});
socket.on('data', function(message) {
try {
var buff = new Buffer(message, 'utf8');
recvMsg = buff.slice(0, -1).toString();
messageParsed = JSON.parse(recvMsg);
if (!messageParsed.topic) throw "No topic error";
msgDispatcher(messageParsed.topic);
} catch (e) {
Logger.server.log('info', 'Error Parsing Socket client Received: ' + e
+ '\nmessage: ' + recvMsg);
}
}.bind(this));
}.bind(this)).listen(socketServerPort, socketServerAddress);
Logger.server.log('info', 'Node TCP server listening on ' + socketServerAddress +':'+ socketServerPort);
socketServer.publishSocket = function (channel, payload, qos, callback) {
if (clientConnected && socketFd) {
socketFd.write(payload + ' ');
callback();
} else {
Logger.server.log('info', 'No TCP socket client connected.');
}
};
var msgDispatcher = function(topic) {
var topicStr = topic.split("/")[2];
Logger.server.log('info', 'Recv\'d remote payload: ' + JSON.stringify(messageParsed));
switch(topicStr) {
case 'heartbeat':
heartbeat_handler(messageParsed);
break;
case 'devicejoined':
devicejoined_handler(messageParsed);
break;
case 'deviceleft':
deviceleft_handler(messageParsed);
break;
case 'devices':
devices_handler(messageParsed);
break;
case 'devicestatechange':
devicestatechange_handler(messageParsed);
break;
case 'relays':
relays_handler(messageParsed);
break;
case 'settings':
settings_handler(messageParsed);
break;
case 'zclresponse':
zclresponse_handler(messageParsed);
break;
case 'otaevent':
otaevent_handler(messageParsed);
break;
case 'executed':
executed_handler(messageParsed);
break;
default:
Logger.server.log('info', 'Unknown topic: ' + topicStr);
break;
}
};
var heartbeat_handler = function(msgJson) {
DeviceController.onHeartbeat(msgJson, msgJson.topic.split('/')[1]);
};
var devicejoined_handler = function(msgJson) {
DeviceDBManagement.onNodeJoin(msgJson, msgJson.topic.split('/')[1], true);
};
var deviceleft_handler = function(msgJson) {
DeviceDBManagement.onNodeLeft(msgJson.eui64);
};
var devices_handler = function(msgJson) {
DeviceDBManagement.onDeviceListReceived(msgJson, msgJson.topic.split('/')[1]);
};
var devicestatechange_handler = function(msgJson) {
DeviceDBManagement.onNodeStateChange(msgJson);
};
var relays_handler = function(msgJson) {
DeviceController.onRelayList(msgJson);
};
var settings_handler = function(msgJson) {
DeviceController.onGatewaySettings(msgJson);
};
var zclresponse_handler = function(msgJson) {
DeviceController.onZCLResponse(msgJson);
};
var otaevent_handler = function(msgJson) {
DeviceController.onOtaEvent(msgJson);
};
var executed_handler = function(msgJson) {
DeviceController.onMessageExecuted(msgJson);
}
module.exports = socketServer;
An example of IP socket client. The client will try to reconnect to the server if the connection is down. As EmberZNet framework provides a feature of delayed events, the code leverage such a feature to complete a longlink connection.
/**
* @file
* \brief The transportation layer based on socket. Note that it only
* supports TCP/IPv4.
*/
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <pthread.h>
#include <sys/uio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <time.h>
#include <signal.h>
#include "transport-socket.h"
#ifndef INT_TEST
#include PLATFORM_HEADER
#include "stack/include/ember-types.h"
#include "stack/include/event.h"
#include "hal/hal.h"
#include "app/framework/include/af.h"
#else
#include <assert.h>
#endif
// Error codes.
#define SOCKET_SUCCESS (0)
#define SOCKET_LOST_CONN (0)
#define SOCKET_ERROR (-1)
#define RAW_DATA_ERROR (-2)
#define BUFFER_ERROR (-3)
#define INVALID_RD_BYTES (-4)
#define INVALID_SD_BYTES (-5)
// Delays and intervals.
#define CONNECTION_THREAD_DELAY_S (2)
#define WRITE_SOCK_THREAD_DELAY_MS (50)
#define RECV_SOCK_THREAD_DELAY_MS (10)
#define LONG_LINK_ITVL_MS (2000)
// Buffer lengths.
#define RECV_BUF_MAX (1000)
#define WRITE_BUF_MAX (1000)
// Others.
#define RETRY_MAX (5)
#define MSG_CNT_DISCONN_MAX (10)
#define EISCONN_STANDBY_MAX (5)
#define ARRAY_LEN(x) (sizeof(x) / sizeof((x)[0]))
#ifdef INT_TEST
#define TEST_ARRT static
#else
#define TEST_ARRT
#endif
#ifdef INT_TEST
#define DEBUG_PRT printf
#elif EMBER_AF_PRINT_CORE
#define DEBUG_PRT emberAfCorePrint
#else
#define DEBUG_PRT
#endif
typedef enum _SocketStatus {
CONNECTED = 0, // Connected
CON_IN_PROG, // Connetion in progress
DISCONNECTED, // Disconnected
} SocketStatus;
typedef struct _SocketInfo {
int tcpSocketFd;
uint16_t longLinkItvl;
SocketStatus sockStatus;
struct sockaddr_in servAddr;
} SocketInfo;
typedef struct _EmberAfPluginLinkedListElement {
struct _EmberAfPluginLinkedListElement * next;
struct _EmberAfPluginLinkedListElement * previous;
void * content;
} EmberAfPluginLinkedListElement;
typedef struct {
EmberAfPluginLinkedListElement * head;
EmberAfPluginLinkedListElement * tail;
uint32_t count;
} EmberAfPluginLinkedList;
TEST_ARRT const char * defPort = "50000";
TEST_ARRT const char * defHostIP = "127.0.0.1";
TEST_ARRT char connPort[10] = { 0 };
TEST_ARRT char connHostIP[20] = { 0 };
static SocketInfo sockInfo = {
.tcpSocketFd = SOCKET_ERROR,
.longLinkItvl = LONG_LINK_ITVL_MS,
.sockStatus = DISCONNECTED,
};
static pthread_t connectionThreadId;
static pthread_t writeSocketThreadId;
static pthread_t recvSocketThreadId;
static pthread_mutex_t sockInfoMutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t sendBufMutex = PTHREAD_MUTEX_INITIALIZER;
#ifndef INT_TEST
EmberEventControl emberAfPluginTransportSocketLongLinkEventControl;
#endif
TEST_ARRT void emberAfPluginTransportSocketLongLinkEventHandler(void);
static char recvBuf[RECV_BUF_MAX];
static EmberAfPluginLinkedList * sendBuf = NULL;
// Forward declaration.
static void connectionThread(void * param);
static void recvSocketThread(void * param);
static void writeSocketThread(void * param);
#ifdef INT_TEST
TEST_ARRT bool emberAfPluginTransportSocketPublish(const char * payload);
#endif
/**********************************************************************
* The following methods are based the linked-list plugin. *
* The original plugin has the limitation of free allocated memories. *
* The following methods resolve such a limitation. *
**********************************************************************/
static EmberAfPluginLinkedList * emberAfPluginLinkedListInit(void)
{
EmberAfPluginLinkedList *list =
(EmberAfPluginLinkedList *)malloc(sizeof(EmberAfPluginLinkedList));
if (list != NULL) {
memset(list, 0, sizeof(EmberAfPluginLinkedList));
}
return list;
}
static void emberAfPluginLinkedListPushBack(
EmberAfPluginLinkedList *list,
void* content)
{
EmberAfPluginLinkedListElement *element =
(EmberAfPluginLinkedListElement *)malloc(sizeof(
EmberAfPluginLinkedListElement));
if (element != NULL) {
element->content = content;
element->next = NULL;
element->previous = list->tail;
if (list->head == NULL) {
list->head = element;
} else {
list->tail->next = element;
}
list->tail = element;
++(list->count);
}
}
static void emberAfPluginLinkedListPopFront(
EmberAfPluginLinkedList *list)
{
if (list->count > 0) {
EmberAfPluginLinkedListElement* head = list->head;
if (list->tail == head) {
list->tail = NULL;
}
list->head = list->head->next;
if (head->content) free(head->content);
free(head);
--(list->count);
}
}
static bool emberAfPluginLinkedListRemoveElement(
EmberAfPluginLinkedList *list,
EmberAfPluginLinkedListElement *element)
{
if ((element != NULL) && (list->head != NULL)) {
if (element == list->head) {
if (list->head == list->tail) {
list->head = NULL;
list->tail = NULL;
} else {
list->head = element->next;
element->next->previous = NULL;
}
} else if (element == list->tail) {
list->tail = element->previous;
element->previous->next = NULL;
} else {
element->previous->next = element->next;
element->next->previous = element->previous;
}
--(list->count);
if (element->content) free(element->content);
free(element);
return true;
}
return false;
}
static EmberAfPluginLinkedListElement * emberAfPluginLinkedListNextElement(
EmberAfPluginLinkedList *list,
EmberAfPluginLinkedListElement *elementPosition)
{
if (elementPosition == NULL) {
return list->head;
} else {
return elementPosition->next;
}
}
/* Helper methods */
static EmberAfPluginLinkedListElement * findElementInList(char ** buf)
{
EmberAfPluginLinkedListElement *curElement = NULL;
do {
curElement = emberAfPluginLinkedListNextElement(sendBuf,
curElement);
if (curElement != NULL &&
(char *)curElement->content == *buf) {
return curElement;
}
} while (curElement != NULL);
return NULL;
}
static void freeBufInList(char ** buf)
{
EmberAfPluginLinkedListElement *element = NULL;
pthread_mutex_lock(&sendBufMutex);
element = findElementInList(buf);
if (element) {
emberAfPluginLinkedListRemoveElement(sendBuf, element);
}
pthread_mutex_unlock(&sendBufMutex);
}
static void disconnectionEventHandler(void)
{
return;
}
static void socketRawDataRecvdEventHandler(char * buf, size_t recvBytes)
{
if (!buf) return;
buf[recvBytes - 1] = '\0';
#ifdef INT_TEST
static uint32_t msgCnt = 0;
DEBUG_PRT("Recv'd recvBytes-%lu, msgLen-%lu, msg-{%s} [seq#: %u]\r\n",
recvBytes,
strlen(buf),
buf,
msgCnt);
msgCnt++;
#else
emberAfPluginTransportSocketMessageArrivedCallback(buf);
#endif
}
static void writeSocketErrEventHandler(char ** buf)
{
// Basically does nothing. The payload should stay in the list.
(void)buf;
}
static void partialWriteEventHandler(char ** buf, size_t actualBytes)
{
static char * preBuf = NULL;
static uint8_t attempts = 0;
// After attempting to resend several times, remove it from the list.
if (!preBuf && attempts == 0) {
preBuf = *buf;
} else {
if (preBuf == *buf && attempts < RETRY_MAX) {
attempts++;
} else {
attempts = 0;
preBuf = NULL;
freeBufInList(buf);
}
}
}
static void writeSocketSucEventHandler(char ** buf)
{
freeBufInList(buf);
}
static void brokenPipeEventHandler(int sig)
{
(void)sig;
}
/**
* @brief: Set non-blocking mode to a socket.
* @param[Out]: sockfd - sockect to be set.
* @return: -1 indicates error and EAGAIN will be set.
*/
static int setNonBlocking(int * sockfd)
{
int ret;
int flags;
flags = fcntl(*sockfd, F_GETFL, 0);
ret = fcntl(*sockfd, F_SETFL, flags | O_NONBLOCK);
return ret;
}
/**
* @brief: Create a new TCPv4 socket.
* @param[Out]: sockfd - sockect to be set.
* @return: -1 indicates error. Other number indicates a fd.
*/
static int newTcpSocket(int * sockfd)
{
return *sockfd = socket(AF_INET, SOCK_STREAM, 0);
}
/**
* @brief: Validate the server IP and port specified by users.
* @param[Out]: servAddr - server address to be set.
* @return: -1 indicates error and EAFNOSUPPORT will be set.
*/
static int validateServerAddr(struct sockaddr_in * servAddr)
{
int ret = SOCKET_ERROR;
int port;
bzero((void *)servAddr, sizeof(*servAddr));
#ifdef EMBER_AF_PLUGIN_TRANSPORT_SOCKET_SERVER_PORT
port = atoi(EMBER_AF_PLUGIN_TRANSPORT_SOCKET_SERVER_PORT);
strncpy(connPort,
EMBER_AF_PLUGIN_TRANSPORT_SOCKET_SERVER_PORT,
sizeof(connPort));
connPort[sizeof(connPort) - 1] = '\0';
#else
port = atoi(defPort);
strncpy(connPort,
defPort,
sizeof(connPort));
connPort[sizeof(connPort) - 1] = '\0';
#endif
#ifdef EMBER_AF_PLUGIN_TRANSPORT_SOCKET_SERVER_IP
ret = inet_pton(AF_INET,
EMBER_AF_PLUGIN_TRANSPORT_SOCKET_SERVER_IP,
&(servAddr->sin_addr));
strncpy(connHostIP,
EMBER_AF_PLUGIN_TRANSPORT_SOCKET_SERVER_IP,
sizeof(connHostIP));
connHostIP[sizeof(connHostIP) - 1] = '\0';
#else
ret = inet_pton(AF_INET, defHostIP, &(servAddr->sin_addr));
strncpy(connHostIP,
defHostIP,
sizeof(connHostIP));
connHostIP[sizeof(connHostIP) - 1] = '\0';
#endif
servAddr->sin_port = htons(port);
servAddr->sin_family = AF_INET;
return ret;
}
/**
* @brief: Connect to the server IP:port.
* @param[In]: sockfd - socket fd.
* @param[In]: servAddr - server address structure.
* @return: -1 indicates error.
*/
static int connectTcpSocket(const int * sockfd,
const struct sockaddr_in * servAddr)
{
return connect(*sockfd, (struct sockaddr *)servAddr, sizeof(*servAddr));
}
/**
* @brief: Close a specified socket.
* @param[In]: sockfd - socket fd.
* @return: -1 indicates error.
*/
static int closeTcpSocket(const int * sockfd)
{
return close(*sockfd);
}
/**
* @brief: Tries to read bytes from a specified socket. As the commands
* from upper layers are not large, we use a fixed buffer with
* reasonable size to hold the commands.
* @param[In]: sockfd - socket fd.
* @param[Out]: buf - a pointer to the buffer to be filled with.
* @param[In]: bytes - No. of bytes to be read.
* @param[In]: disconnetCb - a callback for socket disconnection.
* @param[In]: rawRecvdCb - a callback for received raw data.
* @return: 0 indicates success. < 0 values indicate error code.
*/
static int getSocketData(const int * sockfd,
char * buf,
size_t bytes,
void (*disconnetCb)(void),
void (*rawRecvdCb)(char * buf, size_t recvBytes))
{
int ret;
if (!buf) {
ret = BUFFER_ERROR;
goto exit;
} else if (bytes == 0) {
ret = INVALID_RD_BYTES;
goto exit;
}
if ((ret = recv(*sockfd, buf, bytes, 0)) == SOCKET_ERROR) {
goto exit;
} else if (ret == 0) {
if (disconnetCb) disconnetCb();
goto exit;
} else {
if (rawRecvdCb) rawRecvdCb(buf, ret);
}
exit:
return ret;
}
/**
* @brief: Tries to send bytes to a specified socket. As long as the
* data to be sent is less than PIPE_BUF (normally 65KBytes),
* we should be fine using this method.
* @param[In]: sockfd - socket fd.
* @param[In]: buf - the buf whose data to be sent.
* @param[In]: bufLen - the size of data to be sent.
* @param[In]: writeSocketErrCb - a callback for the socket error
* when writing. Pass the buffer as param.
* @param[In]: partialWriteCb - a callback for partial bytes written
* error. Pass the buffer and actual written
* bytes as params.
* @param[In]: writeSocketSucCb - a callback for successful writing to a
* socket. Pass the buffer as param.
* @return: 0 indicates success. '< 0' values indicate error code.
*/
static int putSocketData(const int * sockfd,
char ** buf,
size_t bufLen,
void (*writeSocketErrCb)(char ** buf),
void (*partialWriteCb)(char ** buf,
size_t actualBytes),
void (*writeSocketSucCb)(char ** buf))
{
int rc, ret;
if (!buf) {
ret = BUFFER_ERROR;
goto exit;
} else if (bufLen == 0) {
ret = INVALID_SD_BYTES;
goto exit;
}
if ((rc = write(*sockfd, *buf, bufLen)) != SOCKET_ERROR) {
if (rc < bufLen) {
ret = INVALID_SD_BYTES;
if (partialWriteCb) partialWriteCb(buf, (size_t)rc);
} else {
ret = SOCKET_SUCCESS;
if (writeSocketSucCb) writeSocketSucCb(buf);
}
} else {
ret = SOCKET_ERROR;
if (writeSocketErrCb) writeSocketErrCb(buf);
}
exit:
return ret;
}
static void delayMs(uint32_t delayMs)
{
struct timespec delayTime;
delayTime.tv_sec = delayMs / 1000;
delayTime.tv_nsec = (delayMs % 1000) * 1000000;
nanosleep(&delayTime, NULL);
}
static void delayS(uint32_t delayS)
{
sleep(delayS);
}
static void printConnStatus(SocketStatus status)
{
static bool inProgPtrd = false;
switch(status)
{
case CON_IN_PROG:
if (!inProgPtrd) {
DEBUG_PRT("TCP client is connecting to %s:%s\r\n",
connHostIP, connPort);
inProgPtrd = true;
}
break;
case CONNECTED:
DEBUG_PRT("TCP client is connected to %s:%s\r\n",
connHostIP, connPort);
inProgPtrd = false;
break;
case DISCONNECTED:
DEBUG_PRT("TCP client is disconnected from %s:%s\r\n",
connHostIP, connPort);
inProgPtrd = false;
break;
default:
break;
}
}
/* Initialize functions */
static void initSignalHandler(void)
{
signal(SIGPIPE, brokenPipeEventHandler);
}
static void initEventControl(void)
{
#ifndef INT_TEST
emberEventControlSetActive(
emberAfPluginTransportSocketLongLinkEventControl);
#endif
}
static void initBuffer(void)
{
memset((void *)recvBuf, 0, sizeof(recvBuf));
sendBuf = emberAfPluginLinkedListInit();
}
static void initTcpSocket(void)
{
assert(newTcpSocket(&sockInfo.tcpSocketFd) != -1);
assert(validateServerAddr(&sockInfo.servAddr) == 1);
assert(setNonBlocking(&sockInfo.tcpSocketFd) == 0);
}
static void initThreads(void)
{
pthread_mutexattr_t attr;
assert(pthread_mutexattr_init(&attr) == 0);
assert(pthread_mutex_init(&sockInfoMutex, &attr) == 0);
assert(pthread_mutex_init(&sendBufMutex, &attr) == 0);
assert(pthread_create(&connectionThreadId,
NULL,
(void *)connectionThread,
NULL) == 0);
assert(pthread_create(&writeSocketThreadId,
NULL,
(void *)writeSocketThread,
NULL) == 0);
assert(pthread_create(&recvSocketThreadId,
NULL,
(void *)recvSocketThread,
NULL) == 0);
}
/* Methods called by threads. */
static void connectionThread(void * param)
{
int rc;
static uint8_t eiconnCnt = 0;
for(;;)
{
pthread_mutex_lock(&sockInfoMutex);
if (sockInfo.sockStatus != CONNECTED) {
if ((rc = connectTcpSocket(&sockInfo.tcpSocketFd,
&sockInfo.servAddr)) == 0) {
sockInfo.sockStatus = CONNECTED;
} else {
if (errno == EINPROGRESS) {
sockInfo.sockStatus = CON_IN_PROG;
eiconnCnt = 0;
printConnStatus(CON_IN_PROG);
} else if (errno == EISCONN &&
eiconnCnt < EISCONN_STANDBY_MAX) {
// This gives EISCONN_STANDBY_MAX * CONNECTION_THREAD_DELAY_S
// seconds before the client tries to reconnect.
sockInfo.sockStatus = CONNECTED;
eiconnCnt++;
} else {
sockInfo.sockStatus = DISCONNECTED;
closeTcpSocket(&sockInfo.tcpSocketFd);
initTcpSocket();
eiconnCnt = 0;
}
}
}
pthread_mutex_unlock(&sockInfoMutex);
delayS(CONNECTION_THREAD_DELAY_S);
}
}
static void writeSocketThread(void * param)
{
#ifdef INT_TEST
int rc;
char *buf;
char **payload;
EmberAfPluginLinkedListElement *bufHd = NULL;
for(;;)
{
pthread_mutex_lock(&sockInfoMutex);
buf = (char *)malloc(sizeof("Transport-socket - test data!"));
strcpy(buf, "Transport-socket - test data!");
emberAfPluginTransportSocketPublish(buf);
if (sockInfo.sockStatus == CONNECTED) {
bufHd = emberAfPluginLinkedListNextElement(sendBuf,
NULL);
if (!bufHd) continue;
payload = (char **)&bufHd->content;
rc = putSocketData(&sockInfo.tcpSocketFd,
payload,
strlen(*payload) + 1,
writeSocketErrEventHandler,
partialWriteEventHandler,
writeSocketSucEventHandler);
if (rc == SOCKET_ERROR) {
sockInfo.sockStatus = DISCONNECTED;
}
}
pthread_mutex_unlock(&sockInfoMutex);
delayS(1);
}
#else
int rc;
char **payload;
EmberAfPluginLinkedListElement *bufHd = NULL;
for(;;)
{
pthread_mutex_lock(&sockInfoMutex);
if (sockInfo.sockStatus == CONNECTED &&
sendBuf &&
sendBuf->count > 0) {
bufHd = emberAfPluginLinkedListNextElement(sendBuf,
NULL);
if (!bufHd) continue;
payload = (char **)&bufHd->content;
rc = putSocketData(&sockInfo.tcpSocketFd,
payload,
strlen(*payload) + 1,
writeSocketErrEventHandler,
partialWriteEventHandler,
writeSocketSucEventHandler);
if (rc == SOCKET_ERROR) {
sockInfo.sockStatus = DISCONNECTED;
}
}
pthread_mutex_unlock(&sockInfoMutex);
delayMs(WRITE_SOCK_THREAD_DELAY_MS);
}
#endif // INT_TEST
}
static void recvSocketThread(void * param)
{
int rc;
for(;;)
{
pthread_mutex_lock(&sockInfoMutex);
if (sockInfo.sockStatus == CONNECTED) {
rc = getSocketData(&sockInfo.tcpSocketFd,
recvBuf,
RECV_BUF_MAX,
disconnectionEventHandler,
socketRawDataRecvdEventHandler);
if (rc == SOCKET_LOST_CONN) {
sockInfo.sockStatus = DISCONNECTED;
}
}
pthread_mutex_unlock(&sockInfoMutex);
delayMs(RECV_SOCK_THREAD_DELAY_MS);
}
}
/**********************************************************************
* Ember Application Framework - Event handlers and callbacks. *
**********************************************************************/
/**
* @brief: This callback should be called when main init is executing.
*/
TEST_ARRT void emberAfPluginTransportSocketInitCallback(void)
{
initBuffer();
initSignalHandler();
initTcpSocket();
initThreads();
initEventControl();
}
/**
* @brief: The event control will be specified by the properties file, and
* generated by the app-builder.
*/
TEST_ARRT void emberAfPluginTransportSocketLongLinkEventHandler(void)
{
char data;
EmberAfPluginTransportSocketState curStatus;
static EmberAfPluginTransportSocketState lastStatus =
EMBER_AF_PLUGIN_TRANSPORT_SOCKET_STATE_DISCONNECTED;
#ifndef INT_TEST
emberEventControlSetInactive(
emberAfPluginTransportSocketLongLinkEventControl);
#endif
pthread_mutex_lock(&sockInfoMutex);
if (sockInfo.sockStatus == CONNECTED &&
recv(sockInfo.tcpSocketFd, &data, 1, MSG_PEEK) == 0) {
sockInfo.sockStatus = DISCONNECTED;
closeTcpSocket(&sockInfo.tcpSocketFd);
initTcpSocket();
}
if (sockInfo.sockStatus == CONNECTED) {
curStatus = EMBER_AF_PLUGIN_TRANSPORT_SOCKET_STATE_CONNECTED;
} else {
curStatus = EMBER_AF_PLUGIN_TRANSPORT_SOCKET_STATE_DISCONNECTED;
}
pthread_mutex_unlock(&sockInfoMutex);
if (curStatus != lastStatus) {
#ifndef INT_TEST
emberAfPluginTransportSocketStateChangedCallback(curStatus);
#endif
if (curStatus == EMBER_AF_PLUGIN_TRANSPORT_SOCKET_STATE_CONNECTED) {
printConnStatus(CONNECTED);
} else {
printConnStatus(DISCONNECTED);
}
}
lastStatus = curStatus;
#ifndef INT_TEST
emberEventControlSetDelayMS(
emberAfPluginTransportSocketLongLinkEventControl,
sockInfo.longLinkItvl);
#endif
}
/**
* @brief: This method is meant to be called by gatway-relay-socket.
* @param[In]: payload - The payload to be inserted to the send-buffer.
* It should be stored in a malloc'd memory. The
* garbage collection is handled at this layer.
*/
TEST_ARRT bool emberAfPluginTransportSocketPublish(const char * payload)
{
SocketStatus status;
if (!payload) return false;
#ifndef INT_TEST
pthread_mutex_lock(&sockInfoMutex);
#endif
status = sockInfo.sockStatus;
if(status == CONNECTED) {
pthread_mutex_lock(&sendBufMutex);
emberAfPluginLinkedListPushBack(sendBuf, (void *)payload);
pthread_mutex_unlock(&sendBufMutex);
} else {
if (sendBuf && sendBuf->count >= MSG_CNT_DISCONN_MAX) {
pthread_mutex_lock(&sendBufMutex);
emberAfPluginLinkedListPopFront(sendBuf);
emberAfPluginLinkedListPushBack(sendBuf, (void *)payload);
pthread_mutex_unlock(&sendBufMutex);
} else if (sendBuf && sendBuf->count < MSG_CNT_DISCONN_MAX) {
pthread_mutex_lock(&sendBufMutex);
emberAfPluginLinkedListPushBack(sendBuf, (void *)payload);
pthread_mutex_unlock(&sendBufMutex);
} else {
#ifndef INT_TEST
pthread_mutex_unlock(&sockInfoMutex);
#endif
return false;
}
}
#ifndef INT_TEST
pthread_mutex_unlock(&sockInfoMutex);
#endif
return true;
}