首页 > 编程 > Java > 正文

java实现memcache服务器的示例代码

2019-11-26 10:06:12
字体:
来源:转载
供稿:网友

什么是Memcache?

Memcache集群环境下缓存解决方案

Memcache是一个高性能的分布式的内存对象缓存系统,通过在内存里维护一个统一的巨大的hash表,它能够用来存储各种格式的数据,包括图像、视频、文件以及数据库检索的结果等。简单的说就是将数据调用到内存中,然后从内存中读取,从而大大提高读取速度。  

 Memcache是danga的一个项目,最早是LiveJournal 服务的,最初为了加速 LiveJournal 访问速度而开发的,后来被很多大型的网站采用。  

Memcached是以守护程序方式运行于一个或多个服务器中,随时会接收客户端的连接和操作

为什么会有Memcache和memcached两种名称?

其实Memcache是这个项目的名称,而memcached是它服务器端的主程序文件名,知道我的意思了吧。一个是项目名称,一个是主程序文件名,在网上看到了很多人不明白,于是混用了。 

Memcached是高性能的,分布式的内存对象缓存系统,用于在动态应用中减少数据库负载,提升访问速度。Memcached由Danga Interactive开发,用于提升LiveJournal.com访问速度的。LJ每秒动态页面访问量几千次,用户700万。Memcached将数据库负载大幅度降低,更好的分配资源,更快速访问。

这篇文章将会涉及以下内容:

  1. Java Socket多线程服务器
  2. Java IO
  3. Concurrency
  4. Memcache特性和协议

Memcache

Memcache is an in-memory key-value store for small chunks of arbitrary data (strings, objects) from results of databasecalls, API calls, or page rendering.

即内存缓存数据库,是一个键值对数据库。该数据库的存在是为了将从其他服务中获取的数据暂存在内存中,在重复访问时可以直接从命中的缓存中返回。既加快了访问速率,也减少了其他服务的负载。这里将实现一个单服务器版本的Memcache,并且支持多个客户端的同时连接。

客户端将与服务器建立telnet连接,然后按照Memcache协议与服务器缓存进行交互。这里实现的指令为get,set和del。先来看一下各个指令的格式

set

set属于存储指令,存储指令的特点时,第一行输入基本信息,第二行输入其对应的value值。

set <key> <flags> <exptime> <bytes> [noreply]/r/n
<value>/r/n

如果存储成功,将会返回STORED,如果指令中包含noreply属性,则服务器将不会返回信息。

该指令中每个域的内容如下:

  1. key: 键
  2. flags: 16位无符号整数,会在get时随键值对返回
  3. exptime: 过期时间,以秒为单位
  4. bytes:即将发送的value的长度
  5. noreply:是否需要服务器响应,为可选属性

如果指令不符合标准,服务器将会返回ERROR。

get

get属于获取指令,该指令特点如下:

get <key>*/r/n

它支持传入多个key的值,如果缓存命中了一个或者多个key,则会返回相应的数据,并以END作为结尾。如果没有命中,则返回的消息中不包含该key对应的值。格式如下:

VALUE <key> <flags> <bytes>/r/n<data block>/r/nVALUE <key> <flags> <bytes>/r/n<data block>/r/nENDdel

删除指令,该指令格式如下:

del <key> [noreply]/r/n

如果删除成功,则返回DELETED/r/n,否则返回NOT_FOUND。如果有noreply参数,则服务器不会返回响应。

JAVA SOCKET

JAVA SOCKET需要了解的只是包括TCP协议,套接字,以及IO流。这里就不详细赘述,可以参考我的这系列文章,也建议去阅读JAVA Network Programming。一书。

代码实现

这里贴图功能出了点问题,可以去文末我的项目地址查看类图。

这里采用了指令模式和工厂模式实现指令的创建和执行的解耦。指令工厂将会接收commandLine并且返回一个Command实例。每一个Command都拥有execute方法用来执行各自独特的操作。这里只贴上del指令的特殊实现。

 /** * 各种指令 * 目前支持get,set,delete * * 以及自定义的 * error,end */public interface Command {  /**   * 执行指令   * @param reader   * @param writer   */  void execute(Reader reader, Writer writer);  /**   * 获取指令的类型   * @return   */  CommandType getType();}
/** * 指令工厂 单一实例 */public class CommandFactory {  private static CommandFactory commandFactory;  private static Cache<Item> memcache;  private CommandFactory(){}  public static CommandFactory getInstance(Cache<Item> cache) {    if (commandFactory == null) {      commandFactory = new CommandFactory();      memcache = cache;    }    return commandFactory;  }  /**   * 根据指令的类型获取Command   * @param commandLine   * @return   */  public Command getCommand(String commandLine){    if (commandLine.matches("^set .*$")){      return new SetCommand(commandLine, memcache);    }else if (commandLine.matches("^get .*$")){      return new GetCommand(commandLine, memcache);    }else if (commandLine.matches("^del .*$")){      return new DeleteCommand(commandLine, memcache);    }else if (commandLine.matches("^end$")){      return new EndCommand(commandLine);    }else{      return new ErrorCommand(commandLine, ErrorCommand.ErrorType.ERROR);    }  }}
/** * 删除缓存指令 */public class DeleteCommand implements Command{  private final String command;  private final Cache<Item> cache;  private String key;  private boolean noReply;  public DeleteCommand(final String command, final Cache<Item> cache){    this.command = command;    this.cache = cache;    initCommand();  }  private void initCommand(){    if (this.command.contains("noreply")){      noReply = true;    }    String[] info = command.split(" ");    key = info[1];  }  @Override  public void execute(Reader reader, Writer writer) {    BufferedWriter bfw = (BufferedWriter) writer;    Item item = cache.delete(key);    if (!noReply){      try {        if (item == null){          bfw.write("NOT_FOUND/r/n");        }else {          bfw.write("DELETED/r/n");        }        bfw.flush();      } catch (IOException e) {        try {          bfw.write("ERROR/r/n");          bfw.flush();        } catch (IOException e1) {          e1.printStackTrace();        }        e.printStackTrace();      }    }  }  @Override  public CommandType getType() {    return CommandType.SEARCH;  }}

然后是实现内存服务器,为了支持先进先出功能,这里使用了LinkedTreeMap作为底层实现,并且重写了removeOldest方法。同时还使用CacheManager的后台线程及时清除过期的缓存条目。

public class Memcache implements Cache<Item>{  private Logger logger = Logger.getLogger(Memcache.class.getName());  //利用LinkedHashMap实现LRU  private static LinkedHashMap<String, Item> cache;  private final int maxSize;  //负载因子  private final float DEFAULT_LOAD_FACTOR = 0.75f;  public Memcache(final int maxSize){    this.maxSize = maxSize;    //确保cache不会在达到maxSize之后自动扩容    int capacity = (int) Math.ceil(maxSize / DEFAULT_LOAD_FACTOR) + 1;    this.cache = new LinkedHashMap<String, Item>(capacity, DEFAULT_LOAD_FACTOR, true){      @Override      protected boolean removeEldestEntry(Map.Entry<String,Item> eldest) {        if (size() > maxSize){          logger.info("缓存数量已经达到上限,会删除最近最少使用的条目");        }        return size() > maxSize;      }    };    //实现同步访问    Collections.synchronizedMap(cache);  }  public synchronized boolean isFull(){    return cache.size() >= maxSize;  }  @Override  public Item get(String key) {    Item item = cache.get(key);    if (item == null){      logger.info("缓存中key:" + key + "不存在");      return null;    }else if(item!=null && item.isExpired()){ //如果缓存过期则删除并返回null      logger.info("从缓存中读取key:" + key + " value:" + item.getValue() + "已经失效");      cache.remove(key);      return null;    }    logger.info("从缓存中读取key:" + key + " value:" + item.getValue() + " 剩余有效时间" + item.remainTime());    return item;  }  @Override  public void set(String key, Item value) {    logger.info("向缓存中写入key:" + key + " value:" + value);    cache.put(key, value);  }  @Override  public Item delete(String key) {    logger.info("从缓存中删除key:" + key);    return cache.remove(key);  }  @Override  public int size(){    return cache.size();  }  @Override  public int capacity() {    return maxSize;  }  @Override  public Iterator<Map.Entry<String, Item>> iterator() {    return cache.entrySet().iterator();  }}
/** * 缓存管理器 * 后台线程 * 将cache中过期的缓存删除 */public class CacheManager implements Runnable {  private Logger logger = Logger.getLogger(CacheManager.class.getName());  //缓存  public Cache<Item> cache;  public CacheManager(Cache<Item> cache){    this.cache = cache;  }  @Override  public void run() {    while (true){      Iterator<Map.Entry<String, Item>> itemIterator = cache.iterator();      while (itemIterator.hasNext()){        Map.Entry<String, Item> entry = itemIterator.next();        Item item = entry.getValue();        if(item.isExpired()){          logger.info("key:" + entry.getKey() + " value" + item.getValue() + " 已经过期,从数据库中删除");          itemIterator.remove();        }      }      try {        //每隔5秒钟再运行该后台程序        TimeUnit.SECONDS.sleep(5);      } catch (InterruptedException e) {        e.printStackTrace();      }    }  }}

最后是实现一个多线程的Socket服务器,这里就是将ServerSocket绑定到一个接口,并且将accept到的Socket交给额外的线程处理。

/** * 服务器 */public class IOServer implements Server {  private boolean stop;  //端口号  private final int port;  //服务器线程  private ServerSocket serverSocket;  private final Logger logger = Logger.getLogger(IOServer.class.getName());  //线程池,线程容量为maxConnection  private final ExecutorService executorService;  private final Cache<Item> cache;  public IOServer(int port, int maxConnection, Cache<Item> cache){    if (maxConnection<=0) throw new IllegalArgumentException("支持的最大连接数量必须为正整数");    this.port = port;    executorService = Executors.newFixedThreadPool(maxConnection);    this.cache = cache;  }  @Override  public void start() {    try {      serverSocket = new ServerSocket(port);      logger.info("服务器在端口"+port+"上启动");      while (true){        try {          Socket socket = serverSocket.accept();          logger.info("收到"+socket.getLocalAddress()+"的连接");          executorService.submit(new SocketHandler(socket, cache));        } catch (IOException e) {          e.printStackTrace();        }      }    } catch (IOException e) {      logger.log(Level.WARNING, "服务器即将关闭...");      e.printStackTrace();    } finally {      executorService.shutdown();      shutDown();    }  }  /**   * 服务器是否仍在运行   * @return   */  public boolean isRunning() {    return !serverSocket.isClosed();  }  /**   * 停止服务器   */  public void shutDown(){    try {      if (serverSocket!=null){        serverSocket.close();      }    } catch (IOException e) {      e.printStackTrace();    }  }}
/** * 处理各个客户端的连接 * 在获得end指令后关闭连接s */public class SocketHandler implements Runnable{  private static Logger logger = Logger.getLogger(SocketHandler.class.getName());  private final Socket socket;  private final Cache<Item> cache;  private boolean finish;  public SocketHandler(Socket s, Cache<Item> cache){    this.socket = s;    this.cache = cache;  }  @Override  public void run() {    try {      //获取socket输入流      final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));      //获取socket输出流      final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));      CommandFactory commandFactory = CommandFactory.getInstance(cache);      while (!finish){        final String commandLine = reader.readLine();        logger.info("ip:" + socket.getLocalAddress() + " 指令:" + commandLine);        if (commandLine == null || commandLine.trim().isEmpty()) {          continue;        }        //使用指令工厂获取指令实例        final Command command = commandFactory.getCommand(commandLine);        command.execute(reader, writer);        if (command.getType() == CommandType.END){          logger.info("请求关闭连接");          finish = true;        }      }    } catch (IOException e) {      e.printStackTrace();      logger.info("关闭来自" + socket.getLocalAddress() + "的连接");    } finally {      try {        if (socket != null){          socket.close();        }      } catch (IOException e) {        e.printStackTrace();      }    }  }}

项目地址请戳这里,如果觉得还不错的话,希望能给个星哈><

参考资料

memcached官网
memcache协议

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持武林网。

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表