mysql的udf编程之非阻塞超时重传

2023年 4月 23日 51.5k 0

MySQL的UDF(User Defined Function)类似于一种API, 用户根据一定的规范用C/C++(或采用C调用规范的语言)编写一组函数(UDF),然后编译成动态链接库,通过DROP FUNCTION语句来加载和卸载UDF。UDF被加载

MySQL的UDF(User Defined Function)类似于一种API, 用户根据一定的规范用C/C++(或采用C调用规范的语言)编写一组函数(UDF),然后编译成动态链接库,通过DROP FUNCTION语句来加载和卸载UDF。UDF被加载后可以像调用MySQL的内置函数一样来调用它,并且服务器在启动时会自动加载原来存在的UDF。复制代码 代码如下:#ifdef STANDARD/* STANDARD is defined, don't use any mysql functions */#include <stdlib.h>#include <stdio.h>#include <string.h>#ifdef __WIN__typedef unsigned __int64 ulonglong;    /* Microsofts 64 bit types */typedef __int64 longlong;#elsetypedef unsigned long long ulonglong;typedef long long longlong;#endif /*__WIN__*/#else#include <my_global.h>#include <my_sys.h>#if defined(MYSQL_SERVER)#include <m_string.h>        /* To get strmov() */#else/* when compiled as standalone */#include <string.h>#endif#endif#include <mysql.h>#include <m_ctype.h>#include <m_string.h>#include <stdlib.h>#include <errno.h>#include <netdb.h>#include <unistd.h>#include<fcntl.h>#include<sys time.h="">#include<sys ioctl.h="">#include <sys types.h="">#include <netinet in.h="">#include <sys socket.h="">#include <sys wait.h="">#include<arpa inet.h="">#include<unistd.h>#include <mysql.h>#include <ctype.h>#ifdef HAVE_DLOPEN

my_bool http_post_init(UDF_INIT *initid, UDF_ARGS *args, char *message);void http_post_deinit(UDF_INIT *initid);longlong http_post(UDF_INIT *initid, UDF_ARGS *args, char *is_null,char *error);/*************************************************************************** Example of init function** Arguments:** initid                        Points to a structure that the init function should fill.**            char *ptr;            A pointer that the function can use.** message                        Error message**RETURN                        This function should return 1 if something goes wrong. In this case**************************************************************************/my_bool http_post_init(UDF_INIT *initid, UDF_ARGS *args, char *message){if (args->arg_count < 3 ){    strcpy(message,"Wrong arguments to http_post; ");    return 1;}

if(args->arg_count == 4 && args->args[3]!=NULL){      int flexibleLength = strlen(args->args[3]);      if(flexibleLength > 160000)      {          int allocLength = 200 + flexibleLength;          if (!(initid->ptr=(char*) malloc(allocLength) ) )          {                strcpy(message,"Couldn't allocate memory in http_post_init");                return 1;          }        return 0;      }      else      {          initid->ptr=NULL;    }

}   return 0;

}

/****************************************************************************** Deinit function. This should all resources allocated by** this function.** Arguments:** initid    Return value from xxxx_init****************************************************************************/void http_post_deinit(UDF_INIT *initid){     if (initid!=NULL && initid->ptr!=NULL)          {              free(initid->ptr);              initid->ptr = NULL;          }

}

/***************************************************************************** UDF string function.** Arguments:** initid    Structure filled by xxx_init** args        The same structure as to xxx_init. This structure** This function should return a pointer to the result string.** Normally this is 'result' but may also be an alloced string.***************************************************************************/longlong http_post(    UDF_INIT *initid, UDF_ARGS *args,                char *is_null __attribute__((unused)),                char *error __attribute__((unused))){    int sockfd=0;    int numbytes=0;    int flags=0;    int cycletimes=0;    char* sendBuffer=NULL;

    fd_set wset;    struct timeval tval;    tval.tv_sec = 0;    tval.tv_usec = 300000;

    if(initid->ptr == NULL)    {        char sendArray[160000] = "\0";        sendBuffer=sendArray;    }    else    {        sendBuffer = initid->ptr;    }

    struct sockaddr_in serv_addr;    serv_addr.sin_family = AF_INET;    serv_addr.sin_port = htons(atoi(args->args[1]));    serv_addr.sin_addr.s_addr = inet_addr(args->args[0]);    bzero(&(serv_addr.sin_zero),8);

    if(args->arg_count == 4 && (args->args[3]!=NULL) )    {        int argsNum = strlen(args->args[3]);        sprintf(sendBuffer,"POST /?%s HTTP/1.1\r\nContent-Length:%d\r\n\r\n%s",args->args[2],argsNum,args->args[3]);    }    else    {        sprintf(sendBuffer,"POST /?%s HTTP/1.1\r\n",args->args[2]);    }

    if((sockfd = socket(AF_INET,SOCK_STREAM,0)) == -1)    {        close(sockfd);        return 2;    }    flags = fcntl(sockfd,F_GETFL,0);    fcntl(sockfd,F_SETFL,flags|O_NONBLOCK);//设置为非阻塞    do    {        connect(sockfd, (struct sockaddr *)&serv_addr, sizeof(struct sockaddr));        FD_ZERO(&wset);        FD_SET(sockfd,&wset);        if( select(sockfd+1, NULL, &wset, NULL,&tval) <= 0 && cycletimes==5)        {            close(sockfd);            return 5;        }        numbytes = send(sockfd,sendBuffer,strlen(sendBuffer),0);        if(numbytes<0)        {            usleep(20000);        }        cycletimes++;    }while(numbytes<0 && cycletimes!=5);    if(numbytes<0)    {        close(sockfd);        return 4;    }    close(sockfd);    return 0;}#endif /* HAVE_DLOPEN */</ctype.h></mysql.h></unistd.h></arpa></sys></sys></netinet></sys></sys></sys></fcntl.h></unistd.h></netdb.h></errno.h></stdlib.h></m_string.h></m_ctype.h></mysql.h></string.h></m_string.h></my_sys.h></my_global.h></string.h></stdio.h></stdlib.h>

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论