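How to Make GreenDao Queries Return LiveData

Background

When a new business module adopts the Jetpack stack, data access follows the single-source-of-truth pattern, which requires the underlying database queries to observe table changes. Room supports this natively: its DAOs can return LiveData or Flow that react whenever the underlying data changes.

Legacy code, however, still keeps some data in in-memory caches or in GreenDao storage. New modules also need to read part of that legacy data, so the old data sources must support returning LiveData or Flow as well.

An in-memory cache is easy to adapt: as long as the cache exposes a change listener, it can be wrapped into a LiveData.

GreenDao has no official support for observing table changes, so we have to implement it ourselves.

How to observe table changes in GreenDao?

Since Room supports observing table changes, we can study how Room's source code implements it and port the same feature to GreenDao.

How Room implements it

The LiveData or Flow is returned by the Dao object, so we start with the Dao source.

We can use GithubBrowserSample from architecture-components-samples as the example.

UserDao has a findByLogin method that returns LiveData: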

@Dao
interface UserDao {
    @Insert(onConflict = OnConflictStrategy.REPLACE)
    fun insert(user: User)

    @Query("SELECT * FROM user WHERE login = :login")
    fun findByLogin(login: String): LiveData<User>
}

The implementation of findByLogin in the generated UserDao_Impl:

@Override
public LiveData<User> findByLogin(String login) {
    final String _sql = "SELECT * FROM user WHERE login = ?";
    final RoomSQLiteQuery _statement = RoomSQLiteQuery.acquire(_sql, 1);
    int _argIndex = 1;
    if (login == null) {
        _statement.bindNull(_argIndex);
    } else {
        _statement.bindString(_argIndex, login);
    }
    return new ComputableLiveData<User>(__db.getQueryExecutor()) {
        private Observer _observer;

        @Override
        protected User compute() {
            if (_observer == null) {
                _observer = new Observer("user") {
                    @Override
                    public void onInvalidated(@NonNull Set<String> tables) {
                        invalidate();
                    }
                };
                __db.getInvalidationTracker().addWeakObserver(_observer);
            }
            final Cursor _cursor = DBUtil.query(__db, _statement, false);
            try {
                final int _cursorIndexOfLogin = _cursor.getColumnIndexOrThrow("login");
                final int _cursorIndexOfAvatarUrl = _cursor.getColumnIndexOrThrow("avatarUrl");
                final int _cursorIndexOfName = _cursor.getColumnIndexOrThrow("name");
                final int _cursorIndexOfCompany = _cursor.getColumnIndexOrThrow("company");
                final int _cursorIndexOfReposUrl = _cursor.getColumnIndexOrThrow("reposUrl");
                final int _cursorIndexOfBlog = _cursor.getColumnIndexOrThrow("blog");
                final User _result;
                if(_cursor.moveToFirst()) {
                    final String _tmpLogin;
                    _tmpLogin = _cursor.getString(_cursorIndexOfLogin);
                    final String _tmpAvatarUrl;
                    _tmpAvatarUrl = _cursor.getString(_cursorIndexOfAvatarUrl);
                    final String _tmpName;
                    _tmpName = _cursor.getString(_cursorIndexOfName);
                    final String _tmpCompany;
                    _tmpCompany = _cursor.getString(_cursorIndexOfCompany);
                    final String _tmpReposUrl;
                    _tmpReposUrl = _cursor.getString(_cursorIndexOfReposUrl);
                    final String _tmpBlog;
                    _tmpBlog = _cursor.getString(_cursorIndexOfBlog);
                    _result = new User(_tmpLogin,_tmpAvatarUrl,_tmpName,_tmpCompany,_tmpReposUrl,_tmpBlog);
                } else {
                    _result = null;
                }
                return _result;
            } finally {
                _cursor.close();
            }
        }

        @Override
        protected void finalize() {
            _statement.release();
        }
    }.getLiveData();
}

findByLogin creates a ComputableLiveData and returns the result of its getLiveData().

What does ComputableLiveData do?

public abstract class ComputableLiveData<T> {

    private final LiveData<T> mLiveData;
    ......

    /**
     * Creates a computable live data which is computed when there are active observers.
     * <p>
     * It can also be invalidated via {@link #invalidate()} which will result in a call to
     * {@link #compute()} if there are active observers (or when they start observing)
     */
    @SuppressWarnings("WeakerAccess")
    public ComputableLiveData() {
        mLiveData = new LiveData<T>() {
            @Override
            protected void onActive() {
                // TODO if we make this class public, we should accept an executor
                AppToolkitTaskExecutor.getInstance().executeOnDiskIO(mRefreshRunnable);
            }
        };
    }

    /**
     * Returns the LiveData managed by this class.
     *
     * @return A LiveData that is controlled by ComputableLiveData.
     */
    @SuppressWarnings("WeakerAccess")
    @NonNull
    public LiveData<T> getLiveData() {
        return mLiveData;
    }

    @VisibleForTesting
    final Runnable mRefreshRunnable = new Runnable() {
        @WorkerThread
        @Override
        public void run() {
            boolean computed;
            do {
                computed = false;
                // compute can happen only in 1 thread but no reason to lock others.
                if (mComputing.compareAndSet(false, true)) {
                    // as long as it is invalid, keep computing.
                    try {
                        T value = null;
                        while (mInvalid.compareAndSet(true, false)) {
                            computed = true;
                            value = compute();
                        }
                        if (computed) {
                            mLiveData.postValue(value);
                        }
                    } finally {
                        // release compute lock
                        mComputing.set(false);
                    }
                }
            } while (computed && mInvalid.get());
        }
    };

    // invalidation check always happens on the main thread
    @VisibleForTesting
    final Runnable mInvalidationRunnable = new Runnable() {
        @MainThread
        @Override
        public void run() {
            boolean isActive = mLiveData.hasActiveObservers();
            if (mInvalid.compareAndSet(false, true)) {
                if (isActive) {
                    // TODO if we make this class public, we should accept an executor.
                    AppToolkitTaskExecutor.getInstance().executeOnDiskIO(mRefreshRunnable);
                }
            }
        }
    };

    /**
     * Invalidates the LiveData.
     * <p>
     * When there are active observers, this will trigger a call to {@link #compute()}.
     */
    public void invalidate() {
        AppToolkitTaskExecutor.getInstance().executeOnMainThread(mInvalidationRunnable);
    }

    @SuppressWarnings("WeakerAccess")
    @WorkerThread
    protected abstract T compute();
}

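From the source we can see:

  • ComputableLiveData is a wrapper around LiveData.
  • When the LiveData becomes active (onActive()), mRefreshRunnable is submitted to a thread pool.
  • mRefreshRunnable calls compute() whenever no computation is in progress and the data has been marked invalid, then posts the result to the LiveData.
  • Calling invalidate() on its own also triggers mRefreshRunnable, provided there are active observers.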
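
To make the interplay of the two flags easier to follow, here is a minimal plain-Java sketch of the same loop. It is not Room's class: ComputableValue, its Supplier and Consumer constructor arguments, and the synchronous refresh() are assumptions made for illustration. They stand in for compute(), LiveData.postValue(), and the executor.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Plain-Java model of the two-flag refresh loop (hypothetical, not Room's class). */
class ComputableValue<T> {
    // true while a recomputation is pending; starts out true so the first refresh computes
    private final AtomicBoolean invalid = new AtomicBoolean(true);
    // true while some thread is inside the compute section
    private final AtomicBoolean computing = new AtomicBoolean(false);
    private final Supplier<T> compute;  // stands in for the abstract compute()
    private final Consumer<T> deliver;  // stands in for LiveData.postValue()

    ComputableValue(Supplier<T> compute, Consumer<T> deliver) {
        this.compute = compute;
        this.deliver = deliver;
    }

    /** Marks the value stale; returns true if it was not already marked. */
    boolean invalidate() {
        return invalid.compareAndSet(false, true);
    }

    /** Same loop shape as mRefreshRunnable, but run on the calling thread. */
    void refresh() {
        boolean computed;
        do {
            computed = false;
            // only one thread may compute at a time
            if (computing.compareAndSet(false, true)) {
                try {
                    T value = null;
                    // keep computing for as long as the value is marked invalid
                    while (invalid.compareAndSet(true, false)) {
                        computed = true;
                        value = compute.get();
                    }
                    if (computed) {
                        deliver.accept(value);
                    }
                } finally {
                    computing.set(false);
                }
            }
            // an invalidation may have arrived after the inner loop exited
        } while (computed && invalid.get());
    }
}
```

Calling refresh() twice in a row computes only once; a second computation happens only after invalidate() has flipped the flag back.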

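What does compute() do in the ComputableLiveData created by UserDao_Impl.findByLogin()?

It does two main things:

  • It queries the table and maps the data in the Cursor to a User object.
  • It calls __db.getInvalidationTracker().addWeakObserver() to register an observer; when the observer is notified, it calls ComputableLiveData.invalidate().

Calling ComputableLiveData.invalidate() simply causes the table to be queried again.

In other words, __db.getInvalidationTracker().addWeakObserver() is the key to observing table changes.

What is __db.getInvalidationTracker()?

  • __db is the RoomDatabase object.
  • getInvalidationTracker() returns the InvalidationTracker object.

What does InvalidationTracker.addWeakObserver() observe?

Quite a lot of source code is involved.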

public void addWeakObserver(Observer observer) {
    addObserver(new WeakObserver(this, observer));
}

public void addObserver(@NonNull Observer observer) {
    final String[] tableNames = observer.mTables;
    int[] tableIds = new int[tableNames.length];
    final int size = tableNames.length;
    long[] versions = new long[tableNames.length];

    // TODO sync versions ?
    for (int i = 0; i < size; i++) {
        Integer tableId = mTableIdLookup.get(tableNames[i].toLowerCase(Locale.US));
        if (tableId == null) {
            throw new IllegalArgumentException("There is no table with name " + tableNames[i]);
        }
        tableIds[i] = tableId;
        versions[i] = mMaxVersion;
    }
    ObserverWrapper wrapper = new ObserverWrapper(observer, tableIds, tableNames, versions);
    ObserverWrapper currentObserver;
    synchronized (mObserverMap) {
        currentObserver = mObserverMap.putIfAbsent(observer, wrapper);
    }
    if (currentObserver == null && mObservedTableTracker.onAdded(tableIds)) {
        syncTriggers();
    }
}

static class ObservedTableTracker {
    /**
     * @return true if # of triggers is affected.
     */
    boolean onAdded(int... tableIds) {
        boolean needTriggerSync = false;
        synchronized (this) {
            for (int tableId : tableIds) {
                final long prevObserverCount = mTableObservers[tableId];
                mTableObservers[tableId] = prevObserverCount + 1;
                if (prevObserverCount == 0) {
                    mNeedsSync = true;
                    needTriggerSync = true;
                }
            }
        }
        return needTriggerSync;
    }
}

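addWeakObserver() calls addObserver().

addObserver() mainly does the following:

  • Reads the names of the tables to observe, which were passed in when the Observer was constructed.
  • Calls ObservedTableTracker.onAdded(tableIds) to record the ids of the tables to observe.
  • Stores the Observer in mObserverMap.

In short, it mostly just registers the observer.

It is still unclear when the observer gets called, so let's look at syncTriggers() next.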

/**
 * Called by RoomDatabase before each beginTransaction call.
 * <p>
 * It is important that pending trigger changes are applied to the database before any query
 * runs. Otherwise, we may miss some changes.
 * <p>
 * This api should eventually be public.
 */
void syncTriggers() {
    if (!mDatabase.isOpen()) {
        return;
    }
    syncTriggers(mDatabase.getOpenHelper().getWritableDatabase());
}

void syncTriggers(SupportSQLiteDatabase database) {
    if (database.inTransaction()) {
        // we won't run this inside another transaction.
        return;
    }
    try {
        // This method runs in a while loop because while changes are synced to db, another
        // runnable may be skipped. If we cause it to skip, we need to do its work.
        while (true) {
            Lock closeLock = mDatabase.getCloseLock();
            closeLock.lock();
            try {
                // there is a potential race condition where another mSyncTriggers runnable
                // can start running right after we get the tables list to sync.
                final int[] tablesToSync = mObservedTableTracker.getTablesToSync();
                if (tablesToSync == null) {
                    return;
                }
                final int limit = tablesToSync.length;
                try {
                    database.beginTransaction();
                    for (int tableId = 0; tableId < limit; tableId++) {
                        switch (tablesToSync[tableId]) {
                            case ObservedTableTracker.ADD:
                                startTrackingTable(database, tableId);
                                break;
                            case ObservedTableTracker.REMOVE:
                                stopTrackingTable(database, tableId);
                                break;
                        }
                    }
                    database.setTransactionSuccessful();
                } finally {
                    database.endTransaction();
                }
                mObservedTableTracker.onSyncCompleted();
            } finally {
                closeLock.unlock();
            }
        }
    } catch (IllegalStateException | SQLiteException exception) {
        // may happen if db is closed. just log.
        Log.e(Room.LOG_TAG, "Cannot run invalidation tracker. Is the db closed?",
                exception);
    }
}

private void startTrackingTable(SupportSQLiteDatabase writableDb, int tableId) {
    final String tableName = mTableNames[tableId];
    StringBuilder stringBuilder = new StringBuilder();
    for (String trigger : TRIGGERS) {
        stringBuilder.setLength(0);
        stringBuilder.append("CREATE TEMP TRIGGER IF NOT EXISTS ");
        appendTriggerName(stringBuilder, tableName, trigger);
        stringBuilder.append(" AFTER ")
                .append(trigger)
                .append(" ON `")
                .append(tableName)
                .append("` BEGIN INSERT OR REPLACE INTO ")
                .append(UPDATE_TABLE_NAME)
                .append(" VALUES(null, ")
                .append(tableId)
                .append("); END");
        writableDb.execSQL(stringBuilder.toString());
    }
}

private static final String[] TRIGGERS = new String[]{"UPDATE", "DELETE", "INSERT"};

private static final String UPDATE_TABLE_NAME = "room_table_modification_log";

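According to the comment, syncTriggers() is called before every beginTransaction.

The key logic here is:

  • Read the tables that were previously recorded as needing observation.
  • Call startTrackingTable() to start observing them.

What does startTrackingTable() do?

It creates triggers: after an UPDATE, DELETE, or INSERT on an observed table, a row recording the id of the changed table is inserted into the room_table_modification_log table.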
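
For a GreenDao port, the same statements could be generated by a small helper. The sketch below only builds the SQL strings. The names greendao_table_modification_log and greendao_table_modification_trigger_ are hypothetical, chosen to avoid clashing with Room's own table, and the log table schema (an auto-increment version column plus table_id) is an assumption based on how the log is read later in the Room source.

```java
import java.util.ArrayList;
import java.util.List;

/** Builds Room-style change-tracking SQL for one table (all names are hypothetical). */
class TriggerSql {
    static final String LOG_TABLE = "greendao_table_modification_log";
    static final String TRIGGER_PREFIX = "greendao_table_modification_trigger_";
    static final String[] EVENTS = {"UPDATE", "DELETE", "INSERT"};

    /** Assumed schema: the auto-increment key doubles as a version number. */
    static String createLogTable() {
        return "CREATE TEMP TABLE IF NOT EXISTS " + LOG_TABLE
                + "(version INTEGER PRIMARY KEY AUTOINCREMENT, table_id INTEGER)";
    }

    static String triggerName(String tableName, String event) {
        return "`" + TRIGGER_PREFIX + tableName + "_" + event + "`";
    }

    /** One trigger per write event; each logs the table id after the write. */
    static List<String> startTracking(String tableName, int tableId) {
        List<String> statements = new ArrayList<>();
        for (String event : EVENTS) {
            statements.add("CREATE TEMP TRIGGER IF NOT EXISTS " + triggerName(tableName, event)
                    + " AFTER " + event + " ON `" + tableName + "`"
                    + " BEGIN INSERT OR REPLACE INTO " + LOG_TABLE
                    + " VALUES(null, " + tableId + "); END");
        }
        return statements;
    }

    /** Counterpart used when the last observer of a table goes away. */
    static List<String> stopTracking(String tableName) {
        List<String> statements = new ArrayList<>();
        for (String event : EVENTS) {
            statements.add("DROP TRIGGER IF EXISTS " + triggerName(tableName, event));
        }
        return statements;
    }
}
```

Each returned string would then be passed to the database's execSQL(). Because both the table and the triggers are TEMP objects, they live only as long as the connection and never touch the persisted schema.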

When are the room_table_modification_log records read?

Looking elsewhere in the source, we find this in RoomDatabase.endTransaction():

public void endTransaction() {
    mOpenHelper.getWritableDatabase().endTransaction();
    if (!inTransaction()) {
        // enqueue refresh only if we are NOT in a transaction. Otherwise, wait for the last
        // endTransaction call to do it.
        mInvalidationTracker.refreshVersionsAsync();
    }
}

This calls InvalidationTracker.refreshVersionsAsync(), which schedules the refresh on a background thread:

public void refreshVersionsAsync() {
    // TODO we should consider doing this sync instead of async.
    if (mPendingRefresh.compareAndSet(false, true)) {
        ArchTaskExecutor.getInstance().executeOnDiskIO(mRefreshRunnable);
    }
}

Runnable mRefreshRunnable = new Runnable() {
    @Override
    public void run() {
        final Lock closeLock = mDatabase.getCloseLock();
        boolean hasUpdatedTable = false;
        try {
            closeLock.lock();

            if (!ensureInitialization()) {
                return;
            }

            if (!mPendingRefresh.compareAndSet(true, false)) {
                // no pending refresh
                return;
            }

            if (mDatabase.inTransaction()) {
                // current thread is in a transaction. when it ends, it will invoke
                // refreshRunnable again. mPendingRefresh is left as false on purpose
                // so that the last transaction can flip it on again.
                return;
            }

            mCleanupStatement.executeUpdateDelete();
            mQueryArgs[0] = mMaxVersion;
            if (mDatabase.mWriteAheadLoggingEnabled) {
                // This transaction has to be on the underlying DB rather than the RoomDatabase
                // in order to avoid a recursive loop after endTransaction.
                SupportSQLiteDatabase db = mDatabase.getOpenHelper().getWritableDatabase();
                try {
                    db.beginTransaction();
                    hasUpdatedTable = checkUpdatedTable();
                    db.setTransactionSuccessful();
                } finally {
                    db.endTransaction();
                }
            } else {
                hasUpdatedTable = checkUpdatedTable();
            }
        } catch (IllegalStateException | SQLiteException exception) {
            // may happen if db is closed. just log.
            Log.e(Room.LOG_TAG, "Cannot run invalidation tracker. Is the db closed?",
                    exception);
        } finally {
            closeLock.unlock();
        }
        if (hasUpdatedTable) {
            synchronized (mObserverMap) {
                for (Map.Entry<Observer, ObserverWrapper> entry : mObserverMap) {
                    entry.getValue().checkForInvalidation(mTableVersions);
                }
            }
        }
    }

    private boolean checkUpdatedTable() {
        boolean hasUpdatedTable = false;
        Cursor cursor = mDatabase.query(SELECT_UPDATED_TABLES_SQL, mQueryArgs);
        //noinspection TryFinallyCanBeTryWithResources
        try {
            while (cursor.moveToNext()) {
                final long version = cursor.getLong(0);
                final int tableId = cursor.getInt(1);

                mTableVersions[tableId] = version;
                hasUpdatedTable = true;
                // result is ordered so we can safely do this assignment
                mMaxVersion = version;
            }
        } finally {
            cursor.close();
        }
        return hasUpdatedTable;
    }
};

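The key point is that checkUpdatedTable() reads from room_table_modification_log the ids of the tables whose data changed, together with their versions. The table's primary key is auto-incremented, so every insert for the same table id gets a new primary key, which can serve as a version number.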
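
The versioning idea can be illustrated without a database. The following in-memory model is only a sketch: ModificationLog, recordChange(), and checkUpdatedTables() are hypothetical names, with a counter playing the role of the auto-increment key and a list playing the role of the log table.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory model of the modification log; each row is {version, tableId}. */
class ModificationLog {
    private final List<long[]> rows = new ArrayList<>();
    private long nextVersion = 1; // plays the role of the auto-increment primary key
    private long maxVersion = 0;  // highest version that has already been processed
    final long[] tableVersions;   // latest version seen for each table id

    ModificationLog(int tableCount) {
        tableVersions = new long[tableCount];
    }

    /** What a trigger does: append a row with a fresh version for the table id. */
    void recordChange(int tableId) {
        rows.add(new long[]{nextVersion++, tableId});
    }

    /** What checkUpdatedTable() does: read rows newer than maxVersion, in order. */
    boolean checkUpdatedTables() {
        boolean hasUpdatedTable = false;
        for (long[] row : rows) {
            if (row[0] > maxVersion) {
                tableVersions[(int) row[1]] = row[0];
                // rows are stored in ascending version order, so this ends at the maximum
                maxVersion = row[0];
                hasUpdatedTable = true;
            }
        }
        return hasUpdatedTable;
    }
}
```

Two writes to table 1 followed by one write to table 0 leave tableVersions[1] at 2 and tableVersions[0] at 3. A second call to checkUpdatedTables() without new writes returns false, because nothing is newer than maxVersion.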

Next, ObserverWrapper.checkForInvalidation() is called:

static class ObserverWrapper {
    final int[] mTableIds;
    private final String[] mTableNames;
    private final long[] mVersions;
    final Observer mObserver;
    private final Set<String> mSingleTableSet;

    ObserverWrapper(Observer observer, int[] tableIds, String[] tableNames, long[] versions) {
        mObserver = observer;
        mTableIds = tableIds;
        mTableNames = tableNames;
        mVersions = versions;
        if (tableIds.length == 1) {
            ArraySet<String> set = new ArraySet<>();
            set.add(mTableNames[0]);
            mSingleTableSet = Collections.unmodifiableSet(set);
        } else {
            mSingleTableSet = null;
        }
    }

    void checkForInvalidation(long[] versions) {
        Set<String> invalidatedTables = null;
        final int size = mTableIds.length;
        for (int index = 0; index < size; index++) {
            final int tableId = mTableIds[index];
            final long newVersion = versions[tableId];
            final long currentVersion = mVersions[index];
            if (currentVersion < newVersion) {
                mVersions[index] = newVersion;
                if (size == 1) {
                    // Optimization for a single-table observer
                    invalidatedTables = mSingleTableSet;
                } else {
                    if (invalidatedTables == null) {
                        invalidatedTables = new ArraySet<>(size);
                    }
                    invalidatedTables.add(mTableNames[index]);
                }
            }
        }
        if (invalidatedTables != null) {
            mObserver.onInvalidated(invalidatedTables);
        }
    }
}

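If a table's version number has increased, the table data really did change, so mObserver.onInvalidated(invalidatedTables) is called.

Conclusion

That ties all the logic together. To summarize:

  • The Dao query method creates a ComputableLiveData and returns its internal LiveData.
  • In the compute() method of ComputableLiveData:
    • The database query is executed.
    • An observer is registered with InvalidationTracker; when the table changes, it calls ComputableLiveData.invalidate(), which calls compute() again and re-queries the database.

How InvalidationTracker observes table data changes:

  • When an observer is added, the tables to observe are recorded.
  • Before each transaction begins, triggers are created for the tables to observe.
    • When a trigger sees an UPDATE, DELETE, or INSERT on an observed table, it records that table's id in a temporary table.
  • When each transaction ends, the temporary table is queried for the ids of the changed tables, and the observers are notified that the table data changed.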

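How can GreenDao observe table changes with LiveData?

Back to the original requirement.

We only need to port the logic of ComputableLiveData and InvalidationTracker.

Create the triggers in GreenDao's beginTransaction(), and in endTransaction() query the temporary table and notify the observers.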
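
One way to place those two hooks is to wrap the database object that GreenDao writes through. The sketch below shows only the hook placement. Transactional is a hypothetical three-method interface; as far as I can tell, GreenDao's Database interface exposes methods with the same names, so an adapter over it should be possible, but that wiring is an assumption and is not shown. The two Runnable arguments are also hypothetical: syncTriggers would create any pending triggers, and refreshVersions would read the log table and notify observers.

```java
/** Hypothetical minimal transaction surface, to be adapted onto the real database object. */
interface Transactional {
    void beginTransaction();
    void endTransaction();
    boolean inTransaction();
}

/** Decorator that runs the tracking hooks around the outermost transaction only. */
class TrackingTransactions implements Transactional {
    private final Transactional delegate;
    private final Runnable syncTriggers;    // assumed hook: create pending triggers
    private final Runnable refreshVersions; // assumed hook: read the log, notify observers

    TrackingTransactions(Transactional delegate, Runnable syncTriggers, Runnable refreshVersions) {
        this.delegate = delegate;
        this.syncTriggers = syncTriggers;
        this.refreshVersions = refreshVersions;
    }

    @Override
    public void beginTransaction() {
        // like Room, skip trigger sync when already inside a transaction
        if (!delegate.inTransaction()) {
            syncTriggers.run();
        }
        delegate.beginTransaction();
    }

    @Override
    public void endTransaction() {
        delegate.endTransaction();
        // refresh only after the outermost transaction has ended
        if (!delegate.inTransaction()) {
            refreshVersions.run();
        }
    }

    @Override
    public boolean inTransaction() {
        return delegate.inTransaction();
    }
}
```

With nested transactions, the hooks fire once: triggers are synced before the first beginTransaction(), and observers are refreshed after the last endTransaction(). Whether every GreenDao write path actually goes through the wrapped object is something to verify before relying on this.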