File: /var/www/web.enelar.com.co/node_modules/lmdb/src/misc.cpp
#include "lmdb-js.h"
#include <atomic>
#include <string.h>
#include <stdio.h>
#include <node_version.h>
#include <time.h>
using namespace Napi;
static thread_local char* globalUnsafePtr;
static thread_local size_t globalUnsafeSize;
void setupExportMisc(Napi::Env env, Object exports) {
Object versionObj = Object::New(env);
int major, minor, patch;
char *str = mdb_version(&major, &minor, &patch);
versionObj.Set("versionString", String::New(env, str));
versionObj.Set("major", Number::New(env, major));
versionObj.Set("minor", Number::New(env, minor));
versionObj.Set("patch", Number::New(env, patch));
#if ENABLE_V8_API
versionObj.Set("nodeCompiledVersion", Number::New(env, NODE_MAJOR_VERSION));
#endif
exports.Set("version", versionObj);
EXPORT_NAPI_FUNCTION("setGlobalBuffer", setGlobalBuffer)
EXPORT_NAPI_FUNCTION("lmdbError", lmdbError)
EXPORT_NAPI_FUNCTION("enableDirectV8", enableDirectV8)
EXPORT_NAPI_FUNCTION("createBufferForAddress", createBufferForAddress);
EXPORT_NAPI_FUNCTION("getAddress", getAddress);
EXPORT_NAPI_FUNCTION("getBufferAddress", getBufferAddress);
EXPORT_NAPI_FUNCTION("detachBuffer", detachBuffer);
EXPORT_NAPI_FUNCTION("startRead", startRead);
EXPORT_NAPI_FUNCTION("setReadCallback", setReadCallback);
EXPORT_NAPI_FUNCTION("enableThreadSafeCalls", enableThreadSafeCalls);
napi_value globalBuffer;
napi_create_buffer(env, SHARED_BUFFER_THRESHOLD, (void**) &globalUnsafePtr, &globalBuffer);
globalUnsafeSize = SHARED_BUFFER_THRESHOLD;
exports.Set("globalBuffer", Object(env, globalBuffer));
}
void setFlagFromValue(int *flags, int flag, const char *name, bool defaultValue, Object options) {
Value opt = options.Get(name);
if (opt.IsBoolean() ? opt.As<Boolean>().Value() : defaultValue)
*flags |= flag;
}
/*
Value valToStringUnsafe(MDB_val &data) {
auto resource = new CustomExternalOneByteStringResource(&data);
auto str = Nan::New<v8::String>(resource);
return str.ToLocalChecked();
}*/
Value valToUtf8(Env env, MDB_val &data) {
return String::New(env, (const char*) data.mv_data, data.mv_size);
}
Value valToString(Env env, MDB_val &data) {
// UTF-16 buffer
const uint16_t *buffer = reinterpret_cast<const uint16_t*>(data.mv_data);
// Number of UTF-16 code points
size_t n = data.mv_size / sizeof(uint16_t);
// Check zero termination
if (n < 1 || buffer[n - 1] != 0) {
return throwError(env, "Invalid zero-terminated UTF-16 string");
}
size_t length = n - 1;
return String::New(env, (const char16_t*)data.mv_data, length);
}
bool valToBinaryFast(MDB_val &data, DbiWrap* dw) {
Compression* compression = dw->compression;
if (compression) {
if (data.mv_data == compression->decompressTarget) {
// already decompressed to the target, nothing more to do
} else {
if (data.mv_size > compression->decompressSize) {
// indicates we could not copy, won't fit
return false;
}
// copy into the buffer target
memcpy(compression->decompressTarget, data.mv_data, data.mv_size);
}
} else {
if (data.mv_size > globalUnsafeSize) {
// indicates we could not copy, won't fit
return false;
}
memcpy(globalUnsafePtr, data.mv_data, data.mv_size);
}
return true;
}
Value valToBinaryUnsafe(MDB_val &data, DbiWrap* dw, Env env) {
valToBinaryFast(data, dw);
return Number::New(env, data.mv_size);
}
int getVersionAndUncompress(MDB_val &data, DbiWrap* dw) {
//fprintf(stdout, "uncompressing %u\n", compressionThreshold);
unsigned char* charData = (unsigned char*) data.mv_data;
if (dw->hasVersions) {
memcpy((dw->ew->keyBuffer + 16), charData, 8);
// fprintf(stderr, "getVersion %u\n", lastVersion);
charData = charData + 8;
data.mv_data = charData;
data.mv_size -= 8;
}
if (data.mv_size == 0) {
return 1;// successFunc(data);
}
unsigned char statusByte = (dw->compression && dw->compression->startingOffset < data.mv_size)
? charData[dw->compression->startingOffset] : 0;
//fprintf(stdout, "uncompressing status %X\n", statusByte);
if (statusByte >= 250) {
bool isValid;
dw->compression->decompress(data, isValid, !dw->getFast);
return isValid ? 2 : 0;
}
return 1;
}
NAPI_FUNCTION(lmdbError) {
ARGS(1)
int32_t error_code;
GET_INT32_ARG(error_code, 0);
return throwLmdbError(env, error_code);
}
NAPI_FUNCTION(setGlobalBuffer) {
ARGS(1)
napi_get_buffer_info(env, args[0], (void**) &globalUnsafePtr, &globalUnsafeSize);
RETURN_UNDEFINED
}
/*Value getBufferForAddress) {
char* address = (char*) (size_t) Nan::To<v8::Number>(info[0]).ToLocalChecked()->Value();
std::unique_ptr<v8::BackingStore> backing = v8::ArrayBuffer::NewBackingStore(
address, 0x100000000, [](void*, size_t, void*){}, nullptr);
auto array_buffer = v8::ArrayBuffer::New(Isolate::GetCurrent(), std::move(backing));
info.GetReturnValue().Set(array_buffer);
}*/
NAPI_FUNCTION(createBufferForAddress) {
ARGS(2)
GET_INT64_ARG(0)
void* data = (void*) i64;
uint32_t length;
GET_UINT32_ARG(length, 1);
napi_create_external_buffer(env, length, data, nullptr, nullptr, &returnValue);
return returnValue;
}
NAPI_FUNCTION(getAddress) {
ARGS(1)
void* data;
size_t length;
napi_get_arraybuffer_info(env, args[0], &data, &length);
napi_create_double(env, (double) (size_t) data, &returnValue);
return returnValue;
}
NAPI_FUNCTION(getBufferAddress) {
ARGS(1)
void* data;
size_t length;
napi_get_buffer_info(env, args[0], &data, &length);
napi_create_double(env, (double) (size_t) data, &returnValue);
return returnValue;
}
NAPI_FUNCTION(detachBuffer) {
ARGS(1)
#if (NAPI_VERSION > 6)
napi_detach_arraybuffer(env, args[0]);
#endif
RETURN_UNDEFINED;
}
class ReadWorker : public AsyncWorker {
public:
ReadWorker(uint32_t* start, const Function& callback)
: AsyncWorker(callback), start(start) {}
void Execute() {
uint32_t instruction;
uint32_t* gets = start;
while((instruction = std::atomic_exchange((std::atomic<uint32_t>*)(gets + 2), (uint32_t)0xf0000000))) {
MDB_val key;
key.mv_size = *(gets + 3);
MDB_dbi dbi = (MDB_dbi) (instruction & 0xffff);
MDB_val data;
MDB_txn* txn = (MDB_txn*) (size_t) *((double*)gets);
unsigned int flags;
mdb_dbi_flags(txn, dbi, &flags);
bool dupSort = flags & MDB_DUPSORT;
int effected = 0;
MDB_cursor *cursor;
int rc = mdb_cursor_open(txn, dbi, &cursor);
if (rc)
return SetError(mdb_strerror(rc));
key.mv_data = (void*) gets;
rc = mdb_cursor_get(cursor, &key, &data, MDB_SET_KEY);
MDB_env* env = mdb_txn_env(txn);
*(gets + 3) = data.mv_size;
*((double*)gets) = (double) (size_t) data.mv_data;
gets += (key.mv_size + 28) >> 2;
while (!rc) {
// access one byte from each of the pages to ensure they are in the OS cache,
// potentially triggering the hard page fault in this thread
int pages = (data.mv_size + 0xfff) >> 12;
// TODO: Adjust this for the page headers, I believe that makes the first page slightly less 4KB.
for (int i = 0; i < pages; i++) {
effected += *(((uint8_t*)data.mv_data) + (i << 12));
}
if (dupSort) // in dupsort databases, access the rest of the values
rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT_DUP);
else
rc = 1; // done
}
mdb_cursor_close(cursor);
}
}
void OnOK() {
// TODO: For each entry, find the shared buffer
uint32_t* gets = start;
// EnvWrap::toSharedBuffer();
Callback().Call({Env().Null()});
}
private:
uint32_t* start;
};
typedef struct { // this is read results data buffer that is actively being used by a JS thread (read results are written to it)
int id;
char* data;
uint32_t offset;
uint32_t size;
} read_results_buffer_t;
static thread_local std::unordered_map<void*, read_results_buffer_t*>* buffersByWorker;
typedef struct {
napi_ref callback;
napi_ref resource;
} read_callback_t;
static int next_buffer_id = -1;
typedef struct {
uint32_t* instructionAddress;
uint32_t callback_id;
napi_async_work work;
//napi_deferred deferred;
js_buffers_t* buffers;
} read_instruction_t;
const uint32_t ZERO = 0;
void do_read(napi_env nenv, void* instruction_pointer) {
read_instruction_t* readInstruction = (read_instruction_t*) instruction_pointer;
//fprintf(stderr, "lock %p\n", &readInstruction->buffers->modification_lock);
uint32_t* instruction = readInstruction->instructionAddress;
MDB_val key;
key.mv_size = *(instruction + 3);
MDB_dbi dbi = (MDB_dbi) (*(instruction + 2) & 0xffff) ;
MDB_val data;
TxnWrap* tw = (TxnWrap*) (size_t) *((double*)instruction);
MDB_txn* txn = tw->txn;
mdb_txn_renew(txn);
unsigned int flags;
mdb_dbi_flags(txn, dbi, &flags);
bool dupSort = flags & MDB_DUPSORT;
int effected = 0;
MDB_cursor *cursor;
MDB_env* env = mdb_txn_env(txn);
int rc = mdb_cursor_open(txn, dbi, &cursor);
if (rc) {
*instruction = rc;
return;
}
key.mv_data = (void*) (instruction + 4);
rc = mdb_cursor_get(cursor, &key, &data, MDB_SET_KEY);
*(instruction + 3) = data.mv_size;
//instruction += (key.mv_size + 28) >> 2;
while (!rc) {
// access one byte from each of the pages to ensure they are in the OS cache,
// potentially triggering the hard page fault in this thread
int pages = (data.mv_size + 0xfff) >> 12;
// TODO: Adjust this for the page headers, I believe that makes the first page slightly less than 4KB.
for (int i = 0; i < pages; i++) {
effected += *(((uint8_t*)data.mv_data) + (i << 12));
}
if (dupSort) // in dupsort databases, access the rest of the values
rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT_DUP);
else
rc = 1; // done
}
*instruction = rc;
unsigned int env_flags = 0;
mdb_env_get_flags(env, &env_flags);
if (data.mv_size > 4096
#ifdef MDB_RPAGE_CACHE
&& !(env_flags & MDB_REMAP_CHUNKS)
#endif
) {
EnvWrap::toSharedBuffer(env, instruction, data);
*((double*)instruction) = (double) (size_t) data.mv_data;
} else {
if (!buffersByWorker)
buffersByWorker = new std::unordered_map<void*, read_results_buffer_t*>;
read_results_buffer_t* read_buffer;
auto buffer_search = buffersByWorker->find(readInstruction->buffers);
if (buffer_search == buffersByWorker->end()) {
// create new one
buffersByWorker->emplace(readInstruction->buffers, read_buffer = new read_results_buffer_t);
read_buffer->size = 0;
read_buffer->offset = 0; // force it re-malloc
} else
read_buffer = buffer_search->second;
if ((int) read_buffer->size - (int) read_buffer->offset - 4 < (int) data.mv_size) {
size_t size = 0x40000;// 256KB
read_buffer->data = (char*) malloc(size);
read_buffer->size = size;
read_buffer->offset = 0;
buffer_info_t buffer_info;
buffer_info.end = read_buffer->data + size;
buffer_info.env = nullptr;
buffer_info.isSharedMap = false;
pthread_mutex_lock(&readInstruction->buffers->modification_lock);
buffer_info.id = read_buffer->id = readInstruction->buffers->nextId++;
readInstruction->buffers->buffers.emplace(read_buffer->data, buffer_info);
pthread_mutex_unlock(&readInstruction->buffers->modification_lock);
}
auto position = (uint32_t*) (read_buffer->data + read_buffer->offset);
memcpy(position, data.mv_data, data.mv_size);
position += (data.mv_size + 7) >> 2;
*(instruction + 1) = read_buffer->id;
*(instruction + 2) = read_buffer->offset;
read_buffer->offset = (char*)position - read_buffer->data;
}
mdb_cursor_close(cursor);
//fprintf(stderr, "unlock %p\n", &readInstruction->buffers->modification_lock);
}
static thread_local napi_ref* read_callback;
void read_complete(napi_env env, napi_status status, void* data) {
read_instruction_t* readInstruction = (read_instruction_t*) data;
napi_value callback;
napi_get_reference_value(env, *read_callback, &callback);
//uint32_t count;
napi_value result;
napi_value callback_id;
napi_create_int32(env, readInstruction->callback_id, &callback_id);
status = napi_call_function(env, callback, callback, 1, &callback_id, &result);
napi_delete_async_work(env, readInstruction->work);
delete readInstruction;
//napi_resolve_deferred(env, readInstruction->deferred, resolution);
}
NAPI_FUNCTION(enableThreadSafeCalls) {
WriteWorker::threadSafeCallsEnabled = true;
napi_value returnValue;
RETURN_UNDEFINED;
}
NAPI_FUNCTION(setReadCallback) {
ARGS(1)
read_callback = new napi_ref;
napi_create_reference(env, args[0], 1, read_callback);
RETURN_UNDEFINED;
}
NAPI_FUNCTION(startRead) {
ARGS(4)
GET_INT64_ARG(0);
uint32_t* instructionAddress = (uint32_t*) i64;
read_instruction_t* readInstruction = new read_instruction_t;
readInstruction->instructionAddress = instructionAddress;
uint32_t callback_id;
GET_UINT32_ARG(callback_id, 1);
//napi_create_reference(env, args[1], 1, &readInstruction->callback);
readInstruction->callback_id = callback_id;
readInstruction->buffers = EnvWrap::sharedBuffers;
napi_status status;
status = napi_create_async_work(env, args[2], args[3], do_read, read_complete, readInstruction, &readInstruction->work);
status = napi_queue_async_work(env, readInstruction->work);
RETURN_UNDEFINED;
}/*
NAPI_FUNCTION(nextRead) {
ARGS(1)
uint32_t offset;
GET_UINT32_ARG(offset, 0);
uint32_t* instructionAddress = (uint32_t*) currentReadAddress + offset;
read_callback_t* callback = lastCallback;
uint32_t count;
napi_reference_ref(callback->env, callback->callback, &count);
read_instruction_t* readInstruction = new read_instruction_t;
readInstruction->instructionAddress = instructionAddress;
readInstruction->callback = callback;
napi_async_work work;
napi_create_async_work(callback->env, nullptr, readInstruction, &work);
napi_queue_async_work(env, work);
ReadWorker* worker = new ReadWorker(instructionAddress, Function(env, args[1]));
worker->Queue();
RETURN_UNDEFINED;
}*/
Value lmdbNativeFunctions(const CallbackInfo& info) {
// no-op, just doing this to give a label to the native functions
return info.Env().Undefined();
}
Napi::Value throwLmdbError(Napi::Env env, int rc) {
if (rc < 0 && !(rc < -30700 && rc > -30800))
rc = -rc;
Error error = Error::New(env, mdb_strerror(rc));
error.Set("code", Number::New(env, rc));
error.ThrowAsJavaScriptException();
return env.Undefined();
}
Napi::Value throwError(Napi::Env env, const char* message) {
Error::New(env, message).ThrowAsJavaScriptException();
return env.Undefined();
}
const int ASSIGN_NEXT_TIMESTAMP = 0;
const int ASSIGN_LAST_TIMESTAMP = 1;
const int ASSIGN_NEXT_TIMESTAMP_AND_RECORD_PREVIOUS = 2;
const int ASSIGN_PREVIOUS_TIMESTAMP = 3;
int putWithVersion(MDB_txn * txn,
MDB_dbi dbi,
MDB_val * key,
MDB_val * data,
unsigned int flags, double version) {
// leave 8 header bytes available for version and copy in with reserved memory
char* source_data = (char*) data->mv_data;
int size = data->mv_size;
data->mv_size = size + 8;
int rc = mdb_put(txn, dbi, key, data, flags | MDB_RESERVE);
if (rc == 0) {
// if put is successful, data->mv_data will point into the database where we copy the data to
memcpy((char*) data->mv_data + 8, source_data, size);
memcpy(data->mv_data, &version, 8);
//*((double*) data->mv_data) = version; // this doesn't work on ARM v7 because it is not (guaranteed) memory-aligned
}
data->mv_data = source_data; // restore this so that if it points to data that needs to be freed, it points to the right place
return rc;
}
static uint64_t last_time; // actually encoded as double
#ifdef _WIN32
int pthread_mutex_init(pthread_mutex_t *mutex, pthread_mutexattr_t *attr)
{
(void)attr;
if (mutex == NULL)
return 1;
InitializeCriticalSection(mutex);
return 0;
}
int pthread_mutex_destroy(pthread_mutex_t *mutex)
{
if (mutex == NULL)
return 1;
DeleteCriticalSection(mutex);
return 0;
}
int pthread_mutex_lock(pthread_mutex_t *mutex)
{
if (mutex == NULL)
return 1;
EnterCriticalSection(mutex);
return 0;
}
int pthread_mutex_unlock(pthread_mutex_t *mutex)
{
if (mutex == NULL)
return 1;
LeaveCriticalSection(mutex);
return 0;
}
int cond_init(pthread_cond_t *cond)
{
if (cond == NULL)
return 1;
InitializeConditionVariable(cond);
return 0;
}
int pthread_cond_destroy(pthread_cond_t *cond)
{
/* Windows does not have a destroy for conditionals */
(void)cond;
return 0;
}
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
{
if (cond == NULL || mutex == NULL)
return 1;
if (!SleepConditionVariableCS(cond, mutex, INFINITE))
return 1;
return 0;
}
int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, uint64_t ms)
{
if (cond == NULL || mutex == NULL)
return 1;
if (!SleepConditionVariableCS(cond, mutex, ms))
return 1;
return 0;
}
int pthread_cond_signal(pthread_cond_t *cond)
{
if (cond == NULL)
return 1;
WakeConditionVariable(cond);
return 0;
}
int pthread_cond_broadcast(pthread_cond_t *cond)
{
if (cond == NULL)
return 1;
WakeAllConditionVariable(cond);
return 0;
}
uint64_t get_time64() {
return GetTickCount64();
}
// from: https://github.com/wadey/node-microtime/blob/master/src/microtime.cc#L19
uint64_t next_time_double() {
FILETIME ft;
GetSystemTimePreciseAsFileTime(&ft);
unsigned long long t = ft.dwHighDateTime;
t <<= 32;
t |= ft.dwLowDateTime;
t /= 10;
t -= 11644473600000000ULL;
double next_time = (double)t/ 1000;
return *((uint64_t*)&next_time);
}
#else
int cond_init(pthread_cond_t *cond) {
pthread_condattr_t attr;
pthread_condattr_init( &attr);
#if defined(__linux)
// only tested in linux, not available on macos
pthread_condattr_setclock( &attr, CLOCK_MONOTONIC);
#endif
return pthread_cond_init(cond, &attr);
}
int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, uint64_t cms)
{
struct timespec ts;
#if defined(__linux)
clock_gettime(CLOCK_MONOTONIC, &ts);
#else
// without being able to set the clock for condition/mutexes, need to use default realtime clock on macos
clock_gettime(CLOCK_REALTIME, &ts);
#endif
uint64_t ns = ts.tv_nsec + cms * 10000;
ts.tv_sec += ns / 1000000000;
ts.tv_nsec = ns % 1000000000;
return pthread_cond_timedwait(cond, mutex, &ts);
}
uint64_t get_time64() {
struct timespec time;
clock_gettime(CLOCK_MONOTONIC, &time);
return time.tv_sec * 1000000000ll + time.tv_nsec;
}
uint64_t next_time_double() {
struct timespec time;
clock_gettime(CLOCK_REALTIME, &time);
double next_time = (double)time.tv_sec * 1000 + (double)time.tv_nsec / 1000000;
return *((uint64_t*)&next_time);
}
#endif
// This file contains code from the node-lmdb project
// Copyright (c) 2013-2017 Timur Kristóf
// Copyright (c) 2021 Kristopher Tate
// Licensed to you under the terms of the MIT license
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.