Doge log

Abby CTO 雑賀 力王のオフィシャルサイトです

libmemcachedのなか

ちょっと使えるかどうか調べた時に思ったこと。

libmemcached/connect.c
  /* libmemcached will always use nonblocking IO to avoid write deadlocks */
  int flags;

  do
    flags= fcntl(ptr->fd, F_GETFL, 0);
  while (flags == -1 && (errno == EINTR || errno == EAGAIN));

  unlikely (flags == -1)
  {
    return MEMCACHED_CONNECTION_FAILURE;
  }
  else if ((flags & O_NONBLOCK) == 0)
  {
    int rval;

    do
      rval= fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK);
    while (rval == -1 && (errno == EINTR || errno == EAGAIN));

ふむ、デフォルトででnonblockingを使うのか。
ptr->root->flags.no_blockの初期値も1みたい。
getをのぞく。

libmemcached/get.c
char *memcached_get_by_key(memcached_st *ptr,
                           const char *master_key,
                           size_t master_key_length,
                           const char *key, size_t key_length,
                           size_t *value_length,
                           uint32_t *flags,
                           memcached_return_t *error)
{
  char *value;
  size_t dummy_length;
  uint32_t dummy_flags;
  memcached_return_t dummy_error;

  unlikely (ptr->flags.use_udp)
  {
    *error= MEMCACHED_NOT_SUPPORTED;
    return NULL;
  }

  /* Request the key */
  *error= memcached_mget_by_key_real(ptr, master_key, master_key_length,
                                     (const char * const *)&key,
                                     &key_length, 1, false);

  value= memcached_fetch(ptr, NULL, NULL,
                         value_length, flags, error);

get処理の一部抜粋。

  1. Requestを投げる
  2. データを取得

の2ステップ。

fetchの中で待ち合わせしてるんだろうと想像できる。
まあいろいろあるけど途中をすっ飛ばしてioをみる。
データの読み書きはioのAPI経由で行われる。

libmemcached/io.c
memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
                                     void *buffer, size_t length, ssize_t *nread)
{
  char *buffer_ptr;

  buffer_ptr= buffer;

  while (length)
  {
    if (!ptr->read_buffer_length)
    {
      ssize_t data_read;

      while (1)
      {
        data_read= read(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER);
        if (data_read > 0)
        {
          break;
        }
        else if (data_read == -1)
        {
          ptr->cached_errno= errno;
          memcached_return_t rc= MEMCACHED_ERRNO;
          switch (errno)
          {
          case EAGAIN:
          case EINTR:
#ifdef TARGET_OS_LINUX
          case ERESTART:
#endif
            if ((rc= io_wait(ptr, MEM_READ)) == MEMCACHED_SUCCESS)
              continue;
            /* fall through */

          default:
            {
              memcached_quit_server(ptr, true);
              *nread= -1;
              return rc;
            }
          }
        }

一回でうまくいけばそのまま。
EAGAINなどnonblockingで読めない場合はio_waitをかまして待ち合わせを行ってるようだ。
待ち合わせなのか。
io_waitはどーなってるのか。

libmemcached/io.c
#include "common.h"
#include <sys/select.h>
#include <poll.h>

typedef enum {
  MEM_READ,
  MEM_WRITE
} memc_read_or_write;

static ssize_t io_flush(memcached_server_write_instance_st ptr,
                        memcached_return_t *error);
static void increment_udp_message_id(memcached_server_write_instance_st ptr);

static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
                                  memc_read_or_write read_or_write)
{
  struct pollfd fds= {
    .fd= ptr->fd,
    .events = POLLIN
  };
  int error;

  if (read_or_write == MEM_WRITE) /* write */
  {
    fds.events= POLLOUT;
    WATCHPOINT_SET(ptr->io_wait_count.write++);
  }
  else
  {
    WATCHPOINT_SET(ptr->io_wait_count.read++);
  }

  /*
   ** We are going to block on write, but at least on Solaris we might block
   ** on write if we haven't read anything from our input buffer..
   ** Try to purge the input buffer if we don't do any flow control in the
   ** application layer (just sending a lot of data etc)
   ** The test is moved down in the purge function to avoid duplication of
   ** the test.
 */
  if (read_or_write == MEM_WRITE)
  {
    memcached_return_t rc= memcached_purge(ptr);
    if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
      return MEMCACHED_FAILURE;
  }

  int timeout= ptr->root->poll_timeout;
  if (ptr->root->flags.no_block == false)
    timeout= -1;

  size_t loop_max= 5;
  while (--loop_max)
  {
    error= poll(&fds, 1, timeout);

    switch (error)
    {
    case 1:
      WATCHPOINT_IF_LABELED_NUMBER(read_or_write && loop_max < 4, "read() times we had to loop, decremented down from 5", loop_max);
      WATCHPOINT_IF_LABELED_NUMBER(!read_or_write && loop_max < 4, "write() times we had to loop, decremented down from 5", loop_max);

      return MEMCACHED_SUCCESS;
    case 0:
      return MEMCACHED_TIMEOUT;
    default:
      WATCHPOINT_ERRNO(errno);
      {
        switch (errno)
        {
#ifdef TARGET_OS_LINUX
        case ERESTART:
#endif
        case EINTR:
          continue;
        default:
          ptr->cached_errno= error;
          memcached_quit_server(ptr, true);

          return MEMCACHED_FAILURE;
        }
      }
    }
  }

  if (loop_max == 0 && error == 0)
    return MEMCACHED_TIMEOUT;

ふむ、pollなのか。
ディスクリプタは1つなので別にいいっちゃあいいんだろうけど。

非同期なのに待ち合わせなのか、うーむ。
クライアントなので待ち合わせてもいいかって事なのかな?これ。
よくわかんねーな。。。