`
whitesock
  • 浏览: 478639 次
  • 性别: Icon_minigender_1
  • 来自: 大连
社区版块
存档分类
最新评论

MySQL UDF

阅读更多

    最近公司在做个SNS子项目,需要把交易系统中的交易数据实时地发送到SNS子系统中。最自然的设计是修改各个交易模块,将数据向SNS系统传输。但是在评审和开发的时候遇到了不小的阻力。理由很简单,这些修改直接影响系统的核心交易模块,引入了一定的风险。综合考虑后,决定增加个类似于Memcached Functions for MySQL的备选方案。SNS子系统虽然大量地使用了cache,但是没有使用Memcached或EHCache,原因是这些cache实现无法满足所有的需求。最初的想法是在UDF中使用ActiveMQ CPP,将消息直接发送到SNS子系统的cache中。不过最终还是放弃了这种比较激进的做法,而是在UDF中将数据通过socket发送到某个Java程序,然后再对数据进行分类处理后,转发给SNS cache。

 

    关于user-defined function(UDF),在MySQL的官方文档上有比较详细的说明。为了使用UDF,必须要动态链接mysqld,也就是不能使用--with-mysqld-ldflags=-all-static,而是应该使用--with-mysqld-ldflags=-rdynamic。UDF通常需要用C/C++编写,如果要编写一个名为xxx的UDF,那么需要定义如下的C/C++方法:

  • xxx() (required)。这个方法的返回值就是UDF的结果。SQL数据类型和C/C++类型之间的对应关系是:varchar char *;INTEGER long long;REAL double;DECIMAL char *等。
  • xxx_init() (optional)。xxx()对应的初始化方法。该方法通常用来检查xxx()方法的参数(或者进行参数类型的转换),分配内存,指定返回值的最大长度等。
  • xxx_deinit() (optional)。xxx()对应的清理方法。在这个方法中应该释放之前分配的内存。

    当在SQL中调用名为XXX()的UDF时,MySQL会首先尝试调用名为xxx_init() 的方法,如果该方法返回false,那么MySQL会终止SQL的执行,并返回一个error message(在xxx_init()中保存在message参数中的以null结尾的字符串,最大长度为 MYSQL_ERRMSG_SIZE)。否则MySQL会对每个row调用xxx() 方法。在所有的row都被处理后,MySQL会调用xxx_deinit() 方法来执行相应的清理工作。对于那些象SUM()之类的聚集函数,还有一些其他的C/C++函数需要编写。需要注意的是,如果采用C++编写UDF,由于C++的"name mangling"会导致MySQL无法找到对应的C++函数,因此需要将函数声明包含在extern "C" { ... }中。以下是这些函数的例子:

my_bool xxx_init(UDF_INIT *initid, UDF_ARGS *args, char *message);

char *xxx(UDF_INIT *initid, UDF_ARGS *args, char *result, unsigned long *length, char *is_null, char *error);

void xxx_deinit(UDF_INIT *initid);

 

    initid参数会被传递到所有三个函数中(用于在这三个函数中共享数据),它指向一个UDF_INIT结构体。关于该结构体的成员,可以参考MySQL的在线文档。args参数指向一个UDF_ARGS结构体,它有如下成员:

  • unsigned int arg_count。参数的个数。
  • enum Item_result *arg_type。参数的类型。可选值有STRING_RESULT, INT_RESULT, REAL_RESULT, and DECIMAL_RESULT。DECIMAL_RESULT类型的值是以char *的形式传入到函数中的,因此处理方式同STRING_RESULT类型参数。如果xxx_init()中对该成员赋值,例如args->arg_type[0] = STRING_RESULT;那么MySQl会在调用xxx()是对 相应的参数进行强制地类型转换。
  •  char **args。参数值。如果参数值是NULL,那么args->args[i]是一个空指针(0)。对于STRING_RESUL类型的参数,那么可以通过 args->args[i]访问该字符串,字符串的长度是args->lengths[i](不要假定该字符串是以null结尾的)。对于INT_RESULT类型的参数,通过long long int_val = *((long long*) args->args[i]);进行访问。对于REAL_RESULT类型参数,通过double real_val = *((double*) args->args[i]);进行访问。对于DECIMAL_RESULT类型参数,处理方式同STRING_RESULT。
  • unsigned long *lengths。在xxx_init()函数中,其值是字符串参数的最大长度。在xxx()函数中,其值是字符串参数的实际长度。对于INT_RESULT 或者REAL_RESULT类型参数,其值始终是最大可能值。
  • char *maybe_null。在xxx_init()函数中,其值表明该参数是否可以为null。
  • char **attributes。参数名。默认情况下是SQL中的表达式文本(同样,不要假定该字符串是以null结尾的)。
  • unsigned long *attribute_lengths。参数名的字符串长度。

    以下是个UDF C代码的片段:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <pthread.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include "mysql.h"

///////////////////////////////////////////////////
extern "C" { 
    //
    my_bool send_message_open_init(UDF_INIT *initid, UDF_ARGS *args, char *message);
    char * send_message_open(UDF_INIT *initid, UDF_ARGS *args, char* result, unsigned long *length, char *is_null, char *error);
    void send_message_open_deinit(UDF_INIT *initid);

    //
    my_bool send_message_close_init(UDF_INIT *initid, UDF_ARGS *args, char *message);
    char * send_message_close(UDF_INIT *initid, UDF_ARGS *args, char* result, unsigned long *length, char *is_null, char *error);
    void send_message_close_deinit(UDF_INIT *initid);
    
    //
    my_bool send_message_init(UDF_INIT *initid, UDF_ARGS *args, char *message);
    char * send_message(UDF_INIT *initid, UDF_ARGS *args, char* result, unsigned long *length, char *is_null, char *error);
    void send_message_deinit(UDF_INIT *initid);
}

////////////////////////////////////////////////////
// Open
////////////////////////////////////////////////////
my_bool send_message_open_init(UDF_INIT *initid, UDF_ARGS *args, char *message) {
    // Validation
    if(args->arg_count != 2) {
        strcpy(message,"usage: select send_message_open('10.15.3.68', 7777)");
        return -1;
    } else if(args->arg_type[0] != STRING_RESULT || args->arg_type[1] != INT_RESULT){
        strcpy(message,"usage: select send_message_open('10.15.3.68', 7777)");
        return -1;
    } else {
        return 0;
    }
}

char * send_message_open(UDF_INIT *initid, UDF_ARGS *args, char* result, unsigned long *length, char *is_null, char *error){
    ...
}

void send_message_open_deinit(UDF_INIT *initid) {
}

...

 

    编译:

    g++ -I/usr/local/mysql/include/mysql/ -shared -o send_message.so send_message.c

    编译后需要将so文件拷贝到MySQL可以加载的位置,如:
    cp send_message.so /usr/local/mysql6320/lib/mysql/plugin/

 

    在MySQL中创建UDF:

DROP FUNCTION IF EXISTS send_message_open;
DROP FUNCTION IF EXISTS send_message_close;
DROP FUNCTION IF EXISTS send_message;
CREATE FUNCTION send_message_open RETURNS STRING SONAME 'send_message.so'; 
CREATE FUNCTION send_message_close RETURNS STRING SONAME 'send_message.so'; 
CREATE FUNCTION send_message RETURNS STRING SONAME 'send_message.so'; 

 

    测试:

select send_message_open('10.15.3.68', 7777);
select send_message('test message');
select send_message_close();
8
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics