Squongo is a ruby gem encapsulating WAL mode sqlite3 databases to provide an alternative to large document stores utilizing the JSON1 extension. It’s migrationless and provides the user with the ability to have many readers without locking the database while providing a save method that will simply enque on the writer process. It was born out of a need in a personal project and decided to extract it for public release. It’s not my best code I’ve ever written and it still needs comprehensive RSpecs but it’s worth publishing in my opinion.

Source Code

Squongo Ruby Gem

Usage

require 'squongo'

class Post < Squongo::Document
  TABLE = 'posts'
end

This will give the Post class all the document methods that make saving and querying squongo documents possible. Methods such as savefind(id)find_by(set)firstlast, and all.

Squongo.connect('cms.db')

The connect method opens the database for usage in the current process. If you use squongo in a multi-process application you’ll need to call reconnect after forking.

Squongo.start_writer

Will be ideally called right after connecting. It forks off the writer process that handles all writes to the database taking advantage of WAL mode.

article = Post.new data: { title: 'Example Post' }
article.save
puts article.id

Post.find(43)
Post.find_by title: 'Example Post'

Implementation

connection.rb

The connection implementation is rather simple but there’s a few comments to provide some context.

# frozen_string_literal: true

require 'sqlite3'

class Squongo::Connection
  # The query that ensures WAL mode journaling
  WAL_QUERY = 'PRAGMA journal_mode=WAL;'

  attr_reader :db, :path

  def initialize(path)
    @path = path
    @db = SQLite3::Database.open(path)
    ensure_mode
  end

  def reconnect
    @db = SQLite3::Database.open(path)
  end

  def self.connect(path)
    new(path)
  end

  def ensure_mode
    # WAL mode persists after being set on a sqlite instance so we only need to
    # set the journal mode on the first connection or creation of the database.
    return unless first_connection

    wal_mode
  end

  def wal_mode
    @db.execute WAL_QUERY
  end

  def first_connection
    !File.exist?(path)
  end
end

document.rb

# frozen_string_literal: true

require 'date'

class Squongo::Document
  attr_accessor :data

  # These properties are only readable because they are handled by squongo
  # The only data any squongo document should care about is the json data
  # which prevents needing to worry about schema changes and migrations
  attr_reader :id, :created_at, :updated_at

  def initialize(data: {}, id: nil, created_at: nil, updated_at: nil)
    @data = data
    @id = id
    @created_at = created_at
    @updated_at = updated_at
  end

  def save
    # Decorates out to the save static method which returns an id from the
    # Writer process and reloads the class data from the new saved document.
    id = Squongo.save(id: id, table: table, data: data)
    document = self.class.find(id)

    @id = document.id
    @created_at = document.created_at
    @updated_at = document.updated_at
  end

  def self.from_row(fields)
    # SQLite3 returns data in unformatted rows which we turn in to a new
    # instance of our class
    id, data, created_at, updated_at = fields

    new(
      data: JSON.parse(data),
      id: id,
      created_at: DateTime.parse(created_at),
      updated_at: DateTime.parse(updated_at)
    )
  end

  def self.find(id)
    # Simple query to find by id
    query = "SELECT * FROM #{table} WHERE id = ?"
    Squongo.connection.db.execute(query, id).map { |row| from_row(row) }.first
  end

  def self.find_by(set)
    # This method finds a row based on a top level key/value comparison
    # A future version will support variable depth comparisons, but for now it
    # it just takes single key value pairs
    field = set.keys.first
    value = set.values.first
    query = "SELECT * FROM #{table} WHERE json_extract(data, ?) = ?"
    rows = Squongo.connection.db.execute(query, ["$.#{field}", value])
    documents = rows.map { |row| from_row(row) }

    return documents.first if documents.length == 1

    documents
  end

  def self.first
    Squongo.connection.db.execute("SELECT * FROM #{table} ORDER BY id LIMIT 1")
           .map { |row| from_row(row) }.first
  end

  def self.last
    Squongo.connection.db.execute("SELECT * FROM #{table} ORDER BY id DESC LIMIT 1")
           .map { |row| from_row(row) }.first
  end

  def self.all
    Squongo.connection.db.execute("SELECT * FROM #{table}")
           .map { |row| from_row(row) }
  end

  def table
    self.class.table
  end

  def self.table
    const_get :TABLE
  end
end

writer.rb

The writer class manages its process taking models queued for saving off the process i/o pipe and either updating the row if the model already exists or persisting the new model if it does not.

# frozen_string_literal: true

class Squongo::Writer
  attr_reader :reader, :writer, :response_writer

  def initialize(reader, writer, response_reader, response_writer, parent_pid)
    # The reader is used to take models off the queue from the main process
    # The response writer is used to return the successfull update or ID from
    # the new persisted model.
    @reader = reader
    @writer = writer

    @response_reader = response_reader
    @response_writer = response_writer

    @parent_pid = parent_pid
  end

  def start
    monitor_parent

    while packet = @reader.gets
      # Blocks on the .gets call and then decodes the packet from Base64 to keep
      # the process from reading an incomplete model if the JSON data included
      # a new line character.
      model_information = Squongo.ipc_decode(packet)

      id = model_information['id']
      table = model_information['table']
      data  = model_information['data']

      if id.nil?
        id = insert(table, data)
        respond(id)
      else
        update(id, table, data)
      end
    end
  end

  def update(id, table, data)
    Squongo.connection.db.execute(
      "UPDATE #{table} SET data = json(?), updated_at = ? WHERE id = ?",
      [data.to_json, Squongo.timestamp, id]
    )
  end

  def insert(table, data)
    timestamp = Squongo.timestamp

    Squongo.connection.db.execute(
      "INSERT INTO #{table} (data, created_at, updated_at) VALUES(?, ?, ?)",
      [data.to_json, timestamp, timestamp]
    )

    Squongo.connection.db.last_insert_row_id
  end

  def respond(id)
    response_writer.puts id
  end

  # Simple method that checks if the parent process is still alive
  def should_live
    Process.getpgid(@parent_pid)
    true
  rescue Errno::ESRCH
    false
  end

  # If the parent process is no longer alive and we haven't been killed
  # ourselves we should go ahead and exit
  def monitor_parent
    @monitor_thread = Thread.new do
      loop do
        exit unless should_live
        sleep 0.1
      end
    end
  end
end