Skip to content

Commit 2ab6eea

Browse files
committed
Replication inital push
1 parent c7fb234 commit 2ab6eea

File tree

17 files changed

+301
-150
lines changed

17 files changed

+301
-150
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ endif
2222
cp include/crossdb.h build/
2323

2424
debug:
25-
$(CC) -o build/libcrossdb.so -fPIC -lpthread -shared -g src/crossdb.c
25+
$(CC) -o build/libcrossdb.so -fPIC -lpthread -shared -g -DXDB_DEBUG src/crossdb.c
2626
$(CC) -o build/xdb-cli src/xdb-cli.c -lpthread -g
2727
cp include/crossdb.h build/
2828

src/core/xdb_common.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,12 @@ typedef struct {
194194

195195
static bool s_xdb_vdat[XDB_TYPE_MAX];
196196

197+
typedef enum {
198+
XDB_DUMP_XOID = (1<<0),
199+
XDB_DUMP_EXIST = (1<<1),
200+
XDB_DUMP_FULLNAME = (1<<2),
201+
} xdb_dump_flag_e;
202+
197203

198204
/******************************************************************************
199205
Types

src/core/xdb_crud.c

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2565,8 +2565,10 @@ xdb_dbrow_log (xdb_tblm_t *pTblm, uint32_t type, void *pNewRow, void *pOldRow, x
25652565
xdb_field_t **ppFields, *pField;
25662566
int count;
25672567

2568-
if (!pTblm->bLog || pTblm->pDbm->bSysDb) {
2569-
return XDB_OK;
2568+
if (0 == pTblm->sub_list.count) {
2569+
if (!pTblm->bLog || pTblm->pDbm->bSysDb) {
2570+
return XDB_OK;
2571+
}
25702572
}
25712573

25722574
uint8_t *pNullN = pNewRow + pTblm->pMeta->null_off;
@@ -2631,7 +2633,9 @@ xdb_dbrow_log (xdb_tblm_t *pTblm, uint32_t type, void *pNewRow, void *pOldRow, x
26312633
buf[len++] = '\0';
26322634
}
26332635

2634-
printf ("DBLOG %d: %s\n", len, buf);
2636+
if (xdb_unlikely (pTblm->bLog)) {
2637+
printf ("DBLOG %d: %s\n", len, buf);
2638+
}
26352639
#if (XDB_ENABLE_PUBSUB == 1)
26362640
xdb_pub_notify (pTblm, buf, len);
26372641
#endif

src/core/xdb_db.c

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,8 @@ xdb_dump_db_schema (xdb_dbm_t *pDbm, const char *out)
327327
xdb_tblm_t *pTblm = XDB_OBJM_GET(pDbm->db_objm, i);
328328
if (NULL != pTblm) {
329329
char buf[65536];
330-
int schema_len = xdb_dump_create_table (pTblm, buf, sizeof(buf), 1);
330+
int schema_len = xdb_dump_create_table (pTblm, buf, sizeof(buf), XDB_DUMP_XOID);
331+
buf[schema_len++]='\n';
331332
int len = fwrite (buf, 1, schema_len, pFile);
332333
if (len != schema_len) {
333334
xdb_errlog ("Failed to write schema file\n");
@@ -438,10 +439,7 @@ xdb_dump_create_db (xdb_dbm_t *pDbm, char buf[], xdb_size size, uint32_t flags)
438439
{
439440
xdb_size len = 0;
440441

441-
if (!pDbm->bMemory) {
442-
len += sprintf (buf+len, "CREATE DATABASE %s;", XDB_OBJ_NAME(pDbm));
443-
} else {
444-
len += sprintf (buf+len, "CREATE DATABASE %s ENGINE=MEMORY;", XDB_OBJ_NAME(pDbm));
445-
}
442+
len += sprintf (buf+len, "CREATE DATABASE%s %s%s;", (flags & XDB_DUMP_EXIST) ? " IF NOT EXISTS" : "", XDB_OBJ_NAME(pDbm), pDbm->bMemory?" ENGINE=MEMORY":"");
443+
446444
return len;
447445
}

src/core/xdb_db.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ typedef struct xdb_dbm_t {
2828

2929
xdb_rwlock_t wal_lock;
3030
xdb_rwlock_t db_lock;
31+
32+
xdb_vec_t sub_list;
3133
} xdb_dbm_t;
3234

3335
typedef struct xdb_dbobj_t {

src/core/xdb_fkey.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ xdb_create_fkey (xdb_stmt_fkey_t *pStmt)
2020
{
2121
xdb_conn_t *pConn = pStmt->pConn;
2222

23-
xdb_dbglog ("create fkeys %s on %s %d", pStmt->fkey_name, pStmt->XDB_OBJ_NAME(pStmt->pTblm), pStmt->fld_count);
23+
xdb_dbglog ("create fkeys %s on %s %d", pStmt->fkey_name, XDB_OBJ_NAME(pStmt->pTblm), pStmt->fld_count);
2424

2525
int rc = -1;
2626
xdb_tblm_t *pTblm = pStmt->pTblm;

src/core/xdb_sql.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,8 @@ xdb_stmt_exec (xdb_stmt_t *pStmt)
134134
rc = xdb_create_pub ((xdb_stmt_pub_t*)pStmt);
135135
break;
136136

137-
case XDB_STMT_CREATE_SUB:
138-
rc = xdb_create_sub ((xdb_stmt_sub_t*)pStmt);
137+
case XDB_STMT_CREATE_REPLICA:
138+
rc = xdb_create_replica ((xdb_stmt_replica_t*)pStmt);
139139
break;
140140

141141
case XDB_STMT_SUBSCRIBE:

src/core/xdb_table.c

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ xdb_create_table (xdb_stmt_tbl_t *pStmt)
6666
{
6767
int rc;
6868
xdb_conn_t *pConn = pStmt->pConn;
69-
xdb_dbm_t* pDbm = pConn->pCurDbm;
69+
xdb_dbm_t* pDbm = pStmt->pDbm;
7070
xdb_tblm_t *pTblm = NULL;
7171

7272
if (NULL != pStmt->pTblm) {
@@ -329,7 +329,10 @@ xdb_dump_create_table (xdb_tblm_t *pTblm, char buf[], xdb_size size, uint32_t fl
329329
{
330330
xdb_size len = 0;
331331

332-
len += sprintf (buf+len, "CREATE TABLE %s (\n", XDB_OBJ_NAME(pTblm));
332+
len += sprintf (buf+len, "CREATE TABLE%s %s%s%s (\n", (flags & XDB_DUMP_EXIST) ? " IF NOT EXISTS" : "",
333+
(flags & XDB_DUMP_FULLNAME) ? XDB_OBJ_NAME(pTblm->pDbm) : "",
334+
(flags & XDB_DUMP_FULLNAME) ? "." : "",
335+
XDB_OBJ_NAME(pTblm));
333336

334337
// dump field
335338
for (int i = 0; i < pTblm->fld_count; ++i) {
@@ -375,16 +378,11 @@ xdb_dump_create_table (xdb_tblm_t *pTblm, char buf[], xdb_size size, uint32_t fl
375378
if (pTblm->bMemory && !pTblm->pDbm->bMemory) {
376379
len += sprintf (buf+len, " ENGINE=MEMORY");
377380
}
378-
if (0 == flags) {
379-
len += sprintf (buf+len, ";\n");
380-
} else {
381-
len += sprintf (buf+len, " XOID=%d;\n", XDB_OBJ_ID(pTblm));
382-
}
383-
384-
if (0 == flags) {
385-
len --;
386-
buf[len] = '\0';
381+
if (flags&XDB_DUMP_XOID) {
382+
len += sprintf (buf+len, " XOID=%d", XDB_OBJ_ID(pTblm));
387383
}
384+
buf[len++] = ';';
385+
buf[len] = '\0';
388386

389387
return len;
390388
}

src/lib/xdb_vector.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,23 @@ typedef struct {
2121
int count;
2222
} xdb_vec_t;
2323

24+
static inline int
25+
xdb_vec_find (xdb_vec_t *pVec, void *pE)
26+
{
27+
for (int i = 0; i < pVec->count; ++i) {
28+
if (pVec->pEle[i] == pE) {
29+
return i;
30+
}
31+
}
32+
return -1;
33+
}
34+
2435
static inline bool
2536
xdb_vec_add (xdb_vec_t *pVec, void *pE)
2637
{
38+
if (xdb_vec_find (pVec, pE) >= 0) {
39+
return true;
40+
}
2741
if (xdb_unlikely (pVec->count >= pVec->cap)) {
2842
void *pEle = xdb_realloc (pVec->pEle, pVec->cap + 64*sizeof(void*));
2943
if (NULL == pEle) {

src/parser/xdb_parser.c

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,6 @@ xdb_parse_create (xdb_conn_t* pConn, xdb_token_t *pTkn)
109109
#if (XDB_ENABLE_SERVER == 1)
110110
return xdb_parse_create_server (pConn, pTkn);
111111
#endif
112-
#if (XDB_ENABLE_PUBSUB == 1)
113-
} else if (!strcasecmp (pTkn->token, "SUBSCRIPTION")) {
114-
return xdb_parse_create_sub (pConn, pTkn);
115-
#endif
116112
}
117113
break;
118114
case 'U':
@@ -131,6 +127,14 @@ xdb_parse_create (xdb_conn_t* pConn, xdb_token_t *pTkn)
131127
}
132128
#endif
133129
break;
130+
case 'R':
131+
case 'r':
132+
#if (XDB_ENABLE_PUBSUB == 1)
133+
if (!strcasecmp (pTkn->token, "REPLICA")) {
134+
return xdb_parse_create_replica (pConn, pTkn);
135+
}
136+
#endif
137+
break;
134138
}
135139
}
136140

0 commit comments

Comments
 (0)