DMTN-322
Design of APDB replication
Abstract
This technical note describes the requirements for APDB (Alert Production Database) replication, the related design decisions, and details of the current implementation.
APDB overview
APDB keeps the results of Difference Image Analysis performed as part of the Alert Production (AP) pipeline. Three major types of data are stored in APDB: DIAObject, DIASource, and DIAForcedSource. The AP pipeline reads existing data from APDB and writes newly generated records back into it. Since the pipeline works on individual detector-size images, all APDB queries are limited to the region of the sky covered by a single detector.
The production APDB instance uses Apache Cassandra as its backend.
To ensure adequate query performance, all three types of tables are spatially partitioned using MQ3C pixelization.
The DiaSource and DiaForcedSource tables are also partitioned temporally, because their queries are constrained to a 12-month history.
The default time partitioning granularity is 30 days.
An additional DiaObjectLast table serves as a partial index for the latest versions of DIAObject records, duplicating a subset of the data in the DiaObject table.
This data model is optimized for queries generated by the AP pipeline, but it is not efficient for other query types.
Replication to PPDB
The Prompt Products Database (PPDB) will contain a copy of the APDB data accessible to users other than the AP pipeline. PPDB is expected to handle almost arbitrary ADQL queries via a TAP service.
Data from APDB should be replicated to PPDB regularly, ideally with the minimum possible delay so that day-time processing can start early. Several replication strategies were discussed (2021-09-21 PPDB Tag Up), with a consensus that APDB should be the only source of data for replication.
The replication process will run continuously, checking APDB for new data and moving that data to PPDB. With that approach the queries on APDB are inherently temporally constrained (“find new records added since the previous replication time”). However, such queries are not supported by the data model of the standard APDB tables, which are spatially partitioned. To support efficient temporal queries, APDB must maintain a copy of the data using a different, temporally partitioned data model.
Temporal partitioning
For efficient temporal queries the data need to be partitioned temporally, and the queries should involve a reasonably small number of partitions. The granularity of time partitioning should be balanced so that a single partition covers a significant time period but still allows a reasonable replication cadence. A reasonable initial guess for the temporal granularity is between a few minutes and one hour; APDB uses 10 minutes as the default. The granularity should not be fixed once and for all; it should be configurable.
Since multiple AP processes generate their data with approximately the same timestamp, replication data from all processes fall into the same time partition. This is not optimal, as it causes all data to be sent to a single Cassandra host, creating a high load on that one node. To spread the load across the whole cluster, the time partition needs to be further subdivided using some non-temporal property. The range of values for that property needs to be reasonably small to avoid creating too many partitions. One option for such a value is a random number generated separately on each of the clients.
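The sub-division described above can be illustrated with a minimal Python sketch. It is not the APDB implementation: the names ReplicaPartitionKey, NUM_SUBCHUNKS, pick_client_subchunk, make_partition_key and partitions_to_read are hypothetical, the value 64 is an arbitrary example range, and drawing the random number once per client is only one possible reading of the text. The time rounding is the one given in the Replication chunks section.

```python
from __future__ import annotations

import random
from typing import NamedTuple, Optional

PERIOD_SECONDS = 600  # 10-minute time partitions, the default quoted in the text
NUM_SUBCHUNKS = 64    # hypothetical size of the non-temporal value range


class ReplicaPartitionKey(NamedTuple):
    """Illustrative composite partition key (not the actual APDB schema)."""

    time_partition: int  # start of the time partition, seconds since epoch
    subchunk: int        # non-temporal value in [0, NUM_SUBCHUNKS)


def pick_client_subchunk(rng: Optional[random.Random] = None) -> int:
    """Draw the random sub-partition value for one client (assumption: once per client)."""
    rng = rng or random.Random()
    return rng.randrange(NUM_SUBCHUNKS)


def make_partition_key(sec_since_epoch: int, subchunk: int) -> ReplicaPartitionKey:
    """Combine the rounded-down time partition with the client's sub-partition."""
    time_partition = (sec_since_epoch // PERIOD_SECONDS) * PERIOD_SECONDS
    return ReplicaPartitionKey(time_partition, subchunk)


def partitions_to_read(time_partition: int) -> list[ReplicaPartitionKey]:
    """A reader has to visit every sub-partition of a time partition."""
    return [ReplicaPartitionKey(time_partition, s) for s in range(NUM_SUBCHUNKS)]
```

The trade-off is visible in partitions_to_read: a larger range spreads writes over more nodes, but every replication query then has to touch that many partitions.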
Replication chunks
For efficiency it is preferable to copy one temporal partition (with all its sub-partitions) as a whole. We call the data in all tables in the same temporal partition a replication chunk.
The chunks are identified by the start time of the time partition, expressed in seconds since the epoch.
All writers calculate the time partition using the same algorithm and parameters: (sec_since_epoch // period) * period.
As a result, all writers store their data in chunks with increasing chunk IDs (e.g. 1752282600, 1752283200, 1752283800, and so on).
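As a small worked example of the formula above, the following Python sketch computes chunk IDs for the default 10-minute period. The function names replica_chunk_id and chunk_start_utc are illustrative and are not part of the APDB API.

```python
from datetime import datetime, timezone

DEFAULT_PERIOD = 600  # seconds; the 10-minute default mentioned in the text


def replica_chunk_id(sec_since_epoch: int, period: int = DEFAULT_PERIOD) -> int:
    """Round a timestamp down to the start of its time partition."""
    return (sec_since_epoch // period) * period


def chunk_start_utc(chunk_id: int) -> datetime:
    """Interpret a chunk ID as the UTC start time of its partition."""
    return datetime.fromtimestamp(chunk_id, tz=timezone.utc)
```

With this period, any timestamp from 1752282600 up to and including 1752283199 maps to chunk 1752282600, and 1752283200 starts the next chunk, which matches the sequence of IDs quoted above.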
Every write operation creates or updates a chunk record in a special table, ApdbReplicaChunks.
The records in this table have three fields:
- apdb_replica_chunk: chunk ID,
- last_update_time: last time when this chunk was updated,
- unique_id: UUID value which is updated on every write operation.
The last field is used to detect possible synchronization issues during replication.
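The bookkeeping described above can be sketched in Python as follows. This is an in-memory illustration only: a plain dictionary stands in for the ApdbReplicaChunks table, and the names ReplicaChunkRecord and record_write are hypothetical. Only the three field names come from the text.

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class ReplicaChunkRecord:
    """One row of the chunk table, with the three fields listed in the text."""

    apdb_replica_chunk: int     # chunk ID
    last_update_time: datetime  # last time this chunk was updated
    unique_id: uuid.UUID        # regenerated on every write operation


def record_write(
    table: dict[int, ReplicaChunkRecord],
    chunk_id: int,
    now: Optional[datetime] = None,
) -> ReplicaChunkRecord:
    """Create or overwrite the record for chunk_id, as every write would."""
    now = now or datetime.now(timezone.utc)
    record = ReplicaChunkRecord(chunk_id, now, uuid.uuid4())
    table[chunk_id] = record  # dict assignment stands in for an upsert
    return record
```

Because unique_id changes on every write, a replicator that remembers the value it saw when copying a chunk can later tell whether the chunk was written again.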
Writer synchronization
The main reason for the above scheme is to avoid synchronization between clients when switching from one chunk to another.
More specifically, this scheme is based on the local clock of each writer node, and it relies on clock synchronization with reasonable precision.
It also depends on the fact that writes to all three APDB tables happen in one operation with a single chunk ID, so that data in the same chunk are consistent.
The time used for calculating the partition and chunk ID corresponds to the AP processing time (even though it is called visit_time in the APDB API).
Replication decision
The replication code will copy APDB chunks that are ready to be copied to PPDB or to some intermediate representation. The chunks have to be ingested into PPDB in time order, which is the same as the order of their chunk IDs, to guarantee a consistent view of PPDB. An APDB chunk is ready to copy when it is possible to say that there will be no more updates to that chunk. It is possible to guess that a chunk will not receive new updates when:
- a later chunk exists in ApdbReplicaChunks and a reasonable time (a few minutes) has passed since that later chunk was created,
- or, if there is no later chunk, enough time has passed since this chunk was created, that is, longer than the partition length plus a few extra minutes.
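The two conditions above can be expressed as a short Python sketch. The function name is_chunk_ready, the created mapping (chunk ID to creation time in seconds since epoch) and the 300-second grace value are assumptions made for illustration. The chunk table described earlier stores last_update_time rather than a creation time, so a real implementation would need to choose which timestamp to use; last_update_time would be the more conservative substitute.

```python
from __future__ import annotations


def is_chunk_ready(
    chunk_id: int,
    created: dict[int, float],
    now: float,
    period: float = 600.0,
    grace: float = 300.0,
) -> bool:
    """Guess whether chunk_id will receive no more updates.

    created maps every known chunk ID to its creation time (hypothetical input);
    grace stands for the "few minutes" of slack mentioned in the text.
    """
    later = [c for c in created if c > chunk_id]
    if later:
        # A later chunk exists and has existed for at least the grace period.
        return any(now - created[c] >= grace for c in later)
    # No later chunk: wait for the partition length plus the grace period.
    return now - created[chunk_id] >= period + grace
```

The grace period is what absorbs clock differences between writers, so its value should be chosen with the expected clock synchronization precision in mind.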
The replication mechanism is expected to work along the following lines:
- It will need to track chunks that were already replicated, and to have a table similar to ApdbReplicaChunks for those replicated chunks (usually in PPDB or a separate database).
- It will periodically query ApdbReplicaChunks for chunks known to APDB and also query the list of replicated chunks.
- For replicated chunks it will compare the unique_id of a replicated chunk with the unique_id of the APDB chunk. If they are different, it means that the APDB chunk was updated after it was replicated. This is a problematic situation; depending on the PPDB backend it may be possible to update existing data in PPDB, but this needs further research.
- It will find the earliest APDB chunk that was not yet replicated, query APDB for all data in that chunk, transfer it to PPDB or some external representation, and record the chunk information in the replicated chunks table.
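One pass of the procedure outlined above could look like the following Python sketch. It is a simplified in-memory model, not the actual replication code: the two dictionaries stand in for ApdbReplicaChunks and the replicated chunks table (chunk ID mapped to unique_id), and replication_pass, is_ready and copy_chunk are hypothetical names. Chunks that changed after replication are only reported, since the text leaves their handling open.

```python
from __future__ import annotations

import uuid
from typing import Callable, Optional


def replication_pass(
    apdb_chunks: dict[int, uuid.UUID],
    replicated: dict[int, uuid.UUID],
    is_ready: Callable[[int], bool],
    copy_chunk: Callable[[int], None],
) -> tuple[list[int], Optional[int]]:
    """Run one replication pass.

    Returns the IDs of chunks modified after replication, and the ID of the
    chunk copied in this pass (None if nothing was copied).
    """
    # Compare unique_id of replicated chunks with the current APDB value.
    modified = sorted(
        c for c, uid in replicated.items()
        if c in apdb_chunks and apdb_chunks[c] != uid
    )
    # Chunks must be copied in chunk ID order, so only the earliest
    # non-replicated chunk is a candidate.
    pending = sorted(c for c in apdb_chunks if c not in replicated)
    copied = None
    if pending and is_ready(pending[0]):
        copied = pending[0]
        copy_chunk(copied)  # transfer all data of that chunk
        replicated[copied] = apdb_chunks[copied]  # remember what was copied
    return modified, copied
```

A driver would call this function periodically. Copying at most one chunk per pass keeps the sketch short; a real replicator could just as well loop until the earliest pending chunk is not yet ready.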
Chunks cleanup
The number of existing chunks will grow with time, and old chunks can be removed to free storage. APDB provides an interface for dropping individual chunks, implemented as deletion of whole partitions. This operation is efficient, but it can trigger significant compaction activity on the Cassandra side. Alternatively, we can consider truncating all replica tables after making sure that all replica chunks were safely transferred, as truncation is a much cheaper operation in Cassandra.
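The check that would precede either cleanup option can be sketched in Python. This is only an illustration under assumptions: the dictionaries map chunk ID to unique_id for the APDB chunk table and for the replicated chunks table, the function names are hypothetical, and treating a chunk as safely transferred when both unique_id values match is an interpretation, not something the text prescribes.

```python
from __future__ import annotations

import uuid


def chunks_safe_to_drop(
    apdb_chunks: dict[int, uuid.UUID],
    replicated: dict[int, uuid.UUID],
) -> list[int]:
    """Chunks whose replicated copy matches the current APDB unique_id."""
    return sorted(c for c, uid in apdb_chunks.items() if replicated.get(c) == uid)


def can_truncate(
    apdb_chunks: dict[int, uuid.UUID],
    replicated: dict[int, uuid.UUID],
) -> bool:
    """Truncation is only an option when every known chunk was transferred."""
    return len(chunks_safe_to_drop(apdb_chunks, replicated)) == len(apdb_chunks)
```

Such a check is only meaningful while no writer is active, because a new chunk could otherwise appear between the check and the truncation.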
Unsolved issues
The current APDB implementation only supports insertion into the three APDB tables and writing replication chunks for them. APDB will also have to support in-place updates for existing records, e.g. re-associating DIASources. The exact mechanism for replicating such updates is still under development (DM-50190).