Merge branch 'acoustid:main' into schema-2024-05-13
sfussenegger authored May 17, 2024
2 parents ed5de27 + 288f409 commit c6ddc97
Showing 7 changed files with 49 additions and 65 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -9,7 +9,7 @@ jobs:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python_version: [ '3.7', '3.8', '3.9', '3.10' ]
        python_version: [ '3.8', '3.9', '3.10' ]
    steps:
      - uses: actions/checkout@v2
      - name: Install Python
1 change: 0 additions & 1 deletion .gitignore
@@ -6,5 +6,4 @@
/.mypy_cache/
/dist/
mbdata.egg-info/
settings.py
venv/
7 changes: 7 additions & 0 deletions CHANGELOG.rst
@@ -1,3 +1,10 @@
Version 28.0.0
==============

- Schema change 28.
- Use MusicBrainz replication packets v2.
- Windows support.

Version 27.1.0
==============

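The "replication packets v2" entry corresponds to the new packet naming visible in the replication.py diff below: packet archives now carry a -v2 suffix. A minimal sketch of the naming scheme, mirroring the get_packet_url change in this commit; the base URL and token in the usage comment are illustrative only:

def packet_url(base_url: str, replication_seq: int, token: str = '') -> str:
    # v1 packets were named replication-<seq>.tar.bz2; v2 packets (schema 28)
    # add a -v2 suffix, as in the get_packet_url change below.
    url = base_url.rstrip('/') + '/replication-%d-v2.tar.bz2' % replication_seq
    if token:
        url += '?token=' + token
    return url

# Example (illustrative values only):
# packet_url('https://example.org/pub/musicbrainz', 123456, 'TOKEN')
# -> 'https://example.org/pub/musicbrainz/replication-123456-v2.tar.bz2?token=TOKEN'
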
2 changes: 1 addition & 1 deletion mbslave/__init__.py
@@ -1,4 +1,4 @@
# Copyright (C) 2013 Lukas Lalinsky
# Distributed under the MIT license, see the LICENSE file for details.

__version__ = "27.1.0"
__version__ = "28.0.0"
96 changes: 39 additions & 57 deletions mbslave/replication.py
@@ -260,7 +260,7 @@ def load_tar(source: str, fileobj: BytesIO, db, config, ignored_schemas, ignored


def mbslave_import_main(config, args):
    db = config.database.connect_db(superuser=True, set_search_path=False)
    db = connect_db(config, superuser=True, set_search_path=True)

    for source in args.sources:
        with ExitStack() as exit_stack:
@@ -332,48 +332,10 @@ def after_insert(self, table, values):
        pass


def parse_data_fields(s):
    fields = {}
    for name, value in re.findall(r'''"([^"]+)"=('(?:''|[^'])*')? ''', s):
        if not value:
            value = None
        else:
            value = value[1:-1].replace("''", "'").replace("\\\\", "\\")
        fields[name] = value
    return fields


def parse_bool(s):
    return s == 't'


ESCAPES = (('\\b', '\b'), ('\\f', '\f'), ('\\n', '\n'), ('\\r', '\r'),
           ('\\t', '\t'), ('\\v', '\v'), ('\\\\', '\\'))


def unescape(s):
    if s == '\\N':
        return None
    for orig, repl in ESCAPES:
        s = s.replace(orig, repl)
    return s


def read_psql_dump(fp, types):
    for line in fp:
        line = line.decode('utf8')
        values = list(map(unescape, line.rstrip('\r\n').split('\t')))
        for i, value in enumerate(values):
            if value is not None:
                values[i] = types[i](value)
        yield values


class PacketImporter(object):

    def __init__(self, db, config, ignored_schemas, ignored_tables, replication_seq, hook):
        self._db = db
        self._data = {}
        self._transactions = {}
        self._config = config
        self._ignored_schemas = ignored_schemas
@@ -382,26 +344,46 @@ def __init__(self, db, config, ignored_schemas, ignored_tables, replication_seq,
        self._replication_seq = replication_seq

    def load_pending_data(self, fp):
        dump = read_psql_dump(fp, [int, parse_bool, parse_data_fields])
        for id, key, values in dump:
            self._data[(id, key)] = values
        cursor = self._db.cursor()
        cursor.execute('TRUNCATE dbmirror2.pending_data')
        cursor.copy_expert('COPY dbmirror2.pending_data FROM STDIN', fp)

    def load_pending(self, fp):
        dump = read_psql_dump(fp, [int, str, str, int])
        for id, table, type, xid in dump:
            schema, table = parse_name(self._config, table)
            transaction = self._transactions.setdefault(xid, [])
            transaction.append((id, schema, table, type))
    def load_pending_keys(self, fp):
        cursor = self._db.cursor()
        cursor.execute('TRUNCATE dbmirror2.pending_keys')
        cursor.copy_expert('COPY dbmirror2.pending_keys FROM STDIN', fp)

    def process(self):
        cursor = self._db.cursor()
        stats = {}
        self._hook.begin(self._replication_seq)
        cursor.execute('''
            SELECT pd.xid,
                   pd.seqid,
                   pd.tablename,
                   pd.op,
                   pk.keys,
                   pd.olddata,
                   pd.newdata
            FROM dbmirror2.pending_data pd
            JOIN dbmirror2.pending_keys pk
                ON pk.tablename = pd.tablename
        ''')
        row = cursor.fetchone()
        while row:
            xid, id, table, type, key_names, old_values, new_values = row
            schema, table = parse_name(self._config, table)
            keys = None
            if type == 'd' or type == 'u':
                keys = {key: old_values[key] for key in key_names}
            transaction = self._transactions.setdefault(xid, [])
            transaction.append((id, schema, table, type, keys, new_values))
            row = cursor.fetchone()
        for xid in sorted(self._transactions.keys()):
            transaction = self._transactions[xid]
            # print ' - Running transaction', xid
            # print 'BEGIN; --', xid
            for id, schema, table, type in sorted(transaction):
            for id, schema, table, type, keys, values in sorted(transaction):
                if schema == '<ignore>':
                    continue
                if schema in self._ignored_schemas:
@@ -412,8 +394,6 @@ def process(self):
                if fulltable not in stats:
                    stats[fulltable] = {'d': 0, 'u': 0, 'i': 0}
                stats[fulltable][type] += 1
                keys = self._data.get((id, True), {})
                values = self._data.get((id, False), {})
                if type == 'd':
                    sql = 'DELETE FROM %s' % (fulltable,)
                    params = []
@@ -445,6 +425,8 @@ def process_tar(fileobj: BytesIO, db, schema, ignored_schemas, ignored_tables, e
        for table in sorted(stats.keys()):
            logger.info(' * %-30s\t%d\t%d\t%d' % (table, stats[table]['i'], stats[table]['u'], stats[table]['d']))
        self._hook.before_commit()
        cursor.execute('TRUNCATE dbmirror2.pending_data')
        cursor.execute('TRUNCATE dbmirror2.pending_keys')
        self._db.commit()
        self._hook.after_commit()

@@ -467,15 +449,15 @@ def process_tar(fileobj: BytesIO, db, schema, ignored_schemas, ignored_tables, e
        elif member.name == 'TIMESTAMP':
            ts = member_file.read().strip().decode('utf8')
            logger.info('Packet was produced at %s', ts)
        elif member.name in ('mbdump/Pending', 'mbdump/dbmirror_pending'):
            importer.load_pending(member_file)
        elif member.name in ('mbdump/PendingData', 'mbdump/dbmirror_pendingdata'):
        elif member.name == 'mbdump/pending_data':
            importer.load_pending_data(member_file)
        elif member.name == 'mbdump/pending_keys':
            importer.load_pending_keys(member_file)
    importer.process()


def get_packet_url(base_url, token, replication_seq):
    url = base_url.rstrip("/") + "/replication-%d.tar.bz2" % replication_seq
    url = base_url.rstrip("/") + "/replication-%d-v2.tar.bz2" % replication_seq
    if token:
        url += '?token=' + token
    return url
@@ -486,7 +468,7 @@ class PacketNotFoundError(Exception):


def download_packet(base_url: str, token: str, replication_seq: int) -> BytesIO:
    url = base_url.rstrip("/") + "/replication-%d.tar.bz2" % replication_seq
    url = base_url.rstrip("/") + "/replication-%d-v2.tar.bz2" % replication_seq
    if token:
        url += '?token=' + token
    url = get_packet_url(base_url, token, replication_seq)
@@ -608,7 +590,7 @@ def create_schemas(config: Config) -> None:


def run_script(script: str) -> None:
    subprocess.run(['bash', '-euxc', script], check=True)
    subprocess.run(script, check=True, shell=True)


def run_sql_script(name: str, superuser: bool = False) -> None:
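
Taken together, the replication.py changes drop the Python-side parsing of pending dumps (parse_data_fields, unescape, read_psql_dump) in favour of COPYing the packet files into the dbmirror2 staging tables and reading them back with a single join. A minimal, self-contained sketch of that flow, assuming a psycopg2 connection; the table and column names follow the diff above, while the dry-run print at the end stands in for the DELETE/UPDATE/INSERT statements that process() actually builds:

import psycopg2  # assumed driver; used by the illustrative connection below

def load_and_apply_pending(db, pending_data_file, pending_keys_file):
    cursor = db.cursor()

    # Stage the raw packet dumps directly in PostgreSQL instead of parsing them in Python.
    cursor.execute('TRUNCATE dbmirror2.pending_data')
    cursor.copy_expert('COPY dbmirror2.pending_data FROM STDIN', pending_data_file)
    cursor.execute('TRUNCATE dbmirror2.pending_keys')
    cursor.copy_expert('COPY dbmirror2.pending_keys FROM STDIN', pending_keys_file)

    # Read the changes back together with each table's key column list.
    cursor.execute('''
        SELECT pd.xid, pd.seqid, pd.tablename, pd.op, pk.keys, pd.olddata, pd.newdata
        FROM dbmirror2.pending_data pd
        JOIN dbmirror2.pending_keys pk ON pk.tablename = pd.tablename
    ''')

    transactions = {}
    for xid, seqid, table, op, key_names, old_values, new_values in cursor:
        # Deletes and updates identify the row by its key columns taken from
        # the old values, as in PacketImporter.process() above.
        keys = {k: old_values[k] for k in key_names} if op in ('d', 'u') else None
        transactions.setdefault(xid, []).append((seqid, table, op, keys, new_values))

    # Replay transactions in commit order and changes in sequence order.
    # Here we only report them; process() turns them into SQL statements.
    for xid in sorted(transactions):
        for seqid, table, op, keys, values in sorted(transactions[xid]):
            print(xid, seqid, table, op, keys)

    # The staging tables are emptied again before the final commit, as in process().
    cursor.execute('TRUNCATE dbmirror2.pending_data')
    cursor.execute('TRUNCATE dbmirror2.pending_keys')
    db.commit()

# Usage (illustrative connection parameters; member names as extracted from the packet tar):
# db = psycopg2.connect(dbname='musicbrainz', user='musicbrainz')
# with open('mbdump/pending_data', 'rb') as pd, open('mbdump/pending_keys', 'rb') as pk:
#     load_and_apply_pending(db, pd, pk)
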
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "mbslave"
version = "27.1.0"
version = "28.0.0"
description = "MusicBrainz Database Mirror"
readme = "README.rst"
authors = ["Lukáš Lalinský <[email protected]>"]
4 changes: 0 additions & 4 deletions settings.py.sample

This file was deleted.
