skynet服务究竟是什么?为什么有人说服务是一个lua虚拟机?服务与服务之间的通讯是怎样的?为什么服务的内存居高不下?为什么拿skynet服务和erlang进程做比较?等等……而这一切的答案都在代码里面,让我们一步一步揭开它的面纱。
skynet.newservice(name, ...)
-- skynet.lua
function skynet.newservice(name, ...)
	return skynet.call(".launcher", "lua" , "LAUNCH", "snlua", name, ...)
end
-- launcher.lua -- 处理服务的创建 local function launch_service(service, ...) local param = table.concat({...}, ) local inst = skynet.launch(service, param) local response = skynet.response() if inst then services[inst] = service .. .. param instance[inst] = response else response(false) return end return inst end -- 处理 LAUNCH 类消息 function command.LAUNCH(_, service, ...) launch_service(service, ...) return NORET end -- 处理launcher服务接收到的消息 skynet.dispatch(lua, function(session, address, cmd , ...) cmd = string.upper(cmd) local f = command[cmd] if f then local ret = f(address, ...) if ret ~= NORET then skynet.ret(skynet.pack(ret)) end else skynet.ret(skynet.pack {Unknown command} ) end end)
也就是调用 skynet.launch(service, param),实际上 .launcher 服务也是通过这函数实现的。
-- bootstrap.lua
local launcher = assert(skynet.launch("snlua","launcher"))
skynet.name(".launcher", launcher)
再来看下skynet.launch(service, param),服务创建的关键api:
-- manager.lua local skynet = require skynet local c = require skynet.core function skynet.launch(...) local addr = c.command(LAUNCH, table.concat({...}, )) if addr then return tonumber(0x .. string.sub(addr , 2)) end endskynet.core这个是c实现的,编译成动态库给lua使用,可以在loadfunc时利用luaopen_* 找到这个c函数。实际接口函数如下:
// lua-skynet.c int luaopen_skynet_core(lua_State *L) { luaL_checkversion(L); luaL_Reg l[] = { { send , _send }, { genid, _genid }, { redirect, _redirect }, { command , _command }, { error, _error }, { tostring, _tostring }, { harbor, _harbor }, { pack, _luaseri_pack }, { unpack, _luaseri_unpack }, { packstring, lpackstring }, { trash , ltrash }, { callback, _callback }, { NULL, NULL }, }; luaL_newlibtable(L, l); lua_getfield(L, LUA_REGISTRYINDEX, skynet_context); struct skynet_context *ctx = lua_touserdata(L,-1); if (ctx == NULL) { return luaL_error(L, Init skynet context first); } luaL_setfuncs(L,l,1); return 1; }
// lua-skynet.c static int _command(lua_State *L) { struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1)); const char * cmd = luaL_checkstring(L,1); const char * result; const char * parm = NULL; if (lua_gettop(L) == 2) { parm = luaL_checkstring(L,2); } result = skynet_command(context, cmd, parm); if (result) { lua_pushstring(L, result); return 1; } return 0; }
// lua_server.c static struct command_func cmd_funcs[] = { { TIMEOUT, cmd_timeout }, { REG, cmd_reg }, { QUERY, cmd_query }, { NAME, cmd_name }, { NOW, cmd_now }, { EXIT, cmd_exit }, { KILL, cmd_kill }, { LAUNCH, cmd_launch }, { GETENV, cmd_getenv }, { SETENV, cmd_setenv }, { STARTTIME, cmd_starttime }, { ENDLESS, cmd_endless }, { ABORT, cmd_abort }, { MONITOR, cmd_monitor }, { MQLEN, cmd_mqlen }, { LOGON, cmd_logon }, { LOGOFF, cmd_logoff }, { SIGNAL, cmd_signal }, { NULL, NULL }, }; const char * skynet_command(struct skynet_context * context, const char * cmd , const char * param) { struct command_func * method = &cmd_funcs[0]; while(method->name) { if (strcmp(cmd, method->name) == 0) { return method->func(context, param); } ++method; } return NULL; } static const char * cmd_launch(struct skynet_context * context, const char * param) { size_t sz = strlen(param); char tmp[sz+1]; strcpy(tmp,param); char * args = tmp; char * mod = strsep(&args, ); args = strsep(&args, ); struct skynet_context * inst = skynet_context_new(mod,args);// 实例化上下文 if (inst == NULL) { return NULL; } else { id_to_hex(context->result, inst->handle); return context->result; } }再套上最前面的参数,也就是调用
skynet_context_new("snlua", name)
再看下这个函数的实现。
// skynet_server.c struct skynet_context * skynet_context_new(const char * name, const char *param) { /* 这一步加载name的动态库,这里是 * snlua模块是 service_snlua.c 然后通过以下接口调用代码 * skynet_module_instance_create() --> snlua_create() * skynet_module_instance_init() --> snlua_init() * skynet_module_instance_release() --> snlua_release() * skynet_module_instance_signal() --> snlua_signal() */ struct skynet_module * mod = skynet_module_query(name); if (mod == NULL) return NULL; void *inst = skynet_module_instance_create(mod); // 执行snlua_create() 完成服务初始化 if (inst == NULL) return NULL; struct skynet_context * ctx = skynet_malloc(sizeof(*ctx)); CHECKCALLING_INIT(ctx) ctx->mod = mod; ctx->instance = inst; ctx->ref = 2; ctx->cb = NULL; ctx->cb_ud = NULL; ctx->session_id = 0; ctx->logfile = NULL; ctx->init = false; ctx->endless = false; // Should set to 0 first to avoid skynet_handle_retireall get an uninitialized handle ctx->handle = 0; ctx->handle = skynet_handle_register(ctx); struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle); // init function maybe use ctx->handle, so it must init at last context_inc(); CHECKCALLING_BEGIN(ctx) int r = skynet_module_instance_init(mod, inst, ctx, param); // 执行snlua_init() 完成服务的创建 CHECKCALLING_END(ctx) if (r == 0) { struct skynet_context * ret = skynet_context_release(ctx); if (ret) { ctx->init = true; } skynet_globalmq_push(queue); if (ret) { skynet_error(ret, LAUNCH %s %s, name, param ? param : ); } return ret; } else { skynet_error(ctx, FAILED launch %s, name); uint32_t handle = ctx->handle; skynet_context_release(ctx); skynet_handle_retire(handle); struct drop_t d = { handle }; skynet_mq_release(queue, drop_message, &d); return NULL; } }
看下 snlua_init服务实例化的过程:
// service_snlua.c int snlua_init(struct snlua *l, struct skynet_context *ctx, const char * args) { int sz = strlen(args); char * tmp = skynet_malloc(sz); memcpy(tmp, args, sz); skynet_callback(ctx, l , _launch); // 设置回调函数为 _launch const char * self = skynet_command(ctx, REG, NULL); // 注册这个服务 uint32_t handle_id = strtoul(self+1, NULL, 16); /* it must be first message * 把参数当作消息内容发给这个服务,就是 skynet.newservice(name, ...) 后面的 ... * 目的是驱动服务完成初始化,后面会讲到,skynet服务是消息驱动。 */ skynet_send(ctx, 0, handle_id, PTYPE_TAG_DONTCOPY,0, tmp, sz); return 0; } static int _launch(struct skynet_context * context, void *ud, int type, int session, uint32_t source , const void * msg, size_t sz) { assert(type == 0 && session == 0); struct snlua *l = ud; skynet_callback(context, NULL, NULL); // 设置回调函数为 NULL int err = _init(l, context, msg, sz); if (err) { skynet_command(context, EXIT, NULL); } return 0; } // 完成服务的实例化,执行服务lua代码 static int _init(struct snlua *l, struct skynet_context *ctx, const char * args, size_t sz) { lua_State *L = l->L; l->ctx = ctx; lua_gc(L, LUA_GCSTOP, 0); lua_pushboolean(L, 1); /* signal for libraries to ignore env. vars. 
*/ lua_setfield(L, LUA_REGISTRYINDEX, LUA_NOENV); luaL_openlibs(L); lua_pushlightuserdata(L, ctx); lua_setfield(L, LUA_REGISTRYINDEX, skynet_context); luaL_requiref(L, skynet.codecache, codecache , 0); lua_pop(L,1); const char *path = optstring(ctx, lua_path,./lualib/?.lua;./lualib/?/init.lua); lua_pushstring(L, path); lua_setglobal(L, LUA_PATH); const char *cpath = optstring(ctx, lua_cpath,./luaclib/?.so); lua_pushstring(L, cpath); lua_setglobal(L, LUA_CPATH); const char *service = optstring(ctx, luaservice, ./service/?.lua); lua_pushstring(L, service); lua_setglobal(L, LUA_SERVICE); const char *preload = skynet_command(ctx, GETENV, preload); lua_pushstring(L, preload); lua_setglobal(L, LUA_PRELOAD); lua_pushcfunction(L, traceback); assert(lua_gettop(L) == 1); const char * loader = optstring(ctx, lualoader, ./lualib/loader.lua); int r = luaL_loadfile(L,loader); // 加载loader模块代码 if (r != LUA_OK) { skynet_error(ctx, Can't load %s : %s, loader, lua_tostring(L, -1)); _report_launcher_error(ctx); return 1; } lua_pushlstring(L, args, sz); // 把服务名等参数传入,执行loader模块代码,实际上是通过loader加载和执行服务代码 r = lua_pcall(L,1,0,1); if (r != LUA_OK) { skynet_error(ctx, lua loader error : %s, lua_tostring(L, -1)); _report_launcher_error(ctx); return 1; } lua_settop(L,0); lua_gc(L, LUA_GCRESTART, 0); return 0; }
-- loader.lua SERVICE_NAME = args[1] local main, pattern local err = {} for pat in string.gmatch(LUA_SERVICE, ([^;]+);*) do local filename = string.gsub(pat, ?, SERVICE_NAME) local f, msg = loadfile(filename) -- 加载服务代码 if not f then table.insert(err, msg) else pattern = pat main = f break end end if not main then error(table.concat(err, )) end main(select(2, table.unpack(args))) -- 执行服务代码
顺道看下 skynet_callback 函数,很简单。
// skynet_server.c void skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) { context->cb = cb; context->cb_ud = ud; }到这里,服务就完成数据结构的初始化,lua层面是 lua state,数据结构是struct skynet_context *ctx
// skynet_start.c // 调度线程的工作函数 static void * thread_worker(void *p) { struct worker_parm *wp = p; int id = wp->id; int weight = wp->weight; struct monitor *m = wp->m; struct skynet_monitor *sm = m->m[id]; skynet_initthread(THREAD_WORKER); struct message_queue * q = NULL; while (!m->quit) { q = skynet_context_message_dispatch(sm, q, weight); // 消息队列的派发和处理 if (q == NULL) { if (pthread_mutex_lock(&m->mutex) == 0) { ++ m->sleep; // spurious wakeup is harmless, // because skynet_context_message_dispatch() can be call at any time. if (!m->quit) pthread_cond_wait(&m->cond, &m->mutex); -- m->sleep; if (pthread_mutex_unlock(&m->mutex)) { fprintf(stderr, unlock mutex error); exit(1); } } } } return NULL; }
// skynet_server.c // 处理服务消息 struct message_queue * skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) { if (q == NULL) { q = skynet_globalmq_pop(); if (q==NULL) return NULL; } uint32_t handle = skynet_mq_handle(q); struct skynet_context * ctx = skynet_handle_grab(handle); if (ctx == NULL) { struct drop_t d = { handle }; skynet_mq_release(q, drop_message, &d); return skynet_globalmq_pop(); } int i,n=1; struct skynet_message msg; for (i=0;i= 0) { n = skynet_mq_length(q); n >>= weight; } int overload = skynet_mq_overload(q); if (overload) { skynet_error(ctx, May overload, message queue length = %d, overload); } skynet_monitor_trigger(sm, msg.source , handle); if (ctx->cb == NULL) { skynet_free(; } else { dispatch_message(ctx, &msg); // 处理回调函数 } skynet_monitor_trigger(sm, 0,0); } assert(q == ctx->queue); struct message_queue *nq = skynet_globalmq_pop(); if (nq) { // If global mq is not empty , push q back, and return next queue (nq) // Else (global mq is empty or block, don't push q back, and return q again // (for next dispatch) skynet_globalmq_push(q); //TODO 为何不判断队列有无消息 q = nq; } skynet_context_release(ctx); return q; }
// 处理回调函数 static void dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) { assert(ctx->init); CHECKCALLING_BEGIN(ctx) pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle)); int type = msg->sz >> HANDLE_REMOTE_SHIFT; size_t sz = msg->sz & HANDLE_MASK; if (ctx->logfile) { skynet_log_output(ctx->logfile, msg->source, type, msg->session, msg->data, sz); } // 执行回调函数 if (!ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz)) { skynet_free(msg->data); } CHECKCALLING_END(ctx) }
可以看出,每个服务都有一个消息队列,有新消息时该队列就会被加到全局队列,等待skynet的调度线程取出,并回调处理其中的若干条消息。消息最终交给 ctx->cb 处理,由此完成事件驱动。
下面以 examples/simpledb.lua 做说明,这是个典型的skynet服务。
local skynet = require skynet require skynet.manager -- import skynet.register local db = {} local command = {} function command.GET(key) return db[key] end function command.SET(key, value) local last = db[key] db[key] = value return last end skynet.start(function() skynet.dispatch(lua, function(session, address, cmd, ...) local f = command[string.upper(cmd)] if f then skynet.ret(skynet.pack(f(...))) else error(string.format(Unknown command %s, tostring(cmd))) end end) skynet.register SIMPLEDB end)
服务的代码被loader加载后就会执行,这里就会执行到 skynet.start(func) ,完成服务的启动。
-- skynet.lua function skynet.start(start_func) c.callback(skynet.dispatch_message) -- 设置回调函数 skynet.timeout(0, function() skynet.init_service(start_func) end) end function skynet.dispatch_message(...) local succ, err = pcall(raw_dispatch_message,...) -- 处理消息 -- 处理其他 skynet.fork 出来的协程 while true do local key,co = next(fork_queue) if co == nil then break end fork_queue[key] = nil local fork_succ, fork_err = pcall(suspend,co,coroutine.resume(co)) if not fork_succ then if succ then succ = false err = tostring(fork_err) else err = tostring(err) .. .. tostring(fork_err) end end end assert(succ, tostring(err)) end
前面也讨论了,c.XXX是调c函数实现的,从luaopen_skynet_core可以找到 callback的处理函数。
// lua-skynet.c static int _callback(lua_State *L) { struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1)); int forward = lua_toboolean(L, 2); luaL_checktype(L,1,LUA_TFUNCTION); // 取到上述c.callback(F)的F lua_settop(L,1); lua_rawsetp(L, LUA_REGISTRYINDEX, _cb); // 记录lua函数F到_cb这个索引位置 lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD); lua_State *gL = lua_tothread(L,-1); if (forward) { skynet_callback(context, gL, forward_cb); } else { skynet_callback(context, gL, _cb); // 设置消息回调处理函数 } return 0; } static int _cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const void * msg, size_t sz) { lua_State *L = ud; int trace = 1; int r; int top = lua_gettop(L); if (top == 0) { lua_pushcfunction(L, traceback); lua_rawgetp(L, LUA_REGISTRYINDEX, _cb); // 取出_cb索引位置的lua函数 } else { assert(top == 2); } lua_pushvalue(L,2); lua_pushinteger(L, type); lua_pushlightuserdata(L, (void *)msg); lua_pushinteger(L,sz); lua_pushinteger(L, session); lua_pushinteger(L, source); r = lua_pcall(L, 5, 0 , trace); // 执行lua函数,也就是 skynet.dispatch_message if (r == LUA_OK) { return 0; } // 执行出错,就打印一些调试数据 const char * self = skynet_command(context, REG, NULL); switch (r) { case LUA_ERRRUN: skynet_error(context, lua call [%x to %s : %d msgsz = %d] error : KRED %s KNRM, source , self, session, sz, lua_tostring(L,-1)); break; case LUA_ERRMEM: skynet_error(context, lua memory error : [%x to %s : %d], source , self, session); break; case LUA_ERRERR: skynet_error(context, lua error in error : [%x to %s : %d], source , self, session); break; case LUA_ERRGCMM: skynet_error(context, lua gc error : [%x to %s : %d], source , self, session); break; }; lua_pop(L,1); return 0; }
紧接着回头看下skynet.timeout(0, function() skynet.init_service(start_func) end) 的处理
-- skynet.lua function skynet.timeout(ti, func) local session = c.command(TIMEOUT,tostring(ti)) -- 超时处理 assert(session) session = tonumber(session) local co = co_create(func) -- 从协程池找到空闲的协程来执行这个函数 assert(session_id_coroutine[session] == nil) session_id_coroutine[session] = co end
前面也提到 c.command 的处理,对于“TIMEOUT”的处理过程如下:
// skynet_server.c static const char * cmd_timeout(struct skynet_context * context, const char * param) { char * session_ptr = NULL; int ti = strtol(param, &session_ptr, 10); // 超时时间 int session = skynet_context_newsession(context); skynet_timeout(context->handle, ti, session); // 处理超时功能 sprintf(context->result, %d, session); return context->result; }看下skynet_timeout的处理
// skynet_timer.c
int
skynet_timeout(uint32_t handle, int time, int session) {
	if (time == 0) {
		struct skynet_message message;
		message.source = 0;
		message.session = session;
		message.data = NULL;
		message.sz = PTYPE_RESPONSE << HANDLE_REMOTE_SHIFT;
		// 如果time为0,把超时事件压到服务的消息队列,等待调度处理
		if (skynet_context_push(handle, &message)) {
			return -1;
		}
	} else {
		struct timer_event event;
		event.handle = handle;
		event.session = session;
		// time不为0,超时事件挂到时间轮上,等待超时处理
		timer_add(TI, &event, sizeof(event), time);
	}
	return session;
}
co_create 是从协程池找到空闲的协程来执行这个函数,没有空闲的协程则创建。
-- skynet.lua local function co_create(f) local co = table.remove(coroutine_pool) if co == nil then co = coroutine.create(function(...) f(...) while true do f = nil coroutine_pool[#coroutine_pool+1] = co f = coroutine_yield EXIT -- a. yield 第一次获取函数 f(coroutine_yield()) -- b. yield 第二次获取函数参数,然后执行函数f end end) else -- resume 第一次让协程取到函数,就是 a点 -- 之后再 resume 第二次传入参数,并执行函数,就是b点 coroutine.resume(co, f) end return co end顺道说下 skynet.dispatch的处理(还记得吧,在前面 skynet.start时调用的):
-- skynet.lua
function skynet.dispatch(typename, func)
	local p = proto[typename]
	if func then
		local ret = p.dispatch
		p.dispatch = func -- 设置协议的处理函数
		return ret
	else
		return p and p.dispatch
	end
end
这一步是设置 proto[typename].dispatch,该协议类型的消息都会找到这个回调函数处理,如下:
-- skynet.lua local function raw_dispatch_message(prototype, msg, sz, session, source, ...) -- skynet.PTYPE_RESPONSE = 1, read skynet.h if prototype == 1 then local co = session_id_coroutine[session] if co == BREAK then session_id_coroutine[session] = nil elseif co == nil then unknown_response(session, source, msg, sz) else session_id_coroutine[session] = nil suspend(co, coroutine.resume(co, true, msg, sz)) end else local p = proto[prototype] if p == nil then if session ~= 0 then c.send(source, skynet.PTYPE_ERROR, session, ) else unknown_request(session, source, msg, sz, prototype) end return end local f = p.dispatch -- 找到dispatch函数 if f then local ref = watching_service[source] if ref then watching_service[source] = ref + 1 else watching_service[source] = 1 end local co = co_create(f) session_coroutine_id[co] = session session_coroutine_address[co] = source suspend(co, coroutine.resume(co, session,source, p.unpack(msg,sz, ...))) else unknown_request(session, source, msg, sz, proto[prototype].name) end end end
关于proto[typename],也作下简要的说明。它可以看作是对数据的封装,方便不同服务间、不同节点间,以及前后端的数据通讯,不需要手动封包解包。默认支持lua/response/error这3个协议,除此之外的其他协议需要自己调用 skynet.register_protocol 注册。
-- skynet.lua
function skynet.register_protocol(class)
	local name = class.name
	local id = class.id
	assert(proto[name] == nil)
	assert(type(name) == "string" and type(id) == "number" and id >=0 and id <=255)
	proto[name] = class
	proto[id] = class
end

do
	local REG = skynet.register_protocol

	REG {
		name = "lua",
		id = skynet.PTYPE_LUA,
		pack = skynet.pack,
		unpack = skynet.unpack,
	}

	REG {
		name = "response",
		id = skynet.PTYPE_RESPONSE,
	}

	REG {
		name = "error",
		id = skynet.PTYPE_ERROR,
		unpack = function(...) return ... end,
		dispatch = _error_dispatch,
	}
end
1、skynet.send 消息发送
2、skynet.call 消息发送并返回
-- skynet.lua
function skynet.send(addr, typename, ...)
	local p = proto[typename]
	return c.send(addr, p.id, 0 , p.pack(...))
end

function skynet.call(addr, typename, ...)
	local p = proto[typename]
	local session = c.send(addr, p.id , nil , p.pack(...))
	if session == nil then
		error("call to invalid address " .. skynet.address(addr))
	end
	return p.unpack(yield_call(addr, session))
end
可以看出,skynet.call 对比 skynet.send 多了返回值的处理,所以就以 skynet.call 做说明。
// lua-skynet.c static int _send(lua_State *L) { struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1)); uint32_t dest = (uint32_t)lua_tointeger(L, 1); const char * dest_string = NULL; // 节点名字 if (dest == 0) { if (lua_type(L,1) == LUA_TNUMBER) { return luaL_error(L, Invalid service address 0); } dest_string = get_dest_string(L, 1); } int type = luaL_checkinteger(L, 2); int session = 0; if (lua_isnil(L,3)) { type |= PTYPE_TAG_ALLOCSESSION; } else { session = luaL_checkinteger(L,3); } int mtype = lua_type(L,4); switch (mtype) { case LUA_TSTRING: { size_t len = 0; void * msg = (void *)lua_tolstring(L,4,&len); if (len == 0) { msg = NULL; } if (dest_string) { // 以节点名字发消息 session = skynet_sendname(context, 0, dest_string, type, session , msg, len); } else { session = skynet_send(context, 0, dest, type, session , msg, len); } break; } case LUA_TLIGHTUSERDATA: { void * msg = lua_touserdata(L,4); int size = luaL_checkinteger(L,5); if (dest_string) { // 以节点名字发消息 session = skynet_sendname(context, 0, dest_string, type | PTYPE_TAG_DONTCOPY, session, msg, size); } else { session = skynet_send(context, 0, dest, type | PTYPE_TAG_DONTCOPY, session, msg, size); } break; } default: luaL_error(L, skynet.send invalid param %s, lua_typename(L, lua_type(L,4))); } if (session < 0) { // send to invalid address // todo: maybe throw an error would be better return 0; } lua_pushinteger(L,session); return 1; }这里看下 skynet_send 的实现吧:
// skynet_server.c
int
skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz) {
	if ((sz & MESSAGE_TYPE_MASK) != sz) {
		skynet_error(context, "The message to %x is too large", destination);
		if (type & PTYPE_TAG_DONTCOPY) {
			skynet_free(data);
		}
		return -1;
	}
	_filter_args(context, type, &session, (void **)&data, &sz); // 复制消息数据,获取session
	if (source == 0) {
		source = context->handle;
	}
	if (destination == 0) {
		return session;
	}
	if (skynet_harbor_message_isremote(destination)) { // 是否跨节点消息
		struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
		rmsg->destination.handle = destination;
		rmsg->message = data;
		rmsg->sz = sz;
		skynet_harbor_send(rmsg, source, session); // 发给其他节点,这里不讨论
	} else {
		struct skynet_message smsg;
		smsg.source = source;
		smsg.session = session;
		smsg.data = data;
		smsg.sz = sz;
		// 发给其他服务,实际就是复制消息到队列,然后将消息队列加到全局队列
		if (skynet_context_push(destination, &smsg)) {
			skynet_free(data);
			return -1;
		}
	}
	return session;
}
到这里,消息算是“发送”出去了,skynet.call 可能拿到返回的结果,也就是这一步:
p.unpack(yield_call(addr, session))
看下 yield_call 的实现,其实就是挂起协程,并把 “CALL”, session 返回给 coroutine.resume 的调用者。
local coroutine_yield = coroutine.yield local function yield_call(service, session) watching_session[session] = service local succ, msg, sz = coroutine_yield(CALL, session) watching_session[session] = nil if not succ then error call failed end return msg,sz end
另外补充下,skynet还对coroutine.yield进行了改写,但这个函数的功能不变,还是挂起协程,等待 coroutine.resume
继续看前面的代码,这里有个问题,skynet.call 的返回数据是哪里来的?
实际上,还是走了服务间通讯的路子,看回 simpledb 的代码。
-- simpledb.lua skynet.start(function() skynet.dispatch(lua, function(session, address, cmd, ...) local f = command[string.upper(cmd)] if f then skynet.ret(skynet.pack(f(...))) -- 这里 skynet.ret 返回数据 else error(string.format(Unknown command %s, tostring(cmd))) end end) skynet.register SIMPLEDB end)看下skynet.ret 的处理,挂起返回数据。
function skynet.ret(msg, sz) msg = msg or return coroutine_yield(RETURN, msg, sz) end而skynet.dispatch 设置的回调函数,当回调触发时就会走到 raw_dispatch_message (前面有说明),继续执行就到了下面这步(函数较大,做了删节):
-- skynet.lua function suspend(co, result, command, param, size) if not result then local session = session_coroutine_id[co] if session then -- coroutine may fork by others (session is nil) local addr = session_coroutine_address[co] if session ~= 0 then -- only call response error c.send(addr, skynet.PTYPE_ERROR, session, ) end session_coroutine_id[co] = nil session_coroutine_address[co] = nil end error(debug.traceback(co,tostring(command))) end if command == CALL then -- 挂起返回时操作 session_id_coroutine[param] = co elseif command == RETURN then -- skynet.ret 挂起返回时操作 local co_session = session_coroutine_id[co] local co_address = session_coroutine_address[co] if param == nil or session_response[co] then error(debug.traceback(co)) end session_response[co] = true local ret if not dead_service[co_address] then -- 把处理结果当作消息发给请求的服务 ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, param, size) ~= nil if not ret then -- If the package is too large, returns nil. so we should report error back c.send(co_address, skynet.PTYPE_ERROR, co_session, ) end elseif size ~= nil then c.trash(param, size) ret = false end return suspend(co, coroutine.resume(co, ret)) else error(Unknown command : .. command .. .. debug.traceback(co)) end dispatch_wakeup() -- 处理所有需要恢复的协程 dispatch_error_queue() end到这里,服务间的通讯就讲完了。可以看出,服务间通讯基于消息,而消息数据也通过复制以避免数据读写加锁。
每个skynet服务都是一个lua state,也就是一个lua虚拟机实例。而且,每个服务都是隔离的,各自使用自己独立的内存空间,服务之间通过发消息来完成数据交换。
lua state本身不支持多线程,为了实现cpu的摊分,skynet在实现上让一个线程运行多个lua state实例,而同一时间下,每个调度线程只运行一个服务实例。为了提高系统的并发性,skynet会启动一定数量的调度线程。同时,为了提高单个服务的并发性,又利用lua协程来并发处理消息。
1. 每个Actor依次处理收到的消息
2. 不同的Actor可同时处理各自的消息
下面给出两个lua代码 main.lua 和 simpledb.lua,和一个配置文件 config
-- main.lua local skynet = require skynet skynet.start(function() print(Server start) skynet.newservice(simpledb) -- 发消息给simpledb服务 skynet.send(SIMPLEDB, lua, TEST) -- 死循环占据cpu local i = 0 while true do i = i>100000000 and 0 or i+1 if i==0 then print(I'm working) end end skynet.exit() end)
-- simpledb.lua local skynet = require skynet require skynet.manager -- import skynet.register local db = {} local command = {} function command.TEST() print(Simpledb test) return true end skynet.start(function() print(Simpledb start) skynet.dispatch(lua, function(session, address, cmd, ...) local f = command[string.upper(cmd)] if f then skynet.ret(skynet.pack(f(...))) else error(string.format(Unknown command %s, tostring(cmd))) end end) skynet.register SIMPLEDB end)配置文件 config
root = ./ thread = 1 logger = nil logpath = . harbor = 1 address = master = start = main bootstrap = snlua bootstrap standalone = luaservice = root..service/?.lua;..root..test/?.lua;..root..examples/?.lua lualoader = lualib/loader.lua snax = root..examples/?.lua;..root..test/?.lua cpath = root..cservice/?.so
注意了,这里特地把 thread 设置为1,表示只启动一个调度线程。
现在,启动skynet执行我们的例子,结果如下:
[root@local skynet]# ./skynet config
[:01000001] LAUNCH logger
[:01000002] LAUNCH snlua bootstrap
[:01000003] LAUNCH snlua launcher
[:01000004] LAUNCH snlua cmaster
[:01000004] master listen socket
[:01000005] LAUNCH snlua cslave
[:01000005] slave connect to master
[:01000004] connect from 4
[:01000006] LAUNCH harbor 1 16777221
[:01000004] Harbor 1 (fd=4) report
[:01000005] Waiting for 0 harbors
[:01000005] Shakehand ready
[:01000007] LAUNCH snlua datacenterd
[:01000008] LAUNCH snlua service_mgr
[:01000009] LAUNCH snlua main
Server start
[:0100000a] LAUNCH snlua simpledb
Simpledb start
I'm working
I'm working
I'm working
可以看到,输出里始终没有出现 Simpledb test:唯一的调度线程被 main 服务的死循环占住了,simpledb 收到的 TEST 消息一直得不到处理。
当然,同步问题也容易解决:增加一个state标识和一个协程列表,操作执行时,将state置为doing;其他协程判断到state=doing时,就把自己加到协程列表,然后调用 skynet.wait 挂起。操作执行完后,重置state,然后遍历协程列表依次调用 skynet.wakeup(co),最后将协程列表置空。
天啊,我又写了一篇超长的文章,篇幅过长,通篇阅读的话可以很好锻炼耐心:)。 如果有什么建议和意见都可以评论,我看到就回复。另外补充下,skynet也在不断进步,以后很可能会解决上面提到的一些问题,希望skynet 越来越好。