Source code for markovchain.storage.sqlite

import json
import sqlite3
from collections import deque
from itertools import chain, repeat, islice

from .base import Storage


[docs]class SqliteStorage(Storage): """SQLite storage. Attributes ---------- db : `sqlite3.Connection` Database connection. cursor Database cursor. """
[docs] def __init__(self, db=':memory:', settings=None): """SQLite storage constructor. Parameters ---------- db : `str` or `sqlite3.Connection`, optional Database path or connection (default: ':memory:'). settings: `dict`, optional """ super().__init__(settings) if isinstance(db, str): db = sqlite3.connect(db, isolation_level='IMMEDIATE') self.db = db self.cursor = db.cursor() self.create_tables() self.cursor.execute('SELECT key, id FROM datasets') self.datasets = dict(self.cursor.fetchall())
def __eq__(self, markov): raise NotImplementedError() #return super().__eq__(markov)
[docs] def replace_state_separator(self, old_separator, new_separator): self.cursor.execute( 'UPDATE nodes SET value = replace(value, ?, ?)', (old_separator, new_separator) )
[docs] def get_dataset(self, key, create=False): try: return self.datasets[key] except KeyError: if not create: raise self.cursor.execute( 'INSERT INTO datasets (key) VALUES (?)', (key,) ) ret = self.cursor.lastrowid self.datasets[key] = ret return ret
[docs] def get_state(self, state, size): state = deque(chain(repeat('', size), state), maxlen=size) self.cursor.execute( 'SELECT id FROM nodes WHERE value=?', (self.join_state(state),) ) state = self.cursor.fetchone() if state is None: return None return state[0]
[docs] def get_states(self, dataset, string): dataset = self.get_dataset(dataset) self.cursor.execute( 'SELECT DISTINCT nodes.value' ' FROM nodes' ' INNER JOIN links ON links.source = nodes.id AND links.dataset = ?' ' WHERE nodes.value LIKE ?', (dataset, '%%%s%%' % string) ) ret = self.cursor.fetchall() for i, row in enumerate(ret): ret[i] = row[0] return ret
[docs] def get_tables(self): """Get all table names. Returns ------- `set` of `str` """ self.cursor.execute( 'SELECT name FROM sqlite_master WHERE type="table"' ) return set(x[0] for x in self.cursor.fetchall())
[docs] def get_node(self, value): """Get node ID by value. If a node with the specified value does not exist, create it and return its ID. Parameters ---------- value : `str` Node value. Returns ------- `int` Node ID. """ while True: self.cursor.execute( 'SELECT id FROM nodes WHERE value=?', (value,) ) node = self.cursor.fetchone() if node is not None: return node[0] self.cursor.execute( 'INSERT INTO nodes (value) VALUES (?)', (value,) )
[docs] def update_main_table(self): """Write generator settings to database. """ data = (json.dumps(self.settings),) self.cursor.execute('SELECT * FROM main') if self.cursor.fetchall() == []: self.cursor.execute('INSERT INTO main (settings) VALUES (?)', data) else: self.cursor.execute('UPDATE main SET settings=?', data)
[docs] def create_tables(self): """Create tables if they don't exist. """ self.cursor.execute('PRAGMA foreign_keys=1') self.cursor.execute(''' CREATE TABLE IF NOT EXISTS main ( settings TEXT NOT NULL DEFAULT "{}" ) ''') self.cursor.execute(''' CREATE TABLE IF NOT EXISTS datasets ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, key TEXT NOT NULL ) ''') self.cursor.execute(''' CREATE TABLE IF NOT EXISTS nodes ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, value TEXT NOT NULL ) ''') self.cursor.execute(''' CREATE TABLE IF NOT EXISTS links ( dataset REFERENCES datasets (id), source REFERENCES nodes (id), target REFERENCES nodes (id), value TEXT, bvalue TEXT, count INTEGER NOT NULL DEFAULT 1 ) ''') self.cursor.execute( 'CREATE UNIQUE INDEX IF NOT EXISTS node ON nodes (value)' ) self.cursor.execute( 'CREATE INDEX IF NOT EXISTS link_source ON links (source, dataset)' ) self.cursor.execute( 'CREATE INDEX IF NOT EXISTS link_target ON links (target, dataset)' )
[docs] def do_save(self, fp=None): """Save. Parameters ---------- fp : `None`, optional """ if fp is not None: raise NotImplementedError() self.update_main_table() self.db.commit()
[docs] def close(self): self.cursor.close() self.db.close() self.cursor = None self.db = None
[docs] @classmethod def load(cls, fp): if not isinstance(fp, sqlite3.Connection): if not isinstance(fp, str): fp.close() fp = fp.name fp = sqlite3.connect(fp, isolation_level='IMMEDIATE') cursor = fp.cursor() try: cursor.execute('SELECT settings FROM main') settings = cursor.fetchone() if settings is not None: settings = json.loads(settings[0]) except sqlite3.OperationalError: settings = None return cls(fp, settings)