From ee941bd9f7520eb1160d6fb8a80cca2552f5d990 Mon Sep 17 00:00:00 2001
From: xuxuepeng
Date: Thu, 7 Sep 2023 20:11:55 +0800
Subject: [PATCH 08/33] Add vsock support for exec

Signed-off-by: xuxuepeng
---
 src/daemon/modules/service/CMakeLists.txt     |   4 +
 .../modules/service/service_container.c       |  90 +-
 .../modules/service/vsock_io_handler.cc       | 812 ++++++++++++++++++
 src/daemon/modules/service/vsock_io_handler.h |  46 +
 src/daemon/sandbox/sandbox.cc                 |  21 +
 src/daemon/sandbox/sandbox.h                  |   6 +
 6 files changed, 966 insertions(+), 13 deletions(-)
 create mode 100644 src/daemon/modules/service/vsock_io_handler.cc
 create mode 100644 src/daemon/modules/service/vsock_io_handler.h

diff --git a/src/daemon/modules/service/CMakeLists.txt b/src/daemon/modules/service/CMakeLists.txt
index b0e9f04a..a7713c15 100644
--- a/src/daemon/modules/service/CMakeLists.txt
+++ b/src/daemon/modules/service/CMakeLists.txt
@@ -5,6 +5,10 @@ if(NOT ENABLE_NATIVE_NETWORK)
     list(REMOVE_ITEM local_service_srcs "${CMAKE_CURRENT_SOURCE_DIR}/service_network.c")
 endif()
 
+if(NOT ENABLE_CRI_API_V1)
+    list(REMOVE_ITEM local_service_srcs "${CMAKE_CURRENT_SOURCE_DIR}/vsock_io_handler.cc")
+endif()
+
 set(SERVICE_SRCS
     ${local_service_srcs}
     PARENT_SCOPE
diff --git a/src/daemon/modules/service/service_container.c b/src/daemon/modules/service/service_container.c
index a9da669a..c4ee0223 100644
--- a/src/daemon/modules/service/service_container.c
+++ b/src/daemon/modules/service/service_container.c
@@ -71,6 +71,7 @@
 #include "id_name_manager.h"
 #ifdef ENABLE_CRI_API_V1
 #include "sandbox_ops.h"
+#include "vsock_io_handler.h"
 #endif
 
 #define KATA_RUNTIME "kata-runtime"
@@ -2054,9 +2055,41 @@ out:
     return ret;
 }
 
-static int exec_prepare_console(const container_t *cont, const container_exec_request *request, int stdinfd,
-                                struct io_write_wrapper *stdout_handler, struct io_write_wrapper *stderr_handler,
-                                char **fifos, char **fifopath, int *sync_fd, pthread_t *thread_id)
+#ifdef ENABLE_CRI_API_V1
+static int exec_prepare_vsock(const container_t *cont, const container_exec_request *request, int stdinfd,
+                              struct io_write_wrapper *stdout_handler, struct io_write_wrapper *stderr_handler,
+                              char **vsockpaths, int *sync_fd, pthread_t *thread_id)
+{
+    uint32_t cid;
+    const char *task_address = cont->common_config->sandbox_info->task_address;
+    if (!parse_vsock_path(task_address, &cid, NULL)) {
+        ERROR("Failed to parse vsock path %s", task_address);
+        return -1;
+    }
+
+    if (!request->attach_stdin && !request->attach_stdout && !request->attach_stderr) {
+        return 0;
+    }
+
+    if (create_daemon_vsockpaths(cont->common_config->sandbox_info->id, cid, request->attach_stdin, request->attach_stdout,
+                                 request->attach_stderr, vsockpaths) != 0) {
+        return -1;
+    }
+
+    *sync_fd = eventfd(0, EFD_CLOEXEC);
+    if (*sync_fd < 0) {
+        SYSERROR("Failed to create eventfd");
+        return -1;
+    }
+
+    return start_vsock_io_copy(request->suffix, *sync_fd, false, request->stdin, request->stdout, request->stderr, stdinfd,
+                               stdout_handler, stderr_handler, (const char **)vsockpaths, thread_id);
+}
+#endif
+
+static int exec_prepare_fifo(const container_t *cont, const container_exec_request *request, int stdinfd,
+                             struct io_write_wrapper *stdout_handler, struct io_write_wrapper *stderr_handler,
+                             char **fifos, char **fifopath, int *sync_fd, pthread_t *thread_id)
 {
     int ret = 0;
     const char *id = cont->common_config->id;
@@ -2070,7 +2103,7 @@ static int exec_prepare_console(const container_t *cont, const container_exec_re
 
     *sync_fd = eventfd(0, EFD_CLOEXEC);
     if (*sync_fd < 0) {
-        ERROR("Failed to create eventfd: %s", strerror(errno));
+        SYSERROR("Failed to create eventfd");
         ret = -1;
         goto out;
     }
@@ -2084,6 +2117,26 @@ out:
     return ret;
 }
 
+#ifdef ENABLE_CRI_API_V1
+static bool is_vsock_supported(const container_t *cont)
+{
+    return cont->common_config->sandbox_info != NULL &&
+           is_vsock_path(cont->common_config->sandbox_info->task_address);
+}
+#endif
+
+static int exec_prepare_console(const container_t *cont, const container_exec_request *request, int stdinfd,
+                                struct io_write_wrapper *stdout_handler, struct io_write_wrapper *stderr_handler,
+                                char **io_addresses, char **iopath, int *sync_fd, pthread_t *thread_id)
+{
+#ifdef ENABLE_CRI_API_V1
+    if (is_vsock_supported(cont)) {
+        return exec_prepare_vsock(cont, request, stdinfd, stdout_handler, stderr_handler, io_addresses, sync_fd, thread_id);
+    }
+#endif
+    return exec_prepare_fifo(cont, request, stdinfd, stdout_handler, stderr_handler, io_addresses, iopath, sync_fd, thread_id);
+}
+
 static void exec_container_end(container_exec_response *response, const container_t *cont, const char *exec_id,
                                uint32_t cc, int exit_code, int sync_fd, pthread_t thread_id)
@@ -2117,6 +2170,17 @@ out:
     }
 }
 
+static void cleanup_exec_console_io(const container_t *cont, const char *fifopath, const char *io_addresses[])
+{
+#ifdef ENABLE_CRI_API_V1
+    if (is_vsock_supported(cont)) {
+        delete_daemon_vsockpaths(cont->common_config->sandbox_info->id, io_addresses);
+        return;
+    }
+#endif
+    delete_daemon_fifos(fifopath, (const char **)io_addresses);
+}
+
 static int get_exec_user_info(const container_t *cont, const char *username, defs_process_user **puser)
 {
     int ret = 0;
@@ -2178,8 +2242,8 @@ int exec_container(const container_t *cont, const container_exec_request *reques
     int sync_fd = -1;
     uint32_t cc = ISULAD_SUCCESS;
     char *id = NULL;
-    char *fifos[3] = { NULL, NULL, NULL };
-    char *fifopath = NULL;
+    char *io_addresses[3] = { NULL, NULL, NULL };
+    char *iopath = NULL;
     pthread_t thread_id = 0;
     defs_process_user *puser = NULL;
     char exec_command[EVENT_ARGS_MAX] = { 0x00 };
@@ -2237,13 +2301,13 @@ int exec_container(const container_t *cont, const container_exec_request *reques
         }
     }
 
-    if (exec_prepare_console(cont, request, stdinfd, stdout_handler, stderr_handler, fifos, &fifopath, &sync_fd,
+    if (exec_prepare_console(cont, request, stdinfd, stdout_handler, stderr_handler, io_addresses, &iopath, &sync_fd,
                              &thread_id)) {
         cc = ISULAD_ERR_EXEC;
         goto pack_response;
     }
     (void)isulad_monitor_send_container_event(id, EXEC_START, -1, 0, exec_command, NULL);
-    if (do_exec_container(cont, cont->runtime, (char * const *)fifos, puser, request, &exit_code)) {
+    if (do_exec_container(cont, cont->runtime, (char * const *)io_addresses, puser, request, &exit_code)) {
         cc = ISULAD_ERR_EXEC;
         goto pack_response;
     }
@@ -2253,11 +2317,11 @@ int exec_container(const container_t *cont, const container_exec_request *reques
 
 pack_response:
     exec_container_end(response, cont, request->suffix, cc, exit_code, sync_fd, thread_id);
-    delete_daemon_fifos(fifopath, (const char **)fifos);
-    free(fifos[0]);
-    free(fifos[1]);
-    free(fifos[2]);
-    free(fifopath);
+    cleanup_exec_console_io(cont, iopath, (const char **)io_addresses);
+    free(io_addresses[0]);
+    free(io_addresses[1]);
+    free(io_addresses[2]);
+    free(iopath);
     free_defs_process_user(puser);
 
     return (cc == ISULAD_SUCCESS) ? 0 : -1;
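For reference, the dispatch above keys off the sandbox task address: an address of the form vsock://<cid>:<port> selects the vsock transport, and everything else falls back to the FIFO path. A minimal sketch of driving the two helpers from the new header (the addresses below are made-up examples, not values produced by this patch):

    // Sketch only: exercises the helpers declared in vsock_io_handler.h.
    #include <cstdint>
    #include <cstdio>

    #include "vsock_io_handler.h"

    int main()
    {
        uint32_t cid = 0;
        uint32_t port = 0;

        // A vsock task address splits into a context id and a port.
        if (parse_vsock_path("vsock://3:2000", &cid, &port)) {
            printf("cid=%u port=%u\n", cid, port); // prints cid=3 port=2000
        }

        // Any other scheme fails is_vsock_path(), so exec_prepare_console()
        // takes the FIFO transport instead.
        return is_vsock_path("unix:///run/task.sock") ? 1 : 0;
    }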
diff --git a/src/daemon/modules/service/vsock_io_handler.cc b/src/daemon/modules/service/vsock_io_handler.cc
new file mode 100644
index 00000000..efc74bc8
--- /dev/null
+++ b/src/daemon/modules/service/vsock_io_handler.cc
@@ -0,0 +1,812 @@
+/******************************************************************************
+ * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
+ * iSulad licensed under the Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ *     http://license.coscl.org.cn/MulanPSL2
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+ * PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ * Author: xuxuepeng
+ * Create: 2023-09-04
+ * Description: provide vsock io functions
+ ********************************************************************************/
+#include "vsock_io_handler.h"
+
+#include <fcntl.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <sys/prctl.h>
+#include <sys/socket.h>
+#include <linux/vm_sockets.h>
+#include <string>
+#include <memory>
+#include <vector>
+
+#include "console.h"
+#include "utils.h"
+#include "sandbox_manager.h"
+#include "sandbox.h"
+
+const std::string VSOCK_PREFIX = "vsock://";
+const int VSOCK_RETRY_INTERVAL = 1000; // 1000ms
+const int VSOCK_RETRY_TIMEOUT = 10000; // 10000ms
+const int MILLI_TO_MICRO = 1000;
+
+bool is_vsock_path(const char *path)
+{
+    if (path == NULL) {
+        return false;
+    }
+
+    std::string path_str = path;
+    if (path_str.find(VSOCK_PREFIX) == 0) {
+        return true;
+    }
+
+    return false;
+}
+
+bool parse_vsock_path(const char *vsock_path, uint32_t *cid, uint32_t *port)
+{
+    uint32_t vsock_cid, vsock_port;
+
+    if (!is_vsock_path(vsock_path)) {
+        ERROR("Invalid vsock path, %s", vsock_path);
+        return false;
+    }
+
+    std::string vsock_path_str = vsock_path;
+    std::string vsock_address = vsock_path_str.substr(VSOCK_PREFIX.size());
+    if (vsock_address.empty()) {
+        ERROR("Invalid vsock address, %s", vsock_path);
+        return false;
+    }
+
+    // split vsock_address by ':'
+    size_t col_pos = vsock_address.find(':');
+    if (col_pos == std::string::npos) {
+        ERROR("Failed to find ':' in vsock address, %s", vsock_path);
+        return false;
+    }
+
+    std::string cid_str = vsock_address.substr(0, col_pos);
+    if (util_safe_uint(cid_str.c_str(), &vsock_cid) != 0) {
+        ERROR("Failed to parse cid, %s", cid_str.c_str());
+        return false;
+    }
+
+    std::string port_str = vsock_address.substr(col_pos + 1);
+    if (util_safe_uint(port_str.c_str(), &vsock_port) != 0) {
+        ERROR("Failed to parse port, %s", port_str.c_str());
+        return false;
+    }
+
+    if (cid != NULL) {
+        *cid = vsock_cid;
+    }
+
+    if (port != NULL) {
+        *port = vsock_port;
+    }
+
+    return true;
+}
+
+static int find_available_vsock_port_for_sandbox(const char *sandbox_id, uint32_t *port)
+{
+    if (sandbox_id == NULL || port == NULL) {
+        ERROR("Invalid NULL sandbox id or port");
+        return -1;
+    }
+
+    std::string sandbox_id_str = sandbox_id;
+    std::shared_ptr<sandbox::Sandbox> sandbox = sandbox::SandboxManager::GetInstance()->GetSandbox(sandbox_id_str);
+    if (sandbox == nullptr) {
+        ERROR("Failed to find sandbox %s", sandbox_id);
+        return -1;
+    }
+
+    if (sandbox->FindAvailableVsockPort(*port)) {
+        return 0;
+    }
+
+    ERROR("Failed to find available vsock port for sandbox %s", sandbox_id);
+
+    return -1;
+}
+
+static void release_vsock_port_for_sandbox(const char *sandbox_id, uint32_t port)
+{
+    if (sandbox_id == NULL) {
+        return;
+    }
+
+    std::string sandbox_id_str = sandbox_id;
+    std::shared_ptr<sandbox::Sandbox> sandbox = sandbox::SandboxManager::GetInstance()->GetSandbox(sandbox_id_str);
+    if (sandbox == nullptr) {
+        return;
+    }
+
+    sandbox->ReleaseVsockPort(port);
+}
+
+static int set_flags(int fd, int flags)
+{
+    int curflag;
+    int ret;
+
+    curflag = fcntl(fd, F_GETFL, 0);
+    if (curflag < 0) {
+        SYSERROR("Failed to get flags for vsock fd");
+        return -1;
+    }
+
+    ret = fcntl(fd, F_SETFL, curflag | flags);
+    if (ret != 0) {
+        SYSERROR("Failed to set flags for vsock fd");
+        return -1;
+    }
+
+    return 0;
+}
+
+static int vsock_connect(uint32_t cid, uint32_t port)
+{
+    int fd = -1;
+    struct sockaddr_vm sa = { 0 };
+
+    fd = socket(AF_VSOCK, SOCK_STREAM, 0);
+    if (fd < 0) {
+        SYSERROR("Failed to create vsock socket");
+        return -1;
+    }
+
+    sa.svm_family = AF_VSOCK;
+    sa.svm_cid = cid;
+    sa.svm_port = port;
+
+    if (connect(fd, (struct sockaddr *)&sa, sizeof(sa)) != 0) {
+        SYSERROR("Failed to connect vsock socket");
+        close(fd);
+        return -1;
+    }
+
+    return fd;
+}
+
+/*
+ * We set up the connection as a client, so we have to wait for the server to be ready.
+ * The following function keeps retrying until the connection is established,
+ * and gives up after a 10-second timeout.
+ */
+int vsock_open(const char *vsock_path, int *fdout, int flags)
+{
+    int ret;
+    int fd = -1;
+    int retry = 0;
+    uint32_t cid;
+    uint32_t port;
+
+    if (vsock_path == NULL || fdout == NULL) {
+        ERROR("Invalid NULL vsock path or fdout");
+        return -1;
+    }
+
+    if (!parse_vsock_path(vsock_path, &cid, &port)) {
+        ERROR("Failed to parse vsock path, %s", vsock_path);
+        return -1;
+    }
+
+    DEBUG("Open vsock, cid %u, port %u", cid, port);
+
+    while (retry < VSOCK_RETRY_TIMEOUT) {
+        fd = vsock_connect(cid, port);
+        if (fd >= 0) {
+            break;
+        }
+        DEBUG("Failed to connect vsock socket");
+        retry += VSOCK_RETRY_INTERVAL;
+        usleep(VSOCK_RETRY_INTERVAL * MILLI_TO_MICRO);
+    }
+
+    if (retry >= VSOCK_RETRY_TIMEOUT) {
+        ERROR("Failed to connect vsock socket, timeout");
+        return -1;
+    }
+
+    ret = set_flags(fd, flags);
+    if (ret < 0) {
+        ERROR("Failed to set flags for vsock fd");
+        close(fd);
+        return -1;
+    }
+
+    *fdout = fd;
+    return 0;
+}
+
+static char *create_single_vsockpath(const char *sandbox_id, uint32_t cid)
+{
+    uint32_t vsock_port;
+
+    if (find_available_vsock_port_for_sandbox(sandbox_id, &vsock_port) != 0) {
+        ERROR("Failed to find available vsock port for sandbox %s", sandbox_id);
+        return NULL;
+    }
+
+    std::string vsock_address = VSOCK_PREFIX + std::to_string(cid) + ":" + std::to_string(vsock_port);
+
+    DEBUG("Create vsock path %s for sandbox %s", vsock_address.c_str(), sandbox_id);
+
+    return util_strdup_s(vsock_address.c_str());
+}
+
+int create_daemon_vsockpaths(const char *sandbox_id, uint32_t cid, bool attach_stdin, bool attach_stdout,
+                             bool attach_stderr, char *vsockpaths[])
+{
+    int ret = -1;
+
+    if (sandbox_id == NULL || vsockpaths == NULL) {
+        return -1;
+    }
+
+    if (attach_stdin) {
+        vsockpaths[0] = create_single_vsockpath(sandbox_id, cid);
+        if (vsockpaths[0] == NULL) {
+            goto errout;
+        }
+    }
+
+    if (attach_stdout) {
+        vsockpaths[1] = create_single_vsockpath(sandbox_id, cid);
+        if (vsockpaths[1] == NULL) {
+            goto errout;
+        }
+    }
+
+    if (attach_stderr) {
+        vsockpaths[2] = create_single_vsockpath(sandbox_id, cid);
+        if (vsockpaths[2] == NULL) {
+            goto errout;
+        }
+    }
+
+    ret = 0;
+errout:
+    if (ret != 0) {
+        delete_daemon_vsockpaths(sandbox_id, (const char **)vsockpaths);
+        free(vsockpaths[0]);
+        free(vsockpaths[1]);
+        free(vsockpaths[2]);
+        vsockpaths[0] = NULL;
+        vsockpaths[1] = NULL;
+        vsockpaths[2] = NULL;
+    }
+
+    return ret;
+}
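As a usage sketch of the allocation helpers above (the sandbox id and cid are hypothetical, and the sandbox is assumed to be registered with the SandboxManager so ports can be reserved), creation and release pair up like this, mirroring the cleanup order in exec_container():

    // Sketch only: allocate per-stream vsock addresses, then release the ports.
    #include <cstdlib>

    #include "vsock_io_handler.h"

    static void vsockpath_lifecycle_sketch()
    {
        char *paths[3] = { nullptr, nullptr, nullptr };

        // Reserves one port per attached stream and renders e.g. "vsock://3:2000".
        if (create_daemon_vsockpaths("my-sandbox-id", 3, true, true, false, paths) != 0) {
            return;
        }

        // ... hand paths[0] and paths[1] to the runtime as stdin/stdout addresses ...

        // Releasing returns the ports to the sandbox's pool; the strings are freed separately.
        delete_daemon_vsockpaths("my-sandbox-id", (const char **)paths);
        for (int i = 0; i < 3; i++) {
            free(paths[i]);
        }
    }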
+
+static void delete_single_vsockpath(const char *sandbox_id, const char *vsockpath)
+{
+    uint32_t cid;
+    uint32_t port;
+
+    if (vsockpath == NULL) {
+        return;
+    }
+
+    if (!parse_vsock_path(vsockpath, &cid, &port)) {
+        ERROR("Failed to parse vsock path, %s", vsockpath);
+        return;
+    }
+
+    release_vsock_port_for_sandbox(sandbox_id, port);
+}
+
+void delete_daemon_vsockpaths(const char *sandbox_id, const char *vsockpaths[])
+{
+    if (sandbox_id == NULL || vsockpaths == NULL) {
+        return;
+    }
+
+    if (vsockpaths[0] != NULL) {
+        delete_single_vsockpath(sandbox_id, vsockpaths[0]);
+    }
+
+    if (vsockpaths[1] != NULL) {
+        delete_single_vsockpath(sandbox_id, vsockpaths[1]);
+    }
+
+    if (vsockpaths[2] != NULL) {
+        delete_single_vsockpath(sandbox_id, vsockpaths[2]);
+    }
+}
+
+enum IOFlowType {
+    IO_SRC = 0,
+    IO_DST,
+    IO_FLOW_INVALID,
+};
+
+static ssize_t WriteToFIFO(void *context, const void *data, size_t len)
+{
+    ssize_t ret;
+    int fd;
+
+    fd = *(int *)context;
+    ret = util_write_nointr_in_total(fd, static_cast<const char *>(data), len);
+    if ((ret < 0) || (size_t)ret != len) {
+        SYSERROR("Failed to write %d", fd);
+        return -1;
+    }
+
+    return ret;
+}
+
+static ssize_t WriteToFd(void *context, const void *data, size_t len)
+{
+    ssize_t ret;
+
+    ret = util_write_nointr(*(int *)context, static_cast<const char *>(data), len);
+    if (ret < 0 || (size_t)ret != len) {
+        SYSERROR("Failed to write");
+        return -1;
+    }
+
+    return ret;
+}
+
+class IOEntry {
+public:
+    IOEntry()
+    {
+        m_initialized = false;
+        m_fd = -1;
+        m_flags = 0;
+        m_flowType = IO_FLOW_INVALID;
+    }
+
+    virtual ~IOEntry() = default;
+
+    virtual int Init() = 0;
+
+    bool Initialized() const
+    {
+        return m_initialized;
+    }
+
+    virtual int GetFd()
+    {
+        if (!Initialized()) {
+            return -1;
+        }
+        return m_fd;
+    }
+
+    virtual struct io_write_wrapper *GetWriter()
+    {
+        if (!Initialized()) {
+            return NULL;
+        }
+        if (m_flowType == IO_SRC) {
+            return NULL;
+        }
+        return &m_writer;
+    }
+
+    virtual std::string ToString() = 0;
+
+protected:
+    int m_flags;
+    bool m_initialized;
+    int m_fd;
+    struct io_write_wrapper m_writer;
+    IOFlowType m_flowType;
+};
+
+class IOFdEntry : public IOEntry {
+public:
+    IOFdEntry(int fd, IOFlowType flowType): IOEntry()
+    {
+        m_fd = fd;
+        m_flowType = flowType;
+    }
+
+    ~IOFdEntry() override
+    {
+        if (m_initialized && m_fd >= 0) {
+            close(m_fd);
+            m_fd = -1;
+        }
+    }
+
+    int Init() override
+    {
+        if (m_initialized) {
+            return 0;
+        }
+        if (m_flowType == IO_DST) {
+            m_writer.context = &m_fd;
+            m_writer.write_func = WriteToFd;
+        }
+        m_initialized = true;
+        return 0;
+    }
+
+    std::string ToString() override
+    {
+        return "file descriptor " + std::to_string(m_fd);
+    }
+};
+
+class IOFifoEntry : public IOEntry {
+public:
+    IOFifoEntry(const char *path, int flags, IOFlowType flowType): IOEntry()
+    {
+        m_fifoPath = path;
+        m_flags = flags;
+        m_flowType = flowType;
+    }
+
+    ~IOFifoEntry() override
+    {
+        if (m_initialized && m_fd >= 0) {
+            console_fifo_close(m_fd);
+            m_fd = -1;
+        }
+    }
+
+    int Init() override
+    {
+        if (m_initialized) {
+            return 0;
+        }
+
+        if (m_flowType == IO_SRC) {
+            if (console_fifo_open(m_fifoPath.c_str(), &m_fd, m_flags) != 0) {
+                ERROR("Failed to open fifo, %s", m_fifoPath.c_str());
+                return -1;
+            }
+        } else {
+            if (console_fifo_open_withlock(m_fifoPath.c_str(), &m_fd, m_flags)) {
+                ERROR("Failed to open console fifo.");
+                return -1;
+            }
+            m_writer.context = &m_fd;
+            m_writer.write_func = WriteToFIFO;
+        }
+        m_initialized = true;
+        return 0;
+    }
+
+    std::string ToString() override
+    {
+        return "FIFO " + m_fifoPath;
+    }
+private: + std::string m_fifoPath; +}; + +class IOVsockEntry : public IOEntry { +public: + IOVsockEntry(const char *path, int flags, IOFlowType flowType): IOEntry() + { + m_vsockPath = path; + m_flags = flags; + m_flowType = flowType; + } + + ~IOVsockEntry() override + { + if (m_initialized && m_fd >= 0) { + close(m_fd); + m_fd = -1; + } + } + + int Init() override + { + if (m_initialized) { + return 0; + } + if (vsock_open(m_vsockPath.c_str(), &m_fd, m_flags) != 0) { + ERROR("Failed to open vsock, %s", m_vsockPath.c_str()); + return -1; + } + if (m_flowType != IO_SRC) { + m_writer.context = &m_fd; + m_writer.write_func = WriteToFd; + } + m_initialized = true; + return 0; + } + + std::string ToString() override + { + return "vsock " + m_vsockPath; + } +private: + std::string m_vsockPath; +}; + +class IOFuncEntry : public IOEntry { +public: + IOFuncEntry(struct io_write_wrapper *handler, IOFlowType flowType): IOEntry() + { + m_handler = handler; + m_flowType = flowType; + } + + ~IOFuncEntry() override + { + if (m_initialized && m_handler != NULL) { + if (m_handler->close_func != NULL) { + m_handler->close_func(m_handler->context, NULL); + } + m_handler = NULL; + } + } + + int Init() override + { + if (m_initialized) { + return 0; + } + if (m_flowType == IO_SRC) { + ERROR("IO func entry should not be used for stdin channel"); + return -1; + } + m_writer.context = m_handler->context; + m_writer.write_func = m_handler->write_func; + m_writer.close_func = m_handler->close_func; + m_initialized = true; + return 0; + } + + std::string ToString() override + { + return "IO func entry"; + } +private: + struct io_write_wrapper *m_handler; +}; + +/** + * IOCopy defines the copy relationship between two IO. + * It defines source IOEntry to read data from, destination IOEntry to write data to, + * and the transfer channel type. 
+ */
+class IOCopy {
+public:
+    IOCopy(std::unique_ptr<IOEntry> src, std::unique_ptr<IOEntry> dst, transfer_channel_type channel)
+    {
+        m_src = std::move(src);
+        m_dst = std::move(dst);
+        m_channel = channel;
+    }
+
+    ~IOCopy() = default;
+
+    IOEntry &GetSrc()
+    {
+        return *m_src;
+    }
+
+    IOEntry &GetDst()
+    {
+        return *m_dst;
+    }
+
+    transfer_channel_type GetChannel()
+    {
+        return m_channel;
+    }
+
+private:
+    std::unique_ptr<IOEntry> m_src;
+    std::unique_ptr<IOEntry> m_dst;
+    transfer_channel_type m_channel;
+};
+
+class IOCopyCollection {
+public:
+    IOCopyCollection() = default;
+    ~IOCopyCollection() = default;
+
+    void AddIOCopy(std::unique_ptr<IOEntry> src, std::unique_ptr<IOEntry> dst, transfer_channel_type channel)
+    {
+        m_copies.push_back(std::unique_ptr<IOCopy>(new IOCopy(std::move(src), std::move(dst), channel)));
+    }
+
+    int Init()
+    {
+        for (auto &copy : m_copies) {
+            if (copy->GetSrc().Init() != 0) {
+                ERROR("Failed to init src IO, %s", copy->GetSrc().ToString().c_str());
+                return -1;
+            }
+            if (copy->GetDst().Init() != 0) {
+                ERROR("Failed to init dst IO, %s", copy->GetDst().ToString().c_str());
+                return -1;
+            }
+        }
+        return 0;
+    }
+
+    size_t Size()
+    {
+        return m_copies.size();
+    }
+
+    int *GetSrcFds()
+    {
+        size_t len = m_copies.size();
+        int *fds = new int[len];
+        for (size_t i = 0; i < len; i++) {
+            int fd = m_copies[i]->GetSrc().GetFd();
+            if (fd < 0) {
+                ERROR("Invalid fd: %s", m_copies[i]->GetSrc().ToString().c_str());
+                delete[] fds;
+                return NULL;
+            }
+            fds[i] = fd;
+        }
+        return fds;
+    }
+
+    struct io_write_wrapper *GetDstWriters()
+    {
+        size_t len = m_copies.size();
+        struct io_write_wrapper *writers = new struct io_write_wrapper[len];
+        for (size_t i = 0; i < len; i++) {
+            struct io_write_wrapper *writer = m_copies[i]->GetDst().GetWriter();
+            if (writer == NULL) {
+                ERROR("Invalid writer: %s", m_copies[i]->GetDst().ToString().c_str());
+                delete[] writers;
+                return NULL;
+            }
+            writers[i] = *writer;
+        }
+        return writers;
+    }
+
+    transfer_channel_type *GetChannels()
+    {
+        size_t len = m_copies.size();
+        transfer_channel_type *channels = new transfer_channel_type[len];
+        for (size_t i = 0; i < len; i++) {
+            channels[i] = m_copies[i]->GetChannel();
+        }
+        return channels;
+    }
+
+private:
+    std::vector<std::unique_ptr<IOCopy>> m_copies;
+};
+
+/**
+ * The IO copy module connects two IO endpoints and copies data from one to the other.
+ * The IO between iSula/Websocket and iSulad takes one of two forms:
+ * 1. FIFO: iSula/Websocket creates three FIFO files for communication with iSulad.
+ * 2. FD and callback: iSula/Websocket uses an fd for the input channel to iSulad,
+ *    and callbacks for the output and error channels.
+ * The IO between iSulad and the container can be of different types, such as FIFO or vsock.
+ --------------------------------------------------------------------------------------
+ | CHANNEL |  iSula/Websocket            iSulad                  container             |
+ --------------------------------------------------------------------------------------
+ |         |  fifoin | stdin_fd                                  vsocks[0]             |
+ |   IN    |  RDWR    -------->  RD          RDWR    -------->   RD                    |
+ --------------------------------------------------------------------------------------
+ |         |  fifoout | stdout_handler                           vsocks[1]             |
+ |   OUT   |  RD      <--------  WR          RD      <--------   WR                    |
+ --------------------------------------------------------------------------------------
+ |         |  fifoerr | stderr_handler                           vsocks[2]             |
+ |   ERR   |  RD      <--------  WR          RD      <--------   WR                    |
+ --------------------------------------------------------------------------------------
+*/
+static void PrepareIOCopyCollection(const char *fifoin, const char *fifoout, const char *fifoerr,
+                                    int stdin_fd, struct io_write_wrapper *stdout_handler, struct io_write_wrapper *stderr_handler,
+                                    const char *vsocks[], IOCopyCollection &ioCollection)
+{
+    if (fifoin != NULL) {
+        ioCollection.AddIOCopy(std::unique_ptr<IOEntry>(new IOFifoEntry(fifoin, O_RDONLY | O_NONBLOCK, IO_SRC)),
+                               std::unique_ptr<IOEntry>(new IOVsockEntry(vsocks[0], O_WRONLY | O_NONBLOCK, IO_DST)), STDIN_CHANNEL);
+    }
+    if (fifoout != NULL) {
+        ioCollection.AddIOCopy(std::unique_ptr<IOEntry>(new IOVsockEntry(vsocks[1], O_RDONLY | O_NONBLOCK, IO_SRC)),
+                               std::unique_ptr<IOEntry>(new IOFifoEntry(fifoout, O_WRONLY | O_NONBLOCK, IO_DST)), STDOUT_CHANNEL);
+    }
+    if (fifoerr != NULL) {
+        ioCollection.AddIOCopy(std::unique_ptr<IOEntry>(new IOVsockEntry(vsocks[2], O_RDONLY | O_NONBLOCK, IO_SRC)),
+                               std::unique_ptr<IOEntry>(new IOFifoEntry(fifoerr, O_WRONLY | O_NONBLOCK, IO_DST)), STDERR_CHANNEL);
+    }
+    if (stdin_fd >= 0) {
+        ioCollection.AddIOCopy(std::unique_ptr<IOEntry>(new IOFdEntry(stdin_fd, IO_SRC)),
+                               std::unique_ptr<IOEntry>(new IOVsockEntry(vsocks[0], O_WRONLY | O_NONBLOCK, IO_DST)), STDIN_CHANNEL);
+    }
+    if (stdout_handler != NULL) {
+        ioCollection.AddIOCopy(std::unique_ptr<IOEntry>(new IOVsockEntry(vsocks[1], O_RDONLY | O_NONBLOCK, IO_SRC)),
+                               std::unique_ptr<IOEntry>(new IOFuncEntry(stdout_handler, IO_DST)), STDOUT_CHANNEL);
+    }
+    if (stderr_handler != NULL) {
+        ioCollection.AddIOCopy(std::unique_ptr<IOEntry>(new IOVsockEntry(vsocks[2], O_RDONLY | O_NONBLOCK, IO_SRC)),
+                               std::unique_ptr<IOEntry>(new IOFuncEntry(stderr_handler, IO_DST)), STDERR_CHANNEL);
+    }
+}
+
+struct IOCopyThreadArgs {
+    IOCopyCollection ioCollection;
+    int sync_fd;
+    bool detach;
+    std::string exec_id;
+
+    IOCopyThreadArgs() = default;
+    ~IOCopyThreadArgs() = default;
+};
+
+static void *IOCopyThread(void *arg)
+{
+    if (arg == NULL) {
+        return NULL;
+    }
+
+    std::unique_ptr<IOCopyThreadArgs> threadArg((struct IOCopyThreadArgs *)arg);
+
+    if (threadArg->detach) {
+        if (pthread_detach(pthread_self()) != 0) {
+            CRIT("Set thread detach fail");
+            return NULL;
+        }
+    }
+
+    std::string tname = "IoCopy";
+    if (!threadArg->exec_id.empty()) {
+        // A thread name cannot be longer than 16 bytes, so use only
+        // the first 4 bytes of exec_id in the thread name.
+        tname = "IoCopy-" + threadArg->exec_id.substr(0, 4);
+    }
+
+    (void)prctl(PR_SET_NAME, tname.c_str());
+
+    if (threadArg->ioCollection.Init() != 0) {
+        ERROR("Failed to init IO copy collection");
+        return NULL;
+    }
+
+    size_t len = threadArg->ioCollection.Size();
+    if (len == 0) {
+        ERROR("No IO copy to be done");
+        return NULL;
+    }
+
+    std::unique_ptr<int[]> srcfds(threadArg->ioCollection.GetSrcFds());
+    if (srcfds == NULL) {
+        ERROR("Failed to get src fds");
+        return NULL;
+    }
+
+    std::unique_ptr<struct io_write_wrapper[]> writers(threadArg->ioCollection.GetDstWriters());
+    if (writers == NULL) {
+        ERROR("Failed to get dst writers");
+        return NULL;
+    }
+
+    std::unique_ptr<transfer_channel_type[]> channels(threadArg->ioCollection.GetChannels());
+    if (channels == NULL) {
+        ERROR("Failed to get channels");
+        return NULL;
+    }
+
+    (void)console_loop_io_copy(threadArg->sync_fd, srcfds.get(), writers.get(), channels.get(), len);
+    return NULL;
+}
+
+int start_vsock_io_copy(const char *exec_id, int sync_fd, bool detach, const char *fifoin, const char *fifoout, const char *fifoerr,
+                        int stdin_fd, struct io_write_wrapper *stdout_handler, struct io_write_wrapper *stderr_handler,
+                        const char *vsocks[], pthread_t *tid)
+{
+    if (sync_fd < 0 || vsocks == NULL || tid == NULL) {
+        ERROR("Invalid NULL arguments");
+        return -1;
+    }
+
+    struct IOCopyThreadArgs *args = new IOCopyThreadArgs();
+    args->sync_fd = sync_fd;
+    args->detach = detach;
+    if (exec_id != NULL) {
+        args->exec_id = exec_id;
+    }
+
+    PrepareIOCopyCollection(fifoin, fifoout, fifoerr, stdin_fd, stdout_handler, stderr_handler, vsocks, args->ioCollection);
+
+    int ret = pthread_create(tid, NULL, IOCopyThread, (void *)args);
+    if (ret != 0) {
+        CRIT("Thread creation failed");
+        delete args;
+    }
+
+    return ret;
+}
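Taken together, a caller is expected to drive start_vsock_io_copy() roughly as follows. This is a sketch mirroring exec_prepare_vsock() and exec_container_end() in service_container.c; the exec id and vsock address are placeholders, and waking the copy loop by writing to the eventfd is an assumption about console_loop_io_copy()'s contract, not something this patch spells out:

    // Sketch only: start the copy thread, then signal shutdown via the eventfd.
    #include <cstdint>
    #include <unistd.h>
    #include <sys/eventfd.h>

    #include "vsock_io_handler.h"

    static int io_copy_sketch(struct io_write_wrapper *stdout_handler)
    {
        const char *vsocks[3] = { nullptr, "vsock://3:2001", nullptr }; // stdout only
        pthread_t tid = 0;
        int sync_fd = eventfd(0, EFD_CLOEXEC);

        if (sync_fd < 0) {
            return -1;
        }
        if (start_vsock_io_copy("exec-suffix", sync_fd, false, nullptr, nullptr, nullptr,
                                -1, stdout_handler, nullptr, vsocks, &tid) != 0) {
            close(sync_fd);
            return -1;
        }

        // ... the exec runs; once it exits, wake the copy loop and join the thread ...
        uint64_t v = 1;
        (void)write(sync_fd, &v, sizeof(v));
        (void)pthread_join(tid, nullptr);
        close(sync_fd);
        return 0;
    }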
diff --git a/src/daemon/modules/service/vsock_io_handler.h b/src/daemon/modules/service/vsock_io_handler.h
new file mode 100644
index 00000000..cc0c1dd0
--- /dev/null
+++ b/src/daemon/modules/service/vsock_io_handler.h
@@ -0,0 +1,46 @@
+/******************************************************************************
+ * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
+ * iSulad licensed under the Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ *     http://license.coscl.org.cn/MulanPSL2
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+ * PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ * Author: xuxuepeng
+ * Create: 2023-09-04
+ * Description: provide vsock io functions
+ ********************************************************************************/
+
+#ifndef DAEMON_MODULES_SERVICE_VSOCK_IO_H
+#define DAEMON_MODULES_SERVICE_VSOCK_IO_H
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <pthread.h>
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+bool is_vsock_path(const char *path);
+
+bool parse_vsock_path(const char *vsock_path, uint32_t *cid, uint32_t *port);
+
+int vsock_open(const char *vsock_path, int *fdout, int flags);
+
+int create_daemon_vsockpaths(const char *sandbox_id, uint32_t cid, bool attach_stdin, bool attach_stdout,
+                             bool attach_stderr, char *vsockpaths[]);
+
+void delete_daemon_vsockpaths(const char *sandbox_id, const char *vsockpaths[]);
+
+int start_vsock_io_copy(const char *exec_id, int sync_fd, bool detach, const char *fifoin, const char *fifoout, const char *fifoerr,
+                        int stdin_fd, struct io_write_wrapper *stdout_handler, struct io_write_wrapper *stderr_handler,
+                        const char *vsocks[], pthread_t *tid);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // DAEMON_MODULES_SERVICE_VSOCK_IO_H
diff --git a/src/daemon/sandbox/sandbox.cc b/src/daemon/sandbox/sandbox.cc
index 968dae24..94c2684a 100644
--- a/src/daemon/sandbox/sandbox.cc
+++ b/src/daemon/sandbox/sandbox.cc
@@ -43,6 +43,8 @@
 namespace sandbox {
 
 const std::string SHM_MOUNT_POINT = "/dev/shm";
+const uint32_t VSOCK_START_PORT = 2000;
+const uint32_t VSOCK_END_PORT = 65535;
 
 static int WriteDefaultSandboxHosts(const std::string &path, const std::string &hostname)
 {
@@ -1014,6 +1016,25 @@ void Sandbox::SetNetworkSettings(const std::string &settings, Errors &error)
     }
 }
 
+auto Sandbox::FindAvailableVsockPort(uint32_t &port) -> bool
+{
+    std::unique_lock<std::mutex> lock(m_vsockPortsMutex);
+    for (uint32_t i = VSOCK_START_PORT; i < VSOCK_END_PORT; i++) {
+        if (m_vsockPorts.find(i) == m_vsockPorts.end()) {
+            m_vsockPorts.insert(i);
+            port = i;
+            return true;
+        }
+    }
+    return false;
+}
+
+void Sandbox::ReleaseVsockPort(uint32_t port)
+{
+    std::unique_lock<std::mutex> lock(m_vsockPortsMutex);
+    m_vsockPorts.erase(port);
+}
+
 auto Sandbox::GetTaskAddress() const -> const std::string &
 {
     return m_taskAddress;
diff --git a/src/daemon/sandbox/sandbox.h b/src/daemon/sandbox/sandbox.h
index 6ae2750f..0f135e70 100644
--- a/src/daemon/sandbox/sandbox.h
+++ b/src/daemon/sandbox/sandbox.h
@@ -117,6 +117,8 @@ public:
     auto UpdateStatsInfo(const StatsInfo &info) -> StatsInfo;
     void SetNetworkReady(bool ready);
     void SetNetworkMode(const std::string &networkMode);
+    auto FindAvailableVsockPort(uint32_t &port) -> bool;
+    void ReleaseVsockPort(uint32_t port);
     auto CleanupSandboxFiles(Errors &error) -> bool;
 
     // Save to file
@@ -205,6 +207,10 @@ private:
     // it should select accroding to the config
     std::shared_ptr<Controller> m_controller { nullptr };
+
+    // vsock ports
+    std::mutex m_vsockPortsMutex;
+    std::set<uint32_t> m_vsockPorts;
 };
 
 } // namespace sandbox
-- 
2.40.1